_utilities.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. # Copyright 2015 gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Internal utilities for gRPC Python."""
  15. import collections
  16. import logging
  17. import threading
  18. import time
  19. from typing import Callable, Dict, Optional, Sequence
  20. import grpc # pytype: disable=pyi-error
  21. from grpc import _common # pytype: disable=pyi-error
  22. from grpc._typing import DoneCallbackType
  # Logger shared by everything in this module; named after the module itself.
  _LOGGER: logging.Logger = logging.getLogger(__name__)

  # Message logged (together with the traceback) when a user-supplied "done"
  # callback of the connectivity future raises.
  _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE: str = (
      'Exception calling connectivity future "done" callback!'
  )
  class RpcMethodHandler(
      collections.namedtuple(
          "_RpcMethodHandler",
          [
              "request_streaming",
              "response_streaming",
              "request_deserializer",
              "response_serializer",
              "unary_unary",
              "unary_stream",
              "stream_unary",
              "stream_stream",
          ],
      ),
      grpc.RpcMethodHandler,
  ):
      """Named-tuple implementation of ``grpc.RpcMethodHandler``.

      The eight fields are, in order: ``request_streaming``,
      ``response_streaming``, ``request_deserializer``,
      ``response_serializer``, ``unary_unary``, ``unary_stream``,
      ``stream_unary`` and ``stream_stream``. The class adds no behaviour of
      its own; it only combines the tuple storage with the
      ``grpc.RpcMethodHandler`` base type.
      """
  class DictionaryGenericHandler(grpc.ServiceRpcHandler):
      """Service handler that resolves RPC methods through a dictionary.

      Handlers are stored under the key returned by
      ``_common.fully_qualified_method(service, method)`` and are looked up
      with the ``method`` attribute of the incoming call details.
      """

      _name: str
      _method_handlers: Dict[str, grpc.RpcMethodHandler]

      def __init__(
          self, service: str, method_handlers: Dict[str, grpc.RpcMethodHandler]
      ):
          """Index ``method_handlers`` by fully qualified method name.

          Args:
            service: The service name; also returned by ``service_name()``.
            method_handlers: Mapping from method name to its handler.
          """
          self._name = service
          handlers_by_qualified_name: Dict[str, grpc.RpcMethodHandler] = {}
          for method_name, handler in method_handlers.items():
              qualified_name = _common.fully_qualified_method(
                  service, method_name
              )
              handlers_by_qualified_name[qualified_name] = handler
          self._method_handlers = handlers_by_qualified_name

      def service_name(self) -> str:
          """Return the service name given at construction time."""
          return self._name

      def service(
          self, handler_call_details: grpc.HandlerCallDetails
      ) -> Optional[grpc.RpcMethodHandler]:
          """Return the handler registered for the called method, if any.

          Args:
            handler_call_details: Call details whose ``method`` attribute is
              used as the lookup key.

          Returns:
            The matching handler, or None when no handler is registered under
            that key.
          """
          return self._method_handlers.get(
              handler_call_details.method
          )  # pytype: disable=attribute-error
  class _ChannelReadyFuture(grpc.Future):
      """Future that matures once a channel reports READY connectivity.

      After ``start()`` the future is subscribed to the channel's connectivity
      updates (with ``try_to_connect=True``). It becomes done either when an
      update carrying ``grpc.ChannelConnectivity.READY`` arrives (matured) or
      when ``cancel()`` is called first (cancelled). In both cases it
      unsubscribes from the channel, wakes every blocked waiter and invokes
      the registered done-callbacks exactly once.

      All state is guarded by ``_condition``. Done-callbacks are always run
      after the condition has been released.
      """

      _condition: threading.Condition
      _channel: grpc.Channel
      _matured: bool
      _cancelled: bool
      # A list of pending callbacks while the future is running; replaced by
      # None once the future is done and the callbacks have been handed out.
      _done_callbacks: Sequence[Callable]

      def __init__(self, channel: grpc.Channel):
          """Create a pending future for ``channel``; call ``start()`` to arm it."""
          self._condition = threading.Condition()
          self._channel = channel
          self._matured = False
          self._cancelled = False
          self._done_callbacks = []

      def _block(self, timeout: Optional[float]) -> None:
          """Wait until the future is done.

          Args:
            timeout: Maximum number of seconds to wait, or None to wait
              indefinitely.

          Raises:
            grpc.FutureCancelledError: If the future was cancelled.
            grpc.FutureTimeoutError: If ``timeout`` elapsed before the channel
              became ready.
          """
          # Use the monotonic clock so wall-clock adjustments (NTP, manual
          # changes) can neither shorten nor extend the wait.
          until = None if timeout is None else time.monotonic() + timeout
          with self._condition:
              while True:
                  if self._cancelled:
                      raise grpc.FutureCancelledError()
                  if self._matured:
                      return
                  if until is None:
                      self._condition.wait()
                  else:
                      remaining = until - time.monotonic()
                      if remaining < 0:
                          raise grpc.FutureTimeoutError()
                      self._condition.wait(timeout=remaining)

      def _take_done_callbacks(self) -> Sequence[Callable]:
          """Detach and return the pending callbacks.

          Must be called with ``_condition`` held, and only on the transition
          to the done state (``_done_callbacks`` is None afterwards).
          """
          done_callbacks = tuple(self._done_callbacks)
          self._done_callbacks = None
          return done_callbacks

      def _run_done_callbacks(self, done_callbacks: Sequence[Callable]) -> None:
          """Invoke each callback with this future, logging any exception."""
          for done_callback in done_callbacks:
              try:
                  done_callback(self)
              except Exception:  # pylint: disable=broad-except
                  _LOGGER.exception(_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE)

      def _update(self, connectivity: Optional[grpc.ChannelConnectivity]) -> None:
          """Connectivity callback: mature the future on the first READY update."""
          with self._condition:
              # Ignore anything other than READY, and ignore updates that
              # arrive after the future is already done: the callbacks have
              # been handed out by then and must not be run a second time.
              if (
                  self._cancelled
                  or self._matured
                  or connectivity is not grpc.ChannelConnectivity.READY
              ):
                  return
              self._matured = True
              self._channel.unsubscribe(self._update)
              self._condition.notify_all()
              done_callbacks = self._take_done_callbacks()

          # Run user callbacks outside the lock.
          self._run_done_callbacks(done_callbacks)

      def cancel(self) -> bool:
          """Cancel the future unless the channel is already ready.

          Returns:
            False if the future had already matured; True if it is cancelled
            (including when it had been cancelled by an earlier call).
          """
          with self._condition:
              if self._matured:
                  return False
              if self._cancelled:
                  # Repeated cancel: the callbacks already ran on the first
                  # call, so there is nothing left to do.
                  return True
              self._cancelled = True
              self._channel.unsubscribe(self._update)
              self._condition.notify_all()
              done_callbacks = self._take_done_callbacks()

          # Run user callbacks outside the lock.
          self._run_done_callbacks(done_callbacks)
          return True

      def cancelled(self) -> bool:
          """Return True if the future was cancelled."""
          with self._condition:
              return self._cancelled

      def running(self) -> bool:
          """Return True while the future is neither cancelled nor matured."""
          with self._condition:
              return not self._cancelled and not self._matured

      def done(self) -> bool:
          """Return True once the future is cancelled or matured."""
          with self._condition:
              return self._cancelled or self._matured

      def result(self, timeout: Optional[float] = None) -> None:
          """Block until the channel is ready; see ``_block`` for the errors raised."""
          self._block(timeout)

      def exception(self, timeout: Optional[float] = None) -> None:
          """Block until the channel is ready, then return None.

          See ``_block`` for the errors raised.
          """
          self._block(timeout)

      def traceback(self, timeout: Optional[float] = None) -> None:
          """Block until the channel is ready, then return None.

          See ``_block`` for the errors raised.
          """
          self._block(timeout)

      def add_done_callback(self, fn: DoneCallbackType):
          """Register ``fn`` to be called with this future once it is done.

          If the future is still pending, ``fn`` is queued. Otherwise it is
          called immediately on the calling thread, outside the lock; in that
          case an exception raised by ``fn`` propagates to the caller.
          """
          with self._condition:
              if not self._cancelled and not self._matured:
                  self._done_callbacks.append(fn)
                  return

          fn(self)

      def start(self):
          """Subscribe to the channel's connectivity and ask it to connect."""
          with self._condition:
              self._channel.subscribe(self._update, try_to_connect=True)

      def __del__(self):
          """Unsubscribe from the channel if the future never completed."""
          with self._condition:
              if not self._cancelled and not self._matured:
                  self._channel.unsubscribe(self._update)
  def channel_ready_future(channel: grpc.Channel) -> _ChannelReadyFuture:
      """Create a ``_ChannelReadyFuture`` for ``channel``, start it and return it.

      Args:
        channel: The channel whose connectivity the future subscribes to.

      Returns:
        The future, already started.
      """
      future = _ChannelReadyFuture(channel)
      future.start()
      return future
  def first_version_is_lower(version1: str, version2: str) -> bool:
      """Return whether ``version1`` is lower than ``version2``.

      Compares two versions in the format '1.60.1' or '1.60.1.dev0'.

      This function is used in all stubs generated by grpcio-tools to check
      whether the stub version is compatible with the runtime grpcio.

      Only the first three dot-separated components are compared numerically.
      If they are all equal, the version with fewer components (i.e. the one
      without a suffix such as ``dev0``) is considered lower.

      Args:
        version1: The first version string.
        version2: The second version string.

      Returns:
        True if version1 is lower, False otherwise. False is also returned
        when either version cannot be parsed: a component among the first
        three is not an integer, or there are fewer than three components.
      """
      version1_list = version1.split(".")
      version2_list = version2.split(".")

      try:
          for i in range(3):
              component1 = int(version1_list[i])
              component2 = int(version2_list[i])
              if component1 != component2:
                  return component1 < component2
      except (ValueError, IndexError):
          # ValueError: a component is not an integer.
          # IndexError: a version has fewer than three components.
          # Either way the versions cannot be compared reliably.
          return False

      # The version without dev0 will be considered lower.
      return len(version1_list) < len(version2_list)