| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604 |
- import re
- import sys
- import warnings
- from collections.abc import Mapping, Sequence
- from enum import Enum
- from functools import _CacheInfo, lru_cache
- from ipaddress import ip_address
- from typing import TYPE_CHECKING, Any, NoReturn, TypedDict, TypeVar, Union, overload
- from urllib.parse import SplitResult, uses_relative
- import idna
- from multidict import MultiDict, MultiDictProxy
- from propcache.api import under_cached_property as cached_property
- from ._parse import (
- USES_AUTHORITY,
- SplitURLType,
- make_netloc,
- query_to_pairs,
- split_netloc,
- split_url,
- unsplit_result,
- )
- from ._path import normalize_path, normalize_path_segments
- from ._query import (
- Query,
- QueryVariable,
- SimpleQuery,
- get_str_query,
- get_str_query_from_iterable,
- get_str_query_from_sequence_iterable,
- )
- from ._quoters import (
- FRAGMENT_QUOTER,
- FRAGMENT_REQUOTER,
- PATH_QUOTER,
- PATH_REQUOTER,
- PATH_SAFE_UNQUOTER,
- PATH_UNQUOTER,
- QS_UNQUOTER,
- QUERY_QUOTER,
- QUERY_REQUOTER,
- QUOTER,
- REQUOTER,
- UNQUOTER,
- human_quote,
- )
# Port used as the fallback for a scheme when a URL carries no explicit port.
DEFAULT_PORTS = {"http": 80, "https": 443, "ws": 80, "wss": 443, "ftp": 21}

# Frozen copy of the scheme list exported by urllib.parse as ``uses_relative``.
USES_RELATIVE = frozenset(uses_relative)

# Special schemes (https://url.spec.whatwg.org/#special-scheme) are not
# allowed to have an empty host
# (https://url.spec.whatwg.org/#url-representation).
SCHEME_REQUIRES_HOST = frozenset({"http", "https", "ws", "wss", "ftp"})

# reg-name: unreserved / pct-encoded / sub-delims
# The pattern matches anything that is *not* in those classes and is only
# used on lower-cased ASCII values.
NOT_REG_NAME = re.compile(
    r"""
        # any character not in the unreserved or sub-delims sets, plus %
        # (validated with the additional check for pct-encoded sequences below)
        [^a-z0-9\-._~!$&'()*+,;=%]
    |
        # % only allowed if it is part of a pct-encoded
        # sequence of 2 hex digits.
        %(?![0-9a-f]{2})
    """,
    re.VERBOSE,
)

_T = TypeVar("_T")

# typing.Self only exists from Python 3.11 on; older versions get ``Any``.
if sys.version_info < (3, 11):
    Self = Any
else:
    from typing import Self
class UndefinedType(Enum):
    """Enum with a single member, used as the type of "not set" sentinels."""

    _singleton = 0


# The one sentinel value; compare against it with ``is``.
UNDEFINED = UndefinedType._singleton
class CacheInfo(TypedDict):
    """Mapping of host-encoding cache names to their ``lru_cache`` statistics."""

    # One ``functools`` cache-info record per cached helper.
    idna_encode: _CacheInfo
    idna_decode: _CacheInfo
    ip_address: _CacheInfo
    host_validate: _CacheInfo
    encode_host: _CacheInfo
class _InternalURLCache(TypedDict, total=False):
    """Shape of the per-URL ``_cache`` dict.

    Every key is optional (``total=False``): an entry appears only once the
    matching value has been computed or pre-seeded (``encode_url`` fills
    several of them while parsing).
    """

    # Whole-URL values.
    _val: SplitURLType
    _origin: "URL"
    absolute: bool
    hash: int
    scheme: str
    # Authority and its components.
    raw_authority: str
    authority: str
    raw_user: Union[str, None]
    user: Union[str, None]
    raw_password: Union[str, None]
    password: Union[str, None]
    raw_host: Union[str, None]
    host: Union[str, None]
    host_subcomponent: Union[str, None]
    host_port_subcomponent: Union[str, None]
    port: Union[int, None]
    explicit_port: Union[int, None]
    # Path.
    raw_path: str
    path: str
    # ``path_safe`` is a cached property as well, so it is declared here
    # alongside the other cached values.
    path_safe: str
    # Query.
    _parsed_query: list[tuple[str, str]]
    query: "MultiDictProxy[str]"
    raw_query_string: str
    query_string: str
    path_qs: str
    raw_path_qs: str
    # Fragment.
    raw_fragment: str
    fragment: str
    # Values derived from the path segments.
    raw_parts: tuple[str, ...]
    parts: tuple[str, ...]
    parent: "URL"
    raw_name: str
    name: str
    raw_suffix: str
    suffix: str
    raw_suffixes: tuple[str, ...]
    suffixes: tuple[str, ...]
def rewrite_module(obj: _T) -> _T:
    """Set ``obj.__module__`` to ``"yarl"`` and return the same object."""
    setattr(obj, "__module__", "yarl")
    return obj
@lru_cache
def encode_url(url_str: str) -> "URL":
    """Parse a URL string whose components are not guaranteed to be encoded.

    The string is split into its five components; the host is encoded, the
    user/password, path, query and fragment are requoted, and values that
    are already known at this point are pre-seeded into the URL's cache.

    Raises:
        ValueError: the netloc has no host but the scheme is one of
            ``SCHEME_REQUIRES_HOST``.
    """
    scheme, netloc, path, query, fragment = split_url(url_str)
    cache: _InternalURLCache = {}
    host: Union[str, None]
    if netloc:
        if ":" in netloc or "@" in netloc or "[" in netloc:
            # Complex netloc: may hold credentials, a port or an IP literal.
            username, password, host, port = split_netloc(netloc)
        else:
            host = netloc
            username = password = port = None
        if host is None:
            if scheme in SCHEME_REQUIRES_HOST:
                raise ValueError(
                    "Invalid URL: host is required for "
                    f"absolute urls with the {scheme} scheme"
                )
            host = ""
        host = _encode_host(host, validate_host=False)
        # The host encoder adds brackets back for IPv6 addresses; the cached
        # raw host is stored without them.
        cache["raw_host"] = host[1:-1] if "[" in host else host
        cache["explicit_port"] = port
        if username is None and password is None:
            # Fast path: no credentials, so the netloc is host[:port].
            cache["raw_user"] = None
            cache["raw_password"] = None
            netloc = host if port is None else f"{host}:{port}"
        else:
            raw_user = REQUOTER(username) if username else username
            raw_password = REQUOTER(password) if password else password
            cache["raw_user"] = raw_user
            cache["raw_password"] = raw_password
            netloc = make_netloc(raw_user, raw_password, host, port)
    else:
        host = ""

    if path:
        path = PATH_REQUOTER(path)
        # Dot segments are only normalized when an authority is present.
        if netloc and "." in path:
            path = normalize_path(path)
    query = QUERY_REQUOTER(query) if query else query
    fragment = FRAGMENT_REQUOTER(fragment) if fragment else fragment

    cache["scheme"] = scheme
    cache["raw_path"] = path if path or not netloc else "/"
    cache["raw_query_string"] = query
    cache["raw_fragment"] = fragment

    url = object.__new__(URL)
    url._scheme = scheme
    url._netloc = netloc
    url._path = path
    url._query = query
    url._fragment = fragment
    url._cache = cache
    return url
@lru_cache
def pre_encoded_url(url_str: str) -> "URL":
    """Parse a URL string that is already encoded.

    The string is only split into components; nothing is quoted or
    normalized, and the URL starts with an empty cache.
    """
    url = object.__new__(URL)
    scheme, netloc, path, query, fragment = split_url(url_str)
    url._scheme = scheme
    url._netloc = netloc
    url._path = path
    url._query = query
    url._fragment = fragment
    url._cache = {}
    return url
