Supported OS Linux Windows Mac OS

インテグレーションバージョン6.1.0

概要

Datadog Agent は、以下のような多くのメトリクスを Airflow から収集します。

  • DAGs(Directed Acyclic Graphs): DAG 処理の数、DAG バッグサイズなど
  • タスク: タスクの失敗、成功、強制終了など
  • プール: オープンスロット、使用中のスロットなど
  • エグゼキューター: オープンスロット、キューにあるタスク、実行中のタスクなど

メトリクスは Airflow StatsD プラグインを通じて収集され、Datadog の DogStatsD へ送られます。

Datadog Agent はメトリクスだけでなく、Airflow の健全性に関するサービスチェックも送信します。

セットアップ

インストール

Airflow インテグレーションを適切に動作させるには、以下のステップをすべて実施する必要があります。ステップを開始する前に、StatsD/DogStatsD マッピング機能が含まれる Datadog Agent (バージョン 6.17 または 7.17 以降) をインストールしてください。

構成

Airflow インテグレーションには 2 つの部分があります。

  • Datadog Agent 部分では、Airflow が接続可能であり、正常に動作しているかどうかを報告するために、指定されたエンドポイントにリクエストを送信します。また、Agent インテグレーションは Airflow に問い合わせを行い、一部の独自メトリクスを生成します。
  • Airflow StatsD 部分では、Airflow を Datadog Agent にメトリクスを送信するよう構成することができます。Datadog Agent は Airflow の表記を Datadog 表記に再マッピングします。

Airflow インテグレーションのメトリクスは、Agent 部分と StatsD 部分の両方から取得されます。

ホスト

Datadog Agent Airflow インテグレーションを構成する

Datadog Agent パッケージに含まれている Airflow チェックを構成して、ヘルスメトリクスとサービスチェックを収集します。これは、Agent のコンフィギュレーションディレクトリのルートにある conf.d/ フォルダーにある airflow.d/conf.yaml ファイル内の url を編集して、Airflow サービスチェックの収集を開始することで実行できます。利用可能なすべてのコンフィギュレーションオプションについては、airflow.d/conf.yaml のサンプルを参照してください。

url が Airflow Web サーバー base_url (Airflow インスタンスへの接続に使用する URL) に一致することを確認します。

Airflow を DogStatsD に接続する

Airflow の statsd 機能を使用してメトリクスを収集することにより、Airflow を DogStatsD (Datadog Agent に含まれる) に接続します。使用されている Airflow バージョンによって報告されるメトリクスと追加のコンフィギュレーションオプションの詳細については、以下の Airflow ドキュメントを参照してください。

