Airflow
Rapport de recherche Datadog : Bilan sur l'adoption de l'informatique sans serveur Rapport : Bilan sur l'adoption de l'informatique sans serveur

Airflow

Agent Check Check de l'Agent

Supported OS: Linux Mac OS Windows

Présentation

L’Agent Datadog recueille de nombreuses métriques à partir d’Airflow, notamment pour :

  • Les DAG (Directed Acyclic Graphs, ou graphes orientés acycliques) : nombre de processus DAG, taille des dagbags, etc.
  • Les tâches : tâches non réussies, réussies, arrêtées, etc.
  • Les pools : emplacements libres, emplacements utilisés, etc.
  • Les exécuteurs : emplacements libres, tâches en attente, tâches en cours d’exécution, etc.

Les métriques sont recueillies via le plugin StatsD pour Airflow puis envoyées à Datadog DogStatsD.

En plus des métriques, l’Agent Datadog envoie également des checks de service liés à l’état de santé d’Airflow.

Configuration

Installation

Les étapes décrites ci-dessous sont toutes les trois nécessaires pour faire fonctionner l’intégration Airflow. Avant de commencer, installez l’Agent Datadog version >=6.17 ou >=7.17 pour bénéficier de la fonctionnalité de mapping Statsd/DogStatsD.

Étape 1 : configurez Airflow de façon à recueillir ses métriques de santé et ses checks de service.

Configurez le check Airflow inclus avec le package de l’Agent Datadog pour recueillir ses métriques de santé et ses checks de service.

Modifiez le fichier airflow.d/conf.yaml dans le dossier conf.d/ à la racine du répertoire de configuration de votre Agent pour commencer à recueillir vos checks de service Airflow. Consultez le fichier d’exemple airflow.d/conf.yaml pour découvrir toutes les options de configuration disponibles.

Étape 2 : connectez Airflow à DogStatsD (inclus avec l’Agent Datadog) via la fonctionnalité statsd d’Airflow pour recueillir des métriques.

  1. Installez le plugin StatsD pour Airflow.

    pip install 'apache-airflow[statsd]'
  2. Modifiez le fichier de configuration airflow.cfg d’Airflow pour y ajouter les paramètres suivants :

    [scheduler]
    statsd_on = True
    statsd_host = localhost
    statsd_port = 8125
    statsd_prefix = airflow
  3. Modifiez le fichier de configuration principal de l’Agent Datadog datadog.yaml pour y ajouter les paramètres suivants :

    # dogstatsd_mapper_cache_size: 1000  # default to 1000
    dogstatsd_mapper_profiles:
     - name: airflow
       prefix: "airflow."
       mappings:
         - match: "airflow.*_start"
           name: "airflow.job.start"
           tags:
             job_name: "$1"
         - match: "airflow.*_end"
           name: "airflow.job.end"
           tags:
             job_name: "$1"
         - match: "airflow.operator_failures_*"
           name: "airflow.operator_failures"
           tags:
             operator_name: "$1"
         - match: "airflow.operator_successes_*"
           name: "airflow.operator_successes"
           tags:
             operator_name: "$1"
         - match: 'airflow\.dag_processing\.last_runtime\.(.*)'
           match_type: "regex"
           name: "airflow.dag_processing.last_runtime"
           tags:
             dag_file: "$1"
         - match: 'airflow\.dag_processing\.last_run\.seconds_ago\.(.*)'
           match_type: "regex"
           name: "airflow.dag_processing.last_run.seconds_ago"
           tags:
             dag_file: "$1"
         - match: 'airflow\.dag\.loading-duration\.(.*)'
           match_type: "regex"
           name: "airflow.dag.loading_duration"
           tags:
             dag_file: "$1"
         - match: "airflow.pool.open_slots.*"
           name: "airflow.pool.open_slots"
           tags:
             pool_name: "$1"
         - match: "airflow.pool.used_slots.*"
           name: "airflow.pool.used_slots"
           tags:
             pool_name: "$1"
         - match: "airflow.pool.starving_tasks.*"
           name: "airflow.pool.starving_tasks"
           tags:
             pool_name: "$1"
         - match: 'airflow\.dagrun\.dependency-check\.(.*)'
           match_type: "regex"
           name: "airflow.dagrun.dependency_check"
           tags:
             dag_id: "$1"
         - match: 'airflow\.dag\.(.*)\.([^.]*)\.duration'
           match_type: "regex"
           name: "airflow.dag.task.duration"
           tags:
             dag_id: "$1"
             task_id: "$2"
         - match: 'airflow\.dag_processing\.last_duration\.(.*)'
           match_type: "regex"
           name: "airflow.dag_processing.last_duration"
           tags:
             dag_file: "$1"
         - match: 'airflow\.dagrun\.duration\.success\.(.*)'
           match_type: "regex"
           name: "airflow.dagrun.duration.success"
           tags:
             dag_id: "$1"
         - match: 'airflow\.dagrun\.duration\.failed\.(.*)'
           match_type: "regex"
           name: "airflow.dagrun.duration.failed"
           tags:
             dag_id: "$1"
         - match: 'airflow\.dagrun\.schedule_delay\.(.*)'
           match_type: "regex"
           name: "airflow.dagrun.schedule_delay"
           tags:
             dag_id: "$1"
         - match: 'airflow\.task_removed_from_dag\.(.*)'
           match_type: "regex"
           name: "airflow.dag.task_removed"
           tags:
             dag_id: "$1"
         - match: 'airflow\.task_restored_to_dag\.(.*)'
           match_type: "regex"
           name: "airflow.dag.task_restored"
           tags:
             dag_id: "$1"
         - match: "airflow.task_instance_created-*"
           name: "airflow.task.instance_created"
           tags:
             task_class: "$1"

