12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267 |
- # Copyright 2016 gRPC authors.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """Invocation-side implementation of gRPC Python."""
- import copy
- import functools
- import logging
- import os
- import sys
- import threading
- import time
- import types
- from typing import (
- Any,
- Callable,
- Dict,
- Iterator,
- List,
- Optional,
- Sequence,
- Set,
- Tuple,
- Union,
- )
- import grpc # pytype: disable=pyi-error
- from grpc import _common # pytype: disable=pyi-error
- from grpc import _compression # pytype: disable=pyi-error
- from grpc import _grpcio_metadata # pytype: disable=pyi-error
- from grpc import _observability # pytype: disable=pyi-error
- from grpc._cython import cygrpc
- from grpc._typing import ChannelArgumentType
- from grpc._typing import DeserializingFunction
- from grpc._typing import IntegratedCallFactory
- from grpc._typing import MetadataType
- from grpc._typing import NullaryCallbackType
- from grpc._typing import ResponseType
- from grpc._typing import SerializingFunction
- from grpc._typing import UserTag
- import grpc.experimental # pytype: disable=pyi-error
# Logger named after this module.
_LOGGER = logging.getLogger(__name__)

# Identifier string embedding the packaged grpcio version.
_USER_AGENT = f"grpc-python/{_grpcio_metadata.__version__}"

# Flags value handed to cygrpc operations that need no special flags.
_EMPTY_FLAGS = 0

# NOTE(rbellevi): No guarantees are given about the maintenance of this
# environment variable.
# True whenever the variable is present in the environment, regardless of its
# value (an empty string still counts).
_DEFAULT_SINGLE_THREADED_UNARY_STREAM = (
    "GRPC_SINGLE_THREADED_UNARY_STREAM" in os.environ
)

# Operation-type tuples, one per RPC arity. NOTE(review): the names suggest
# they seed `_RPCState.due` for the initial batch of each call kind; the use
# sites are outside this part of the file -- confirm.
_UNARY_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_UNARY_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)

_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
    "Exception calling channel subscription callback!"
)

# Templates consumed by `_rpc_state_string`; the positional fields are the
# class name, status code, details and (non-OK only) the debug error string.
_OK_RENDEZVOUS_REPR_FORMAT = (
    "<{} of RPC that terminated with:\n"
    "\tstatus = {}\n"
    '\tdetails = "{}"\n'
    ">"
)
_NON_OK_RENDEZVOUS_REPR_FORMAT = (
    '<{} of RPC that terminated with:\n\tstatus = {}\n\tdetails = "{}"\n'
    '\tdebug_error_string = "{}"\n>'
)
- def _deadline(timeout: Optional[float]) -> Optional[float]:
- return None if timeout is None else time.time() + timeout
- def _unknown_code_details(
- unknown_cygrpc_code: Optional[grpc.StatusCode], details: Optional[str]
- ) -> str:
- return 'Server sent unknown code {} and details "{}"'.format(
- unknown_cygrpc_code, details
- )
- class _RPCState(object):
- condition: threading.Condition
- due: Set[cygrpc.OperationType]
- initial_metadata: Optional[MetadataType]
- response: Any
- trailing_metadata: Optional[MetadataType]
- code: Optional[grpc.StatusCode]
- details: Optional[str]
- debug_error_string: Optional[str]
- cancelled: bool
- callbacks: List[NullaryCallbackType]
- fork_epoch: Optional[int]
- rpc_start_time: Optional[float] # In relative seconds
- rpc_end_time: Optional[float] # In relative seconds
- method: Optional[str]
- target: Optional[str]
- def __init__(
- self,
- due: Sequence[cygrpc.OperationType],
- initial_metadata: Optional[MetadataType],
- trailing_metadata: Optional[MetadataType],
- code: Optional[grpc.StatusCode],
- details: Optional[str],
- ):
- # `condition` guards all members of _RPCState. `notify_all` is called on
- # `condition` when the state of the RPC has changed.
- self.condition = threading.Condition()
- # The cygrpc.OperationType objects representing events due from the RPC's
- # completion queue. If an operation is in `due`, it is guaranteed that
- # `operate()` has been called on a corresponding operation. But the
- # converse is not true. That is, in the case of failed `operate()`
- # calls, there may briefly be events in `due` that do not correspond to
- # operations submitted to Core.
- self.due = set(due)
- self.initial_metadata = initial_metadata
- self.response = None
- self.trailing_metadata = trailing_metadata
- self.code = code
- self.details = details
- self.debug_error_string = None
- # The following three fields are used for observability.
- # Updates to those fields do not trigger self.condition.
- self.rpc_start_time = None
- self.rpc_end_time = None
- self.method = None
- self.target = None
- # The semantics of grpc.Future.cancel and grpc.Future.cancelled are
- # slightly wonky, so they have to be tracked separately from the rest of the
- # result of the RPC. This field tracks whether cancellation was requested
- # prior to termination of the RPC.
- self.cancelled = False
- self.callbacks = []
- self.fork_epoch = cygrpc.get_fork_epoch()
- def reset_postfork_child(self):
- self.condition = threading.Condition()
- def _abort(state: _RPCState, code: grpc.StatusCode, details: str) -> None:
- if state.code is None:
- state.code = code
- state.details = details
- if state.initial_metadata is None:
- state.initial_metadata = ()
- state.trailing_metadata = ()
def _handle_event(
    event: cygrpc.BaseEvent,
    state: _RPCState,
    response_deserializer: Optional[DeserializingFunction],
) -> List[NullaryCallbackType]:
    """Fold one completion-queue event into `state`.

    Each batch operation carried by `event` is removed from `state.due` and
    then applied according to its type:

    * receive_initial_metadata: stores the initial metadata.
    * receive_message: deserializes a non-None message; a failed
      deserialization (None result) aborts the RPC with INTERNAL.
    * receive_status_on_client: stores trailing metadata and, unless a code is
      already recorded, the status. It then stamps `rpc_end_time`, reports
      latency, and hands over the registered callbacks.

    The callers in this file invoke this function while holding
    `state.condition`; it does not take the lock itself.

    Args:
      event: The event whose `batch_operations` are to be processed.
      state: The RPC state to update in place.
      response_deserializer: Deserializer for response messages, or None.

    Returns:
      The callbacks that were registered on `state` if the status arrived in
      this event (after which `state.callbacks` is None), else an empty list.
      Running them is left to the caller.

    Raises:
      KeyError: If an operation's type is not present in `state.due`.
    """
    callbacks = []
    for batch_operation in event.batch_operations:
        operation_type = batch_operation.type()
        state.due.remove(operation_type)
        if operation_type == cygrpc.OperationType.receive_initial_metadata:
            state.initial_metadata = batch_operation.initial_metadata()
        elif operation_type == cygrpc.OperationType.receive_message:
            serialized_response = batch_operation.message()
            if serialized_response is not None:
                response = _common.deserialize(
                    serialized_response, response_deserializer
                )
                if response is None:
                    details = "Exception deserializing response!"
                    _abort(state, grpc.StatusCode.INTERNAL, details)
                else:
                    state.response = response
        elif operation_type == cygrpc.OperationType.receive_status_on_client:
            state.trailing_metadata = batch_operation.trailing_metadata()
            if state.code is None:
                # Keep the raw code around: when the mapping has no entry for
                # it, the lookup result is None and carries no information, so
                # the raw value is what must appear in the details message.
                cygrpc_code = batch_operation.code()
                code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
                    cygrpc_code
                )
                if code is None:
                    state.code = grpc.StatusCode.UNKNOWN
                    state.details = _unknown_code_details(
                        cygrpc_code, batch_operation.details()
                    )
                else:
                    state.code = code
                    state.details = batch_operation.details()
                    state.debug_error_string = batch_operation.error_string()
            state.rpc_end_time = time.perf_counter()
            _observability.maybe_record_rpc_latency(state)
            # Hand the callbacks to the caller exactly once; None marks the
            # RPC as no longer accepting callbacks (see `add_callback`).
            callbacks.extend(state.callbacks)
            state.callbacks = None
    return callbacks