@lru_cache
def build_pre_encoded_url(
    scheme: str,
    authority: str,
    user: Union[str, None],
    password: Union[str, None],
    host: str,
    port: Union[int, None],
    path: str,
    query_string: str,
    fragment: str,
) -> "URL":
    """Assemble a URL from components that are already encoded.

    ``authority`` wins when given; otherwise the netloc is built from
    ``user``/``password``/``host``/``port``, leaving out a port that equals
    the scheme's default. Without either, the netloc is empty.
    """
    if authority:
        netloc = authority
    elif host:
        if port is not None and port == DEFAULT_PORTS.get(scheme):
            # Default ports are not rendered.
            port = None
        if user is None and password is None:
            netloc = host if port is None else f"{host}:{port}"
        else:
            netloc = make_netloc(user, password, host, port)
    else:
        netloc = ""

    url = object.__new__(URL)
    url._scheme = scheme
    url._netloc = netloc
    url._path = path
    url._query = query_string
    url._fragment = fragment
    url._cache = {}
    return url
def from_parts_uncached(
    scheme: str, netloc: str, path: str, query: str, fragment: str
) -> "URL":
    """Build a URL directly from five components, stored as given."""
    url = object.__new__(URL)
    url._cache = {}
    url._scheme, url._netloc = scheme, netloc
    url._path, url._query, url._fragment = path, query, fragment
    return url


# Memoized variant: equal component tuples yield the same URL object.
from_parts = lru_cache(from_parts_uncached)
- @rewrite_module
- class URL:
- # Don't derive from str
- # follow pathlib.Path design
- # probably URL will not suffer from pathlib problems:
- # it's intended for libraries like aiohttp,
- # not to be passed into standard library functions like os.open etc.
- # URL grammar (RFC 3986)
- # pct-encoded = "%" HEXDIG HEXDIG
- # reserved = gen-delims / sub-delims
- # gen-delims = ":" / "/" / "?" / "#" / "[" / "]" / "@"
- # sub-delims = "!" / "$" / "&" / "'" / "(" / ")"
- # / "*" / "+" / "," / ";" / "="
- # unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~"
- # URI = scheme ":" hier-part [ "?" query ] [ "#" fragment ]
- # hier-part = "//" authority path-abempty
- # / path-absolute
- # / path-rootless
- # / path-empty
- # scheme = ALPHA *( ALPHA / DIGIT / "+" / "-" / "." )
- # authority = [ userinfo "@" ] host [ ":" port ]
- # userinfo = *( unreserved / pct-encoded / sub-delims / ":" )
- # host = IP-literal / IPv4address / reg-name
- # IP-literal = "[" ( IPv6address / IPvFuture ) "]"
- # IPvFuture = "v" 1*HEXDIG "." 1*( unreserved / sub-delims / ":" )
- # IPv6address = 6( h16 ":" ) ls32
- # / "::" 5( h16 ":" ) ls32
- # / [ h16 ] "::" 4( h16 ":" ) ls32
- # / [ *1( h16 ":" ) h16 ] "::" 3( h16 ":" ) ls32
- # / [ *2( h16 ":" ) h16 ] "::" 2( h16 ":" ) ls32
- # / [ *3( h16 ":" ) h16 ] "::" h16 ":" ls32
- # / [ *4( h16 ":" ) h16 ] "::" ls32
- # / [ *5( h16 ":" ) h16 ] "::" h16
- # / [ *6( h16 ":" ) h16 ] "::"
- # ls32 = ( h16 ":" h16 ) / IPv4address
- # ; least-significant 32 bits of address
- # h16 = 1*4HEXDIG
- # ; 16 bits of address represented in hexadecimal
- # IPv4address = dec-octet "." dec-octet "." dec-octet "." dec-octet
- # dec-octet = DIGIT ; 0-9
- # / %x31-39 DIGIT ; 10-99
- # / "1" 2DIGIT ; 100-199
- # / "2" %x30-34 DIGIT ; 200-249
- # / "25" %x30-35 ; 250-255
- # reg-name = *( unreserved / pct-encoded / sub-delims )
- # port = *DIGIT
- # path = path-abempty ; begins with "/" or is empty
- # / path-absolute ; begins with "/" but not "//"
- # / path-noscheme ; begins with a non-colon segment
- # / path-rootless ; begins with a segment
- # / path-empty ; zero characters
- # path-abempty = *( "/" segment )
- # path-absolute = "/" [ segment-nz *( "/" segment ) ]
- # path-noscheme = segment-nz-nc *( "/" segment )
- # path-rootless = segment-nz *( "/" segment )
- # path-empty = 0<pchar>
- # segment = *pchar
- # segment-nz = 1*pchar
- # segment-nz-nc = 1*( unreserved / pct-encoded / sub-delims / "@" )
- # ; non-zero-length segment without any colon ":"
- # pchar = unreserved / pct-encoded / sub-delims / ":" / "@"
- # query = *( pchar / "/" / "?" )
- # fragment = *( pchar / "/" / "?" )
- # URI-reference = URI / relative-ref
- # relative-ref = relative-part [ "?" query ] [ "#" fragment ]
- # relative-part = "//" authority path-abempty
- # / path-absolute
- # / path-noscheme
- # / path-empty
- # absolute-URI = scheme ":" hier-part [ "?" query ]
- __slots__ = ("_cache", "_scheme", "_netloc", "_path", "_query", "_fragment")
- _cache: _InternalURLCache
- _scheme: str
- _netloc: str
- _path: str
- _query: str
- _fragment: str
def __new__(
    cls,
    val: Union[str, SplitResult, "URL", UndefinedType] = UNDEFINED,
    *,
    encoded: bool = False,
    strict: Union[bool, None] = None,
) -> "URL":
    """Create a URL from a string, a ``SplitResult`` or another URL.

    ``encoded=True`` selects the parser that trusts the input to be encoded
    already. ``strict`` is ignored; passing it only triggers a warning.

    Raises:
        ValueError: ``val`` is a ``SplitResult`` and ``encoded`` is false.
        TypeError: ``val`` is of an unsupported type.
    """
    if strict is not None:  # pragma: no cover
        warnings.warn("strict parameter is ignored")
    parse = pre_encoded_url if encoded else encode_url
    if type(val) is str:
        return parse(val)
    if type(val) is cls:
        # An existing URL is handed back unchanged.
        return val
    if type(val) is SplitResult:
        if encoded:
            return from_parts(*val)
        raise ValueError("Cannot apply decoding to SplitResult")
    if isinstance(val, str):
        # str subclass: convert to a plain str before parsing.
        return parse(str(val))
    if val is UNDEFINED:
        # Special case: this may be unpickling, and `__setstate__` would
        # mutate the object, so it must not be one that lives in the
        # `pre_encoded_url` or `encode_url` caches.
        url = object.__new__(URL)
        url._scheme = ""
        url._netloc = ""
        url._path = ""
        url._query = ""
        url._fragment = ""
        url._cache = {}
        return url
    raise TypeError("Constructor parameter should be str")
