_simple_stubs.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588
  1. # Copyright 2020 The gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Functions that obviate explicit stubs and explicit channels."""
  15. import collections
  16. import datetime
  17. import logging
  18. import os
  19. import threading
  20. from typing import (
  21. Any,
  22. AnyStr,
  23. Callable,
  24. Dict,
  25. Iterator,
  26. Optional,
  27. Sequence,
  28. Tuple,
  29. TypeVar,
  30. Union,
  31. )
  32. import grpc
  33. from grpc.experimental import experimental_api
  34. RequestType = TypeVar("RequestType")
  35. ResponseType = TypeVar("ResponseType")
  36. OptionsType = Sequence[Tuple[str, str]]
  37. CacheKey = Tuple[
  38. str,
  39. OptionsType,
  40. Optional[grpc.ChannelCredentials],
  41. Optional[grpc.Compression],
  42. ]
  43. _LOGGER = logging.getLogger(__name__)
  44. _EVICTION_PERIOD_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS"
  45. if _EVICTION_PERIOD_KEY in os.environ:
  46. _EVICTION_PERIOD = datetime.timedelta(
  47. seconds=float(os.environ[_EVICTION_PERIOD_KEY])
  48. )
  49. _LOGGER.debug(
  50. "Setting managed channel eviction period to %s", _EVICTION_PERIOD
  51. )
  52. else:
  53. _EVICTION_PERIOD = datetime.timedelta(minutes=10)
  54. _MAXIMUM_CHANNELS_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM"
  55. if _MAXIMUM_CHANNELS_KEY in os.environ:
  56. _MAXIMUM_CHANNELS = int(os.environ[_MAXIMUM_CHANNELS_KEY])
  57. _LOGGER.debug("Setting maximum managed channels to %d", _MAXIMUM_CHANNELS)
  58. else:
  59. _MAXIMUM_CHANNELS = 2**8
  60. _DEFAULT_TIMEOUT_KEY = "GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS"
  61. if _DEFAULT_TIMEOUT_KEY in os.environ:
  62. _DEFAULT_TIMEOUT = float(os.environ[_DEFAULT_TIMEOUT_KEY])
  63. _LOGGER.debug("Setting default timeout seconds to %f", _DEFAULT_TIMEOUT)
  64. else:
  65. _DEFAULT_TIMEOUT = 60.0
  66. def _create_channel(
  67. target: str,
  68. options: Sequence[Tuple[str, str]],
  69. channel_credentials: Optional[grpc.ChannelCredentials],
  70. compression: Optional[grpc.Compression],
  71. ) -> grpc.Channel:
  72. _LOGGER.debug(
  73. f"Creating secure channel with credentials '{channel_credentials}', "
  74. + f"options '{options}' and compression '{compression}'"
  75. )
  76. return grpc.secure_channel(
  77. target,
  78. credentials=channel_credentials,
  79. options=options,
  80. compression=compression,
  81. )
  82. class ChannelCache:
  83. # NOTE(rbellevi): Untyped due to reference cycle.
  84. _singleton = None
  85. _lock: threading.RLock = threading.RLock()
  86. _condition: threading.Condition = threading.Condition(lock=_lock)
  87. _eviction_ready: threading.Event = threading.Event()
  88. _mapping: Dict[CacheKey, Tuple[grpc.Channel, datetime.datetime]]
  89. _eviction_thread: threading.Thread
  90. def __init__(self):
  91. self._mapping = collections.OrderedDict()
  92. self._eviction_thread = threading.Thread(
  93. target=ChannelCache._perform_evictions, daemon=True
  94. )
  95. self._eviction_thread.start()
  96. @staticmethod
  97. def get():
  98. with ChannelCache._lock:
  99. if ChannelCache._singleton is None:
  100. ChannelCache._singleton = ChannelCache()
  101. ChannelCache._eviction_ready.wait()
  102. return ChannelCache._singleton
  103. def _evict_locked(self, key: CacheKey):
  104. channel, _ = self._mapping.pop(key)
  105. _LOGGER.debug(
  106. "Evicting channel %s with configuration %s.", channel, key
  107. )
  108. channel.close()
  109. del channel
  110. @staticmethod
  111. def _perform_evictions():
  112. while True:
  113. with ChannelCache._lock:
  114. ChannelCache._eviction_ready.set()
  115. if not ChannelCache._singleton._mapping:
  116. ChannelCache._condition.wait()
  117. elif len(ChannelCache._singleton._mapping) > _MAXIMUM_CHANNELS:
  118. key = next(iter(ChannelCache._singleton._mapping.keys()))
  119. ChannelCache._singleton._evict_locked(key)
  120. # And immediately reevaluate.
  121. else:
  122. key, (_, eviction_time) = next(
  123. iter(ChannelCache._singleton._mapping.items())
  124. )
  125. now = datetime.datetime.now()
  126. if eviction_time <= now:
  127. ChannelCache._singleton._evict_locked(key)
  128. continue
  129. else:
  130. time_to_eviction = (eviction_time - now).total_seconds()
  131. # NOTE: We aim to *eventually* coalesce to a state in
  132. # which no overdue channels are in the cache and the
  133. # length of the cache is longer than _MAXIMUM_CHANNELS.
  134. # We tolerate momentary states in which these two
  135. # criteria are not met.
  136. ChannelCache._condition.wait(timeout=time_to_eviction)
  137. def get_channel(
  138. self,
  139. target: str,
  140. options: Sequence[Tuple[str, str]],
  141. channel_credentials: Optional[grpc.ChannelCredentials],
  142. insecure: bool,
  143. compression: Optional[grpc.Compression],
  144. method: str,
  145. _registered_method: bool,
  146. ) -> Tuple[grpc.Channel, Optional[int]]:
  147. """Get a channel from cache or creates a new channel.
  148. This method also takes care of register method for channel,
  149. which means we'll register a new call handle if we're calling a
  150. non-registered method for an existing channel.
  151. Returns:
  152. A tuple with two items. The first item is the channel, second item is
  153. the call handle if the method is registered, None if it's not registered.
  154. """
  155. if insecure and channel_credentials:
  156. raise ValueError(
  157. "The insecure option is mutually exclusive with "
  158. + "the channel_credentials option. Please use one "
  159. + "or the other."
  160. )
  161. if insecure:
  162. channel_credentials = (
  163. grpc.experimental.insecure_channel_credentials()
  164. )
  165. elif channel_credentials is None:
  166. _LOGGER.debug("Defaulting to SSL channel credentials.")
  167. channel_credentials = grpc.ssl_channel_credentials()
  168. key = (target, options, channel_credentials, compression)
  169. with self._lock:
  170. channel_data = self._mapping.get(key, None)
  171. call_handle = None
  172. if channel_data is not None:
  173. channel = channel_data[0]
  174. # Register a new call handle if we're calling a registered method for an
  175. # existing channel and this method is not registered.
  176. if _registered_method:
  177. call_handle = channel._get_registered_call_handle(method)
  178. self._mapping.pop(key)
  179. self._mapping[key] = (
  180. channel,
  181. datetime.datetime.now() + _EVICTION_PERIOD,
  182. )
  183. return channel, call_handle
  184. else:
  185. channel = _create_channel(
  186. target, options, channel_credentials, compression
  187. )
  188. if _registered_method:
  189. call_handle = channel._get_registered_call_handle(method)
  190. self._mapping[key] = (
  191. channel,
  192. datetime.datetime.now() + _EVICTION_PERIOD,
  193. )
  194. if (
  195. len(self._mapping) == 1
  196. or len(self._mapping) >= _MAXIMUM_CHANNELS
  197. ):
  198. self._condition.notify()
  199. return channel, call_handle
  200. def _test_only_channel_count(self) -> int:
  201. with self._lock:
  202. return len(self._mapping)
  203. @experimental_api
  204. # pylint: disable=too-many-locals
  205. def unary_unary(
  206. request: RequestType,
  207. target: str,
  208. method: str,
  209. request_serializer: Optional[Callable[[Any], bytes]] = None,
  210. response_deserializer: Optional[Callable[[bytes], Any]] = None,
  211. options: Sequence[Tuple[AnyStr, AnyStr]] = (),
  212. channel_credentials: Optional[grpc.ChannelCredentials] = None,
  213. insecure: bool = False,
  214. call_credentials: Optional[grpc.CallCredentials] = None,
  215. compression: Optional[grpc.Compression] = None,
  216. wait_for_ready: Optional[bool] = None,
  217. timeout: Optional[float] = _DEFAULT_TIMEOUT,
  218. metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
  219. _registered_method: Optional[bool] = False,
  220. ) -> ResponseType:
  221. """Invokes a unary-unary RPC without an explicitly specified channel.
  222. THIS IS AN EXPERIMENTAL API.
  223. This is backed by a per-process cache of channels. Channels are evicted
  224. from the cache after a fixed period by a background. Channels will also be
  225. evicted if more than a configured maximum accumulate.
  226. The default eviction period is 10 minutes. One may set the environment
  227. variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.
  228. The default maximum number of channels is 256. One may set the
  229. environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
  230. this.
  231. Args:
  232. request: An iterator that yields request values for the RPC.
  233. target: The server address.
  234. method: The name of the RPC method.
  235. request_serializer: Optional :term:`serializer` for serializing the request
  236. message. Request goes unserialized in case None is passed.
  237. response_deserializer: Optional :term:`deserializer` for deserializing the response
  238. message. Response goes undeserialized in case None is passed.
  239. options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
  240. runtime) to configure the channel.
  241. channel_credentials: A credential applied to the whole channel, e.g. the
  242. return value of grpc.ssl_channel_credentials() or
  243. grpc.insecure_channel_credentials().
  244. insecure: If True, specifies channel_credentials as
  245. :term:`grpc.insecure_channel_credentials()`. This option is mutually
  246. exclusive with the `channel_credentials` option.
  247. call_credentials: A call credential applied to each call individually,
  248. e.g. the output of grpc.metadata_call_credentials() or
  249. grpc.access_token_call_credentials().
  250. compression: An optional value indicating the compression method to be
  251. used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
  252. wait_for_ready: An optional flag indicating whether the RPC should fail
  253. immediately if the connection is not ready at the time the RPC is
  254. invoked, or if it should wait until the connection to the server
  255. becomes ready. When using this option, the user will likely also want
  256. to set a timeout. Defaults to True.
  257. timeout: An optional duration of time in seconds to allow for the RPC,
  258. after which an exception will be raised. If timeout is unspecified,
  259. defaults to a timeout controlled by the
  260. GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
  261. unset, defaults to 60 seconds. Supply a value of None to indicate that
  262. no timeout should be enforced.
  263. metadata: Optional metadata to send to the server.
  264. Returns:
  265. The response to the RPC.
  266. """
  267. channel, method_handle = ChannelCache.get().get_channel(
  268. target,
  269. options,
  270. channel_credentials,
  271. insecure,
  272. compression,
  273. method,
  274. _registered_method,
  275. )
  276. multicallable = channel.unary_unary(
  277. method, request_serializer, response_deserializer, method_handle
  278. )
  279. wait_for_ready = wait_for_ready if wait_for_ready is not None else True
  280. return multicallable(
  281. request,
  282. metadata=metadata,
  283. wait_for_ready=wait_for_ready,
  284. credentials=call_credentials,
  285. timeout=timeout,
  286. )
  287. @experimental_api
  288. # pylint: disable=too-many-locals
  289. def unary_stream(
  290. request: RequestType,
  291. target: str,
  292. method: str,
  293. request_serializer: Optional[Callable[[Any], bytes]] = None,
  294. response_deserializer: Optional[Callable[[bytes], Any]] = None,
  295. options: Sequence[Tuple[AnyStr, AnyStr]] = (),
  296. channel_credentials: Optional[grpc.ChannelCredentials] = None,
  297. insecure: bool = False,
  298. call_credentials: Optional[grpc.CallCredentials] = None,
  299. compression: Optional[grpc.Compression] = None,
  300. wait_for_ready: Optional[bool] = None,
  301. timeout: Optional[float] = _DEFAULT_TIMEOUT,
  302. metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
  303. _registered_method: Optional[bool] = False,
  304. ) -> Iterator[ResponseType]:
  305. """Invokes a unary-stream RPC without an explicitly specified channel.
  306. THIS IS AN EXPERIMENTAL API.
  307. This is backed by a per-process cache of channels. Channels are evicted
  308. from the cache after a fixed period by a background. Channels will also be
  309. evicted if more than a configured maximum accumulate.
  310. The default eviction period is 10 minutes. One may set the environment
  311. variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.
  312. The default maximum number of channels is 256. One may set the
  313. environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
  314. this.
  315. Args:
  316. request: An iterator that yields request values for the RPC.
  317. target: The server address.
  318. method: The name of the RPC method.
  319. request_serializer: Optional :term:`serializer` for serializing the request
  320. message. Request goes unserialized in case None is passed.
  321. response_deserializer: Optional :term:`deserializer` for deserializing the response
  322. message. Response goes undeserialized in case None is passed.
  323. options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
  324. runtime) to configure the channel.
  325. channel_credentials: A credential applied to the whole channel, e.g. the
  326. return value of grpc.ssl_channel_credentials().
  327. insecure: If True, specifies channel_credentials as
  328. :term:`grpc.insecure_channel_credentials()`. This option is mutually
  329. exclusive with the `channel_credentials` option.
  330. call_credentials: A call credential applied to each call individually,
  331. e.g. the output of grpc.metadata_call_credentials() or
  332. grpc.access_token_call_credentials().
  333. compression: An optional value indicating the compression method to be
  334. used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
  335. wait_for_ready: An optional flag indicating whether the RPC should fail
  336. immediately if the connection is not ready at the time the RPC is
  337. invoked, or if it should wait until the connection to the server
  338. becomes ready. When using this option, the user will likely also want
  339. to set a timeout. Defaults to True.
  340. timeout: An optional duration of time in seconds to allow for the RPC,
  341. after which an exception will be raised. If timeout is unspecified,
  342. defaults to a timeout controlled by the
  343. GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
  344. unset, defaults to 60 seconds. Supply a value of None to indicate that
  345. no timeout should be enforced.
  346. metadata: Optional metadata to send to the server.
  347. Returns:
  348. An iterator of responses.
  349. """
  350. channel, method_handle = ChannelCache.get().get_channel(
  351. target,
  352. options,
  353. channel_credentials,
  354. insecure,
  355. compression,
  356. method,
  357. _registered_method,
  358. )
  359. multicallable = channel.unary_stream(
  360. method, request_serializer, response_deserializer, method_handle
  361. )
  362. wait_for_ready = wait_for_ready if wait_for_ready is not None else True
  363. return multicallable(
  364. request,
  365. metadata=metadata,
  366. wait_for_ready=wait_for_ready,
  367. credentials=call_credentials,
  368. timeout=timeout,
  369. )
  370. @experimental_api
  371. # pylint: disable=too-many-locals
  372. def stream_unary(
  373. request_iterator: Iterator[RequestType],
  374. target: str,
  375. method: str,
  376. request_serializer: Optional[Callable[[Any], bytes]] = None,
  377. response_deserializer: Optional[Callable[[bytes], Any]] = None,
  378. options: Sequence[Tuple[AnyStr, AnyStr]] = (),
  379. channel_credentials: Optional[grpc.ChannelCredentials] = None,
  380. insecure: bool = False,
  381. call_credentials: Optional[grpc.CallCredentials] = None,
  382. compression: Optional[grpc.Compression] = None,
  383. wait_for_ready: Optional[bool] = None,
  384. timeout: Optional[float] = _DEFAULT_TIMEOUT,
  385. metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
  386. _registered_method: Optional[bool] = False,
  387. ) -> ResponseType:
  388. """Invokes a stream-unary RPC without an explicitly specified channel.
  389. THIS IS AN EXPERIMENTAL API.
  390. This is backed by a per-process cache of channels. Channels are evicted
  391. from the cache after a fixed period by a background. Channels will also be
  392. evicted if more than a configured maximum accumulate.
  393. The default eviction period is 10 minutes. One may set the environment
  394. variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.
  395. The default maximum number of channels is 256. One may set the
  396. environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
  397. this.
  398. Args:
  399. request_iterator: An iterator that yields request values for the RPC.
  400. target: The server address.
  401. method: The name of the RPC method.
  402. request_serializer: Optional :term:`serializer` for serializing the request
  403. message. Request goes unserialized in case None is passed.
  404. response_deserializer: Optional :term:`deserializer` for deserializing the response
  405. message. Response goes undeserialized in case None is passed.
  406. options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
  407. runtime) to configure the channel.
  408. channel_credentials: A credential applied to the whole channel, e.g. the
  409. return value of grpc.ssl_channel_credentials().
  410. call_credentials: A call credential applied to each call individually,
  411. e.g. the output of grpc.metadata_call_credentials() or
  412. grpc.access_token_call_credentials().
  413. insecure: If True, specifies channel_credentials as
  414. :term:`grpc.insecure_channel_credentials()`. This option is mutually
  415. exclusive with the `channel_credentials` option.
  416. compression: An optional value indicating the compression method to be
  417. used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
  418. wait_for_ready: An optional flag indicating whether the RPC should fail
  419. immediately if the connection is not ready at the time the RPC is
  420. invoked, or if it should wait until the connection to the server
  421. becomes ready. When using this option, the user will likely also want
  422. to set a timeout. Defaults to True.
  423. timeout: An optional duration of time in seconds to allow for the RPC,
  424. after which an exception will be raised. If timeout is unspecified,
  425. defaults to a timeout controlled by the
  426. GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
  427. unset, defaults to 60 seconds. Supply a value of None to indicate that
  428. no timeout should be enforced.
  429. metadata: Optional metadata to send to the server.
  430. Returns:
  431. The response to the RPC.
  432. """
  433. channel, method_handle = ChannelCache.get().get_channel(
  434. target,
  435. options,
  436. channel_credentials,
  437. insecure,
  438. compression,
  439. method,
  440. _registered_method,
  441. )
  442. multicallable = channel.stream_unary(
  443. method, request_serializer, response_deserializer, method_handle
  444. )
  445. wait_for_ready = wait_for_ready if wait_for_ready is not None else True
  446. return multicallable(
  447. request_iterator,
  448. metadata=metadata,
  449. wait_for_ready=wait_for_ready,
  450. credentials=call_credentials,
  451. timeout=timeout,
  452. )
  453. @experimental_api
  454. # pylint: disable=too-many-locals
  455. def stream_stream(
  456. request_iterator: Iterator[RequestType],
  457. target: str,
  458. method: str,
  459. request_serializer: Optional[Callable[[Any], bytes]] = None,
  460. response_deserializer: Optional[Callable[[bytes], Any]] = None,
  461. options: Sequence[Tuple[AnyStr, AnyStr]] = (),
  462. channel_credentials: Optional[grpc.ChannelCredentials] = None,
  463. insecure: bool = False,
  464. call_credentials: Optional[grpc.CallCredentials] = None,
  465. compression: Optional[grpc.Compression] = None,
  466. wait_for_ready: Optional[bool] = None,
  467. timeout: Optional[float] = _DEFAULT_TIMEOUT,
  468. metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
  469. _registered_method: Optional[bool] = False,
  470. ) -> Iterator[ResponseType]:
  471. """Invokes a stream-stream RPC without an explicitly specified channel.
  472. THIS IS AN EXPERIMENTAL API.
  473. This is backed by a per-process cache of channels. Channels are evicted
  474. from the cache after a fixed period by a background. Channels will also be
  475. evicted if more than a configured maximum accumulate.
  476. The default eviction period is 10 minutes. One may set the environment
  477. variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.
  478. The default maximum number of channels is 256. One may set the
  479. environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
  480. this.
  481. Args:
  482. request_iterator: An iterator that yields request values for the RPC.
  483. target: The server address.
  484. method: The name of the RPC method.
  485. request_serializer: Optional :term:`serializer` for serializing the request
  486. message. Request goes unserialized in case None is passed.
  487. response_deserializer: Optional :term:`deserializer` for deserializing the response
  488. message. Response goes undeserialized in case None is passed.
  489. options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
  490. runtime) to configure the channel.
  491. channel_credentials: A credential applied to the whole channel, e.g. the
  492. return value of grpc.ssl_channel_credentials().
  493. call_credentials: A call credential applied to each call individually,
  494. e.g. the output of grpc.metadata_call_credentials() or
  495. grpc.access_token_call_credentials().
  496. insecure: If True, specifies channel_credentials as
  497. :term:`grpc.insecure_channel_credentials()`. This option is mutually
  498. exclusive with the `channel_credentials` option.
  499. compression: An optional value indicating the compression method to be
  500. used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
  501. wait_for_ready: An optional flag indicating whether the RPC should fail
  502. immediately if the connection is not ready at the time the RPC is
  503. invoked, or if it should wait until the connection to the server
  504. becomes ready. When using this option, the user will likely also want
  505. to set a timeout. Defaults to True.
  506. timeout: An optional duration of time in seconds to allow for the RPC,
  507. after which an exception will be raised. If timeout is unspecified,
  508. defaults to a timeout controlled by the
  509. GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
  510. unset, defaults to 60 seconds. Supply a value of None to indicate that
  511. no timeout should be enforced.
  512. metadata: Optional metadata to send to the server.
  513. Returns:
  514. An iterator of responses.
  515. """
  516. channel, method_handle = ChannelCache.get().get_channel(
  517. target,
  518. options,
  519. channel_credentials,
  520. insecure,
  521. compression,
  522. method,
  523. _registered_method,
  524. )
  525. multicallable = channel.stream_stream(
  526. method, request_serializer, response_deserializer, method_handle
  527. )
  528. wait_for_ready = wait_for_ready if wait_for_ready is not None else True
  529. return multicallable(
  530. request_iterator,
  531. metadata=metadata,
  532. wait_for_ready=wait_for_ready,
  533. credentials=call_credentials,
  534. timeout=timeout,
  535. )