1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556 |
- # Copyright The OpenTelemetry Authors
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- from __future__ import annotations
- from contextvars import ContextVar, Token
- from opentelemetry.context.context import Context, _RuntimeContext
class ContextVarsRuntimeContext(_RuntimeContext):
    """Runtime context storage backed by a single ``ContextVar``.

    Implements the ``_RuntimeContext`` interface by delegating every
    operation to a ``contextvars.ContextVar`` that holds the active
    `Context`. This is the preferred implementation wherever
    ``contextvars`` is available.
    """

    # Name given to the underlying ContextVar.
    _CONTEXT_KEY = "current_context"

    def __init__(self) -> None:
        # A Context built with no arguments is the variable's default, so
        # get_current() has something to return before any attach() call.
        initial = Context()
        self._current_context = ContextVar(self._CONTEXT_KEY, default=initial)

    def attach(self, context: Context) -> Token[Context]:
        """Make ``context`` the current `Context`.

        Args:
            context: The Context to install as current.

        Returns:
            A token that can be handed to `detach` to restore the
            `Context` that was current before this call.
        """
        token = self._current_context.set(context)
        return token

    def get_current(self) -> Context:
        """Return the `Context` currently held by the context variable."""
        active = self._current_context.get()
        return active

    def detach(self, token: Token[Context]) -> None:
        """Reset the current `Context` to the value recorded in ``token``.

        Args:
            token: The token returned by an earlier `attach` call.
        """
        storage = self._current_context
        storage.reset(token)
- __all__ = ["ContextVarsRuntimeContext"]  # the only name exported by a star-import of this module
|