---
title: Flink
description: Track metrics for your flink jobs.
---

# Flink

**Integration version:** 3.2.0

## Overview{% #overview %}

This check monitors [Flink](https://flink.apache.org/). Datadog collects Flink metrics through Flink's [Datadog HTTP Reporter](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/metric_reporters/#datadog), which uses [Datadog's HTTP API](https://docs.datadoghq.com/api/?lang=bash#api-reference).

**Minimum Agent version:** 7.17.2

## Setup{% #setup %}

### Installation{% #installation %}

The Flink check is included in the [Datadog Agent](https://app.datadoghq.com/account/settings/agent/latest) package. No additional installation is needed on your server.

### Configuration{% #configuration %}

#### Metric collection{% #metric-collection %}

{% callout %}
# Important note for users on the following Datadog sites: app.ddog-gov.com



1. Configure the [StatsD reporter](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/metric_reporters/#statsd) in Flink. In your `<FLINK_HOME>/conf/flink-conf.yaml`, add these lines:
   ```yaml
   metrics.reporter.stsd.factory.class: org.apache.flink.metrics.statsd.StatsDReporterFactory
   metrics.reporter.stsd.host: datadog-agent
   metrics.reporter.stsd.port: 8125
   metrics.reporter.stsd.interval: 60 SECONDS
   ```
1. Ensure DogStatsD is enabled in the Datadog Agent. In containerized environments, consider setting `DD_DOGSTATSD_NON_LOCAL_TRAFFIC=true` so the Agent accepts metrics sent from other containers.
1. Restart Flink to start sending your Flink metrics to Datadog.
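
The environment variable in step 2 has an equivalent in the Agent's main configuration file. The following is a minimal sketch, not a complete configuration. It assumes the Agent is configured through `datadog.yaml` and that the hostname `datadog-agent` used in the reporter settings resolves to the Agent from the Flink hosts:

```yaml
# datadog.yaml (Agent main configuration file)
use_dogstatsd: true                # DogStatsD is enabled by default; shown here for clarity
dogstatsd_port: 8125               # must match metrics.reporter.stsd.port in flink-conf.yaml
dogstatsd_non_local_traffic: true  # accept StatsD packets from other hosts or containers
```

Restart the Agent after changing these settings.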


{% /callout %}

{% callout %}
# Important note for users on the following Datadog sites: app.datadoghq.com, us3.datadoghq.com, us5.datadoghq.com, app.datadoghq.eu, ap1.datadoghq.com



1. Configure the [Datadog HTTP Reporter](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/metric_reporters/#datadog) in Flink. In your `<FLINK_HOME>/conf/flink-conf.yaml`, add these lines, replacing `<DATADOG_API_KEY>` with your Datadog [API key](https://app.datadoghq.com/organization-settings/api-keys):

   ```yaml
   metrics.reporter.dghttp.factory.class: org.apache.flink.metrics.datadog.DatadogHttpReporterFactory
   metrics.reporter.dghttp.apikey: <DATADOG_API_KEY>
   metrics.reporter.dghttp.dataCenter: US #(optional) The data center (EU/US) to connect to, defaults to US.
   ```

1. Re-map system scopes in your `<FLINK_HOME>/conf/flink-conf.yaml`.

   ```yaml
   metrics.scope.jm: flink.jobmanager
   metrics.scope.jm.job: flink.jobmanager.job
   metrics.scope.tm: flink.taskmanager
   metrics.scope.tm.job: flink.taskmanager.job
   metrics.scope.task: flink.task
   metrics.scope.operator: flink.operator
   ```

   **Note**: The system scopes must be remapped for your Flink metrics to be supported. Otherwise, they are submitted as custom metrics.

1. Configure additional [tags](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/metric_reporters/#datadog) in `<FLINK_HOME>/conf/flink-conf.yaml`. Here is an example of custom tags:

   ```yaml
   metrics.reporter.dghttp.scope.variables.additional: <KEY1>:<VALUE1>, <KEY2>:<VALUE2>
   ```

   **Note**: By default, any variables in metric names are sent as tags, so there is no need to add custom tags for `job_id`, `task_id`, and similar variables.

1. Restart Flink to start sending your Flink metrics to Datadog.
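
As an illustration of the optional settings in the steps above, the following sketch shows a reporter configured for the EU data center with two custom tags. The tag keys and values (`env:staging`, `team:data-platform`) are hypothetical. The `interval` option is the generic Flink reporter setting and is assumed to apply to this reporter as well, so verify it against the reporter documentation for your Flink version:

```yaml
# <FLINK_HOME>/conf/flink-conf.yaml (illustrative values only)
metrics.reporter.dghttp.factory.class: org.apache.flink.metrics.datadog.DatadogHttpReporterFactory
metrics.reporter.dghttp.apikey: <DATADOG_API_KEY>
metrics.reporter.dghttp.dataCenter: EU   # use EU for app.datadoghq.eu; defaults to US
metrics.reporter.dghttp.scope.variables.additional: env:staging, team:data-platform   # hypothetical tags
metrics.reporter.dghttp.interval: 60 SECONDS   # assumption: generic reporter interval option
```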


{% /callout %}

#### Log collection{% #log-collection %}

*Available for Agent >6.0*

1. Flink uses the `log4j` logger by default. To enable logging to a file, customize the format by editing the `log4j*.properties` configuration files in the `conf/` directory of the Flink distribution. See the [Flink logging documentation](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/advanced/logging/) for information on which configuration file is relevant for your setup. See [Flink's repository](https://github.com/apache/flink/tree/release-1.16/flink-dist/src/main/flink-bin/conf) for default configurations.

1. By default, the integration pipeline supports the following layout pattern:

   ```text
   %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
   ```

   An example of a valid timestamp is: `2020-02-03 18:43:12,251`.

   Clone and edit the [integration pipeline](https://docs.datadoghq.com/logs/processing/#integration-pipelines) if you have a different format.

1. Log collection is disabled by default in the Datadog Agent. Enable it in your `datadog.yaml` file:

   ```yaml
   logs_enabled: true
   ```

1. Uncomment and edit the logs configuration block in your `flink.d/conf.yaml` file. Change the `path` and `service` parameter values based on your environment. See the [sample flink.d/conf.yaml](https://github.com/DataDog/integrations-core/blob/master/flink/datadog_checks/flink/data/conf.yaml.example) for all available configuration options.

   ```yaml
   logs:
     - type: file
       path: /var/log/flink/server.log
       source: flink
       service: myapp
       # To handle multi-line logs that start with yyyy-mm-dd, use the following pattern:
       #log_processing_rules:
       #  - type: multi_line
       #    pattern: \d{4}\-(0?[1-9]|1[012])\-(0?[1-9]|[12][0-9]|3[01])
       #    name: new_log_start_with_date
   ```

1. [Restart the Agent](https://docs.datadoghq.com/agent/guide/agent-commands/#start-stop-and-restart-the-agent).
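
Java stack traces span several lines, so you may want each one aggregated into a single log entry. The sketch below is the same logs block with the commented-out multi-line rule enabled. The `path` and `service` values are the placeholders from the sample configuration, not required values:

```yaml
# flink.d/conf.yaml
logs:
  - type: file
    path: /var/log/flink/server.log   # example path; adjust to your environment
    source: flink
    service: myapp                    # placeholder service name
    log_processing_rules:
      - type: multi_line
        name: new_log_start_with_date
        # A new log entry starts at each line beginning with a yyyy-mm-dd date
        pattern: \d{4}\-(0?[1-9]|1[012])\-(0?[1-9]|[12][0-9]|3[01])
```

With this rule, lines that do not start with a date, such as stack trace frames, are appended to the preceding entry.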

### Validation{% #validation %}

[Run the Agent's status subcommand](https://docs.datadoghq.com/agent/guide/agent-commands/#agent-status-and-information) and look for `flink` under the Checks section.

## Data Collected{% #data-collected %}

### Metrics{% #metrics %}

| Metric | Description |
| --- | --- |
| **flink.jobmanager.Status.JVM.CPU.Load** (gauge) | The recent CPU usage of the JVM in the jobmanager. *Shown as percent* |
| **flink.jobmanager.Status.JVM.CPU.Time** (gauge) | The CPU time used by the JVM in the jobmanager. *Shown as second* |
| **flink.jobmanager.Status.JVM.ClassLoader.ClassesLoaded** (count) | The total number of classes loaded since the start of the JVM in the jobmanager |
| **flink.jobmanager.Status.JVM.ClassLoader.ClassesUnloaded** (count) | The total number of classes unloaded since the start of the JVM in the jobmanager |
| **flink.jobmanager.Status.JVM.Memory.Direct.Count** (count) | The number of buffers in the direct buffer pool in the jobmanager. *Shown as buffer* |
| **flink.jobmanager.Status.JVM.Memory.Direct.MemoryUsed** (gauge) | The amount of memory used by the JVM for the direct buffer pool in the jobmanager. *Shown as byte* |
| **flink.jobmanager.Status.JVM.Memory.Direct.TotalCapacity** (count) | The total capacity of all buffers in the direct buffer pool in the jobmanager. *Shown as byte* |
| **flink.jobmanager.Status.JVM.Memory.Heap.Committed** (gauge) | The amount of heap memory guaranteed to be available to the JVM in the jobmanager. *Shown as byte* |
| **flink.jobmanager.Status.JVM.Memory.Heap.Max** (gauge) | The maximum amount of heap memory that can be used for memory management in the jobmanager. *Shown as byte* |
| **flink.jobmanager.Status.JVM.Memory.Heap.Used** (gauge) | The amount of heap memory currently used in the jobmanager. *Shown as byte* |
| **flink.jobmanager.Status.JVM.Memory.Mapped.Count** (gauge) | The number of buffers in the mapped buffer pool in the jobmanager. *Shown as buffer* |
| **flink.jobmanager.Status.JVM.Memory.Mapped.MemoryUsed** (gauge) | The amount of memory used by the JVM for the mapped buffer pool in the jobmanager. *Shown as byte* |
| **flink.jobmanager.Status.JVM.Memory.Mapped.TotalCapacity** (count) | The total capacity of all buffers in the mapped buffer pool in the jobmanager. *Shown as byte* |
| **flink.jobmanager.Status.JVM.Memory.NonHeap.Committed** (gauge) | The amount of non-heap memory guaranteed to be available to the JVM in the jobmanager. *Shown as byte* |
| **flink.jobmanager.Status.JVM.Memory.NonHeap.Max** (gauge) | The maximum amount of non-heap memory that can be used for memory management in the jobmanager. *Shown as byte* |
| **flink.jobmanager.Status.JVM.Memory.NonHeap.Used** (gauge) | The amount of non-heap memory currently used in the jobmanager. *Shown as byte* |
| **flink.jobmanager.Status.JVM.Threads.Count** (count) | The total number of live threads in the jobmanager. *Shown as thread* |
| **flink.jobmanager.job.downtime** (gauge) | For jobs currently in a failing/recovering situation, the time elapsed during this outage. Returns 0 for running jobs and -1 for completed jobs. *Shown as millisecond* |
| **flink.jobmanager.job.lastCheckpointAlignmentBuffered** (gauge) | The number of buffered bytes during alignment over all subtasks for the last checkpoint. *Shown as byte* |
| **flink.jobmanager.job.lastCheckpointDuration** (gauge) | The time it took to complete the last checkpoint. *Shown as millisecond* |
| **flink.jobmanager.job.lastCheckpointExternalPath** (gauge) | The path where the last external checkpoint was stored |
| **flink.jobmanager.job.lastCheckpointRestoreTimestamp** (gauge) | Timestamp when the last checkpoint was restored at the coordinator. *Shown as millisecond* |
| **flink.jobmanager.job.lastCheckpointSize** (gauge) | The total size of the last checkpoint. *Shown as byte* |
| **flink.jobmanager.job.numRestarts** (gauge) | The total number of restarts since this job was submitted, including full restarts and fine-grained restarts |
| **flink.jobmanager.job.numberOfCompletedCheckpoints** (count) | The number of successfully completed checkpoints |
| **flink.jobmanager.job.numberOfFailedCheckpoints** (count) | The number of failed checkpoints |
| **flink.jobmanager.job.numberOfInProgressCheckpoints** (gauge) | The number of in-progress checkpoints |
| **flink.jobmanager.job.restartingTime** (gauge) | The time it took to restart the job or how long the current restart has been in progress. *Shown as millisecond* |
| **flink.jobmanager.job.totalNumberOfCheckpoints** (count) | The total number of checkpoints (in progress, completed, and failed) |
| **flink.jobmanager.job.uptime** (gauge) | The time that the job has been running without interruption. Returns -1 for completed jobs. *Shown as millisecond* |
| **flink.jobmanager.numRegisteredTaskManagers** (gauge) | The number of registered taskmanagers |
| **flink.jobmanager.numRunningJobs** (gauge) | The number of running jobs. *Shown as job* |
| **flink.jobmanager.taskSlotsTotal** (gauge) | The total number of task slots |
| **flink.operator.commitsFailed** (count) | The total number of offset commit failures to Kafka if offset committing is turned on and checkpointing is enabled. Note that committing offsets back to Kafka is only a means to expose consumer progress, so a commit failure does not affect the integrity of Flink's checkpointed partition offsets. *Shown as commit* |
| **flink.operator.commitsSucceeded** (count) | The total number of successful offset commits to Kafka if offset committing is turned on and checkpointing is enabled. *Shown as commit* |
| **flink.operator.currentInput1Watermark** (gauge) | The last watermark this operator has received in its first input. Only for operators with 2 inputs. *Shown as millisecond* |
| **flink.operator.currentInput2Watermark** (gauge) | The last watermark this operator has received in its second input. Only for operators with 2 inputs. *Shown as millisecond* |
| **flink.operator.currentInputWatermark** (gauge) | The last watermark this operator has received. For operators with 2 inputs, this is the minimum of the last received watermarks. *Shown as millisecond* |
| **flink.operator.currentOutputWatermark** (gauge) | The last watermark this operator has emitted. *Shown as millisecond* |
| **flink.operator.numLateRecordsDropped** (count) | The number of records this operator has dropped due to arriving late. *Shown as record* |
| **flink.operator.numRecordsIn** (count) | The total number of records this operator has received. *Shown as record* |
| **flink.operator.numRecordsInPerSecond** (gauge) | The number of records this operator receives per second. *Shown as record* |
| **flink.operator.numRecordsOut** (count) | The total number of records this operator has emitted. *Shown as record* |
| **flink.operator.numRecordsOutPerSec** (gauge) | The number of records this operator emits per second. *Shown as record* |
| **flink.operator.numSplitsProcessed** (count) | The total number of InputSplits this data source has processed (if the operator is a data source) |
| **flink.task.Shuffle.Netty.Input.Buffers.inPoolUsage** (gauge) | An estimate of the input buffers usage |
| **flink.task.Shuffle.Netty.Input.Buffers.inputQueueLength** (gauge) | The number of queued input buffers. *Shown as buffer* |
| **flink.task.Shuffle.Netty.Input.numBuffersInLocal** (count) | The total number of network buffers this task has read from a local source. *Shown as buffer* |
| **flink.task.Shuffle.Netty.Input.numBuffersInLocalPerSecond** (gauge) | The number of network buffers this task reads from a local source per second |
| **flink.task.Shuffle.Netty.Input.numBuffersInRemote** (count) | The total number of network buffers this task has read from a remote source. *Shown as buffer* |
| **flink.task.Shuffle.Netty.Input.numBuffersInRemotePerSecond** (gauge) | The number of network buffers this task reads from a remote source per second. *Shown as buffer* |
| **flink.task.Shuffle.Netty.Input.numBytesInLocal** (count) | The total number of bytes this task has read from a local source. *Shown as byte* |
| **flink.task.Shuffle.Netty.Input.numBytesInLocalPerSecond** (gauge) | The number of bytes this task reads from a local source per second. *Shown as byte* |
| **flink.task.Shuffle.Netty.Input.numBytesInRemote** (count) | The total number of bytes this task has read from a remote source. *Shown as byte* |
| **flink.task.Shuffle.Netty.Input.numBytesInRemotePerSecond** (gauge) | The number of bytes this task reads from a remote source per second. *Shown as byte* |
| **flink.task.Shuffle.Netty.Output.Buffers.outPoolUsage** (gauge) | An estimate of the output buffers usage |
| **flink.task.Shuffle.Netty.Output.Buffers.outputQueueLength** (gauge) | The number of queued output buffers. *Shown as buffer* |
| **flink.task.checkpointAlignmentTime** (gauge) | The time in nanoseconds that the last barrier alignment took to complete or how long the current alignment has taken so far. *Shown as nanosecond* |
| **flink.task.currentInputWatermark** (gauge) | The last watermark this task has received. For tasks with 2 inputs, this is the minimum of the last received watermarks. *Shown as millisecond* |
| **flink.task.numBuffersOut** (count) | The total number of network buffers this task has emitted. *Shown as buffer* |
| **flink.task.numBuffersOutPerSecond** (gauge) | The number of network buffers this task emits per second. *Shown as buffer* |
| **flink.task.numBytesOut** (count) | The total number of bytes this task has emitted. *Shown as byte* |
| **flink.task.numBytesOutPerSecond** (gauge) | The number of bytes this task emits per second. *Shown as byte* |
| **flink.task.numLateRecordsDropped** (count) | The number of records this task has dropped due to arriving late. *Shown as record* |
| **flink.task.numRecordsIn** (count) | The total number of records this task has received. *Shown as record* |
| **flink.task.numRecordsInPerSecond** (gauge) | The number of records this task receives per second. *Shown as record* |
| **flink.task.numRecordsOut** (count) | The total number of records this task has emitted. *Shown as record* |
| **flink.task.numRecordsOutPerSec** (gauge) | The number of records this task emits per second. *Shown as record* |
| **flink.taskmanager.Status.JVM.CPU.Load**(gauge)                          | The recent CPU usage of the JVM in the taskmanager*Shown as percent*                                                                                                                                                                                                                                                    |
| **flink.taskmanager.Status.JVM.CPU.Time**(gauge)                          | The CPU time used by the JVM in the taskmanager*Shown as second*                                                                                                                                                                                                                                                        |
| **flink.taskmanager.Status.JVM.ClassLoader.ClassesLoaded**(count)         | The total number of classes loaded since the start of the JVM in the taskmanager                                                                                                                                                                                                                                        |
| **flink.taskmanager.Status.JVM.ClassLoader.ClassesUnloaded**(count)       | The total number of classes unloaded since the start of the JVM in the taskmanager                                                                                                                                                                                                                                      |
| **flink.taskmanager.Status.JVM.Memory.Direct.Count**(gauge)               | The number of buffers in the direct buffer pool in the taskmanager*Shown as buffer*                                                                                                                                                                                                                                     |
| **flink.taskmanager.Status.JVM.Memory.Direct.MemoryUsed**(gauge)          | The amount of memory used by the JVM for the direct buffer pool in the taskmanager*Shown as byte*                                                                                                                                                                                                                       |
| **flink.taskmanager.Status.JVM.Memory.Direct.TotalCapacity**(count)       | The total capacity of all buffers in the direct buffer pool in the taskmanager*Shown as byte*                                                                                                                                                                                                                           |
| **flink.taskmanager.Status.JVM.Memory.Heap.Committed**(gauge)             | The amount of heap memory guaranteed to be available to the JVM in the taskmanager*Shown as byte*                                                                                                                                                                                                                       |
| **flink.taskmanager.Status.JVM.Memory.Heap.Max**(gauge)                   | The maximum amount of heap memory that can be used for memory management in the taskmanager*Shown as byte*                                                                                                                                                                                                              |
| **flink.taskmanager.Status.JVM.Memory.Heap.Used**(gauge)                  | The amount of heap memory currently used in the taskmanager*Shown as byte*                                                                                                                                                                                                                                              |
| **flink.taskmanager.Status.JVM.Memory.Mapped.Count**(gauge)               | The number of buffers in the mapped buffer pool in the taskmanager                                                                                                                                                                                                                                                      |
| **flink.taskmanager.Status.JVM.Memory.Mapped.MemoryUsed**(gauge)          | The amount of memory used by the JVM for the mapped buffer pool in the taskmanager*Shown as byte*                                                                                                                                                                                                                       |
| **flink.taskmanager.Status.JVM.Memory.Mapped.TotalCapacity**(count)       | The total capacity of all buffers in the mapped buffer pool in the taskmanager*Shown as byte*                                                                                                                                                                                                                           |
| **flink.taskmanager.Status.JVM.Memory.NonHeap.Committed**(gauge)          | The amount of non-heap memory guaranteed to be available to the JVM in the taskmanager*Shown as byte*                                                                                                                                                                                                                   |
| **flink.taskmanager.Status.JVM.Memory.NonHeap.Max**(gauge)                | The maximum amount of non-heap memory that can be used for memory management in the taskmanager*Shown as byte*                                                                                                                                                                                                          |
| **flink.taskmanager.Status.JVM.Memory.NonHeap.Used**(gauge)               | The amount of non-heap memory currently used in the taskmanager*Shown as byte*                                                                                                                                                                                                                                          |
| **flink.taskmanager.Status.JVM.Threads.Count**(count)                     | The total number of live threads in the taskmanager*Shown as thread*                                                                                                                                                                                                                                                    |
| **flink.taskmanager.Status.Shuffle.Netty.AvailableMemorySegments**(gauge) | The number of unused memory segments in the taskmanager                                                                                                                                                                                                                                                                 |
| **flink.taskmanager.Status.Shuffle.Netty.TotalMemorySegments**(gauge)     | The number of allocated memory segments in the taskmanager                                                                                                                                                                                                                                                              |
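
The heap gauges above are often easier to read as a ratio than as raw byte counts. The following is a minimal sketch, not part of the integration, of how `flink.taskmanager.Status.JVM.Memory.Heap.Used` and `flink.taskmanager.Status.JVM.Memory.Heap.Max` could be combined into a utilization percentage. The function name `heap_utilization_percent` and the sample values are hypothetical and used only for illustration.

```python
from typing import Optional


def heap_utilization_percent(used_bytes: float, max_bytes: float) -> Optional[float]:
    """Return heap usage as a percentage of the maximum heap size.

    Returns None when the maximum is not positive, since the JVM can
    report -1 when no maximum is defined.
    """
    if max_bytes <= 0:
        return None
    return 100.0 * used_bytes / max_bytes


# Hypothetical sample values, in bytes, standing in for the two gauges.
heap_used = 512 * 1024 * 1024   # Heap.Used
heap_max = 1024 * 1024 * 1024   # Heap.Max

print(heap_utilization_percent(heap_used, heap_max))
```

The same ratio can be applied to the `NonHeap` gauges, with the same guard for an undefined maximum.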

### Service Checks{% #service-checks %}

Flink does not include any service checks.

### Events{% #events %}

Flink does not include any events.

## Troubleshooting{% #troubleshooting %}

Need help? Contact [Datadog support](https://docs.datadoghq.com/help/).
