- 重要な情報
- はじめに
- 用語集
- ガイド
- エージェント
- インテグレーション
- OpenTelemetry
- 開発者
- API
- CoScreen
- アプリ内
- Service Management
- インフラストラクチャー
- アプリケーションパフォーマンス
- 継続的インテグレーション
- ログ管理
- セキュリティ
- UX モニタリング
- 管理
Supported OS
Datadog Agent は、以下のような多くのメトリクスを Airflow から収集します。
メトリクスは Airflow StatsD プラグインを通じて収集され、Datadog の DogStatsD へ送られます。
Datadog Agent はメトリクスだけでなく、Airflow の健全性に関するサービスチェックも送信します。
Airflow インテグレーションを適切に動作させるには、以下のステップをすべて実施する必要があります。ステップを開始する前に、StatsD/DogStatsD マッピング機能が含まれる Datadog Agent (バージョン 6.17 または 7.17
以降) をインストールしてください。
Airflow インテグレーションには 2 つの形式があります。まず、Airflow が接続でき、正常であるかどうかを報告するために、提供されたエンドポイントにリクエストを行う Datadog Agent インテグレーションがあります。次に、Airflow が Datadog Agent にメトリクスを送信するように Airflow を構成できる Airflow StatsD 部分があります。これにより、Airflow 表記を Datadog 表記に再マップできます。
Datadog Agent パッケージに含まれている Airflow チェックを構成して、ヘルスメトリクスとサービスチェックを収集します。これは、Agent のコンフィギュレーションディレクトリのルートにある conf.d/
フォルダーにある airflow.d/conf.yaml
ファイル内の url
を編集して、Airflow サービスチェックの収集を開始することで実行できます。利用可能なすべてのコンフィギュレーションオプションについては、airflow.d/conf.yaml のサンプルを参照してください。
url
が Airflow Web サーバー base_url
(Airflow インスタンスへの接続に使用する URL) に一致することを確認します。
Airflow の statsd
機能を使用してメトリクスを収集することにより、Airflow を DogStatsD (Datadog Agent に含まれる) に接続します。使用されている Airflow バージョンによって報告されるメトリクスと追加のコンフィギュレーションオプションの詳細については、以下の Airflow ドキュメントを参照してください。
注: Airflow により報告される StatsD メトリクスの有無は、使用される Airflow エグゼキューターにより異なる場合があります。たとえば、airflow.ti_failures/successes
、airflow.operator_failures/successes
、airflow.dag.task.duration
は KubernetesExecutor
に報告されません。
Airflow StatsD プラグインをインストールします。
pip install 'apache-airflow[statsd]'
下記のコンフィギュレーションを追加して、Airflow コンフィギュレーションファイル airflow.cfg
を更新します。
[scheduler]
statsd_on = True
# Hostname or IP of server running the Datadog Agent
statsd_host = localhost
# DogStatsD port configured in the Datadog Agent
statsd_port = 8125
statsd_prefix = airflow
下記のコンフィギュレーションを追加して、Datadog Agent のメインコンフィギュレーションファイルである datadog.yaml
を更新します。
# dogstatsd_mapper_cache_size: 1000 # default to 1000
dogstatsd_mapper_profiles:
- name: airflow
prefix: "airflow."
mappings:
- match: "airflow.*_start"
name: "airflow.job.start"
tags:
job_name: "$1"
- match: "airflow.*_end"
name: "airflow.job.end"
tags:
job_name: "$1"
- match: "airflow.*_heartbeat_failure"
name: airflow.job.heartbeat.failure
tags:
job_name: "$1"
- match: "airflow.operator_failures_*"
name: "airflow.operator_failures"
tags:
operator_name: "$1"
- match: "airflow.operator_successes_*"
name: "airflow.operator_successes"
tags:
operator_name: "$1"
- match: 'airflow\.dag_processing\.last_runtime\.(.*)'
match_type: "regex"
name: "airflow.dag_processing.last_runtime"
tags:
dag_file: "$1"
- match: 'airflow\.dag_processing\.last_run\.seconds_ago\.(.*)'
match_type: "regex"
name: "airflow.dag_processing.last_run.seconds_ago"
tags:
dag_file: "$1"
- match: 'airflow\.dag\.loading-duration\.(.*)'
match_type: "regex"
name: "airflow.dag.loading_duration"
tags:
dag_file: "$1"
- match: "airflow.dagrun.*.first_task_scheduling_delay"
name: "airflow.dagrun.first_task_scheduling_delay"
tags:
dag_id: "$1"
- match: "airflow.pool.open_slots.*"
name: "airflow.pool.open_slots"
tags:
pool_name: "$1"
- match: "airflow.pool.queued_slots.*"
name: "airflow.pool.queued_slots"
tags:
pool_name: "$1"
- match: "airflow.pool.running_slots.*"
name: "airflow.pool.running_slots"
tags:
pool_name: "$1"
- match: "airflow.pool.used_slots.*"
name: "airflow.pool.used_slots"
tags:
pool_name: "$1"
- match: "airflow.pool.starving_tasks.*"
name: "airflow.pool.starving_tasks"
tags:
pool_name: "$1"
- match: 'airflow\.dagrun\.dependency-check\.(.*)'
match_type: "regex"
name: "airflow.dagrun.dependency_check"
tags:
dag_id: "$1"
- match: 'airflow\.dag\.(.*)\.([^.]*)\.duration'
match_type: "regex"
name: "airflow.dag.task.duration"
tags:
dag_id: "$1"
task_id: "$2"
- match: 'airflow\.dag_processing\.last_duration\.(.*)'
match_type: "regex"
name: "airflow.dag_processing.last_duration"
tags:
dag_file: "$1"
- match: 'airflow\.dagrun\.duration\.success\.(.*)'
match_type: "regex"
name: "airflow.dagrun.duration.success"
tags:
dag_id: "$1"
- match: 'airflow\.dagrun\.duration\.failed\.(.*)'
match_type: "regex"
name: "airflow.dagrun.duration.failed"
tags:
dag_id: "$1"
- match: 'airflow\.dagrun\.schedule_delay\.(.*)'
match_type: "regex"
name: "airflow.dagrun.schedule_delay"
tags:
dag_id: "$1"
- match: 'airflow.scheduler.tasks.running'
name: "airflow.scheduler.tasks.running"
- match: 'airflow.scheduler.tasks.starving'
name: "airflow.scheduler.tasks.starving"
- match: 'airflow.sla_email_notification_failure'
name: 'airflow.sla_email_notification_failure'
- match: 'airflow\.task_removed_from_dag\.(.*)'
match_type: "regex"
name: "airflow.dag.task_removed"
tags:
dag_id: "$1"
- match: 'airflow\.task_restored_to_dag\.(.*)'
match_type: "regex"
name: "airflow.dag.task_restored"
tags:
dag_id: "$1"
- match: "airflow.task_instance_created-*"
name: "airflow.task.instance_created"
tags:
task_class: "$1"
- match: "airflow.ti.start.*.*"
name: "airflow.ti.start"
tags:
dag_id: "$1"
task_id: "$2"
- match: "airflow.ti.finish.*.*.*"
name: "airflow.ti.finish"
tags:
dag_id: "$1"
task_id: "$2"
state: "$3"
airflow.d/conf.yaml
ファイルのデフォルトコンフィギュレーションを使用して、Airflow サービスチェックを有効にします。利用可能なすべてのコンフィギュレーションオプションについては、airflow.d/conf.yaml のサンプルを参照してください。
Agent バージョン 6.0 以降で利用可能
Datadog Agent で、ログの収集はデフォルトで無効になっています。以下のように、datadog.yaml
ファイルでこれを有効にします。
logs_enabled: true
airflow.d/conf.yaml
の下部にある、コンフィギュレーションブロックのコメントを解除して編集します。
path
パラメーターと service
パラメーターの値を変更し、環境に合わせて構成してください。
DAG プロセッサーマネージャーと Scheduler のログのコンフィギュレーション
logs:
- type: file
path: "<PATH_TO_AIRFLOW>/logs/dag_processor_manager/dag_processor_manager.log"
source: airflow
log_processing_rules:
- type: multi_line
name: new_log_start_with_date
pattern: \[\d{4}\-\d{2}\-\d{2}
- type: file
path: "<PATH_TO_AIRFLOW>/logs/scheduler/latest/*.log"
source: airflow
log_processing_rules:
- type: multi_line
name: new_log_start_with_date
pattern: \[\d{4}\-\d{2}\-\d{2}
スケジューラーログを毎日ローテーションする場合は、ログを定期的にクリーンアップすることをお勧めします。
DAG タスクのログ用に追加するコンフィギュレーション
logs:
- type: file
path: "<PATH_TO_AIRFLOW>/logs/!(scheduler)/*/*.log"
source: airflow
log_processing_rules:
- type: multi_line
name: new_log_start_with_date
pattern: \[\d{4}\-\d{2}\-\d{2}
注意事項: デフォルトでは、Airflow は log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
のログファイルテンプレートをタスクに使用します。ログファイルの数は、定期的に削除しなければ急速に増加します。これは、実行された各タスクのログを Airflow UI が個別に表示するために使用するパターンです。
ログを Airflow UI で確認しない場合は、airflow.cfg
に log_filename_template = dag_tasks.log
を構成することをお勧めします。これにより、ログはこのファイルをローテーションすると同時に、以下のコンフィギュレーションを使用します。
logs:
- type: file
path: "<PATH_TO_AIRFLOW>/logs/dag_tasks.log"
source: airflow
log_processing_rules:
- type: multi_line
name: new_log_start_with_date
pattern: \[\d{4}\-\d{2}\-\d{2}
コンテナ環境の場合は、オートディスカバリーのインテグレーションテンプレートのガイドを参照して、次のパラメーターを適用してください。
パラメーター | 値 |
---|---|
<インテグレーション名> | airflow |
<初期コンフィギュレーション> | 空白または {} |
<インスタンスコンフィギュレーション> | {"url": "http://%%host%%:8080"} |
url
が Airflow Web サーバー base_url
(Airflow インスタンスへの接続に使用する URL) に一致することを確認します。localhost
をテンプレート変数 %%host%
に置き換えます。
Airflow の statsd
機能を使用してメトリクスを収集することにより、Airflow を DogStatsD (Datadog Agent に含まれる) に接続します。使用されている Airflow バージョンによって報告されるメトリクスと追加のコンフィギュレーションオプションの詳細については、以下の Airflow ドキュメントを参照してください。
注: Airflow により報告される StatsD メトリクスの有無は、使用される Airflow エグゼキューターにより異なる場合があります。たとえば、airflow.ti_failures/successes
、airflow.operator_failures/successes
、airflow.dag.task.duration
は KubernetesExecutor
に報告されません。
注: Airflow に使用される環境変数は、バージョン間で異なる場合があります。たとえば、Airflow 2.0.0
では、これは環境変数 AIRFLOW__METRICS__STATSD_HOST
を利用しますが、Airflow 1.10.15
は AIRFLOW__SCHEDULER__STATSD_HOST
を利用します。
Airflow StatsD コンフィギュレーションは、Kubernetes デプロイメントで次の環境変数を使用して有効にできます。
env:
- name: AIRFLOW__SCHEDULER__STATSD_ON
value: "True"
- name: AIRFLOW__SCHEDULER__STATSD_PORT
value: "8125"
- name: AIRFLOW__SCHEDULER__STATSD_PREFIX
value: "airflow"
- name: AIRFLOW__SCHEDULER__STATSD_HOST
valueFrom:
fieldRef:
fieldPath: status.hostIP
ホストエンドポイント AIRFLOW__SCHEDULER__STATSD_HOST
の環境変数には、ノードのホスト IP アドレスが提供され、Airflow ポッドと同じノード上の Datadog Agent ポッドに StatsD データをルーティングします。この設定では、Agent がこのポート 8125
に対して hostPort
を開き、非ローカルの StatsD トラフィックを受け入れる必要もあります。詳細については、Kubernetes セットアップの DogStatsD を参照してください。
これにより、StatsD トラフィックが Airflow コンテナから受信データを受け入れる準備ができている Datadog Agent に転送されます。最後の部分は、対応する dogstatsd_mapper_profiles
で Datadog Agent を更新することです。これは、ホストインストールで提供されている dogstatsd_mapper_profiles
を datadog.yaml
ファイルにコピーすることで実行できます。または、環境変数 DD_DOGSTATSD_MAPPER_PROFILES
に同等の JSON コンフィギュレーションで Datadog Agent をデプロイします。Kubernetes に関して、同等の環境変数表記は次のとおりです。
env:
- name: DD_DOGSTATSD_MAPPER_PROFILES
value: >
[{"prefix":"airflow.","name":"airflow","mappings":[{"name":"airflow.job.start","match":"airflow.*_start","tags":{"job_name":"$1"}},{"name":"airflow.job.end","match":"airflow.*_end","tags":{"job_name":"$1"}},{"name":"airflow.job.heartbeat.failure","match":"airflow.*_heartbeat_failure","tags":{"job_name":"$1"}},{"name":"airflow.operator_failures","match":"airflow.operator_failures_*","tags":{"operator_name":"$1"}},{"name":"airflow.operator_successes","match":"airflow.operator_successes_*","tags":{"operator_name":"$1"}},{"match_type":"regex","name":"airflow.dag_processing.last_runtime","match":"airflow\\.dag_processing\\.last_runtime\\.(.*)","tags":{"dag_file":"$1"}},{"match_type":"regex","name":"airflow.dag_processing.last_run.seconds_ago","match":"airflow\\.dag_processing\\.last_run\\.seconds_ago\\.(.*)","tags":{"dag_file":"$1"}},{"match_type":"regex","name":"airflow.dag.loading_duration","match":"airflow\\.dag\\.loading-duration\\.(.*)","tags":{"dag_file":"$1"}},{"name":"airflow.dagrun.first_task_scheduling_delay","match":"airflow.dagrun.*.first_task_scheduling_delay","tags":{"dag_id":"$1"}},{"name":"airflow.pool.open_slots","match":"airflow.pool.open_slots.*","tags":{"pool_name":"$1"}},{"name":"airflow.pool.queued_slots","match":"airflow.pool.queued_slots.*","tags":{"pool_name":"$1"}},{"name":"airflow.pool.running_slots","match":"airflow.pool.running_slots.*","tags":{"pool_name":"$1"}},{"name":"airflow.pool.used_slots","match":"airflow.pool.used_slots.*","tags":{"pool_name":"$1"}},{"name":"airflow.pool.starving_tasks","match":"airflow.pool.starving_tasks.*","tags":{"pool_name":"$1"}},{"match_type":"regex","name":"airflow.dagrun.dependency_check","match":"airflow\\.dagrun\\.dependency-check\\.(.*)","tags":{"dag_id":"$1"}},{"match_type":"regex","name":"airflow.dag.task.duration","match":"airflow\\.dag\\.(.*)\\.([^.]*)\\.duration","tags":{"dag_id":"$1","task_id":"$2"}},{"match_type":"regex","name":"airflow.dag_processing.last_duration","match":"airflow\\.dag_processing\\.last_duration\\.(.*)","tags":{"dag_file":"$1"}},{"match_type":"regex","name":"airflow.dagrun.duration.success","match":"airflow\\.dagrun\\.duration\\.success\\.(.*)","tags":{"dag_id":"$1"}},{"match_type":"regex","name":"airflow.dagrun.duration.failed","match":"airflow\\.dagrun\\.duration\\.failed\\.(.*)","tags":{"dag_id":"$1"}},{"match_type":"regex","name":"airflow.dagrun.schedule_delay","match":"airflow\\.dagrun\\.schedule_delay\\.(.*)","tags":{"dag_id":"$1"}},{"name":"airflow.scheduler.tasks.running","match":"airflow.scheduler.tasks.running"},{"name":"airflow.scheduler.tasks.starving","match":"airflow.scheduler.tasks.starving"},{"name":"airflow.sla_email_notification_failure","match":"airflow.sla_email_notification_failure"},{"match_type":"regex","name":"airflow.dag.task_removed","match":"airflow\\.task_removed_from_dag\\.(.*)","tags":{"dag_id":"$1"}},{"match_type":"regex","name":"airflow.dag.task_restored","match":"airflow\\.task_restored_to_dag\\.(.*)","tags":{"dag_id":"$1"}},{"name":"airflow.task.instance_created","match":"airflow.task_instance_created-*","tags":{"task_class":"$1"}},{"name":"airflow.ti.start","match":"airflow.ti.start.*.*","tags":{"dag_id":"$1","task_id":"$2"}},{"name":"airflow.ti.finish","match":"airflow.ti.finish.*.*.*","tags":{"dag_id":"$1","state":"$3","task_id":"$2"}}]}]
設定の例については、Datadog integrations-core
レポジトリを参照してください。
Agent バージョン 6.0 以降で利用可能
Datadog Agent で、ログの収集はデフォルトで無効になっています。有効にする方法については、Kubernetes ログ収集を参照してください。
パラメーター | 値 |
---|---|
<LOG_CONFIG> | {"source": "airflow", "service": "<YOUR_APP_NAME>"} |
Agent の status サブコマンドを実行し、Checks セクションで airflow
を探します。
さらに、Datadog とのインタラクションに Airflow DatadogHook を使用することも可能です。
airflow.can_connect (count) | 1 if can connect to Airflow, otherwise 0 |
airflow.collect_db_dags (gauge) | Milliseconds taken for fetching all Serialized Dags from DB Shown as millisecond |
airflow.healthy (count) | 1 if Airflow is healthy, otherwise 0 |
airflow.job.start (count) | Number of started <job_name> job, ex. SchedulerJob , LocalTaskJob Shown as job |
airflow.job.end (count) | Number of ended <job_name> job, ex. SchedulerJob , LocalTaskJob Shown as job |
airflow.job.heartbeat.failure (count) | Number of failed Heartbeats for a <job_name> job, ex. SchedulerJob , LocalTaskJob Shown as error |
airflow.operator_failures (count) | Operator <operator_name> failures |
airflow.operator_successes (count) | Operator <operator_name> successes |
airflow.ti_failures (count) | Overall task instances failures Shown as task |
airflow.ti_successes (count) | Overall task instances successes Shown as task |
airflow.previously_succeeded (count) | Number of previously succeeded task instances Shown as task |
airflow.zombies_killed (count) | Zombie tasks killed Shown as task |
airflow.scheduler_heartbeat (count) | Scheduler heartbeats |
airflow.dag_processing.processes (count) | Number of currently running DAG parsing processes |
airflow.dag_processing.manager_stalls (count) | Number of stalled DagFileProcessorManager |
airflow.dag_file_refresh_error (count) | Number of failures loading any DAG files Shown as error |
airflow.scheduler.critical_section_duration (gauge) | Milliseconds spent in the critical section of scheduler loop -- only a single scheduler can enter this loop at a time Shown as millisecond |
airflow.scheduler.tasks.killed_externally (count) | Number of tasks killed externally Shown as task |
airflow.scheduler.orphaned_tasks.cleared (count) | Number of Orphaned tasks cleared by the Scheduler Shown as task |
airflow.scheduler.orphaned_tasks.adopted (count) | Number of Orphaned tasks adopted by the Scheduler Shown as task |
airflow.scheduler.critical_section_busy (count) | Count of times a scheduler process tried to get a lock on the critical section (needed to send tasks to the executor) and found it locked by another process. Shown as operation |
airflow.scheduler.tasks.running (count) | Number of tasks running in executor Shown as task |
airflow.scheduler.tasks.starving (count) | Number of tasks that cannot be scheduled because of no open slot in pool Shown as task |
airflow.dagbag_size (gauge) | DAG bag size |
airflow.dag_processing.import_errors (gauge) | Number of errors from trying to parse DAG files Shown as error |
airflow.dag_processing.total_parse_time (gauge) | Seconds taken to scan and import all DAG files once Shown as second |
airflow.dag_processing.last_runtime (gauge) | Seconds spent processing <dag_file> (in most recent iteration)Shown as second |
airflow.dag_processing.last_run.seconds_ago (gauge) | Seconds since <dag_file> was last processedShown as second |
airflow.dag_processing.processor_timeouts (gauge) | Number of file processors that have been killed due to taking too long |
airflow.executor.open_slots (gauge) | Number of open slots on executor |
airflow.executor.queued_tasks (gauge) | Number of queued tasks on executor Shown as task |
airflow.executor.running_tasks (gauge) | Number of running tasks on executor Shown as task |
airflow.pool.open_slots (gauge) | Number of open slots in the pool |
airflow.pool.queued_slots (gauge) | Number of queued slots in the pool |
airflow.pool.used_slots (gauge) | Number of used slots in the pool |
airflow.pool.running_slots (gauge) | Number of running slots in the pool |
airflow.pool.starving_tasks (gauge) | Number of starving tasks in the pool Shown as task |
airflow.dagrun.dependency_check (gauge) | Milliseconds taken to check DAG dependencies Shown as millisecond |
airflow.dag.task.duration (gauge) | Milliseconds taken to finish a task Shown as millisecond |
airflow.dag_processing.last_duration (gauge) | Milliseconds taken to load the given DAG file Shown as millisecond |
airflow.dagrun.duration.success (gauge) | Milliseconds taken for a DagRun to reach success state Shown as millisecond |
airflow.dagrun.duration.failed (gauge) | Milliseconds taken for a DagRun to reach failed state Shown as millisecond |
airflow.dagrun.schedule_delay (gauge) | Milliseconds of delay between the scheduled DagRun start date and the actual DagRun start date Shown as millisecond |
airflow.dagrun.first_task_scheduling_delay (gauge) | Milliseconds elapsed between first task start_date and dagrun expected start Shown as millisecond |
airflow.dag.loading_duration (gauge) | DAG loading duration in seconds (deprecated) Shown as second |
airflow.dag.task_removed (gauge) | Tasks removed from DAG Shown as second |
airflow.dag.task_restored (gauge) | Tasks restored to DAG Shown as second |
airflow.sla_email_notification_failure (count) | Number of failed SLA miss email notification attempts Shown as task |
airflow.task.instance_created (gauge) | Task instances created Shown as second |
airflow.ti.start (count) | Number of started task in a given dag. Shown as task |
airflow.ti.finish (count) | Number of completed task in a given dag. Shown as task |
airflow.dag.callback_exceptions (count) | Number of exceptions raised from DAG callbacks. When this happens, it means DAG callback is not working Shown as error |
airflow.celery.task_timeout_error (count) | Number of AirflowTaskTimeout errors raised when publishing Task to Celery Broker.Shown as error |
airflow.task_removed_from_dag (count) | Number of tasks removed for a given dag (i.e. task no longer exists in DAG) Shown as task |
airflow.task_restored_to_dag (count) | Number of tasks restored for a given dag (i.e. task instance which was previously in REMOVED state in the DB is added to DAG file) Shown as task |
airflow.task_instance_created (count) | Number of tasks instances created for a given Operator Shown as task |
airflow.scheduler.tasks.without_dagrun (count) | Number of tasks without DagRuns or with DagRuns not in Running state Shown as task |
airflow.scheduler.tasks.executable (count) | Number of tasks that are ready for execution (set to queued) with respect to pool limits, dag concurrency, executor state, and priority. Shown as task |
airflow.smart_sensor_operator.poked_tasks (count) | Number of tasks poked by the smart sensor in the previous poking loop Shown as task |
airflow.smart_sensor_operator.poked_success (count) | Number of newly succeeded tasks poked by the smart sensor in the previous poking loop Shown as task |
airflow.smart_sensor_operator.poked_exception (count) | Number of exceptions in the previous smart sensor poking loop Shown as error |
airflow.smart_sensor_operator.exception_failures (count) | Number of failures caused by exception in the previous smart sensor poking loop Shown as error |
airflow.smart_sensor_operator.infra_failures (count) | Number of infrastructure failures in the previous smart sensor poking loop Shown as error |
Airflow チェックには、イベントは含まれません。
airflow.can_connect
Returns CRITICAL
if unable to connect to Airflow. Returns OK
otherwise.
Statuses: ok, critical
airflow.healthy
Returns CRITICAL
if Airflow is not healthy. Returns OK
otherwise.
Statuses: ok, critical
ご不明な点は、Datadog のサポートチームまでお問合せください。