The Datadog Agent collects many metrics from Airflow, including those for DAGs, tasks, pools, and executors.
Metrics are collected through the Airflow StatsD plugin and sent to Datadog’s DogStatsD.
In addition to metrics, the Datadog Agent also sends service checks related to Airflow’s health.
All steps below are needed for the Airflow integration to work properly. Before you begin, install Datadog Agent version >=6.17 or >=7.17, which includes the StatsD/DogStatsD mapping feature.
Configure the Airflow check included in the Datadog Agent package to collect health metrics and service checks.
(Optional) Edit the airflow.d/conf.yaml file, in the conf.d/ folder at the root of your Agent's configuration directory, to start collecting your Airflow service checks. See the sample airflow.d/conf.yaml for all available configuration options.
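For reference, a minimal airflow.d/conf.yaml might look like the sketch below. The url value is an assumption for an Airflow webserver running locally on its default port; adjust it for your deployment.

init_config:

instances:
  - url: http://localhost:8080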
Connect Airflow to DogStatsD (included in the Datadog Agent) by using the Airflow statsd feature to collect metrics:
Note: The presence or absence of StatsD metrics reported by Airflow can vary depending on the Airflow executor used. For example, airflow.ti_failures/successes, airflow.operator_failures/successes, and airflow.dag.task.duration are not reported for KubernetesExecutor.
Install the Airflow StatsD plugin.
pip install 'apache-airflow[statsd]'
Update the Airflow configuration file airflow.cfg by adding the following configs:
[scheduler]
statsd_on = True
statsd_host = localhost # Hostname or IP of server running the Datadog Agent
statsd_port = 8125 # DogStatsD port configured in the Datadog Agent
statsd_prefix = airflow
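If Airflow runs in containers where editing airflow.cfg is inconvenient, the same options can usually be supplied as environment variables using Airflow's AIRFLOW__<SECTION>__<KEY> convention. The sketch below mirrors the [scheduler] section used above; note that some Airflow versions expose these options under a different section.

AIRFLOW__SCHEDULER__STATSD_ON=True
AIRFLOW__SCHEDULER__STATSD_HOST=localhost
AIRFLOW__SCHEDULER__STATSD_PORT=8125
AIRFLOW__SCHEDULER__STATSD_PREFIX=airflow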
Update the Datadog Agent main configuration file datadog.yaml by adding the following configs:
# dogstatsd_mapper_cache_size: 1000  # default: 1000
dogstatsd_mapper_profiles:
  - name: airflow
    prefix: "airflow."
    mappings:
      - match: "airflow.*_start"
        name: "airflow.job.start"
        tags:
          job_name: "$1"
      - match: "airflow.*_end"
        name: "airflow.job.end"
        tags:
          job_name: "$1"
      - match: "airflow.*_heartbeat_failure"
        name: "airflow.job.heartbeat.failure"
        tags:
          job_name: "$1"
      - match: "airflow.operator_failures_*"
        name: "airflow.operator_failures"
        tags:
          operator_name: "$1"
      - match: "airflow.operator_successes_*"
        name: "airflow.operator_successes"
        tags:
          operator_name: "$1"
      - match: 'airflow\.dag_processing\.last_runtime\.(.*)'
        match_type: "regex"
        name: "airflow.dag_processing.last_runtime"
        tags:
          dag_file: "$1"
      - match: 'airflow\.dag_processing\.last_run\.seconds_ago\.(.*)'
        match_type: "regex"
        name: "airflow.dag_processing.last_run.seconds_ago"
        tags:
          dag_file: "$1"
      - match: 'airflow\.dag\.loading-duration\.(.*)'
        match_type: "regex"
        name: "airflow.dag.loading_duration"
        tags:
          dag_file: "$1"
      - match: "airflow.dagrun.*.first_task_scheduling_delay"
        name: "airflow.dagrun.first_task_scheduling_delay"
        tags:
          dag_id: "$1"
      - match: "airflow.pool.open_slots.*"
        name: "airflow.pool.open_slots"
        tags:
          pool_name: "$1"
      - match: "airflow.pool.queued_slots.*"
        name: "airflow.pool.queued_slots"
        tags:
          pool_name: "$1"
      - match: "airflow.pool.running_slots.*"
        name: "airflow.pool.running_slots"
        tags:
          pool_name: "$1"
      - match: "airflow.pool.used_slots.*"
        name: "airflow.pool.used_slots"
        tags:
          pool_name: "$1"
      - match: "airflow.pool.starving_tasks.*"
        name: "airflow.pool.starving_tasks"
        tags:
          pool_name: "$1"
      - match: 'airflow\.dagrun\.dependency-check\.(.*)'
        match_type: "regex"
        name: "airflow.dagrun.dependency_check"
        tags:
          dag_id: "$1"
      - match: 'airflow\.dag\.(.*)\.([^.]*)\.duration'
        match_type: "regex"
        name: "airflow.dag.task.duration"
        tags:
          dag_id: "$1"
          task_id: "$2"
      - match: 'airflow\.dag_processing\.last_duration\.(.*)'
        match_type: "regex"
        name: "airflow.dag_processing.last_duration"
        tags:
          dag_file: "$1"
      - match: 'airflow\.dagrun\.duration\.success\.(.*)'
        match_type: "regex"
        name: "airflow.dagrun.duration.success"
        tags:
          dag_id: "$1"
      - match: 'airflow\.dagrun\.duration\.failed\.(.*)'
        match_type: "regex"
        name: "airflow.dagrun.duration.failed"
        tags:
          dag_id: "$1"
      - match: 'airflow\.dagrun\.schedule_delay\.(.*)'
        match_type: "regex"
        name: "airflow.dagrun.schedule_delay"
        tags:
          dag_id: "$1"
      - match: "airflow.scheduler.tasks.running"
        name: "airflow.scheduler.tasks.running"
      - match: "airflow.scheduler.tasks.starving"
        name: "airflow.scheduler.tasks.starving"
      - match: "airflow.sla_email_notification_failure"
        name: "airflow.sla_email_notification_failure"
      - match: 'airflow\.task_removed_from_dag\.(.*)'
        match_type: "regex"
        name: "airflow.dag.task_removed"
        tags:
          dag_id: "$1"
      - match: 'airflow\.task_restored_to_dag\.(.*)'
        match_type: "regex"
        name: "airflow.dag.task_restored"
        tags:
          dag_id: "$1"
      - match: "airflow.task_instance_created-*"
        name: "airflow.task.instance_created"
        tags:
          task_class: "$1"
      - match: "airflow.ti.start.*.*"
        name: "airflow.ti.start"
        tags:
          dagid: "$1"
          taskid: "$2"
      - match: "airflow.ti.finish.*.*.*"
        name: "airflow.ti.finish"
        tags:
          dagid: "$1"
          taskid: "$2"
          state: "$3"
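After restarting the Agent, one way to sanity-check a mapping is to hand-craft a StatsD sample, send it to DogStatsD, and confirm the metric appears under its mapped name (here airflow.dagrun.duration.success with a dag_id:example_dag tag). This sketch assumes netcat is available and DogStatsD is listening on the default port 8125; example_dag is a placeholder DAG name.

echo -n "airflow.dagrun.duration.success.example_dag:150|ms" | nc -u -w1 127.0.0.1 8125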
Use the default configuration in your airflow.d/conf.yaml file to activate your Airflow service checks. See the sample airflow.d/conf.yaml for all available configuration options.
Available for Agent versions >6.0
Collecting logs is disabled by default in the Datadog Agent. Enable it in your datadog.yaml file:
logs_enabled: true
Uncomment and edit this configuration block at the bottom of your airflow.d/conf.yaml. Change the path and service parameter values and configure them for your environment.
Configuration for DAG processor manager and Scheduler logs:
logs:
  - type: file
    path: "<PATH_TO_AIRFLOW>/logs/dag_processor_manager/dag_processor_manager.log"
    source: airflow
    service: "<SERVICE_NAME>"
    log_processing_rules:
      - type: multi_line
        name: new_log_start_with_date
        pattern: \[\d{4}\-\d{2}\-\d{2}
  - type: file
    path: "<PATH_TO_AIRFLOW>/logs/scheduler/*/*.log"
    source: airflow
    service: "<SERVICE_NAME>"
    log_processing_rules:
      - type: multi_line
        name: new_log_start_with_date
        pattern: \[\d{4}\-\d{2}\-\d{2}
Regular cleanup is recommended for scheduler logs, with daily log rotation; a sample rotation rule is sketched below.
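As one possible approach, a logrotate rule along these lines keeps scheduler logs from accumulating. The path follows the placeholder used above; the seven-day retention and the copytruncate option (the scheduler keeps its log files open) are assumptions to adapt to your environment.

<PATH_TO_AIRFLOW>/logs/scheduler/*/*.log {
    daily
    rotate 7
    compress
    missingok
    notifempty
    copytruncate
}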
Additional configuration for DAG tasks logs:
logs:
  - type: file
    path: "<PATH_TO_AIRFLOW>/logs/*/*/*/*.log"
    source: airflow
    service: "<SERVICE_NAME>"
    log_processing_rules:
      - type: multi_line
        name: new_log_start_with_date
        pattern: \[\d{4}\-\d{2}\-\d{2}
Caveat: By default, Airflow uses this log file template for tasks: log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log. The number of log files grows quickly if they are not cleaned regularly. This pattern is used by the Airflow UI to display logs individually for each executed task.
If you do not view logs in the Airflow UI, Datadog recommends this configuration in airflow.cfg: log_filename_template = dag_tasks.log. Then rotate this log file and use this configuration:
logs:
  - type: file
    path: "<PATH_TO_AIRFLOW>/logs/dag_tasks.log"
    source: airflow
    service: "<SERVICE_NAME>"
    log_processing_rules:
      - type: multi_line
        name: new_log_start_with_date
        pattern: \[\d{4}\-\d{2}\-\d{2}
For containerized environments, see the Autodiscovery Integration Templates for guidance on applying the parameters below.
Parameter | Value |
---|---|
<INTEGRATION_NAME> | airflow |
<INIT_CONFIG> | blank or {} |
<INSTANCE_CONFIG> | {"url": "%%host%%"} |
Available for Agent versions >6.0
Collecting logs is disabled by default in the Datadog Agent. To enable it, see Kubernetes log collection documentation.
Parameter | Value |
---|---|
<LOG_CONFIG> | {"source": "airflow", "service": "<YOUR_APP_NAME>"} |
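Applied as Autodiscovery pod annotations, these parameters might look like the following sketch. It assumes the Airflow container in the pod is named airflow and mirrors the values from the tables above.

metadata:
  annotations:
    ad.datadoghq.com/airflow.check_names: '["airflow"]'
    ad.datadoghq.com/airflow.init_configs: '[{}]'
    ad.datadoghq.com/airflow.instances: '[{"url": "%%host%%"}]'
    ad.datadoghq.com/airflow.logs: '[{"source": "airflow", "service": "<YOUR_APP_NAME>"}]'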
Tips for Kubernetes installations: in airflow.cfg, statsd_host should be set to the IP address of the Kubernetes node. See the integrations-core repo for an example setup.
Run the Agent's status subcommand and look for airflow under the Checks section.
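For example, on a host install (the exact path and the need for elevated privileges vary by platform):

sudo datadog-agent status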
airflow.can_connect (count) | 1 if can connect to Airflow, otherwise 0 |
airflow.collect_db_dags (gauge) | Milliseconds taken for fetching all Serialized Dags from DB Shown as millisecond |
airflow.healthy (count) | 1 if Airflow is healthy, otherwise 0 |
airflow.job.start (count) | Number of started `<job_name>` jobs, for example `SchedulerJob`, `LocalTaskJob` Shown as job |
airflow.job.end (count) | Number of ended `<job_name>` jobs, for example `SchedulerJob`, `LocalTaskJob` Shown as job |
airflow.job.heartbeat.failure (count) | Number of failed heartbeats for a `<job_name>` job Shown as error |
airflow.operator_failures (count) | Operator `<operator_name>` failures |
airflow.operator_successes (count) | Operator `<operator_name>` successes |
airflow.ti_failures (count) | Overall task instances failures Shown as task |
airflow.ti_successes (count) | Overall task instances successes Shown as task |
airflow.previously_succeeded (count) | Number of previously succeeded task instances Shown as task |
airflow.zombies_killed (count) | Zombie tasks killed Shown as task |
airflow.scheduler_heartbeat (count) | Scheduler heartbeats |
airflow.dag_processing.processes (count) | Number of currently running DAG parsing processes |
airflow.dag_processing.manager_stalls (count) | Number of stalled `DagFileProcessorManager` |
airflow.dag_file_refresh_error (count) | Number of failures loading any DAG files Shown as error |
airflow.scheduler.critical_section_duration (gauge) | Milliseconds spent in the critical section of scheduler loop -- only a single scheduler can enter this loop at a time Shown as millisecond |
airflow.scheduler.tasks.killed_externally (count) | Number of tasks killed externally Shown as task |
airflow.scheduler.orphaned_tasks.cleared (count) | Number of Orphaned tasks cleared by the Scheduler Shown as task |
airflow.scheduler.orphaned_tasks.adopted (count) | Number of Orphaned tasks adopted by the Scheduler Shown as task |
airflow.scheduler.critical_section_busy (count) | Count of times a scheduler process tried to get a lock on the critical section (needed to send tasks to the executor) and found it locked by another process. Shown as operation |
airflow.scheduler.tasks.running (count) | Number of tasks running in executor Shown as task |
airflow.scheduler.tasks.starving (count) | Number of tasks that cannot be scheduled because of no open slot in pool Shown as task |
airflow.dagbag_size (gauge) | DAG bag size |
airflow.dag_processing.import_errors (gauge) | Number of errors from trying to parse DAG files Shown as error |
airflow.dag_processing.total_parse_time (gauge) | Seconds taken to scan and import all DAG files once Shown as second |
airflow.dag_processing.last_runtime (gauge) | Seconds spent processing `<dag_file>` in the most recent iteration Shown as second |
airflow.dag_processing.last_run.seconds_ago (gauge) | Seconds since `<dag_file>` was last processed Shown as second |
airflow.dag_processing.processor_timeouts (gauge) | Number of file processors that have been killed due to taking too long |
airflow.executor.open_slots (gauge) | Number of open slots on executor |
airflow.executor.queued_tasks (gauge) | Number of queued tasks on executor Shown as task |
airflow.executor.running_tasks (gauge) | Number of running tasks on executor Shown as task |
airflow.pool.open_slots (gauge) | Number of open slots in the pool |
airflow.pool.queued_slots (gauge) | Number of queued slots in the pool |
airflow.pool.used_slots (gauge) | Number of used slots in the pool |
airflow.pool.running_slots (gauge) | Number of running slots in the pool |
airflow.pool.starving_tasks (gauge) | Number of starving tasks in the pool Shown as task |
airflow.dagrun.dependency_check (gauge) | Milliseconds taken to check DAG dependencies Shown as millisecond |
airflow.dag.task.duration (gauge) | Milliseconds taken to finish a task Shown as millisecond |
airflow.dag_processing.last_duration (gauge) | Milliseconds taken to load the given DAG file Shown as millisecond |
airflow.dagrun.duration.success (gauge) | Milliseconds taken for a DagRun to reach success state Shown as millisecond |
airflow.dagrun.duration.failed (gauge) | Milliseconds taken for a DagRun to reach failed state Shown as millisecond |
airflow.dagrun.schedule_delay (gauge) | Milliseconds of delay between the scheduled DagRun start date and the actual DagRun start date Shown as millisecond |
airflow.dagrun.first_task_scheduling_delay (gauge) | Milliseconds elapsed between first task start_date and dagrun expected start Shown as millisecond |
airflow.dag.loading_duration (gauge) | DAG loading duration in seconds (deprecated) Shown as second |
airflow.dag.task_removed (gauge) | Tasks removed from DAG Shown as second |
airflow.dag.task_restored (gauge) | Tasks restored to DAG Shown as second |
airflow.sla_email_notification_failure (count) | Number of failed SLA miss email notification attempts Shown as task |
airflow.task.instance_created (gauge) | Task instances created Shown as second |
airflow.ti.start (count) | Number of started tasks in a given DAG Shown as task |
airflow.ti.finish (count) | Number of completed tasks in a given DAG Shown as task |
airflow.dag.callback_exceptions (count) | Number of exceptions raised from DAG callbacks. When this happens, it means DAG callback is not working Shown as error |
airflow.celery.task_timeout_error (count) | Number of `AirflowTaskTimeout` errors raised when publishing Task to Celery Broker. Shown as error |
airflow.task_removed_from_dag (count) | Number of tasks removed for a given dag (i.e. task no longer exists in DAG) Shown as task |
airflow.task_restored_to_dag (count) | Number of tasks restored for a given dag (i.e. task instance which was previously in REMOVED state in the DB is added to DAG file) Shown as task |
airflow.task_instance_created (count) | Number of tasks instances created for a given Operator Shown as task |
airflow.scheduler.tasks.without_dagrun (count) | Number of tasks without DagRuns or with DagRuns not in Running state Shown as task |
airflow.scheduler.tasks.executable (count) | Number of tasks that are ready for execution (set to queued) with respect to pool limits, dag concurrency, executor state, and priority. Shown as task |
airflow.smart_sensor_operator.poked_tasks (count) | Number of tasks poked by the smart sensor in the previous poking loop Shown as task |
airflow.smart_sensor_operator.poked_success (count) | Number of newly succeeded tasks poked by the smart sensor in the previous poking loop Shown as task |
airflow.smart_sensor_operator.poked_exception (count) | Number of exceptions in the previous smart sensor poking loop Shown as error |
airflow.smart_sensor_operator.exception_failures (count) | Number of failures caused by exception in the previous smart sensor poking loop Shown as error |
airflow.smart_sensor_operator.infra_failures (count) | Number of infrastructure failures in the previous smart sensor poking loop Shown as error |
airflow.can_connect: Returns CRITICAL if unable to connect to Airflow, otherwise returns OK.
airflow.healthy: Returns CRITICAL if Airflow is not healthy, otherwise returns OK.
The Airflow check does not include any events.
In addition, the Airflow DatadogHook can be used to interact with Datadog from within Airflow itself, for example to send metrics, query metrics, and post events:
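A minimal sketch of sending a metric from task code with the hook, assuming the apache-airflow-providers-datadog package is installed and a Datadog connection (datadog_default) is configured in Airflow; the metric name and tags are placeholders:

from airflow.providers.datadog.hooks.datadog import DatadogHook


def report_rows_processed(row_count: int) -> None:
    # Uses the Datadog connection configured in Airflow (datadog_default here)
    hook = DatadogHook(datadog_conn_id="datadog_default")
    # Submit a custom metric datapoint with tags
    hook.send_metric(
        metric_name="my_pipeline.rows_processed",
        datapoint=row_count,
        tags=["env:dev"],
    )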
Need help? Contact Datadog support.