utils.py 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. """Utility functions for aiohappyeyeballs."""
  2. import ipaddress
  3. import socket
  4. from typing import Dict, List, Optional, Tuple, Union
  5. from .types import AddrInfoType
  6. def addr_to_addr_infos(
  7. addr: Optional[
  8. Union[Tuple[str, int, int, int], Tuple[str, int, int], Tuple[str, int]]
  9. ],
  10. ) -> Optional[List[AddrInfoType]]:
  11. """Convert an address tuple to a list of addr_info tuples."""
  12. if addr is None:
  13. return None
  14. host = addr[0]
  15. port = addr[1]
  16. is_ipv6 = ":" in host
  17. if is_ipv6:
  18. flowinfo = 0
  19. scopeid = 0
  20. addr_len = len(addr)
  21. if addr_len >= 4:
  22. scopeid = addr[3] # type: ignore[misc]
  23. if addr_len >= 3:
  24. flowinfo = addr[2] # type: ignore[misc]
  25. addr = (host, port, flowinfo, scopeid)
  26. family = socket.AF_INET6
  27. else:
  28. addr = (host, port)
  29. family = socket.AF_INET
  30. return [(family, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", addr)]
  31. def pop_addr_infos_interleave(
  32. addr_infos: List[AddrInfoType], interleave: Optional[int] = None
  33. ) -> None:
  34. """
  35. Pop addr_info from the list of addr_infos by family up to interleave times.
  36. The interleave parameter is used to know how many addr_infos for
  37. each family should be popped of the top of the list.
  38. """
  39. seen: Dict[int, int] = {}
  40. if interleave is None:
  41. interleave = 1
  42. to_remove: List[AddrInfoType] = []
  43. for addr_info in addr_infos:
  44. family = addr_info[0]
  45. if family not in seen:
  46. seen[family] = 0
  47. if seen[family] < interleave:
  48. to_remove.append(addr_info)
  49. seen[family] += 1
  50. for addr_info in to_remove:
  51. addr_infos.remove(addr_info)
  52. def _addr_tuple_to_ip_address(
  53. addr: Union[Tuple[str, int], Tuple[str, int, int, int]],
  54. ) -> Union[
  55. Tuple[ipaddress.IPv4Address, int], Tuple[ipaddress.IPv6Address, int, int, int]
  56. ]:
  57. """Convert an address tuple to an IPv4Address."""
  58. return (ipaddress.ip_address(addr[0]), *addr[1:])
  59. def remove_addr_infos(
  60. addr_infos: List[AddrInfoType],
  61. addr: Union[Tuple[str, int], Tuple[str, int, int, int]],
  62. ) -> None:
  63. """
  64. Remove an address from the list of addr_infos.
  65. The addr value is typically the return value of
  66. sock.getpeername().
  67. """
  68. bad_addrs_infos: List[AddrInfoType] = []
  69. for addr_info in addr_infos:
  70. if addr_info[-1] == addr:
  71. bad_addrs_infos.append(addr_info)
  72. if bad_addrs_infos:
  73. for bad_addr_info in bad_addrs_infos:
  74. addr_infos.remove(bad_addr_info)
  75. return
  76. # Slow path in case addr is formatted differently
  77. match_addr = _addr_tuple_to_ip_address(addr)
  78. for addr_info in addr_infos:
  79. if match_addr == _addr_tuple_to_ip_address(addr_info[-1]):
  80. bad_addrs_infos.append(addr_info)
  81. if bad_addrs_infos:
  82. for bad_addr_info in bad_addrs_infos:
  83. addr_infos.remove(bad_addr_info)
  84. return
  85. raise ValueError(f"Address {addr} not found in addr_infos")