Working with Data

Overview

Observability Pipelines enables you to shape and transform observability data. Similar to Logging without Limits™ pipelines, you can configure pipelines for Observability Pipelines that are composed of a series of transform components. These transforms allow you to parse, structure, and enrich data with built-in type safety.

Remap data with VRL

Vector Remap Language (VRL) is an expression-oriented, domain specific language designed for transforming observability data (logs and metrics). It features a simple syntax and built-in functions tailored to observability use cases.

Vector Remap Language is supported in the remap transform.

Remap transforms act on a single event and can be used to transform them or specify conditions for routing and filtering. You can use VRL in the following ways:

See VRL Function Reference for a full list of VRL built-in functions.

To get started, see the following example for a basic remap transform that contains a VRL program in the source field:

transforms:
  modify:
    type: remap
    inputs:
      - previous_component_id
    source: |2
        del(.user_info)
        .timestamp = now()
[transforms.modify]
type = "remap"
inputs = ["previous_component_id"]
source = '''
  del(.user_info)
  .timestamp = now()
'''
{
  "transforms": {
    "modify": {
      "type": "remap",
      "inputs": [
        "previous_component_id"
      ],
      "source": "  del(.user_info)\n  .timestamp = now()\n"
    }
  }
}

In this example, the type field is set to a remap transform. The inputs field defines where it receives events from the previously defined previous_component_id source. The first line in the source field deletes the .user_info field. At scale, dropping fields is particularly useful for reducing the payload of your events and cutting down on spend for your downstream services.

The second line adds the .timestamp field and the value to the event, changing the content of every event that passes through this transform.

See VRL References and Configurations for more information.

Parse data

Parsing showcases more advanced use cases of VRL. The below snippet is an HTTP log event in JSON format:

"{\"status\":200,\"timestamp\":\"2021-03-01T19:19:24.646170Z\",\"message\":\"SUCCESS\",\"username\":\"ub40fan4life\"}"

The configuration below uses VRL to modify the log event by:

  • Parsing the raw string into JSON.
  • Reformatting the time into a UNIX timestamp.
  • Removing the username field.
  • Converting the message to lowercase.
transforms:
  parse_syslog_id:
    type: remap
    inputs:
      - previous_component_id
    source: |2
         . = parse_json!(string!(.message))
         .timestamp = to_unix_timestamp(to_timestamp!(.timestamp))
         del(.username)
         .message = downcase(string!(.message))
[transforms.parse_syslog_id]
type = "remap"
inputs = ["previous_component_id"]
source = '''
   . = parse_json!(string!(.message))
   .timestamp = to_unix_timestamp(to_timestamp!(.timestamp))
   del(.username)
   .message = downcase(string!(.message))
'''
{
  "transforms": {
    "parse_syslog_id": {
      "type": "remap",
      "inputs": [
        "previous_component_id"
      ],
      "source": "   . = parse_json!(string!(.message))\n   .timestamp = to_unix_timestamp(to_timestamp!(.timestamp))\n   del(.username)\n   .message = downcase(string!(.message))\n"
    }
  }
}

This configuration returns the following:

{
  "message": "success",
  "status": 200,
  "timestamp": 1614626364
}

Sample, reduce, filter, and aggregate data

Sampling, reducing, filtering, and aggregating are common transforms to reduce the volume of observability data delivered to downstream services. Observability Pipelines offers a variety of ways to control your data volume:

See Control Log Volume and Size for examples on how to use these transforms.

Route data

Another commonly used transform is route, which allows you to split a stream of events into multiple substreams based on supplied conditions. This is useful when you need to send observability data to different destinations or operate differently on streams of data based on their use case.

The below snippet is an example log that you want to route to different destinations based on the value of the level field:

{
  "logs": {
    "kind": "absolute",
    "level": "info,
    "name": "memory_available_bytes",
    "namespace": "host",
    "tags": {}
  }
}

To route based on the level value, see the below configuration example:

transforms:
  splitting_logs_id:
    type: route
    inputs:
      - my-source-or-transform-id
    route:
      debug: .level == "debug"
      info: .level == "info"
      warn: .level == "warn"
      error: .level == "error"
[transforms.splitting_logs_id]
type = "route"
inputs = [ "my-source-or-transform-id" ]

  [transforms.splitting_logs_id.route]
  debug = '.level == "debug"'
  info = '.level == "info"'
  warn = '.level == "warn"'
  error = '.level == "error"'
{
  "transforms": {
    "my_transform_id": {
      "type": "route",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "route": {
        "debug": ".level == \"debug\"",
        "info": ".level == \"info\"",
        "warn": ".level == \"warn\"",
        "error": ".level == \"error\""
      }
    }
  }
}

Each row under the route field defines a route identifier, followed by a logical condition representing the filter of the route. The end result of this route can then be referenced as an input by other components with the name <transform_name>.<route_id>.

For example, if you wish to route logs with level field values of warn and error to Datadog, see the following example:

sinks:
  my_sink_id:
    type: datadog_logs
    inputs:
      - splitting_logs_id.warn
      - splitting_logs_id.error
    default_api_key: '${DATADOG_API_KEY_ENV_VAR}'
    compression: gzip
[sinks.my_sink_id]
type = "datadog_logs"
inputs = [ "splitting_logs_id.warn", "splitting_logs_id.error" ]
default_api_key = "${DATADOG_API_KEY_ENV_VAR}"
compression = "gzip"
{
  "sinks": {
    "my_sink_id": {
      "type": "datadog_logs",
      "inputs": [
        "splitting_logs_id.warn",
        "splitting_logs_id.error"
      ],
      "default_api_key": "${DATADOG_API_KEY_ENV_VAR}",
      "compression": "gzip"
    }
  }
}

See the Route Transform documentation for more information.

Throttle data

Downstream services can sometimes get overwhelmed when there is a spike in volume, which can lead to data being dropped. Use the throttle transform to safeguard against this scenario and also enforce usage quotas on users. The throttle transform rate limits logs passing through a topology. See the following example of a throttle transform configuration:

transforms:
  my_transform_id:
    type: throttle
    inputs:
      - my-source-or-transform-id
    exclude: null
    threshold: 100
    window_secs: 1
[transforms.my_transform_id]
type = "throttle"
inputs = [ "my-source-or-transform-id" ]
threshold = 100
window_secs = 1
{
  "transforms": {
    "my_transform_id": {
      "type": "throttle",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "exclude": null,
      "threshold": 100,
      "window_secs": 1
    }
  }
}

The threshold field defines the number of events allowed for a given bucket. window_secs defines the time frame in which the configured threshold is applied. In the example configuration, when the component receives more than 100 events in a span of 1 second, any additional events are dropped.

Further Reading