Observability Pipelines is not available on the US1-FED Datadog site.
Overview
Observability Pipelines enables you to shape and transform observability data. Similar to Logging without Limits™ pipelines, the pipelines you configure for Observability Pipelines are composed of a series of transform components. These transforms allow you to parse, structure, and enrich data with built-in type safety.
Remap data with VRL
Vector Remap Language (VRL) is an expression-oriented, domain-specific language designed for transforming observability data (logs and metrics). It features a simple syntax and built-in functions tailored to observability use cases.
Vector Remap Language is supported in the remap transform.
A remap transform acts on a single event and can be used to transform the event or to specify conditions for routing and filtering. You can use VRL in the following ways:
- Manipulate arrays, strings, and other data types.
- Encode and decode values using codecs.
- Encrypt and decrypt values.
- Coerce one datatype to another datatype (for example, from an integer to a string).
- Convert syslog values to readable values.
- Enrich values by using enrichment tables.
- Manipulate IP values.
- Parse values with custom rules (for example, grok, regex, and so on) and out-of-the-box functions (for example, syslog, apache, VPC flow logs, and so on).
- Manipulate event metadata and paths.
See VRL Function Reference for a full list of VRL built-in functions.
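For example, the following sketch (the field names are hypothetical and used for illustration only) combines several of these capabilities in a single VRL program:

# All field names below are hypothetical.
.status = to_int!(.status)                    # coerce a string to an integer
.severity = to_syslog_level!(.severity)       # convert a numeric syslog severity to its keyword
.payload = decode_base64!(string!(.payload))  # decode a Base64-encoded value
.message = downcase(string!(.message))        # normalize casing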
To get started, see the following example of a basic remap transform that contains a VRL program in the source field:
transforms:
  modify:
    type: remap
    inputs:
      - previous_component_id
    source: |2
        del(.user_info)
        .timestamp = now()
[transforms.modify]
type = "remap"
inputs = ["previous_component_id"]
source = '''
del(.user_info)
.timestamp = now()
'''
{
  "transforms": {
    "modify": {
      "type": "remap",
      "inputs": [
        "previous_component_id"
      ],
      "source": " del(.user_info)\n .timestamp = now()\n"
    }
  }
}
In this example, the type field is set to a remap transform. The inputs field defines where the transform receives events from; in this case, the previously defined previous_component_id source. The first line in the source field deletes the .user_info field. At scale, dropping fields is particularly useful for reducing the payload of your events and cutting down on spend for your downstream services. The second line adds the .timestamp field and its value to the event, changing the content of every event that passes through this transform.
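For illustration, given a hypothetical input event such as:

{"message": "login succeeded", "user_info": {"id": "abc123"}}

the transform emits the event with user_info removed and a timestamp field added (the exact value depends on when the event is processed):

{"message": "login succeeded", "timestamp": "2021-03-01T19:19:24.646170Z"}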
See VRL References and Configurations for more information.
Parse data
Parsing showcases more advanced use cases of VRL. The following snippet is an HTTP log event in JSON format:
"{\"status\":200,\"timestamp\":\"2021-03-01T19:19:24.646170Z\",\"message\":\"SUCCESS\",\"username\":\"ub40fan4life\"}"
The configuration below uses VRL to modify the log event by:
- Parsing the raw string into JSON.
- Reformatting the time into a UNIX timestamp.
- Removing the username field.
- Converting the message to lowercase.
transforms:
  parse_syslog_id:
    type: remap
    inputs:
      - previous_component_id
    source: |2
        . = parse_json!(string!(.message))
        .timestamp = to_unix_timestamp(to_timestamp!(.timestamp))
        del(.username)
        .message = downcase(string!(.message))
[transforms.parse_syslog_id]
type = "remap"
inputs = ["previous_component_id"]
source = '''
. = parse_json!(string!(.message))
.timestamp = to_unix_timestamp(to_timestamp!(.timestamp))
del(.username)
.message = downcase(string!(.message))
'''
{
  "transforms": {
    "parse_syslog_id": {
      "type": "remap",
      "inputs": [
        "previous_component_id"
      ],
      "source": " . = parse_json!(string!(.message))\n .timestamp = to_unix_timestamp(to_timestamp!(.timestamp))\n del(.username)\n .message = downcase(string!(.message))\n"
    }
  }
}
This configuration returns the following:
{
  "message": "success",
  "status": 200,
  "timestamp": 1614626364
}
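The functions above use the fallible form (the ! suffix), which aborts the VRL program when a call fails. As a minimal sketch (not part of the original example; the .parse_error field name is arbitrary), the parse step can instead capture the error and handle malformed messages explicitly:

# Attempt the parse; on failure, err is set instead of the program aborting.
parsed, err = parse_json(.message)
if err == null {
    . = object!(parsed)    # replace the event only when the parsed value is an object
} else {
    .parse_error = err     # keep the original event and record why parsing failed
}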
Sample, reduce, filter, and aggregate data
Sampling, reducing, filtering, and aggregating are common transforms for reducing the volume of observability data delivered to downstream services. Observability Pipelines offers a variety of ways to control your data volume. See Control Log Volume and Size for examples of how to use these transforms.
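As a brief sketch (the component name and condition here are hypothetical, not taken from the linked guide), a filter transform can drop low-value events before they reach downstream components:

transforms:
  drop_debug_logs_id:
    type: filter
    inputs:
      - my-source-or-transform-id
    condition: .level != "debug"  # keep every event except debug-level logs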
Route data
Another commonly used transform is route, which allows you to split a stream of events into multiple substreams based on supplied conditions. This is useful when you need to send observability data to different destinations or operate differently on streams of data based on their use case.
The following snippet is an example log that you want to route to different destinations based on the value of the level field:
{
  "logs": {
    "kind": "absolute",
    "level": "info",
    "name": "memory_available_bytes",
    "namespace": "host",
    "tags": {}
  }
}
To route based on the level value, see the following configuration example:
transforms:
  splitting_logs_id:
    type: route
    inputs:
      - my-source-or-transform-id
    route:
      debug: .level == "debug"
      info: .level == "info"
      warn: .level == "warn"
      error: .level == "error"
[transforms.splitting_logs_id]
type = "route"
inputs = [ "my-source-or-transform-id" ]
[transforms.splitting_logs_id.route]
debug = '.level == "debug"'
info = '.level == "info"'
warn = '.level == "warn"'
error = '.level == "error"'
{
  "transforms": {
    "splitting_logs_id": {
      "type": "route",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "route": {
        "debug": ".level == \"debug\"",
        "info": ".level == \"info\"",
        "warn": ".level == \"warn\"",
        "error": ".level == \"error\""
      }
    }
  }
}
Each row under the route field defines a route identifier, followed by a logical condition representing the filter of the route. The end result of this route can then be referenced as an input by other components with the name <transform_name>.<route_id>.
For example, if you wish to route logs with level field values of warn and error to Datadog, see the following example:
sinks:
  my_sink_id:
    type: datadog_logs
    inputs:
      - splitting_logs_id.warn
      - splitting_logs_id.error
    default_api_key: '${DATADOG_API_KEY_ENV_VAR}'
    compression: gzip
[sinks.my_sink_id]
type = "datadog_logs"
inputs = [ "splitting_logs_id.warn", "splitting_logs_id.error" ]
default_api_key = "${DATADOG_API_KEY_ENV_VAR}"
compression = "gzip"
{
  "sinks": {
    "my_sink_id": {
      "type": "datadog_logs",
      "inputs": [
        "splitting_logs_id.warn",
        "splitting_logs_id.error"
      ],
      "default_api_key": "${DATADOG_API_KEY_ENV_VAR}",
      "compression": "gzip"
    }
  }
}
See the Route Transform documentation for more information.
Throttle data
Downstream services can sometimes get overwhelmed when there is a spike in volume, which can lead to data being dropped. Use the throttle transform to safeguard against this scenario and also enforce usage quotas on users. The throttle transform rate limits logs passing through a topology. See the following example of a throttle transform configuration:
transforms:
  my_transform_id:
    type: throttle
    inputs:
      - my-source-or-transform-id
    exclude: null
    threshold: 100
    window_secs: 1
[transforms.my_transform_id]
type = "throttle"
inputs = [ "my-source-or-transform-id" ]
threshold = 100
window_secs = 1
{
  "transforms": {
    "my_transform_id": {
      "type": "throttle",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "exclude": null,
      "threshold": 100,
      "window_secs": 1
    }
  }
}
The threshold field defines the number of events allowed for a given bucket. window_secs defines the time frame in which the configured threshold is applied. In the example configuration, when the component receives more than 100 events in a span of 1 second, any additional events are dropped.
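To enforce per-user quotas rather than a single global limit, the throttle transform can also partition events by a field. The following is a hedged sketch (key_field is the assumed option name and username is a hypothetical field; verify against the transform reference for your Worker version):

transforms:
  per_user_quota_id:
    type: throttle
    inputs:
      - my-source-or-transform-id
    key_field: '{{ username }}'  # rate limit each user independently (hypothetical field)
    threshold: 100
    window_secs: 1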