- 필수 기능
- 시작하기
- Glossary
- 표준 속성
- Guides
- Agent
- 통합
- 개방형텔레메트리
- 개발자
- Administrator's Guide
- Datadog Mobile App
- CoScreen
- Cloudcraft
- 앱 내
- 서비스 관리
- 인프라스트럭처
- 애플리케이션 성능
- Continuous Profiler
- 스팬 시각화
- 데이터 스트림 모니터링
- 데이터 작업 모니터링
- 디지털 경험
- 소프트웨어 제공
- 보안
- AI Observability
- 로그 관리
- 관리
Supported OS
Datadog 에이전트는 Airflow에서 다음을 포함한 다양한 메트릭을 수집합니다.
메트릭은 Airflow StatsD 플러그인을 통해 수집되고 Datadog DogStatsD로 전송됩니다.
Datadog 에이전트는 메트릭에 더해 Airflow의 상태와 관련한 서비스 점검 결과도 전송합니다.
Airflow 통합이 잘 작동하도록 하려면 다음 단계를 올바로 따라야 합니다. 시작하기 전에 먼저 Datadog 에이전트를 설치하세요. 설치 버전은 >=6.17
이나 >=7.17
이어야 합니다. 이 버전에 StatsD/DogStatsD 매핑 기능이 포함되어 있습니다.
Airflow 통합은 두 부분으로 나누어집니다.
Airflow 통합의 메트릭은 에이전트와 StatsD 부분 모두에서 옵니다.
Datadog 에이전트 패키지에 포함된 Airflow 점검을 구성해 메트릭 상태와 서비스 점검을 수집합니다. 그러려면 에이전트 구성 디렉터리의 루트 수준에 있는 conf.d/
폴더에서 airflow.d/conf.yaml
파일 내 url
을 편집하세요. 그러면 Airflow 서비스 점검이 시작됩니다. 사용할 수 있는 구성 옵션 전체를 보려면 airflow.d/conf.yam 샘플을 참고하세요.
이 내 Airflow 웹서버 base_url
과 일치하는지 확인하세요. 이는 Airflow 인스턴스에 연결할 때 사용한 URL입니다.
Airflow statsd
기능을 사용해 Airflow를 DogStatsD(Datadog 에이전트에 포함되어 있음)에 연결하여 메트릭을 수집하세요. 사용된 Airflow 버전에 따른 메트릭 보고서와 추가 구성 옵션에 관해 자세히 알아보려면 아래 Airflow 설명서를 참고하세요.
참고: Airflow가 보고하는 StatsD 메트릭 존재 여부는 사용하는 Airflow Executor 종류에 따라 달라집니다. 예를 들어 KubernetesExecutor
6의 경우 airflow.ti_failures/successes
, airflow.operator_failures/successes
, airflow.dag.task.duration
이 보고되지 않습니다.
Airflow StatsD 플러그인을 설치합니다.
pip install 'apache-airflow[statsd]'
Airflow 구성 파일 airflow.cfg
에 다음 구성을 추가해 업데이트합니다.
statsd_on = True
# Hostname or IP of server running the Datadog Agent
statsd_host = localhost
# DogStatsD port configured in the Datadog Agent
statsd_port = 8125
statsd_prefix = airflow
Datadog 에이전트 주 구성 파일에 다음 구성을 추가해 업데이트합니다.
# dogstatsd_mapper_cache_size: 1000 # default to 1000
- name: airflow
prefix: "airflow."
- match: "airflow.*_start"
name: "airflow.job.start"
job_name: "$1"
- match: "airflow.*_end"
name: "airflow.job.end"
job_name: "$1"
- match: "airflow.*_heartbeat_failure"
name: airflow.job.heartbeat.failure
job_name: "$1"
- match: "airflow.operator_failures_*"
name: "airflow.operator_failures"
operator_name: "$1"
- match: "airflow.operator_successes_*"
name: "airflow.operator_successes"
operator_name: "$1"
- match: 'airflow\.dag_processing\.last_runtime\.(.*)'
match_type: "regex"
name: "airflow.dag_processing.last_runtime"
dag_file: "$1"
- match: 'airflow\.dag_processing\.last_run\.seconds_ago\.(.*)'
match_type: "regex"
name: "airflow.dag_processing.last_run.seconds_ago"
dag_file: "$1"
- match: 'airflow\.dag\.loading-duration\.(.*)'
match_type: "regex"
name: "airflow.dag.loading_duration"
dag_file: "$1"
- match: "airflow.local_task_job.task_exit.*.*.*.*"
name: "airflow.local_task_job.task_exit"
job_id: "$1"
dag_id: "$2"
task_id: "$3"
return_code: "$4"
- match: "airflow.dag.*.*.queue_duration"
name: "airflow.dag.queue_duration"
dag_id: "$1"
task_id: "$2"
- match: "airflow.dag.*.*.scheduled_duration"
name: "airflow.dag.scheduled_duration"
dag_id: "$1"
task_id: "$2"
- match: "airflow.dagrun.*.first_task_scheduling_delay"
name: "airflow.dagrun.first_task_scheduling_delay"
dag_id: "$1"
- match: "airflow.pool.open_slots.*"
name: "airflow.pool.open_slots"
pool_name: "$1"
- match: "airflow.pool.queued_slots.*"
name: "airflow.pool.queued_slots"
pool_name: "$1"
- match: "airflow.pool.running_slots.*"
name: "airflow.pool.running_slots"
pool_name: "$1"
- match: "airflow.pool.used_slots.*"
name: "airflow.pool.used_slots"
pool_name: "$1"
- match: "airflow.pool.starving_tasks.*"
name: "airflow.pool.starving_tasks"
pool_name: "$1"
- match: 'airflow\.dagrun\.dependency-check\.(.*)'
match_type: "regex"
name: "airflow.dagrun.dependency_check"
dag_id: "$1"
- match: 'airflow\.dag\.(.*)\.([^.]*)\.duration'
match_type: "regex"
name: "airflow.dag.task.duration"
dag_id: "$1"
task_id: "$2"
- match: 'airflow\.dag_processing\.last_duration\.(.*)'
match_type: "regex"
name: "airflow.dag_processing.last_duration"
dag_file: "$1"
- match: 'airflow\.dagrun\.duration\.success\.(.*)'
match_type: "regex"
name: "airflow.dagrun.duration.success"
dag_id: "$1"
- match: 'airflow\.dagrun\.duration\.failed\.(.*)'
match_type: "regex"
name: "airflow.dagrun.duration.failed"
dag_id: "$1"
- match: 'airflow\.dagrun\.schedule_delay\.(.*)'
match_type: "regex"
name: "airflow.dagrun.schedule_delay"
dag_id: "$1"
- match: 'airflow.scheduler.tasks.running'
name: "airflow.scheduler.tasks.running"
- match: 'airflow.scheduler.tasks.starving'
name: "airflow.scheduler.tasks.starving"
- match: 'airflow.sla_email_notification_failure'
name: 'airflow.sla_email_notification_failure'
- match: 'airflow\.task_removed_from_dag\.(.*)'
match_type: "regex"
name: "airflow.dag.task_removed"
dag_id: "$1"
- match: 'airflow\.task_restored_to_dag\.(.*)'
match_type: "regex"
name: "airflow.dag.task_restored"
dag_id: "$1"
- match: "airflow.task_instance_created-*"
name: "airflow.task.instance_created"
task_class: "$1"
- match: 'airflow\.ti\.start\.(.+)\.(\w+)'
match_type: regex
name: airflow.ti.start
dag_id: "$1"
task_id: "$2"
- match: 'airflow\.ti\.finish\.(\w+)\.(.+)\.(\w+)'
name: airflow.ti.finish
match_type: regex
dag_id: "$1"
task_id: "$2"
state: "$3"
파일의 기본 구성을 사용해 Airflow 서비스 점검을 활성화하세요. 사용할 수 있는 전체 사용 옵션을 보려면 샘플airflow.d/conf.yaml을 참고하세요.
에이전트 버전 > 6.0에서 사용 가능
Datadog Agent에서 로그 수집은 기본적으로 비활성화되어 있으므로 datadog.yaml
파일에서 활성화합니다.
logs_enabled: true
맨 아래에 있는 다음 구성 블록에 코멘트를 지우고 편집합니다.
와 service
파라미터 값을 변경하고 환경에 맞게 구성합니다.
DAG 프로세서 매니저와 스케쥴러 로그를 구성합니다.
- type: file
path: "<PATH_TO_AIRFLOW>/logs/dag_processor_manager/dag_processor_manager.log"
source: airflow
- type: multi_line
name: new_log_start_with_date
pattern: \[\d{4}\-\d{2}\-\d{2}
- type: file
path: "<PATH_TO_AIRFLOW>/logs/scheduler/latest/*.log"
source: airflow
- type: multi_line
name: new_log_start_with_date
pattern: \[\d{4}\-\d{2}\-\d{2}
매일 로그 회전을 통해 스케쥴러를 정기적으로 정리하는 것이 좋습니다.
DAG 작업 로그 추가 구성
- type: file
path: "<PATH_TO_AIRFLOW>/logs/*/*/*/*.log"
source: airflow
- type: multi_line
name: new_log_start_with_date
pattern: \[\d{4}\-\d{2}\-\d{2}
중요: 기본적으로 Airflow에서는 로그 파일 템플릿 log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
를 사용합니다. 정기적으로 제거하지 않으면 로그 파일 수가 빠른 속도로 늘어납니다. 각 실행 작업 로그를 개별적으로 Airflow UI에 표시하기 위해 이와 같은 패턴을 사용합니다.
Airflow UI로 로그를 보지 않는 경우, Datadog에서는 airflow.cfg
에 log_filename_template = dag_tasks.log
구성을 사용하기를 권장합니다. 그러면 로그가 이 파일을 회전하고 다음 구성을 사용합니다.
- type: file
path: "<PATH_TO_AIRFLOW>/logs/dag_tasks.log"
source: airflow
- type: multi_line
name: new_log_start_with_date
pattern: \[\d{4}\-\d{2}\-\d{2}
컨테이너화된 환경의 경우 자동탐지 통합 템플릿에 아래 파라미터를 적용하는 방법이 안내되어 있습니다.
파라미터 | 값 |
<INTEGRATION_NAME> | airflow |
<INIT_CONFIG> | 비어 있음 또는 {} |
<INSTANCE_CONFIG> | {"url": "http://%%host%%:8080"} |
이 내 Airflow 웹서버 base_url
과 일치하는지 확인하세요. 이는 Airflow 인스턴스에 연결할 때 사용한 URL입니다. localhost
를 템플릿 변수 %%host%%
로 변경하세요.
Airflow의 Helm 차트를 이용하는 경우, 웹 서버를 ClusterIP 서비스로 노출하며, 이 ClusterIP를 url
파라미터에서 사용하게 됩니다.
다음은 자동탐지 주석의 예시입니다.
apiVersion: v1
kind: Pod
# (...)
name: '<POD_NAME>'
ad.datadoghq.com/<CONTAINER_IDENTIFIER>.checks: |
"airflow": {
"instances": ["url": "http://airflow-ui.%%kube_namespace%%.svc.cluster.local:8080"]
# (...)
Airflow statsd
기능을 사용해 Airflow를 DogStatsD(Datadog 에이전트에 포함되어 있음)에 연결하여 메트릭을 수집하세요. 사용된 Airflow 버전에 따른 메트릭 보고서와 추가 구성 옵션에 관해 자세히 알아보려면 아래 Airflow 설명서를 참고하세요.
참고: Airflow가 보고하는 StatsD 메트릭 존재 여부는 사용하는 Airflow Executor 종류에 따라 달라집니다. 예를 들어 KubernetesExecutor
6의 경우 airflow.ti_failures/successes
, airflow.operator_failures/successes
, airflow.dag.task.duration
이 보고되지 않습니다.
참고: Airflow에 사용되는 환경 변수가 버전에 따라 다를 수 있습니다. 예를 들어 Airflow 2.0.0
를 활용하지만 Airflow 1.10.15
를 사용합니다.
쿠버네티스 배포의 경우 다음 환경 변수를 사용해 Airflow StatsD 구성을 활성화할 수 있습니다.
value: "True"
value: "8125"
value: "airflow"
fieldPath: status.hostIP
의 환경 변수는 노드의 호스트 IP 주소와 함께 제공되어 StatsD 데이터를 Airflow Pod와 같은 Pod인 Datadog 에이전트 포드로 라우팅합니다. 이 설정을 사용하려면 에이전트의 hostPort
인 8125
포트가 개방되어 있어야 하고 로컬이 아닌 StatsD 트래픽을 수락해야 합니다. 더 자세한 정보는 Kubernetes에서 DogStatsD 설정을 참고하세요.
그러면 StatsD 트래픽이 수신 준비가 된 상태로 Airflow 컨테이너에서 Datadog 에이전트로 이동합니다. 마지막으로 실행할 단계는 Datadog 에이전트를 적합한 dogstatsd_mapper_profiles
로 업데이트하는 것입니다. 그러려면 호스트 설치에 있는 dogstatsd_mapper_profiles
를 복사해 datadog.yaml
파일에 붙여 넣으세요. 또는 환경 변수 DD_DOGSTATSD_MAPPER_PROFILES
에서 동급의 JSON 구성을 사용해 Datadog 에이전트를 배포하세요. Kubernetes의 경우 동급 환경 변수 표기는 다음과 같습니다.
value: >
StatsD 메트릭에 비고정 태그를 추가하려면 DogStatsD 매퍼 프로파일을 사용해야 합니다. service
와 env
태그를 추가하는 매퍼 프로필 예시를 참고하세요.
에이전트 버전 > 6.0에서 사용 가능
Datadog 에이전트에서 로그 수집은 기본값으로 비활성화되어 있습니다. 이를 활성화하려면 Kubernetes 로그 수집을 참고하세요.
파라미터 | 값 |
<LOG_CONFIG> | {"source": "airflow", "service": "<YOUR_APP_NAME>"} |
에이전트 상태 하위 명령을 실행하고 점검 섹션에서 airflow
를 찾습니다.
추가로 Airflow DatadogHook을 사용해 Datadog과 소통할 수 있습니다.
airflow.can_connect (count) | 1 if can connect to Airflow, otherwise 0 |
airflow.celery.task_timeout_error (count) | Number of AirflowTaskTimeout errors raised when publishing Task to Celery Broker.Shown as error |
airflow.collect_db_dags (gauge) | Milliseconds taken for fetching all Serialized Dags from DB Shown as millisecond |
airflow.dag.callback_exceptions (count) | Number of exceptions raised from DAG callbacks. When this happens, it means DAG callback is not working Shown as error |
airflow.dag.loading_duration (gauge) | DAG loading duration in seconds (deprecated) Shown as second |
airflow.dag.task.duration (gauge) | Milliseconds taken to finish a task Shown as millisecond |
airflow.dag.task.ongoing_duration (gauge) | Current duration for ongoing DAG tasks Shown as second |
airflow.dag.task.total_running (gauge) | Total number of running tasks |
airflow.dag.task_removed (gauge) | Tasks removed from DAG Shown as second |
airflow.dag.task_restored (gauge) | Tasks restored to DAG Shown as second |
airflow.dag_file_refresh_error (count) | Number of failures loading any DAG files Shown as error |
airflow.dag_processing.import_errors (gauge) | Number of errors from trying to parse DAG files Shown as error |
airflow.dag_processing.last_duration (gauge) | Milliseconds taken to load the given DAG file Shown as millisecond |
airflow.dag_processing.last_run.seconds_ago (gauge) | Seconds since <dag_file> was last processedShown as second |
airflow.dag_processing.last_runtime (gauge) | Seconds spent processing <dag_file> (in most recent iteration)Shown as second |
airflow.dag_processing.manager_stalls (count) | Number of stalled DagFileProcessorManager |
airflow.dag_processing.processes (count) | Number of currently running DAG parsing processes |
airflow.dag_processing.processor_timeouts (gauge) | Number of file processors that have been killed due to taking too long |
airflow.dag_processing.total_parse_time (gauge) | Seconds taken to scan and import all DAG files once Shown as second |
airflow.dagbag_size (gauge) | DAG bag size |
airflow.dagrun.dependency_check (gauge) | Milliseconds taken to check DAG dependencies Shown as millisecond |
airflow.dagrun.duration.failed (gauge) | Milliseconds taken for a DagRun to reach failed state Shown as millisecond |
airflow.dagrun.duration.success (gauge) | Milliseconds taken for a DagRun to reach success state Shown as millisecond |
airflow.dagrun.first_task_scheduling_delay (gauge) | Milliseconds elapsed between first task start_date and dagrun expected start Shown as millisecond |
airflow.dagrun.schedule_delay (gauge) | Milliseconds of delay between the scheduled DagRun start date and the actual DagRun start date Shown as millisecond |
airflow.executor.open_slots (gauge) | Number of open slots on executor |
airflow.executor.queued_tasks (gauge) | Number of queued tasks on executor Shown as task |
airflow.executor.running_tasks (gauge) | Number of running tasks on executor Shown as task |
airflow.healthy (count) | 1 if Airflow is healthy, otherwise 0 |
airflow.job.end (count) | Number of ended <job_name> job, ex. SchedulerJob , LocalTaskJob Shown as job |
airflow.job.heartbeat.failure (count) | Number of failed Heartbeats for a <job_name> job, ex. SchedulerJob , LocalTaskJob Shown as error |
airflow.job.start (count) | Number of started <job_name> job, ex. SchedulerJob , LocalTaskJob Shown as job |
airflow.operator_failures (count) | Operator <operator_name> failures |
airflow.operator_successes (count) | Operator <operator_name> successes |
airflow.pool.open_slots (gauge) | Number of open slots in the pool |
airflow.pool.queued_slots (gauge) | Number of queued slots in the pool |
airflow.pool.running_slots (gauge) | Number of running slots in the pool |
airflow.pool.starving_tasks (gauge) | Number of starving tasks in the pool Shown as task |
airflow.pool.used_slots (gauge) | Number of used slots in the pool |
airflow.previously_succeeded (count) | Number of previously succeeded task instances Shown as task |
airflow.scheduler.critical_section_busy (count) | Count of times a scheduler process tried to get a lock on the critical section (needed to send tasks to the executor) and found it locked by another process. Shown as operation |
airflow.scheduler.critical_section_duration (gauge) | Milliseconds spent in the critical section of scheduler loop -- only a single scheduler can enter this loop at a time Shown as millisecond |
airflow.scheduler.orphaned_tasks.adopted (count) | Number of Orphaned tasks adopted by the Scheduler Shown as task |
airflow.scheduler.orphaned_tasks.cleared (count) | Number of Orphaned tasks cleared by the Scheduler Shown as task |
airflow.scheduler.tasks.executable (count) | Number of tasks that are ready for execution (set to queued) with respect to pool limits, dag concurrency, executor state, and priority. Shown as task |
airflow.scheduler.tasks.killed_externally (count) | Number of tasks killed externally Shown as task |
airflow.scheduler.tasks.running (count) | Number of tasks running in executor Shown as task |
airflow.scheduler.tasks.starving (count) | Number of tasks that cannot be scheduled because of no open slot in pool Shown as task |
airflow.scheduler.tasks.without_dagrun (count) | Number of tasks without DagRuns or with DagRuns not in Running state Shown as task |
airflow.scheduler_heartbeat (count) | Scheduler heartbeats |
airflow.sla_email_notification_failure (count) | Number of failed SLA miss email notification attempts Shown as task |
airflow.smart_sensor_operator.exception_failures (count) | Number of failures caused by exception in the previous smart sensor poking loop Shown as error |
airflow.smart_sensor_operator.infra_failures (count) | Number of infrastructure failures in the previous smart sensor poking loop Shown as error |
airflow.smart_sensor_operator.poked_exception (count) | Number of exceptions in the previous smart sensor poking loop Shown as error |
airflow.smart_sensor_operator.poked_success (count) | Number of newly succeeded tasks poked by the smart sensor in the previous poking loop Shown as task |
airflow.smart_sensor_operator.poked_tasks (count) | Number of tasks poked by the smart sensor in the previous poking loop Shown as task |
airflow.task.instance_created (gauge) | Task instances created Shown as second |
airflow.task_instance_created (count) | Number of tasks instances created for a given Operator Shown as task |
airflow.task_removed_from_dag (count) | Number of tasks removed for a given dag (i.e. task no longer exists in DAG) Shown as task |
airflow.task_restored_to_dag (count) | Number of tasks restored for a given dag (i.e. task instance which was previously in REMOVED state in the DB is added to DAG file) Shown as task |
airflow.ti.finish (count) | Number of completed task in a given dag. Shown as task |
airflow.ti.start (count) | Number of started task in a given dag. Shown as task |
airflow.ti_failures (count) | Overall task instances failures Shown as task |
airflow.ti_successes (count) | Overall task instances successes Shown as task |
airflow.zombies_killed (count) | Zombie tasks killed Shown as task |
참고: airflow.healthy
, airflow.can_connect
, airflow.dag.task.total_running
, airflow.dag.task.ongoing_duration
메트릭은 통합의 에이전트 부분에서 수집됩니다. 다른 메트릭은 StatsD에서 옵니다.
Airflow 점검에는 이벤트가 포함되지 않습니다.
if unable to connect to Airflow. Returns OK
Statuses: ok, critical
if Airflow is not healthy. Returns OK
Statuses: ok, critical
Airflow의 API에 인증된 요청을 보내려면 Datadog 에이전트의 파라미터를 설정해야 할 수 있습니다. 구성 옵션 중에서 하나를 선택해 사용하세요.
도움이 필요하신가요? Datadog 지원팀에 문의하세요.