Configurar la monitorización de secuencias de datos para Go

Requisitos previos

Para empezar con Data Streams Monitoring, necesitas versiones recientes de las bibliotecas del Datadog Agent y de Data Streams Monitoring:

Instalación

Existen dos tipos de instrumentación:

  • Instrumentación para cargas de trabajo basadas en Kafka
  • Instrumentación personalizada para cualquier otra tecnología o protocolo de puesta en cola

Cliente de Confluent Kafka

importar (
  ddkafka "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka.v2"
)

...
// CREA UN PRODUCTOR CON ESTA ENVOLTURA
productor, error:= ddkafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": bootStrapServers,
}, ddkafka.WithDataStreams())

Si un servicio consume datos de un punto y produce a otro punto, propague el contexto entre los dos lugares utilizando la estructura de contexto de Go:

  1. Extraer el contexto de los encabezados:

    ctx = datastreams.ExtractFromBase64Carrier(ctx, ddsarama.NewConsumerMessageCarrier(message))
    
  2. Insértelo en el encabezado antes de producir aguas abajo:

    datastreams.InjectToBase64Carrier(ctx, ddsarama.NewProducerMessageCarrier(message))
    

Cliente de Sarama Kafka

importar (
  ddsarama "gopkg.in/DataDog/dd-trace-go.v1/contrib/Shopify/sarama"
)

...
configurar:= sarama.NewConfig()
productor, error:= sarama.NewAsyncProducer([]string{bootStrapServers}, config)

// AÑADE ESTA LÍNEA
productor = ddsarama.WrapAsyncProducer(config, producer, ddsarama.WithDataStreams())

Instrumentación manual

También puedes utilizar la instrumentación manual. Por ejemplo, puedes propagar el contexto a través de Kinesis.

Instrumentación de la llamada a producción

  1. Asegúrate de que tu mensaje sea compatible con la interfaz TextMapWriter.
  2. Inserta el contexto en tu mensaje e instrumenta la llamada a producción llamando a:
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(ctx, options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)}, "direction:out", "type:kinesis", "topic:kinesis_arn")
si ok {
  datastreams.InjectToBase64Carrier(ctx, message)
}

Instrumentación de la llamada al consumo

  1. Asegúrate de que tu mensaje sea compatible con la interfaz TextMapReader.
  2. Extrae el contexto de tu mensaje e instrumenta la llamada a consumir llamando a:
    ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), message), options.CheckpointParams{PayloadSize: payloadSize}, "direction:in", "type:kinesis", "topic:kinesis_arn")