WarpStream

Supported OS: Linux, Windows, macOS

Integration version: 1.0.0

Overview

WarpStream is a data streaming platform compatible with Apache Kafka®, designed to run directly on object storage. This integration provides visibility into WarpStream Agent performance by exposing key metrics, helping users monitor both health and performance.

Setup

Installation

Install Datadog Agent version 6.17 or later on the v6 line, or version 7.17 or later on the v7 line.
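
The minimum version depends on the Agent's major version. The rule can be expressed as a small Python check. This is a sketch with some assumptions:

- It expects a dotted major.minor[.patch] version string.
- It ignores any pre-release suffix after "-".
- It treats major versions newer than 7 as supported. This last point is unverified.

agent_version_supported is a hypothetical helper and is not part of the Datadog Agent.

```python
# Minimum minor version per Datadog Agent major line, from the requirement above.
MINIMUM_MINOR = {6: 17, 7: 17}


def agent_version_supported(version: str) -> bool:
    """Return True if a Datadog Agent version string meets the minimum.

    Hypothetical helper. Assumes "major.minor[.patch]" with an optional
    "-suffix" (for example "7.52.0-rc.1"), which is dropped before parsing.
    """
    core = version.strip().split("-", 1)[0]
    major_str, minor_str = core.split(".")[:2]
    major, minor = int(major_str), int(minor_str)
    if major in MINIMUM_MINOR:
        return minor >= MINIMUM_MINOR[major]
    # Assumption: later major releases keep supporting the integration.
    return major > max(MINIMUM_MINOR)


if __name__ == "__main__":
    for v in ("7.52.0", "7.16.2", "6.17.1", "5.32.9"):
        print(v, agent_version_supported(v))
```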

Configuration

Complete all of the following steps to ensure the WarpStream integration works properly.

The WarpStream integration has two parts:

  • The Datadog Agent portion, which sends requests to a configured WarpStream Agent endpoint and reports whether it can connect and whether the WarpStream Agent is healthy.

  • The WarpStream StatsD portion, where the WarpStream Agent can be configured to send metrics to the Datadog Agent.

The WarpStream integration’s metrics come from both the Agent and StatsD portions.

Configure Datadog Agent WarpStream integration

Configure the WarpStream check included in the Datadog Agent package to collect health metrics and service checks. To do so, edit the url setting in the warpstream.d/conf.yaml file, located in the conf.d/ folder at the root of your Agent’s configuration directory. See the sample warpstream.d/conf.yaml for all available configuration options.

Ensure that url matches your WarpStream Agent HTTP server (port 8080 by default).
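
The Python sketch below illustrates both steps. It renders a minimal warpstream.d/conf.yaml containing the url option. It also runs a rough reachability probe in the spirit of warpstream.can_connect. Assumptions:

- The file follows the usual Datadog check layout, with init_config plus an instances list.
- The probe only checks that an HTTP server answers at the URL. The real check may query a specific health path and apply stricter rules.

render_conf and can_connect are hypothetical names.

```python
import urllib.error
import urllib.request

# WarpStream Agent HTTP server, default port 8080 per the docs above.
DEFAULT_URL = "http://localhost:8080"


def render_conf(url: str = DEFAULT_URL) -> str:
    """Return minimal warpstream.d/conf.yaml content pointing at `url`.

    Assumes the usual Datadog check layout; the sample conf.yaml is the
    authoritative list of options.
    """
    return "init_config:\n\ninstances:\n" f"  - url: {url}\n"


def can_connect(url: str = DEFAULT_URL, timeout: float = 2.0) -> int:
    """Return 1 if an HTTP server answers at `url`, otherwise 0.

    Hypothetical approximation of warpstream.can_connect: any HTTP
    response, even an error status, counts as a successful connection.
    """
    try:
        with urllib.request.urlopen(url, timeout=timeout):
            return 1
    except urllib.error.HTTPError as err:
        # The server responded with a 4xx/5xx status, so it is reachable.
        err.close()
        return 1
    except OSError:
        # URLError (connection refused, DNS failure) and timeouts are OSError subclasses.
        return 0


if __name__ == "__main__":
    print(render_conf())
    print("can_connect:", can_connect())
```

Running the script prints the configuration snippet and then 1 or 0. A 0 usually means the url does not point at the WarpStream Agent HTTP server, or that the server is not listening on the configured port.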

Connect WarpStream Agent to DogStatsD

Start the WarpStream Agent with the -enableDatadogMetrics flag, or set the WARPSTREAM_ENABLE_DATADOG_METRICS environment variable to true.
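
StatsD-style metrics travel as plain-text UDP datagrams. The Python sketch below shows the DogStatsD datagram shape (name:value|type|#tags) and a fire-and-forget send to the Datadog Agent. It illustrates the protocol only; it is not WarpStream's own client code. It assumes DogStatsD is listening on its default UDP port 8125 on the same host. The tag value "success" in the example is hypothetical.

```python
import socket
from typing import Mapping, Optional, Tuple

# DogStatsD listens on UDP port 8125 on the Datadog Agent host by default.
DOGSTATSD_ADDR: Tuple[str, int] = ("127.0.0.1", 8125)


def format_datagram(name: str, value: float, metric_type: str,
                    tags: Optional[Mapping[str, str]] = None) -> str:
    """Build a DogStatsD datagram such as 'name:1|c|#key:value'.

    metric_type is the StatsD type code: "c" for a count, "g" for a gauge.
    """
    datagram = f"{name}:{value}|{metric_type}"
    if tags:
        datagram += "|#" + ",".join(f"{k}:{v}" for k, v in tags.items())
    return datagram


def send(datagram: str, addr: Tuple[str, int] = DOGSTATSD_ADDR) -> None:
    """Fire-and-forget UDP send, the way StatsD clients usually emit metrics."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(datagram.encode("utf-8"), addr)


if __name__ == "__main__":
    # Hypothetical example: one count increment tagged by 'outcome'.
    msg = format_datagram("warpstream.agent_kafka_request_outcome", 1, "c",
                          {"outcome": "success"})
    print(msg)
    send(msg)
```

Because UDP gives no delivery confirmation, the send succeeds even when no Datadog Agent is listening. Confirm that metrics arrive in Datadog rather than relying on the send call.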

Restart Datadog Agent and WarpStream
  1. Restart the Datadog Agent.

  2. Restart the WarpStream Agent so it starts sending metrics to the Datadog Agent’s DogStatsD endpoint.

Data Collected

Metrics

warpstream.can_connect
(gauge)
1 if the Datadog Agent can connect to the WarpStream Agent, otherwise 0
warpstream.agent_connection_throttle_count
(count)
number of new connections to the WarpStream Agent that were throttled
warpstream.agent_deadscanner_optimistic_queue_delete_file_outcome
(count)
incremented whenever the WarpStream Agent processes a file in its deadscanner optimistic queue, tagged by ‘outcome’
warpstream.agent_deadscanner_optimistic_queue_submit_outcome
(count)
incremented whenever the WarpStream Agent submits a request to the deadscanner optimistic queue, tagged by ‘outcome’
warpstream.agent_deadscanner_outcomes
(count)
number of files processed by a deadscanner job execution, tagged by ‘outcome’
warpstream.agent_file_cache_client_fetch_local_or_remote_counter
(count)
incremented whenever a WarpStream Agent loads a file from its file cache, tagged by ‘outcome’ and ‘source’
warpstream.agent_file_cache_client_fetch_local_or_remote_num_bytes_distribution
(gauge)
number of bytes loaded by a WarpStream Agent from its file cache, tagged by ‘outcome’ and ‘source’
Shown as byte
warpstream.agent_file_cache_client_get_stream_range_latency
(gauge)
latency to load a file into its file cache from another WarpStream Agent, tagged by ‘outcome’
Shown as second
warpstream.agent_file_cache_client_get_stream_range_outcome
(count)
incremented for each request to load a file into its file cache from another WarpStream Agent, tagged by ‘outcome’
warpstream.agent_file_cache_client_get_stream_range_partitioned_requests_counter
(count)
incremented by the number of partitioned requests to load a file into its file cache from another Agent, tagged by ‘outcome’
warpstream.agent_file_cache_client_get_stream_range_partitioned_requests_distribution
(gauge)
distribution flavor of ‘warpstream.agent_file_cache_client_get_stream_range_partitioned_requests_counter’
warpstream.agent_file_cache_concurrent_fetches
(count)
incremented by 1 for each new fetch request to a WarpStream Agent file cache
warpstream.agent_file_cache_context_remaining_timeout
(gauge)
remaining timeout of a fetch request to a WarpStream Agent file cache
Shown as second
warpstream.agent_file_cache_get_range_num_chunks
(gauge)
number of chunks read in a given fetch request to a WarpStream Agent file cache
warpstream.agent_file_cache_server_blob_store_fetcher_counter
(count)
incremented by 1 every time a WarpStream Agent fetches data from the object storage to fill its file cache
warpstream.agent_file_cache_server_blob_store_fetcher_num_bytes_counter
(count)
number of bytes fetched from the object storage to fill an Agent file cache
Shown as byte
warpstream.agent_file_cache_server_blob_store_fetcher_num_bytes_distribution
(gauge)
distribution of the number of bytes fetched from the object storage to fill an Agent file cache
Shown as byte
warpstream.agent_file_cache_server_fetch_size_counter
(count)
incremented by 1 every time there is a cache miss and a WarpStream Agent needs to issue a GET to the object storage to fill its file cache
warpstream.agent_file_cache_server_fetch_size_num_bytes_counter
(count)
number of bytes the WarpStream Agent had to GET from the object storage to fill its file cache after a cache miss
Shown as byte
warpstream.agent_file_cache_server_get_range_copy_chunk
(count)
incremented by 1 when a WarpStream Agent successfully reads a chunk of data from its file cache
warpstream.agent_file_cache_server_get_range_copy_chunk_num_bytes_copied
(count)
number of bytes copied from a WarpStream Agent file cache
Shown as byte
warpstream.agent_file_cache_server_get_stream_range_latency
(gauge)
latency of a request to read data from the object storage so an Agent can fill its file cache
Shown as second
warpstream.agent_file_cache_server_get_stream_range_num_bytes_count
(count)
number of bytes read from the object storage so a WarpStream Agent can fill its file cache
Shown as byte
warpstream.agent_file_cache_server_get_stream_range_num_bytes_distribution
(gauge)
distribution of the number of bytes read from the object storage so an Agent can fill its file cache
Shown as byte
warpstream.agent_file_cache_server_get_stream_range_num_ranges_distribution
(gauge)
distribution of the number of ranges fetched from the object storage so an Agent can fill its file cache
warpstream.agent_file_cache_server_get_stream_range_outcome
(count)
incremented by 1 every time a WarpStream Agent has to fetch multiple ranges from the object storage to fill its file cache
warpstream.agent_flushtuner_flush_size_rec
(gauge)
current value of the size autotuner used to flush data to the object storage when producing data
Shown as byte
warpstream.agent_job_runner_fetched_jobs_num_jobs
(gauge)
distribution of the number of jobs fetched when polling jobs from the WarpStream control plane
warpstream.agent_job_runner_fetched_jobs_outcome
(count)
incremented by 1 every time a WarpStream Agent issues a poll request to the WarpStream control plane, tagged by ‘outcome’
warpstream.agent_job_runner_no_jobs_found
(count)
incremented by 1 every time a WarpStream Agent issues a poll request to the WarpStream control plane and does not receive any jobs
warpstream.agent_job_runner_num_running_jobs_gauge
(gauge)
number of jobs currently running on a given WarpStream Agent
warpstream.agent_job_runner_num_running_slots_distribution
(gauge)
distribution of the number of job slots currently running on a given WarpStream Agent
warpstream.agent_job_runner_num_running_slots_gauge
(gauge)
gauge of the number of job slots currently running on a given WarpStream Agent
warpstream.agent_job_runner_slots_utilization_distribution
(gauge)
distribution of the job slot utilization ratio (between 0 and 1)
warpstream.agent_job_runner_slots_utilization_gauge
(gauge)
gauge of the job slot utilization ratio (between 0 and 1)
warpstream.agent_kafka_fetch_bytes_max
(gauge)
distribution of the max bytes of Kafka fetch requests issued to a WarpStream Agent
Shown as byte
warpstream.agent_kafka_fetch_bytes_min
(gauge)
distribution of the min bytes of Kafka fetch requests issued to a WarpStream Agent
Shown as byte
warpstream.agent_kafka_fetch_bytes_remaining
(gauge)
distribution of the number of bytes remaining when processing Kafka fetch requests issued to a WarpStream Agent
Shown as byte
warpstream.agent_kafka_fetch_compressed_bytes
(gauge)
distribution of the number of compressed bytes read when processing Kafka fetch requests issued to a WarpStream Agent
Shown as byte
warpstream.agent_kafka_fetch_compressed_bytes_counter
(count)
number of compressed bytes read when processing Kafka fetch requests issued to a WarpStream Agent
Shown as byte
warpstream.agent_kafka_fetch_num_pointers_counter
(count)
number of pointers read when processing Kafka fetch requests issued to a WarpStream Agent
warpstream.agent_kafka_fetch_num_pointers_distribution
(gauge)
distribution of the number of pointers read when processing Kafka fetch requests issued to a WarpStream Agent
warpstream.agent_kafka_fetch_partial_response_error_scenario_counter
(count)
number of partial fetch requests processed by a WarpStream Agent
warpstream.agent_kafka_fetch_partial_response_error_scenario_num_bytes_distribution
(gauge)
distribution of the bytes in partial fetch requests processed by a WarpStream Agent
Shown as byte
warpstream.agent_kafka_fetch_partitions_count
(count)
number of partitions read when processing Kafka fetch requests issued to a WarpStream Agent
warpstream.agent_kafka_fetch_prefetch_concurrency_distribution
(gauge)
distribution of the concurrency used to prefetch data when processing Kafka fetch requests issued to an Agent
warpstream.agent_kafka_fetch_request_latency
(gauge)
latency of Kafka fetch requests issued to a WarpStream Agent
Shown as second
warpstream.agent_kafka_fetch_single_attempt_latency
(gauge)
latency of a single attempt to process Kafka fetch requests issued to a WarpStream Agent
Shown as second
warpstream.agent_kafka_fetch_single_attempt_outcome
(count)
number of attempts to process Kafka fetch requests issued to a WarpStream Agent
warpstream.agent_kafka_fetch_topics_count
(count)
number of topics read when processing Kafka fetch requests issued to a WarpStream Agent
warpstream.agent_kafka_fetch_uncompressed_bytes
(gauge)
distribution of the number of uncompressed bytes read when processing Kafka fetch requests issued to an Agent
Shown as byte
warpstream.agent_kafka_fetch_uncompressed_bytes_counter
(count)
number of uncompressed bytes read when processing Kafka fetch requests issued to an Agent
Shown as byte
warpstream.agent_kafka_inflight_connections
(gauge)
gauge of the number of inflight Kafka connections opened to a WarpStream Agent
warpstream.agent_kafka_inflight_request_per_connection
(gauge)
distribution of the number of inflight requests per Kafka connection open to an Agent
warpstream.agent_kafka_inflight_request_per_connection_on_close
(count)
number of inflight requests when closing a connection
warpstream.agent_kafka_inflight_requests
(gauge)
gauge of the number of inflight Kafka requests issued to a WarpStream Agent
warpstream.agent_kafka_joingroup_poll_iterations
(gauge)
distribution of the number of iterations to poll when processing a Kafka JoinGroup request
warpstream.agent_kafka_produce_compressed_bytes
(gauge)
distribution of the number of compressed bytes written when processing Kafka produce requests issued to an Agent
Shown as byte
warpstream.agent_kafka_produce_compressed_bytes_counter
(count)
number of compressed bytes written when processing Kafka produce requests issued to a WarpStream Agent
Shown as byte
warpstream.agent_kafka_produce_uncompressed_bytes
(gauge)
distribution of the number of uncompressed bytes written when processing Kafka produce requests issued to an Agent
Shown as byte
warpstream.agent_kafka_produce_uncompressed_bytes_counter
(count)
number of uncompressed bytes written when processing Kafka produce requests issued to a WarpStream Agent
Shown as byte
warpstream.agent_kafka_produce_with_offset_uncompressed_bytes
(gauge)
distribution of the number of uncompressed bytes written when processing Kafka produce with offset requests issued to an Agent
Shown as byte
warpstream.agent_kafka_produce_with_offset_uncompressed_bytes_counter
(count)
number of uncompressed bytes written when processing Kafka produce with offset requests issued to an Agent
Shown as byte
warpstream.agent_kafka_request_count
(count)
number of Kafka requests processed by a WarpStream Agent
warpstream.agent_kafka_request_latency
(gauge)
latency of Kafka requests processed by an Agent at the connection level
Shown as second
warpstream.agent_kafka_request_outcome
(count)
number of Kafka requests processed by a WarpStream Agent, tagged by ‘outcome’
warpstream.agent_kafka_request_process_latency
(gauge)
latency for an Agent to process a Kafka request
Shown as second
warpstream.agent_kafka_source_cluster_connections_counter
(count)
number of connections opened from Orbit to a source cluster
warpstream.agent_net_kafka_tcp_idle_conn_close
(count)
number of Kafka connections closed because they were idle
warpstream.agent_net_kafka_tcp_limit_listener_acquire_count
(count)
number of times the WarpStream Agent accepted a new active Kafka connection
warpstream.agent_net_kafka_tcp_limit_listener_acquire_latency
(gauge)
latency to accept a new active Kafka connection
Shown as second
warpstream.agent_produce_backpressure_count
(count)
incremented by 1 every time a WarpStream Agent applies backpressure to a produce request, tagged by ‘kafka_method’ and ‘backpressure_source’
warpstream.agent_produce_outcome
(count)
incremented every time a produce request has been processed, tagged by ‘outcome’
warpstream.agent_run_and_ack_job_latency
(gauge)
latency to run a job and acknowledge it back to the WarpStream control plane
Shown as second
warpstream.agent_run_and_ack_job_outcome
(count)
incremented by 1 every time a job has been run and acknowledged, tagged by ‘outcome’
warpstream.agent_segment_batcher_flush_file_size_compressed_bytes
(gauge)
distribution of the compressed size of files batched in-memory before flushing to the object storage when processing a produce request
Shown as byte
warpstream.agent_segment_batcher_flush_file_size_uncompressed_bytes
(gauge)
distribution of the uncompressed size of files batched in-memory before flushing to the object storage when processing a produce request
Shown as byte
warpstream.agent_segment_batcher_flush_new_file_duration
(gauge)
distribution of the latency to batch in-memory before flushing to the object storage when processing a produce request
Shown as second
warpstream.agent_segment_batcher_flush_num_batches
(count)
counter of batches in a file flushed to object storage
warpstream.agent_segment_batcher_flush_num_records_counter
(count)
counter of records in a file flushed to object storage
warpstream.agent_segment_batcher_flush_num_records_distribution
(gauge)
distribution of the number of records in a file flushed to object storage
warpstream.agent_segment_batcher_flush_outcome
(count)
incremented by 1 every time the WarpStream Agent flushes a file to the object storage, tagged by ‘outcome’
warpstream.agent_segment_batcher_flush_put_file_duration
(gauge)
latency to advertise a new file to the WarpStream control plane
Shown as second
warpstream.agent_segment_batcher_flush_total_duration
(gauge)
total time to flush a file and advertise it to the WarpStream control plane
Shown as second
warpstream.agent_segment_batcher_flush_write_to_duration
(gauge)
time to upload a new file to object storage
Shown as second
warpstream.blob_store_get_stream_range_num_bytes_distribution
(gauge)
distribution of the number of bytes read from object storage when reading ranges from a stream
Shown as byte
warpstream.blob_store_operation_latency
(gauge)
distribution of the latency of requests to object storage, tagged by ‘operation’
Shown as second
warpstream.build_age_unix_ms
(gauge)
time since the WarpStream Agent binary was built
Shown as millisecond
warpstream.circuit_breaker_count
(count)
incremented every time a circuit breaker state changes, tagged by ‘name’ and ‘state’
warpstream.circuit_breaker_hits
(count)
incremented every time a circuit breaker records a success or a failure, tagged by ‘name’ and ‘outcome’
warpstream.circuit_breaker_permit
(count)
incremented every time the WarpStream Agent tries to get a permit from a circuit breaker, tagged by ‘name’ and ‘outcome’
warpstream.circuit_breaker_state_set
(gauge)
gauge of a circuit breaker state, tagged by ‘name’ and ‘state’
warpstream.consumer_group_estimated_lag_very_coarse_do_not_use_to_measure_e2e_seconds
(gauge)
very coarse estimate of consumer time lag, not suitable for measuring end-to-end latency; its tags depend on the WarpStream Agent settings
Shown as second
warpstream.consumer_group_generation_id
(gauge)
generation number of the consumer group, which increments by one with each rebalance; useful for detecting rebalances
warpstream.consumer_group_lag
(gauge)
sum of the consumer group lag across the topic’s partitions
warpstream.consumer_group_num_members
(gauge)
number of members in each consumer group
warpstream.consumer_group_num_partitions
(gauge)
number of partitions in each consumer group
warpstream.consumer_group_num_topics
(gauge)
number of topics in each consumer group
warpstream.consumer_group_state
(gauge)
state of the consumer group, tagged by ‘consumer_group’ and ‘group_state’
warpstream.container_running
(gauge)
gauge set to 1 by each running WarpStream Agent
warpstream.cpu_utilization_go_runtime
(gauge)
current process’s CPU utilization from the Go runtime’s perspective
Shown as percent
warpstream.cpu_utilization_os
(gauge)
current process’s CPU utilization from the operating system’s perspective
Shown as percent
warpstream.diagnostic_status
(gauge)
diagnostic status, tagged by ‘diagnostic_name’, ‘diagnostic_type’, ‘successful’ and ‘muted’
warpstream.error_count
(count)
number of error logs
warpstream.fast_retrier_execution
(count)
number of times the fast retrier was used
warpstream.fast_retrier_p99_ms
(gauge)
p99 latency tracked by a fast retrier
Shown as millisecond
warpstream.fast_retrier_speculative_retry
(count)
number of times a fast retrier interrupted an ongoing request to retry it faster
warpstream.file_cache_batcher_batches_count
(count)
number of batches to the file cache
warpstream.file_cache_batcher_batches_distribution
(gauge)
distribution of the number of batches to the file cache
warpstream.file_cache_batcher_called
(count)
number of times a WarpStream Agent batched requests to the file cache
warpstream.files_count
(gauge)
number of files tracked by the WarpStream control plane, tagged by compaction ‘level’
warpstream.get_records_batch_batches_count
(count)
number of batches when issuing a GetRecords request to the WarpStream control plane
warpstream.get_records_batch_batches_distribution
(gauge)
distribution of the number of batches when issuing a GetRecords request to the WarpStream control plane
warpstream.get_records_batch_called
(count)
number of times a WarpStream Agent batched GetRecords requests
warpstream.global_ratelimit_fetch_compressed_bytes
(count)
number of compressed fetch request bytes that were rate limited
Shown as byte
warpstream.kafka_batcher_batches_count
(count)
number of batches when issuing Kafka requests (other than produce and fetch) to the WarpStream control plane
warpstream.kafka_batcher_batches_distribution
(gauge)
distribution of the number of batches when issuing Kafka requests (other than produce and fetch) to the WarpStream control plane
warpstream.kafka_batcher_called
(count)
number of times a WarpStream Agent processed a Kafka request (other than produce and fetch)
warpstream.list_streams_batcher_batches_count
(count)
number of batches when issuing a ListStream request to the WarpStream control plane
warpstream.list_streams_batcher_batches_distribution
(gauge)
distribution of the number of batches when issuing a ListStream request to the WarpStream control plane
warpstream.list_streams_batcher_called
(count)
number of times an Agent batched ListStream requests
warpstream.loading_cache_loader_outcome
(count)
incremented by 1 every time a loading cache filled a key, tagged by ‘cache_name’ and ‘outcome’
warpstream.loading_cache_refresh_completed
(count)
incremented by 1 every time a loading cache refreshed a key, tagged by ‘cache_name’ and ‘outcome’
warpstream.max_offset
(gauge)
max Kafka offset
warpstream.net_http_tcp_limit_listener_acquire_count
(count)
number of times the WarpStream Agent accepted a new active HTTP connection
warpstream.net_http_tcp_limit_listener_acquire_latency
(gauge)
latency to accept a new active HTTP connection
Shown as second
warpstream.num_records
(gauge)
number of records
warpstream.partitions_count
(gauge)
number of partitions
warpstream.partitions_count_limit
(gauge)
max number of partitions
warpstream.schema_versions_count
(gauge)
number of schema registry versions
warpstream.schema_versions_limit
(gauge)
max number of schema registry versions
warpstream.server_starting
(count)
incremented by 1 when a WarpStream Agent process starts
warpstream.server_tick_running
(count)
incremented by 1 each time a WarpStream Agent server tick runs
warpstream.target_release_server_middleware
(count)
incremented by 1 when a WarpStream Agent checks that an HTTP request was intended for this WarpStream cluster, tagged by ‘outcome’
warpstream.topics_count
(gauge)
number of topics
warpstream.topics_count_limit
(gauge)
max number of topics
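
Several of the metrics above combine naturally into derived signals:

- rebalance counts from warpstream.consumer_group_generation_id
- a compression ratio from the compressed and uncompressed produce byte counters
- headroom against the *_count_limit gauges

The Python sketch below computes these signals. It assumes you already have the sample values, for example exported from a Datadog query. The helper names are hypothetical and not part of the integration.

```python
from typing import Sequence


def count_rebalances(generation_ids: Sequence[int]) -> int:
    """Count rebalances from successive consumer_group_generation_id samples.

    The generation ID increments by one per rebalance, so the sum of the
    increases between consecutive samples is the rebalance count. A decrease
    (for example, a recreated group) is treated as a reset and skipped.
    """
    total = 0
    for prev, cur in zip(generation_ids, generation_ids[1:]):
        if cur > prev:
            total += cur - prev
    return total


def compression_ratio(uncompressed_bytes: float, compressed_bytes: float) -> float:
    """Uncompressed-to-compressed ratio, e.g. from the two produce *_bytes_counter metrics."""
    if compressed_bytes <= 0:
        raise ValueError("compressed_bytes must be positive")
    return uncompressed_bytes / compressed_bytes


def limit_usage(count: float, limit: float) -> float:
    """Fraction of a limit in use, e.g. partitions_count / partitions_count_limit."""
    if limit <= 0:
        raise ValueError("limit must be positive")
    return count / limit


if __name__ == "__main__":
    print(count_rebalances([3, 3, 4, 6, 6]))
    print(compression_ratio(4000, 1000))
    print(limit_usage(900, 1000))
```

For example, count_rebalances([3, 3, 4, 6, 6]) returns 3: one rebalance between 3 and 4, and two between 4 and 6. Summing increases rather than counting changes keeps the result correct when several rebalances happen between two samples.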

Uninstallation

  1. Remove the integration’s YAML configuration file (warpstream.d/conf.yaml) and restart the Datadog Agent.

  2. Remove the -enableDatadogMetrics flag (or unset the WARPSTREAM_ENABLE_DATADOG_METRICS environment variable) and restart the WarpStream Agent.

Support

Need help? Contact us through the following channels: