概要

観測可能性パイプラインは観測可能性データを整形し、変換することができます。Logging without Limits™ パイプラインと同様に、一連の transform コンポーネントで構成される観測可能性パイプラインのパイプラインを構成することができます。これらの変換により、ビルトインの型安全性でデータの解析、構造化、リッチ化を行うことができます。

VRL でデータをリマップする

Vector Remap Language (VRL) は、観測可能性データ (ログやメトリクス) を変換するために設計された、式指向のドメイン特化型言語です。観測可能性のユースケースに合わせたシンプルな構文と組み込み関数を特徴としています。

Vector Remap Language は、remap 変換でサポートされています。

リマップ変換は単一のイベントに作用し、それらを変換したり、ルーティングやフィルターのための条件を指定するために使用できます。VRL は次のような方法で使用することができます。

VRL 組み込み関数の一覧は、VRL 関数リファレンスを参照してください。

まず始めに、source フィールドに VRL プログラムを含む基本的なリマップ変換の例を以下に示します。

transforms:
  modify:
    type: remap
    inputs:
      - previous_component_id
    source: |2
        del(.user_info)
        .timestamp = now()
[transforms.modify]
type = "remap"
inputs = ["previous_component_id"]
source = '''
  del(.user_info)
  .timestamp = now()
'''
{
  "transforms": {
    "modify": {
      "type": "remap",
      "inputs": [
        "previous_component_id"
      ],
      "source": "  del(.user_info)\n  .timestamp = now()\n"
    }
  }
}

この例では、type フィールドに remap 変換が設定されています。inputs フィールドは、先に定義した previous_component_id ソースからイベントを受け取る場所を定義します。source フィールドの最初の行は、.user_info フィールドを削除します。規模が大きくなると、フィールドを削除することは、イベントのペイロードを減らし、下流のサービスにかける費用を削減するために特に有効です。

2 行目は .timestamp フィールドとその値をイベントに追加し、この変換を通過するすべてのイベントのコンテンツを変更します。

詳しくは、VRL リファレンス構成をご覧ください。

データのパース

VRL の高度な利用例として、パース処理を紹介します。以下のスニペットは、JSON 形式の HTTP ログイベントです。

"{\"status\":200,\"timestamp\":\"2021-03-01T19:19:24.646170Z\",\"message\":\"SUCCESS\",\"username\":\"ub40fan4life\"}"

以下の構成では、VRL を使用して、以下を行うことでログイベントを変更します。

  • 生の文字列を JSON にパースする。
  • 時刻を UNIX タイムスタンプに再フォーマットする。
  • ユーザー名フィールドを削除する。
  • メッセージを小文字に変換する。
transforms:
  parse_syslog_id:
    type: remap
    inputs:
      - previous_component_id
    source: |2
         . = parse_json!(string!(.message))
         .timestamp = to_unix_timestamp(to_timestamp!(.timestamp))
         del(.username)
         .message = downcase(string!(.message))
[transforms.parse_syslog_id]
type = "remap"
inputs = ["previous_component_id"]
source = '''
   . = parse_json!(string!(.message))
   .timestamp = to_unix_timestamp(to_timestamp!(.timestamp))
   del(.username)
   .message = downcase(string!(.message))
'''
{
  "transforms": {
    "parse_syslog_id": {
      "type": "remap",
      "inputs": [
        "previous_component_id"
      ],
      "source": "   . = parse_json!(string!(.message))\n   .timestamp = to_unix_timestamp(to_timestamp!(.timestamp))\n   del(.username)\n   .message = downcase(string!(.message))\n"
    }
  }
}

この構成では、次のように返されます。

{
  "message": "success",
  "status": 200,
  "timestamp": 1614626364
}

データのサンプリング、削減、フィルター、集計

下流のサービスに配信される観測可能性データの量を減らすために、サンプリング、削減、フィルター、集計などの変換が一般的に行われています。観測可能性パイプラインは、データ量をコントロールするための様々な方法を提供します。

