Data Streams Monitoring for Go のセットアップ
利用可能ないくつかのインスツルメンテーションの種類は以下のとおりです:
前提条件
Data Streams Monitoring を開始するには、Datadog Agent と Data Streams Monitoring ライブラリの最新バージョンが必要です。
サポートされるライブラリ
IT業界 | ライブラリ | Minimal tracer version | Recommended tracer version |
---|
Kafka | confluent-kafka-go | 1.56.1 | 1.66.0 以降 |
Kafka | Sarama | 1.56.1 | 1.66.0 以降 |
インストール
自動インスツルメンテーション
自動インスツルメンテーションでは Orchestrion を使用して dd-trace-go をインストールし、Sarama と Confluent Kafka の両方のライブラリをサポートします。
サービスを自動インスツルメンテーションするには:
- Orchestrion Getting Started ガイドに従って、Orchestrion を使用してサービスをコンパイルまたは実行します。
DD_DATA_STREAMS_ENABLED=true
という環境変数を設定します。
手動インスツルメンテーション
Sarama Kafka クライアント
Data Streams Monitoring を使用して Sarama Kafka クライアントを手動でインスツルメンテーションするには:
ddsarama
Go ライブラリをインポートします
import (
ddsarama "gopkg.in/DataDog/dd-trace-go.v1/contrib/Shopify/sarama"
)
2. プロデューサーを `ddsarama.WrapAsyncProducer` でラップします
...
config := sarama.NewConfig()
producer, err := sarama.NewAsyncProducer([]string{bootStrapServers}, config)
// この行を追加してください
producer = ddsarama.WrapAsyncProducer(config, producer, ddsarama.WithDataStreams())
Confluent Kafka クライアント
Data Streams Monitoring を使用して Confluent Kafka を手動でインスツルメンテーションするには:
ddkafka
Go ライブラリをインポートします
import (
ddkafka "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka.v2"
)
ddkafka.NewProducer
でプロデューサーの作成をラップし、ddkafka.WithDataStreams()
構成を使用します。
// このラッパーを使用してプロデューサーを作成します
producer, err := ddkafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": bootStrapServers,
}, ddkafka.WithDataStreams())
サービスがある地点からデータを消費し、別の地点にデータを生成する場合、Go コンテキスト構造を使用して、2つの地点間でコンテキストを伝播します。
- ヘッダーからコンテキストを抽出します。
ctx = datastreams.ExtractFromBase64Carrier(ctx, ddsarama.NewConsumerMessageCarrier(message))
- ダウンストリームへデータを生成する前にヘッダーにコンテキストを注入します。
datastreams.InjectToBase64Carrier(ctx, ddsarama.NewProducerMessageCarrier(message))
その他のキューイング技術やプロトコル
また、手動インスツルメンテーションを使用することもできます。例えば、Kinesis を介してコンテキストを伝搬させることができます。
produce 呼び出しのインスツルメンテーション
- メッセージが TextMapWriter インターフェイスをサポートしていることを確認します。
- コンテキストをメッセージに注入し、以下を呼び出して produce 呼び出しをインスツルメントします。
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(ctx, options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)}, "direction:out", "type:kinesis", "topic:kinesis_arn")
if ok {
datastreams.InjectToBase64Carrier(ctx, message)
}
consume 呼び出しのインスツルメンテーション
- メッセージが TextMapReader インターフェイスをサポートしていることを確認します。
- メッセージからコンテキストを抽出し、以下を呼び出して consume 呼び出しをインスツルメントします。
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), message), options.CheckpointParams{PayloadSize: payloadSize}, "direction:in", "type:kinesis", "topic:kinesis_arn")