123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530 |
- # Copyright The OpenTelemetry Authors
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- # pylint: disable=too-many-ancestors
- from abc import ABC, abstractmethod
- from dataclasses import dataclass
- from logging import getLogger
- from re import compile as re_compile
- from typing import (
- Callable,
- Dict,
- Generator,
- Generic,
- Iterable,
- Optional,
- Sequence,
- TypeVar,
- Union,
- )
- # pylint: disable=unused-import; needed for typing and sphinx
- from opentelemetry import metrics
- from opentelemetry.context import Context
- from opentelemetry.metrics._internal.observation import Observation
- from opentelemetry.util.types import (
- Attributes,
- )
# Pattern for instrument names (applied with ``fullmatch``): one ASCII letter
# followed by up to 254 characters drawn from letters, digits, "-", "_", "."
# and "/", i.e. between 1 and 255 characters in total.
_name_regex = re_compile(r"[a-zA-Z][-_./a-zA-Z0-9]{0,254}")

# Pattern for instrument units (applied with ``fullmatch``): at most 63
# characters from the 7-bit ASCII range; the empty string is accepted.
_unit_regex = re_compile(r"[\x00-\x7F]{0,63}")

# Module-level logger named after this module.
_logger = getLogger(__name__)
@dataclass(frozen=True)
class _MetricsHistogramAdvisory:
    """Immutable container for histogram advisory parameters."""

    # Suggested explicit bucket boundaries; ``None`` when none were supplied.
    explicit_bucket_boundaries: Optional[Sequence[float]] = None
@dataclass(frozen=True)
class CallbackOptions:
    """Immutable options handed to an asynchronous instrument callback.

    Args:
        timeout_millis: Time budget, in milliseconds, for one execution of
            the callback. A callback that performs asynchronous work (for
            example HTTP requests) should respect this timeout.
    """

    # Defaults to ten seconds.
    timeout_millis: float = 10_000
# Type variable bound to ``Instrument``; lets generic helpers in this module
# be parameterized by the concrete instrument type they wrap.
InstrumentT = TypeVar("InstrumentT", bound="Instrument")

# Accepted shapes for an asynchronous-instrument callback:
#   * a callable that receives ``CallbackOptions`` and returns an iterable of
#     ``Observation`` objects, or
#   * a generator that is sent ``CallbackOptions`` and yields iterables of
#     ``Observation`` objects.
# pylint: disable=invalid-name
CallbackT = Union[
    Callable[[CallbackOptions], Iterable[Observation]],
    Generator[Iterable[Observation], CallbackOptions, None],
]
class Instrument(ABC):
    """Abstract class that serves as base for all instruments."""

    @abstractmethod
    def __init__(
        self,
        name: str,
        unit: str = "",
        description: str = "",
    ) -> None:
        """Declare the constructor signature shared by all instruments.

        Args:
            name: The instrument name.
            unit: The unit of the values the instrument records.
            description: A human-readable description of the instrument.
        """

    @staticmethod
    def _check_name_unit_description(
        name: str, unit: str, description: str
    ) -> Dict[str, Optional[str]]:
        """Check an instrument name, unit and description for validity.

        Args:
            name: Candidate instrument name. Valid only if it is a string
                that fully matches ``_name_regex``.
            unit: Candidate unit. ``None`` is treated as the empty string;
                valid only if it is a string that fully matches
                ``_unit_regex``.
            description: Candidate description. ``None`` is replaced by the
                empty string; any other value is passed through unchanged.

        Returns:
            A dict with the keys ``"name"``, ``"unit"`` and
            ``"description"``. The ``"name"`` and ``"unit"`` values are the
            checked strings, or ``None`` when the corresponding input is
            invalid. Callers should use the returned values instead of the
            original arguments.
        """
        if unit is None:
            unit = ""

        # A missing or non-string value is reported as invalid (``None``)
        # rather than letting ``fullmatch`` raise ``TypeError``; this mirrors
        # the lenient treatment of ``None`` for unit and description.
        name_is_valid = (
            isinstance(name, str) and _name_regex.fullmatch(name) is not None
        )
        unit_is_valid = (
            isinstance(unit, str) and _unit_regex.fullmatch(unit) is not None
        )

        return {
            "name": name if name_is_valid else None,
            "unit": unit if unit_is_valid else None,
            "description": "" if description is None else description,
        }