これらの変換の使用例については、ログ容量とサイズの制御を参照してください。

データのルーティング

もうひとつよく使われる変換に route があります。これは、指定した条件に基づいてイベントのストリームを複数のサブストリームに分割することができるものです。これは、観測可能性データを異なる宛先に送ったり、ユースケースに応じてデータのストリームを異なるように操作する必要がある場合に便利です。

以下のスニペットは、level フィールドの値に基づいて、異なる宛先にルーティングしたいログの例です。

{
  "logs": {
    "kind": "absolute",
    "level": "info,
    "name": "memory_available_bytes",
    "namespace": "host",
    "tags": {}
  }
}

level の値に基づいてルーティングを行うには、以下の構成例を参照してください。

transforms:
  splitting_logs_id:
    type: route
    inputs:
      - my-source-or-transform-id
    route:
      debug: .level == "debug"
      info: .level == "info"
      warn: .level == "warn"
      error: .level == "error"
[transforms.splitting_logs_id]
type = "route"
inputs = [ "my-source-or-transform-id" ]

  [transforms.splitting_logs_id.route]
  debug = '.level == "debug"'
  info = '.level == "info"'
  warn = '.level == "warn"'
  error = '.level == "error"'
{
  "transforms": {
    "my_transform_id": {
      "type": "route",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "route": {
        "debug": ".level == \"debug\"",
        "info": ".level == \"info\"",
        "warn": ".level == \"warn\"",
        "error": ".level == \"error\""
      }
    }
  }
}

route フィールドの各行では、ルート識別子を定義し、その後に route のフィルターを表す論理条件を定義します。この route の最終結果は、他のコンポーネントから <transform_name>.<route_id> という名前で入力として参照することができます。

例えば、level フィールドの値が warnerror のログを Datadog にルーティングしたい場合、以下の例を参照してください。

sinks:
  my_sink_id:
    type: datadog_logs
    inputs:
      - splitting_logs_id.warn
      - splitting_logs_id.error
    default_api_key: '${DATADOG_API_KEY_ENV_VAR}'
    compression: gzip
[sinks.my_sink_id]
type = "datadog_logs"
inputs = [ "splitting_logs_id.warn", "splitting_logs_id.error" ]
default_api_key = "${DATADOG_API_KEY_ENV_VAR}"
compression = "gzip"
{
  "sinks": {
    "my_sink_id": {
      "type": "datadog_logs",
      "inputs": [
        "splitting_logs_id.warn",
        "splitting_logs_id.error"
      ],
      "default_api_key": "${DATADOG_API_KEY_ENV_VAR}",
      "compression": "gzip"
    }
  }
}

詳しくは、ルート変換のドキュメントをご覧ください。

データのスロットル

下流のサービスは、ボリュームが急増したときに圧倒されることがあり、その結果、データがドロップされることがあります。このシナリオから保護するために throttle 変換を使用し、ユーザーに使用量のクォータを強制します。throttle 変換は、トポロジーを通過するログをレート制限します。次の throttle 変換の構成例を参照してください。

transforms:
  my_transform_id:
    type: throttle
    inputs:
      - my-source-or-transform-id
    exclude: null
    threshold: 100
    window_secs: 1
[transforms.my_transform_id]
type = "throttle"
inputs = [ "my-source-or-transform-id" ]
threshold = 100
window_secs = 1
{
  "transforms": {
    "my_transform_id": {
      "type": "throttle",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "exclude": null,
      "threshold": 100,
      "window_secs": 1
    }
  }
}

threshold フィールドは、与えられたバケットに許可されるイベント数を定義します。window_secs は、設定されたしきい値が適用される時間枠を定義します。この構成例では、コンポーネントが 1 秒間に 100 個以上のイベントを受信すると、それ以降のイベントはすべて削除されます。

その他の参考資料