Supported OS Linux Windows Mac OS

インテグレーションバージョン1.5.0

概要

このチェックは Flink を監視します。Datadog は Flink の Datadog HTTP Reporter を使用し、Datadog の HTTP API によって Flink のメトリクスを収集します。

セットアップ

インストール

Flink チェックは Datadog Agent パッケージに含まれています。 サーバーに追加でインストールする必要はありません。

構成

メトリクスの収集

  1. Flink で Datadog HTTP Reporter を構成します。

    <FLINK_HOME>/conf/flink-conf.yaml に以下の行を追加し、<DATADOG_API_KEY> を Datadog API キーと置き換えます。

    metrics.reporter.dghttp.factory.class: org.apache.flink.metrics.datadog.DatadogHttpReporterFactory
    metrics.reporter.dghttp.apikey: <DATADOG_API_KEY>
    metrics.reporter.dghttp.dataCenter: 
    
  2. <FLINK_HOME>/conf/flink-conf.yaml で、システムのスコープを再マッピングします。

    metrics.scope.jm: flink.jobmanager
    metrics.scope.jm.job: flink.jobmanager.job
    metrics.scope.tm: flink.taskmanager
    metrics.scope.tm.job: flink.taskmanager.job
    metrics.scope.task: flink.task
    metrics.scope.operator: flink.operator
    

    : Flink のメトリクスをサポートするには、システムスコープをマッピングし直す必要があります。しない場合は、カスタムメトリクスとして送信されます。

  3. <FLINK_HOME>/conf/flink-conf.yaml に、たとえば以下のカスタムタグのようなタグを追加します。

    metrics.reporter.dghttp.scope.variables.additional: <KEY1>:<VALUE1>, <KEY1>:<VALUE2>
    

    : デフォルトでは、メトリクスの名前はタグとして送信され、識別されるため、job_idtask_id などのカスタムタグを追加する必要はありません。

  4. Flink を再起動すると、Flink のメトリクスが Datadog に送信されます。

ログ収集

Agent バージョン 6.0 以降で利用可能

  1. Flink はデフォルトで log4j ロガーを使用します。ファイルへのロギングを有効にするには、Flink ディストリビューションの conf/ ディレクトリにある log4j*.properties コンフィギュレーションファイルを編集して、フォーマットをカスタマイズします。どのコンフィギュレーションファイルがあなたのセットアップに関連するかについては、Flink のロギングに関するドキュメントを参照してください。デフォルトの構成については、Flink のリポジトリを参照してください。

  2. インテグレーションパイプラインは、デフォルトで、次のレイアウトパターンをサポートします。

    %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    

    タイムスタンプの部分には、たとえば 2020-02-03 18:43:12,251 などが入ります。

    フォーマットが異なる場合は、インテグレーションパイプラインを複製して編集してください。

  3. Datadog Agent で、ログの収集はデフォルトで無効になっています。以下のように、datadog.yaml ファイルでこれを有効にします。

    logs_enabled: true
    
  4. flink.d/conf.yaml ファイルのコメントを解除して、ログコンフィギュレーションブロックを編集します。環境に基づいて、path パラメーターと service パラメーターの値を変更してください。使用可能なすべてのコンフィギュレーションオプションの詳細については、flink.d/conf.yaml のサンプルを参照してください。

    logs:
      - type: file
        path: /var/log/flink/server.log
        source: flink
        service: myapp
        #To handle multi line that starts with yyyy-mm-dd use the following pattern
        #log_processing_rules:
        #  - type: multi_line
        #    pattern: \d{4}\-(0?[1-9]|1[012])\-(0?[1-9]|[12][0-9]|3[01])
        #    name: new_log_start_with_date
    
  5. Agent を再起動します

検証

Agent の status サブコマンドを実行し、Checks セクションで flink を探します。

収集データ

メトリクス

