以下のサンプルコードのように、メッセージが生成されたときと消費されたときに Data Streams Monitoring のチェックポイントを呼び出します。
importdatadog.trace.api.experimental.*;CarrierheadersAdapter=newCarrier(headers);// データベース PUT を呼び出す前DataStreamsCheckpointer.get().setProduceCheckpoint("<database-type>","<topic-name>",headersAdapter);// データベース GET を呼び出した後DataStreamsCheckpointer.get().setConsumeCheckpoint("<database-type>","<topic-name>",headersAdapter);// ヘッダーを、コンテキストを渡すために使用しているものに置き換えてくださいprivateclassCarrierimplementsDataStreamsContextCarrier{privateHeadersheaders;publicCarrier(Headersheaders){this.headers=headers;}publicSet<Entry<String,Object>>entries(){returnthis.headers.entrySet();}publicvoidset(Stringkey,Stringvalue){this.headers.put(key,value);}}
consttracer=require('dd-trace').init({})// produce を呼び出す前
constheaders={}tracer.dataStreamsCheckpointer.setProduceCheckpoint("<datastream-type>","<queue-name>",headers)// consume を呼び出した後
tracer.dataStreamsCheckpointer.setConsumeCheckpoint("<datastream-type>","<queue-name>",headers)
fromddtrace.data_streamsimportset_consume_checkpointfromddtrace.data_streamsimportset_produce_checkpoint# before calling produceheaders={}set_produce_checkpoint("<datastream-type>","<datastream-name>",headers.setdefault)# after calling consumeset_consume_checkpoint("<datastream-type>","<datastream-name>",headers.get)