Set up Data Streams Monitoring through Manual Instrumentation

이 페이지는 아직 영어로 제공되지 않습니다. 번역 작업 중입니다.
현재 번역 프로젝트에 대한 질문이나 피드백이 있으신 경우 언제든지 연락주시기 바랍니다.

Data Streams Monitoring (DSM) propagates context through message headers. Use manual instrumentation to set up DSM if you are using:

  • a message queue technology that is not supported by DSM
  • a message queue technology without headers, such as Kinesis, or
  • Lambdas

Manual instrumentation installation

  1. Ensure you’re using the Datadog Agent v7.34.0 or later.

  2. On services sending or consuming messages, declare the supported types. For example:

    kinesis, kafka, rabbitmq, sqs, sns, servicebus

  3. Call the Data Streams Monitoring checkpoints when messages are produced and when they are consumed, as shown in the example code below:

    import datadog.trace.api.experimental.*;
    
    Carrier headersAdapter = new Carrier(headers);
    
    // before calling produce
    DataStreamsCheckpointer.get().setProduceCheckpoint("<datastream-type>", "<queue-or-topic-name>", headersAdapter);
    
    // after calling consume
    DataStreamsCheckpointer.get().setConsumeCheckpoint("<datastream-type>", "<queue-or-topic-name>", headersAdapter);
    
    // example: logging a kafka consume checkpoint on the 'customer-checkout' topic would look like
    DataStreamsCheckpointer.get().setConsumeCheckpoint("kafka", "customer-checkout", headersAdapter);
    
    // replace headers with whatever you're using to pass the context
    private class Carrier implements DataStreamsContextCarrier {
    	private Headers headers;
    	
    	public Carrier(Headers headers) {
    		this.headers = headers;
    	}
    
    	public Set<Entry<String, Object>> entries() {
    		return this.headers.entrySet();
    	}
    
    	public void set(String key, String value){
    		this.headers.put(key, value);
    	}
    }
    const tracer = require('dd-trace').init({})
    
    // before calling produce
    const headers = {}
    tracer.dataStreamsCheckpointer.setProduceCheckpoint(
    "<datastream-type>", "<queue-name>", headers
    )
    
    // after calling consume
    tracer.dataStreamsCheckpointer.setConsumeCheckpoint(
    "<datastream-type>", "<queue-name>", headers
    )
    from ddtrace.data_streams import set_consume_checkpoint
    from ddtrace.data_streams import set_produce_checkpoint
    
    # before calling produce
    headers = {}
    set_produce_checkpoint(
    "<datastream-type>", "<datastream-name>", headers.setdefault
    )
    
    # after calling consume
    set_consume_checkpoint(
    "<datastream-type>", "<datastream-name>", headers.get
    )

Further Reading

추가 유용한 문서, 링크 및 기사: