Google Cloud Dataflow

Overview

Google Cloud Dataflow is a fully managed service for transforming and enriching data in stream (real time) and batch (historical) modes with equal reliability and expressiveness.

Use the Datadog Google Cloud integration to collect metrics from Google Cloud Dataflow.

Setup

Metric collection

Installation

If you haven’t already, set up the Google Cloud Platform integration first. There are no other installation steps.

Log collection

Google Cloud Dataflow logs are collected with Google Cloud Logging and sent to a Dataflow job through a Cloud Pub/Sub topic. If you haven’t already, set up logging with the Datadog Dataflow template.

Once this is done, export your Google Cloud Dataflow logs from Google Cloud Logging to the Pub/Sub topic:

  1. Go to the Google Cloud Logging page and filter the Google Cloud Dataflow logs.
  2. Click Create Sink and name the sink accordingly.
  3. Choose “Cloud Pub/Sub” as the destination and select the Pub/Sub topic that was created for that purpose. Note: The Pub/Sub topic can be located in a different project.
  4. Click Create and wait for the confirmation message to show up.
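
As a rough illustration of steps 1 to 3, the sketch below builds the two strings a sink needs: a log filter and a Pub/Sub destination. It assumes that Dataflow logs carry the monitored resource type `dataflow_step` and that a sink destination takes the form `pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID`. The function name, project ID, topic ID, and job name are hypothetical placeholders, not values defined by this integration.

```python
from typing import Dict, Optional


def build_dataflow_log_sink(
    project_id: str, topic_id: str, job_name: Optional[str] = None
) -> Dict[str, str]:
    """Return the filter and destination strings for a Dataflow log sink.

    Assumption: Dataflow logs use the monitored resource type "dataflow_step".
    """
    # Step 1: filter the Google Cloud Dataflow logs.
    log_filter = 'resource.type="dataflow_step"'
    if job_name:
        # Optionally narrow the filter to a single job (assumed label name).
        log_filter += f' AND resource.labels.job_name="{job_name}"'

    # Step 3: the Pub/Sub topic may live in a different project than the logs.
    destination = f"pubsub.googleapis.com/projects/{project_id}/topics/{topic_id}"
    return {"filter": log_filter, "destination": destination}


# Hypothetical project and topic names, for illustration only.
sink = build_dataflow_log_sink("my-project", "export-logs-to-datadog")
print(sink["filter"])
print(sink["destination"])
```

The same two values can be pasted into the sink creation form, or passed to whichever tooling you use to create sinks.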

Data Collected

Metrics

gcp.dataflow.job.active_worker_instances
(gauge)
The active number of worker instances.
gcp.dataflow.job.aggregated_worker_utilization
(gauge)
Aggregated worker utilization (for example, worker CPU utilization) across the worker pool.
Shown as percent
gcp.dataflow.job.backlog_bytes
(gauge)
Amount of known, unprocessed input for a stage, in bytes.
Shown as byte
gcp.dataflow.job.backlog_elements
(gauge)
Amount of known, unprocessed input for a stage, in elements.
gcp.dataflow.job.bigquery.write_count
(count)
BigQuery write requests from BigQueryIO.Write in Dataflow jobs.
gcp.dataflow.job.billable_shuffle_data_processed
(gauge)
The billable bytes of shuffle data processed by this Dataflow job.
Shown as byte
gcp.dataflow.job.bundle_user_processing_latencies.avg
(gauge)
The average bundle user processing latencies from a particular stage. Available for jobs running on Streaming Engine.
Shown as millisecond
gcp.dataflow.job.bundle_user_processing_latencies.samplecount
(gauge)
The sample count for bundle user processing latencies from a particular stage. Available for jobs running on Streaming Engine.
Shown as millisecond
gcp.dataflow.job.bundle_user_processing_latencies.sumsqdev
(gauge)
The sum of squared deviation for bundle user processing latencies from a particular stage. Available for jobs running on Streaming Engine.
Shown as millisecond
gcp.dataflow.job.current_num_vcpus
(gauge)
The number of vCPUs currently being used by this Dataflow job.
Shown as cpu
gcp.dataflow.job.current_shuffle_slots
(gauge)
The current shuffle slots used by this Dataflow job.
gcp.dataflow.job.data_watermark_age
(gauge)
The age (time since event timestamp) of the most recent item of data that has been fully processed by the pipeline.
Shown as second
gcp.dataflow.job.disk_space_capacity
(gauge)
The amount of persistent disk currently being allocated to all workers associated with this Dataflow job.
Shown as byte
gcp.dataflow.job.dofn_latency_average
(gauge)
The average processing time for a single message in a given DoFn (over the past 3 min window). Note that this includes time spent in GetData calls.
Shown as millisecond
gcp.dataflow.job.dofn_latency_max
(gauge)
The maximum processing time for a single message in a given DoFn (over the past 3 min window). Note that this includes time spent in GetData calls.
Shown as millisecond
gcp.dataflow.job.dofn_latency_min
(gauge)
The minimum processing time for a single message in a given DoFn (over the past 3 min window).
Shown as millisecond
gcp.dataflow.job.dofn_latency_num_messages
(gauge)
The number of messages processed by a given DoFn (over the past 3 min window).
gcp.dataflow.job.dofn_latency_total
(gauge)
The total processing time for all messages in a given DoFn (over the past 3 min window). Note that this includes time spent in GetData calls.
Shown as millisecond
gcp.dataflow.job.duplicates_filtered_out_count
(count)
The number of messages being processed by a particular stage that have been filtered out as duplicates. Available for jobs running on Streaming Engine.
gcp.dataflow.job.elapsed_time
(gauge)
Duration that the current run of this pipeline has been in the Running state so far, in seconds. When a run completes, this stays at the duration of that run until the next run starts.
Shown as second
gcp.dataflow.job.element_count
(count)
Number of elements added to the PCollection so far.
Shown as item
gcp.dataflow.job.elements_produced_count
(count)
The number of elements produced by each PTransform.
gcp.dataflow.job.estimated_backlog_processing_time
(gauge)
Estimated time (in seconds) to consume current backlog if no new data comes in and throughput stays the same. Only available for Streaming Engine jobs.
Shown as second
gcp.dataflow.job.estimated_byte_count
(count)
An estimated number of bytes added to the PCollection so far.
Shown as byte
gcp.dataflow.job.estimated_bytes_active
(gauge)
Estimated number of bytes active in this stage of the job.
Shown as byte
gcp.dataflow.job.estimated_bytes_consumed_count
(count)
Estimated number of bytes consumed by the stage of this job.
Shown as byte
gcp.dataflow.job.estimated_bytes_produced_count
(count)
The estimated total byte size of elements produced by each PTransform.
gcp.dataflow.job.estimated_timer_backlog_processing_time
(gauge)
Estimated time (in seconds) for timers to complete. Only available for Streaming Engine jobs.
Shown as second
gcp.dataflow.job.gpu_memory_utilization
(gauge)
Percent of time over the past sample period during which global (device) memory was being read or written.
Shown as percent
gcp.dataflow.job.gpu_utilization
(gauge)
Percent of time over the past sample period during which one or more kernels were executing on the GPU.
Shown as percent
gcp.dataflow.job.horizontal_worker_scaling
(gauge)
A boolean value that indicates the horizontal scaling direction recommended by the autoscaler and the rationale behind it. A true value means a scaling decision was made; a false value means the corresponding scaling is not taking effect.
gcp.dataflow.job.is_failed
(gauge)
Whether this job has failed.
gcp.dataflow.job.max_worker_instances_limit
(gauge)
The maximum number of workers autoscaling is allowed to request.
gcp.dataflow.job.memory_capacity
(gauge)
The amount of memory currently being allocated to all workers associated with this Dataflow job.
Shown as byte
gcp.dataflow.job.min_worker_instances_limit
(gauge)
The minimum number of workers autoscaling is allowed to request.
gcp.dataflow.job.oldest_active_message_age
(gauge)
How long the oldest active message in a DoFn has been processing for.
Shown as millisecond
gcp.dataflow.job.per_stage_data_watermark_age
(gauge)
The age (time since event timestamp) up to which all data has been processed by this stage of the pipeline.
Shown as second
gcp.dataflow.job.per_stage_system_lag
(gauge)
The current maximum duration that an item of data has been processing or awaiting processing in seconds, per pipeline stage.
Shown as second
gcp.dataflow.job.processing_parallelism_keys
(gauge)
Approximate number of keys in use for data processing for each stage. Processing for any given key is serialized, so the total number of keys for a stage represents the maximum available parallelism at that stage. Available for jobs running on Streaming Engine.
gcp.dataflow.job.pubsub.late_messages_count
(count)
The number of messages from Pub/Sub with a timestamp older than the estimated watermark.
gcp.dataflow.job.pubsub.published_messages_count
(count)
The number of Pub/Sub messages published broken down by topic and status.
gcp.dataflow.job.pubsub.pulled_message_ages.avg
(gauge)
The average of the distribution of pulled but unacked Pub/Sub message ages.
Shown as millisecond
gcp.dataflow.job.pubsub.pulled_message_ages.samplecount
(gauge)
The sample count for the distribution of pulled but unacked Pub/Sub message ages.
Shown as millisecond
gcp.dataflow.job.pubsub.pulled_message_ages.sumsqdev
(gauge)
The sum of squared deviation for the distribution of pulled but unacked Pub/Sub message ages.
Shown as millisecond
gcp.dataflow.job.pubsub.read_count
(count)
Pub/Sub Pull Requests. For Streaming Engine, this metric is deprecated. See the Using the Dataflow monitoring interface page for upcoming changes.
gcp.dataflow.job.pubsub.read_latencies.avg
(count)
The average Pub/Sub Pull request latencies from PubsubIO.Read in Dataflow jobs. For Streaming Engine, this metric is deprecated. See the Using the Dataflow monitoring interface page for upcoming changes.
Shown as millisecond
gcp.dataflow.job.pubsub.read_latencies.samplecount
(count)
The sample count for Pub/Sub Pull request latencies from PubsubIO.Read in Dataflow jobs. For Streaming Engine, this metric is deprecated. See the Using the Dataflow monitoring interface page for upcoming changes.
Shown as millisecond
gcp.dataflow.job.pubsub.read_latencies.sumsqdev
(count)
The sum of squared deviation for Pub/Sub Pull request latencies from PubsubIO.Read in Dataflow jobs. For Streaming Engine, this metric is deprecated. See the Using the Dataflow monitoring interface page for upcoming changes.
Shown as millisecond
gcp.dataflow.job.pubsub.streaming_pull_connection_status
(gauge)
Percentage of all Streaming Pull connections that are either active (OK status) or terminated because of an error (non-OK status). When a connection is terminated, Dataflow will wait some time before attempting to re-connect. For Streaming Engine only.
Shown as percent
gcp.dataflow.job.pubsub.write_count
(count)
Pub/Sub Publish requests from PubsubIO.Write in Dataflow jobs.
gcp.dataflow.job.pubsub.write_latencies.avg
(count)
The average Pub/Sub Publish request latencies from PubsubIO.Write in Dataflow jobs.
Shown as millisecond
gcp.dataflow.job.pubsub.write_latencies.samplecount
(count)
The sample count for Pub/Sub Publish request latencies from PubsubIO.Write in Dataflow jobs.
Shown as millisecond
gcp.dataflow.job.pubsub.write_latencies.sumsqdev
(count)
The sum of squared deviation for Pub/Sub Publish request latencies from PubsubIO.Write in Dataflow jobs.
Shown as millisecond
gcp.dataflow.job.status
(gauge)
Current state of this pipeline (for example, RUNNING, DONE, CANCELLED, or FAILED). Not reported while the pipeline is not running.
gcp.dataflow.job.streaming_engine.key_processing_availability
(gauge)
Percentage of streaming processing keys that are assigned to workers and available to perform work. Work for unavailable keys will be deferred until keys are available.
gcp.dataflow.job.streaming_engine.persistent_state.read_bytes_count
(count)
Storage bytes read by a particular stage. Available for jobs running on Streaming Engine.
gcp.dataflow.job.streaming_engine.persistent_state.stored_bytes
(gauge)
Current bytes stored in persistent state for the job.
Shown as byte
gcp.dataflow.job.streaming_engine.persistent_state.write_bytes_count
(count)
Storage bytes written by a particular stage. Available for jobs running on Streaming Engine.
gcp.dataflow.job.streaming_engine.persistent_state.write_latencies.avg
(count)
The average storage write latencies from a particular stage. Available for jobs running on Streaming Engine.
Shown as millisecond
gcp.dataflow.job.streaming_engine.persistent_state.write_latencies.samplecount
(count)
The sample count for storage write latencies from a particular stage. Available for jobs running on Streaming Engine.
Shown as millisecond
gcp.dataflow.job.streaming_engine.persistent_state.write_latencies.sumsqdev
(count)
The sum of squared deviation for storage write latencies from a particular stage. Available for jobs running on Streaming Engine.
Shown as millisecond
gcp.dataflow.job.streaming_engine.stage_end_to_end_latencies.avg
(gauge)
The average of the distribution of time spent by Streaming Engine in each stage of the pipeline. This time includes shuffling messages, queueing them for processing, processing, queueing for persistent state write, and the write itself.
Shown as millisecond
gcp.dataflow.job.streaming_engine.stage_end_to_end_latencies.samplecount
(gauge)
The sample count for the distribution of time spent by Streaming Engine in each stage of the pipeline. This time includes shuffling messages, queueing them for processing, processing, queueing for persistent state write, and the write itself.
Shown as millisecond
gcp.dataflow.job.streaming_engine.stage_end_to_end_latencies.sumsqdev
(gauge)
The sum of squared deviation for the distribution of time spent by Streaming Engine in each stage of the pipeline. This time includes shuffling messages, queueing them for processing, processing, queueing for persistent state write, and the write itself.
Shown as millisecond
gcp.dataflow.job.system_lag
(gauge)
The current maximum duration that an item of data has been awaiting processing, in seconds.
Shown as second
gcp.dataflow.job.target_worker_instances
(gauge)
The desired number of worker instances.
gcp.dataflow.job.thread_time
(count)
Estimated time in milliseconds spent running in the function of the PTransform, totaled across threads on all workers of the job.
Shown as millisecond
gcp.dataflow.job.timers_pending_count
(count)
The number of timers pending in a particular stage. Available for jobs running on Streaming Engine.
gcp.dataflow.job.timers_processed_count
(count)
The number of timers completed by a particular stage. Available for jobs running on Streaming Engine.
gcp.dataflow.job.total_dcu_usage
(count)
Cumulative amount of DCUs (Data Compute Units) used by the Dataflow job since it was launched.
gcp.dataflow.job.total_memory_usage_time
(gauge)
The total GB seconds of memory allocated to this Dataflow job.
Shown as gibibyte
gcp.dataflow.job.total_pd_usage_time
(gauge)
The total GB seconds for all persistent disk used by all workers associated with this Dataflow job.
Shown as gibibyte
gcp.dataflow.job.total_secu_usage
(gauge)
The total amount of SECUs (Streaming Engine Compute Units) used by the Dataflow job since it was launched.
gcp.dataflow.job.total_shuffle_data_processed
(gauge)
The total bytes of shuffle data processed by this Dataflow job.
Shown as byte
gcp.dataflow.job.total_streaming_data_processed
(gauge)
The total bytes of streaming data processed by this Dataflow job.
Shown as byte
gcp.dataflow.job.total_vcpu_time
(gauge)
The total vCPU seconds used by this Dataflow job.
gcp.dataflow.job.user_counter
(gauge)
A user-defined counter metric.
gcp.dataflow.job.worker_utilization_hint
(gauge)
Worker utilization hint for autoscaling. This hint value is configured by the customer and defines a target worker CPU utilization range, which influences scaling aggressiveness.
Shown as percent
gcp.dataflow.job.worker_utilization_hint_is_actively_used
(gauge)
Reports whether or not the worker utilization hint is actively used by the horizontal autoscaling policy.
gcp.dataflow.quota.region_endpoint_shuffle_slot.exceeded
(count)
Number of attempts to exceed the limit on quota metric dataflow.googleapis.com/region_endpoint_shuffle_slot.
gcp.dataflow.quota.region_endpoint_shuffle_slot.limit
(gauge)
Current limit on quota metric dataflow.googleapis.com/region_endpoint_shuffle_slot.
gcp.dataflow.quota.region_endpoint_shuffle_slot.usage
(gauge)
Current usage on quota metric dataflow.googleapis.com/region_endpoint_shuffle_slot.
gcp.dataflow.worker.memory.bytes_used
(gauge)
The memory usage in bytes by a particular container instance on a Dataflow worker.
Shown as byte
gcp.dataflow.worker.memory.container_limit
(gauge)
Maximum RAM size in bytes available to a particular container instance on a Dataflow worker.
Shown as byte
gcp.dataflow.worker.memory.total_limit
(gauge)
RAM size in bytes available to a Dataflow worker.
Shown as byte

When using Google Cloud Dataflow to monitor Apache Beam pipeline metrics, note that metrics generated from Gauge static methods are not collected. If you need to monitor these metrics, you can use Micrometer.
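
Several of the metrics above are reported as a distribution split into `.avg`, `.samplecount`, and `.sumsqdev` series. As a minimal sketch, assuming `.sumsqdev` is the sum of squared deviations from the mean and that both values cover the same time interval, a population standard deviation can be derived from the two series. The function name and the sample numbers are hypothetical.

```python
import math


def distribution_stddev(sample_count: float, sum_sq_dev: float) -> float:
    """Population standard deviation from a sample count and a sum of squared deviations.

    Assumption: sum_sq_dev = sum((x_i - mean) ** 2) over the same samples
    that sample_count counts.
    """
    if sample_count <= 0:
        # No samples in the interval, so there is no spread to report.
        return 0.0
    variance = sum_sq_dev / sample_count
    return math.sqrt(variance)


# Hypothetical latencies in milliseconds, used only to check the arithmetic.
samples = [10.0, 20.0, 30.0]
mean = sum(samples) / len(samples)
sum_sq_dev = sum((x - mean) ** 2 for x in samples)

print(distribution_stddev(len(samples), sum_sq_dev))
```

Values that have been rolled up or aggregated across stages or jobs may no longer satisfy the assumption above, so treat the result as an approximation.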

Events

The Google Cloud Dataflow integration does not include any events.

Service Checks

The Google Cloud Dataflow integration does not include any service checks.

Troubleshooting

Need help? Contact Datadog support.

Further Reading

Additional helpful documentation, links, and articles: