Kafka

Supported OS Linux Mac OS Windows

Intégration2.13.0

Dashboard Kafka

Présentation

Associez Kafka à Datadog pour :

  • Visualiser les performances de votre cluster en temps réel
  • Corréler les performances de Kafka avec le reste de vos applications

Ce check prévoit une limite de 350 métriques par instance. Le nombre de métriques renvoyées est indiqué dans la page d’information. Choisissez des métriques pertinentes en modifiant la configuration ci-dessous. Pour découvrir comment personnaliser la liste des métriques à recueillir, consultez la documentation relative aux checks JMX afin d’obtenir des instructions détaillées.

Pour recueillir les métriques relatives aux consommateurs de Kafka, consultez la section check kafka_consumer.

Remarque : cet exemple de configuration de l’intégration ne fonctionne que pour Kafka 0.8.2 ou les versions ultérieures. Si vous utilisez une version antérieure, consultez les exemples de fichier pour les versions 5.2.x de l’Agent.

Configuration

Installation

Le check Kafka de l’Agent est inclus avec le paquet de l’Agent Datadog : vous n’avez donc rien d’autre à installer sur vos nœuds Kafka.

Le check recueille des métriques à partir de JMX avec JMXFetch. Pour que l’Agent puisse exécuter JMXFetch, chaque nœud Kafka nécessite une JVM. Il peut s’agir de la même JVM que celle utilisée par Kafka.

Remarque : Il n’est pas possible d’utiliser le check Kafka avec Managed Streaming for Apache Kafka (Amazon MSK). Utilisez plutôt l’intégration Amazon MSK.

Configuration

Host

Pour configurer ce check lorsque l’Agent est exécuté sur un host :

Collecte de métriques
  1. Modifiez le fichier kafka.d/conf.yaml dans le dossier conf.d/ à la racine du répertoire de configuration de votre Agent. Les noms des beans de Kafka dépendent de la version précise de Kafka que vous exécutez. Utilisez le fichier d’exemple de configuration fourni avec l’Agent pour vous guider. Il s’agit de la configuration la plus récente. Remarque : la version de l’Agent citée dans l’exemple peut correspondre à une version plus récente que celle que vous avez installée.

  2. Redémarrez l’Agent.

Collecte de logs

Disponible à partir des versions > 6.0 de l’Agent

  1. Kafka utilise le logger log4j par défaut. Pour activer la journalisation dans un fichier et personnaliser le format, modifiez le fichier log4j.properties :

      # Set root logger level to INFO and its only appender to R
      log4j.rootLogger=INFO, R
      log4j.appender.R.File=/var/log/kafka/server.log
      log4j.appender.R.layout=org.apache.log4j.PatternLayout
      log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
    
  2. Par défaut, le pipeline d’intégration de Datadog prend en charge les expressions de conversion suivantes :

      %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
      %d [%t] %-5p %c - %m%n
      %r [%t] %p %c %x - %m%n
      [%d] %p %m (%c)%n
    

    Dupliquez et modifiez le pipeline d’intégration si vous utilisez un autre format.

  3. La collecte de logs est désactivée par défaut dans l’Agent Datadog. Vous devez l’activer dans datadog.yaml :

    logs_enabled: true
    
  4. Ajoutez le bloc de configuration suivant à votre fichier kafka.d/conf.yaml. Modifiez les valeurs des paramètres path et service en fonction de votre environnement. Consultez le fichier d’exemple kafka.d/conf.yaml pour découvrir toutes les options de configuration disponibles.

    logs:
      - type: file
        path: /var/log/kafka/server.log
        source: kafka
        service: myapp
        #To handle multi line that starts with yyyy-mm-dd use the following pattern
        #log_processing_rules:
        #  - type: multi_line
        #    name: log_start_with_date
        #    pattern: \d{4}\-(0?[1-9]|1[012])\-(0?[1-9]|[12][0-9]|3[01])
    
  5. Redémarrez l’Agent.

Environnement conteneurisé

Collecte de métriques

Pour les environnements conteneurisés, consultez le guide Autodiscovery avec JMX.

Collecte de logs

