Set up Data Streams Monitoring for Go
Data Streams Monitoring for Go supports two instrumentation types: automatic instrumentation (with Orchestrion) and manual instrumentation.
Prerequisites
To start with Data Streams Monitoring, you need recent versions of the Datadog Agent and Data Streams Monitoring libraries:
Supported libraries
| Technology | Library | Minimal tracer version | Recommended tracer version |
|---|---|---|---|
| Kafka | confluent-kafka-go | 1.56.1 | 1.66.0 or later |
| Kafka | Sarama | 1.56.1 | 1.66.0 or later |
Installation
Automatic Instrumentation
Automatic instrumentation uses Orchestrion to install dd-trace-go and supports both the Sarama and Confluent Kafka libraries.
To automatically instrument your service:
1. Follow the Orchestrion Getting Started guide to compile or run your service using Orchestrion.
2. Set the `DD_DATA_STREAMS_ENABLED=true` environment variable.
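As a rough sketch, the Orchestrion workflow looks like the following. Verify the exact commands against the Orchestrion Getting Started guide, as the CLI evolves:

```shell
# Install Orchestrion (assumes a recent Go toolchain)
go install github.com/DataDog/orchestrion@latest

# Register Orchestrion in your project
orchestrion pin

# Build or run your service through Orchestrion,
# with Data Streams Monitoring enabled
DD_DATA_STREAMS_ENABLED=true orchestrion go run ./...
```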
Manual instrumentation
Sarama Kafka client
To manually instrument the Sarama Kafka client with Data Streams Monitoring:
1. Import the `ddsarama` Go library:

```go
import (
  ddsarama "gopkg.in/DataDog/dd-trace-go.v1/contrib/Shopify/sarama"
)
```
2. Wrap the producer with `ddsarama.WrapAsyncProducer`:

```go
...
config := sarama.NewConfig()
producer, err := sarama.NewAsyncProducer([]string{bootStrapServers}, config)
// ADD THIS LINE
producer = ddsarama.WrapAsyncProducer(config, producer, ddsarama.WithDataStreams())
```
Confluent Kafka client
To manually instrument Confluent Kafka with Data Streams Monitoring:
1. Import the `ddkafka` Go library:

```go
import (
  ddkafka "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka.v2"
)
```
2. Wrap the producer creation with `ddkafka.NewProducer` and use the `ddkafka.WithDataStreams()` configuration option:

```go
// CREATE PRODUCER WITH THIS WRAPPER
producer, err := ddkafka.NewProducer(&kafka.ConfigMap{
  "bootstrap.servers": bootStrapServers,
}, ddkafka.WithDataStreams())
```
If a service consumes data from one point and produces to another point, propagate context between the two places using the Go context structure:
1. Extract the context from the message headers:

```go
ctx = datastreams.ExtractFromBase64Carrier(ctx, ddsarama.NewConsumerMessageCarrier(message))
```

2. Inject it into the message headers before producing downstream:

```go
datastreams.InjectToBase64Carrier(ctx, ddsarama.NewProducerMessageCarrier(message))
```
Other queuing technologies or protocols
You can also use manual instrumentation for other queuing technologies. For example, you can propagate context through Kinesis.
Instrumenting the produce call
1. Ensure your message supports the `TextMapWriter` interface.
2. Inject the context into your message and instrument the produce call by calling:

```go
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(ctx, options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)}, "direction:out", "type:kinesis", "topic:kinesis_arn")
if ok {
  datastreams.InjectToBase64Carrier(ctx, message)
}
```
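For the produce side, your message type needs a `Set` method so the tracer can attach the encoded pathway context as a message attribute. The sketch below uses a hypothetical `kinesisMessage` type and a locally declared interface mirroring the one in dd-trace-go; adapt the field names to however your producer carries metadata:

```go
package main

import "fmt"

// TextMapWriter mirrors the interface that
// datastreams.InjectToBase64Carrier expects from a carrier.
type TextMapWriter interface {
	Set(key, val string)
}

// kinesisMessage is a hypothetical message type; the Attributes map
// stands in for wherever your producer stores per-message metadata.
type kinesisMessage struct {
	Data       []byte
	Attributes map[string]string
}

// Set stores a propagation header, satisfying TextMapWriter.
func (m *kinesisMessage) Set(key, val string) {
	if m.Attributes == nil {
		m.Attributes = make(map[string]string)
	}
	m.Attributes[key] = val
}

func main() {
	msg := &kinesisMessage{Data: []byte("payload")}
	var w TextMapWriter = msg // compile-time check of the interface
	w.Set("dd-pathway-ctx-base64", "encoded-context")
	fmt.Println(msg.Attributes["dd-pathway-ctx-base64"]) // prints "encoded-context"
}
```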
Instrumenting the consume call
1. Ensure your message supports the `TextMapReader` interface.
2. Extract the context from your message and instrument the consume call by calling:

```go
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), message), options.CheckpointParams{PayloadSize: payloadSize}, "direction:in", "type:kinesis", "topic:kinesis_arn")
```
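On the consume side, the message type needs a `ForeachKey` method that iterates over the stored attributes so the tracer can read the pathway context back out. A minimal sketch, again with a hypothetical record type and a locally declared interface mirroring dd-trace-go's:

```go
package main

import "fmt"

// TextMapReader mirrors the interface that
// datastreams.ExtractFromBase64Carrier expects from a carrier.
type TextMapReader interface {
	ForeachKey(handler func(key, val string) error) error
}

// kinesisRecord is a hypothetical consumed record; Attributes stands
// in for wherever your consumer surfaces message metadata.
type kinesisRecord struct {
	Attributes map[string]string
}

// ForeachKey iterates over the stored headers, satisfying
// TextMapReader, and stops early if the handler returns an error.
func (r *kinesisRecord) ForeachKey(handler func(key, val string) error) error {
	for k, v := range r.Attributes {
		if err := handler(k, v); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	rec := &kinesisRecord{Attributes: map[string]string{
		"dd-pathway-ctx-base64": "encoded-context",
	}}
	var r TextMapReader = rec // compile-time check of the interface
	_ = r.ForeachKey(func(k, v string) error {
		fmt.Printf("%s=%s\n", k, v)
		return nil
	})
}
```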