Flink

Supported OS Linux Windows Mac OS

Integration version2.0.0

Overview

This check monitors Flink. Datadog collects Flink metrics through Flink’s Datadog HTTP Reporter, which uses Datadog’s HTTP API.

Setup

Installation

The Flink check is included in the Datadog Agent package. No additional installation is needed on your server.

Configuration

Metric collection

  1. Configure the Datadog HTTP Reporter in Flink.

    In your <FLINK_HOME>/conf/flink-conf.yaml, add these lines, replacing <DATADOG_API_KEY> with your Datadog API key:

    metrics.reporter.dghttp.factory.class: org.apache.flink.metrics.datadog.DatadogHttpReporterFactory
    metrics.reporter.dghttp.apikey: <DATADOG_API_KEY>
    metrics.reporter.dghttp.dataCenter: US #(optional) The data center (EU/US) to connect to, defaults to US.
    
  2. Re-map system scopes in your <FLINK_HOME>/conf/flink-conf.yaml.

    metrics.scope.jm: flink.jobmanager
    metrics.scope.jm.job: flink.jobmanager.job
    metrics.scope.tm: flink.taskmanager
    metrics.scope.tm.job: flink.taskmanager.job
    metrics.scope.task: flink.task
    metrics.scope.operator: flink.operator
    

    Note: The system scopes must be remapped for your Flink metrics to be supported, otherwise they are submitted as custom metrics.

  3. Configure additional tags in <FLINK_HOME>/conf/flink-conf.yaml. Here is an example of custom tags:

    metrics.reporter.dghttp.scope.variables.additional: <KEY1>:<VALUE1>, <KEY1>:<VALUE2>
    

    Note: By default, any variables in metric names are sent as tags, so there is no need to add custom tags for job_id, task_id, etc.

  4. Restart Flink to start sending your Flink metrics to Datadog.

Log collection

Available for Agent >6.0

  1. Flink uses the log4j logger by default. To enable logging to a file, customize the format by editing the log4j*.properties configuration files in the conf/ directory of the Flink distribution. See the Flink logging documentation for information on which configuration file is relevant for your setup. See Flink’s repository for default configurations.

  2. By default, the integration pipeline supports the following layout pattern:

    %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    

    An example of a valid timestamp is: 2020-02-03 18:43:12,251.

    Clone and edit the integration pipeline if you have a different format.

  3. Collecting logs is disabled by default in the Datadog Agent, enable it in your datadog.yaml file:

    logs_enabled: true
    
  4. Uncomment and edit the logs configuration block in your flink.d/conf.yaml file. Change the path and service parameter values based on your environment. See the sample flink.d/conf.yaml for all available configuration options.

    logs:
      - type: file
        path: /var/log/flink/server.log
        source: flink
        service: myapp
        #To handle multi line that starts with yyyy-mm-dd use the following pattern
        #log_processing_rules:
        #  - type: multi_line
        #    pattern: \d{4}\-(0?[1-9]|1[012])\-(0?[1-9]|[12][0-9]|3[01])
        #    name: new_log_start_with_date
    
  5. Restart the Agent.

Validation

Run the Agent’s status subcommand and look for flink under the Checks section.

Data Collected

Metrics

