utilities.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. # Copyright 2015 gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Utilities for the gRPC Python Beta API."""
  15. import threading
  16. import time
  17. # implementations is referenced from specification in this module.
  18. from grpc.beta import implementations # pylint: disable=unused-import
  19. from grpc.beta import interfaces
  20. from grpc.framework.foundation import callable_util
  21. from grpc.framework.foundation import future
  22. _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = (
  23. 'Exception calling connectivity future "done" callback!'
  24. )
  25. class _ChannelReadyFuture(future.Future):
  26. def __init__(self, channel):
  27. self._condition = threading.Condition()
  28. self._channel = channel
  29. self._matured = False
  30. self._cancelled = False
  31. self._done_callbacks = []
  32. def _block(self, timeout):
  33. until = None if timeout is None else time.time() + timeout
  34. with self._condition:
  35. while True:
  36. if self._cancelled:
  37. raise future.CancelledError()
  38. elif self._matured:
  39. return
  40. else:
  41. if until is None:
  42. self._condition.wait()
  43. else:
  44. remaining = until - time.time()
  45. if remaining < 0:
  46. raise future.TimeoutError()
  47. else:
  48. self._condition.wait(timeout=remaining)
  49. def _update(self, connectivity):
  50. with self._condition:
  51. if (
  52. not self._cancelled
  53. and connectivity is interfaces.ChannelConnectivity.READY
  54. ):
  55. self._matured = True
  56. self._channel.unsubscribe(self._update)
  57. self._condition.notify_all()
  58. done_callbacks = tuple(self._done_callbacks)
  59. self._done_callbacks = None
  60. else:
  61. return
  62. for done_callback in done_callbacks:
  63. callable_util.call_logging_exceptions(
  64. done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self
  65. )
  66. def cancel(self):
  67. with self._condition:
  68. if not self._matured:
  69. self._cancelled = True
  70. self._channel.unsubscribe(self._update)
  71. self._condition.notify_all()
  72. done_callbacks = tuple(self._done_callbacks)
  73. self._done_callbacks = None
  74. else:
  75. return False
  76. for done_callback in done_callbacks:
  77. callable_util.call_logging_exceptions(
  78. done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self
  79. )
  80. return True
  81. def cancelled(self):
  82. with self._condition:
  83. return self._cancelled
  84. def running(self):
  85. with self._condition:
  86. return not self._cancelled and not self._matured
  87. def done(self):
  88. with self._condition:
  89. return self._cancelled or self._matured
  90. def result(self, timeout=None):
  91. self._block(timeout)
  92. return None
  93. def exception(self, timeout=None):
  94. self._block(timeout)
  95. return None
  96. def traceback(self, timeout=None):
  97. self._block(timeout)
  98. return None
  99. def add_done_callback(self, fn):
  100. with self._condition:
  101. if not self._cancelled and not self._matured:
  102. self._done_callbacks.append(fn)
  103. return
  104. fn(self)
  105. def start(self):
  106. with self._condition:
  107. self._channel.subscribe(self._update, try_to_connect=True)
  108. def __del__(self):
  109. with self._condition:
  110. if not self._cancelled and not self._matured:
  111. self._channel.unsubscribe(self._update)
  112. def channel_ready_future(channel):
  113. """Creates a future.Future tracking when an implementations.Channel is ready.
  114. Cancelling the returned future.Future does not tell the given
  115. implementations.Channel to abandon attempts it may have been making to
  116. connect; cancelling merely deactivates the return future.Future's
  117. subscription to the given implementations.Channel's connectivity.
  118. Args:
  119. channel: An implementations.Channel.
  120. Returns:
  121. A future.Future that matures when the given Channel has connectivity
  122. interfaces.ChannelConnectivity.READY.
  123. """
  124. ready_future = _ChannelReadyFuture(channel)
  125. ready_future.start()
  126. return ready_future