_interceptor.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813
  1. # Copyright 2017 gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Implementation of gRPC Python interceptors."""
  15. import collections
  16. import sys
  17. import types
  18. from typing import Any, Callable, Optional, Sequence, Tuple, Union
  19. import grpc
  20. from ._typing import DeserializingFunction
  21. from ._typing import DoneCallbackType
  22. from ._typing import MetadataType
  23. from ._typing import RequestIterableType
  24. from ._typing import SerializingFunction
  25. class _ServicePipeline(object):
  26. interceptors: Tuple[grpc.ServerInterceptor]
  27. def __init__(self, interceptors: Sequence[grpc.ServerInterceptor]):
  28. self.interceptors = tuple(interceptors)
  29. def _continuation(self, thunk: Callable, index: int) -> Callable:
  30. return lambda context: self._intercept_at(thunk, index, context)
  31. def _intercept_at(
  32. self, thunk: Callable, index: int, context: grpc.HandlerCallDetails
  33. ) -> grpc.RpcMethodHandler:
  34. if index < len(self.interceptors):
  35. interceptor = self.interceptors[index]
  36. thunk = self._continuation(thunk, index + 1)
  37. return interceptor.intercept_service(thunk, context)
  38. else:
  39. return thunk(context)
  40. def execute(
  41. self, thunk: Callable, context: grpc.HandlerCallDetails
  42. ) -> grpc.RpcMethodHandler:
  43. return self._intercept_at(thunk, 0, context)
  44. def service_pipeline(
  45. interceptors: Optional[Sequence[grpc.ServerInterceptor]],
  46. ) -> Optional[_ServicePipeline]:
  47. return _ServicePipeline(interceptors) if interceptors else None
  48. class _ClientCallDetails(
  49. collections.namedtuple(
  50. "_ClientCallDetails",
  51. (
  52. "method",
  53. "timeout",
  54. "metadata",
  55. "credentials",
  56. "wait_for_ready",
  57. "compression",
  58. ),
  59. ),
  60. grpc.ClientCallDetails,
  61. ):
  62. pass
  63. def _unwrap_client_call_details(
  64. call_details: grpc.ClientCallDetails,
  65. default_details: grpc.ClientCallDetails,
  66. ) -> Tuple[
  67. str, float, MetadataType, grpc.CallCredentials, bool, grpc.Compression
  68. ]:
  69. try:
  70. method = call_details.method # pytype: disable=attribute-error
  71. except AttributeError:
  72. method = default_details.method # pytype: disable=attribute-error
  73. try:
  74. timeout = call_details.timeout # pytype: disable=attribute-error
  75. except AttributeError:
  76. timeout = default_details.timeout # pytype: disable=attribute-error
  77. try:
  78. metadata = call_details.metadata # pytype: disable=attribute-error
  79. except AttributeError:
  80. metadata = default_details.metadata # pytype: disable=attribute-error
  81. try:
  82. credentials = (
  83. call_details.credentials
  84. ) # pytype: disable=attribute-error
  85. except AttributeError:
  86. credentials = (
  87. default_details.credentials
  88. ) # pytype: disable=attribute-error
  89. try:
  90. wait_for_ready = (
  91. call_details.wait_for_ready
  92. ) # pytype: disable=attribute-error
  93. except AttributeError:
  94. wait_for_ready = (
  95. default_details.wait_for_ready
  96. ) # pytype: disable=attribute-error
  97. try:
  98. compression = (
  99. call_details.compression
  100. ) # pytype: disable=attribute-error
  101. except AttributeError:
  102. compression = (
  103. default_details.compression
  104. ) # pytype: disable=attribute-error
  105. return method, timeout, metadata, credentials, wait_for_ready, compression
  106. class _FailureOutcome(
  107. grpc.RpcError, grpc.Future, grpc.Call
  108. ): # pylint: disable=too-many-ancestors
  109. _exception: Exception
  110. _traceback: types.TracebackType
  111. def __init__(self, exception: Exception, traceback: types.TracebackType):
  112. super(_FailureOutcome, self).__init__()
  113. self._exception = exception
  114. self._traceback = traceback
  115. def initial_metadata(self) -> Optional[MetadataType]:
  116. return None
  117. def trailing_metadata(self) -> Optional[MetadataType]:
  118. return None
  119. def code(self) -> Optional[grpc.StatusCode]:
  120. return grpc.StatusCode.INTERNAL
  121. def details(self) -> Optional[str]:
  122. return "Exception raised while intercepting the RPC"
  123. def cancel(self) -> bool:
  124. return False
  125. def cancelled(self) -> bool:
  126. return False
  127. def is_active(self) -> bool:
  128. return False
  129. def time_remaining(self) -> Optional[float]:
  130. return None
  131. def running(self) -> bool:
  132. return False
  133. def done(self) -> bool:
  134. return True
  135. def result(self, ignored_timeout: Optional[float] = None):
  136. raise self._exception
  137. def exception(
  138. self, ignored_timeout: Optional[float] = None
  139. ) -> Optional[Exception]:
  140. return self._exception
  141. def traceback(
  142. self, ignored_timeout: Optional[float] = None
  143. ) -> Optional[types.TracebackType]:
  144. return self._traceback
  145. def add_callback(self, unused_callback) -> bool:
  146. return False
  147. def add_done_callback(self, fn: DoneCallbackType) -> None:
  148. fn(self)
  149. def __iter__(self):
  150. return self
  151. def __next__(self):
  152. raise self._exception
  153. def next(self):
  154. return self.__next__()
  155. class _UnaryOutcome(grpc.Call, grpc.Future):
  156. _response: Any
  157. _call: grpc.Call
  158. def __init__(self, response: Any, call: grpc.Call):
  159. self._response = response
  160. self._call = call
  161. def initial_metadata(self) -> Optional[MetadataType]:
  162. return self._call.initial_metadata()
  163. def trailing_metadata(self) -> Optional[MetadataType]:
  164. return self._call.trailing_metadata()
  165. def code(self) -> Optional[grpc.StatusCode]:
  166. return self._call.code()
  167. def details(self) -> Optional[str]:
  168. return self._call.details()
  169. def is_active(self) -> bool:
  170. return self._call.is_active()
  171. def time_remaining(self) -> Optional[float]:
  172. return self._call.time_remaining()
  173. def cancel(self) -> bool:
  174. return self._call.cancel()
  175. def add_callback(self, callback) -> bool:
  176. return self._call.add_callback(callback)
  177. def cancelled(self) -> bool:
  178. return False
  179. def running(self) -> bool:
  180. return False
  181. def done(self) -> bool:
  182. return True
  183. def result(self, ignored_timeout: Optional[float] = None):
  184. return self._response
  185. def exception(self, ignored_timeout: Optional[float] = None):
  186. return None
  187. def traceback(self, ignored_timeout: Optional[float] = None):
  188. return None
  189. def add_done_callback(self, fn: DoneCallbackType) -> None:
  190. fn(self)
  191. class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
  192. _thunk: Callable
  193. _method: str
  194. _interceptor: grpc.UnaryUnaryClientInterceptor
  195. def __init__(
  196. self,
  197. thunk: Callable,
  198. method: str,
  199. interceptor: grpc.UnaryUnaryClientInterceptor,
  200. ):
  201. self._thunk = thunk
  202. self._method = method
  203. self._interceptor = interceptor
  204. def __call__(
  205. self,
  206. request: Any,
  207. timeout: Optional[float] = None,
  208. metadata: Optional[MetadataType] = None,
  209. credentials: Optional[grpc.CallCredentials] = None,
  210. wait_for_ready: Optional[bool] = None,
  211. compression: Optional[grpc.Compression] = None,
  212. ) -> Any:
  213. response, ignored_call = self._with_call(
  214. request,
  215. timeout=timeout,
  216. metadata=metadata,
  217. credentials=credentials,
  218. wait_for_ready=wait_for_ready,
  219. compression=compression,
  220. )
  221. return response
  222. def _with_call(
  223. self,
  224. request: Any,
  225. timeout: Optional[float] = None,
  226. metadata: Optional[MetadataType] = None,
  227. credentials: Optional[grpc.CallCredentials] = None,
  228. wait_for_ready: Optional[bool] = None,
  229. compression: Optional[grpc.Compression] = None,
  230. ) -> Tuple[Any, grpc.Call]:
  231. client_call_details = _ClientCallDetails(
  232. self._method,
  233. timeout,
  234. metadata,
  235. credentials,
  236. wait_for_ready,
  237. compression,
  238. )
  239. def continuation(new_details, request):
  240. (
  241. new_method,
  242. new_timeout,
  243. new_metadata,
  244. new_credentials,
  245. new_wait_for_ready,
  246. new_compression,
  247. ) = _unwrap_client_call_details(new_details, client_call_details)
  248. try:
  249. response, call = self._thunk(new_method).with_call(
  250. request,
  251. timeout=new_timeout,
  252. metadata=new_metadata,
  253. credentials=new_credentials,
  254. wait_for_ready=new_wait_for_ready,
  255. compression=new_compression,
  256. )
  257. return _UnaryOutcome(response, call)
  258. except grpc.RpcError as rpc_error:
  259. return rpc_error
  260. except Exception as exception: # pylint:disable=broad-except
  261. return _FailureOutcome(exception, sys.exc_info()[2])
  262. call = self._interceptor.intercept_unary_unary(
  263. continuation, client_call_details, request
  264. )
  265. return call.result(), call
  266. def with_call(
  267. self,
  268. request: Any,
  269. timeout: Optional[float] = None,
  270. metadata: Optional[MetadataType] = None,
  271. credentials: Optional[grpc.CallCredentials] = None,
  272. wait_for_ready: Optional[bool] = None,
  273. compression: Optional[grpc.Compression] = None,
  274. ) -> Tuple[Any, grpc.Call]:
  275. return self._with_call(
  276. request,
  277. timeout=timeout,
  278. metadata=metadata,
  279. credentials=credentials,
  280. wait_for_ready=wait_for_ready,
  281. compression=compression,
  282. )
  283. def future(
  284. self,
  285. request: Any,
  286. timeout: Optional[float] = None,
  287. metadata: Optional[MetadataType] = None,
  288. credentials: Optional[grpc.CallCredentials] = None,
  289. wait_for_ready: Optional[bool] = None,
  290. compression: Optional[grpc.Compression] = None,
  291. ) -> Any:
  292. client_call_details = _ClientCallDetails(
  293. self._method,
  294. timeout,
  295. metadata,
  296. credentials,
  297. wait_for_ready,
  298. compression,
  299. )
  300. def continuation(new_details, request):
  301. (
  302. new_method,
  303. new_timeout,
  304. new_metadata,
  305. new_credentials,
  306. new_wait_for_ready,
  307. new_compression,
  308. ) = _unwrap_client_call_details(new_details, client_call_details)
  309. return self._thunk(new_method).future(
  310. request,
  311. timeout=new_timeout,
  312. metadata=new_metadata,
  313. credentials=new_credentials,
  314. wait_for_ready=new_wait_for_ready,
  315. compression=new_compression,
  316. )
  317. try:
  318. return self._interceptor.intercept_unary_unary(
  319. continuation, client_call_details, request
  320. )
  321. except Exception as exception: # pylint:disable=broad-except
  322. return _FailureOutcome(exception, sys.exc_info()[2])
  323. class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
  324. _thunk: Callable
  325. _method: str
  326. _interceptor: grpc.UnaryStreamClientInterceptor
  327. def __init__(
  328. self,
  329. thunk: Callable,
  330. method: str,
  331. interceptor: grpc.UnaryStreamClientInterceptor,
  332. ):
  333. self._thunk = thunk
  334. self._method = method
  335. self._interceptor = interceptor
  336. def __call__(
  337. self,
  338. request: Any,
  339. timeout: Optional[float] = None,
  340. metadata: Optional[MetadataType] = None,
  341. credentials: Optional[grpc.CallCredentials] = None,
  342. wait_for_ready: Optional[bool] = None,
  343. compression: Optional[grpc.Compression] = None,
  344. ):
  345. client_call_details = _ClientCallDetails(
  346. self._method,
  347. timeout,
  348. metadata,
  349. credentials,
  350. wait_for_ready,
  351. compression,
  352. )
  353. def continuation(new_details, request):
  354. (
  355. new_method,
  356. new_timeout,
  357. new_metadata,
  358. new_credentials,
  359. new_wait_for_ready,
  360. new_compression,
  361. ) = _unwrap_client_call_details(new_details, client_call_details)
  362. return self._thunk(new_method)(
  363. request,
  364. timeout=new_timeout,
  365. metadata=new_metadata,
  366. credentials=new_credentials,
  367. wait_for_ready=new_wait_for_ready,
  368. compression=new_compression,
  369. )
  370. try:
  371. return self._interceptor.intercept_unary_stream(
  372. continuation, client_call_details, request
  373. )
  374. except Exception as exception: # pylint:disable=broad-except
  375. return _FailureOutcome(exception, sys.exc_info()[2])
  376. class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
  377. _thunk: Callable
  378. _method: str
  379. _interceptor: grpc.StreamUnaryClientInterceptor
  380. def __init__(
  381. self,
  382. thunk: Callable,
  383. method: str,
  384. interceptor: grpc.StreamUnaryClientInterceptor,
  385. ):
  386. self._thunk = thunk
  387. self._method = method
  388. self._interceptor = interceptor
  389. def __call__(
  390. self,
  391. request_iterator: RequestIterableType,
  392. timeout: Optional[float] = None,
  393. metadata: Optional[MetadataType] = None,
  394. credentials: Optional[grpc.CallCredentials] = None,
  395. wait_for_ready: Optional[bool] = None,
  396. compression: Optional[grpc.Compression] = None,
  397. ) -> Any:
  398. response, ignored_call = self._with_call(
  399. request_iterator,
  400. timeout=timeout,
  401. metadata=metadata,
  402. credentials=credentials,
  403. wait_for_ready=wait_for_ready,
  404. compression=compression,
  405. )
  406. return response
  407. def _with_call(
  408. self,
  409. request_iterator: RequestIterableType,
  410. timeout: Optional[float] = None,
  411. metadata: Optional[MetadataType] = None,
  412. credentials: Optional[grpc.CallCredentials] = None,
  413. wait_for_ready: Optional[bool] = None,
  414. compression: Optional[grpc.Compression] = None,
  415. ) -> Tuple[Any, grpc.Call]:
  416. client_call_details = _ClientCallDetails(
  417. self._method,
  418. timeout,
  419. metadata,
  420. credentials,
  421. wait_for_ready,
  422. compression,
  423. )
  424. def continuation(new_details, request_iterator):
  425. (
  426. new_method,
  427. new_timeout,
  428. new_metadata,
  429. new_credentials,
  430. new_wait_for_ready,
  431. new_compression,
  432. ) = _unwrap_client_call_details(new_details, client_call_details)
  433. try:
  434. response, call = self._thunk(new_method).with_call(
  435. request_iterator,
  436. timeout=new_timeout,
  437. metadata=new_metadata,
  438. credentials=new_credentials,
  439. wait_for_ready=new_wait_for_ready,
  440. compression=new_compression,
  441. )
  442. return _UnaryOutcome(response, call)
  443. except grpc.RpcError as rpc_error:
  444. return rpc_error
  445. except Exception as exception: # pylint:disable=broad-except
  446. return _FailureOutcome(exception, sys.exc_info()[2])
  447. call = self._interceptor.intercept_stream_unary(
  448. continuation, client_call_details, request_iterator
  449. )
  450. return call.result(), call
  451. def with_call(
  452. self,
  453. request_iterator: RequestIterableType,
  454. timeout: Optional[float] = None,
  455. metadata: Optional[MetadataType] = None,
  456. credentials: Optional[grpc.CallCredentials] = None,
  457. wait_for_ready: Optional[bool] = None,
  458. compression: Optional[grpc.Compression] = None,
  459. ) -> Tuple[Any, grpc.Call]:
  460. return self._with_call(
  461. request_iterator,
  462. timeout=timeout,
  463. metadata=metadata,
  464. credentials=credentials,
  465. wait_for_ready=wait_for_ready,
  466. compression=compression,
  467. )
  468. def future(
  469. self,
  470. request_iterator: RequestIterableType,
  471. timeout: Optional[float] = None,
  472. metadata: Optional[MetadataType] = None,
  473. credentials: Optional[grpc.CallCredentials] = None,
  474. wait_for_ready: Optional[bool] = None,
  475. compression: Optional[grpc.Compression] = None,
  476. ) -> Any:
  477. client_call_details = _ClientCallDetails(
  478. self._method,
  479. timeout,
  480. metadata,
  481. credentials,
  482. wait_for_ready,
  483. compression,
  484. )
  485. def continuation(new_details, request_iterator):
  486. (
  487. new_method,
  488. new_timeout,
  489. new_metadata,
  490. new_credentials,
  491. new_wait_for_ready,
  492. new_compression,
  493. ) = _unwrap_client_call_details(new_details, client_call_details)
  494. return self._thunk(new_method).future(
  495. request_iterator,
  496. timeout=new_timeout,
  497. metadata=new_metadata,
  498. credentials=new_credentials,
  499. wait_for_ready=new_wait_for_ready,
  500. compression=new_compression,
  501. )
  502. try:
  503. return self._interceptor.intercept_stream_unary(
  504. continuation, client_call_details, request_iterator
  505. )
  506. except Exception as exception: # pylint:disable=broad-except
  507. return _FailureOutcome(exception, sys.exc_info()[2])
  508. class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
  509. _thunk: Callable
  510. _method: str
  511. _interceptor: grpc.StreamStreamClientInterceptor
  512. def __init__(
  513. self,
  514. thunk: Callable,
  515. method: str,
  516. interceptor: grpc.StreamStreamClientInterceptor,
  517. ):
  518. self._thunk = thunk
  519. self._method = method
  520. self._interceptor = interceptor
  521. def __call__(
  522. self,
  523. request_iterator: RequestIterableType,
  524. timeout: Optional[float] = None,
  525. metadata: Optional[MetadataType] = None,
  526. credentials: Optional[grpc.CallCredentials] = None,
  527. wait_for_ready: Optional[bool] = None,
  528. compression: Optional[grpc.Compression] = None,
  529. ):
  530. client_call_details = _ClientCallDetails(
  531. self._method,
  532. timeout,
  533. metadata,
  534. credentials,
  535. wait_for_ready,
  536. compression,
  537. )
  538. def continuation(new_details, request_iterator):
  539. (
  540. new_method,
  541. new_timeout,
  542. new_metadata,
  543. new_credentials,
  544. new_wait_for_ready,
  545. new_compression,
  546. ) = _unwrap_client_call_details(new_details, client_call_details)
  547. return self._thunk(new_method)(
  548. request_iterator,
  549. timeout=new_timeout,
  550. metadata=new_metadata,
  551. credentials=new_credentials,
  552. wait_for_ready=new_wait_for_ready,
  553. compression=new_compression,
  554. )
  555. try:
  556. return self._interceptor.intercept_stream_stream(
  557. continuation, client_call_details, request_iterator
  558. )
  559. except Exception as exception: # pylint:disable=broad-except
  560. return _FailureOutcome(exception, sys.exc_info()[2])
  561. class _Channel(grpc.Channel):
  562. _channel: grpc.Channel
  563. _interceptor: Union[
  564. grpc.UnaryUnaryClientInterceptor,
  565. grpc.UnaryStreamClientInterceptor,
  566. grpc.StreamStreamClientInterceptor,
  567. grpc.StreamUnaryClientInterceptor,
  568. ]
  569. def __init__(
  570. self,
  571. channel: grpc.Channel,
  572. interceptor: Union[
  573. grpc.UnaryUnaryClientInterceptor,
  574. grpc.UnaryStreamClientInterceptor,
  575. grpc.StreamStreamClientInterceptor,
  576. grpc.StreamUnaryClientInterceptor,
  577. ],
  578. ):
  579. self._channel = channel
  580. self._interceptor = interceptor
  581. def subscribe(
  582. self, callback: Callable, try_to_connect: Optional[bool] = False
  583. ):
  584. self._channel.subscribe(callback, try_to_connect=try_to_connect)
  585. def unsubscribe(self, callback: Callable):
  586. self._channel.unsubscribe(callback)
  587. # pylint: disable=arguments-differ
  588. def unary_unary(
  589. self,
  590. method: str,
  591. request_serializer: Optional[SerializingFunction] = None,
  592. response_deserializer: Optional[DeserializingFunction] = None,
  593. _registered_method: Optional[bool] = False,
  594. ) -> grpc.UnaryUnaryMultiCallable:
  595. # pytype: disable=wrong-arg-count
  596. thunk = lambda m: self._channel.unary_unary(
  597. m,
  598. request_serializer,
  599. response_deserializer,
  600. _registered_method,
  601. )
  602. # pytype: enable=wrong-arg-count
  603. if isinstance(self._interceptor, grpc.UnaryUnaryClientInterceptor):
  604. return _UnaryUnaryMultiCallable(thunk, method, self._interceptor)
  605. else:
  606. return thunk(method)
  607. # pylint: disable=arguments-differ
  608. def unary_stream(
  609. self,
  610. method: str,
  611. request_serializer: Optional[SerializingFunction] = None,
  612. response_deserializer: Optional[DeserializingFunction] = None,
  613. _registered_method: Optional[bool] = False,
  614. ) -> grpc.UnaryStreamMultiCallable:
  615. # pytype: disable=wrong-arg-count
  616. thunk = lambda m: self._channel.unary_stream(
  617. m,
  618. request_serializer,
  619. response_deserializer,
  620. _registered_method,
  621. )
  622. # pytype: enable=wrong-arg-count
  623. if isinstance(self._interceptor, grpc.UnaryStreamClientInterceptor):
  624. return _UnaryStreamMultiCallable(thunk, method, self._interceptor)
  625. else:
  626. return thunk(method)
  627. # pylint: disable=arguments-differ
  628. def stream_unary(
  629. self,
  630. method: str,
  631. request_serializer: Optional[SerializingFunction] = None,
  632. response_deserializer: Optional[DeserializingFunction] = None,
  633. _registered_method: Optional[bool] = False,
  634. ) -> grpc.StreamUnaryMultiCallable:
  635. # pytype: disable=wrong-arg-count
  636. thunk = lambda m: self._channel.stream_unary(
  637. m,
  638. request_serializer,
  639. response_deserializer,
  640. _registered_method,
  641. )
  642. # pytype: enable=wrong-arg-count
  643. if isinstance(self._interceptor, grpc.StreamUnaryClientInterceptor):
  644. return _StreamUnaryMultiCallable(thunk, method, self._interceptor)
  645. else:
  646. return thunk(method)
  647. # pylint: disable=arguments-differ
  648. def stream_stream(
  649. self,
  650. method: str,
  651. request_serializer: Optional[SerializingFunction] = None,
  652. response_deserializer: Optional[DeserializingFunction] = None,
  653. _registered_method: Optional[bool] = False,
  654. ) -> grpc.StreamStreamMultiCallable:
  655. # pytype: disable=wrong-arg-count
  656. thunk = lambda m: self._channel.stream_stream(
  657. m,
  658. request_serializer,
  659. response_deserializer,
  660. _registered_method,
  661. )
  662. # pytype: enable=wrong-arg-count
  663. if isinstance(self._interceptor, grpc.StreamStreamClientInterceptor):
  664. return _StreamStreamMultiCallable(thunk, method, self._interceptor)
  665. else:
  666. return thunk(method)
  667. def _close(self):
  668. self._channel.close()
  669. def __enter__(self):
  670. return self
  671. def __exit__(self, exc_type, exc_val, exc_tb):
  672. self._close()
  673. return False
  674. def close(self):
  675. self._channel.close()
  676. def intercept_channel(
  677. channel: grpc.Channel,
  678. *interceptors: Optional[
  679. Sequence[
  680. Union[
  681. grpc.UnaryUnaryClientInterceptor,
  682. grpc.UnaryStreamClientInterceptor,
  683. grpc.StreamStreamClientInterceptor,
  684. grpc.StreamUnaryClientInterceptor,
  685. ]
  686. ]
  687. ],
  688. ) -> grpc.Channel:
  689. for interceptor in reversed(list(interceptors)):
  690. if (
  691. not isinstance(interceptor, grpc.UnaryUnaryClientInterceptor)
  692. and not isinstance(interceptor, grpc.UnaryStreamClientInterceptor)
  693. and not isinstance(interceptor, grpc.StreamUnaryClientInterceptor)
  694. and not isinstance(interceptor, grpc.StreamStreamClientInterceptor)
  695. ):
  696. raise TypeError(
  697. "interceptor must be "
  698. "grpc.UnaryUnaryClientInterceptor or "
  699. "grpc.UnaryStreamClientInterceptor or "
  700. "grpc.StreamUnaryClientInterceptor or "
  701. "grpc.StreamStreamClientInterceptor or "
  702. )
  703. channel = _Channel(channel, interceptor)
  704. return channel