Setup Data Streams Monitoring for Go

Two instrumentation types are available: automatic instrumentation and manual instrumentation.

Prerequisites

To start with Data Streams Monitoring, you need recent versions of the Datadog Agent and the Data Streams Monitoring libraries listed below.

Supported libraries

Technology | Library            | Minimal tracer version | Recommended tracer version
Kafka      | confluent-kafka-go | 1.56.1                 | 1.66.0 or later
Kafka      | Sarama             | 1.56.1                 | 1.66.0 or later

Installation

Automatic Instrumentation

Automatic instrumentation uses Orchestrion to install dd-trace-go and supports both the Sarama and Confluent Kafka libraries.

To automatically instrument your service:

  1. Follow the Orchestrion Getting Started guide to compile or run your service using Orchestrion.
  2. Set the DD_DATA_STREAMS_ENABLED=true environment variable.
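A service can check at startup that the variable is actually visible to its process, which helps catch a missing deployment setting early. The following is a minimal sketch using only the standard library. The helper name parseEnabled is hypothetical and not part of dd-trace-go or Orchestrion, and it assumes the value is a boolean-like string of the kind strconv.ParseBool accepts; the tracer's own parsing may differ.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// dataStreamsEnvVar is the variable named in the setup step.
const dataStreamsEnvVar = "DD_DATA_STREAMS_ENABLED"

// parseEnabled reports whether raw is a boolean-like "true" value.
// Hypothetical helper for illustration only; it is not a dd-trace-go API.
func parseEnabled(raw string) bool {
	enabled, err := strconv.ParseBool(raw)
	return err == nil && enabled
}

func main() {
	if !parseEnabled(os.Getenv(dataStreamsEnvVar)) {
		fmt.Fprintf(os.Stderr, "warning: %s is not set to true\n", dataStreamsEnvVar)
		return
	}
	fmt.Printf("%s is enabled\n", dataStreamsEnvVar)
}
```

The check only logs a warning; it does not enable or disable anything itself.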

Manual instrumentation

Sarama Kafka client

To manually instrument the Sarama Kafka client with Data Streams Monitoring:

  1. Import the ddsarama Go library:
import (
  ddsarama "gopkg.in/DataDog/dd-trace-go.v1/contrib/Shopify/sarama" // 1.x
  // ddsarama "github.com/DataDog/dd-trace-go/contrib/Shopify/sarama/v2" // 2.x
)

  2. Wrap the producer with ddsarama.WrapAsyncProducer:

...
config := sarama.NewConfig()
producer, err := sarama.NewAsyncProducer([]string{bootStrapServers}, config)

// ADD THIS LINE
producer = ddsarama.WrapAsyncProducer(config, producer, ddsarama.WithDataStreams())

Confluent Kafka client

To manually instrument Confluent Kafka with Data Streams Monitoring:

  1. Import the ddkafka Go library:
import (
  ddkafka "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka.v2" // 1.x
  // ddkafka "github.com/DataDog/dd-trace-go/contrib/confluentinc/confluent-kafka-go/kafka.v2/v2" // 2.x
)
  2. Wrap the producer creation with ddkafka.NewProducer and use the ddkafka.WithDataStreams() configuration:
// CREATE PRODUCER WITH THIS WRAPPER
producer, err := ddkafka.NewProducer(&kafka.ConfigMap{
  "bootstrap.servers": bootStrapServers,
}, ddkafka.WithDataStreams())

If a service consumes data from one point and produces to another, propagate context between the two using Go's context.Context:

  1. Extract the context from headers:
ctx = datastreams.ExtractFromBase64Carrier(ctx, ddsarama.NewConsumerMessageCarrier(message))
  2. Inject it into the header before producing downstream:
datastreams.InjectToBase64Carrier(ctx, ddsarama.NewProducerMessageCarrier(message))
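To see why the same ctx has to be threaded from the consume side to the produce side, the toy model below may help. It is a sketch under stated assumptions, not the library's implementation: the message type, the header name x-toy-pathway, and the extract, inject, and relay helpers are all hypothetical, and the real encoding and header keys are handled by the datastreams package.

```go
package main

import (
	"context"
	"fmt"
)

// pathwayHeader is a hypothetical header name used only in this toy model.
const pathwayHeader = "x-toy-pathway"

// pathwayKey is the context key under which the toy pathway travels.
type pathwayKey struct{}

// message is a stand-in for a Kafka message with string headers.
type message struct {
	Headers map[string]string
	Value   string
}

// extract copies the pathway from the consumed message's headers into ctx.
func extract(ctx context.Context, in message) context.Context {
	if p, ok := in.Headers[pathwayHeader]; ok {
		return context.WithValue(ctx, pathwayKey{}, p)
	}
	return ctx
}

// inject writes the pathway carried by ctx into the outgoing message's headers.
func inject(ctx context.Context, out *message) {
	p, ok := ctx.Value(pathwayKey{}).(string)
	if !ok {
		return // nothing to propagate
	}
	if out.Headers == nil {
		out.Headers = map[string]string{}
	}
	out.Headers[pathwayHeader] = p
}

// relay consumes one message and produces one downstream, threading ctx through.
func relay(in message) message {
	ctx := extract(context.Background(), in)
	out := message{Value: in.Value}
	inject(ctx, &out)
	return out
}

func main() {
	in := message{Headers: map[string]string{pathwayHeader: "abc123"}, Value: "order-42"}
	out := relay(in)
	fmt.Println(out.Headers[pathwayHeader]) // prints "abc123"
}
```

If relay created a fresh context for the produce side instead of reusing the extracted one, the outgoing message would carry no pathway and the upstream link would be lost.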
    

Other queuing technologies or protocols

For other queuing technologies or protocols, use manual instrumentation. For example, you can propagate context through Kinesis.

Instrumenting the produce call
  1. Ensure your message supports the TextMapWriter interface.
  2. Inject the context into your message and instrument the produce call by calling:
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(ctx, options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)}, "direction:out", "type:kinesis", "topic:kinesis_arn")
if ok {
  datastreams.InjectToBase64Carrier(ctx, message)
}
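The produce-side requirements above can be sketched as follows. This is an illustration under assumptions: textMapWriter is redeclared locally to mirror the single Set method of the tracer's TextMapWriter interface so that the block compiles without dd-trace-go, the kinesisMessage envelope is hypothetical, and getProducerMsgSize shows only one plausible way to estimate the payload size.

```go
package main

import "fmt"

// textMapWriter mirrors the method set of the tracer's TextMapWriter interface.
// Redeclared locally (an assumption of this sketch) to keep the block standalone.
type textMapWriter interface {
	Set(key, val string)
}

// kinesisMessage is a hypothetical envelope; adapt it to your own record format.
type kinesisMessage struct {
	PartitionKey string
	Data         []byte
	Headers      map[string]string
}

// Set stores one propagation header, satisfying textMapWriter.
func (m *kinesisMessage) Set(key, val string) {
	if m.Headers == nil {
		m.Headers = make(map[string]string)
	}
	m.Headers[key] = val
}

// Compile-time check that *kinesisMessage implements the interface.
var _ textMapWriter = (*kinesisMessage)(nil)

// getProducerMsgSize estimates the payload size in bytes as the sum of the
// partition key, the data, and all header keys and values.
func getProducerMsgSize(m *kinesisMessage) int64 {
	size := int64(len(m.PartitionKey) + len(m.Data))
	for k, v := range m.Headers {
		size += int64(len(k) + len(v))
	}
	return size
}

func main() {
	msg := &kinesisMessage{PartitionKey: "user-1", Data: []byte("hello")}
	msg.Set("example-header", "value")
	fmt.Println(getProducerMsgSize(msg)) // prints 30
}
```

Because Set has a pointer receiver, pass a *kinesisMessage wherever a writer is expected so that injected headers land on the message that is actually sent.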

Instrumenting the consume call
  1. Ensure your message supports the TextMapReader interface.
  2. Extract the context from your message and instrument the consume call by calling:
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), message), options.CheckpointParams{PayloadSize: payloadSize}, "direction:in", "type:kinesis", "topic:kinesis_arn")
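On the consume side, the message needs a ForeachKey method so that an extractor can read the propagation headers back. The sketch below assumes a hypothetical consumedMessage type and redeclares textMapReader locally to mirror the tracer's TextMapReader method set; the collect helper is also hypothetical and only stands in for whatever code walks the headers.

```go
package main

import "fmt"

// textMapReader mirrors the method set of the tracer's TextMapReader interface.
// Redeclared locally (an assumption of this sketch) to keep the block standalone.
type textMapReader interface {
	ForeachKey(handler func(key, val string) error) error
}

// consumedMessage is a hypothetical envelope for a record read from a stream.
type consumedMessage struct {
	Data    []byte
	Headers map[string]string
}

// ForeachKey calls handler once per header and stops at the first error.
func (m consumedMessage) ForeachKey(handler func(key, val string) error) error {
	for k, v := range m.Headers {
		if err := handler(k, v); err != nil {
			return err
		}
	}
	return nil
}

// Compile-time check that consumedMessage implements the interface.
var _ textMapReader = consumedMessage{}

// collect gathers every header through the interface, the way an extractor
// would visit them.
func collect(r textMapReader) (map[string]string, error) {
	seen := make(map[string]string)
	err := r.ForeachKey(func(key, val string) error {
		seen[key] = val
		return nil
	})
	return seen, err
}

func main() {
	msg := consumedMessage{Headers: map[string]string{"example-header": "value"}}
	seen, err := collect(msg)
	fmt.Println(seen["example-header"], err)
}
```

A message type can implement both Set and ForeachKey, which lets the same envelope be used on the produce and consume paths.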

Monitoring connectors

Confluent Cloud connectors

Data Streams Monitoring can automatically discover your Confluent Cloud connectors and visualize them within the context of your end-to-end streaming data pipeline.

Setup
  1. Install and configure the Datadog-Confluent Cloud integration.

  2. In Datadog, open the Confluent Cloud integration tile.

    Figure: The Confluent Cloud integration tile in Datadog, on the Configure tab, showing a table titled '13 Resources autodiscovered' under the Actions heading.

    Under Actions, a list of resources populates with detected clusters and connectors. Datadog attempts to discover new connectors every time you view this integration tile.

  3. Select the resources you want to add.

  4. Click Add Resources.

  5. Navigate to Data Streams Monitoring to visualize the connectors and track connector status and throughput.
