Kafka Broker

Supported OS Linux Windows Mac OS

Integration version2.16.0

Kafka Dashboard

Overview

View Kafka broker metrics collected for a 360-view of the health and performance of your Kafka clusters in real time. With this integration, you can collect metrics and logs from your Kafka deployment to visualize telemetry and alert on the performance of your Kafka stack.

If you would benefit from visualizing the topology of your streaming data pipelines and identifying the root cause of bottlenecks, learn more about Data Streams Monitoring.

Note:

  • This check has a limit of 350 metrics per instance. The number of returned metrics is indicated in the Agent status output. Specify the metrics you are interested in by editing the configuration below. For more detailed instructions on customizing the metrics to collect, see the JMX Checks documentation.
  • This integration attached sample configuration works only for Kafka >= 0.8.2. If you are running a version older than that, see the Agent v5.2.x released sample files.
  • To collect Kafka consumer metrics, see the kafka_consumer check.

Setup

Installation

The Agent’s Kafka check is included in the Datadog Agent package, so you don’t need to install anything else on your Kafka nodes.

The check collects metrics from JMX with JMXFetch. A JVM is needed on each kafka node so the Agent can run JMXFetch. The same JVM that Kafka uses can be used for this.

Note: The Kafka check cannot be used with Managed Streaming for Apache Kafka (Amazon MSK). Use the Amazon MSK integration instead.

Configuration

Host

To configure this check for an Agent running on a host:

Metric collection
  1. Edit the kafka.d/conf.yaml file, in the conf.d/ folder at the root of your Agent’s configuration directory. Kafka bean names depend on the exact Kafka version you’re running. Use the example configuration file that comes packaged with the Agent as a base since it is the most up-to-date configuration. Note: the Agent version in the example may be for a newer version of the Agent than what you have installed.

  2. Restart the Agent.

Log collection

Available for Agent versions >6.0

  1. Kafka uses the log4j logger by default. To activate logging to a file and customize the format edit the log4j.properties file:

      # Set root logger level to INFO and its only appender to R
      log4j.rootLogger=INFO, R
      log4j.appender.R.File=/var/log/kafka/server.log
      log4j.appender.R.layout=org.apache.log4j.PatternLayout
      log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
    
  2. By default, the Datadog integration pipeline supports the following conversion patterns:

      %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
      %d [%t] %-5p %c - %m%n
      %r [%t] %p %c %x - %m%n
      [%d] %p %m (%c)%n
    

    Clone and edit the integration pipeline if you have a different format.

  3. Collecting logs is disabled by default in the Datadog Agent, enable it in your datadog.yaml file:

    logs_enabled: true
    
  4. Add the following configuration block to your kafka.d/conf.yaml file. Change the path and service parameter values based on your environment. See the sample kafka.d/conf.yaml for all available configuration options.

    logs:
      - type: file
        path: /var/log/kafka/server.log
        source: kafka
        service: myapp
        #To handle multi line that starts with yyyy-mm-dd use the following pattern
        #log_processing_rules:
        #  - type: multi_line
        #    name: log_start_with_date
        #    pattern: \d{4}\-(0?[1-9]|1[012])\-(0?[1-9]|[12][0-9]|3[01])
    
  5. Restart the Agent.

Containerized

Metric collection

For containerized environments, see the Autodiscovery with JMX guide.

Log collection

Available for Agent versions >6.0

Collecting logs is disabled by default in the Datadog Agent. To enable it, see Kubernetes Log Collection.

ParameterValue
<LOG_CONFIG>{"source": "kafka", "service": "<SERVICE_NAME>"}

Validation

Run the Agent’s status subcommand and look for kafka under the JMXFetch section:

========
JMXFetch
========
  Initialized checks
  ==================
    kafka
      instance_name : kafka-localhost-9999
      message :
      metric_count : 46
      service_check_count : 0
      status : OK

Data Collected

Metrics

