---
title: Setup Data Streams Monitoring for Go
description: Datadog, the leading service for cloud-scale monitoring.
breadcrumbs: >-
  Docs > Data Streams Monitoring > Setup Data Streams Monitoring > Setup Data
  Streams Monitoring for Go
---

# Setup Data Streams Monitoring for Go

{% callout %}
# Important note for users on the following Datadog sites: app.ddog-gov.com

{% alert level="danger" %}
This product is not supported for your selected [Datadog site](https://docs.datadoghq.com/getting_started/site.md).
{% /alert %}

{% /callout %}

The following instrumentation types are available:

- Automatic instrumentation for Kafka-based workloads
- Manual instrumentation for Kafka-based workloads
- Manual instrumentation for other queuing technologies or protocols

### Prerequisites{% #prerequisites %}

To start with Data Streams Monitoring, you need recent versions of the Datadog Agent and Data Streams Monitoring libraries.

- [Datadog Agent v7.34.0 or later](https://docs.datadoghq.com/agent.md)
- [dd-trace-go v1.56.1 or later](https://github.com/DataDog/dd-trace-go)

**Note**: This documentation uses v2 of the Go tracer, which Datadog recommends for all users. If you are using v1, see the [migration guide](https://docs.datadoghq.com/tracing/trace_collection/custom_instrumentation/go/migration.md) to upgrade to v2.

The Data Streams Monitoring API is unchanged between v1 and v2 of the tracer.

### Supported libraries{% #supported-libraries %}

| Technology | Library                                                                  | Minimum tracer version | Recommended tracer version |
| ---------- | ------------------------------------------------------------------------ | ---------------------- | -------------------------- |
| Kafka      | [confluent-kafka-go](https://github.com/confluentinc/confluent-kafka-go) | 1.56.1                 | 1.66.0 or later            |
| Kafka      | [Sarama](https://github.com/IBM/sarama)                                  | 1.56.1                 | 1.66.0 or later            |
| Kafka      | [kafka-go](https://github.com/segmentio/kafka-go)                        | 1.63.0                 | 1.63.0 or later            |

### Installation{% #installation %}

### Monitoring Kafka pipelines{% #monitoring-kafka-pipelines %}

Data Streams Monitoring uses message headers to propagate context through Kafka streams. If `log.message.format.version` is set in the Kafka broker configuration, it must be set to `0.11.0.0` or higher. Data Streams Monitoring is not supported for versions lower than this.
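As a sketch, if your broker configuration sets this property at all, pin it to at least `0.11.0.0` (the first record format that supports message headers); the property name comes from the Kafka broker configuration reference:

```properties
# server.properties: only needed if log.message.format.version is set explicitly.
# Formats older than 0.11.0.0 drop the headers Data Streams Monitoring relies on.
log.message.format.version=0.11.0.0
```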

### Monitoring RabbitMQ pipelines{% #monitoring-rabbitmq-pipelines %}

The [RabbitMQ integration](https://docs.datadoghq.com/integrations/rabbitmq.md) can provide detailed monitoring and metrics for your RabbitMQ deployments. For full compatibility with Data Streams Monitoring, Datadog recommends configuring the integration as follows:

```yaml
instances:
  - prometheus_plugin:
      url: http://<HOST>:15692
      unaggregated_endpoint: detailed?family=queue_coarse_metrics&family=queue_consumer_count&family=channel_exchange_metrics&family=channel_queue_exchange_metrics&family=node_coarse_metrics
```

This ensures that all RabbitMQ graphs populate, and that you see detailed metrics for individual exchanges as well as queues.

#### Automatic Instrumentation{% #automatic-instrumentation %}

Automatic instrumentation uses [Orchestrion](https://datadoghq.dev/orchestrion/) to install dd-trace-go and supports both the Sarama and Confluent Kafka libraries.

To automatically instrument your service:

1. Follow the [Orchestrion Getting Started](https://datadoghq.dev/orchestrion/docs/getting-started/) guide to compile or run your service using [Orchestrion](https://datadoghq.dev/orchestrion/).
1. Set the `DD_DATA_STREAMS_ENABLED=true` environment variable
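The two steps might look like the following shell session; the exact install and build commands are defined by the Orchestrion documentation, and `my-service` is a hypothetical binary name:

```shell
# Install the Orchestrion tool (see the Orchestrion docs for version pinning)
go install github.com/DataDog/orchestrion@latest

# Compile the service with instrumentation injected at build time
orchestrion go build -o my-service .

# Run with Data Streams Monitoring enabled
DD_DATA_STREAMS_ENABLED=true ./my-service
```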

#### Manual instrumentation{% #manual-instrumentation %}

##### Sarama Kafka client{% #sarama-kafka-client %}

To manually instrument the Sarama Kafka client with Data Streams Monitoring:

1. Import the `ddsarama` Go library:

```go
import (
  ddsarama "github.com/DataDog/dd-trace-go/contrib/IBM/sarama/v2"
)
```

2. Wrap the producer with `ddsarama.WrapAsyncProducer`:

```go
config := sarama.NewConfig()
producer, err := sarama.NewAsyncProducer([]string{bootStrapServers}, config)
if err != nil {
  // handle the error
}

// Wrap the producer so Data Streams Monitoring can track messages through Kafka.
producer = ddsarama.WrapAsyncProducer(config, producer, ddsarama.WithDataStreams())
```

##### Confluent Kafka client{% #confluent-kafka-client %}

To manually instrument Confluent Kafka with Data Streams Monitoring:

1. Import the `ddkafka` Go library:

```go
import (
  ddkafka "github.com/DataDog/dd-trace-go/contrib/confluentinc/confluent-kafka-go/kafka.v2/v2"
)
```

2. Wrap the producer creation with `ddkafka.NewProducer` and use the `ddkafka.WithDataStreams()` option:

```go
// CREATE PRODUCER WITH THIS WRAPPER
producer, err := ddkafka.NewProducer(&kafka.ConfigMap{
  "bootstrap.servers": bootStrapServers,
}, ddkafka.WithDataStreams())
```
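Consumers can be instrumented the same way. The sketch below assumes the contrib package also exposes a `NewConsumer` wrapper taking the same `WithDataStreams()` option; the group ID is hypothetical:

```go
// Sketch: a Data Streams-instrumented consumer, mirroring the producer above.
// Assumes ddkafka.NewConsumer exists alongside ddkafka.NewProducer.
consumer, err := ddkafka.NewConsumer(&kafka.ConfigMap{
  "bootstrap.servers": bootStrapServers,
  "group.id":          "my-consumer-group", // hypothetical consumer group
}, ddkafka.WithDataStreams())
```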

If a service consumes messages from one queue and produces to another, propagate Data Streams Monitoring context between the two operations using the Go context structure:

1. Extract the context from the message headers:

```go
ctx = datastreams.ExtractFromBase64Carrier(ctx, ddsarama.NewConsumerMessageCarrier(message))
```

2. Inject it into the message headers before producing downstream:

```go
datastreams.InjectToBase64Carrier(ctx, ddsarama.NewProducerMessageCarrier(message))
```

#### Other queuing technologies or protocols{% #other-queuing-technologies-or-protocols %}

For other queuing technologies or protocols, you can use manual instrumentation. For example, you can propagate context through Kinesis.

##### Instrumenting the produce call{% #instrumenting-the-produce-call %}

1. Ensure your message supports the [TextMapWriter interface](https://github.com/DataDog/dd-trace-go/blob/main/datastreams/propagation.go#L37).
1. Inject the context into your message and instrument the produce call by calling:

```go
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(ctx, options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)}, "direction:out", "type:kinesis", "topic:kinesis_arn")
if ok {
  datastreams.InjectToBase64Carrier(ctx, message)
}
```

##### Instrumenting the consume call{% #instrumenting-the-consume-call %}

1. Ensure your message supports the [TextMapReader interface](https://github.com/DataDog/dd-trace-go/blob/main/datastreams/propagation.go#L44).
1. Extract the context from your message and instrument the consume call by calling:

```go
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(
  datastreams.ExtractFromBase64Carrier(context.Background(), message),
  options.CheckpointParams{PayloadSize: payloadSize},
  "direction:in", "type:kinesis", "topic:kinesis_arn",
)
```

### Monitoring connectors{% #monitoring-connectors %}

#### Confluent Cloud connectors{% #confluent-cloud-connectors %}

Data Streams Monitoring can automatically discover your [Confluent Cloud](https://docs.datadoghq.com/integrations/confluent_cloud.md) connectors and visualize them within the context of your end-to-end streaming data pipeline.

##### Setup{% #setup %}

1. Install and configure the [Datadog-Confluent Cloud integration](https://app.datadoghq.com/integrations/confluent-cloud).

1. In Datadog, open the [Confluent Cloud integration tile](https://app.datadoghq.com/integrations/confluent-cloud).

   Under **Actions**, a list of resources populates with detected clusters and connectors. Datadog attempts to discover new connectors every time you view this integration tile.

1. Select the resources you want to add.

1. Click **Add Resources**.

1. Navigate to [Data Streams Monitoring](https://app.datadoghq.com/data-streams/) to visualize the connectors and track connector status and throughput.

## Further reading{% #further-reading %}

- [Autodiscover Confluent Cloud connectors and easily monitor performance in Data Streams Monitoring](https://www.datadoghq.com/blog/confluent-connector-dsm-autodiscovery/)