Disponible à partir des versions > 6.0 de l’Agent

La collecte des logs est désactivée par défaut dans l’Agent Datadog. Pour l’activer, consultez la section Collecte de logs Kubernetes.

ParamètreValeur
<CONFIG_LOG>{"source": "kafka", "service": "<NOM_SERVICE>"}

Validation

Lancez la sous-commande status de l’Agent et cherchez kafka dans la section JMXFetch :

========
JMXFetch
========
  Initialized checks
  ==================
    kafka
      instance_name : kafka-localhost-9999
      message :
      metric_count : 46
      service_check_count : 0
      status : OK

Données collectées

Métriques

kafka.consumer.bytes_consumed
(gauge)
The average number of bytes consumed per second for a specific topic.
Shown as byte
kafka.consumer.bytes_in
(gauge)
Consumer bytes in rate.
Shown as byte
kafka.consumer.delayed_requests
(gauge)
Number of delayed consumer requests.
Shown as request
kafka.consumer.expires_per_second
(gauge)
Rate of delayed consumer request expiration.
Shown as eviction
kafka.consumer.fetch_rate
(gauge)
The minimum rate at which the consumer sends fetch requests to a broker.
Shown as request
kafka.consumer.fetch_size_avg
(gauge)
The average number of bytes fetched per request for a specific topic.
Shown as byte
kafka.consumer.fetch_size_max
(gauge)
The maximum number of bytes fetched per request for a specific topic.
Shown as byte
kafka.consumer.kafka_commits
(gauge)
Rate of offset commits to Kafka.
Shown as write
kafka.consumer.max_lag
(gauge)
Maximum consumer lag.
Shown as offset
kafka.consumer.messages_in
(gauge)
Rate of consumer message consumption.
Shown as message
kafka.consumer.records_consumed
(gauge)
The average number of records consumed per second for a specific topic.
Shown as record
kafka.consumer.records_per_request_avg
(gauge)
The average number of records in each request for a specific topic.
Shown as record
kafka.consumer.zookeeper_commits
(gauge)
Rate of offset commits to ZooKeeper.
Shown as write
kafka.expires_sec
(gauge)
Rate of delayed producer request expiration.
Shown as eviction
kafka.follower.expires_per_second
(gauge)
Rate of request expiration on followers.
Shown as eviction
kafka.log.flush_rate.rate
(gauge)
Log flush rate.
Shown as flush
kafka.messages_in.rate
(gauge)
Incoming message rate.
Shown as message
kafka.net.bytes_in.rate
(gauge)
Incoming byte rate.
Shown as byte
kafka.net.bytes_out
(gauge)
Outgoing byte total.
Shown as byte
kafka.net.bytes_out.rate
(gauge)
Outgoing byte rate.
Shown as byte
kafka.net.bytes_rejected.rate
(gauge)
Rejected byte rate.
Shown as byte
kafka.net.processor.avg.idle.pct.rate
(gauge)
Average fraction of time the network processor threads are idle.
Shown as fraction
kafka.producer.available_buffer_bytes
(gauge)
The total amount of buffer memory that is not being used (either unallocated or in the free list)
Shown as byte
kafka.producer.batch_size_avg
(gauge)
The average number of bytes sent per partition per-request.
Shown as byte
kafka.producer.batch_size_max
(gauge)
The max number of bytes sent per partition per-request.
Shown as byte
kafka.producer.buffer_bytes_total
(gauge)
The maximum amount of buffer memory the client can use (whether or not it is currently used).
Shown as byte
kafka.producer.bufferpool_wait_time
(gauge)
The fraction of time an appender waits for space allocation.
kafka.producer.bytes_out
(gauge)
Producer bytes out rate.
Shown as byte
kafka.producer.compression_rate
(gauge)
The average compression rate of record batches for a topic
Shown as fraction
kafka.producer.compression_rate_avg
(rate)
The average compression rate of record batches.
Shown as fraction
kafka.producer.delayed_requests
(gauge)
Number of producer requests delayed.
Shown as request
kafka.producer.expires_per_seconds
(gauge)
Rate of producer request expiration.
Shown as eviction
kafka.producer.io_wait
(gauge)
Producer I/O wait time.
Shown as nanosecond
kafka.producer.message_rate
(gauge)
Producer message rate.
Shown as message
kafka.producer.metadata_age
(gauge)
The age in seconds of the current producer metadata being used.
Shown as second
kafka.producer.record_error_rate
(gauge)
The average per-second number of errored record sends for a topic
Shown as error
kafka.producer.record_queue_time_avg
(gauge)
The average time in ms record batches spent in the record accumulator.
Shown as millisecond
kafka.producer.record_queue_time_max
(gauge)
The maximum time in ms record batches spent in the record accumulator.
Shown as millisecond
kafka.producer.record_retry_rate
(gauge)
The average per-second number of retried record sends for a topic
Shown as record
kafka.producer.record_send_rate
(gauge)
The average number of records sent per second for a topic
Shown as record
kafka.producer.record_size_avg
(gauge)
The average record size.
Shown as byte
kafka.producer.record_size_max
(gauge)
The maximum record size.
Shown as byte
kafka.producer.records_per_request
(gauge)
The average number of records sent per second.
Shown as record
kafka.producer.request_latency_avg
(gauge)
Producer average request latency.
Shown as millisecond
kafka.producer.request_latency_max
(gauge)
The maximum request latency in ms.
Shown as millisecond
kafka.producer.request_rate
(gauge)
Number of producer requests per second.
Shown as request
kafka.producer.requests_in_flight
(gauge)
The current number of in-flight requests awaiting a response.
Shown as request
kafka.producer.response_rate
(gauge)
Number of producer responses per second.
Shown as response
kafka.producer.throttle_time_avg
(gauge)
The average time in ms a request was throttled by a broker.
Shown as millisecond
kafka.producer.throttle_time_max
(gauge)
The maximum time in ms a request was throttled by a broker.
Shown as millisecond
kafka.producer.waiting_threads
(gauge)
The number of user threads blocked waiting for buffer memory to enqueue their records.
Shown as thread
kafka.replication.active_controller_count
(gauge)
Number of active controllers in the cluster.
Shown as node
kafka.replication.isr_expands.rate
(gauge)
Rate of replicas joining the ISR pool.
Shown as node
kafka.replication.isr_shrinks.rate
(gauge)
Rate of replicas leaving the ISR pool.
Shown as node
kafka.replication.leader_count
(gauge)
Number of leaders on this broker.
Shown as node
kafka.replication.leader_elections.rate
(gauge)
Leader election rate.
Shown as event
kafka.replication.max_lag
(gauge)
Maximum lag in messages between the follower and leader replicas.
Shown as offset
kafka.replication.offline_partitions_count
(gauge)
Number of partitions that don't have an active leader.
kafka.replication.partition_count
(gauge)
Number of partitions across all topics in the cluster.
kafka.replication.unclean_leader_elections.rate
(gauge)
Unclean leader election rate.
Shown as event
kafka.replication.under_min_isr_partition_count
(gauge)
Number of under min ISR partitions.
kafka.replication.under_replicated_partitions
(gauge)
Number of under replicated partitions.
kafka.request.channel.queue.size
(gauge)
Number of queued requests.
Shown as request
kafka.request.fetch.failed.rate
(gauge)
Client fetch request failures rate.
Shown as request
kafka.request.fetch_consumer.rate
(gauge)
Fetch consumer requests rate.
Shown as request
kafka.request.fetch_consumer.time.99percentile
(gauge)
Total time in ms to serve the specified request.
Shown as millisecond
kafka.request.fetch_consumer.time.avg
(gauge)
Total time in ms to serve the specified request.
Shown as millisecond
kafka.request.fetch_follower.rate
(gauge)
Fetch follower requests rate.
Shown as request
kafka.request.fetch_follower.time.99percentile
(gauge)
Total time in ms to serve the specified request.
Shown as millisecond
kafka.request.fetch_follower.time.avg
(gauge)
Total time in ms to serve the specified request.
Shown as millisecond
kafka.request.fetch_request_purgatory.size
(gauge)
Number of requests waiting in the producer purgatory.
Shown as request
kafka.request.handler.avg.idle.pct.rate
(gauge)
Average fraction of time the request handler threads are idle.
Shown as fraction
kafka.request.metadata.time.99percentile
(gauge)
Time for metadata requests for 99th percentile.
Shown as millisecond
kafka.request.metadata.time.avg
(gauge)
Average time for metadata request.
Shown as millisecond
kafka.request.offsets.time.99percentile
(gauge)
Time for offset requests for 99th percentile.
Shown as millisecond
kafka.request.offsets.time.avg
(gauge)
Average time for an offset request.
Shown as millisecond
kafka.request.produce.failed.rate
(gauge)
Failed produce requests rate.
Shown as request
kafka.request.produce.rate
(gauge)
Produce requests rate.
Shown as request
kafka.request.produce.time.99percentile
(gauge)
Time for produce requests for 99th percentile.
Shown as millisecond
kafka.request.produce.time.avg
(gauge)
Average time for a produce request.
Shown as millisecond
kafka.request.producer_request_purgatory.size
(gauge)
Number of requests waiting in the producer purgatory
Shown as request
kafka.request.update_metadata.time.99percentile
(gauge)
Time for update metadata requests for 99th percentile.
Shown as millisecond
kafka.request.update_metadata.time.avg
(gauge)
Average time for a request to update metadata.
Shown as millisecond
kafka.server.socket.connection_count
(gauge)
Number of currently open connections to the broker.
Shown as connection
kafka.session.fetch.count
(gauge)
Number of fetch sessions.
kafka.session.fetch.eviction
(gauge)
Eviction rate of fetch session.
Shown as event
kafka.session.zookeeper.disconnect.rate
(gauge)
Zookeeper client disconnect rate.
Shown as event
kafka.session.zookeeper.expire.rate
(gauge)
Zookeeper client session expiration rate.
Shown as event
kafka.session.zookeeper.readonly.rate
(gauge)
Zookeeper client readonly rate.
Shown as event
kafka.session.zookeeper.sync.rate
(gauge)
Zookeeper client sync rate.
Shown as event
kafka.topic.messages_in.rate
(gauge)
Incoming message rate by topic
Shown as message
kafka.topic.net.bytes_in.rate
(gauge)
Incoming byte rate by topic.
Shown as byte
kafka.topic.net.bytes_out.rate
(gauge)
Outgoing byte rate by topic.
Shown as byte
kafka.topic.net.bytes_rejected.rate
(gauge)
Rejected byte rate by topic.
Shown as byte