kafka.consumer.bytes_consumed
(gauge)
The average number of bytes consumed per second for a specific topic.
Shown as byte
kafka.consumer.bytes_in
(gauge)
Consumer bytes in rate.
Shown as byte
kafka.consumer.delayed_requests
(gauge)
Number of delayed consumer requests.
Shown as request
kafka.consumer.expires_per_second
(gauge)
Rate of delayed consumer request expiration.
Shown as eviction
kafka.consumer.fetch_rate
(gauge)
The minimum rate at which the consumer sends fetch requests to a broker.
Shown as request
kafka.consumer.fetch_size_avg
(gauge)
The average number of bytes fetched per request for a specific topic.
Shown as byte
kafka.consumer.fetch_size_max
(gauge)
The maximum number of bytes fetched per request for a specific topic.
Shown as byte
kafka.consumer.kafka_commits
(gauge)
Rate of offset commits to Kafka.
Shown as write
kafka.consumer.max_lag
(gauge)
Maximum consumer lag.
Shown as offset
kafka.consumer.messages_in
(gauge)
Rate of consumer message consumption.
Shown as message
kafka.consumer.records_consumed
(gauge)
The average number of records consumed per second for a specific topic.
Shown as record
kafka.consumer.records_per_request_avg
(gauge)
The average number of records in each request for a specific topic.
Shown as record
kafka.consumer.zookeeper_commits
(gauge)
Rate of offset commits to ZooKeeper.
Shown as write
kafka.expires_sec
(gauge)
Rate of delayed producer request expiration.
Shown as eviction
kafka.follower.expires_per_second
(gauge)
Rate of request expiration on followers.
Shown as eviction
kafka.log.flush_rate.rate
(gauge)
Log flush rate.
Shown as flush
kafka.messages_in.rate
(gauge)
Incoming message rate.
Shown as message
kafka.net.bytes_in.rate
(gauge)
Incoming byte rate.
Shown as byte
kafka.net.bytes_out
(gauge)
Outgoing byte total.
Shown as byte
kafka.net.bytes_out.rate
(gauge)
Outgoing byte rate.
Shown as byte
kafka.net.bytes_rejected.rate
(gauge)
Rejected byte rate.
Shown as byte
kafka.net.processor.avg.idle.pct.rate
(gauge)
Average fraction of time the network processor threads are idle.
Shown as fraction
kafka.producer.available_buffer_bytes
(gauge)
The total amount of buffer memory that is not being used (either unallocated or in the free list)
Shown as byte
kafka.producer.batch_size_avg
(gauge)
The average number of bytes sent per partition per-request.
Shown as byte
kafka.producer.batch_size_max
(gauge)
The max number of bytes sent per partition per-request.
Shown as byte
kafka.producer.buffer_bytes_total
(gauge)
The maximum amount of buffer memory the client can use (whether or not it is currently used).
Shown as byte
kafka.producer.bufferpool_wait_ratio
(gauge)
The fraction of time an appender waits for space allocation.
kafka.producer.bufferpool_wait_time
(gauge)
The fraction of time an appender waits for space allocation.
kafka.producer.bufferpool_wait_time_ns_total
(gauge)
The total time in nanoseconds an appender waits for space allocation.
Shown as nanosecond
kafka.producer.bytes_out
(gauge)
Producer bytes out rate.
Shown as byte
kafka.producer.compression_rate
(gauge)
The average compression rate of record batches for a topic
Shown as fraction
kafka.producer.compression_rate_avg
(rate)
The average compression rate of record batches.
Shown as fraction
kafka.producer.delayed_requests
(gauge)
Number of producer requests delayed.
Shown as request
kafka.producer.expires_per_seconds
(gauge)
Rate of producer request expiration.
Shown as eviction
kafka.producer.io_wait
(gauge)
Producer I/O wait time.
Shown as nanosecond
kafka.producer.message_rate
(gauge)
Producer message rate.
Shown as message
kafka.producer.metadata_age
(gauge)
The age in seconds of the current producer metadata being used.
Shown as second
kafka.producer.record_error_rate
(gauge)
The average per-second number of errored record sends for a topic
Shown as error
kafka.producer.record_queue_time_avg
(gauge)
The average time in ms record batches spent in the record accumulator.
Shown as millisecond
kafka.producer.record_queue_time_max
(gauge)
The maximum time in ms record batches spent in the record accumulator.
Shown as millisecond
kafka.producer.record_retry_rate
(gauge)
The average per-second number of retried record sends for a topic
Shown as record
kafka.producer.record_send_rate
(gauge)
The average number of records sent per second for a topic
Shown as record
kafka.producer.record_size_avg
(gauge)
The average record size.
Shown as byte
kafka.producer.record_size_max
(gauge)
The maximum record size.
Shown as byte
kafka.producer.records_per_request
(gauge)
The average number of records sent per second.
Shown as record
kafka.producer.request_latency_avg
(gauge)
Producer average request latency.
Shown as millisecond
kafka.producer.request_latency_max
(gauge)
The maximum request latency in ms.
Shown as millisecond
kafka.producer.request_rate
(gauge)
Number of producer requests per second.
Shown as request
kafka.producer.requests_in_flight
(gauge)
The current number of in-flight requests awaiting a response.
Shown as request
kafka.producer.response_rate
(gauge)
Number of producer responses per second.
Shown as response
kafka.producer.throttle_time_avg
(gauge)
The average time in ms a request was throttled by a broker.
Shown as millisecond
kafka.producer.throttle_time_max
(gauge)
The maximum time in ms a request was throttled by a broker.
Shown as millisecond
kafka.producer.waiting_threads
(gauge)
The number of user threads blocked waiting for buffer memory to enqueue their records.
Shown as thread
kafka.replication.active_controller_count
(gauge)
Number of active controllers in the cluster.
Shown as node
kafka.replication.isr_expands.rate
(gauge)
Rate of replicas joining the ISR pool.
Shown as node
kafka.replication.isr_shrinks.rate
(gauge)
Rate of replicas leaving the ISR pool.
Shown as node
kafka.replication.leader_count
(gauge)
Number of leaders on this broker.
Shown as node
kafka.replication.leader_elections.rate
(gauge)
Leader election rate.
Shown as event
kafka.replication.max_lag
(gauge)
Maximum lag in messages between the follower and leader replicas.
Shown as offset
kafka.replication.offline_partitions_count
(gauge)
Number of partitions that don't have an active leader.
kafka.replication.partition_count
(gauge)
Number of partitions across all topics in the cluster.
kafka.replication.unclean_leader_elections.rate
(gauge)
Unclean leader election rate.
Shown as event
kafka.replication.under_min_isr_partition_count
(gauge)
Number of under min ISR partitions.
kafka.replication.under_replicated_partitions
(gauge)
Number of under replicated partitions.
kafka.request.channel.queue.size
(gauge)
Number of queued requests.
Shown as request
kafka.request.fetch.failed.rate
(gauge)
Client fetch request failures rate.
Shown as request
kafka.request.fetch_consumer.rate
(gauge)
Fetch consumer requests rate.
Shown as request
kafka.request.fetch_consumer.time.99percentile
(gauge)
Total time in ms to serve the specified request.
Shown as millisecond
kafka.request.fetch_consumer.time.avg
(gauge)
Total time in ms to serve the specified request.
Shown as millisecond
kafka.request.fetch_follower.rate
(gauge)
Fetch follower requests rate.
Shown as request
kafka.request.fetch_follower.time.99percentile
(gauge)
Total time in ms to serve the specified request.
Shown as millisecond
kafka.request.fetch_follower.time.avg
(gauge)
Total time in ms to serve the specified request.
Shown as millisecond
kafka.request.fetch_request_purgatory.size
(gauge)
Number of requests waiting in the producer purgatory.
Shown as request
kafka.request.handler.avg.idle.pct.rate
(gauge)
Average fraction of time the request handler threads are idle.
Shown as fraction
kafka.request.metadata.time.99percentile
(gauge)
Time for metadata requests for 99th percentile.
Shown as millisecond
kafka.request.metadata.time.avg
(gauge)
Average time for metadata request.
Shown as millisecond
kafka.request.offsets.time.99percentile
(gauge)
Time for offset requests for 99th percentile.
Shown as millisecond
kafka.request.offsets.time.avg
(gauge)
Average time for an offset request.
Shown as millisecond
kafka.request.produce.failed.rate
(gauge)
Failed produce requests rate.
Shown as request
kafka.request.produce.rate
(gauge)
Produce requests rate.
Shown as request
kafka.request.produce.time.99percentile
(gauge)
Time for produce requests for 99th percentile.
Shown as millisecond
kafka.request.produce.time.avg
(gauge)
Average time for a produce request.
Shown as millisecond
kafka.request.producer_request_purgatory.size
(gauge)
Number of requests waiting in the producer purgatory
Shown as request
kafka.request.update_metadata.time.99percentile
(gauge)
Time for update metadata requests for 99th percentile.
Shown as millisecond
kafka.request.update_metadata.time.avg
(gauge)
Average time for a request to update metadata.
Shown as millisecond
kafka.server.socket.connection_count
(gauge)
Number of currently open connections to the broker.
Shown as connection
kafka.session.fetch.count
(gauge)
Number of fetch sessions.
kafka.session.fetch.eviction
(gauge)
Eviction rate of fetch session.
Shown as event
kafka.session.zookeeper.disconnect.rate
(gauge)
Zookeeper client disconnect rate.
Shown as event
kafka.session.zookeeper.expire.rate
(gauge)
Zookeeper client session expiration rate.
Shown as event
kafka.session.zookeeper.readonly.rate
(gauge)
Zookeeper client readonly rate.
Shown as event
kafka.session.zookeeper.sync.rate
(gauge)
Zookeeper client sync rate.
Shown as event
kafka.topic.messages_in.rate
(gauge)
Incoming message rate by topic
Shown as message
kafka.topic.net.bytes_in.rate
(gauge)
Incoming byte rate by topic.
Shown as byte
kafka.topic.net.bytes_out.rate
(gauge)
Outgoing byte rate by topic.
Shown as byte
kafka.topic.net.bytes_rejected.rate
(gauge)
Rejected byte rate by topic.
Shown as byte

