Set up Data Streams Monitoring for Go

The following instrumentation types are available: automatic instrumentation (using Orchestrion) and manual instrumentation.

Prerequisites

To start with Data Streams Monitoring, you need recent versions of the Datadog Agent and Data Streams Monitoring libraries:

Supported libraries

| Technology | Library            | Minimal tracer version | Recommended tracer version |
|------------|--------------------|------------------------|----------------------------|
| Kafka      | confluent-kafka-go | 1.56.1                 | 1.66.0 or later            |
| Kafka      | Sarama             | 1.56.1                 | 1.66.0 or later            |

Installation

Automatic instrumentation

Automatic instrumentation uses Orchestrion to install dd-trace-go and supports both the Sarama and Confluent Kafka libraries.

To automatically instrument your service:

  1. Follow the Orchestrion Getting Started guide to compile or run your service using Orchestrion.
  2. Set the `DD_DATA_STREAMS_ENABLED=true` environment variable.
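
With automatic instrumentation, the service code itself stays unchanged. As a minimal sketch (broker address and topic are hypothetical), a plain Sarama producer like the one below is instrumented for Data Streams Monitoring when it is built with Orchestrion and run with `DD_DATA_STREAMS_ENABLED=true`:

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// No Datadog-specific code is required here: building this service with
	// Orchestrion and running it with DD_DATA_STREAMS_ENABLED=true adds the
	// Data Streams Monitoring instrumentation at compile time.
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config) // hypothetical broker
	if err != nil {
		log.Fatalf("failed to create producer: %v", err)
	}
	defer producer.Close()

	_, _, err = producer.SendMessage(&sarama.ProducerMessage{
		Topic: "orders", // hypothetical topic
		Value: sarama.StringEncoder("hello"),
	})
	if err != nil {
		log.Fatalf("failed to send message: %v", err)
	}
}
```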

Manual instrumentation

Sarama Kafka client

To manually instrument the Sarama Kafka client with Data Streams Monitoring:

  1. Import the `ddsarama` Go library:

```go
import (
  ddsarama "gopkg.in/DataDog/dd-trace-go.v1/contrib/Shopify/sarama"
)
```

  2. Wrap the producer with `ddsarama.WrapAsyncProducer`:

```go
...
config := sarama.NewConfig()
producer, err := sarama.NewAsyncProducer([]string{bootStrapServers}, config)
if err != nil {
	// handle the error
}

// ADD THIS LINE
producer = ddsarama.WrapAsyncProducer(config, producer, ddsarama.WithDataStreams())
```
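
Put together, a complete producer could look like the following minimal sketch; the broker address, topic, and the `tracer.Start()` call are illustrative assumptions rather than requirements imposed by the wrapper:

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
	ddsarama "gopkg.in/DataDog/dd-trace-go.v1/contrib/Shopify/sarama"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

func main() {
	tracer.Start()
	defer tracer.Stop()

	config := sarama.NewConfig()
	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config) // hypothetical broker
	if err != nil {
		log.Fatalf("failed to create producer: %v", err)
	}

	// Wrap the producer so Data Streams Monitoring checkpoints are recorded on produce.
	producer = ddsarama.WrapAsyncProducer(config, producer, ddsarama.WithDataStreams())
	defer producer.Close()

	producer.Input() <- &sarama.ProducerMessage{
		Topic: "orders", // hypothetical topic
		Value: sarama.StringEncoder("hello"),
	}
}
```
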
Confluent Kafka client

To manually instrument Confluent Kafka with Data Streams Monitoring:

  1. Import the `ddkafka` Go library:

```go
import (
  ddkafka "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka.v2"
)
```

  2. Wrap the producer creation with `ddkafka.NewProducer` and use the `ddkafka.WithDataStreams()` configuration:

```go
// CREATE PRODUCER WITH THIS WRAPPER
producer, err := ddkafka.NewProducer(&kafka.ConfigMap{
	"bootstrap.servers": bootStrapServers,
}, ddkafka.WithDataStreams())
```
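
The wrapped producer is used like a regular confluent-kafka-go producer. As a minimal sketch (broker address, topic, and payload are illustrative assumptions):

```go
package main

import (
	"log"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
	ddkafka "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka.v2"
)

func main() {
	// Create the producer through the Data Streams Monitoring wrapper.
	producer, err := ddkafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // hypothetical broker
	}, ddkafka.WithDataStreams())
	if err != nil {
		log.Fatalf("failed to create producer: %v", err)
	}
	defer producer.Close()

	topic := "orders" // hypothetical topic
	err = producer.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte("hello"),
	}, nil)
	if err != nil {
		log.Fatalf("failed to produce message: %v", err)
	}

	// Wait for delivery of any outstanding messages before exiting.
	producer.Flush(5000)
}
```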

If a service consumes data from one point and produces to another point, propagate context between the two places using the Go context structure (a combined sketch follows these steps):

  1. Extract the context from the message headers:

```go
ctx = datastreams.ExtractFromBase64Carrier(ctx, ddsarama.NewConsumerMessageCarrier(message))
```

  2. Inject it into the message headers before producing downstream:

```go
datastreams.InjectToBase64Carrier(ctx, ddsarama.NewProducerMessageCarrier(message))
```
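
As a sketch, assuming the incoming message comes from a Sarama consumer and the outgoing message is sent through the wrapped producer from the previous section (the package, function, and topic names are illustrative):

```go
package processor

import (
	"context"

	"github.com/Shopify/sarama"
	ddsarama "gopkg.in/DataDog/dd-trace-go.v1/contrib/Shopify/sarama"
	"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
)

// forward consumes a message, transforms it, and produces the result
// downstream while propagating the Data Streams Monitoring context.
func forward(in *sarama.ConsumerMessage, producer sarama.AsyncProducer) {
	// Extract the pathway context carried in the consumed message's headers.
	ctx := datastreams.ExtractFromBase64Carrier(context.Background(), ddsarama.NewConsumerMessageCarrier(in))

	out := &sarama.ProducerMessage{
		Topic: "downstream-topic", // hypothetical topic
		Value: sarama.ByteEncoder(in.Value),
	}

	// Inject the pathway context into the outgoing message's headers
	// before handing it to the (wrapped) producer.
	datastreams.InjectToBase64Carrier(ctx, ddsarama.NewProducerMessageCarrier(out))

	producer.Input() <- out
}
```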
    

Other queuing technologies or protocols

For queuing technologies and protocols that are not supported out of the box, you can use manual instrumentation. For example, you can propagate context through Kinesis.

Instrumenting the produce call

  1. Ensure your message supports the `TextMapWriter` interface.
  2. Inject the context into your message and instrument the produce call by calling:

```go
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(ctx, options.CheckpointParams{PayloadSize: getProducerMsgSize(message)}, "direction:out", "type:kinesis", "topic:kinesis_arn")
if ok {
  datastreams.InjectToBase64Carrier(ctx, message)
}
```
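
As an illustration, the sketch below defines a hypothetical `kinesisMessage` carrier that satisfies both interfaces by exposing `Set` and `ForeachKey` methods, and uses it in a produce helper. All of these names are assumptions rather than part of the library, and `int64(len(message.Data))` stands in for the `getProducerMsgSize` helper referenced above:

```go
package kinesis

import (
	"context"

	"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
	"gopkg.in/DataDog/dd-trace-go.v1/datastreams/options"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

// kinesisMessage is a hypothetical wrapper around the payload sent to Kinesis.
// The propagation context is stored in a string map that travels with the
// record (for example, serialized into the record blob).
type kinesisMessage struct {
	Data    []byte
	Headers map[string]string
}

// Set implements the TextMapWriter shape: Set(key, val string).
func (m *kinesisMessage) Set(key, val string) {
	if m.Headers == nil {
		m.Headers = make(map[string]string)
	}
	m.Headers[key] = val
}

// ForeachKey implements the TextMapReader shape: ForeachKey(handler) error.
func (m *kinesisMessage) ForeachKey(handler func(key, val string) error) error {
	for k, v := range m.Headers {
		if err := handler(k, v); err != nil {
			return err
		}
	}
	return nil
}

// produce sets a Data Streams Monitoring checkpoint and injects the pathway
// context into the message before it is sent to Kinesis.
func produce(ctx context.Context, message *kinesisMessage) {
	ctx, ok := tracer.SetDataStreamsCheckpointWithParams(
		ctx,
		options.CheckpointParams{PayloadSize: int64(len(message.Data))},
		"direction:out", "type:kinesis", "topic:kinesis_arn",
	)
	if ok {
		datastreams.InjectToBase64Carrier(ctx, message)
	}

	// ... send the message with your Kinesis client of choice ...
}
```
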
Instrumenting the consume call

  1. Ensure your message supports the `TextMapReader` interface.
  2. Extract the context from your message and instrument the consume call by calling:

```go
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), message), options.CheckpointParams{PayloadSize: payloadSize}, "direction:in", "type:kinesis", "topic:kinesis_arn")
```
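
A matching consume-side helper, reusing the hypothetical `kinesisMessage` carrier and imports from the sketch above, could look like this:

```go
// consume extracts the pathway context from a received message and sets the
// consume-side Data Streams Monitoring checkpoint before processing it.
func consume(message *kinesisMessage) context.Context {
	ctx, _ := tracer.SetDataStreamsCheckpointWithParams(
		datastreams.ExtractFromBase64Carrier(context.Background(), message),
		options.CheckpointParams{PayloadSize: int64(len(message.Data))},
		"direction:in", "type:kinesis", "topic:kinesis_arn",
	)

	// ... process the message ...
	return ctx
}
```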