Producing Delta Temporality Metrics with OpenTelemetry

Overview

The OpenTelemetry protocol (OTLP) sends several metric types, some of which can have either delta or cumulative aggregation temporality. Datadog works best with delta aggregation temporality for monotonic sums, histograms, and exponential histograms.

This guide describes the implications of using cumulative aggregation temporality instead, and how to select which aggregation temporality to export your metrics with, either in the OpenTelemetry SDK or by using the OpenTelemetry Collector cumulativetodelta processor.

Implications of using cumulative aggregation temporality

If you opt to send OTLP monotonic sums, histograms, or exponential histograms with cumulative aggregation temporality, Datadog takes the difference between consecutive points on a timeseries. This means that:

  • Your deployment is stateful, so you need to send all points on a timeseries to the same Datadog Agent or Datadog exporter. This affects how you scale your OpenTelemetry Collector deployments.
  • Datadog might not send the first point it receives from a given timeseries if it cannot ensure this point is the true start of the timeseries. This may lead to missing points upon restarts.
  • The minimum and maximum cannot be recovered for cumulative OTLP Histograms; they may be missing or approximated depending on the histograms export mode.

Configuring your OpenTelemetry SDK

If you produce OTLP metrics from an OpenTelemetry SDK, you can configure your OTLP exporter to produce these metric types with delta aggregation temporality. In some languages you can use the recommended configuration by setting the OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE environment variable to Delta (case-insensitive). For a list of languages with support for this environment variable, read the specification compliance matrix.

If your SDK does not support this environment variable you can configure delta temporality in code. The following example configures an OTLP HTTP exporter and adds 1 to a counter every two seconds for a total of five minutes.

Note: These examples are to get you started. You shouldn’t apply patterns like using console or stdout exporters in production scenarios.

import time

from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
    OTLPMetricExporter,
)
from opentelemetry.sdk.metrics import (
    Counter,
    Histogram,
    MeterProvider,
    ObservableCounter,
    ObservableGauge,
    ObservableUpDownCounter,
    UpDownCounter,
)
from opentelemetry.sdk.metrics.export import (
    AggregationTemporality,
    ConsoleMetricExporter,
    PeriodicExportingMetricReader,
)

deltaTemporality = {
    Counter: AggregationTemporality.DELTA,
    UpDownCounter: AggregationTemporality.CUMULATIVE,
    Histogram: AggregationTemporality.DELTA,
    ObservableCounter: AggregationTemporality.DELTA,
    ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
    ObservableGauge: AggregationTemporality.CUMULATIVE,
}

exporter = OTLPMetricExporter(preferred_temporality=deltaTemporality)
reader = PeriodicExportingMetricReader(exporter, export_interval_millis=5_000)
provider = MeterProvider(metric_readers=[reader])

consoleReader = PeriodicExportingMetricReader(
    ConsoleMetricExporter(preferred_temporality=deltaTemporality), export_interval_millis=5_000)
consoleProvider = MeterProvider(metric_readers=[consoleReader])

meter = provider.get_meter("my-meter")
counter = meter.create_counter("example.counter")

consoleMeter = consoleProvider.get_meter("my-meter-console")
consoleCounter = consoleMeter.create_counter("example.counter.console")

for i in range(150):
  counter.add(1)
  consoleCounter.add(1)
  time.sleep(2)
package main

import (
	"context"
	"time"

	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
	"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
	"go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func deltaSelector(kind metric.InstrumentKind) metricdata.Temporality {
	switch kind {
	case metric.InstrumentKindCounter,
		metric.InstrumentKindHistogram,
		metric.InstrumentKindObservableGauge,
		metric.InstrumentKindObservableCounter:
		return metricdata.DeltaTemporality
	case metric.InstrumentKindUpDownCounter,
		metric.InstrumentKindObservableUpDownCounter:
		return metricdata.CumulativeTemporality
	}
	panic("unknown instrument kind")
}

