_multidict_py.py 27 KB


  1. import enum
  2. import reprlib
  3. import sys
  4. from abc import abstractmethod
  5. from array import array
  6. from collections.abc import (
  7. Callable,
  8. ItemsView,
  9. Iterable,
  10. Iterator,
  11. KeysView,
  12. Mapping,
  13. ValuesView,
  14. )
  15. from typing import (
  16. TYPE_CHECKING,
  17. Any,
  18. Generic,
  19. NoReturn,
  20. Optional,
  21. TypeVar,
  22. Union,
  23. cast,
  24. overload,
  25. )
  26. from ._abc import MDArg, MultiMapping, MutableMultiMapping, SupportsKeys
  27. if sys.version_info >= (3, 11):
  28. from typing import Self
  29. else:
  30. from typing_extensions import Self
  31. class istr(str):
  32. """Case insensitive str."""
  33. __is_istr__ = True
  34. __istr_title__: Optional[str] = None
  35. _V = TypeVar("_V")
  36. _T = TypeVar("_T")
  37. _SENTINEL = enum.Enum("_SENTINEL", "sentinel")
  38. sentinel = _SENTINEL.sentinel
  39. _version = array("Q", [0])
  40. class _Impl(Generic[_V]):
  41. __slots__ = ("_items", "_version")
  42. def __init__(self) -> None:
  43. self._items: list[tuple[str, str, _V]] = []
  44. self.incr_version()
  45. def incr_version(self) -> None:
  46. global _version
  47. v = _version
  48. v[0] += 1
  49. self._version = v[0]
  50. if sys.implementation.name != "pypy":
  51. def __sizeof__(self) -> int:
  52. return object.__sizeof__(self) + sys.getsizeof(self._items)
  53. class _Iter(Generic[_T]):
  54. __slots__ = ("_size", "_iter")
  55. def __init__(self, size: int, iterator: Iterator[_T]):
  56. self._size = size
  57. self._iter = iterator
  58. def __iter__(self) -> Self:
  59. return self
  60. def __next__(self) -> _T:
  61. return next(self._iter)
  62. def __length_hint__(self) -> int:
  63. return self._size
  64. class _ViewBase(Generic[_V]):
  65. def __init__(
  66. self,
  67. impl: _Impl[_V],
  68. identfunc: Callable[[str], str],
  69. keyfunc: Callable[[str], str],
  70. ):
  71. self._impl = impl
  72. self._identfunc = identfunc
  73. self._keyfunc = keyfunc
  74. def __len__(self) -> int:
  75. return len(self._impl._items)
  76. class _ItemsView(_ViewBase[_V], ItemsView[str, _V]):
  77. def __contains__(self, item: object) -> bool:
  78. if not isinstance(item, (tuple, list)) or len(item) != 2:
  79. return False
  80. key, value = item
  81. try:
  82. ident = self._identfunc(key)
  83. except TypeError:
  84. return False
  85. for i, k, v in self._impl._items:
  86. if ident == i and value == v:
  87. return True
  88. return False
  89. def __iter__(self) -> _Iter[tuple[str, _V]]:
  90. return _Iter(len(self), self._iter(self._impl._version))
  91. def _iter(self, version: int) -> Iterator[tuple[str, _V]]:
  92. for i, k, v in self._impl._items:
  93. if version != self._impl._version:
  94. raise RuntimeError("Dictionary changed during iteration")
  95. yield self._keyfunc(k), v
  96. @reprlib.recursive_repr()
  97. def __repr__(self) -> str:
  98. lst = []
  99. for i, k, v in self._impl._items:
  100. lst.append(f"'{k}': {v!r}")
  101. body = ", ".join(lst)
  102. return f"<{self.__class__.__name__}({body})>"
  103. def _parse_item(
  104. self, arg: Union[tuple[str, _V], _T]
  105. ) -> Optional[tuple[str, str, _V]]:
  106. if not isinstance(arg, tuple):
  107. return None
  108. if len(arg) != 2:
  109. return None
  110. try:
  111. return (self._identfunc(arg[0]), arg[0], arg[1])
  112. except TypeError:
  113. return None
  114. def _tmp_set(self, it: Iterable[_T]) -> set[tuple[str, _V]]:
  115. tmp = set()
  116. for arg in it:
  117. item = self._parse_item(arg)
  118. if item is None:
  119. continue
  120. else:
  121. tmp.add((item[0], item[2]))
  122. return tmp
  123. def __and__(self, other: Iterable[Any]) -> set[tuple[str, _V]]:
  124. ret = set()
  125. try:
  126. it = iter(other)
  127. except TypeError:
  128. return NotImplemented
  129. for arg in it:
  130. item = self._parse_item(arg)
  131. if item is None:
  132. continue
  133. identity, key, value = item
  134. for i, k, v in self._impl._items:
  135. if i == identity and v == value:
  136. ret.add((k, v))
  137. return ret
  138. def __rand__(self, other: Iterable[_T]) -> set[_T]:
  139. ret = set()
  140. try:
  141. it = iter(other)
  142. except TypeError:
  143. return NotImplemented
  144. for arg in it:
  145. item = self._parse_item(arg)
  146. if item is None:
  147. continue
  148. identity, key, value = item
  149. for i, k, v in self._impl._items:
  150. if i == identity and v == value:
  151. ret.add(arg)
  152. break
  153. return ret
  154. def __or__(self, other: Iterable[_T]) -> set[Union[tuple[str, _V], _T]]:
  155. ret: set[Union[tuple[str, _V], _T]] = set(self)
  156. try:
  157. it = iter(other)
  158. except TypeError:
  159. return NotImplemented
  160. for arg in it:
  161. item: Optional[tuple[str, str, _V]] = self._parse_item(arg)
  162. if item is None:
  163. ret.add(arg)
  164. continue
  165. identity, key, value = item
  166. for i, k, v in self._impl._items:
  167. if i == identity and v == value:
  168. break
  169. else:
  170. ret.add(arg)
  171. return ret
  172. def __ror__(self, other: Iterable[_T]) -> set[Union[tuple[str, _V], _T]]:
  173. try:
  174. ret: set[Union[tuple[str, _V], _T]] = set(other)
  175. except TypeError:
  176. return NotImplemented
  177. tmp = self._tmp_set(ret)
  178. for i, k, v in self._impl._items:
  179. if (i, v) not in tmp:
  180. ret.add((k, v))
  181. return ret
  182. def __sub__(self, other: Iterable[_T]) -> set[Union[tuple[str, _V], _T]]:
  183. ret: set[Union[tuple[str, _V], _T]] = set()
  184. try:
  185. it = iter(other)
  186. except TypeError:
  187. return NotImplemented
  188. tmp = self._tmp_set(it)
  189. for i, k, v in self._impl._items:
  190. if (i, v) not in tmp:
  191. ret.add((k, v))
  192. return ret
  193. def __rsub__(self, other: Iterable[_T]) -> set[_T]:
  194. ret: set[_T] = set()
  195. try:
  196. it = iter(other)
  197. except TypeError:
  198. return NotImplemented
  199. for arg in it:
  200. item = self._parse_item(arg)
  201. if item is None:
  202. ret.add(arg)
  203. continue
  204. identity, key, value = item
  205. for i, k, v in self._impl._items:
  206. if i == identity and v == value:
  207. break
  208. else:
  209. ret.add(arg)
  210. return ret
  211. def __xor__(self, other: Iterable[_T]) -> set[Union[tuple[str, _V], _T]]:
  212. try:
  213. rgt = set(other)
  214. except TypeError:
  215. return NotImplemented
  216. ret: set[Union[tuple[str, _V], _T]] = self - rgt
  217. ret |= rgt - self
  218. return ret
  219. __rxor__ = __xor__
  220. def isdisjoint(self, other: Iterable[tuple[str, _V]]) -> bool:
  221. for arg in other:
  222. item = self._parse_item(arg)
  223. if item is None:
  224. continue
  225. identity, key, value = item
  226. for i, k, v in self._impl._items:
  227. if i == identity and v == value:
  228. return False
  229. return True
  230. class _ValuesView(_ViewBase[_V], ValuesView[_V]):
  231. def __contains__(self, value: object) -> bool:
  232. for i, k, v in self._impl._items:
  233. if v == value:
  234. return True
  235. return False
  236. def __iter__(self) -> _Iter[_V]:
  237. return _Iter(len(self), self._iter(self._impl._version))
  238. def _iter(self, version: int) -> Iterator[_V]:
  239. for i, k, v in self._impl._items:
  240. if version != self._impl._version:
  241. raise RuntimeError("Dictionary changed during iteration")
  242. yield v
  243. @reprlib.recursive_repr()
  244. def __repr__(self) -> str:
  245. lst = []
  246. for i, k, v in self._impl._items:
  247. lst.append(repr(v))
  248. body = ", ".join(lst)
  249. return f"<{self.__class__.__name__}({body})>"
  250. class _KeysView(_ViewBase[_V], KeysView[str]):
  251. def __contains__(self, key: object) -> bool:
  252. if not isinstance(key, str):
  253. return False
  254. identity = self._identfunc(key)
  255. for i, k, v in self._impl._items:
  256. if i == identity:
  257. return True
  258. return False
  259. def __iter__(self) -> _Iter[str]:
  260. return _Iter(len(self), self._iter(self._impl._version))
  261. def _iter(self, version: int) -> Iterator[str]:
  262. for i, k, v in self._impl._items:
  263. if version != self._impl._version:
  264. raise RuntimeError("Dictionary changed during iteration")
  265. yield self._keyfunc(k)
  266. def __repr__(self) -> str:
  267. lst = []
  268. for i, k, v in self._impl._items:
  269. lst.append(f"'{k}'")
  270. body = ", ".join(lst)
  271. return f"<{self.__class__.__name__}({body})>"
  272. def __and__(self, other: Iterable[object]) -> set[str]:
  273. ret = set()
  274. try:
  275. it = iter(other)
  276. except TypeError:
  277. return NotImplemented
  278. for key in it:
  279. if not isinstance(key, str):
  280. continue
  281. identity = self._identfunc(key)
  282. for i, k, v in self._impl._items:
  283. if i == identity:
  284. ret.add(k)
  285. return ret
  286. def __rand__(self, other: Iterable[_T]) -> set[_T]:
  287. ret = set()
  288. try:
  289. it = iter(other)
  290. except TypeError:
  291. return NotImplemented
  292. for key in it:
  293. if not isinstance(key, str):
  294. continue
  295. identity = self._identfunc(key)
  296. for i, k, v in self._impl._items:
  297. if i == identity:
  298. ret.add(key)
  299. return cast(set[_T], ret)
  300. def __or__(self, other: Iterable[_T]) -> set[Union[str, _T]]:
  301. ret: set[Union[str, _T]] = set(self)
  302. try:
  303. it = iter(other)
  304. except TypeError:
  305. return NotImplemented
  306. for key in it:
  307. if not isinstance(key, str):
  308. ret.add(key)
  309. continue
  310. identity = self._identfunc(key)
  311. for i, k, v in self._impl._items:
  312. if i == identity:
  313. break
  314. else:
  315. ret.add(key)
  316. return ret
  317. def __ror__(self, other: Iterable[_T]) -> set[Union[str, _T]]:
  318. try:
  319. ret: set[Union[str, _T]] = set(other)
  320. except TypeError:
  321. return NotImplemented
  322. tmp = set()
  323. for key in ret:
  324. if not isinstance(key, str):
  325. continue
  326. identity = self._identfunc(key)
  327. tmp.add(identity)
  328. for i, k, v in self._impl._items:
  329. if i not in tmp:
  330. ret.add(k)
  331. return ret
  332. def __sub__(self, other: Iterable[object]) -> set[str]:
  333. ret = set(self)
  334. try:
  335. it = iter(other)
  336. except TypeError:
  337. return NotImplemented
  338. for key in it:
  339. if not isinstance(key, str):
  340. continue
  341. identity = self._identfunc(key)
  342. for i, k, v in self._impl._items:
  343. if i == identity:
  344. ret.discard(k)
  345. break
  346. return ret
  347. def __rsub__(self, other: Iterable[_T]) -> set[_T]:
  348. try:
  349. ret: set[_T] = set(other)
  350. except TypeError:
  351. return NotImplemented
  352. for key in other:
  353. if not isinstance(key, str):
  354. continue
  355. identity = self._identfunc(key)
  356. for i, k, v in self._impl._items:
  357. if i == identity:
  358. ret.discard(key) # type: ignore[arg-type]
  359. break
  360. return ret
  361. def __xor__(self, other: Iterable[_T]) -> set[Union[str, _T]]:
  362. try:
  363. rgt = set(other)
  364. except TypeError:
  365. return NotImplemented
  366. ret: set[Union[str, _T]] = self - rgt # type: ignore[assignment]
  367. ret |= rgt - self
  368. return ret
  369. __rxor__ = __xor__
  370. def isdisjoint(self, other: Iterable[object]) -> bool:
  371. for key in other:
  372. if not isinstance(key, str):
  373. continue
  374. identity = self._identfunc(key)
  375. for i, k, v in self._impl._items:
  376. if i == identity:
  377. return False
  378. return True
  379. class _CSMixin:
  380. def _key(self, key: str) -> str:
  381. return key
  382. def _title(self, key: str) -> str:
  383. if isinstance(key, str):
  384. return key
  385. else:
  386. raise TypeError("MultiDict keys should be either str or subclasses of str")
  387. class _CIMixin:
  388. _ci: bool = True
  389. def _key(self, key: str) -> str:
  390. if type(key) is istr:
  391. return key
  392. else:
  393. return istr(key)
  394. def _title(self, key: str) -> str:
  395. if isinstance(key, istr):
  396. ret = key.__istr_title__
  397. if ret is None:
  398. ret = key.title()
  399. key.__istr_title__ = ret
  400. return ret
  401. if isinstance(key, str):
  402. return key.title()
  403. else:
  404. raise TypeError("MultiDict keys should be either str or subclasses of str")
  405. class _Base(MultiMapping[_V]):
  406. _impl: _Impl[_V]
  407. _ci: bool = False
  408. @abstractmethod
  409. def _key(self, key: str) -> str: ...
  410. @abstractmethod
  411. def _title(self, key: str) -> str: ...
  412. @overload
  413. def getall(self, key: str) -> list[_V]: ...
  414. @overload
  415. def getall(self, key: str, default: _T) -> Union[list[_V], _T]: ...
  416. def getall(
  417. self, key: str, default: Union[_T, _SENTINEL] = sentinel
  418. ) -> Union[list[_V], _T]:
  419. """Return a list of all values matching the key."""
  420. identity = self._title(key)
  421. res = [v for i, k, v in self._impl._items if i == identity]
  422. if res:
  423. return res
  424. if not res and default is not sentinel:
  425. return default
  426. raise KeyError("Key not found: %r" % key)
  427. @overload
  428. def getone(self, key: str) -> _V: ...
  429. @overload
  430. def getone(self, key: str, default: _T) -> Union[_V, _T]: ...
  431. def getone(
  432. self, key: str, default: Union[_T, _SENTINEL] = sentinel
  433. ) -> Union[_V, _T]:
  434. """Get first value matching the key.
  435. Raises KeyError if the key is not found and no default is provided.
  436. """
  437. identity = self._title(key)
  438. for i, k, v in self._impl._items:
  439. if i == identity:
  440. return v
  441. if default is not sentinel:
  442. return default
  443. raise KeyError("Key not found: %r" % key)
  444. # Mapping interface #
  445. def __getitem__(self, key: str) -> _V:
  446. return self.getone(key)
  447. @overload
  448. def get(self, key: str, /) -> Union[_V, None]: ...
  449. @overload
  450. def get(self, key: str, /, default: _T) -> Union[_V, _T]: ...
  451. def get(self, key: str, default: Union[_T, None] = None) -> Union[_V, _T, None]:
  452. """Get first value matching the key.
  453. If the key is not found, returns the default (or None if no default is provided)
  454. """
  455. return self.getone(key, default)
  456. def __iter__(self) -> Iterator[str]:
  457. return iter(self.keys())
  458. def __len__(self) -> int:
  459. return len(self._impl._items)
  460. def keys(self) -> KeysView[str]:
  461. """Return a new view of the dictionary's keys."""
  462. return _KeysView(self._impl, self._title, self._key)
  463. def items(self) -> ItemsView[str, _V]:
  464. """Return a new view of the dictionary's items *(key, value) pairs)."""
  465. return _ItemsView(self._impl, self._title, self._key)
  466. def values(self) -> _ValuesView[_V]:
  467. """Return a new view of the dictionary's values."""
  468. return _ValuesView(self._impl, self._title, self._key)
  469. def __eq__(self, other: object) -> bool:
  470. if not isinstance(other, Mapping):
  471. return NotImplemented
  472. if isinstance(other, _Base):
  473. lft = self._impl._items
  474. rht = other._impl._items
  475. if len(lft) != len(rht):
  476. return False
  477. for (i1, k2, v1), (i2, k2, v2) in zip(lft, rht):
  478. if i1 != i2 or v1 != v2:
  479. return False
  480. return True
  481. if len(self._impl._items) != len(other):
  482. return False
  483. for k, v in self.items():
  484. nv = other.get(k, sentinel)
  485. if v != nv:
  486. return False
  487. return True
  488. def __contains__(self, key: object) -> bool:
  489. if not isinstance(key, str):
  490. return False
  491. identity = self._title(key)
  492. for i, k, v in self._impl._items:
  493. if i == identity:
  494. return True
  495. return False
  496. @reprlib.recursive_repr()
  497. def __repr__(self) -> str:
  498. body = ", ".join(f"'{k}': {v!r}" for i, k, v in self._impl._items)
  499. return f"<{self.__class__.__name__}({body})>"
  500. class MultiDict(_CSMixin, _Base[_V], MutableMultiMapping[_V]):
  501. """Dictionary with the support for duplicate keys."""
  502. def __init__(self, arg: MDArg[_V] = None, /, **kwargs: _V):
  503. self._impl = _Impl()
  504. self._extend(arg, kwargs, self.__class__.__name__, self._extend_items)
  505. if sys.implementation.name != "pypy":
  506. def __sizeof__(self) -> int:
  507. return object.__sizeof__(self) + sys.getsizeof(self._impl)
  508. def __reduce__(self) -> tuple[type[Self], tuple[list[tuple[str, _V]]]]:
  509. return (self.__class__, (list(self.items()),))
  510. def add(self, key: str, value: _V) -> None:
  511. identity = self._title(key)
  512. self._impl._items.append((identity, key, value))
  513. self._impl.incr_version()
  514. def copy(self) -> Self:
  515. """Return a copy of itself."""
  516. cls = self.__class__
  517. return cls(self.items())
  518. __copy__ = copy
  519. def extend(self, arg: MDArg[_V] = None, /, **kwargs: _V) -> None:
  520. """Extend current MultiDict with more values.
  521. This method must be used instead of update.
  522. """
  523. self._extend(arg, kwargs, "extend", self._extend_items)
  524. def _extend(
  525. self,
  526. arg: MDArg[_V],
  527. kwargs: Mapping[str, _V],
  528. name: str,
  529. method: Callable[[list[tuple[str, str, _V]]], None],
  530. ) -> None:
  531. if arg:
  532. if isinstance(arg, (MultiDict, MultiDictProxy)):
  533. if self._ci is not arg._ci:
  534. items = [(self._title(k), k, v) for _, k, v in arg._impl._items]
  535. else:
  536. items = arg._impl._items
  537. if kwargs:
  538. items = items.copy()
  539. if kwargs:
  540. for key, value in kwargs.items():
  541. items.append((self._title(key), key, value))
  542. else:
  543. if hasattr(arg, "keys"):
  544. arg = cast(SupportsKeys[_V], arg)
  545. arg = [(k, arg[k]) for k in arg.keys()]
  546. if kwargs:
  547. arg = list(arg)
  548. arg.extend(list(kwargs.items()))
  549. items = []
  550. for pos, item in enumerate(arg):
  551. if not len(item) == 2:
  552. raise ValueError(
  553. f"multidict update sequence element #{pos}"
  554. f"has length {len(item)}; 2 is required"
  555. )
  556. items.append((self._title(item[0]), item[0], item[1]))
  557. method(items)
  558. else:
  559. method([(self._title(key), key, value) for key, value in kwargs.items()])
  560. def _extend_items(self, items: Iterable[tuple[str, str, _V]]) -> None:
  561. for identity, key, value in items:
  562. self._impl._items.append((identity, key, value))
  563. self._impl.incr_version()
  564. def clear(self) -> None:
  565. """Remove all items from MultiDict."""
  566. self._impl._items.clear()
  567. self._impl.incr_version()
  568. # Mapping interface #
  569. def __setitem__(self, key: str, value: _V) -> None:
  570. self._replace(key, value)
  571. def __delitem__(self, key: str) -> None:
  572. identity = self._title(key)
  573. items = self._impl._items
  574. found = False
  575. for i in range(len(items) - 1, -1, -1):
  576. if items[i][0] == identity:
  577. del items[i]
  578. found = True
  579. if not found:
  580. raise KeyError(key)
  581. else:
  582. self._impl.incr_version()
  583. @overload
  584. def setdefault(
  585. self: "MultiDict[Union[_T, None]]", key: str, default: None = None
  586. ) -> Union[_T, None]: ...
  587. @overload
  588. def setdefault(self, key: str, default: _V) -> _V: ...
  589. def setdefault(self, key: str, default: Union[_V, None] = None) -> Union[_V, None]: # type: ignore[misc]
  590. """Return value for key, set value to default if key is not present."""
  591. identity = self._title(key)
  592. for i, k, v in self._impl._items:
  593. if i == identity:
  594. return v
  595. self.add(key, default) # type: ignore[arg-type]
  596. return default
  597. @overload
  598. def popone(self, key: str) -> _V: ...
  599. @overload
  600. def popone(self, key: str, default: _T) -> Union[_V, _T]: ...
  601. def popone(
  602. self, key: str, default: Union[_T, _SENTINEL] = sentinel
  603. ) -> Union[_V, _T]:
  604. """Remove specified key and return the corresponding value.
  605. If key is not found, d is returned if given, otherwise
  606. KeyError is raised.
  607. """
  608. identity = self._title(key)
  609. for i in range(len(self._impl._items)):
  610. if self._impl._items[i][0] == identity:
  611. value = self._impl._items[i][2]
  612. del self._impl._items[i]
  613. self._impl.incr_version()
  614. return value
  615. if default is sentinel:
  616. raise KeyError(key)
  617. else:
  618. return default
  619. # Type checking will inherit signature for pop() if we don't confuse it here.
  620. if not TYPE_CHECKING:
  621. pop = popone
  622. @overload
  623. def popall(self, key: str) -> list[_V]: ...
  624. @overload
  625. def popall(self, key: str, default: _T) -> Union[list[_V], _T]: ...
  626. def popall(
  627. self, key: str, default: Union[_T, _SENTINEL] = sentinel
  628. ) -> Union[list[_V], _T]:
  629. """Remove all occurrences of key and return the list of corresponding
  630. values.
  631. If key is not found, default is returned if given, otherwise
  632. KeyError is raised.
  633. """
  634. found = False
  635. identity = self._title(key)
  636. ret = []
  637. for i in range(len(self._impl._items) - 1, -1, -1):
  638. item = self._impl._items[i]
  639. if item[0] == identity:
  640. ret.append(item[2])
  641. del self._impl._items[i]
  642. self._impl.incr_version()
  643. found = True
  644. if not found:
  645. if default is sentinel:
  646. raise KeyError(key)
  647. else:
  648. return default
  649. else:
  650. ret.reverse()
  651. return ret
  652. def popitem(self) -> tuple[str, _V]:
  653. """Remove and return an arbitrary (key, value) pair."""
  654. if self._impl._items:
  655. i, k, v = self._impl._items.pop()
  656. self._impl.incr_version()
  657. return self._key(k), v
  658. else:
  659. raise KeyError("empty multidict")
  660. def update(self, arg: MDArg[_V] = None, /, **kwargs: _V) -> None:
  661. """Update the dictionary from *other*, overwriting existing keys."""
  662. self._extend(arg, kwargs, "update", self._update_items)
  663. def _update_items(self, items: list[tuple[str, str, _V]]) -> None:
  664. if not items:
  665. return
  666. used_keys: dict[str, int] = {}
  667. for identity, key, value in items:
  668. start = used_keys.get(identity, 0)
  669. for i in range(start, len(self._impl._items)):
  670. item = self._impl._items[i]
  671. if item[0] == identity:
  672. used_keys[identity] = i + 1
  673. self._impl._items[i] = (identity, key, value)
  674. break
  675. else:
  676. self._impl._items.append((identity, key, value))
  677. used_keys[identity] = len(self._impl._items)
  678. # drop tails
  679. i = 0
  680. while i < len(self._impl._items):
  681. item = self._impl._items[i]
  682. identity = item[0]
  683. pos = used_keys.get(identity)
  684. if pos is None:
  685. i += 1
  686. continue
  687. if i >= pos:
  688. del self._impl._items[i]
  689. else:
  690. i += 1
  691. self._impl.incr_version()
  692. def _replace(self, key: str, value: _V) -> None:
  693. identity = self._title(key)
  694. items = self._impl._items
  695. for i in range(len(items)):
  696. item = items[i]
  697. if item[0] == identity:
  698. items[i] = (identity, key, value)
  699. # i points to last found item
  700. rgt = i
  701. self._impl.incr_version()
  702. break
  703. else:
  704. self._impl._items.append((identity, key, value))
  705. self._impl.incr_version()
  706. return
  707. # remove all tail items
  708. # Mypy bug: https://github.com/python/mypy/issues/14209
  709. i = rgt + 1 # type: ignore[possibly-undefined]
  710. while i < len(items):
  711. item = items[i]
  712. if item[0] == identity:
  713. del items[i]
  714. else:
  715. i += 1
  716. class CIMultiDict(_CIMixin, MultiDict[_V]):
  717. """Dictionary with the support for duplicate case-insensitive keys."""
  718. class MultiDictProxy(_CSMixin, _Base[_V]):
  719. """Read-only proxy for MultiDict instance."""
  720. def __init__(self, arg: Union[MultiDict[_V], "MultiDictProxy[_V]"]):
  721. if not isinstance(arg, (MultiDict, MultiDictProxy)):
  722. raise TypeError(
  723. "ctor requires MultiDict or MultiDictProxy instance"
  724. f", not {type(arg)}"
  725. )
  726. self._impl = arg._impl
  727. def __reduce__(self) -> NoReturn:
  728. raise TypeError(f"can't pickle {self.__class__.__name__} objects")
  729. def copy(self) -> MultiDict[_V]:
  730. """Return a copy of itself."""
  731. return MultiDict(self.items())
  732. class CIMultiDictProxy(_CIMixin, MultiDictProxy[_V]):
  733. """Read-only proxy for CIMultiDict instance."""
  734. def __init__(self, arg: Union[MultiDict[_V], MultiDictProxy[_V]]):
  735. if not isinstance(arg, (CIMultiDict, CIMultiDictProxy)):
  736. raise TypeError(
  737. "ctor requires CIMultiDict or CIMultiDictProxy instance"
  738. f", not {type(arg)}"
  739. )
  740. self._impl = arg._impl
  741. def copy(self) -> CIMultiDict[_V]:
  742. """Return a copy of itself."""
  743. return CIMultiDict(self.items())
  744. def getversion(md: Union[MultiDict[object], MultiDictProxy[object]]) -> int:
  745. if not isinstance(md, _Base):
  746. raise TypeError("Parameter should be multidict or proxy")
  747. return md._impl._version