Événements

Le check Kafka n’inclut aucun événement.

Checks de service

kafka.can_connect
Renvoie CRITICAL si l’Agent n’est pas capable de se connecter à l’instance Kafka qu’il surveille et d’y recueillir des métriques. Si ce n’est pas le cas, renvoie OK.
Statuses: ok, critical

Dépannage

Pour aller plus loin

Intégration Kafka Consumer

Dashboard Kafka

Présentation

Ce check d’Agent recueille uniquement les métriques pour les décalages de messages. Si vous souhaitez recueillir les métriques JMX sur des agents Kafka ou des consommateurs/producteurs basés sur Java, consultez le check Kafka.

Ce check récupère les décalages records des agents Kafka, les décalages des consommateurs qui sont stockés dans Kafka ou Zookeeper (pour ceux qui l’utilisent encore) et le retard des consommateurs calculé (qui correspond à la différence entre le décalage des agents et celui des consommateurs).

Remarque : cette intégration veille à ce que les décalages des consommateurs soient vérifiés avant les décalages des agents. Avec un tel procédé, dans le pire des cas, le retard des consommateurs est légèrement exagéré. L’approche inverse peut minimiser le retard des consommateurs, à tel point qu’il est possible d’atteindre des valeurs négatives. De telles valeurs apparaissent uniquement pour l’un des pires scénarios, à savoir lorsque des messages sont ignorés.