func main() {
	ctx := context.Background()
	exporter, err := otlpmetrichttp.New(ctx,
		otlpmetrichttp.WithTemporalitySelector(deltaSelector),
	)
	consoleExporter, consoleErr := stdoutmetric.New(
		stdoutmetric.WithTemporalitySelector(deltaSelector),
	)
	if err != nil || consoleErr != nil {
		panic(err)
	}

	reader := metric.NewPeriodicReader(exporter,
		metric.WithInterval(5*time.Second),
	)
	provider := metric.NewMeterProvider(metric.WithReader(reader))

	consoleReader := metric.NewPeriodicReader(consoleExporter,
		metric.WithInterval(5*time.Second),
	)
	consoleProvider := metric.NewMeterProvider(metric.WithReader(consoleReader))

	defer func() {
		err := provider.Shutdown(ctx)
		consoleErr := consoleProvider.Shutdown(ctx)
		if err != nil || consoleErr != nil {
			panic(err)
		}
	}()

	meter := provider.Meter("my-meter")
	counter, err := meter.Int64Counter("example.counter")

	consoleMeter := consoleProvider.Meter("my-meter-console")
	consoleCounter, consoleErr := consoleMeter.Int64Counter("example.counter.console")

	if err != nil || consoleErr != nil {
		panic(err)
	}

	for i := 0; i < 150; i++ {
		counter.Add(ctx, 1)
		consoleCounter.Add(ctx, 1)
		time.Sleep(2 * time.Second)
	}
}
package io.opentelemetry.example.delta;

import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter;
import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector;
import io.opentelemetry.sdk.metrics.export.MetricReader;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;

public final class Main {
  public static void main(String[] args) throws InterruptedException {
    OtlpHttpMetricExporter exporter = 
		OtlpHttpMetricExporter.builder()
        .setAggregationTemporalitySelector(
			AggregationTemporalitySelector.deltaPreferred())
        .build();

    MetricReader reader = 
		PeriodicMetricReader.builder(exporter).build();

    MeterProvider provider = SdkMeterProvider.builder()
        .registerMetricReader(reader)
        .build();

    Meter meter = provider.get("my-meter");

    LongCounter counter = 
		meter.counterBuilder("example.counter").build();

    for (int i = 0; i < 150; i++) {
      counter.add(1);
      Thread.sleep(2000);
    }
  }
}
// Requires: $ dotnet add package OpenTelemetry.Exporter.OpenTelemetryProtocol

using System.Diagnostics;
using System.Diagnostics.Metrics;
using OpenTelemetry;
using OpenTelemetry.Exporter;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;
using System.Threading;
using System;
using System.Threading.Tasks;

namespace GettingStarted;

public class Program
{
    public static void Main()
    {
		using var meter = new Meter("my-meter");
		var providerBuilder = Sdk.CreateMeterProviderBuilder().AddMeter(meter.Name);
		providerBuilder
        .AddConsoleExporter((exporterOptions, metricReaderOptions) =>
			{
                metricReaderOptions.PeriodicExportingMetricReaderOptions = new PeriodicExportingMetricReaderOptions
                    {
                        ExportIntervalMilliseconds = Convert.ToInt32("5000"),
                    };
				metricReaderOptions.TemporalityPreference = MetricReaderTemporalityPreference.Delta;
			})
        .AddOtlpExporter((exporterOptions, metricReaderOptions) =>
			{
                metricReaderOptions.PeriodicExportingMetricReaderOptions = new PeriodicExportingMetricReaderOptions
                    {
                        ExportIntervalMilliseconds = Convert.ToInt32("5000"),
                    };
				exporterOptions.Protocol = OtlpExportProtocol.HttpProtobuf;
				metricReaderOptions.TemporalityPreference = MetricReaderTemporalityPreference.Delta;
			});
		using var provider = providerBuilder.Build();

		Counter<int> counter = meter.CreateCounter<int>("example.counter", "1", "Example counter");
		for (int i = 0; i < 150; i++) {
			counter?.Add(1);
			Task.Delay(2000).Wait();
		}
  }
}

You can configure OTLP gRPC exporters in a similar fashion.

Converting to delta temporality on the Collector

When your metrics do not come from an OpenTelemetry language library, it may be infeasible to configure them to use delta aggregation temporality. This may be the case, for example, when producing metrics with other open source libraries such as Prometheus. In this situation, you can use the cumulative to delta processor to map your metrics to delta aggregation temporality. Your deployment is still stateful, so if your deployment has multiple Collectors, you need to use the processor on a first layer of stateful Collectors to ensure that all points of a metric are sent to the same Collector instance.

To enable the cumulative-to-delta processor so that it applies to all your metrics, define it with an empty configuration on the processors section:

processors:
    cumulativetodelta:

Finally, add it to the processors list on your metrics pipelines.

Note: The cumulative-to-delta processor does not support exponential histograms. Also, some fields, such as the minimum and maximum, can’t be recovered with this approach. Instead, use the OpenTelemetry SDK approach whenever possible.

Further reading

Additional helpful documentation, links, and articles: