Custom Jobs using OpenLineage

Custom jobs using OpenLineage is in Preview.

Overview

Custom jobs use the OpenLineage standard to send job and lineage events to Datadog. Use custom jobs when you need to:

  • Capture lineage from systems Datadog doesn’t integrate with natively, such as in-house tools or custom ETL scripts
  • Emit lineage events for jobs or orchestrators where a native Datadog integration isn’t available

Replace the hostname in the examples with the relevant Datadog site for your organization. To find your Datadog site, see Access the Datadog site. This example uses datadoghq.com.
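As a rough sketch of the hostname substitution described above, the helper below builds the intake URL from a Datadog site value. The function name `lineage_intake_url` is hypothetical, and it assumes every site follows the same `data-obs-intake.<site>` pattern used for datadoghq.com in the examples on this page. Confirm the hostname for your own site before relying on it.

```python
def lineage_intake_url(site: str = "datadoghq.com") -> str:
    """Build the lineage intake URL for a Datadog site (hypothetical helper).

    Assumption: the host is always "data-obs-intake." followed by the site,
    as in the datadoghq.com examples on this page.
    """
    site = site.strip().lower()
    if not site or "/" in site:
        raise ValueError("site must be a bare domain such as 'datadoghq.com'")
    return f"https://data-obs-intake.{site}/api/v1/lineage"


default_url = lineage_intake_url()
```

Here `default_url` matches the endpoint used in the curl example for datadoghq.com.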

Note: To centralize configuration and avoid distributing API keys to every application, you can set up the Datadog Agent as an OpenLineage proxy.

Use one of the following options to send OpenLineage events to Datadog:

Option 1: Send a raw OpenLineage RunEvent as JSON to Datadog’s intake endpoint with curl.

curl -X POST "https://data-obs-intake.datadoghq.com/api/v1/lineage" \
  -H "Authorization: Bearer <DD_API_KEY>" \
  -H "Content-Type: application/json" \
  -d '{
        "eventTime": "2023-01-01T00:00:00Z",
        "eventType": "START",
        "run": { "runId": "123e4567-e89b-12d3-a456-426614174000" },
        "job": { "namespace": "default", "name": "test-job" },
        "producer": "your-producer-id"
      }'
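The same request can be sketched with only the Python standard library, which may be convenient in scripts where installing the OpenLineage client is not an option. This is an illustration, not an official client: `build_run_event` and `send_event` are hypothetical helper names, the payload mirrors only the fields shown in the curl example, and the COMPLETE event assumes you want to close the run under the same runId that was used for START.

```python
import json
import urllib.request
import uuid
from datetime import datetime, timezone

# Same endpoint as the curl example; change the host for other Datadog sites.
INTAKE_URL = "https://data-obs-intake.datadoghq.com/api/v1/lineage"


def build_run_event(event_type, run_id, job_name,
                    namespace="default", producer="your-producer-id"):
    """Return a RunEvent payload with the same fields as the curl example."""
    return {
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "eventType": event_type,
        "run": {"runId": run_id},
        "job": {"namespace": namespace, "name": job_name},
        "producer": producer,
    }


def send_event(event, api_key):
    """POST one event to the intake endpoint and return the HTTP status code."""
    request = urllib.request.Request(
        INTAKE_URL,
        data=json.dumps(event).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status


# Both events share one runId so they describe the same run.
run_id = str(uuid.uuid4())
start_event = build_run_event("START", run_id, "test-job")
complete_event = build_run_event("COMPLETE", run_id, "test-job")
```

To send the events, call `send_event(start_event, "<DD_API_KEY>")` when the job begins and `send_event(complete_event, "<DD_API_KEY>")` when it finishes.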

Option 2: Use the OpenLineage Python client with a manually specified HTTP transport.

from datetime import datetime, timezone
import uuid
from openlineage.client import OpenLineageClient, OpenLineageClientOptions
from openlineage.client.event_v2 import RunEvent, RunState, Job, Run

# Point the HTTP transport at the Datadog intake host and authenticate with your API key
client = OpenLineageClient(
    url="https://data-obs-intake.datadoghq.com",
    options=OpenLineageClientOptions(api_key="<DD_API_KEY>")
)

event = RunEvent(
    eventType=RunState.START,
    # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated and omits the offset
    eventTime=datetime.now(timezone.utc).isoformat(),
    run=Run(runId=str(uuid.uuid4())),
    job=Job(namespace="default", name="test-job"),
    producer="your-producer-id"
)

client.emit(event)

Option 3: In OpenLineage 1.37.0+, use the Datadog transport for automatic configuration and optimized event delivery.

from datetime import datetime, timezone
import uuid
from openlineage.client import OpenLineageClient
from openlineage.client.event_v2 import RunEvent, RunState, Job, Run
from openlineage.client.transport.datadog import DatadogConfig, DatadogTransport

config = DatadogConfig(
    apiKey="<DD_API_KEY>",
    site="datadoghq.com"  # Change if using a different Datadog site
)

client = OpenLineageClient(transport=DatadogTransport(config))

event = RunEvent(
    eventType=RunState.START,
    # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated and omits the offset
    eventTime=datetime.now(timezone.utc).isoformat(),
    run=Run(runId=str(uuid.uuid4())),
    job=Job(namespace="default", name="test-job"),
    producer="your-producer-id"
)

client.emit(event)

For Option 3, you can skip DatadogConfig by setting environment variables in your shell:

export DD_API_KEY=your-datadog-api-key
export DD_SITE=datadoghq.com
export OPENLINEAGE__TRANSPORT__TYPE=datadog

Then create the client from the environment in Python:

client = OpenLineageClient.from_environment()
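
Because the environment-based setup fails quietly when a variable is missing from the process that runs the job, a small pre-flight check can help. The sketch below is an optional convenience, not part of the OpenLineage client: `missing_datadog_env` is a hypothetical helper, and it treats the three variables listed above as required.

```python
import os

# The variables used by the environment-based configuration above.
REQUIRED_VARS = ("DD_API_KEY", "DD_SITE", "OPENLINEAGE__TRANSPORT__TYPE")


def missing_datadog_env(environ=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not env.get(name)]


# Example with an explicit mapping instead of the real process environment.
example_env = {"DD_API_KEY": "your-datadog-api-key", "DD_SITE": "datadoghq.com"}
missing = missing_datadog_env(example_env)
```

In a job script, you might call `missing_datadog_env()` with no arguments and raise an error if the returned list is not empty, before creating the client.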

Further reading