@classmethod
def build(
    cls,
    *,
    scheme: str = "",
    authority: str = "",
    user: Union[str, None] = None,
    password: Union[str, None] = None,
    host: str = "",
    port: Union[int, None] = None,
    path: str = "",
    query: Union[Query, None] = None,
    query_string: str = "",
    fragment: str = "",
    encoded: bool = False,
) -> "URL":
    """Create and return a new URL assembled from individual components.

    ``authority`` cannot be combined with ``user``, ``password``, ``host``
    or ``port``, and ``query`` cannot be combined with ``query_string``.
    With ``encoded=True`` the components are passed on to
    ``build_pre_encoded_url`` as they are; otherwise the host is encoded and
    the path, query string and fragment are quoted.

    Raises:
        ValueError: conflicting arguments, a port without a host, or a
            non-empty path not starting with ``/`` on a URL with authority.
        TypeError: ``port`` is not an int, or one of the string arguments
            is ``None``.
    """
    if authority and (user or password or host or port):
        raise ValueError(
            'Can\'t mix "authority" with "user", "password", "host" or "port".'
        )
    if port is not None and not isinstance(port, int):
        raise TypeError(f"The port is required to be int, got {type(port)!r}.")
    if port and not host:
        raise ValueError('Can\'t build URL with "port" but without "host".')
    if query and query_string:
        raise ValueError('Only one of "query" or "query_string" should be passed')
    if (
        scheme is None  # type: ignore[redundant-expr]
        or authority is None  # type: ignore[redundant-expr]
        or host is None  # type: ignore[redundant-expr]
        or path is None  # type: ignore[redundant-expr]
        or query_string is None  # type: ignore[redundant-expr]
        or fragment is None
    ):
        raise TypeError(
            'NoneType is illegal for "scheme", "authority", "host", "path", '
            '"query_string", and "fragment" args, use empty string instead.'
        )

    if query:
        query_string = get_str_query(query) or ""

    if encoded:
        return build_pre_encoded_url(
            scheme,
            authority,
            user,
            password,
            host,
            port,
            path,
            query_string,
            fragment,
        )

    self = object.__new__(URL)
    self._scheme = scheme
    _host: Union[str, None] = None
    if authority:
        user, password, _host, port = split_netloc(authority)
        _host = _encode_host(_host, validate_host=False) if _host else ""
    elif host:
        # Only a host passed on its own is validated.
        _host = _encode_host(host, validate_host=True)
    else:
        self._netloc = ""

    if _host is not None:
        if port is not None:
            # Default ports are dropped from the netloc.
            port = None if port == DEFAULT_PORTS.get(scheme) else port
        if user is None and password is None:
            self._netloc = _host if port is None else f"{_host}:{port}"
        else:
            self._netloc = make_netloc(user, password, _host, port, True)

    path = PATH_QUOTER(path) if path else path
    if path and self._netloc:
        if "." in path:
            path = normalize_path(path)
        # ``startswith`` rather than ``path[0]``: normalization may leave an
        # empty string (presumably for a dot-only path such as "."), and
        # that must be reported as a ValueError, not an IndexError.
        if not path.startswith("/"):
            msg = (
                "Path in a URL with authority should "
                "start with a slash ('/') if set"
            )
            raise ValueError(msg)

    self._path = path
    if not query and query_string:
        # A string produced from ``query`` above is not quoted again.
        query_string = QUERY_QUOTER(query_string)
    self._query = query_string
    self._fragment = FRAGMENT_QUOTER(fragment) if fragment else fragment
    self._cache = {}
    return self
def __init_subclass__(cls) -> NoReturn:
    """Reject every attempt to subclass URL by raising TypeError."""
    message = f"Inheriting a class {cls!r} from URL is forbidden"
    raise TypeError(message)
def __str__(self) -> str:
    """Render the URL as a string, leaving out a default explicit port."""
    path = self._path
    if not path and self._netloc and (self._query or self._fragment):
        # An empty path is rendered as "/" when a query or fragment follows
        # the authority.
        path = "/"

    port = self.explicit_port
    if port is not None and port == DEFAULT_PORTS.get(self._scheme):
        # Port normalization - passing None drops the default port from the
        # rendered netloc.
        # https://datatracker.ietf.org/doc/html/rfc3986.html#section-6.2.3
        host = self.host_subcomponent
        netloc = make_netloc(self.raw_user, self.raw_password, host, None)
    else:
        netloc = self._netloc
    return unsplit_result(self._scheme, netloc, path, self._query, self._fragment)
def __repr__(self) -> str:
    """Return ``ClassName('<url string>')``."""
    class_name = self.__class__.__name__
    return f"{class_name}('{self!s}')"
def __bytes__(self) -> bytes:
    """Return the string form of the URL encoded as ASCII bytes."""
    text = str(self)
    return text.encode("ascii")
def __eq__(self, other: object) -> bool:
    """Compare two URLs component by component.

    An empty path on a URL with a netloc is treated as "/". Anything that
    is not exactly a URL yields ``NotImplemented``.
    """
    if type(other) is not URL:
        return NotImplemented
    own_path = self._path if self._path or not self._netloc else "/"
    other_path = other._path if other._path or not other._netloc else "/"
    mine = (self._scheme, self._netloc, own_path, self._query, self._fragment)
    theirs = (
        other._scheme,
        other._netloc,
        other_path,
        other._query,
        other._fragment,
    )
    return mine == theirs
def __hash__(self) -> int:
    """Hash the five components, computing once and reusing the cached value.

    The path is normalized the same way as in ``__eq__``: empty becomes "/"
    when a netloc is present.
    """
    cached = self._cache.get("hash")
    if cached is None:
        path = self._path if self._path or not self._netloc else "/"
        cached = hash(
            (self._scheme, self._netloc, path, self._query, self._fragment)
        )
        self._cache["hash"] = cached
    return cached
def __le__(self, other: object) -> bool:
    """Order by the component tuple; non-URL operands give NotImplemented."""
    if type(other) is URL:
        return self._val <= other._val
    return NotImplemented
def __lt__(self, other: object) -> bool:
    """Order by the component tuple; non-URL operands give NotImplemented."""
    if type(other) is URL:
        return self._val < other._val
    return NotImplemented
def __ge__(self, other: object) -> bool:
    """Order by the component tuple; non-URL operands give NotImplemented."""
    if type(other) is URL:
        return self._val >= other._val
    return NotImplemented
def __gt__(self, other: object) -> bool:
    """Order by the component tuple; non-URL operands give NotImplemented."""
    if type(other) is URL:
        return self._val > other._val
    return NotImplemented
def __truediv__(self, name: str) -> "URL":
    """Implement ``url / name`` by appending ``name`` as a child path.

    A right operand that is not a str yields ``NotImplemented``.
    """
    if isinstance(name, str):
        # str() turns a str subclass into a plain str.
        segment = str(name)
        return self._make_child((segment,))
    return NotImplemented  # type: ignore[unreachable]
def __mod__(self, query: Query) -> "URL":
    """Implement ``url % query`` as a call to ``update_query(query)``."""
    updated = self.update_query(query)
    return updated
def __bool__(self) -> bool:
    """Return True if netloc, path, query or fragment is non-empty.

    The scheme is not taken into account.
    """
    content = self._netloc or self._path or self._query or self._fragment
    return bool(content)
def __getstate__(self) -> tuple[SplitResult]:
    """Return the pickle state: a 1-tuple holding the parts as a SplitResult."""
    split = tuple.__new__(SplitResult, self._val)
    return (split,)
def __setstate__(
    self, state: Union[tuple[SplitURLType], tuple[None, _InternalURLCache]]
) -> None:
    """Restore the five components from a pickle state and reset the cache.

    Two layouts are accepted: ``(None, {"_val": parts, ...})`` (default
    style pickle) and a tuple whose first item is the parts.
    """
    default_style = state[0] is None and isinstance(state[1], dict)
    if default_style:
        val = state[1]["_val"]
    else:
        # Anything after the first item is ignored.
        rest: list[object]
        val, *rest = state
    scheme, netloc, path, query, fragment = val
    self._scheme = scheme
    self._netloc = netloc
    self._path = path
    self._query = query
    self._fragment = fragment
    self._cache = {}
def _cache_netloc(self) -> None:
    """Split the netloc once and store all four parts in the cache."""
    cache = self._cache
    user, password, host, port = split_netloc(self._netloc)
    cache["raw_user"] = user
    cache["raw_password"] = password
    cache["raw_host"] = host
    cache["explicit_port"] = port
def is_absolute(self) -> bool:
    """A check for absolute URLs.

    Return True for absolute ones (having scheme or starting
    with //), False otherwise.

    It is preferred to use the .absolute property instead
    as it is cached.
    """
    result = self.absolute
    return result