class _ProxyInstrument(ABC, Generic[InstrumentT]):
    """Stand-in that holds the arguments needed to build a real instrument.

    The real instrument is created by ``_create_real_instrument`` when
    ``on_meter_set`` is called; until then ``_real_instrument`` is ``None``.
    """

    def __init__(
        self,
        name: str,
        unit: str = "",
        description: str = "",
    ) -> None:
        """Remember the construction arguments for later use.

        Args:
            name: Name to give the real instrument.
            unit: Unit to give the real instrument.
            description: Description to give the real instrument.
        """
        # Nothing backs this proxy until ``on_meter_set`` runs.
        self._real_instrument: Optional[InstrumentT] = None
        self._description = description
        self._unit = unit
        self._name = name

    def on_meter_set(self, meter: "metrics.Meter") -> None:
        """Called when a real meter is set on the creating _ProxyMeter"""
        # No locking is needed on proxy instruments: it is acceptable for
        # some measurements to be dropped while the real backing instrument
        # is being created.
        backing_instrument = self._create_real_instrument(meter)
        self._real_instrument = backing_instrument

    @abstractmethod
    def _create_real_instrument(self, meter: "metrics.Meter") -> InstrumentT:
        """Create an instance of the real instrument. Implement this."""
class _ProxyAsynchronousInstrument(_ProxyInstrument[InstrumentT]):
    """Proxy base for asynchronous instruments; also stores the callbacks."""

    def __init__(
        self,
        name: str,
        callbacks: Optional[Sequence[CallbackT]] = None,
        unit: str = "",
        description: str = "",
    ) -> None:
        """Store the construction arguments, including ``callbacks``.

        Args:
            name: Name to give the real instrument.
            callbacks: Callbacks to hand to the real instrument, if any.
            unit: Unit to give the real instrument.
            description: Description to give the real instrument.
        """
        super().__init__(name, unit, description)
        # Kept as given (possibly ``None``).
        self._callbacks = callbacks
class Synchronous(Instrument):
    """Common base class of every synchronous instrument."""
class Asynchronous(Instrument):
    """Common base class of every asynchronous instrument."""

    @abstractmethod
    def __init__(
        self,
        name: str,
        callbacks: Optional[Sequence[CallbackT]] = None,
        unit: str = "",
        description: str = "",
    ) -> None:
        """Declare the constructor signature of asynchronous instruments.

        Only ``name``, ``unit`` and ``description`` are forwarded to the
        parent initializer; ``callbacks`` is not used here.

        Args:
            name: The instrument name.
            callbacks: Optional sequence of callbacks for the instrument.
            unit: The unit of the values the instrument reports.
            description: A human-readable description of the instrument.
        """
        super().__init__(name, unit=unit, description=description)
class Counter(Synchronous):
    """Synchronous `Instrument` that supports non-negative increments."""

    @abstractmethod
    def add(
        self,
        amount: Union[int, float],
        attributes: Optional[Attributes] = None,
        context: Optional[Context] = None,
    ) -> None:
        """Add ``amount`` to the counter.

        Args:
            amount: The increment.
            attributes: Optional attributes for this measurement.
            context: Optional `Context` for this measurement.
        """
class NoOpCounter(Counter):
    """`Counter` implementation that does nothing."""

    def __init__(
        self,
        name: str,
        unit: str = "",
        description: str = "",
    ) -> None:
        """Forward the arguments to the parent initializer."""
        super().__init__(name, unit=unit, description=description)

    def add(
        self,
        amount: Union[int, float],
        attributes: Optional[Attributes] = None,
        context: Optional[Context] = None,
    ) -> None:
        """Delegate to the parent ``add`` and return its result."""
        return super().add(amount, attributes=attributes, context=context)
class _ProxyCounter(_ProxyInstrument[Counter], Counter):
    """Proxy `Counter` that forwards to a real counter once one is set."""

    def add(
        self,
        amount: Union[int, float],
        attributes: Optional[Attributes] = None,
        context: Optional[Context] = None,
    ) -> None:
        """Forward the measurement to the real counter, if there is one."""
        real_counter = self._real_instrument
        if not real_counter:
            # No real instrument yet: the measurement is dropped.
            return
        real_counter.add(amount, attributes, context)

    def _create_real_instrument(self, meter: "metrics.Meter") -> Counter:
        """Build the real counter from the stored name, unit, description."""
        real_counter = meter.create_counter(
            self._name, self._unit, self._description
        )
        return real_counter
class UpDownCounter(Synchronous):
    """Synchronous `Instrument` that supports increments and decrements."""

    @abstractmethod
    def add(
        self,
        amount: Union[int, float],
        attributes: Optional[Attributes] = None,
        context: Optional[Context] = None,
    ) -> None:
        """Add ``amount`` to the counter.

        Args:
            amount: The increment or decrement.
            attributes: Optional attributes for this measurement.
            context: Optional `Context` for this measurement.
        """
