Flink

Supported OS Windows Mac OS

Integration version 3.0.0

Overview

This check monitors Flink. Datadog collects Flink metrics through Flink's Datadog HTTP Reporter, which uses Datadog's HTTP API.

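Setup

Installation

The Flink check is included in the Datadog Agent package. No additional installation is needed on your server.

Configuration

Metric collection

  1. Configure the Datadog HTTP Reporter in Flink.

    In <FLINK_HOME>/conf/flink-conf.yaml, add the following lines, replacing <DATADOG_API_KEY> with your Datadog API key: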

    metrics.reporter.dghttp.factory.class: org.apache.flink.metrics.datadog.DatadogHttpReporterFactory
    metrics.reporter.dghttp.apikey: <DATADOG_API_KEY>
    metrics.reporter.dghttp.dataCenter: US #(optional) The data center (EU/US) to connect to, defaults to US.
    
  2. Remap the system scopes in <FLINK_HOME>/conf/flink-conf.yaml.

    metrics.scope.jm: flink.jobmanager
    metrics.scope.jm.job: flink.jobmanager.job
    metrics.scope.tm: flink.taskmanager
    metrics.scope.tm.job: flink.taskmanager.job
    metrics.scope.task: flink.task
    metrics.scope.operator: flink.operator
    

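    Note: The system scopes must be remapped for your Flink metrics to be supported. Otherwise, they are submitted as custom metrics.

  3. Configure additional tags in <FLINK_HOME>/conf/flink-conf.yaml. The following is an example of custom tags: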

    metrics.reporter.dghttp.scope.variables.additional: <KEY1>:<VALUE1>, <KEY2>:<VALUE2>
    

    Note: By default, every variable in a metric name is sent as a tag, so there is no need to add custom tags for job_id, task_id, and so on.

  4. Restart Flink to start sending your Flink metrics to Datadog.
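
Before restarting Flink, it can help to confirm that the reporter keys and scope remappings from steps 1 and 2 are actually present in the configuration file. The following is a minimal sketch in Python, not part of Flink or the Datadog Agent. The helper names (`parse_flat_conf`, `find_problems`) are hypothetical, and the parser assumes the flat `key: value` layout shown above; it does not handle nested YAML or values that contain `#`.

```python
# Hypothetical pre-restart check for flink-conf.yaml (illustrative only).
# The expected keys and values are copied from steps 1 and 2 above.
REQUIRED_SCOPES = {
    "metrics.scope.jm": "flink.jobmanager",
    "metrics.scope.jm.job": "flink.jobmanager.job",
    "metrics.scope.tm": "flink.taskmanager",
    "metrics.scope.tm.job": "flink.taskmanager.job",
    "metrics.scope.task": "flink.task",
    "metrics.scope.operator": "flink.operator",
}
REQUIRED_REPORTER_KEYS = (
    "metrics.reporter.dghttp.factory.class",
    "metrics.reporter.dghttp.apikey",
)


def parse_flat_conf(text):
    """Parse flat `key: value` lines, ignoring blank lines and # comments."""
    conf = {}
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()  # drop trailing comments
        if not line or ":" not in line:
            continue
        key, value = line.split(":", 1)
        conf[key.strip()] = value.strip()
    return conf


def find_problems(text):
    """Return a list of human-readable problems; an empty list means OK."""
    conf = parse_flat_conf(text)
    problems = []
    for key in REQUIRED_REPORTER_KEYS:
        if not conf.get(key):
            problems.append(f"missing {key}")
    for key, expected in REQUIRED_SCOPES.items():
        if conf.get(key) != expected:
            problems.append(f"{key} should be {expected}")
    return problems
```

To use it, read the file contents and pass them in, for example `find_problems(open(path).read())`, where `path` points to your own `flink-conf.yaml`. A scope that is left unmapped shows up in the returned list, which is the case where metrics would be submitted as custom metrics.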

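Log collection

Available for Agent >6.0

  1. Flink uses the log4j logger by default. To enable logging to a file, customize the format by editing the log4j*.properties configuration files in the conf/ directory of the Flink distribution. See the Flink logging documentation for information on which configuration file is relevant for your setup, and see the Flink repository for the default configurations.

  2. By default, the integration pipeline supports the following layout pattern: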

    %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    

    An example of a valid timestamp is 2020-02-03 18:43:12,251.

    If your format is different, clone and edit the integration pipeline.

  3. Log collection is disabled by default in the Datadog Agent. Enable it in your datadog.yaml file:

    logs_enabled: true
    
  4. Uncomment and edit the logs configuration block in your flink.d/conf.yaml file. Change the path and service parameter values based on your environment. See the sample flink.d/conf.yaml for all available configuration options.

    logs:
      - type: file
        path: /var/log/flink/server.log
        source: flink
        service: myapp
        #To handle multi line that starts with yyyy-mm-dd use the following pattern
        #log_processing_rules:
        #  - type: multi_line
        #    pattern: \d{4}\-(0?[1-9]|1[012])\-(0?[1-9]|[12][0-9]|3[01])
        #    name: new_log_start_with_date
    
  5. Restart the Agent.
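
To check whether your own log lines fit the timestamp format from step 2 and the commented-out multi_line pattern from step 4, you can try them locally first. The following is a minimal sketch in Python. It only approximates how a multi_line rule groups lines and is not the Agent's implementation; the function names and the sample log lines in the usage note are made up for illustration.

```python
import re
from datetime import datetime

# Pattern copied from the commented-out log_processing_rules block above.
NEW_LOG_START = re.compile(r"\d{4}\-(0?[1-9]|1[012])\-(0?[1-9]|[12][0-9]|3[01])")


def parse_timestamp(value):
    """Parse a timestamp written as yyyy-MM-dd HH:mm:ss,SSS."""
    return datetime.strptime(value, "%Y-%m-%d %H:%M:%S,%f")


def group_multiline(lines):
    """Start a new event at each line that begins with a date.

    Lines that do not begin with a date (for example, stack trace
    lines) are appended to the previous event.
    """
    events = []
    for line in lines:
        if NEW_LOG_START.match(line) or not events:
            events.append(line)
        else:
            events[-1] += "\n" + line
    return events
```

For example, passing a dated error line, two stack trace lines, and a second dated line to `group_multiline` yields two events, with the stack trace attached to the first. If `parse_timestamp` raises `ValueError` on your timestamps, your layout differs from the default pattern and the cloned-pipeline route from step 2 applies.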

Validation

Run the Agent's status subcommand and look for flink under the Checks section.

Data Collected

Metrics

flink.jobmanager.Status.JVM.CPU.Load
(gauge)
The recent CPU usage of the JVM in the jobmanager
Shown as percent
flink.jobmanager.Status.JVM.CPU.Time
(gauge)
The CPU time used by the JVM in the jobmanager
Shown as second
flink.jobmanager.Status.JVM.ClassLoader.ClassesLoaded
(count)
The total number of classes loaded since the start of the JVM in the jobmanager
flink.jobmanager.Status.JVM.ClassLoader.ClassesUnloaded
(count)
The total number of classes unloaded since the start of the JVM in the jobmanager
flink.jobmanager.Status.JVM.Memory.Direct.Count
(count)
The number of buffers in the direct buffer pool in the jobmanager
Shown as buffer
flink.jobmanager.Status.JVM.Memory.Direct.MemoryUsed
(gauge)
The amount of memory used by the JVM for the direct buffer pool in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.Direct.TotalCapacity
(count)
The total capacity of all buffers in the direct buffer pool in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.Heap.Committed
(gauge)
The amount of heap memory guaranteed to be available to the JVM in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.Heap.Max
(gauge)
The maximum amount of heap memory that can be used for memory management in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.Heap.Used
(gauge)
The amount of heap memory currently used in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.Mapped.Count
(gauge)
The number of buffers in the mapped buffer pool in the jobmanager
Shown as buffer
flink.jobmanager.Status.JVM.Memory.Mapped.MemoryUsed
(gauge)
The amount of memory used by the JVM for the mapped buffer pool in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.Mapped.TotalCapacity
(count)
The total capacity of all buffers in the mapped buffer pool in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.NonHeap.Committed
(gauge)
The amount of non-heap memory guaranteed to be available to the JVM in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.NonHeap.Max
(gauge)
The maximum amount of non-heap memory that can be used for memory management in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Memory.NonHeap.Used
(gauge)
The amount of non-heap memory currently used in the jobmanager
Shown as byte
flink.jobmanager.Status.JVM.Threads.Count
(count)
The total number of live threads in the jobmanager
Shown as thread
flink.jobmanager.job.downtime
(gauge)
For jobs currently in a failing/recovering situation, the time elapsed during this outage. Returns 0 for running jobs and -1 for completed jobs
Shown as millisecond
flink.jobmanager.job.lastCheckpointAlignmentBuffered
(gauge)
The number of buffered bytes during alignment over all subtasks for the last checkpoint
Shown as byte
flink.jobmanager.job.lastCheckpointDuration
(gauge)
The time it took to complete the last checkpoint
Shown as millisecond
flink.jobmanager.job.lastCheckpointExternalPath
(gauge)
The path where the last external checkpoint was stored
flink.jobmanager.job.lastCheckpointRestoreTimestamp
(gauge)
Timestamp when the last checkpoint was restored at the coordinator
Shown as millisecond
flink.jobmanager.job.lastCheckpointSize
(gauge)
The total size of the last checkpoint
Shown as byte
flink.jobmanager.job.numRestarts
(gauge)
The total number of restarts since this job was submitted, including full restarts and fine-grained restarts
flink.jobmanager.job.numberOfCompletedCheckpoints
(count)
The number of successfully completed checkpoints
flink.jobmanager.job.numberOfFailedCheckpoints
(count)
The number of failed checkpoints
flink.jobmanager.job.numberOfInProgressCheckpoints
(gauge)
The number of in progress checkpoints
flink.jobmanager.job.restartingTime
(gauge)
The time it took to restart the job or how long the current restart has been in progress
Shown as millisecond
flink.jobmanager.job.totalNumberOfCheckpoints
(count)
The total number of checkpoints (in progress, completed, and failed)
flink.jobmanager.job.uptime
(gauge)
The time that the job has been running without interruption. Returns -1 for completed jobs
Shown as millisecond
flink.jobmanager.numRegisteredTaskManagers
(gauge)
The number of registered taskmanagers
flink.jobmanager.numRunningJobs
(gauge)
The number of running jobs
Shown as job
flink.jobmanager.taskSlotsTotal
(gauge)
The total number of task slots
flink.operator.commitsFailed
(count)
The total number of offset commit failures to Kafka if offset committing is turned on and checkpointing is enabled. Note that committing offsets back to Kafka is only a means to expose consumer progress so a commit failure does not affect the integrity of Flink's checkpointed partition offsets
Shown as commit
flink.operator.commitsSucceeded
(count)
The total number of successful offset commits to Kafka if offset committing is turned on and checkpointing is enabled
Shown as commit
flink.operator.currentInput1Watermark
(gauge)
The last watermark this operator has received in its first input. Only for operators with 2 inputs
Shown as millisecond
flink.operator.currentInput2Watermark
(gauge)
The last watermark this operator has received in its second input. Only for operators with 2 inputs
Shown as millisecond
flink.operator.currentInputWatermark
(gauge)
The last watermark this operator has received. For tasks with 2 inputs this is the minimum of the last received watermarks
Shown as millisecond
flink.operator.currentOutputWatermark
(gauge)
The last watermark this operator has emitted
Shown as millisecond
flink.operator.numLateRecordsDropped
(count)
The number of records this operator has dropped due to arriving late
Shown as record
flink.operator.numRecordsIn
(count)
The total number of records this operator has received
Shown as record
flink.operator.numRecordsInPerSecond
(gauge)
The number of records this operator receives per second
Shown as record
flink.operator.numRecordsOut
(count)
The total number of records this operator has emitted
Shown as record
flink.operator.numRecordsOutPerSec
(gauge)
The total number of records this operator has emitted per second
Shown as record
flink.operator.numSplitsProcessed
(count)
The total number of InputSplits this data source has processed (if the operator is a data source)
flink.task.Shuffle.Netty.Input.Buffers.inPoolUsage
(gauge)
An estimate of the input buffers usage
flink.task.Shuffle.Netty.Input.Buffers.inputQueueLength
(gauge)
The number of queued input buffers
Shown as buffer
flink.task.Shuffle.Netty.Input.numBuffersInLocal
(count)
The total number of network buffers this task has read from a local source
Shown as buffer
flink.task.Shuffle.Netty.Input.numBuffersInLocalPerSecond
(gauge)
The number of network buffers this task reads from a local source per second
flink.task.Shuffle.Netty.Input.numBuffersInRemote
(count)
The total number of network buffers this task has read from a remote source
Shown as buffer
flink.task.Shuffle.Netty.Input.numBuffersInRemotePerSecond
(gauge)
The number of network buffers this task reads from a remote source per second
Shown as buffer
flink.task.Shuffle.Netty.Input.numBytesInLocal
(count)
The total number of bytes this task has read from a local source
Shown as byte
flink.task.Shuffle.Netty.Input.numBytesInLocalPerSecond
(gauge)
The number of bytes this task reads from a local source per second
Shown as byte
flink.task.Shuffle.Netty.Input.numBytesInRemote
(count)
The total number of bytes this task has read from a remote source
Shown as byte
flink.task.Shuffle.Netty.Input.numBytesInRemotePerSecond
(gauge)
The number of bytes this task reads from a remote source per second
Shown as byte
flink.task.Shuffle.Netty.Output.Buffers.outPoolUsage
(gauge)
An estimate of the output buffers usage
flink.task.Shuffle.Netty.Output.Buffers.outputQueueLength
(gauge)
The number of queued output buffers
Shown as buffer
flink.task.checkpointAlignmentTime
(gauge)
The time in nanoseconds that the last barrier alignment took to complete or how long the current alignment has taken so far
Shown as nanosecond
flink.task.currentInputWatermark
(gauge)
The last watermark this task has received. For tasks with 2 inputs this is the minimum of the last received watermarks
Shown as millisecond
flink.task.numBuffersOut
(count)
The total number of network buffers this task has emitted
Shown as buffer
flink.task.numBuffersOutPerSecond
(gauge)
The number of network buffers this task emits per second
Shown as buffer
flink.task.numBytesOut
(count)
The total number of bytes this task has emitted
Shown as byte
flink.task.numBytesOutPerSecond
(gauge)
The number of bytes this task emits per second
Shown as byte
flink.task.numLateRecordsDropped
(count)
The number of records this task has dropped due to arriving late
Shown as record
flink.task.numRecordsIn
(count)
The total number of records this task has received
Shown as record
flink.task.numRecordsInPerSecond
(gauge)
The number of records this task receives per second
Shown as record
flink.task.numRecordsOut
(count)
The total number of records this task has emitted
Shown as record
flink.task.numRecordsOutPerSec
(gauge)
The total number of records this task has emitted per second
Shown as record
flink.taskmanager.Status.JVM.CPU.Load
(gauge)
The recent CPU usage of the JVM in the taskmanager
Shown as percent
flink.taskmanager.Status.JVM.CPU.Time
(gauge)
The CPU time used by the JVM in the taskmanager
Shown as second
flink.taskmanager.Status.JVM.ClassLoader.ClassesLoaded
(count)
The total number of classes loaded since the start of the JVM in the taskmanager
flink.taskmanager.Status.JVM.ClassLoader.ClassesUnloaded
(count)
The total number of classes unloaded since the start of the JVM in the taskmanager
flink.taskmanager.Status.JVM.Memory.Direct.Count
(gauge)
The number of buffers in the direct buffer pool in the taskmanager
Shown as buffer
flink.taskmanager.Status.JVM.Memory.Direct.MemoryUsed
(gauge)
The amount of memory used by the JVM for the direct buffer pool in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.Direct.TotalCapacity
(count)
The total capacity of all buffers in the direct buffer pool in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.Heap.Committed
(gauge)
The amount of heap memory guaranteed to be available to the JVM in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.Heap.Max
(gauge)
The maximum amount of heap memory that can be used for memory management in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.Heap.Used
(gauge)
The amount of heap memory currently used in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.Mapped.Count
(gauge)
The number of buffers in the mapped buffer pool in the taskmanager
flink.taskmanager.Status.JVM.Memory.Mapped.MemoryUsed
(gauge)
The amount of memory used by the JVM for the mapped buffer pool in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.Mapped.TotalCapacity
(count)
The total capacity of all buffers in the mapped buffer pool in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.NonHeap.Committed
(gauge)
The amount of non-heap memory guaranteed to be available to the JVM in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.NonHeap.Max
(gauge)
The maximum amount of non-heap memory that can be used for memory management in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Memory.NonHeap.Used
(gauge)
The amount of non-heap memory currently used in the taskmanager
Shown as byte
flink.taskmanager.Status.JVM.Threads.Count
(count)
The total number of live threads in the taskmanager
Shown as thread
flink.taskmanager.Status.Shuffle.Netty.AvailableMemorySegments
(gauge)
The number of unused memory segments in the taskmanager
flink.taskmanager.Status.Shuffle.Netty.TotalMemorySegments
(gauge)
The number of allocated memory segments in the taskmanager

Service Checks

Flink does not include any service checks.

Events

Flink does not include any events.

Troubleshooting

Need help? Contact Datadog support.