def is_default_port(self) -> bool:
    """A check for default port.

    Return True if the port is the default one for the scheme,
    e.g. 'http://python.org' or 'http://python.org:80', False
    otherwise.

    Return False for relative URLs.
    """
    explicit = self.explicit_port
    if explicit is not None:
        return explicit == DEFAULT_PORTS.get(self._scheme)
    # Without an explicit port the URL uses the default port, unless it is
    # a relative URL, which has no implicit / default port at all.
    return self._netloc != ""
def origin(self) -> "URL":
    """Return a URL made of the scheme, host and port parts only.

    user, password, path, query and fragment are removed.
    """
    # TODO: add a keyword-only option for keeping user/pass maybe?
    result = self._origin
    return result
@cached_property
def _val(self) -> SplitURLType:
    """The five URL components as one tuple, in scheme..fragment order."""
    parts = (self._scheme, self._netloc, self._path, self._query, self._fragment)
    return parts
@cached_property
def _origin(self) -> "URL":
    """Return a URL made of the scheme, host and port parts only.

    user, password, path, query and fragment are removed. A URL that is
    already reduced to its origin is returned as is.

    Raises:
        ValueError: the URL has no netloc or no scheme.
    """
    netloc = self._netloc
    if not netloc:
        raise ValueError("URL should be absolute")
    scheme = self._scheme
    if not scheme:
        raise ValueError("URL should have scheme")
    if "@" in netloc:
        # Rebuild the netloc without the user-info part.
        encoded_host = self.host_subcomponent
        netloc = make_netloc(None, None, encoded_host, self.explicit_port)
    elif not (self._path or self._query or self._fragment):
        return self
    return from_parts(scheme, netloc, "", "", "")
def relative(self) -> "URL":
    """Return the relative part of the URL.

    scheme, user, password, host and port are removed.

    Raises:
        ValueError: the URL has no netloc.
    """
    if self._netloc:
        return from_parts("", "", self._path, self._query, self._fragment)
    raise ValueError("URL should be absolute")
@cached_property
def absolute(self) -> bool:
    """A check for absolute URLs.

    Return True for absolute ones (having scheme or starting
    with //), False otherwise.
    """
    # `netloc` is an empty string for relative URLs.
    # Checking `netloc` is faster than checking the host, because the host
    # property does extra work to parse it out of the `netloc`.
    netloc = self._netloc
    return netloc != ""
@cached_property
def scheme(self) -> str:
    """Scheme of the URL.

    Empty string for relative URLs or URLs starting with //
    """
    value = self._scheme
    return value
@cached_property
def raw_authority(self) -> str:
    """Authority part of the URL in encoded form.

    Empty string for relative URLs.
    """
    value = self._netloc
    return value
@cached_property
def authority(self) -> str:
    """Authority part of the URL in decoded form.

    Empty string for relative URLs.
    """
    user = self.user
    password = self.password
    host = self.host
    port = self.port
    return make_netloc(user, password, host, port)
@cached_property
def raw_user(self) -> Union[str, None]:
    """User part of the URL in encoded form.

    None if user is missing.
    """
    # not .username
    self._cache_netloc()
    cache = self._cache
    return cache["raw_user"]
@cached_property
def user(self) -> Union[str, None]:
    """User part of the URL in decoded form.

    None if user is missing.
    """
    raw_user = self.raw_user
    return None if raw_user is None else UNQUOTER(raw_user)
@cached_property
def raw_password(self) -> Union[str, None]:
    """Password part of the URL in encoded form.

    None if password is missing.
    """
    self._cache_netloc()
    cache = self._cache
    return cache["raw_password"]
@cached_property
def password(self) -> Union[str, None]:
    """Password part of the URL in decoded form.

    None if password is missing.
    """
    raw_password = self.raw_password
    return None if raw_password is None else UNQUOTER(raw_password)
@cached_property
def raw_host(self) -> Union[str, None]:
    """Host part of the URL in encoded form.

    None for relative URLs.

    When working with IPv6 addresses, use the `host_subcomponent` property
    instead, as it returns the host subcomponent with brackets.
    """
    # Use host instead of hostname for sake of shortness
    # May add .hostname prop later
    self._cache_netloc()
    cache = self._cache
    return cache["raw_host"]
@cached_property
def host(self) -> Union[str, None]:
    """Host part of the URL in decoded form.

    None for relative URLs.
    """
    raw = self.raw_host
    if raw is None:
        return None
    ends_with_digit = bool(raw) and raw[-1].isdigit()
    if ends_with_digit or ":" in raw:
        # IP addresses are never IDNA encoded
        return raw
    return _idna_decode(raw)
@cached_property
def host_subcomponent(self) -> Union[str, None]:
    """Return the host subcomponent part of the URL.

    None for relative URLs.

    https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2

    `IP-literal = "[" ( IPv6address / IPvFuture  ) "]"`

    Examples:
    - `http://example.com:8080` -> `example.com`
    - `http://example.com:80` -> `example.com`
    - `https://127.0.0.1:8443` -> `127.0.0.1`
    - `https://[::1]:8443` -> `[::1]`
    - `http://[::1]` -> `[::1]`
    """
    raw = self.raw_host
    if raw is None:
        return None
    if ":" in raw:
        # A colon in the raw host marks an IP literal, which is bracketed.
        return f"[{raw}]"
    return raw
@cached_property
def host_port_subcomponent(self) -> Union[str, None]:
    """Return the host and port subcomponent part of the URL.

    Trailing dots are removed from the host part.

    This value is suitable for use in the Host header of an HTTP request.

    None for relative URLs.

    https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2

    `IP-literal = "[" ( IPv6address / IPvFuture  ) "]"`

    https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.3

    port = *DIGIT

    Examples:
    - `http://example.com:8080` -> `example.com:8080`
    - `http://example.com:80` -> `example.com`
    - `http://example.com.:80` -> `example.com`
    - `https://127.0.0.1:8443` -> `127.0.0.1:8443`
    - `https://[::1]:8443` -> `[::1]:8443`
    - `http://[::1]` -> `[::1]`
    """
    if (raw := self.raw_host) is None:
        return None
    # ``endswith`` instead of ``raw[-1]``: the raw host may be an empty
    # string (``encode_url`` substitutes "" for a missing host on schemes
    # that do not require one), and indexing it would raise IndexError.
    if raw.endswith("."):
        # Remove all trailing dots from the netloc as while
        # they are valid FQDNs in DNS, TLS validation fails.
        # See https://github.com/aio-libs/aiohttp/issues/3636.
        # To avoid string manipulation we only call rstrip if
        # the last character is a dot.
        raw = raw.rstrip(".")
    port = self.explicit_port
    if port is None or port == DEFAULT_PORTS.get(self._scheme):
        # No port, or the scheme's default port: render the host alone.
        return f"[{raw}]" if ":" in raw else raw
    return f"[{raw}]:{port}" if ":" in raw else f"{raw}:{port}"
@cached_property
def port(self) -> Union[int, None]:
    """Port part of the URL, with scheme-based fallback.

    None for relative URLs or URLs without explicit port and
    scheme without default port substitution.
    """
    explicit_port = self.explicit_port
    if explicit_port is None:
        return DEFAULT_PORTS.get(self._scheme)
    return explicit_port
@cached_property
def explicit_port(self) -> Union[int, None]:
    """Port part of the URL, without scheme-based fallback.

    None for relative URLs or URLs without explicit port.
    """
    self._cache_netloc()
    cache = self._cache
    return cache["explicit_port"]
@cached_property
def raw_path(self) -> str:
    """Path of the URL in encoded form.

    / for absolute URLs without path part.
    """
    if not self._path and self._netloc:
        return "/"
    return self._path
@cached_property
def path(self) -> str:
    """Path of the URL in decoded form.

    / for absolute URLs without path part.
    """
    if self._path:
        return PATH_UNQUOTER(self._path)
    if self._netloc:
        return "/"
    return ""
