_channel.py 79 KB


  1. # Copyright 2016 gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Invocation-side implementation of gRPC Python."""
  15. import copy
  16. import functools
  17. import logging
  18. import os
  19. import sys
  20. import threading
  21. import time
  22. import types
  23. from typing import (
  24. Any,
  25. Callable,
  26. Dict,
  27. Iterator,
  28. List,
  29. Optional,
  30. Sequence,
  31. Set,
  32. Tuple,
  33. Union,
  34. )
  35. import grpc # pytype: disable=pyi-error
  36. from grpc import _common # pytype: disable=pyi-error
  37. from grpc import _compression # pytype: disable=pyi-error
  38. from grpc import _grpcio_metadata # pytype: disable=pyi-error
  39. from grpc import _observability # pytype: disable=pyi-error
  40. from grpc._cython import cygrpc
  41. from grpc._typing import ChannelArgumentType
  42. from grpc._typing import DeserializingFunction
  43. from grpc._typing import IntegratedCallFactory
  44. from grpc._typing import MetadataType
  45. from grpc._typing import NullaryCallbackType
  46. from grpc._typing import ResponseType
  47. from grpc._typing import SerializingFunction
  48. from grpc._typing import UserTag
  49. import grpc.experimental # pytype: disable=pyi-error
# Logger for this module, named after the module itself.
_LOGGER = logging.getLogger(__name__)
# String of the form "grpc-python/<grpcio version>". Presumably sent as the
# user agent; the use site is outside this part of the file.
_USER_AGENT = "grpc-python/{}".format(_grpcio_metadata.__version__)
# Flags value handed to cygrpc operation constructors when no flag is needed.
_EMPTY_FLAGS = 0
# NOTE(rbellevi): No guarantees are given about the maintenance of this
# environment variable.
# True whenever GRPC_SINGLE_THREADED_UNARY_STREAM is present in the
# environment, whatever its value (an empty string counts as set).
_DEFAULT_SINGLE_THREADED_UNARY_STREAM = (
    os.getenv("GRPC_SINGLE_THREADED_UNARY_STREAM") is not None
)
# The four *_INITIAL_DUE tuples list cygrpc operation types per RPC arity.
# NOTE(review): judging by the names they seed `_RPCState.due` for the first
# batch of each kind of call; the use sites are not visible here - confirm.
_UNARY_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
# Same as unary-unary but without receive_message.
_UNARY_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)
# No send_message / send_close_from_client entries: within this file those
# operations are issued by _consume_request_iterator for streaming requests.
_STREAM_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
# Neither send_message nor receive_message is listed for stream-stream.
_STREAM_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)
# Log message text; its use site is outside this part of the file.
_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
    "Exception calling channel subscription callback!"
)
# repr() template used by _rpc_state_string for RPCs that ended with OK.
# Positional fields: class name, status code, details.
_OK_RENDEZVOUS_REPR_FORMAT = (
    '<{} of RPC that terminated with:\n\tstatus = {}\n\tdetails = "{}"\n>'
)
# repr() template used by _rpc_state_string for RPCs that ended with any
# other status. Positional fields: class name, status code, details,
# debug error string.
_NON_OK_RENDEZVOUS_REPR_FORMAT = (
    "<{} of RPC that terminated with:\n"
    "\tstatus = {}\n"
    '\tdetails = "{}"\n'
    '\tdebug_error_string = "{}"\n'
    ">"
)
  97. def _deadline(timeout: Optional[float]) -> Optional[float]:
  98. return None if timeout is None else time.time() + timeout
  99. def _unknown_code_details(
  100. unknown_cygrpc_code: Optional[grpc.StatusCode], details: Optional[str]
  101. ) -> str:
  102. return 'Server sent unknown code {} and details "{}"'.format(
  103. unknown_cygrpc_code, details
  104. )
  105. class _RPCState(object):
  106. condition: threading.Condition
  107. due: Set[cygrpc.OperationType]
  108. initial_metadata: Optional[MetadataType]
  109. response: Any
  110. trailing_metadata: Optional[MetadataType]
  111. code: Optional[grpc.StatusCode]
  112. details: Optional[str]
  113. debug_error_string: Optional[str]
  114. cancelled: bool
  115. callbacks: List[NullaryCallbackType]
  116. fork_epoch: Optional[int]
  117. rpc_start_time: Optional[float] # In relative seconds
  118. rpc_end_time: Optional[float] # In relative seconds
  119. method: Optional[str]
  120. target: Optional[str]
  121. def __init__(
  122. self,
  123. due: Sequence[cygrpc.OperationType],
  124. initial_metadata: Optional[MetadataType],
  125. trailing_metadata: Optional[MetadataType],
  126. code: Optional[grpc.StatusCode],
  127. details: Optional[str],
  128. ):
  129. # `condition` guards all members of _RPCState. `notify_all` is called on
  130. # `condition` when the state of the RPC has changed.
  131. self.condition = threading.Condition()
  132. # The cygrpc.OperationType objects representing events due from the RPC's
  133. # completion queue. If an operation is in `due`, it is guaranteed that
  134. # `operate()` has been called on a corresponding operation. But the
  135. # converse is not true. That is, in the case of failed `operate()`
  136. # calls, there may briefly be events in `due` that do not correspond to
  137. # operations submitted to Core.
  138. self.due = set(due)
  139. self.initial_metadata = initial_metadata
  140. self.response = None
  141. self.trailing_metadata = trailing_metadata
  142. self.code = code
  143. self.details = details
  144. self.debug_error_string = None
  145. # The following three fields are used for observability.
  146. # Updates to those fields do not trigger self.condition.
  147. self.rpc_start_time = None
  148. self.rpc_end_time = None
  149. self.method = None
  150. self.target = None
  151. # The semantics of grpc.Future.cancel and grpc.Future.cancelled are
  152. # slightly wonky, so they have to be tracked separately from the rest of the
  153. # result of the RPC. This field tracks whether cancellation was requested
  154. # prior to termination of the RPC.
  155. self.cancelled = False
  156. self.callbacks = []
  157. self.fork_epoch = cygrpc.get_fork_epoch()
  158. def reset_postfork_child(self):
  159. self.condition = threading.Condition()
  160. def _abort(state: _RPCState, code: grpc.StatusCode, details: str) -> None:
  161. if state.code is None:
  162. state.code = code
  163. state.details = details
  164. if state.initial_metadata is None:
  165. state.initial_metadata = ()
  166. state.trailing_metadata = ()
  167. def _handle_event(
  168. event: cygrpc.BaseEvent,
  169. state: _RPCState,
  170. response_deserializer: Optional[DeserializingFunction],
  171. ) -> List[NullaryCallbackType]:
  172. callbacks = []
  173. for batch_operation in event.batch_operations:
  174. operation_type = batch_operation.type()
  175. state.due.remove(operation_type)
  176. if operation_type == cygrpc.OperationType.receive_initial_metadata:
  177. state.initial_metadata = batch_operation.initial_metadata()
  178. elif operation_type == cygrpc.OperationType.receive_message:
  179. serialized_response = batch_operation.message()
  180. if serialized_response is not None:
  181. response = _common.deserialize(
  182. serialized_response, response_deserializer
  183. )
  184. if response is None:
  185. details = "Exception deserializing response!"
  186. _abort(state, grpc.StatusCode.INTERNAL, details)
  187. else:
  188. state.response = response
  189. elif operation_type == cygrpc.OperationType.receive_status_on_client:
  190. state.trailing_metadata = batch_operation.trailing_metadata()
  191. if state.code is None:
  192. code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
  193. batch_operation.code()
  194. )
  195. if code is None:
  196. state.code = grpc.StatusCode.UNKNOWN
  197. state.details = _unknown_code_details(
  198. code, batch_operation.details()
  199. )
  200. else:
  201. state.code = code
  202. state.details = batch_operation.details()
  203. state.debug_error_string = batch_operation.error_string()
  204. state.rpc_end_time = time.perf_counter()
  205. _observability.maybe_record_rpc_latency(state)
  206. callbacks.extend(state.callbacks)
  207. state.callbacks = None
  208. return callbacks
  209. def _event_handler(
  210. state: _RPCState, response_deserializer: Optional[DeserializingFunction]
  211. ) -> UserTag:
  212. def handle_event(event):
  213. with state.condition:
  214. callbacks = _handle_event(event, state, response_deserializer)
  215. state.condition.notify_all()
  216. done = not state.due
  217. for callback in callbacks:
  218. try:
  219. callback()
  220. except Exception as e: # pylint: disable=broad-except
  221. # NOTE(rbellevi): We suppress but log errors here so as not to
  222. # kill the channel spin thread.
  223. logging.error(
  224. "Exception in callback %s: %s", repr(callback.func), repr(e)
  225. )
  226. return done and state.fork_epoch >= cygrpc.get_fork_epoch()
  227. return handle_event
  228. # TODO(xuanwn): Create a base class for IntegratedCall and SegregatedCall.
  229. # pylint: disable=too-many-statements
