Supported OS Linux Windows Mac OS

インテグレーションバージョン2.15.0

Kafka ダッシュボード

概要

Kafka を Datadog に接続して、以下のことができます。

  • クラスターのパフォーマンスをリアルタイムに可視化できます。
  • Kafka のパフォーマンスを他のアプリケーションと関連付けることができます。

このチェックでは、インスタンスごとに 350 メトリクスの制限があります。返されたメトリクスの数は、Agent のステータス出力に表示されます。以下の構成を編集して、関心のあるメトリクスを指定します。収集するメトリクスのカスタマイズの詳細については、JMX チェックのドキュメントを参照してください。

Kafka コンシューマーメトリクスを収集する方法については、kafka_consumer チェックを参照してください。

ストリーミングデータパイプラインのトポロジーを視覚化したり、データストリームセットアップ内の局所的なボトルネックを調査したりすることが有益な場合は、Data Streams Monitoring をご覧ください。

: このインテグレーションにアタッチされたサンプル構成は、Kafka >= 0.8.2 に対してのみ機能します。 それ以前のバージョンをお使いの場合は、Agent v5.2.x リリース版サンプルファイルをご覧ください。

計画と使用

インフラストラクチャーリスト

Agent の Kafka チェックは Datadog Agent パッケージに含まれています。Kafka ノードに追加でインストールする必要はありません。

チェックは、JMXFetch を使用して JMX からメトリクスを収集します。Agent が JMXFetch を実行できるように、各 kafka ノードで JVM が必要です。Kafka が使用しているのと同じ JVM を使用することができます。

: Kafka チェックは Managed Streaming for Apache Kafka (Amazon MSK) と共に使用することはできません。代わりに Amazon MSK インテグレーションを使用してください。

ブラウザトラブルシューティング

メトリクスベース SLO

ホストで実行中の Agent に対してこのチェックを構成するには

メトリクスの収集
  1. Agent のコンフィギュレーションディレクトリのルートにある conf.d/ フォルダーの kafka.d/conf.yaml ファイルを編集します。Kafka Bean 名は、実行している Kafka のバージョンに依存します。Agent と一緒にパッケージ化されているサンプルコンフィギュレーションファイルは最新の構成なので、これをベースとして使用してください。: サンプル内の Agent バージョンは、インストールされている Agent のバージョンより新しいバージョンである場合があります。

  2. Agent を再起動します

収集データ

Agent バージョン 6.0 以降で利用可能

  1. Kafka はデフォルトで log4j ロガーを使用します。ファイルへのログ記録をアクティブにし、フォーマットをカスタマイズするには、log4j.properties ファイルを編集します。

      # Set root logger level to INFO and its only appender to R
      log4j.rootLogger=INFO, R
      log4j.appender.R.File=/var/log/kafka/server.log
      log4j.appender.R.layout=org.apache.log4j.PatternLayout
      log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
    
  2. Datadog のインテグレーションパイプラインは、デフォルトで、次の変換パターンをサポートします。

      %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
      %d [%t] %-5p %c - %m%n
      %r [%t] %p %c %x - %m%n
      [%d] %p %m (%c)%n
    

    フォーマットが異なる場合は、インテグレーションパイプラインを複製して編集してください。

  3. Datadog Agent で、ログの収集はデフォルトで無効になっています。以下のように、datadog.yaml ファイルでこれを有効にします。

    logs_enabled: true
    
  4. 次のコンフィギュレーションブロックを kafka.d/conf.yaml ファイルに追加します。環境に基づいて、path パラメーターと service パラメーターの値を変更してください。使用可能なすべてのコンフィギュレーションオプションの詳細については、サンプル kafka.d/conf.yaml を参照してください。

    logs:
      - type: file
        path: /var/log/kafka/server.log
        source: kafka
        service: myapp
        #To handle multi line that starts with yyyy-mm-dd use the following pattern
        #log_processing_rules:
        #  - type: multi_line
        #    name: log_start_with_date
        #    pattern: \d{4}\-(0?[1-9]|1[012])\-(0?[1-9]|[12][0-9]|3[01])
    
  5. Agent を再起動します

コンテナ化

メトリクスの収集

コンテナ環境の場合は、JMX を使用したオートディスカバリーのガイドを参照してください。

収集データ

Agent バージョン 6.0 以降で利用可能