: Airflow による StatsD メトリクスの報告有無は、使用する Airflow Executor によって異なる場合があります。例えば、airflow.ti_failures/successesairflow.operator_failures/successesairflow.dag.task.durationKubernetesExecutor では報告されません

  1. Airflow StatsD プラグインをインストールします。

    pip install 'apache-airflow[statsd]'
    
  2. 下記のコンフィギュレーションを追加して、Airflow コンフィギュレーションファイル airflow.cfg を更新します。

    `statsd_datadog_enabled` を true に設定しないでください。`statsd_datadog_enabled` を有効にすると、競合が発生する可能性があります。問題を防ぐには、この変数を `False` に設定してください。
    [scheduler]
    statsd_on = True
    # Hostname or IP of server running the Datadog Agent
    statsd_host = localhost
    # DogStatsD port configured in the Datadog Agent
    statsd_port = 8125
    statsd_prefix = airflow
    
  3. 下記のコンフィギュレーションを追加して、Datadog Agent のメインコンフィギュレーションファイルである datadog.yaml を更新します。

    # dogstatsd_mapper_cache_size: 1000  # default to 1000
    dogstatsd_mapper_profiles:
      - name: airflow
        prefix: "airflow."
        mappings:
          - match: "airflow.*_start"
            name: "airflow.job.start"
            tags:
              job_name: "$1"
          - match: "airflow.*_end"
            name: "airflow.job.end"
            tags:
              job_name: "$1"
          - match: "airflow.*_heartbeat_failure"
            name: airflow.job.heartbeat.failure
            tags:
              job_name: "$1"
          - match: "airflow.operator_failures_*"
            name: "airflow.operator_failures"
            tags:
              operator_name: "$1"
          - match: "airflow.operator_successes_*"
            name: "airflow.operator_successes"
            tags:
              operator_name: "$1"
          - match: 'airflow\.dag_processing\.last_runtime\.(.*)'
            match_type: "regex"
            name: "airflow.dag_processing.last_runtime"
            tags:
              dag_file: "$1"
          - match: 'airflow\.dag_processing\.last_run\.seconds_ago\.(.*)'
            match_type: "regex"
            name: "airflow.dag_processing.last_run.seconds_ago"
            tags:
              dag_file: "$1"
          - match: 'airflow\.dag\.loading-duration\.(.*)'
            match_type: "regex"
            name: "airflow.dag.loading_duration"
            tags:
              dag_file: "$1"
          - match: "airflow.local_task_job.task_exit.*.*.*.*"
            name: "airflow.local_task_job.task_exit"
            tags:
              job_id: "$1"
              dag_id: "$2"
              task_id: "$3"
              return_code: "$4"
          - match: "airflow.dag.*.*.queue_duration"
            name: "airflow.dag.queue_duration"
            tags:
              dag_id: "$1"
              task_id: "$2"
          - match: "airflow.dag.*.*.scheduled_duration"
            name: "airflow.dag.scheduled_duration"
            tags:
              dag_id: "$1"
              task_id: "$2"
          - match: "airflow.pool.open_slots.*"
          - match: "airflow.dagrun.*.first_task_scheduling_delay"
            name: "airflow.dagrun.first_task_scheduling_delay"
            tags:
              dag_id: "$1"
          - match: "airflow.pool.open_slots.*"
            name: "airflow.pool.open_slots"
            tags:
              pool_name: "$1"
          - match: "airflow.pool.queued_slots.*"
            name: "airflow.pool.queued_slots"
            tags:
              pool_name: "$1"
          - match: "airflow.pool.running_slots.*"
            name: "airflow.pool.running_slots"
            tags:
              pool_name: "$1"
          - match: "airflow.pool.used_slots.*"
            name: "airflow.pool.used_slots"
            tags:
              pool_name: "$1"
          - match: "airflow.pool.starving_tasks.*"
            name: "airflow.pool.starving_tasks"
            tags:
              pool_name: "$1"
          - match: 'airflow\.dagrun\.dependency-check\.(.*)'
            match_type: "regex"
            name: "airflow.dagrun.dependency_check"
            tags:
              dag_id: "$1"
          - match: 'airflow\.dag\.(.*)\.([^.]*)\.duration'
            match_type: "regex"
            name: "airflow.dag.task.duration"
            tags:
              dag_id: "$1"
              task_id: "$2"
          - match: 'airflow\.dag_processing\.last_duration\.(.*)'
            match_type: "regex"
            name: "airflow.dag_processing.last_duration"
            tags:
              dag_file: "$1"
          - match: 'airflow\.dagrun\.duration\.success\.(.*)'
            match_type: "regex"
            name: "airflow.dagrun.duration.success"
            tags:
              dag_id: "$1"
          - match: 'airflow\.dagrun\.duration\.failed\.(.*)'
            match_type: "regex"
            name: "airflow.dagrun.duration.failed"
            tags:
              dag_id: "$1"
          - match: 'airflow\.dagrun\.schedule_delay\.(.*)'
            match_type: "regex"
            name: "airflow.dagrun.schedule_delay"
            tags:
              dag_id: "$1"
          - match: 'airflow.scheduler.tasks.running'
            name: "airflow.scheduler.tasks.running"
          - match: 'airflow.scheduler.tasks.starving'
            name: "airflow.scheduler.tasks.starving"
          - match: 'airflow.sla_email_notification_failure'
            name: 'airflow.sla_email_notification_failure'
          - match: 'airflow\.task_removed_from_dag\.(.*)'
            match_type: "regex"
            name: "airflow.dag.task_removed"
            tags:
              dag_id: "$1"
          - match: 'airflow\.task_restored_to_dag\.(.*)'
            match_type: "regex"
            name: "airflow.dag.task_restored"
            tags:
              dag_id: "$1"
          - match: "airflow.task_instance_created-*"
            name: "airflow.task.instance_created"
            tags:
              task_class: "$1"
          - match: 'airflow\.ti\.start\.(.+)\.(\w+)'
            match_type: regex
            name: airflow.ti.start
            tags:
              dag_id: "$1"
              task_id: "$2"
          - match: 'airflow\.ti\.finish\.(\w+)\.(.+)\.(\w+)'
            name: airflow.ti.finish
            match_type: regex
            tags:
              dag_id: "$1"
              task_id: "$2"
              state: "$3"
    
