利用可能ないくつかのインスツルメンテーションの種類は以下のとおりです:

前提条件

Data Streams Monitoring を開始するには、Datadog Agent と Data Streams Monitoring ライブラリの最新バージョンが必要です。

サポートされるライブラリ

IT業界ライブラリMinimal tracer versionRecommended tracer version
Kafkaconfluent-kafka-go1.56.11.66.0 以降
KafkaSarama1.56.11.66.0 以降

インストール

自動インスツルメンテーション

自動インスツルメンテーションでは Orchestrion を使用して dd-trace-go をインストールし、Sarama と Confluent Kafka の両方のライブラリをサポートします。

サービスを自動インスツルメンテーションするには:

  1. Orchestrion Getting Started ガイドに従って、Orchestrion を使用してサービスをコンパイルまたは実行します。
  2. DD_DATA_STREAMS_ENABLED=true という環境変数を設定します。

手動インスツルメンテーション

Sarama Kafka クライアント

Data Streams Monitoring を使用して Sarama Kafka クライアントを手動でインスツルメンテーションするには:

  1. ddsarama Go ライブラリをインポートします
import (
ddsarama "gopkg.in/DataDog/dd-trace-go.v1/contrib/Shopify/sarama"
)

2. プロデューサーを `ddsarama.WrapAsyncProducer` でラップします

...
config := sarama.NewConfig()
producer, err := sarama.NewAsyncProducer([]string{bootStrapServers}, config)

// この行を追加してください
producer = ddsarama.WrapAsyncProducer(config, producer, ddsarama.WithDataStreams())
Confluent Kafka クライアント

Data Streams Monitoring を使用して Confluent Kafka を手動でインスツルメンテーションするには:

  1. ddkafka Go ライブラリをインポートします
import (
  ddkafka "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka.v2"
)
  1. ddkafka.NewProducer でプロデューサーの作成をラップし、ddkafka.WithDataStreams() 構成を使用します。
// このラッパーを使用してプロデューサーを作成します
producer, err := ddkafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": bootStrapServers,
}, ddkafka.WithDataStreams())

サービスがある地点からデータを消費し、別の地点にデータを生成する場合、Go コンテキスト構造を使用して、2つの地点間でコンテキストを伝播します。

  1. ヘッダーからコンテキストを抽出します。
ctx = datastreams.ExtractFromBase64Carrier(ctx, ddsarama.NewConsumerMessageCarrier(message))
  1. ダウンストリームへデータを生成する前にヘッダーにコンテキストを注入します。
    datastreams.InjectToBase64Carrier(ctx, ddsarama.NewProducerMessageCarrier(message))
    

その他のキューイング技術やプロトコル

また、手動インスツルメンテーションを使用することもできます。例えば、Kinesis を介してコンテキストを伝搬させることができます。

produce 呼び出しのインスツルメンテーション
  1. メッセージが TextMapWriter インターフェイスをサポートしていることを確認します。
  2. コンテキストをメッセージに注入し、以下を呼び出して produce 呼び出しをインスツルメントします。
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(ctx, options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)}, "direction:out", "type:kinesis", "topic:kinesis_arn")
if ok {
  datastreams.InjectToBase64Carrier(ctx, message)
}
consume 呼び出しのインスツルメンテーション
  1. メッセージが TextMapReader インターフェイスをサポートしていることを確認します。
  2. メッセージからコンテキストを抽出し、以下を呼び出して consume 呼び出しをインスツルメントします。
    ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), message), options.CheckpointParams{PayloadSize: payloadSize}, "direction:in", "type:kinesis", "topic:kinesis_arn")