Datadog Agent で、ログの収集はデフォルトで無効になっています。有効にする方法については、Kubernetes ログ収集を参照してください。

パラメーター
<LOG_CONFIG>{"source": "kafka", "service": "<サービス名>"}

検証

Agent の status サブコマンドを実行し、JMXFetch セクションの kafka を探します。

========
JMXFetch
========
  Initialized checks
  ==================
    kafka
      instance_name : kafka-localhost-9999
      message :
      metric_count : 46
      service_check_count : 0
      status : OK

リアルユーザーモニタリング

データセキュリティ

kafka.consumer.bytes_consumed
(gauge)
The average number of bytes consumed per second for a specific topic.
Shown as byte
kafka.consumer.bytes_in
(gauge)
Consumer bytes in rate.
Shown as byte
kafka.consumer.delayed_requests
(gauge)
Number of delayed consumer requests.
Shown as request
kafka.consumer.expires_per_second
(gauge)
Rate of delayed consumer request expiration.
Shown as eviction
kafka.consumer.fetch_rate
(gauge)
The minimum rate at which the consumer sends fetch requests to a broker.
Shown as request
kafka.consumer.fetch_size_avg
(gauge)
The average number of bytes fetched per request for a specific topic.
Shown as byte
kafka.consumer.fetch_size_max
(gauge)
The maximum number of bytes fetched per request for a specific topic.
Shown as byte
kafka.consumer.kafka_commits
(gauge)
Rate of offset commits to Kafka.
Shown as write
kafka.consumer.max_lag
(gauge)
Maximum consumer lag.
Shown as offset
kafka.consumer.messages_in
(gauge)
Rate of consumer message consumption.
Shown as message
kafka.consumer.records_consumed
(gauge)
The average number of records consumed per second for a specific topic.
Shown as record
kafka.consumer.records_per_request_avg
(gauge)
The average number of records in each request for a specific topic.
Shown as record
kafka.consumer.zookeeper_commits
(gauge)
Rate of offset commits to ZooKeeper.
Shown as write
kafka.expires_sec
(gauge)
Rate of delayed producer request expiration.
Shown as eviction
kafka.follower.expires_per_second
(gauge)
Rate of request expiration on followers.
Shown as eviction
kafka.log.flush_rate.rate
(gauge)
Log flush rate.
Shown as flush
kafka.messages_in.rate
(gauge)
Incoming message rate.
Shown as message
kafka.net.bytes_in.rate
(gauge)
Incoming byte rate.
Shown as byte
kafka.net.bytes_out
(gauge)
Outgoing byte total.
Shown as byte
kafka.net.bytes_out.rate
(gauge)
Outgoing byte rate.
Shown as byte
kafka.net.bytes_rejected.rate
(gauge)
Rejected byte rate.
Shown as byte
kafka.net.processor.avg.idle.pct.rate
(gauge)
Average fraction of time the network processor threads are idle.
Shown as fraction
kafka.producer.available_buffer_bytes
(gauge)
The total amount of buffer memory that is not being used (either unallocated or in the free list)
Shown as byte
kafka.producer.batch_size_avg
(gauge)
The average number of bytes sent per partition per-request.
Shown as byte
kafka.producer.batch_size_max
(gauge)
The max number of bytes sent per partition per-request.
Shown as byte
kafka.producer.buffer_bytes_total
(gauge)
The maximum amount of buffer memory the client can use (whether or not it is currently used).
Shown as byte
kafka.producer.bufferpool_wait_ratio
(gauge)
The fraction of time an appender waits for space allocation.
kafka.producer.bufferpool_wait_time
(gauge)
The fraction of time an appender waits for space allocation.
kafka.producer.bufferpool_wait_time_ns_total
(gauge)
The total time in nanoseconds an appender waits for space allocation.
Shown as nanosecond
kafka.producer.bytes_out
(gauge)
Producer bytes out rate.
Shown as byte
kafka.producer.compression_rate
(gauge)
The average compression rate of record batches for a topic
Shown as fraction
kafka.producer.compression_rate_avg
(rate)
The average compression rate of record batches.
Shown as fraction
kafka.producer.delayed_requests
(gauge)
Number of producer requests delayed.
Shown as request
kafka.producer.expires_per_seconds
(gauge)
Rate of producer request expiration.
Shown as eviction
kafka.producer.io_wait
(gauge)
Producer I/O wait time.
Shown as nanosecond
kafka.producer.message_rate
(gauge)
Producer message rate.
Shown as message
kafka.producer.metadata_age
(gauge)
The age in seconds of the current producer metadata being used.
Shown as second
kafka.producer.record_error_rate
(gauge)
The average per-second number of errored record sends for a topic
Shown as error
kafka.producer.record_queue_time_avg
(gauge)
The average time in ms record batches spent in the record accumulator.
Shown as millisecond
kafka.producer.record_queue_time_max
(gauge)
The maximum time in ms record batches spent in the record accumulator.
Shown as millisecond
kafka.producer.record_retry_rate
(gauge)
The average per-second number of retried record sends for a topic
Shown as record
kafka.producer.record_send_rate
(gauge)
The average number of records sent per second for a topic
Shown as record
kafka.producer.record_size_avg
(gauge)
The average record size.
Shown as byte
kafka.producer.record_size_max
(gauge)
The maximum record size.
Shown as byte
kafka.producer.records_per_request
(gauge)
The average number of records sent per second.
Shown as record
kafka.producer.request_latency_avg
(gauge)
Producer average request latency.
Shown as millisecond
kafka.producer.request_latency_max
(gauge)
The maximum request latency in ms.
Shown as millisecond
kafka.producer.request_rate
(gauge)
Number of producer requests per second.
Shown as request
kafka.producer.requests_in_flight
(gauge)
The current number of in-flight requests awaiting a response.
Shown as request
kafka.producer.response_rate
(gauge)
Number of producer responses per second.
Shown as response
kafka.producer.throttle_time_avg
(gauge)
The average time in ms a request was throttled by a broker.
Shown as millisecond
kafka.producer.throttle_time_max
(gauge)
The maximum time in ms a request was throttled by a broker.
Shown as millisecond
kafka.producer.waiting_threads
(gauge)
The number of user threads blocked waiting for buffer memory to enqueue their records.
Shown as thread
kafka.replication.active_controller_count
(gauge)
Number of active controllers in the cluster.
Shown as node
kafka.replication.isr_expands.rate
(gauge)
Rate of replicas joining the ISR pool.
Shown as node
kafka.replication.isr_shrinks.rate
(gauge)
Rate of replicas leaving the ISR pool.
Shown as node
kafka.replication.leader_count
(gauge)
Number of leaders on this broker.
Shown as node
kafka.replication.leader_elections.rate
(gauge)
Leader election rate.
Shown as event
kafka.replication.max_lag
(gauge)
Maximum lag in messages between the follower and leader replicas.
Shown as offset
kafka.replication.offline_partitions_count
(gauge)
Number of partitions that don't have an active leader.
kafka.replication.partition_count
(gauge)
Number of partitions across all topics in the cluster.
kafka.replication.unclean_leader_elections.rate
(gauge)
Unclean leader election rate.
Shown as event
kafka.replication.under_min_isr_partition_count
(gauge)
Number of under min ISR partitions.
kafka.replication.under_replicated_partitions
(gauge)
Number of under replicated partitions.
kafka.request.channel.queue.size
(gauge)
Number of queued requests.
Shown as request
kafka.request.fetch.failed.rate
(gauge)
Client fetch request failures rate.
Shown as request
kafka.request.fetch_consumer.rate
(gauge)
Fetch consumer requests rate.
Shown as request
kafka.request.fetch_consumer.time.99percentile
(gauge)
Total time in ms to serve the specified request.
Shown as millisecond
kafka.request.fetch_consumer.time.avg
(gauge)
Total time in ms to serve the specified request.
Shown as millisecond
kafka.request.fetch_follower.rate
(gauge)
Fetch follower requests rate.
Shown as request
kafka.request.fetch_follower.time.99percentile
(gauge)
Total time in ms to serve the specified request.
Shown as millisecond
kafka.request.fetch_follower.time.avg
(gauge)
Total time in ms to serve the specified request.
Shown as millisecond
kafka.request.fetch_request_purgatory.size
(gauge)
Number of requests waiting in the producer purgatory.
Shown as request
kafka.request.handler.avg.idle.pct.rate
(gauge)
Average fraction of time the request handler threads are idle.
Shown as fraction
kafka.request.metadata.time.99percentile
(gauge)
Time for metadata requests for 99th percentile.
Shown as millisecond
kafka.request.metadata.time.avg
(gauge)
Average time for metadata request.
Shown as millisecond
kafka.request.offsets.time.99percentile
(gauge)
Time for offset requests for 99th percentile.
Shown as millisecond
kafka.request.offsets.time.avg
(gauge)
Average time for an offset request.
Shown as millisecond
kafka.request.produce.failed.rate
(gauge)
Failed produce requests rate.
Shown as request
kafka.request.produce.rate
(gauge)
Produce requests rate.
Shown as request
kafka.request.produce.time.99percentile
(gauge)
Time for produce requests for 99th percentile.
Shown as millisecond
kafka.request.produce.time.avg
(gauge)
Average time for a produce request.
Shown as millisecond
kafka.request.producer_request_purgatory.size
(gauge)
Number of requests waiting in the producer purgatory
Shown as request
kafka.request.update_metadata.time.99percentile
(gauge)
Time for update metadata requests for 99th percentile.
Shown as millisecond
kafka.request.update_metadata.time.avg
(gauge)
Average time for a request to update metadata.
Shown as millisecond
kafka.server.socket.connection_count
(gauge)
Number of currently open connections to the broker.
Shown as connection
kafka.session.fetch.count
(gauge)
Number of fetch sessions.
kafka.session.fetch.eviction
(gauge)
Eviction rate of fetch session.
Shown as event
kafka.session.zookeeper.disconnect.rate
(gauge)
Zookeeper client disconnect rate.
Shown as event
kafka.session.zookeeper.expire.rate
(gauge)
Zookeeper client session expiration rate.
Shown as event
kafka.session.zookeeper.readonly.rate
(gauge)
Zookeeper client readonly rate.
Shown as event
kafka.session.zookeeper.sync.rate
(gauge)
Zookeeper client sync rate.
Shown as event
kafka.topic.messages_in.rate
(gauge)
Incoming message rate by topic
Shown as message
kafka.topic.net.bytes_in.rate
(gauge)
Incoming byte rate by topic.
Shown as byte
kafka.topic.net.bytes_out.rate
(gauge)
Outgoing byte rate by topic.
Shown as byte
kafka.topic.net.bytes_rejected.rate
(gauge)
Rejected byte rate by topic.
Shown as byte