@cached_property
def path_safe(self) -> str:
    """Path of the URL in decoded form.

    / for absolute URLs without path part.

    / (%2F) and % (%25) are not decoded
    """
    raw = self._path
    if not raw:
        return "/" if self._netloc else ""
    return PATH_SAFE_UNQUOTER(raw)
@cached_property
def _parsed_query(self) -> list[tuple[str, str]]:
    """Query part of the URL parsed into a list of pairs."""
    pairs = query_to_pairs(self._query)
    return pairs
@cached_property
def query(self) -> "MultiDictProxy[str]":
    """A MultiDictProxy representing parsed query parameters in decoded
    representation.

    Empty value if URL has no query part.
    """
    parsed = MultiDict(self._parsed_query)
    return MultiDictProxy(parsed)
@cached_property
def raw_query_string(self) -> str:
    """Query part of the URL in encoded form.

    Empty string if query is missing.
    """
    value = self._query
    return value
@cached_property
def query_string(self) -> str:
    """Query part of the URL in decoded form.

    Empty string if query is missing.
    """
    if not self._query:
        return ""
    return QS_UNQUOTER(self._query)
@cached_property
def path_qs(self) -> str:
    """Decoded path of the URL, followed by ``?query`` when a query exists."""
    query = self.query_string
    if not query:
        return self.path
    return f"{self.path}?{query}"
@cached_property
def raw_path_qs(self) -> str:
    """Encoded path of the URL, followed by ``?query`` when a query exists."""
    # An empty path is rendered as "/" when a netloc is present.
    path = self._path if self._path or not self._netloc else "/"
    query = self._query
    if query:
        return f"{path}?{query}"
    return path
@cached_property
def raw_fragment(self) -> str:
    """Fragment part of the URL in encoded form.

    Empty string if fragment is missing.
    """
    value = self._fragment
    return value
@cached_property
def fragment(self) -> str:
    """Fragment part of the URL in decoded form.

    Empty string if fragment is missing.
    """
    if not self._fragment:
        return ""
    return UNQUOTER(self._fragment)
@cached_property
def raw_parts(self) -> tuple[str, ...]:
    """A tuple containing encoded *path* parts.

    ('/',) for absolute URLs if *path* is missing.
    """
    path = self._path
    has_netloc = bool(self._netloc)
    if has_netloc and not path:
        return ("/",)
    if has_netloc or path[:1] == "/":
        # The first character is replaced by the "/" root element.
        return ("/", *path[1:].split("/"))
    return tuple(path.split("/"))
@cached_property
def parts(self) -> tuple[str, ...]:
    """A tuple containing decoded *path* parts.

    ('/',) for absolute URLs if *path* is missing.
    """
    return tuple(map(UNQUOTER, self.raw_parts))
@cached_property
def parent(self) -> "URL":
    """A new URL with last part of path removed and cleaned up query and
    fragment.

    A URL with an empty or "/" path and no query or fragment is its own
    parent.
    """
    path = self._path
    if path and path != "/":
        # Everything before the last "/" (empty when there is no slash).
        head, _, _ = path.rpartition("/")
        return from_parts(self._scheme, self._netloc, head, "", "")
    if self._fragment or self._query:
        return from_parts(self._scheme, self._netloc, path, "", "")
    return self
@cached_property
def raw_name(self) -> str:
    """The last part of raw_parts."""
    parts = self.raw_parts
    if self._netloc:
        # Skip the leading "/" root element.
        tail = parts[1:]
        return tail[-1] if tail else ""
    return parts[-1]
@cached_property
def name(self) -> str:
    """The last part of parts."""
    raw = self.raw_name
    return UNQUOTER(raw)
@cached_property
def raw_suffix(self) -> str:
    """Encoded suffix of the name: the text from its last dot onwards.

    Empty string when the name has no dot, or when its last dot is the
    first or the last character.
    """
    name = self.raw_name
    dot = name.rfind(".")
    if 0 < dot < len(name) - 1:
        return name[dot:]
    return ""
@cached_property
def suffix(self) -> str:
    """``raw_suffix`` passed through ``UNQUOTER``."""
    encoded_suffix = self.raw_suffix
    return UNQUOTER(encoded_suffix)
@cached_property
def raw_suffixes(self) -> tuple[str, ...]:
    """All encoded extensions of ``raw_name``, each with its leading dot.

    Leading dots of the name are ignored; a name ending with a dot has
    no suffixes.
    """
    filename = self.raw_name
    if filename[-1:] == ".":
        return ()
    # The first piece is the stem; every following piece is an extension.
    _stem, *extensions = filename.lstrip(".").split(".")
    return tuple(f".{ext}" for ext in extensions)
@cached_property
def suffixes(self) -> tuple[str, ...]:
    """Every entry of ``raw_suffixes`` passed through ``UNQUOTER``."""
    return tuple(map(UNQUOTER, self.raw_suffixes))
def _make_child(self, paths: "Sequence[str]", encoded: bool = False) -> "URL":
    """Return a new URL with *paths* appended to the current path.

    Accounts for absolute vs relative paths; existing empty segments are
    kept, but no new empty segments are created.  The returned URL has an
    empty query and fragment.

    paths: path strings to append, in order; each may contain slashes.
    encoded: when true, *paths* are used as-is instead of being passed
        through ``PATH_QUOTER``.

    Raises ValueError if any of *paths* starts with a slash.
    """
    # Segments are collected in REVERSE order (last path first) and the
    # whole list is flipped once at the end.
    parsed: list[str] = []
    needs_normalize: bool = False
    for idx, path in enumerate(reversed(paths)):
        # empty segment of last is not removed
        last = idx == 0
        if path and path[0] == "/":
            raise ValueError(
                f"Appending path {path!r} starting from slash is forbidden"
            )
        # We need to quote the path if it is not already encoded
        # This cannot be done at the end because the existing
        # path is already quoted and we do not want to double quote
        # the existing path.
        path = path if encoded else PATH_QUOTER(path)
        # Any dot in a new path sends the result through normalization
        # below (only when a netloc is present).
        needs_normalize |= "." in path
        segments = path.split("/")
        segments.reverse()
        # remove trailing empty segment for all but the last path
        parsed += segments[1:] if not last and segments[0] == "" else segments

    if (path := self._path) and (old_segments := path.split("/")):
        # If the old path ends with a slash, the last segment is an empty string
        # and should be removed before adding the new path segments.
        old = old_segments[:-1] if old_segments[-1] == "" else old_segments
        old.reverse()
        parsed += old

    # If the netloc is present, inject a leading slash when adding a
    # path to an absolute URL where there was none before.
    if (netloc := self._netloc) and parsed and parsed[-1] != "":
        parsed.append("")

    # Restore natural (left-to-right) segment order.
    parsed.reverse()
    if not netloc or not needs_normalize:
        return from_parts(self._scheme, netloc, "/".join(parsed), "", "")

    path = "/".join(normalize_path_segments(parsed))
    # If normalizing the path segments removed the leading slash, add it back.
    if path and path[0] != "/":
        path = f"/{path}"
    return from_parts(self._scheme, netloc, path, "", "")
def with_scheme(self, scheme: str) -> "URL":
    """Return a new URL whose scheme is *scheme*, lower-cased.

    Netloc, path, query and fragment are carried over unchanged.

    Raises TypeError if *scheme* is not a str, and ValueError when the
    URL has no netloc and the new scheme is in ``SCHEME_REQUIRES_HOST``.
    """
    if not isinstance(scheme, str):
        raise TypeError("Invalid scheme type")
    normalized = scheme.lower()
    authority = self._netloc
    if normalized in SCHEME_REQUIRES_HOST and not authority:
        raise ValueError(
            "scheme replacement is not allowed for "
            f"relative URLs for the {normalized} scheme"
        )
    return from_parts(
        normalized, authority, self._path, self._query, self._fragment
    )