class NoOpUpDownCounter(UpDownCounter):
    """`UpDownCounter` implementation that does nothing."""

    def __init__(
        self,
        name: str,
        unit: str = "",
        description: str = "",
    ) -> None:
        """Forward the arguments to the parent initializer."""
        super().__init__(name, unit=unit, description=description)

    def add(
        self,
        amount: Union[int, float],
        attributes: Optional[Attributes] = None,
        context: Optional[Context] = None,
    ) -> None:
        """Delegate to the parent ``add`` and return its result."""
        return super().add(amount, attributes=attributes, context=context)
class _ProxyUpDownCounter(_ProxyInstrument[UpDownCounter], UpDownCounter):
    """Proxy `UpDownCounter` that forwards to a real one once it is set."""

    def add(
        self,
        amount: Union[int, float],
        attributes: Optional[Attributes] = None,
        context: Optional[Context] = None,
    ) -> None:
        """Forward the measurement to the real counter, if there is one."""
        real_counter = self._real_instrument
        if not real_counter:
            # No real instrument yet: the measurement is dropped.
            return
        real_counter.add(amount, attributes, context)

    def _create_real_instrument(self, meter: "metrics.Meter") -> UpDownCounter:
        """Build the real up-down counter from the stored arguments."""
        real_counter = meter.create_up_down_counter(
            self._name, self._unit, self._description
        )
        return real_counter
class ObservableCounter(Asynchronous):
    """Asynchronous `Instrument` for monotonically increasing values.

    The value(s) are reported when the instrument is being observed.
    """
class NoOpObservableCounter(ObservableCounter):
    """`ObservableCounter` implementation that does nothing."""

    def __init__(
        self,
        name: str,
        callbacks: Optional[Sequence[CallbackT]] = None,
        unit: str = "",
        description: str = "",
    ) -> None:
        """Forward the arguments to the parent initializer."""
        super().__init__(name, callbacks, unit=unit, description=description)
class _ProxyObservableCounter(
    _ProxyAsynchronousInstrument[ObservableCounter], ObservableCounter
):
    """Proxy `ObservableCounter` backed by a real one once a meter is set."""

    def _create_real_instrument(
        self, meter: "metrics.Meter"
    ) -> ObservableCounter:
        """Build the real observable counter from the stored arguments."""
        real_counter = meter.create_observable_counter(
            self._name, self._callbacks, self._unit, self._description
        )
        return real_counter
class ObservableUpDownCounter(Asynchronous):
    """Asynchronous `Instrument` for additive values.

    The value(s) are reported when the instrument is being observed. An
    example is the process heap size: it makes sense to report the heap
    size from multiple processes and sum them up to get the total heap
    usage.
    """
class NoOpObservableUpDownCounter(ObservableUpDownCounter):
    """`ObservableUpDownCounter` implementation that does nothing."""

    def __init__(
        self,
        name: str,
        callbacks: Optional[Sequence[CallbackT]] = None,
        unit: str = "",
        description: str = "",
    ) -> None:
        """Forward the arguments to the parent initializer."""
        super().__init__(name, callbacks, unit=unit, description=description)
class _ProxyObservableUpDownCounter(
    _ProxyAsynchronousInstrument[ObservableUpDownCounter],
    ObservableUpDownCounter,
):
    """Proxy `ObservableUpDownCounter` backed by a real one once a meter is set."""

    def _create_real_instrument(
        self, meter: "metrics.Meter"
    ) -> ObservableUpDownCounter:
        """Build the real observable up-down counter from the stored arguments."""
        real_counter = meter.create_observable_up_down_counter(
            self._name, self._callbacks, self._unit, self._description
        )
        return real_counter
class Histogram(Synchronous):
    """Synchronous `Instrument` for reporting arbitrary values.

    Intended for values that are likely to be statistically meaningful,
    such as those summarized by histograms, summaries and percentiles.
    """

    @abstractmethod
    def __init__(
        self,
        name: str,
        unit: str = "",
        description: str = "",
        explicit_bucket_boundaries_advisory: Optional[Sequence[float]] = None,
    ) -> None:
        """Declare the constructor signature of histograms.

        Args:
            name: The instrument name.
            unit: The unit of the recorded values.
            description: A human-readable description of the instrument.
            explicit_bucket_boundaries_advisory: Optional advisory bucket
                boundaries for the histogram.
        """

    @abstractmethod
    def record(
        self,
        amount: Union[int, float],
        attributes: Optional[Attributes] = None,
        context: Optional[Context] = None,
    ) -> None:
        """Record ``amount`` in the histogram.

        Args:
            amount: The value to record.
            attributes: Optional attributes for this measurement.
            context: Optional `Context` for this measurement.
        """