ヘルプ

Kafka チェックには、イベントは含まれません。

ヘルプ

kafka.can_connect
Agent が監視対象の Kafka インスタンスに接続できず、メトリクスを収集できない場合は、CRITICAL を返します。それ以外の場合は、OK を返します。
Statuses: ok, クリティカル

ヘルプ

その他の参考資料

Kafka コンシューマーインテグレーション

Kafka ダッシュボード

概要

この Agent チェックでは、メッセージオフセットのメトリクスのみを収集します。Kafka ブローカーまたは Java ベースのコンシューマー/プロデューサーから JMX メトリクスを収集したい場合は、Kafka チェックを参照してください。

ストリーミングデータパイプラインのトポロジーの視覚化や、データストリームセットアップ内の局所的なボトルネックの調査が有益であれば、Data Streams Monitoring を確認してください。

このチェックでは、Kafka ブローカーからのハイウォーターオフセット、Kafka または Zookeeper に保存されているコンシューマーオフセット (旧式のコンシューマーの場合)、および計算されたコンシューマーラグ (ブローカーオフセットとコンシューマーオフセットの差) を取得します。

注: このインテグレーションでは、コンシューマーオフセットはブローカーオフセットの前にチェックされます。最悪の場合、コンシューマーラグは少し誇張される可能性があります。これらのオフセットを逆の順序でチェックすると、コンシューマーラグが負の値まで過小評価される可能性があり、これは通常メッセージがスキップされていることを示す悲惨なシナリオです。