def with_user(self, user: Union[str, None]) -> "URL":
    """Return a new URL with the user part of the netloc replaced.

    A str *user* is passed through ``QUOTER`` and the current password is
    kept.  Passing None clears both user and password.  Path, query and
    fragment are carried over unchanged.

    Raises TypeError for a non-str, non-None *user* and ValueError when
    the URL has no netloc.
    """
    if user is not None and not isinstance(user, str):
        raise TypeError("Invalid user type")
    if user is None:
        # Dropping the user drops the password with it.
        encoded_user = None
        encoded_password = None
    else:
        encoded_user = QUOTER(user)
        encoded_password = self.raw_password
    if not self._netloc:
        raise ValueError("user replacement is not allowed for relative URLs")
    host = self.host_subcomponent or ""
    new_netloc = make_netloc(
        encoded_user, encoded_password, host, self.explicit_port
    )
    return from_parts(
        self._scheme, new_netloc, self._path, self._query, self._fragment
    )
def with_password(self, password: Union[str, None]) -> "URL":
    """Return a new URL with the password part of the netloc replaced.

    A str *password* is passed through ``QUOTER``; None clears the
    password.  User, host, port, path, query and fragment are carried
    over unchanged.

    Raises TypeError for a non-str, non-None *password* and ValueError
    when the URL has no netloc.
    """
    if password is not None:
        if not isinstance(password, str):
            raise TypeError("Invalid password type")
        password = QUOTER(password)
    if not self._netloc:
        raise ValueError("password replacement is not allowed for relative URLs")
    host = self.host_subcomponent or ""
    new_netloc = make_netloc(self.raw_user, password, host, self.explicit_port)
    return from_parts(
        self._scheme, new_netloc, self._path, self._query, self._fragment
    )
def with_host(self, host: str) -> "URL":
    """Return a new URL with the host part of the netloc replaced.

    The host is encoded and validated via ``_encode_host``.  User,
    password, port, path, query and fragment are carried over unchanged.
    Changing the host of a relative URL is not allowed; use .join()
    instead.

    Raises TypeError if *host* is not a str, and ValueError when the URL
    has no netloc or *host* is empty.
    """
    if not isinstance(host, str):
        raise TypeError("Invalid host type")
    if not self._netloc:
        raise ValueError("host replacement is not allowed for relative URLs")
    if not host:
        raise ValueError("host removing is not allowed")
    # host is guaranteed non-empty here, so it is always encoded.
    new_netloc = make_netloc(
        self.raw_user,
        self.raw_password,
        _encode_host(host, validate_host=True),
        self.explicit_port,
    )
    return from_parts(
        self._scheme, new_netloc, self._path, self._query, self._fragment
    )
def with_port(self, port: Union[int, None]) -> "URL":
    """Return a new URL with the port part of the netloc replaced.

    Passing None removes the explicit port.  User, password, host, path,
    query and fragment are carried over unchanged.

    Raises TypeError if *port* is a bool or not an int, and ValueError
    if it is outside 0..65535 or the URL has no netloc.
    """
    if port is None:
        pass
    elif isinstance(port, bool) or not isinstance(port, int):
        # bool is an int subclass, so it has to be rejected explicitly.
        raise TypeError(f"port should be int or None, got {type(port)}")
    elif port < 0 or port > 65535:
        raise ValueError(f"port must be between 0 and 65535, got {port}")
    if not self._netloc:
        raise ValueError("port replacement is not allowed for relative URLs")
    host = self.host_subcomponent or ""
    new_netloc = make_netloc(self.raw_user, self.raw_password, host, port)
    return from_parts(
        self._scheme, new_netloc, self._path, self._query, self._fragment
    )
def with_path(
    self,
    path: str,
    *,
    encoded: bool = False,
    keep_query: bool = False,
    keep_fragment: bool = False,
) -> "URL":
    """Return a new URL with the path replaced.

    path: the new path; passed through ``PATH_QUOTER`` unless *encoded*.
    keep_query / keep_fragment: carry the current query / fragment over
        instead of dropping them.

    When the URL has a netloc, a path containing a dot is normalized and
    a non-empty path is given a leading slash if it lacks one.
    """
    authority = self._netloc
    new_path = path if encoded else PATH_QUOTER(path)
    if authority:
        if "." in new_path:
            new_path = normalize_path(new_path)
        if new_path and new_path[0] != "/":
            new_path = f"/{new_path}"
    return from_parts(
        self._scheme,
        authority,
        new_path,
        self._query if keep_query else "",
        self._fragment if keep_fragment else "",
    )
@overload
def with_query(self, query: Query) -> "URL": ...
@overload
def with_query(self, **kwargs: QueryVariable) -> "URL": ...
def with_query(self, *args: Any, **kwargs: Any) -> "URL":
    """Return a new URL with the query part replaced.

    Accepts any Mapping (e.g. dict, multidict.MultiDict instances)
    or str; the argument is autoencoded if needed.
    A sequence of (key, value) pairs is supported as well, and so is an
    arbitrary number of keyword arguments.
    Passing None clears the query.
    """
    # Scheme, netloc, path and fragment are carried over untouched.
    encoded_query = get_str_query(*args, **kwargs)
    if not encoded_query:
        encoded_query = ""
    return from_parts_uncached(
        self._scheme, self._netloc, self._path, encoded_query, self._fragment
    )
@overload
def extend_query(self, query: Query) -> "URL": ...
@overload
def extend_query(self, **kwargs: QueryVariable) -> "URL": ...
def extend_query(self, *args: Any, **kwargs: Any) -> "URL":
    """Return a new URL with the given query appended to the existing one.

    Existing query parameters are never removed or replaced.  When the
    supplied query is empty the URL itself is returned.

    Example:
    >>> url = URL('http://example.com/?a=1&b=2')
    >>> url.extend_query(a=3, c=4)
    URL('http://example.com/?a=1&b=2&a=3&c=4')
    """
    addition = get_str_query(*args, **kwargs)
    if not addition:
        return self
    existing = self._query
    # Both strings are already encoded, so plain concatenation is enough;
    # a separator is added only when the old query does not end with one.
    if not existing:
        combined = addition
    elif existing[-1] == "&":
        combined = existing + addition
    else:
        combined = f"{existing}&{addition}"
    return from_parts_uncached(
        self._scheme, self._netloc, self._path, combined, self._fragment
    )
@overload
def update_query(self, query: Query) -> "URL": ...
@overload
def update_query(self, **kwargs: QueryVariable) -> "URL": ...
def update_query(self, *args: Any, **kwargs: Any) -> "URL":
    """Return a new URL with the query part updated.

    Parameters already present in the query are overwritten by the
    supplied ones.  Passing None clears the query; any other empty
    value leaves it unchanged.

    Raises ValueError unless exactly one positional argument or only
    keyword arguments are given, and TypeError for unsupported query
    types (including bytes, bytearray and memoryview).

    Example:
    >>> url = URL('http://example.com/?a=1&b=2')
    >>> url.update_query(a=3, c=4)
    URL('http://example.com/?a=3&b=2&c=4')
    """
    if (kwargs and args) or (not kwargs and len(args) != 1):
        raise ValueError("Either kwargs or single query parameter must be present")
    in_query: Union[str, Mapping[str, QueryVariable], None] = (
        kwargs if kwargs else args[0]
    )

    if in_query is None:
        new_query = ""
    elif not in_query:
        new_query = self._query
    elif isinstance(in_query, Mapping):
        by_mapping: MultiDict[QueryVariable] = MultiDict(self._parsed_query)
        by_mapping.update(in_query)
        new_query = get_str_query_from_sequence_iterable(by_mapping.items())
    elif isinstance(in_query, str):
        by_string: MultiDict[str] = MultiDict(self._parsed_query)
        by_string.update(query_to_pairs(in_query))
        new_query = get_str_query_from_iterable(by_string.items())
    elif isinstance(in_query, (bytes, bytearray, memoryview)):  # type: ignore[unreachable]
        raise TypeError(
            "Invalid query type: bytes, bytearray and memoryview are forbidden"
        )
    elif isinstance(in_query, Sequence):
        # A list of pairs is not expected to carry sequence values; only
        # mappings such as the builtin `dict`, which cannot map one key
        # to several values, go through the sequence-aware serializer.
        by_pairs: MultiDict[SimpleQuery] = MultiDict(self._parsed_query)
        by_pairs.update(in_query)
        new_query = get_str_query_from_iterable(by_pairs.items())
    else:
        raise TypeError(
            "Invalid query type: only str, mapping or "
            "sequence of (key, value) pairs is allowed"
        )
    return from_parts_uncached(
        self._scheme, self._netloc, self._path, new_query, self._fragment
    )
