_utils.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. from __future__ import annotations
  2. import ipaddress
  3. import os
  4. import re
  5. import typing
  6. from urllib.request import getproxies
  7. from ._types import PrimitiveData
  8. if typing.TYPE_CHECKING: # pragma: no cover
  9. from ._urls import URL
  10. def primitive_value_to_str(value: PrimitiveData) -> str:
  11. """
  12. Coerce a primitive data type into a string value.
  13. Note that we prefer JSON-style 'true'/'false' for boolean values here.
  14. """
  15. if value is True:
  16. return "true"
  17. elif value is False:
  18. return "false"
  19. elif value is None:
  20. return ""
  21. return str(value)
  22. def get_environment_proxies() -> dict[str, str | None]:
  23. """Gets proxy information from the environment"""
  24. # urllib.request.getproxies() falls back on System
  25. # Registry and Config for proxies on Windows and macOS.
  26. # We don't want to propagate non-HTTP proxies into
  27. # our configuration such as 'TRAVIS_APT_PROXY'.
  28. proxy_info = getproxies()
  29. mounts: dict[str, str | None] = {}
  30. for scheme in ("http", "https", "all"):
  31. if proxy_info.get(scheme):
  32. hostname = proxy_info[scheme]
  33. mounts[f"{scheme}://"] = (
  34. hostname if "://" in hostname else f"http://{hostname}"
  35. )
  36. no_proxy_hosts = [host.strip() for host in proxy_info.get("no", "").split(",")]
  37. for hostname in no_proxy_hosts:
  38. # See https://curl.haxx.se/libcurl/c/CURLOPT_NOPROXY.html for details
  39. # on how names in `NO_PROXY` are handled.
  40. if hostname == "*":
  41. # If NO_PROXY=* is used or if "*" occurs as any one of the comma
  42. # separated hostnames, then we should just bypass any information
  43. # from HTTP_PROXY, HTTPS_PROXY, ALL_PROXY, and always ignore
  44. # proxies.
  45. return {}
  46. elif hostname:
  47. # NO_PROXY=.google.com is marked as "all://*.google.com,
  48. # which disables "www.google.com" but not "google.com"
  49. # NO_PROXY=google.com is marked as "all://*google.com,
  50. # which disables "www.google.com" and "google.com".
  51. # (But not "wwwgoogle.com")
  52. # NO_PROXY can include domains, IPv6, IPv4 addresses and "localhost"
  53. # NO_PROXY=example.com,::1,localhost,192.168.0.0/16
  54. if "://" in hostname:
  55. mounts[hostname] = None
  56. elif is_ipv4_hostname(hostname):
  57. mounts[f"all://{hostname}"] = None
  58. elif is_ipv6_hostname(hostname):
  59. mounts[f"all://[{hostname}]"] = None
  60. elif hostname.lower() == "localhost":
  61. mounts[f"all://{hostname}"] = None
  62. else:
  63. mounts[f"all://*{hostname}"] = None
  64. return mounts
  65. def to_bytes(value: str | bytes, encoding: str = "utf-8") -> bytes:
  66. return value.encode(encoding) if isinstance(value, str) else value
  67. def to_str(value: str | bytes, encoding: str = "utf-8") -> str:
  68. return value if isinstance(value, str) else value.decode(encoding)
  69. def to_bytes_or_str(value: str, match_type_of: typing.AnyStr) -> typing.AnyStr:
  70. return value if isinstance(match_type_of, str) else value.encode()
  71. def unquote(value: str) -> str:
  72. return value[1:-1] if value[0] == value[-1] == '"' else value
  73. def peek_filelike_length(stream: typing.Any) -> int | None:
  74. """
  75. Given a file-like stream object, return its length in number of bytes
  76. without reading it into memory.
  77. """
  78. try:
  79. # Is it an actual file?
  80. fd = stream.fileno()
  81. # Yup, seems to be an actual file.
  82. length = os.fstat(fd).st_size
  83. except (AttributeError, OSError):
  84. # No... Maybe it's something that supports random access, like `io.BytesIO`?
  85. try:
  86. # Assuming so, go to end of stream to figure out its length,
  87. # then put it back in place.
  88. offset = stream.tell()
  89. length = stream.seek(0, os.SEEK_END)
  90. stream.seek(offset)
  91. except (AttributeError, OSError):
  92. # Not even that? Sorry, we're doomed...
  93. return None
  94. return length
  95. class URLPattern:
  96. """
  97. A utility class currently used for making lookups against proxy keys...
  98. # Wildcard matching...
  99. >>> pattern = URLPattern("all://")
  100. >>> pattern.matches(httpx.URL("http://example.com"))
  101. True
  102. # Witch scheme matching...
  103. >>> pattern = URLPattern("https://")
  104. >>> pattern.matches(httpx.URL("https://example.com"))
  105. True
  106. >>> pattern.matches(httpx.URL("http://example.com"))
  107. False
  108. # With domain matching...
  109. >>> pattern = URLPattern("https://example.com")
  110. >>> pattern.matches(httpx.URL("https://example.com"))
  111. True
  112. >>> pattern.matches(httpx.URL("http://example.com"))
  113. False
  114. >>> pattern.matches(httpx.URL("https://other.com"))
  115. False
  116. # Wildcard scheme, with domain matching...
  117. >>> pattern = URLPattern("all://example.com")
  118. >>> pattern.matches(httpx.URL("https://example.com"))
  119. True
  120. >>> pattern.matches(httpx.URL("http://example.com"))
  121. True
  122. >>> pattern.matches(httpx.URL("https://other.com"))
  123. False
  124. # With port matching...
  125. >>> pattern = URLPattern("https://example.com:1234")
  126. >>> pattern.matches(httpx.URL("https://example.com:1234"))
  127. True
  128. >>> pattern.matches(httpx.URL("https://example.com"))
  129. False
  130. """
  131. def __init__(self, pattern: str) -> None:
  132. from ._urls import URL
  133. if pattern and ":" not in pattern:
  134. raise ValueError(
  135. f"Proxy keys should use proper URL forms rather "
  136. f"than plain scheme strings. "
  137. f'Instead of "{pattern}", use "{pattern}://"'
  138. )
  139. url = URL(pattern)
  140. self.pattern = pattern
  141. self.scheme = "" if url.scheme == "all" else url.scheme
  142. self.host = "" if url.host == "*" else url.host
  143. self.port = url.port
  144. if not url.host or url.host == "*":
  145. self.host_regex: typing.Pattern[str] | None = None
  146. elif url.host.startswith("*."):
  147. # *.example.com should match "www.example.com", but not "example.com"
  148. domain = re.escape(url.host[2:])
  149. self.host_regex = re.compile(f"^.+\\.{domain}$")
  150. elif url.host.startswith("*"):
  151. # *example.com should match "www.example.com" and "example.com"
  152. domain = re.escape(url.host[1:])
  153. self.host_regex = re.compile(f"^(.+\\.)?{domain}$")
  154. else:
  155. # example.com should match "example.com" but not "www.example.com"
  156. domain = re.escape(url.host)
  157. self.host_regex = re.compile(f"^{domain}$")
  158. def matches(self, other: URL) -> bool:
  159. if self.scheme and self.scheme != other.scheme:
  160. return False
  161. if (
  162. self.host
  163. and self.host_regex is not None
  164. and not self.host_regex.match(other.host)
  165. ):
  166. return False
  167. if self.port is not None and self.port != other.port:
  168. return False
  169. return True
  170. @property
  171. def priority(self) -> tuple[int, int, int]:
  172. """
  173. The priority allows URLPattern instances to be sortable, so that
  174. we can match from most specific to least specific.
  175. """
  176. # URLs with a port should take priority over URLs without a port.
  177. port_priority = 0 if self.port is not None else 1
  178. # Longer hostnames should match first.
  179. host_priority = -len(self.host)
  180. # Longer schemes should match first.
  181. scheme_priority = -len(self.scheme)
  182. return (port_priority, host_priority, scheme_priority)
  183. def __hash__(self) -> int:
  184. return hash(self.pattern)
  185. def __lt__(self, other: URLPattern) -> bool:
  186. return self.priority < other.priority
  187. def __eq__(self, other: typing.Any) -> bool:
  188. return isinstance(other, URLPattern) and self.pattern == other.pattern
  189. def is_ipv4_hostname(hostname: str) -> bool:
  190. try:
  191. ipaddress.IPv4Address(hostname.split("/")[0])
  192. except Exception:
  193. return False
  194. return True
  195. def is_ipv6_hostname(hostname: str) -> bool:
  196. try:
  197. ipaddress.IPv6Address(hostname.split("/")[0])
  198. except Exception:
  199. return False
  200. return True