Étape 3 : redémarrez l’Agent Datadog et Airflow.

  1. Redémarrez l’Agent.
  2. Redémarrez Airflow pour commencer à envoyer vos métriques Airflow à l’endpoint DogStatsD de l’Agent Datadog.

Checks de service de l’intégration

Utilisez la configuration par défaut de votre fichier airflow.d/conf.yaml pour activer la collecte de vos checks de service Airflow. Consultez le fichier d’exemple airflow.d/conf.yaml pour découvrir toutes les options de configuration disponibles.

Collecte de logs

Disponible à partir des versions > 6.0 de l’Agent

  1. La collecte de logs est désactivée par défaut dans l’Agent Datadog. Vous devez l’activer dans datadog.yaml :

    logs_enabled: true
  2. Supprimez la mise en commentaire du bloc de configuration suivant en bas de votre fichier airflow.d/conf.yaml, puis modifiez-le : Modifiez les valeurs des paramètres path et service et configurez-les pour votre environnement.

    • Configuration des logs pour le planificateur et le gestionnaire de processeurs DAG :

      logs:
      - type: file
        path: "<PATH_TO_AIRFLOW>/logs/dag_processor_manager/dag_processor_manager.log"
        source: airflow
        service: "<SERVICE_NAME>"
        log_processing_rules:
          - type: multi_line
            name: new_log_start_with_date
            pattern: \[\d{4}\-\d{2}\-\d{2}
      - type: file
        path: "<PATH_TO_AIRFLOW>/logs/scheduler/*/*.log"
        source: airflow
        service: "<SERVICE_NAME>"
        log_processing_rules:
          - type: multi_line
            name: new_log_start_with_date
            pattern: \[\d{4}\-\d{2}\-\d{2}

      Il est conseillé d’effectuer un nettoyage régulier des logs du planificateur, en réalisant une rotation quotidienne des logs.

    • Configuration supplémentaire pour les logs des tâches DAG :

      logs:
      - type: file
        path: "<PATH_TO_AIRFLOW>/logs/*/*/*/*.log"
        source: airflow
        service: "<SERVICE_NAME>"
        log_processing_rules:
          - type: multi_line
            name: new_log_start_with_date
            pattern: \[\d{4}\-\d{2}\-\d{2}

    Attention : par défaut, Airflow utilise le modèle de fichier de log suivant pour les tâches : log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log. Le nombre de logs augmentera considérablement si vous n’effectuez pas un nettoyage régulier. Ce modèle est utilisé par l’interface d’Airflow pour afficher tous les logs de chaque tâche exécutée.

    Si vous ne consultez pas de logs dans l’interface Airflow, nous conseillons plutôt d’utiliser la configuration suivante dans airflow.cfg : log_filename_template = dag_tasks.log. Effectuez ensuite une rotation de ce fichier et utilisez cette configuration :

      logs:
        - type: file
          path: "<PATH_TO_AIRFLOW>/logs/dag_tasks.log"
          source: airflow
          service: "<SERVICE_NAME>"
          log_processing_rules:
            - type: multi_line
              name: new_log_start_with_date
              pattern: \[\d{4}\-\d{2}\-\d{2}
  3. Redémarrez l’Agent.

Validation

Lancez la sous-commande status de l’Agent et cherchez airflow dans la section Checks.

Données collectées

Métriques

airflow.can_connect
(count)
1 if can connect to Airflow, otherwise 0
airflow.healthy
(count)
1 if Airflow is healthy, otherwise 0
airflow.job.start
(count)
Number of started `` job, ex. `SchedulerJob`, `LocalTaskJob`
Shown as job
airflow.job.end
(count)
Number of ended `` job, ex. `SchedulerJob`, `LocalTaskJob`
Shown as job
airflow.operator_failures
(count)
Operator `` failures
airflow.operator_successes
(count)
Operator `` successes
airflow.ti_failures
(count)
Overall task instances failures
Shown as task
airflow.ti_successes
(count)
Overall task instances successes
Shown as task
airflow.zombies_killed
(count)
Zombie tasks killed
Shown as task
airflow.scheduler_heartbeat
(count)
Scheduler heartbeats
airflow.dag_processing.processes
(count)
Number of currently running DAG parsing processes
airflow.scheduler.tasks.killed_externally
(count)
Number of tasks killed externally
Shown as task
airflow.scheduler.tasks.running
(count)
Number of tasks running in executor
Shown as task
airflow.scheduler.tasks.starving
(count)
Number of tasks that cannot be scheduled because of no open slot in pool
Shown as task
airflow.dagbag_size
(gauge)
DAG bag size
airflow.dag_processing.import_errors
(gauge)
Number of errors from trying to parse DAG files
Shown as error
airflow.dag_processing.total_parse_time
(gauge)
Seconds taken to scan and import all DAG files once
Shown as second
airflow.dag_processing.last_runtime
(gauge)
Seconds spent processing `` (in most recent iteration)
Shown as second
airflow.dag_processing.last_run.seconds_ago
(gauge)
Seconds since `` was last processed
Shown as second
airflow.dag_processing.processor_timeouts
(gauge)
Number of file processors that have been killed due to taking too long
airflow.executor.open_slots
(gauge)
Number of open slots on executor
airflow.executor.queued_tasks
(gauge)
Number of queued tasks on executor
Shown as task
airflow.executor.running_tasks
(gauge)
Number of running tasks on executor
Shown as task
airflow.pool.open_slots
(gauge)
Number of open slots in the pool
airflow.pool.queued_slots
(gauge)
Number of queued slots in the pool
airflow.pool.used_slots
(gauge)
Number of used slots in the pool
airflow.pool.running_slots
(gauge)
Number of running slots in the pool
airflow.pool.starving_tasks
(gauge)
Number of starving tasks in the pool
Shown as task
airflow.dagrun.dependency_check
(gauge)
Milliseconds taken to check DAG dependencies
Shown as millisecond
airflow.dag.task.duration
(gauge)
Milliseconds taken to finish a task
Shown as millisecond
airflow.dag_processing.last_duration
(gauge)
Milliseconds taken to load the given DAG file
Shown as millisecond
airflow.dagrun.duration.success
(gauge)
Milliseconds taken for a DagRun to reach success state
Shown as millisecond
airflow.dagrun.duration.failed
(gauge)
Milliseconds taken for a DagRun to reach failed state
Shown as millisecond
airflow.dagrun.schedule_delay
(gauge)
Milliseconds of delay between the scheduled DagRun start date and the actual DagRun start date
Shown as millisecond
airflow.dag.loading_duration
(gauge)
DAG loading duration in seconds (deprecated)
Shown as second
airflow.dag.task_removed
(gauge)
Tasks removed from DAG
Shown as second
airflow.dag.task_restored
(gauge)
Tasks restored to DAG
Shown as second
airflow.sla_email_notification_failure
(count)
Number of failed SLA miss email notification attempts
Shown as task
airflow.task.instance_created
(gauge)
Task instances created
Shown as second
airflow.ti.start
(count)
Number of started task in a given dag.
Shown as task
airflow.ti.finish
(count)
Number of completed task in a given dag.
Shown as task

Checks de service

airflow.can_connect :

Renvoie CRITICAL si l’Agent ne parvient pas à se connecter à Airflow. Si ce n’est pas le cas, renvoie OK.

airflow.healthy :

Renvoie CRITICAL si le processus Airflow n’est pas sain. Si ce n’est pas le cas, renvoie OK.

Événements

Le check Airflow n’inclut aucun événement.

Annexe

Hook Datadog pour Airflow

Un hook Datadog pour Airflow peut également être utilisé pour interagir avec Datadog :

  • Envoyer des métriques
  • Interroger des métriques
  • Envoyer des événements

Dépannage

Besoin d’aide ? Contactez l’assistance Datadog.