Set up Data Streams Monitoring through Manual Instrumentation


Data Streams Monitoring (DSM) propagates context through message headers. Use manual instrumentation to set up DSM if you are using:

  • a message queue technology that is not supported by DSM
  • a message queue technology without headers, such as Kinesis (see the sketch after this list), or
  • Lambdas
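
If your transport has no native headers (such as Kinesis in the list above), you can carry the DSM context in a structure you control, for example a map that is serialized into the message body, and hand that structure to the checkpoint calls described in step 3 below. The following Java sketch is illustrative only: it implements the DataStreamsContextCarrier interface shown in the Java example in step 3, while the MapCarrier name and the payload-embedding approach are illustrative choices rather than anything prescribed by DSM.

    import datadog.trace.api.experimental.DataStreamsContextCarrier;

    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.Set;

    // Illustrative sketch: backs the DSM context with a plain map. For a
    // headerless transport such as Kinesis, this map can be serialized into
    // the record body (for example, as a JSON field) on the producer side and
    // rebuilt from the body on the consumer side.
    public class MapCarrier implements DataStreamsContextCarrier {
        private final Map<String, Object> context;

        public MapCarrier(Map<String, Object> context) {
            this.context = context;
        }

        @Override
        public Set<Entry<String, Object>> entries() {
            return this.context.entrySet();
        }

        @Override
        public void set(String key, String value) {
            this.context.put(key, value);
        }
    }

A usage sketch that combines this carrier with the checkpoint calls appears after the installation steps.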

Manual instrumentation installation

  1. Ensure you’re using the Datadog Agent v7.34.0 or later.

  2. On services sending or consuming messages, declare the message queue type being used. Supported types include:

    kinesis, kafka, rabbitmq, sqs, sns

  3. Call the Data Streams Monitoring checkpoints when messages are produced and when they are consumed, as shown in the Java, Node.js, and Python examples below:

    Java:

    import datadog.trace.api.experimental.*;

    import java.util.Map.Entry;
    import java.util.Set;
    
    Carrier headersAdapter = new Carrier(headers);
    
    // before calling produce
    DataStreamsCheckpointer.get().setProduceCheckpoint("<datastream-type>", "<queue-or-topic-name>", headersAdapter);
    
    // after calling consume
    DataStreamsCheckpointer.get().setConsumeCheckpoint("<datastream-type>", "<queue-or-topic-name>", headersAdapter);
    
    // example: logging a kafka consume checkpoint on the 'customer-checkout' topic would look like
    DataStreamsCheckpointer.get().setConsumeCheckpoint("kafka", "customer-checkout", headersAdapter);
    
    // replace headers with whatever you're using to pass the context
    private class Carrier implements DataStreamsContextCarrier {
    	private Headers headers;
    	
    	public Carrier(Headers headers) {
    		this.headers = headers;
    	}
    
    	public Set<Entry<String, Object>> entries() {
    		return this.headers.entrySet();
    	}
    
    	public void set(String key, String value){
    		this.headers.put(key, value);
    	}
    }

    Node.js:

    const tracer = require('dd-trace').init({})

    // before calling produce
    const headers = {}
    tracer.dataStreamsCheckpointer.setProduceCheckpoint(
      "<datastream-type>", "<queue-name>", headers
    )

    // after calling consume
    tracer.dataStreamsCheckpointer.setConsumeCheckpoint(
      "<datastream-type>", "<queue-name>", headers
    )

    Python:

    from ddtrace.data_streams import set_consume_checkpoint
    from ddtrace.data_streams import set_produce_checkpoint

    # before calling produce
    headers = {}
    set_produce_checkpoint(
        "<datastream-type>", "<datastream-name>", headers.setdefault
    )

    # after calling consume
    set_consume_checkpoint(
        "<datastream-type>", "<datastream-name>", headers.get
    )
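
When the transport has no headers, the same checkpoint calls work with a payload-embedded carrier such as the MapCarrier sketched before the installation steps. The Java snippet below is a minimal, illustrative sketch: the kinesis type is one of the supported types from step 2, while the orders-stream name and the way the context map is serialized into the record body are assumptions you would replace with your own.

    import datadog.trace.api.experimental.DataStreamsCheckpointer;

    import java.util.HashMap;
    import java.util.Map;

    // Produce side: set the checkpoint before sending so DSM can write its
    // context into the map, then ship the map inside the record body
    // (Kinesis records have no native headers).
    Map<String, Object> contextMap = new HashMap<>();
    DataStreamsCheckpointer.get().setProduceCheckpoint("kinesis", "orders-stream", new MapCarrier(contextMap));
    // ... serialize contextMap alongside the business payload and send the record ...

    // Consume side: rebuild the map from the record body, then set the checkpoint.
    Map<String, Object> receivedContext = new HashMap<>(); // hypothetical: filled from the deserialized record body
    DataStreamsCheckpointer.get().setConsumeCheckpoint("kinesis", "orders-stream", new MapCarrier(receivedContext));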