def _event_handler(
    state: _RPCState, response_deserializer: Optional[DeserializingFunction]
) -> UserTag:
    """Build the per-RPC event callback bound to `state`.

    Args:
      state: The RPC state the returned handler updates.
      response_deserializer: Deserializer for response messages, or None.

    Returns:
      A callable taking one event. It applies the event to `state` under
      `state.condition`, wakes all waiters, runs any callbacks released by
      the event outside the lock, and returns True only when no operations
      remain in `state.due` and `state.fork_epoch` is not older than the
      current fork epoch.
    """

    def handle_event(event):
        with state.condition:
            callbacks = _handle_event(event, state, response_deserializer)
            state.condition.notify_all()
            done = not state.due
        for callback in callbacks:
            try:
                callback()
            except Exception as e:  # pylint: disable=broad-except
                # NOTE(rbellevi): We suppress but log errors here so as not to
                # kill the channel spin thread.
                # Callbacks added through `add_done_callback` are
                # functools.partial objects, but `add_callback` accepts any
                # nullary callable, which has no `.func`. Fall back to the
                # callable itself so logging can never raise from here.
                logging.error(
                    "Exception in callback %s: %s",
                    repr(getattr(callback, "func", callback)),
                    repr(e),
                )
        return done and state.fork_epoch >= cygrpc.get_fork_epoch()

    return handle_event
