Este producto no es compatible con el sitio Datadog seleccionado. ().
Esta página aún no está disponible en español. Estamos trabajando en su traducción.
Si tienes alguna pregunta o comentario sobre nuestro actual proyecto de traducción, no dudes en ponerte en contacto con nosotros.

Overview

Use Observability Pipelines’ Kafka destination to send logs to Kafka topics.

When to use this destination

Common scenarios when you might use this destination:

  • To route logs to the following destinations:
    • Clickhouse: An open-source column-oriented database management system used for analyzing large volumes of logs.
    • Snowflake: A data warehouse used for storage and query.
      • Snowflake’s API integration utilizes Kafka as a method to ingest logs into their platform.
    • Databricks: A data lakehouse for analytics and storage.
    • Azure Event Hub: An ingest and processing service in the Microsoft and Azure ecosystem.
  • To route data to Kafka and use the Kafka Connect ecosystem.
  • To process and normalize your data with Observability Pipelines before routing to Apache Spark with Kafka to analyze data and run machine learning workloads.

Setup

Set up the Kafka destination and its environment variables when you set up a pipeline. The information below is configured in the pipelines UI.

Set up the destination

  1. Enter the name of the topic you want to send logs to.
  2. In the Encoding dropdown menu, select either JSON or Raw message as the output format.

Optional settings

Enable TLS

Toggle the switch to enable TLS. The following certificate and key files are required.
Note: All file paths are made relative to the configuration data directory, which is /var/lib/observability-pipelines-worker/config/ by default. See Advanced Configurations for more information. The file must be owned by the observability-pipelines-worker group and observability-pipelines-worker user, or at least readable by the group or user.

  • Server Certificate Path: The path to the certificate file that has been signed by your Certificate Authority (CA) Root File in DER or PEM (X.509).
  • CA Certificate Path: The path to the certificate file that is your Certificate Authority (CA) Root File in DER or PEM (X.509).
  • Private Key Path: The path to the .key private key file that belongs to your Server Certificate Path in DER or PEM (PKCS#8) format.
Enable SASL authentication
  1. Toggle the switch to enable SASL Authentication.
  2. Select the mechanism (PLAIN, SCHRAM-SHA-256, or SCHRAM-SHA-512) in the dropdown menu.
Enable compression
  1. Toggle switch to Enable Compression.
  2. In the Compression Algorithm dropdown menu, select a compression algorithm (gzip, zstd, lz4, or snappy).
  3. (Optional) Select a Compression Level in the dropdown menu. If the level is not specified, the algorithm’s default level is used.
Buffering options (Preview)

Toggle the switch to enable Buffering Options (PREVIEW indicates an early access version of a major product or feature that you can opt into before its official release.Glossary).
Note: Contact your account manager to request access to the Preview.

  • If disabled (default): Up to 500 events are buffered before flush.
  • If enabled:
    1. Select the buffer type you want to set.
      • Memory: Fast, limited by RAM
      • Buffer size: Durable, survives restarts
    2. Enter the buffer size and select the unit.
      • Maximum capacity in MB or GB.
Advanced options

Click Advanced if you want to set any of the following fields:

  1. Message Key Field: Specify which log field contains the message key for partitioning, grouping, and ordering.
  2. Headers Key: Specify which log field contains your Kafka headers. If left blank, no headers are written.
  3. Message Timeout (ms): Local message timeout, in milliseconds. Default is 300,000 ms.
  4. Socket Timeout (ms): Default timeout, in milliseconds, for network requests. Default is 60,000 ms.
  5. Rate Limit Events: The maximum number of requests the Kafka client can send within the rate limit time window. Default is no rate limit.
  6. Rate Limit Time Window (secs): The time window used for the rate limit option.
    • This setting has no effect if the rate limit for events is not set.
    • Default is 1 second if Rate Limit Events is set, but Rate Limit Time Window is not set.
  7. To add additional librdkafka options, click Add Option and select an option in the dropdown menu.
    1. Enter a value for that option.
    2. Check your values against the librdkafka documentation to make sure they have the correct type and are within the set range.
    3. Click Add Option to add another librdkafka option.

Set environment variables

Kafka bootstrap servers

  • The host and port of the Kafka bootstrap servers.
  • This is the bootstrap server that the client uses to connect to the Kafka cluster and discover all the other hosts in the cluster. The host and port must be entered in the format of host:port, such as 10.14.22.123:9092. If there is more than one server, use commas to separate them.
  • Stored as the environment variable: DD_OP_DESTINATION_KAFKA_BOOTSTRAP_SERVERS.

TLS (when enabled)

  • If TLS is enabled, the Kafka TLS passphrase is needed.
  • Stored as the environment variable: DD_OP_DESTINATION_KAFKA_KEY_PASS.

SASL (when enabled)

  • Kafka SASL username
    • Stored as the environment variable: DD_OP_DESTINATION_KAFKA_SASL_USERNAME.
  • Kafka SASL password
    • Stored as the environment variable: DD_OP_DESTINATION_KAFKA_SASL_PASSWORD.

librdkafka options

These are the available librdkafka options:

  • client.id
  • queue.buffering.max_messages
  • transactional.id
  • enable.idempotence
  • acks

See the librdkafka documentation for more information and to ensure your values have the correct type and are within range.

How the destination works

See the Observability Pipelines Metrics for a full list of available health metrics.

Worker health metrics

Component metrics

Monitor the health of your destination with the following key metrics:

pipelines.component_sent_events_total
Events successfully delivered.
pipelines.component_discarded_events_total
Events dropped.
pipelines.component_errors_total
Errors in the destination component.
pipelines.component_sent_events_bytes_total
Total event bytes sent.
pipelines.utilization
Worker resource usage.

Buffer metrics (when buffering is enabled)

Track buffer behavior with these additional metrics:

pipelines.buffer_events
Number of events currently in the buffer.
pipelines.buffer_byte_size
Current buffer size in bytes.
pipelines.buffer_received_events_total
Total events added to the buffer.
pipelines.buffer_received_event_bytes_total
Total bytes added to the buffer.
pipelines.buffer_sent_events_total
Total events successfully flushed from the buffer.
pipelines.buffer_sent_event_bytes_total
Total bytes successfully flushed from the buffer.
pipelines.buffer_discarded_events_total
Events discarded from the buffer (for example, due to overflow).

Event batching

A batch of events is flushed when one of these parameters is met. See event batching for more information.

Max EventsMax BytesTimeout (seconds)
10,0001,000,0001