Supported OS

Versión de la integración3.0.0

Información general

Este check monitoriza Flink. Datadog recopila las métricas de Flink a través de Flink Datadog HTTP Reporter, que utiliza la [API HTTP de Datadog] (https://docs.datadoghq.com/api/?lang=bash#api-reference).

Configuración

Instalación

El check de Flink está incluido en el paquete (https://app.datadoghq.com/account/settings/agent/latest) del [Datadog Agent]. No es necesaria ninguna instalación adicional en tu servidor.

Configuración

Recopilación de métricas

  1. Configura el StatsD reporter en Flink. En tu <FLINK_HOME>/conf/flink-conf.yaml, añade estas líneas:
    metrics.reporter.stsd.factory.class: org.apache.flink.metrics.statsd.StatsDReporterFactory
    metrics.reporter.stsd.host: datadog-agent
    metrics.reporter.stsd.port: 8125
    metrics.reporter.stsd.interval: 60 SECONDS 
    
  2. Asegúrate de que DogStatsD/StatsD esté habilitado en el Datadog Agent y considera la posibilidad de configurar DD_DOGSTATSD_NON_LOCAL_TRAFFIC=true para entornos en contenedores.
  3. Reinicia Flink para empezar a enviar tus métricas de Flink a Datadog.

  1. Configura el Datadog HTTP Reporter en Flink.

    En tu <FLINK_HOME>/conf/flink-conf.yaml, añade estas líneas, sustituyendo <DATADOG_API_KEY> por tu [clave de API] de Datadog (https://app.datadoghq.com/organization-settings/api-keys):

    metrics.reporter.dghttp.factory.class: org.apache.flink.metrics.datadog.DatadogHttpReporterFactory
    metrics.reporter.dghttp.apikey: <DATADOG_API_KEY>
    metrics.reporter.dghttp.dataCenter: US #(optional) The data center (EU/US) to connect to, defaults to US.
    
  2. Vuelve a asignar contextos de sistema en tu <FLINK_HOME>/conf/flink-conf.yaml.

    metrics.scope.jm: flink.jobmanager
    metrics.scope.jm.job: flink.jobmanager.job
    metrics.scope.tm: flink.taskmanager
    metrics.scope.tm.job: flink.taskmanager.job
    metrics.scope.task: flink.task
    metrics.scope.operator: flink.operator
    

    Nota: Los ámbitos del sistema deben reasignarse para que tus métricas de Flink sean compatibles, de lo contrario se envían como métricas personalizadas.

  3. Configura [etiquetas (tags)] adicionales (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/metric_reporters/#datadog) en <FLINK_HOME>/conf/flink-conf.yaml. He aquí un ejemplo de etiquetas (tags) personalizadas:

    metrics.reporter.dghttp.scope.variables.additional: <KEY1>:<VALUE1>, <KEY1>:<VALUE2>
    

    Nota: En forma predeterminada, cualquier variable en los nombres de las métricas se envía como etiquetas (tags), por lo que no es necesario añadir etiquetas (tags) personalizadas para job_id, task_id, etc.

  4. Reinicia Flink para empezar a enviar tus métricas de Flink a Datadog.

Recopilación de logs

Disponible para la versión 6.0 o posteriores del Agent

  1. Flink utiliza el registrador log4j en forma predeterminada. Para habilitar el registro en un archivo, personaliza el formato editando los archivos de configuración log4j*.properties en el directorio conf/ de la distribución de Flink. Consulta la documentación de registro de Flink para obtener información sobre qué archivo de configuración es relevante para tu configuración. Consulta repositorio de Flink para configuraciones predeterminadas.

  2. Por defecto, el pipeline de la integración admite el siguiente patrón de diseño:

    %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    

    Un ejemplo de marca de tiempo válida es: 2020-02-03 18:43:12,251.

    Clona y edita el pipeline de integración si tienes un formato diferente.

  3. La recopilación de logs está desactivada en forma predeterminada en el Datadog Agent, actívala en tu archivo datadog.yaml:

    logs_enabled: true
    
  4. Quita los comentarios y edita el bloque de configuración de logs en tu archivo flink.d/conf.yaml. Cambia los valores de los parámetros path y service en función de tu entorno. Consulta ejemplo de flink.d/conf.yaml para ver todas las opciones de configuración disponibles.

    logs:
      - type: file
        path: /var/log/flink/server.log
        source: flink
        service: myapp
        #To handle multi line that starts with yyyy-mm-dd use the following pattern
        #log_processing_rules:
        #  - type: multi_line
        #    pattern: \d{4}\-(0?[1-9]|1[012])\-(0?[1-9]|[12][0-9]|3[01])
        #    name: new_log_start_with_date
    
  5. Reinicia el Agent.

Validación

[Ejecuta el subcomando de estado del Agent(https://docs.datadoghq.com/agent/guide/agent-commands/#agent-status-and-information) y busque flink en la sección Checks.

Datos recopilados

Métricas

flink.jobmanager.Status.JVM.CPU.Load
(medidor)
El uso reciente de la CPU de la JVM en el jobmanager
Mostrado como porcentaje.
flink.jobmanager.Status.JVM.CPU.Time
(medidor)
El tiempo de la CPU utilizado por la JVM en el jobmanager
Mostrado en segundos.
flink.jobmanager.Status.JVM.ClassLoader.ClassesLoaded
(count)
El número total de clases cargadas desde el inicio de la JVM en el jobmanager.
flink.jobmanager.Status.JVM.ClassLoader.ClassesUnloaded
(count)
El número total de clases descargadas desde el inicio de la JVM en el jobmanager.
flink.jobmanager.Status.JVM.Memory.Direct.Count
(count)
El número de búferes en la mezcla directa de búferes en el jobmanager
Mostrado como búfer
flink.jobmanager.Status.JVM.Memory.Direct.MemoryUsed
(medidor)
La cantidad de memoria utilizada por la JVM para la mezcla directa de búferes en el jobmanager
Mostrado como byte
flink.jobmanager.Status.JVM.Memory.Direct.TotalCapacity
(count)
La capacidad total de todos los búferes de la mezcla directa de búferes en el jobmanager
Mostrado como byte
flink.jobmanager.Status.JVM.Memory.Heap.Committed
(medidor)
La cantidad de memoria heap garantizada para estar disponible para la JVM en el jobmanager
Mostrado como byte
flink.jobmanager.Status.JVM.Memory.Heap.Max
(medidor)
La cantidad máxima de memoria heap que se puede utilizar para la gestión de memoria en el jobmanager
Mostrado como byte
flink.jobmanager.Status.JVM.Memory.Heap.Used
(medidor)
La cantidad de memoria heap utilizada actualmente en el jobmanager
Mostrado como byte
flink.jobmanager.Status.JVM.Memory.Mapped.Count
(medidor)
El número de búferes en la mezcla de búferes asignada en el jobmanager
Mostrado como búfer
flink.jobmanager.Status.JVM.Memory.Mapped.MemoryUsed
(medidor)
La cantidad de memoria utilizada por la JVM para la mezcla de búferes asignada en el jobmanager
Mostrado como byte
flink.jobmanager.Status.JVM.Memory.Mapped.TotalCapacity
(count)
La capacidad total de todos los búferes de la mezcla de búferes asignada en el jobmanager
Mostrado como byte
flink.jobmanager.Status.JVM.Memory.NonHeap.Committed
(medidor)
La cantidad de memoria no-Heap garantizada para estar disponible para la JVM en el jobmanager
Mostrado como byte
flink.jobmanager.Status.JVM.Memory.NonHeap.Max
(medidor)
La cantidad máxima de memoria no-Heap que se puede utilizar para la gestión de memoria en el jobmanager
Mostrado como byte
flink.jobmanager.Status.JVM.Memory.NonHeap.Used
(medidor)
La cantidad de memoria no-heap utilizada actualmente en el jobmanager
Mostrado como byte
flink.jobmanager.Status.JVM.Threads.Count
(count)
El número total de hilos activos en el jobmanager
Mostrado como hilo
flink.jobmanager.job.downtime
(medidor)
Para trabajos actualmente en situación de fallo/recuperación- el tiempo transcurrido durante esta interrupción. Devuelve 0 para trabajos en ejecución y -1 para trabajos finalizados
Se muestra en milisegundos
flink.jobmanager.job.lastCheckpointAlignmentBuffered
(medidor)
El número de bytes almacenados en búfer durante la alineación sobre todas las subtareas para el último punto de control
Mostrado como byte
flink.jobmanager.job.lastCheckpointDuration
(medidor)
El tiempo que se tardó en finalizar el último punto de control
Se muestra en milisegundos
flink.jobmanager.job.lastCheckpointExternalPath
(medidor)
La ruta donde se almacenó el último punto de control externo
flink.jobmanager.job.lastCheckpointRestoreTimestamp
(medidor)
Marca de tiempo en la que se restauró el último punto de control en el coordinador
Mostrado en milisegundos
flink.jobmanager.job.lastCheckpointSize
(medidor)
El tamaño total del último punto de control
Mostrado como byte
flink.jobmanager.job.numRestarts
(medidor)
El número total de reinicios desde que se envió este job, incluidos los reinicios completos y los reinicios precisos.
flink.jobmanager.job.numberOfCompletedCheckpoints
(count)
Número de puntos de control finalizados con éxito
flink.jobmanager.job.numberOfFailedCheckpoints
(count)
Número de puntos de control fallidos
flink.jobmanager.job.numberOfInProgressCheckpoints
(medidor)
Número de puntos de control en curso
flink.jobmanager.job.restartingTime
(medidor)
El tiempo que ha tardado en reiniciarse job o el tiempo que lleva en curso el reinicio actual
Mostrado en milisegundos
flink.jobmanager.job.totalNumberOfCheckpoints
(count)
Número total de puntos de control (en curso, finalizados y fallidos)
flink.jobmanager.job.uptime
(medidor)
El tiempo que el job se ha estado ejecutando sin interrupción. Devuelve -1 para trabajos finalizados
Mostrado en milisegundos.
flink.jobmanager.numRegisteredTaskManagers
(medidor)
Número de gestores de tareas registrados
flink.jobmanager.numRunningJobs
(medidor)
El número de trabajos en ejecución
Mostrado com job
flink.jobmanager.taskSlotsTotal
(medidor)
El número total de ranuras de tareas
flink.operator.commitsFailed
(count)
El número total de fallos en la confirmaciones de compensaciones a Kafka si la confirmación de compensaciones está activada y la comprobación de puntos de control está habilitada. Ten en cuenta que la confirmación de compensaciones a Kafka es sólo un medio para exponer el progreso del consumidor, por lo que un fallo de confirmación no afecta a la integridad de las compensaciones de partición comprobadas de Flink
Mostrado como confirmación.
flink.operator.commitsSucceeded
(count)
El número total de confirmaciones de compensaciones realizadas con éxito en Kafka si la confirmación de compensación está activada y la comprobación de puntos de control está activada
Mostrado como confirmación
flink.operator.currentInput1Watermark
(medidor)
La última marca de agua que este operador ha recibido en su primera entrada. Sólo para operadores con 2 entradas
Mostrado como milisegundo.
flink.operator.currentInput2Watermark
(medidor)
La última marca de agua que este operador ha recibido en su segunda entrada. Sólo para operadores con 2 entradas
Mostrado como milisegundo.
flink.operator.currentInputWatermark
(medidor)
La última marca de agua que ha recibido este operador. Para tareas con 2 entradas, es el mínimo de las últimas marcas de agua recibidas
Mostrado como milisegundo.
flink.operator.currentOutputWatermark
(medidor)
La última marca de agua que ha emitido este operador
Mostrado como milisegundo
flink.operator.numLateRecordsDropped
(count)
El número de registros que este operador ha abandonado por llegar tarde
Mostrado como registro
flink.operator.numRecordsIn
(count)
El número total de registros que ha recibido este operador
Mostrado como registro
flink.operator.numRecordsInPerSecond
(medidor)
El número de registros que recibe este operador por segundo
Mostrado como registro
flink.operator.numRecordsOut
(count)
El número total de registros que ha emitido este operador
Mostrado como registro
flink.operator.numRecordsOutPerSec
(medidor)
El número total de registros que este operador ha emitido por segundo
Mostrado como registro
flink.operator.numSplitsProcessed
(count)
El número total de InputSplits que este source (fuente) de datos ha procesado (si el operador es un source (fuente) de datos)
flink.task.Shuffle.Netty.Input.Buffers.inPoolUsage
(medidor)
Una estimación del uso de los búferes de entrada
flink.task.Shuffle.Netty.Input.Buffers.inputQueueLength
(medidor)
El número de búferes de entrada en cola
Mostrado como búfer
flink.task.Shuffle.Netty.Input.numBuffersInLocal
(count)
El número total de búferes de red que esta tarea ha leído de un source (fuente) local
Mostrado como búfer
flink.task.Shuffle.Netty.Input.numBuffersInLocalPerSecond
(medidor)
El número de búferes de red que esta tarea lee de un source (fuente) local por segundo.
flink.task.Shuffle.Netty.Input.numBuffersInRemote
(count)
El número total de búferes de red que esta tarea ha leído de un source (fuente) remoto
Mostrado como búfer
flink.task.Shuffle.Netty.Input.numBuffersInRemotePerSecond
(medidor)
El número de búferes de red que esta tarea lee de un source (fuente) remoto por segundo
Mostrado como búfer
flink.task.Shuffle.Netty.Input.numBytesInLocal
(count)
El número total de bytes que esta tarea ha leído de un source (fuente) local
Mostrado como byte
flink.task.Shuffle.Netty.Input.numBytesInLocalPerSecond
(medidor)
El número de bytes que esta tarea lee de un source (fuente) local por segundo
Mostrado como byte
flink.task.Shuffle.Netty.Input.numBytesInRemote
(count)
El número total de bytes que esta tarea ha leído de un source (fuente) remoto
Mostrado como byte
flink.task.Shuffle.Netty.Input.numBytesInRemotePerSecond
(medidor)
El número de bytes que esta tarea lee de un source (fuente) remoto por segundo
Mostrado como byte
flink.task.Shuffle.Netty.Output.Buffers.outPoolUsage
(medidor)
Una estimación del uso de los búferes de salida
flink.task.Shuffle.Netty.Output.Buffers.outputQueueLength
(medidor)
El número de búferes de salida en cola
Mostrado como búfer
flink.task.checkpointAlignmentTime
(medidor)
El tiempo en nanosegundos que tardó en finalizarse la última alineación de barrera o cuánto ha tardado hasta ahora la alineación actual
Mostrado como nanosegundo
flink.task.currentInputWatermark
(medidor)
La última marca de agua que ha recibido esta tarea. Para tareas con 2 entradas, es el mínimo de las últimas marcas de agua recibidas
Mostrado como milisegundo.
flink.task.numBuffersOut
(count)
El número total de búferes de red que esta tarea ha emitido
Mostrado como búfer
flink.task.numBuffersOutPerSecond
(medidor)
El número de búferes de red que esta tarea emite por segundo
Mostrado como búfer
flink.task.numBytesOut
(count)
El número total de bytes que esta tarea ha emitido
Mostrado como byte
flink.task.numBytesOutPerSecond
(medidor)
El número de bytes que esta tarea emite por segundo
Mostrado como byte
flink.task.numLateRecordsDropped
(count)
El número de registros que esta tarea ha abandonado por llegar tarde
Mostrado como registro
flink.task.numRecordsIn
(count)
El número total de registros que ha recibido esta tarea
Mostrado como registro
flink.task.numRecordsInPerSecond
(medidor)
El número de registros que esta tarea recibe por segundo
Mostrado como registro
flink.task.numRecordsOut
(count)
El número total de registros que esta tarea ha emitido
Mostrado como registro
flink.task.numRecordsOutPerSec
(medidor)
El número total de registros que esta tarea ha emitido por segundo
Mostrado como registro
flink.taskmanager.Status.JVM.CPU.Load
(medidor)
El uso reciente de la CPU de la JVM en el administrador de tareas
Mostrado como porcentaje.
flink.taskmanager.Status.JVM.CPU.Time
(medidor)
El tiempo de la CPU utilizado por la JVM en el administrador de tareas
Mostrado en segundos.
flink.taskmanager.Status.JVM.ClassLoader.ClassesLoaded
(count)
El número total de clases cargadas desde el inicio de la JVM en el administrador de tareas.
flink.taskmanager.Status.JVM.ClassLoader.ClassesUnloaded
(count)
El número total de clases descargadas desde el inicio de la JVM en el administrador de tareas.
flink.taskmanager.Status.JVM.Memory.Direct.Count
(medidor)
El número de búferes en la mezcla directa de búferes en el administrador de tareas
Mostrado como búfer
flink.taskmanager.Status.JVM.Memory.Direct.MemoryUsed
(medidor)
La cantidad de memoria utilizada por la JVM para la mezcla directa de búferes en el administrador de tareas
Mostrado como byte
flink.taskmanager.Status.JVM.Memory.Direct.TotalCapacity
(count)
La capacidad total de todos los búferes en la mezcla directa de búferes en el administrador de tareas
Mostrado como byte
flink.taskmanager.Status.JVM.Memory.Heap.Committed
(medidor)
La cantidad de memoria heap garantizada para estar disponible para la JVM en el administrador de tareas
Mostrado como byte
flink.taskmanager.Status.JVM.Memory.Heap.Max
(medidor)
La cantidad máxima de memoria heap que se puede utilizar para la gestión de memoria en el administrador de tareas
Mostrado como byte
flink.taskmanager.Status.JVM.Memory.Heap.Used
(medidor)
La cantidad de memoria heap utilizada actualmente en el administrador de tareas
Mostrado como byte
flink.taskmanager.Status.JVM.Memory.Mapped.Count
(medidor)
El número de búferes en la mezcla de búferes asignados en el administrador de tareas.
flink.taskmanager.Status.JVM.Memory.Mapped.MemoryUsed
(medidor)
La cantidad de memoria utilizada por la JVM para la mezcla de búferes asignados en el administrador de tareas
Mostrado como byte
flink.taskmanager.Status.JVM.Memory.Mapped.TotalCapacity
(count)
La capacidad total de todos los búferes en la mezcla de búferes asignados en el administrador de tareas
Mostrado como byte
flink.taskmanager.Status.JVM.Memory.NonHeap.Committed
(medidor)
La cantidad de memoria no-Heap garantizada para estar disponible para la JVM en el administrador de tareas
Se muestra como byte.
flink.taskmanager.Status.JVM.Memory.NonHeap.Max
(medidor)
La cantidad máxima de memoria no-Heap que se puede utilizar para la gestión de memoria en el administrador de tareas
Mostrado como byte.
flink.taskmanager.Status.JVM.Memory.NonHeap.Used
(medidor)
La cantidad de memoria no-Heap utilizada actualmente en el administrador de tareas
Mostrado como byte
flink.taskmanager.Status.JVM.Threads.Count
(count)
El número total de hilos activos en el administrador de tareas
Mostrado como hilo
flink.taskmanager.Status.Shuffle.Netty.AvailableMemorySegments
(medidor)
El número de segmentos de memoria no utilizados en el administrador de tareas
flink.taskmanager.Status.Shuffle.Netty.TotalMemorySegments
(medidor)
El número de segmentos de memoria asignados en el administrador de tareas

Checks de servicio

Flink no incluye checks de servicio.

Eventos

Flink no incluye eventos.

Solucionar problemas

¿Necesitas ayuda? Ponte en contacto con asistencia técnica de Datadog.