_call.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764
  1. # Copyright 2019 gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Invocation-side implementation of gRPC Asyncio Python."""
  15. import asyncio
  16. import enum
  17. from functools import partial
  18. import inspect
  19. import logging
  20. import traceback
  21. from typing import (
  22. Any,
  23. AsyncIterator,
  24. Generator,
  25. Generic,
  26. Optional,
  27. Tuple,
  28. Union,
  29. )
  30. import grpc
  31. from grpc import _common
  32. from grpc._cython import cygrpc
  33. from . import _base_call
  34. from ._metadata import Metadata
  35. from ._typing import DeserializingFunction
  36. from ._typing import DoneCallbackType
  37. from ._typing import EOFType
  38. from ._typing import MetadatumType
  39. from ._typing import RequestIterableType
  40. from ._typing import RequestType
  41. from ._typing import ResponseType
  42. from ._typing import SerializingFunction
  43. __all__ = "AioRpcError", "Call", "UnaryUnaryCall", "UnaryStreamCall"
  44. _LOCAL_CANCELLATION_DETAILS = "Locally cancelled by application!"
  45. _GC_CANCELLATION_DETAILS = "Cancelled upon garbage collection!"
  46. _RPC_ALREADY_FINISHED_DETAILS = "RPC already finished."
  47. _RPC_HALF_CLOSED_DETAILS = 'RPC is half closed after calling "done_writing".'
  48. _API_STYLE_ERROR = (
  49. "The iterator and read/write APIs may not be mixed on a single RPC."
  50. )
  51. _OK_CALL_REPRESENTATION = (
  52. '<{} of RPC that terminated with:\n\tstatus = {}\n\tdetails = "{}"\n>'
  53. )
  54. _NON_OK_CALL_REPRESENTATION = (
  55. "<{} of RPC that terminated with:\n"
  56. "\tstatus = {}\n"
  57. '\tdetails = "{}"\n'
  58. '\tdebug_error_string = "{}"\n'
  59. ">"
  60. )
  61. _LOGGER = logging.getLogger(__name__)
  62. class AioRpcError(grpc.RpcError):
  63. """An implementation of RpcError to be used by the asynchronous API.
  64. Raised RpcError is a snapshot of the final status of the RPC, values are
  65. determined. Hence, its methods no longer needs to be coroutines.
  66. """
  67. _code: grpc.StatusCode
  68. _details: Optional[str]
  69. _initial_metadata: Optional[Metadata]
  70. _trailing_metadata: Optional[Metadata]
  71. _debug_error_string: Optional[str]
  72. def __init__(
  73. self,
  74. code: grpc.StatusCode,
  75. initial_metadata: Metadata,
  76. trailing_metadata: Metadata,
  77. details: Optional[str] = None,
  78. debug_error_string: Optional[str] = None,
  79. ) -> None:
  80. """Constructor.
  81. Args:
  82. code: The status code with which the RPC has been finalized.
  83. details: Optional details explaining the reason of the error.
  84. initial_metadata: Optional initial metadata that could be sent by the
  85. Server.
  86. trailing_metadata: Optional metadata that could be sent by the Server.
  87. """
  88. super().__init__()
  89. self._code = code
  90. self._details = details
  91. self._initial_metadata = initial_metadata
  92. self._trailing_metadata = trailing_metadata
  93. self._debug_error_string = debug_error_string
  94. def code(self) -> grpc.StatusCode:
  95. """Accesses the status code sent by the server.
  96. Returns:
  97. The `grpc.StatusCode` status code.
  98. """
  99. return self._code
  100. def details(self) -> Optional[str]:
  101. """Accesses the details sent by the server.
  102. Returns:
  103. The description of the error.
  104. """
  105. return self._details
  106. def initial_metadata(self) -> Metadata:
  107. """Accesses the initial metadata sent by the server.
  108. Returns:
  109. The initial metadata received.
  110. """
  111. return self._initial_metadata
  112. def trailing_metadata(self) -> Metadata:
  113. """Accesses the trailing metadata sent by the server.
  114. Returns:
  115. The trailing metadata received.
  116. """
  117. return self._trailing_metadata
  118. def debug_error_string(self) -> str:
  119. """Accesses the debug error string sent by the server.
  120. Returns:
  121. The debug error string received.
  122. """
  123. return self._debug_error_string
  124. def _repr(self) -> str:
  125. """Assembles the error string for the RPC error."""
  126. return _NON_OK_CALL_REPRESENTATION.format(
  127. self.__class__.__name__,
  128. self._code,
  129. self._details,
  130. self._debug_error_string,
  131. )
  132. def __repr__(self) -> str:
  133. return self._repr()
  134. def __str__(self) -> str:
  135. return self._repr()
  136. def __reduce__(self):
  137. return (
  138. type(self),
  139. (
  140. self._code,
  141. self._initial_metadata,
  142. self._trailing_metadata,
  143. self._details,
  144. self._debug_error_string,
  145. ),
  146. )
  147. def _create_rpc_error(
  148. initial_metadata: Metadata, status: cygrpc.AioRpcStatus
  149. ) -> AioRpcError:
  150. return AioRpcError(
  151. _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[status.code()],
  152. Metadata.from_tuple(initial_metadata),
  153. Metadata.from_tuple(status.trailing_metadata()),
  154. details=status.details(),
  155. debug_error_string=status.debug_error_string(),
  156. )
  157. class Call:
  158. """Base implementation of client RPC Call object.
  159. Implements logic around final status, metadata and cancellation.
  160. """
  161. _loop: asyncio.AbstractEventLoop
  162. _code: grpc.StatusCode
  163. _cython_call: cygrpc._AioCall
  164. _metadata: Tuple[MetadatumType, ...]
  165. _request_serializer: SerializingFunction
  166. _response_deserializer: DeserializingFunction
  167. def __init__(
  168. self,
  169. cython_call: cygrpc._AioCall,
  170. metadata: Metadata,
  171. request_serializer: SerializingFunction,
  172. response_deserializer: DeserializingFunction,
  173. loop: asyncio.AbstractEventLoop,
  174. ) -> None:
  175. self._loop = loop
  176. self._cython_call = cython_call
  177. self._metadata = tuple(metadata)
  178. self._request_serializer = request_serializer
  179. self._response_deserializer = response_deserializer
  180. def __del__(self) -> None:
  181. # The '_cython_call' object might be destructed before Call object
  182. if hasattr(self, "_cython_call"):
  183. if not self._cython_call.done():
  184. self._cancel(_GC_CANCELLATION_DETAILS)
  185. def cancelled(self) -> bool:
  186. return self._cython_call.cancelled()
  187. def _cancel(self, details: str) -> bool:
  188. """Forwards the application cancellation reasoning."""
  189. if not self._cython_call.done():
  190. self._cython_call.cancel(details)
  191. return True
  192. else:
  193. return False
  194. def cancel(self) -> bool:
  195. return self._cancel(_LOCAL_CANCELLATION_DETAILS)
  196. def done(self) -> bool:
  197. return self._cython_call.done()
  198. def add_done_callback(self, callback: DoneCallbackType) -> None:
  199. cb = partial(callback, self)
  200. self._cython_call.add_done_callback(cb)
  201. def time_remaining(self) -> Optional[float]:
  202. return self._cython_call.time_remaining()
  203. async def initial_metadata(self) -> Metadata:
  204. raw_metadata_tuple = await self._cython_call.initial_metadata()
  205. return Metadata.from_tuple(raw_metadata_tuple)
  206. async def trailing_metadata(self) -> Metadata:
  207. raw_metadata_tuple = (
  208. await self._cython_call.status()
  209. ).trailing_metadata()
  210. return Metadata.from_tuple(raw_metadata_tuple)
  211. async def code(self) -> grpc.StatusCode:
  212. cygrpc_code = (await self._cython_call.status()).code()
  213. return _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[cygrpc_code]
  214. async def details(self) -> str:
  215. return (await self._cython_call.status()).details()
  216. async def debug_error_string(self) -> str:
  217. return (await self._cython_call.status()).debug_error_string()
  218. async def _raise_for_status(self) -> None:
  219. if self._cython_call.is_locally_cancelled():
  220. raise asyncio.CancelledError()
  221. code = await self.code()
  222. if code != grpc.StatusCode.OK:
  223. raise _create_rpc_error(
  224. await self.initial_metadata(), await self._cython_call.status()
  225. )
  226. def _repr(self) -> str:
  227. return repr(self._cython_call)
  228. def __repr__(self) -> str:
  229. return self._repr()
  230. def __str__(self) -> str:
  231. return self._repr()
  232. class _APIStyle(enum.IntEnum):
  233. UNKNOWN = 0
  234. ASYNC_GENERATOR = 1
  235. READER_WRITER = 2
  236. class _UnaryResponseMixin(Call, Generic[ResponseType]):
  237. _call_response: asyncio.Task
  238. def _init_unary_response_mixin(self, response_task: asyncio.Task):
  239. self._call_response = response_task
  240. def cancel(self) -> bool:
  241. if super().cancel():
  242. self._call_response.cancel()
  243. return True
  244. else:
  245. return False
  246. def __await__(self) -> Generator[Any, None, ResponseType]:
  247. """Wait till the ongoing RPC request finishes."""
  248. try:
  249. response = yield from self._call_response
  250. except asyncio.CancelledError:
  251. # Even if we caught all other CancelledError, there is still
  252. # this corner case. If the application cancels immediately after
  253. # the Call object is created, we will observe this
  254. # `CancelledError`.
  255. if not self.cancelled():
  256. self.cancel()
  257. raise
  258. # NOTE(lidiz) If we raise RpcError in the task, and users doesn't
  259. # 'await' on it. AsyncIO will log 'Task exception was never retrieved'.
  260. # Instead, if we move the exception raising here, the spam stops.
  261. # Unfortunately, there can only be one 'yield from' in '__await__'. So,
  262. # we need to access the private instance variable.
  263. if response is cygrpc.EOF:
  264. if self._cython_call.is_locally_cancelled():
  265. raise asyncio.CancelledError()
  266. else:
  267. raise _create_rpc_error(
  268. self._cython_call._initial_metadata,
  269. self._cython_call._status,
  270. )
  271. else:
  272. return response
  273. class _StreamResponseMixin(Call):
  274. _message_aiter: AsyncIterator[ResponseType]
  275. _preparation: asyncio.Task
  276. _response_style: _APIStyle
  277. def _init_stream_response_mixin(self, preparation: asyncio.Task):
  278. self._message_aiter = None
  279. self._preparation = preparation
  280. self._response_style = _APIStyle.UNKNOWN
  281. def _update_response_style(self, style: _APIStyle):
  282. if self._response_style is _APIStyle.UNKNOWN:
  283. self._response_style = style
  284. elif self._response_style is not style:
  285. raise cygrpc.UsageError(_API_STYLE_ERROR)
  286. def cancel(self) -> bool:
  287. if super().cancel():
  288. self._preparation.cancel()
  289. return True
  290. else:
  291. return False
  292. async def _fetch_stream_responses(self) -> ResponseType:
  293. message = await self._read()
  294. while message is not cygrpc.EOF:
  295. yield message
  296. message = await self._read()
  297. # If the read operation failed, Core should explain why.
  298. await self._raise_for_status()
  299. def __aiter__(self) -> AsyncIterator[ResponseType]:
  300. self._update_response_style(_APIStyle.ASYNC_GENERATOR)
  301. if self._message_aiter is None:
  302. self._message_aiter = self._fetch_stream_responses()
  303. return self._message_aiter
  304. async def _read(self) -> ResponseType:
  305. # Wait for the request being sent
  306. await self._preparation
  307. # Reads response message from Core
  308. try:
  309. raw_response = await self._cython_call.receive_serialized_message()
  310. except asyncio.CancelledError:
  311. if not self.cancelled():
  312. self.cancel()
  313. raise
  314. if raw_response is cygrpc.EOF:
  315. return cygrpc.EOF
  316. else:
  317. return _common.deserialize(
  318. raw_response, self._response_deserializer
  319. )
  320. async def read(self) -> Union[EOFType, ResponseType]:
  321. if self.done():
  322. await self._raise_for_status()
  323. return cygrpc.EOF
  324. self._update_response_style(_APIStyle.READER_WRITER)
  325. response_message = await self._read()
  326. if response_message is cygrpc.EOF:
  327. # If the read operation failed, Core should explain why.
  328. await self._raise_for_status()
  329. return response_message
  330. class _StreamRequestMixin(Call):
  331. _metadata_sent: asyncio.Event
  332. _done_writing_flag: bool
  333. _async_request_poller: Optional[asyncio.Task]
  334. _request_style: _APIStyle
  335. def _init_stream_request_mixin(
  336. self, request_iterator: Optional[RequestIterableType]
  337. ):
  338. self._metadata_sent = asyncio.Event()
  339. self._done_writing_flag = False
  340. # If user passes in an async iterator, create a consumer Task.
  341. if request_iterator is not None:
  342. self._async_request_poller = self._loop.create_task(
  343. self._consume_request_iterator(request_iterator)
  344. )
  345. self._request_style = _APIStyle.ASYNC_GENERATOR
  346. else:
  347. self._async_request_poller = None
  348. self._request_style = _APIStyle.READER_WRITER
  349. def _raise_for_different_style(self, style: _APIStyle):
  350. if self._request_style is not style:
  351. raise cygrpc.UsageError(_API_STYLE_ERROR)
  352. def cancel(self) -> bool:
  353. if super().cancel():
  354. if self._async_request_poller is not None:
  355. self._async_request_poller.cancel()
  356. return True
  357. else:
  358. return False
  359. def _metadata_sent_observer(self):
  360. self._metadata_sent.set()
  361. async def _consume_request_iterator(
  362. self, request_iterator: RequestIterableType
  363. ) -> None:
  364. try:
  365. if inspect.isasyncgen(request_iterator) or hasattr(
  366. request_iterator, "__aiter__"
  367. ):
  368. async for request in request_iterator:
  369. try:
  370. await self._write(request)
  371. except AioRpcError as rpc_error:
  372. _LOGGER.debug(
  373. (
  374. "Exception while consuming the"
  375. " request_iterator: %s"
  376. ),
  377. rpc_error,
  378. )
  379. return
  380. else:
  381. for request in request_iterator:
  382. try:
  383. await self._write(request)
  384. except AioRpcError as rpc_error:
  385. _LOGGER.debug(
  386. (
  387. "Exception while consuming the"
  388. " request_iterator: %s"
  389. ),
  390. rpc_error,
  391. )
  392. return
  393. await self._done_writing()
  394. except: # pylint: disable=bare-except
  395. # Client iterators can raise exceptions, which we should handle by
  396. # cancelling the RPC and logging the client's error. No exceptions
  397. # should escape this function.
  398. _LOGGER.debug(
  399. "Client request_iterator raised exception:\n%s",
  400. traceback.format_exc(),
  401. )
  402. self.cancel()
  403. async def _write(self, request: RequestType) -> None:
  404. if self.done():
  405. raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)
  406. if self._done_writing_flag:
  407. raise asyncio.InvalidStateError(_RPC_HALF_CLOSED_DETAILS)
  408. if not self._metadata_sent.is_set():
  409. await self._metadata_sent.wait()
  410. if self.done():
  411. await self._raise_for_status()
  412. serialized_request = _common.serialize(
  413. request, self._request_serializer
  414. )
  415. try:
  416. await self._cython_call.send_serialized_message(serialized_request)
  417. except cygrpc.InternalError as err:
  418. self._cython_call.set_internal_error(str(err))
  419. await self._raise_for_status()
  420. except asyncio.CancelledError:
  421. if not self.cancelled():
  422. self.cancel()
  423. raise
  424. async def _done_writing(self) -> None:
  425. if self.done():
  426. # If the RPC is finished, do nothing.
  427. return
  428. if not self._done_writing_flag:
  429. # If the done writing is not sent before, try to send it.
  430. self._done_writing_flag = True
  431. try:
  432. await self._cython_call.send_receive_close()
  433. except asyncio.CancelledError:
  434. if not self.cancelled():
  435. self.cancel()
  436. raise
  437. async def write(self, request: RequestType) -> None:
  438. self._raise_for_different_style(_APIStyle.READER_WRITER)
  439. await self._write(request)
  440. async def done_writing(self) -> None:
  441. """Signal peer that client is done writing.
  442. This method is idempotent.
  443. """
  444. self._raise_for_different_style(_APIStyle.READER_WRITER)
  445. await self._done_writing()
  446. async def wait_for_connection(self) -> None:
  447. await self._metadata_sent.wait()
  448. if self.done():
  449. await self._raise_for_status()
  450. class UnaryUnaryCall(_UnaryResponseMixin, Call, _base_call.UnaryUnaryCall):
  451. """Object for managing unary-unary RPC calls.
  452. Returned when an instance of `UnaryUnaryMultiCallable` object is called.
  453. """
  454. _request: RequestType
  455. _invocation_task: asyncio.Task
  456. # pylint: disable=too-many-arguments
  457. def __init__(
  458. self,
  459. request: RequestType,
  460. deadline: Optional[float],
  461. metadata: Metadata,
  462. credentials: Optional[grpc.CallCredentials],
  463. wait_for_ready: Optional[bool],
  464. channel: cygrpc.AioChannel,
  465. method: bytes,
  466. request_serializer: SerializingFunction,
  467. response_deserializer: DeserializingFunction,
  468. loop: asyncio.AbstractEventLoop,
  469. ) -> None:
  470. super().__init__(
  471. channel.call(method, deadline, credentials, wait_for_ready),
  472. metadata,
  473. request_serializer,
  474. response_deserializer,
  475. loop,
  476. )
  477. self._request = request
  478. self._context = cygrpc.build_census_context()
  479. self._invocation_task = loop.create_task(self._invoke())
  480. self._init_unary_response_mixin(self._invocation_task)
  481. async def _invoke(self) -> ResponseType:
  482. serialized_request = _common.serialize(
  483. self._request, self._request_serializer
  484. )
  485. # NOTE(lidiz) asyncio.CancelledError is not a good transport for status,
  486. # because the asyncio.Task class do not cache the exception object.
  487. # https://github.com/python/cpython/blob/edad4d89e357c92f70c0324b937845d652b20afd/Lib/asyncio/tasks.py#L785
  488. try:
  489. serialized_response = await self._cython_call.unary_unary(
  490. serialized_request, self._metadata, self._context
  491. )
  492. except asyncio.CancelledError:
  493. if not self.cancelled():
  494. self.cancel()
  495. if self._cython_call.is_ok():
  496. return _common.deserialize(
  497. serialized_response, self._response_deserializer
  498. )
  499. else:
  500. return cygrpc.EOF
  501. async def wait_for_connection(self) -> None:
  502. await self._invocation_task
  503. if self.done():
  504. await self._raise_for_status()
  505. class UnaryStreamCall(_StreamResponseMixin, Call, _base_call.UnaryStreamCall):
  506. """Object for managing unary-stream RPC calls.
  507. Returned when an instance of `UnaryStreamMultiCallable` object is called.
  508. """
  509. _request: RequestType
  510. _send_unary_request_task: asyncio.Task
  511. # pylint: disable=too-many-arguments
  512. def __init__(
  513. self,
  514. request: RequestType,
  515. deadline: Optional[float],
  516. metadata: Metadata,
  517. credentials: Optional[grpc.CallCredentials],
  518. wait_for_ready: Optional[bool],
  519. channel: cygrpc.AioChannel,
  520. method: bytes,
  521. request_serializer: SerializingFunction,
  522. response_deserializer: DeserializingFunction,
  523. loop: asyncio.AbstractEventLoop,
  524. ) -> None:
  525. super().__init__(
  526. channel.call(method, deadline, credentials, wait_for_ready),
  527. metadata,
  528. request_serializer,
  529. response_deserializer,
  530. loop,
  531. )
  532. self._request = request
  533. self._context = cygrpc.build_census_context()
  534. self._send_unary_request_task = loop.create_task(
  535. self._send_unary_request()
  536. )
  537. self._init_stream_response_mixin(self._send_unary_request_task)
  538. async def _send_unary_request(self) -> ResponseType:
  539. serialized_request = _common.serialize(
  540. self._request, self._request_serializer
  541. )
  542. try:
  543. await self._cython_call.initiate_unary_stream(
  544. serialized_request, self._metadata, self._context
  545. )
  546. except asyncio.CancelledError:
  547. if not self.cancelled():
  548. self.cancel()
  549. raise
  550. async def wait_for_connection(self) -> None:
  551. await self._send_unary_request_task
  552. if self.done():
  553. await self._raise_for_status()
  554. # pylint: disable=too-many-ancestors
  555. class StreamUnaryCall(
  556. _StreamRequestMixin, _UnaryResponseMixin, Call, _base_call.StreamUnaryCall
  557. ):
  558. """Object for managing stream-unary RPC calls.
  559. Returned when an instance of `StreamUnaryMultiCallable` object is called.
  560. """
  561. # pylint: disable=too-many-arguments
  562. def __init__(
  563. self,
  564. request_iterator: Optional[RequestIterableType],
  565. deadline: Optional[float],
  566. metadata: Metadata,
  567. credentials: Optional[grpc.CallCredentials],
  568. wait_for_ready: Optional[bool],
  569. channel: cygrpc.AioChannel,
  570. method: bytes,
  571. request_serializer: SerializingFunction,
  572. response_deserializer: DeserializingFunction,
  573. loop: asyncio.AbstractEventLoop,
  574. ) -> None:
  575. super().__init__(
  576. channel.call(method, deadline, credentials, wait_for_ready),
  577. metadata,
  578. request_serializer,
  579. response_deserializer,
  580. loop,
  581. )
  582. self._context = cygrpc.build_census_context()
  583. self._init_stream_request_mixin(request_iterator)
  584. self._init_unary_response_mixin(loop.create_task(self._conduct_rpc()))
  585. async def _conduct_rpc(self) -> ResponseType:
  586. try:
  587. serialized_response = await self._cython_call.stream_unary(
  588. self._metadata, self._metadata_sent_observer, self._context
  589. )
  590. except asyncio.CancelledError:
  591. if not self.cancelled():
  592. self.cancel()
  593. raise
  594. if self._cython_call.is_ok():
  595. return _common.deserialize(
  596. serialized_response, self._response_deserializer
  597. )
  598. else:
  599. return cygrpc.EOF
  600. class StreamStreamCall(
  601. _StreamRequestMixin, _StreamResponseMixin, Call, _base_call.StreamStreamCall
  602. ):
  603. """Object for managing stream-stream RPC calls.
  604. Returned when an instance of `StreamStreamMultiCallable` object is called.
  605. """
  606. _initializer: asyncio.Task
  607. # pylint: disable=too-many-arguments
  608. def __init__(
  609. self,
  610. request_iterator: Optional[RequestIterableType],
  611. deadline: Optional[float],
  612. metadata: Metadata,
  613. credentials: Optional[grpc.CallCredentials],
  614. wait_for_ready: Optional[bool],
  615. channel: cygrpc.AioChannel,
  616. method: bytes,
  617. request_serializer: SerializingFunction,
  618. response_deserializer: DeserializingFunction,
  619. loop: asyncio.AbstractEventLoop,
  620. ) -> None:
  621. super().__init__(
  622. channel.call(method, deadline, credentials, wait_for_ready),
  623. metadata,
  624. request_serializer,
  625. response_deserializer,
  626. loop,
  627. )
  628. self._context = cygrpc.build_census_context()
  629. self._initializer = self._loop.create_task(self._prepare_rpc())
  630. self._init_stream_request_mixin(request_iterator)
  631. self._init_stream_response_mixin(self._initializer)
  632. async def _prepare_rpc(self):
  633. """This method prepares the RPC for receiving/sending messages.
  634. All other operations around the stream should only happen after the
  635. completion of this method.
  636. """
  637. try:
  638. await self._cython_call.initiate_stream_stream(
  639. self._metadata, self._metadata_sent_observer, self._context
  640. )
  641. except asyncio.CancelledError:
  642. if not self.cancelled():
  643. self.cancel()
  644. # No need to raise RpcError here, because no one will `await` this task.