| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- from __future__ import annotations
- import os
- import typing
- from ._models import Headers
- from ._types import CertTypes, HeaderTypes, TimeoutTypes
- from ._urls import URL
- if typing.TYPE_CHECKING:
- import ssl # pragma: no cover
- __all__ = ["Limits", "Proxy", "Timeout", "create_ssl_context"]
- class UnsetType:
- pass # pragma: no cover
- UNSET = UnsetType()
- def create_ssl_context(
- verify: ssl.SSLContext | str | bool = True,
- cert: CertTypes | None = None,
- trust_env: bool = True,
- ) -> ssl.SSLContext:
- import ssl
- import warnings
- import certifi
- if verify is True:
- if trust_env and os.environ.get("SSL_CERT_FILE"): # pragma: nocover
- ctx = ssl.create_default_context(cafile=os.environ["SSL_CERT_FILE"])
- elif trust_env and os.environ.get("SSL_CERT_DIR"): # pragma: nocover
- ctx = ssl.create_default_context(capath=os.environ["SSL_CERT_DIR"])
- else:
- # Default case...
- ctx = ssl.create_default_context(cafile=certifi.where())
- elif verify is False:
- ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
- ctx.check_hostname = False
- ctx.verify_mode = ssl.CERT_NONE
- elif isinstance(verify, str): # pragma: nocover
- message = (
- "`verify=<str>` is deprecated. "
- "Use `verify=ssl.create_default_context(cafile=...)` "
- "or `verify=ssl.create_default_context(capath=...)` instead."
- )
- warnings.warn(message, DeprecationWarning)
- if os.path.isdir(verify):
- return ssl.create_default_context(capath=verify)
- return ssl.create_default_context(cafile=verify)
- else:
- ctx = verify
- if cert: # pragma: nocover
- message = (
- "`cert=...` is deprecated. Use `verify=<ssl_context>` instead,"
- "with `.load_cert_chain()` to configure the certificate chain."
- )
- warnings.warn(message, DeprecationWarning)
- if isinstance(cert, str):
- ctx.load_cert_chain(cert)
- else:
- ctx.load_cert_chain(*cert)
- return ctx
- class Timeout:
- """
- Timeout configuration.
- **Usage**:
- Timeout(None) # No timeouts.
- Timeout(5.0) # 5s timeout on all operations.
- Timeout(None, connect=5.0) # 5s timeout on connect, no other timeouts.
- Timeout(5.0, connect=10.0) # 10s timeout on connect. 5s timeout elsewhere.
- Timeout(5.0, pool=None) # No timeout on acquiring connection from pool.
- # 5s timeout elsewhere.
- """
- def __init__(
- self,
- timeout: TimeoutTypes | UnsetType = UNSET,
- *,
- connect: None | float | UnsetType = UNSET,
- read: None | float | UnsetType = UNSET,
- write: None | float | UnsetType = UNSET,
- pool: None | float | UnsetType = UNSET,
- ) -> None:
- if isinstance(timeout, Timeout):
- # Passed as a single explicit Timeout.
- assert connect is UNSET
- assert read is UNSET
- assert write is UNSET
- assert pool is UNSET
- self.connect = timeout.connect # type: typing.Optional[float]
- self.read = timeout.read # type: typing.Optional[float]
- self.write = timeout.write # type: typing.Optional[float]
- self.pool = timeout.pool # type: typing.Optional[float]
- elif isinstance(timeout, tuple):
- # Passed as a tuple.
- self.connect = timeout[0]
- self.read = timeout[1]
- self.write = None if len(timeout) < 3 else timeout[2]
- self.pool = None if len(timeout) < 4 else timeout[3]
- elif not (
- isinstance(connect, UnsetType)
- or isinstance(read, UnsetType)
- or isinstance(write, UnsetType)
- or isinstance(pool, UnsetType)
- ):
- self.connect = connect
- self.read = read
- self.write = write
- self.pool = pool
- else:
- if isinstance(timeout, UnsetType):
- raise ValueError(
- "httpx.Timeout must either include a default, or set all "
- "four parameters explicitly."
- )
- self.connect = timeout if isinstance(connect, UnsetType) else connect
- self.read = timeout if isinstance(read, UnsetType) else read
- self.write = timeout if isinstance(write, UnsetType) else write
- self.pool = timeout if isinstance(pool, UnsetType) else pool
- def as_dict(self) -> dict[str, float | None]:
- return {
- "connect": self.connect,
- "read": self.read,
- "write": self.write,
- "pool": self.pool,
- }
- def __eq__(self, other: typing.Any) -> bool:
- return (
- isinstance(other, self.__class__)
- and self.connect == other.connect
- and self.read == other.read
- and self.write == other.write
- and self.pool == other.pool
- )
- def __repr__(self) -> str:
- class_name = self.__class__.__name__
- if len({self.connect, self.read, self.write, self.pool}) == 1:
- return f"{class_name}(timeout={self.connect})"
- return (
- f"{class_name}(connect={self.connect}, "
- f"read={self.read}, write={self.write}, pool={self.pool})"
- )
- class Limits:
- """
- Configuration for limits to various client behaviors.
- **Parameters:**
- * **max_connections** - The maximum number of concurrent connections that may be
- established.
- * **max_keepalive_connections** - Allow the connection pool to maintain
- keep-alive connections below this point. Should be less than or equal
- to `max_connections`.
- * **keepalive_expiry** - Time limit on idle keep-alive connections in seconds.
- """
- def __init__(
- self,
- *,
- max_connections: int | None = None,
- max_keepalive_connections: int | None = None,
- keepalive_expiry: float | None = 5.0,
- ) -> None:
- self.max_connections = max_connections
- self.max_keepalive_connections = max_keepalive_connections
- self.keepalive_expiry = keepalive_expiry
- def __eq__(self, other: typing.Any) -> bool:
- return (
- isinstance(other, self.__class__)
- and self.max_connections == other.max_connections
- and self.max_keepalive_connections == other.max_keepalive_connections
- and self.keepalive_expiry == other.keepalive_expiry
- )
- def __repr__(self) -> str:
- class_name = self.__class__.__name__
- return (
- f"{class_name}(max_connections={self.max_connections}, "
- f"max_keepalive_connections={self.max_keepalive_connections}, "
- f"keepalive_expiry={self.keepalive_expiry})"
- )
- class Proxy:
- def __init__(
- self,
- url: URL | str,
- *,
- ssl_context: ssl.SSLContext | None = None,
- auth: tuple[str, str] | None = None,
- headers: HeaderTypes | None = None,
- ) -> None:
- url = URL(url)
- headers = Headers(headers)
- if url.scheme not in ("http", "https", "socks5", "socks5h"):
- raise ValueError(f"Unknown scheme for proxy URL {url!r}")
- if url.username or url.password:
- # Remove any auth credentials from the URL.
- auth = (url.username, url.password)
- url = url.copy_with(username=None, password=None)
- self.url = url
- self.auth = auth
- self.headers = headers
- self.ssl_context = ssl_context
- @property
- def raw_auth(self) -> tuple[bytes, bytes] | None:
- # The proxy authentication as raw bytes.
- return (
- None
- if self.auth is None
- else (self.auth[0].encode("utf-8"), self.auth[1].encode("utf-8"))
- )
- def __repr__(self) -> str:
- # The authentication is represented with the password component masked.
- auth = (self.auth[0], "********") if self.auth else None
- # Build a nice concise representation.
- url_str = f"{str(self.url)!r}"
- auth_str = f", auth={auth!r}" if auth else ""
- headers_str = f", headers={dict(self.headers)!r}" if self.headers else ""
- return f"Proxy({url_str}{auth_str}{headers_str})"
- DEFAULT_TIMEOUT_CONFIG = Timeout(timeout=5.0)
- DEFAULT_LIMITS = Limits(max_connections=100, max_keepalive_connections=20)
- DEFAULT_MAX_REDIRECTS = 20
|