Set up Data Streams Monitoring through Manual Instrumentation

Data Streams Monitoring (DSM) propagates context through message headers. Use manual instrumentation to set up DSM if you are using:

  • a message queue technology that is not supported by DSM
  • a message queue technology without headers, such as Kinesis, or
  • Lambdas

Manual instrumentation installation

  1. Ensure you’re using the Datadog Agent v7.34.0 or later.

  2. On services sending or consuming messages, declare the supported types. For example:

    kinesis, kafka, rabbitmq, sqs, sns

  3. Call the Data Streams Monitoring checkpoints when messages are produced and when they are consumed, as shown in the example code below:

    import datadog.trace.api.experimental.*;
    
    Carrier headersAdapter = new Carrier(headers);
    
    // before calling produce
    DataStreamsCheckpointer.get().setProduceCheckpoint("<datastream-type>", "<queue-or-topic-name>", headersAdapter);
    
    // after calling consume
    DataStreamsCheckpointer.get().setConsumeCheckpoint("<datastream-type>", "<queue-or-topic-name>", headersAdapter);
    
    // example: logging a kafka consume checkpoint on the 'customer-checkout' topic would look like
    DataStreamsCheckpointer.get().setConsumeCheckpoint("kafka", "customer-checkout", headersAdapter);
    
    // replace headers with whatever you're using to pass the context
    private class Carrier implements DataStreamsContextCarrier {
    	private Headers headers;
    	
    	public Carrier(Headers headers) {
    		this.headers = headers;
    	}
    
    	public Set<Entry<String, Object>> entries() {
    		return this.headers.entrySet();
    	}
    
    	public void set(String key, String value){
    		this.headers.put(key, value);
    	}
    }
    const tracer = require('dd-trace').init({})
    
    // before calling produce
    const headers = {}
    tracer.dataStreamsCheckpointer.setProduceCheckpoint(
    "<datastream-type>", "<queue-name>", headers
    )
    
    // after calling consume
    tracer.dataStreamsCheckpointer.setConsumeCheckpoint(
    "<datastream-type>", "<queue-name>", headers
    )
    from ddtrace.data_streams import set_consume_checkpoint
    from ddtrace.data_streams import set_produce_checkpoint
    
    # before calling produce
    headers = {}
    set_produce_checkpoint(
    "<datastream-type>", "<datastream-name>", headers.setdefault
    )
    
    # after calling consume
    set_consume_checkpoint(
    "<datastream-type>", "<datastream-name>", headers.get
    )

Further Reading

Additional helpful documentation, links, and articles: