For AI agents: A markdown version of this page is available at https://docs.datadoghq.com/data_observability/jobs_monitoring/openlineage.md. A documentation index is available at /llms.txt.
This product is not supported for your selected Datadog site. ().
Custom jobs using OpenLineage is in Preview.

Overview

Custom jobs use the OpenLineage standard to send job and lineage events to Datadog. Use custom jobs when you need to:

  • Capture lineage from systems Datadog doesn’t integrate with natively, such as in-house tools or custom ETL scripts
  • Emit lineage events for jobs or orchestrators where a native Datadog integration isn’t available

Note: To centralize configuration and avoid distributing API keys to every application, you can set up the Datadog Agent as an OpenLineage proxy.

Prerequisites

  • A Datadog API key. See API and Application Keys.
  • Your Datadog site URL. The examples on this page use datadoghq.com; replace the hostname with the intake endpoint for your site.

Step 1: Send a START event

Use one of the following options to send OpenLineage events to Datadog:

Note: Datadog requires the jobType Job Facet to process run events.

To also see lineage edges between your job and its datasets, include inputs and outputs in your event. Dataset namespaces must match the format Datadog expects for each platform. See Dataset naming conventions.

Send a raw OpenLineage RunEvent as JSON to Datadog’s intake endpoint.

curl -X POST "https://data-obs-intake.datadoghq.com/api/v1/lineage" \
  -H "Authorization: Bearer <DD_API_KEY>" \
  -H "Content-Type: application/json" \
  -d '{
        "eventTime": "2024-01-01T10:00:00Z",
        "eventType": "START",
        "run": { "runId": "<RUN_UUID>" },
        "job": {
          "namespace": "<YOUR_NAMESPACE>",
          "name": "<YOUR_JOB_NAME>",
          "facets": {
            "jobType": {
              "_producer": "<YOUR_PRODUCER_ID>",
              "_schemaURL": "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json",
              "processingType": "BATCH",
              "integration": "custom",
              "jobType": "JOB"
            }
          }
        },
        "inputs": [
          {
            "namespace": "postgres://demo-db.example.com:5432",
            "name": "orders.public.orders"
          }
        ],
        "outputs": [
          {
            "namespace": "snowflake://myorg-myaccount",
            "name": "ANALYTICS.PUBLIC.ORDERS"
          }
        ],
        "producer": "<YOUR_PRODUCER_ID>"
      }'

Use the OpenLineage Python client with a manually specified HTTP transport.

from datetime import datetime
import uuid
from openlineage.client import OpenLineageClient, OpenLineageClientOptions
from openlineage.client.event_v2 import RunEvent, RunState, Job, Run, InputDataset, OutputDataset
from openlineage.client.facet_v2 import job_type_job

client = OpenLineageClient(
    url="https://data-obs-intake.datadoghq.com",
    options=OpenLineageClientOptions(api_key="<DD_API_KEY>")
)

event = RunEvent(
    eventType=RunState.START,
    eventTime=datetime.utcnow().isoformat(),
    run=Run(runId=str(uuid.uuid4())),
    job=Job(
        namespace="<YOUR_NAMESPACE>",
        name="<YOUR_JOB_NAME>",
        facets={
            "jobType": job_type_job.JobTypeJobFacet(
                processingType="BATCH",
                integration="custom",
                jobType="JOB"
            )
        }
    ),
    inputs=[
        InputDataset(
            namespace="postgres://demo-db.example.com:5432",
            name="orders.public.orders"
        )
    ],
    outputs=[
        OutputDataset(
            namespace="snowflake://myorg-myaccount",
            name="ANALYTICS.PUBLIC.ORDERS"
        )
    ],
    producer="<YOUR_PRODUCER_ID>"
)

client.emit(event)

In OpenLineage 1.37.0+, use the Datadog transport for automatic configuration and optimized event delivery.

from datetime import datetime
import uuid
from openlineage.client import OpenLineageClient
from openlineage.client.event_v2 import RunEvent, RunState, Job, Run, InputDataset, OutputDataset
from openlineage.client.facet_v2 import job_type_job
from openlineage.client.transport.datadog import DatadogConfig, DatadogTransport

config = DatadogConfig(
    apiKey="<DD_API_KEY>",
    site="datadoghq.com"  # Change if using a different Datadog site
)

client = OpenLineageClient(transport=DatadogTransport(config))

