Flink

Supported OS: Linux, Mac OS, Windows

Overview

This check monitors Flink. Datadog collects Flink metrics through Flink's Datadog HTTP Reporter, which uses Datadog's HTTP API.

Setup

Installation

The Flink check is included in the Datadog Agent package. No additional installation is needed on your server.

Configuration

Metric collection

  1. Configure the Datadog HTTP Reporter in Flink.

    Copy <FLINK_HOME>/opt/flink-metrics-datadog-<DATADOG_REPORTER_VERSION>.jar into your <FLINK_HOME>/lib folder. Add the following lines to your <FLINK_HOME>/conf/flink-conf.yaml file, replacing <DATADOG_API_KEY> with your Datadog API key:

    metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
    metrics.reporter.dghttp.apikey: <DATADOG_API_KEY>
    metrics.reporter.dghttp.dataCenter: 
    
  2. Remap the system scopes in your <FLINK_HOME>/conf/flink-conf.yaml file.

    metrics.scope.jm: flink.jobmanager
    metrics.scope.jm.job: flink.jobmanager.job
    metrics.scope.tm: flink.taskmanager
    metrics.scope.tm.job: flink.taskmanager.job
    metrics.scope.task: flink.task
    metrics.scope.operator: flink.operator
    

    Note: The system scopes must be remapped for your Flink metrics to be recognized as integration metrics. Otherwise, they are submitted as custom metrics.

  3. Configure additional tags in the <FLINK_HOME>/conf/flink-conf.yaml file. The example below defines custom tags:

    metrics.reporter.dghttp.tags: <KEY1>:<VALUE1>, <KEY2>:<VALUE2>
    

    Note: By default, every variable in a metric name is sent as a tag, so you do not need to add custom tags for job_id, task_id, and so on.

  4. Restart Flink to start sending your Flink metrics to Datadog.
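The scope remapping in step 2 determines the prefix of every metric name that reaches Datadog. The following Python sketch shows the idea under a simplifying assumption: the reported name is the remapped scope followed by the Flink metric name, joined with a dot. The `SCOPES` dictionary and the `metric_name` helper are hypothetical and exist only for illustration; they are not part of Flink or of the Datadog Agent.

```python
# Scope formats copied from the flink-conf.yaml settings in step 2.
SCOPES = {
    "jm": "flink.jobmanager",
    "jm.job": "flink.jobmanager.job",
    "tm": "flink.taskmanager",
    "tm.job": "flink.taskmanager.job",
    "task": "flink.task",
    "operator": "flink.operator",
}


def metric_name(scope_key: str, metric: str) -> str:
    """Join a remapped scope prefix and a Flink metric name.

    Hypothetical helper: a simplified view of how the final name is built.
    """
    return f"{SCOPES[scope_key]}.{metric}"


print(metric_name("jm", "Status.JVM.CPU.Load"))
print(metric_name("operator", "numRecordsIn"))
```

With the remapped scopes, the two names produced here correspond to flink.jobmanager.Status.JVM.CPU.Load and flink.operator.numRecordsIn, both of which appear in the metrics list later on this page.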

Log collection

Available for Agent versions >6.0

  1. Flink uses the log4j logger by default. To enable logging to a file and customize the format, edit the log4j.properties, log4j-cli.properties, log4j-yarn-session.properties, or log4j-console.properties file. See Flink's documentation for the default configurations. The default configuration of log4j.properties is:

    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.file=${log.file}
    log4j.appender.file.append=false
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    
  2. By default, the integration pipeline supports the following conversion pattern:

    %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    

    An example of a valid timestamp is 2020-02-03 18:43:12,251.

    Clone and edit the integration pipeline if you use a different format.

  3. Log collection is disabled by default in the Datadog Agent. Enable it in datadog.yaml:

    logs_enabled: true
    
  4. Uncomment the logs configuration block in the flink.d/conf.yaml file and edit it. Change the path and service parameter values to match your environment. See the sample flink.d/conf.yaml file for all available configuration options.

    logs:
      - type: file
        path: /var/log/flink/server.log
        source: flink
        service: myapp
        #To handle multi line that starts with yyyy-mm-dd use the following pattern
        #log_processing_rules:
        #  - type: multi_line
        #    pattern: \d{4}\-(0?[1-9]|1[012])\-(0?[1-9]|[12][0-9]|3[01])
        #    name: new_log_start_with_date
    
  5. Restart the Agent.
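To make steps 2 and 4 more concrete, here is a minimal Python sketch. It parses the timestamp produced by the default conversion pattern, and it groups continuation lines (such as stack traces) under the preceding line that starts with a date, using the regular expression from the commented-out multi_line rule. The function names and the sample log lines are hypothetical, and the grouping loop only approximates what a multi_line rule does; it is not the Agent's implementation.

```python
import re
from datetime import datetime

# Pattern copied from the commented-out log_processing_rules block in step 4.
NEW_LOG_START = re.compile(r"\d{4}\-(0?[1-9]|1[012])\-(0?[1-9]|[12][0-9]|3[01])")

# strptime equivalent of the log4j date format yyyy-MM-dd HH:mm:ss,SSS.
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S,%f"


def parse_timestamp(line: str) -> datetime:
    """Parse the 23-character timestamp at the start of a log line."""
    return datetime.strptime(line[:23], TIMESTAMP_FORMAT)


def group_multiline(lines):
    """Attach lines that do not start with a date to the previous event."""
    events = []
    for line in lines:
        if NEW_LOG_START.match(line) or not events:
            events.append(line)  # a date at the start opens a new event
        else:
            events[-1] += "\n" + line  # continuation, e.g. a stack trace
    return events


# Hypothetical log lines, made up for this example.
sample = [
    "2020-02-03 18:43:12,251 ERROR org.example.Job  - Job failed",
    "java.lang.RuntimeException: boom",
    "    at org.example.Job.run(Job.java:42)",
    "2020-02-03 18:43:13,007 INFO  org.example.Job  - Restarting",
]

events = group_multiline(sample)
first_timestamp = parse_timestamp(events[0])
print(len(events), first_timestamp)
```

The four sample lines collapse into two events: the error with its stack trace, and the following INFO line.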

Validation

Run the Agent's status subcommand and look for flink under the Checks section.

Data Collected

Metrics

flink.jobmanager.Status.JVM.CPU.Load
(gauge)
The recent CPU usage of the JVM in the jobmanager
Shown as percent
flink.jobmanager.Status.JVM.CPU.Time
(gauge)
The CPU time used by the JVM in the jobmanager
Shown as second
flink.jobmanager.Status.JVM.ClassLoader.ClassesLoaded
(count)
The total number of classes loaded since the start of the JVM in the jobmanager
flink.jobmanager.Status.JVM.ClassLoader.ClassesUnloaded
(count)
The total number of classes unloaded since the start of the JVM in the jobmanager
flink.jobmanager.Status.JVM.Memory.Direct.Count
(count)
The number of buffers in the direct buffer pool in the jobmanager
Shown as buffer
flink.jobmanager.Status.JVM.Memory.Direct.MemoryUsed
(gauge)
The amount of memory used by the JVM for the direct buffer pool in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.Direct.TotalCapacity
(count)
The total capacity of all buffers in the direct buffer pool in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.Heap.Committed
(gauge)
The amount of heap memory guaranteed to be available to the JVM in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.Heap.Max
(gauge)
The maximum amount of heap memory that can be used for memory management in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.Heap.Used
(gauge)
The amount of heap memory currently used in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.Mapped.Count
(gauge)
The number of buffers in the mapped buffer pool in the jobmanager
Shown as buffer
flink.jobmanager.Status.JVM.Memory.Mapped.MemoryUsed
(gauge)
The amount of memory used by the JVM for the mapped buffer pool in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.Mapped.TotalCapacity
(count)
The total capacity of all buffers in the mapped buffer pool in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.NonHeap.Committed
(gauge)
The amount of non-heap memory guaranteed to be available to the JVM in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.NonHeap.Max
(gauge)
The maximum amount of non-heap memory that can be used for memory management in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.NonHeap.Used
(gauge)
The amount of non-heap memory currently used in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Threads.Count
(count)
The total number of live threads in the jobmanager
Shown as thread
flink.jobmanager.job.downtime
(gauge)
For jobs currently in a failing/recovering situation, the time elapsed during this outage. Returns 0 for running jobs and -1 for completed jobs
Shown as millisecond
flink.jobmanager.job.lastCheckpointAlignmentBuffered
(gauge)
The number of buffered bytes during alignment over all subtasks for the last checkpoint
Shown as byte
flink.jobmanager.job.lastCheckpointDuration
(gauge)
The time it took to complete the last checkpoint
Shown as millisecond
flink.jobmanager.job.lastCheckpointExternalPath
(gauge)
The path where the last external checkpoint was stored
flink.jobmanager.job.lastCheckpointRestoreTimestamp
(gauge)
Timestamp when the last checkpoint was restored at the coordinator
Shown as millisecond
flink.jobmanager.job.lastCheckpointSize
(gauge)
The total size of the last checkpoint
Shown as byte
flink.jobmanager.job.numRestarts
(gauge)
The total number of restarts since this job was submitted, including full restarts and fine-grained restarts
flink.jobmanager.job.numberOfCompletedCheckpoints
(count)
The number of successfully completed checkpoints
flink.jobmanager.job.numberOfFailedCheckpoints
(count)
The number of failed checkpoints
flink.jobmanager.job.numberOfInProgressCheckpoints
(gauge)
The number of in progress checkpoints
flink.jobmanager.job.restartingTime
(gauge)
The time it took to restart the job or how long the current restart has been in progress
Shown as millisecond
flink.jobmanager.job.totalNumberOfCheckpoints
(count)
The total number of checkpoints (in progress, completed, and failed)
flink.jobmanager.job.uptime
(gauge)
The time that the job has been running without interruption. Returns -1 for completed jobs
Shown as millisecond
flink.jobmanager.numRegisteredTaskManagers
(gauge)
The number of registered taskmanagers
flink.jobmanager.numRunningJobs
(gauge)
The number of running jobs
Shown as job
flink.jobmanager.taskSlotsTotal
(gauge)
The total number of task slots
flink.operator.commitsFailed
(count)
The total number of offset commit failures to Kafka if offset committing is turned on and checkpointing is enabled. Note that committing offsets back to Kafka is only a means to expose consumer progress so a commit failure does not affect the integrity of Flink's checkpointed partition offsets
Shown as commit
flink.operator.commitsSucceeded
(count)
The total number of successful offset commits to Kafka if offset committing is turned on and checkpointing is enabled
Shown as commit
flink.operator.currentInput1Watermark
(gauge)
The last watermark this operator has received in its first input. Only for operators with 2 inputs
Shown as millisecond
flink.operator.currentInput2Watermark
(gauge)
The last watermark this operator has received in its second input. Only for operators with 2 inputs
Shown as millisecond
flink.operator.currentInputWatermark
(gauge)
The last watermark this operator has received. For operators with 2 inputs this is the minimum of the last received watermarks
Shown as millisecond
flink.operator.currentOutputWatermark
(gauge)
The last watermark this operator has emitted
Shown as millisecond
flink.operator.numLateRecordsDropped
(count)
The number of records this operator has dropped due to arriving late
Shown as record
flink.operator.numRecordsIn
(count)
The total number of records this operator has received
Shown as record
flink.operator.numRecordsInPerSecond
(gauge)
The number of records this operator receives per second
Shown as record
flink.operator.numRecordsOut
(count)
The total number of records this operator has emitted
Shown as record
flink.operator.numRecordsOutPerSec
(gauge)
The number of records this operator emits per second
Shown as record
flink.operator.numSplitsProcessed
(count)
The total number of InputSplits this data source has processed (if the operator is a data source)
flink.task.Shuffle.Netty.Input.Buffers.inPoolUsage
(gauge)
An estimate of the input buffers usage
flink.task.Shuffle.Netty.Input.Buffers.inputQueueLength
(gauge)
The number of queued input buffers
Shown as buffer
flink.task.Shuffle.Netty.Input.numBuffersInLocal
(count)
The total number of network buffers this task has read from a local source
Shown as buffer
flink.task.Shuffle.Netty.Input.numBuffersInLocalPerSecond
(gauge)
The number of network buffers this task reads from a local source per second
flink.task.Shuffle.Netty.Input.numBuffersInRemote
(count)
The total number of network buffers this task has read from a remote source
Shown as buffer
flink.task.Shuffle.Netty.Input.numBuffersInRemotePerSecond
(gauge)
The number of network buffers this task reads from a remote source per second
Shown as buffer
flink.task.Shuffle.Netty.Input.numBytesInLocal
(count)
The total number of bytes this task has read from a local source
Shown as byte
flink.task.Shuffle.Netty.Input.numBytesInLocalPerSecond
(gauge)
The number of bytes this task reads from a local source per second
Shown as byte
flink.task.Shuffle.Netty.Input.numBytesInRemote
(count)
The total number of bytes this task has read from a remote source
Shown as byte
flink.task.Shuffle.Netty.Input.numBytesInRemotePerSecond
(gauge)
The number of bytes this task reads from a remote source per second
Shown as byte
flink.task.Shuffle.Netty.Output.Buffers.outPoolUsage
(gauge)
An estimate of the output buffers usage
flink.task.Shuffle.Netty.Output.Buffers.outputQueueLength
(gauge)
The number of queued output buffers
Shown as buffer
flink.task.checkpointAlignmentTime
(gauge)
The time in nanoseconds that the last barrier alignment took to complete or how long the current alignment has taken so far
Shown as nanosecond
flink.task.currentInputWatermark
(gauge)
The last watermark this task has received. For tasks with 2 inputs this is the minimum of the last received watermarks
Shown as millisecond
flink.task.numBuffersOut
(count)
The total number of network buffers this task has emitted
Shown as buffer
flink.task.numBuffersOutPerSecond
(gauge)
The number of network buffers this task emits per second
Shown as buffer
flink.task.numBytesOut
(count)
The total number of bytes this task has emitted
Shown as byte
flink.task.numBytesOutPerSecond
(gauge)
The number of bytes this task emits per second
Shown as byte
flink.task.numLateRecordsDropped
(count)
The number of records this task has dropped due to arriving late
Shown as record
flink.task.numRecordsIn
(count)
The total number of records this task has received
Shown as record
flink.task.numRecordsInPerSecond
(gauge)
The number of records this task receives per second
Shown as record
flink.task.numRecordsOut
(count)
The total number of records this task has emitted
Shown as record
flink.task.numRecordsOutPerSec
(gauge)
The number of records this task emits per second
Shown as record
flink.taskmanager.Status.JVM.CPU.Load
(gauge)
The recent CPU usage of the JVM in the taskmanager
Shown as percent
flink.taskmanager.Status.JVM.CPU.Time
(gauge)
The CPU time used by the JVM in the taskmanager
Shown as second
flink.taskmanager.Status.JVM.ClassLoader.ClassesLoaded
(count)
The total number of classes loaded since the start of the JVM in the taskmanager
flink.taskmanager.Status.JVM.ClassLoader.ClassesUnloaded
(count)
The total number of classes unloaded since the start of the JVM in the taskmanager
flink.taskmanager.Status.JVM.Memory.Direct.Count
(gauge)
The number of buffers in the direct buffer pool in the taskmanager
Shown as buffer
flink.taskmanager.Status.JVM.Memory.Direct.MemoryUsed
(gauge)
The amount of memory used by the JVM for the direct buffer pool in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.Direct.TotalCapacity
(count)
The total capacity of all buffers in the direct buffer pool in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.Heap.Committed
(gauge)
The amount of heap memory guaranteed to be available to the JVM in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.Heap.Max
(gauge)
The maximum amount of heap memory that can be used for memory management in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.Heap.Used
(gauge)
The amount of heap memory currently used in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.Mapped.Count
(gauge)
The number of buffers in the mapped buffer pool in the taskmanager
flink.taskmanager.Status.JVM.Memory.Mapped.MemoryUsed
(gauge)
The amount of memory used by the JVM for the mapped buffer pool in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.Mapped.TotalCapacity
(count)
The total capacity of all buffers in the mapped buffer pool in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.NonHeap.Committed
(gauge)
The amount of non-heap memory guaranteed to be available to the JVM in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.NonHeap.Max
(gauge)
The maximum amount of non-heap memory that can be used for memory management in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.NonHeap.Used
(gauge)
The amount of non-heap memory currently used in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Threads.Count
(count)
The total number of live threads in the taskmanager
Shown as thread
flink.taskmanager.Status.Shuffle.Netty.AvailableMemorySegments
(gauge)
The number of unused memory segments in the taskmanager
flink.taskmanager.Status.Shuffle.Netty.TotalMemorySegments
(gauge)
The number of allocated memory segments in the taskmanager

Service Checks

Flink does not include any service checks.

Events

Flink does not include any events.

Troubleshooting

Need help? Contact Datadog support.