def without_query_params(self, *query_params: str) -> "URL":
    """Return a new URL with the named keys removed from the query.

    The URL itself is returned when none of the names occur in the
    query.
    """
    current = self.query
    doomed = set(query_params) & current.keys()
    if not doomed:
        return self
    survivors = tuple(
        (key, value) for key, value in current.items() if key not in doomed
    )
    return self.with_query(survivors)
def with_fragment(self, fragment: Union[str, None]) -> "URL":
    """Return a new URL with the fragment replaced.

    A str *fragment* is passed through ``FRAGMENT_QUOTER``; None clears
    the fragment.  The URL itself is returned when the encoded fragment
    equals the current one.  All other components are carried over.

    Raises TypeError for a non-str, non-None *fragment*.
    """
    if fragment is None:
        encoded = ""
    elif isinstance(fragment, str):
        encoded = FRAGMENT_QUOTER(fragment)
    else:
        raise TypeError("Invalid fragment type")
    if encoded == self._fragment:
        # Nothing would change, so no new object is needed.
        return self
    return from_parts(self._scheme, self._netloc, self._path, self._query, encoded)
def with_name(
    self,
    name: str,
    *,
    keep_query: bool = False,
    keep_fragment: bool = False,
) -> "URL":
    """Return a new URL with the last path segment replaced by *name*.

    The name is passed through ``PATH_QUOTER``.  Query and fragment are
    dropped unless *keep_query* / *keep_fragment* is true.

    Raises TypeError if *name* is not a str, and ValueError if it
    contains a slash or encodes to ``.`` or ``..``.
    """
    if not isinstance(name, str):
        raise TypeError("Invalid name type")
    if "/" in name:
        raise ValueError("Slash in name is not allowed")
    encoded_name = PATH_QUOTER(name)
    if encoded_name in (".", ".."):
        raise ValueError(". and .. values are forbidden")

    segments = list(self.raw_parts)
    authority = self._netloc
    if authority and len(segments) == 1:
        # Only the root element exists, so the name becomes a new segment.
        segments.append(encoded_name)
    else:
        segments[-1] = encoded_name
    if authority or segments[0] == "/":
        # Blank the root marker so the join below yields one leading slash.
        segments[0] = ""

    return from_parts(
        self._scheme,
        authority,
        "/".join(segments),
        self._query if keep_query else "",
        self._fragment if keep_fragment else "",
    )
def with_suffix(
    self,
    suffix: str,
    *,
    keep_query: bool = False,
    keep_fragment: bool = False,
) -> "URL":
    """Return a new URL with the suffix (file extension of name) replaced.

    The suffix is passed through ``PATH_QUOTER``; an empty suffix removes
    the current one.  Query and fragment are dropped unless *keep_query*
    / *keep_fragment* is true.

    Raises TypeError if *suffix* is not a str, and ValueError if it is
    non-empty without a leading dot, is exactly ``.``, contains a slash,
    if the URL has an empty name, or if the new name is ``.`` or ``..``.
    """
    if not isinstance(suffix, str):
        raise TypeError("Invalid suffix type")
    lacks_dot = bool(suffix) and suffix[0] != "."
    if lacks_dot or suffix == "." or "/" in suffix:
        raise ValueError(f"Invalid suffix {suffix!r}")
    current_name = self.raw_name
    if not current_name:
        raise ValueError(f"{self!r} has an empty name")
    current_suffix = self.raw_suffix
    encoded_suffix = PATH_QUOTER(suffix)
    # Strip the old extension (if any) before attaching the new one.
    stem = current_name[: -len(current_suffix)] if current_suffix else current_name
    new_name = stem + encoded_suffix
    if new_name in (".", ".."):
        raise ValueError(". and .. values are forbidden")

    segments = list(self.raw_parts)
    authority = self._netloc
    if authority and len(segments) == 1:
        segments.append(new_name)
    else:
        segments[-1] = new_name
    if authority or segments[0] == "/":
        # Blank the root marker so the join below yields one leading slash.
        segments[0] = ""

    return from_parts(
        self._scheme,
        authority,
        "/".join(segments),
        self._query if keep_query else "",
        self._fragment if keep_fragment else "",
    )
def join(self, url: "URL") -> "URL":
    """Join URLs.

    Construct a full ("absolute") URL by combining a "base URL"
    (self) with another URL (url).

    Informally, this uses components of the base URL, in
    particular the addressing scheme, the network location and
    (part of) the path, to provide missing components in the
    relative URL.

    *url* is returned unchanged when its scheme differs from the base
    scheme or the scheme is not in ``USES_RELATIVE``.

    Raises TypeError if *url* is not exactly a ``URL`` instance.
    """
    if type(url) is not URL:
        raise TypeError("url should be URL")

    scheme = url._scheme or self._scheme
    if scheme != self._scheme or scheme not in USES_RELATIVE:
        return url

    # scheme is in uses_authority as uses_authority is a superset of uses_relative
    if (join_netloc := url._netloc) and scheme in USES_AUTHORITY:
        return from_parts(scheme, join_netloc, url._path, url._query, url._fragment)

    orig_path = self._path
    if join_path := url._path:
        if join_path[0] == "/":
            path = join_path
        elif not orig_path:
            path = f"/{join_path}"
        elif orig_path[-1] == "/":
            path = f"{orig_path}{join_path}"
        else:
            # Merge: keep the base path up to and including its right-most
            # slash and append the reference path.  This works on the
            # *encoded* path on purpose: going through the decoded
            # ``parts`` would unescape sequences such as %20 or %2F and
            # hand an unencoded (or restructured) path to from_parts().
            path = orig_path[: orig_path.rfind("/") + 1] + join_path
        # Resolve "." and ".." segments introduced by the merge.
        path = normalize_path(path) if "." in path else path
    else:
        path = orig_path

    # Query and fragment come from the reference when it supplies a path
    # (or its own query/fragment); otherwise the base values are kept.
    return from_parts(
        scheme,
        self._netloc,
        path,
        url._query if join_path or url._query else self._query,
        url._fragment if join_path or url._fragment else self._fragment,
    )
def joinpath(self, *other: str, encoded: bool = False) -> "URL":
    """Return a new URL with every element of *other* appended to the path.

    Delegates to ``_make_child``; *encoded* is forwarded unchanged.
    """
    child = self._make_child(other, encoded=encoded)
    return child