Events

The Kafka check does not include any events.

Service Checks

kafka.can_connect
Returns CRITICAL if the Agent is unable to connect to and collect metrics from the monitored Kafka instance, WARNING if no metrics are collected, and OK otherwise.
Statuses: ok, critical, warning

Troubleshooting

Further Reading

Kafka Consumer Integration

Kafka Dashboard

Overview

This Agent integration collects message offset metrics from your Kafka consumers. This check fetches the highwater offsets from the Kafka brokers, consumer offsets that are stored in Kafka (or Zookeeper for old-style consumers), and then calculates consumer lag (which is the difference between the broker offset and the consumer offset).

If you would benefit from visualizing the topology of your streaming data pipelines and identifying the root cause of bottlenecks, learn more about Data Streams Monitoring.

Note:

  • This integration ensures that consumer offsets are checked before broker offsets; in the worst case, consumer lag may be a little overstated. Checking these offsets in the reverse order can understate consumer lag to the point of having negative values, which is a dire scenario usually indicating messages are being skipped.
  • If you want to collect JMX metrics from your Kafka brokers or Java-based consumers/producers, see the Kafka Broker integration.

Setup

Installation

The Agent’s Kafka consumer check is included in the Datadog Agent package. No additional installation is needed on your Kafka nodes.

