Airflow

Agent Check

Supported OS: Linux, Mac OS, Windows

Integration v2.1.0

Overview

The Datadog Agent collects many metrics from Airflow, including those for:

  • DAGs (Directed Acyclic Graphs): Number of DAG processes, DAG bag size, etc.
  • Tasks: Task failures, successes, killed, etc.
  • Pools: Open slots, used slots, etc.
  • Executors: Open slots, queued tasks, running tasks, etc.

Metrics are collected through the Airflow StatsD plugin and sent to Datadog’s DogStatsD.

In addition to metrics, the Datadog Agent also sends service checks related to Airflow’s health.

Setup

Installation

All steps below are needed for the Airflow integration to work properly. Before you begin, install the Datadog Agent version >=6.17 or >=7.17, which includes the StatsD/DogStatsD mapping feature.

Configuration

The Airflow integration has two parts. The Datadog Agent check sends requests to an Airflow endpoint that you provide and reports whether it can connect and whether Airflow is healthy. The Airflow StatsD part configures Airflow to send metrics to the Datadog Agent, which can remap the Airflow metric names to Datadog notation.

Host

Configure Datadog Agent Airflow Integration

Configure the Airflow check included in the Datadog Agent package to collect health metrics and service checks. To do so, edit the url in the airflow.d/conf.yaml file, in the conf.d/ folder at the root of your Agent’s configuration directory. See the sample airflow.d/conf.yaml for all available configuration options.

Connect Airflow to DogStatsD

Connect Airflow to DogStatsD (included in the Datadog Agent) by using the Airflow statsd feature to collect metrics. For more information about the metrics reported by your Airflow version and the additional configuration options, see the Airflow documentation.

Note: Presence or absence of StatsD metrics reported by Airflow might vary depending on the Airflow Executor used. For example: airflow.ti_failures/successes, airflow.operator_failures/successes, airflow.dag.task.duration are not reported for KubernetesExecutor.

  1. Install the Airflow StatsD plugin.

    pip install 'apache-airflow[statsd]'
    
  2. Update the Airflow configuration file airflow.cfg by adding the following configs:

    [scheduler]
    statsd_on = True
    # Hostname or IP of the server running the Datadog Agent
    statsd_host = localhost
    # DogStatsD port configured in the Datadog Agent
    statsd_port = 8125
    statsd_prefix = airflow
    
  3. Update the Datadog Agent main configuration file datadog.yaml by adding the following configs:

    # dogstatsd_mapper_cache_size: 1000  # default to 1000
    dogstatsd_mapper_profiles:
      - name: airflow
        prefix: "airflow."
        mappings:
          - match: "airflow.*_start"
            name: "airflow.job.start"
            tags:
              job_name: "$1"
          - match: "airflow.*_end"
            name: "airflow.job.end"
            tags:
              job_name: "$1"
          - match: "airflow.*_heartbeat_failure"
            name: airflow.job.heartbeat.failure
            tags:
              job_name: "$1"
          - match: "airflow.operator_failures_*"
            name: "airflow.operator_failures"
            tags:
              operator_name: "$1"
          - match: "airflow.operator_successes_*"
            name: "airflow.operator_successes"
            tags:
              operator_name: "$1"
          - match: 'airflow\.dag_processing\.last_runtime\.(.*)'
            match_type: "regex"
            name: "airflow.dag_processing.last_runtime"
            tags:
              dag_file: "$1"
          - match: 'airflow\.dag_processing\.last_run\.seconds_ago\.(.*)'
            match_type: "regex"
            name: "airflow.dag_processing.last_run.seconds_ago"
            tags:
              dag_file: "$1"
          - match: 'airflow\.dag\.loading-duration\.(.*)'
            match_type: "regex"
            name: "airflow.dag.loading_duration"
            tags:
              dag_file: "$1"
          - match: "airflow.dagrun.*.first_task_scheduling_delay"
            name: "airflow.dagrun.first_task_scheduling_delay"
            tags:
              dag_id: "$1"
          - match: "airflow.pool.open_slots.*"
            name: "airflow.pool.open_slots"
            tags:
              pool_name: "$1"
          - match: "airflow.pool.queued_slots.*"
            name: "airflow.pool.queued_slots"
            tags:
              pool_name: "$1"
          - match: "airflow.pool.running_slots.*"
            name: "airflow.pool.running_slots"
            tags:
              pool_name: "$1"
          - match: "airflow.pool.used_slots.*"
            name: "airflow.pool.used_slots"
            tags:
              pool_name: "$1"
          - match: "airflow.pool.starving_tasks.*"
            name: "airflow.pool.starving_tasks"
            tags:
              pool_name: "$1"
          - match: 'airflow\.dagrun\.dependency-check\.(.*)'
            match_type: "regex"
            name: "airflow.dagrun.dependency_check"
            tags:
              dag_id: "$1"
          - match: 'airflow\.dag\.(.*)\.([^.]*)\.duration'
            match_type: "regex"
            name: "airflow.dag.task.duration"
            tags:
              dag_id: "$1"
              task_id: "$2"
          - match: 'airflow\.dag_processing\.last_duration\.(.*)'
            match_type: "regex"
            name: "airflow.dag_processing.last_duration"
            tags:
              dag_file: "$1"
          - match: 'airflow\.dagrun\.duration\.success\.(.*)'
            match_type: "regex"
            name: "airflow.dagrun.duration.success"
            tags:
              dag_id: "$1"
          - match: 'airflow\.dagrun\.duration\.failed\.(.*)'
            match_type: "regex"
            name: "airflow.dagrun.duration.failed"
            tags:
              dag_id: "$1"
          - match: 'airflow\.dagrun\.schedule_delay\.(.*)'
            match_type: "regex"
            name: "airflow.dagrun.schedule_delay"
            tags:
              dag_id: "$1"
          - match: 'airflow.scheduler.tasks.running'
            name: "airflow.scheduler.tasks.running"
          - match: 'airflow.scheduler.tasks.starving'
            name: "airflow.scheduler.tasks.starving"
          - match: 'airflow.sla_email_notification_failure'
            name: 'airflow.sla_email_notification_failure'
          - match: 'airflow\.task_removed_from_dag\.(.*)'
            match_type: "regex"
            name: "airflow.dag.task_removed"
            tags:
              dag_id: "$1"
          - match: 'airflow\.task_restored_to_dag\.(.*)'
            match_type: "regex"
            name: "airflow.dag.task_restored"
            tags:
              dag_id: "$1"
          - match: "airflow.task_instance_created-*"
            name: "airflow.task.instance_created"
            tags:
              task_class: "$1"
          - match: "airflow.ti.start.*.*"
            name: "airflow.ti.start"
            tags:
              dagid: "$1"
              taskid: "$2"
          - match: "airflow.ti.finish.*.*.*"
            name: "airflow.ti.finish"
            tags:
              dagid: "$1"
              taskid: "$2"
              state: "$3"
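
To illustrate what the mappings above do, the following Python sketch approximates how a wildcard rule rewrites a StatsD metric name into a Datadog metric name plus tags. It is not the Agent's implementation: map_metric, wildcard_to_regex, and the RULES list are hypothetical names, and the sketch assumes that each * matches one dot-free segment and that $1, $2, and so on refer to the captured segments in order.

```python
import re

# Two wildcard rules taken from the mapper profile above (illustrative subset).
RULES = [
    {"match": "airflow.pool.open_slots.*",
     "name": "airflow.pool.open_slots",
     "tags": {"pool_name": "$1"}},
    {"match": "airflow.*_start",
     "name": "airflow.job.start",
     "tags": {"job_name": "$1"}},
]


def wildcard_to_regex(pattern):
    """Turn a wildcard pattern into an anchored regex (assumed semantics)."""
    parts = [re.escape(part) for part in pattern.split("*")]
    return re.compile("^" + "([^.]*)".join(parts) + "$")


def map_metric(metric, rules=RULES):
    """Return (mapped_name, tags) for the first matching rule, else None."""
    for rule in rules:
        found = wildcard_to_regex(rule["match"]).match(metric)
        if found is None:
            continue
        tags = {}
        for key, template in rule["tags"].items():
            # Replace $1, $2, ... with the corresponding captured segment.
            tags[key] = re.sub(
                r"\$(\d+)", lambda m: found.group(int(m.group(1))), template
            )
        return rule["name"], tags
    return None


print(map_metric("airflow.pool.open_slots.default_pool"))
print(map_metric("airflow.SchedulerJob_start"))
```

Under these assumptions, a per-pool or per-job metric name collapses into one Datadog metric, with the variable part moved into a tag.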
    
Restart Datadog Agent and Airflow

  1. Restart the Agent.
  2. Restart Airflow to start sending your Airflow metrics to the Agent DogStatsD endpoint.
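
After the restart, one way to check that DogStatsD is reachable is to send it a hand-built StatsD datagram. The Python sketch below assumes the Agent listens on localhost:8125, as in the airflow.cfg example above; format_statsd, send_statsd, and the metric name airflow.example.test_gauge are hypothetical and used only for illustration.

```python
import socket


def format_statsd(name, value, metric_type="g"):
    """Build a plain StatsD datagram of the form <name>:<value>|<type>."""
    return f"{name}:{value}|{metric_type}".encode("ascii")


def send_statsd(payload, host="localhost", port=8125):
    """Send one datagram over UDP and return the number of bytes sent."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        return sock.sendto(payload, (host, port))


# Hypothetical metric name, used only to exercise the path to the Agent.
packet = format_statsd("airflow.example.test_gauge", 1)
print(packet)
```

Calling send_statsd(packet) submits the datagram. Because UDP is fire-and-forget, a successful send does not prove that the Agent received the metric; confirm by looking for it in Datadog.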

Integration service checks

Use the default configuration in your airflow.d/conf.yaml file to activate your Airflow service checks. See the sample airflow.d/conf.yaml for all available configuration options.

Log collection

Available for Agent versions >6.0

  1. Collecting logs is disabled by default in the Datadog Agent. Enable it in your datadog.yaml file:

    logs_enabled: true
    
  2. Uncomment and edit this configuration block at the bottom of your airflow.d/conf.yaml. Change the path and service parameter values to match your environment.

    • Configuration for DAG processor manager and Scheduler logs:

      logs:
        - type: file
          path: "<PATH_TO_AIRFLOW>/logs/dag_processor_manager/dag_processor_manager.log"
          source: airflow
          log_processing_rules:
            - type: multi_line
              name: new_log_start_with_date
              pattern: \[\d{4}\-\d{2}\-\d{2}
        - type: file
          path: "<PATH_TO_AIRFLOW>/logs/scheduler/latest/*.log"
          source: airflow
          log_processing_rules:
            - type: multi_line
              name: new_log_start_with_date
              pattern: \[\d{4}\-\d{2}\-\d{2}
      

      Regular cleanup is recommended for scheduler logs, with daily log rotation.

    • Additional configuration for DAG tasks logs:

      logs:
        - type: file
          path: "<PATH_TO_AIRFLOW>/logs/!(scheduler)/*/*.log"
          source: airflow
          log_processing_rules:
            - type: multi_line
              name: new_log_start_with_date
              pattern: \[\d{4}\-\d{2}\-\d{2}
      

      Caveat: By default, Airflow uses this log file template for tasks: log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log. The number of log files grows quickly if they are not cleaned up regularly. The Airflow UI uses this pattern to display logs individually for each executed task.

      If you do not view logs in the Airflow UI, Datadog recommends this configuration in airflow.cfg: log_filename_template = dag_tasks.log. Then rotate this log file and use this configuration:

      logs:
        - type: file
          path: "<PATH_TO_AIRFLOW>/logs/dag_tasks.log"
          source: airflow
          log_processing_rules:
            - type: multi_line
              name: new_log_start_with_date
              pattern: \[\d{4}\-\d{2}\-\d{2}
      
  3. Restart the Agent.
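
The multi_line rules above treat a line that starts with a bracketed date as the beginning of a new log entry and attach any other line to the previous entry. The Python sketch below approximates that grouping with the same pattern; it is not the Agent's implementation, group_entries is a hypothetical helper, and the sample log lines are made up.

```python
import re

# Same pattern as in the log_processing_rules above: "[YYYY-MM-DD" at line start.
NEW_ENTRY = re.compile(r"\[\d{4}\-\d{2}\-\d{2}")


def group_entries(lines):
    """Merge continuation lines into the preceding dated log entry."""
    entries = []
    for line in lines:
        if NEW_ENTRY.match(line) or not entries:
            entries.append(line)
        else:
            entries[-1] += "\n" + line
    return entries


# Made-up sample: one error with a two-line traceback, then one info line.
sample = [
    "[2021-03-01 10:00:00,123] {scheduler_job.py:100} ERROR - Something failed",
    "Traceback (most recent call last):",
    '  File "example.py", line 1, in <module>',
    "[2021-03-01 10:00:01,456] {scheduler_job.py:120} INFO - Recovered",
]
print(len(group_entries(sample)))
```

With this grouping, a traceback stays attached to the log line that produced it instead of appearing as separate entries.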

Containerized

Configure Datadog Agent Airflow Integration

For containerized environments, see the Autodiscovery Integration Templates for guidance on applying the parameters below.

Parameter            Value
<INTEGRATION_NAME>   airflow
<INIT_CONFIG>        blank or {}
<INSTANCE_CONFIG>    {"url": "http://%%host%%"}

Connect Airflow to DogStatsD

Connect Airflow to DogStatsD (included in the Datadog Agent) by using the Airflow statsd feature to collect metrics. For more information about the metrics reported by your Airflow version and the additional configuration options, see the Airflow documentation.

Note: Presence or absence of StatsD metrics reported by Airflow might vary depending on the Airflow Executor used. For example: airflow.ti_failures/successes, airflow.operator_failures/successes, airflow.dag.task.duration are not reported for KubernetesExecutor.

Note: The environment variables used for Airflow may differ between versions. For example, Airflow 2.0.0 uses the environment variable AIRFLOW__METRICS__STATSD_HOST, whereas Airflow 1.10.15 uses AIRFLOW__SCHEDULER__STATSD_HOST.
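
Airflow reads configuration overrides from environment variables named AIRFLOW__{SECTION}__{KEY}. The Python sketch below builds the variable name for a StatsD option; statsd_env_var is a hypothetical helper, and it assumes only the two cases named in the note: the metrics section for Airflow 2.x and the scheduler section for Airflow 1.10.x.

```python
def statsd_env_var(key, airflow_major):
    """Return the environment variable name for a StatsD option.

    Assumes the [metrics] section for Airflow 2.x and the [scheduler]
    section for Airflow 1.10.x, as described in the note above.
    """
    section = "METRICS" if airflow_major >= 2 else "SCHEDULER"
    return f"AIRFLOW__{section}__{key.upper()}"


print(statsd_env_var("statsd_host", 2))
print(statsd_env_var("statsd_host", 1))
```

Check the configuration reference for your exact Airflow version before relying on either name.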

The Airflow StatsD configuration can be enabled with the following environment variables in a Kubernetes Deployment:

env:
  - name: AIRFLOW__SCHEDULER__STATSD_ON
    value: "True"
  - name: AIRFLOW__SCHEDULER__STATSD_PORT
    value: "8125"
  - name: AIRFLOW__SCHEDULER__STATSD_PREFIX
    value: "airflow"
  - name: AIRFLOW__SCHEDULER__STATSD_HOST
    valueFrom:
      fieldRef:
        fieldPath: status.hostIP

The host endpoint environment variable AIRFLOW__SCHEDULER__STATSD_HOST is set to the node’s host IP address to route the StatsD data to the Datadog Agent pod on the same node as the Airflow pod. This setup also requires the Agent to have hostPort 8125 open and to accept non-local StatsD traffic. For more information, see the DogStatsD on Kubernetes setup documentation.

This directs the StatsD traffic from the Airflow container to a Datadog Agent that is ready to accept the incoming data. The last step is to update the Datadog Agent with the corresponding dogstatsd_mapper_profiles. To do so, either copy the dogstatsd_mapper_profiles provided in the Host installation into your datadog.yaml file, or deploy your Datadog Agent with the equivalent JSON configuration in the environment variable DD_DOGSTATSD_MAPPER_PROFILES. In Kubernetes, the equivalent environment variable notation is:

env: 
  - name: DD_DOGSTATSD_MAPPER_PROFILES
    value: >
            [{"prefix":"airflow.","name":"airflow","mappings":[{"name":"airflow.job.start","match":"airflow.*_start","tags":{"job_name":"$1"}},{"name":"airflow.job.end","match":"airflow.*_end","tags":{"job_name":"$1"}},{"name":"airflow.job.heartbeat.failure","match":"airflow.*_heartbeat_failure","tags":{"job_name":"$1"}},{"name":"airflow.operator_failures","match":"airflow.operator_failures_*","tags":{"operator_name":"$1"}},{"name":"airflow.operator_successes","match":"airflow.operator_successes_*","tags":{"operator_name":"$1"}},{"match_type":"regex","name":"airflow.dag_processing.last_runtime","match":"airflow\\.dag_processing\\.last_runtime\\.(.*)","tags":{"dag_file":"$1"}},{"match_type":"regex","name":"airflow.dag_processing.last_run.seconds_ago","match":"airflow\\.dag_processing\\.last_run\\.seconds_ago\\.(.*)","tags":{"dag_file":"$1"}},{"match_type":"regex","name":"airflow.dag.loading_duration","match":"airflow\\.dag\\.loading-duration\\.(.*)","tags":{"dag_file":"$1"}},{"name":"airflow.dagrun.first_task_scheduling_delay","match":"airflow.dagrun.*.first_task_scheduling_delay","tags":{"dag_id":"$1"}},{"name":"airflow.pool.open_slots","match":"airflow.pool.open_slots.*","tags":{"pool_name":"$1"}},{"name":"airflow.pool.queued_slots","match":"airflow.pool.queued_slots.*","tags":{"pool_name":"$1"}},{"name":"airflow.pool.running_slots","match":"airflow.pool.running_slots.*","tags":{"pool_name":"$1"}},{"name":"airflow.pool.used_slots","match":"airflow.pool.used_slots.*","tags":{"pool_name":"$1"}},{"name":"airflow.pool.starving_tasks","match":"airflow.pool.starving_tasks.*","tags":{"pool_name":"$1"}},{"match_type":"regex","name":"airflow.dagrun.dependency_check","match":"airflow\\.dagrun\\.dependency-check\\.(.*)","tags":{"dag_id":"$1"}},{"match_type":"regex","name":"airflow.dag.task.duration","match":"airflow\\.dag\\.(.*)\\.([^.]*)\\.duration","tags":{"dag_id":"$1","task_id":"$2"}},{"match_type":"regex","name":"airflow.dag_processing.last_duration","match":"airflow\\.dag_processing\\.last_duration\\.(.*)","tags":{"dag_file":"$1"}},{"match_type":"regex","name":"airflow.dagrun.duration.success","match":"airflow\\.dagrun\\.duration\\.success\\.(.*)","tags":{"dag_id":"$1"}},{"match_type":"regex","name":"airflow.dagrun.duration.failed","match":"airflow\\.dagrun\\.duration\\.failed\\.(.*)","tags":{"dag_id":"$1"}},{"match_type":"regex","name":"airflow.dagrun.schedule_delay","match":"airflow\\.dagrun\\.schedule_delay\\.(.*)","tags":{"dag_id":"$1"}},{"name":"airflow.scheduler.tasks.running","match":"airflow.scheduler.tasks.running"},{"name":"airflow.scheduler.tasks.starving","match":"airflow.scheduler.tasks.starving"},{"name":"airflow.sla_email_notification_failure","match":"airflow.sla_email_notification_failure"},{"match_type":"regex","name":"airflow.dag.task_removed","match":"airflow\\.task_removed_from_dag\\.(.*)","tags":{"dag_id":"$1"}},{"match_type":"regex","name":"airflow.dag.task_restored","match":"airflow\\.task_restored_to_dag\\.(.*)","tags":{"dag_id":"$1"}},{"name":"airflow.task.instance_created","match":"airflow.task_instance_created-*","tags":{"task_class":"$1"}},{"name":"airflow.ti.start","match":"airflow.ti.start.*.*","tags":{"dagid":"$1","taskid":"$2"}},{"name":"airflow.ti.finish","match":"airflow.ti.finish.*.*.*","tags":{"dagid":"$1","state":"$3","taskid":"$2"}}]}]

See the Datadog integrations-core repo for an example setup.
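
Writing the single-line JSON value for DD_DOGSTATSD_MAPPER_PROFILES by hand is error-prone, especially the doubled backslashes in regex rules. One option is to generate it, as in the Python sketch below. The profiles list holds only a two-rule excerpt of the mapper profile, and the variable names are hypothetical.

```python
import json

# Two-rule excerpt of the mapper profile; extend the list with the remaining rules.
profiles = [
    {
        "name": "airflow",
        "prefix": "airflow.",
        "mappings": [
            {
                "match": "airflow.*_start",
                "name": "airflow.job.start",
                "tags": {"job_name": "$1"},
            },
            {
                "match": r"airflow\.dagrun\.schedule_delay\.(.*)",
                "match_type": "regex",
                "name": "airflow.dagrun.schedule_delay",
                "tags": {"dag_id": "$1"},
            },
        ],
    }
]

# Compact separators keep the value on a single line without extra spaces.
env_value = json.dumps(profiles, separators=(",", ":"))
print(env_value)
```

json.dumps escapes each backslash in the regex patterns, which produces the doubled backslashes seen in the environment variable value.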

Log collection

Available for Agent versions >6.0

Collecting logs is disabled by default in the Datadog Agent. To enable it, see the Kubernetes log collection documentation.

Parameter      Value
<LOG_CONFIG>   {"source": "airflow", "service": "<YOUR_APP_NAME>"}

Validation

Run the Agent’s status subcommand and look for airflow under the Checks section.

Appendix

Airflow DatadogHook

In addition, Airflow DatadogHook can be used to interact with Datadog:

  • Send Metric
  • Query Metric
  • Post Event
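
As a rough illustration of the first and last items, the Python sketch below sends a metric and posts an event through a hook object. It assumes the Datadog provider package for Airflow is installed, that DatadogHook is importable from airflow.providers.datadog.hooks.datadog, and that a connection named datadog_default exists. The function report_rows_loaded, the metric name example.rows_loaded, and the RecordingHook stand-in are hypothetical.

```python
def report_rows_loaded(hook, row_count, dag_id):
    """Send a metric and post an event through a DatadogHook-like object."""
    tags = [f"dag_id:{dag_id}"]
    # send_metric(metric_name, datapoint, tags=...) submits one data point.
    hook.send_metric("example.rows_loaded", row_count, tags=tags)
    # post_event(title, text, tags=...) creates a Datadog event.
    hook.post_event("Load finished", f"{row_count} rows loaded", tags=tags)


class RecordingHook:
    """Stand-in used to try the function without Airflow or network access."""

    def __init__(self):
        self.calls = []

    def send_metric(self, metric_name, datapoint, tags=None):
        self.calls.append(("metric", metric_name, datapoint, tags))

    def post_event(self, title, text, tags=None):
        self.calls.append(("event", title, text, tags))


fake = RecordingHook()
report_rows_loaded(fake, 42, "example_dag")
print(fake.calls)

# Inside an Airflow task, assuming the Datadog provider is installed:
# from airflow.providers.datadog.hooks.datadog import DatadogHook
# report_rows_loaded(DatadogHook(datadog_conn_id="datadog_default"), 42, "example_dag")
```

Passing the hook as an argument keeps the reporting logic testable outside of Airflow.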

Data Collected

Metrics

airflow.can_connect
(count)
1 if can connect to Airflow, otherwise 0
airflow.collect_db_dags
(gauge)
Milliseconds taken for fetching all Serialized Dags from DB
Shown as millisecond
airflow.healthy
(count)
1 if Airflow is healthy, otherwise 0
airflow.job.start
(count)
Number of started `<job_name>` job, e.g. `SchedulerJob`, `LocalTaskJob`
Shown as job
airflow.job.end
(count)
Number of ended `<job_name>` job, e.g. `SchedulerJob`, `LocalTaskJob`
Shown as job
airflow.job.heartbeat.failure
(count)
Number of failed heartbeats for a `<job_name>` job, e.g. `SchedulerJob`, `LocalTaskJob`
Shown as error
airflow.operator_failures
(count)
Operator `<operator_name>` failures
airflow.operator_successes
(count)
Operator `<operator_name>` successes
airflow.ti_failures
(count)
Overall task instances failures
Shown as task
airflow.ti_successes
(count)
Overall task instances successes
Shown as task
airflow.previously_succeeded
(count)
Number of previously succeeded task instances
Shown as task
airflow.zombies_killed
(count)
Zombie tasks killed
Shown as task
airflow.scheduler_heartbeat
(count)
Scheduler heartbeats
airflow.dag_processing.processes
(count)
Number of currently running DAG parsing processes
airflow.dag_processing.manager_stalls
(count)
Number of stalled `DagFileProcessorManager`
airflow.dag_file_refresh_error
(count)
Number of failures loading any DAG files
Shown as error
airflow.scheduler.critical_section_duration
(gauge)
Milliseconds spent in the critical section of scheduler loop -- only a single scheduler can enter this loop at a time
Shown as millisecond
airflow.scheduler.tasks.killed_externally
(count)
Number of tasks killed externally
Shown as task
airflow.scheduler.orphaned_tasks.cleared
(count)
Number of Orphaned tasks cleared by the Scheduler
Shown as task
airflow.scheduler.orphaned_tasks.adopted
(count)
Number of Orphaned tasks adopted by the Scheduler
Shown as task
airflow.scheduler.critical_section_busy
(count)
Count of times a scheduler process tried to get a lock on the critical section (needed to send tasks to the executor) and found it locked by another process.
Shown as operation
airflow.scheduler.tasks.running
(count)
Number of tasks running in executor
Shown as task
airflow.scheduler.tasks.starving
(count)
Number of tasks that cannot be scheduled because of no open slot in pool
Shown as task
airflow.dagbag_size
(gauge)
DAG bag size
airflow.dag_processing.import_errors
(gauge)
Number of errors from trying to parse DAG files
Shown as error
airflow.dag_processing.total_parse_time
(gauge)
Seconds taken to scan and import all DAG files once
Shown as second
airflow.dag_processing.last_runtime
(gauge)
Seconds spent processing `<dag_file>` (in most recent iteration)
Shown as second
airflow.dag_processing.last_run.seconds_ago
(gauge)
Seconds since `<dag_file>` was last processed
Shown as second
airflow.dag_processing.processor_timeouts
(gauge)
Number of file processors that have been killed due to taking too long
airflow.executor.open_slots
(gauge)
Number of open slots on executor
airflow.executor.queued_tasks
(gauge)
Number of queued tasks on executor
Shown as task
airflow.executor.running_tasks
(gauge)
Number of running tasks on executor
Shown as task
airflow.pool.open_slots
(gauge)
Number of open slots in the pool
airflow.pool.queued_slots
(gauge)
Number of queued slots in the pool
airflow.pool.used_slots
(gauge)
Number of used slots in the pool
airflow.pool.running_slots
(gauge)
Number of running slots in the pool
airflow.pool.starving_tasks
(gauge)
Number of starving tasks in the pool
Shown as task
airflow.dagrun.dependency_check
(gauge)
Milliseconds taken to check DAG dependencies
Shown as millisecond
airflow.dag.task.duration
(gauge)
Milliseconds taken to finish a task
Shown as millisecond
airflow.dag_processing.last_duration
(gauge)
Milliseconds taken to load the given DAG file
Shown as millisecond
airflow.dagrun.duration.success
(gauge)
Milliseconds taken for a DagRun to reach success state
Shown as millisecond
airflow.dagrun.duration.failed
(gauge)
Milliseconds taken for a DagRun to reach failed state
Shown as millisecond
airflow.dagrun.schedule_delay
(gauge)
Milliseconds of delay between the scheduled DagRun start date and the actual DagRun start date
Shown as millisecond
airflow.dagrun.first_task_scheduling_delay
(gauge)
Milliseconds elapsed between first task start_date and dagrun expected start
Shown as millisecond
airflow.dag.loading_duration
(gauge)
DAG loading duration in seconds (deprecated)
Shown as second
airflow.dag.task_removed
(gauge)
Tasks removed from DAG
Shown as second
airflow.dag.task_restored
(gauge)
Tasks restored to DAG
Shown as second
airflow.sla_email_notification_failure
(count)
Number of failed SLA miss email notification attempts
Shown as task
airflow.task.instance_created
(gauge)
Task instances created
Shown as second
airflow.ti.start
(count)
Number of started tasks in a given DAG.
Shown as task
airflow.ti.finish
(count)
Number of completed tasks in a given DAG.
Shown as task
airflow.dag.callback_exceptions
(count)
Number of exceptions raised from DAG callbacks. When this happens, it means DAG callback is not working
Shown as error
airflow.celery.task_timeout_error
(count)
Number of `AirflowTaskTimeout` errors raised when publishing Task to Celery Broker.
Shown as error
airflow.task_removed_from_dag
(count)
Number of tasks removed for a given dag (i.e. task no longer exists in DAG)
Shown as task
airflow.task_restored_to_dag
(count)
Number of tasks restored for a given dag (i.e. task instance which was previously in REMOVED state in the DB is added to DAG file)
Shown as task
airflow.task_instance_created
(count)
Number of task instances created for a given Operator
Shown as task
airflow.scheduler.tasks.without_dagrun
(count)
Number of tasks without DagRuns or with DagRuns not in Running state
Shown as task
airflow.scheduler.tasks.executable
(count)
Number of tasks that are ready for execution (set to queued) with respect to pool limits, dag concurrency, executor state, and priority.
Shown as task
airflow.smart_sensor_operator.poked_tasks
(count)
Number of tasks poked by the smart sensor in the previous poking loop
Shown as task
airflow.smart_sensor_operator.poked_success
(count)
Number of newly succeeded tasks poked by the smart sensor in the previous poking loop
Shown as task
airflow.smart_sensor_operator.poked_exception
(count)
Number of exceptions in the previous smart sensor poking loop
Shown as error
airflow.smart_sensor_operator.exception_failures
(count)
Number of failures caused by exception in the previous smart sensor poking loop
Shown as error
airflow.smart_sensor_operator.infra_failures
(count)
Number of infrastructure failures in the previous smart sensor poking loop
Shown as error

Events

The Airflow check does not include any events.

Service Checks

airflow.can_connect
Returns CRITICAL if unable to connect to Airflow. Returns OK otherwise.
Statuses: ok, critical

airflow.healthy
Returns CRITICAL if Airflow is not healthy. Returns OK otherwise.
Statuses: ok, critical
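
To make the OK/CRITICAL distinction concrete, the Python sketch below derives a status from a health payload. It assumes a payload shaped like the JSON returned by Airflow's /health endpoint, with a status field for the metadatabase and the scheduler. The rule shown (both components must report healthy) is an assumption for illustration and not necessarily the rule the Agent check applies; healthy_status is a hypothetical name.

```python
OK, CRITICAL = "ok", "critical"


def healthy_status(payload):
    """Return OK only when both components report "healthy" (assumed rule)."""
    components = ("metadatabase", "scheduler")
    all_healthy = all(
        payload.get(name, {}).get("status") == "healthy" for name in components
    )
    return OK if all_healthy else CRITICAL


good = {"metadatabase": {"status": "healthy"}, "scheduler": {"status": "healthy"}}
bad = {"metadatabase": {"status": "healthy"}, "scheduler": {"status": "unhealthy"}}
print(healthy_status(good), healthy_status(bad))
```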

Troubleshooting

Need help? Contact Datadog support.