Datadog Agent と Airflow を再起動する
  1. Agent を再起動します
  2. Airflow を再起動し、Agent の DogStatsD エンドポイントへの Airflow メトリクスの送信を開始します。
インテグレーションサービスチェック

airflow.d/conf.yaml ファイルのデフォルトコンフィギュレーションを使用して、Airflow サービスチェックを有効にします。利用可能なすべてのコンフィギュレーションオプションについては、airflow.d/conf.yaml のサンプルを参照してください。

ログ収集

Agent バージョン 6.0 以降で利用可能

  1. Datadog Agent で、ログの収集はデフォルトで無効になっています。以下のように、datadog.yaml ファイルでこれを有効にします。

    logs_enabled: true
    
  2. airflow.d/conf.yaml の下部にある、コンフィギュレーションブロックのコメントを解除して編集します。 path パラメーターと service パラメーターの値を変更し、環境に合わせて構成してください。

    • DAG プロセッサーマネージャーと Scheduler のログのコンフィギュレーション

      logs:
        - type: file
          path: "<PATH_TO_AIRFLOW>/logs/dag_processor_manager/dag_processor_manager.log"
          source: airflow
          log_processing_rules:
            - type: multi_line
              name: new_log_start_with_date
              pattern: \[\d{4}\-\d{2}\-\d{2}
        - type: file
          path: "<PATH_TO_AIRFLOW>/logs/scheduler/latest/*.log"
          source: airflow
          log_processing_rules:
            - type: multi_line
              name: new_log_start_with_date
              pattern: \[\d{4}\-\d{2}\-\d{2}
      

        スケジューラーログを毎日ローテーションする場合は、ログを定期的にクリーンアップすることをお勧めします。

    • DAG タスクのログ用に追加するコンフィギュレーション

      logs:
        - type: file
          path: "<PATH_TO_AIRFLOW>/logs/*/*/*/*.log"
          source: airflow
          log_processing_rules:
            - type: multi_line
              name: new_log_start_with_date
              pattern: \[\d{4}\-\d{2}\-\d{2}
      

      注意事項: デフォルトでは、Airflow は log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log のログファイルテンプレートをタスクに使用します。ログファイルの数は、定期的に削除しなければ急速に増加します。これは、実行された各タスクのログを Airflow UI が個別に表示するために使用するパターンです。

      ログを Airflow UI で確認しない場合は、airflow.cfglog_filename_template = dag_tasks.log を構成することをお勧めします。これにより、ログはこのファイルをローテーションすると同時に、以下のコンフィギュレーションを使用します。

      logs:
        - type: file
          path: "<PATH_TO_AIRFLOW>/logs/dag_tasks.log"
          source: airflow
          log_processing_rules:
            - type: multi_line
              name: new_log_start_with_date
              pattern: \[\d{4}\-\d{2}\-\d{2}
      
  3. Agent を再起動します

コンテナ化

Datadog Agent Airflow インテグレーションを構成する

コンテナ環境の場合は、オートディスカバリーのインテグレーションテンプレートのガイドを参照して、次のパラメーターを適用してください。

パラメーター
<INTEGRATION_NAME>airflow
<INIT_CONFIG>空白または {}
<INSTANCE_CONFIG>{"url": "http://%%host%%:8080"}

url が Airflow Web サーバー base_url (Airflow インスタンスへの接続に使用する URL) に一致することを確認します。localhost をテンプレート変数 %%host% に置き換えます。

Airflow の Helm チャートを使用している場合、これは Web サーバーを ClusterIP サービスとして公開します。このサービスを url パラメーターで使用する必要があります。

例えば、オートディスカバリーのアノテーションは次のようになります。

apiVersion: v1
kind: Pod
# (...)
metadata:
  name: '<POD_NAME>'
  annotations:
    ad.datadoghq.com/<CONTAINER_IDENTIFIER>.checks: |
      {
        "airflow": {
          "instances": ["url": "http://airflow-ui.%%kube_namespace%%.svc.cluster.local:8080"]
        }
      }      
    # (...)
Airflow を DogStatsD に接続する

Airflow の statsd 機能を使用してメトリクスを収集することにより、Airflow を DogStatsD (Datadog Agent に含まれる) に接続します。使用されている Airflow バージョンによって報告されるメトリクスと追加のコンフィギュレーションオプションの詳細については、以下の Airflow ドキュメントを参照してください。

: Airflow による StatsD メトリクスの報告有無は、使用する Airflow Executor によって異なる場合があります。例えば、airflow.ti_failures/successesairflow.operator_failures/successesairflow.dag.task.durationKubernetesExecutor では報告されません

: Airflow に使用される環境変数は、バージョン間で異なる場合があります。たとえば、Airflow 2.0.0 では、これは環境変数 AIRFLOW__METRICS__STATSD_HOST を利用しますが、Airflow 1.10.15AIRFLOW__SCHEDULER__STATSD_HOST を利用します。

Airflow StatsD コンフィギュレーションは、Kubernetes デプロイメントで次の環境変数を使用して有効にできます。

env:
  - name: AIRFLOW__SCHEDULER__STATSD_ON
    value: "True"
  - name: AIRFLOW__SCHEDULER__STATSD_PORT
    value: "8125"
  - name: AIRFLOW__SCHEDULER__STATSD_PREFIX
    value: "airflow"
  - name: AIRFLOW__SCHEDULER__STATSD_HOST
    valueFrom:
      fieldRef:
        fieldPath: status.hostIP

ホストエンドポイント AIRFLOW__SCHEDULER__STATSD_HOST の環境変数には、ノードのホスト IP アドレスが指定され、Airflow ポッドと同じノード上の Datadog Agent ポッドに StatsD データをルーティングします。この設定では、Agent がポート 8125 に対して hostPort を開き、非ローカルの StatsD トラフィックを受け入れる必要もあります。詳細については、Kubernetes の DogStatsD セットアップを参照してください。

これにより、StatsD トラフィックが Airflow コンテナから受信データを受け入れる準備が整った Datadog Agent に転送されます。最後の手順は、対応する dogstatsd_mapper_profiles を用いて Datadog Agent を更新することです。これは、ホストインストール で提供されている dogstatsd_mapper_profilesdatadog.yaml ファイルにコピーすることで実行できます。または、環境変数 DD_DOGSTATSD_MAPPER_PROFILES に同等の JSON 構成を使用して Datadog Agent をデプロイします。Kubernetes に関しては、同等の環境変数表記は次のとおりです。

env:
  - name: DD_DOGSTATSD_MAPPER_PROFILES
    value: >
      [{"name":"airflow","prefix":"airflow.","mappings":[{"match":"airflow.*_start","name":"airflow.job.start","tags":{"job_name":"$1"}},{"match":"airflow.*_end","name":"airflow.job.end","tags":{"job_name":"$1"}},{"match":"airflow.*_heartbeat_failure","name":"airflow.job.heartbeat.failure","tags":{"job_name":"$1"}},{"match":"airflow.operator_failures_*","name":"airflow.operator_failures","tags":{"operator_name":"$1"}},{"match":"airflow.operator_successes_*","name":"airflow.operator_successes","tags":{"operator_name":"$1"}},{"match":"airflow\\.dag_processing\\.last_runtime\\.(.*)","match_type":"regex","name":"airflow.dag_processing.last_runtime","tags":{"dag_file":"$1"}},{"match":"airflow\\.dag_processing\\.last_run\\.seconds_ago\\.(.*)","match_type":"regex","name":"airflow.dag_processing.last_run.seconds_ago","tags":{"dag_file":"$1"}},{"match":"airflow\\.dag\\.loading-duration\\.(.*)","match_type":"regex","name":"airflow.dag.loading_duration","tags":{"dag_file":"$1"}},{"match":"airflow.dagrun.*.first_task_scheduling_delay","name":"airflow.dagrun.first_task_scheduling_delay","tags":{"dag_id":"$1"}},{"match":"airflow.pool.open_slots.*","name":"airflow.pool.open_slots","tags":{"pool_name":"$1"}},{"match":"airflow.pool.queued_slots.*","name":"airflow.pool.queued_slots","tags":{"pool_name":"$1"}},{"match":"airflow.pool.running_slots.*","name":"airflow.pool.running_slots","tags":{"pool_name":"$1"}},{"match":"airflow.pool.used_slots.*","name":"airflow.pool.used_slots","tags":{"pool_name":"$1"}},{"match":"airflow.pool.starving_tasks.*","name":"airflow.pool.starving_tasks","tags":{"pool_name":"$1"}},{"match":"airflow\\.dagrun\\.dependency-check\\.(.*)","match_type":"regex","name":"airflow.dagrun.dependency_check","tags":{"dag_id":"$1"}},{"match":"airflow\\.dag\\.(.*)\\.([^.]*)\\.duration","match_type":"regex","name":"airflow.dag.task.duration","tags":{"dag_id":"$1","task_id":"$2"}},{"match":"airflow\\.dag_processing\\.last_duration\\.(.*)","match_type":"regex","name":"airflow.dag_processing.last_duration","tags":{"dag_file":"$1"}},{"match":"airflow\\.dagrun\\.duration\\.success\\.(.*)","match_type":"regex","name":"airflow.dagrun.duration.success","tags":{"dag_id":"$1"}},{"match":"airflow\\.dagrun\\.duration\\.failed\\.(.*)","match_type":"regex","name":"airflow.dagrun.duration.failed","tags":{"dag_id":"$1"}},{"match":"airflow\\.dagrun\\.schedule_delay\\.(.*)","match_type":"regex","name":"airflow.dagrun.schedule_delay","tags":{"dag_id":"$1"}},{"match":"airflow.scheduler.tasks.running","name":"airflow.scheduler.tasks.running"},{"match":"airflow.scheduler.tasks.starving","name":"airflow.scheduler.tasks.starving"},{"match":"airflow.sla_email_notification_failure","name":"airflow.sla_email_notification_failure"},{"match":"airflow\\.task_removed_from_dag\\.(.*)","match_type":"regex","name":"airflow.dag.task_removed","tags":{"dag_id":"$1"}},{"match":"airflow\\.task_restored_to_dag\\.(.*)","match_type":"regex","name":"airflow.dag.task_restored","tags":{"dag_id":"$1"}},{"match":"airflow.task_instance_created-*","name":"airflow.task.instance_created","tags":{"task_class":"$1"}},{"match":"airflow\\.ti\\.start\\.(.+)\\.(\\w+)","match_type":"regex","name":"airflow.ti.start","tags":{"dag_id":"$1","task_id":"$2"}},{"match":"airflow\\.ti\\.finish\\.(\\w+)\\.(.+)\\.(\\w+)","name":"airflow.ti.finish","match_type":"regex","tags":{"dag_id":"$1","task_id":"$2","state":"$3"}}]}]      

StatsD メトリクスに非静的タグを追加するには、DogStatsD マッパープロファイルを使用する必要があります。service および env タグを追加するマッパープロファイルの例をご覧ください

ログ収集

Agent バージョン 6.0 以降で利用可能

Datadog Agent で、ログの収集はデフォルトで無効になっています。有効にする方法については、Kubernetes ログ収集を参照してください。

パラメーター
<LOG_CONFIG>{"source": "airflow", "service": "<YOUR_APP_NAME>"}

検証

Agent の status サブコマンドを実行し、Checks セクションで airflow を探します。

付録

Airflow DatadogHook

さらに、Datadog とのインタラクションに Airflow DatadogHook を使用することも可能です。

  • メトリクスの送信
  • メトリクスのクエリ
  • イベントのポスト

収集データ

メトリクス

airflow.can_connect
(count)
1 if can connect to Airflow, otherwise 0
airflow.celery.task_timeout_error
(count)
Number of AirflowTaskTimeout errors raised when publishing Task to Celery Broker.
Shown as error
airflow.collect_db_dags
(gauge)
Milliseconds taken for fetching all Serialized Dags from DB
Shown as millisecond
airflow.dag.callback_exceptions
(count)
Number of exceptions raised from DAG callbacks. When this happens, it means DAG callback is not working
Shown as error
airflow.dag.loading_duration
(gauge)
DAG loading duration in seconds (deprecated)
Shown as second
airflow.dag.task.duration
(gauge)
Milliseconds taken to finish a task
Shown as millisecond
airflow.dag.task.ongoing_duration
(gauge)
Current duration for ongoing DAG tasks
Shown as second
airflow.dag.task.total_running
(gauge)
Total number of running tasks
airflow.dag.task_removed
(gauge)
Tasks removed from DAG
Shown as second
airflow.dag.task_restored
(gauge)
Tasks restored to DAG
Shown as second
airflow.dag_file_refresh_error
(count)
Number of failures loading any DAG files
Shown as error
airflow.dag_processing.import_errors
(gauge)
Number of errors from trying to parse DAG files
Shown as error
airflow.dag_processing.last_duration
(gauge)
Milliseconds taken to load the given DAG file
Shown as millisecond
airflow.dag_processing.last_run.seconds_ago
(gauge)
Seconds since <dag_file> was last processed
Shown as second
airflow.dag_processing.last_runtime
(gauge)
Seconds spent processing <dag_file> (in most recent iteration)
Shown as second
airflow.dag_processing.manager_stalls
(count)
Number of stalled DagFileProcessorManager
airflow.dag_processing.processes
(count)
Number of currently running DAG parsing processes
airflow.dag_processing.processor_timeouts
(gauge)
Number of file processors that have been killed due to taking too long
airflow.dag_processing.total_parse_time
(gauge)
Seconds taken to scan and import all DAG files once
Shown as second
airflow.dagbag_size
(gauge)
DAG bag size
airflow.dagrun.dependency_check
(gauge)
Milliseconds taken to check DAG dependencies
Shown as millisecond
airflow.dagrun.duration.failed
(gauge)
Milliseconds taken for a DagRun to reach failed state
Shown as millisecond
airflow.dagrun.duration.success
(gauge)
Milliseconds taken for a DagRun to reach success state
Shown as millisecond
airflow.dagrun.first_task_scheduling_delay
(gauge)
Milliseconds elapsed between first task start_date and dagrun expected start
Shown as millisecond
airflow.dagrun.schedule_delay
(gauge)
Milliseconds of delay between the scheduled DagRun start date and the actual DagRun start date
Shown as millisecond
airflow.executor.open_slots
(gauge)
Number of open slots on executor
airflow.executor.queued_tasks
(gauge)
Number of queued tasks on executor
Shown as task
airflow.executor.running_tasks
(gauge)
Number of running tasks on executor
Shown as task
airflow.healthy
(count)
1 if Airflow is healthy, otherwise 0
airflow.job.end
(count)
Number of ended <job_name> job, ex. SchedulerJob, LocalTaskJob
Shown as job
airflow.job.heartbeat.failure
(count)
Number of failed Heartbeats for a <job_name> job, ex. SchedulerJob, LocalTaskJob
Shown as error
airflow.job.start
(count)
Number of started <job_name> job, ex. SchedulerJob, LocalTaskJob
Shown as job
airflow.operator_failures
(count)
Operator <operator_name> failures
airflow.operator_successes
(count)
Operator <operator_name> successes
airflow.pool.open_slots
(gauge)
Number of open slots in the pool
airflow.pool.queued_slots
(gauge)
Number of queued slots in the pool
airflow.pool.running_slots
(gauge)
Number of running slots in the pool
airflow.pool.starving_tasks
(gauge)
Number of starving tasks in the pool
Shown as task
airflow.pool.used_slots
(gauge)
Number of used slots in the pool
airflow.previously_succeeded
(count)
Number of previously succeeded task instances
Shown as task
airflow.scheduler.critical_section_busy
(count)
Count of times a scheduler process tried to get a lock on the critical section (needed to send tasks to the executor) and found it locked by another process.
Shown as operation
airflow.scheduler.critical_section_duration
(gauge)
Milliseconds spent in the critical section of scheduler loop -- only a single scheduler can enter this loop at a time
Shown as millisecond
airflow.scheduler.orphaned_tasks.adopted
(count)
Number of Orphaned tasks adopted by the Scheduler
Shown as task
airflow.scheduler.orphaned_tasks.cleared
(count)
Number of Orphaned tasks cleared by the Scheduler
Shown as task
airflow.scheduler.tasks.executable
(count)
Number of tasks that are ready for execution (set to queued) with respect to pool limits, dag concurrency, executor state, and priority.
Shown as task
airflow.scheduler.tasks.killed_externally
(count)
Number of tasks killed externally
Shown as task
airflow.scheduler.tasks.running
(count)
Number of tasks running in executor
Shown as task
airflow.scheduler.tasks.starving
(count)
Number of tasks that cannot be scheduled because of no open slot in pool
Shown as task
airflow.scheduler.tasks.without_dagrun
(count)
Number of tasks without DagRuns or with DagRuns not in Running state
Shown as task
airflow.scheduler_heartbeat
(count)
Scheduler heartbeats
airflow.sla_email_notification_failure
(count)
Number of failed SLA miss email notification attempts
Shown as task
airflow.smart_sensor_operator.exception_failures
(count)
Number of failures caused by exception in the previous smart sensor poking loop
Shown as error
airflow.smart_sensor_operator.infra_failures
(count)
Number of infrastructure failures in the previous smart sensor poking loop
Shown as error
airflow.smart_sensor_operator.poked_exception
(count)
Number of exceptions in the previous smart sensor poking loop
Shown as error
airflow.smart_sensor_operator.poked_success
(count)
Number of newly succeeded tasks poked by the smart sensor in the previous poking loop
Shown as task
airflow.smart_sensor_operator.poked_tasks
(count)
Number of tasks poked by the smart sensor in the previous poking loop
Shown as task
airflow.task.instance_created
(gauge)
Task instances created
Shown as second
airflow.task_instance_created
(count)
Number of tasks instances created for a given Operator
Shown as task
airflow.task_removed_from_dag
(count)
Number of tasks removed for a given dag (i.e. task no longer exists in DAG)
Shown as task
airflow.task_restored_to_dag
(count)
Number of tasks restored for a given dag (i.e. task instance which was previously in REMOVED state in the DB is added to DAG file)
Shown as task
airflow.ti.finish
(count)
Number of completed task in a given dag.
Shown as task
airflow.ti.start
(count)
Number of started task in a given dag.
Shown as task
airflow.ti_failures
(count)
Overall task instances failures
Shown as task
airflow.ti_successes
(count)
Overall task instances successes
Shown as task
airflow.zombies_killed
(count)
Zombie tasks killed
Shown as task

: airflow.healthyairflow.can_connectairflow.dag.task.total_running、および airflow.dag.task.ongoing_duration のメトリクスは、インテグレーションの Agent 部分から収集されます。それ以外のすべてのメトリクスは StatsD から収集されます。

イベント

Airflow チェックには、イベントは含まれません。

サービスチェック

airflow.can_connect
Returns CRITICAL if unable to connect to Airflow. Returns OK otherwise.
Statuses: ok, critical

airflow.healthy
Returns CRITICAL if Airflow is not healthy. Returns OK otherwise.
Statuses: ok, critical

トラブルシューティング

Agent インテグレーションの HTTP 403 エラー

Airflow の API に対して認証済みリクエストを行うために、Datadog Agent のパラメーターを構成する必要がある場合があります。利用可能な構成オプションのいずれかを使用してください。

ご不明な点は、Datadog のサポートチームまでお問い合わせください。