Configuration

Host

To configure this check for an Agent running on a host running your Kafka consumers:

Metric collection
  1. Edit the kafka_consumer.d/conf.yaml file, in the conf.d/ folder at the root of your Agent’s configuration directory. See the sample kafka_consumer.d/conf.yaml for all available configuration options.

  2. Restart the Agent.

Log collection

This check does not collect additional logs. To collect logs from Kafka brokers, see log collection instructions for Kafka.

Containerized

For containerized environments, see the Autodiscovery Integration Templates for guidance on applying the parameters below.

Metric collection
ParameterValue
<INTEGRATION_NAME>kafka_consumer
<INIT_CONFIG>blank or {}
<INSTANCE_CONFIG>{"kafka_connect_str": <KAFKA_CONNECT_STR>}
For example, {"kafka_connect_str": "server:9092"}
Log collection

This check does not collect additional logs. To collect logs from Kafka brokers, see log collection instructions for Kafka.

Validation

Run the Agent’s status subcommand and look for kafka_consumer under the Checks section.

Data Collected

Metrics

kafka.broker_offset
(gauge)
Current message offset on broker.
Shown as offset
kafka.consumer_lag
(gauge)
Lag in messages between consumer and broker.
Shown as offset
kafka.consumer_offset
(gauge)
Current message offset on consumer.
Shown as offset
kafka.estimated_consumer_lag
(gauge)
Lag in seconds between consumer and broker. This metric is provided through Data Streams Monitoring. Additional charges may apply.
Shown as second

Events

consumer_lag:
The Datadog Agent emits an event when the value of the consumer_lag metric goes below 0, tagging it with topic, partition and consumer_group.

Service Checks

The Kafka-consumer check does not include any service checks.

Troubleshooting

Kerberos GSSAPI Authentication

Depending on your Kafka cluster’s Kerberos setup, you may need to configure the following:

  • Kafka client configured for the Datadog Agent to connect to the Kafka broker. The Kafka client should be added as a Kerberos principal and added to a Kerberos keytab. The Kafka client should also have a valid kerberos ticket.
  • TLS certificate to authenticate a secure connection to the Kafka broker.
    • If JKS keystore is used, a certificate needs to be exported from the keystore and the file path should be configured with the applicable tls_cert and/or tls_ca_cert options.
    • If a private key is required to authenticate the certificate, it should be configured with the tls_private_key option. If applicable, the private key password should be configured with the tls_private_key_password.
  • KRB5_CLIENT_KTNAME environment variable pointing to the Kafka client’s Kerberos keytab location if it differs from the default path (for example, KRB5_CLIENT_KTNAME=/etc/krb5.keytab)
  • KRB5CCNAME environment variable pointing to the Kafka client’s Kerberos credentials ticket cache if it differs from the default path (for example, KRB5CCNAME=/tmp/krb5cc_xxx)
  • If the Datadog Agent is unable to access the environment variables, configure the environment variables in a Datadog Agent service configuration override file for your operating system. The procedure for modifying the Datadog Agent service unit file may vary for different Linux operating systems. For example, in a Linux systemd environment:

Linux Systemd Example

  1. Configure the environment variables in an environment file. For example: /path/to/environment/file
KRB5_CLIENT_KTNAME=/etc/krb5.keytab
KRB5CCNAME=/tmp/krb5cc_xxx
  1. Create a Datadog Agent service configuration override file: sudo systemctl edit datadog-agent.service

  2. Configure the following in the override file:

[Service]
EnvironmentFile=/path/to/environment/file
  1. Run the following commands to reload the systemd daemon, datadog-agent service, and Datadog Agent:
sudo systemctl daemon-reload
sudo systemctl restart datadog-agent.service
sudo service datadog-agent restart

Further Reading