_routing.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. # Copyright (c) "Neo4j"
  2. # Neo4j Sweden AB [https://neo4j.com]
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # https://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from collections.abc import MutableSet
  16. from contextlib import suppress
  17. from logging import getLogger
  18. from time import monotonic
  19. from .addressing import Address
  20. log = getLogger("neo4j.pool")
  21. class OrderedSet(MutableSet):
  22. def __init__(self, elements=()):
  23. # dicts keep insertion order starting with Python 3.7
  24. self._elements = dict.fromkeys(elements)
  25. self._current = None
  26. def __repr__(self):
  27. return f"{{{', '.join(map(repr, self._elements))}}}"
  28. def __contains__(self, element):
  29. return element in self._elements
  30. def __iter__(self):
  31. return iter(self._elements)
  32. def __len__(self):
  33. return len(self._elements)
  34. def __getitem__(self, index):
  35. return list(self._elements.keys())[index]
  36. def add(self, element):
  37. self._elements[element] = None
  38. def clear(self):
  39. self._elements.clear()
  40. def discard(self, element):
  41. with suppress(KeyError):
  42. del self._elements[element]
  43. def remove(self, element):
  44. try:
  45. del self._elements[element]
  46. except KeyError:
  47. raise ValueError(element) from None
  48. def update(self, elements=()):
  49. self._elements.update(dict.fromkeys(elements))
  50. def replace(self, elements=()):
  51. e = self._elements
  52. e.clear()
  53. e.update(dict.fromkeys(elements))
  54. class RoutingTable:
  55. @classmethod
  56. def parse_routing_info(cls, *, database, servers, ttl):
  57. """
  58. Construct a new RoutingTable instance from the given routing info.
  59. The routing table info is returned from the procedure or BOLT message.
  60. """
  61. routers = []
  62. readers = []
  63. writers = []
  64. try:
  65. for server in servers:
  66. role = server["role"]
  67. addresses = [
  68. Address.parse(address, default_port=7687)
  69. for address in server["addresses"]
  70. ]
  71. if role == "ROUTE":
  72. routers.extend(addresses)
  73. elif role == "READ":
  74. readers.extend(addresses)
  75. elif role == "WRITE":
  76. writers.extend(addresses)
  77. except (KeyError, TypeError) as exc:
  78. raise ValueError("Cannot parse routing info") from exc
  79. else:
  80. return cls(
  81. database=database,
  82. routers=routers,
  83. readers=readers,
  84. writers=writers,
  85. ttl=ttl,
  86. )
  87. def __init__(self, *, database, routers=(), readers=(), writers=(), ttl=0):
  88. self.initial_routers = OrderedSet(routers)
  89. self.routers = OrderedSet(routers)
  90. self.readers = OrderedSet(readers)
  91. self.writers = OrderedSet(writers)
  92. self.initialized_without_writers = not self.writers
  93. self.last_updated_time = monotonic()
  94. self.ttl = ttl
  95. self.database = database
  96. def __repr__(self):
  97. return (
  98. f"RoutingTable(database={self.database!r}, "
  99. f"routers={self.routers!r}, readers={self.readers!r}, "
  100. f"writers={self.writers!r}, "
  101. f"last_updated_time={self.last_updated_time!r}, ttl={self.ttl!r})"
  102. )
  103. def __contains__(self, address):
  104. return (
  105. address in self.routers
  106. or address in self.readers
  107. or address in self.writers
  108. )
  109. def is_fresh(self, readonly=False):
  110. """Indicate whether routing information is still usable."""
  111. assert isinstance(readonly, bool)
  112. expired = self.last_updated_time + self.ttl <= monotonic()
  113. if readonly:
  114. has_server_for_mode = bool(self.readers)
  115. else:
  116. has_server_for_mode = bool(self.writers)
  117. res = not expired and self.routers and has_server_for_mode
  118. log.debug(
  119. "[#0000] _: <ROUTING> checking table freshness "
  120. "(readonly=%r): table expired=%r, "
  121. "has_server_for_mode=%r, table routers=%r => %r",
  122. readonly,
  123. expired,
  124. has_server_for_mode,
  125. self.routers,
  126. res,
  127. )
  128. return res
  129. def should_be_purged_from_memory(self):
  130. """
  131. Check if routing table needs to be purged from memory.
  132. This is the case if the routing table is stale and has not been used
  133. for a long time.
  134. :returns: Returns true if it is old and not used for a while.
  135. :rtype: bool
  136. """
  137. from ._conf import RoutingConfig
  138. perf_time = monotonic()
  139. valid_until = (
  140. self.last_updated_time
  141. + self.ttl
  142. + RoutingConfig.routing_table_purge_delay
  143. )
  144. should_be_purged = valid_until <= perf_time
  145. log.debug(
  146. "[#0000] _: <ROUTING> purge check: "
  147. "last_updated_time=%r, ttl=%r, perf_time=%r => %r",
  148. self.last_updated_time,
  149. self.ttl,
  150. perf_time,
  151. should_be_purged,
  152. )
  153. return should_be_purged
  154. def update(self, new_routing_table):
  155. """Update the routing table with new routing information."""
  156. self.routers.replace(new_routing_table.routers)
  157. self.readers.replace(new_routing_table.readers)
  158. self.writers.replace(new_routing_table.writers)
  159. self.initialized_without_writers = not self.writers
  160. self.last_updated_time = monotonic()
  161. self.ttl = new_routing_table.ttl
  162. log.debug("[#0000] _: <ROUTING> updated table=%r", self)
  163. def servers(self):
  164. return set(self.routers) | set(self.writers) | set(self.readers)