このページは日本語には対応しておりません。随時翻訳に取り組んでいます。翻訳に関してご質問やご意見ございましたら、お気軽にご連絡ください。

Overview

Configure Logstash to send logs to the Observability Pipelines Worker so that you can generate metrics from those logs.

The log sources, processors, and destinations available for this use case

This document walks you through the following steps:

  1. The prerequisites needed to set up Observability Pipelines
  2. Setting up Observability Pipelines
  3. Sending logs to the Observability Pipelines Worker

Prerequisites

To use Observability Pipelines’ Logstash source, you need the following information available:

  • Logstash address, such as 0.0.0.0:8088. The Observability Pipelines Worker listens on this bind address to receive logs from your applications. Later on, you configure your applications to send logs to this address.
  • The appropriate TLS certificates and the password you used to create your private key, if your forwarders are globally configured to enable SSL.

Set up Observability Pipelines

  1. Navigate to Observability Pipelines.
  2. Select the Generate Metrics template to create a new pipeline.
  3. Select the Logstash source.

Set up the source

Optionally, toggle the switch to enable TLS. If you enable TLS, the following certificate and key files are required:

  • Server Certificate Path: The path to the certificate file that has been signed by your Certificate Authority (CA) Root File in DER or PEM (X.509).
  • CA Certificate Path: The path to the certificate file that is your Certificate Authority (CA) Root File in DER or PEM (X.509).
  • Private Key Path: The path to the .key private key file that belongs to your Server Certificate Path in DER or PEM (PKCS #8) format.

Set up the destinations

Enter the following information based on your selected logs destination.

Datadog の宛先に関して必要な構成手順はありません。

  • Splunk HEC アドレス:
    • Observability Pipelines Worker がリッスンして、本来 Splunk インデクサー向けであるログを受信するバインドアドレスです。例えば、0.0.0.0:8088
      : /services/collector/event は自動的にエンドポイントに付加されます。
    • 環境変数 DD_OP_SOURCE_SPLUNK_HEC_ADDRESS に格納されます。

以下のフィールドはオプションです。

  1. Encoding ドロップダウンメニューで、パイプラインの出力を JSONLogfmt、または Raw テキストでエンコードするかを選択します。デコードが選択されていない場合、デフォルトで JSON にデコードされます。
  2. Sumo Logic コレクターのソースに設定されたデフォルトの name 値を上書きするには、source name を入力します。
  3. Sumo Logic コレクターのソースに設定されたデフォルトの host 値を上書きするには、host name を入力します。
  4. Sumo Logic コレクターのソースに設定されたデフォルトの category 値を上書きするには、category name を入力します。
  5. カスタムヘッダーフィールドと値を追加するには、Add Header をクリックします。
The rsyslog and syslog-ng destinations support the RFC5424 format.

The rsyslog and syslog-ng destinations match these log fields to the following Syslog fields:

Log EventSYSLOG FIELDDefault
log[“message”]MESSAGENIL
log[“procid”]PROCIDThe running Worker’s process ID.
log[“appname”]APP-NAMEobservability_pipelines
log[“facility”]FACILITY8 (log_user)
log[“msgid”]MSGIDNIL
log[“severity”]SEVERITYinfo
log[“host”]HOSTNAMENIL
log[“timestamp”]TIMESTAMPCurrent UTC time.

The following destination settings are optional:

  1. Toggle the switch to enable TLS. If you enable TLS, the following certificate and key files are required:
    • Server Certificate Path: The path to the certificate file that has been signed by your Certificate Authority (CA) Root File in DER or PEM (X.509).
    • CA Certificate Path: The path to the certificate file that is your Certificate Authority (CA) Root File in DER or PEM (X.509).
    • Private Key Path: The path to the .key private key file that belongs to your Server Certificate Path in DER or PEM (PKCS#8) format.
  2. Enter the number of seconds to wait before sending TCP keepalive probes on an idle connection.

To authenticate the Observability Pipelines Worker for Google Chronicle, contact your Google Security Operations representative for a Google Developer Service Account Credential. This credential is a JSON file and must be placed under DD_OP_DATA_DIR/config. See Getting API authentication credential for more information.

To set up the Worker’s Google Chronicle destination:

  1. Enter the customer ID for your Google Chronicle instance.
  2. Enter the path to the credentials JSON file you downloaded earlier.
  3. Select JSON or Raw encoding in the dropdown menu.
  4. Select the appropriate Log Type in the dropdown menu.

Note: Logs sent to the Google Chronicle destination must have ingestion labels. For example, if the logs are from a A10 load balancer, it must have the ingestion label A10_LOAD_BALANCER. See Google Cloud’s Support log types with a default parser for a list of available log types and their respective ingestion labels.

The following fields are optional:

  1. Enter the name for the Elasticsearch index.
  2. Enter the Elasticsearch version.

Optionally, enter the name of the OpenSearch index.

  1. Optionally, enter the name of the Amazon OpenSearch index.
  2. Select an authentication strategy, Basic or AWS. For AWS, enter the AWS region.

Select the data center region (US or EU) of your New Relic account.

Set up processors

There are pre-selected processors added to your processor group out of the box. You can add additional processors or delete any existing ones based on your processing needs.

Processor groups are executed from top to bottom. The order of the processors is important because logs are checked by each processor, but only logs that match the processor’s filters are processed. To modify the order of the processors, use the drag handle on the top left corner of the processor you want to move.

Filter query syntax

Each processor has a corresponding filter query in their fields. Processors only process logs that match their filter query. And for all processors except the filter processor, logs that do not match the query are sent to the next step of the pipeline. For the filter processor, logs that do not match the query are dropped.

For any attribute, tag, or key:value pair that is not a reserved attribute, your query must start with @. Conversely, to filter reserved attributes, you do not need to append @ in front of your filter query.

For example, to filter out and drop status:info logs, your filter can be set as NOT (status:info). To filter out and drop system-status:info, your filter must be set as NOT (@system-status:info).

Filter query examples:

  • NOT (status:debug): This filters for only logs that do not have the status DEBUG.
  • status:ok service:flask-web-app: This filters for all logs with the status OK from your flask-web-app service.
    • This query can also be written as: status:ok AND service:flask-web-app.
  • host:COMP-A9JNGYK OR host:COMP-J58KAS: This filter query only matches logs from the labeled hosts.
  • @user.status:inactive: This filters for logs with the status inactive nested under the user attribute.

Learn more about writing filter queries in Datadog’s Log Search Syntax.

プロセッサを追加

使用するプロセッサの情報を入力します。 Add ボタンをクリックして、プロセッサを追加します。プロセッサを削除するには、プロセッサの右側にあるケバブメニューをクリックし、Delete を選択します。

利用可能なログプロセッサ

このプロセッサーは、指定されたフィルタークエリに一致するログをフィルタリングし、一致しないすべてのログを削除します。このプロセッサーでログが削除された場合、このプロセッサーより下位のプロセッサーはいずれも、そのログを受け取りません。このプロセッサーは、デバッグや警告などの不要なログを除外することができます。

フィルタープロセッサーをセットアップするには

  • フィルタークエリを定義します。クエリを指定すると、それに一致するログのみを通過させ、それ以外のログはすべて削除します。

The remap processor can add, drop, or rename fields within your individual log data. Use this processor to enrich your logs with additional context, remove low-value fields to reduce volume, and standardize naming across important attributes. Select add field, drop field, or rename field in the dropdown menu to get started.

Add field

Use add field to append a new key-value field to your log.

To set up the add field processor:

  1. Define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they do or do not match the filter query, are sent to the next step in the pipeline.
  2. Enter the field and value you want to add. To specify a nested field for your key, use the path notation: <OUTER_FIELD>.<INNER_FIELD>. All values are stored as strings. Note: If the field you want to add already exists, the Worker throws an error and the existing field remains unchanged.
Drop field

Use drop field to drop a field from logging data that matches the filter you specify below. It can delete objects, so you can use the processor to drop nested keys.

To set up the drop field processor:

  1. Define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they do or do not match the filter query, are sent to the next step in the pipeline.
  2. Enter the key of the field you want to drop. To specify a nested field for your specified key, use the path notation: <OUTER_FIELD>.<INNER_FIELD>. Note: If your specified key does not exist, your log will be unimpacted.
Rename field

Use rename field to rename a field within your log.

To set up the rename field processor:

  1. Define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they do or do not match the filter query, are sent to the next step in the pipeline.
  2. Enter the name of the field you want to rename in the Source field. To specify a nested field for your key, use the path notation: <OUTER_FIELD>.<INNER_FIELD>. Once renamed, your original field is deleted unless you enable the Preserve source tag checkbox described below.
    Note: If the source key you specify doesn’t exist, a default null value is applied to your target.
  3. In the Target field, enter the name you want the source field to be renamed to. To specify a nested field for your specified key, use the path notation: <OUTER_FIELD>.<INNER_FIELD>.
    Note: If the target field you specify already exists, the Worker throws an error and does not overwrite the existing target field.
  4. Optionally, check the Preserve source tag box if you want to retain the original source field and duplicate the information from your source key to your specified target key. If this box is not checked, the source key is dropped after it is renamed.
Path notation example

For the following message structure, use outer_key.inner_key.double_inner_key to refer to the key with the value double_inner_value.

{
    "outer_key": {
        "inner_key": "inner_value",
            "a": {
                    "double_inner_key": "double_inner_value",
                    "b": "b value"
                },
            "c": "c value"
        },
        "d": "d value"
    }

This processor samples your logging traffic for a representative subset at the rate that you define, dropping the remaining logs. As an example, you can use this processor to sample 20% of logs from a noisy non-critical service.

The sampling only applies to logs that match your filter query and does not impact other logs. If a log is dropped at this processor, none of the processors below receives that log.

To set up the sample processor:

  1. Define a filter query. Only logs that match the specified filter query are sampled at the specified retention rate below. The sampled logs and the logs that do not match the filter query are sent to the next step in the pipeline.
  2. Set the retain field with your desired sampling rate expressed as a percentage. For example, entering 2 means 2% of logs are retained out of all the logs that match the filter query.

This processor parses logs using the grok parsing rules that are available for a set of sources. The rules are automatically applied to logs based on the log source. Therefore, logs must have a source field with the source name. If this field is not added when the log is sent to the Observability Pipelines Worker, you can use the Add field processor to add it.

If the source field of a log matches one of the grok parsing rule sets, the log’s message field is checked against those rules. If a rule matches, the resulting parsed data is added in the message field as a JSON object, overwriting the original message.

If there isn’t a source field on the log, or no rule matches the log message, then no changes are made to the log and it is sent to the next step in the pipeline.

To set up the grok parser, define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they match the filter query, are sent to the next step in the pipeline.

To test log samples for out-of-the-box rules:

  1. Click the Preview Library Rules button.
  2. Search or select a source in the dropdown menu.
  3. Enter a log sample to test the parsing rules for that source.

To add a custom parsing rule:

  1. Click Add Custom Rule.
  2. If you want to clone a library rule, select Clone library rule and then the library source from the dropdown menu.
  3. If you want to create a custom rule, select Custom and then enter the source. The parsing rules are applied to logs with that source.
  4. Enter log samples to test the parsing rules.
  5. Enter the rules for parsing the logs. See Parsing for more information on writing parsing rules.
    Note: The url, useragent, and csv filters are not available.
  6. Click Advanced Settings if you want to add helper rules. See Using helper rules to factorize multiple parsing rules for more information.
  7. Click Add Rule.

クォータプロセッサは、指定したフィルターに一致するログのロギングトラフィックを測定します。構成された日次クォータが 24 時間のローリングウィンドウ内で達成されると、プロセッサは追加のログをドロップするか、Datadog モニターを使用してアラートを送信することができます。プロセッサは、総ボリュームまたはイベントの総数を追跡するように構成できます。パイプラインは、Worker の複数の Remote Configuration デプロイメント間でクォータを識別するために、クォータの名前を使用します。

例えば、このプロセッサを構成して、過去 24 時間以内に特定のサービスから 1000 万件のイベントを受信した後に、新しいログを削除するか、削除せずにアラートをトリガーするように構成できます。

クォータプロセッサを設定するには、次の手順に従ってください、

  1. クォータプロセッサの名前を入力します。
  2. フィルタークエリを定義します。指定したフィルタークエリに一致するログのみが日次制限にカウントされます。
    • クォータフィルターに一致し、かつ日次クォータ内のログは、パイプラインの次のステップに送信されます。
    • クォータフィルターに一致しないログも、パイプラインの次のステップに送信されます。
  3. Unit for quota (クォータの単位) ドロップダウンメニューで、クォータを Events の数か、バイト単位の Volume で測定するかを選択します。
  4. 日次クォータの上限を設定し、希望するクォータの大きさの単位を選択します。
  5. クォータが上限に達した場合にすべてのイベントを削除したい場合は、Drop events (イベントを削除) のチェックボックスをオンにします。クォータが上限に達したときにアラートを送信するモニターをセットアップする場合は、チェックを外しておきます。
    • 日次クォータの上限に達した後にクォータフィルターに一致するログが受信され、Drop events (イベントを削除) オプションが選択されている場合、それらのログは削除されます。この場合、フィルタークエリに一致しなかったログのみがパイプラインの次のステップに送信されます。
    • 日次クォータの上限に達した後でも Drop events (イベントを削除) オプションが選択されていない場合、クォータフィルターに一致するログと一致しなかったログが共にパイプラインの次のステップに送信されます。
  6. オプション: 特定のサービスまたはリージョンフィールドにクォータを設定したい場合は、Add Field をクリックします。 a. パーティション分割したいフィールド名を入力します。詳細はパーティションの例を参照してください。 i. クォータをパーティションに一致するイベントのみに適用したい場合は、Ignore when missing を選択します。詳細は「欠落時に無視」オプションの例を参照してください。 ii. オプション: パーティション化されたフィールドに異なるクォータを設定したい場合は、Overrides をクリックします。
  • CSV の構造例については、Download as CSV をクリックします。
  • オーバーライド CSV をドラッグアンドドロップしてアップロードします。または、Browse をクリックしてファイルを選択してアップロードすることもできます。詳細はオーバーライドの例を参照してください。 b. もう 1 つのパーティションを追加したい場合は、Add Field をクリックします。

パーティションの例

特定のサービスまたはリージョンにクォータを設定したい場合は、Partition by を使用します。例えば、1 日に 10 件のイベントのクォータを設定し、service フィールドでイベントをグループ化したい場合、servicePartition by フィールドに入力します。

「欠落時に無視」オプションの例

クォータをパーティションに一致するイベントのみに適用したい場合は、Ignore when missing を選択します。例えば、Worker が次のイベントセットを受信した場合:

{"service":"a", "source":"foo", "message": "..."}
{"service":"b", "source":"bar", "message": "..."}
{"service":"b", "message": "..."}
{"source":"redis", "message": "..."}
{"message": "..."}

そして Ignore when missing が選択されている場合、Worker は:

  • service:asource:foo を持つログのセットを作成します
  • service:bsource:bar を持つログのセットを作成します
  • 最後の 3 つのイベントを無視します

クォータは 2 つのログセットに適用され、最後の 3 つのイベントには適用されません。

Ignore when missing が選択されていない場合、クォータは 5 つのすべてのイベントに適用されます。

オーバーライドの例

service でパーティション分割し、2 つのサービス ab がある場合、オーバーライドを使用してそれぞれに異なるクォータを適用できます。例えば、service:a に 5,000 バイトのクォータ制限、service:b に 50 イベントの制限を設定したい場合、オーバーライドルールは次のようになります。

サービスタイプLimit
aBytes5,000
bイベント50

The reduce processor groups multiple log events into a single log, based on the fields specified and the merge strategies selected. Logs are grouped at 10-second intervals. After the interval has elapsed for the group, the reduced log for that group is sent to the next step in the pipeline.

To set up the reduce processor:

  1. Define a filter query. Only logs that match the specified filter query are processed. Reduced logs and logs that do not match the filter query are sent to the next step in the pipeline.
  2. In the Group By section, enter the field you want to group the logs by.
  3. Click Add Group by Field to add additional fields.
  4. In the Merge Strategy section:
    • In On Field, enter the name of the field you want to merge the logs on.
    • Select the merge strategy in the Apply dropdown menu. This is the strategy used to combine events. See the following Merge strategies section for descriptions of the available strategies.
    • Click Add Merge Strategy to add additional strategies.
Merge strategies

These are the available merge strategies for combining log events.

NameDescription
ArrayAppends each value to an array.
ConcatConcatenates each string value, delimited with a space.
Concat newlineConcatenates each string value, delimited with a newline.
Concat rawConcatenates each string value, without a delimiter.
DiscardDiscards all values except the first value that was received.
Flat uniqueCreates a flattened array of all unique values that were received.
Longest arrayKeeps the longest array that was received.
MaxKeeps the maximum numeric value that was received.
MinKeeps the minimum numeric value that was received.
RetainDiscards all values except the last value that was received. Works as a way to coalesce by not retaining `null`.
Shortest arrayKeeps the shortest array that was received.
SumSums all numeric values that were received.

The deduplicate processor removes copies of data to reduce volume and noise. It caches 5,000 messages at a time and compares your incoming logs traffic against the cached messages. For example, this processor can be used to keep only unique warning logs in the case where multiple identical warning logs are sent in succession.

To set up the deduplicate processor:

  1. Define a filter query. Only logs that match the specified filter query are processed. Deduped logs and logs that do not match the filter query are sent to the next step in the pipeline.
  2. In the Type of deduplication dropdown menu, select whether you want to Match on or Ignore the fields specified below.
    • If Match is selected, then after a log passes through, future logs that have the same values for all of the fields you specify below are removed.
    • If Ignore is selected, then after a log passes through, future logs that have the same values for all of their fields, except the ones you specify below, are removed.
  3. Enter the fields you want to match on, or ignore. At least one field is required, and you can specify a maximum of three fields.
    • Use the path notation <OUTER_FIELD>.<INNER_FIELD> to match subfields. See the Path notation example below.
  4. Click Add field to add additional fields you want to filter on.
Path notation example

For the following message structure, use outer_key.inner_key.double_inner_key to refer to the key with the value double_inner_value.

{
    "outer_key": {
        "inner_key": "inner_value",
            "a": {
                    "double_inner_key": "double_inner_value",
                    "b": "b value"
                },
            "c": "c value"
        },
        "d": "d value"
    }

Sensitive Data Scanner プロセッサはログをスキャンして、PII、PCI、カスタムの機密データなどの機密情報を検出し、マスキングまたはハッシュ化します。機密データをスキャンするには、当社のライブラリから定義済みのルールを選択するか、カスタムの正規表現ルールを入力できます。

Sensitive Data Scanner プロセッサをセットアップするには

  1. フィルタークエリを定義します。指定したフィルタークエリに一致するログのみがスキャンおよび処理されます。フィルタークエリに一致するかどうかにかかわらず、すべてのログはパイプラインの次のステップに送信されます。
  2. Add Scanning Rule をクリックします。
  3. スキャンルールに名前を付けます。
  4. Select scanning rule type フィールドで、ライブラリからルールを作成するか、カスタムルールを作成するかを選択します。
    • ライブラリからルールを作成する場合は、使用するライブラリパターンを選択します。
    • カスタムルールを作成する場合は、データに対して確認する正規表現パターンを入力します。
  5. Scan entire or part of event セクションで、ドロップダウンメニューの Entire Event (イベント全体)、Specific Attributes (特定の属性)、Exclude Attributes (属性の除外) からスキャン対象を選択します。
    • Specific Attributes (特定の属性) を選択した場合は、Add Field をクリックし、スキャンする特定の属性を入力します。最大 3 つのフィールドを追加できます。ネストされたキーにアクセスするには、パス記法 (outer_key.inner_key) を使用します。ネストされたデータを持つ特定の属性では、すべてのネストされたデータがスキャンされます。
    • Exclude Attributes (属性の除外) を選択した場合は、Add Field をクリックし、スキャンから除外する特定の属性を入力します。最大 3 つのフィールドを追加できます。ネストされたキーにアクセスするには、パス記法 (outer_key.inner_key) を使用します。ネストされたデータを持つ指定された属性については、すべてのネストされたデータが除外されます。
  6. Define action on match セクションで、一致した情報に対して実行するアクションを選択します。: マスキング、部分的なマスキング、およびハッシュ化はすべて元に戻せないアクションです。
    • 情報をマスキングする場合は、一致したデータを置き換えるテキストを指定します。
    • 情報を部分的にマスキングする場合は、マスキングする文字数を指定し、部分的なマスキングを一致したデータの先頭または末尾に適用するかどうかを指定します。
    • : ハッシュ化を選択した場合、一致した UTF-8 バイトは FarmHash の 64 ビットフィンガープリントでハッシュ化されます。
  7. オプションとして、正規表現に一致するすべてのイベントにタグを追加し、イベントのフィルタリング、分析、アラートを行うことができます。

This processor adds a field with the name of the host that sent the log. For example, hostname: 613e197f3526. Note: If the hostname already exists, the Worker throws an error and does not overwrite the existing hostname.

To set up this processor:

  • Define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they do or do not match the filter query, are sent to the next step in the pipeline.

This processor converts the specified field into JSON objects.

To set up this processor:

  1. Define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they do or do not match the filter query, are sent to the next step in the pipeline.
  2. Enter the name of the field you want to parse JSON on.
    Note: The parsed JSON overwrites what was originally contained in the field.

Use this processor to enrich your logs with information from a reference table, which could be a local file or database.

To set up the enrichment table processor:

  1. Define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they do or do not match the filter query, are sent to the next step in the pipeline.
  2. Enter the source attribute of the log. The source attribute’s value is what you want to find in the reference table.
  3. Enter the target attribute. The target attribute’s value stores, as a JSON object, the information found in the reference table.
  4. Select the type of reference table you want to use, File or GeoIP.
    • For the File type:
      1. Enter the file path.
      2. Enter the column name. The column name in the enrichment table is used for matching the source attribute value. See the Enrichment file example.
    • For the GeoIP type, enter the GeoIP path.
Enrichment file example

For this example, merchant_id is used as the source attribute and merchant_info as the target attribute.

This is the example reference table that the enrichment processor uses:

merch_idmerchant_namecitystate
803Andy’s OttomansBoiseIdaho
536Cindy’s CouchesBoulderColorado
235Debra’s BenchesLas VegasNevada

merch_id is set as the column name the processor uses to find the source attribute’s value. Note: The source attribute’s value does not have to match the column name.

If the enrichment processor receives a log with "merchant_id":"536":

  • The processor looks for the value 536 in the reference table’s merch_id column.
  • After it finds the value, it adds the entire row of information from the reference table to the merchant_info attribute as a JSON object:
merchant_info {
    "merchant_name":"Cindy's Couches",
    "city":"Boulder",
    "state":"Colorado"
}

Many types of logs are meant to be used for telemetry to track trends, such as KPIs, over long periods of time. Generating metrics from your logs is a cost-effective way to summarize log data from high-volume logs, such as CDN logs, VPC flow logs, firewall logs, and networks logs. Use the generate metrics processor to generate either a count metric of logs that match a query or a distribution metric of a numeric value contained in the logs, such as a request duration.

Note: The metrics generated are custom metrics and billed accordingly. See Custom Metrics Billing for more information.

To set up the processor:

Click Manage Metrics to create new metrics or edit existing metrics. This opens a side panel.

  • If you have not created any metrics yet, enter the metric parameters as described in the Add a metric section to create a metric.
  • If you have already created metrics, click on the metric’s row in the overview table to edit or delete it. Use the search bar to find a specific metric by its name, and then select the metric to edit or delete it. Click Add Metric to add another metric.
Add a metric
  1. Enter a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they match the filter query, are sent to the next step in the pipeline. Note: Since a single processor can generate multiple metrics, you can define a different filter query for each metric.
  2. Enter a name for the metric.
  3. In the Define parameters section, select the metric type (count, gauge, or distribution). See the Count metric example and Distribution metric example. Also see Metrics Types for more information.
    • For gauge and distribution metric types, select a log field which has a numeric (or parseable numeric string) value that is used for the value of the generated metric.
    • For the distribution metric type, the log field’s value can be an array of (parseable) numerics, which is used for the generated metric’s sample set.
    • The Group by field determines how the metric values are grouped together. For example, if you have hundreds of hosts spread across four regions, grouping by region allows you to graph one line for every region. The fields listed in the Group by setting are set as tags on the configured metric.
  4. Click Add Metric.
Metrics Types

You can generate these types of metrics for your logs. See the Metrics Types and Distributions documentation for more details.

Metric typeDescriptionExample
COUNTRepresents the total number of event occurrences in one time interval. This value can be reset to zero, but cannot be decreased.You want to count the number of logs with status:error.
GAUGERepresents a snapshot of events in one time interval.You want to measure the latest CPU utilization per host for all logs in the production environment.
DISTRIBUTIONRepresent the global statistical distribution of a set of values calculated across your entire distributed infrastructure in one time interval.You want to measure the average time it takes for an API call to be made.
Count metric example

For this status:error log example:

{"status": "error", "env": "prod", "host": "ip-172-25-222-111.ec2.internal"}

To create a count metric that counts the number of logs that contain "status":"error" and groups them by env and host, enter the following information:

Input parametersValue
Filter query@status:error
Metric namestatus_error_total
Metric typeCount
Group byenv, prod
Distribution metric example

For this example of an API response log:

{
    "timestamp": "2018-10-15T17:01:33Z",
    "method": "GET",
    "status": 200,
    "request_body": "{"information"}",
    "response_time_seconds: 10
}

To create a distribution metric that measures the average time it takes for an API call to be made, enter the following information:

Input parametersValue
Filter query@method
Metric namestatus_200_response
Metric typeDistribution
Select a log attributeresponse_time_seconds
Group bymethod

Use this processor to add a field name and value of an environment variable to the log message.

To set up this processor:

  1. Define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they match the filter query, are sent to the next step in the pipeline.
  2. Enter the field name for the environment variable.
  3. Enter the environment variable name.
  4. Click Add Environment Variable if you want to add another environment variable.
Blocked environment variables

Environment variables that match any of the following patterns are blocked from being added to log messages because the environment variable could contain sensitive data.

  • CONNECTIONSTRING / CONNECTION-STRING / CONNECTION_STRING
  • AUTH
  • CERT
  • CLIENTID / CLIENT-ID / CLIENT_ID
  • CREDENTIALS
  • DATABASEURL / DATABASE-URL / DATABASE_URL
  • DBURL / DB-URL / DB_URL
  • KEY
  • OAUTH
  • PASSWORD
  • PWD
  • ROOT
  • SECRET
  • TOKEN
  • USER

The environment variable is matched to the pattern and not the literal word. For example, PASSWORD blocks environment variables like USER_PASSWORD and PASSWORD_SECRET from getting added to the log messages.

Install the Observability Pipelines Worker

  1. Select your platform in the Choose your installation platform dropdown menu.
  2. Enter the Logstash address and port, such as 0.0.0.0:9997. The Observability Pipelines Worker listens on this address for incoming log messages.
  3. Provide the environment variables for each of your selected destinations. See the prerequisites for more information.

    Datadog ログ管理で構成する環境変数はありません。

    Enter your Splunk HEC token and the base URL of the Splunk instance. See prerequisites for more information.

    The Worker passes the HEC token to the Splunk collection endpoint. After the Observability Pipelines Worker processes the logs, it sends the logs to the specified Splunk instance URL.

    Note: The Splunk HEC destination forwards all logs to the /services/collector/event endpoint regardless of whether you configure your Splunk HEC destination to encode your output in JSON or raw.

    Sumo Logic HTTP コレクター の URLを入力します。詳細は、前提条件を参照してください。

    Enter the rsyslog or syslog-ng endpoint URL. For example, 127.0.0.1:9997. The Observability Pipelines Worker sends logs to this address and port.

    Enter the Google Chronicle endpoint URL. For example, https://chronicle.googleapis.com.

    1. Enter the Elasticsearch authentication username.
    2. Enter the Elasticsearch authentication password.
    3. Enter the Elasticsearch endpoint URL. For example, http://CLUSTER_ID.LOCAL_HOST_IP.ip.es.io:9200.
    1. Enter the OpenSearch authentication username.
    2. Enter the OpenSearch authentication password.
    3. Enter the OpenSearch endpoint URL. For example, http://<hostname.IP>:9200.
    1. Enter the Amazon OpenSearch authentication username.
    2. Enter the Amazon OpenSearch authentication password.
    3. Enter the Amazon OpenSearch endpoint URL. For example, http://<hostname.IP>:9200.
    1. Enter your New Relic account ID.
    2. Enter your New Relic license key.
  4. Follow the instructions for your environment to install the Worker.
    1. Select API key をクリックして、使用する Datadog API キーを選択します。
    2. UI で提供されるコマンドを実行して Worker をインストールします。コマンドには、先ほど入力した環境変数が自動的に入力されます。
      docker run -i -e DD_API_KEY=<DATADOG_API_KEY> \
          -e DD_OP_PIPELINE_ID=<PIPELINE_ID> \
          -e DD_SITE=<DATADOG_SITE> \
          -e <SOURCE_ENV_VARIABLE> \
          -e <DESTINATION_ENV_VARIABLE> \
          -p 8088:8088 \
          datadog/observability-pipelines-worker run
      
      : デフォルトでは、docker run コマンドは Worker がリッスンしているのと同じポートを公開します。ワーカーのコンテナポートを Docker ホストの別のポートにマッピングしたい場合は、コマンドで -p | --publish オプションを使用します。
      -p 8282:8088 datadog/observability-pipelines-worker run
      
    3. Observability Pipelines のインストールページに戻り、Deploy をクリックします。

    パイプラインの構成を変更したい場合は、既存のパイプラインの更新を参照してください。

    1. Download the Helm chart values file. If you are not using a managed service such as Amazon EKS, Google GKE, or Azure AKS, see Self-hosted and self-managed Kubernetes clusters before continuing to the next step.
    2. Click Select API key to choose the Datadog API key you want to use.
    3. Add the Datadog chart repository to Helm:
      helm repo add datadog https://helm.datadoghq.com
      
      If you already have the Datadog chart repository, run the following command to make sure it is up to date:
      helm repo update
      
    4. Run the command provided in the UI to install the Worker. The command is automatically populated with the environment variables you entered earlier.
      helm upgrade --install opw \
      -f values.yaml \
      --set datadog.apiKey=<DATADOG_API_KEY> \
      --set datadog.pipelineId=<PIPELINE_ID> \
      --set <SOURCE_ENV_VARIABLES> \
      --set <DESTINATION_ENV_VARIABLES> \
      --set service.ports[0].protocol=TCP,service.ports[0].port=<SERVICE_PORT>,service.ports[0].targetPort=<TARGET_PORT> \
      datadog/observability-pipelines-worker
      
      Note: By default, the Kubernetes Service maps incoming port <SERVICE_PORT> to the port the Worker is listening on (<TARGET_PORT>). If you want to map the Worker’s pod port to a different incoming port of the Kubernetes Service, use the following service.ports[0].port and service.ports[0].targetPort values in the command:
      --set service.ports[0].protocol=TCP,service.ports[0].port=8088,service.ports[0].targetPort=8282
      
    5. Navigate back to the Observability Pipelines installation page and click Deploy.

    See Update Existing Pipelines if you want to make changes to your pipeline’s configuration.

    Self-hosted and self-managed Kubernetes clusters

    If you are running a self-hosted and self-managed Kubernetes cluster, and have defined zones with node labels using topology.kubernetes.io/zone, then you can use the Helm chart values file as is. However, if you are not using the label topology.kubernetes.io/zone, you need to update the topologyKey in the values.yaml file to match the key you are using. Or if you run your Kubernetes install without zones, remove the entire topology.kubernetes.io/zone section.

    1. Click Select API key to choose the Datadog API key you want to use.

    2. Run the one-step command provided in the UI to install the Worker.

      Note: The environment variables used by the Worker in /etc/default/observability-pipelines-worker are not updated on subsequent runs of the install script. If changes are needed, update the file manually and restart the Worker.

    If you prefer not to use the one-line installation script, follow these step-by-step instructions:

    1. Set up APT transport for downloading using HTTPS:
      sudo apt-get update
      sudo apt-get install apt-transport-https curl gnupg
      
    2. Run the following commands to set up the Datadog deb repo on your system and create a Datadog archive keyring:
      sudo sh -c "echo 'deb [signed-by=/usr/share/keyrings/datadog-archive-keyring.gpg] https://apt.datadoghq.com/ stable observability-pipelines-worker-2' > /etc/apt/sources.list.d/datadog-observability-pipelines-worker.list"
      sudo touch /usr/share/keyrings/datadog-archive-keyring.gpg
      sudo chmod a+r /usr/share/keyrings/datadog-archive-keyring.gpg
      curl https://keys.datadoghq.com/DATADOG_APT_KEY_CURRENT.public | sudo gpg --no-default-keyring --keyring /usr/share/keyrings/datadog-archive-keyring.gpg --import --batch
      curl https://keys.datadoghq.com/DATADOG_APT_KEY_06462314.public | sudo gpg --no-default-keyring --keyring /usr/share/keyrings/datadog-archive-keyring.gpg --import --batch
      curl https://keys.datadoghq.com/DATADOG_APT_KEY_F14F620E.public | sudo gpg --no-default-keyring --keyring /usr/share/keyrings/datadog-archive-keyring.gpg --import --batch
      curl https://keys.datadoghq.com/DATADOG_APT_KEY_C0962C7D.public | sudo gpg --no-default-keyring --keyring /usr/share/keyrings/datadog-archive-keyring.gpg --import --batch
      
    3. Run the following commands to update your local apt repo and install the Worker:
      sudo apt-get update
      sudo apt-get install observability-pipelines-worker datadog-signing-keys
      
    4. Add your keys, site (for example, datadoghq.com for US1), source, and destination environment variables to the Worker’s environment file:
      sudo cat &lt;<EOF > /etc/default/observability-pipelines-worker
      DD_API_KEY=<DATADOG_API_KEY>
      DD_OP_PIPELINE_ID=<PIPELINE_ID>
      DD_SITE=<DATADOG_SITE>
      <SOURCE_ENV_VARIABLES>
      <DESTINATION_ENV_VARIABLES>
      EOF
      
    5. Start the worker:
      sudo systemctl restart observability-pipelines-worker
      

    See Update Existing Pipelines if you want to make changes to your pipeline’s configuration.

    1. Click Select API key to choose the Datadog API key you want to use.

    2. Run the one-step command provided in the UI to install the Worker.

      Note: The environment variables used by the Worker in /etc/default/observability-pipelines-worker are not updated on subsequent runs of the install script. If changes are needed, update the file manually and restart the Worker.

    If you prefer not to use the one-line installation script, follow these step-by-step instructions:

    1. Set up the Datadog rpm repo on your system with the below command. Note: If you are running RHEL 8.1 or CentOS 8.1, use repo_gpgcheck=0 instead of repo_gpgcheck=1 in the configuration below.
      cat &lt;<EOF > /etc/yum.repos.d/datadog-observability-pipelines-worker.repo
      [observability-pipelines-worker]
      name = Observability Pipelines Worker
      baseurl = https://yum.datadoghq.com/stable/observability-pipelines-worker-2/\$basearch/
      enabled=1
      gpgcheck=1
      repo_gpgcheck=1
      gpgkey=https://keys.datadoghq.com/DATADOG_RPM_KEY_CURRENT.public
          https://keys.datadoghq.com/DATADOG_RPM_KEY_B01082D3.public
      EOF
      
    2. Update your packages and install the Worker:
      sudo yum makecache
      sudo yum install observability-pipelines-worker
      
    3. Add your keys, site (for example, datadoghq.com for US1), source, and destination environment variables to the Worker’s environment file:
      sudo cat &lt;&lt;-EOF > /etc/default/observability-pipelines-worker
      DD_API_KEY=<API_KEY>
      DD_OP_PIPELINE_ID=<PIPELINE_ID>
      DD_SITE=<SITE>
      <SOURCE_ENV_VARIABLES>
      <DESTINATION_ENV_VARIABLES>
      EOF
      
    4. Start the worker:
      sudo systemctl restart observability-pipelines-worker
      
    5. Navigate back to the Observability Pipelines installation page and click Deploy.

    See Update Existing Pipelines if you want to make changes to your pipeline’s configuration.

    1. ドロップダウンのオプションを 1 つ選択し、パイプラインで予想されるログの量を入力します。

      オプション説明
      Unsureログの量を予想できない場合、または Worker をテストしたい場合は、このオプションを使用します。このオプションは、最大 2 つの汎用 t4g.large インスタンスで EC2 オートスケーリンググループをプロビジョニングします。
      1-5 TB/dayこのオプションは、最大 2 つのコンピュート最適化インスタンス c6g.large で EC2 オートスケーリンググループをプロビジョニングします。
      5-10 TB/dayこのオプションは、最低 2 つ、最大 5 つのコンピュート最適化インスタンス c6g.large で EC2 オートスケーリンググループをプロビジョニングします。
      >10 TB/dayDatadog は大規模な本番デプロイでこのオプションを推奨しています。このオプションは、最低 2 つ、最大 10 個のコンピュート最適化インスタンス c6g.xlarge で EC2 オートスケーリンググループをプロビジョニングします。

      : その他のパラメーターは、すべて Worker デプロイメントに適したデフォルト値に設定されていますが、スタックを作成する前に AWS コンソールで必要に応じてユースケースに合わせて調整できます。

    2. Worker のインストールに使用する AWS リージョンを選択します。

    3. Select API key をクリックして、使用する Datadog API キーを選択します。

    4. Launch CloudFormation Template をクリックして AWS コンソールに移動し、スタックの構成を確認してから起動します。CloudFormation パラメーターが想定通りであることを確認してください。

    5. Worker のインストールに使用する VPC とサブネットを選択します。

    6. IAM の必要な権限のチェックボックスを見直して確認します。Submit をクリックしてスタックを作成します。ここでは、CloudFormation がインストールを処理し、Worker インスタンスが起動され、必要なソフトウェアがダウンロードされ、Worker が自動的に開始します。

    7. Observability Pipelines のインストールページに戻り、Deploy をクリックします。

    パイプラインの構成を変更したい場合は、既存のパイプラインの更新を参照してください。

Send logs to the Observability Pipelines Worker over Logstash

To configure Logstash to send logs to the Observability Pipelines Worker, use the following output configuration:

output {
	lumberjack {
		# update these to point to your Observability Pipelines Worker
		hosts => ["127.0.0.1"]
		port => 5044
		ssl_certificate => "/path/to/certificate.crt"
	}
}

Note: Logstash requires SSL to be configured.