flink.jobmanager.Status.JVM.CPU.Load
(gauge)
The recent CPU usage of the JVM in the jobmanager
Shown as percent
flink.jobmanager.Status.JVM.CPU.Time
(gauge)
The CPU time used by the JVM in the jobmanager
Shown as second
flink.jobmanager.Status.JVM.ClassLoader.ClassesLoaded
(count)
The total number of classes loaded since the start of the JVM in the jobmanager
flink.jobmanager.Status.JVM.ClassLoader.ClassesUnloaded
(count)
The total number of classes unloaded since the start of the JVM in the jobmanager
flink.jobmanager.Status.JVM.Memory.Direct.Count
(count)
The number of buffers in the direct buffer pool in the jobmanager
Shown as buffer
flink.jobmanager.Status.JVM.Memory.Direct.MemoryUsed
(gauge)
The amount of memory used by the JVM for the direct buffer pool in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.Direct.TotalCapacity
(count)
The total capacity of all buffers in the direct buffer pool in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.Heap.Committed
(gauge)
The amount of heap memory guaranteed to be available to the JVM in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.Heap.Max
(gauge)
The maximum amount of heap memory that can be used for memory management in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.Heap.Used
(gauge)
The amount of heap memory currently used in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.Mapped.Count
(gauge)
The number of buffers in the mapped buffer pool in the jobmanager
Shown as buffer
flink.jobmanager.Status.JVM.Memory.Mapped.MemoryUsed
(gauge)
The amount of memory used by the JVM for the mapped buffer pool in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.Mapped.TotalCapacity
(count)
The total capacity of all buffers in the mapped buffer pool in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.NonHeap.Committed
(gauge)
The amount of non-heap memory guaranteed to be available to the JVM in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.NonHeap.Max
(gauge)
The maximum amount of non-heap memory that can be used for memory management in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.NonHeap.Used
(gauge)
The amount of non-heap memory currently used in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Threads.Count
(count)
The total number of live threads in the jobmanager
Shown as thread
flink.jobmanager.job.downtime
(gauge)
For jobs currently in a failing/recovering situation- the time elapsed during this outage. Returns 0 for running jobs and -1 for completed jobs
Shown as millisecond
flink.jobmanager.job.lastCheckpointAlignmentBuffered
(gauge)
The number of buffered bytes during alignment over all subtasks for the last checkpoint
Shown as byte
flink.jobmanager.job.lastCheckpointDuration
(gauge)
The time it took to complete the last checkpoint
Shown as millisecond
flink.jobmanager.job.lastCheckpointExternalPath
(gauge)
The path where the last external checkpoint was stored
flink.jobmanager.job.lastCheckpointRestoreTimestamp
(gauge)
Timestamp when the last checkpoint was restored at the coordinator
Shown as millisecond
flink.jobmanager.job.lastCheckpointSize
(gauge)
The total size of the last checkpoint
Shown as byte
flink.jobmanager.job.numRestarts
(gauge)
The total number of restarts since this job was submitted, including full restarts and fine-grained restarts
flink.jobmanager.job.numberOfCompletedCheckpoints
(count)
The number of successfully completed checkpoints
flink.jobmanager.job.numberOfFailedCheckpoints
(count)
The number of failed checkpoints
flink.jobmanager.job.numberOfInProgressCheckpoints
(gauge)
The number of in progress checkpoints
flink.jobmanager.job.restartingTime
(gauge)
The time it took to restart the job or how long the current restart has been in progress
Shown as millisecond
flink.jobmanager.job.totalNumberOfCheckpoints
(count)
The number of total checkpoints (in progress completed and failed)
flink.jobmanager.job.uptime
(gauge)
The time that the job has been running without interruption. Returns -1 for completed jobs
Shown as millisecond
flink.jobmanager.numRegisteredTaskManagers
(gauge)
The number of registered taskmanagers
flink.jobmanager.numRunningJobs
(gauge)
The number of running jobs
Shown as job
flink.jobmanager.taskSlotsTotal
(gauge)
The total number of task slots
flink.operator.commitsFailed
(count)
The total number of offset commit failures to Kafka if offset committing is turned on and checkpointing is enabled. Note that committing offsets back to Kafka is only a means to expose consumer progress so a commit failure does not affect the integrity of Flink's checkpointed partition offsets
Shown as commit
flink.operator.commitsSucceeded
(count)
The total number of successful offset commits to Kafka if offset committing is turned on and checkpointing is enabled
Shown as commit
flink.operator.currentInput1Watermark
(gauge)
The last watermark this operator has received in its first input. Only for operators with 2 inputs
Shown as millisecond
flink.operator.currentInput2Watermark
(gauge)
The last watermark this operator has received in its second input. Only for operators with 2 inputs
Shown as millisecond
flink.operator.currentInputWatermark
(gauge)
The last watermark this operator has received. For tasks with 2 inputs this is the minimum of the last received watermarks
Shown as millisecond
flink.operator.currentOutputWatermark
(gauge)
The last watermark this operator has emitted
Shown as millisecond
flink.operator.numLateRecordsDropped
(count)
The number of records this operator has dropped due to arriving late
Shown as record
flink.operator.numRecordsIn
(count)
The total number of records this operator has received
Shown as record
flink.operator.numRecordsInPerSecond
(gauge)
The number of records this operator receives per second
Shown as record
flink.operator.numRecordsOut
(count)
The total number of records this operator has emitted
Shown as record
flink.operator.numRecordsOutPerSec
(gauge)
The total number of records this operator has emitted per second
Shown as record
flink.operator.numSplitsProcessed
(count)
The total number of InputSplits this data source has processed (if the operator is a data source)
flink.task.Shuffle.Netty.Input.Buffers.inPoolUsage
(gauge)
An estimate of the input buffers usage
flink.task.Shuffle.Netty.Input.Buffers.inputQueueLength
(gauge)
The number of queued input buffers
Shown as buffer
flink.task.Shuffle.Netty.Input.numBuffersInLocal
(count)
The total number of network buffers this task has read from a local source
Shown as buffer
flink.task.Shuffle.Netty.Input.numBuffersInLocalPerSecond
(gauge)
The number of network buffers this task reads from a local source per second
flink.task.Shuffle.Netty.Input.numBuffersInRemote
(count)
The total number of network buffers this task has read from a remote source
Shown as buffer
flink.task.Shuffle.Netty.Input.numBuffersInRemotePerSecond
(gauge)
The number of network buffers this task reads from a remote source per second
Shown as buffer
flink.task.Shuffle.Netty.Input.numBytesInLocal
(count)
The total number of bytes this task has read from a local source
Shown as byte
flink.task.Shuffle.Netty.Input.numBytesInLocalPerSecond
(gauge)
The number of bytes this task reads from a local source per second
Shown as byte
flink.task.Shuffle.Netty.Input.numBytesInRemote
(count)
The total number of bytes this task has read from a remote source
Shown as byte
flink.task.Shuffle.Netty.Input.numBytesInRemotePerSecond
(gauge)
The number of bytes this task reads from a remote source per second
Shown as byte
flink.task.Shuffle.Netty.Output.Buffers.outPoolUsage
(gauge)
An estimate of the output buffers usage
flink.task.Shuffle.Netty.Output.Buffers.outputQueueLength
(gauge)
The number of queued output buffers
Shown as buffer
flink.task.checkpointAlignmentTime
(gauge)
The time in nanoseconds that the last barrier alignment took to complete or how long the current alignment has taken so far
Shown as nanosecond
flink.task.currentInputWatermark
(gauge)
The last watermark this task has received. For tasks with 2 inputs this is the minimum of the last received watermarks
Shown as millisecond
flink.task.numBuffersOut
(count)
The total number of network buffers this task has emitted
Shown as buffer
flink.task.numBuffersOutPerSecond
(gauge)
The number of network buffers this task emits per second
Shown as buffer
flink.task.numBytesOut
(count)
The total number of bytes this task has emitted
Shown as byte
flink.task.numBytesOutPerSecond
(gauge)
The number of bytes this task emits per second
Shown as byte
flink.task.numLateRecordsDropped
(count)
The number of records this task has dropped due to arriving late
Shown as record
flink.task.numRecordsIn
(count)
The total number of records this task has received
Shown as record
flink.task.numRecordsInPerSecond
(gauge)
The number of records this task receives per second
Shown as record
flink.task.numRecordsOut
(count)
The total number of records this task has emitted
Shown as record
flink.task.numRecordsOutPerSec
(gauge)
The total number of records this task has emitted per second
Shown as record
flink.taskmanager.Status.JVM.CPU.Load
(gauge)
The recent CPU usage of the JVM in the taskmanager
Shown as percent
flink.taskmanager.Status.JVM.CPU.Time
(gauge)
The CPU time used by the JVM in the taskmanager
Shown as second
flink.taskmanager.Status.JVM.ClassLoader.ClassesLoaded
(count)
The total number of classes loaded since the start of the JVM in the taskmanager
flink.taskmanager.Status.JVM.ClassLoader.ClassesUnloaded
(count)
The total number of classes unloaded since the start of the JVM in the taskmanager
flink.taskmanager.Status.JVM.Memory.Direct.Count
(gauge)
The number of buffers in the direct buffer pool in the taskmanager
Shown as buffer
flink.taskmanager.Status.JVM.Memory.Direct.MemoryUsed
(gauge)
The amount of memory used by the JVM for the direct buffer pool in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.Direct.TotalCapacity
(count)
The total capacity of all buffers in the direct buffer pool in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.Heap.Committed
(gauge)
The amount of heap memory guaranteed to be available to the JVM in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.Heap.Max
(gauge)
The maximum amount of heap memory that can be used for memory management in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.Heap.Used
(gauge)
The amount of heap memory currently used in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.Mapped.Count
(gauge)
The number of buffers in the mapped buffer pool in the taskmanager
flink.taskmanager.Status.JVM.Memory.Mapped.MemoryUsed
(gauge)
The amount of memory used by the JVM for the mapped buffer pool in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.Mapped.TotalCapacity
(count)
The total capacity of all buffers in the mapped buffer pool in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.NonHeap.Committed
(gauge)
The amount of non-heap memory guaranteed to be available to the JVM in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.NonHeap.Max
(gauge)
The maximum amount of non-heap memory that can be used for memory management in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.NonHeap.Used
(gauge)
The amount of non-heap memory currently used in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Threads.Count
(count)
The total number of live threads in the taskmanager
Shown as thread
flink.taskmanager.Status.Shuffle.Netty.AvailableMemorySegments
(gauge)
The number of unused memory segments in the taskmanager
flink.taskmanager.Status.Shuffle.Netty.TotalMemorySegments
(gauge)
The number of allocated memory segments in the taskmanager

サービスチェック

Flink には、サービスのチェック機能は含まれません。

イベント

Flink には、イベントは含まれません。

トラブルシューティング

ご不明な点は、Datadog のサポートチームまでお問合せください。