- # TODO(xuanwn): Create a base class for IntegratedCall and SegregatedCall.
- # pylint: disable=too-many-statements
def _consume_request_iterator(
    request_iterator: Iterator,
    state: _RPCState,
    call: Union[cygrpc.IntegratedCall, cygrpc.SegregatedCall],
    request_serializer: SerializingFunction,
    event_handler: Optional[UserTag],
) -> None:
    """Start a daemon thread that feeds user requests into `call`.

    The thread pulls requests from `request_iterator` one at a time,
    serializes each, submits it as a send-message operation and waits until
    that send is no longer due (or the RPC has a status) before pulling the
    next. Once the iterator is exhausted it submits send-close-from-client.
    An exception from the iterator cancels the call with UNKNOWN; a
    serialization failure cancels it with INTERNAL.

    Args:
      request_iterator: The user-supplied iterator of request values.
      state: The RPC state shared with the rest of the call machinery.
      call: The call on which operations are submitted and cancelled.
      request_serializer: Serializer applied to each request.
      event_handler: Tag passed along with every submitted operation.
    """
    send_message = cygrpc.OperationType.send_message
    send_close = cygrpc.OperationType.send_close_from_client

    def _cancel_and_abort(code, details):
        # Cancel the call in Core first, then record the status locally.
        call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details)
        _abort(state, code, details)

    def _send_settled():
        # True once the RPC has a status or the pending send has completed.
        return state.code is not None or send_message not in state.due

    def consume_request_iterator():  # pylint: disable=too-many-branches
        # Keep pulling requests until the iterator runs dry or something
        # terminates the RPC.
        while True:
            left_user_generator = False
            try:
                # The thread may die in user-code. Do not block fork for this.
                cygrpc.enter_user_request_generator()
                request = next(request_iterator)
            except StopIteration:
                break
            except Exception:  # pylint: disable=broad-except
                cygrpc.return_from_user_request_generator()
                left_user_generator = True
                details = "Exception iterating requests!"
                _LOGGER.exception(details)
                _cancel_and_abort(grpc.StatusCode.UNKNOWN, details)
                return
            finally:
                # Balance the enter call on every path that has not already
                # done so above.
                if not left_user_generator:
                    cygrpc.return_from_user_request_generator()

            serialized_request = _common.serialize(request, request_serializer)
            with state.condition:
                if state.code is not None or state.cancelled:
                    return
                if serialized_request is None:
                    _cancel_and_abort(
                        grpc.StatusCode.INTERNAL,
                        "Exception serializing request!",
                    )
                    return

                # Mark the send as due before submitting it; undo the mark if
                # submission fails.
                state.due.add(send_message)
                operating = call.operate(
                    (
                        cygrpc.SendMessageOperation(
                            serialized_request, _EMPTY_FLAGS
                        ),
                    ),
                    event_handler,
                )
                if not operating:
                    state.due.remove(send_message)
                    return

                _common.wait(
                    state.condition.wait,
                    _send_settled,
                    spin_cb=functools.partial(
                        cygrpc.block_if_fork_in_progress, state
                    ),
                )
                if state.code is not None:
                    return

        # Iterator exhausted: half-close the call if it is still live.
        with state.condition:
            if state.code is None:
                state.due.add(send_close)
                operating = call.operate(
                    (cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    state.due.remove(send_close)

    consumption_thread = cygrpc.ForkManagedThread(
        target=consume_request_iterator
    )
    consumption_thread.setDaemon(True)
    consumption_thread.start()
def _rpc_state_string(class_name: str, rpc_state: _RPCState) -> str:
    """Render an RPC state as the text used for repr()/str().

    Args:
      class_name: Name of the class the text is produced for.
      rpc_state: The state to describe; read under its `condition`.

    Returns:
      "<ClassName object>" while no status code is recorded; otherwise the
      OK or non-OK template filled in with the code, the details and, for
      non-OK codes, the debug error string.
    """
    with rpc_state.condition:
        code = rpc_state.code
        if code is None:
            return "<{} object>".format(class_name)
        if code is grpc.StatusCode.OK:
            return _OK_RENDEZVOUS_REPR_FORMAT.format(
                class_name, code, rpc_state.details
            )
        return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
            class_name,
            code,
            rpc_state.details,
            rpc_state.debug_error_string,
        )
class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future):
    """An RPC error not tied to the execution of a particular RPC.

    The RPC represented by the state object must not be in-progress or
    cancelled. The error owns a private snapshot of the state it was built
    from, so it behaves as an already-finished future.

    Attributes:
      _state: An instance of _RPCState.
    """

    _state: _RPCState

    def __init__(self, state: _RPCState):
        """Snapshot `state` while holding its condition.

        Args:
          state: The state to copy. Metadata and details are deep-copied,
            response and debug error string are shallow-copied, and the code
            is shared as-is. The snapshot's `due` set is empty.
        """
        with state.condition:
            self._state = snapshot = _RPCState(
                (),
                copy.deepcopy(state.initial_metadata),
                copy.deepcopy(state.trailing_metadata),
                state.code,
                copy.deepcopy(state.details),
            )
            snapshot.response = copy.copy(state.response)
            snapshot.debug_error_string = copy.copy(state.debug_error_string)

    def initial_metadata(self) -> Optional[MetadataType]:
        """Return the snapshotted initial metadata."""
        return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """Return the snapshotted trailing metadata."""
        return self._state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """Return the snapshotted status code."""
        return self._state.code

    def details(self) -> Optional[str]:
        """Return the snapshotted details, passed through `_common.decode`."""
        return _common.decode(self._state.details)

    def debug_error_string(self) -> Optional[str]:
        """Return the debug error string, passed through `_common.decode`."""
        return _common.decode(self._state.debug_error_string)

    def _repr(self) -> str:
        return _rpc_state_string(self.__class__.__name__, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def cancel(self) -> bool:
        """See grpc.Future.cancel."""
        return False

    def cancelled(self) -> bool:
        """See grpc.Future.cancelled."""
        return False

    def running(self) -> bool:
        """See grpc.Future.running."""
        return False

    def done(self) -> bool:
        """See grpc.Future.done."""
        return True

    def result(
        self, timeout: Optional[float] = None
    ) -> Any:  # pylint: disable=unused-argument
        """See grpc.Future.result."""
        raise self

    def exception(
        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
    ) -> Optional[Exception]:
        """See grpc.Future.exception."""
        return self

    def traceback(
        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
    ) -> Optional[types.TracebackType]:
        """See grpc.Future.traceback."""
        # Raise and immediately catch ourselves so that a traceback object
        # exists to hand back.
        try:
            raise self
        except grpc.RpcError:
            _, _, tb = sys.exc_info()
            return tb

    def add_done_callback(
        self,
        fn: Callable[[grpc.Future], None],
        timeout: Optional[float] = None,  # pylint: disable=unused-argument
    ) -> None:
        """See grpc.Future.add_done_callback."""
        # Always done, so the callback runs right away.
        fn(self)
class _Rendezvous(grpc.RpcError, grpc.RpcContext):
    """An RPC iterator.

    Base class holding the state, call handle, deserializer and deadline of
    one RPC. Iteration (`_next`) and `debug_error_string` are left to
    subclasses.

    Attributes:
      _state: An instance of _RPCState.
      _call: An instance of SegregatedCall or IntegratedCall.
        In either case, the _call object is expected to have operate, cancel,
        and next_event methods.
      _response_deserializer: A callable taking bytes and return a Python
        object.
      _deadline: A float representing the deadline of the RPC in seconds. Or
        possibly None, to represent an RPC with no deadline at all.
    """

    _state: _RPCState
    _call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall]
    _response_deserializer: Optional[DeserializingFunction]
    _deadline: Optional[float]

    def __init__(
        self,
        state: _RPCState,
        call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall],
        response_deserializer: Optional[DeserializingFunction],
        deadline: Optional[float],
    ):
        super().__init__()
        self._state = state
        self._call = call
        self._response_deserializer = response_deserializer
        self._deadline = deadline

    def is_active(self) -> bool:
        """See grpc.RpcContext.is_active"""
        with self._state.condition:
            return self._state.code is None

    def time_remaining(self) -> Optional[float]:
        """See grpc.RpcContext.time_remaining"""
        with self._state.condition:
            if self._deadline is None:
                return None
            # Never report a negative remainder once the deadline has passed.
            return max(self._deadline - time.time(), 0)

    def cancel(self) -> bool:
        """See grpc.RpcContext.cancel"""
        with self._state.condition:
            if self._state.code is not None:
                # Already terminated; nothing left to cancel.
                return False
            code = grpc.StatusCode.CANCELLED
            details = "Locally cancelled by application!"
            self._call.cancel(
                _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
            )
            self._state.cancelled = True
            _abort(self._state, code, details)
            self._state.condition.notify_all()
            return True

    def add_callback(self, callback: NullaryCallbackType) -> bool:
        """See grpc.RpcContext.add_callback"""
        with self._state.condition:
            registered = self._state.callbacks
            if registered is None:
                # The callback list has been handed off already.
                return False
            registered.append(callback)
            return True

    def __iter__(self):
        return self

    def next(self):
        return self._next()

    def __next__(self):
        return self._next()

    def _next(self):
        raise NotImplementedError()

    def debug_error_string(self) -> Optional[str]:
        raise NotImplementedError()

    def _repr(self) -> str:
        return _rpc_state_string(self.__class__.__name__, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def __del__(self) -> None:
        # An RPC that is still running when its handle is collected gets
        # cancelled so that it does not linger.
        state = self._state
        with state.condition:
            if state.code is None:
                state.code = grpc.StatusCode.CANCELLED
                state.details = "Cancelled upon garbage collection!"
                state.cancelled = True
                self._call.cancel(
                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[state.code],
                    state.details,
                )
                state.condition.notify_all()
class _SingleThreadedRendezvous(
    _Rendezvous, grpc.Call, grpc.Future
):  # pylint: disable=too-many-ancestors
    """An RPC iterator operating entirely on a single thread.

    The __next__ method of _SingleThreadedRendezvous does not depend on the
    existence of any other thread, including the "channel spin thread".
    However, this means that its interface is entirely synchronous. So this
    class cannot completely fulfill the grpc.Future interface. The result,
    exception, and traceback methods will never block and will instead raise
    an exception if calling the method would result in blocking.

    This means that these methods are safe to call from add_done_callback
    handlers.
    """

    _state: _RPCState

    def _is_complete(self) -> bool:
        return self._state.code is not None

    def cancelled(self) -> bool:
        """Return whether cancellation was requested for this RPC."""
        with self._state.condition:
            return self._state.cancelled

    def running(self) -> bool:
        """Return True while no status code has been recorded."""
        with self._state.condition:
            return self._state.code is None

    def done(self) -> bool:
        """Return True once a status code has been recorded."""
        with self._state.condition:
            return self._state.code is not None

    def result(self, timeout: Optional[float] = None) -> Any:
        """Returns the result of the computation or raises its exception.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports result() when the"
                    " RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return self._state.response
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports exception() when"
                    " the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            return self

    def traceback(
        self, timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Access the traceback of the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports traceback() when"
                    " the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            # Raise and catch ourselves so a traceback object exists.
            try:
                raise self
            except grpc.RpcError:
                _, _, tb = sys.exc_info()
                return tb

    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
        """Register `fn`, or call it right away if the RPC already finished."""
        with self._state.condition:
            still_running = self._state.code is None
            if still_running:
                self._state.callbacks.append(functools.partial(fn, self))
        if not still_running:
            # Invoked outside the lock, as the RPC is already complete.
            fn(self)

    def initial_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.initial_metadata"""
        with self._state.condition:
            # NOTE(gnossen): Based on our initial call batch, we are guaranteed
            # to receive initial metadata before any messages.
            while self._state.initial_metadata is None:
                self._consume_next_event()
            return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.trailing_metadata"""
        with self._state.condition:
            metadata = self._state.trailing_metadata
            if metadata is None:
                raise grpc.experimental.UsageError(
                    "Cannot get trailing metadata until RPC is completed."
                )
            return metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """See grpc.Call.code"""
        with self._state.condition:
            status = self._state.code
            if status is None:
                raise grpc.experimental.UsageError(
                    "Cannot get code until RPC is completed."
                )
            return status

    def details(self) -> Optional[str]:
        """See grpc.Call.details"""
        with self._state.condition:
            raw_details = self._state.details
            if raw_details is None:
                raise grpc.experimental.UsageError(
                    "Cannot get details until RPC is completed."
                )
            return _common.decode(raw_details)

    def _consume_next_event(self) -> Optional[cygrpc.BaseEvent]:
        """Pull one event from the call, apply it and run released callbacks."""
        event = self._call.next_event()
        with self._state.condition:
            released = _handle_event(
                event, self._state, self._response_deserializer
            )
            for callback in released:
                # NOTE(gnossen): We intentionally allow exceptions to bubble up
                # to the user when running on a single thread.
                callback()
        return event

    def _next_response(self) -> Any:
        """Consume events until a response is available or the RPC has ended."""
        while True:
            self._consume_next_event()
            with self._state.condition:
                response = self._state.response
                if response is not None:
                    # Hand the response out exactly once.
                    self._state.response = None
                    return response
                if cygrpc.OperationType.receive_message in self._state.due:
                    # The receive is still outstanding; keep consuming.
                    continue
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                if self._state.code is not None:
                    raise self

    def _next(self) -> Any:
        with self._state.condition:
            status = self._state.code
            if status is None:
                # The receive is recorded in `due` *before* it is enqueued and
                # taken back out if enqueueing fails. That guarantees that any
                # operation submitted to the core completion queue is present
                # in `due`. Recording it only after a successful enqueue would
                # leave a window in which a signal could interrupt this thread
                # between the two steps; the completion would then find no
                # matching entry in state.due and raise on the channel spin
                # thread.
                # Note that, since `condition` is held through this block,
                # there is no data race on `due`.
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif status is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
        return self._next_response()

    def debug_error_string(self) -> Optional[str]:
        """Return the decoded debug error string once one has been recorded."""
        with self._state.condition:
            raw_error = self._state.debug_error_string
            if raw_error is None:
                raise grpc.experimental.UsageError(
                    "Cannot get debug error string until RPC is completed."
                )
            return _common.decode(raw_error)
class _MultiThreadedRendezvous(
    _Rendezvous, grpc.Call, grpc.Future
):  # pylint: disable=too-many-ancestors
    """An RPC iterator that depends on a channel spin thread.

    This iterator relies upon a per-channel thread running in the background,
    dequeueing events from the completion queue, and notifying threads waiting
    on the threading.Condition object in the _RPCState object.

    That extra thread is what lets this class fulfill the grpc.Future
    interface and mediate a bidirectional streaming RPC.
    """

    _state: _RPCState

    def initial_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.initial_metadata"""
        state = self._state
        with state.condition:
            # Block until the initial metadata has been recorded.
            _common.wait(
                state.condition.wait,
                lambda: state.initial_metadata is not None,
            )
            return state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.trailing_metadata"""
        state = self._state
        with state.condition:
            # Block until the trailing metadata has been recorded.
            _common.wait(
                state.condition.wait,
                lambda: state.trailing_metadata is not None,
            )
            return state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """See grpc.Call.code"""
        state = self._state
        with state.condition:
            # Block until a status code has been recorded.
            _common.wait(
                state.condition.wait, lambda: state.code is not None
            )
            return state.code

    def details(self) -> Optional[str]:
        """See grpc.Call.details"""
        state = self._state
        with state.condition:
            # Block until the status details have been recorded.
            _common.wait(
                state.condition.wait, lambda: state.details is not None
            )
            return _common.decode(state.details)

    def debug_error_string(self) -> Optional[str]:
        """Wait for the debug error string and return it decoded."""
        state = self._state
        with state.condition:
            _common.wait(
                state.condition.wait,
                lambda: state.debug_error_string is not None,
            )
            return _common.decode(state.debug_error_string)

    def cancelled(self) -> bool:
        """Return the `cancelled` flag of the RPC state."""
        with self._state.condition:
            return self._state.cancelled

    def running(self) -> bool:
        """Return True while no status code has been recorded."""
        with self._state.condition:
            return self._state.code is None

    def done(self) -> bool:
        """Return True once a status code has been recorded."""
        with self._state.condition:
            return self._state.code is not None

    def _is_complete(self) -> bool:
        """Completion predicate; callers in this class hold the condition."""
        return self._state.code is not None

    def result(self, timeout: Optional[float] = None) -> Any:
        """Returns the result of the computation or raises its exception.

        See grpc.Future.result for the full API contract.
        """
        state = self._state
        with state.condition:
            if _common.wait(
                state.condition.wait, self._is_complete, timeout=timeout
            ):
                raise grpc.FutureTimeoutError()
            if state.code is grpc.StatusCode.OK:
                return state.response
            if state.cancelled:
                raise grpc.FutureCancelledError()
            raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return the exception raised by the computation.

        See grpc.Future.exception for the full API contract.
        """
        state = self._state
        with state.condition:
            if _common.wait(
                state.condition.wait, self._is_complete, timeout=timeout
            ):
                raise grpc.FutureTimeoutError()
            if state.code is grpc.StatusCode.OK:
                return None
            if state.cancelled:
                raise grpc.FutureCancelledError()
            return self

    def traceback(
        self, timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Access the traceback of the exception raised by the computation.

        See grpc.future.traceback for the full API contract.
        """
        state = self._state
        with state.condition:
            if _common.wait(
                state.condition.wait, self._is_complete, timeout=timeout
            ):
                raise grpc.FutureTimeoutError()
            if state.code is grpc.StatusCode.OK:
                return None
            if state.cancelled:
                raise grpc.FutureCancelledError()
            # Raise and immediately catch ourselves so that a traceback
            # object exists to hand back.
            try:
                raise self
            except grpc.RpcError:
                return sys.exc_info()[2]

    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
        """Register `fn` to be called with this object upon completion.

        While no status code is recorded the callback is queued on the RPC
        state; otherwise it is invoked right away, outside the condition.
        """
        with self._state.condition:
            still_running = self._state.code is None
            if still_running:
                self._state.callbacks.append(functools.partial(fn, self))
                return

        fn(self)

    def _next(self) -> Any:
        """Request the next response message and wait for it.

        Raises:
          StopIteration: If the RPC terminated with status OK and no further
            response is available.
          grpc.RpcError: This object itself, if the RPC terminated with any
            other status.
        """
        state = self._state
        with state.condition:
            if state.code is grpc.StatusCode.OK:
                raise StopIteration()
            if state.code is not None:
                raise self

            # The RPC is still active: register the receive as due, then try
            # to start it, withdrawing the registration if it cannot start.
            receive_message = cygrpc.OperationType.receive_message
            handler = _event_handler(state, self._response_deserializer)
            state.due.add(receive_message)
            started = self._call.operate(
                (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                handler,
            )
            if not started:
                state.due.remove(receive_message)

            def _response_ready():
                # Ready when a response arrived, or when no receive is
                # outstanding and the RPC has terminated.
                return state.response is not None or (
                    receive_message not in state.due
                    and state.code is not None
                )

            _common.wait(state.condition.wait, _response_ready)
            response = state.response
            if response is not None:
                # Consume the response so the next call waits for a new one.
                state.response = None
                return response
            if receive_message not in state.due:
                if state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                if state.code is not None:
                    raise self
def _start_unary_request(
    request: Any,
    timeout: Optional[float],
    request_serializer: SerializingFunction,
) -> Tuple[Optional[float], Optional[bytes], Optional[grpc.RpcError]]:
    """Compute the deadline for a unary request and serialize it.

    Args:
      request: The request message to serialize.
      timeout: The timeout handed to `_deadline`, or None.
      request_serializer: The serializer passed to `_common.serialize`.

    Returns:
      A `(deadline, serialized_request, error)` tuple. When serialization
      yields None, `serialized_request` is None and `error` is an
      `_InactiveRpcError` with status INTERNAL; otherwise `error` is None.
    """
    deadline = _deadline(timeout)
    serialized_request = _common.serialize(request, request_serializer)
    if serialized_request is not None:
        return deadline, serialized_request, None

    failed_state = _RPCState(
        (),
        (),
        (),
        grpc.StatusCode.INTERNAL,
        "Exception serializing request!",
    )
    return deadline, None, _InactiveRpcError(failed_state)
def _end_unary_response_blocking(
    state: _RPCState,
    call: cygrpc.SegregatedCall,
    with_call: bool,
    deadline: Optional[float],
) -> Union[ResponseType, Tuple[ResponseType, grpc.Call]]:
    """Turn the final state of a blocking unary-response RPC into a result.

    Args:
      state: The RPC state after the call has been driven to completion.
      call: The call the RPC was conducted on.
      with_call: Whether to also return a call object with the response.
      deadline: The deadline handed to the returned call object.

    Returns:
      The response, or a `(response, call object)` tuple if `with_call`.

    Raises:
      _InactiveRpcError: If the recorded status code is not OK.
    """
    if state.code is not grpc.StatusCode.OK:
        raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
    if not with_call:
        return state.response
    rendezvous = _MultiThreadedRendezvous(state, call, None, deadline)
    return state.response, rendezvous
def _stream_unary_invocation_operations(
    metadata: Optional[MetadataType], initial_metadata_flags: int
) -> Sequence[Sequence[cygrpc.Operation]]:
    """Build the two operation batches that open a stream-unary RPC.

    The first batch sends the initial metadata and receives the message and
    the status; the second batch receives the initial metadata.
    """
    send_and_receive_batch = (
        cygrpc.SendInitialMetadataOperation(metadata, initial_metadata_flags),
        cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
        cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
    )
    receive_initial_metadata_batch = (
        cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
    )
    return (send_and_receive_batch, receive_initial_metadata_batch)
def _stream_unary_invocation_operations_and_tags(
    metadata: Optional[MetadataType], initial_metadata_flags: int
) -> Sequence[Tuple[Sequence[cygrpc.Operation], Optional[UserTag]]]:
    """Pair every stream-unary invocation batch with a None tag."""
    batches = _stream_unary_invocation_operations(
        metadata, initial_metadata_flags
    )
    return tuple((batch, None) for batch in batches)
def _determine_deadline(user_deadline: Optional[float]) -> Optional[float]:
    """Combine the caller's deadline with the one found in the context.

    Returns None when neither deadline is set, the one that is set when only
    one is, and the smaller of the two when both are.
    """
    context_deadline = cygrpc.get_deadline_from_context()
    if user_deadline is None:
        # Covers both "neither set" (None) and "only the context one set".
        return context_deadline
    if context_deadline is None:
        return user_deadline
    return min(context_deadline, user_deadline)
class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
    """Invokes a unary-request, unary-response RPC method on a channel."""

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Every attribute assigned in __init__ is listed, including
    # `_registered_call_handle`, which was previously missing.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Store the collaborators and build a census context."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def _prepare(
        self,
        request: Any,
        timeout: Optional[float],
        metadata: Optional[MetadataType],
        wait_for_ready: Optional[bool],
        compression: Optional[grpc.Compression],
    ) -> Tuple[
        Optional[_RPCState],
        Optional[Sequence[cygrpc.Operation]],
        Optional[float],
        Optional[grpc.RpcError],
    ]:
        """Serialize the request and build the single operation batch.

        Returns:
          `(state, operations, deadline, None)` on success, or
          `(None, None, None, error)` when the request failed to serialize.
        """
        deadline, serialized_request, rendezvous = _start_unary_request(
            request, timeout, self._request_serializer
        )
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        if serialized_request is None:
            return None, None, None, rendezvous

        state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
        operations = (
            cygrpc.SendInitialMetadataOperation(
                augmented_metadata, initial_metadata_flags
            ),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        return state, operations, deadline, None

    def _blocking(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Run the RPC on a segregated call and handle its single event.

        Raises:
          grpc.RpcError: If the request could not be serialized.
        """
        state, operations, deadline, rendezvous = self._prepare(
            request, timeout, metadata, wait_for_ready, compression
        )
        if state is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type

        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            None if credentials is None else credentials._credentials,
            (
                (
                    operations,
                    None,
                ),
            ),
            self._context,
            self._registered_call_handle,
        )
        # All operations were submitted as one batch, so one event is read.
        event = call.next_event()
        _handle_event(event, state, self._response_deserializer)
        return state, call

    def __call__(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invoke the RPC synchronously and return only the response."""
        state, call = self._blocking(
            request, timeout, metadata, credentials, wait_for_ready, compression
        )
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invoke the RPC synchronously; return the response and a call."""
        state, call = self._blocking(
            request, timeout, metadata, credentials, wait_for_ready, compression
        )
        return _end_unary_response_blocking(state, call, True, None)

    def future(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Invoke the RPC asynchronously on a managed call.

        Raises:
          grpc.RpcError: If the request could not be serialized.
        """
        state, operations, deadline, rendezvous = self._prepare(
            request, timeout, metadata, wait_for_ready, compression
        )
        if state is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type

        event_handler = _event_handler(state, self._response_deserializer)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        # NOTE(review): unlike `_blocking`, the deadline here is not passed
        # through `_determine_deadline` -- confirm whether that is intended.
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            deadline,
            metadata,
            None if credentials is None else credentials._credentials,
            (operations,),
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Invokes a unary-stream RPC method on a segregated call.

    Responses are delivered through a `_SingleThreadedRendezvous`.
    """

    _channel: cygrpc.Channel
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Every attribute assigned in __init__ is listed, including
    # `_registered_call_handle`, which was previously missing.
    __slots__ = [
        "_channel",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        method: bytes,
        target: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        _registered_call_handle: Optional[int],
    ):
        """Store the collaborators and build a census context."""
        self._channel = channel
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(  # pylint: disable=too-many-locals
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _SingleThreadedRendezvous:
        """Start the RPC and return an iterator over its responses.

        Raises:
          _InactiveRpcError: With status INTERNAL, if the request could not
            be serialized.
        """
        deadline = _deadline(timeout)
        serialized_request = _common.serialize(
            request, self._request_serializer
        )
        if serialized_request is None:
            state = _RPCState(
                (),
                (),
                (),
                grpc.StatusCode.INTERNAL,
                "Exception serializing request!",
            )
            raise _InactiveRpcError(state)

        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        call_credentials = (
            None if credentials is None else credentials._credentials
        )
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        # Three batches: send everything, receive status, receive initial
        # metadata. Each one is submitted with a None tag.
        operations = (
            (
                cygrpc.SendInitialMetadataOperation(
                    augmented_metadata, initial_metadata_flags
                ),
                cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
                cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            ),
            (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),),
            (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
        )
        operations_and_tags = tuple((ops, None) for ops in operations)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            call_credentials,
            operations_and_tags,
            self._context,
            self._registered_call_handle,
        )
        return _SingleThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Invokes a unary-stream RPC method on a managed call.

    Responses are delivered through a `_MultiThreadedRendezvous`.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Every attribute assigned in __init__ is listed, including
    # `_registered_call_handle`, which was previously missing.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        _registered_call_handle: Optional[int],
    ):
        """Store the collaborators and build a census context."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(  # pylint: disable=too-many-locals
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Start the RPC and return an iterator over its responses.

        Raises:
          grpc.RpcError: If the request could not be serialized.
        """
        deadline, serialized_request, rendezvous = _start_unary_request(
            request, timeout, self._request_serializer
        )
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        if serialized_request is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type

        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        # Two batches: send everything and receive the status, then receive
        # the initial metadata.
        operations = (
            (
                cygrpc.SendInitialMetadataOperation(
                    augmented_metadata, initial_metadata_flags
                ),
                cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
                cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
                cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
            ),
            (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            None if credentials is None else credentials._credentials,
            operations,
            _event_handler(state, self._response_deserializer),
            self._context,
            self._registered_call_handle,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
    """Invokes a stream-request, unary-response RPC method on a channel."""

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Every attribute assigned in __init__ is listed, including
    # `_registered_call_handle`, which was previously missing.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Store the collaborators and build a census context."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def _blocking(
        self,
        request_iterator: Iterator,
        timeout: Optional[float],
        metadata: Optional[MetadataType],
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        compression: Optional[grpc.Compression],
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Run the RPC on a segregated call until no operation is due."""
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            _stream_unary_invocation_operations_and_tags(
                augmented_metadata, initial_metadata_flags
            ),
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator, state, call, self._request_serializer, None
        )
        # Drain events from the call, notifying waiters on the condition
        # after each one, until nothing remains due.
        while True:
            event = call.next_event()
            with state.condition:
                _handle_event(event, state, self._response_deserializer)
                state.condition.notify_all()
                if not state.due:
                    break
        return state, call

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invoke the RPC synchronously and return only the response."""
        state, call = self._blocking(
            request_iterator,
            timeout,
            metadata,
            credentials,
            wait_for_ready,
            compression,
        )
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invoke the RPC synchronously; return the response and a call."""
        state, call = self._blocking(
            request_iterator,
            timeout,
            metadata,
            credentials,
            wait_for_ready,
            compression,
        )
        return _end_unary_response_blocking(state, call, True, None)

    def future(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Invoke the RPC asynchronously on a managed call."""
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        event_handler = _event_handler(state, self._response_deserializer)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        # NOTE(review): unlike `_blocking`, the deadline here is not passed
        # through `_determine_deadline` -- confirm whether that is intended.
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            deadline,
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            # Send the augmented metadata, as `_blocking` does, so that the
            # per-call `compression` argument is part of what is sent.
            _stream_unary_invocation_operations(
                augmented_metadata, initial_metadata_flags
            ),
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator,
            state,
            call,
            self._request_serializer,
            event_handler,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
    """Invokes a stream-request, stream-response RPC method on a channel."""

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Every attribute assigned in __init__ is listed, including
    # `_registered_call_handle`, which was previously missing.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Store the collaborators and build a census context."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Start the RPC on a managed call and begin consuming requests.

        Returns:
          A `_MultiThreadedRendezvous` for the started RPC.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        # Two batches: send the initial metadata and receive the status,
        # then receive the initial metadata.
        operations = (
            (
                cygrpc.SendInitialMetadataOperation(
                    augmented_metadata, initial_metadata_flags
                ),
                cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
            ),
            (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
        )
        event_handler = _event_handler(state, self._response_deserializer)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            operations,
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator,
            state,
            call,
            self._request_serializer,
            event_handler,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _InitialMetadataFlags(int):
    """Stores immutable initial metadata flags"""

    def __new__(cls, value: int = _EMPTY_FLAGS):
        # Keep only the bits covered by `used_mask`.
        masked_value = value & cygrpc.InitialMetadataFlags.used_mask
        return super().__new__(cls, masked_value)

    def with_wait_for_ready(self, wait_for_ready: Optional[bool]) -> int:
        """Return flags reflecting an explicit wait-for-ready choice.

        When `wait_for_ready` is None this object is returned as is.
        Otherwise a new instance is returned with the wait-for-ready bit set
        or cleared to match, and the explicitly-set bit turned on.
        """
        if wait_for_ready is None:
            return self

        flag_bits = cygrpc.InitialMetadataFlags
        if wait_for_ready:
            adjusted = self | flag_bits.wait_for_ready
        else:
            adjusted = self & ~flag_bits.wait_for_ready
        return self.__class__(
            adjusted | flag_bits.wait_for_ready_explicitly_set
        )
class _ChannelCallState(object):
    """Per-channel bookkeeping: the channel, a lock and a managed-call count."""

    channel: cygrpc.Channel
    managed_calls: int
    threading: bool

    def __init__(self, channel: cygrpc.Channel):
        self.channel = channel
        self.lock = threading.Lock()
        self.threading = False
        self.managed_calls = 0

    def reset_postfork_child(self) -> None:
        """Zero the managed-call counter."""
        self.managed_calls = 0

    def __del__(self):
        # Best-effort close of the channel on deallocation. TypeError and
        # AttributeError are deliberately swallowed.
        try:
            self.channel.close(
                cygrpc.StatusCode.cancelled, "Channel deallocated!"
            )
        except (TypeError, AttributeError):
            pass
def _run_channel_spin_thread(state: _ChannelCallState) -> None:
    """Start a daemon thread that dispatches the channel's call events.

    The thread exits once the count of managed calls drops to zero.
    """

    def channel_spin():
        while True:
            cygrpc.block_if_fork_in_progress(state)
            event = state.channel.next_call_event()
            if event.completion_type == cygrpc.CompletionType.queue_timeout:
                # Nothing was dequeued in time; poll again.
                continue
            # The tag is the handler for the event; a truthy result means
            # its call has completed.
            if not event.tag(event):
                continue
            with state.lock:
                state.managed_calls -= 1
                if state.managed_calls == 0:
                    return

    spin_thread = cygrpc.ForkManagedThread(target=channel_spin)
    spin_thread.setDaemon(True)
    spin_thread.start()
def _channel_managed_call_management(state: _ChannelCallState):
    """Return a factory of integrated calls bound to `state`."""

    # pylint: disable=too-many-arguments
    def create(
        flags: int,
        method: bytes,
        host: Optional[str],
        deadline: Optional[float],
        metadata: Optional[MetadataType],
        credentials: Optional[cygrpc.CallCredentials],
        operations: Sequence[Sequence[cygrpc.Operation]],
        event_handler: UserTag,
        context: Any,
        _registered_call_handle: Optional[int],
    ) -> cygrpc.IntegratedCall:
        """Creates a cygrpc.IntegratedCall.

        Args:
          flags: An integer bitfield of call flags.
          method: The RPC method.
          host: A host string for the created call.
          deadline: A float to be the deadline of the created call or None if
            the call is to have an infinite deadline.
          metadata: The metadata for the call or None.
          credentials: A cygrpc.CallCredentials or None.
          operations: A sequence of sequences of cygrpc.Operations to be
            started on the call.
          event_handler: A behavior to call to handle the events resultant from
            the operations on the call.
          context: Context object for distributed tracing.
          _registered_call_handle: An int representing the call handle of the
            method, or None if the method is not registered.

        Returns:
          A cygrpc.IntegratedCall with which to conduct an RPC.
        """
        # Every batch is tagged with the same event handler.
        operations_and_tags = tuple(
            (batch, event_handler) for batch in operations
        )
        with state.lock:
            call = state.channel.integrated_call(
                flags,
                method,
                host,
                deadline,
                metadata,
                credentials,
                operations_and_tags,
                context,
                _registered_call_handle,
            )
            # The first managed call starts the spin thread.
            is_first_call = state.managed_calls == 0
            state.managed_calls += 1
            if is_first_call:
                _run_channel_spin_thread(state)
            return call

    return create
class _ChannelConnectivityState(object):
    """Mutable state shared by connectivity polling and callback delivery."""

    lock: threading.RLock
    channel: grpc.Channel
    polling: bool
    connectivity: grpc.ChannelConnectivity
    try_to_connect: bool
    # TODO(xuanwn): Refactor this: https://github.com/grpc/grpc/issues/31704
    callbacks_and_connectivities: List[
        Sequence[
            Union[
                Callable[[grpc.ChannelConnectivity], None],
                Optional[grpc.ChannelConnectivity],
            ]
        ]
    ]
    delivering: bool

    def __init__(self, channel: grpc.Channel):
        self.lock = threading.RLock()
        self.channel = channel
        # The remaining fields start from the same values that
        # `reset_postfork_child` restores.
        self.polling = False
        self.connectivity = None
        self.try_to_connect = False
        self.callbacks_and_connectivities = []
        self.delivering = False

    def reset_postfork_child(self) -> None:
        """Restore every field except `lock` and `channel` to its initial value."""
        self.polling = False
        self.delivering = False
        self.try_to_connect = False
        self.connectivity = None
        self.callbacks_and_connectivities = []
def _deliveries(
    state: _ChannelConnectivityState,
) -> List[Callable[[grpc.ChannelConnectivity], None]]:
    """Collect the callbacks that have not seen the current connectivity.

    Each collected entry is updated in place to record the current
    connectivity as the one last recorded for its callback.
    """
    stale_callbacks = []
    for entry in state.callbacks_and_connectivities:
        callback, recorded_connectivity = entry
        if recorded_connectivity is state.connectivity:
            continue
        stale_callbacks.append(callback)
        entry[1] = state.connectivity
    return stale_callbacks
def _deliver(
    state: _ChannelConnectivityState,
    initial_connectivity: grpc.ChannelConnectivity,
    initial_callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
) -> None:
    """Invoke connectivity callbacks until none is out of date.

    After each round, the callbacks that still lag behind the current
    connectivity are gathered under the lock and called in the next round.
    When there are none, `state.delivering` is cleared and the function
    returns. Exceptions raised by callbacks are logged and otherwise ignored.
    """
    current_connectivity = initial_connectivity
    pending_callbacks = initial_callbacks
    while True:
        for pending_callback in pending_callbacks:
            cygrpc.block_if_fork_in_progress(state)
            try:
                pending_callback(current_connectivity)
            except Exception:  # pylint: disable=broad-except
                _LOGGER.exception(
                    _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE
                )
        with state.lock:
            pending_callbacks = _deliveries(state)
            if not pending_callbacks:
                state.delivering = False
                return
            current_connectivity = state.connectivity
def _spawn_delivery(
    state: _ChannelConnectivityState,
    callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
) -> None:
    """Start a daemon thread running `_deliver` and mark delivery active.

    The thread is handed the connectivity stored on `state` at the time of
    this call, together with `callbacks`.
    """
    delivery_args = (state, state.connectivity, callbacks)
    worker = cygrpc.ForkManagedThread(target=_deliver, args=delivery_args)
    worker.setDaemon(True)
    worker.start()
    state.delivering = True
- # NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll.
def _poll_connectivity(
    state: _ChannelConnectivityState,
    channel: grpc.Channel,
    initial_try_to_connect: bool,
) -> None:
    """Poll the channel's connectivity and fan changes out to subscribers.

    The loop ends, clearing `state.polling` and `state.connectivity`, once
    there are no subscribed callbacks left and no connection attempt has
    been requested.
    """
    try_to_connect = initial_try_to_connect
    raw_connectivity = channel.check_connectivity_state(try_to_connect)
    with state.lock:
        state.connectivity = (
            _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
                raw_connectivity
            ]
        )
        # Every current subscriber is notified of the first observation.
        subscribers = tuple(
            entry[0] for entry in state.callbacks_and_connectivities
        )
        for entry in state.callbacks_and_connectivities:
            entry[1] = state.connectivity
        if subscribers:
            _spawn_delivery(state, subscribers)

    while True:
        # Watch for a change away from the last observed state, with a
        # deadline 0.2 seconds from now.
        event = channel.watch_connectivity_state(
            raw_connectivity, time.time() + 0.2
        )
        cygrpc.block_if_fork_in_progress(state)
        with state.lock:
            if not (
                state.callbacks_and_connectivities or state.try_to_connect
            ):
                state.polling = False
                state.connectivity = None
                break
            # Consume the pending connection request, if any.
            try_to_connect = state.try_to_connect
            state.try_to_connect = False
        if not (event.success or try_to_connect):
            continue
        raw_connectivity = channel.check_connectivity_state(try_to_connect)
        with state.lock:
            state.connectivity = (
                _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
                    raw_connectivity
                ]
            )
            if state.delivering:
                # A delivery is already in progress, so none is started here.
                continue
            stale_callbacks = _deliveries(state)
            if stale_callbacks:
                _spawn_delivery(state, stale_callbacks)
def _subscribe(
    state: _ChannelConnectivityState,
    callback: Callable[[grpc.ChannelConnectivity], None],
    try_to_connect: bool,
) -> None:
    """Register `callback` for connectivity updates.

    Three cases are handled under `state.lock`:
      * No subscribers and no polling thread: start a daemon polling thread
        running `_poll_connectivity` and record the callback with no known
        connectivity.
      * No delivery in progress and a connectivity already known: spawn a
        delivery for this callback alone and record it with that connectivity.
      * Otherwise: record the callback with no known connectivity.
    In the last two cases the connect request is OR-ed into
    `state.try_to_connect`; in the first it is passed to the polling thread.

    Args:
      state: Shared connectivity bookkeeping for the channel.
      callback: Callable to be invoked with connectivity values.
      try_to_connect: Whether a connection attempt is requested; coerced with
        `bool()`.
    """
    with state.lock:
        wants_connect = bool(try_to_connect)
        subscriptions = state.callbacks_and_connectivities
        if not subscriptions and not state.polling:
            poller = cygrpc.ForkManagedThread(
                target=_poll_connectivity,
                args=(state, state.channel, wants_connect),
            )
            poller.setDaemon(True)
            poller.start()
            state.polling = True
            subscriptions.append([callback, None])
            return
        recorded_connectivity = None
        if not state.delivering and state.connectivity is not None:
            _spawn_delivery(state, (callback,))
            recorded_connectivity = state.connectivity
        state.try_to_connect |= wants_connect
        subscriptions.append([callback, recorded_connectivity])
def _unsubscribe(
    state: _ChannelConnectivityState,
    callback: Callable[[grpc.ChannelConnectivity], None],
) -> None:
    """Remove the first subscription whose callback equals `callback`.

    Does nothing when no subscription matches. Runs under `state.lock`.

    Args:
      state: Shared connectivity bookkeeping for the channel.
      callback: The callback to remove; compared with `==`.
    """
    with state.lock:
        subscriptions = state.callbacks_and_connectivities
        position = next(
            (
                index
                for index, (subscribed, _) in enumerate(subscriptions)
                if callback == subscribed
            ),
            None,
        )
        if position is not None:
            del subscriptions[position]
def _augment_options(
    base_options: Sequence[ChannelArgumentType],
    compression: Optional[grpc.Compression],
) -> Sequence[ChannelArgumentType]:
    """Return `base_options` extended with compression and user-agent options.

    Args:
      base_options: Channel options supplied by the caller.
      compression: Value handed to `_compression.create_channel_option`.

    Returns:
      A tuple made of `base_options`, followed by whatever
      `_compression.create_channel_option` returned, followed by the
      primary-user-agent option carrying `_USER_AGENT`.
    """
    compression_args = _compression.create_channel_option(compression)
    augmented = tuple(base_options)
    augmented = augmented + compression_args
    user_agent_arg = (
        cygrpc.ChannelArgKey.primary_user_agent_string,
        _USER_AGENT,
    )
    return augmented + (user_agent_arg,)
def _separate_channel_options(
    options: Sequence[ChannelArgumentType],
) -> Tuple[Sequence[ChannelArgumentType], Sequence[ChannelArgumentType]]:
    """Split channel options into Python-only options and core options.

    An option is Python-only when its key equals
    `grpc.experimental.ChannelOptions.SingleThreadedUnaryStream`; every other
    option is a core option. Relative order is preserved within each group.

    Args:
      options: Channel options, each indexable with the key at position 0.

    Returns:
      A `(python_options, core_options)` pair of lists, in that order.
    """
    python_only = []
    for_core = []
    for option in options:
        bucket = (
            python_only
            if option[0]
            == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
            else for_core
        )
        bucket.append(option)
    return python_only, for_core
class Channel(grpc.Channel):
    """A cygrpc.Channel-backed implementation of grpc.Channel."""

    _single_threaded_unary_stream: bool
    _channel: cygrpc.Channel
    _call_state: _ChannelCallState
    _connectivity_state: _ChannelConnectivityState
    _target: str
    _registered_call_handles: Dict[str, int]

    def __init__(
        self,
        target: str,
        options: Sequence[ChannelArgumentType],
        credentials: Optional[grpc.ChannelCredentials],
        compression: Optional[grpc.Compression],
    ):
        """Constructor.

        Args:
          target: The target to which to connect.
          options: Configuration options for the channel. Python-only options
            are consumed here; the rest are passed to the cygrpc channel.
          credentials: Channel credentials or None; forwarded unchanged to
            cygrpc.Channel.
          compression: An optional value indicating the compression method to
            be used over the lifetime of the channel.
        """
        python_options, core_options = _separate_channel_options(options)
        self._single_threaded_unary_stream = (
            _DEFAULT_SINGLE_THREADED_UNARY_STREAM
        )
        self._process_python_options(python_options)

        encoded_target = _common.encode(target)
        channel_args = _augment_options(core_options, compression)
        self._channel = cygrpc.Channel(
            encoded_target, channel_args, credentials
        )
        self._target = target

        core_channel = self._channel
        self._call_state = _ChannelCallState(core_channel)
        self._connectivity_state = _ChannelConnectivityState(core_channel)

        cygrpc.fork_register_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_increment_channel_count()

    def _get_registered_call_handle(self, method: str) -> int:
        """Look up the registered call handle for a method.

        This is a semi-private method. It is intended for use only by gRPC
        generated code.

        This method is not thread-safe.

        Args:
          method: Required, the method name for the RPC.

        Returns:
          The registered call handle pointer in the form of a Python Long.
        """
        encoded_method = _common.encode(method)
        return self._channel.get_registered_call_handle(encoded_method)

    def _process_python_options(
        self, python_options: Sequence[ChannelArgumentType]
    ) -> None:
        """Sets channel attributes according to python-only channel options."""
        for option in python_options:
            # Only the key is inspected; the option's value is not read, so
            # the mere presence of the option enables single-threaded mode.
            if (
                option[0]
                == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
            ):
                self._single_threaded_unary_stream = True

    def subscribe(
        self,
        callback: Callable[[grpc.ChannelConnectivity], None],
        try_to_connect: Optional[bool] = None,
    ) -> None:
        """Subscribe `callback` to this channel's connectivity updates.

        Delegates to `_subscribe` with this channel's connectivity state.
        """
        _subscribe(self._connectivity_state, callback, try_to_connect)

    def unsubscribe(
        self, callback: Callable[[grpc.ChannelConnectivity], None]
    ) -> None:
        """Remove `callback` from this channel's connectivity subscribers.

        Delegates to `_unsubscribe` with this channel's connectivity state.
        """
        _unsubscribe(self._connectivity_state, callback)

    # pylint: disable=arguments-differ
    def unary_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.UnaryUnaryMultiCallable:
        """Create a unary-unary multi-callable for `method`.

        A registered call handle is looked up only when `_registered_method`
        is truthy; otherwise None is passed in its place.
        """
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        return _UnaryUnaryMultiCallable(
            self._channel,
            _channel_managed_call_management(self._call_state),
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            call_handle,
        )

    # pylint: disable=arguments-differ
    def unary_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.UnaryStreamMultiCallable:
        """Create a unary-stream multi-callable for `method`.

        Returns the single-threaded variant when the channel was configured
        for it, and the call-state-managed variant otherwise.
        """
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC
        # on a single Python thread results in an appreciable speed-up. However,
        # due to slight differences in capability, the multi-threaded variant
        # remains the default.
        if self._single_threaded_unary_stream:
            return _SingleThreadedUnaryStreamMultiCallable(
                self._channel,
                _common.encode(method),
                _common.encode(self._target),
                request_serializer,
                response_deserializer,
                call_handle,
            )
        return _UnaryStreamMultiCallable(
            self._channel,
            _channel_managed_call_management(self._call_state),
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            call_handle,
        )

    # pylint: disable=arguments-differ
    def stream_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.StreamUnaryMultiCallable:
        """Create a stream-unary multi-callable for `method`.

        A registered call handle is looked up only when `_registered_method`
        is truthy; otherwise None is passed in its place.
        """
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        return _StreamUnaryMultiCallable(
            self._channel,
            _channel_managed_call_management(self._call_state),
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            call_handle,
        )

    # pylint: disable=arguments-differ
    def stream_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.StreamStreamMultiCallable:
        """Create a stream-stream multi-callable for `method`.

        A registered call handle is looked up only when `_registered_method`
        is truthy; otherwise None is passed in its place.
        """
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        return _StreamStreamMultiCallable(
            self._channel,
            _channel_managed_call_management(self._call_state),
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            call_handle,
        )

    def _unsubscribe_all(self) -> None:
        """Drop every connectivity subscription held by this channel."""
        connectivity_state = self._connectivity_state
        if not connectivity_state:
            return
        with connectivity_state.lock:
            # Emptied in place so the same list object stays attached to the
            # connectivity state.
            del connectivity_state.callbacks_and_connectivities[:]

    def _close(self) -> None:
        """Unsubscribe all callbacks and close the underlying channel.

        Also unregisters the channel from fork handling and, when gevent is
        activated, decrements the gevent channel count.
        """
        self._unsubscribe_all()
        self._channel.close(cygrpc.StatusCode.cancelled, "Channel closed!")
        cygrpc.fork_unregister_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_decrement_channel_count()

    def _close_on_fork(self) -> None:
        """Unsubscribe all callbacks and close the channel because of a fork."""
        self._unsubscribe_all()
        self._channel.close_on_fork(
            cygrpc.StatusCode.cancelled, "Channel closed due to fork"
        )

    def __enter__(self):
        """Return this channel for use as a context manager."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the channel; returns False so exceptions propagate."""
        self._close()
        return False

    def close(self) -> None:
        """Close the channel by delegating to `_close`."""
        self._close()

    def __del__(self):
        # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
        # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
        # here (or more likely, call self._close() here). We don't do this today
        # because many valid use cases today allow the channel to be deleted
        # immediately after stubs are created. After a sufficient period of time
        # has passed for all users to be trusted to hold on to their channels
        # for as long as they are in use and to close them after using them,
        # then deletion of this grpc._channel.Channel instance can be made to
        # effect closure of the underlying cygrpc.Channel instance.
        try:
            self._unsubscribe_all()
        except:  # pylint: disable=bare-except
            # Exceptions in __del__ are ignored by Python anyway, but they can
            # keep spamming logs. Just silence them.
            pass
|