flink.jobmanager.Status.JVM.CPU.Load
(gauge)
The recent CPU usage of the JVM in the jobmanager
Shown as percent
flink.jobmanager.Status.JVM.CPU.Time
(gauge)
The CPU time used by the JVM in the jobmanager
Shown as second
flink.jobmanager.Status.JVM.ClassLoader.ClassesLoaded
(count)
The total number of classes loaded since the start of the JVM in the jobmanager
flink.jobmanager.Status.JVM.ClassLoader.ClassesUnloaded
(count)
The total number of classes unloaded since the start of the JVM in the jobmanager
flink.jobmanager.Status.JVM.Memory.Direct.Count
(count)
The number of buffers in the direct buffer pool in the jobmanager
Shown as buffer
flink.jobmanager.Status.JVM.Memory.Direct.MemoryUsed
(gauge)
The amount of memory used by the JVM for the direct buffer pool in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.Direct.TotalCapacity
(count)
The total capacity of all buffers in the direct buffer pool in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.Heap.Committed
(gauge)
The amount of heap memory guaranteed to be available to the JVM in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.Heap.Max
(gauge)
The maximum amount of heap memory that can be used for memory management in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.Heap.Used
(gauge)
The amount of heap memory currently used in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.Mapped.Count
(gauge)
The number of buffers in the mapped buffer pool in the jobmanager
Shown as buffer
flink.jobmanager.Status.JVM.Memory.Mapped.MemoryUsed
(gauge)
The amount of memory used by the JVM for the mapped buffer pool in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.Mapped.TotalCapacity
(count)
The total capacity of all buffers in the mapped buffer pool in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.NonHeap.Committed
(gauge)
The amount of non-heap memory guaranteed to be available to the JVM in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.NonHeap.Max
(gauge)
The maximum amount of non-heap memory that can be used for memory management in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.NonHeap.Used
(gauge)
The amount of non-heap memory currently used in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Threads.Count
(count)
The total number of live threads in the jobmanager
Shown as thread
flink.jobmanager.job.downtime
(gauge)
For jobs currently in a failing/recovering situation- the time elapsed during this outage. Returns 0 for running jobs and -1 for completed jobs
Shown as millisecond
flink.jobmanager.job.lastCheckpointAlignmentBuffered
(gauge)
The number of buffered bytes during alignment over all subtasks for the last checkpoint
Shown as byte
flink.jobmanager.job.lastCheckpointDuration
(gauge)
The time it took to complete the last checkpoint
Shown as millisecond
flink.jobmanager.job.lastCheckpointExternalPath
(gauge)
The path where the last external checkpoint was stored
flink.jobmanager.job.lastCheckpointRestoreTimestamp
(gauge)
Timestamp when the last checkpoint was restored at the coordinator
Shown as millisecond
flink.jobmanager.job.lastCheckpointSize
(gauge)
The total size of the last checkpoint
Shown as byte
flink.jobmanager.job.numRestarts
(gauge)
The total number of restarts since this job was submitted, including full restarts and fine-grained restarts
flink.jobmanager.job.numberOfCompletedCheckpoints
(count)
The number of successfully completed checkpoints
flink.jobmanager.job.numberOfFailedCheckpoints
(count)
The number of failed checkpoints
flink.jobmanager.job.numberOfInProgressCheckpoints
(gauge)
The number of in progress checkpoints
flink.jobmanager.job.restartingTime
(gauge)
The time it took to restart the job or how long the current restart has been in progress
Shown as millisecond
flink.jobmanager.job.totalNumberOfCheckpoints
(count)
The number of total checkpoints (in progress completed and failed)
flink.jobmanager.job.uptime
(gauge)
The time that the job has been running without interruption. Returns -1 for completed jobs
Shown as millisecond
flink.jobmanager.numRegisteredTaskManagers
(gauge)
The number of registered taskmanagers
flink.jobmanager.numRunningJobs
(gauge)
The number of running jobs
Shown as job
flink.jobmanager.taskSlotsTotal
(gauge)
The total number of task slots
flink.operator.commitsFailed
(count)
The total number of offset commit failures to Kafka if offset committing is turned on and checkpointing is enabled. Note that committing offsets back to Kafka is only a means to expose consumer progress so a commit failure does not affect the integrity of Flink's checkpointed partition offsets
Shown as commit
flink.operator.commitsSucceeded
(count)
The total number of successful offset commits to Kafka if offset committing is turned on and checkpointing is enabled
Shown as commit
flink.operator.currentInput1Watermark
(gauge)
The last watermark this operator has received in its first input. Only for operators with 2 inputs
Shown as millisecond
flink.operator.currentInput2Watermark
(gauge)
The last watermark this operator has received in its second input. Only for operators with 2 inputs
Shown as millisecond
flink.operator.currentInputWatermark
(gauge)
The last watermark this operator has received. For tasks with 2 inputs this is the minimum of the last received watermarks
Shown as millisecond
flink.operator.currentOutputWatermark
(gauge)
The last watermark this operator has emitted
Shown as millisecond
flink.operator.numLateRecordsDropped
(count)
The number of records this operator has dropped due to arriving late
Shown as record
flink.operator.numRecordsIn
(count)
The total number of records this operator has received
Shown as record
flink.operator.numRecordsInPerSecond
(gauge)
The number of records this operator receives per second
Shown as record
flink.operator.numRecordsOut
(count)
The total number of records this operator has emitted
Shown as record
flink.operator.numRecordsOutPerSec
(gauge)
The total number of records this operator has emitted per second
Shown as record
flink.operator.numSplitsProcessed
(count)
The total number of InputSplits this data source has processed (if the operator is a data source)
flink.task.Shuffle.Netty.Input.Buffers.inPoolUsage
(gauge)
An estimate of the input buffers usage
flink.task.Shuffle.Netty.Input.Buffers.inputQueueLength
(gauge)
The number of queued input buffers
Shown as buffer
flink.task.Shuffle.Netty.Input.numBuffersInLocal
(count)
The total number of network buffers this task has read from a local source
Shown as buffer
flink.task.Shuffle.Netty.Input.numBuffersInLocalPerSecond
(gauge)
The number of network buffers this task reads from a local source per second
flink.task.Shuffle.Netty.Input.numBuffersInRemote
(count)
The total number of network buffers this task has read from a remote source
Shown as buffer
flink.task.Shuffle.Netty.Input.numBuffersInRemotePerSecond
(gauge)
The number of network buffers this task reads from a remote source per second
Shown as buffer
flink.task.Shuffle.Netty.Input.numBytesInLocal
(count)
The total number of bytes this task has read from a local source
Shown as byte
flink.task.Shuffle.Netty.Input.numBytesInLocalPerSecond
(gauge)
The number of bytes this task reads from a local source per second
Shown as byte
flink.task.Shuffle.Netty.Input.numBytesInRemote
(count)
The total number of bytes this task has read from a remote source
Shown as byte
flink.task.Shuffle.Netty.Input.numBytesInRemotePerSecond
(gauge)
The number of bytes this task reads from a remote source per second
Shown as byte
flink.task.Shuffle.Netty.Output.Buffers.outPoolUsage
(gauge)
An estimate of the output buffers usage
flink.task.Shuffle.Netty.Output.Buffers.outputQueueLength
(gauge)
The number of queued output buffers
Shown as buffer
flink.task.checkpointAlignmentTime
(gauge)
The time in nanoseconds that the last barrier alignment took to complete or how long the current alignment has taken so far
Shown as nanosecond
flink.task.currentInputWatermark
(gauge)
The last watermark this task has received. For tasks with 2 inputs this is the minimum of the last received watermarks
Shown as millisecond
flink.task.numBuffersOut
(count)
The total number of network buffers this task has emitted
Shown as buffer
flink.task.numBuffersOutPerSecond
(gauge)
The number of network buffers this task emits per second
Shown as buffer
flink.task.numBytesOut
(count)
The total number of bytes this task has emitted
Shown as byte
flink.task.numBytesOutPerSecond
(gauge)
The number of bytes this task emits per second
Shown as byte
flink.task.numLateRecordsDropped
(count)
The number of records this task has dropped due to arriving late
Shown as record
flink.task.numRecordsIn
(count)
The total number of records this task has received
Shown as record
flink.task.numRecordsInPerSecond
(gauge)
The number of records this task receives per second
Shown as record
flink.task.numRecordsOut
(count)
The total number of records this task has emitted
Shown as record
flink.task.numRecordsOutPerSec
(gauge)
The total number of records this task has emitted per second
Shown as record
flink.taskmanager.Status.JVM.CPU.Load
(gauge)
The recent CPU usage of the JVM in the taskmanager
Shown as percent
flink.taskmanager.Status.JVM.CPU.Time
(gauge)
The CPU time used by the JVM in the taskmanager
Shown as second
flink.taskmanager.Status.JVM.ClassLoader.ClassesLoaded
(count)
The total number of classes loaded since the start of the JVM in the taskmanager
flink.taskmanager.Status.JVM.ClassLoader.ClassesUnloaded
(count)
The total number of classes unloaded since the start of the JVM in the taskmanager
flink.taskmanager.Status.JVM.Memory.Direct.Count
(gauge)
The number of buffers in the direct buffer pool in the taskmanager
Shown as buffer
flink.taskmanager.Status.JVM.Memory.Direct.MemoryUsed
(gauge)
The amount of memory used by the JVM for the direct buffer pool in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.Direct.TotalCapacity
(count)
The total capacity of all buffers in the direct buffer pool in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.Heap.Committed
(gauge)
The amount of heap memory guaranteed to be available to the JVM in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.Heap.Max
(gauge)
The maximum amount of heap memory that can be used for memory management in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.Heap.Used
(gauge)
The amount of heap memory currently used in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.Mapped.Count
(gauge)
The number of buffers in the mapped buffer pool in the taskmanager
flink.taskmanager.Status.JVM.Memory.Mapped.MemoryUsed
(gauge)
The amount of memory used by the JVM for the mapped buffer pool in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.Mapped.TotalCapacity
(count)
The total capacity of all buffers in the mapped buffer pool in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.NonHeap.Committed
(gauge)
The amount of non-heap memory guaranteed to be available to the JVM in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.NonHeap.Max
(gauge)
The maximum amount of non-heap memory that can be used for memory management in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.NonHeap.Used
(gauge)
The amount of non-heap memory currently used in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Threads.Count
(count)
The total number of live threads in the taskmanager
Shown as thread
flink.taskmanager.Status.Shuffle.Netty.AvailableMemorySegments
(gauge)
The number of unused memory segments in the taskmanager
flink.taskmanager.Status.Shuffle.Netty.TotalMemorySegments
(gauge)
The number of allocated memory segments in the taskmanager

Service Checks

Flink does not include any service checks.

Events

Flink does not include any events.

Troubleshooting

Need help? Contact Datadog support.