セットアップ

インストール

Agent の Kafka コンシューマーチェックは、Datadog Agent パッケージに含まれています。Kafka ノードに追加インストールする必要はありません。

構成

メトリクスベース SLO

ホストで実行中の Agent に対してこのチェックを構成するには

メトリクスの収集
  1. Agent のコンフィギュレーションディレクトリのルートにある conf.d/ フォルダーの kafka_consumer.d/conf.yaml ファイルを編集します。使用可能なすべてのコンフィギュレーションオプションの詳細については、サンプル kafka_consumer.d/conf.yaml を参照してください。

  2. Agent を再起動します

収集データ

このチェックは、その他のログを収集しません。Kafka ブローカーからログを収集するには、Kafka のログコレクション手順をご参照ください。

コンテナ化

コンテナ環境の場合は、オートディスカバリーのインテグレーションテンプレートのガイドを参照して、次のパラメーターを適用してください。

メトリクスの収集
パラメーター
<INTEGRATION_NAME>kafka_consumer
<INIT_CONFIG>空白または {}
<INSTANCE_CONFIG>{"kafka_connect_str": <KAFKA_CONNECT_STR>}
例: {"kafka_connect_str": "server:9092"}
収集データ

このチェックは、その他のログを収集しません。Kafka ブローカーからログを収集するには、Kafka のログコレクション手順をご参照ください。

