---
title: Setup Data Streams Monitoring for Go
description: Datadog, the leading service for cloud-scale monitoring.
breadcrumbs: >-
  Docs > Data Streams Monitoring > Setup Data Streams Monitoring > Setup Data
  Streams Monitoring for Go
---

# Setup Data Streams Monitoring for Go

{% callout %}
# Important note for users on the following Datadog sites: app.ddog-gov.com, us2.ddog-gov.com

{% alert level="danger" %}
This product is not supported for your selected [Datadog site](https://docs.datadoghq.com/getting_started/site.md). ({% placeholder "user-datadog-site-name" /%}).
{% /alert %}

{% /callout %}

The following instrumentation types are available:

- Automatic instrumentation for Kafka-based workloads
- Manual Instrumentation for Kafka-based workloads
- Manual instrumentation for other queuing technology or protocol

### Prerequisites{% #prerequisites %}

To start with Data Streams Monitoring, you need recent versions of the Datadog Agent and Data Streams Monitoring libraries.

- [Datadog Agent v7.34.0 or later](https://docs.datadoghq.com/agent.md)
- [dd-trace-go v1.56.1 or later](https://github.com/DataDog/dd-trace-go)

**Note**: This documentation uses v2 of the Go tracer, which Datadog recommends for all users. If you are using v1, see the [migration guide](https://docs.datadoghq.com/tracing/trace_collection/custom_instrumentation/go/migration.md) to upgrade to v2.

Data Streams Monitoring has not been changed between v1 and v2 of the SDK.

### Supported libraries{% #supported-libraries %}

| Technology | Library                                                                  | Minimal tracer version | Recommended tracer version |
| ---------- | ------------------------------------------------------------------------ | ---------------------- | -------------------------- |
| Kafka      | [confluent-kafka-go](https://github.com/confluentinc/confluent-kafka-go) | 1.56.1                 | 1.66.0 or later            |
| Kafka      | [Sarama](https://github.com/IBM/sarama)                                  | 1.56.1                 | 1.66.0 or later            |
| Kafka      | [kafka-go](https://github.com/segmentio/kafka-go)                        | 1.63.0                 | 1.63.0 or later            |

### Installation{% #installation %}

### Monitoring Kafka Pipelines{% #monitoring-kafka-pipelines %}

Data Streams Monitoring uses message headers to propagate context through Kafka streams. If `log.message.format.version` is set in the Kafka broker configuration, it must be set to `0.11.0.0` or higher. Data Streams Monitoring is not supported for versions lower than this.

### Monitoring RabbitMQ pipelines{% #monitoring-rabbitmq-pipelines %}

The [RabbitMQ integration](https://docs.datadoghq.com/tracing/trace_collection/custom_instrumentation/go/migration.md) can provide detailed monitoring and metrics of your RabbitMQ deployments. For full compatibility with Data Streams Monitoring, Datadog recommends configuring the integration as follows:

```yaml
instances:
  - prometheus_plugin:
      url: http://<HOST>:15692
      unaggregated_endpoint: detailed?family=queue_coarse_metrics&family=queue_consumer_count&family=channel_exchange_metrics&family=channel_queue_exchange_metrics&family=node_coarse_metrics
```

This ensures that all RabbitMQ graphs populate, and that you see detailed metrics for individual exchanges as well as queues.

#### Automatic Instrumentation{% #automatic-instrumentation %}

Automatic instrumentation uses [Orchestrion](https://datadoghq.dev/orchestrion/) to install dd-trace-go and supports the Sarama, Confluent Kafka, and kafka-go libraries.

To automatically instrument your service:

1. Follow the [Orchestrion Getting Started](https://datadoghq.dev/orchestrion/docs/getting-started/) guide to compile or run your service using [Orchestrion](https://datadoghq.dev/orchestrion/).
1. Set the `DD_DATA_STREAMS_ENABLED=true` environment variable

#### Manual instrumentation{% #manual-instrumentation %}

##### Sarama Kafka client{% #sarama-kafka-client %}

To manually instrument the Sarama Kafka client with Data Streams Monitoring:

1. Import the `ddsarama` go library

```go
import (
  ddsarama "github.com/DataDog/dd-trace-go/contrib/IBM/sarama/v2"
)

2. Wrap the producer with `ddsarama.WrapAsyncProducer`

...
config := sarama.NewConfig()
producer, err := sarama.NewAsyncProducer([]string{bootStrapServers}, config)

// ADD THIS LINE
producer = ddsarama.WrapAsyncProducer(config, producer, ddsarama.WithDataStreams())
```

##### Confluent Kafka client{% #confluent-kafka-client %}

To manually instrument Confluent Kafka with Data Streams Monitoring:

1. Import the `ddkafka` go library

```go
import (
  ddkafka "github.com/DataDog/dd-trace-go/contrib/confluentinc/confluent-kafka-go/kafka.v2/v2"
)
```
Wrap the producer creation with `ddkafka.NewProducer` and use the `ddkafka.WithDataStreams()` configuration
```go
// CREATE PRODUCER WITH THIS WRAPPER
producer, err := ddkafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": bootStrapServers,
}, ddkafka.WithDataStreams())
```

If a service consumes data from one point and produces to another point, propagate context between the two places using the Go context structure. The `ctx` returned by `ExtractFromBase64Carrier` carries the upstream DSM pathway. Pass it to `SetDataStreamsCheckpointWithParams` when you produce, then inject it into the outbound message with `InjectToBase64Carrier`. Passing `context.Background()` at the produce site creates a new pathway root and breaks end-to-end visibility. This happens, for example, in a worker goroutine that has lost the consume `ctx`.

Extract the context from headers

```go
ctx = datastreams.ExtractFromBase64Carrier(ctx, ddsarama.NewConsumerMessageCarrier(message))
```

Inject it into the header before producing downstream

```go
datastreams.InjectToBase64Carrier(ctx, ddsarama.NewProducerMessageCarrier(message))
```

##### Goroutines and channels{% #goroutines-and-channels %}

Go channels and goroutines do not carry `context.Context` automatically. If your service fans consumed messages out to worker goroutines before producing, pass the consume `ctx` to the produce site by including it in the work item you send over the channel:

```go
type job struct {
    ctx     context.Context
    payload []byte
}

// consume side
ctx, _ = tracer.SetDataStreamsCheckpointWithParams(
    datastreams.ExtractFromBase64Carrier(context.Background(), ddsarama.NewConsumerMessageCarrier(msg)),
    options.CheckpointParams{PayloadSize: int64(len(msg.Value))},
    "direction:in", "type:kafka", "topic:"+inTopic, "group:"+group,
)
// context.WithoutCancel preserves the pathway if the handler's ctx is canceled before the worker runs (Go 1.21+)
jobs <- job{ctx: context.WithoutCancel(ctx), payload: msg.Value}

// worker goroutine
for j := range jobs {
    out := &sarama.ProducerMessage{Topic: outTopic, Value: sarama.ByteEncoder(j.payload)}
    ctx, ok := tracer.SetDataStreamsCheckpointWithParams(j.ctx,
        options.CheckpointParams{PayloadSize: int64(out.Value.Length())},
        "direction:out", "type:kafka", "topic:"+outTopic)
    if ok {
        datastreams.InjectToBase64Carrier(ctx, ddsarama.NewProducerMessageCarrier(out))
    }
    producer.SendMessage(out)
}
```

- **Fan-out**: When one consumed message fans out to multiple produce calls, pass the same consume `ctx` to each produce checkpoint. Each call creates its own child node in the pathway.
- **Fan-in**: When many consumed messages merge into one produce call, combine the inbound contexts with `datastreams.MergeContexts(ctxs...)` before producing.

#### Other queuing technologies or protocols{% #other-queuing-technologies-or-protocols %}

You can also use manual instrumentation. For example, you can propagate context through Kinesis.

##### Instrumenting the produce call{% #instrumenting-the-produce-call %}

1. Ensure your message supports the [TextMapWriter interface](https://github.com/DataDog/dd-trace-go/blob/main/datastreams/propagation.go#L37).
1. Inject the context into your message and instrument the produce call by calling:

```go
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(ctx, options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)}, "direction:out", "type:kinesis", "topic:kinesis_arn")
if ok {
  datastreams.InjectToBase64Carrier(ctx, message)
}
```

##### Instrumenting the consume call{% #instrumenting-the-consume-call %}

1. Ensure your message supports the [TextMapReader interface](https://github.com/DataDog/dd-trace-go/blob/main/datastreams/propagation.go#L44).
1. Extract the context from your message and instrument the consume call by calling:

```go
	ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), message), options.CheckpointParams{PayloadSize: payloadSize}, "direction:in", "type:kinesis", "topic:kinesis_arn")
```

### Monitoring connectors{% #monitoring-connectors %}

#### Confluent Cloud connectors{% #confluent-cloud-connectors %}

Data Streams Monitoring can automatically discover your [Confluent Cloud](https://docs.datadoghq.com/tracing/trace_collection/custom_instrumentation/go/migration.md) connectors and visualize them within the context of your end-to-end streaming data pipeline.

##### Setup{% #setup %}

1. Install and configure the [Datadog-Confluent Cloud integration](https://app.datadoghq.com/integrations/confluent-cloud).

1. In Datadog, open the [Confluent Cloud integration tile](https://app.datadoghq.com/integrations/confluent-cloud).

Under **Actions**, a list of resources populates with detected clusters and connectors. Datadog attempts to discover new connectors every time you view this integration tile.

1. Select the resources you want to add.

1. Click **Add Resources**.

1. Navigate to [Data Streams Monitoring](https://app.datadoghq.com/data-streams/) to visualize the connectors and track connector status and throughput.

## Further reading{% #further-reading %}

- [Autodiscover Confluent Cloud connectors and easily monitor performance in Data Streams Monitoring](https://www.datadoghq.com/blog/confluent-connector-dsm-autodiscovery/)
