This product is not supported for your selected Datadog site. ().
このページは日本語には対応しておりません。随時翻訳に取り組んでいます。
翻訳に関してご質問やご意見ございましたら、お気軽にご連絡ください

Overview

Sensitive data, such as credit card numbers, bank routing numbers, and API keys, can be revealed unintentionally in your logs, which can expose your organization to financial and privacy risks.

Use Observability Pipelines to identify, tag, and optionally redact or hash sensitive information before routing logs to different destinations and outside of your infrastructure. You can use out-of-the-box scanning rules to detect common patterns such as email addresses, credit card numbers, API keys, authorization tokens, and more. Or, create custom scanning rules using regex patterns to match sensitive information.

The log sources, processors, and destinations available for this use case

This document walks you through the following:

  1. The prerequisites needed to set up Observability Pipelines
  2. Setting up Observability Pipelines
  3. Sending logs to the Observability Pipelines Worker

Prerequisites

To use Observability Pipelines’ Logstash source, you need the following information available:

  • Logstash address, such as 0.0.0.0:8088. The Observability Pipelines Worker listens on this bind address to receive logs from your applications. Later on, you configure your applications to send logs to this address.
  • The appropriate TLS certificates and the password you used to create your private key, if your forwarders are globally configured to enable SSL.

Set up Observability Pipelines

  1. Navigate to Observability Pipelines.
  2. Select the Sensitive Data Redactions template to create a new pipeline.
  3. Select the Logstash source.

Set up the source

