Airflow

Agent Check

Supported OS: Linux, macOS, Windows

Overview

The Datadog Agent collects many metrics from Airflow, including those for:

  • DAGs (Directed Acyclic Graphs): Number of DAG processes, DAG bag size, etc.
  • Tasks: Task failures, successes, killed, etc.
  • Pools: Open slots, used slots, etc.
  • Executors: Open slots, queued tasks, running tasks, etc.

Metrics are collected through the Airflow StatsD plugin and sent to Datadog’s DogStatsD.

In addition to metrics, the Datadog Agent also sends service checks related to Airflow’s health.

Setup

Installation

All three steps below are needed for the Airflow integration to work properly. Before you begin, install the Datadog Agent version >=6.17 or >=7.17, which includes the StatsD/DogStatsD mapping feature.
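
To confirm which Agent version is installed, you can run the Agent's version subcommand:

```shell
# Prints the installed Agent version (should be >= 6.17 or >= 7.17)
datadog-agent version
```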

Step 1: Configure Airflow to collect health metrics and service checks

Configure the Airflow check included in the Datadog Agent package to collect health metrics and service checks.

Edit the airflow.d/conf.yaml file in the conf.d/ folder at the root of your Agent’s configuration directory to start collecting your Airflow service checks. See the sample airflow.d/conf.yaml for all available configuration options.
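
For example, a minimal `airflow.d/conf.yaml` might look like the following sketch, assuming the Airflow webserver is reachable at `localhost:8080`:

```yaml
init_config:

instances:
  # URL of the Airflow webserver; adjust host and port for your deployment.
  - url: http://localhost:8080
```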

Step 2: Connect Airflow to DogStatsD (included in the Datadog Agent) by using the Airflow StatsD feature to collect metrics

  1. Install the Airflow StatsD plugin.

    pip install 'apache-airflow[statsd]'
  2. Update the Airflow configuration file airflow.cfg by adding the following configs:

    [scheduler]
    # Note: in Airflow 2.0 and later, these options move to the [metrics] section.
    statsd_on = True
    statsd_host = localhost
    statsd_port = 8125
    statsd_prefix = airflow
  3. Update the Datadog Agent main configuration file datadog.yaml by adding the following configs:

    # dogstatsd_mapper_cache_size: 1000  # defaults to 1000
    dogstatsd_mapper_profiles:
     - name: airflow
       prefix: "airflow."
       mappings:
         - match: "airflow.*_start"
           name: "airflow.job.start"
           tags:
             job_name: "$1"
         - match: "airflow.*_end"
           name: "airflow.job.end"
           tags:
             job_name: "$1"
         - match: "airflow.operator_failures_*"
           name: "airflow.operator_failures"
           tags:
             operator_name: "$1"
         - match: "airflow.operator_successes_*"
           name: "airflow.operator_successes"
           tags:
             operator_name: "$1"
         - match: 'airflow\.dag_processing\.last_runtime\.(.*)'
           match_type: "regex"
           name: "airflow.dag_processing.last_runtime"
           tags:
             dag_file: "$1"
         - match: 'airflow\.dag_processing\.last_run\.seconds_ago\.(.*)'
           match_type: "regex"
           name: "airflow.dag_processing.last_run.seconds_ago"
           tags:
             dag_file: "$1"
         - match: 'airflow\.dag\.loading-duration\.(.*)'
           match_type: "regex"
           name: "airflow.dag.loading_duration"
           tags:
             dag_file: "$1"
         - match: "airflow.pool.open_slots.*"
           name: "airflow.pool.open_slots"
           tags:
             pool_name: "$1"
         - match: "airflow.pool.used_slots.*"
           name: "airflow.pool.used_slots"
           tags:
             pool_name: "$1"
         - match: "airflow.pool.starving_tasks.*"
           name: "airflow.pool.starving_tasks"
           tags:
             pool_name: "$1"
         - match: 'airflow\.dagrun\.dependency-check\.(.*)'
           match_type: "regex"
           name: "airflow.dagrun.dependency_check"
           tags:
             dag_id: "$1"
         - match: 'airflow\.dag\.(.*)\.([^.]*)\.duration'
           match_type: "regex"
           name: "airflow.dag.task.duration"
           tags:
             dag_id: "$1"
             task_id: "$2"
         - match: 'airflow\.dag_processing\.last_duration\.(.*)'
           match_type: "regex"
           name: "airflow.dag_processing.last_duration"
           tags:
             dag_file: "$1"
         - match: 'airflow\.dagrun\.duration\.success\.(.*)'
           match_type: "regex"
           name: "airflow.dagrun.duration.success"
           tags:
             dag_id: "$1"
         - match: 'airflow\.dagrun\.duration\.failed\.(.*)'
           match_type: "regex"
           name: "airflow.dagrun.duration.failed"
           tags:
             dag_id: "$1"
         - match: 'airflow\.dagrun\.schedule_delay\.(.*)'
           match_type: "regex"
           name: "airflow.dagrun.schedule_delay"
           tags:
             dag_id: "$1"
         - match: 'airflow\.task_removed_from_dag\.(.*)'
           match_type: "regex"
           name: "airflow.dag.task_removed"
           tags:
             dag_id: "$1"
         - match: 'airflow\.task_restored_to_dag\.(.*)'
           match_type: "regex"
           name: "airflow.dag.task_restored"
           tags:
             dag_id: "$1"
         - match: "airflow.task_instance_created-*"
           name: "airflow.task.instance_created"
           tags:
             task_class: "$1"
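
With these mappings in place, a raw StatsD metric such as `airflow.operator_successes_BashOperator` is remapped to `airflow.operator_successes` with the tag `operator_name:BashOperator`. As a rough smoke test (not part of the official setup; it assumes DogStatsD is listening on UDP port 8125 and a netcat build that supports `-u` and `-w`), you can emit one sample datapoint by hand:

```shell
# Sends one Airflow-style counter to the local DogStatsD endpoint; it should
# surface in Datadog as airflow.operator_successes tagged operator_name:BashOperator.
echo -n "airflow.operator_successes_BashOperator:1|c" | nc -u -w1 127.0.0.1 8125
```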

Step 3: Restart Datadog Agent and Airflow

  1. Restart the Agent.
  2. Restart Airflow to start sending your Airflow metrics to the Agent DogStatsD endpoint (see the example below).
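
On a systemd-based Linux host, for example, restarting the Agent might look like the following; how you restart Airflow depends entirely on your deployment, so that step is only sketched in a comment:

```shell
# Restart the Datadog Agent so it picks up the datadog.yaml changes
sudo systemctl restart datadog-agent

# Restart your Airflow scheduler and webserver by whatever mechanism your
# deployment uses (systemd units, supervisor, containers, and so on)
```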

Integration Service Checks

Use the default configuration of your airflow.d/conf.yaml file to activate the collection of your Airflow service checks. See the sample airflow.d/conf.yaml for all available configuration options.

Log collection

Available for Agent versions >6.0

  1. Collecting logs is disabled by default in the Datadog Agent. Enable it in your datadog.yaml file:

    logs_enabled: true
  2. Uncomment and edit this configuration block at the bottom of your airflow.d/conf.yaml:

Change the path and service parameter values and configure them for your environment.

a. Configuration for DAG processor manager and Scheduler logs:

 ```yaml
 logs:
   - type: file
     path: '<PATH_TO_AIRFLOW>/logs/dag_processor_manager/dag_processor_manager.log'
     source: airflow
     service: '<SERVICE_NAME>'
     log_processing_rules:
       - type: multi_line
         name: new_log_start_with_date
         pattern: \[\d{4}\-\d{2}\-\d{2}
   - type: file
     path: '<PATH_TO_AIRFLOW>/logs/scheduler/*/*.log'
     source: airflow
     service: '<SERVICE_NAME>'
     log_processing_rules:
       - type: multi_line
         name: new_log_start_with_date
         pattern: \[\d{4}\-\d{2}\-\d{2}
 ```

 Because scheduler logs rotate daily, regular cleanup is recommended (see the sketch below).
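
One possible approach (a sketch only, assuming the default scheduler log layout and a cron-capable host) is to prune old scheduler log directories with `find`:

```shell
# Removes scheduler log directories older than 7 days; adjust the path
# and the retention period for your environment.
find <PATH_TO_AIRFLOW>/logs/scheduler -mindepth 1 -maxdepth 1 -type d -mtime +7 -exec rm -rf {} +
```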

b. Additional configuration for DAG tasks logs:

 ```yaml
 logs:
   - type: file
     path: '<PATH_TO_AIRFLOW>/logs/*/*/*/*.log'
     source: airflow
     service: '<SERVICE_NAME>'
     log_processing_rules:
       - type: multi_line
         name: new_log_start_with_date
         pattern: \[\d{4}\-\d{2}\-\d{2}
 ```

 Caveat: By default, Airflow uses the following log file template for tasks: `log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log`. The Airflow UI uses this pattern to display logs individually for each executed task, but the number of log files grows quickly if they are not cleaned up regularly.

 If you do not view logs in the Airflow UI, Datadog recommends this configuration in `airflow.cfg`: `log_filename_template = dag_tasks.log`. Then rotate this log file and use this configuration:

 ```yaml
 logs:
   - type: file
     path: '<PATH_TO_AIRFLOW>/logs/dag_tasks.log'
     source: airflow
     service: '<SERVICE_NAME>'
     log_processing_rules:
       - type: multi_line
         name: new_log_start_with_date
         pattern: \[\d{4}\-\d{2}\-\d{2}
 ```
  3. Restart the Agent.

Validation

Run the Agent’s status subcommand and look for airflow under the Checks section.
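
For example (depending on your platform, the command may require sudo):

```shell
# Shows the Agent status and filters for the Airflow check output
sudo datadog-agent status | grep -A 10 airflow
```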

Data Collected

Metrics

| Metric | Type | Description | Shown as |
| --- | --- | --- | --- |
| airflow.can_connect | count | 1 if can connect to Airflow, otherwise 0 | |
| airflow.healthy | count | 1 if Airflow is healthy, otherwise 0 | |
| airflow.job.start | count | Number of started `<job_name>` jobs, e.g. `SchedulerJob`, `LocalTaskJob` | job |
| airflow.job.end | count | Number of ended `<job_name>` jobs, e.g. `SchedulerJob`, `LocalTaskJob` | job |
| airflow.operator_failures | count | Operator `<operator_name>` failures | |
| airflow.operator_successes | count | Operator `<operator_name>` successes | |
| airflow.ti_failures | count | Overall task instance failures | task |
| airflow.ti_successes | count | Overall task instance successes | task |
| airflow.zombies_killed | count | Zombie tasks killed | task |
| airflow.scheduler_heartbeat | count | Scheduler heartbeats | |
| airflow.dag_processing.processes | count | Number of currently running DAG parsing processes | |
| airflow.scheduler.tasks.killed_externally | gauge | Number of tasks killed externally | task |
| airflow.dagbag_size | gauge | DAG bag size | |
| airflow.dag_processing.import_errors | gauge | Number of errors from trying to parse DAG files | error |
| airflow.dag_processing.total_parse_time | gauge | Seconds taken to scan and import all DAG files once | second |
| airflow.dag_processing.last_runtime | gauge | Seconds spent processing `<dag_file>` (in most recent iteration) | second |
| airflow.dag_processing.last_run.seconds_ago | gauge | Seconds since `<dag_file>` was last processed | second |
| airflow.dag_processing.processor_timeouts | gauge | Number of file processors that have been killed due to taking too long | |
| airflow.executor.open_slots | gauge | Number of open slots on executor | |
| airflow.executor.queued_tasks | gauge | Number of queued tasks on executor | task |
| airflow.executor.running_tasks | gauge | Number of running tasks on executor | task |
| airflow.pool.open_slots | gauge | Number of open slots in the pool | |
| airflow.pool.used_slots | gauge | Number of used slots in the pool | |
| airflow.pool.starving_tasks | gauge | Number of starving tasks in the pool | task |
| airflow.dagrun.dependency_check | gauge | Milliseconds taken to check DAG dependencies | millisecond |
| airflow.dag.task.duration | gauge | Milliseconds taken to finish a task | millisecond |
| airflow.dag_processing.last_duration | gauge | Milliseconds taken to load the given DAG file | millisecond |
| airflow.dagrun.duration.success | gauge | Milliseconds taken for a DagRun to reach success state | millisecond |
| airflow.dagrun.duration.failed | gauge | Milliseconds taken for a DagRun to reach failed state | millisecond |
| airflow.dagrun.schedule_delay | gauge | Milliseconds of delay between the scheduled DagRun start date and the actual DagRun start date | millisecond |
| airflow.dag.loading_duration | gauge | DAG loading duration in seconds (deprecated) | second |
| airflow.dag.task_removed | gauge | Tasks removed from DAG | task |
| airflow.dag.task_restored | gauge | Tasks restored to DAG | task |
| airflow.task.instance_created | gauge | Task instances created | task |

Service Checks

airflow.can_connect:

Returns CRITICAL if unable to connect to Airflow. Returns OK otherwise.

airflow.healthy:

Returns CRITICAL if Airflow is not healthy. Returns OK otherwise.

Events

The Airflow check does not include any events.

Annex

Airflow DatadogHook

In addition, the Airflow DatadogHook can be used to interact with Datadog (see the sketch after this list):

  • Send Metric
  • Query Metric
  • Post Event
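
For instance, sending a custom metric from a task could look like the following sketch. It assumes the `apache-airflow-providers-datadog` package is installed and that an Airflow connection named `datadog_default` holds your Datadog credentials; the metric name and tag below are illustrative only.

```python
from airflow.providers.datadog.hooks.datadog import DatadogHook

# Instantiate the hook against the "datadog_default" Airflow connection.
hook = DatadogHook(datadog_conn_id="datadog_default")

# Send a single datapoint for an illustrative custom metric.
hook.send_metric(
    metric_name="my_pipeline.rows_processed",
    datapoint=1042,
    tags=["team:data-eng"],
)
```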

Troubleshooting

Need help? Contact Datadog support.