This page is not yet available in Spanish. We are working on its translation. If you have any questions or feedback about our current translation project, feel free to reach out to us!
Data Streams Monitoring (DSM) propagates context through message headers. Use manual instrumentation to set up DSM if you are using:
a message queue technology that is not supported by DSM
a message queue technology without headers, such as Kinesis, or
On services sending or consuming messages, declare the supported types. For example:
kinesis, kafka, rabbitmq, sqs, sns
Call the Data Streams Monitoring checkpoints when messages are produced and when they are consumed, as shown in the example code below:
importdatadog.trace.api.experimental.*;CarrierheadersAdapter=newCarrier(headers);// before calling produceDataStreamsCheckpointer.get().setProduceCheckpoint("<datastream-type>","<queue-or-topic-name>",headersAdapter);// after calling consumeDataStreamsCheckpointer.get().setConsumeCheckpoint("<datastream-type>","<queue-or-topic-name>",headersAdapter);// example: logging a kafka consume checkpoint on the 'customer-checkout' topic would look likeDataStreamsCheckpointer.get().setConsumeCheckpoint("kafka","customer-checkout",headersAdapter);// replace headers with whatever you're using to pass the contextprivateclassCarrierimplementsDataStreamsContextCarrier{privateHeadersheaders;publicCarrier(Headersheaders){this.headers=headers;}publicSet<Entry<String,Object>>entries(){returnthis.headers.entrySet();}publicvoidset(Stringkey,Stringvalue){this.headers.put(key,value);}}
consttracer=require('dd-trace').init({})// before calling produce
constheaders={}tracer.dataStreamsCheckpointer.setProduceCheckpoint("<datastream-type>","<queue-name>",headers)// after calling consume
tracer.dataStreamsCheckpointer.setConsumeCheckpoint("<datastream-type>","<queue-name>",headers)
fromddtrace.data_streamsimportset_consume_checkpointfromddtrace.data_streamsimportset_produce_checkpoint# before calling produceheaders={}set_produce_checkpoint("<datastream-type>","<datastream-name>",headers.setdefault)# after calling consumeset_consume_checkpoint("<datastream-type>","<datastream-name>",headers.get)