def human_repr(self) -> str:
    """Return a decoded, human readable string representation of the URL.

    Each decoded component is passed through ``human_quote`` and the
    pieces are reassembled with ``make_netloc`` and ``unsplit_result``.
    """
    # NOTE(review): the second human_quote argument is presumably the set
    # of characters that must stay escaped in that component -- confirm.
    authority_chars = "#/:?@[]"
    query_chars = "#&+;="

    user = human_quote(self.user, authority_chars)
    password = human_quote(self.password, authority_chars)
    host = self.host
    if host and ":" in host:
        # A colon in the host gets it wrapped in brackets.
        host = f"[{host}]"
    path = human_quote(self.path, "#?")
    if TYPE_CHECKING:
        assert path is not None
    pairs = [
        f"{human_quote(key, query_chars)}={human_quote(value, query_chars)}"
        for key, value in self.query.items()
    ]
    query_string = "&".join(pairs)
    fragment = human_quote(self.fragment, "")
    if TYPE_CHECKING:
        assert fragment is not None
    netloc = make_netloc(user, password, host, self.explicit_port)
    return unsplit_result(self._scheme, netloc, path, query_string, fragment)
# Default maximum number of entries for the IDNA encode/decode LRU caches.
_DEFAULT_IDNA_SIZE = 256
# Default maximum number of entries for the host-encoding LRU cache.
_DEFAULT_ENCODE_SIZE = 512
@lru_cache(_DEFAULT_IDNA_SIZE)
def _idna_decode(raw: str) -> str:
    """Decode an ASCII (IDNA-encoded) host name to its Unicode form.

    The ``idna`` package is tried first; on UnicodeError the built-in
    ``idna`` codec is used instead.
    """
    try:
        decoded = idna.decode(raw.encode("ascii"))
    except UnicodeError:  # e.g. '::1'
        decoded = raw.encode("ascii").decode("idna")
    return decoded
@lru_cache(_DEFAULT_IDNA_SIZE)
def _idna_encode(host: str) -> str:
    """Encode a host name to its ASCII IDNA form.

    The ``idna`` package (with UTS #46 processing) is tried first; on
    UnicodeError the built-in ``idna`` codec is used instead.
    """
    try:
        encoded = idna.encode(host, uts46=True).decode("ascii")
    except UnicodeError:
        encoded = host.encode("idna").decode("ascii")
    return encoded
@lru_cache(_DEFAULT_ENCODE_SIZE)
def _encode_host(host: str, validate_host: bool) -> str:
    """Encode host part of URL.

    host: host name or IP address literal (without surrounding brackets).
    validate_host: when true, an ASCII host name is checked against
        ``NOT_REG_NAME`` and rejected if it matches.

    Returns the compressed form for IP addresses (IPv6 wrapped in
    brackets, any ``%zone`` suffix preserved), the lower-cased name for
    ASCII hosts, and the IDNA-encoded name otherwise.

    Raises ValueError when validation is requested and the ASCII host
    contains a character matched by ``NOT_REG_NAME``.
    """
    # If the host ends with a digit or contains a colon, it's likely
    # an IP address.
    if host and (host[-1].isdigit() or ":" in host):
        # Split off an optional zone identifier ("%eth0") before parsing.
        raw_ip, sep, zone = host.partition("%")
        # If it looks like an IP, we check with ip_address()
        # and fall through if it's not an IP address. This is a performance
        # optimization to avoid parsing IP addresses as much as possible
        # because it is orders of magnitude slower than almost any other
        # operation this library does.
        # Might be an IP address, check it
        #
        # IP Addresses can look like:
        # https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2
        # - 127.0.0.1 (last character is a digit)
        # - 2001:db8::ff00:42:8329 (contains a colon)
        # - 2001:db8::ff00:42:8329%eth0 (contains a colon)
        # - [2001:db8::ff00:42:8329] (contains a colon -- brackets should
        #                             have been removed before it gets here)
        # Rare IP Address formats are not supported per:
        # https://datatracker.ietf.org/doc/html/rfc3986#section-7.4
        #
        # IP parsing is slow, which is why this whole function is wrapped
        # in an LRU cache.
        try:
            ip = ip_address(raw_ip)
        except ValueError:
            # Not an IP address after all; treat it as a host name below.
            pass
        else:
            host = ip.compressed
            if ip.version == 6:
                return f"[{host}%{zone}]" if sep else f"[{host}]"
            return f"{host}%{zone}" if sep else host

    # IDNA encoding is slow, skip it for ASCII-only strings
    if host.isascii():
        # Check for invalid characters explicitly; _idna_encode() does this
        # for non-ascii host names.
        host = host.lower()
        if validate_host and (invalid := NOT_REG_NAME.search(host)):
            value, pos, extra = invalid.group(), invalid.start(), ""
            if value == "@" or (value == ":" and "@" in host[pos:]):
                # this looks like an authority string
                extra = (
                    ", if the value includes a username or password, "
                    "use 'authority' instead of 'host'"
                )
            raise ValueError(
                f"Host {host!r} cannot contain {value!r} (at position {pos}){extra}"
            ) from None
        return host

    return _idna_encode(host)
@rewrite_module
def cache_clear() -> None:
    """Clear all LRU caches (IDNA encode, IDNA decode and host encoding)."""
    for cached_function in (_idna_encode, _idna_decode, _encode_host):
        cached_function.cache_clear()
@rewrite_module
def cache_info() -> CacheInfo:
    """Report statistics of the LRU caches.

    The ``ip_address``, ``host_validate`` and ``encode_host`` entries all
    report the statistics of the single host-encoding cache.
    """
    host_stats = _encode_host.cache_info()
    return {
        "idna_encode": _idna_encode.cache_info(),
        "idna_decode": _idna_decode.cache_info(),
        "ip_address": host_stats,
        "host_validate": host_stats,
        "encode_host": host_stats,
    }
@rewrite_module
def cache_configure(
    *,
    idna_encode_size: Union[int, None] = _DEFAULT_IDNA_SIZE,
    idna_decode_size: Union[int, None] = _DEFAULT_IDNA_SIZE,
    ip_address_size: Union[int, None, UndefinedType] = UNDEFINED,
    host_validate_size: Union[int, None, UndefinedType] = UNDEFINED,
    encode_host_size: Union[int, None, UndefinedType] = UNDEFINED,
) -> None:
    """Configure LRU cache sizes.

    Each cache is rebuilt (and therefore emptied) with the requested
    maximum size; ``None`` is passed straight to ``lru_cache``.

    idna_encode_size / idna_decode_size: sizes of the IDNA caches.
    encode_host_size: size of the host-encoding cache; defaults to
        ``_DEFAULT_ENCODE_SIZE`` when nothing is specified.
    ip_address_size / host_validate_size: deprecated; when given they
        emit a DeprecationWarning and are merged into *encode_host_size*
        (the largest value wins, ``None`` beats any integer).
    """
    global _idna_decode, _idna_encode, _encode_host
    # ip_address_size, host_validate_size are no longer
    # used, but are kept for backwards compatibility.
    if ip_address_size is not UNDEFINED or host_validate_size is not UNDEFINED:
        warnings.warn(
            "cache_configure() no longer accepts the "
            "ip_address_size or host_validate_size arguments, "
            "they are used to set the encode_host_size instead "
            "and will be removed in the future",
            DeprecationWarning,
            stacklevel=2,
        )

    if encode_host_size is not None:
        for size in (ip_address_size, host_validate_size):
            if size is None:
                # None is the largest possible size, so nothing that
                # follows can change the outcome.  Stopping here also
                # avoids max(int, None), which raises TypeError.
                encode_host_size = None
                break
            elif encode_host_size is UNDEFINED:
                if size is not UNDEFINED:
                    encode_host_size = size
            elif size is not UNDEFINED:
                if TYPE_CHECKING:
                    assert isinstance(size, int)
                    assert isinstance(encode_host_size, int)
                encode_host_size = max(size, encode_host_size)
        if encode_host_size is UNDEFINED:
            encode_host_size = _DEFAULT_ENCODE_SIZE

    # Re-wrap the undecorated functions so the new sizes take effect.
    _encode_host = lru_cache(encode_host_size)(_encode_host.__wrapped__)
    _idna_decode = lru_cache(idna_decode_size)(_idna_decode.__wrapped__)
    _idna_encode = lru_cache(idna_encode_size)(_idna_encode.__wrapped__)
|