- 필수 기능
- 시작하기
- Glossary
- 표준 속성
- Guides
- Agent
- 통합
- 개방형텔레메트리
- 개발자
- Administrator's Guide
- API
- Datadog Mobile App
- CoScreen
- Cloudcraft
- 앱 내
- 서비스 관리
- 인프라스트럭처
- 애플리케이션 성능
- APM
- Continuous Profiler
- 스팬 시각화
- 데이터 스트림 모니터링
- 데이터 작업 모니터링
- 디지털 경험
- 소프트웨어 제공
- 보안
- AI Observability
- 로그 관리
- 관리
Data Streams Monitoring (DSM) propagates context through message headers. Use manual instrumentation to set up DSM if you are using:
Ensure you’re using the Datadog Agent v7.34.0 or later.
On services sending or consuming messages, declare the supported types. For example:
kinesis, kafka, rabbitmq, sqs, sns
Call the Data Streams Monitoring checkpoints when messages are produced and when they are consumed, as shown in the example code below:
import datadog.trace.api.experimental.*;
Carrier headersAdapter = new Carrier(headers);
// before calling produce
DataStreamsCheckpointer.get().setProduceCheckpoint("<datastream-type>", "<queue-or-topic-name>", headersAdapter);
// after calling consume
DataStreamsCheckpointer.get().setConsumeCheckpoint("<datastream-type>", "<queue-or-topic-name>", headersAdapter);
// example: logging a kafka consume checkpoint on the 'customer-checkout' topic would look like
DataStreamsCheckpointer.get().setConsumeCheckpoint("kafka", "customer-checkout", headersAdapter);
// replace headers with whatever you're using to pass the context
private class Carrier implements DataStreamsContextCarrier {
private Headers headers;
public Carrier(Headers headers) {
this.headers = headers;
}
public Set<Entry<String, Object>> entries() {
return this.headers.entrySet();
}
public void set(String key, String value){
this.headers.put(key, value);
}
}
const tracer = require('dd-trace').init({})
// before calling produce
const headers = {}
tracer.dataStreamsCheckpointer.setProduceCheckpoint(
"<datastream-type>", "<queue-name>", headers
)
// after calling consume
tracer.dataStreamsCheckpointer.setConsumeCheckpoint(
"<datastream-type>", "<queue-name>", headers
)
from ddtrace.data_streams import set_consume_checkpoint
from ddtrace.data_streams import set_produce_checkpoint
# before calling produce
headers = {}
set_produce_checkpoint(
"<datastream-type>", "<datastream-name>", headers.setdefault
)
# after calling consume
set_consume_checkpoint(
"<datastream-type>", "<datastream-name>", headers.get
)
추가 유용한 문서, 링크 및 기사: