前提条件

Data Streams Monitoring を開始するには、Datadog Agent と Data Streams Monitoring ライブラリの最新バージョンが必要です。

インストール

2 種類のインスツルメンテーションが用意されています。

  • Kafka ベースのワークロードのためのインスツルメンテーション
  • その他のキューイング技術やプロトコルのためのカスタムインスツルメンテーション

Confluent Kafka クライアント

import (
  ddkafka "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka.v2"
)

...
// このラッパーでプロデューサーを作成します
producer, err := ddkafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": bootStrapServers,
}, ddkafka.WithDataStreams())

サービスがある地点からデータを消費し、別の地点にデータを生成する場合、Go コンテキスト構造を使用して、2つの地点間でコンテキストを伝播します。

  1. ヘッダーからコンテキストを抽出します。

    ctx = datastreams.ExtractFromBase64Carrier(ctx, ddsarama.NewConsumerMessageCarrier(message))
    
  2. 下流にデータを生成する前に、そのコンテキストをヘッダーに注入します。

    datastreams.InjectToBase64Carrier(ctx, ddsarama.NewProducerMessageCarrier(message))
    

Sarama Kafka クライアント

import (
  ddsarama "gopkg.in/DataDog/dd-trace-go.v1/contrib/IBM/sarama.v1"
)

...
config := sarama.NewConfig()
producer, err := sarama.NewAsyncProducer([]string{bootStrapServers}, config)

// この行を追加します
producer = ddsarama.WrapAsyncProducer(config, producer, ddsarama.WithDataStreams())

手動インスツルメンテーション

また、手動インスツルメンテーションを使用することもできます。例えば、Kinesis を介してコンテキストを伝搬させることができます。

produce 呼び出しのインスツルメンテーション

  1. メッセージが TextMapWriter インターフェースをサポートしていることを確認します。
  2. コンテキストをメッセージに注入し、以下を呼び出して produce 呼び出しをインスツルメントします。
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(ctx, options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)}, "direction:out", "type:kinesis", "topic:kinesis_arn")
if ok {
  datastreams.InjectToBase64Carrier(ctx, message)
}

consume 呼び出しのインスツルメンテーション

  1. メッセージが TextMapReader インターフェースをサポートしていることを確認します。
  2. メッセージからコンテキストを抽出し、以下を呼び出して consume 呼び出しをインスツルメントします。
    ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), message), options.CheckpointParams{PayloadSize: payloadSize}, "direction:in", "type:kinesis", "topic:kinesis_arn")