# File: _server_adaptations.py (14 KB)
  # Copyright 2016 gRPC authors.
  #
  # Licensed under the Apache License, Version 2.0 (the "License");
  # you may not use this file except in compliance with the License.
  # You may obtain a copy of the License at
  #
  #     http://www.apache.org/licenses/LICENSE-2.0
  #
  # Unless required by applicable law or agreed to in writing, software
  # distributed under the License is distributed on an "AS IS" BASIS,
  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  # See the License for the specific language governing permissions and
  # limitations under the License.
  14. """Translates gRPC's server-side API into gRPC's server-side Beta API."""
  15. import collections
  16. import threading
  17. import grpc
  18. from grpc import _common
  19. from grpc.beta import _metadata
  20. from grpc.beta import interfaces
  21. from grpc.framework.common import cardinality
  22. from grpc.framework.common import style
  23. from grpc.framework.foundation import abandonment
  24. from grpc.framework.foundation import logging_pool
  25. from grpc.framework.foundation import stream
  26. from grpc.framework.interfaces.face import face
  # pylint: disable=too-many-return-statements

  # Size handed to logging_pool.pool() by server() when the caller supplies
  # neither a thread pool nor an explicit pool size.
  _DEFAULT_POOL_SIZE = 8
  class _ServerProtocolContext(interfaces.GRPCServicerContext):
      """Exposes a servicer context through interfaces.GRPCServicerContext."""

      def __init__(self, servicer_context):
          """Store the servicer context that calls are delegated to."""
          self._servicer_context = servicer_context

      def peer(self):
          """Return whatever the wrapped context's peer() returns."""
          wrapped = self._servicer_context
          return wrapped.peer()

      def disable_next_response_compression(self):
          """Do nothing; this operation is not implemented yet."""
          # TODO(https://github.com/grpc/grpc/issues/4078): design, implement.
          return None
  class _FaceServicerContext(face.ServicerContext):
      """Presents a servicer context through the face.ServicerContext API.

      Every method forwards to the wrapped servicer context, converting
      metadata with the _metadata helpers where the two APIs differ.
      """

      def __init__(self, servicer_context):
          """Store the servicer context that calls are delegated to."""
          self._servicer_context = servicer_context

      def is_active(self):
          """Return the wrapped context's is_active() result."""
          wrapped = self._servicer_context
          return wrapped.is_active()

      def time_remaining(self):
          """Return the wrapped context's time_remaining() result."""
          wrapped = self._servicer_context
          return wrapped.time_remaining()

      def add_abortion_callback(self, abortion_callback):
          """Always raise NotImplementedError; unsupported on the server."""
          message = "add_abortion_callback no longer supported server-side!"
          raise NotImplementedError(message)

      def cancel(self):
          """Cancel via the wrapped context."""
          wrapped = self._servicer_context
          wrapped.cancel()

      def protocol_context(self):
          """Return a new _ServerProtocolContext over the wrapped context."""
          wrapped = self._servicer_context
          return _ServerProtocolContext(wrapped)

      def invocation_metadata(self):
          """Return the wrapped context's invocation metadata in Beta form."""
          raw_metadata = self._servicer_context.invocation_metadata()
          return _metadata.beta(raw_metadata)

      def initial_metadata(self, initial_metadata):
          """Convert Beta metadata and send it as initial metadata."""
          send = self._servicer_context.send_initial_metadata
          send(_metadata.unbeta(initial_metadata))

      def terminal_metadata(self, terminal_metadata):
          """Convert Beta metadata and set it as terminal metadata."""
          set_trailing = self._servicer_context.set_terminal_metadata
          set_trailing(_metadata.unbeta(terminal_metadata))

      def code(self, code):
          """Forward the status code to the wrapped context."""
          wrapped = self._servicer_context
          wrapped.set_code(code)

      def details(self, details):
          """Forward the status details to the wrapped context."""
          wrapped = self._servicer_context
          wrapped.set_details(details)
  def _adapt_unary_request_inline(unary_request_inline):
      """Wrap an inline behavior that takes a single request.

      Args:
        unary_request_inline: A callable invoked as
          unary_request_inline(request, face_context).

      Returns:
        A callable taking (request, servicer_context) that wraps the servicer
        context in a _FaceServicerContext before delegating, and returns the
        delegate's result unchanged.
      """

      def adaptation(request, servicer_context):
          face_context = _FaceServicerContext(servicer_context)
          return unary_request_inline(request, face_context)

      return adaptation
  def _adapt_stream_request_inline(stream_request_inline):
      """Wrap an inline behavior that takes an iterator of requests.

      Args:
        stream_request_inline: A callable invoked as
          stream_request_inline(request_iterator, face_context).

      Returns:
        A callable taking (request_iterator, servicer_context) that wraps the
        servicer context in a _FaceServicerContext before delegating, and
        returns the delegate's result unchanged.
      """

      def adaptation(request_iterator, servicer_context):
          face_context = _FaceServicerContext(servicer_context)
          return stream_request_inline(request_iterator, face_context)

      return adaptation
  class _Callback(stream.Consumer):
      """Condition-guarded buffer between an event-style behavior and a reader.

      Values arrive through the stream.Consumer methods (consume, terminate,
      consume_and_terminate) and are read back with draw_one_value or
      draw_all_values, which block on the condition until something is
      available, the stream is terminated, or cancel() has been called.
      """

      def __init__(self):
          self._condition = threading.Condition()
          # A deque rather than a list: draw_one_value removes from the front,
          # which is O(1) for a deque but O(n) for list.pop(0).
          self._values = collections.deque()
          self._terminated = False
          self._cancelled = False

      def consume(self, value):
          """Buffer one value and wake any waiting reader."""
          with self._condition:
              self._values.append(value)
              self._condition.notify_all()

      def terminate(self):
          """Mark the stream finished and wake any waiting reader."""
          with self._condition:
              self._terminated = True
              self._condition.notify_all()

      def consume_and_terminate(self, value):
          """Buffer a final value, mark the stream finished, wake readers."""
          with self._condition:
              self._values.append(value)
              self._terminated = True
              self._condition.notify_all()

      def cancel(self):
          """Mark the callback cancelled and wake any waiting reader."""
          with self._condition:
              self._cancelled = True
              self._condition.notify_all()

      def draw_one_value(self):
          """Block until a value can be returned or the stream has ended.

          Returns:
            The oldest buffered value, or None once the stream is terminated
            and no buffered values remain.

          Raises:
            abandonment.Abandoned: If cancel() has been called. Cancellation is
              checked first, so it wins over values that are still buffered.
          """
          with self._condition:
              while True:
                  if self._cancelled:
                      raise abandonment.Abandoned()
                  elif self._values:
                      return self._values.popleft()
                  elif self._terminated:
                      return None
                  else:
                      self._condition.wait()

      def draw_all_values(self):
          """Block until the stream is terminated, then return every value.

          Returns:
            A tuple of all buffered values in arrival order.

          Raises:
            abandonment.Abandoned: If cancel() has been called.
          """
          with self._condition:
              while True:
                  if self._cancelled:
                      raise abandonment.Abandoned()
                  elif self._terminated:
                      all_values = tuple(self._values)
                      # The buffer is dropped, not reset: consuming another
                      # value after this point would fail on None.
                      self._values = None
                      return all_values
                  else:
                      self._condition.wait()
  def _run_request_pipe_thread(
      request_iterator, request_consumer, servicer_context
  ):
      """Feed requests to a consumer from a background daemon thread.

      The thread is started and not joined; this function returns immediately.

      Args:
        request_iterator: Iterable of requests, iterated on the new thread.
        request_consumer: Object whose consume(request) is called for each
          request and whose terminate() is called once the iterator is
          exhausted.
        servicer_context: Object whose is_active() is checked before and after
          each consume call. As soon as it returns a false value the thread
          exits without calling terminate().
      """

      # The original also tested a local threading.Event that nothing ever
      # set, so that half of each condition was always False and is dropped.
      def pipe_requests():
          for request in request_iterator:
              if not servicer_context.is_active():
                  return
              request_consumer.consume(request)
              if not servicer_context.is_active():
                  return
          request_consumer.terminate()

      request_pipe_thread = threading.Thread(target=pipe_requests)
      # Daemon, so an unfinished pipe never blocks interpreter exit.
      request_pipe_thread.daemon = True
      request_pipe_thread.start()
  def _adapt_unary_unary_event(unary_unary_event):
      """Wrap an event-style unary-unary behavior as a blocking callable.

      Args:
        unary_unary_event: A callable invoked as
          unary_unary_event(request, response_callback, face_context).

      Returns:
        A callable taking (request, servicer_context). It raises
        abandonment.Abandoned if the context's add_callback returns a false
        value; otherwise it returns the first value delivered to the callback.
      """

      def adaptation(request, servicer_context):
          callback = _Callback()
          registered = servicer_context.add_callback(callback.cancel)
          if not registered:
              raise abandonment.Abandoned()
          unary_unary_event(
              request,
              callback.consume_and_terminate,
              _FaceServicerContext(servicer_context),
          )
          responses = callback.draw_all_values()
          return responses[0]

      return adaptation
  def _adapt_unary_stream_event(unary_stream_event):
      """Wrap an event-style unary-stream behavior as a response generator.

      Args:
        unary_stream_event: A callable invoked as
          unary_stream_event(request, response_consumer, face_context).

      Returns:
        A generator function taking (request, servicer_context). Nothing runs
        until the generator is first advanced. It raises abandonment.Abandoned
        if the context's add_callback returns a false value; otherwise it
        yields each value drawn from the callback until None is drawn.
      """

      def adaptation(request, servicer_context):
          callback = _Callback()
          registered = servicer_context.add_callback(callback.cancel)
          if not registered:
              raise abandonment.Abandoned()
          unary_stream_event(
              request, callback, _FaceServicerContext(servicer_context)
          )
          # None from draw_one_value marks the end of the response stream.
          response = callback.draw_one_value()
          while response is not None:
              yield response
              response = callback.draw_one_value()

      return adaptation
  def _adapt_stream_unary_event(stream_unary_event):
      """Wrap an event-style stream-unary behavior as a blocking callable.

      Args:
        stream_unary_event: A callable invoked as
          stream_unary_event(response_callback, face_context) that returns the
          consumer to which requests are piped.

      Returns:
        A callable taking (request_iterator, servicer_context). It raises
        abandonment.Abandoned if the context's add_callback returns a false
        value; otherwise it starts the request pipe thread and returns the
        first value delivered to the callback.
      """

      def adaptation(request_iterator, servicer_context):
          callback = _Callback()
          registered = servicer_context.add_callback(callback.cancel)
          if not registered:
              raise abandonment.Abandoned()
          request_consumer = stream_unary_event(
              callback.consume_and_terminate,
              _FaceServicerContext(servicer_context),
          )
          _run_request_pipe_thread(
              request_iterator, request_consumer, servicer_context
          )
          responses = callback.draw_all_values()
          return responses[0]

      return adaptation
  def _adapt_stream_stream_event(stream_stream_event):
      """Wrap an event-style stream-stream behavior as a response generator.

      Args:
        stream_stream_event: A callable invoked as
          stream_stream_event(response_consumer, face_context) that returns the
          consumer to which requests are piped.

      Returns:
        A generator function taking (request_iterator, servicer_context).
        Nothing runs until the generator is first advanced. It raises
        abandonment.Abandoned if the context's add_callback returns a false
        value; otherwise it starts the request pipe thread and yields each
        value drawn from the callback until None is drawn.
      """

      def adaptation(request_iterator, servicer_context):
          callback = _Callback()
          registered = servicer_context.add_callback(callback.cancel)
          if not registered:
              raise abandonment.Abandoned()
          request_consumer = stream_stream_event(
              callback, _FaceServicerContext(servicer_context)
          )
          _run_request_pipe_thread(
              request_iterator, request_consumer, servicer_context
          )
          # None from draw_one_value marks the end of the response stream.
          response = callback.draw_one_value()
          while response is not None:
              yield response
              response = callback.draw_one_value()

      return adaptation
  # Field layout of the handler tuple; the four trailing fields hold the
  # behavior for each cardinality.
  _SimpleMethodHandlerFields = collections.namedtuple(
      "_MethodHandler",
      (
          "request_streaming",
          "response_streaming",
          "request_deserializer",
          "response_serializer",
          "unary_unary",
          "unary_stream",
          "stream_unary",
          "stream_stream",
      ),
  )


  class _SimpleMethodHandler(
      _SimpleMethodHandlerFields, grpc.RpcMethodHandler
  ):
      """Named-tuple implementation of grpc.RpcMethodHandler."""
  def _simple_method_handler(
      implementation, request_deserializer, response_serializer
  ):
      """Build a _SimpleMethodHandler from a Beta method implementation.

      Args:
        implementation: Object with `style` and `cardinality` attributes plus
          the behavior attribute matching that combination (for example
          `unary_unary_inline` or `stream_stream_event`). Only the matching
          behavior attribute is read.
        request_deserializer: Stored unchanged on the returned handler.
        response_serializer: Stored unchanged on the returned handler.

      Returns:
        A _SimpleMethodHandler whose streaming flags describe the cardinality
        and whose single non-None behavior slot holds the adapted behavior.

      Raises:
        ValueError: If the style is neither INLINE nor EVENT, or the
          cardinality is not one of the four Cardinality members.
      """
      # One (behavior attribute, adapter) pair per cardinality, in the same
      # order as the behavior slots of _SimpleMethodHandler.
      if implementation.style is style.Service.INLINE:
          adapters = (
              ("unary_unary_inline", _adapt_unary_request_inline),
              ("unary_stream_inline", _adapt_unary_request_inline),
              ("stream_unary_inline", _adapt_stream_request_inline),
              ("stream_stream_inline", _adapt_stream_request_inline),
          )
      elif implementation.style is style.Service.EVENT:
          adapters = (
              ("unary_unary_event", _adapt_unary_unary_event),
              ("unary_stream_event", _adapt_unary_stream_event),
              ("stream_unary_event", _adapt_stream_unary_event),
              ("stream_stream_event", _adapt_stream_stream_event),
          )
      else:
          adapters = None

      if adapters is not None:
          # (cardinality, request_streaming, response_streaming) per slot.
          shapes = (
              (cardinality.Cardinality.UNARY_UNARY, False, False),
              (cardinality.Cardinality.UNARY_STREAM, False, True),
              (cardinality.Cardinality.STREAM_UNARY, True, False),
              (cardinality.Cardinality.STREAM_STREAM, True, True),
          )
          for slot, shape in enumerate(shapes):
              candidate, request_streaming, response_streaming = shape
              if implementation.cardinality is candidate:
                  attribute, adapt = adapters[slot]
                  behaviors = [None] * len(shapes)
                  behaviors[slot] = adapt(getattr(implementation, attribute))
                  return _SimpleMethodHandler(
                      request_streaming,
                      response_streaming,
                      request_deserializer,
                      response_serializer,
                      *behaviors
                  )

      # Same exception type as before, now saying what was unsupported.
      raise ValueError(
          "Unsupported method implementation: style={!r}, "
          "cardinality={!r}".format(
              implementation.style,
              getattr(implementation, "cardinality", None),
          )
      )
  def _flatten_method_pair_map(method_pair_map):
      """Re-key a map from method pairs to fully qualified method names.

      Args:
        method_pair_map: A mapping whose keys are indexable pairs; elements 0
          and 1 of each key are passed to _common.fully_qualified_method. A
          falsy value (such as None) is treated as an empty mapping.

      Returns:
        A new dict with the same values, keyed by the fully qualified method
        name computed for each original key.
      """
      if not method_pair_map:
          return {}
      return {
          _common.fully_qualified_method(pair[0], pair[1]): method_pair_map[
              pair
          ]
          for pair in method_pair_map
      }
  class _GenericRpcHandler(grpc.GenericRpcHandler):
      """grpc.GenericRpcHandler backed by Beta-style method implementations."""

      def __init__(
          self,
          method_implementations,
          multi_method_implementation,
          request_deserializers,
          response_serializers,
      ):
          """Flatten the three method-pair maps and keep the multi-method.

          Each map is re-keyed with _flatten_method_pair_map so it can be
          looked up by the method name carried on handler call details.
          """
          flatten = _flatten_method_pair_map
          self._method_implementations = flatten(method_implementations)
          self._request_deserializers = flatten(request_deserializers)
          self._response_serializers = flatten(response_serializers)
          self._multi_method_implementation = multi_method_implementation

      def service(self, handler_call_details):
          """Return a handler for the called method, or None if unknown.

          Args:
            handler_call_details: Object whose `method` attribute is the key
              used for lookup in the flattened maps.

          Returns:
            The result of _simple_method_handler for a registered method;
            None otherwise. The multi-method implementation is not consulted.
          """
          method = handler_call_details.method
          implementation = self._method_implementations.get(method)
          if implementation is None:
              # TODO(nathaniel): call the multimethod.
              return None
          return _simple_method_handler(
              implementation,
              self._request_deserializers.get(method),
              self._response_serializers.get(method),
          )
  class _Server(interfaces.Server):
      """interfaces.Server implementation that delegates to a wrapped server."""

      def __init__(self, grpc_server):
          """Store the server object that every call is forwarded to."""
          self._grpc_server = grpc_server

      def add_insecure_port(self, address):
          """Forward to the wrapped server and return its result."""
          wrapped = self._grpc_server
          return wrapped.add_insecure_port(address)

      def add_secure_port(self, address, server_credentials):
          """Forward to the wrapped server and return its result."""
          wrapped = self._grpc_server
          return wrapped.add_secure_port(address, server_credentials)

      def start(self):
          """Start the wrapped server."""
          wrapped = self._grpc_server
          wrapped.start()

      def stop(self, grace):
          """Stop the wrapped server with `grace` and return its result."""
          wrapped = self._grpc_server
          return wrapped.stop(grace)

      def __enter__(self):
          """Start the wrapped server and return this object."""
          wrapped = self._grpc_server
          wrapped.start()
          return self

      def __exit__(self, exc_type, exc_val, exc_tb):
          """Stop the wrapped server with a grace of None.

          Returns False, so an exception raised in the with-body propagates.
          """
          wrapped = self._grpc_server
          wrapped.stop(None)
          return False
  def server(
      service_implementations,
      multi_method_implementation,
      request_deserializers,
      response_serializers,
      thread_pool,
      thread_pool_size,
  ):
      """Create a _Server backed by grpc.server and one generic handler.

      Args:
        service_implementations: Map of method implementations, passed to
          _GenericRpcHandler.
        multi_method_implementation: Passed to _GenericRpcHandler.
        request_deserializers: Map of request deserializers, passed to
          _GenericRpcHandler.
        response_serializers: Map of response serializers, passed to
          _GenericRpcHandler.
        thread_pool: Pool handed to grpc.server; if None a logging_pool pool
          is created instead.
        thread_pool_size: Size of the created pool; ignored when thread_pool
          is given, and replaced by _DEFAULT_POOL_SIZE when None.

      Returns:
        A _Server wrapping the newly created grpc server.
      """
      handler = _GenericRpcHandler(
          service_implementations,
          multi_method_implementation,
          request_deserializers,
          response_serializers,
      )
      if thread_pool is not None:
          pool = thread_pool
      elif thread_pool_size is not None:
          pool = logging_pool.pool(thread_pool_size)
      else:
          pool = logging_pool.pool(_DEFAULT_POOL_SIZE)
      grpc_server = grpc.server(pool, handlers=(handler,))
      return _Server(grpc_server)