Set up Data Streams Monitoring through Manual Instrumentation

이 페이지는 아직 한국어로 제공되지 않으며 번역 작업 중입니다. 번역에 관한 질문이나 의견이 있으시면 언제든지 저희에게 연락해 주십시오.

Data Streams Monitoring (DSM) propagates context through message headers. Use manual instrumentation to set up DSM if you are using:

  • a message queue technology that is not supported by DSM
  • a message queue technology without headers, such as Kinesis, or
  • Lambdas

Manual instrumentation installation

  1. Ensure you’re using the Datadog Agent v7.34.0 or later.

  2. On services sending or consuming messages, declare the supported types. For example:

    kinesis, kafka, rabbitmq, sqs, sns

  3. Call the Data Streams Monitoring checkpoints when messages are produced and when they are consumed, as shown in the example code below:

    import datadog.trace.api.experimental.*;
    
    Carrier headersAdapter = new Carrier(headers);
    
    // before calling produce
    DataStreamsCheckpointer.get().setProduceCheckpoint("<datastream-type>", "<queue-or-topic-name>", headersAdapter);
    
    // after calling consume
    DataStreamsCheckpointer.get().setConsumeCheckpoint("<datastream-type>", "<queue-or-topic-name>", headersAdapter);
    
    // example: logging a kafka consume checkpoint on the 'customer-checkout' topic would look like
    DataStreamsCheckpointer.get().setConsumeCheckpoint("kafka", "customer-checkout", headersAdapter);
    
    // replace headers with whatever you're using to pass the context
    private class Carrier implements DataStreamsContextCarrier {
    	private Headers headers;
    	
    	public Carrier(Headers headers) {
    		this.headers = headers;
    	}
    
    	public Set<Entry<String, Object>> entries() {
    		return this.headers.entrySet();
    	}
    
    	public void set(String key, String value){
    		this.headers.put(key, value);
    	}
    }
    const tracer = require('dd-trace').init({})
    
    // before calling produce
    const headers = {}
    tracer.dataStreamsCheckpointer.setProduceCheckpoint(
    "<datastream-type>", "<queue-name>", headers
    )
    
    // after calling consume
    tracer.dataStreamsCheckpointer.setConsumeCheckpoint(
    "<datastream-type>", "<queue-name>", headers
    )
    from ddtrace.data_streams import set_consume_checkpoint
    from ddtrace.data_streams import set_produce_checkpoint
    
    # before calling produce
    headers = {}
    set_produce_checkpoint(
    "<datastream-type>", "<datastream-name>", headers.setdefault
    )
    
    # after calling consume
    set_consume_checkpoint(
    "<datastream-type>", "<datastream-name>", headers.get
    )

Further Reading

추가 유용한 문서, 링크 및 기사: