# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0
"""
This library allows export of metrics data to `Prometheus <https://prometheus.io/>`_.
Usage
-----
The **OpenTelemetry Prometheus Exporter** allows export of `OpenTelemetry`_
metrics to `Prometheus`_.
.. _Prometheus: https://prometheus.io/
.. _OpenTelemetry: https://github.com/open-telemetry/opentelemetry-python/
.. code:: python
from prometheus_client import start_http_server
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.metrics import get_meter_provider, set_meter_provider
from opentelemetry.sdk.metrics import MeterProvider
# Start Prometheus client
start_http_server(port=8000, addr="localhost")
# Exporter to export metrics to Prometheus
prefix = "MyAppPrefix"
reader = PrometheusMetricReader(prefix=prefix)
# Meter is responsible for creating and recording metrics
set_meter_provider(MeterProvider(metric_readers=[reader]))
meter = get_meter_provider().get_meter("myapp", "0.1.2")
counter = meter.create_counter(
"requests",
"requests",
"number of requests",
)
# Labels are used to identify key-values that are associated with a specific
# metric that you want to record. These are useful for pre-aggregation and can
# be used to store custom dimensions pertaining to a metric
labels = {"environment": "staging"}
counter.add(25, labels)
input("Press any key to exit...")
API
---
"""
from collections import deque
from collections.abc import Iterable, Sequence
from itertools import chain
from json import dumps
from logging import getLogger
from os import environ
from prometheus_client import CollectorRegistry, start_http_server
from prometheus_client.core import (
REGISTRY,
CounterMetricFamily,
GaugeMetricFamily,
HistogramMetricFamily,
InfoMetricFamily,
)
from prometheus_client.core import Metric as PrometheusMetric
from opentelemetry.exporter.prometheus._mapping import (
map_unit,
sanitize_attribute,
sanitize_full_name,
)
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_PROMETHEUS_HOST,
OTEL_EXPORTER_PROMETHEUS_PORT,
)
from opentelemetry.sdk.metrics import (
Counter,
ObservableCounter,
ObservableGauge,
ObservableUpDownCounter,
UpDownCounter,
)
from opentelemetry.sdk.metrics import Histogram as HistogramInstrument
from opentelemetry.sdk.metrics.export import (
AggregationTemporality,
Gauge,
Histogram,
HistogramDataPoint,
MetricReader,
MetricsData,
Sum,
)
from opentelemetry.semconv._incubating.attributes.otel_attributes import (
OtelComponentTypeValues,
)
from opentelemetry.util.types import Attributes
_logger = getLogger(__name__)
_TARGET_INFO_NAME = "target"
_TARGET_INFO_DESCRIPTION = "Target metadata"
def _convert_buckets(
bucket_counts: Sequence[int], explicit_bounds: Sequence[float]
) -> Sequence[tuple[str, int]]:
buckets = []
total_count = 0
for upper_bound, count in zip(
chain(explicit_bounds, ["+Inf"]),
bucket_counts,
):
total_count += count
buckets.append((f"{upper_bound}", total_count))
return buckets
[docs]
class PrometheusMetricReader(MetricReader):
"""Prometheus metric exporter for OpenTelemetry."""
def __init__(
self,
disable_target_info: bool = False,
prefix: str = "",
*,
registry: CollectorRegistry = REGISTRY,
) -> None:
super().__init__(
preferred_temporality={
Counter: AggregationTemporality.CUMULATIVE,
UpDownCounter: AggregationTemporality.CUMULATIVE,
HistogramInstrument: AggregationTemporality.CUMULATIVE,
ObservableCounter: AggregationTemporality.CUMULATIVE,
ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
ObservableGauge: AggregationTemporality.CUMULATIVE,
},
otel_component_type=OtelComponentTypeValues.PROMETHEUS_HTTP_TEXT_METRIC_EXPORTER,
)
self._collector = _CustomCollector(
disable_target_info=disable_target_info, prefix=prefix
)
self._registry = registry
self._registry.register(self._collector)
self._collector._callback = self.collect
self._prefix = prefix
def _receive_metrics(
self,
metrics_data: MetricsData,
timeout_millis: float = 10_000,
**kwargs,
) -> None:
if metrics_data is None:
return
self._collector.add_metrics_data(metrics_data)
[docs]
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
self._registry.unregister(self._collector)
class _CustomCollector:
"""_CustomCollector represents the Prometheus Collector object
See more:
https://github.com/prometheus/client_python#custom-collectors
"""
def __init__(self, disable_target_info: bool = False, prefix: str = ""):
self._callback = None
self._metrics_datas: deque[MetricsData] = deque()
self._disable_target_info = disable_target_info
self._target_info = None
self._prefix = prefix
def add_metrics_data(self, metrics_data: MetricsData) -> None:
"""Add metrics to Prometheus data"""
self._metrics_datas.append(metrics_data)
def collect(self) -> Iterable[PrometheusMetric]:
"""Collect fetches the metrics from OpenTelemetry
and delivers them as Prometheus Metrics.
Collect is invoked every time a ``prometheus.Gatherer`` is run
for example when the HTTP endpoint is invoked by Prometheus.
"""
if self._callback is not None:
self._callback()
metric_family_id_metric_family = {}
if len(self._metrics_datas):
if not self._disable_target_info:
if self._target_info is None:
attributes: Attributes = {}
for res in self._metrics_datas[0].resource_metrics:
attributes = {**attributes, **res.resource.attributes}
self._target_info = self._create_info_metric(
_TARGET_INFO_NAME, _TARGET_INFO_DESCRIPTION, attributes
)
metric_family_id_metric_family[_TARGET_INFO_NAME] = (
self._target_info
)
while self._metrics_datas:
self._translate_to_prometheus(
self._metrics_datas.popleft(), metric_family_id_metric_family
)
if metric_family_id_metric_family:
yield from metric_family_id_metric_family.values()
# pylint: disable=too-many-locals,too-many-branches
def _translate_to_prometheus(
self,
metrics_data: MetricsData,
metric_family_id_metric_family: dict[str, PrometheusMetric],
):
metrics = []
for resource_metrics in metrics_data.resource_metrics:
for scope_metrics in resource_metrics.scope_metrics:
for metric in scope_metrics.metrics:
metrics.append(metric)
for metric in metrics:
label_values_data_points = []
values = []
metric_name = metric.name
if self._prefix:
metric_name = self._prefix + "_" + metric_name
metric_name = sanitize_full_name(metric_name)
metric_description = metric.description or ""
metric_unit = map_unit(metric.unit)
# First pass: collect all unique label keys across all data points
all_label_keys_set = set()
data_point_attributes = []
for number_data_point in metric.data.data_points:
attrs = {}
for key, value in number_data_point.attributes.items():
sanitized_key = sanitize_attribute(key)
all_label_keys_set.add(sanitized_key)
attrs[sanitized_key] = self._check_value(value)
data_point_attributes.append(attrs)
if isinstance(number_data_point, HistogramDataPoint):
values.append(
{
"bucket_counts": number_data_point.bucket_counts,
"explicit_bounds": (
number_data_point.explicit_bounds
),
"sum": number_data_point.sum,
}
)
else:
values.append(number_data_point.value)
# Sort label keys for consistent ordering
all_label_keys = sorted(all_label_keys_set)
# Second pass: build label values with empty strings for missing labels
for attrs in data_point_attributes:
label_values = []
for key in all_label_keys:
label_values.append(attrs.get(key, ""))
label_values_data_points.append(label_values)
# Create metric family ID without label keys
per_metric_family_id = "|".join(
[
metric_name,
metric_description,
metric_unit,
]
)
is_non_monotonic_sum = (
isinstance(metric.data, Sum)
and metric.data.is_monotonic is False
)
is_cumulative = (
isinstance(metric.data, Sum)
and metric.data.aggregation_temporality
== AggregationTemporality.CUMULATIVE
)
# The prometheus compatibility spec for sums says: If the aggregation temporality is cumulative and the sum is non-monotonic, it MUST be converted to a Prometheus Gauge.
should_convert_sum_to_gauge = (
is_non_monotonic_sum and is_cumulative
)
if (
isinstance(metric.data, Sum)
and not should_convert_sum_to_gauge
):
metric_family_id = "|".join(
[per_metric_family_id, CounterMetricFamily.__name__]
)
if metric_family_id not in metric_family_id_metric_family:
metric_family_id_metric_family[metric_family_id] = (
CounterMetricFamily(
name=metric_name,
documentation=metric_description,
labels=all_label_keys,
unit=metric_unit,
)
)
for label_values, value in zip(
label_values_data_points, values
):
metric_family_id_metric_family[
metric_family_id
].add_metric(labels=label_values, value=value)
elif isinstance(metric.data, Gauge) or should_convert_sum_to_gauge:
metric_family_id = "|".join(
[per_metric_family_id, GaugeMetricFamily.__name__]
)
if (
metric_family_id
not in metric_family_id_metric_family.keys()
):
metric_family_id_metric_family[metric_family_id] = (
GaugeMetricFamily(
name=metric_name,
documentation=metric_description,
labels=all_label_keys,
unit=metric_unit,
)
)
for label_values, value in zip(
label_values_data_points, values
):
metric_family_id_metric_family[
metric_family_id
].add_metric(labels=label_values, value=value)
elif isinstance(metric.data, Histogram):
metric_family_id = "|".join(
[per_metric_family_id, HistogramMetricFamily.__name__]
)
if (
metric_family_id
not in metric_family_id_metric_family.keys()
):
metric_family_id_metric_family[metric_family_id] = (
HistogramMetricFamily(
name=metric_name,
documentation=metric_description,
labels=all_label_keys,
unit=metric_unit,
)
)
for label_values, value in zip(
label_values_data_points, values
):
metric_family_id_metric_family[
metric_family_id
].add_metric(
labels=label_values,
buckets=_convert_buckets(
value["bucket_counts"], value["explicit_bounds"]
),
sum_value=value["sum"],
)
else:
_logger.warning(
"Unsupported metric data. %s", type(metric.data)
)
# pylint: disable=no-self-use
def _check_value(self, value: int | float | str | Sequence) -> str:
"""Check the label value and return is appropriate representation"""
if not isinstance(value, str):
return dumps(value, default=str)
return str(value)
def _create_info_metric(
self, name: str, description: str, attributes: dict[str, str]
) -> InfoMetricFamily:
"""Create an Info Metric Family with list of attributes"""
# sanitize the attribute names according to Prometheus rule
attributes = {
sanitize_attribute(key): self._check_value(value)
for key, value in attributes.items()
}
info = InfoMetricFamily(name, description, labels=attributes)
info.add_metric(labels=list(attributes.keys()), value=attributes)
return info
class _AutoPrometheusMetricReader(PrometheusMetricReader):
"""Thin wrapper around PrometheusMetricReader used for the opentelemetry_metrics_exporter entry point.
This allows users to use the prometheus exporter with opentelemetry-instrument. It handles
starting the Prometheus http server on the the correct port and host.
"""
def __init__(self) -> None:
super().__init__()
# Default values are specified in
# https://github.com/open-telemetry/opentelemetry-specification/blob/v1.24.0/specification/configuration/sdk-environment-variables.md#prometheus-exporter
start_http_server(
port=int(environ.get(OTEL_EXPORTER_PROMETHEUS_PORT, "9464")),
addr=environ.get(OTEL_EXPORTER_PROMETHEUS_HOST, "localhost"),
)