OTel Kafka Metrics Remapping is in public alpha. It is available in collector versions 0.93.0 and later. To share feedback, reach out to your account team.

Overview

OpenTelemetry Kafka metrics in OOTB Kafka dashboard

The Kafka metrics receiver and the JMX Receiver (or the JMX Metrics Gatherer) let you collect Kafka metrics and use the out-of-the-box Kafka dashboard, “Kafka, Zookeeper and Kafka Consumer Overview”.

The JMX Receiver and the JMX Metrics Gatherer are interchangeable. They collect the same set of metrics, because the JMX Receiver launches the JMX Metrics Gatherer.

Kafka metrics receiver

receivers:
  kafkametrics:
    brokers: "${env:KAFKA_BROKER_ADDRESS}"
    protocol_version: 2.0.0
    scrapers:
      - brokers
      - topics
      - consumers

On Kubernetes, run the Kafka metrics receiver in a collector in deployment mode with a single replica, so that the same metric is not collected multiple times. The collector in deployment mode can then use the Datadog Exporter to export the metrics directly to Datadog, or the OTLP exporter to forward the metrics to another collector instance.

Add the following lines to values.yaml:

mode: deployment
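The single-replica requirement can also be stated explicitly. This is a minimal sketch, assuming values.yaml belongs to the OpenTelemetry Collector Helm chart, whose replicaCount setting controls the number of collector replicas:

```yaml
mode: deployment
# Assumption: the chart's replicaCount setting; one replica avoids collecting the same metric twice
replicaCount: 1
```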

Add the following in the Collector configuration:

receivers:
  kafkametrics:
    brokers: ${env:KAFKA_BROKER_ADDRESS}
    protocol_version: 2.0.0
    scrapers:
      - brokers
      - topics
      - consumers
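The receiver only produces data once it is referenced in a metrics pipeline. The following is a minimal sketch of that wiring with the Datadog Exporter, not a complete configuration. The DD_API_KEY environment variable name is an assumption for illustration:

```yaml
exporters:
  datadog:
    api:
      key: ${env:DD_API_KEY}   # assumed environment variable holding the Datadog API key

service:
  pipelines:
    metrics:
      receivers: [kafkametrics]
      exporters: [datadog]
```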

JMX receiver

The JMX Receiver has the following requirements:

  • JRE is available on the host where you are running the collector.
  • The JMX Metric Gatherer JAR is available on the host where you are running the collector. You can download the most recent release of the JMX Metric Gatherer JAR from the opentelemetry-java-contrib releases page.

Add the following in the Collector configuration:

receivers:
  jmx:
    jar_path: /path/to/opentelemetry-jmx-metrics.jar
    endpoint: ${env:KAFKA_BROKER_JMX_ADDRESS}
    target_system: kafka,jvm
  jmx/consumer:
    jar_path: /path/to/opentelemetry-jmx-metrics.jar
    endpoint: ${env:KAFKA_CONSUMER_JMX_ADDRESS}
    target_system: kafka-consumer
  jmx/producer:
    jar_path: /path/to/opentelemetry-jmx-metrics.jar
    endpoint: ${env:KAFKA_PRODUCER_JMX_ADDRESS}
    target_system: kafka-producer

On Kubernetes, run the JMX Receiver in a collector in deployment mode with a single replica, so that the same metric is not collected multiple times. The collector in deployment mode can then use the Datadog Exporter to export the metrics directly to Datadog, or the OTLP exporter to forward the metrics to another collector instance.

Because the default OTel collector image does not meet the requirements listed above, you need to build a custom image. The following Dockerfile is an example image that contains the collector binary, a JRE, and the JMX Metrics Gatherer JAR.

Dockerfile:

FROM alpine:latest as prep

# OpenTelemetry Collector Binary
# Kafka metrics remapping requires collector version 0.93.0 or later
ARG OTEL_VERSION=0.93.0
ARG TARGETARCH=linux_amd64
ADD "https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_${TARGETARCH}.tar.gz" /otelcontribcol
RUN tar -zxvf /otelcontribcol

# JMX Metrics Gatherer Jar
ARG JMX_GATHERER_JAR_VERSION=1.27.0
ADD https://github.com/open-telemetry/opentelemetry-java-contrib/releases/download/v${JMX_GATHERER_JAR_VERSION}/opentelemetry-jmx-metrics.jar /opt/opentelemetry-jmx-metrics.jar
# nonroot user id (https://groups.google.com/g/distroless-users/c/-DpzCr7xRDY/m/eQqJmJroCgAJ)
ARG USER_UID=65532
RUN chown ${USER_UID} /opt/opentelemetry-jmx-metrics.jar


FROM gcr.io/distroless/java17-debian11:nonroot

COPY --from=prep /opt/opentelemetry-jmx-metrics.jar /opt/opentelemetry-jmx-metrics.jar
COPY --from=prep /otelcol-contrib /otelcol-contrib

EXPOSE 4317 55680 55679
ENTRYPOINT ["/otelcol-contrib"]
CMD ["--config", "/etc/otelcol-contrib/config.yaml"]

Add the following lines to values.yaml:

mode: deployment

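Add the following in the Collector configuration. In the example image above, the JAR is located at /opt/opentelemetry-jmx-metrics.jar, so set jar_path accordingly: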

receivers:
  jmx:
    jar_path: /path/to/opentelemetry-jmx-metrics.jar
    endpoint: ${env:KAFKA_BROKER_JMX_ADDRESS}
    target_system: kafka,jvm
  jmx/consumer:
    jar_path: /path/to/opentelemetry-jmx-metrics.jar
    endpoint: ${env:KAFKA_CONSUMER_JMX_ADDRESS}
    target_system: kafka-consumer
  jmx/producer:
    jar_path: /path/to/opentelemetry-jmx-metrics.jar
    endpoint: ${env:KAFKA_PRODUCER_JMX_ADDRESS}
    target_system: kafka-producer
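To forward the JMX metrics to another collector instance instead of exporting them directly, the three receivers can be attached to an OTLP exporter. This is a minimal sketch. The GATEWAY_COLLECTOR_ADDRESS variable is a hypothetical name, and the plaintext TLS setting is an assumption that only suits traffic inside a trusted network:

```yaml
exporters:
  otlp:
    endpoint: ${env:GATEWAY_COLLECTOR_ADDRESS}   # hypothetical variable, for example host:4317
    tls:
      insecure: true   # assumption: plaintext gRPC; remove when TLS is configured

service:
  pipelines:
    metrics:
      receivers: [jmx, jmx/consumer, jmx/producer]
      exporters: [otlp]
```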

JMX Metrics Gatherer

The JMX Metric Gatherer is intended to be run as an uber jar and configured with properties from the command line.

Make sure that a JRE is available on the host where you are running the collector. If it is not, install one. For example, on Debian or Ubuntu:

apt-get update && \
apt-get -y install default-jre-headless

Then download the most recent release of the JMX Metric Gatherer JAR from the opentelemetry-java-contrib releases page and run:

# Replace each {..._JMX_ADDRESS} placeholder with the host:port of the target's JMX endpoint.

# Kafka Broker
java -Dotel.jmx.service.url=service:jmx:rmi:///jndi/rmi://{KAFKA_BROKER_JMX_ADDRESS}/jmxrmi \
  -Dotel.jmx.target.system=kafka,jvm \
  -Dotel.metrics.exporter=otlp \
  -Dotel.exporter.otlp.endpoint=http://localhost:4317 \
  -jar /path/to/opentelemetry-jmx-metrics.jar

# Kafka Producer
java -Dotel.jmx.service.url=service:jmx:rmi:///jndi/rmi://{KAFKA_PRODUCER_JMX_ADDRESS}/jmxrmi \
  -Dotel.jmx.target.system=kafka-producer \
  -Dotel.metrics.exporter=otlp \
  -Dotel.exporter.otlp.endpoint=http://localhost:4317 \
  -jar /path/to/opentelemetry-jmx-metrics.jar

# Kafka Consumer
java -Dotel.jmx.service.url=service:jmx:rmi:///jndi/rmi://{KAFKA_CONSUMER_JMX_ADDRESS}/jmxrmi \
  -Dotel.jmx.target.system=kafka-consumer \
  -Dotel.metrics.exporter=otlp \
  -Dotel.exporter.otlp.endpoint=http://localhost:4317 \
  -jar /path/to/opentelemetry-jmx-metrics.jar
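The commands above send metrics over OTLP to http://localhost:4317, so the collector needs an OTLP receiver listening on that address. This is a minimal sketch of the collector side, assuming a datadog exporter is already defined elsewhere in the same configuration:

```yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: localhost:4317   # matches otel.exporter.otlp.endpoint in the commands above

service:
  pipelines:
    metrics:
      receivers: [otlp]
      exporters: [datadog]   # assumption: exporter defined elsewhere in this configuration
```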

To deploy the JMX Metrics Gatherer in Kubernetes, build an image that contains a JRE and the JMX Metrics Gatherer JAR. The following Dockerfile is an example of such an image.

Dockerfile:

FROM alpine:latest as prep

# JMX Metrics Gatherer Jar
ARG JMX_GATHERER_JAR_VERSION=1.27.0
ADD https://github.com/open-telemetry/opentelemetry-java-contrib/releases/download/v${JMX_GATHERER_JAR_VERSION}/opentelemetry-jmx-metrics.jar /opt/opentelemetry-jmx-metrics.jar
# nonroot user id (https://groups.google.com/g/distroless-users/c/-DpzCr7xRDY/m/eQqJmJroCgAJ)
ARG USER_UID=65532
RUN chown ${USER_UID} /opt/opentelemetry-jmx-metrics.jar

FROM gcr.io/distroless/java17-debian11:nonroot

COPY --from=prep /opt/opentelemetry-jmx-metrics.jar /opt/opentelemetry-jmx-metrics.jar

EXPOSE 4317 55680 55679
ENTRYPOINT ["java"]
CMD ["-Dotel.jmx.service.url=service:jmx:rmi:///jndi/rmi://kafka:1099/jmxrmi", \
"-Dotel.jmx.target.system=kafka,jvm", \
"-Dotel.metrics.exporter=otlp", \
"-Dotel.exporter.otlp.endpoint=http://otelcol:4317", \
"-jar", \
"/opt/opentelemetry-jmx-metrics.jar"]

Log collection

See Log Collection for instructions on how to collect logs using the OpenTelemetry Collector.

To appear in the out-of-the-box Kafka Dashboard, the Kafka logs need to be tagged with source:kafka. To do this, use an attributes processor:

processors:
  attributes:
    actions:
      - key: ddtags
        value: "source:kafka"
        action: insert

To ensure this attribute is added only to your Kafka logs, use the include/exclude filtering of the attributes processor.
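As an illustration of include filtering, the sketch below applies the tag only to log records whose resource has a given service.name. The value kafka is a hypothetical example. Match on whichever resource or log attribute actually identifies your Kafka logs:

```yaml
processors:
  attributes:
    include:
      match_type: strict
      resources:
        - key: service.name
          value: kafka   # hypothetical value; adjust to how your Kafka logs are identified
    actions:
      - key: ddtags
        value: "source:kafka"
        action: insert
```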

Data collected

| OTEL | DATADOG | DESCRIPTION | FILTER | TRANSFORM |
| --- | --- | --- | --- | --- |
| kafka.consumer.bytes-consumed-rate | kafka.consumer.bytes_consumed | The average number of bytes consumed per second | | Rename attribute key client-id to client |
| kafka.consumer.fetch-size-avg | kafka.consumer.fetch_size_avg | The average number of bytes fetched per request | | Rename attribute key client-id to client |
| kafka.consumer.records-consumed-rate | kafka.consumer.records_consumed | The average number of records consumed per second | | Rename attribute key client-id to client |
| kafka.consumer.total.bytes-consumed-rate | kafka.consumer.bytes_in | The average number of bytes consumed for all topics per second. | | |
| kafka.consumer.total.records-consumed-rate | kafka.consumer.messages_in | The average number of records consumed for all topics per second. | | |
| kafka.consumer_group.lag | kafka.consumer_lag | Current approximate lag of consumer group at partition of topic | | Rename attribute key group to consumer_group |
| kafka.consumer_group.offset | kafka.consumer_offset | Current offset of the consumer group at partition of topic | | Rename attribute key group to consumer_group |
| kafka.controller.active.count | kafka.replication.active_controller_count | Controller is active on broker | | |
| kafka.isr.operation.count | kafka.replication.isr_expands.rate | The number of in-sync replica shrink and expand operations | operation: expand | |
| kafka.isr.operation.count | kafka.replication.isr_shrinks.rate | The number of in-sync replica shrink and expand operations | operation: shrink | |
| kafka.leader.election.rate | kafka.replication.leader_elections.rate | Leader election rate - increasing indicates broker failures | | |
| kafka.logs.flush.time.count | kafka.log.flush_rate.rate | Log flush count | | |
| kafka.max.lag | kafka.replication.max_lag | Max lag in messages between follower and leader replicas | | |
| kafka.message.count | kafka.messages_in.rate | The number of messages received by the broker | | |
| kafka.network.io | kafka.net.bytes_in.rate | The bytes received or sent by the broker | state: in | |
| kafka.network.io | kafka.net.bytes_out.rate | The bytes received or sent by the broker | state: out | |
| kafka.partition.count | kafka.replication.partition_count | The number of partitions on the broker | | |
| kafka.partition.current_offset | kafka.broker_offset | Current offset of partition of topic. | | Rename attribute key group to consumer_group |
| kafka.partition.offline | kafka.replication.offline_partitions_count | The number of partitions offline | | |
| kafka.partition.under_replicated | kafka.replication.under_replicated_partitions | The number of under replicated partitions | | |
| kafka.producer.byte-rate | kafka.producer.bytes_out | The average number of bytes sent per second for a topic. | | Rename attribute key client-id to client |
| kafka.producer.compression-rate | kafka.producer.compression_rate | The average compression rate of record batches for a topic. | | Rename attribute key client-id to client |
| kafka.producer.io-wait-time-ns-avg | kafka.producer.io_wait | The average length of time the I/O thread spent waiting for a socket ready for reads or writes. | | |
| kafka.producer.outgoing-byte-rate | kafka.producer.bytes_out | The average number of outgoing bytes sent per second to all servers. | | |
| kafka.producer.record-error-rate | kafka.producer.record_error_rate | The average per-second number of record sends that resulted in errors for a topic. | | Rename attribute key client-id to client |
| kafka.producer.record-retry-rate | kafka.producer.record_retry_rate | The average per-second number of retried record sends for a topic. | | Rename attribute key client-id to client |
| kafka.producer.record-send-rate | kafka.producer.record_send_rate | The average number of records sent per second for a topic. | | Rename attribute key client-id to client |
| kafka.producer.request-latency-avg | kafka.producer.request_latency_avg | The average request latency. | | |
| kafka.producer.request-rate | kafka.producer.request_rate | The average number of requests sent per second. | | |
| kafka.producer.response-rate | kafka.producer.response_rate | Responses received per second. | | |
| kafka.purgatory.size | kafka.request.fetch_request_purgatory.size | The number of requests waiting in purgatory | type: fetch | |
| kafka.purgatory.size | kafka.request.producer_request_purgatory.size | The number of requests waiting in purgatory | type: produce | |
| kafka.request.failed | kafka.request.fetch.failed.rate | The number of requests to the broker resulting in a failure | type: fetch | |
| kafka.request.failed | kafka.request.produce.failed.rate | The number of requests to the broker resulting in a failure | type: produce | |
| kafka.request.queue | kafka.request.channel.queue.size | Size of the request queue | | |
| kafka.request.time.99p | kafka.request.fetch_consumer.time.99percentile | The 99th percentile time the broker has taken to service requests | type: fetchconsumer | |
| kafka.request.time.99p | kafka.request.fetch_follower.time.99percentile | The 99th percentile time the broker has taken to service requests | type: fetchfollower | |
| kafka.request.time.99p | kafka.request.produce.time.99percentile | The 99th percentile time the broker has taken to service requests | type: produce | |
| kafka.request.time.avg | kafka.request.fetch_consumer.time.avg | The average time the broker has taken to service requests | type: fetchconsumer | |
| kafka.request.time.avg | kafka.request.fetch_follower.time.avg | The average time the broker has taken to service requests | type: fetchfollower | |
| kafka.request.time.avg | kafka.request.produce.time.avg | The average time the broker has taken to service requests | type: produce | |
| kafka.unclean.election.rate | kafka.replication.unclean_leader_elections.rate | Unclean leader election rate - increasing indicates broker failures | | |

Note: In Datadog, - is translated to _. For some metrics, this means that the OTel metric name and the Datadog metric name are the same (for example, kafka.producer.request-rate and kafka.producer.request_rate). To avoid double counting these metrics, the OTel metric name is given the prefix otel.

See OpenTelemetry Metrics Mapping for more information.

Full example configuration

For a full working example configuration with the Datadog exporter, see kafka.yaml.

Example logging output

Resource SchemaURL: https://opentelemetry.io/schemas/1.20.0
Resource attributes:
     -> service.name: Str(unknown_service:java)
     -> telemetry.sdk.language: Str(java)
     -> telemetry.sdk.name: Str(opentelemetry)
     -> telemetry.sdk.version: Str(1.27.0)
ScopeMetrics #0
ScopeMetrics SchemaURL: 
InstrumentationScope io.opentelemetry.contrib.jmxmetrics 1.27.0-alpha
Metric #0
Descriptor:
     -> Name: kafka.message.count
     -> Description: The number of messages received by the broker
     -> Unit: {messages}
     -> DataType: Sum
     -> IsMonotonic: true
     -> AggregationTemporality: Cumulative
NumberDataPoints #0
StartTimestamp: 2024-01-22 15:50:24.212 +0000 UTC
Timestamp: 2024-01-22 15:51:24.218 +0000 UTC
Value: 25

Example app

See the example application, which demonstrates the configurations discussed in this documentation. It consists of a producer, a consumer, a broker, and a ZooKeeper instance, and it demonstrates using the Kafka metrics receiver, the JMX Receiver, and/or the JMX Metrics Gatherer.