def _consume_request_iterator(
    request_iterator: Iterator,
    state: _RPCState,
    call: Union[cygrpc.IntegratedCall, cygrpc.SegregatedCall],
    request_serializer: SerializingFunction,
    event_handler: Optional[UserTag],
) -> None:
    """Consume a request supplied by the user.

    Starts a daemon `cygrpc.ForkManagedThread` that pulls requests from
    `request_iterator`, serializes each one and sends it on `call`, one
    message at a time, and finally sends close-from-client once the iterator
    is exhausted. This function returns right after starting the thread.

    Args:
      request_iterator: The user-supplied iterator of request objects.
      state: The state of the RPC the requests belong to.
      call: The call on which operations are started and, on failure,
        cancelled.
      request_serializer: Serializer handed to `_common.serialize`.
      event_handler: Tag passed to `call.operate` with every operation.
    """

    def consume_request_iterator():  # pylint: disable=too-many-branches
        # Iterate over the request iterator until it is exhausted or an error
        # condition is encountered.
        while True:
            # Ensures return_from_user_request_generator() runs exactly once
            # per iteration: either in the except branch or in `finally`.
            return_from_user_request_generator_invoked = False
            try:
                # The thread may die in user-code. Do not block fork for this.
                cygrpc.enter_user_request_generator()
                request = next(request_iterator)
            except StopIteration:
                # Normal exhaustion: go on to send close-from-client below.
                break
            except Exception:  # pylint: disable=broad-except
                # The user's iterator raised: log, cancel the call with
                # UNKNOWN and record that status on the state.
                cygrpc.return_from_user_request_generator()
                return_from_user_request_generator_invoked = True
                code = grpc.StatusCode.UNKNOWN
                details = "Exception iterating requests!"
                _LOGGER.exception(details)
                call.cancel(
                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
                )
                # NOTE(review): unlike the serialization-failure path below,
                # this _abort runs without holding state.condition.
                _abort(state, code, details)
                return
            finally:
                if not return_from_user_request_generator_invoked:
                    cygrpc.return_from_user_request_generator()
            serialized_request = _common.serialize(request, request_serializer)
            with state.condition:
                # Only keep sending while the RPC has neither terminated nor
                # been cancelled.
                if state.code is None and not state.cancelled:
                    # A None result is treated as a serialization failure.
                    if serialized_request is None:
                        code = grpc.StatusCode.INTERNAL
                        details = "Exception serializing request!"
                        call.cancel(
                            _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details,
                        )
                        _abort(state, code, details)
                        return
                    else:
                        # Mark the send as due before starting it; undo the
                        # mark if the operation could not be started.
                        state.due.add(cygrpc.OperationType.send_message)
                        operations = (
                            cygrpc.SendMessageOperation(
                                serialized_request, _EMPTY_FLAGS
                            ),
                        )
                        operating = call.operate(operations, event_handler)
                        if not operating:
                            state.due.remove(cygrpc.OperationType.send_message)
                            return

                        # Wait predicate: the RPC terminated, or the send
                        # issued above is no longer outstanding.
                        def _done():
                            return (
                                state.code is not None
                                or cygrpc.OperationType.send_message
                                not in state.due
                            )

                        _common.wait(
                            state.condition.wait,
                            _done,
                            spin_cb=functools.partial(
                                cygrpc.block_if_fork_in_progress, state
                            ),
                        )
                        # Stop pulling requests once the RPC has a status.
                        if state.code is not None:
                            return
                else:
                    return
        # Reached only after the iterator was exhausted: half-close the call
        # if the RPC is still running.
        with state.condition:
            if state.code is None:
                state.due.add(cygrpc.OperationType.send_close_from_client)
                operations = (
                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
                )
                operating = call.operate(operations, event_handler)
                if not operating:
                    state.due.remove(
                        cygrpc.OperationType.send_close_from_client
                    )

    consumption_thread = cygrpc.ForkManagedThread(
        target=consume_request_iterator
    )
    consumption_thread.setDaemon(True)
    consumption_thread.start()
  319. def _rpc_state_string(class_name: str, rpc_state: _RPCState) -> str:
  320. """Calculates error string for RPC."""
  321. with rpc_state.condition:
  322. if rpc_state.code is None:
  323. return "<{} object>".format(class_name)
  324. elif rpc_state.code is grpc.StatusCode.OK:
  325. return _OK_RENDEZVOUS_REPR_FORMAT.format(
  326. class_name, rpc_state.code, rpc_state.details
  327. )
  328. else:
  329. return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
  330. class_name,
  331. rpc_state.code,
  332. rpc_state.details,
  333. rpc_state.debug_error_string,
  334. )
  335. class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future):
  336. """An RPC error not tied to the execution of a particular RPC.
  337. The RPC represented by the state object must not be in-progress or
  338. cancelled.
  339. Attributes:
  340. _state: An instance of _RPCState.
  341. """
  342. _state: _RPCState
  343. def __init__(self, state: _RPCState):
  344. with state.condition:
  345. self._state = _RPCState(
  346. (),
  347. copy.deepcopy(state.initial_metadata),
  348. copy.deepcopy(state.trailing_metadata),
  349. state.code,
  350. copy.deepcopy(state.details),
  351. )
  352. self._state.response = copy.copy(state.response)
  353. self._state.debug_error_string = copy.copy(state.debug_error_string)
  354. def initial_metadata(self) -> Optional[MetadataType]:
  355. return self._state.initial_metadata
  356. def trailing_metadata(self) -> Optional[MetadataType]:
  357. return self._state.trailing_metadata
  358. def code(self) -> Optional[grpc.StatusCode]:
  359. return self._state.code
  360. def details(self) -> Optional[str]:
  361. return _common.decode(self._state.details)
  362. def debug_error_string(self) -> Optional[str]:
  363. return _common.decode(self._state.debug_error_string)
  364. def _repr(self) -> str:
  365. return _rpc_state_string(self.__class__.__name__, self._state)
  366. def __repr__(self) -> str:
  367. return self._repr()
  368. def __str__(self) -> str:
  369. return self._repr()
  370. def cancel(self) -> bool:
  371. """See grpc.Future.cancel."""
  372. return False
  373. def cancelled(self) -> bool:
  374. """See grpc.Future.cancelled."""
  375. return False
  376. def running(self) -> bool:
  377. """See grpc.Future.running."""
  378. return False
  379. def done(self) -> bool:
  380. """See grpc.Future.done."""
  381. return True
  382. def result(
  383. self, timeout: Optional[float] = None
  384. ) -> Any: # pylint: disable=unused-argument
  385. """See grpc.Future.result."""
  386. raise self
  387. def exception(
  388. self, timeout: Optional[float] = None # pylint: disable=unused-argument
  389. ) -> Optional[Exception]:
  390. """See grpc.Future.exception."""
  391. return self
  392. def traceback(
  393. self, timeout: Optional[float] = None # pylint: disable=unused-argument
  394. ) -> Optional[types.TracebackType]:
  395. """See grpc.Future.traceback."""
  396. try:
  397. raise self
  398. except grpc.RpcError:
  399. return sys.exc_info()[2]
  400. def add_done_callback(
  401. self,
  402. fn: Callable[[grpc.Future], None],
  403. timeout: Optional[float] = None, # pylint: disable=unused-argument
  404. ) -> None:
  405. """See grpc.Future.add_done_callback."""
  406. fn(self)
  407. class _Rendezvous(grpc.RpcError, grpc.RpcContext):
  408. """An RPC iterator.
  409. Attributes:
  410. _state: An instance of _RPCState.
  411. _call: An instance of SegregatedCall or IntegratedCall.
  412. In either case, the _call object is expected to have operate, cancel,
  413. and next_event methods.
  414. _response_deserializer: A callable taking bytes and return a Python
  415. object.
  416. _deadline: A float representing the deadline of the RPC in seconds. Or
  417. possibly None, to represent an RPC with no deadline at all.
  418. """
  419. _state: _RPCState
  420. _call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall]
  421. _response_deserializer: Optional[DeserializingFunction]
  422. _deadline: Optional[float]
  423. def __init__(
  424. self,
  425. state: _RPCState,
  426. call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall],
  427. response_deserializer: Optional[DeserializingFunction],
  428. deadline: Optional[float],
  429. ):
  430. super(_Rendezvous, self).__init__()
  431. self._state = state
  432. self._call = call
  433. self._response_deserializer = response_deserializer
  434. self._deadline = deadline
  435. def is_active(self) -> bool:
  436. """See grpc.RpcContext.is_active"""
  437. with self._state.condition:
  438. return self._state.code is None
  439. def time_remaining(self) -> Optional[float]:
  440. """See grpc.RpcContext.time_remaining"""
  441. with self._state.condition:
  442. if self._deadline is None:
  443. return None
  444. else:
  445. return max(self._deadline - time.time(), 0)
  446. def cancel(self) -> bool:
  447. """See grpc.RpcContext.cancel"""
  448. with self._state.condition:
  449. if self._state.code is None:
  450. code = grpc.StatusCode.CANCELLED
  451. details = "Locally cancelled by application!"
  452. self._call.cancel(
  453. _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
  454. )
  455. self._state.cancelled = True
  456. _abort(self._state, code, details)
  457. self._state.condition.notify_all()
  458. return True
  459. else:
  460. return False
  461. def add_callback(self, callback: NullaryCallbackType) -> bool:
  462. """See grpc.RpcContext.add_callback"""
  463. with self._state.condition:
  464. if self._state.callbacks is None:
  465. return False
  466. else:
  467. self._state.callbacks.append(callback)
  468. return True
  469. def __iter__(self):
  470. return self
  471. def next(self):
  472. return self._next()
  473. def __next__(self):
  474. return self._next()
  475. def _next(self):
  476. raise NotImplementedError()
  477. def debug_error_string(self) -> Optional[str]:
  478. raise NotImplementedError()
  479. def _repr(self) -> str:
  480. return _rpc_state_string(self.__class__.__name__, self._state)
  481. def __repr__(self) -> str:
  482. return self._repr()
  483. def __str__(self) -> str:
  484. return self._repr()
  485. def __del__(self) -> None:
  486. with self._state.condition:
  487. if self._state.code is None:
  488. self._state.code = grpc.StatusCode.CANCELLED
  489. self._state.details = "Cancelled upon garbage collection!"
  490. self._state.cancelled = True
  491. self._call.cancel(
  492. _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code],
  493. self._state.details,
  494. )
  495. self._state.condition.notify_all()
  496. class _SingleThreadedRendezvous(
  497. _Rendezvous, grpc.Call, grpc.Future
  498. ): # pylint: disable=too-many-ancestors
  499. """An RPC iterator operating entirely on a single thread.
  500. The __next__ method of _SingleThreadedRendezvous does not depend on the
  501. existence of any other thread, including the "channel spin thread".
  502. However, this means that its interface is entirely synchronous. So this
  503. class cannot completely fulfill the grpc.Future interface. The result,
  504. exception, and traceback methods will never block and will instead raise
  505. an exception if calling the method would result in blocking.
  506. This means that these methods are safe to call from add_done_callback
  507. handlers.
  508. """
  509. _state: _RPCState
  510. def _is_complete(self) -> bool:
  511. return self._state.code is not None
  512. def cancelled(self) -> bool:
  513. with self._state.condition:
  514. return self._state.cancelled
  515. def running(self) -> bool:
  516. with self._state.condition:
  517. return self._state.code is None
  518. def done(self) -> bool:
  519. with self._state.condition:
  520. return self._state.code is not None
  521. def result(self, timeout: Optional[float] = None) -> Any:
  522. """Returns the result of the computation or raises its exception.
  523. This method will never block. Instead, it will raise an exception
  524. if calling this method would otherwise result in blocking.
  525. Since this method will never block, any `timeout` argument passed will
  526. be ignored.
  527. """
  528. del timeout
  529. with self._state.condition:
  530. if not self._is_complete():
  531. raise grpc.experimental.UsageError(
  532. "_SingleThreadedRendezvous only supports result() when the"
  533. " RPC is complete."
  534. )
  535. if self._state.code is grpc.StatusCode.OK:
  536. return self._state.response
  537. elif self._state.cancelled:
  538. raise grpc.FutureCancelledError()
  539. else:
  540. raise self
  541. def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
  542. """Return the exception raised by the computation.
  543. This method will never block. Instead, it will raise an exception
  544. if calling this method would otherwise result in blocking.
  545. Since this method will never block, any `timeout` argument passed will
  546. be ignored.
  547. """
  548. del timeout
  549. with self._state.condition:
  550. if not self._is_complete():
  551. raise grpc.experimental.UsageError(
  552. "_SingleThreadedRendezvous only supports exception() when"
  553. " the RPC is complete."
  554. )
  555. if self._state.code is grpc.StatusCode.OK:
  556. return None
  557. elif self._state.cancelled:
  558. raise grpc.FutureCancelledError()
  559. else:
  560. return self
  561. def traceback(
  562. self, timeout: Optional[float] = None
  563. ) -> Optional[types.TracebackType]:
  564. """Access the traceback of the exception raised by the computation.
  565. This method will never block. Instead, it will raise an exception
  566. if calling this method would otherwise result in blocking.
  567. Since this method will never block, any `timeout` argument passed will
  568. be ignored.
  569. """
  570. del timeout
  571. with self._state.condition:
  572. if not self._is_complete():
  573. raise grpc.experimental.UsageError(
  574. "_SingleThreadedRendezvous only supports traceback() when"
  575. " the RPC is complete."
  576. )
  577. if self._state.code is grpc.StatusCode.OK:
  578. return None
  579. elif self._state.cancelled:
  580. raise grpc.FutureCancelledError()
  581. else:
  582. try:
  583. raise self
  584. except grpc.RpcError:
  585. return sys.exc_info()[2]
  586. def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
  587. with self._state.condition:
  588. if self._state.code is None:
  589. self._state.callbacks.append(functools.partial(fn, self))
  590. return
  591. fn(self)
  592. def initial_metadata(self) -> Optional[MetadataType]:
  593. """See grpc.Call.initial_metadata"""
  594. with self._state.condition:
  595. # NOTE(gnossen): Based on our initial call batch, we are guaranteed
  596. # to receive initial metadata before any messages.
  597. while self._state.initial_metadata is None:
  598. self._consume_next_event()
  599. return self._state.initial_metadata
  600. def trailing_metadata(self) -> Optional[MetadataType]:
  601. """See grpc.Call.trailing_metadata"""
  602. with self._state.condition:
  603. if self._state.trailing_metadata is None:
  604. raise grpc.experimental.UsageError(
  605. "Cannot get trailing metadata until RPC is completed."
  606. )
  607. return self._state.trailing_metadata
  608. def code(self) -> Optional[grpc.StatusCode]:
  609. """See grpc.Call.code"""
  610. with self._state.condition:
  611. if self._state.code is None:
  612. raise grpc.experimental.UsageError(
  613. "Cannot get code until RPC is completed."
  614. )
  615. return self._state.code
  616. def details(self) -> Optional[str]:
  617. """See grpc.Call.details"""
  618. with self._state.condition:
  619. if self._state.details is None:
  620. raise grpc.experimental.UsageError(
  621. "Cannot get details until RPC is completed."
  622. )
  623. return _common.decode(self._state.details)
  624. def _consume_next_event(self) -> Optional[cygrpc.BaseEvent]:
  625. event = self._call.next_event()
  626. with self._state.condition:
  627. callbacks = _handle_event(
  628. event, self._state, self._response_deserializer
  629. )
  630. for callback in callbacks:
  631. # NOTE(gnossen): We intentionally allow exceptions to bubble up
  632. # to the user when running on a single thread.
  633. callback()
  634. return event
  635. def _next_response(self) -> Any:
  636. while True:
  637. self._consume_next_event()
  638. with self._state.condition:
  639. if self._state.response is not None:
  640. response = self._state.response
  641. self._state.response = None
  642. return response
  643. elif (
  644. cygrpc.OperationType.receive_message not in self._state.due
  645. ):
  646. if self._state.code is grpc.StatusCode.OK:
  647. raise StopIteration()
  648. elif self._state.code is not None:
  649. raise self
  650. def _next(self) -> Any:
  651. with self._state.condition:
  652. if self._state.code is None:
  653. # We tentatively add the operation as expected and remove
  654. # it if the enqueue operation fails. This allows us to guarantee that
  655. # if an event has been submitted to the core completion queue,
  656. # it is in `due`. If we waited until after a successful
  657. # enqueue operation then a signal could interrupt this
  658. # thread between the enqueue operation and the addition of the
  659. # operation to `due`. This would cause an exception on the
  660. # channel spin thread when the operation completes and no
  661. # corresponding operation would be present in state.due.
  662. # Note that, since `condition` is held through this block, there is
  663. # no data race on `due`.
  664. self._state.due.add(cygrpc.OperationType.receive_message)
  665. operating = self._call.operate(
  666. (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None
  667. )
  668. if not operating:
  669. self._state.due.remove(cygrpc.OperationType.receive_message)
  670. elif self._state.code is grpc.StatusCode.OK:
  671. raise StopIteration()
  672. else:
  673. raise self
  674. return self._next_response()
  675. def debug_error_string(self) -> Optional[str]:
  676. with self._state.condition:
  677. if self._state.debug_error_string is None:
  678. raise grpc.experimental.UsageError(
  679. "Cannot get debug error string until RPC is completed."
  680. )
  681. return _common.decode(self._state.debug_error_string)
  682. class _MultiThreadedRendezvous(
  683. _Rendezvous, grpc.Call, grpc.Future
  684. ): # pylint: disable=too-many-ancestors
  685. """An RPC iterator that depends on a channel spin thread.
  686. This iterator relies upon a per-channel thread running in the background,
  687. dequeueing events from the completion queue, and notifying threads waiting
  688. on the threading.Condition object in the _RPCState object.
  689. This extra thread allows _MultiThreadedRendezvous to fulfill the grpc.Future interface
  690. and to mediate a bidirection streaming RPC.
  691. """
  692. _state: _RPCState
  693. def initial_metadata(self) -> Optional[MetadataType]:
  694. """See grpc.Call.initial_metadata"""
  695. with self._state.condition:
  696. def _done():
  697. return self._state.initial_metadata is not None
  698. _common.wait(self._state.condition.wait, _done)
  699. return self._state.initial_metadata
  700. def trailing_metadata(self) -> Optional[MetadataType]:
  701. """See grpc.Call.trailing_metadata"""
  702. with self._state.condition:
  703. def _done():
  704. return self._state.trailing_metadata is not None
  705. _common.wait(self._state.condition.wait, _done)
  706. return self._state.trailing_metadata
  707. def code(self) -> Optional[grpc.StatusCode]:
  708. """See grpc.Call.code"""
  709. with self._state.condition:
  710. def _done():
  711. return self._state.code is not None
  712. _common.wait(self._state.condition.wait, _done)
  713. return self._state.code
  714. def details(self) -> Optional[str]:
  715. """See grpc.Call.details"""
  716. with self._state.condition:
  717. def _done():
  718. return self._state.details is not None
  719. _common.wait(self._state.condition.wait, _done)
  720. return _common.decode(self._state.details)
  721. def debug_error_string(self) -> Optional[str]:
  722. with self._state.condition:
  723. def _done():
  724. return self._state.debug_error_string is not None
  725. _common.wait(self._state.condition.wait, _done)
  726. return _common.decode(self._state.debug_error_string)
  727. def cancelled(self) -> bool:
  728. with self._state.condition:
  729. return self._state.cancelled
  730. def running(self) -> bool:
  731. with self._state.condition:
  732. return self._state.code is None
  733. def done(self) -> bool:
  734. with self._state.condition:
  735. return self._state.code is not None
  736. def _is_complete(self) -> bool:
  737. return self._state.code is not None
  738. def result(self, timeout: Optional[float] = None) -> Any:
  739. """Returns the result of the computation or raises its exception.
  740. See grpc.Future.result for the full API contract.
  741. """
  742. with self._state.condition:
  743. timed_out = _common.wait(
  744. self._state.condition.wait, self._is_complete, timeout=timeout
  745. )
  746. if timed_out:
  747. raise grpc.FutureTimeoutError()
  748. else:
  749. if self._state.code is grpc.StatusCode.OK:
  750. return self._state.response
  751. elif self._state.cancelled:
  752. raise grpc.FutureCancelledError()
  753. else:
  754. raise self
  755. def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
  756. """Return the exception raised by the computation.
  757. See grpc.Future.exception for the full API contract.
  758. """
  759. with self._state.condition:
  760. timed_out = _common.wait(
  761. self._state.condition.wait, self._is_complete, timeout=timeout
  762. )
  763. if timed_out:
  764. raise grpc.FutureTimeoutError()
  765. else:
  766. if self._state.code is grpc.StatusCode.OK:
  767. return None
  768. elif self._state.cancelled:
  769. raise grpc.FutureCancelledError()
  770. else:
  771. return self
  772. def traceback(
  773. self, timeout: Optional[float] = None
  774. ) -> Optional[types.TracebackType]:
  775. """Access the traceback of the exception raised by the computation.
  776. See grpc.future.traceback for the full API contract.
  777. """
  778. with self._state.condition:
  779. timed_out = _common.wait(
  780. self._state.condition.wait, self._is_complete, timeout=timeout
  781. )
  782. if timed_out:
  783. raise grpc.FutureTimeoutError()
  784. else:
  785. if self._state.code is grpc.StatusCode.OK:
  786. return None
  787. elif self._state.cancelled:
  788. raise grpc.FutureCancelledError()
  789. else:
  790. try:
  791. raise self
  792. except grpc.RpcError:
  793. return sys.exc_info()[2]
  794. def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
  795. with self._state.condition:
  796. if self._state.code is None:
  797. self._state.callbacks.append(functools.partial(fn, self))
  798. return
  799. fn(self)
  def _next(self) -> Any:
      """Request one more message and block until it or a final status arrives.

      Returns:
        The next response stored on the state; the stored response is cleared
        before returning.

      Raises:
        StopIteration: If the recorded status code is grpc.StatusCode.OK and
          no response is pending.
        grpc.RpcError: This object itself, for any other recorded status code.
      """
      state = self._state
      with state.condition:
          if state.code is not None:
              # The RPC already terminated; nothing more can be received.
              if state.code is grpc.StatusCode.OK:
                  raise StopIteration()
              raise self

          handler = _event_handler(state, self._response_deserializer)
          state.due.add(cygrpc.OperationType.receive_message)
          started = self._call.operate(
              (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
              handler,
          )
          if not started:
              # The receive was not started, so no event will clear it.
              state.due.remove(cygrpc.OperationType.receive_message)

          def _response_ready():
              if state.response is not None:
                  return True
              return (
                  cygrpc.OperationType.receive_message not in state.due
                  and state.code is not None
              )

          _common.wait(state.condition.wait, _response_ready)

          if state.response is not None:
              message = state.response
              state.response = None
              return message
          if cygrpc.OperationType.receive_message not in state.due:
              if state.code is grpc.StatusCode.OK:
                  raise StopIteration()
              if state.code is not None:
                  raise self
  def _start_unary_request(
      request: Any,
      timeout: Optional[float],
      request_serializer: SerializingFunction,
  ) -> Tuple[Optional[float], Optional[bytes], Optional[grpc.RpcError]]:
      """Compute the deadline and serialize the request of a unary-request RPC.

      Args:
        request: The request value to serialize.
        timeout: The timeout handed to _deadline.
        request_serializer: The serializer handed to _common.serialize.

      Returns:
        A (deadline, serialized_request, error) tuple. When serialization
        yields None, serialized_request is None and error is an
        _InactiveRpcError with status INTERNAL; otherwise error is None.
      """
      deadline = _deadline(timeout)
      payload = _common.serialize(request, request_serializer)
      if payload is not None:
          return deadline, payload, None

      failed_state = _RPCState(
          (),
          (),
          (),
          grpc.StatusCode.INTERNAL,
          "Exception serializing request!",
      )
      return deadline, None, _InactiveRpcError(failed_state)
  def _end_unary_response_blocking(
      state: _RPCState,
      call: cygrpc.SegregatedCall,
      with_call: bool,
      deadline: Optional[float],
  ) -> Union[ResponseType, Tuple[ResponseType, grpc.Call]]:
      """Turn the final state of a blocking unary-response RPC into its result.

      Args:
        state: The state of the finished RPC.
        call: The call the RPC was conducted on.
        with_call: Whether to return a call object next to the response.
        deadline: The deadline handed to the returned call object.

      Returns:
        state.response, or a (state.response, _MultiThreadedRendezvous) pair
        when with_call is true.

      Raises:
        _InactiveRpcError: If state.code is not grpc.StatusCode.OK.
      """
      if state.code is not grpc.StatusCode.OK:
          raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
      if not with_call:
          return state.response
      rendezvous = _MultiThreadedRendezvous(state, call, None, deadline)
      return state.response, rendezvous
  def _stream_unary_invocation_operations(
      metadata: Optional[MetadataType], initial_metadata_flags: int
  ) -> Sequence[Sequence[cygrpc.Operation]]:
      """Build the two operation batches that start a stream-unary RPC.

      Args:
        metadata: The metadata for the send-initial-metadata operation.
        initial_metadata_flags: The flags for that same operation.

      Returns:
        A pair of batches: the first sends initial metadata and receives the
        message and the status; the second receives initial metadata.
      """
      send_and_receive = (
          cygrpc.SendInitialMetadataOperation(metadata, initial_metadata_flags),
          cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
          cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
      )
      receive_initial_metadata = (
          cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
      )
      return (send_and_receive, receive_initial_metadata)
  def _stream_unary_invocation_operations_and_tags(
      metadata: Optional[MetadataType], initial_metadata_flags: int
  ) -> Sequence[Tuple[Sequence[cygrpc.Operation], Optional[UserTag]]]:
      """Pair each stream-unary invocation batch with a None tag.

      Args:
        metadata: Forwarded to _stream_unary_invocation_operations.
        initial_metadata_flags: Forwarded to
          _stream_unary_invocation_operations.

      Returns:
        A tuple of (operations, None) pairs, one per batch.
      """
      batches = _stream_unary_invocation_operations(
          metadata, initial_metadata_flags
      )
      return tuple((batch, None) for batch in batches)
  def _determine_deadline(user_deadline: Optional[float]) -> Optional[float]:
      """Combine the caller's deadline with the one found in the context.

      Args:
        user_deadline: The deadline requested for the call, or None.

      Returns:
        None when neither deadline is set, the only one that is set, or the
        smaller of the two when both are set.
      """
      parent_deadline = cygrpc.get_deadline_from_context()
      if parent_deadline is None:
          return user_deadline
      if user_deadline is None:
          return parent_deadline
      return min(parent_deadline, user_deadline)
  class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
      """Invokes a unary-request, unary-response RPC over a cygrpc.Channel.

      __call__ and with_call run the RPC on a segregated call and wait for its
      event; future starts a managed call and returns a
      _MultiThreadedRendezvous.
      """

      _channel: cygrpc.Channel
      _managed_call: IntegratedCallFactory
      _method: bytes
      _target: bytes
      _request_serializer: Optional[SerializingFunction]
      _response_deserializer: Optional[DeserializingFunction]
      _context: Any
      _registered_call_handle: Optional[int]

      # Every attribute assigned in __init__ is listed, so none of them has to
      # be stored outside the declared slots.
      __slots__ = [
          "_channel",
          "_managed_call",
          "_method",
          "_target",
          "_request_serializer",
          "_response_deserializer",
          "_context",
          "_registered_call_handle",
      ]

      # pylint: disable=too-many-arguments
      def __init__(
          self,
          channel: cygrpc.Channel,
          managed_call: IntegratedCallFactory,
          method: bytes,
          target: bytes,
          request_serializer: Optional[SerializingFunction],
          response_deserializer: Optional[DeserializingFunction],
          _registered_call_handle: Optional[int],
      ):
          """Store the channel, call factory and (de)serializers for later RPCs.

          A census context is built once here and reused for every call.
          """
          self._channel = channel
          self._managed_call = managed_call
          self._method = method
          self._target = target
          self._request_serializer = request_serializer
          self._response_deserializer = response_deserializer
          self._context = cygrpc.build_census_context()
          self._registered_call_handle = _registered_call_handle

      def _prepare(
          self,
          request: Any,
          timeout: Optional[float],
          metadata: Optional[MetadataType],
          wait_for_ready: Optional[bool],
          compression: Optional[grpc.Compression],
      ) -> Tuple[
          Optional[_RPCState],
          Optional[Sequence[cygrpc.Operation]],
          Optional[float],
          Optional[grpc.RpcError],
      ]:
          """Serialize the request and assemble the single operation batch.

          Returns:
            A (state, operations, deadline, error) tuple. When the request
            could not be serialized the first three items are None and error
            is set; otherwise error is None.
          """
          deadline, serialized_request, error = _start_unary_request(
              request, timeout, self._request_serializer
          )
          initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
              wait_for_ready
          )
          augmented_metadata = _compression.augment_metadata(
              metadata, compression
          )
          if serialized_request is None:
              return None, None, None, error

          state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
          operations = (
              cygrpc.SendInitialMetadataOperation(
                  augmented_metadata, initial_metadata_flags
              ),
              cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
              cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
              cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
              cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
              cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
          )
          return state, operations, deadline, None

      def _blocking(
          self,
          request: Any,
          timeout: Optional[float] = None,
          metadata: Optional[MetadataType] = None,
          credentials: Optional[grpc.CallCredentials] = None,
          wait_for_ready: Optional[bool] = None,
          compression: Optional[grpc.Compression] = None,
      ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
          """Run the RPC on a segregated call and handle its single event.

          Returns:
            The RPC state after the event was handled, and the call.

          Raises:
            grpc.RpcError: The error produced by _prepare when the request
              could not be serialized.
          """
          state, operations, deadline, error = self._prepare(
              request, timeout, metadata, wait_for_ready, compression
          )
          if state is None:
              raise error  # pylint: disable-msg=raising-bad-type

          state.rpc_start_time = time.perf_counter()
          state.method = _common.decode(self._method)
          state.target = _common.decode(self._target)
          call = self._channel.segregated_call(
              cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
              self._method,
              None,
              _determine_deadline(deadline),
              metadata,
              None if credentials is None else credentials._credentials,
              (
                  (
                      operations,
                      None,
                  ),
              ),
              self._context,
              self._registered_call_handle,
          )
          # All six operations were submitted as one batch, hence one event.
          event = call.next_event()
          _handle_event(event, state, self._response_deserializer)
          return state, call

      def __call__(
          self,
          request: Any,
          timeout: Optional[float] = None,
          metadata: Optional[MetadataType] = None,
          credentials: Optional[grpc.CallCredentials] = None,
          wait_for_ready: Optional[bool] = None,
          compression: Optional[grpc.Compression] = None,
      ) -> Any:
          """Invoke the RPC synchronously and return only the response."""
          state, call = self._blocking(
              request, timeout, metadata, credentials, wait_for_ready, compression
          )
          return _end_unary_response_blocking(state, call, False, None)

      def with_call(
          self,
          request: Any,
          timeout: Optional[float] = None,
          metadata: Optional[MetadataType] = None,
          credentials: Optional[grpc.CallCredentials] = None,
          wait_for_ready: Optional[bool] = None,
          compression: Optional[grpc.Compression] = None,
      ) -> Tuple[Any, grpc.Call]:
          """Invoke the RPC synchronously; return the response and a call."""
          state, call = self._blocking(
              request, timeout, metadata, credentials, wait_for_ready, compression
          )
          return _end_unary_response_blocking(state, call, True, None)

      def future(
          self,
          request: Any,
          timeout: Optional[float] = None,
          metadata: Optional[MetadataType] = None,
          credentials: Optional[grpc.CallCredentials] = None,
          wait_for_ready: Optional[bool] = None,
          compression: Optional[grpc.Compression] = None,
      ) -> _MultiThreadedRendezvous:
          """Start the RPC on a managed call and return its rendezvous.

          Raises:
            grpc.RpcError: The error produced by _prepare when the request
              could not be serialized.
          """
          state, operations, deadline, error = self._prepare(
              request, timeout, metadata, wait_for_ready, compression
          )
          if state is None:
              raise error  # pylint: disable-msg=raising-bad-type

          event_handler = _event_handler(state, self._response_deserializer)
          state.rpc_start_time = time.perf_counter()
          state.method = _common.decode(self._method)
          state.target = _common.decode(self._target)
          # NOTE(review): unlike _blocking, the deadline is handed over without
          # going through _determine_deadline - confirm this is intended.
          call = self._managed_call(
              cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
              self._method,
              None,
              deadline,
              metadata,
              None if credentials is None else credentials._credentials,
              (operations,),
              event_handler,
              self._context,
              self._registered_call_handle,
          )
          return _MultiThreadedRendezvous(
              state, call, self._response_deserializer, deadline
          )
  class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
      """Invokes a unary-request, stream-response RPC on a segregated call.

      Each invocation returns a _SingleThreadedRendezvous wrapping the call.
      """

      _channel: cygrpc.Channel
      _method: bytes
      _target: bytes
      _request_serializer: Optional[SerializingFunction]
      _response_deserializer: Optional[DeserializingFunction]
      _context: Any
      _registered_call_handle: Optional[int]

      # Every attribute assigned in __init__ is listed, so none of them has to
      # be stored outside the declared slots.
      __slots__ = [
          "_channel",
          "_method",
          "_target",
          "_request_serializer",
          "_response_deserializer",
          "_context",
          "_registered_call_handle",
      ]

      # pylint: disable=too-many-arguments
      def __init__(
          self,
          channel: cygrpc.Channel,
          method: bytes,
          target: bytes,
          request_serializer: Optional[SerializingFunction],
          response_deserializer: Optional[DeserializingFunction],
          _registered_call_handle: Optional[int],
      ):
          """Store the channel and (de)serializers for later RPCs.

          A census context is built once here and reused for every call.
          """
          self._channel = channel
          self._method = method
          self._target = target
          self._request_serializer = request_serializer
          self._response_deserializer = response_deserializer
          self._context = cygrpc.build_census_context()
          self._registered_call_handle = _registered_call_handle

      def __call__(  # pylint: disable=too-many-locals
          self,
          request: Any,
          timeout: Optional[float] = None,
          metadata: Optional[MetadataType] = None,
          credentials: Optional[grpc.CallCredentials] = None,
          wait_for_ready: Optional[bool] = None,
          compression: Optional[grpc.Compression] = None,
      ) -> _SingleThreadedRendezvous:
          """Start the RPC and return the rendezvous wrapping its call.

          Raises:
            _InactiveRpcError: With status INTERNAL, if serializing the request
              yields None.
          """
          deadline = _deadline(timeout)
          serialized_request = _common.serialize(
              request, self._request_serializer
          )
          if serialized_request is None:
              failed_state = _RPCState(
                  (),
                  (),
                  (),
                  grpc.StatusCode.INTERNAL,
                  "Exception serializing request!",
              )
              raise _InactiveRpcError(failed_state)

          state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
          call_credentials = (
              None if credentials is None else credentials._credentials
          )
          initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
              wait_for_ready
          )
          augmented_metadata = _compression.augment_metadata(
              metadata, compression
          )
          # Three separate batches: sending, status and initial metadata.
          operations = (
              (
                  cygrpc.SendInitialMetadataOperation(
                      augmented_metadata, initial_metadata_flags
                  ),
                  cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
                  cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
              ),
              (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),),
              (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
          )
          operations_and_tags = tuple((ops, None) for ops in operations)
          state.rpc_start_time = time.perf_counter()
          state.method = _common.decode(self._method)
          state.target = _common.decode(self._target)
          call = self._channel.segregated_call(
              cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
              self._method,
              None,
              _determine_deadline(deadline),
              metadata,
              call_credentials,
              operations_and_tags,
              self._context,
              self._registered_call_handle,
          )
          return _SingleThreadedRendezvous(
              state, call, self._response_deserializer, deadline
          )
  class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
      """Invokes a unary-request, stream-response RPC on a managed call.

      Each invocation returns a _MultiThreadedRendezvous wrapping the call.
      """

      _channel: cygrpc.Channel
      _managed_call: IntegratedCallFactory
      _method: bytes
      _target: bytes
      _request_serializer: Optional[SerializingFunction]
      _response_deserializer: Optional[DeserializingFunction]
      _context: Any
      _registered_call_handle: Optional[int]

      # Every attribute assigned in __init__ is listed, so none of them has to
      # be stored outside the declared slots.
      __slots__ = [
          "_channel",
          "_managed_call",
          "_method",
          "_target",
          "_request_serializer",
          "_response_deserializer",
          "_context",
          "_registered_call_handle",
      ]

      # pylint: disable=too-many-arguments
      def __init__(
          self,
          channel: cygrpc.Channel,
          managed_call: IntegratedCallFactory,
          method: bytes,
          target: bytes,
          request_serializer: Optional[SerializingFunction],
          response_deserializer: Optional[DeserializingFunction],
          _registered_call_handle: Optional[int],
      ):
          """Store the channel, call factory and (de)serializers for later RPCs.

          A census context is built once here and reused for every call.
          """
          self._channel = channel
          self._managed_call = managed_call
          self._method = method
          self._target = target
          self._request_serializer = request_serializer
          self._response_deserializer = response_deserializer
          self._context = cygrpc.build_census_context()
          self._registered_call_handle = _registered_call_handle

      def __call__(  # pylint: disable=too-many-locals
          self,
          request: Any,
          timeout: Optional[float] = None,
          metadata: Optional[MetadataType] = None,
          credentials: Optional[grpc.CallCredentials] = None,
          wait_for_ready: Optional[bool] = None,
          compression: Optional[grpc.Compression] = None,
      ) -> _MultiThreadedRendezvous:
          """Start the RPC and return the rendezvous wrapping its call.

          Raises:
            grpc.RpcError: The error produced by _start_unary_request when the
              request could not be serialized.
          """
          deadline, serialized_request, error = _start_unary_request(
              request, timeout, self._request_serializer
          )
          initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
              wait_for_ready
          )
          if serialized_request is None:
              raise error  # pylint: disable-msg=raising-bad-type

          augmented_metadata = _compression.augment_metadata(
              metadata, compression
          )
          state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
          # Two batches: sending plus status, then initial metadata.
          operations = (
              (
                  cygrpc.SendInitialMetadataOperation(
                      augmented_metadata, initial_metadata_flags
                  ),
                  cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
                  cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
                  cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
              ),
              (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
          )
          state.rpc_start_time = time.perf_counter()
          state.method = _common.decode(self._method)
          state.target = _common.decode(self._target)
          call = self._managed_call(
              cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
              self._method,
              None,
              _determine_deadline(deadline),
              metadata,
              None if credentials is None else credentials._credentials,
              operations,
              _event_handler(state, self._response_deserializer),
              self._context,
              self._registered_call_handle,
          )
          return _MultiThreadedRendezvous(
              state, call, self._response_deserializer, deadline
          )
  class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
      """Invokes a stream-request, unary-response RPC over a cygrpc.Channel.

      __call__ and with_call run the RPC on a segregated call and drain its
      events; future starts a managed call and returns a
      _MultiThreadedRendezvous.
      """

      _channel: cygrpc.Channel
      _managed_call: IntegratedCallFactory
      _method: bytes
      _target: bytes
      _request_serializer: Optional[SerializingFunction]
      _response_deserializer: Optional[DeserializingFunction]
      _context: Any
      _registered_call_handle: Optional[int]

      # Every attribute assigned in __init__ is listed, so none of them has to
      # be stored outside the declared slots.
      __slots__ = [
          "_channel",
          "_managed_call",
          "_method",
          "_target",
          "_request_serializer",
          "_response_deserializer",
          "_context",
          "_registered_call_handle",
      ]

      # pylint: disable=too-many-arguments
      def __init__(
          self,
          channel: cygrpc.Channel,
          managed_call: IntegratedCallFactory,
          method: bytes,
          target: bytes,
          request_serializer: Optional[SerializingFunction],
          response_deserializer: Optional[DeserializingFunction],
          _registered_call_handle: Optional[int],
      ):
          """Store the channel, call factory and (de)serializers for later RPCs.

          A census context is built once here and reused for every call.
          """
          self._channel = channel
          self._managed_call = managed_call
          self._method = method
          self._target = target
          self._request_serializer = request_serializer
          self._response_deserializer = response_deserializer
          self._context = cygrpc.build_census_context()
          self._registered_call_handle = _registered_call_handle

      def _blocking(
          self,
          request_iterator: Iterator,
          timeout: Optional[float],
          metadata: Optional[MetadataType],
          credentials: Optional[grpc.CallCredentials],
          wait_for_ready: Optional[bool],
          compression: Optional[grpc.Compression],
      ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
          """Run the RPC on a segregated call until no operation is due.

          Returns:
            The RPC state after all events were handled, and the call.
          """
          deadline = _deadline(timeout)
          state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
          initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
              wait_for_ready
          )
          augmented_metadata = _compression.augment_metadata(
              metadata, compression
          )
          state.rpc_start_time = time.perf_counter()
          state.method = _common.decode(self._method)
          state.target = _common.decode(self._target)
          call = self._channel.segregated_call(
              cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
              self._method,
              None,
              _determine_deadline(deadline),
              augmented_metadata,
              None if credentials is None else credentials._credentials,
              _stream_unary_invocation_operations_and_tags(
                  augmented_metadata, initial_metadata_flags
              ),
              self._context,
              self._registered_call_handle,
          )
          _consume_request_iterator(
              request_iterator, state, call, self._request_serializer, None
          )
          while True:
              event = call.next_event()
              with state.condition:
                  _handle_event(event, state, self._response_deserializer)
                  # Wake anything waiting on the state condition.
                  state.condition.notify_all()
                  if not state.due:
                      break
          return state, call

      def __call__(
          self,
          request_iterator: Iterator,
          timeout: Optional[float] = None,
          metadata: Optional[MetadataType] = None,
          credentials: Optional[grpc.CallCredentials] = None,
          wait_for_ready: Optional[bool] = None,
          compression: Optional[grpc.Compression] = None,
      ) -> Any:
          """Invoke the RPC synchronously and return only the response."""
          state, call = self._blocking(
              request_iterator,
              timeout,
              metadata,
              credentials,
              wait_for_ready,
              compression,
          )
          return _end_unary_response_blocking(state, call, False, None)

      def with_call(
          self,
          request_iterator: Iterator,
          timeout: Optional[float] = None,
          metadata: Optional[MetadataType] = None,
          credentials: Optional[grpc.CallCredentials] = None,
          wait_for_ready: Optional[bool] = None,
          compression: Optional[grpc.Compression] = None,
      ) -> Tuple[Any, grpc.Call]:
          """Invoke the RPC synchronously; return the response and a call."""
          state, call = self._blocking(
              request_iterator,
              timeout,
              metadata,
              credentials,
              wait_for_ready,
              compression,
          )
          return _end_unary_response_blocking(state, call, True, None)

      def future(
          self,
          request_iterator: Iterator,
          timeout: Optional[float] = None,
          metadata: Optional[MetadataType] = None,
          credentials: Optional[grpc.CallCredentials] = None,
          wait_for_ready: Optional[bool] = None,
          compression: Optional[grpc.Compression] = None,
      ) -> _MultiThreadedRendezvous:
          """Start the RPC on a managed call and return its rendezvous."""
          deadline = _deadline(timeout)
          state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
          event_handler = _event_handler(state, self._response_deserializer)
          initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
              wait_for_ready
          )
          augmented_metadata = _compression.augment_metadata(
              metadata, compression
          )
          state.rpc_start_time = time.perf_counter()
          state.method = _common.decode(self._method)
          state.target = _common.decode(self._target)
          # NOTE(review): unlike _blocking, the deadline is handed over without
          # going through _determine_deadline - confirm this is intended.
          call = self._managed_call(
              cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
              self._method,
              None,
              deadline,
              augmented_metadata,
              None if credentials is None else credentials._credentials,
              # The operations must carry the augmented metadata, exactly as
              # in _blocking; the raw metadata lacks whatever
              # _compression.augment_metadata added for the compression
              # argument.
              _stream_unary_invocation_operations(
                  augmented_metadata, initial_metadata_flags
              ),
              event_handler,
              self._context,
              self._registered_call_handle,
          )
          _consume_request_iterator(
              request_iterator,
              state,
              call,
              self._request_serializer,
              event_handler,
          )
          return _MultiThreadedRendezvous(
              state, call, self._response_deserializer, deadline
          )
  class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
      """Invokes a stream-request, stream-response RPC on a managed call.

      Each invocation returns a _MultiThreadedRendezvous wrapping the call.
      """

      _channel: cygrpc.Channel
      _managed_call: IntegratedCallFactory
      _method: bytes
      _target: bytes
      _request_serializer: Optional[SerializingFunction]
      _response_deserializer: Optional[DeserializingFunction]
      _context: Any
      _registered_call_handle: Optional[int]

      # Every attribute assigned in __init__ is listed, so none of them has to
      # be stored outside the declared slots.
      __slots__ = [
          "_channel",
          "_managed_call",
          "_method",
          "_target",
          "_request_serializer",
          "_response_deserializer",
          "_context",
          "_registered_call_handle",
      ]

      # pylint: disable=too-many-arguments
      def __init__(
          self,
          channel: cygrpc.Channel,
          managed_call: IntegratedCallFactory,
          method: bytes,
          target: bytes,
          request_serializer: Optional[SerializingFunction],
          response_deserializer: Optional[DeserializingFunction],
          _registered_call_handle: Optional[int],
      ):
          """Store the channel, call factory and (de)serializers for later RPCs.

          A census context is built once here and reused for every call.
          """
          self._channel = channel
          self._managed_call = managed_call
          self._method = method
          self._target = target
          self._request_serializer = request_serializer
          self._response_deserializer = response_deserializer
          self._context = cygrpc.build_census_context()
          self._registered_call_handle = _registered_call_handle

      def __call__(
          self,
          request_iterator: Iterator,
          timeout: Optional[float] = None,
          metadata: Optional[MetadataType] = None,
          credentials: Optional[grpc.CallCredentials] = None,
          wait_for_ready: Optional[bool] = None,
          compression: Optional[grpc.Compression] = None,
      ) -> _MultiThreadedRendezvous:
          """Start the RPC, begin consuming requests, return the rendezvous."""
          deadline = _deadline(timeout)
          state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
          initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
              wait_for_ready
          )
          augmented_metadata = _compression.augment_metadata(
              metadata, compression
          )
          # Two batches: initial metadata plus status, then received metadata.
          operations = (
              (
                  cygrpc.SendInitialMetadataOperation(
                      augmented_metadata, initial_metadata_flags
                  ),
                  cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
              ),
              (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
          )
          event_handler = _event_handler(state, self._response_deserializer)
          state.rpc_start_time = time.perf_counter()
          state.method = _common.decode(self._method)
          state.target = _common.decode(self._target)
          call = self._managed_call(
              cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
              self._method,
              None,
              _determine_deadline(deadline),
              augmented_metadata,
              None if credentials is None else credentials._credentials,
              operations,
              event_handler,
              self._context,
              self._registered_call_handle,
          )
          _consume_request_iterator(
              request_iterator,
              state,
              call,
              self._request_serializer,
              event_handler,
          )
          return _MultiThreadedRendezvous(
              state, call, self._response_deserializer, deadline
          )
  class _InitialMetadataFlags(int):
      """Stores immutable initial metadata flags"""

      def __new__(cls, value: int = _EMPTY_FLAGS):
          """Create the flags, keeping only the bits inside used_mask."""
          masked = value & cygrpc.InitialMetadataFlags.used_mask
          return super().__new__(cls, masked)

      def with_wait_for_ready(self, wait_for_ready: Optional[bool]) -> int:
          """Return flags reflecting an explicit wait_for_ready choice.

          Args:
            wait_for_ready: None to leave the flags untouched; otherwise its
              truthiness decides whether the wait_for_ready bit is set or
              cleared. In both cases the explicitly-set bit is added.

          Returns:
            This object when wait_for_ready is None, otherwise a new instance
            of the same class.
          """
          if wait_for_ready is None:
              return self

          bits = cygrpc.InitialMetadataFlags
          if wait_for_ready:
              updated = (
                  self | bits.wait_for_ready | bits.wait_for_ready_explicitly_set
              )
          else:
              updated = (
                  self & ~bits.wait_for_ready
              ) | bits.wait_for_ready_explicitly_set
          return self.__class__(updated)
  1537. class _ChannelCallState(object):
  1538. channel: cygrpc.Channel
  1539. managed_calls: int
  1540. threading: bool
  1541. def __init__(self, channel: cygrpc.Channel):
  1542. self.lock = threading.Lock()
  1543. self.channel = channel
  1544. self.managed_calls = 0
  1545. self.threading = False
  1546. def reset_postfork_child(self) -> None:
  1547. self.managed_calls = 0
  1548. def __del__(self):
  1549. try:
  1550. self.channel.close(
  1551. cygrpc.StatusCode.cancelled, "Channel deallocated!"
  1552. )
  1553. except (TypeError, AttributeError):
  1554. pass
  def _run_channel_spin_thread(state: _ChannelCallState) -> None:
      """Start a daemon thread that dispatches the channel's call events.

      The thread keeps fetching events from state.channel and handing each one
      to its tag. It exits once a tag reports a completed call and the managed
      call count drops to zero.

      Args:
        state: The channel call state whose channel is polled.
      """

      def channel_spin():
          while True:
              cygrpc.block_if_fork_in_progress(state)
              event = state.channel.next_call_event()
              timed_out = (
                  event.completion_type == cygrpc.CompletionType.queue_timeout
              )
              if timed_out:
                  continue
              # The tag is the handler; its result says whether a call ended.
              if not event.tag(event):
                  continue
              with state.lock:
                  state.managed_calls -= 1
                  if state.managed_calls == 0:
                      return

      spinner = cygrpc.ForkManagedThread(target=channel_spin)
      spinner.setDaemon(True)
      spinner.start()
  def _channel_managed_call_management(state: _ChannelCallState):
      """Build the factory that creates managed calls on state.channel.

      Args:
        state: The channel call state the returned factory operates on.

      Returns:
        The create function defined below.
      """

      # pylint: disable=too-many-arguments
      def create(
          flags: int,
          method: bytes,
          host: Optional[str],
          deadline: Optional[float],
          metadata: Optional[MetadataType],
          credentials: Optional[cygrpc.CallCredentials],
          operations: Sequence[Sequence[cygrpc.Operation]],
          event_handler: UserTag,
          context: Any,
          _registered_call_handle: Optional[int],
      ) -> cygrpc.IntegratedCall:
          """Creates a cygrpc.IntegratedCall.

          Args:
            flags: An integer bitfield of call flags.
            method: The RPC method.
            host: A host string for the created call.
            deadline: A float to be the deadline of the created call or None
              if the call is to have an infinite deadline.
            metadata: The metadata for the call or None.
            credentials: A cygrpc.CallCredentials or None.
            operations: A sequence of sequences of cygrpc.Operations to be
              started on the call.
            event_handler: A behavior to call to handle the events resultant
              from the operations on the call.
            context: Context object for distributed tracing.
            _registered_call_handle: An int representing the call handle of
              the method, or None if the method is not registered.

          Returns:
            A cygrpc.IntegratedCall with which to conduct an RPC.
          """
          tagged_batches = tuple((batch, event_handler) for batch in operations)
          with state.lock:
              call = state.channel.integrated_call(
                  flags,
                  method,
                  host,
                  deadline,
                  metadata,
                  credentials,
                  tagged_batches,
                  context,
                  _registered_call_handle,
              )
              first_managed_call = state.managed_calls == 0
              state.managed_calls += 1
              if first_managed_call:
                  # The spin thread is started only when the count was zero.
                  _run_channel_spin_thread(state)
              return call

      return create
  1630. class _ChannelConnectivityState(object):
  1631. lock: threading.RLock
  1632. channel: grpc.Channel
  1633. polling: bool
  1634. connectivity: grpc.ChannelConnectivity
  1635. try_to_connect: bool
  1636. # TODO(xuanwn): Refactor this: https://github.com/grpc/grpc/issues/31704
  1637. callbacks_and_connectivities: List[
  1638. Sequence[
  1639. Union[
  1640. Callable[[grpc.ChannelConnectivity], None],
  1641. Optional[grpc.ChannelConnectivity],
  1642. ]
  1643. ]
  1644. ]
  1645. delivering: bool
  1646. def __init__(self, channel: grpc.Channel):
  1647. self.lock = threading.RLock()
  1648. self.channel = channel
  1649. self.polling = False
  1650. self.connectivity = None
  1651. self.try_to_connect = False
  1652. self.callbacks_and_connectivities = []
  1653. self.delivering = False
  1654. def reset_postfork_child(self) -> None:
  1655. self.polling = False
  1656. self.connectivity = None
  1657. self.try_to_connect = False
  1658. self.callbacks_and_connectivities = []
  1659. self.delivering = False
  1660. def _deliveries(
  1661. state: _ChannelConnectivityState,
  1662. ) -> List[Callable[[grpc.ChannelConnectivity], None]]:
  1663. callbacks_needing_update = []
  1664. for callback_and_connectivity in state.callbacks_and_connectivities:
  1665. (
  1666. callback,
  1667. callback_connectivity,
  1668. ) = callback_and_connectivity
  1669. if callback_connectivity is not state.connectivity:
  1670. callbacks_needing_update.append(callback)
  1671. callback_and_connectivity[1] = state.connectivity
  1672. return callbacks_needing_update
  def _deliver(
      state: _ChannelConnectivityState,
      initial_connectivity: grpc.ChannelConnectivity,
      initial_callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
  ) -> None:
      """Call subscribers until none has an outdated connectivity left.

      After each round, _deliveries is consulted under state.lock. When it
      yields nothing, state.delivering is cleared and the function returns.
      Exceptions raised by a callback are logged and otherwise ignored.

      Args:
        state: The connectivity state being served.
        initial_connectivity: The connectivity passed to the first round.
        initial_callbacks: The callbacks called in the first round.
      """
      pending = initial_callbacks
      current = initial_connectivity
      while True:
          for notify in pending:
              cygrpc.block_if_fork_in_progress(state)
              try:
                  notify(current)
              except Exception:  # pylint: disable=broad-except
                  _LOGGER.exception(
                      _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE
                  )
          with state.lock:
              pending = _deliveries(state)
              if not pending:
                  state.delivering = False
                  return
              current = state.connectivity
  def _spawn_delivery(
      state: _ChannelConnectivityState,
      callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
  ) -> None:
      """Start a daemon thread running _deliver and flag delivery as active.

      Args:
        state: The connectivity state; its current connectivity is what the
          thread delivers first.
        callbacks: The callbacks the thread calls in its first round.
      """
      deliver_args = (state, state.connectivity, callbacks)
      worker = cygrpc.ForkManagedThread(target=_deliver, args=deliver_args)
      worker.setDaemon(True)
      worker.start()
      state.delivering = True
  1711. # NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll.
  def _poll_connectivity(
      state: _ChannelConnectivityState,
      channel: grpc.Channel,
      initial_try_to_connect: bool,
  ) -> None:
      """Poll ``channel`` for connectivity changes and schedule notifications.

      The connectivity is checked once up front and delivered to every
      current subscriber. The loop then watches for a change with a 0.2
      second deadline per iteration. It ends, clearing ``state.polling`` and
      ``state.connectivity``, once there are no subscribers left and no
      connection attempt has been requested.

      Args:
        state: Shared connectivity state.
        channel: Object providing ``check_connectivity_state`` and
          ``watch_connectivity_state``.
        initial_try_to_connect: Passed to the first connectivity check.
      """
      raw_connectivity = channel.check_connectivity_state(initial_try_to_connect)
      with state.lock:
          state.connectivity = (
              _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
                  raw_connectivity
              ]
          )
          subscribers = tuple(
              subscriber for subscriber, _ in state.callbacks_and_connectivities
          )
          # Everyone subscribed so far is about to be told the current value.
          for entry in state.callbacks_and_connectivities:
              entry[1] = state.connectivity
          if subscribers:
              _spawn_delivery(state, subscribers)

      while True:
          deadline = time.time() + 0.2
          event = channel.watch_connectivity_state(raw_connectivity, deadline)
          cygrpc.block_if_fork_in_progress(state)
          with state.lock:
              wants_connect = state.try_to_connect
              if not (state.callbacks_and_connectivities or wants_connect):
                  # Nobody is listening and nothing was requested: stop.
                  state.polling = False
                  state.connectivity = None
                  break
              # Consume the pending request so it is acted on only once.
              state.try_to_connect = False
          if not (event.success or wants_connect):
              continue
          raw_connectivity = channel.check_connectivity_state(wants_connect)
          with state.lock:
              state.connectivity = (
                  _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
                      raw_connectivity
                  ]
              )
              if state.delivering:
                  # The running delivery loop re-checks for stale callbacks
                  # itself before it finishes.
                  continue
              stale = _deliveries(state)
              if stale:
                  _spawn_delivery(state, stale)
  def _subscribe(
      state: _ChannelConnectivityState,
      callback: Callable[[grpc.ChannelConnectivity], None],
      try_to_connect: bool,
  ) -> None:
      """Register ``callback`` for connectivity updates.

      Under ``state.lock`` one of three things happens:
        * With no subscribers and no poller, a daemon polling thread is
          started (it receives ``try_to_connect``) and the callback is
          recorded as not yet notified.
        * With a known connectivity and no delivery in progress, a delivery
          thread is started for this callback and it is recorded as already
          holding the current connectivity.
        * Otherwise the callback is recorded as not yet notified.
      In the last two cases ``try_to_connect`` is OR-ed into
      ``state.try_to_connect``.

      Args:
        state: Shared connectivity state.
        callback: Callable to invoke with connectivity values.
        try_to_connect: Truthy to request a connection attempt; coerced with
          ``bool``.
      """
      wants_connect = bool(try_to_connect)
      with state.lock:
          if not (state.callbacks_and_connectivities or state.polling):
              poller = cygrpc.ForkManagedThread(
                  target=_poll_connectivity,
                  args=(state, state.channel, wants_connect),
              )
              poller.setDaemon(True)
              poller.start()
              state.polling = True
              state.callbacks_and_connectivities.append([callback, None])
              return

          recorded = None
          if not state.delivering and state.connectivity is not None:
              _spawn_delivery(state, (callback,))
              recorded = state.connectivity
          state.try_to_connect |= wants_connect
          state.callbacks_and_connectivities.append([callback, recorded])
  def _unsubscribe(
      state: _ChannelConnectivityState,
      callback: Callable[[grpc.ChannelConnectivity], None],
  ) -> None:
      """Remove the first subscription whose callback equals ``callback``.

      Does nothing when no subscription matches. Runs under ``state.lock``.

      Args:
        state: Shared connectivity state.
        callback: The callable that was previously subscribed.
      """
      with state.lock:
          entries = state.callbacks_and_connectivities
          position = next(
              (
                  index
                  for index, (registered, _) in enumerate(entries)
                  if callback == registered
              ),
              None,
          )
          if position is not None:
              del entries[position]
  def _augment_options(
      base_options: Sequence[ChannelArgumentType],
      compression: Optional[grpc.Compression],
  ) -> Sequence[ChannelArgumentType]:
      """Append the compression and user-agent options to ``base_options``.

      Args:
        base_options: Options to start from; copied into a tuple.
        compression: Value handed to ``_compression.create_channel_option``.

      Returns:
        A tuple made of the base options, then the compression option(s),
        then the primary user-agent option.
      """
      compression_option = _compression.create_channel_option(compression)
      user_agent_option = (
          cygrpc.ChannelArgKey.primary_user_agent_string,
          _USER_AGENT,
      )
      return tuple(base_options) + compression_option + (user_agent_option,)
  def _separate_channel_options(
      options: Sequence[ChannelArgumentType],
  ) -> Tuple[Sequence[ChannelArgumentType], Sequence[ChannelArgumentType]]:
      """Split channel options into Python-only options and core options.

      An option is Python-only when its key (element 0) equals
      ``grpc.experimental.ChannelOptions.SingleThreadedUnaryStream``.

      Args:
        options: Channel options, each indexable with the key at position 0.

      Returns:
        ``(python_options, core_options)`` - note that the Python-only list
        comes first. Relative order within each list is preserved.
      """
      python_options = []
      core_options = []
      for option in options:
          is_python_only = (
              option[0]
              == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
          )
          destination = python_options if is_python_only else core_options
          destination.append(option)
      return python_options, core_options
  class Channel(grpc.Channel):
      """A cygrpc.Channel-backed implementation of grpc.Channel."""

      _single_threaded_unary_stream: bool
      _channel: cygrpc.Channel
      _call_state: _ChannelCallState
      _connectivity_state: _ChannelConnectivityState
      _target: str
      _registered_call_handles: Dict[str, int]

      def __init__(
          self,
          target: str,
          options: Sequence[ChannelArgumentType],
          credentials: Optional[grpc.ChannelCredentials],
          compression: Optional[grpc.Compression],
      ):
          """Constructor.

          Args:
            target: The target to which to connect.
            options: Configuration options for the channel. Python-only
              options are consumed here; the rest go to the cygrpc channel.
            credentials: A cygrpc.ChannelCredentials or None.
            compression: An optional value indicating the compression method
              to be used over the lifetime of the channel.
          """
          python_options, core_options = _separate_channel_options(options)
          self._single_threaded_unary_stream = (
              _DEFAULT_SINGLE_THREADED_UNARY_STREAM
          )
          self._process_python_options(python_options)

          encoded_target = _common.encode(target)
          channel_args = _augment_options(core_options, compression)
          self._channel = cygrpc.Channel(
              encoded_target, channel_args, credentials
          )
          self._target = target
          self._call_state = _ChannelCallState(self._channel)
          self._connectivity_state = _ChannelConnectivityState(self._channel)

          cygrpc.fork_register_channel(self)
          if cygrpc.g_gevent_activated:
              cygrpc.gevent_increment_channel_count()

      def _get_registered_call_handle(self, method: str) -> int:
          """
          Get the registered call handle for a method.

          This is a semi-private method. It is intended for use only by gRPC
          generated code.

          This method is not thread-safe.

          Args:
            method: Required, the method name for the RPC.

          Returns:
            The registered call handle pointer in the form of a Python Long.
          """
          encoded_method = _common.encode(method)
          return self._channel.get_registered_call_handle(encoded_method)

      def _registered_handle_or_none(
          self, method: str, registered: Optional[bool]
      ) -> Optional[int]:
          """Return the registered call handle for ``method`` if requested.

          Args:
            method: The method name for the RPC.
            registered: Truthy when the handle should be looked up.

          Returns:
            The handle, or None when ``registered`` is falsy.
          """
          if registered:
              return self._get_registered_call_handle(method)
          return None

      def _process_python_options(
          self, python_options: Sequence[ChannelArgumentType]
      ) -> None:
          """Sets channel attributes according to python-only channel options.

          The presence of a SingleThreadedUnaryStream option turns the
          single-threaded unary-stream mode on; the option's value is not
          inspected.
          """
          if any(
              option[0]
              == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
              for option in python_options
          ):
              self._single_threaded_unary_stream = True

      def subscribe(
          self,
          callback: Callable[[grpc.ChannelConnectivity], None],
          try_to_connect: Optional[bool] = None,
      ) -> None:
          """Register ``callback`` on this channel's connectivity state."""
          _subscribe(self._connectivity_state, callback, try_to_connect)

      def unsubscribe(
          self, callback: Callable[[grpc.ChannelConnectivity], None]
      ) -> None:
          """Remove ``callback`` from this channel's connectivity state."""
          _unsubscribe(self._connectivity_state, callback)

      # pylint: disable=arguments-differ
      def unary_unary(
          self,
          method: str,
          request_serializer: Optional[SerializingFunction] = None,
          response_deserializer: Optional[DeserializingFunction] = None,
          _registered_method: Optional[bool] = False,
      ) -> grpc.UnaryUnaryMultiCallable:
          """Build a unary-unary multi-callable for ``method``."""
          call_handle = self._registered_handle_or_none(
              method, _registered_method
          )
          return _UnaryUnaryMultiCallable(
              self._channel,
              _channel_managed_call_management(self._call_state),
              _common.encode(method),
              _common.encode(self._target),
              request_serializer,
              response_deserializer,
              call_handle,
          )

      # pylint: disable=arguments-differ
      def unary_stream(
          self,
          method: str,
          request_serializer: Optional[SerializingFunction] = None,
          response_deserializer: Optional[DeserializingFunction] = None,
          _registered_method: Optional[bool] = False,
      ) -> grpc.UnaryStreamMultiCallable:
          """Build a unary-stream multi-callable for ``method``."""
          call_handle = self._registered_handle_or_none(
              method, _registered_method
          )
          # NOTE(rbellevi): Benchmarks have shown that running a unary-stream
          # RPC on a single Python thread results in an appreciable speed-up.
          # However, due to slight differences in capability, the
          # multi-threaded variant remains the default.
          if not self._single_threaded_unary_stream:
              return _UnaryStreamMultiCallable(
                  self._channel,
                  _channel_managed_call_management(self._call_state),
                  _common.encode(method),
                  _common.encode(self._target),
                  request_serializer,
                  response_deserializer,
                  call_handle,
              )
          return _SingleThreadedUnaryStreamMultiCallable(
              self._channel,
              _common.encode(method),
              _common.encode(self._target),
              request_serializer,
              response_deserializer,
              call_handle,
          )

      # pylint: disable=arguments-differ
      def stream_unary(
          self,
          method: str,
          request_serializer: Optional[SerializingFunction] = None,
          response_deserializer: Optional[DeserializingFunction] = None,
          _registered_method: Optional[bool] = False,
      ) -> grpc.StreamUnaryMultiCallable:
          """Build a stream-unary multi-callable for ``method``."""
          call_handle = self._registered_handle_or_none(
              method, _registered_method
          )
          return _StreamUnaryMultiCallable(
              self._channel,
              _channel_managed_call_management(self._call_state),
              _common.encode(method),
              _common.encode(self._target),
              request_serializer,
              response_deserializer,
              call_handle,
          )

      # pylint: disable=arguments-differ
      def stream_stream(
          self,
          method: str,
          request_serializer: Optional[SerializingFunction] = None,
          response_deserializer: Optional[DeserializingFunction] = None,
          _registered_method: Optional[bool] = False,
      ) -> grpc.StreamStreamMultiCallable:
          """Build a stream-stream multi-callable for ``method``."""
          call_handle = self._registered_handle_or_none(
              method, _registered_method
          )
          return _StreamStreamMultiCallable(
              self._channel,
              _channel_managed_call_management(self._call_state),
              _common.encode(method),
              _common.encode(self._target),
              request_serializer,
              response_deserializer,
              call_handle,
          )

      def _unsubscribe_all(self) -> None:
          """Drop every connectivity subscription, under the state lock."""
          state = self._connectivity_state
          if not state:
              return
          with state.lock:
              state.callbacks_and_connectivities.clear()

      def _close(self) -> None:
          """Unsubscribe everything and close the underlying channel.

          Also unregisters the channel from fork handling and, when gevent is
          activated, decrements the gevent channel count.
          """
          self._unsubscribe_all()
          self._channel.close(cygrpc.StatusCode.cancelled, "Channel closed!")
          cygrpc.fork_unregister_channel(self)
          if cygrpc.g_gevent_activated:
              cygrpc.gevent_decrement_channel_count()

      def _close_on_fork(self) -> None:
          """Unsubscribe everything and close the channel because of a fork."""
          self._unsubscribe_all()
          self._channel.close_on_fork(
              cygrpc.StatusCode.cancelled, "Channel closed due to fork"
          )

      def __enter__(self):
          return self

      def __exit__(self, exc_type, exc_val, exc_tb):
          self._close()
          # Returning False lets any in-flight exception propagate.
          return False

      def close(self) -> None:
          """Close the channel; see ``_close``."""
          self._close()

      def __del__(self):
          # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
          # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
          # here (or more likely, call self._close() here). We don't do this
          # today because many valid use cases today allow the channel to be
          # deleted immediately after stubs are created. After a sufficient
          # period of time has passed for all users to be trusted to hold on
          # to their channels for as long as they are in use and to close them
          # after using them, then deletion of this grpc._channel.Channel
          # instance can be made to effect closure of the underlying
          # cygrpc.Channel instance.
          try:
              self._unsubscribe_all()
          except:  # pylint: disable=bare-except
              # Exceptions in __del__ are ignored by Python anyway, but they
              # can keep spamming logs. Just silence them.
              pass