Configuration

Installation

Le check Kafka Consumer de l’Agent est inclus avec le package de l’Agent Datadog : vous n’avez donc rien d’autre à installer sur vos nœuds Kafka.

Configuration

Host

Pour configurer ce check lorsque l’Agent est exécuté sur un host :

Collecte de métriques
  1. Modifiez le fichier kafka_consumer.d/conf.yaml dans le dossier conf.d/ à la racine du répertoire de configuration de votre Agent. Consultez le fichier d’exemple kafka_consumer.d/conf.yaml pour découvrir toutes les options de configuration disponibles.

  2. Redémarrez l’Agent.

Collecte de logs

Ce check ne recueille aucun log supplémentaire. Pour recueillir des logs à partir de vos agents Kafka, consultez les instructions de collecte de logs pour Kafka.

Environnement conteneurisé

Pour les environnements conteneurisés, consultez le guide Autodiscovery avec JMX.

Validation

Lancez la sous-commande status de l’Agent et cherchez kafka_consumer dans la section Checks.

Données collectées

Métriques

kafka.broker_offset
(gauge)
Current message offset on broker.
Shown as offset
kafka.consumer_lag
(gauge)
Lag in messages between consumer and broker.
Shown as offset
kafka.consumer_offset
(gauge)
Current message offset on consumer.
Shown as offset
kafka.estimated_consumer_lag
(gauge)
Lag in seconds between consumer and broker.
Shown as second