検証

Agent の status サブコマンドを実行し、Checks セクションで kafka_consumer を探します。

リアルユーザーモニタリング

データセキュリティ

kafka.broker_offset
(gauge)
Current message offset on broker.
Shown as offset
kafka.consumer_lag
(gauge)
Lag in messages between consumer and broker.
Shown as offset
kafka.consumer_offset
(gauge)
Current message offset on consumer.
Shown as offset
kafka.estimated_consumer_lag
(gauge)
Lag in seconds between consumer and broker. This metric is provided through Data Streams Monitoring. Additional charges may apply.
Shown as second

ヘルプ

consumer_lag:
Datadog Agent は、consumer_lag メトリクスの値が 0 未満になると、topicpartition、および consumer_group のタグを付けてイベントを送信します。

ヘルプ

Kafka コンシューマーチェックには、サービスのチェック機能は含まれません。

ヘルプ

Kerberos GSSAPI 認証

Kafka クラスターの Kerberos 設定によっては、以下の構成が必要になる場合があります。

  • Datadog Agent が Kafka ブローカーに接続するために構成された Kafka クライアント。Kafka クライアントは、Kerberos プリンシパルとして追加し、Kerberos keytab に追加する必要があります。また、Kafka クライアントには、有効な Kerberos チケットが必要です。
  • Kafka ブローカーとのセキュアな接続を認証するための TLS 証明書。
    • JKS keystore を使用する場合、証明書は keystore からエクスポートする必要があり、ファイルパスは適切な tls_cert および tls_ca_cert オプションで構成される必要があります。
    • 証明書を認証するために秘密鍵が必要な場合、tls_private_key オプションで秘密鍵を構成する必要があります。また、秘密鍵のパスワードは tls_private_key_password オプションで構成する必要があります。
  • Kafka クライアントの Kerberos keytab の場所がデフォルトのパスと異なる場合は、その場所を指す KRB5_CLIENT_KTNAME 環境変数 (例: KRB5_CLIENT_KTNAME=/etc/krb5.keytab)
  • Kafka クライアントの Kerberos 資格情報チケットキャッシュがデフォルトのパスと異なる場合は、そのキャッシュを指す KRB5CCNAME 環境変数 (例: KRB5CCNAME=/tmp/krb5cc_xxx)
  • Datadog Agent が環境変数にアクセスできない場合は、オペレーティングシステム用の Datadog Agent サービス構成オーバーライドファイルで環境変数を構成してください。Datadog Agent のサービスユニットファイルを変更する手順は、Linux オペレーティングシステムによって異なる場合があります。例えば、Linux の systemd 環境では、以下のようになります。

Linux Systemd の例

  1. 環境ファイルで環境変数を構成します。 例: /path/to/environment/file
KRB5_CLIENT_KTNAME=/etc/krb5.keytab
KRB5CCNAME=/tmp/krb5cc_xxx
  1. Datadog Agent サービス構成オーバーライドファイル sudo systemctl edit datadog-agent.service を作成します。

  2. オーバーライドファイルで以下のように構成します。

[Service]
EnvironmentFile=/path/to/environment/file
  1. 以下のコマンドを実行して、systemd デーモン、datadog-agent サービス、および Datadog Agent を再ロードします。
sudo systemctl daemon-reload
sudo systemctl restart datadog-agent.service
sudo service datadog-agent restart

その他の参考資料