Optionally, toggle the switch to enable TLS. If you enable TLS, the following certificate and key files are required.
Note: All file paths are made relative to the configuration data directory, which is /var/lib/observability-pipelines-worker/config/ by default. See Advanced Configurations for more information. The file must be owned by the observability-pipelines-worker group and observability-pipelines-worker user, or at least readable by the group or user.

  • Server Certificate Path: The path to the certificate file that has been signed by your Certificate Authority (CA) Root File in DER or PEM (X.509).
  • CA Certificate Path: The path to the certificate file that is your Certificate Authority (CA) Root File in DER or PEM (X.509).
  • Private Key Path: The path to the .key private key file that belongs to your Server Certificate Path in DER or PEM (PKCS #8) format.

Set up the destinations

Enter the following information based on your selected logs destinations.

  1. Optionally, enter the name of the Amazon OpenSearch index. See template syntax if you want to route logs to different indexes based on specific fields in your logs.
  2. Select an authentication strategy, Basic or AWS. For AWS, enter the AWS region.
  3. Optionally, toggle the switch to enable Buffering Options.
    Note: Buffering options is in Preview. Contact your account manager to request access.
    • If left disabled, the maximum size for buffering is 500 events.
    • If enabled:
      1. Select the buffer type you want to set (Memory or Disk).
      2. Enter the buffer size and select the unit.
Prerequisites
  1. Follow the Getting Started with Amazon Security Lake to set up Amazon Security Lake, and make sure to:
    • Enable Amazon Security Lake for the AWS account.
    • Select the AWS regions where S3 buckets will be created for OCSF data.
  2. Follow Collecting data from custom sources in Security Lake to create a custom source in Amazon Security Lake.
    • When you configure a custom log source in Security Lake in the AWS console:
      • Enter a source name.
      • Select the OCSF event class for the log source and type.
      • Enter the account details for the AWS account that will write logs to Amazon Security Lake:
        • AWS account ID
        • External ID
    • Select Create and use a new service for service access.
    • Take note of the name of the bucket that is created because you need it when you set up the Amazon Security Lake destination later on.
      • To find the bucket name, navigate to Custom Sources. The bucket name is in the location for your custom source. For example, if the location is s3://aws-security-data-lake-us-east-2-qjh9pr8hy/ext/op-api-activity-test, the bucket name is aws-security-data-lake-us-east-2-qjh9pr8hy.
Set up the destination
  1. Enter your S3 bucket name.
  2. Enter the AWS region.
  3. Enter the custom source name.
  4. Optionally, select an AWS authentication option.
    1. Enter the ARN of the IAM role you want to assume.
    2. Optionally, enter the assumed role session name and external ID.
  5. Optionally, toggle the switch to enable TLS. If you enable TLS, the following certificate and key files are required.
    Note: All file paths are made relative to the configuration data directory, which is /var/lib/observability-pipelines-worker/config/ by default. See Advanced Configurations for more information. The file must be owned by the observability-pipelines-worker group and observability-pipelines-worker user, or at least readable by the group or user.
    • Server Certificate Path: The path to the certificate file that has been signed by your Certificate Authority (CA) Root File in DER or PEM (X.509).
    • CA Certificate Path: The path to the certificate file that is your Certificate Authority (CA) Root File in DER or PEM (X.509).
    • Private Key Path: The path to the .key private key file that belongs to your Server Certificate Path in DER or PEM (PKCS#8) format.
  6. Optionally, toggle the switch to enable Buffering Options.
    Note: Buffering options is in Preview. Contact your account manager to request access.
    • If left disabled, the maximum size for buffering is 500 events.
    • If enabled:
      1. Select the buffer type you want to set (Memory or Disk).
      2. Enter the buffer size and select the unit.

Notes:

  • When you add the Amazon Security Lake destination, the OCSF processor is automatically added so that you can convert your logs to Parquet before they are sent to Amazon Security Lake. See Remap to OCSF documentation for setup instructions.
  • Only logs formatted by the OCSF processor are converted to Parquet.

To set up the Worker’s Google Chronicle destination:

  1. Enter the customer ID for your Google Chronicle instance.
  2. If you have a credentials JSON file, enter the path to your credentials JSON file. The credentials file must be placed under DD_OP_DATA_DIR/config. Alternatively, you can use the GOOGLE_APPLICATION_CREDENTIALS environment variable to provide the credential path.
  3. Select JSON or Raw encoding in the dropdown menu.
  4. Enter the log type. See template syntax if you want to route logs to different log types based on specific fields in your logs.
  5. Optionally, toggle the switch to enable Buffering Options.
    Note: Buffering options is in Preview. Contact your account manager to request access.
    • If left disabled, the maximum size for buffering is 500 events.
    • If enabled:
      1. Select the buffer type you want to set (Memory or Disk).
      2. Enter the buffer size and select the unit.

Note: Logs sent to the Google Chronicle destination must have ingestion labels. For example, if the logs are from a A10 load balancer, it must have the ingestion label A10_LOAD_BALANCER. See Google Cloud’s Support log types with a default parser for a list of available log types and their respective ingestion labels.

To use the CrowdStrike NG-SIEM destination, you need to set up a CrowdStrike data connector using the HEC/HTTP Event Connector. See Step 1: Set up the HEC/HTTP event data connector for instructions. When you set up the data connector, you are given a HEC API key and URL, which you use when you configure the Observability Pipelines Worker later on.

  1. Select JSON or Raw encoding in the dropdown menu.
  2. Optionally, enable compressions and select an algorithm (gzip or zlib) in the dropdown menu.
  3. Optionally, toggle the switch to enable TLS. If you enable TLS, the following certificate and key files are required.
    Note: All file paths are made relative to the configuration data directory, which is /var/lib/observability-pipelines-worker/config/ by default. See Advanced Configurations for more information. The file must be owned by the observability-pipelines-worker group and observability-pipelines-worker user, or at least readable by the group or user.
    • Server Certificate Path: The path to the certificate file that has been signed by your Certificate Authority (CA) Root File in DER or PEM (X.509).
    • CA Certificate Path: The path to the certificate file that is your Certificate Authority (CA) Root File in DER or PEM (X.509).
    • Private Key Path: The path to the .key private key file that belongs to your Server Certificate Path in DER or PEM (PKCS#8) format.
  4. Optionally, toggle the switch to enable Buffering Options.
    Note: Buffering options is in Preview. Contact your account manager to request access.
    • If left disabled, the maximum size for buffering is 500 events.
    • If enabled:
      1. Select the buffer type you want to set (Memory or Disk).
      2. Enter the buffer size and select the unit.
  1. Optionally, toggle the switch to enable Buffering Options.
    Note: Buffering options is in Preview. Contact your account manager to request access.
    • If left disabled, the maximum size for buffering is 500 events.
    • If enabled:
      1. Select the buffer type you want to set (Memory or Disk).
      2. Enter the buffer size and select the unit.
If the Worker is ingesting logs that are not coming from the Datadog Agent and are shipped to an archive using the Observability Pipelines Datadog Archives destination, those logs are not tagged with reserved attributes. In addition, logs rehydrated into Datadog will not have standard attributes mapped. This means that when you rehydrate your logs into Log Management, you may lose Datadog telemetry, the ability to search logs easily, and the benefits of unified service tagging if you do not structure and remap your logs in Observability Pipelines before routing your logs to an archive.

For example, say your syslogs are sent to Datadog Archives and those logs have the status tagged as severity instead of the reserved attribute of status and the host tagged as host-name instead of the reserved attribute hostname. When these logs are rehydrated in Datadog, the status for each log is set to info and none of the logs have a hostname tag.

If you do not have a Datadog Log Archive configured for Observability Pipelines, configure a Log Archive for your cloud provider (Amazon S3, Google Cloud Storage, or Azure Storage).

Note: You need to have the Datadog integration for your cloud provider installed to set up Datadog Log Archives. See the AWS integration, Google Cloud Platform, and Azure integration documentation for more information.

To set up the destination, follow the instructions for the cloud provider you are using to archive your logs.

  1. Enter your S3 bucket name. If you configured Log Archives, it’s the name of the bucket you created earlier.
  2. Enter the AWS region the S3 bucket is in.
  3. Enter the key prefix.
    • Prefixes are useful for partitioning objects. For example, you can use a prefix as an object key to store objects under a particular directory. If using a prefix for this purpose, it must end in / to act as a directory path; a trailing / is not automatically added.
    • See template syntax if you want to route logs to different object keys based on specific fields in your logs.
    • Note: Datadog recommends that you start your prefixes with the directory name and without a lead slash (/). For example, app-logs/ or service-logs/.
  4. Select the storage class for your S3 bucket in the Storage Class dropdown menu. If you are going to archive and rehydrate your logs:
  5. Optionally, select an AWS authentication option. If you are only using the user or role you created earlier for authentication, do not select Assume role. The Assume role option should only be used if the user or role you created earlier needs to assume a different role to access the specific AWS resource and that permission has to be explicitly defined.
    If you select Assume role:
    1. Enter the ARN of the IAM role you want to assume.
    2. Optionally, enter the assumed role session name and external ID.
  6. Optionally, toggle the switch to enable Buffering Options.
    Note: Buffering options is in Preview. Contact your account manager to request access.
    • If left disabled, the maximum size for buffering is 500 events.
    • If enabled:
      1. Select the buffer type you want to set (Memory or Disk).
      2. Enter the buffer size and select the unit.

Example destination and log archive setup

If you enter the following values for your Amazon S3 destination:

  • S3 Bucket Name: test-op-bucket
  • Prefix to apply to all object keys: op-logs
  • Storage class for the created objects: Standard
The Amazon S3 destination setup with the example values

Then these are the values you enter for configuring the S3 bucket for Log Archives:

  • S3 bucket: test-op-bucket
  • Path: op-logs
  • Storage class: Standard
The log archive configuration with the example values
  1. Enter the name of your Google Cloud storage bucket. If you configured Log Archives, it’s the bucket you created earlier.
  2. If you have a credentials JSON file, enter the path to your credentials JSON file. If you configured Log Archives it’s the credentials you downloaded earlier. The credentials file must be placed under DD_OP_DATA_DIR/config. Alternatively, you can use the GOOGLE_APPLICATION_CREDENTIALS environment variable to provide the credential path.
  3. Select the storage class for the created objects.
  4. Select the access level of the created objects.
  5. Optionally, enter in the prefix.
    • Prefixes are useful for partitioning objects. For example, you can use a prefix as an object key to store objects under a particular directory. If using a prefix for this purpose, it must end in / to act as a directory path; a trailing / is not automatically added.
    • See template syntax if you want to route logs to different object keys based on specific fields in your logs.
    • Note: Datadog recommends that you start your prefixes with the directory name and without a lead slash (/). For example, app-logs/ or service-logs/.
  6. Optionally, click Add Header to add metadata.
  7. Optionally, toggle the switch to enable Buffering Options.
    Note: Buffering options is in Preview. Contact your account manager to request access.
    • If left disabled, the maximum size for buffering is 500 events.
    • If enabled:
      1. Select the buffer type you want to set (Memory or Disk).
      2. Enter the buffer size and select the unit.
  1. Enter the name of the Azure container you created earlier.
  2. Optionally, enter a prefix.
    • Prefixes are useful for partitioning objects. For example, you can use a prefix as an object key to store objects under a particular directory. If using a prefix for this purpose, it must end in / to act as a directory path; a trailing / is not automatically added.
    • See template syntax if you want to route logs to different object keys based on specific fields in your logs.
    • Note: Datadog recommends that you start your prefixes with the directory name and without a lead slash (/). For example, app-logs/ or service-logs/.
  3. Optionally, toggle the switch to enable Buffering Options.
    Note: Buffering options is in Preview. Contact your account manager to request access.
    • If left disabled, the maximum size for buffering is 500 events.
    • If enabled:
      1. Select the buffer type you want to set (Memory or Disk).
      2. Enter the buffer size and select the unit.

The following fields are optional:

  1. Enter the name for the Elasticsearch index. See template syntax if you want to route logs to different indexes based on specific fields in your logs.
  2. Enter the Elasticsearch version.
  3. Optionally, toggle the switch to enable Buffering Options.
    Note: Buffering options is in Preview. Contact your account manager to request access.
    • If left disabled, the maximum size for buffering is 500 events.
    • If enabled:
      1. Select the buffer type you want to set (Memory or Disk).
      2. Enter the buffer size and select the unit.

Prerequisites

To set up the Microsoft Sentinel destination, you need to create a Workspace in Azure if you haven’t already. In that workspace:

  1. Add Microsoft Sentinel to the workspace.
  2. Create a Data Collection Endpoint (DCE).
  3. Create a Log Analytics Workspace in the workspace if you haven’t already.
  4. In the Log Analytics Workspace, navigate to Settings > Tables.
    1. Click + Create.
    2. Define a custom table (for example, MyOPWLogs).
      • Notes:
        - After the table is configured, the prefix Custom- and suffix _CL are automatically appended to the table name. For example, if you defined the table name in Azure to be MyOPWLogs, the full table name is stored as Custom-MyOPWLogs_CL. You must use the full table name when you set up the Observability Pipelines Microsoft Sentinel destination.
        -The full table name can be found in the resource JSON of the DCR under streamDeclarations.
        - You can also use an Azure Table instead of a custom table.
    3. Select New Custom Log (DCR-based).
    4. Click Create a new data collection rule and select the DCE you created earlier.
    5. Click Next.
    6. Upload a sample JSON Log. For this example, the following JSON is used for the Schema and Transformation, where TimeGenerated is required:
      {
          "TimeGenerated": "2024-07-22T11:47:51Z",
          "event": {}
      }
      
    7. Click Create.
  5. In Azure, navigate to Microsoft Entra ID.
    1. Click Add > App Registration.
    2. Click Create.
    3. On the overview page, click Client credentials: Add a certificate or secret.
    4. Click New client secret.
    5. Enter a name for the secret and click Add. Note: Make sure to take note of the client secret, which gets obfuscated after 10 minutes.
    6. Also take note of the Tenant ID and Client ID. You need this information, along with the client secret, when you set up the Observability Pipelines Microsoft Sentinel destination.
  6. In Azure Portal’s Data Collection Rules page, search for and select the DCR you created earlier.
    1. Click Access Control (IAM) in the left nav.
    2. Click Add and select Add role assignment.
    3. Add the Monitoring Metrics Publisher role.
    4. On the Members page, select User, group, or service principal.
    5. Click Select Members and search for the application you created in the app registration step.
    6. Click Review + Assign. Note: It can take up to 10 minutes for the IAM change to take effect.

The table below summarizes the Azure and Microsoft Sentinel information you need when you set up the Observability Pipelines Microsoft Sentinel destination:

NameDescription
Application (client) IDThe Azure Active Directory (AD) application’s client ID. See Register an application in Microsoft Entra ID for more information.
Example: 550e8400-e29b-41d4-a716-446655440000
Directory (tenant) IDThe Azure AD tenant ID. See Register an application in Microsoft Entra ID for more information.
Example: 72f988bf-86f1-41af-91ab-2d7cd011db47
Table (Stream) NameThe name of the stream which matches the table chosen when configuring the Data Collection Rule (DCR). Note: The full table name can be found in the resource JSON of the DCR under streamDeclarations.
Example: Custom-MyOPWLogs_CL
Data Collection Rule (DCR) immutable IDThis is the immutable ID of the DCR where logging routes are defined. It is the Immutable ID shown on the DCR Overview page.
Note: Ensure the Monitoring Metrics Publisher role is assigned in the DCR IAM settings.
Example: dcr-000a00a000a00000a000000aa000a0aa
See Data collection rules (DCRs) in Azure Monitor to learn more about creating or viewing DCRs.

Set up the destination in Observability Pipelines

To set up the Microsoft Sentinel destination in Observability Pipelines:

  1. Enter the client ID for your application, such as 550e8400-e29b-41d4-a716-446655440000.
  2. Enter the directory ID for your tenant, such as 72f988bf-86f1-41af-91ab-2d7cd011db47. This is the Azure AD tenant ID.
  3. Enter the full table name to which you are sending logs. An example table name: Custom-MyOPWLogs_CL.
  4. Enter the Data Collection Rule (DCR) immutable ID, such as dcr-000a00a000a00000a000000aa000a0aa.
  5. Optionally, toggle the switch to enable Buffering Options.
    Note: Buffering options is in Preview. Contact your account manager to request access.
    • If left disabled, the maximum size for buffering is 500 events.
    • If enabled:
      1. Select the buffer type you want to set (Memory or Disk).
      2. Enter the buffer size and select the unit.
  1. Select the data center region (US or EU) of your New Relic account.
  2. Optionally, toggle the switch to enable Buffering Options.
    Note: Buffering options is in Preview. Contact your account manager to request access.
    • If left disabled, the maximum size for buffering is 500 events.
    • If enabled:
      1. Select the buffer type you want to set (Memory or Disk).
      2. Enter the buffer size and select the unit.
  1. Optionally, enter the name of the OpenSearch index. See template syntax if you want to route logs to different indexes based on specific fields in your logs.
  2. Optionally, toggle the switch to enable Buffering Options.
    Note: Buffering options is in Preview. Contact your account manager to request access.
    • If left disabled, the maximum size for buffering is 500 events.
    • If enabled:
      1. Select the buffer type you want to set (Memory or Disk).
      2. Enter the buffer size and select the unit.
  1. Select your SentinelOne logs environment in the dropdown menu.
  2. Optionally, toggle the switch to enable Buffering Options.
    Note: Buffering options is in Preview. Contact your account manager to request access.
    • If left disabled, the maximum size for buffering is 500 events.
    • If enabled:
      1. Select the buffer type you want to set (Memory or Disk).
      2. Enter the buffer size and select the unit.
  1. In the Mode dropdown menu, select the socket type to use.
  2. In the Encoding dropdown menu, select either JSON or Raw message as the output format.
  3. Optionally, toggle the switch to enable TLS. If you enable TLS, the following certificate and key files are required:
    • Server Certificate Path: The path to the certificate file that has been signed by your Certificate Authority (CA) Root File in DER or PEM (X.509).
    • CA Certificate Path: The path to the certificate file that is your Certificate Authority (CA) Root File in DER or PEM (X.509).
    • Private Key Path: The path to the .key private key file that belongs to your Server Certificate Path in DER or PEM (PKCS#8) format.
  4. Optionally, toggle the switch to enable Buffering Options.
    Note: Buffering options is in Preview. Contact your account manager to request access.
    • If left disabled, the maximum size for buffering is 500 events.
    • If enabled:
      1. Select the buffer type you want to set (Memory or Disk).
      2. Enter the buffer size and select the unit.
Observability Pipelines compresses logs with the gzip (level 6) algorithm.

The following fields are optional:

  1. Enter the name of the Splunk index you want your data in. This has to be an allowed index for your HEC. See template syntax if you want to route logs to different indexes based on specific fields in your logs.
  2. Select whether the timestamp should be auto-extracted. If set to true, Splunk extracts the timestamp from the message with the expected format of yyyy-mm-dd hh:mm:ss.
  3. Optionally, set the sourcetype to override Splunk’s default value, which is httpevent for HEC data. See template syntax if you want to route logs to different source types based on specific fields in your logs.
  4. Optionally, toggle the switch to enable Buffering Options.
    Note: Buffering options is in Preview. Contact your account manager to request access.
    • If left disabled, the maximum size for buffering is 500 events.
    • If enabled:
      1. Select the buffer type you want to set (Memory or Disk).
      2. Enter the buffer size and select the unit.

The following fields are optional:

  1. In the Encoding dropdown menu, select whether you want to encode your pipeline’s output in JSON, Logfmt, or Raw text. If no decoding is selected, the decoding defaults to JSON.
  2. Enter a source name to override the default name value configured for your Sumo Logic collector’s source.
  3. Enter a host name to override the default host value configured for your Sumo Logic collector’s source.
  4. Enter a category name to override the default category value configured for your Sumo Logic collector’s source.
  5. Click Add Header to add any custom header fields and values.
  6. Optionally, toggle the switch to enable Buffering Options.
    Note: Buffering options is in Preview. Contact your account manager to request access.
    • If left disabled, the maximum size for buffering is 500 events.
    • If enabled:
      1. Select the buffer type you want to set (Memory or Disk).
      2. Enter the buffer size and select the unit.
The rsyslog and syslog-ng destinations support the RFC5424 format.

The rsyslog and syslog-ng destinations match these log fields to the following Syslog fields:

Log EventSYSLOG FIELDDefault
log[“message”]MESSAGENIL
log[“procid”]PROCIDThe running Worker’s process ID.
log[“appname”]APP-NAMEobservability_pipelines
log[“facility”]FACILITY8 (log_user)
log[“msgid”]MSGIDNIL
log[“severity”]SEVERITYinfo
log[“host”]HOSTNAMENIL
log[“timestamp”]TIMESTAMPCurrent UTC time.

The following destination settings are optional:

  1. Toggle the switch to enable TLS. If you enable TLS, the following certificate and key files are required:
    • Server Certificate Path: The path to the certificate file that has been signed by your Certificate Authority (CA) Root File in DER or PEM (X.509).
    • CA Certificate Path: The path to the certificate file that is your Certificate Authority (CA) Root File in DER or PEM (X.509).
    • Private Key Path: The path to the .key private key file that belongs to your Server Certificate Path in DER or PEM (PKCS#8) format.
  2. Enter the number of seconds to wait before sending TCP keepalive probes on an idle connection.
  3. Optionally, toggle the switch to enable Buffering Options.
    Note: Buffering options is in Preview. Contact your account manager to request access.
    • If left disabled, the maximum size for buffering is 500 events.
    • If enabled:
      1. Select the buffer type you want to set (Memory or Disk).
      2. Enter the buffer size and select the unit.

Add additional destinations

Click the plus sign (+) to the left of the destinations to add additional destinations to the same set of processors.

To delete a destination, click on the pencil icon to the top right of the destination, and select Delete destination. If you delete a destination from a processor group that has multiple destinations, only the deleted destination is removed. If you delete a destination from a processor group that only has one destination, both the destination and the processor group are removed.

Notes:

  • A pipeline must have at least one destination. If a processor group only has one destination, that destination cannot be deleted.
  • You can add a total of three destinations for a pipeline.
  • A specific destination can only be added once. For example, you cannot add multiple Splunk HEC destinations.

Set up processors

There are pre-selected processors added to your processor group out of the box. You can add additional processors or delete any existing ones based on your processing needs.

Processor groups are executed from top to bottom. The order of the processors is important because logs are checked by each processor, but only logs that match the processor’s filters are processed. To modify the order of the processors, use the drag handle on the top left corner of the processor you want to move.

Filter query syntax

Each processor has a corresponding filter query in their fields. Processors only process logs that match their filter query. And for all processors except the filter processor, logs that do not match the query are sent to the next step of the pipeline. For the filter processor, logs that do not match the query are dropped.

For any attribute, tag, or key:value pair that is not a reserved attribute, your query must start with @. Conversely, to filter reserved attributes, you do not need to append @ in front of your filter query.

For example, to filter out and drop status:info logs, your filter can be set as NOT (status:info). To filter out and drop system-status:info, your filter must be set as NOT (@system-status:info).

Filter query examples:

  • NOT (status:debug): This filters for only logs that do not have the status DEBUG.
  • status:ok service:flask-web-app: This filters for all logs with the status OK from your flask-web-app service.
    • This query can also be written as: status:ok AND service:flask-web-app.
  • host:COMP-A9JNGYK OR host:COMP-J58KAS: This filter query only matches logs from the labeled hosts.
  • @user.status:inactive: This filters for logs with the status inactive nested under the user attribute.
  • @http.status:[200 TO 299] or @http.status:{300 TO 399}: These two filters represent the syntax to query a range for http.status. Ranges can be used across any attribute.

Queries run in the Observability Pipelines Worker are case sensitive. Learn more about writing filter queries in Datadog’s Log Search Syntax.

Add processors

Enter the information for the processors you want to use. Click the Add button to add additional processors. To delete a processor, click the kebab on the right side of the processor and select Delete.

The log processors available

Use this processor to add a field name and value of an environment variable to the log message.

To set up this processor:

  1. Define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they match the filter query, are sent to the next step in the pipeline.
  2. Enter the field name for the environment variable.
  3. Enter the environment variable name.
  4. Click Add Environment Variable if you want to add another environment variable.
Blocked environment variables

Environment variables that match any of the following patterns are blocked from being added to log messages because the environment variable could contain sensitive data.

  • CONNECTIONSTRING / CONNECTION-STRING / CONNECTION_STRING
  • AUTH
  • CERT
  • CLIENTID / CLIENT-ID / CLIENT_ID
  • CREDENTIALS
  • DATABASEURL / DATABASE-URL / DATABASE_URL
  • DBURL / DB-URL / DB_URL
  • KEY
  • OAUTH
  • PASSWORD
  • PWD
  • ROOT
  • SECRET
  • TOKEN
  • USER

The environment variable is matched to the pattern and not the literal word. For example, PASSWORD blocks environment variables like USER_PASSWORD and PASSWORD_SECRET from getting added to the log messages.

Allowlist

After you have added processors to your pipeline and clicked Next: Install, in the Add environment variable processor(s) allowlist field, enter a comma-separated list of environment variables you want to pull values from and use with this processor.

The allowlist is stored in the environment variable DD_OP_PROCESSOR_ADD_ENV_VARS_ALLOWLIST.

This processor adds a field with the name of the host that sent the log. For example, hostname: 613e197f3526. Note: If the hostname already exists, the Worker throws an error and does not overwrite the existing hostname.

To set up this processor:

  • Define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they do or do not match the filter query, are sent to the next step in the pipeline.

Use this processor with Vector Remap Language (VRL) to modify and enrich your logs. VRL is an expression-oriented, domain specific language designed for transforming logs. It features a simple syntax and built-in functions for observability use cases. You can use custom functions in the following ways:

See Custom functions for the full list of available functions.

See Remap Reserved Attributes on how to use the Custom Processor to manually and dynamically remap attributes.

To set up this processor:

  • If you have not created any functions yet, click Add custom processor and follow the instructions in Add a function to create a function.
  • If you have already added custom functions, click Manage custom processors. Click on a function in the list to edit or delete it. You can use the search bar to find a function by its name. Click Add Custom Processor to add a function.
Add a function
  1. Enter a name for your custom processor.
  2. Add your script to modify your logs using custom functions. You can also click Autofill with Example and select one of the common use cases to get started. Click the copy icon for the example script and paste it into your script. See Get Started with the Custom Processor for more information.
  3. Optionally, check Drop events on error if you want to drop events that encounter an error during processing.
  4. Enter a sample log event.
  5. Click Run to preview how the functions process the log. After the script has run, you can see the output for the log.
  6. Click Save.

The deduplicate processor removes copies of data to reduce volume and noise. It caches 5,000 messages at a time and compares your incoming logs traffic against the cached messages. For example, this processor can be used to keep only unique warning logs in the case where multiple identical warning logs are sent in succession.

To set up the deduplicate processor:

  1. Define a filter query. Only logs that match the specified filter query are processed. Deduped logs and logs that do not match the filter query are sent to the next step in the pipeline.
  2. In the Type of deduplication dropdown menu, select whether you want to Match on or Ignore the fields specified below.
    • If Match is selected, then after a log passes through, future logs that have the same values for all of the fields you specify below are removed.
    • If Ignore is selected, then after a log passes through, future logs that have the same values for all of their fields, except the ones you specify below, are removed.
  3. Enter the fields you want to match on, or ignore. At least one field is required, and you can specify a maximum of three fields.
    • Use the path notation <OUTER_FIELD>.<INNER_FIELD> to match subfields. See the Path notation example below.
  4. Click Add field to add additional fields you want to filter on.
Path notation example

For the following message structure:

{
    "outer_key": {
        "inner_key": "inner_value",
        "a": {
            "double_inner_key": "double_inner_value",
            "b": "b value"
        },
        "c": "c value"
    },
    "d": "d value"
}
  • Use outer_key.inner_key to refer to the key with the value inner_value.
  • Use outer_key.inner_key.double_inner_key to refer to the key with the value double_inner_value.

The remap processor can add, drop, or rename fields within your individual log data. Use this processor to enrich your logs with additional context, remove low-value fields to reduce volume, and standardize naming across important attributes. Select add field, drop field, or rename field in the dropdown menu to get started.

See the Remap Reserved Attributes guide on how to use the Edit Fields processor to remap attributes.

Add field

Use add field to append a new key-value field to your log.

To set up the add field processor:

  1. Define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they do or do not match the filter query, are sent to the next step in the pipeline.
  2. Enter the field and value you want to add. To specify a nested field for your key, use the path notation: <OUTER_FIELD>.<INNER_FIELD>. All values are stored as strings. Note: If the field you want to add already exists, the Worker throws an error and the existing field remains unchanged.
Drop field

Use drop field to drop a field from logging data that matches the filter you specify below. It can delete objects, so you can use the processor to drop nested keys.

To set up the drop field processor:

  1. Define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they do or do not match the filter query, are sent to the next step in the pipeline.
  2. Enter the key of the field you want to drop. To specify a nested field for your specified key, use the path notation: <OUTER_FIELD>.<INNER_FIELD>. Note: If your specified key does not exist, your log will be unimpacted.
Rename field

Use rename field to rename a field within your log.

To set up the rename field processor:

  1. Define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they do or do not match the filter query, are sent to the next step in the pipeline.
  2. Enter the name of the field you want to rename in the Source field. To specify a nested field for your key, use the path notation: <OUTER_FIELD>.<INNER_FIELD>. Once renamed, your original field is deleted unless you enable the Preserve source tag checkbox described below.
    Note: If the source key you specify doesn’t exist, a default null value is applied to your target.
  3. In the Target field, enter the name you want the source field to be renamed to. To specify a nested field for your specified key, use the path notation: <OUTER_FIELD>.<INNER_FIELD>.
    Note: If the target field you specify already exists, the Worker throws an error and does not overwrite the existing target field.
  4. Optionally, check the Preserve source tag box if you want to retain the original source field and duplicate the information from your source key to your specified target key. If this box is not checked, the source key is dropped after it is renamed.
Path notation example

For the following message structure:

{
    "outer_key": {
        "inner_key": "inner_value",
        "a": {
            "double_inner_key": "double_inner_value",
            "b": "b value"
        },
        "c": "c value"
    },
    "d": "d value"
}
  • Use outer_key.inner_key to refer to the key with the value inner_value.
  • Use outer_key.inner_key.double_inner_key to refer to the key with the value double_inner_value.

Use this processor to enrich your logs with information from a reference table, which could be a local file or database.

To set up the enrichment table processor:

  1. Define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they do or do not match the filter query, are sent to the next step in the pipeline.
  2. Enter the source attribute of the log. The source attribute’s value is what you want to find in the reference table.
  3. Enter the target attribute. The target attribute’s value stores, as a JSON object, the information found in the reference table.
  4. Select the type of reference table you want to use, File or GeoIP.
    • For the File type:
      1. Enter the file path.
        Note: All file paths are made relative to the configuration data directory, which is /var/lib/observability-pipelines-worker/config/ by default. See Advanced Configurations for more information. The file must be owned by the observability-pipelines-worker group and observability-pipelines-worker user, or at least readable by the group or user.
      2. Enter the column name. The column name in the enrichment table is used for matching the source attribute value. See the Enrichment file example.
    • For the GeoIP type, enter the GeoIP path.
Enrichment file example

For this example, merchant_id is used as the source attribute and merchant_info as the target attribute.

This is the example reference table that the enrichment processor uses:

merch_idmerchant_namecitystate
803Andy’s OttomansBoiseIdaho
536Cindy’s CouchesBoulderColorado
235Debra’s BenchesLas VegasNevada

merch_id is set as the column name the processor uses to find the source attribute’s value. Note: The source attribute’s value does not have to match the column name.

If the enrichment processor receives a log with "merchant_id":"536":

  • The processor looks for the value 536 in the reference table’s merch_id column.
  • After it finds the value, it adds the entire row of information from the reference table to the merchant_info attribute as a JSON object:
merchant_info {
    "merchant_name":"Cindy's Couches",
    "city":"Boulder",
    "state":"Colorado"
}

This processor filters for logs that match the specified filter query and drops all non-matching logs. If a log is dropped at this processor, then none of the processors below this one receives that log. This processor can filter out unnecessary logs, such as debug or warning logs.

To set up the filter processor:

  • Define a filter query. The query you specify filters for and passes on only logs that match it, dropping all other logs.

Many types of logs are meant to be used for telemetry to track trends, such as KPIs, over long periods of time. Generating metrics from your logs is a cost-effective way to summarize log data from high-volume logs, such as CDN logs, VPC flow logs, firewall logs, and networks logs. Use the generate metrics processor to generate either a count metric of logs that match a query or a distribution metric of a numeric value contained in the logs, such as a request duration.

Note: The metrics generated are custom metrics and billed accordingly. See Custom Metrics Billing for more information.

To set up the processor:

Click Manage Metrics to create new metrics or edit existing metrics. This opens a side panel.

  • If you have not created any metrics yet, enter the metric parameters as described in the Add a metric section to create a metric.
  • If you have already created metrics, click on the metric’s row in the overview table to edit or delete it. Use the search bar to find a specific metric by its name, and then select the metric to edit or delete it. Click Add Metric to add another metric.
Add a metric
  1. Enter a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they match the filter query, are sent to the next step in the pipeline. Note: Since a single processor can generate multiple metrics, you can define a different filter query for each metric.
  2. Enter a name for the metric.
  3. In the Define parameters section, select the metric type (count, gauge, or distribution). See the Count metric example and Distribution metric example. Also see Metrics Types for more information.
    • For gauge and distribution metric types, select a log field which has a numeric (or parseable numeric string) value that is used for the value of the generated metric.
    • For the distribution metric type, the log field’s value can be an array of (parseable) numerics, which is used for the generated metric’s sample set.
    • The Group by field determines how the metric values are grouped together. For example, if you have hundreds of hosts spread across four regions, grouping by region allows you to graph one line for every region. The fields listed in the Group by setting are set as tags on the configured metric.
  4. Click Add Metric.
Metrics Types

You can generate these types of metrics for your logs. See the Metrics Types and Distributions documentation for more details.

Metric typeDescriptionExample
COUNTRepresents the total number of event occurrences in one time interval. This value can be reset to zero, but cannot be decreased.You want to count the number of logs with status:error.
GAUGERepresents a snapshot of events in one time interval.You want to measure the latest CPU utilization per host for all logs in the production environment.
DISTRIBUTIONRepresent the global statistical distribution of a set of values calculated across your entire distributed infrastructure in one time interval.You want to measure the average time it takes for an API call to be made.
Count metric example

For this status:error log example:

{"status": "error", "env": "prod", "host": "ip-172-25-222-111.ec2.internal"}

To create a count metric that counts the number of logs that contain "status":"error" and groups them by env and host, enter the following information:

Input parametersValue
Filter query@status:error
Metric namestatus_error_total
Metric typeCount
Group byenv, prod
Distribution metric example

For this example of an API response log:

{
    "timestamp": "2018-10-15T17:01:33Z",
    "method": "GET",
    "status": 200,
    "request_body": "{"information"}",
    "response_time_seconds: 10
}

To create a distribution metric that measures the average time it takes for an API call to be made, enter the following information:

Input parametersValue
Filter query@method
Metric namestatus_200_response
Metric typeDistribution
Select a log attributeresponse_time_seconds
Group bymethod

This processor parses logs using the grok parsing rules that are available for a set of sources. The rules are automatically applied to logs based on the log source. Therefore, logs must have a source field with the source name. If this field is not added when the log is sent to the Observability Pipelines Worker, you can use the Add field processor to add it.

If the source field of a log matches one of the grok parsing rule sets, the log’s message field is checked against those rules. If a rule matches, the resulting parsed data is added in the message field as a JSON object, overwriting the original message.

If there isn’t a source field on the log, or no rule matches the log message, then no changes are made to the log and it is sent to the next step in the pipeline.

Datadog’s Grok patterns differ from the standard Grok pattern, where Datadog’s Grok implementation provides:

  • Matchers that include options for how you define parsing rules
  • Filters for post-processing of extracted data
  • A set of built-in patterns tailored to common log formats

See Parsing for more information on Datadog’s Grok patterns.

To set up the grok parser, define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they match the filter query, are sent to the next step in the pipeline.

To test log samples for out-of-the-box rules:

  1. Click the Preview Library Rules button.
  2. Search or select a source in the dropdown menu.
  3. Enter a log sample to test the parsing rules for that source.

To add a custom parsing rule:

  1. Click Add Custom Rule.
  2. If you want to clone a library rule, select Clone library rule and then the library source from the dropdown menu.
  3. If you want to create a custom rule, select Custom and then enter the source. The parsing rules are applied to logs with that source.
  4. Enter log samples to test the parsing rules.
  5. Enter the rules for parsing the logs. See Parsing for more information on writing parsing rules with Datadog Grok patterns.
    Note: The url, useragent, and csv filters are not available.
  6. Click Advanced Settings if you want to add helper rules. See Using helper rules to factorize multiple parsing rules for more information.
  7. Click Add Rule.

This processor parses the specified JSON field into objects. For example, if you have a message field that contains stringified JSON:

{
    "foo": "bar",
    "team": "my-team",
    "message": "{\"level\":\"info\",\"timestamp\":\"2024-01-15T10:30:00Z\",\"service\":\"user-service\",\"user_id\":\"12345\",\"action\":\"login\",\"success\":true,\"ip_address\":\"192.168.1.100\"}"
    "app_id":"streaming-services",
    "ddtags": [
    "kube_service:my-service",
    "k8_deployment :your-host"
    ]
}

Use the Parse JSON processor to parse the message field so the message field has all the attributes within a nested object.

The parse json processor with message as the field to parse on

This output contains the message field with the parsed JSON:

{
    "foo": "bar",
    "team": "my-team",
    "message": {
        "action": "login",
        "ip_address": "192.168.1.100",
        "level": "info",
        "service": "user-service",
        "success": true,
        "timestamp": "2024-01-15T10:30:00Z",
        "user_id": "12345"
    }
    "app_id":"streaming-services",
    "ddtags": [
    "kube_service:my-service",
    "k8_deployment :your-host"
    ]
}

To set up this processor:

  1. Define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they do or do not match the filter query, are sent to the next step in the pipeline.
  2. Enter the name of the field you want to parse JSON on.
    Note: The parsed JSON overwrites what was originally contained in the field.

This processor parses Extensible Markup Language (XML) so the data can be processed and sent to different destinations. XML is a log format used to store and transport structured data. It is organized in a tree-like structure to represent nested information and uses tags and attributes to define the data. For example, this is XML data using only tags (<recipe>,<type>, and <name>) and no attributes:

<recipe>
    <type>pasta</type>
    <name>Carbonara</name>
</recipe>

This is an XML example where the tag recipe has the attribute type:

<recipe>
    <recipe type="pasta">
    <name>Carbonara</name>
</recipe>

The following image shows a Windows Event 4625 log in XML, next to the same log parsed and output in JSON. By parsing the XML log, the size of the log event was reduced by approximately 30%.

The XML log and the resulting parsed log in JSON

To set up this processor:

  1. Define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they match the filter query, are sent to the next step in the pipeline.
  2. Enter the path to the log field on which you want to parse XML. Use the path notation <OUTER_FIELD>.<INNER_FIELD> to match subfields. See the Path notation example below.
  3. Optionally, in the Enter text key field, input the key name to use for the text node when XML attributes are appended. See the text key example. If the field is left empty, value is used as the key name.
  4. Optionally, select Always use text key if you want to store text inside an object using the text key even when no attributes exist.
  5. Optionally, toggle Include XML attributes on if you want to include XML attributes. You can then choose to add the attribute prefix you want to use. See attribute prefix example. If the field is left empty, the original attribute key is used.
  6. Optionally, select if you want to convert data types into numbers, Booleans, or nulls.
    • If Numbers is selected, numbers are parsed as integers and floats.
    • If Booleans is selected, true and false are parsed as Booleans.
    • If Nulls is selected, the string null is parsed as null.
Path notation example

For the following message structure:

{
    "outer_key": {
        "inner_key": "inner_value",
        "a": {
            "double_inner_key": "double_inner_value",
            "b": "b value"
        },
        "c": "c value"
    },
    "d": "d value"
}
  • Use outer_key.inner_key to refer to the key with the value inner_value.
  • Use outer_key.inner_key.double_inner_key to refer to the key with the value double_inner_value.
Always use text key example

If Always use text key is selected, the text key is the default (value), and you have the following XML:

<recipe>
    <recipe type="pasta">
    <name>Carbonara</name>
</recipe>

The XML is converted to:

{
    "recipe": {
        "type": "pasta",
        "value": "Carbonara"
        }
}
Text key example

If the key is text and you have the following XML:

<recipe>
    <recipe type="pasta">
    <name>Carbonara</name>
</recipe>

The XML is converted to:

{
    "recipe": {
        "type": "pasta",
        "text": "Carbonara"
        }
}
Attribute prefix example

If you enable Include XML attributes, the attribute is added as a prefix to each XML attribute. For example, if the attribute prefix is @ and you have the following XML:

<recipe type="pasta">Carbonara</recipe>

Then it is converted to the JSON:

{
    "recipe": {
        "@type": "pasta",
        "<text key>": "Carbonara"
        }
}

The quota processor measures the logging traffic for logs that match the filter you specify. When the configured daily quota is met inside the 24-hour rolling window, the processor can either keep or drop additional logs, or send them to a storage bucket. For example, you can configure this processor to drop new logs or trigger an alert without dropping logs after the processor has received 10 million events from a certain service in the last 24 hours.

You can also use field-based partitioning, such as service, env, status. Each unique fields uses a separate quota bucket with its own daily quota limit. See Partition example for more information.

Notes:

  • Each pipeline can have up to 1000 buckets. If you need to increase the bucket limit, contact your account manager.
  • The pipeline uses the name of the quota to identify the quota across multiple Remote Configuration deployments of the Worker.

To set up the quota processor:

  1. Enter a name for the quota processor.
  2. Define a filter query. Only logs that match the specified filter query are counted towards the daily limit.
    • Logs that match the quota filter and are within the daily quota are sent to the next step in the pipeline.
    • Logs that do not match the quota filter are sent to the next step of the pipeline.
  3. In the Unit for quota dropdown menu, select if you want to measure the quota by the number of Events or by the Volume in bytes.
  4. Set the daily quota limit and select the unit of magnitude for your desired quota.
  5. Optional, Click Add Field if you want to set a quota on a specific service or region field.
    a. Enter the field name you want to partition by. See the Partition example for more information.
    i. Select the Ignore when missing if you want the quota applied only to events that match the partition. See the Ignore when missing example for more information.
    ii. Optional: Click Overrides if you want to set different quotas for the partitioned field.
    - Click Download as CSV for an example of how to structure the CSV.
    - Drag and drop your overrides CSV to upload it. You can also click Browse to select the file to upload it. See the Overrides example for more information.
    b. Click Add Field if you want to add another partition.
  6. In the When quota is met dropdown menu, select if you want to drop events, keep events, or send events to overflow destination, when the quota has been met.
    1. If you select send events to overflow destination, an overflow destination is added with the following cloud storage options: Amazon S3, Azure Blob, and Google Cloud.
    2. Select the cloud storage you want to send overflow logs to. See the setup instructions for your cloud storage: Amazon S3, Azure Blob Storage, or Google Cloud Storage.

Examples

Partition example

Use Partition by if you want to set a quota on a specific service or region. For example, if you want to set a quota for 10 events per day and group the events by the service field, enter service into the Partition by field.

Example for the “ignore when missing” option

Select Ignore when missing if you want the quota applied only to events that match the partition. For example, if the Worker receives the following set of events:

{"service":"a", "source":"foo", "message": "..."}
{"service":"b", "source":"bar", "message": "..."}
{"service":"b", "message": "..."}
{"source":"redis", "message": "..."}
{"message": "..."}

And the Ignore when missing is selected, then the Worker:

  • creates a set for logs with service:a and source:foo
  • creates a set for logs with service:b and source:bar
  • ignores the last three events

The quota is applied to the two sets of logs and not to the last three events.

If the Ignore when missing is not selected, the quota is applied to all five events.

Overrides example

If you are partitioning by service and have two services: a and b, you can use overrides to apply different quotas for them. For example, if you want service:a to have a quota limit of 5,000 bytes and service:b to have a limit of 50 events, the override rules look like this:

ServiceTypeLimit
aBytes5,000
bEvents50

The reduce processor groups multiple log events into a single log, based on the fields specified and the merge strategies selected. Logs are grouped at 10-second intervals. After the interval has elapsed for the group, the reduced log for that group is sent to the next step in the pipeline.

To set up the reduce processor:

  1. Define a filter query. Only logs that match the specified filter query are processed. Reduced logs and logs that do not match the filter query are sent to the next step in the pipeline.
  2. In the Group By section, enter the field you want to group the logs by.
  3. Click Add Group by Field to add additional fields.
  4. In the Merge Strategy section:
    • In On Field, enter the name of the field you want to merge the logs on.
    • Select the merge strategy in the Apply dropdown menu. This is the strategy used to combine events. See the following Merge strategies section for descriptions of the available strategies.
    • Click Add Merge Strategy to add additional strategies.
Merge strategies

These are the available merge strategies for combining log events.

NameDescription
ArrayAppends each value to an array.
ConcatConcatenates each string value, delimited with a space.
Concat newlineConcatenates each string value, delimited with a newline.
Concat rawConcatenates each string value, without a delimiter.
DiscardDiscards all values except the first value that was received.
Flat uniqueCreates a flattened array of all unique values that were received.
Longest arrayKeeps the longest array that was received.
MaxKeeps the maximum numeric value that was received.
MinKeeps the minimum numeric value that was received.
RetainDiscards all values except the last value that was received. Works as a way to coalesce by not retaining `null`.
Shortest arrayKeeps the shortest array that was received.
SumSums all numeric values that were received.

Use this processor to remap logs to Open Cybersecurity Schema Framework (OCSF) events. OCSF schema event classes are set for a specific log source and type. You can add multiple mappings to one processor. Note: Datadog recommends that the OCSF processor be the last processor in your pipeline, so that remapping is done after the logs have been processed by all the other processors.

To set up this processor:

Click Manage mappings. This opens a modal:

  • If you have already added mappings, click on a mapping in the list to edit or delete it. You can use the search bar to find a mapping by its name. Click Add Mapping if you want to add another mapping. Select Library Mapping or Custom Mapping and click Continue.
  • If you have not added any mappings yet, select Library Mapping or Custom Mapping. Click Continue.

Add a mapping

  1. Select the log type in the dropdown menu.
  2. Define a filter query. Only logs that match the specified filter query are remapped. All logs, regardless of whether they do or do not match the filter query, are sent to the next step in the pipeline.
  3. Review the sample source log and the resulting OCSF output.
  4. Click Save Mapping.

Library mappings

These are the library mappings available:

Log SourceLog TypeOCSF CategorySupported OCSF versions
AWS CloudTrailType: Management
EventName: ChangePassword
Account Change (3001)1.3.0
1.1.0
Google Cloud AuditSetIamPolicyAccount Change (3001)1.3.0
1.1.0
Google Cloud AuditCreateSinkAccount Change (3001)1.3.0
1.1.0
Google Cloud AuditUpdateSyncAccount Change (3001)1.3.0
1.1.0
Google Cloud AuditCreateBucketAccount Change (3001)1.3.0
1.1.0
GitHubCreate UserAccount Change (3001)1.1.0
Google Workspace AdminaddPrivilegeUser Account Management (3005)1.1.0
OktaUser session startAuthentication (3002)1.1.0
Microsoft 365 DefenderIncidentIncident Finding (2005)1.3.0
1.1.0
Palo Alto NetworksTrafficNetwork Activity (4001)1.1.0

When you set up a custom mapping, if you try to close or exit the modal, you are prompted to export your mapping. Datadog recommends that you export your mapping to save what you have set up so far. The exported mapping is saved as a JSON file.

To set up a custom mapping:

  1. Optionally, add a name for the mapping. The default name is Custom Authentication.
  2. Define a filter query. Only logs that match the specified filter query are remapped. All logs, regardless of whether they match the filter query, are sent to the next step in the pipeline.
  3. Select the OCSF event category from the dropdown menu.
  4. Select the OCSF event class from the dropdown menu.
  5. Enter a log sample so that you can reference it when you add fields.
  6. Click Continue.
  7. Select any OCSF profiles that you want to add. See OCSF Schema Browser for more information.
  8. All required fields are shown. Enter the required Source Logs Fields and Fallback Values for them. If you want to manually add additional fields, click + Field. Click the trash can icon to delete a field. Note: Required fields cannot be deleted.
    • The fallback value is used for the OCSF field if the log doesn’t have the source log field.
    • You can add multiple fields for Source Log Fields. For example, Okta’s user.system.start logs have either the eventType or legacyEventType field. You can map both fields to the same OCSF field.
    • If you have your own OCSF mappings in JSON or saved a previous mapping that you want to use, click Import Configuration File.
  9. Click Continue.
  10. Some log source values must be mapped to OCSF values. For example, the values of a source log’s severity field that is mapped to the OCSF’s severity_id field, must be mapped to the OCSF severity_id’s values. See severity_id in Authentication for a list of OCSF values. An example of mapping severity values:
    Log source valueOCSF value
    INFOInformational
    WARNMedium
    ERRORHigh
  11. All values that are required to be mapped to an OCSF value are listed. Click + Add Row if you want to map additional values.
  12. Click Save Mapping.

Filter query syntax

Each processor has a corresponding filter query in their fields. Processors only process logs that match their filter query. And for all processors except the filter processor, logs that do not match the query are sent to the next step of the pipeline. For the filter processor, logs that do not match the query are dropped.

For any attribute, tag, or key:value pair that is not a reserved attribute, your query must start with @. Conversely, to filter reserved attributes, you do not need to append @ in front of your filter query.

For example, to filter out and drop status:info logs, your filter can be set as NOT (status:info). To filter out and drop system-status:info, your filter must be set as NOT (@system-status:info).

Filter query examples:

  • NOT (status:debug): This filters for only logs that do not have the status DEBUG.
  • status:ok service:flask-web-app: This filters for all logs with the status OK from your flask-web-app service.
    • This query can also be written as: status:ok AND service:flask-web-app.
  • host:COMP-A9JNGYK OR host:COMP-J58KAS: This filter query only matches logs from the labeled hosts.
  • @user.status:inactive: This filters for logs with the status inactive nested under the user attribute.
  • @http.status:[200 TO 299] or @http.status:{300 TO 399}: These two filters represent the syntax to query a range for http.status. Ranges can be used across any attribute.

Queries run in the Observability Pipelines Worker are case sensitive. Learn more about writing filter queries in Datadog’s Log Search Syntax.

This processor samples your logging traffic for a representative subset at the rate that you define, dropping the remaining logs. As an example, you can use this processor to sample 20% of logs from a noisy non-critical service.

The sampling only applies to logs that match your filter query and does not impact other logs. If a log is dropped at this processor, none of the processors below receives that log.

To set up the sample processor:

  1. Define a filter query. Only logs that match the specified filter query are sampled at the specified retention rate below. The sampled logs and the logs that do not match the filter query are sent to the next step in the pipeline.
  2. Enter your desired sampling rate in the Retain field. For example, entering 2 means 2% of logs are retained out of all the logs that match the filter query.
  3. Optionally, enter a Group By field to create separate sampling groups for each unique value for that field. For example, status:error and status:info are two unique field values. Each bucket of events with the same field is sampled independently. Click Add Field if you want to add more fields to partition by. See the group-by example.
Group-by example

If you have the following setup for the sample processor:

  • Filter query: env:staging
  • Retain: 40% of matching logs
  • Group by: status and host
The sample processor with example values

Then, 40% of logs for each unique combination of status and service from env:staging is retained. For example:

  • 40% of logs with status:info and service:networks are retained.
  • 40% of logs with status:info and service:core-web are retained.
  • 40% of logs with status:error and service:networks are retained.
  • 40% of logs with status:error and service:core-web are retained.

The Sensitive Data Scanner processor scans logs to detect and redact or hash sensitive information such as PII, PCI, and custom sensitive data. You can pick from Datadog’s library of predefined rules, or input custom Regex rules to scan for sensitive data.

To set up the processor:

  1. Define a filter query. Only logs that match the specified filter query are scanned and processed. All logs are sent to the next step in the pipeline, regardless of whether they match the filter query.
  2. Click Add Scanning Rule.
  3. Select one of the following:
  1. In the dropdown menu, select the library rule you want to use.
  2. Recommended keywords are automatically added based on the library rule selected. After the scanning rule has been added, you can add additional keywords or remove recommended keywords.
  3. In the Define rule target and action section, select if you want to scan the Entire Event, Specific Attributes, or Exclude Attributes in the dropdown menu.
    • If you are scanning the entire event, you can optionally exclude specific attributes from getting scanned. Use path notation (outer_key.inner_key) to access nested keys. For specified attributes with nested data, all nested data is excluded.
    • If you are scanning specific attributes, specify which attributes you want to scan. Use path notation (outer_key.inner_key) to access nested keys. For specified attributes with nested data, all nested data is scanned.
  4. For Define actions on match, select the action you want to take for the matched information. Note: Redaction, partial redaction, and hashing are all irreversible actions.
    • Redact: Replaces all matching values with the text you specify in the Replacement text field.
    • Partially Redact: Replaces a specified portion of all matched data. In the Redact section, specify the number of characters you want to redact and which part of the matched data to redact.
    • Hash: Replaces all matched data with a unique identifier. The UTF-8 bytes of the match are hashed with the 64-bit fingerprint of FarmHash.
  5. Optionally, click Add Field to add tags you want to associate with the matched events.
  6. Add a name for the scanning rule.
  7. Optionally, add a description for the rule.
  8. Click Save.
Path notation example

For the following message structure:

{
    "outer_key": {
        "inner_key": "inner_value",
        "a": {
            "double_inner_key": "double_inner_value",
            "b": "b value"
        },
        "c": "c value"
    },
    "d": "d value"
}
  • Use outer_key.inner_key to refer to the key with the value inner_value.
  • Use outer_key.inner_key.double_inner_key to refer to the key with the value double_inner_value.
Add additional keywords

After adding scanning rules from the library, you can edit each rule separately and add additional keywords to the keyword dictionary.

  1. Navigate to your pipeline.
  2. In the Sensitive Data Scanner processor with the rule you want to edit, click Manage Scanning Rules.
  3. Toggle Use recommended keywords if you want the rule to use them. Otherwise, add your own keywords to the Create keyword dictionary field. You can also require that these keywords be within a specified number of characters of a match. By default, keywords must be within 30 characters before a matched value.
  4. Click Update.
  1. In the Define match conditions section, specify the regex pattern to use for matching against events in the Define the regex field. Enter sample data in the Add sample data field to verify that your regex pattern is valid. Sensitive Data Scanner supports Perl Compatible Regular Expressions (PCRE), but the following patterns are not supported:
    • Backreferences and capturing sub-expressions (lookarounds)
    • Arbitrary zero-width assertions
    • Subroutine references and recursive patterns
    • Conditional patterns
    • Backtracking control verbs
    • The \C “single-byte” directive (which breaks UTF-8 sequences)
    • The \R newline match
    • The \K start of match reset directive
    • Callouts and embedded code
    • Atomic grouping and possessive quantifiers
  2. For Create keyword dictionary, add keywords to refine detection accuracy when matching regex conditions. For example, if you are scanning for a sixteen-digit Visa credit card number, you can add keywords like visa, credit, and card. You can also require that these keywords be within a specified number of characters of a match. By default, keywords must be within 30 characters before a matched value.
  3. In the Define rule target and action section, select if you want to scan the Entire Event, Specific Attributes, or Exclude Attributes in the dropdown menu.
    • If you are scanning the entire event, you can optionally exclude specific attributes from getting scanned. Use path notation (outer_key.inner_key) to access nested keys. For specified attributes with nested data, all nested data is excluded.
    • If you are scanning specific attributes, specify which attributes you want to scan. Use path notation (outer_key.inner_key) to access nested keys. For specified attributes with nested data, all nested data is scanned.
  4. For Define actions on match, select the action you want to take for the matched information. Note: Redaction, partial redaction, and hashing are all irreversible actions.
    • Redact: Replaces all matching values with the text you specify in the Replacement text field.
    • Partially Redact: Replaces a specified portion of all matched data. In the Redact section, specify the number of characters you want to redact and which part of the matched data to redact.
    • Hash: Replaces all matched data with a unique identifier. The UTF-8 bytes of the match is hashed with the 64-bit fingerprint of FarmHash.
  5. Optionally, click Add Field to add tags you want to associate with the matched events.
  6. Add a name for the scanning rule.
  7. Optionally, add a description for the rule.
  8. Click Add Rule.
Path notation example

For the following message structure:

{
    "outer_key": {
        "inner_key": "inner_value",
        "a": {
            "double_inner_key": "double_inner_value",
            "b": "b value"
        },
        "c": "c value"
    },
    "d": "d value"
}
  • Use outer_key.inner_key to refer to the key with the value inner_value.
  • Use outer_key.inner_key.double_inner_key to refer to the key with the value double_inner_value.

This processor splits nested arrays into distinct events so that you can query, filter, alert, and visualize data within an array. The arrays need to already be parsed. For example, the processor can process [item_1, item_2], but cannot process "[item_1, item2]". The items in the array can be JSON objects, strings, integers, floats, or Booleans. All unmodified fields are added to the child events. For example, if you are sending the following items to the Observability Pipelines Worker:

{
    "host": "my-host",
    "env": "prod",
    "batched_items": [item_1, item_2]
}

Use the Split Array processor to send each item in batched_items as a separate event:

{
    "host": "my-host",
    "env": "prod",
    "batched_items": item_1
}
{
    "host": "my-host",
    "env": "prod",
    "batched_items": item_2
}

See the split array example for a more detailed example.

To set up this processor:

Click Manage arrays to split to add an array to split or edit an existing array to split. This opens a side panel.

  • If you have not created any arrays yet, enter the array parameters as described in the Add a new array section below.
  • If you have already created arrays, click on the array’s row in the table to edit or delete it. Use the search bar to find a specific array, and then select the array to edit or delete it. Click Add Array to Split to add a new array.
Add a new array
  1. Define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they match the filter query, are sent to the next step in the pipeline.
  2. Enter the path to the array field. Use the path notation <OUTER_FIELD>.<INNER_FIELD> to match subfields. See the Path notation example below.
  3. Click Save.
Split array example

This is an example event:

{
    "ddtags": ["tag1", "tag2"],
    "host": "my-host",
    "env": "prod",
    "message": {
        "isMessage": true,
        "myfield" : {
            "timestamp":14500000,
            "firstarray":["one", 2]
        },
    },
    "secondarray": [
    {
        "some":"json",
        "Object":"works"
    }, 44]
}

If the processor is splitting the arrays "message.myfield.firstarray" and "secondarray", it outputs child events that are identical to the parent event, except for the values of "message.myfield.firstarray" and "secondarray", which becomes a single item from their respective original array. Each child event is a unique combination of items from the two arrays, so four child events (2 items * 2 items = 4 combinations) are created in this example.

{
    "ddtags": ["tag1", "tag2"],
    "host": "my-host",
    "env": "prod",
    "message": {
        "isMessage": true,
        "myfield" : {"timestamp":14500000, "firstarray":"one"},
    },
    "secondarray": {
        "some":"json",
        "Object":"works"
    }
}
{
    "ddtags": ["tag1", "tag2"],
    "host": "my-host",
    "env": "prod",
    "message": {
        "isMessage": true,
        "myfield" : {"timestamp":14500000, "firstarray":"one"},
        },
    "secondarray": 44
}
{
    "ddtags": ["tag1", "tag2"],
    "host": "my-host",
    "env": "prod",
    "message": {
        "isMessage": true,
        "myfield" : {"timestamp":14500000, "firstarray":2},
        },
    "secondarray": {
            "some":"json",
            "object":"works"
        }
}
{
    "ddtags": ["tag1", "tag2"],
    "host": "my-host",
    "env": "prod",
    "message": {
        "isMessage": true,
        "myfield" : {"timestamp":14500000, "firstarray":2},
        },
    "secondarray": 44
}
Path notation example

For the following message structure:

{
    "outer_key": {
        "inner_key": "inner_value",
        "a": {
            "double_inner_key": "double_inner_value",
            "b": "b value"
        },
        "c": "c value"
    },
    "d": "d value"
}
  • Use outer_key.inner_key to refer to the key with the value inner_value.
  • Use outer_key.inner_key.double_inner_key to refer to the key with the value double_inner_value.

For logs coming from the Datadog Agent, use this processor to exclude or include specific tags in the Datadog tags (ddtags) array. Tags that are excluded or not included are dropped and may reduce your outbound log volume.

To set up the processor:

  1. Define a filter query. Only matching logs are processed by this processor, but all logs continue to the next step in the pipeline.
  2. Optionally, input a Datadog tags array for the Configure tags section. The supported formats are ["key:value", "key"]. See Define Tags for more information about the key:value format.
  3. In the Configure tags section, choose whether to Exclude tags or Include tags. If you provided a tag array in the previous step, select the tag keys you want to configure. You can also manually add tag keys. Note: You can select up to 100 tags.

Use this processor to set a limit on the number of logs sent within a specific time window. For example, you can set a limit so that only 100 logs are sent per second. Setting a rate limit can help you catch any spikes in log ingestion and prevent unexpected billing costs.

To set up the processor:

  1. Define a filter query. Only logs that match the specified filter query are processed. All matched logs get throttled. Logs that are sent within the throttle limit and logs that do not match the filter are sent to the next step. Logs sent after the throttle limit has been reached, are dropped.
  2. Set the throttling rate. This is the number of events allowed for a given bucket during the set time window. Note: This rate limit is applied on a per-worker level. If you scale the number of workers up or down, you may want to adjust the processor rate limit accordingly. You can update the rate limit programmatically using the Observability Pipelines API.
  3. Set the time window.
  4. Optionally, click Add Field if you want to group by a field.

Add another set of processors and destinations

Click the plus sign (+) to the left of the processors to add another set of processors and destinations to the source. See Add additional destinations on adding additional destinations to the processor group.

To delete a processor group, you need to delete all destinations linked to that processor group. When the last destination is deleted, the processor group is removed with it.

Install the Observability Pipelines Worker

  1. Select your platform in the Choose your installation platform dropdown menu.
  2. Enter the Logstash address and port, such as 0.0.0.0:9997. The Observability Pipelines Worker listens on this address for incoming log messages.
  3. Provide the environment variables for each of your selected destinations. See the prerequisites for more information.
    1. Enter the Amazon OpenSearch authentication username.
    2. Enter the Amazon OpenSearch authentication password.
    3. Enter the Amazon OpenSearch endpoint URL. For example, http://<hostname.IP>:9200.

    There are no environment variables to configure for the Amazon Security Lake destination.

    Enter the Google Chronicle endpoint URL. For example, https://chronicle.googleapis.com.

    1. Enter the CrowdStrike HEC ingestion URL.
    2. Enter the CrowdStrike HEC API key.

    There are no environment variables to configure for Datadog Log Management.

    For the Datadog Archives destination, follow the instructions for the cloud provider you are using to archive your logs.

    There are no environment variables to configure.

    There are no environment variables to configure.

    Enter the Azure connection string you created earlier. The connection string gives the Worker access to your Azure Storage bucket.

    To get the connection string:

    1. Navigate to Azure Storage accounts.
    2. Click Access keys under Security and networking in the left navigation menu.
    3. Copy the connection string for the storage account and paste it into the Azure connection string field on the Observability Pipelines Worker installation page.
    1. Enter the Elasticsearch authentication username.
    2. Enter the Elasticsearch authentication password.
    3. Enter the Elasticsearch endpoint URL. For example, http://CLUSTER_ID.LOCAL_HOST_IP.ip.es.io:9200.
    1. Enter the data collection endpoint (DCE).
    2. Enter the client secret.
    1. Enter your New Relic account ID.
    2. Enter your New Relic license key.
    1. Enter the OpenSearch authentication username.
    2. Enter the OpenSearch authentication password.
    3. Enter the OpenSearch endpoint URL. For example, http://<hostname.IP>:9200.

    Enter your SentinelOne write access token. To find your write access token:

    1. Log into the S1 console.
    2. Navigate to the Singularity Data Lake (SDL) API Keys page. To access it from the console, click Visibility on the left menu to go to SDL. Click on your username and then API Keys.
    3. Copy the Logs Access write key and paste it into the SentinelOne Write Access Token field on the Install Observability Pipelines Worker page.

    After you’ve installed the Observability Pipelines Worker and finished setting up the pipeline, see View logs in a SentinelOne cluster for instructions on how to see the logs you sent from Observability Pipelines to the SentinelOne destination.

    Enter the socket destination address, such as 92.12.333.224:5000 or https://somehost:5000. The address must include a port.

    Enter your Splunk HEC token and the base URL of the Splunk instance. See prerequisites for more information.

    The Worker passes the HEC token to the Splunk collection endpoint. After the Observability Pipelines Worker processes the logs, it sends the logs to the specified Splunk instance URL.

    Note: The Splunk HEC destination forwards all logs to the /services/collector/event endpoint regardless of whether you configure your Splunk HEC destination to encode your output in JSON or raw.

    Enter the Sumo Logic HTTP collector URL. See prerequisites for more information.

    Enter the rsyslog or syslog-ng endpoint URL. For example, 127.0.0.1:9997. The Observability Pipelines Worker sends logs to this address and port.

  4. Follow the instructions for your environment to install the Worker.
    1. Click Select API key to choose the Datadog API key you want to use.
    2. Run the command provided in the UI to install the Worker. The command is automatically populated with the environment variables you entered earlier.
      docker run -i -e DD_API_KEY=<DATADOG_API_KEY> \
          -e DD_OP_PIPELINE_ID=<PIPELINE_ID> \
          -e DD_SITE=<DATADOG_SITE> \
          -e <SOURCE_ENV_VARIABLE> \
          -e <DESTINATION_ENV_VARIABLE> \
          -p 8088:8088 \
          datadog/observability-pipelines-worker run
      
      Note: By default, the docker run command exposes the same port the Worker is listening on. If you want to map the Worker’s container port to a different port on the Docker host, use the -p | --publish option in the command:
      -p 8282:8088 datadog/observability-pipelines-worker run
      
    3. Navigate back to the Observability Pipelines installation page and click Deploy.

    See Update Existing Pipelines if you want to make changes to your pipeline’s configuration.

    The Observability Pipelines Worker supports all major Kubernetes distributions, such as:

    • Amazon Elastic Kubernetes Service (EKS)
    • Azure Kubernetes Service (AKS)
    • Google Kubernetes Engine (GKE)
    • Red Hat Openshift
    • Rancher
    1. Download the Helm chart values file. See the full list of configuration options available.
    2. Click Select API key to choose the Datadog API key you want to use.
    3. Add the Datadog chart repository to Helm:
      helm repo add datadog https://helm.datadoghq.com
      
      If you already have the Datadog chart repository, run the following command to make sure it is up to date:
      helm repo update
      
    4. Run the command provided in the UI to install the Worker. The command is automatically populated with the environment variables you entered earlier.
      helm upgrade --install opw \
      -f values.yaml \
      --set datadog.apiKey=<DATADOG_API_KEY> \
      --set datadog.pipelineId=<PIPELINE_ID> \
      --set <SOURCE_ENV_VARIABLES> \
      --set <DESTINATION_ENV_VARIABLES> \
      --set service.ports[0].protocol=TCP,service.ports[0].port=<SERVICE_PORT>,service.ports[0].targetPort=<TARGET_PORT> \
      datadog/observability-pipelines-worker
      
      Note: By default, the Kubernetes Service maps incoming port <SERVICE_PORT> to the port the Worker is listening on (<TARGET_PORT>). If you want to map the Worker’s pod port to a different incoming port of the Kubernetes Service, use the following service.ports[0].port and service.ports[0].targetPort values in the command:
      --set service.ports[0].protocol=TCP,service.ports[0].port=8088,service.ports[0].targetPort=8282
      
    5. Navigate back to the Observability Pipelines installation page and click Deploy.

    See Update Existing Pipelines if you want to make changes to your pipeline’s configuration.

    Self-hosted and self-managed Kubernetes clusters

    If you are running a self-hosted and self-managed Kubernetes cluster, and defined zones with node labels using topology.kubernetes.io/zone, then you can use the Helm chart values file as is. However, if you are not using the label topology.kubernetes.io/zone, you need to update the topologyKey in the values.yaml file to match the key you are using. Or if you run your Kubernetes install without zones, remove the entire topology.kubernetes.io/zone section.

    1. Click Select API key to choose the Datadog API key you want to use.

    2. Run the one-step command provided in the UI to install the Worker.

      Note: The environment variables used by the Worker in /etc/default/observability-pipelines-worker are not updated on subsequent runs of the install script. If changes are needed, update the file manually and restart the Worker.

    If you prefer not to use the one-line installation script, follow these step-by-step instructions:

    1. Set up APT transport for downloading using HTTPS:
      sudo apt-get update
      sudo apt-get install apt-transport-https curl gnupg
      
    2. Run the following commands to set up the Datadog deb repo on your system and create a Datadog archive keyring:
      sudo sh -c "echo 'deb [signed-by=/usr/share/keyrings/datadog-archive-keyring.gpg] https://apt.datadoghq.com/ stable observability-pipelines-worker-2' > /etc/apt/sources.list.d/datadog-observability-pipelines-worker.list"
      sudo touch /usr/share/keyrings/datadog-archive-keyring.gpg
      sudo chmod a+r /usr/share/keyrings/datadog-archive-keyring.gpg
      curl https://keys.datadoghq.com/DATADOG_APT_KEY_CURRENT.public | sudo gpg --no-default-keyring --keyring /usr/share/keyrings/datadog-archive-keyring.gpg --import --batch
      curl https://keys.datadoghq.com/DATADOG_APT_KEY_06462314.public | sudo gpg --no-default-keyring --keyring /usr/share/keyrings/datadog-archive-keyring.gpg --import --batch
      curl https://keys.datadoghq.com/DATADOG_APT_KEY_F14F620E.public | sudo gpg --no-default-keyring --keyring /usr/share/keyrings/datadog-archive-keyring.gpg --import --batch
      curl https://keys.datadoghq.com/DATADOG_APT_KEY_C0962C7D.public | sudo gpg --no-default-keyring --keyring /usr/share/keyrings/datadog-archive-keyring.gpg --import --batch
      
    3. Run the following commands to update your local apt repo and install the Worker:
      sudo apt-get update
      sudo apt-get install observability-pipelines-worker datadog-signing-keys
      
    4. Add your keys, site (for example, datadoghq.com for US1), source, and destination environment variables to the Worker’s environment file:
      sudo cat <<EOF > /etc/default/observability-pipelines-worker
      DD_API_KEY=<DATADOG_API_KEY>
      DD_OP_PIPELINE_ID=<PIPELINE_ID>
      DD_SITE=<DATADOG_SITE>
      <SOURCE_ENV_VARIABLES>
      <DESTINATION_ENV_VARIABLES>
      EOF
      
    5. Start the worker:
      sudo systemctl restart observability-pipelines-worker
      

    See Update Existing Pipelines if you want to make changes to your pipeline’s configuration.

    For RHEL and CentOS, the Observability Pipelines Worker supports versions 8.0 or later.
    1. Click Select API key to choose the Datadog API key you want to use.

    2. Run the one-step command provided in the UI to install the Worker.

      Note: The environment variables used by the Worker in /etc/default/observability-pipelines-worker are not updated on subsequent runs of the install script. If changes are needed, update the file manually and restart the Worker.

    If you prefer not to use the one-line installation script, follow these step-by-step instructions:

    1. Set up the Datadog rpm repo on your system with the below command. Note: If you are running RHEL 8.1 or CentOS 8.1, use repo_gpgcheck=0 instead of repo_gpgcheck=1 in the configuration below.
      cat <<EOF > /etc/yum.repos.d/datadog-observability-pipelines-worker.repo
      [observability-pipelines-worker]
      name = Observability Pipelines Worker
      baseurl = https://yum.datadoghq.com/stable/observability-pipelines-worker-2/\$basearch/
      enabled=1
      gpgcheck=1
      repo_gpgcheck=1
      gpgkey=https://keys.datadoghq.com/DATADOG_RPM_KEY_CURRENT.public
          https://keys.datadoghq.com/DATADOG_RPM_KEY_B01082D3.public
      EOF
      
    2. Update your packages and install the Worker:
      sudo yum makecache
      sudo yum install observability-pipelines-worker
      
    3. Add your keys, site (for example, datadoghq.com for US1), source, and destination environment variables to the Worker’s environment file:
      sudo cat <<-EOF > /etc/default/observability-pipelines-worker
      DD_API_KEY=<API_KEY>
      DD_OP_PIPELINE_ID=<PIPELINE_ID>
      DD_SITE=<SITE>
      <SOURCE_ENV_VARIABLES>
      <DESTINATION_ENV_VARIABLES>
      EOF
      
    4. Start the worker:
      sudo systemctl restart observability-pipelines-worker
      
    5. Navigate back to the Observability Pipelines installation page and click Deploy.

    See Update Existing Pipelines if you want to make changes to your pipeline’s configuration.

    1. Select one of the options in the dropdown to provide the expected log volume for the pipeline:

      OptionDescription
      UnsureUse this option if you are not able to project the log volume or you want to test the Worker. This option provisions the EC2 Auto Scaling group with a maximum of 2 general purpose t4g.large instances.
      1-5 TB/dayThis option provisions the EC2 Auto Scaling group with a maximum of 2 compute optimized instances c6g.large.
      5-10 TB/dayThis option provisions the EC2 Auto Scaling group with a minimum of 2 and a maximum of 5 compute optimized c6g.large instances.
      >10 TB/dayDatadog recommends this option for large-scale production deployments. It provisions the EC2 Auto Scaling group with a minimum of 2 and a maximum of 10 compute optimized c6g.xlarge instances.

      Note: All other parameters are set to reasonable defaults for a Worker deployment, but you can adjust them for your use case as needed in the AWS Console before creating the stack.

    2. Select the AWS region you want to use to install the Worker.

    3. Click Select API key to choose the Datadog API key you want to use.

    4. Click Launch CloudFormation Template to navigate to the AWS Console to review the stack configuration and then launch it. Make sure the CloudFormation parameters are as expected.

    5. Select the VPC and subnet you want to use to install the Worker.

    6. Review and check the necessary permissions checkboxes for IAM. Click Submit to create the stack. CloudFormation handles the installation at this point; the Worker instances are launched, the necessary software is downloaded, and the Worker starts automatically.

    7. Navigate back to the Observability Pipelines installation page and click Deploy.

    See Update Existing Pipelines if you want to make changes to your pipeline’s configuration.

Send logs to the Observability Pipelines Worker over Logstash

To configure Logstash to send logs to the Observability Pipelines Worker, use the following output configuration:

output {
	lumberjack {
		# update these to point to your Observability Pipelines Worker
		hosts => ["127.0.0.1"]
		port => 5044
		ssl_certificate => "/path/to/certificate.crt"
	}
}

Note: Logstash requires SSL to be configured.