Événements

consumer_lag :
L’Agent Datadog génère un événement lorsque la valeur de la métrique consumer_lag descend en dessous de 0, et lui ajoute les tags topic, partition et consumer_group.

Checks de service

Le check Kafka-consumer n’inclut aucun check de service.

Dépannage

Authentification à l’aide de la GSSAPI Kerberos

Selon la configuration Kerberos de votre cluster Kafka, vous devrez peut-être configurer les éléments suivants :

  • Le client Kafka permettant à l’Agent Datadog de se connecter à l’agent Kafka. Le client Kafka doit être ajouté en tant que principal Kerberos, puis ajouté à un keytab Kerberos. Le client Kafka doit également posséder un ticket Kerberos valide.
  • Le certificat TLS permettant d’authentifier une connexion sécurisée à l’agent Kafka.
    • Si le keystore JKS est utilisé, un certificat doit être exporté à partir de ce keystore et le chemin du fichier doit être configuré avec les options tls_cert et/ou tls_ca_cert applicables.
    • Si une clé privée est requise pour authentifier le certificat, elle doit être configurée avec l’option tls_private_key. Le mot de passe de la clé privée doit être configuré avec l’option tls_private_key_password, le cas échéant.
  • La variable d’environnement KRB5_CLIENT_KTNAME pointant vers l’emplacement du keytab Kerberos du client Kafka, si son emplacement ne correspond pas au chemin par défaut (par exemple, KRB5_CLIENT_KTNAME=/etc/krb5.keytab).
  • La variable d’environnement KRB5CCNAME pointant vers le cache du ticket des identifiants Kerberos du client Kafka, si son emplacement ne correspond pas au chemin par défaut (par exemple, KRB5CCNAME=/tmp/krb5cc_xxx).
  • Si l’Agent Datadog ne peut pas accéder aux variables d’environnement, définissez-les dans un fichier de remplacement de la configuration du service de l’Agent Datadog pour votre système d’exploitation. La procédure de modification du fichier d’unité du service de l’Agent Datadog peut varier selon la version du système d’exploitation Linux utilisée. Par exemple, dans un environnement Linux systemd :

Exemple de Systemd Linux

  1. Configurez les variables d’environnement dans un fichier d’environnement. Par exemple : /chemin/vers/fichier/environnement
KRB5_CLIENT_KTNAME=/etc/krb5.keytab
KRB5CCNAME=/tmp/krb5cc_xxx
  1. Créez un fichier de remplacement de la configuration du service de l’Agent Datadog : sudo systemctl edit datadog-agent.service.

  2. Configurez ce qui suit dans le fichier de remplacement :

[Service]
EnvironmentFile=/path/to/environment/file
  1. Exécutez les commandes suivantes pour recharger le daemon systemd, le service datadog-agent et l’Agent Datadog :
sudo systemctl daemon-reload
sudo systemctl restart datadog-agent.service
sudo service datadog-agent restart

Pour aller plus loin