- 필수 기능
- 시작하기
- Glossary
- 표준 속성
- Guides
- Agent
- 통합
- 개방형텔레메트리
- 개발자
- Administrator's Guide
- API
- Datadog Mobile App
- CoScreen
- Cloudcraft
- 앱 내
- 서비스 관리
- 인프라스트럭처
- 애플리케이션 성능
- APM
- Continuous Profiler
- 스팬 시각화
- 데이터 스트림 모니터링
- 데이터 작업 모니터링
- 디지털 경험
- 소프트웨어 제공
- 보안
- AI Observability
- 로그 관리
- 관리
Because your log volume grows as your organization scales, the cost of ingesting and indexing in your downstream services (for example, log management solutions, SIEMs, and so forth) also rises. This guide walks you through using Observability Pipelines’ transforms to cut down on log volume and trim down the size of your logs to control your costs before data leaves your infrastructure or network.
In Observability Pipelines, a transform performs an action that modifies events, where events are logs flowing through the pipeline.
Use the dedupe transform to remove copies of data passing through your pipeline by adding the following component in your configuration.
transforms:
my_transform_id:
type: dedupe
inputs:
- my-source-or-transform-id
cache: null
fields: null
[transforms.my_transform_id]
type = "dedupe"
inputs = [ "my-source-or-transform-id" ]
{
"transforms": {
"my_transform_id": {
"type": "dedupe",
"inputs": [
"my-source-or-transform-id"
],
"cache": null,
"fields": null
}
}
}
The Observability Pipelines Worker assigns every event a unique identifier to track deduplicated events. The cache
option enables you to cache recent events to be used to check for duplicated data in the future, and defaults to 5,000 events. The fields
option lists which fields are used to determine if an event is a duplicate.
Use the filter transform when you want only certain logs that meet a specific criteria to pass through a component of your pipeline. For example, those conditions could be where the logs contain:
env
.status
field must be 400
.In those cases, insert a component that contains a filter transform to filter logs that uses the Datadog Processing Language (DPL) / Vector Remap Language (VRL) or Datadog Log Search syntax to set the conditions. Logs that don’t match the conditions are dropped.
The example below uses the filter transform and DPL/VRL to send only logs with a status
of 500
.
transforms:
my_transform_id:
type: filter
inputs:
- my-source-or-transform-id
condition:
type: "vrl"
source: ".status == 500"
[transforms.my_transform_id]
type = "filter"
inputs = [ "my-source-or-transform-id" ]
[transforms.my_transform_id.condition]
type = "vrl"
source = ".status == 500"
{
"transforms": {
"my_transform_id": {
"type": "filter",
"inputs": [
"my-source-or-transform-id"
],
"condition": {
"type": "vrl",
"source": ".status == 500"
}
}
}
}
When analyzing data that comes in large volumes or contains a lot of noise, such as CDN logs, sending all the logs to a destination is unnecessary. Instead, use the sample transform to send only the logs necessary to perform analysis that is statistically significant.
The exclude
field excludes events from being sampled, and also supports DPL/VRL or Datadog Log Search syntax. The example below shows a configuration that samples every 10 events, which is set by the rate
.
transforms:
my_transform_id:
type: sample
inputs:
- my-source-or-transform-id
exclude:
type: "datadog_search"
source: "*stack"
rate: 10
[transforms.my_transform_id]
type = "sample"
inputs = [ "my-source-or-transform-id" ]
rate = 10
[transforms.my_transform_id.exclude]
type = "datadog_search"
source = "*stack"
{
"transforms": {
"my_transform_id": {
"type": "sample",
"inputs": [
"my-source-or-transform-id"
],
"exclude": {
"type": "datadog_search",
"source": "*stack"
},
"rate": 10
}
}
}
In scenarios where you want to understand behavior over time, metrics around an event’s data point(s) would be more useful than a series of logs. As logs flow through your pipeline, use the log to metric transform to cut down on log volume by generating metrics based on specific tags.
You can generate four types of metrics:
The example below illustrates a configuration for generating a counter
metric , where metrics
defines the key/value pairs to be added to the event.
transforms:
my_transform_id:
type: log_to_metric
inputs:
- my-source-or-transform-id
metrics:
- type: counter
field: status
name: response_total
namespace: service
tags:
status: "{{status}}"
host: "{{host}}"
[transforms.my_transform_id]
type = "log_to_metric"
inputs = [ "my-source-or-transform-id" ]
[[transforms.my_transform_id.metrics]]
type = "counter"
field = "status"
name = "response_total"
namespace = "service"
[transforms.my_transform_id.metrics.tags]
status = "{{status}}"
host = "{{host}}"
{
"transforms": {
"my_transform_id": {
"type": "log_to_metric",
"inputs": [
"my-source-or-transform-id"
],
"metrics": [
{
"type": "counter",
"field": "status",
"name": "response_total",
"namespace": "service",
"tags": {
"status": "{{status}}",
"host": "{{host}}"
}
}
]
}
}
}
If the following log is passed through the configuration above:
{
"log": {
"host": "10.22.11.222",
"message": "Sent 200 in 54.2ms",
"status": 200
}
}
The following metric is generated:
{"metric":{"counter":{"value":1},"kind":"incremental","name":"response_total","namespace":"service","tags":{"host":"10.22.11.222","status":"200"}}}]
In some cases, multiple logs can be consolidated into a single log. Thus, another way to cut down on log volume is to merge multiple logs into a single log. Use the reduce transform to reduce multiple logs into one.
The example below uses a reduce transform configuration to merge multiple Ruby log exceptions events.
transforms:
my_transform_id:
type: reduce
inputs:
- my-source-or-transform-id
group_by:
- host
- pid
- tid
merge_strategies:
message: concat_newline
starts_when: match(string!(.message), r'^[^\\s]')
[transforms.my_transform_id]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]
group_by = [ "host", "pid", "tid" ]
starts_when = "match(string!(.message), r'^[^\\s]')"
[transforms.my_transform_id.merge_strategies]
message = "concat_newline"
{
"transforms": {
"my_transform_id": {
"type": "reduce",
"inputs": [
"my-source-or-transform-id"
],
"group_by": [
"host",
"pid",
"tid"
],
"merge_strategies": {
"message": "concat_newline"
},
"starts_when": "match(string!(.message), r'^[^\\s]')"
}
}
}
In the reduce transform, group_by
is an ordered list of fields used to group events. In this example, the events are grouped by host
, pid
, and tid
fields.
merge_strategies
is a map of field names to custom merge strategies. There are different merge strategies, including array
, where each value is appended to an array, and sum
, which adds all numeric values. In this example, concat_newline
is used, where each string value is concatenated, then delimited by a newline.
starts_when
is a condition used to distinguish the first event of a transaction. If this condition resolves to true
for an event, the previous transaction is flushed without this event, and a new transaction is started. In this example, events with .message
that do not match the ^[^\\s]
regular expression condition are reduced into a single event.
If the following Ruby exception logs are passed through the configuration above:
[{"log":{
"host":"host-1.hostname.com",
"message":"foobar.rb:6:in `/': divided by 0(ZeroDivisionError)",
"pid":1234,
"tid":5678,
"timestamp":"2020-10-07T12:33:21.223543Z"}
},
{
"log":{
"host":"host-1.hostname.com",
"message":"from foobar.rb:6:in `bar'",
"pid":1234,
"tid":5678,
"timestamp":"2020-10-07T12:33:21.223543Z"}
},
{
"log":{
"host":"host-1.hostname.com",
"message":"from foobar.rb:2:in `foo'",
"pid":1234,
"tid":5678,
"timestamp":"2020-10-07T12:33:21.223543Z"}
},
{
"log":{
"host":"host-1.hostname.com",
"message":"from foobar.rb:9:in `\u003cmain\u003e'",
"pid":1234,"tid":5678,
"timestamp":"2020-10-07T12:33:21.223543Z"}
},
{
"log":{
"host":"host-1.hostname.com",
"message":"Hello world, I am a new log",
"pid":1234,
"tid":5678,
"timestamp":"2020-10-07T12:33:22.123528Z"
}}]
The following logs are generated:
[{
"log": {
"host":"host-1.hostname.com",
"message":"foobar.rb:6:in `/': divided by 0 (ZeroDivisionError)\n
from foobar.rb:6:in `bar'\n
from foobar.rb:2:in `foo'\n
from foobar.rb:9:in `\u003cmain\u003e'",
"pid":1234,
"tid":5678,
"timestamp":"2020-10-07T12:33:21.223543Z"}
},
{
"log":{
"host":"host-1.hostname.com",
"message":"Hello world, I am a new log",
"pid":1234,
"tid":5678,
"timestamp":"2020-10-07T12:33:22.123528Z"
}}]
Logs can contain fields that are unnecessary. When processing terabytes of data a day, dropping fields that are superfluous can significantly reduce the total number of logs your destination ingests and indexes.
To remove unnecessary fields, use the DPL/VRL to remap your log data. The following example removes unnecessary tags using del
.
transforms:
my_transform_id:
type: remap
inputs:
- my-source-or-transform-id
source: |-
del(.unecessary_env_field)
del(.unecessary_service_field)
del(.unecessary_tag_field)
[transforms.my_transform_id]
type = "remap"
inputs = [ "my-source-or-transform-id" ]
source = """
del(.unecessary_env_field)
del(.unecessary_service_field)
del(.unecessary_tag_field)"""
{
"transforms": {
"my_transform_id": {
"type": "remap",
"inputs": [
"my-source-or-transform-id"
],
"source": "del(.unecessary_env_field)\ndel(.unecessary_service_field)\ndel(.unecessary_tag_field)"
}
}
}