123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204 |
- # Copyright The OpenTelemetry Authors
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import logging
- import threading
- from collections import OrderedDict
- from collections.abc import MutableMapping
- from typing import Optional, Sequence, Tuple, Union
- from opentelemetry.util import types
- # bytes are accepted as a user supplied value for attributes but
- # decoded to strings internally.
- _VALID_ATTR_VALUE_TYPES = (bool, str, bytes, int, float)
- _logger = logging.getLogger(__name__)
- def _clean_attribute(
- key: str, value: types.AttributeValue, max_len: Optional[int]
- ) -> Optional[Union[types.AttributeValue, Tuple[Union[str, int, float], ...]]]:
- """Checks if attribute value is valid and cleans it if required.
- The function returns the cleaned value or None if the value is not valid.
- An attribute value is valid if it is either:
- - A primitive type: string, boolean, double precision floating
- point (IEEE 754-1985) or integer.
- - An array of primitive type values. The array MUST be homogeneous,
- i.e. it MUST NOT contain values of different types.
- An attribute needs cleansing if:
- - Its length is greater than the maximum allowed length.
- - It needs to be encoded/decoded e.g, bytes to strings.
- """
- if not (key and isinstance(key, str)):
- _logger.warning("invalid key `%s`. must be non-empty string.", key)
- return None
- if isinstance(value, _VALID_ATTR_VALUE_TYPES):
- return _clean_attribute_value(value, max_len)
- if isinstance(value, Sequence):
- sequence_first_valid_type = None
- cleaned_seq = []
- for element in value:
- element = _clean_attribute_value(element, max_len) # type: ignore
- if element is None:
- cleaned_seq.append(element)
- continue
- element_type = type(element)
- # Reject attribute value if sequence contains a value with an incompatible type.
- if element_type not in _VALID_ATTR_VALUE_TYPES:
- _logger.warning(
- "Invalid type %s in attribute '%s' value sequence. Expected one of "
- "%s or None",
- element_type.__name__,
- key,
- [
- valid_type.__name__
- for valid_type in _VALID_ATTR_VALUE_TYPES
- ],
- )
- return None
- # The type of the sequence must be homogeneous. The first non-None
- # element determines the type of the sequence
- if sequence_first_valid_type is None:
- sequence_first_valid_type = element_type
- # use equality instead of isinstance as isinstance(True, int) evaluates to True
- elif element_type != sequence_first_valid_type:
- _logger.warning(
- "Attribute %r mixes types %s and %s in attribute value sequence",
- key,
- sequence_first_valid_type.__name__,
- type(element).__name__,
- )
- return None
- cleaned_seq.append(element)
- # Freeze mutable sequences defensively
- return tuple(cleaned_seq)
- _logger.warning(
- "Invalid type %s for attribute '%s' value. Expected one of %s or a "
- "sequence of those types",
- type(value).__name__,
- key,
- [valid_type.__name__ for valid_type in _VALID_ATTR_VALUE_TYPES],
- )
- return None
- def _clean_attribute_value(
- value: types.AttributeValue, limit: Optional[int]
- ) -> Optional[types.AttributeValue]:
- if value is None:
- return None
- if isinstance(value, bytes):
- try:
- value = value.decode()
- except UnicodeDecodeError:
- _logger.warning("Byte attribute could not be decoded.")
- return None
- if limit is not None and isinstance(value, str):
- value = value[:limit]
- return value
- class BoundedAttributes(MutableMapping): # type: ignore
- """An ordered dict with a fixed max capacity.
- Oldest elements are dropped when the dict is full and a new element is
- added.
- """
- def __init__(
- self,
- maxlen: Optional[int] = None,
- attributes: types.Attributes = None,
- immutable: bool = True,
- max_value_len: Optional[int] = None,
- ):
- if maxlen is not None:
- if not isinstance(maxlen, int) or maxlen < 0:
- raise ValueError(
- "maxlen must be valid int greater or equal to 0"
- )
- self.maxlen = maxlen
- self.dropped = 0
- self.max_value_len = max_value_len
- # OrderedDict is not used until the maxlen is reached for efficiency.
- self._dict: Union[
- MutableMapping[str, types.AttributeValue],
- OrderedDict[str, types.AttributeValue],
- ] = {}
- self._lock = threading.RLock()
- if attributes:
- for key, value in attributes.items():
- self[key] = value
- self._immutable = immutable
- def __repr__(self) -> str:
- return f"{dict(self._dict)}"
- def __getitem__(self, key: str) -> types.AttributeValue:
- return self._dict[key]
- def __setitem__(self, key: str, value: types.AttributeValue) -> None:
- if getattr(self, "_immutable", False): # type: ignore
- raise TypeError
- with self._lock:
- if self.maxlen is not None and self.maxlen == 0:
- self.dropped += 1
- return
- value = _clean_attribute(key, value, self.max_value_len) # type: ignore
- if value is not None:
- if key in self._dict:
- del self._dict[key]
- elif (
- self.maxlen is not None and len(self._dict) == self.maxlen
- ):
- if not isinstance(self._dict, OrderedDict):
- self._dict = OrderedDict(self._dict)
- self._dict.popitem(last=False) # type: ignore
- self.dropped += 1
- self._dict[key] = value # type: ignore
- def __delitem__(self, key: str) -> None:
- if getattr(self, "_immutable", False): # type: ignore
- raise TypeError
- with self._lock:
- del self._dict[key]
- def __iter__(self): # type: ignore
- with self._lock:
- return iter(self._dict.copy()) # type: ignore
- def __len__(self) -> int:
- return len(self._dict)
- def copy(self): # type: ignore
- return self._dict.copy() # type: ignore
|