Set up Data Streams Monitoring for Go

Prerequisites

To start with Data Streams Monitoring, you need recent versions of the Datadog Agent and the Data Streams Monitoring libraries.

Installation

Two types of instrumentation are available:

  • Instrumentation for Kafka-based workloads
  • Custom instrumentation for any other queuing technology or protocol

Confluent Kafka client

import (
  "github.com/confluentinc/confluent-kafka-go/v2/kafka" // provides kafka.ConfigMap
  ddkafka "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka.v2"
)

...
// CREATE PRODUCER WITH THIS WRAPPER
producer, err := ddkafka.NewProducer(&kafka.ConfigMap{
  "bootstrap.servers": bootStrapServers,
}, ddkafka.WithDataStreams())

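If a service consumes data from one point and produces to another, propagate context between the two using the Go context structure. The examples use the carriers from the Sarama integration, imported as ddsarama in the Sarama Kafka client section: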

  1. Extract the context from headers:

    ctx = datastreams.ExtractFromBase64Carrier(ctx, ddsarama.NewConsumerMessageCarrier(message))
    
  2. Inject it into the header before producing downstream:

    datastreams.InjectToBase64Carrier(ctx, ddsarama.NewProducerMessageCarrier(message))
    

Sarama Kafka client

import (
  "github.com/IBM/sarama" // provides sarama.NewConfig and sarama.NewAsyncProducer
  ddsarama "gopkg.in/DataDog/dd-trace-go.v1/contrib/IBM/sarama.v1"
)

...
config := sarama.NewConfig()
producer, err := sarama.NewAsyncProducer([]string{bootStrapServers}, config)

// ADD THIS LINE
producer = ddsarama.WrapAsyncProducer(config, producer, ddsarama.WithDataStreams())

Manual instrumentation

You can also use manual instrumentation. For example, you can propagate context through Kinesis.

Instrumenting the produce call

  1. Ensure your message supports the TextMapWriter interface.
  2. Inject the context into your message and instrument the produce call by calling:

    // Set a checkpoint for the outgoing message, then inject the context only if the checkpoint was set.
    ctx, ok := tracer.SetDataStreamsCheckpointWithParams(
      ctx,
      options.CheckpointParams{PayloadSize: getProducerMsgSize(message)},
      "direction:out", "type:kinesis", "topic:kinesis_arn",
    )
    if ok {
      datastreams.InjectToBase64Carrier(ctx, message)
    }
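
The produce snippet relies on two things this section does not define: a message type that satisfies TextMapWriter, and the getProducerMsgSize helper. The following is a minimal sketch of both, assuming TextMapWriter requires a single Set(key, val string) method (verify this against the datastreams package of the dd-trace-go version you use). The kinesisMessage type, its fields, and the size calculation are hypothetical and not part of the library. Kinesis records have no native headers, so in practice the Headers map would have to be serialized into the record data by your own code.

```go
package main

// kinesisMessage is a hypothetical wrapper around a Kinesis record.
// Headers carries propagation metadata alongside the payload.
type kinesisMessage struct {
	PartitionKey string
	Data         []byte
	Headers      map[string]string
}

// Set stores a key/value pair in the message headers. This is the method
// the TextMapWriter interface is assumed to require.
func (m *kinesisMessage) Set(key, val string) {
	if m.Headers == nil {
		m.Headers = make(map[string]string)
	}
	m.Headers[key] = val
}

// getProducerMsgSize returns an approximate payload size in bytes:
// the partition key, the data, and every header key and value.
func getProducerMsgSize(m *kinesisMessage) int64 {
	size := int64(len(m.PartitionKey) + len(m.Data))
	for k, v := range m.Headers {
		size += int64(len(k) + len(v))
	}
	return size
}
```

With a type like this, a *kinesisMessage value could be passed as the carrier to datastreams.InjectToBase64Carrier, and the result of getProducerMsgSize could be used as PayloadSize.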

Instrumenting the consume call

  1. Ensure your message supports the TextMapReader interface.
  2. Extract the context from your message and instrument the consume call by calling:

    // Extract the upstream context from the message, then set a checkpoint for the incoming message.
    ctx, ok := tracer.SetDataStreamsCheckpointWithParams(
      datastreams.ExtractFromBase64Carrier(context.Background(), message),
      options.CheckpointParams{PayloadSize: payloadSize},
      "direction:in", "type:kinesis", "topic:kinesis_arn",
    )
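
On the consume side, the message has to expose its headers for reading. As a minimal sketch, assuming TextMapReader requires a single ForeachKey(handler func(key, val string) error) error method (verify this against the datastreams package of the dd-trace-go version you use), a hypothetical message type could implement it as shown here. The kinesisMessage type and its Headers field are illustrative assumptions. How the headers are decoded from the Kinesis record data is up to your own serialization format.

```go
package main

// kinesisMessage is a hypothetical wrapper around a consumed Kinesis record.
// Headers holds the propagation metadata decoded from the record.
type kinesisMessage struct {
	Data    []byte
	Headers map[string]string
}

// ForeachKey calls handler once per header and returns the first error the
// handler reports. This is the method the TextMapReader interface is
// assumed to require.
func (m *kinesisMessage) ForeachKey(handler func(key, val string) error) error {
	for k, v := range m.Headers {
		if err := handler(k, v); err != nil {
			return err
		}
	}
	return nil
}
```

A *kinesisMessage value of this shape could then be passed as the carrier to datastreams.ExtractFromBase64Carrier.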