---
title: Airflow
description: Tracks metrics related to DAGs, tasks, pools, executors, and more.
breadcrumbs: Docs > Integrations > Airflow
---

# Airflow
**Integration version:** 7.3.0
{% alert level="info" %}
[Data Observability: Jobs Monitoring](https://docs.datadoghq.com/data_observability/jobs_monitoring/) provides out-of-the-box tracing for Airflow DAG runs, helping you quickly troubleshoot problematic tasks, correlate DAG runs to logs, and understand complex pipelines with data lineage across DAGs.

**Note**: This page covers only the documentation for collecting Airflow integration metrics and logs using the Datadog Agent.
{% /alert %}

## Overview{% #overview %}

The Datadog Agent collects many metrics from Airflow, including those for:

- DAGs (Directed Acyclic Graphs): number of DAG processes, DAG bag size, etc.
- Tasks: task failures, successes, killed tasks, etc.
- Pools: open slots, used slots, etc.
- Executors: open slots, queued tasks, running tasks, etc.

Metrics are collected through the [Airflow StatsD](https://airflow.apache.org/docs/stable/metrics.html) plugin and sent to Datadog's [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/).

In addition to metrics, the Datadog Agent also sends service checks related to Airflow's health.

**Minimum Agent version:** 7.17.0

## Setup{% #setup %}

### Installation{% #installation %}

All steps below are needed for the Airflow integration to work properly. Before you begin, [install the Datadog Agent](https://docs.datadoghq.com/agent/) version `>=6.17` or `>=7.17`, which includes the StatsD/DogStatsD mapping feature.

### Configuration{% #configuration %}

The Airflow integration has two parts:

- The Datadog Agent portion, which makes requests to a provided endpoint to report whether it can connect to Airflow and whether Airflow is healthy. The Agent integration also queries Airflow to produce some of its own metrics. *Supports Airflow V1 and V2.*
- The Airflow StatsD portion, where Airflow is configured to send metrics to the Datadog Agent, which remaps the Airflow notation to Datadog notation. *Supports Airflow V1, V2, and V3.*

The Airflow integration's metrics come from both the Agent and StatsD portions.

{% tab title="Host" %}
#### Host{% #host %}

##### Configure Datadog Agent Airflow integration{% #configure-datadog-agent-airflow-integration %}

**Note:** The Datadog Agent's `airflow` integration does not support Airflow V3.

Configure the `airflow` check included in the [Datadog Agent](https://app.datadoghq.com/account/settings/agent/latest) package to collect health metrics and service checks: edit the `url` in the `airflow.d/conf.yaml` file, in the `conf.d/` folder at the root of your Agent's configuration directory. See the [sample airflow.d/conf.yaml](https://github.com/DataDog/integrations-core/blob/master/airflow/datadog_checks/airflow/data/conf.yaml.example) for all available configuration options.

Ensure that `url` matches your Airflow [webserver `base_url`](https://airflow.apache.org/docs/apache-airflow/2.11.0/configurations-ref.html#base-url), the URL used to connect to your Airflow instance.
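For example, a minimal `airflow.d/conf.yaml` might look like the following sketch; the `url` value assumes a default local webserver and should be adjusted to your deployment:

```yaml
init_config:

instances:
  # Must match your Airflow webserver base_url
  - url: http://localhost:8080
```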

##### Connect Airflow to DogStatsD{% #connect-airflow-to-dogstatsd %}

Connect Airflow to DogStatsD (included in the Datadog Agent) by using the Airflow `statsd` feature to collect metrics. For more information about the metrics reported by the Airflow version used and the additional configuration options, see the Airflow documentation below:

- [Airflow Metrics](https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/metrics.html)
- [Airflow Metrics Configuration](https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#metrics)

**Note**: Presence or absence of StatsD metrics reported by Airflow might vary depending on the Airflow Executor used. For example: `airflow.ti_failures/successes`, `airflow.operator_failures/successes`, `airflow.dag.task.duration` are [not reported for `KubernetesExecutor`](https://airflow.apache.org/docs/apache-airflow/stable/executor/kubernetes.html).

1. Install the [Airflow StatsD plugin](https://airflow.apache.org/docs/stable/metrics.html).

   ```shell
   pip install 'apache-airflow[statsd]'
   ```

1. Update the Airflow configuration file `airflow.cfg` by adding the following configs:

   ```ini
   [metrics]
   statsd_on = True
   # Hostname or IP of server running the Datadog Agent
   statsd_host = localhost
   # DogStatsD port configured in the Datadog Agent
   statsd_port = 8125
   statsd_prefix = airflow
   ```

   **Note**: Do not set `statsd_datadog_enabled` without first installing the Datadog DogStatsD package.

1. Update the [Datadog Agent main configuration file](https://docs.datadoghq.com/agent/guide/agent-configuration-files/) `datadog.yaml` by adding the following configuration to remap the Airflow notation to Datadog notation:

   ```yaml
   # dogstatsd_mapper_cache_size: 1000  # default to 1000
   dogstatsd_mapper_profiles:
     - name: airflow
       prefix: "airflow."
       mappings:
         - match: "airflow.*_start"
           name: "airflow.job.start"
           tags:
             job_name: "$1"
         - match: "airflow.*_end"
           name: "airflow.job.end"
           tags:
             job_name: "$1"
         - match: "airflow.*_heartbeat_failure"
           name: airflow.job.heartbeat.failure
           tags:
             job_name: "$1"
         - match: "airflow.operator_failures_*"
           name: "airflow.operator_failures"
           tags:
             operator_name: "$1"
         - match: "airflow.operator_successes_*"
           name: "airflow.operator_successes"
           tags:
             operator_name: "$1"
         - match: 'airflow\.dag_processing\.last_runtime\.(.*)'
           match_type: "regex"
           name: "airflow.dag_processing.last_runtime"
           tags:
             dag_file: "$1"
         - match: 'airflow\.dag_processing\.last_run\.seconds_ago\.(.*)'
           match_type: "regex"
           name: "airflow.dag_processing.last_run.seconds_ago"
           tags:
             dag_file: "$1"
         - match: 'airflow\.dag\.loading-duration\.(.*)'
           match_type: "regex"
           name: "airflow.dag.loading_duration"
           tags:
             dag_file: "$1"
         - match: "airflow.local_task_job.task_exit.*.*.*.*"
           name: "airflow.local_task_job.task_exit"
           tags:
             job_id: "$1"
             dag_id: "$2"
             task_id: "$3"
             return_code: "$4"
         - match: "airflow.dag.*.*.queue_duration"
           name: "airflow.dag.queue_duration"
           tags:
             dag_id: "$1"
             task_id: "$2"
         - match: "airflow.dag.*.*.queued_duration"
           name: "airflow.dag.queued_duration"
           tags:
             dag_id: "$1"
             task_id: "$2"
         - match: "airflow.dag.*.*.scheduled_duration"
           name: "airflow.dag.scheduled_duration"
           tags:
             dag_id: "$1"
             task_id: "$2"
         - match: "airflow.dagrun.*.first_task_scheduling_delay"
           name: "airflow.dagrun.first_task_scheduling_delay"
           tags:
             dag_id: "$1"
         - match: "airflow.pool.open_slots.*"
           name: "airflow.pool.open_slots"
           tags:
             pool_name: "$1"
         - match: "airflow.pool.queued_slots.*"
           name: "airflow.pool.queued_slots"
           tags:
             pool_name: "$1"
         - match: "airflow.pool.running_slots.*"
           name: "airflow.pool.running_slots"
           tags:
             pool_name: "$1"
         - match: "airflow.pool.used_slots.*"
           name: "airflow.pool.used_slots"
           tags:
             pool_name: "$1"
         - match: "airflow.pool.starving_tasks.*"
           name: "airflow.pool.starving_tasks"
           tags:
             pool_name: "$1"
         - match: "airflow.pool.deferred_slots.*"
           name: "airflow.pool.deferred_slots"
           tags:
             pool_name: "$1"
         - match: "airflow.pool.scheduled_slots.*"
           name: "airflow.pool.scheduled_slots"
           tags:
             pool_name: "$1"
         - match: 'airflow\.dagrun\.dependency-check\.(.*)'
           match_type: "regex"
           name: "airflow.dagrun.dependency_check"
           tags:
             dag_id: "$1"
         - match: 'airflow\.dag\.(.*)\.([^.]*)\.duration'
           match_type: "regex"
           name: "airflow.dag.task.duration"
           tags:
             dag_id: "$1"
             task_id: "$2"
         - match: 'airflow\.task\.cpu_usage\.(.*)\.(.*)'
           match_type: "regex"
           name: "airflow.task.cpu_usage"
           tags:
             dag_id: "$1"
             task_id: "$2"
         - match: 'airflow\.task\.mem_usage\.(.*)\.(.*)'
           match_type: "regex"
           name: "airflow.task.mem_usage"
           tags:
             dag_id: "$1"
             task_id: "$2"
         - match: "airflow.task.duration"
           name: "airflow.task.duration"
         - match: "airflow.task.queued_duration"
           name: "airflow.task.queued_duration"
         - match: "airflow.task.scheduled_duration"
           name: "airflow.task.scheduled_duration"
         - match: 'airflow\.dag_processing\.last_duration\.(.*)'
           match_type: "regex"
           name: "airflow.dag_processing.last_duration"
           tags:
             dag_file: "$1"
         - match: 'airflow\.dagrun\.duration\.success\.(.*)'
           match_type: "regex"
           name: "airflow.dagrun.duration.success"
           tags:
             dag_id: "$1"
         - match: 'airflow\.dagrun\.duration\.failed\.(.*)'
           match_type: "regex"
           name: "airflow.dagrun.duration.failed"
           tags:
             dag_id: "$1"
         - match: 'airflow\.dagrun\.schedule_delay\.(.*)'
           match_type: "regex"
           name: "airflow.dagrun.schedule_delay"
           tags:
             dag_id: "$1"
         - match: "airflow.dagrun.dependency-check"
           name: "airflow.dagrun.dependency_check"
         - match: "airflow.dagrun.duration.success"
           name: "airflow.dagrun.duration.success"
         - match: "airflow.dagrun.duration.failed"
           name: "airflow.dagrun.duration.failed"
         - match: "airflow.dagrun.schedule_delay"
           name: "airflow.dagrun.schedule_delay"
         - match: "airflow.dagrun.first_task_scheduling_delay"
           name: "airflow.dagrun.first_task_scheduling_delay"
         - match: "airflow.scheduler.tasks.running"
           name: "airflow.scheduler.tasks.running"
         - match: "airflow.scheduler.tasks.starving"
           name: "airflow.scheduler.tasks.starving"
         - match: "airflow.sla_email_notification_failure"
           name: "airflow.sla_email_notification_failure"
         - match: "airflow.sla_missed"
           name: "airflow.sla_missed"
         - match: "airflow.sla_callback_notification_failure"
           name: "airflow.sla_callback_notification_failure"
         - match: "airflow.scheduler.critical_section_query_duration"
           name: "airflow.scheduler.critical_section_query_duration"
         - match: "airflow.scheduler.scheduler_loop_duration"
           name: "airflow.scheduler.scheduler_loop_duration"
         - match: 'airflow\.task_removed_from_dag\.(.*)'
           match_type: "regex"
           name: "airflow.dag.task_removed"
           tags:
             dag_id: "$1"
         - match: 'airflow\.task_restored_to_dag\.(.*)'
           match_type: "regex"
           name: "airflow.dag.task_restored"
           tags:
             dag_id: "$1"
         - match: "airflow.task_instance_created-*"
           name: "airflow.task.instance_created"
           tags:
             task_class: "$1"
         - match: "airflow.task_instance_created"
           name: "airflow.task.instance_created"
         - match: "airflow.task_instance_created_*"
           name: "airflow.task.instance_created"
           tags:
             operator_name: "$1"
         - match: "airflow.task_removed_from_dag"
           name: "airflow.dag.task_removed"
         - match: 'airflow\.ti\.start\.(.+)\.(\w+)'
           match_type: regex
           name: airflow.ti.start
           tags:
             dag_id: "$1"
             task_id: "$2"
         - match: 'airflow\.ti\.finish\.(\w+)\.(.+)\.(\w+)'
           name: airflow.ti.finish
           match_type: regex
           tags:
             dag_id: "$1"
             task_id: "$2"
             state: "$3"
         - match: "airflow.ti.start"
           name: "airflow.ti.start"
         - match: "airflow.ti.finish"
           name: "airflow.ti.finish"
         - match: "airflow.celery.execute_command.failure"
           name: "airflow.celery.execute_command.failure"
         - match: "airflow.triggerer_heartbeat"
           name: "airflow.triggerer_heartbeat"
         - match: "airflow.triggers.blocked_main_thread"
           name: "airflow.triggers.blocked_main_thread"
         - match: "airflow.triggers.failed"
           name: "airflow.triggers.failed"
         - match: "airflow.triggers.succeeded"
           name: "airflow.triggers.succeeded"
         - match: "airflow.triggers.running"
           name: "airflow.triggers.running"
         - match: 'airflow\.triggers\.running\.(.*)'
           match_type: "regex"
           name: "airflow.triggers.running"
           tags:
             hostname: "$1"
         - match: "airflow.dataset.updates"
           name: "airflow.dataset.updates"
         - match: "airflow.dataset.orphaned"
           name: "airflow.dataset.orphaned"
         - match: "airflow.dataset.triggered_dagruns"
           name: "airflow.dataset.triggered_dagruns"
         - match: "airflow.executor.open_slots"
           name: "airflow.executor.open_slots"
         - match: 'airflow\.executor\.open_slots\.(.*)'
           match_type: "regex"
           name: "airflow.executor.open_slots"
           tags:
             executor_class_name: "$1"
         - match: "airflow.executor.queued_tasks"
           name: "airflow.executor.queued_tasks"
         - match: 'airflow\.executor\.queued_tasks\.(.*)'
           match_type: "regex"
           name: "airflow.executor.queued_tasks"
           tags:
             executor_class_name: "$1"
         - match: "airflow.executor.running_tasks"
           name: "airflow.executor.running_tasks"
         - match: 'airflow\.executor\.running_tasks\.(.*)'
           match_type: "regex"
           name: "airflow.executor.running_tasks"
           tags:
             executor_class_name: "$1"
         - match: "airflow.kubernetes_executor.adopt_task_instances.duration"
           name: "airflow.kubernetes_executor.adopt_task_instances.duration"
         - match: "airflow.kubernetes_executor.clear_not_launched_queued_tasks.duration"
           name: "airflow.kubernetes_executor.clear_not_launched_queued_tasks.duration"
         - match: "airflow.dag_processing.file_path_queue_size"
           name: "airflow.dag_processing.file_path_queue_size"
         - match: "airflow.dag_processing.file_path_queue_update_count"
           name: "airflow.dag_processing.file_path_queue_update_count"
         - match: 'airflow\.dag_processing\.last_num_of_db_queries\.(.*)'
           match_type: "regex"
           name: "airflow.dag_processing.last_num_of_db_queries"
           tags:
             dag_file: "$1"
         - match: "airflow.dag_processing.other_callback_count"
           name: "airflow.dag_processing.other_callback_count"
         - match: "airflow.dag_processing.sla_callback_count"
           name: "airflow.dag_processing.sla_callback_count"
         - match: "airflow.dag_file_processor_timeouts"
           name: "airflow.dag_file_processor_timeouts"
   ```
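To make the mapping behavior concrete, the sketch below emulates what one of the regex-type rules above does to an incoming StatsD metric name. This is a simplified illustration, not the Agent's actual implementation:

```python
import re

# One regex-type rule from the dogstatsd_mapper_profiles above.
rule = {
    "match": r"airflow\.dag\.(.*)\.([^.]*)\.duration",
    "name": "airflow.dag.task.duration",
    "tags": {"dag_id": "$1", "task_id": "$2"},
}

def apply_rule(metric_name, rule):
    """Return (mapped_name, tags) if the rule matches, else None."""
    m = re.fullmatch(rule["match"], metric_name)
    if m is None:
        return None
    # "$1", "$2", ... refer to the capture groups of the match.
    tags = {key: m.expand(ref.replace("$", "\\")) for key, ref in rule["tags"].items()}
    return rule["name"], tags

# A hypothetical metric for task "load" in DAG "etl_daily":
print(apply_rule("airflow.dag.etl_daily.load.duration", rule))
# → ('airflow.dag.task.duration', {'dag_id': 'etl_daily', 'task_id': 'load'})
```

Non-matching metrics fall through to the next rule, or pass unmapped if no rule matches.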

##### Restart Datadog Agent and Airflow{% #restart-datadog-agent-and-airflow %}

1. [Restart the Agent](https://docs.datadoghq.com/agent/guide/agent-commands/?tab=agentv6#start-stop-and-restart-the-agent).
1. Restart Airflow to start sending your Airflow metrics to the Agent DogStatsD endpoint.
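After both are restarted, you can exercise the DogStatsD path independently of Airflow by emitting a hand-crafted StatsD datagram; the wire format is `<metric>:<value>|<type>`. The metric name below is illustrative only:

```python
import socket

def statsd_packet(name: str, value: int, metric_type: str = "c") -> bytes:
    """Build a plain StatsD datagram, for example b'airflow.ti_successes:1|c'."""
    return f"{name}:{value}|{metric_type}".encode()

# Fire-and-forget UDP send to the Agent's DogStatsD port; UDP raises no
# error if nothing is listening, so pair this with a check of the Agent side.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(statsd_packet("airflow.ti_successes", 1), ("localhost", 8125))
sock.close()
```

You can then look for DogStatsD activity in the output of `datadog-agent status`.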

##### Integration service checks{% #integration-service-checks %}

Use the default configuration in your `airflow.d/conf.yaml` file to activate your Airflow service checks. See the sample [airflow.d/conf.yaml](https://github.com/DataDog/integrations-core/blob/master/airflow/datadog_checks/airflow/data/conf.yaml.example) for all available configuration options.

##### Log collection{% #log-collection %}

*Available for Agent versions >6.0*

1. Collecting logs is disabled by default in the Datadog Agent. Enable it in your `datadog.yaml` file:

   ```yaml
   logs_enabled: true
   ```

1. Uncomment and edit the logs configuration block at the bottom of your `airflow.d/conf.yaml`. Change the `path` and `service` parameter values to fit your environment.

   - Configuration for DAG processor manager and Scheduler logs:

     ```yaml
     logs:
       - type: file
         path: "<PATH_TO_AIRFLOW>/logs/dag_processor_manager/dag_processor_manager.log"
         source: airflow
         log_processing_rules:
           - type: multi_line
             name: new_log_start_with_date
             pattern: \[\d{4}\-\d{2}\-\d{2}
       - type: file
         path: "<PATH_TO_AIRFLOW>/logs/scheduler/latest/*.log"
         source: airflow
         log_processing_rules:
           - type: multi_line
             name: new_log_start_with_date
             pattern: \[\d{4}\-\d{2}\-\d{2}
     ```

     Regular cleanup is recommended for scheduler logs, with daily log rotation.

   - Additional configuration for DAG tasks logs:

     ```yaml
     logs:
       - type: file
         path: "<PATH_TO_AIRFLOW>/logs/*/*/*/*.log"
         source: airflow
         log_processing_rules:
           - type: multi_line
             name: new_log_start_with_date
             pattern: \[\d{4}\-\d{2}\-\d{2}
     ```

     **Caveat**: By default, Airflow uses this log file template for tasks: `log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log`. The number of log files grows quickly if they are not cleaned up regularly. The Airflow UI uses this pattern to display logs individually for each executed task.

     If you do not view logs in the Airflow UI, Datadog recommends setting `log_filename_template = dag_tasks.log` in `airflow.cfg`. Then rotate this file and use this configuration:

     ```yaml
     logs:
       - type: file
         path: "<PATH_TO_AIRFLOW>/logs/dag_tasks.log"
         source: airflow
         log_processing_rules:
           - type: multi_line
             name: new_log_start_with_date
             pattern: \[\d{4}\-\d{2}\-\d{2}
     ```

1. [Restart the Agent](https://docs.datadoghq.com/agent/guide/agent-commands/?tab=agentv6#start-stop-and-restart-the-agent).
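The `multi_line` rule above can be sanity-checked offline; this sketch applies the same pattern to a few sample lines (log content is illustrative, assuming Airflow's default bracketed timestamp prefix):

```python
import re

# Same pattern as in the log_processing_rules above: a new log entry
# starts with a bracketed date such as "[2024-01-15 10:00:00,123]".
pattern = re.compile(r"\[\d{4}\-\d{2}\-\d{2}")

lines = [
    "[2024-01-15 10:00:00,123] {scheduler_job.py:123} INFO - Starting the scheduler",
    "Traceback (most recent call last):",  # folded into the entry above
    "[2024-01-15 10:00:01,456] {scheduler_job.py:456} ERROR - Task failed",
]
starts_new_entry = [bool(pattern.match(line)) for line in lines]
print(starts_new_entry)  # → [True, False, True]
```

Lines that do not match the pattern are appended to the preceding entry, so multi-line tracebacks arrive in Datadog as a single log event.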

{% /tab %}

{% tab title="Containerized" %}
#### Containerized{% #containerized %}

##### Configure Datadog Agent Airflow integration{% #configure-datadog-agent-airflow-integration %}

**Note:** The Datadog Agent's `airflow` integration does not support Airflow V3.

For containerized environments, see the [Autodiscovery Integration Templates](https://docs.datadoghq.com/getting_started/agent/autodiscovery/?tab=docker#integration-templates) for guidance on applying the parameters below.

| Parameter            | Value                             |
| -------------------- | --------------------------------- |
| `<INTEGRATION_NAME>` | `airflow`                         |
| `<INIT_CONFIG>`      | blank or `{}`                     |
| `<INSTANCE_CONFIG>`  | `{"url": "http://%%host%%:8080"}` |

Ensure that `url` matches your Airflow [webserver `base_url`](https://airflow.apache.org/docs/apache-airflow/2.11.0/configurations-ref.html#base-url), the URL used to connect to your Airflow instance. Replace `localhost` with the template variable `%%host%%`.

If you are using the [official Airflow Helm chart](https://airflow.apache.org/docs/helm-chart/stable/index.html), this should be applied on the `webserver` pod and its `webserver` container. For example, with the [`webserver.podAnnotations`](https://github.com/apache/airflow/blob/helm-chart/1.16.0/chart/values.yaml#L1583), your Autodiscovery Annotations may look like the following:

```yaml
webserver:
  podAnnotations:
    ad.datadoghq.com/webserver.checks: |
      {
        "airflow": {
          "instances": [
            {
              "url": "http://%%host%%:8080"
            }
          ]
        }
      }
```

Adjust the `ad.datadoghq.com/<CONTAINER_NAME>.checks` annotation accordingly if your container name differs.

##### Connect Airflow to DogStatsD{% #connect-airflow-to-dogstatsd %}

Connect Airflow to DogStatsD (included in the Datadog Agent) by using the Airflow `statsd` feature to collect metrics. For more information about the metrics reported by the Airflow version used and the additional configuration options, see the Airflow documentation below:

- [Airflow Metrics](https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/metrics.html)
- [Airflow Metrics Configuration](https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#metrics)

**Note**: Presence or absence of StatsD metrics reported by Airflow might vary depending on the Airflow Executor used. For example: `airflow.ti_failures/successes`, `airflow.operator_failures/successes`, `airflow.dag.task.duration` are [not reported for `KubernetesExecutor`](https://airflow.apache.org/docs/apache-airflow/stable/executor/kubernetes.html).

With the Airflow Helm chart, enable the Airflow StatsD configuration with the following environment variables:

```yaml
env:
  - name: AIRFLOW__METRICS__STATSD_ON
    value: "True"
  - name: AIRFLOW__METRICS__STATSD_PORT
    value: "8125"
  - name: AIRFLOW__METRICS__STATSD_PREFIX
    value: "airflow"
extraEnv: |
  - name: AIRFLOW__METRICS__STATSD_HOST
    valueFrom:
      fieldRef:
        fieldPath: status.hostIP
```

**Note**: The [Airflow Helm Chart](https://airflow.apache.org/docs/helm-chart/stable/index.html) requires the `valueFrom` based environment variables to be set with `extraEnv`. Do not set `AIRFLOW__METRICS__STATSD_DATADOG_ENABLED` without first installing the Datadog package.

The environment variable for the metrics endpoint, `AIRFLOW__METRICS__STATSD_HOST`, is supplied with the node's host IP address to route the StatsD data to the Datadog Agent pod on the same node as the Airflow pod. This setup also requires the Agent to expose a `hostPort` for port `8125` and to accept non-local StatsD traffic. For more information, see [DogStatsD on Kubernetes Setup](https://docs.datadoghq.com/developers/dogstatsd/?tab=kubernetes#setup). This directs the StatsD traffic from the Airflow container to a Datadog Agent ready to accept the incoming data.
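With the Datadog Helm chart, this typically corresponds to the following `values.yaml` keys (a sketch; verify the exact key names against your chart version):

```yaml
datadog:
  dogstatsd:
    port: 8125
    useHostPort: true
    nonLocalTraffic: true
```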

You must also update the Datadog Agent with the corresponding `dogstatsd_mapper_profiles`. To do this, copy the `dogstatsd_mapper_profiles` provided in the [Host installation](https://docs.datadoghq.com/integrations/airflow/?tab=host#connect-airflow-to-dogstatsd) into your `datadog.yaml` file. Alternatively, you can also deploy your Datadog Agent with the equivalent JSON configuration in the environment variable `DD_DOGSTATSD_MAPPER_PROFILES`. For Kubernetes, use the complete configuration below:

```yaml
env:
  - name: DD_DOGSTATSD_MAPPER_PROFILES
    value: >
      [{"name":"airflow","prefix":"airflow.","mappings":[{"match":"airflow.*_start","name":"airflow.job.start","tags":{"job_name":"$1"}},{"match":"airflow.*_end","name":"airflow.job.end","tags":{"job_name":"$1"}},{"match":"airflow.*_heartbeat_failure","name":"airflow.job.heartbeat.failure","tags":{"job_name":"$1"}},{"match":"airflow.operator_failures_*","name":"airflow.operator_failures","tags":{"operator_name":"$1"}},{"match":"airflow.operator_successes_*","name":"airflow.operator_successes","tags":{"operator_name":"$1"}},{"match":"airflow\\.dag_processing\\.last_runtime\\.(.*)","match_type":"regex","name":"airflow.dag_processing.last_runtime","tags":{"dag_file":"$1"}},{"match":"airflow\\.dag_processing\\.last_run\\.seconds_ago\\.(.*)","match_type":"regex","name":"airflow.dag_processing.last_run.seconds_ago","tags":{"dag_file":"$1"}},{"match":"airflow\\.dag\\.loading-duration\\.(.*)","match_type":"regex","name":"airflow.dag.loading_duration","tags":{"dag_file":"$1"}},{"match":"airflow.local_task_job.task_exit.*.*.*.*","name":"airflow.local_task_job.task_exit","tags":{"job_id":"$1","dag_id":"$2","task_id":"$3","return_code":"$4"}},{"match":"airflow.dag.*.*.queue_duration","name":"airflow.dag.queue_duration","tags":{"dag_id":"$1","task_id":"$2"}},{"match":"airflow.dag.*.*.queued_duration","name":"airflow.dag.queued_duration","tags":{"dag_id":"$1","task_id":"$2"}},{"match":"airflow.dag.*.*.scheduled_duration","name":"airflow.dag.scheduled_duration","tags":{"dag_id":"$1","task_id":"$2"}},{"match":"airflow.dagrun.*.first_task_scheduling_delay","name":"airflow.dagrun.first_task_scheduling_delay","tags":{"dag_id":"$1"}},{"match":"airflow.pool.open_slots.*","name":"airflow.pool.open_slots","tags":{"pool_name":"$1"}},{"match":"airflow.pool.queued_slots.*","name":"airflow.pool.queued_slots","tags":{"pool_name":"$1"}},{"match":"airflow.pool.running_slots.*","name":"airflow.pool.running_slots","tags":{"pool_name":"$1"}},
      {"match":"airflow.pool.used_slots.*","name":"airflow.pool.used_slots","tags":{"pool_name":"$1"}},{"match":"airflow.pool.starving_tasks.*","name":"airflow.pool.starving_tasks","tags":{"pool_name":"$1"}},{"match":"airflow.pool.deferred_slots.*","name":"airflow.pool.deferred_slots","tags":{"pool_name":"$1"}},{"match":"airflow.pool.scheduled_slots.*","name":"airflow.pool.scheduled_slots","tags":{"pool_name":"$1"}},{"match":"airflow\\.dagrun\\.dependency-check\\.(.*)","match_type":"regex","name":"airflow.dagrun.dependency_check","tags":{"dag_id":"$1"}},{"match":"airflow\\.dag\\.(.*)\\.([^.]*)\\.duration","match_type":"regex","name":"airflow.dag.task.duration","tags":{"dag_id":"$1","task_id":"$2"}},{"match":"airflow\\.task\\.cpu_usage\\.(.*)\\.(.*)","match_type":"regex","name":"airflow.task.cpu_usage","tags":{"dag_id":"$1","task_id":"$2"}},{"match":"airflow\\.task\\.mem_usage\\.(.*)\\.(.*)","match_type":"regex","name":"airflow.task.mem_usage","tags":{"dag_id":"$1","task_id":"$2"}},{"match":"airflow.task.duration","name":"airflow.task.duration"},{"match":"airflow.task.queued_duration","name":"airflow.task.queued_duration"},{"match":"airflow.task.scheduled_duration","name":"airflow.task.scheduled_duration"},{"match":"airflow\\.dag_processing\\.last_duration\\.(.*)","match_type":"regex","name":"airflow.dag_processing.last_duration","tags":{"dag_file":"$1"}},{"match":"airflow\\.dagrun\\.duration\\.success\\.(.*)","match_type":"regex","name":"airflow.dagrun.duration.success","tags":{"dag_id":"$1"}},{"match":"airflow\\.dagrun\\.duration\\.failed\\.(.*)","match_type":"regex","name":"airflow.dagrun.duration.failed","tags":{"dag_id":"$1"}},{"match":"airflow\\.dagrun\\.schedule_delay\\.(.*)","match_type":"regex","name":"airflow.dagrun.schedule_delay","tags":{"dag_id":"$1"}},{"match":"airflow.dagrun.dependency-check","name":"airflow.dagrun.dependency_check"},{"match":"airflow.dagrun.duration.success","name":"airflow.dagrun.duration.success"},{"match":"airflow.dagrun.duration.failed","name":"airflow.dagrun.duration.failed"},
      {"match":"airflow.dagrun.schedule_delay","name":"airflow.dagrun.schedule_delay"},{"match":"airflow.dagrun.first_task_scheduling_delay","name":"airflow.dagrun.first_task_scheduling_delay"},{"match":"airflow.scheduler.tasks.running","name":"airflow.scheduler.tasks.running"},{"match":"airflow.scheduler.tasks.starving","name":"airflow.scheduler.tasks.starving"},{"match":"airflow.sla_email_notification_failure","name":"airflow.sla_email_notification_failure"},{"match":"airflow.sla_missed","name":"airflow.sla_missed"},{"match":"airflow.sla_callback_notification_failure","name":"airflow.sla_callback_notification_failure"},{"match":"airflow.scheduler.critical_section_query_duration","name":"airflow.scheduler.critical_section_query_duration"},{"match":"airflow.scheduler.scheduler_loop_duration","name":"airflow.scheduler.scheduler_loop_duration"},{"match":"airflow\\.task_removed_from_dag\\.(.*)","match_type":"regex","name":"airflow.dag.task_removed","tags":{"dag_id":"$1"}},{"match":"airflow\\.task_restored_to_dag\\.(.*)","match_type":"regex","name":"airflow.dag.task_restored","tags":{"dag_id":"$1"}},{"match":"airflow.task_instance_created-*","name":"airflow.task.instance_created","tags":{"task_class":"$1"}},{"match":"airflow.task_instance_created","name":"airflow.task.instance_created"},{"match":"airflow.task_instance_created_*","name":"airflow.task.instance_created","tags":{"operator_name":"$1"}},{"match":"airflow.task_removed_from_dag","name":"airflow.dag.task_removed"},{"match":"airflow\\.ti\\.start\\.(.+)\\.(\\w+)","match_type":"regex","name":"airflow.ti.start","tags":{"dag_id":"$1","task_id":"$2"}},{"match":"airflow\\.ti\\.finish\\.(\\w+)\\.(.+)\\.(\\w+)","match_type":"regex","name":"airflow.ti.finish","tags":{"dag_id":"$1","task_id":"$2","state":"$3"}},{"match":"airflow.ti.start","name":"airflow.ti.start"},{"match":"airflow.ti.finish","name":"airflow.ti.finish"},{"match":"airflow.celery.execute_command.failure","name":"airflow.celery.execute_command.failure"},
      {"match":"airflow.triggerer_heartbeat","name":"airflow.triggerer_heartbeat"},{"match":"airflow.triggers.blocked_main_thread","name":"airflow.triggers.blocked_main_thread"},{"match":"airflow.triggers.failed","name":"airflow.triggers.failed"},{"match":"airflow.triggers.succeeded","name":"airflow.triggers.succeeded"},{"match":"airflow.triggers.running","name":"airflow.triggers.running"},{"match":"airflow\\.triggers\\.running\\.(.*)","match_type":"regex","name":"airflow.triggers.running","tags":{"hostname":"$1"}},{"match":"airflow.dataset.updates","name":"airflow.dataset.updates"},{"match":"airflow.dataset.orphaned","name":"airflow.dataset.orphaned"},{"match":"airflow.dataset.triggered_dagruns","name":"airflow.dataset.triggered_dagruns"},{"match":"airflow.executor.open_slots","name":"airflow.executor.open_slots"},{"match":"airflow\\.executor\\.open_slots\\.(.*)","match_type":"regex","name":"airflow.executor.open_slots","tags":{"executor_class_name":"$1"}},{"match":"airflow.executor.queued_tasks","name":"airflow.executor.queued_tasks"},{"match":"airflow\\.executor\\.queued_tasks\\.(.*)","match_type":"regex","name":"airflow.executor.queued_tasks","tags":{"executor_class_name":"$1"}},{"match":"airflow.executor.running_tasks","name":"airflow.executor.running_tasks"},{"match":"airflow\\.executor\\.running_tasks\\.(.*)","match_type":"regex","name":"airflow.executor.running_tasks","tags":{"executor_class_name":"$1"}},{"match":"airflow.kubernetes_executor.adopt_task_instances.duration","name":"airflow.kubernetes_executor.adopt_task_instances.duration"},{"match":"airflow.kubernetes_executor.clear_not_launched_queued_tasks.duration","name":"airflow.kubernetes_executor.clear_not_launched_queued_tasks.duration"},{"match":"airflow.dag_processing.file_path_queue_size","name":"airflow.dag_processing.file_path_queue_size"},{"match":"airflow.dag_processing.file_path_queue_update_count","name":"airflow.dag_processing.file_path_queue_update_count"},
      {"match":"airflow\\.dag_processing\\.last_num_of_db_queries\\.(.*)","match_type":"regex","name":"airflow.dag_processing.last_num_of_db_queries","tags":{"dag_file":"$1"}},{"match":"airflow.dag_processing.other_callback_count","name":"airflow.dag_processing.other_callback_count"},{"match":"airflow.dag_processing.sla_callback_count","name":"airflow.dag_processing.sla_callback_count"},{"match":"airflow.dag_file_processor_timeouts","name":"airflow.dag_file_processor_timeouts"}]}]
```

To add non-static tags to the StatsD metrics, you must use DogStatsD mapper profiles. [See an example mapper profile](http://docs.datadoghq.com/resources/json/airflow_ust.json) that adds `service` and `env` tags.

##### Log collection{% #log-collection %}

*Available for Agent versions >6.0*

Collecting logs is disabled by default in the Datadog Agent. To enable it, see [Kubernetes Log Collection](https://docs.datadoghq.com/agent/kubernetes/integrations/?tab=kubernetes#configuration).

| Parameter      | Value                                                 |
| -------------- | ----------------------------------------------------- |
| `<LOG_CONFIG>` | `{"source": "airflow", "service": "<YOUR_APP_NAME>"}` |
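For example, with Kubernetes autodiscovery the log configuration can be supplied as a pod annotation. This is a sketch: `<CONTAINER_NAME>` must match the Airflow container's name in your pod spec, and `<YOUR_APP_NAME>` is your chosen service name:

```yaml
metadata:
  annotations:
    ad.datadoghq.com/<CONTAINER_NAME>.logs: '[{"source": "airflow", "service": "<YOUR_APP_NAME>"}]'
```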

{% /tab %}

### Validation{% #validation %}

[Run the Agent's status subcommand](https://docs.datadoghq.com/agent/guide/agent-commands/?tab=agentv6#agent-status-and-information) and look for `airflow` under the Checks section.

## Annexe{% #annexe %}

### Airflow DatadogHook{% #airflow-datadoghook %}

The [Airflow DatadogHook](https://airflow.apache.org/docs/apache-airflow-providers-datadog/stable/_modules/airflow/providers/datadog/hooks/datadog.html) can also be used to interact with Datadog:

- Send Metric
- Query Metric
- Post Event
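For instance, a task callable could push a custom metric through the hook. This is a hypothetical sketch: the connection ID, metric name, and tags are illustrative, and it assumes the `apache-airflow-providers-datadog` package is installed with a configured Datadog connection:

```python
def report_rows_processed(row_count: int) -> None:
    # Import inside the callable so DAG files still parse in environments
    # where the apache-airflow-providers-datadog package is absent.
    from airflow.providers.datadog.hooks.datadog import DatadogHook

    # "datadog_default" is an assumed Airflow connection ID.
    hook = DatadogHook(datadog_conn_id="datadog_default")
    hook.send_metric(
        metric_name="my_pipeline.rows_processed",  # hypothetical metric name
        datapoint=row_count,
        tags=["team:data"],  # illustrative tag
    )
```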

## Data Collected{% #data-collected %}

### Metrics{% #metrics %}

| Metric | Description |
| ------ | ----------- |
| **airflow.can\_connect**(count)                                                      | 1 if can connect to Airflow, otherwise 0                                                                                                                                          |
| **airflow.celery.execute\_command.failure**(count)                                   | Number of non-zero exit code from Celery task.                                                                                                                                    |
| **airflow.celery.task\_timeout\_error**(count)                                       | Number of `AirflowTaskTimeout` errors raised when publishing Task to Celery Broker.*Shown as error*                                                                               |
| **airflow.collect\_db\_dags**(gauge)                                                 | Milliseconds taken for fetching all Serialized Dags from DB*Shown as millisecond*                                                                                                 |
| **airflow.dag.callback\_exceptions**(count)                                          | Number of exceptions raised from DAG callbacks. When this happens, it means DAG callback is not working*Shown as error*                                                           |
| **airflow.dag.loading\_duration**(gauge)                                             | DAG loading duration in seconds (deprecated)*Shown as second*                                                                                                                     |
| **airflow.dag.queue\_duration**(gauge)                                               | Milliseconds a task spends in the queue (deprecated, use queued_duration).*Shown as millisecond*                                                                                  |
| **airflow.dag.queued\_duration**(gauge)                                              | Milliseconds a task spends in the Queued state, before being Running.*Shown as millisecond*                                                                                       |
| **airflow.dag.scheduled\_duration**(gauge)                                           | Milliseconds a task spends in the Scheduled state, before being Queued.*Shown as millisecond*                                                                                     |
| **airflow.dag.task.duration**(gauge)                                                 | Milliseconds taken to finish a task*Shown as millisecond*                                                                                                                         |
| **airflow.dag.task.ongoing\_duration**(gauge)                                        | Current duration for ongoing DAG tasks*Shown as second*                                                                                                                           |
| **airflow.dag.task.total\_running**(gauge)                                           | Total number of running tasks                                                                                                                                                     |
| **airflow.dag.task\_removed**(gauge)                                                 | Tasks removed from DAG*Shown as second*                                                                                                                                           |
| **airflow.dag.task\_restored**(gauge)                                                | Tasks restored to DAG*Shown as second*                                                                                                                                            |
| **airflow.dag\_file\_processor\_timeouts**(gauge)                                    | (DEPRECATED) same behavior as dag_processing.processor_timeouts                                                                                                                   |
| **airflow.dag\_file\_refresh\_error**(count)                                         | Number of failures loading any DAG files*Shown as error*                                                                                                                          |
| **airflow.dag\_processing.file\_path\_queue\_size**(gauge)                           | Number of DAG files to be considered for the next scan.                                                                                                                           |
| **airflow.dag\_processing.file\_path\_queue\_update\_count**(count)                  | Number of times we've scanned the filesystem and queued all existing dags.                                                                                                        |
| **airflow.dag\_processing.import\_errors**(gauge)                                    | Number of errors from trying to parse DAG files*Shown as error*                                                                                                                   |
| **airflow.dag\_processing.last\_duration**(gauge)                                    | Milliseconds taken to load the given DAG file*Shown as millisecond*                                                                                                               |
| **airflow.dag\_processing.last\_num\_of\_db\_queries**(gauge)                        | Number of queries to Airflow database during parsing per dag_file.                                                                                                                |
| **airflow.dag\_processing.last\_run.seconds\_ago**(gauge)                            | Seconds since `<dag_file>` was last processed*Shown as second*                                                                                                                    |
| **airflow.dag\_processing.last\_runtime**(gauge)                                     | Seconds spent processing `<dag_file>` (in most recent iteration)*Shown as second*                                                                                                 |
| **airflow.dag\_processing.manager\_stalls**(count)                                   | Number of stalled `DagFileProcessorManager`                                                                                                                                       |
| **airflow.dag\_processing.other\_callback\_count**(count)                            | Number of non-SLA callbacks received.                                                                                                                                             |
| **airflow.dag\_processing.processes**(count)                                         | Number of currently running DAG parsing processes                                                                                                                                 |
| **airflow.dag\_processing.processor\_timeouts**(gauge)                               | Number of file processors that have been killed due to taking too long                                                                                                            |
| **airflow.dag\_processing.sla\_callback\_count**(count)                              | Number of SLA callbacks received.                                                                                                                                                 |
| **airflow.dag\_processing.total\_parse\_time**(gauge)                                | Seconds taken to scan and import all DAG files once*Shown as second*                                                                                                              |
| **airflow.dagbag\_size**(gauge)                                                      | DAG bag size                                                                                                                                                                      |
| **airflow.dagrun.dependency\_check**(gauge)                                          | Milliseconds taken to check DAG dependencies*Shown as millisecond*                                                                                                                |
| **airflow.dagrun.duration.failed**(gauge)                                            | Milliseconds taken for a DagRun to reach failed state*Shown as millisecond*                                                                                                       |
| **airflow.dagrun.duration.success**(gauge)                                           | Milliseconds taken for a DagRun to reach success state*Shown as millisecond*                                                                                                      |
| **airflow.dagrun.first\_task\_scheduling\_delay**(gauge)                             | Milliseconds elapsed between first task start_date and dagrun expected start*Shown as millisecond*                                                                                |
| **airflow.dagrun.schedule\_delay**(gauge)                                            | Milliseconds of delay between the scheduled DagRun start date and the actual DagRun start date*Shown as millisecond*                                                              |
| **airflow.dataset.orphaned**(gauge)                                                  | Number of datasets marked as orphans because they are no longer referenced in DAG.                                                                                                |
| **airflow.dataset.triggered\_dagruns**(count)                                        | Number of DAG runs triggered by a dataset update.                                                                                                                                 |
| **airflow.dataset.updates**(count)                                                   | Number of updated datasets.                                                                                                                                                       |
| **airflow.executor.open\_slots**(gauge)                                              | Number of open slots on executor                                                                                                                                                  |
| **airflow.executor.queued\_tasks**(gauge)                                            | Number of queued tasks on executor*Shown as task*                                                                                                                                 |
| **airflow.executor.running\_tasks**(gauge)                                           | Number of running tasks on executor*Shown as task*                                                                                                                                |
| **airflow.healthy**(count)                                                           | 1 if Airflow is healthy, otherwise 0                                                                                                                                              |
| **airflow.job.end**(count)                                                           | Number of ended `<job_name>` job, ex. `SchedulerJob`, `LocalTaskJob`*Shown as job*                                                                                                |
| **airflow.job.heartbeat.failure**(count)                                             | Number of failed Heartbeats for a `<job_name>` job, ex. `SchedulerJob`, `LocalTaskJob`*Shown as error*                                                                            |
| **airflow.job.start**(count)                                                         | Number of started `<job_name>` job, ex. `SchedulerJob`, `LocalTaskJob`*Shown as job*                                                                                              |
| **airflow.kubernetes\_executor.adopt\_task\_instances.duration**(gauge)              | Milliseconds taken to adopt the task instances in Kubernetes Executor.*Shown as millisecond*                                                                                      |
| **airflow.kubernetes\_executor.clear\_not\_launched\_queued\_tasks.duration**(gauge) | Milliseconds taken for clearing not launched queued tasks in Kubernetes Executor.*Shown as millisecond*                                                                           |
| **airflow.local\_task\_job.task\_exit**(count)                                       | Number of LocalTaskJob terminations with a return_code.                                                                                                                           |
| **airflow.operator\_failures**(count)                                                | Operator `<operator_name>` failures                                                                                                                                               |
| **airflow.operator\_successes**(count)                                               | Operator `<operator_name>` successes                                                                                                                                              |
| **airflow.pool.deferred\_slots**(gauge)                                              | Number of deferred slots in the pool.                                                                                                                                             |
| **airflow.pool.open\_slots**(gauge)                                                  | Number of open slots in the pool                                                                                                                                                  |
| **airflow.pool.queued\_slots**(gauge)                                                | Number of queued slots in the pool                                                                                                                                                |
| **airflow.pool.running\_slots**(gauge)                                               | Number of running slots in the pool                                                                                                                                               |
| **airflow.pool.scheduled\_slots**(gauge)                                             | Number of scheduled slots in the pool.                                                                                                                                            |
| **airflow.pool.starving\_tasks**(gauge)                                              | Number of starving tasks in the pool*Shown as task*                                                                                                                               |
| **airflow.pool.used\_slots**(gauge)                                                  | Number of used slots in the pool                                                                                                                                                  |
| **airflow.previously\_succeeded**(count)                                             | Number of previously succeeded task instances*Shown as task*                                                                                                                      |
| **airflow.scheduler.critical\_section\_busy**(count)                                 | Count of times a scheduler process tried to get a lock on the critical section (needed to send tasks to the executor) and found it locked by another process.*Shown as operation* |
| **airflow.scheduler.critical\_section\_duration**(gauge)                             | Milliseconds spent in the critical section of scheduler loop – only a single scheduler can enter this loop at a time*Shown as millisecond*                                        |
| **airflow.scheduler.critical\_section\_query\_duration**(gauge)                      | Milliseconds spent running the critical section task instance query.*Shown as millisecond*                                                                                        |
| **airflow.scheduler.orphaned\_tasks.adopted**(count)                                 | Number of Orphaned tasks adopted by the Scheduler*Shown as task*                                                                                                                  |
| **airflow.scheduler.orphaned\_tasks.cleared**(count)                                 | Number of Orphaned tasks cleared by the Scheduler*Shown as task*                                                                                                                  |
| **airflow.scheduler.scheduler\_loop\_duration**(gauge)                               | Milliseconds spent running one scheduler loop.*Shown as millisecond*                                                                                                              |
| **airflow.scheduler.tasks.executable**(count)                                        | Number of tasks that are ready for execution (set to queued) with respect to pool limits, dag concurrency, executor state, and priority.*Shown as task*                           |
| **airflow.scheduler.tasks.killed\_externally**(count)                                | Number of tasks killed externally*Shown as task*                                                                                                                                  |
| **airflow.scheduler.tasks.running**(count)                                           | Number of tasks running in executor*Shown as task*                                                                                                                                |
| **airflow.scheduler.tasks.starving**(count)                                          | Number of tasks that cannot be scheduled because of no open slot in pool*Shown as task*                                                                                           |
| **airflow.scheduler.tasks.without\_dagrun**(count)                                   | Number of tasks without DagRuns or with DagRuns not in Running state*Shown as task*                                                                                               |
| **airflow.scheduler\_heartbeat**(count)                                              | Scheduler heartbeats                                                                                                                                                              |
| **airflow.sla\_callback\_notification\_failure**(count)                              | Number of failed SLA miss callback notification attempts.                                                                                                                         |
| **airflow.sla\_email\_notification\_failure**(count)                                 | Number of failed SLA miss email notification attempts*Shown as task*                                                                                                              |
| **airflow.sla\_missed**(count)                                                       | Number of SLA misses.                                                                                                                                                             |
| **airflow.smart\_sensor\_operator.exception\_failures**(count)                       | Number of failures caused by exception in the previous smart sensor poking loop*Shown as error*                                                                                   |
| **airflow.smart\_sensor\_operator.infra\_failures**(count)                           | Number of infrastructure failures in the previous smart sensor poking loop*Shown as error*                                                                                        |
| **airflow.smart\_sensor\_operator.poked\_exception**(count)                          | Number of exceptions in the previous smart sensor poking loop*Shown as error*                                                                                                     |
| **airflow.smart\_sensor\_operator.poked\_success**(count)                            | Number of newly succeeded tasks poked by the smart sensor in the previous poking loop*Shown as task*                                                                              |
| **airflow.smart\_sensor\_operator.poked\_tasks**(count)                              | Number of tasks poked by the smart sensor in the previous poking loop*Shown as task*                                                                                              |
| **airflow.task.cpu\_usage**(gauge)                                                   | Percentage of CPU used by a task.*Shown as percent*                                                                                                                               |
| **airflow.task.duration**(gauge)                                                     | Milliseconds taken to run a task.*Shown as millisecond*                                                                                                                           |
| **airflow.task.instance\_created**(gauge)                                            | Task instances created*Shown as second*                                                                                                                                           |
| **airflow.task.mem\_usage**(gauge)                                                   | Percentage of memory used by a task.*Shown as percent*                                                                                                                            |
| **airflow.task.queued\_duration**(gauge)                                             | Milliseconds a task spends in the Queued state, before being Running.*Shown as millisecond*                                                                                       |
| **airflow.task.scheduled\_duration**(gauge)                                          | Milliseconds a task spends in the Scheduled state, before being Queued.*Shown as millisecond*                                                                                     |
| **airflow.task\_instance\_created**(count)                                           | Number of tasks instances created for a given Operator*Shown as task*                                                                                                             |
| **airflow.task\_removed\_from\_dag**(count)                                          | Number of tasks removed for a given dag (i.e. task no longer exists in DAG)*Shown as task*                                                                                        |
| **airflow.task\_restored\_to\_dag**(count)                                           | Number of tasks restored for a given dag (i.e. task instance which was previously in REMOVED state in the DB is added to DAG file)*Shown as task*                                 |
| **airflow.ti.finish**(count)                                                         | Number of completed task in a given dag.*Shown as task*                                                                                                                           |
| **airflow.ti.start**(count)                                                          | Number of started task in a given dag.*Shown as task*                                                                                                                             |
| **airflow.ti\_failures**(count)                                                      | Overall task instances failures*Shown as task*                                                                                                                                    |
| **airflow.ti\_successes**(count)                                                     | Overall task instances successes*Shown as task*                                                                                                                                   |
| **airflow.triggerer\_heartbeat**(count)                                              | Triggerer heartbeats.                                                                                                                                                             |
| **airflow.triggers.blocked\_main\_thread**(count)                                    | Number of triggers that blocked the main thread (likely due to not being async).                                                                                                  |
| **airflow.triggers.failed**(count)                                                   | Number of triggers that errored before they could fire an event.                                                                                                                  |
| **airflow.triggers.running**(gauge)                                                  | Number of triggers currently running for a triggerer (described by hostname).                                                                                                     |
| **airflow.triggers.succeeded**(count)                                                | Number of triggers that have fired at least one event.                                                                                                                            |
| **airflow.zombies\_killed**(count)                                                   | Zombie tasks killed*Shown as task*                                                                                                                                                |

**Note**: `airflow.healthy`, `airflow.can_connect`, `airflow.dag.task.total_running`, and `airflow.dag.task.ongoing_duration` metrics are collected from the Agent portion of the integration. All other metrics come from StatsD.

### Events{% #events %}

The Airflow check does not include any events.

### Service Checks{% #service-checks %}

**airflow.can\_connect**

Returns `CRITICAL` if unable to connect to Airflow. Returns `OK` otherwise.

*Statuses: ok, critical*

**airflow.healthy**

Returns `CRITICAL` if Airflow is not healthy. Returns `OK` otherwise.

*Statuses: ok, critical*

## Troubleshooting{% #troubleshooting %}

### HTTP 403 errors for Agent integration{% #http-403-errors-for-agent-integration %}

You may need to configure parameters for the Datadog Agent to make authenticated requests to Airflow's API. Use one of the available [configuration options](https://github.com/DataDog/integrations-core/blob/master/airflow/datadog_checks/airflow/data/conf.yaml.example#L84-L118).
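For example, basic authentication can be supplied in the instance configuration in `airflow.d/conf.yaml`. This is a sketch with placeholder credentials; the full set of authentication options is in the linked example configuration:

```yaml
instances:
  - url: http://localhost:8080
    username: <AIRFLOW_USERNAME>
    password: <AIRFLOW_PASSWORD>
```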

### Datadog DogStatsD package and origin detection{% #datadog-dogstatsd-package-and-origin-detection %}

Airflow can use its own StatsD library, as well as the Datadog Python DogStatsD logger. Using the Datadog Python DogStatsD logger provides extra tagging options, including [Origin Detection](https://docs.datadoghq.com/developers/dogstatsd/?tab=cgroups#origin-detection) in Kubernetes.

However, this does **not** come installed by default in Airflow. You need to install the [Datadog provider package](https://airflow.apache.org/docs/apache-airflow-providers-datadog/stable/index.html). For host installations, you can install it directly with `pip install apache-airflow-providers-datadog`.

For containerized environments, [Airflow recommends](https://airflow.apache.org/docs/docker-stack/entrypoint.html#installing-additional-requirements) building a custom image with this package installed. For example, the following `Dockerfile` can be used, substituting your desired version tag (for example, `2.8.4` or `3.0.2`):

```dockerfile
FROM apache/airflow:<VERSION>
RUN pip install apache-airflow-providers-datadog
```

Once your deployment runs that image, set the following environment variable on your Airflow containers to enable the Datadog logger:

```yaml
- name: AIRFLOW__METRICS__STATSD_DATADOG_ENABLED
  value: "true"
```

Because this setting switches Airflow from the Airflow StatsD library to the Datadog DogStatsD library, Datadog tagging options, including Origin Detection, work out of the box on the Airflow side. You must also enable [Origin Detection on the Datadog Agent](https://docs.datadoghq.com/developers/dogstatsd/?tab=cgroups#origin-detection) side to match.
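On the Agent side, origin detection can be enabled through environment variables on the Agent container. This is a sketch; deployment specifics (Helm values, Datadog Operator configuration) vary:

```yaml
- name: DD_DOGSTATSD_ORIGIN_DETECTION
  value: "true"
- name: DD_DOGSTATSD_NON_LOCAL_TRAFFIC
  value: "true"
```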

If you try to enable the DogStatsD plugin without this package installed, no metrics are sent, and an error like the following occurs:

```
{stats.py:42} ERROR - Could not configure StatsClient: No module named 'datadog', using NoStatsLogger instead.
```

Need help? Contact [Datadog support](https://docs.datadoghq.com/help/).
