check.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. """Validation of dependencies of packages
  2. """
  3. import logging
  4. from contextlib import suppress
  5. from email.parser import Parser
  6. from functools import reduce
  7. from typing import (
  8. Callable,
  9. Dict,
  10. FrozenSet,
  11. Generator,
  12. Iterable,
  13. List,
  14. NamedTuple,
  15. Optional,
  16. Set,
  17. Tuple,
  18. )
  19. from pip._vendor.packaging.requirements import Requirement
  20. from pip._vendor.packaging.tags import Tag, parse_tag
  21. from pip._vendor.packaging.utils import NormalizedName, canonicalize_name
  22. from pip._vendor.packaging.version import Version
  23. from pip._internal.distributions import make_distribution_for_install_requirement
  24. from pip._internal.metadata import get_default_environment
  25. from pip._internal.metadata.base import BaseDistribution
  26. from pip._internal.req.req_install import InstallRequirement
  27. logger = logging.getLogger(__name__)
  28. class PackageDetails(NamedTuple):
  29. version: Version
  30. dependencies: List[Requirement]
  31. # Shorthands
  32. PackageSet = Dict[NormalizedName, PackageDetails]
  33. Missing = Tuple[NormalizedName, Requirement]
  34. Conflicting = Tuple[NormalizedName, Version, Requirement]
  35. MissingDict = Dict[NormalizedName, List[Missing]]
  36. ConflictingDict = Dict[NormalizedName, List[Conflicting]]
  37. CheckResult = Tuple[MissingDict, ConflictingDict]
  38. ConflictDetails = Tuple[PackageSet, CheckResult]
  39. def create_package_set_from_installed() -> Tuple[PackageSet, bool]:
  40. """Converts a list of distributions into a PackageSet."""
  41. package_set = {}
  42. problems = False
  43. env = get_default_environment()
  44. for dist in env.iter_installed_distributions(local_only=False, skip=()):
  45. name = dist.canonical_name
  46. try:
  47. dependencies = list(dist.iter_dependencies())
  48. package_set[name] = PackageDetails(dist.version, dependencies)
  49. except (OSError, ValueError) as e:
  50. # Don't crash on unreadable or broken metadata.
  51. logger.warning("Error parsing dependencies of %s: %s", name, e)
  52. problems = True
  53. return package_set, problems
  54. def check_package_set(
  55. package_set: PackageSet, should_ignore: Optional[Callable[[str], bool]] = None
  56. ) -> CheckResult:
  57. """Check if a package set is consistent
  58. If should_ignore is passed, it should be a callable that takes a
  59. package name and returns a boolean.
  60. """
  61. missing = {}
  62. conflicting = {}
  63. for package_name, package_detail in package_set.items():
  64. # Info about dependencies of package_name
  65. missing_deps: Set[Missing] = set()
  66. conflicting_deps: Set[Conflicting] = set()
  67. if should_ignore and should_ignore(package_name):
  68. continue
  69. for req in package_detail.dependencies:
  70. name = canonicalize_name(req.name)
  71. # Check if it's missing
  72. if name not in package_set:
  73. missed = True
  74. if req.marker is not None:
  75. missed = req.marker.evaluate({"extra": ""})
  76. if missed:
  77. missing_deps.add((name, req))
  78. continue
  79. # Check if there's a conflict
  80. version = package_set[name].version
  81. if not req.specifier.contains(version, prereleases=True):
  82. conflicting_deps.add((name, version, req))
  83. if missing_deps:
  84. missing[package_name] = sorted(missing_deps, key=str)
  85. if conflicting_deps:
  86. conflicting[package_name] = sorted(conflicting_deps, key=str)
  87. return missing, conflicting
  88. def check_install_conflicts(to_install: List[InstallRequirement]) -> ConflictDetails:
  89. """For checking if the dependency graph would be consistent after \
  90. installing given requirements
  91. """
  92. # Start from the current state
  93. package_set, _ = create_package_set_from_installed()
  94. # Install packages
  95. would_be_installed = _simulate_installation_of(to_install, package_set)
  96. # Only warn about directly-dependent packages; create a whitelist of them
  97. whitelist = _create_whitelist(would_be_installed, package_set)
  98. return (
  99. package_set,
  100. check_package_set(
  101. package_set, should_ignore=lambda name: name not in whitelist
  102. ),
  103. )
  104. def check_unsupported(
  105. packages: Iterable[BaseDistribution],
  106. supported_tags: Iterable[Tag],
  107. ) -> Generator[BaseDistribution, None, None]:
  108. for p in packages:
  109. with suppress(FileNotFoundError):
  110. wheel_file = p.read_text("WHEEL")
  111. wheel_tags: FrozenSet[Tag] = reduce(
  112. frozenset.union,
  113. map(parse_tag, Parser().parsestr(wheel_file).get_all("Tag", [])),
  114. frozenset(),
  115. )
  116. if wheel_tags.isdisjoint(supported_tags):
  117. yield p
  118. def _simulate_installation_of(
  119. to_install: List[InstallRequirement], package_set: PackageSet
  120. ) -> Set[NormalizedName]:
  121. """Computes the version of packages after installing to_install."""
  122. # Keep track of packages that were installed
  123. installed = set()
  124. # Modify it as installing requirement_set would (assuming no errors)
  125. for inst_req in to_install:
  126. abstract_dist = make_distribution_for_install_requirement(inst_req)
  127. dist = abstract_dist.get_metadata_distribution()
  128. name = dist.canonical_name
  129. package_set[name] = PackageDetails(dist.version, list(dist.iter_dependencies()))
  130. installed.add(name)
  131. return installed
  132. def _create_whitelist(
  133. would_be_installed: Set[NormalizedName], package_set: PackageSet
  134. ) -> Set[NormalizedName]:
  135. packages_affected = set(would_be_installed)
  136. for package_name in package_set:
  137. if package_name in packages_affected:
  138. continue
  139. for req in package_set[package_name].dependencies:
  140. if canonicalize_name(req.name) in packages_affected:
  141. packages_affected.add(package_name)
  142. break
  143. return packages_affected