---
isPrivate: true
title: (LEGACY) Ingest Amazon S3 Logs with the Observability Pipelines Worker
description: Datadog, the leading service for cloud-scale monitoring.
breadcrumbs: >-
  Docs > Observability Pipelines > (LEGACY) Observability Pipelines
  Documentation > (LEGACY) Observability Pipelines Guides > (LEGACY) Ingest
  Amazon S3 Logs with the Observability Pipelines Worker
---

# (LEGACY) Ingest Amazon S3 Logs with the Observability Pipelines Worker

{% callout %}
# Important note for users on the following Datadog sites: app.ddog-gov.com

{% alert level="danger" %}
This product is not supported for your selected [Datadog site](https://docs.datadoghq.com/getting_started/site.md).
{% /alert %}

{% /callout %}

## Overview{% #overview %}

The [Observability Pipelines Worker](https://docs.datadoghq.com/observability_pipelines/legacy.md#observability-pipelines-worker) can ingest logs from many different sources. If you have an Amazon S3 bucket that is receiving logs from an external system, such as AWS CloudTrail or CloudWatch, you can configure the Worker to ingest those logs. The setup uses the Observability Pipelines Worker's Amazon S3 source, which requires configuring an Amazon SQS queue to receive event notifications from the S3 bucket. The event notifications then inform the Worker to collect the new log events in the S3 bucket.

This guide walks you through the following steps:

1. Create an Amazon SQS queue to receive S3 event notifications
1. Enable event notifications on the S3 bucket
1. Create an IAM role to provide the Worker only the necessary permissions
1. Configure the Worker to receive notifications from the SQS queue and to collect logs from the S3 bucket
1. Configure the Worker to separate out batched S3 log events

## Prerequisites{% #prerequisites %}

- You have [installed](https://docs.datadoghq.com/observability_pipelines/legacy/setup.md) and [configured](https://docs.datadoghq.com/observability_pipelines/legacy/configurations.md) the Observability Pipelines Worker to collect data from your sources and route it to your destinations.
- You are familiar with [the basics of configuring Observability Pipelines](https://docs.datadoghq.com/observability_pipelines/legacy/configurations.md).

## Create an Amazon SQS queue to receive S3 notifications{% #create-an-amazon-sqs-topic-to-receive-s3-notifications %}

In the Amazon SQS console, provision a new queue specific to this configuration. This keeps any changes you make to it separate from any other log analysis tools that you are using.

1. Go to the [Amazon SQS console](https://console.aws.amazon.com/sqs/home).
1. Click **Create queue** to provision a new queue specific to this configuration.
1. Enter a name for the queue.
1. In the **Access policy** section, click the **Advanced** button.
1. Copy and paste the example JSON object below into the advanced access policy section. It configures the queue to allow the S3 bucket to send event notifications. Replace `${REGION}`, `${AWS_ACCOUNT_ID}`, `${QUEUE_NAME}`, and `${BUCKET_NAME}` with the relevant AWS account information and the queue and bucket names you just entered.
   ```json
   {
     "Version": "2008-10-17",
     "Id": "__default_policy_ID",
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "s3.amazonaws.com"
         },
         "Action": "SQS:SendMessage",
         "Resource": "arn:aws:sqs:${REGION}:${AWS_ACCOUNT_ID}:${QUEUE_NAME}",
         "Condition": {
           "StringEquals": {
             "aws:SourceAccount": "${AWS_ACCOUNT_ID}"
           },
           "StringLike": {
             "aws:SourceArn": "arn:aws:s3:*:*:${BUCKET_NAME}"
           }
         }
       }
     ]
   }
   ```
1. Leave the other queue options as the defaults.
1. Click **Create queue**.
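If you prefer to script the setup, you can render the same access policy programmatically. The following Python sketch fills in the placeholders (the region, account ID, queue name, and bucket name shown are hypothetical) and prints a policy document that could, for example, be applied as the queue's `Policy` attribute through the AWS CLI or an SDK:

```python
import json

# Hypothetical values for illustration; substitute your own account details.
region = "us-east-1"
aws_account_id = "123456789012"
queue_name = "op-worker-s3-events"
bucket_name = "my-log-bucket"

# The same access policy shown above, with the placeholders filled in.
policy = {
    "Version": "2008-10-17",
    "Id": "__default_policy_ID",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "s3.amazonaws.com"},
            "Action": "SQS:SendMessage",
            "Resource": f"arn:aws:sqs:{region}:{aws_account_id}:{queue_name}",
            "Condition": {
                "StringEquals": {"aws:SourceAccount": aws_account_id},
                "StringLike": {"aws:SourceArn": f"arn:aws:s3:*:*:{bucket_name}"},
            },
        }
    ],
}

print(json.dumps(policy, indent=2))
```

The `aws:SourceAccount` and `aws:SourceArn` conditions restrict the `SQS:SendMessage` permission so that only your own bucket can publish notifications to the queue.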

## Enable event notifications on the S3 bucket{% #enable-event-notifications-on-the-s3-bucket %}

1. In the [Amazon S3 console](https://console.aws.amazon.com/s3/), go to the S3 bucket that is collecting the logs that you want the Worker to ingest.
1. Click the **Properties** tab.
1. Go to the **Event notifications** section, and click **Create event notification**.
1. Enter a name for the event.
1. In the **Event types** section, click **All object create events**. The Worker only responds to object creation events, so those are the only events to which you need to subscribe.
1. In the **Destination** section, select **SQS queue** and then choose the SQS queue you created earlier.
1. Click **Save changes**.

The SQS queue should now be receiving messages for the Worker to process.

If you encounter the "Unable to validate the following destination configurations" error, check that the SQS access policy is set up correctly.

## Create an IAM role for the Worker{% #create-an-iam-role-for-the-worker %}

Create a separate IAM role for the Worker so that only the necessary permissions are provided.

1. Go to the [AWS IAM console](https://console.aws.amazon.com/iam/).
1. In the navigation pane, click **Roles**.
1. Click **Create role**.
1. Select the trusted entity type to which the role is attached.
1. Click **Next**.
1. Click **Create policy**.
1. Click the **JSON** tab. Copy and paste in the minimal permissions that must be attached to the role:
   ```json
   {
     "Version": "2012-10-17",
     "Statement": [
       {
         "Effect": "Allow",
         "Action": [
           "sqs:DeleteMessage",
           "s3:GetObject",
           "sqs:ReceiveMessage",
           "s3:ListBucket"
         ],
         "Resource": [
           "arn:aws:s3:::${BUCKET_NAME}/*",
           "arn:aws:s3:::${BUCKET_NAME}",
           "arn:aws:sqs:${REGION}:${AWS_ACCOUNT_ID}:${QUEUE_NAME}"
         ]
       }
     ]
   }
   ```
1. Replace `${REGION}`, `${AWS_ACCOUNT_ID}`, `${QUEUE_NAME}`, and `${BUCKET_NAME}` with the relevant AWS account information and the queue and bucket names that you are using. If you want the role to be attachable to EC2 instances, assumable by users, and so on, you need to modify the role's permissions further.
1. Click **Next: Tags**. Optionally, add tags.
1. Click **Next: Review**.
1. Enter a name for the policy.
1. Click **Create policy**.

Apply the role to the running Observability Pipelines process, either by attaching the role to an EC2 instance or by assuming the role from a given user profile.
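As with the queue's access policy, the minimal policy document can be rendered with the placeholders filled in. This Python sketch (the account, queue, and bucket values passed in are hypothetical) produces the JSON to paste into the policy editor:

```python
import json

def worker_policy(region: str, account_id: str, queue_name: str, bucket_name: str) -> str:
    """Render the minimal Worker IAM policy with the placeholders filled in."""
    return json.dumps(
        {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": [
                        "sqs:DeleteMessage",
                        "s3:GetObject",
                        "sqs:ReceiveMessage",
                        "s3:ListBucket",
                    ],
                    "Resource": [
                        f"arn:aws:s3:::{bucket_name}/*",
                        f"arn:aws:s3:::{bucket_name}",
                        f"arn:aws:sqs:{region}:{account_id}:{queue_name}",
                    ],
                }
            ],
        },
        indent=2,
    )

print(worker_policy("us-east-1", "123456789012", "op-worker-s3-events", "my-log-bucket"))
```

Note that the S3 permissions need both the bucket ARN (for `s3:ListBucket`) and the `/*` object ARN (for `s3:GetObject`).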

## Configure the Worker to receive notifications from the SQS queue{% #configure-the-worker-to-receive-notifications-from-the-sqs-queue %}

1. Use the source configuration example below to set up the Worker to receive the SQS event notifications and read the associated logs in the S3 bucket:
   ```yaml
   sources:
     cloudtrail:
       type: aws_s3
       region: ${REGION}
       sqs:
         queue_url: ${SQS_URL}
   ```
1. Replace `${REGION}` with the AWS account region. Replace `${SQS_URL}` with the HTTP URL provided in the SQS queue's **Details** section in the console.

See [Amazon S3 source documentation](https://docs.datadoghq.com/observability_pipelines/legacy/reference/sources.md#awss3) for more options.
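For reference, the messages the source consumes from the queue are standard S3 event notifications. The following Python sketch (with a hypothetical bucket name and object key, and a trimmed payload; real notifications carry more fields) shows roughly what the Worker does with each notification before fetching the new object from S3:

```python
import json

# A trimmed S3 event notification, as delivered to the SQS queue.
sqs_message_body = json.dumps({
    "Records": [
        {
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "my-log-bucket"},
                "object": {"key": "AWSLogs/123456789012/CloudTrail/us-east-1/example.json.gz"},
            },
        }
    ]
})

# Roughly what the Worker does: for each object-creation notification,
# extract the bucket and key, then fetch that object from S3.
notification = json.loads(sqs_message_body)
for record in notification.get("Records", []):
    if record.get("eventName", "").startswith("ObjectCreated"):
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]
        print(f"fetch s3://{bucket}/{key}")
```

This is why only object-creation events need to be enabled on the bucket: the Worker acts on new objects and ignores other event types.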

With the Amazon S3 source set up, you can now add [transforms](https://docs.datadoghq.com/observability_pipelines/legacy/reference/transforms.md) to manipulate the data and [sinks](https://docs.datadoghq.com/observability_pipelines/legacy/reference/sinks.md) to output the logs to destinations based on your use case. See [Configurations](https://docs.datadoghq.com/observability_pipelines/legacy/configurations.md) for more information on sources, transforms, and sinks.

## Configure the Worker to separate batched Amazon S3 log events{% #configure-the-worker-to-separate-batched-amazon-s3-log-events %}

Most services (for example, CloudTrail) send logs to S3 in batches, which means that each event the Worker receives is composed of multiple logs. In the example below, `Records` is an array of three log events that are batched together.

```json
{
  "Records": [
    {
      "log event 1": "xxxx"
    },
    {
      "log event 2": "xxxx"
    },
    {
      "log event 3": "xxxx"
    }
  ]
}
```

Add the following `explode` and `map` transforms to separate the batched log events into individual events so that sinks process them correctly:

```yaml
transforms:
  explode:
    type: remap
    inputs:
      - cloudtrail
    source: |-
      .message = parse_json!(.message)
      . = unnest!(.message.Records)

  map:
    type: remap
    inputs:
      - explode
    source: |-
      merge!(., .message.Records)
      del(.message)
```

In this example, the `parse_json` function parses the log message string into a JSON object.

The `unnest` function separates the batched log events into an array of individual log events.

```
[
   {"Records": {"log event 1": "xxx"}},
   {"Records": {"log event 2": "xxx"}},
   {"Records": {"log event 3": "xxx"}}
]
```

Then the `merge` function collapses the data in `.Records` to the top level so that each log event is an individual log line. The `del` function removes the extraneous field.

```
{"log event 1": "xxx"}
```

```
{"log event 2": "xxx"}
```

```
{"log event 3": "xxx"}
```
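For intuition, the combined effect of the two transforms can be approximated in plain Python (this is an illustration of the behavior, not the Worker's implementation):

```python
import json

def explode_and_map(raw_message: str) -> list[dict]:
    """Approximate the explode/map transforms: parse the JSON string,
    unnest .Records, and merge each record to the top level."""
    records = json.loads(raw_message).get("Records", [])
    # Each batched record becomes its own top-level event.
    return [dict(record) for record in records]

# The batched payload from the example above.
batched = json.dumps({
    "Records": [
        {"log event 1": "xxx"},
        {"log event 2": "xxx"},
        {"log event 3": "xxx"},
    ]
})

for event in explode_and_map(batched):
    print(event)
```

Each of the three printed events corresponds to one individual log line emitted downstream.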

### Further reading{% #further-reading %}

- [Working with data using Observability Pipelines](https://docs.datadoghq.com/observability_pipelines/legacy/working_with_data.md)
- [Learn more about Observability Pipelines configurations](https://docs.datadoghq.com/observability_pipelines/legacy/configurations.md)
