---
title: Custom Jobs using OpenLineage
description: >-
  Monitor jobs from in-house tools, custom pipelines, and orchestrators that
  don't have native Datadog integrations.
---

# Custom Jobs using OpenLineage

{% callout %}
# Important note for users on the following Datadog sites: app.ddog-gov.com, us2.ddog-gov.com

{% alert level="danger" %}
This product is not supported for your selected [Datadog site](https://docs.datadoghq.com/getting_started/site.md). ({% placeholder "user-datadog-site-name" /%}).
{% /alert %}

{% /callout %}

{% alert level="info" %}
Custom jobs using OpenLineage is in Preview.
{% /alert %}

## Overview{% #overview %}

Custom jobs use the [OpenLineage](https://openlineage.io/docs/spec/run-cycle/) standard to send job and lineage events to Datadog. Use custom jobs when you need to:

- Capture lineage from systems Datadog doesn't integrate with natively, such as in-house tools or custom ETL scripts
- Emit lineage events for jobs or orchestrators where a native Datadog integration isn't available

**Note**: To centralize configuration and avoid distributing API keys to every application, you can [set up the Datadog Agent as an OpenLineage proxy](https://docs.datadoghq.com/data_observability/jobs_monitoring/openlineage/datadog_agent_for_openlineage.md).

## Prerequisites{% #prerequisites %}

- A Datadog API key. See [API and Application Keys](https://docs.datadoghq.com/account_management/api-app-keys.md).
- Your Datadog [site URL](https://docs.datadoghq.com/getting_started/site.md#access-the-datadog-site). The examples on this page use `datadoghq.com`; replace the hostname with the intake endpoint for your site.

## Step 1: Send a `START` event{% #step-1-send-a-start-event %}

Use one of the following options to send [OpenLineage events](https://openlineage.io/docs/spec/run-cycle/) to Datadog:

**Note**: Datadog requires the `jobType` [Job Facet](https://openlineage.io/docs/spec/facets/job-facets/job-type/) to process run events.

To also see lineage edges between your job and its datasets, include `inputs` and `outputs` in your event. Dataset namespaces must match the format Datadog expects for each platform. See [Dataset naming conventions](#dataset-naming-conventions).

{% tab title="Direct HTTP with curl" %}
Send a raw [OpenLineage RunEvent](https://openlineage.io/docs/spec/run-cycle/) as JSON to Datadog's intake endpoint.

```shell
curl -X POST "https://data-obs-intake.datadoghq.com/api/v1/lineage" \
  -H "Authorization: Bearer <DD_API_KEY>" \
  -H "Content-Type: application/json" \
  -d '{
        "eventTime": "2024-01-01T10:00:00Z",
        "eventType": "START",
        "run": { "runId": "<RUN_UUID>" },
        "job": {
          "namespace": "<YOUR_NAMESPACE>",
          "name": "<YOUR_JOB_NAME>",
          "facets": {
            "jobType": {
              "_producer": "<YOUR_PRODUCER_ID>",
              "_schemaURL": "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json",
              "processingType": "BATCH",
              "integration": "custom",
              "jobType": "JOB"
            }
          }
        },
        "inputs": [
          {
            "namespace": "postgres://demo-db.example.com:5432",
            "name": "orders.public.orders"
          }
        ],
        "outputs": [
          {
            "namespace": "snowflake://myorg-myaccount",
            "name": "ANALYTICS.PUBLIC.ORDERS"
          }
        ],
        "producer": "<YOUR_PRODUCER_ID>"
      }'
```

{% /tab %}

{% tab title="OpenLineage Python client (HTTP transport)" %}
Use the [OpenLineage Python client](https://openlineage.io/docs/client/python) with a manually specified HTTP transport.

```python
from datetime import datetime, timezone
import uuid
from openlineage.client import OpenLineageClient, OpenLineageClientOptions
from openlineage.client.event_v2 import RunEvent, RunState, Job, Run, InputDataset, OutputDataset
from openlineage.client.facet_v2 import job_type_job

# Point the client at the Datadog intake host and authenticate with your API key
client = OpenLineageClient(
    url="https://data-obs-intake.datadoghq.com",
    options=OpenLineageClientOptions(api_key="<DD_API_KEY>")
)

event = RunEvent(
    eventType=RunState.START,
    # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated and omits the offset
    eventTime=datetime.now(timezone.utc).isoformat(),
    run=Run(runId=str(uuid.uuid4())),
    job=Job(
        namespace="<YOUR_NAMESPACE>",
        name="<YOUR_JOB_NAME>",
        facets={
            "jobType": job_type_job.JobTypeJobFacet(
                processingType="BATCH",
                integration="custom",
                jobType="JOB"
            )
        }
    ),
    inputs=[
        InputDataset(
            namespace="postgres://demo-db.example.com:5432",
            name="orders.public.orders"
        )
    ],
    outputs=[
        OutputDataset(
            namespace="snowflake://myorg-myaccount",
            name="ANALYTICS.PUBLIC.ORDERS"
        )
    ],
    producer="<YOUR_PRODUCER_ID>"
)

client.emit(event)
```

{% /tab %}

{% tab title="OpenLineage Python client (Datadog transport)" %}
In OpenLineage 1.37.0+, use the [Datadog transport](https://openlineage.io/docs/client/python#datadog-transport) for automatic configuration and optimized event delivery.

```python
from datetime import datetime, timezone
import uuid
from openlineage.client import OpenLineageClient
from openlineage.client.event_v2 import RunEvent, RunState, Job, Run, InputDataset, OutputDataset
from openlineage.client.facet_v2 import job_type_job
from openlineage.client.transport.datadog import DatadogConfig, DatadogTransport

config = DatadogConfig(
    apiKey="<DD_API_KEY>",
    site="datadoghq.com"  # Change if using a different Datadog site
)

client = OpenLineageClient(transport=DatadogTransport(config))

event = RunEvent(
    eventType=RunState.START,
    # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated and omits the offset
    eventTime=datetime.now(timezone.utc).isoformat(),
    run=Run(runId=str(uuid.uuid4())),
    job=Job(
        namespace="<YOUR_NAMESPACE>",
        name="<YOUR_JOB_NAME>",
        facets={
            "jobType": job_type_job.JobTypeJobFacet(
                processingType="BATCH",
                integration="custom",
                jobType="JOB"
            )
        }
    ),
    inputs=[
        InputDataset(
            namespace="postgres://demo-db.example.com:5432",
            name="orders.public.orders"
        )
    ],
    outputs=[
        OutputDataset(
            namespace="snowflake://myorg-myaccount",
            name="ANALYTICS.PUBLIC.ORDERS"
        )
    ],
    producer="<YOUR_PRODUCER_ID>"
)

client.emit(event)
```

You can also configure the Datadog transport with environment variables instead of `DatadogConfig`:

```shell
export DD_API_KEY="<DD_API_KEY>"
export DD_SITE=datadoghq.com
export OPENLINEAGE__TRANSPORT__TYPE=datadog
```

```python
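from openlineage.client import OpenLineageClient

# Reads the transport settings from the environment variables set above
client = OpenLineageClient.from_environment()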
```

{% /tab %}
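
The examples above send only a `START` event. In the OpenLineage run cycle, a run is closed by a terminal event (`COMPLETE`, `FAIL`, or `ABORT`) that reuses the `runId` of the `START` event. The following is a minimal sketch using only the Python standard library. It assumes the same endpoint and headers as the curl example, and `build_run_event` and `build_request` are hypothetical helper names, not part of any Datadog or OpenLineage API. It builds the requests without sending them.

```python
import json
import uuid
from datetime import datetime, timezone
from urllib.request import Request

# Endpoint taken from the curl example; replace the hostname for your Datadog site.
INTAKE_URL = "https://data-obs-intake.datadoghq.com/api/v1/lineage"
PRODUCER = "<YOUR_PRODUCER_ID>"


def build_run_event(event_type, run_id, namespace, name):
    """Return an OpenLineage run event as a plain dict (hypothetical helper)."""
    return {
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "eventType": event_type,
        "run": {"runId": run_id},
        "job": {
            "namespace": namespace,
            "name": name,
            "facets": {
                "jobType": {
                    "_producer": PRODUCER,
                    "_schemaURL": "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json",
                    "processingType": "BATCH",
                    "integration": "custom",
                    "jobType": "JOB",
                }
            },
        },
        "producer": PRODUCER,
    }


def build_request(event, api_key):
    """Wrap an event in a POST request with the headers from the curl example."""
    return Request(
        INTAKE_URL,
        data=json.dumps(event).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Generate the run ID once and reuse it for every event in the same run.
run_id = str(uuid.uuid4())
start_event = build_run_event("START", run_id, "<YOUR_NAMESPACE>", "<YOUR_JOB_NAME>")
complete_event = build_run_event("COMPLETE", run_id, "<YOUR_NAMESPACE>", "<YOUR_JOB_NAME>")

start_request = build_request(start_event, "<DD_API_KEY>")
complete_request = build_request(complete_event, "<DD_API_KEY>")
# To send: pass start_request to urllib.request.urlopen, run the job,
# then pass complete_request to urllib.request.urlopen.
```

Generating a new `runId` for the terminal event would describe a second, unrelated run, so keep the ID in a variable for the lifetime of the job.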

## Step 2: Verify in Datadog{% #step-2-verify-in-datadog %}

After sending your events, check the following:

- [**Jobs Monitoring**](https://app.datadoghq.com/data-jobs): Your job run appears with start time, duration, and status.
- [**Lineage graph**](https://app.datadoghq.com/data-obs/lineage): If you included `inputs` or `outputs` in your event, your job appears as a node connected to the dataset nodes.

## Dataset naming conventions{% #dataset-naming-conventions %}

To connect your custom job's lineage to datasets already tracked by Datadog's native integrations, include `inputs` and `outputs` in your event using the exact `namespace` and `name` that Datadog expects for that platform. For example, referencing a Snowflake table in your custom job's `outputs` with the correct namespace and name links it to the existing dataset node in the lineage graph.

Datadog resolves datasets into a hierarchy of account, database, schema, and table. If a name has fewer parts than expected (for example, `database.table` instead of `database.schema.table`), Datadog falls back to the nearest higher-order node in the lineage graph.

| Platform   | Namespace                                        | Name                          |
| ---------- | ------------------------------------------------ | ----------------------------- |
| BigQuery   | `bigquery`                                       | `{project}.{dataset}.{table}` |
| Snowflake  | `snowflake://{org}-{account}`                    | `{database}.{schema}.{table}` |
| Redshift   | `redshift://{aws_account_id}:{region}:{cluster}` | `{database}.{schema}.{table}` |
| PostgreSQL | `postgres://{host}:{port}`                       | `{database}.{schema}.{table}` |
| Databricks | `databricks://{workspace-url}`                   | `{database}.{schema}.{table}` |
| Trino      | `trino://{host}:{port}`                          | `{catalog}.{schema}.{table}`  |
| AWS Glue   | `arn:aws:glue:{region}:{accountId}`              | `{database}.{table}`          |
| S3         | `s3://{bucket}`                                  | `{path}`                      |

For platforms not listed here, follow the [OpenLineage naming conventions](https://openlineage.io/docs/spec/naming/).

The following example shows a job reading from a PostgreSQL table and writing to a Snowflake table:

```json
"inputs": [
  {
    "namespace": "postgres://db.example.com:5432",
    "name": "mydb.public.raw_orders"
  }
],
"outputs": [
  {
    "namespace": "snowflake://myorg-myaccount",
    "name": "ANALYTICS.PUBLIC.ORDERS"
  }
]
```

**Note**: If a dataset namespace is not recognized, Datadog still creates a lineage node for it but does not surface it in the Data Observability product. Use a recognized namespace format to have datasets appear in the catalog and lineage graph.
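
Because a mistyped namespace or name does not link to the existing dataset node, it can help to generate dataset references from the table above instead of writing them by hand. The following is a minimal sketch; `DATASET_TEMPLATES` and `dataset_ref` are hypothetical names, and the short platform keys (such as `postgres` and `glue`) are chosen for illustration only.

```python
# Namespace and name templates copied from the table above.
# "workspace_url" replaces "workspace-url" so it can be passed as a keyword argument.
DATASET_TEMPLATES = {
    "bigquery": ("bigquery", "{project}.{dataset}.{table}"),
    "snowflake": ("snowflake://{org}-{account}", "{database}.{schema}.{table}"),
    "redshift": ("redshift://{aws_account_id}:{region}:{cluster}", "{database}.{schema}.{table}"),
    "postgres": ("postgres://{host}:{port}", "{database}.{schema}.{table}"),
    "databricks": ("databricks://{workspace_url}", "{database}.{schema}.{table}"),
    "trino": ("trino://{host}:{port}", "{catalog}.{schema}.{table}"),
    "glue": ("arn:aws:glue:{region}:{accountId}", "{database}.{table}"),
    "s3": ("s3://{bucket}", "{path}"),
}


def dataset_ref(platform, **parts):
    """Return a {"namespace", "name"} dict for a platform (hypothetical helper).

    Raises KeyError if the platform is unknown or a required part is missing,
    so an incomplete name fails early instead of being sent.
    """
    namespace_template, name_template = DATASET_TEMPLATES[platform]
    return {
        "namespace": namespace_template.format(**parts),
        "name": name_template.format(**parts),
    }


# Reproduces the PostgreSQL-to-Snowflake example above.
inputs = [
    dataset_ref("postgres", host="db.example.com", port=5432,
                database="mydb", schema="public", table="raw_orders")
]
outputs = [
    dataset_ref("snowflake", org="myorg", account="myaccount",
                database="ANALYTICS", schema="PUBLIC", table="ORDERS")
]
```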

## Supported facets{% #supported-facets %}

Facets are structured metadata attached to OpenLineage events. Each facet requires `_producer` (a URI identifying the system that produced it) and `_schemaURL` (a URI referencing its JSON schema).

### `JobTypeJobFacet`{% #jobtypejobfacet %}

The `jobType` job facet is **required**. It determines how Datadog classifies and displays the job.

#### `integration` values{% #integration-values %}

Use `custom` for custom jobs. The other values in the following table are used by Datadog's native integrations, and using them for custom jobs may produce unexpected behavior. In particular, `SPARK` prevents span generation.

| Value       | Platform                                                           |
| ----------- | ------------------------------------------------------------------ |
| `custom`    | Custom or unsupported platforms                                    |
| `SPARK`     | Apache Spark (native integration only; do not use for custom jobs) |
| `AIRFLOW`   | Apache Airflow                                                     |
| `DBT`       | dbt                                                                |
| `BIGQUERY`  | Google BigQuery                                                    |
| `SNOWFLAKE` | Snowflake                                                          |
| `TRINO`     | Trino                                                              |
| `ICEBERG`   | Apache Iceberg                                                     |
| `TABLEAU`   | Tableau                                                            |

#### `processingType` values{% #processingtype-values %}

Set `processingType` to `BATCH` or `STREAMING`.

#### `jobType` values{% #jobtype-values %}

Common values include `JOB`, `TASK`, `DAG`, `MODEL`, `COMMAND`, and `QUERY`.

**Note**: If `jobType` is set to `QUERY`, Datadog does not generate lineage nodes for the job.
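
The rules in this section can be checked before an event is sent. The following is a minimal sketch of such a check on a plain `jobType` facet dict. `check_job_type_facet` is a hypothetical helper, and the checks only encode the rules stated on this page; they are not a reproduction of Datadog's server-side validation.

```python
ALLOWED_PROCESSING_TYPES = {"BATCH", "STREAMING"}
# Integration values listed above as used by Datadog's native integrations.
NATIVE_INTEGRATIONS = {
    "SPARK", "AIRFLOW", "DBT", "BIGQUERY", "SNOWFLAKE", "TRINO", "ICEBERG", "TABLEAU",
}


def check_job_type_facet(facet):
    """Return a list of problems found in a jobType facet dict (hypothetical helper)."""
    problems = []
    # Every facet needs _producer and _schemaURL; the other two keys are
    # the facet's own required fields.
    for key in ("_producer", "_schemaURL", "processingType", "integration"):
        if not facet.get(key):
            problems.append(f"missing {key}")
    processing_type = facet.get("processingType")
    if processing_type and processing_type not in ALLOWED_PROCESSING_TYPES:
        problems.append(f"processingType must be BATCH or STREAMING, got {processing_type}")
    if facet.get("integration") in NATIVE_INTEGRATIONS:
        problems.append("integration is used by a native integration; use 'custom' for custom jobs")
    if facet.get("jobType") == "QUERY":
        problems.append("jobType QUERY does not generate lineage nodes")
    return problems


valid_facet = {
    "_producer": "<YOUR_PRODUCER_ID>",
    "_schemaURL": "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json",
    "processingType": "BATCH",
    "integration": "custom",
    "jobType": "JOB",
}
risky_facet = dict(valid_facet, integration="SPARK", jobType="QUERY")

valid_problems = check_job_type_facet(valid_facet)
risky_problems = check_job_type_facet(risky_facet)
```

Here `valid_problems` is empty, while `risky_problems` contains one entry for the `SPARK` integration value and one for the `QUERY` job type.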

### Other supported facets{% #other-supported-facets %}

| Facet          | What Datadog does                                                                  |
| -------------- | ---------------------------------------------------------------------------------- |
| `parent`       | Creates parent-child job hierarchy in the lineage graph                            |
| `errorMessage` | Generates error spans with `error.message` and `error.stack` tags                  |
| `tags`         | Adds span tags to the run; `_dd.ol_service` value maps to the Datadog service name |
| `sql`          | Parses and masks the SQL query; generates query events                             |
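
As an illustration of the `errorMessage` and `parent` facets, the following minimal sketch builds both as plain dicts for a `FAIL` event. The field names follow the OpenLineage facet definitions (`message`, `programmingLanguage`, and `stackTrace` for `errorMessage`; `run` and `job` for `parent`). The helper names are hypothetical, and the `_schemaURL` versions are assumptions that you should confirm against the OpenLineage spec.

```python
import traceback

PRODUCER = "<YOUR_PRODUCER_ID>"


def error_message_facet(exc):
    """Build an errorMessage run facet from a caught exception (hypothetical helper)."""
    return {
        "_producer": PRODUCER,
        # Assumed schema version; confirm against the OpenLineage spec.
        "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/ErrorMessageRunFacet.json",
        "message": str(exc),
        "programmingLanguage": "PYTHON",
        "stackTrace": "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        ),
    }


def parent_facet(parent_run_id, parent_namespace, parent_name):
    """Build a parent run facet pointing at the run that triggered this one."""
    return {
        "_producer": PRODUCER,
        # Assumed schema version; confirm against the OpenLineage spec.
        "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/ParentRunFacet.json",
        "run": {"runId": parent_run_id},
        "job": {"namespace": parent_namespace, "name": parent_name},
    }


def run_step():
    """Stand-in for the real job logic; always fails in this sketch."""
    raise ValueError("boom")


try:
    run_step()
except ValueError as exc:
    # Both facets are run facets: attach them under event["run"]["facets"]
    # on a FAIL event that reuses the runId of the START event.
    run_facets = {
        "errorMessage": error_message_facet(exc),
        "parent": parent_facet("<PARENT_RUN_UUID>", "<YOUR_NAMESPACE>", "<PARENT_JOB_NAME>"),
    }
```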

## Further reading{% #further-reading %}

- [Data Observability Overview](https://docs.datadoghq.com/data_observability.md)
- [Set up Datadog Agent for OpenLineage Proxy](https://docs.datadoghq.com/data_observability/jobs_monitoring/openlineage/datadog_agent_for_openlineage.md)