class NoOpHistogram(Histogram):
    """`Histogram` implementation that does nothing."""

    def __init__(
        self,
        name: str,
        unit: str = "",
        description: str = "",
        explicit_bucket_boundaries_advisory: Optional[Sequence[float]] = None,
    ) -> None:
        """Forward the arguments to the parent initializer."""
        boundaries = explicit_bucket_boundaries_advisory
        super().__init__(
            name,
            unit=unit,
            description=description,
            explicit_bucket_boundaries_advisory=boundaries,
        )

    def record(
        self,
        amount: Union[int, float],
        attributes: Optional[Attributes] = None,
        context: Optional[Context] = None,
    ) -> None:
        """Delegate to the parent ``record`` and return its result."""
        return super().record(amount, attributes=attributes, context=context)
class _ProxyHistogram(_ProxyInstrument[Histogram], Histogram):
    """Proxy `Histogram` that forwards to a real one once it is set."""

    def __init__(
        self,
        name: str,
        unit: str = "",
        description: str = "",
        explicit_bucket_boundaries_advisory: Optional[Sequence[float]] = None,
    ) -> None:
        """Store the construction arguments, including the advisory boundaries."""
        super().__init__(name, unit=unit, description=description)
        # Kept so it can be passed on when the real histogram is created.
        boundaries = explicit_bucket_boundaries_advisory
        self._explicit_bucket_boundaries_advisory = boundaries

    def record(
        self,
        amount: Union[int, float],
        attributes: Optional[Attributes] = None,
        context: Optional[Context] = None,
    ) -> None:
        """Forward the measurement to the real histogram, if there is one."""
        real_histogram = self._real_instrument
        if not real_histogram:
            # No real instrument yet: the measurement is dropped.
            return
        real_histogram.record(amount, attributes, context)

    def _create_real_instrument(self, meter: "metrics.Meter") -> Histogram:
        """Build the real histogram from the stored arguments."""
        boundaries = self._explicit_bucket_boundaries_advisory
        return meter.create_histogram(
            self._name,
            self._unit,
            self._description,
            explicit_bucket_boundaries_advisory=boundaries,
        )
class ObservableGauge(Asynchronous):
    """Asynchronous `Instrument` for non-additive values.

    The value(s) are reported when the instrument is being observed. An
    example is the room temperature: it makes no sense to report the
    temperature value from multiple rooms and sum them up.
    """
class NoOpObservableGauge(ObservableGauge):
    """`ObservableGauge` implementation that does nothing."""

    def __init__(
        self,
        name: str,
        callbacks: Optional[Sequence[CallbackT]] = None,
        unit: str = "",
        description: str = "",
    ) -> None:
        """Forward the arguments to the parent initializer."""
        super().__init__(name, callbacks, unit=unit, description=description)
class _ProxyObservableGauge(
    _ProxyAsynchronousInstrument[ObservableGauge],
    ObservableGauge,
):
    """Proxy `ObservableGauge` backed by a real one once a meter is set."""

    def _create_real_instrument(
        self, meter: "metrics.Meter"
    ) -> ObservableGauge:
        """Build the real observable gauge from the stored arguments."""
        real_gauge = meter.create_observable_gauge(
            self._name, self._callbacks, self._unit, self._description
        )
        return real_gauge
class Gauge(Synchronous):
    """Synchronous `Instrument` for recording non-additive values as they occur."""

    @abstractmethod
    def set(
        self,
        amount: Union[int, float],
        attributes: Optional[Attributes] = None,
        context: Optional[Context] = None,
    ) -> None:
        """Set the gauge to ``amount``.

        Args:
            amount: The value to record.
            attributes: Optional attributes for this measurement.
            context: Optional `Context` for this measurement.
        """
class NoOpGauge(Gauge):
    """``Gauge`` implementation that does nothing."""

    def __init__(
        self,
        name: str,
        unit: str = "",
        description: str = "",
    ) -> None:
        """Forward the arguments to the parent initializer."""
        super().__init__(name, unit=unit, description=description)

    def set(
        self,
        amount: Union[int, float],
        attributes: Optional[Attributes] = None,
        context: Optional[Context] = None,
    ) -> None:
        """Delegate to the parent ``set`` and return its result."""
        return super().set(amount, attributes=attributes, context=context)
class _ProxyGauge(
    _ProxyInstrument[Gauge],
    Gauge,
):
    """Proxy `Gauge` that forwards to a real gauge once one is set."""

    def set(
        self,
        amount: Union[int, float],
        attributes: Optional[Attributes] = None,
        context: Optional[Context] = None,
    ) -> None:
        """Forward the measurement to the real gauge, if there is one."""
        real_gauge = self._real_instrument
        if not real_gauge:
            # No real instrument yet: the measurement is dropped.
            return
        real_gauge.set(amount, attributes, context)

    def _create_real_instrument(self, meter: "metrics.Meter") -> Gauge:
        """Build the real gauge from the stored name, unit and description."""
        real_gauge = meter.create_gauge(
            self._name, self._unit, self._description
        )
        return real_gauge
|