Data Streams Monitoring (DSM) tracks how data flows through queues and services. If your message system is not automatically supported (for example, your queue technology is not instrumented in your language, or the client library you use isn't automatically instrumented), you must manually record checkpoints so DSM can connect producers and consumers.
Produce checkpoint: records when a message is published and injects the DSM context into the message.
Consume checkpoint: records when a message is received, extracts the DSM context if it exists, and prepares future produce checkpoints to carry that context forward.
The DSM context must travel with the message. If your system supports headers, store it there. Otherwise, embed it directly in the payload.
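If you have to embed the context in the payload, the same setter/getter pattern applies; you read and write a payload field instead of a header. A minimal sketch in Python, using the set_produce_checkpoint and set_consume_checkpoint helpers documented in the API reference below; the _dsm field name and the send_to_queue and process_order helpers are hypothetical stand-ins for your own message format and client library:

import json

from ddtrace.data_streams import set_consume_checkpoint, set_produce_checkpoint

def publish(order, queue):
    # No headers available: reserve a payload field for the DSM context.
    body = {"order": order, "_dsm": {}}
    set_produce_checkpoint("kafka", "orders", body["_dsm"].setdefault)
    send_to_queue(queue, json.dumps(body))  # hypothetical send helper

def consume(raw_message):
    # Read the DSM context back out of the payload field.
    body = json.loads(raw_message)
    set_consume_checkpoint("kafka", "orders", body.get("_dsm", {}).get)
    process_order(body["order"])  # hypothetical business logic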
setProduceCheckpoint parameters:
queueType: message system (for example kafka, rabbitmq, sqs, sns, kinesis, servicebus). Recognized strings surface system-specific DSM metrics; other strings are allowed.
name: queue, topic, or subscription name.
carrier: an implementation of DataStreamsContextCarrier. This is where the DSM context is stored with the message (typically a headers map, but payload fields can be used if no headers exist).
setConsumeCheckpoint parameters:
queueType: same value used by the producer.
name: same queue, topic, or subscription name.
carrier: an implementation of DataStreamsContextCarrier. This is where the DSM context is retrieved from the message.
Note: This checkpoint does two things: it links the current message to the data stream, and it prepares this consumer to automatically pass the context to any messages it produces next.
Examples in context (single block)
import datadog.trace.api.experimental.*;
import java.util.*;

// ==========================
// producer-service.java
// ==========================
public class ProducerService {
    private final Channel channel; // your MQ/Kafka/etc. client

    public ProducerService(Channel channel) {
        this.channel = channel;
    }

    public void publishOrder(Order order) {
        Headers headers = new Headers(); // your header structure
        Carrier headersAdapter = new Carrier(headers);

        // Mark DSM produce checkpoint right before sending the message.
        DataStreamsCheckpointer.get().setProduceCheckpoint(
            "kafka",         // queueType
            "orders",        // name
            headersAdapter);

        // Send the message with DSM context attached.
        String payload = serialize(order);
        channel.send("orders", payload, headers);
    }
}

// ==========================
// consumer-service.java
// ==========================
public class ConsumerService {
    public void handleMessage(String payload, Headers headers) {
        Carrier headersAdapter = new Carrier(headers);

        // Mark DSM consume checkpoint when receiving the message.
        DataStreamsCheckpointer.get().setConsumeCheckpoint(
            "kafka",         // queueType (match producer)
            "orders",        // name (match producer)
            headersAdapter);

        // Continue with your application logic.
        Order order = deserialize(payload);
        processOrder(order);
    }
}

// ==========================
// carrier implementation
// ==========================
class Carrier implements DataStreamsContextCarrier {
    private Headers headers;

    public Carrier(Headers headers) {
        this.headers = headers;
    }

    @Override
    public Set<Map.Entry<String, Object>> entries() {
        return this.headers.entrySet();
    }

    @Override
    public void set(String key, String value) {
        this.headers.put(key, value);
    }
}
setProduceCheckpoint parameters:
queueType: message system (for example kafka, rabbitmq, sqs, sns, kinesis, servicebus). Recognized strings surface system-specific DSM metrics; other strings are allowed.
name: queue, topic, or subscription name.
carrier: writable key/value container (typically the headers object) where the DSM context is stored.
setConsumeCheckpoint parameters:
queueType: same value used by the producer (recognized strings preferred, other strings allowed).
name: same queue, topic, or subscription name.
carrier: readable key/value container to retrieve DSM context from the message (headers object if supported; otherwise the parsed payload).
Note: This checkpoint does two things: it links the current message to the data stream, and it prepares this consumer to automatically pass the context to any messages it produces next.
Examples in context (single block)
// ==========================
// producer-service.js
// ==========================
const tracer = require('dd-trace').init({}) // init in the producer service

async function publishOrder (order, channel) {
  // Use headers if supported; otherwise embed context in the payload.
  const headers = {}

  // Mark DSM produce checkpoint right before sending the message.
  tracer.dataStreamsCheckpointer.setProduceCheckpoint(
    'rabbitmq', // queueType
    'orders',   // name
    headers     // carrier: where DSM context will be stored
  )

  // Send the message with DSM context attached.
  const payload = JSON.stringify(order)
  channel.publish('orders', Buffer.from(payload), { headers })
}

// ==========================
// consumer-service.js
// ==========================
const tracer = require('dd-trace').init({}) // init in the consumer service

function handleMessage (msg) {
  // Retrieve DSM context at the top of your handler.
  // If headers aren't supported, build a carrier that reads from your payload.
  const headers = msg.properties?.headers || {}

  tracer.dataStreamsCheckpointer.setConsumeCheckpoint(
    'rabbitmq', // queueType (match producer)
    'orders',   // name (match producer)
    headers     // carrier: where DSM context was stored
  )

  // Continue with application logic.
  const body = JSON.parse(msg.content.toString())
  processOrder(body)
}
API reference
set_produce_checkpoint(queue_type, name, setter)
queue_type: message system (for example kafka, rabbitmq, sqs, sns, kinesis, servicebus). Recognized strings surface system-specific DSM metrics; other strings are allowed.
name: queue, topic, or subscription name.
setter: a callable (key, value) used to store DSM context in the message.
If headers are supported: use headers.setdefault.
If not: use a function that writes into the message payload (like a JSON field).
set_consume_checkpoint(queue_type, name, getter)
queue_type: same as producer.
name: same as producer.
getter: a callable (key) used to retrieve DSM context from the message.
If headers are supported: use headers.get.
If not: use a function that reads from the payload.
Note: This checkpoint does two things: it links the current message to the data stream, and it prepares this consumer to automatically pass the context to any messages it produces next.
Examples in context (single block)
# ==========================
# producer_service.py
# ==========================
from ddtrace.data_streams import set_produce_checkpoint

def publish_order(order, channel):
    headers = {}

    # Mark DSM produce checkpoint before sending the message.
    set_produce_checkpoint(
        "rabbitmq",          # queue_type
        "orders",            # name
        headers.setdefault,  # setter: store DSM context in headers or payload
    )

    # Send the message with DSM context attached.
    payload = order.to_json()
    channel.publish(topic="orders", body=payload, properties=headers)

# ==========================
# consumer_service.py
# ==========================
from ddtrace.data_streams import set_consume_checkpoint

def handle_message(message, properties):
    headers = getattr(properties, "headers", {})

    # Mark DSM consume checkpoint when receiving the message.
    set_consume_checkpoint(
        "rabbitmq",   # queue_type (match producer)
        "orders",     # name (match producer)
        headers.get,  # getter: retrieve DSM context from headers or payload
    )

    # Continue with your application logic.
    process_order(message)
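As the note above describes, a service in the middle of a pipeline sets a consume checkpoint on the way in and a produce checkpoint on the way out, so the context keeps flowing downstream. A sketch of such an intermediate service, where enrich_order and publisher are hypothetical stand-ins for your own logic and client:

from ddtrace.data_streams import set_consume_checkpoint, set_produce_checkpoint

def handle_and_forward(message, properties):
    in_headers = getattr(properties, "headers", {})

    # Link this message to the data stream it came from.
    set_consume_checkpoint("rabbitmq", "orders", in_headers.get)

    enriched = enrich_order(message)  # hypothetical business logic

    # Produce to the next queue; this checkpoint carries the context forward.
    out_headers = {}
    set_produce_checkpoint("rabbitmq", "enriched-orders", out_headers.setdefault)
    publisher.publish(topic="enriched-orders", body=enriched, properties=out_headers)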
set_produce_checkpoint parameters:
queue_type: the message system (for example rabbitmq, kafka, sqs, sns, kinesis, servicebus). Using a recognized queue_type helps surface metrics related to that system in Data Streams, but other strings are allowed if needed.
name: the queue, topic, or subscription name.
block: yields (key, pathway_context). Your block must store the DSM context with the message, under the given key.
set_consume_checkpoint parameters:
queue_type: same message system as the producer. Using a recognized queue_type helps surface metrics related to that system in Data Streams, but other strings are also allowed.
name: same queue, topic, or subscription name.
block: yields (key). Your block must retrieve the DSM context from the message, using whichever method (headers or message body) it was produced with.
Note: This checkpoint does two things: it links the current message to the data stream, and it prepares this consumer to automatically pass the context to any messages it produces next.
Examples in context
# Producer side
def publish_order(order)
  headers = {}

  # Mark DSM produce checkpoint before sending the message.
  Datadog::DataStreams.set_produce_checkpoint("rabbitmq", "orders") do |key, pathway_context|
    # Store DSM context in the message
    # - If headers supported: headers[key] = pathway_context
    # - If no headers: message[key] = pathway_context
    headers[key] = pathway_context
  end

  # Send the message with DSM context attached
  @publisher.publish(topic: "orders", payload: order.to_json, headers: headers)
end

# Consumer side
def handle_message(message)
  # Mark DSM consume checkpoint when receiving the message.
  Datadog::DataStreams.set_consume_checkpoint("rabbitmq", "orders") do |key|
    # Retrieve DSM context from the message
    # - If headers supported: pull it from there
    # - If no headers: parsed_message[key]
    message.headers[key]
  end

  # Continue with application logic
  process_order(message.body)
end
Further Reading
Additional helpful documentation, links, and articles: