Use the Observability Pipelines Worker to format your Amazon Data Firehose logs into a Datadog-rehydratable format before routing them to Datadog Log Archives.

The log sources, processors, and destinations available for this use case

This document walks you through the following steps:

  1. The prerequisites needed to set up Observability Pipelines
  2. Configuring a Log Archive
  3. Setting up Observability Pipelines
  4. Sending logs to the Observability Pipelines Worker


To use Observability Pipelines’ Amazon Data Firehose source:

  • Amazon Data Firehose can only deliver data over HTTP to an HTTPS URL, so you must deploy the Observability Pipelines Worker with a publicly exposed endpoint and handle TLS termination. To handle TLS termination, you can front the Worker with a load balancer or configure TLS options. See Understand HTTP endpoint delivery request and response specifications for more information.
  • If your forwarders are globally configured to enable SSL, you need the appropriate TLS certificates and the password you used to create your private key.

Configure Log Archives

If you already have a Datadog Log Archive configured for Observability Pipelines, skip to Set up Observability Pipelines.

You need to have the Datadog integration for your cloud provider installed to set up Datadog Log Archive. See the AWS integration, Google Cloud Platform, and Azure integration documentation for more information.

Select the cloud provider you are using to archive your logs.

Create an Amazon S3 bucket

  1. Navigate to Amazon S3 buckets.
  2. Click Create bucket.
  3. Enter a descriptive name for your bucket.
  4. Do not make your bucket publicly readable.
  5. Optionally, add tags.
  6. Click Create bucket.

Set up an IAM policy that allows Workers to write to the S3 bucket

  1. Navigate to the IAM console.
  2. Select Policies in the left side menu.
  3. Click Create policy.
  4. Click JSON in the Specify permissions section.
  5. Copy the policy below and paste it into the Policy editor. Replace <MY_BUCKET_NAME> and <MY_BUCKET_NAME_1_/_MY_OPTIONAL_BUCKET_PATH_1> with the information for the S3 bucket you created earlier.
        {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "DatadogUploadAndRehydrateLogArchives",
                    "Effect": "Allow",
                    "Action": ["s3:PutObject", "s3:GetObject"],
                    "Resource": "arn:aws:s3:::<MY_BUCKET_NAME_1_/_MY_OPTIONAL_BUCKET_PATH_1>/*"
                },
                {
                    "Sid": "DatadogRehydrateLogArchivesListBucket",
                    "Effect": "Allow",
                    "Action": "s3:ListBucket",
                    "Resource": "arn:aws:s3:::<MY_BUCKET_NAME>"
                }
            ]
        }
  6. Click Next.
  7. Enter a descriptive policy name.
  8. Optionally, add tags.
  9. Click Create policy.
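
If you prefer to generate the policy document rather than edit the placeholders by hand, the following is a minimal sketch of one way to do it. The function name `build_archive_policy` and the example bucket name and path are hypothetical; only the statement IDs, actions, and ARN shapes come from the policy above.

```python
import json


def build_archive_policy(bucket, path=""):
    """Return the archive policy as a dict.

    `bucket` and `path` stand in for <MY_BUCKET_NAME> and the optional
    bucket path; both values are placeholders chosen for this sketch.
    """
    cleaned = path.strip("/")
    # Object-level permissions apply to the bucket plus the optional path.
    object_root = f"{bucket}/{cleaned}" if cleaned else bucket
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "DatadogUploadAndRehydrateLogArchives",
                "Effect": "Allow",
                "Action": ["s3:PutObject", "s3:GetObject"],
                "Resource": f"arn:aws:s3:::{object_root}/*",
            },
            {
                "Sid": "DatadogRehydrateLogArchivesListBucket",
                "Effect": "Allow",
                "Action": "s3:ListBucket",
                "Resource": f"arn:aws:s3:::{bucket}",
            },
        ],
    }


# Hypothetical bucket name and path, for illustration only.
policy = build_archive_policy("example-archive-bucket", "op-logs")
print(json.dumps(policy, indent=4))
```

The printed JSON can be pasted into the Policy editor in step 5.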

Create an IAM user

Create an IAM user and attach the IAM policy you created earlier to it.

  1. Navigate to the IAM console.
  2. Select Users in the left side menu.
  3. Click Create user.
  4. Enter a username.
  5. Click Next.
  6. Select Attach policies directly.
  7. Choose the IAM policy you created earlier to attach to the new IAM user.
  8. Click Next.
  9. Optionally, add tags.
  10. Click Create user.

Create access credentials for the new IAM user. The AWS access key and AWS secret access key are added as environment variables in the Install the Observability Pipelines Worker step.

Create a service account

Create a service account to use the policy you created above.

Connect the S3 bucket to Datadog Log Archives

  1. Navigate to Datadog Log Forwarding.
  2. Click New archive.
  3. Enter a descriptive archive name.
  4. Add a query that filters out all logs going through log pipelines so that none of those logs go into this archive. For example, add the query observability_pipelines_read_only_archive, assuming no logs going through the pipeline have that tag added.
  5. Select AWS S3.
  6. Select the AWS account that your bucket is in.
  7. Enter the name of the S3 bucket.
  8. Optionally, enter a path.
  9. Check the confirmation statement.
  10. Optionally, add tags and define the maximum scan size for rehydration. See Advanced settings for more information.
  11. Click Save.

See the Log Archives documentation for additional information.

Create a storage bucket

  1. Navigate to Google Cloud Storage.
  2. On the Buckets page, click Create to create a bucket for your archives…
  3. Enter a name for the bucket and choose where to store your data.
  4. Select Fine-grained in the Choose how to control access to objects section.
  5. Do not add a retention policy, because in rare cases (typically a timeout) the most recent data needs to be rewritten.
  6. Click Create.

Create a service account to allow Workers to write to the bucket

  1. Create a Google Cloud Storage service account.
    • Grant the service account permissions to your bucket with the Storage Admin and Storage Object Admin permissions.
    • Download the service account key JSON file. This is the credentials JSON file and must be placed under DD_OP_DATA_DIR/config. You reference this file when you set up the Google Cloud Storage destination in the pipeline UI later on.
  2. Follow these instructions to create a service account key. Choose json for the key type.

Connect the storage bucket to Datadog Log Archives

  1. Navigate to Datadog Log Forwarding.
  2. Click New archive.
  3. Enter a descriptive archive name.
  4. Add a query that filters out all logs going through log pipelines so that none of those logs go into this archive. For example, add the query observability_pipelines_read_only_archive, assuming no logs going through the pipeline have that tag added.
  5. Select Google Cloud Storage.
  6. Select the service account your storage bucket is in.
  7. Select the project.
  8. Enter the name of the storage bucket you created earlier.
  9. Optionally, enter a path.
  10. Optionally, set permissions, add tags, and define the maximum scan size for rehydration. See Advanced settings for more information.
  11. Click Save.

See the Log Archives documentation for additional information.

Create a storage account

Create an Azure storage account if you don’t already have one.

  1. Navigate to Storage accounts.
  2. Click Create.
  3. Select the subscription name and resource name you want to use.
  4. Enter a name for your storage account.
  5. Select a region in the dropdown menu.
  6. Select Standard or Premium for the account type.
  7. Click Next.
  8. In the Blob storage section, select Hot or Cool storage.
  9. Click Review + create.

Create a storage container

  1. In your storage account, click Containers under Data storage in the left navigation menu.
  2. Click + Container at the top to create a new container.
  3. Enter a name for the new container. This name is used later when you set up the Observability Pipelines Azure Storage destination.

Note: Do not set immutability policies, because in rare cases (typically a timeout) the most recent data might need to be rewritten.

Connect the Azure container to Datadog Log Archives

  1. Navigate to Datadog Log Forwarding.
  2. Click New archive.
  3. Enter a descriptive archive name.
  4. Add a query that filters out all logs going through log pipelines so that none of those logs go into this archive. For example, add the query observability_pipelines_read_only_archive, assuming no logs going through the pipeline have that tag added.
  5. Select Azure Storage.
  6. Select the Azure tenant and client your storage account is in.
  7. Enter the name of the storage account.
  8. Enter the name of the container you created earlier.
  9. Optionally, enter a path.
  10. Optionally, set permissions, add tags, and define the maximum scan size for rehydration. See Advanced settings for more information.
  11. Click Save.

See the Log Archives documentation for additional information.

Set up Observability Pipelines

  1. Navigate to Observability Pipelines.
  2. Select the Archive Logs template to create a new pipeline.
  3. Select the Amazon Data Firehose source.

Set up the source

Optionally, toggle the switch to enable TLS. If you enable TLS, the following certificate and key files are required:

  • Server Certificate Path: The path to the certificate file that has been signed by your Certificate Authority (CA) Root File in DER or PEM (X.509).
  • CA Certificate Path: The path to the certificate file that is your Certificate Authority (CA) Root File in DER or PEM (X.509).
  • Private Key Path: The path to the .key private key file that belongs to your Server Certificate Path in DER or PEM (PKCS#8) format.

Set up the destinations

Enter the following information based on your selected logs destinations.

  1. Optionally, enter the name of the Amazon OpenSearch index. See template syntax if you want to route logs to different indexes based on specific fields in your logs.
  2. Select an authentication strategy, Basic or AWS. For AWS, enter the AWS region.

To authenticate the Observability Pipelines Worker for Google Chronicle, contact your Google Security Operations representative for a Google Developer Service Account Credential. This credential is a JSON file and must be placed under DD_OP_DATA_DIR/config. See Getting API authentication credential for more information.

To set up the Worker’s Google Chronicle destination:

  1. Enter the customer ID for your Google Chronicle instance.
  2. Enter the path to the credentials JSON file you downloaded earlier.
  3. Select JSON or Raw encoding in the dropdown menu.
  4. Enter the log type. See template syntax if you want to route logs to different log types based on specific fields in your logs.

Note: Logs sent to the Google Chronicle destination must have ingestion labels. For example, if the logs are from an A10 load balancer, they must have the ingestion label A10_LOAD_BALANCER. See Google Cloud’s Support log types with a default parser for a list of available log types and their respective ingestion labels.

To use the CrowdStrike NG-SIEM destination, you need to set up a CrowdStrike data connector using the HEC/HTTP Event Connector. See Step 1: Set up the HEC/HTTP event data connector for instructions. When you set up the data connector, you are given a HEC API key and URL, which you use when you configure the Observability Pipelines Worker later on.

  1. Select JSON or Raw encoding in the dropdown menu.
  2. Optionally, enable compression and select an algorithm (gzip or zlib) in the dropdown menu.
  3. Optionally, toggle the switch to enable TLS. If you enable TLS, the following certificate and key files are required:
    • Server Certificate Path: The path to the certificate file that has been signed by your Certificate Authority (CA) Root File in DER or PEM (X.509).
    • CA Certificate Path: The path to the certificate file that is your Certificate Authority (CA) Root File in DER or PEM (X.509).
    • Private Key Path: The path to the .key private key file that belongs to your Server Certificate Path in DER or PEM (PKCS#8) format.

There are no configuration steps for your Datadog destination.

If the Worker is ingesting logs that are not coming from the Datadog Agent and are shipped to an archive using the Observability Pipelines Datadog Archives destination, those logs are not tagged with reserved attributes. In addition, logs rehydrated into Datadog do not have standard attributes mapped. This means that when you rehydrate your logs into Log Management, you may lose Datadog telemetry, the ability to search logs easily, and the benefits of unified service tagging if you do not structure and remap your logs in Observability Pipelines before routing them to an archive.

For example, say your syslogs are sent to Datadog Archives and those logs have the status tagged as severity, instead of the reserved attribute status, and the host tagged as host-name, instead of the reserved attribute hostname. When these logs are rehydrated in Datadog, the status for each log is set to info and none of the logs have a hostname tag.
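
To make the remapping concrete, here is a small Python sketch of the transformation the syslog example calls for: moving severity to status and host-name to hostname before the log reaches the archive. In a pipeline you would do this with a processor rather than with code; the function `remap_to_reserved` and the sample event are hypothetical and only illustrate the shape of the data before and after.

```python
def remap_to_reserved(log, renames):
    """Return a copy of `log` with custom keys renamed to reserved attributes."""
    remapped = dict(log)
    for source, target in renames.items():
        # Move the value only when the source exists and the target is free,
        # so an existing reserved attribute is never overwritten.
        if source in remapped and target not in remapped:
            remapped[target] = remapped.pop(source)
    return remapped


# Hypothetical syslog event that uses non-reserved attribute names.
syslog_event = {"message": "disk almost full", "severity": "warn", "host-name": "web-01"}
fixed = remap_to_reserved(syslog_event, {"severity": "status", "host-name": "hostname"})
print(fixed)
```

After the remap, the event carries status and hostname, so those values survive rehydration instead of falling back to defaults.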

Follow the instructions for the cloud provider you are using to archive your logs.

  1. Enter the S3 bucket name for the S3 bucket you created earlier.
  2. Enter the AWS region the S3 bucket is in.
  3. Enter the key prefix.
    • Prefixes are useful for partitioning objects. For example, you can use a prefix as an object key to store objects under a particular directory. If using a prefix for this purpose, it must end in / to act as a directory path; a trailing / is not automatically added.
    • See template syntax if you want to route logs to different object keys based on specific fields in your logs.
  4. Select the storage class for your S3 bucket in the Storage Class dropdown menu.

Your AWS access key ID and AWS secret access key are set as environment variables when you install the Worker later.
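
The trailing-slash rule for key prefixes is easy to get wrong, so here is a minimal sketch of its effect. It assumes the prefix is simply prepended to an object name; the helper `object_key` and the object name `batch-0001.json.gz` are hypothetical and do not reflect how the Worker actually names objects.

```python
def object_key(prefix, object_name):
    """Join a key prefix and an object name exactly as given."""
    # No separator is inserted: the prefix must already end in "/"
    # if it is meant to act as a directory path.
    return prefix + object_name


print(object_key("archive/firehose", "batch-0001.json.gz"))   # archive/firehosebatch-0001.json.gz
print(object_key("archive/firehose/", "batch-0001.json.gz"))  # archive/firehose/batch-0001.json.gz
```

Only the second prefix places the object under the archive/firehose/ path.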

  1. Enter the name of the Google Cloud storage bucket you created earlier.
  2. Enter the path to the credentials JSON file you downloaded earlier.
  3. Select the storage class for the created objects.
  4. Select the access level of the created objects.
  5. Optionally, enter the prefix. Prefixes are useful for partitioning objects. For example, you can use a prefix as an object key to store objects under a particular directory. If using a prefix for this purpose, it must end in / to act as a directory path; a trailing / is not automatically added.
  6. Optionally, click Add Header to add metadata.

  1. Enter the name of the Azure container you created earlier.
  2. Optionally, enter a prefix.
    • Prefixes are useful for partitioning objects. For example, you can use a prefix as an object key to store objects under a particular directory. If using a prefix for this purpose, it must end in / to act as a directory path; a trailing / is not automatically added.
    • See template syntax if you want to route logs to different object keys based on specific fields in your logs.

The following fields are optional:

  1. Enter the name for the Elasticsearch index. See template syntax if you want to route logs to different indexes based on specific fields in your logs.
  2. Enter the Elasticsearch version.

  1. Enter the client ID for your application.
  2. Enter the directory ID for your tenant.
  3. Enter the name of the table to which you are sending the logs.
  4. Enter the Data Collection Rule (DCR) immutable ID.

Select the data center region (US or EU) of your New Relic account.

Optionally, enter the name of the OpenSearch index. See template syntax if you want to route logs to different indexes based on specific fields in your logs.

Select your SentinelOne logs environment in the dropdown menu.

The following fields are optional:

  1. Enter the name of the Splunk index you want your data in. This has to be an allowed index for your HEC. See template syntax if you want to route logs to different indexes based on specific fields in your logs.
  2. Select whether the timestamp should be auto-extracted. If set to true, Splunk extracts the timestamp from the message with the expected format of yyyy-mm-dd hh:mm:ss.
  3. Optionally, set the sourcetype to override Splunk’s default value, which is httpevent for HEC data. See template syntax if you want to route logs to different source types based on specific fields in your logs.

The following fields are optional:

  1. In the Encoding dropdown menu, select whether you want to encode your pipeline’s output in JSON, Logfmt, or Raw text. If no encoding is selected, the encoding defaults to JSON.
  2. Enter a source name to override the default name value configured for your Sumo Logic collector’s source.
  3. Enter a host name to override the default host value configured for your Sumo Logic collector’s source.
  4. Enter a category name to override the default category value configured for your Sumo Logic collector’s source.
  5. Click Add Header to add any custom header fields and values.

The rsyslog and syslog-ng destinations support the RFC5424 format.

The rsyslog and syslog-ng destinations match these log fields to the following Syslog fields:

Log Event           SYSLOG FIELD    Default
log["procid"]       PROCID          The running Worker’s process ID.
log["facility"]     FACILITY        8 (log_user)
log["timestamp"]    TIMESTAMP       Current UTC time.

The following destination settings are optional:

  1. Toggle the switch to enable TLS. If you enable TLS, the following certificate and key files are required:
    • Server Certificate Path: The path to the certificate file that has been signed by your Certificate Authority (CA) Root File in DER or PEM (X.509).
    • CA Certificate Path: The path to the certificate file that is your Certificate Authority (CA) Root File in DER or PEM (X.509).
    • Private Key Path: The path to the .key private key file that belongs to your Server Certificate Path in DER or PEM (PKCS#8) format.
  2. Enter the number of seconds to wait before sending TCP keepalive probes on an idle connection.

Add additional destinations

Click the plus sign (+) to the left of the destinations to add additional destinations to the same set of processors.

To delete a destination, click on the pencil icon to the top right of the destination, and select Delete destination. If you delete a destination from a processor group that has multiple destinations, only the deleted destination is removed. If you delete a destination from a processor group that only has one destination, both the destination and the processor group are removed.


  • A pipeline must have at least one destination. If a processor group only has one destination, that destination cannot be deleted.
  • You can add a total of three destinations for a pipeline.
  • A specific destination can only be added once. For example, you cannot add multiple Splunk HEC destinations.

Set up processors

There are pre-selected processors added to your processor group out of the box. You can add additional processors or delete any existing ones based on your processing needs.

Processor groups are executed from top to bottom. The order of the processors is important because logs are checked by each processor, but only logs that match the processor’s filters are processed. To modify the order of the processors, use the drag handle on the top left corner of the processor you want to move.

Filter query syntax

Each processor has a corresponding filter query in its fields. Processors only process logs that match their filter query. For all processors except the filter processor, logs that do not match the query are sent to the next step of the pipeline. For the filter processor, logs that do not match the query are dropped.

For any attribute, tag, or key:value pair that is not a reserved attribute, your query must start with @. Conversely, to filter on reserved attributes, you do not need to add @ in front of your filter query.

For example, to filter out and drop status:info logs, your filter can be set as NOT (status:info). To filter out and drop system-status:info, your filter must be set as NOT (@system-status:info).

Filter query examples:

  • NOT (status:debug): This filters for only logs that do not have the status DEBUG.
  • status:ok service:flask-web-app: This filters for all logs with the status OK from your flask-web-app service.
    • This query can also be written as: status:ok AND service:flask-web-app.
  • host:COMP-A9JNGYK OR host:COMP-J58KAS: This filter query only matches logs from the labeled hosts.
  • @user.status:inactive: This filters for logs with the status inactive nested under the user attribute.

Queries run in the Observability Pipelines Worker are case sensitive. Learn more about writing filter queries in Datadog’s Log Search Syntax.
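
The difference between a regular processor and the filter processor can be sketched in a few lines of Python. This is an illustration of the pass-through and drop behavior only: it does not parse filter query syntax, and the function names and the plain predicate standing in for NOT (status:debug) are assumptions made for the example.

```python
def run_processor(logs, matches, transform):
    """Regular processor: transform matching logs, pass every log on."""
    return [transform(log) if matches(log) else log for log in logs]


def run_filter_processor(logs, matches):
    """Filter processor: keep matching logs, drop the rest."""
    return [log for log in logs if matches(log)]


logs = [
    {"status": "info", "service": "flask-web-app"},
    {"status": "debug", "service": "flask-web-app"},
]


def not_debug(log):
    # Stand-in for the query NOT (status:debug); the comparison is case sensitive.
    return log.get("status") != "debug"


tagged = run_processor(logs, not_debug, lambda log: {**log, "checked": "true"})
kept = run_filter_processor(logs, not_debug)
print(len(tagged), len(kept))  # 2 1
```

Both logs leave the regular processor, although only the matching one is modified, while the filter processor lets through the matching log alone.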

Add processors

Enter the information for the processors you want to use. Click the Add button to add additional processors. To delete a processor, click the button on the right side of the processor and select Delete.

The log processors available

Use this processor to add a field name and the value of an environment variable to the log message.

To set up this processor:

  1. Define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they match the filter query, are sent to the next step in the pipeline.
  2. Enter the field name for the environment variable.
  3. Enter the environment variable name.
  4. Click Add Environment Variable if you want to add another environment variable.

Blocked environment variables

Environment variables that match any of the following patterns are blocked from being added to log messages because the environment variable could contain sensitive data.

  • AUTH
  • CERT
  • KEY
  • PWD
  • ROOT
  • USER

The environment variable is matched to the pattern and not the literal word. For example, PASSWORD blocks environment variables like USER_PASSWORD and PASSWORD_SECRET from getting added to the log messages.
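
Pattern matching, as opposed to literal-word matching, can be pictured as a substring check. The sketch below assumes exactly that, plus case-insensitive comparison, which the text does not confirm. The pattern list contains only the patterns named in this section (including PASSWORD from the example), and the helper `is_blocked` is hypothetical.

```python
# Patterns named in this section; the real blocklist may contain more.
BLOCKED_PATTERNS = ["AUTH", "CERT", "KEY", "PASSWORD", "PWD", "ROOT", "USER"]


def is_blocked(env_var_name):
    """Return True if the name contains any blocked pattern as a substring."""
    # Assumption: matching ignores case, so names are upper-cased first.
    name = env_var_name.upper()
    return any(pattern in name for pattern in BLOCKED_PATTERNS)


for name in ["USER_PASSWORD", "PASSWORD_SECRET", "MONKEY_COUNT", "AWS_REGION"]:
    print(name, is_blocked(name))
```

Under these assumptions MONKEY_COUNT is also blocked, because it contains KEY, which shows why a harmless-looking variable can be rejected.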

This processor adds a field with the name of the host that sent the log. For example, hostname: 613e197f3526. Note: If the hostname already exists, the Worker throws an error and does not overwrite the existing hostname.

To set up this processor:

  • Define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they match the filter query, are sent to the next step in the pipeline.

The deduplicate processor removes copies of data to reduce volume and noise. It caches 5,000 messages at a time and compares your incoming log traffic against the cached messages. For example, this processor can be used to keep only unique warning logs in the case where multiple identical warning logs are sent in succession.

To set up the deduplicate processor:

  1. Define a filter query. Only logs that match the specified filter query are processed. Deduplicated logs and logs that do not match the filter query are sent to the next step in the pipeline.
  2. In the Type of deduplication dropdown menu, select whether you want to Match on or Ignore the fields specified below.
    • If Match is selected, then after a log passes through, future logs that have the same values for all of the fields you specify below are removed.
    • If Ignore is selected, then after a log passes through, future logs that have the same values for all of their fields, except the ones you specify below, are removed.
  3. Enter the fields you want to match on or ignore. At least one field is required, and you can specify a maximum of three fields.
    • Use the path notation <OUTER_FIELD>.<INNER_FIELD> to match subfields. See the Path notation example below.
  4. Click Add field to add additional fields you want to filter on.
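
The Match and Ignore modes can be illustrated with a small Python sketch. It is a simplified model under stated assumptions, not the Worker's implementation: the class `Deduplicator` is hypothetical, it handles top-level fields only (no path notation), and evicting the oldest entry once the 5,000-message cache is full is an assumed policy.

```python
from collections import OrderedDict


class Deduplicator:
    def __init__(self, fields, mode="match", cache_size=5000):
        self.fields = fields
        self.mode = mode              # "match" or "ignore"
        self.cache_size = cache_size
        self.seen = OrderedDict()     # fingerprints of logs already passed

    def _fingerprint(self, log):
        if self.mode == "match":
            # Compare only the listed fields.
            items = [(field, log.get(field)) for field in self.fields]
        else:
            # Compare every field except the listed ones.
            items = sorted((k, v) for k, v in log.items() if k not in self.fields)
        return repr(items)

    def accept(self, log):
        """Return True if the log should pass, False if it is a duplicate."""
        key = self._fingerprint(log)
        if key in self.seen:
            return False
        self.seen[key] = None
        if len(self.seen) > self.cache_size:
            self.seen.popitem(last=False)  # assumed policy: evict the oldest
        return True


events = [
    {"message": "disk full", "host": "a"},
    {"message": "disk full", "host": "b"},
    {"message": "disk ok", "host": "a"},
]
match_on_message = Deduplicator(fields=["message"], mode="match")
passed = [event for event in events if match_on_message.accept(event)]
print(len(passed))  # 2
```

With Match on message, the second event is removed because its message repeats the first. Using Ignore on host gives the same result here, since the two events are identical once host is left out of the comparison.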
Path notation example

For the following message structure, use outer_key.a.double_inner_key to refer to the key with the value double_inner_value.

    {
        "outer_key": {
            "inner_key": "inner_value",
            "a": {
                "double_inner_key": "double_inner_value",
                "b": "b value"
            },
            "c": "c value"
        },
        "d": "d value"
    }

The remap processor can add, drop, or rename fields within your individual log data. Use this processor to enrich your logs with additional context, remove low-value fields to reduce volume, and standardize naming across important attributes. Select add field, drop field, or rename field in the dropdown menu to get started.

Add field

Use add field to append a new key-value field to your log.

To set up the add field processor:

  1. Define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they do or do not match the filter query, are sent to the next step in the pipeline.
  2. Enter the field and value you want to add. To specify a nested field for your key, use the path notation: <OUTER_FIELD>.<INNER_FIELD>. All values are stored as strings. Note: If the field you want to add already exists, the Worker throws an error and the existing field remains unchanged.

Drop field

Use drop field to drop a field from logging data that matches the filter you specify below. It can delete objects, so you can use the processor to drop nested keys.

To set up the drop field processor:

  1. Define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they do or do not match the filter query, are sent to the next step in the pipeline.
  2. Enter the key of the field you want to drop. To specify a nested field for your specified key, use the path notation: <OUTER_FIELD>.<INNER_FIELD>. Note: If your specified key does not exist, the log is left unchanged.

Rename field

Use rename field to rename a field within your log.

To set up the rename field processor:

  1. Define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they do or do not match the filter query, are sent to the next step in the pipeline.
  2. Enter the name of the field you want to rename in the Source field. To specify a nested field for your key, use the path notation: <OUTER_FIELD>.<INNER_FIELD>. Once renamed, your original field is deleted unless you enable the Preserve source tag checkbox described below.
    Note: If the source key you specify doesn’t exist, a default null value is applied to your target.
  3. In the Target field, enter the name you want the source field to be renamed to. To specify a nested field for your specified key, use the path notation: <OUTER_FIELD>.<INNER_FIELD>.
    Note: If the target field you specify already exists, the Worker throws an error and does not overwrite the existing target field.
  4. Optionally, check the Preserve source tag box if you want to retain the original source field and duplicate the information from your source key to your specified target key. If this box is not checked, the source key is dropped after it is renamed.

Path notation example

For the following message structure, use outer_key.a.double_inner_key to refer to the key with the value double_inner_value.

    {
        "outer_key": {
            "inner_key": "inner_value",
            "a": {
                "double_inner_key": "double_inner_value",
                "b": "b value"
            },
            "c": "c value"
        },
        "d": "d value"
    }

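As a rough illustration of how a dot-separated path walks the message structure above, the following Python sketch resolves a path one key at a time. The helper `get_path` is hypothetical and assumes that keys themselves contain no dots; in this structure, double_inner_key sits under the key a.

```python
def get_path(log, path):
    """Follow a dot-separated path through nested dicts; return None if missing."""
    current = log
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


message = {
    "outer_key": {
        "inner_key": "inner_value",
        "a": {"double_inner_key": "double_inner_value", "b": "b value"},
        "c": "c value",
    },
    "d": "d value",
}

print(get_path(message, "outer_key.a.double_inner_key"))  # double_inner_value
print(get_path(message, "outer_key.inner_key"))           # inner_value
print(get_path(message, "outer_key.missing"))             # None
```
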
Use this processor to enrich your logs with information from a reference table, which could be a local file or database.

To set up the enrichment table processor:

  1. Define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they do or do not match the filter query, are sent to the next step in the pipeline.
  2. Enter the source attribute of the log. The source attribute’s value is what you want to find in the reference table.
  3. Enter the target attribute. The target attribute’s value stores, as a JSON object, the information found in the reference table.
  4. Select the type of reference table you want to use, File or GeoIP.
    • For the File type:
      1. Enter the file path.
      2. Enter the column name. The column name in the enrichment table is used for matching the source attribute value. See the Enrichment file example.
    • For the GeoIP type, enter the GeoIP path.

Enrichment file example

For this example, merchant_id is used as the source attribute and merchant_info as the target attribute.

This is the example reference table that the enrichment processor uses:

merch_id    merchant_name      city         state
803         Andy’s Ottomans    Boise        Idaho
536         Cindy’s Couches    Boulder      Colorado
235         Debra’s Benches    Las Vegas    Nevada

merch_id is set as the column name the processor uses to find the source attribute’s value. Note: The source attribute’s name does not have to match the column name.

If the enrichment processor receives a log with "merchant_id":"536":

  • The processor looks for the value 536 in the reference table’s merch_id column.
  • After it finds the value, it adds the entire row of information from the reference table to the merchant_info attribute as a JSON object:
merchant_info {
    "merchant_name":"Cindy's Couches",
    "city":"Boulder",
    "state":"Colorado"
}

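The lookup described above can be sketched in Python with the standard csv module. This is a simplified model, not the processor's implementation: the function `enrich` is hypothetical, the header names city and state are assumed for the last two columns of the reference table, and leaving the matching column out of the result is an assumption based on the sample output.

```python
import csv
import io

# Reference table from the example; the city and state headers are assumed.
REFERENCE_CSV = """merch_id,merchant_name,city,state
803,Andy's Ottomans,Boise,Idaho
536,Cindy's Couches,Boulder,Colorado
235,Debra's Benches,Las Vegas,Nevada
"""


def enrich(log, rows, source_attr, target_attr, column):
    """Copy the matching reference row into `target_attr` of a new log dict."""
    value = log.get(source_attr)
    for row in rows:
        if row[column] == value:
            # Assumption: the lookup column itself is not copied over.
            info = {k: v for k, v in row.items() if k != column}
            return {**log, target_attr: info}
    return log  # no match: the log passes through unchanged


rows = list(csv.DictReader(io.StringIO(REFERENCE_CSV)))
enriched = enrich({"merchant_id": "536"}, rows, "merchant_id", "merchant_info", "merch_id")
print(enriched["merchant_info"])
```

Note that the lookup compares strings, so a log carrying the number 536 instead of the string "536" would not match in this sketch.
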
This processor filters for logs that match the specified filter query and drops all non-matching logs. If a log is dropped at this processor, then none of the processors below this one receives that log. This processor can filter out unnecessary logs, such as debug or warning logs.

To set up the filter processor:

  • Define a filter query. The query you specify filters for and passes on only logs that match it, dropping all other logs.

Many types of logs are meant to be used for telemetry to track trends, such as KPIs, over long periods of time. Generating metrics from your logs is a cost-effective way to summarize log data from high-volume logs, such as CDN logs, VPC flow logs, firewall logs, and network logs. Use the generate metrics processor to generate either a count metric of logs that match a query or a distribution metric of a numeric value contained in the logs, such as a request duration.

Note: The metrics generated are custom metrics and billed accordingly. See Custom Metrics Billing for more information.

To set up the processor:

Click Manage Metrics to create new metrics or edit existing metrics. This opens a side panel.

  • If you have not created any metrics yet, enter the metric parameters as described in the Add a metric section to create a metric.
  • If you have already created metrics, click on the metric’s row in the overview table to edit or delete it. Use the search bar to find a specific metric by its name, and then select the metric to edit or delete it. Click Add Metric to add another metric.
Add a metric
  1. Enter a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they match the filter query, are sent to the next step in the pipeline. Note: Since a single processor can generate multiple metrics, you can define a different filter query for each metric.
  2. Enter a name for the metric.
  3. In the Define parameters section, select the metric type (count, gauge, or distribution). See the Count metric example and Distribution metric example. Also see Metrics Types for more information.
    • For gauge and distribution metric types, select a log field which has a numeric (or parseable numeric string) value that is used for the value of the generated metric.
    • For the distribution metric type, the log field’s value can be an array of (parseable) numerics, which is used for the generated metric’s sample set.
    • The Group by field determines how the metric values are grouped together. For example, if you have hundreds of hosts spread across four regions, grouping by region allows you to graph one line for every region. The fields listed in the Group by setting are set as tags on the configured metric.
  4. Click Add Metric.
Metrics Types

You can generate these types of metrics for your logs. See the Metrics Types and Distributions documentation for more details.

| Metric type | Description | Example |
| --- | --- | --- |
| COUNT | Represents the total number of event occurrences in one time interval. This value can be reset to zero, but cannot be decreased. | You want to count the number of logs with status:error. |
| GAUGE | Represents a snapshot of events in one time interval. | You want to measure the latest CPU utilization per host for all logs in the production environment. |
| DISTRIBUTION | Represents the global statistical distribution of a set of values calculated across your entire distributed infrastructure in one time interval. | You want to measure the average time it takes for an API call to be made. |
Count metric example

For this status:error log example:

{"status": "error", "env": "prod", "host": "ip-172-25-222-111.ec2.internal"}

To create a count metric that counts the number of logs that contain "status":"error" and groups them by env and host, enter the following information:

| Input parameters | Value |
| --- | --- |
| Filter query | @status:error |
| Metric name | status_error_total |
| Metric type | Count |
| Group by | env, host |
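
To make the grouping concrete, here is a minimal Python sketch of what a count metric with a Group by setting computes. It is an illustration only, not the Worker's implementation: the `count_metric` function and the in-memory `logs` list are hypothetical, and the filter query is reduced to a simple field equality check.

```python
from collections import Counter


def count_metric(logs, field, value, group_by):
    """Count logs where log[field] == value, grouped by the group_by fields."""
    counts = Counter()
    for log in logs:
        if log.get(field) != value:
            continue  # non-matching logs do not contribute to the metric
        # Each distinct combination of group-by values becomes one tagged series.
        tags = tuple(f"{key}:{log.get(key)}" for key in group_by)
        counts[tags] += 1
    return dict(counts)


logs = [
    {"status": "error", "env": "prod", "host": "ip-172-25-222-111.ec2.internal"},
    {"status": "error", "env": "prod", "host": "ip-172-25-222-111.ec2.internal"},
    {"status": "ok", "env": "prod", "host": "ip-172-25-222-111.ec2.internal"},
]
status_error_total = count_metric(logs, "status", "error", ["env", "host"])
```

Each distinct combination of env and host values gets its own count, which is why the Group by fields end up as tags on the generated metric.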
Distribution metric example

For this example of an API response log:

    {
        "timestamp": "2018-10-15T17:01:33Z",
        "method": "GET",
        "status": 200,
        "request_body": "{\"information\"}",
        "response_time_seconds": 10
    }

To create a distribution metric that measures the average time it takes for an API call to be made, enter the following information:

| Input parameters | Value |
| --- | --- |
| Filter query | @method |
| Metric name | status_200_response |
| Metric type | Distribution |
| Select a log attribute | response_time_seconds |
| Group by | method |
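
The following Python sketch shows, under simplifying assumptions, how samples for a distribution metric could be collected from a numeric log field and grouped by tag. The `distribution_metric` function, the `matches` predicate standing in for the filter query, and the summary keys (`count`, `avg`, `max`) are all hypothetical. It accepts a number, a parseable numeric string, or an array of numerics, as described in the Add a metric steps.

```python
from collections import defaultdict
from statistics import mean


def distribution_metric(logs, matches, value_field, group_by):
    """Collect numeric samples per group and summarize them."""
    samples = defaultdict(list)
    for log in logs:
        if not matches(log) or value_field not in log:
            continue
        raw = log[value_field]
        # A single value contributes one sample; an array contributes several.
        values = raw if isinstance(raw, list) else [raw]
        tags = tuple(f"{key}:{log.get(key)}" for key in group_by)
        samples[tags].extend(float(v) for v in values)
    return {
        tags: {"count": len(vals), "avg": mean(vals), "max": max(vals)}
        for tags, vals in samples.items()
    }


logs = [
    {"method": "GET", "response_time_seconds": 10},
    {"method": "GET", "response_time_seconds": "20"},
    {"method": "POST", "response_time_seconds": [1, 3]},
    {"response_time_seconds": 99},  # no method attribute, so it is skipped
]
summary = distribution_metric(
    logs, lambda log: "method" in log, "response_time_seconds", ["method"]
)
```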

This processor parses logs using the grok parsing rules that are available for a set of sources. The rules are automatically applied to logs based on the log source. Therefore, logs must have a source field with the source name. If this field is not added when the log is sent to the Observability Pipelines Worker, you can use the Add field processor to add it.

If the source field of a log matches one of the grok parsing rule sets, the log’s message field is checked against those rules. If a rule matches, the resulting parsed data is added to the message field as a JSON object, overwriting the original message.

If there is no source field on the log, or no rule matches the log message, then no changes are made to the log and it is sent to the next step in the pipeline.

To set up the grok parser, define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they match the filter query, are sent to the next step in the pipeline.

To test log samples for the out-of-the-box rules:

  1. Click the Preview Library Rules button.
  2. Search for or select a source in the dropdown menu.
  3. Enter a log sample to test the parsing rules for that source.

To add a custom parsing rule:

  1. Click Add Custom Rule.
  2. If you want to clone a library rule, select Clone library rule and then the library source from the dropdown menu.
  3. If you want to create a custom rule, select Custom and then enter the source. The parsing rules are applied to logs with that source.
  4. Enter log samples to test the parsing rules.
  5. Enter the rules for parsing the logs. See Parsing for more information on writing parsing rules.
    Note: The url, useragent, and csv filters are not available.
  6. Click Advanced Settings if you want to add helper rules. See Using helper rules to factorize multiple parsing rules for more information.
  7. Click Add Rule.
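
The source-based matching described above can be sketched in Python. This is only an analogy: it uses plain regular expressions with named groups instead of real grok rules, and the `RULES` table, the `myapp` source, and the `parse_by_source` function are hypothetical names chosen for illustration.

```python
import re

# Hypothetical rule sets keyed by source. Real grok rules use their own syntax.
RULES = {
    "myapp": [re.compile(r"(?P<user>\w+) logged in from (?P<ip>[\d.]+)")],
}


def parse_by_source(log):
    """Replace message with parsed fields when a rule for the log's source matches."""
    rules = RULES.get(log.get("source"))
    message = log.get("message")
    if not rules or not isinstance(message, str):
        return log  # no source field or no rule set: log is passed on unchanged
    for rule in rules:
        match = rule.fullmatch(message)
        if match:
            # The parsed data overwrites the original message.
            return {**log, "message": match.groupdict()}
    return log  # no rule matched: log is passed on unchanged


parsed = parse_by_source({"source": "myapp", "message": "alice logged in from 10.0.0.1"})
```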

This processor converts the specified field into JSON objects.

To set up this processor:

  1. Define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they match the filter query, are sent to the next step in the pipeline.
  2. Enter the name of the field you want to parse JSON on.
    Note: The parsed JSON overwrites what was originally contained in the field.
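
As a minimal sketch of the overwrite behavior noted above, the following Python function parses a JSON string held in one field and replaces the field with the parsed result. The `parse_json_field` name is hypothetical, and leaving the log unchanged when the field is not valid JSON is an assumption, not documented Worker behavior.

```python
import json


def parse_json_field(log, field):
    """Return a copy of the log with log[field] replaced by its parsed JSON."""
    raw = log.get(field)
    if not isinstance(raw, str):
        return log
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        return log  # assumption: an unparseable field is left as-is
    # The parsed value overwrites what the field originally contained.
    return {**log, field: parsed}


result = parse_json_field({"host": "my-host", "message": '{"status": 200}'}, "message")
```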

This processor parses Extensible Markup Language (XML) so the data can be processed and sent to different destinations. XML is a markup format used to store and transport structured data. It is organized in a tree-like structure to represent nested information and uses tags and attributes to define the data. For example, this is XML data using only tags (<recipe>, <type>, and <name>) and no attributes:

    <recipe>
        <type>pasta</type>
        <name>Carbonara</name>
    </recipe>

This is an XML example where the tag recipe has the attribute type:

    <recipe type="pasta">
        <name>Carbonara</name>
    </recipe>

To set up this processor:

  1. Define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they match the filter query, are sent to the next step in the pipeline.
  2. Enter the path to the log field on which you want to parse XML. Use the path notation <OUTER_FIELD>.<INNER_FIELD> to match subfields. See the Path notation example below.
  3. Optionally, in the Enter text key field, input the key name to use for the text node when XML attributes are appended. See the text key example. If the field is left empty, value is used as the key name.
  4. Optionally, select Always use text key if you want to store text inside an object using the text key even when no attributes exist.
  5. Optionally, toggle Include XML attributes on if you want to include XML attributes. You can then choose to add the attribute prefix you want to use. See attribute prefix example. If the field is left empty, the original attribute key is used.
  6. Optionally, select if you want to convert data types into numbers, Booleans, or nulls.
    • If Numbers is selected, numbers are parsed as integers and floats.
    • If Booleans is selected, true and false are parsed as Booleans.
    • If Nulls is selected, the string null is parsed as null.
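
The interaction between the text key, Always use text key, and the attribute prefix can be sketched with Python's standard `xml.etree.ElementTree` module. This is an illustrative approximation, not the Worker's parser: the `xml_to_dict` function and its parameter names are hypothetical, repeated child tags simply overwrite each other, and the data type conversion options are not modeled.

```python
import xml.etree.ElementTree as ET


def xml_to_dict(xml_text, text_key="value", include_attrs=True,
                attr_prefix="", always_use_text_key=False):
    """Convert an XML string to nested dicts, mirroring the options above."""

    def convert(element):
        node = {}
        if include_attrs:
            for name, val in element.attrib.items():
                node[attr_prefix + name] = val
        for child in element:
            node[child.tag] = convert(child)
        text = (element.text or "").strip()
        if node or always_use_text_key:
            # Text sits next to attributes or children, so it needs a key.
            if text:
                node[text_key] = text
            return node
        return text  # plain text element with nothing else attached

    root = ET.fromstring(xml_text)
    return {root.tag: convert(root)}


default = xml_to_dict('<recipe type="pasta">Carbonara</recipe>')
prefixed = xml_to_dict('<recipe type="pasta">Carbonara</recipe>', attr_prefix="@")
tags_only = xml_to_dict("<recipe><type>pasta</type><name>Carbonara</name></recipe>")
```

With the defaults, the text is stored under value. Passing `text_key="text"` stores it under text instead.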
Path notation example

For the following message structure, use outer_key.a.double_inner_key to refer to the key with the value double_inner_value.

    {
        "outer_key": {
            "inner_key": "inner_value",
            "a": {
                "double_inner_key": "double_inner_value",
                "b": "b value"
            },
            "c": "c value"
        },
        "d": "d value"
    }
Always use text key example

If Always use text key is selected, the text key is the default (value), and you have the following XML:

    <recipe type="pasta">Carbonara</recipe>

The XML is converted to:

    {
        "recipe": {
            "type": "pasta",
            "value": "Carbonara"
        }
    }
Text key example

If the key is text and you have the following XML:

    <recipe type="pasta">Carbonara</recipe>

The XML is converted to:

    {
        "recipe": {
            "type": "pasta",
            "text": "Carbonara"
        }
    }
Attribute prefix example

If you enable Include XML attributes, the attribute prefix is added to each XML attribute. For example, if the attribute prefix is @ and you have the following XML:

    <recipe type="pasta">Carbonara</recipe>

Then it is converted to the JSON:

    {
        "recipe": {
            "@type": "pasta",
            "<text key>": "Carbonara"
        }
    }

The quota processor measures the logging traffic for logs that match the filter you specify. When the configured daily quota is met inside the 24-hour rolling window, the processor can either drop additional logs or send an alert using a Datadog monitor. You can configure the processor to track the total volume or the total number of events. The pipeline uses the name of the quota to identify the quota across multiple Remote Configuration deployments of the Worker.

As an example, you can configure this processor to drop new logs or trigger an alert without dropping logs after the processor has received 10 million events from a certain service in the last 24 hours.

To set up the quota processor:

  1. Enter a name for the quota processor.
  2. Define a filter query. Only logs that match the specified filter query are counted towards the daily limit.
    • Logs that match the quota filter and are within the daily quota are sent to the next step in the pipeline.
    • Logs that do not match the quota filter are sent to the next step of the pipeline.
  3. In the Unit for quota dropdown menu, select if you want to measure the quota by the number of Events or by the Volume in bytes.
  4. Set the daily quota limit and select the unit of magnitude for your desired quota.
  5. Check the Drop events checkbox if you want to drop all events when your quota is met. Leave it unchecked if you plan to set up a monitor that sends an alert when the quota is met.
    • If logs that match the quota filter are received after the daily quota has been met and the Drop events option is selected, then those logs are dropped. In this case, only logs that did not match the filter query are sent to the next step in the pipeline.
    • If logs that match the quota filter are received after the daily quota has been met and the Drop events option is not selected, then those logs and the logs that did not match the filter query are sent to the next step in the pipeline.
  6. Optional: Click Add Field if you want to set a quota on a specific service or region field.
    a. Enter the field name you want to partition by. See the Partition example for more information.
      i. Select Ignore when missing if you want the quota applied only to events that match the partition. See the Ignore when missing example for more information.
      ii. Optional: Click Overrides if you want to set different quotas for the partitioned field.
        - Click Download as CSV for an example of how to structure the CSV.
        - Drag and drop your overrides CSV to upload it. You can also click Browse to select the file to upload it. See the Overrides example for more information.
    b. Click Add Field if you want to add another partition.


Partition example

Use Partition by if you want to set a quota on a specific service or region. For example, if you want to set a quota for 10 events per day and group the events by the service field, enter service into the Partition by field.

Example for the “ignore when missing” option

Select Ignore when missing if you want the quota applied only to events that match the partition. For example, if the Worker receives the following set of events:

{"service":"a", "source":"foo", "message": "..."}
{"service":"b", "source":"bar", "message": "..."}
{"service":"b", "message": "..."}
{"source":"redis", "message": "..."}
{"message": "..."}

If Ignore when missing is selected, then the Worker:

  • creates a set for logs with service:a and source:foo
  • creates a set for logs with service:b and source:bar
  • ignores the last three events

The quota is applied to the two sets of logs and not to the last three events.

If Ignore when missing is not selected, the quota is applied to all five events.
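
The partitioning behavior can be approximated with a small Python sketch. It is a toy model under stated assumptions, not the Worker's logic: the `QuotaSketch` class is hypothetical, it only counts events, the 24-hour rolling window is not modeled, and treating a missing field as its own partition value when the option is off is an assumption.

```python
from collections import Counter


class QuotaSketch:
    """Toy event-count quota; the 24-hour rolling window is not modeled."""

    def __init__(self, limit, partition_by=(), ignore_when_missing=False,
                 drop_events=True):
        self.limit = limit
        self.partition_by = tuple(partition_by)
        self.ignore_when_missing = ignore_when_missing
        self.drop_events = drop_events
        self.counts = Counter()

    def process(self, log):
        """Return True if the log continues down the pipeline, False if dropped."""
        key = tuple(log.get(field) for field in self.partition_by)
        if self.ignore_when_missing and None in key:
            return True  # a partition field is missing: the quota is not applied
        # Assumption: with the option off, a missing field is its own partition value.
        self.counts[key] += 1
        over_quota = self.counts[key] > self.limit
        return not (over_quota and self.drop_events)


events = [
    {"service": "a", "source": "foo", "message": "..."},
    {"service": "b", "source": "bar", "message": "..."},
    {"service": "b", "message": "..."},
    {"source": "redis", "message": "..."},
    {"message": "..."},
]
quota = QuotaSketch(limit=1, partition_by=["service", "source"],
                    ignore_when_missing=True)
# Send the same five events twice so the two complete partitions exceed the limit.
results = [quota.process(event) for event in events + events]
```

On the second pass, only the two events that carry both service and source are dropped. The three events with a missing partition field are never counted.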

Overrides example

If you are partitioning by service and have two services: a and b, you can use overrides to apply different quotas for them. For example, if you want service:a to have a quota limit of 5,000 bytes and service:b to have a limit of 50 events, the override rules look like this:

| Service | Type | Limit |
| --- | --- | --- |
| a | Bytes | 5,000 |
| b | Events | 50 |

The reduce processor groups multiple log events into a single log, based on the fields specified and the merge strategies selected. Logs are grouped at 10-second intervals. After the interval has elapsed for the group, the reduced log for that group is sent to the next step in the pipeline.

To set up the reduce processor:

  1. Define a filter query. Only logs that match the specified filter query are processed. Reduced logs and logs that do not match the filter query are sent to the next step in the pipeline.
  2. In the Group By section, enter the field you want to group the logs by.
  3. Click Add Group by Field to add additional fields.
  4. In the Merge Strategy section:
    • In On Field, enter the name of the field you want to merge the logs on.
    • Select the merge strategy in the Apply dropdown menu. This is the strategy used to combine events. See the following Merge strategies section for descriptions of the available strategies.
    • Click Add Merge Strategy to add additional strategies.
Merge strategies

These are the available merge strategies for combining log events.

| Strategy | Description |
| --- | --- |
| Array | Appends each value to an array. |
| Concat | Concatenates each string value, delimited with a space. |
| Concat newline | Concatenates each string value, delimited with a newline. |
| Concat raw | Concatenates each string value, without a delimiter. |
| Discard | Discards all values except the first value that was received. |
| Flat unique | Creates a flattened array of all unique values that were received. |
| Longest array | Keeps the longest array that was received. |
| Max | Keeps the maximum numeric value that was received. |
| Min | Keeps the minimum numeric value that was received. |
| Retain | Discards all values except the last value that was received. Works as a way to coalesce by not retaining null. |
| Shortest array | Keeps the shortest array that was received. |
| Sum | Sums all numeric values that were received. |
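
To illustrate how grouping and merge strategies combine, here is a hedged Python sketch. The `reduce_logs` function, the `MERGERS` table, and the lowercase strategy keys are hypothetical names. The 10-second grouping interval is not modeled, and keeping the first received value for fields without a strategy is an assumption.

```python
def flat_unique(values):
    """Flatten nested arrays one level and keep unique items in arrival order."""
    seen = []
    for value in values:
        for item in (value if isinstance(value, list) else [value]):
            if item not in seen:
                seen.append(item)
    return seen


MERGERS = {
    "array": list,
    "concat": " ".join,
    "concat_newline": "\n".join,
    "concat_raw": "".join,
    "discard": lambda values: values[0],
    "flat_unique": flat_unique,
    "longest_array": lambda values: max(values, key=len),
    "shortest_array": lambda values: min(values, key=len),
    "max": max,
    "min": min,
    # Last value received, skipping nulls.
    "retain": lambda values: next((v for v in reversed(values) if v is not None), None),
    "sum": sum,
}


def reduce_logs(logs, group_by, strategies):
    """Merge logs that share the same group_by values into one log per group."""
    groups = {}
    for log in logs:
        key = tuple(log.get(field) for field in group_by)
        groups.setdefault(key, []).append(log)
    reduced = []
    for members in groups.values():
        merged = dict(members[0])  # assumption: other fields keep the first value
        for field, strategy in strategies.items():
            values = [m[field] for m in members if field in m]
            if values:
                merged[field] = MERGERS[strategy](values)
        reduced.append(merged)
    return reduced


logs = [
    {"host": "a", "message": "one", "bytes": 1},
    {"host": "a", "message": "two", "bytes": 2},
    {"host": "b", "message": "x", "bytes": 5},
]
reduced = reduce_logs(logs, ["host"], {"message": "concat", "bytes": "sum"})
```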
The Remap to OCSF processor is in Preview. Complete this form to request access.

Use this processor to remap logs to Open Cybersecurity Schema Framework (OCSF) events. OCSF schema event classes are set for a specific log source and type. You can add multiple mappings to one processor. Note: Datadog recommends that the OCSF processor be the last processor in your pipeline, so that remapping is done after the logs have been processed by all the other processors.

To set up this processor:

Click Manage mappings. This opens a side panel:

  • If you have not added any mappings yet, enter the mapping parameters as described in Add a mapping.
  • If you have already added mappings, click on a mapping in the list to edit or delete it. Use the search bar to find a mapping by its name. Click Add Mapping to add another mapping.

Add a mapping

  1. Select the log type in the dropdown menu.
  2. Define a filter query. Only logs that match the specified filter query are remapped. All logs, regardless of whether they match the filter query, are sent to the next step in the pipeline.
  3. Click Add Mapping.


These are the mappings available:

| Log Source | Log Type | OCSF Category | Supported OCSF versions |
| --- | --- | --- | --- |
| AWS CloudTrail | Type: Management, EventName: ChangePassword | Account Change (3001) | 1.3.0 |
| Google Cloud Audit | SetIamPolicy | Account Change (3001) | 1.3.0 |
| Google Cloud Audit | CreateSink | Account Change (3001) | 1.3.0 |
| Google Cloud Audit | UpdateSync | Account Change (3001) | 1.3.0 |
| Google Cloud Audit | CreateBucket | Account Change (3001) | 1.3.0 |
| GitHub | Create User | Account Change (3001) | 1.1.0 |
| Google Workspace Admin | addPrivilege | User Account Management (3005) | 1.1.0 |
| Okta | User session start | Authentication (3002) | 1.1.0 |
| Palo Alto Networks | Traffic | Network Activity (4001) | 1.1.0 |

This processor samples your logging traffic for a representative subset at the rate that you define, dropping the remaining logs. As an example, you can use this processor to sample 20% of logs from a noisy non-critical service.

The sampling only applies to logs that match your filter query and does not impact other logs. If a log is dropped at this processor, none of the processors below receives that log.

To set up the sample processor:

  1. Define a filter query. Only logs that match the specified filter query are sampled at the specified retention rate below. The sampled logs and the logs that do not match the filter query are sent to the next step in the pipeline.
  2. Set the retain field with your desired sampling rate expressed as a percentage. For example, entering 2 means 2% of logs are retained out of all the logs that match the filter query.
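
A minimal Python sketch of the sampling behavior follows. It assumes simple random sampling, which may differ from how the Worker selects logs. The `sample_logs` function, the `matches` predicate standing in for the filter query, and the `rng` parameter are hypothetical.

```python
import random


def sample_logs(logs, matches, retain_percent, rng=None):
    """Keep about retain_percent of matching logs and all non-matching logs."""
    rng = rng or random.Random()
    kept = []
    for log in logs:
        if not matches(log):
            kept.append(log)  # sampling does not affect non-matching logs
        elif rng.random() * 100 < retain_percent:
            kept.append(log)  # retained sample of the matching logs
    return kept


def is_noisy(log):
    return log.get("service") == "noisy"


logs = [{"service": "noisy"}] * 1000 + [{"service": "critical"}] * 5
kept = sample_logs(logs, is_noisy, retain_percent=20, rng=random.Random(0))
```

With a retain value of 20, roughly a fifth of the noisy logs are kept, while all five logs from the other service pass through.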

The Sensitive Data Scanner processor scans logs to detect and redact or hash sensitive information such as PII, PCI, and custom sensitive data. You can pick from Datadog’s library of predefined rules, or input custom Regex rules to scan for sensitive data.

To set up the processor:

  1. Define a filter query. Only logs that match the specified filter query are scanned and processed. All logs are sent to the next step in the pipeline, regardless of whether they match the filter query.
  2. Click Add Scanning Rule.
  3. Select one of the following: add rules from the library or add a custom rule.
Add rules from the library
  1. In the dropdown menu, select the library rule you want to use.
  2. Recommended keywords are automatically added based on the library rule selected. After the scanning rule has been added, you can add additional keywords or remove recommended keywords.
  3. In the Define rule target and action section, select if you want to scan the Entire Event, Specific Attributes, or Exclude Attributes in the dropdown menu.
    • If you are scanning the entire event, you can optionally exclude specific attributes from getting scanned. Use path notation (outer_key.inner_key) to access nested keys. For specified attributes with nested data, all nested data is excluded.
    • If you are scanning specific attributes, specify which attributes you want to scan. Use path notation (outer_key.inner_key) to access nested keys. For specified attributes with nested data, all nested data is scanned.
  4. For Define actions on match, select the action you want to take for the matched information. Note: Redaction, partial redaction, and hashing are all irreversible actions.
    • Redact: Replaces all matching values with the text you specify in the Replacement text field.
    • Partially Redact: Replaces a specified portion of all matched data. In the Redact section, specify the number of characters you want to redact and which part of the matched data to redact.
    • Hash: Replaces all matched data with a unique identifier. The UTF-8 bytes of the match are hashed with the 64-bit fingerprint of FarmHash.
  5. Optionally, click Add Field to add tags you want to associate with the matched events.
  6. Add a name for the scanning rule.
  7. Optionally, add a description for the rule.
  8. Click Save.
Path notation example

For the following message structure, use outer_key.a.double_inner_key to refer to the key with the value double_inner_value.

    {
        "outer_key": {
            "inner_key": "inner_value",
            "a": {
                "double_inner_key": "double_inner_value",
                "b": "b value"
            },
            "c": "c value"
        },
        "d": "d value"
    }
Add additional keywords

After adding scanning rules from the library, you can edit each rule separately and add additional keywords to the keyword dictionary.

  1. Navigate to your pipeline.
  2. In the Sensitive Data Scanner processor with the rule you want to edit, click Manage Scanning Rules.
  3. Toggle Use recommended keywords if you want the rule to use them. Otherwise, add your own keywords to the Create keyword dictionary field. You can also require that these keywords be within a specified number of characters of a match. By default, keywords must be within 30 characters before a matched value.
  4. Click Update.
Add a custom rule
  1. In the Define match conditions section, specify the regex pattern to use for matching against events in the Define the regex field. Enter sample data in the Add sample data field to verify that your regex pattern is valid. Sensitive Data Scanner supports Perl Compatible Regular Expressions (PCRE), but the following patterns are not supported:
    • Backreferences and capturing sub-expressions (lookarounds)
    • Arbitrary zero-width assertions
    • Subroutine references and recursive patterns
    • Conditional patterns
    • Backtracking control verbs
    • The \C “single-byte” directive (which breaks UTF-8 sequences)
    • The \R newline match
    • The \K start of match reset directive
    • Callouts and embedded code
    • Atomic grouping and possessive quantifiers
  2. For Create keyword dictionary, add keywords to refine detection accuracy when matching regex conditions. For example, if you are scanning for a sixteen-digit Visa credit card number, you can add keywords like visa, credit, and card. You can also require that these keywords be within a specified number of characters of a match. By default, keywords must be within 30 characters before a matched value.
  3. In the Define rule target and action section, select if you want to scan the Entire Event, Specific Attributes, or Exclude Attributes in the dropdown menu.
    • If you are scanning the entire event, you can optionally exclude specific attributes from getting scanned. Use path notation (outer_key.inner_key) to access nested keys. For specified attributes with nested data, all nested data is excluded.
    • If you are scanning specific attributes, specify which attributes you want to scan. Use path notation (outer_key.inner_key) to access nested keys. For specified attributes with nested data, all nested data is scanned.
  4. For Define actions on match, select the action you want to take for the matched information. Note: Redaction, partial redaction, and hashing are all irreversible actions.
    • Redact: Replaces all matching values with the text you specify in the Replacement text field.
    • Partially Redact: Replaces a specified portion of all matched data. In the Redact section, specify the number of characters you want to redact and which part of the matched data to redact.
    • Hash: Replaces all matched data with a unique identifier. The UTF-8 bytes of the match are hashed with the 64-bit fingerprint of FarmHash.
  5. Optionally, click Add Field to add tags you want to associate with the matched events.
  6. Add a name for the scanning rule.
  7. Optionally, add a description for the rule.
  8. Click Add Rule.
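
The match actions and the keyword proximity check can be sketched in Python. This is an illustration under assumptions, not the scanner's implementation: the `scan` function and its parameters are hypothetical, Python's `re` module stands in for the scanner's regex engine, and a truncated SHA-256 digest is swapped in for the FarmHash fingerprint because FarmHash is not in the standard library.

```python
import hashlib
import re


def scan(text, pattern, action, replacement="[REDACTED]", chars=4,
         from_start=True, keywords=(), proximity=30):
    """Apply a redact, partial, or hash action to every match of pattern."""

    def act(match):
        value = match.group(0)
        if keywords:
            # Require a keyword within `proximity` characters before the match.
            window = text[max(0, match.start() - proximity):match.start()].lower()
            if not any(keyword in window for keyword in keywords):
                return value
        if action == "redact":
            return replacement
        if action == "partial":
            n = min(chars, len(value))
            return "*" * n + value[n:] if from_start else value[:len(value) - n] + "*" * n
        if action == "hash":
            # Stand-in for the FarmHash fingerprint described above.
            return hashlib.sha256(value.encode("utf-8")).hexdigest()[:16]
        raise ValueError(f"unknown action: {action}")

    return re.sub(pattern, act, text)


card = r"\b\d{16}\b"
redacted = scan("visa card 4111111111111111 ok", card, "redact")
partial = scan("visa card 4111111111111111 ok", card, "partial", chars=12)
skipped = scan("id 4111111111111111", card, "redact", keywords=("visa",))
```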

This processor splits nested arrays into distinct events so that you can query, filter, alert, and visualize data within an array. The arrays need to already be parsed. For example, the processor can process [item_1, item_2], but cannot process "[item_1, item_2]". The items in the array can be JSON objects, strings, integers, floats, or Booleans. All unmodified fields are added to the child events. For example, if you are sending the following items to the Observability Pipelines Worker:

    {
        "host": "my-host",
        "env": "prod",
        "batched_items": [item_1, item_2]
    }

Use the Split Array processor to send each item in batched_items as a separate event:

    {
        "host": "my-host",
        "env": "prod",
        "batched_items": item_1
    }
    {
        "host": "my-host",
        "env": "prod",
        "batched_items": item_2
    }

See the split array example for a more detailed example.

To set up this processor:

Click Manage arrays to split to add an array to split or edit an existing array to split. This opens a side panel.

  • If you have not created any arrays yet, enter the array parameters as described in the Add a new array section below.
  • If you have already created arrays, click on the array’s row in the table to edit or delete it. Use the search bar to find a specific array, and then select the array to edit or delete it. Click Add Array to Split to add a new array.
Add a new array
  1. Define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they match the filter query, are sent to the next step in the pipeline.
  2. Enter the path to the array field. Use the path notation <OUTER_FIELD>.<INNER_FIELD> to match subfields. See the Path notation example below.
  3. Click Save.
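
When more than one array is configured, each child event carries one item from each array. The following Python sketch shows that combination logic using `itertools.product`. The `split_arrays` and `_resolve` functions and the sample event are hypothetical, and the sketch assumes every configured path exists and holds an already parsed array.

```python
import copy
from itertools import product


def _resolve(event, path):
    """Return the parent dict and final key for a dotted path such as a.b.c."""
    *parents, key = path.split(".")
    node = event
    for segment in parents:
        node = node[segment]
    return node, key


def split_arrays(event, paths):
    """Create one child event per combination of items from the given arrays."""
    arrays = []
    for path in paths:
        parent, key = _resolve(event, path)
        arrays.append(parent[key])
    children = []
    for combination in product(*arrays):
        child = copy.deepcopy(event)  # unmodified fields are copied to each child
        for path, item in zip(paths, combination):
            parent, key = _resolve(child, path)
            parent[key] = item
        children.append(child)
    return children


event = {
    "host": "my-host",
    "env": "prod",
    "message": {"myfield": {"firstarray": ["one", 2]}},
    "secondarray": ["x", 44],
}
children = split_arrays(event, ["message.myfield.firstarray", "secondarray"])
```

Two arrays of two items each produce four child events, and the parent event is left untouched.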
Split array example

This is an example event:

    {
        "ddtags": ["tag1", "tag2"],
        "host": "my-host",
        "env": "prod",
        "message": {
            "isMessage": true,
            "myfield" : {
                "timestamp":14500000,
                "firstarray":["one", 2]
            }
        },
        "secondarray": [{...}, 44]
    }

If the processor is splitting the arrays "message.myfield.firstarray" and "secondarray", it outputs child events that are identical to the parent event, except for the values of "message.myfield.firstarray" and "secondarray", which each become a single item from their respective original array. Each child event is a unique combination of items from the two arrays, so four child events (2 items * 2 items = 4 combinations) are created in this example.

    {
        "ddtags": ["tag1", "tag2"],
        "host": "my-host",
        "env": "prod",
        "message": {
            "isMessage": true,
            "myfield" : {"timestamp":14500000, "firstarray":"one"}
        },
        "secondarray": {...}
    }
    {
        "ddtags": ["tag1", "tag2"],
        "host": "my-host",
        "env": "prod",
        "message": {
            "isMessage": true,
            "myfield" : {"timestamp":14500000, "firstarray":"one"}
        },
        "secondarray": 44
    }
    {
        "ddtags": ["tag1", "tag2"],
        "host": "my-host",
        "env": "prod",
        "message": {
            "isMessage": true,
            "myfield" : {"timestamp":14500000, "firstarray":2}
        },
        "secondarray": {...}
    }
    {
        "ddtags": ["tag1", "tag2"],
        "host": "my-host",
        "env": "prod",
        "message": {
            "isMessage": true,
            "myfield" : {"timestamp":14500000, "firstarray":2}
        },
        "secondarray": 44
    }
Path notation example

For the following message structure, use outer_key.a.double_inner_key to refer to the key with the value double_inner_value.

    {
        "outer_key": {
            "inner_key": "inner_value",
            "a": {
                "double_inner_key": "double_inner_value",
                "b": "b value"
            },
            "c": "c value"
        },
        "d": "d value"
    }

Add another set of processors and destinations

Click the plus sign (+) to the left of the processors to add another set of processors and destinations to the source. See Add additional destinations for how to add more destinations to the processor group.

To delete a processor group, you need to delete all destinations linked to that processor group. When the last destination is deleted, the processor group is removed with it.

Install the Observability Pipelines Worker

  1. Select your platform in the Choose your installation platform dropdown menu.
  2. Enter the Amazon Data Firehose address. The Observability Pipelines Worker listens to this address and port for incoming logs from Amazon Data Firehose.
  3. Provide the environment variables for each of your selected destinations. See Prerequisites for more information.
    1. Enter the Amazon OpenSearch authentication username.
    2. Enter the Amazon OpenSearch authentication password.
    3. Enter the Amazon OpenSearch endpoint URL. For example, http://<hostname.IP>:9200.

    Enter the Google Chronicle endpoint URL. For example,

    1. Enter the CrowdStrike HEC ingestion URL.
    2. Enter the CrowdStrike HEC API key.

    There are no environment variables to configure for Datadog Log Management.

    For the Datadog Archives destination, follow the instructions for the cloud provider you are using to archive your logs.

    Enter the AWS access key ID and AWS secret access key for the S3 archive bucket you created earlier.

    There are no environment variables to configure.

    Enter the Azure connection string you created earlier. The connection string gives the Worker access to your Azure Storage bucket.

    To get the connection string:

    1. Navigate to Azure Storage accounts.
    2. Click Access keys under Security and networking in the left navigation menu.
    3. Copy the connection string for the storage account and paste it into the Azure connection string field on the Observability Pipelines Worker installation page.
    1. Enter the Elasticsearch authentication username.
    2. Enter the Elasticsearch authentication password.
    3. Enter the Elasticsearch endpoint URL. For example,
    1. Enter the data collection endpoint (DCE).
    2. Enter the client secret.
    1. Enter your New Relic account ID.
    2. Enter your New Relic license key.
    1. Enter the OpenSearch authentication username.
    2. Enter the OpenSearch authentication password.
    3. Enter the OpenSearch endpoint URL. For example, http://<hostname.IP>:9200.

    Enter your SentinelOne write access token. To find your write access token:

    1. Log into the S1 console.
    2. Navigate to the Singularity Data Lake (SDL) API Keys page. To access it from the console, click Visibility on the left menu to go to SDL. Click on your username and then API Keys.
    3. Copy the Logs Access write key and paste it into the SentinelOne Write Access Token field on the Install Observability Pipelines Worker page.

    After you’ve installed the Observability Pipelines Worker and finished setting up the pipeline, see View logs in a SentinelOne cluster for instructions on how to see the logs you sent from Observability Pipelines to the SentinelOne destination.

    Enter your Splunk HEC token and the base URL of the Splunk instance. See prerequisites for more information.

    The Worker passes the HEC token to the Splunk collection endpoint. After the Observability Pipelines Worker processes the logs, it sends the logs to the specified Splunk instance URL.

    Note: The Splunk HEC destination forwards all logs to the /services/collector/event endpoint regardless of whether you configure your Splunk HEC destination to encode your output in JSON or raw.

    Enter the Sumo Logic HTTP collector URL. See prerequisites for more information.

    Enter the rsyslog or syslog-ng endpoint URL. The Observability Pipelines Worker sends logs to this address and port.

  4. Follow the instructions for your environment to install the Worker.
    1. Click Select API key to choose the Datadog API key you want to use.
    2. Run the command provided in the UI to install the Worker. The command is automatically populated with the environment variables you entered earlier.
      docker run -i -e DD_API_KEY=<DATADOG_API_KEY> \
          -e DD_SITE=<DATADOG_SITE> \
          -e <SOURCE_ENV_VARIABLE> \
          -p 8088:8088 \
          datadog/observability-pipelines-worker run
      Note: By default, the docker run command exposes the same port the Worker is listening on. If you want to map the Worker’s container port to a different port on the Docker host, use the -p | --publish option in the command:
      -p 8282:8088 datadog/observability-pipelines-worker run
    3. Navigate back to the Observability Pipelines installation page and click Deploy.

    See Update Existing Pipelines if you want to make changes to your pipeline’s configuration.

    1. Download the Helm chart values file for Amazon EKS.
    2. Click Select API key to choose the Datadog API key you want to use.
    3. Add the Datadog chart repository to Helm:
      helm repo add datadog
      If you already have the Datadog chart repository, run the following command to make sure it is up to date:
      helm repo update
    4. Run the command provided in the UI to install the Worker. The command is automatically populated with the environment variables you entered earlier.
      helm upgrade --install opw \
      -f aws_eks.yaml \
      --set datadog.apiKey=<DATADOG_API_KEY> \
      --set datadog.pipelineId=<PIPELINE_ID> \
      --set <SOURCE_ENV_VARIABLES> \
      --set service.ports[0].protocol=TCP,service.ports[0].port=<SERVICE_PORT>,service.ports[0].targetPort=<TARGET_PORT> \
      Note: By default, the Kubernetes Service maps incoming port <SERVICE_PORT> to the port the Worker is listening on (<TARGET_PORT>). If you want to map the Worker’s pod port to a different incoming port of the Kubernetes Service, use the following service.ports[0].port and service.ports[0].targetPort values in the command:
      --set service.ports[0].protocol=TCP,service.ports[0].port=8088,service.ports[0].targetPort=8282
    5. Navigate back to the Observability Pipelines installation page and click Deploy.

    See Update Existing Pipelines if you want to make changes to your pipeline’s configuration.

    1. Download the Helm chart values file for Azure AKS.
    2. Click Select API key to choose the Datadog API key you want to use.
    3. Add the Datadog chart repository to Helm:
      helm repo add datadog
      If you already have the Datadog chart repository, run the following command to make sure it is up to date:
      helm repo update
    4. Run the command provided in the UI to install the Worker. The command is automatically populated with the environment variables you entered earlier.
      helm upgrade --install opw \
      -f azure_aks.yaml \
      --set datadog.apiKey=<DATADOG_API_KEY> \
      --set datadog.pipelineId=<PIPELINE_ID> \
      --set <SOURCE_ENV_VARIABLES> \
      --set service.ports[0].protocol=TCP,service.ports[0].port=<SERVICE_PORT>,service.ports[0].targetPort=<TARGET_PORT> \
      Note: By default, the Kubernetes Service maps incoming port <SERVICE_PORT> to the port the Worker is listening on (<TARGET_PORT>). If you want to map the Worker’s pod port to a different incoming port of the Kubernetes Service, use the following service.ports[0].port and service.ports[0].targetPort values in the command:
      --set service.ports[0].protocol=TCP,service.ports[0].port=8088,service.ports[0].targetPort=8282
    5. Navigate back to the Observability Pipelines installation page and click Deploy.

    See Update Existing Pipelines if you want to make changes to your pipeline’s configuration.

    1. Download the Helm chart values file for Google GKE.
    2. Click Select API key to choose the Datadog API key you want to use.
    3. Add the Datadog chart repository to Helm:
      helm repo add datadog
      If you already have the Datadog chart repository, run the following command to make sure it is up to date:
      helm repo update
    4. Run the command provided in the UI to install the Worker. The command is automatically populated with the environment variables you entered earlier.
      helm upgrade --install opw \
      -f google_gke.yaml \
      --set datadog.apiKey=<DATADOG_API_KEY> \
      --set datadog.pipelineId=<PIPELINE_ID> \
      --set <SOURCE_ENV_VARIABLES> \
      --set service.ports[0].protocol=TCP,service.ports[0].port=<SERVICE_PORT>,service.ports[0].targetPort=<TARGET_PORT> \
      Note: By default, the Kubernetes Service maps incoming port <SERVICE_PORT> to the port the Worker is listening on (<TARGET_PORT>). If you want to map the Worker’s pod port to a different incoming port of the Kubernetes Service, use the following service.ports[0].port and service.ports[0].targetPort values in the command:
      --set service.ports[0].protocol=TCP,service.ports[0].port=8088,service.ports[0].targetPort=8282
    5. Navigate back to the Observability Pipelines installation page and click Deploy.

    See Update Existing Pipelines if you want to make changes to your pipeline’s configuration.
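
The port mapping described in the note above can be sketched as a small shell helper. The function name `opw_service_ports_flag` is hypothetical and only illustrates which value goes where: the first argument is the incoming Kubernetes Service port, and the second is the port the Worker is listening on.

```shell
# Hypothetical helper: print the --set value that maps an incoming
# Kubernetes Service port to the port the Worker listens on.
opw_service_ports_flag() (
  service_port=$1   # incoming port on the Kubernetes Service
  target_port=$2    # port the Worker is listening on
  printf 'service.ports[0].protocol=TCP,service.ports[0].port=%s,service.ports[0].targetPort=%s\n' \
    "$service_port" "$target_port"
)

# Reproduces the value shown in the note above.
opw_service_ports_flag 8088 8282
# prints: service.ports[0].protocol=TCP,service.ports[0].port=8088,service.ports[0].targetPort=8282
```

If you pass the value to Helm from a shell, quoting it (for example, `--set "$(opw_service_ports_flag 8088 8282)"`) keeps the shell from treating `[0]` as a filename pattern, which some shells reject when no file matches.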

    1. Click Select API key to choose the Datadog API key you want to use.

    2. Run the one-step command provided in the UI to install the Worker.

      Note: The environment variables used by the Worker in /etc/default/observability-pipelines-worker are not updated on subsequent runs of the install script. If changes are needed, update the file manually and restart the Worker.
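
Because the install script does not rewrite the environment file on later runs, a manual update could look like the sketch below. The helper name `set_worker_env` is hypothetical, and the example assumes the file holds plain `KEY=VALUE` lines and uses the same `DD_SITE` variable name as the Docker command; check your own file before relying on either assumption.

```shell
# Hypothetical helper: set KEY=VALUE in a Worker environment file.
# Any existing assignment of KEY is removed and the new one is appended,
# so the updated line ends up at the bottom of the file.
set_worker_env() (
  env_file=$1
  key=$2
  value=$3
  tmp_file=$(mktemp) || exit 1
  if [ -f "$env_file" ]; then
    # Keep every line except a previous assignment of KEY.
    # grep exits non-zero when nothing is left, which is fine here.
    grep -v "^${key}=" "$env_file" > "$tmp_file" || true
  fi
  printf '%s=%s\n' "$key" "$value" >> "$tmp_file"
  # Write through the original path so its owner and mode are preserved.
  cat "$tmp_file" > "$env_file"
  rm -f "$tmp_file"
)

# Example call (assumes DD_SITE is one of the variables in your file):
#   set_worker_env /etc/default/observability-pipelines-worker DD_SITE "<DATADOG_SITE>"
```

Run the helper from a shell that has permission to write the file, then restart the Worker with `sudo systemctl restart observability-pipelines-worker` so it picks up the change.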

    If you prefer not to use the one-line installation script, follow these step-by-step instructions:

    1. Set up APT transport for downloading using HTTPS:
      sudo apt-get update
      sudo apt-get install apt-transport-https curl gnupg
    2. Run the following commands to set up the Datadog deb repo on your system and create a Datadog archive keyring:
      sudo sh -c "echo 'deb [signed-by=/usr/share/keyrings/datadog-archive-keyring.gpg] stable observability-pipelines-worker-2' > /etc/apt/sources.list.d/datadog-observability-pipelines-worker.list"
      sudo touch /usr/share/keyrings/datadog-archive-keyring.gpg
      sudo chmod a+r /usr/share/keyrings/datadog-archive-keyring.gpg
      curl | sudo gpg --no-default-keyring --keyring /usr/share/keyrings/datadog-archive-keyring.gpg --import --batch
      curl | sudo gpg --no-default-keyring --keyring /usr/share/keyrings/datadog-archive-keyring.gpg --import --batch
      curl | sudo gpg --no-default-keyring --keyring /usr/share/keyrings/datadog-archive-keyring.gpg --import --batch
      curl | sudo gpg --no-default-keyring --keyring /usr/share/keyrings/datadog-archive-keyring.gpg --import --batch
    3. Run the following commands to update your local apt repo and install the Worker:
      sudo apt-get update
      sudo apt-get install observability-pipelines-worker datadog-signing-keys
    4. Add your keys, site (for example, for US1), source, and destination environment variables to the Worker’s environment file:
      sudo cat <<EOF > /etc/default/observability-pipelines-worker
    5. Start the Worker:
      sudo systemctl restart observability-pipelines-worker

    See Update Existing Pipelines if you want to make changes to your pipeline’s configuration.

    For RHEL and CentOS, the Observability Pipelines Worker supports versions 8.0 or later.
    1. Click Select API key to choose the Datadog API key you want to use.

    2. Run the one-step command provided in the UI to install the Worker.

      Note: The environment variables used by the Worker in /etc/default/observability-pipelines-worker are not updated on subsequent runs of the install script. If changes are needed, update the file manually and restart the Worker.

    If you prefer not to use the one-line installation script, follow these step-by-step instructions:

    1. Set up the Datadog rpm repo on your system with the following command. Note: If you are running RHEL 8.1 or CentOS 8.1, use repo_gpgcheck=0 instead of repo_gpgcheck=1 in the configuration.
      cat <<EOF > /etc/yum.repos.d/datadog-observability-pipelines-worker.repo
      name = Observability Pipelines Worker
      baseurl =\$basearch/

    2. Update your packages and install the Worker:
      sudo yum makecache
      sudo yum install observability-pipelines-worker
    3. Add your keys, site (for example, for US1), source, and destination environment variables to the Worker’s environment file:
      sudo cat <<-EOF > /etc/default/observability-pipelines-worker
    4. Start the Worker:
      sudo systemctl restart observability-pipelines-worker
    5. Navigate back to the Observability Pipelines installation page and click Deploy.

    See Update Existing Pipelines if you want to make changes to your pipeline’s configuration.

    1. Select one of the options in the dropdown to provide the expected log volume for the pipeline:

      Unsure: Use this option if you are not able to project the log volume or you want to test the Worker. This option provisions the EC2 Auto Scaling group with a maximum of 2 general purpose t4g.large instances.
      1-5 TB/day: This option provisions the EC2 Auto Scaling group with a maximum of 2 compute optimized c6g.large instances.
      5-10 TB/day: This option provisions the EC2 Auto Scaling group with a minimum of 2 and a maximum of 5 compute optimized c6g.large instances.
      >10 TB/day: Datadog recommends this option for large-scale production deployments. It provisions the EC2 Auto Scaling group with a minimum of 2 and a maximum of 10 compute optimized c6g.xlarge instances.

      Note: All other parameters are set to reasonable defaults for a Worker deployment, but you can adjust them for your use case as needed in the AWS Console before creating the stack.

    2. Select the AWS region you want to use to install the Worker.

    3. Click Select API key to choose the Datadog API key you want to use.

    4. Click Launch CloudFormation Template to navigate to the AWS Console to review the stack configuration and then launch it. Make sure the CloudFormation parameters are as expected.

    5. Select the VPC and subnet you want to use to install the Worker.

    6. Review and check the necessary permissions checkboxes for IAM. Click Submit to create the stack. CloudFormation handles the installation at this point; the Worker instances are launched, the necessary software is downloaded, and the Worker starts automatically.

    7. Navigate back to the Observability Pipelines installation page and click Deploy.

    See Update Existing Pipelines if you want to make changes to your pipeline’s configuration.

Send logs to the Observability Pipelines Worker

Because Amazon Data Firehose can only deliver data over HTTP to an HTTPS URL, you need to deploy the Observability Pipelines Worker with a publicly exposed endpoint and handle TLS termination. To handle TLS termination, you can front the Worker (OPW) with a load balancer or configure TLS options. See Understand HTTP endpoint delivery request and response specifications for more information.

To send logs to the Observability Pipelines Worker, set up an Amazon Data Firehose stream with an HTTP endpoint destination in the region where your logs are. Configure the endpoint URL to the endpoint where OPW is deployed.
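
As a rough sketch of the stream setup, the helper below writes an HTTP endpoint destination configuration that the AWS CLI can consume. The function name, the endpoint name `opw`, and every value passed in are hypothetical placeholders. The S3 section is the backup configuration that Firehose expects for HTTP endpoint destinations, and your Worker source may need settings that are not shown here.

```shell
# Sketch: write an HTTP endpoint destination configuration for Amazon Data
# Firehose. All arguments are placeholders that you supply.
write_firehose_http_config() (
  endpoint_url=$1        # public HTTPS URL where OPW is reachable
  role_arn=$2            # IAM role that Firehose assumes
  backup_bucket_arn=$3   # S3 bucket used for the backup configuration
  out_file=$4            # where to write the JSON

  # Firehose only delivers to HTTPS URLs, so reject anything else early.
  case $endpoint_url in
    https://*) ;;
    *) echo "endpoint URL must start with https://" >&2; exit 1 ;;
  esac

  {
    printf '{\n'
    printf '  "EndpointConfiguration": {"Url": "%s", "Name": "opw"},\n' "$endpoint_url"
    printf '  "RoleARN": "%s",\n' "$role_arn"
    printf '  "S3Configuration": {"RoleARN": "%s", "BucketARN": "%s"}\n' "$role_arn" "$backup_bucket_arn"
    printf '}\n'
  } > "$out_file"
)

# Example (placeholders throughout; run in the region where your logs are):
#   write_firehose_http_config "https://<OPW_ENDPOINT>" "<ROLE_ARN>" "<BUCKET_ARN>" opw-firehose.json
#   aws firehose create-delivery-stream \
#     --delivery-stream-name <STREAM_NAME> \
#     --delivery-stream-type DirectPut \
#     --region <AWS_REGION> \
#     --http-endpoint-destination-configuration file://opw-firehose.json
```

The HTTPS check mirrors the requirement stated above: a plain `http://` endpoint is rejected before any stream is created.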