event = RunEvent(
    eventType=RunState.START,
    eventTime=datetime.utcnow().isoformat(),
    run=Run(runId=str(uuid.uuid4())),
    job=Job(
        namespace="<YOUR_NAMESPACE>",
        name="<YOUR_JOB_NAME>",
        facets={
            "jobType": job_type_job.JobTypeJobFacet(
                processingType="BATCH",
                integration="custom",
                jobType="JOB"
            )
        }
    ),
    inputs=[
        InputDataset(
            namespace="postgres://demo-db.example.com:5432",
            name="orders.public.orders"
        )
    ],
    outputs=[
        OutputDataset(
            namespace="snowflake://myorg-myaccount",
            name="ANALYTICS.PUBLIC.ORDERS"
        )
    ],
    producer="<YOUR_PRODUCER_ID>"
)

client.emit(event)

You can also configure the Datadog transport with environment variables instead of DatadogConfig:

export DD_API_KEY=<DD_API_KEY>
export DD_SITE=datadoghq.com
export OPENLINEAGE__TRANSPORT__TYPE=datadog
client = OpenLineageClient.from_environment()

Step 2: Verify in Datadog

After sending your events, check the following:

  • Jobs Monitoring: Your job run appears with start time, duration, and status.
  • Lineage graph: If you included inputs or outputs in your event, your job appears as a node connected to the dataset nodes.

Dataset naming conventions

To connect your custom job’s lineage to datasets already tracked by Datadog’s native integrations, include inputs and outputs in your event using the exact namespace and name that Datadog expects for that platform. For example, referencing a Snowflake table in your custom job’s outputs with the correct namespace and name links it to the existing dataset node in the lineage graph.

Datadog resolves datasets into a hierarchy of account, database, schema, and table. If a name has fewer parts than expected (for example, database.table instead of database.schema.table), Datadog falls back to the nearest higher-order node in the lineage graph.

PlatformNamespaceName
BigQuerybigquery{project}.{dataset}.{table}
Snowflakesnowflake://{org}-{account}{database}.{schema}.{table}
Redshiftredshift://{aws_account_id}:{region}:{cluster}{database}.{schema}.{table}
PostgreSQLpostgres://{host}:{port}{database}.{schema}.{table}
Databricksdatabricks://{workspace-url}{database}.{schema}.{table}
Trinotrino://{host}:{port}{catalog}.{schema}.{table}
AWS Gluearn:aws:glue:{region}:{accountId}{database}.{table}
S3s3://{bucket}{path}

For platforms not listed here, follow the OpenLineage naming conventions.

The following example shows a job reading from a PostgreSQL table and writing to a Snowflake table:

"inputs": [
  {
    "namespace": "postgres://db.example.com:5432",
    "name": "mydb.public.raw_orders"
  }
],
"outputs": [
  {
    "namespace": "snowflake://myorg-myaccount",
    "name": "ANALYTICS.PUBLIC.ORDERS"
  }
]

Note: If a dataset namespace is not recognized, Datadog still creates a lineage node for it but does not surface it in the Data Observability product. Use a recognized namespace format to have datasets appear in the catalog and lineage graph.

Supported facets

Facets are structured metadata attached to OpenLineage events. Each facet requires _producer (a URI identifying the system that produced it) and _schemaURL (a URI referencing its JSON schema).

JobTypeJobFacet

The jobType job facet is required. It determines how Datadog classifies and displays the job.

integration values

Use custom for custom jobs. The values below are used by Datadog’s native integrations. Using them for custom jobs may produce unexpected behavior. In particular, SPARK prevents span generation.

ValuePlatform
customCustom or unsupported platforms
SPARKApache Spark (native integration only; do not use for custom jobs)
AIRFLOWApache Airflow
DBTdbt
BIGQUERYGoogle BigQuery
SNOWFLAKESnowflake
TRINOTrino
ICEBERGApache Iceberg
TABLEAUTableau

processingType values

BATCH or STREAMING.

jobType values

Common values include JOB, TASK, DAG, MODEL, COMMAND, and QUERY.

Note: If jobType is set to QUERY, Datadog does not generate lineage nodes for the job.

Other supported facets

FacetWhat Datadog does
parentCreates parent-child job hierarchy in the lineage graph
errorMessageGenerates error spans with error.message and error.stack tags
tagsAdds span tags to the run; _dd.ol_service value maps to the Datadog service name
sqlParses and masks the SQL query; generates query events

Further reading