Path: blob/main/test/lib/python3.9/site-packages/pip/_internal/utils/misc.py
4804 views
# The following comment should be removed at some point in the future.
# mypy: strict-optional=False

import contextlib
import errno
import getpass
import hashlib
import io
import logging
import os
import posixpath
import shutil
import stat
import sys
import urllib.parse
from io import StringIO
from itertools import filterfalse, tee, zip_longest
from types import TracebackType
from typing import (
    Any,
    BinaryIO,
    Callable,
    ContextManager,
    Dict,
    Generator,
    Iterable,
    Iterator,
    List,
    Optional,
    TextIO,
    Tuple,
    Type,
    TypeVar,
    cast,
)

from pip._vendor.pep517 import Pep517HookCaller
from pip._vendor.tenacity import retry, stop_after_delay, wait_fixed

from pip import __version__
from pip._internal.exceptions import CommandError
from pip._internal.locations import get_major_minor_version
from pip._internal.utils.compat import WINDOWS
from pip._internal.utils.virtualenv import running_under_virtualenv

__all__ = [
    "rmtree",
    "display_path",
    "backup_dir",
    "ask",
    "splitext",
    "format_size",
    "is_installable_dir",
    "normalize_path",
    "renames",
    "get_prog",
    "captured_stdout",
    "ensure_dir",
    "remove_auth_from_url",
    "ConfiguredPep517HookCaller",
]


logger = logging.getLogger(__name__)

T = TypeVar("T")
ExcInfo = Tuple[Type[BaseException], BaseException, TracebackType]
VersionInfo = Tuple[int, int, int]
NetlocTuple = Tuple[str, Tuple[Optional[str], Optional[str]]]


def get_pip_version() -> str:
    """Return a one-line description of pip's version, location and Python.

    The location is the directory two levels above this file's directory,
    made absolute.
    """
    pip_pkg_dir = os.path.join(os.path.dirname(__file__), "..", "..")
    pip_pkg_dir = os.path.abspath(pip_pkg_dir)

    return "pip {} from {} (python {})".format(
        __version__,
        pip_pkg_dir,
        get_major_minor_version(),
    )


def normalize_version_info(py_version_info: Tuple[int, ...]) -> Tuple[int, int, int]:
    """
    Convert a tuple of ints representing a Python version to one of length
    three.

    :param py_version_info: a tuple of ints representing a Python version.
        The tuple can have any length. ``None`` is not handled here: the
        length is taken unconditionally, so callers must check for it first.

    :return: a tuple of length three; short input is padded with zeros and
        long input is truncated.
    """
    if len(py_version_info) < 3:
        py_version_info += (3 - len(py_version_info)) * (0,)
    elif len(py_version_info) > 3:
        py_version_info = py_version_info[:3]

    return cast("VersionInfo", py_version_info)


def ensure_dir(path: str) -> None:
    """os.path.makedirs without EEXIST."""
    try:
        os.makedirs(path)
    except OSError as e:
        # Windows can raise spurious ENOTEMPTY errors. See #6426.
        if e.errno != errno.EEXIST and e.errno != errno.ENOTEMPTY:
            raise


def get_prog() -> str:
    """Return the program name to show in usage and help messages.

    When run as ``__main__.py`` or via ``-c``, the result is
    ``"<sys.executable> -m pip"``; otherwise it is the basename of
    ``sys.argv[0]``. Falls back to ``"pip"`` if ``sys.argv`` is unusable.
    """
    try:
        prog = os.path.basename(sys.argv[0])
        if prog in ("__main__.py", "-c"):
            return f"{sys.executable} -m pip"
        else:
            return prog
    except (AttributeError, TypeError, IndexError):
        pass
    return "pip"


# Retry every half second for up to 3 seconds
# Tenacity raises RetryError by default, explicitly raise the original exception
@retry(reraise=True, stop=stop_after_delay(3), wait=wait_fixed(0.5))
def rmtree(dir: str, ignore_errors: bool = False) -> None:
    """Remove a directory tree, using rmtree_errorhandler for failures."""
    shutil.rmtree(dir, ignore_errors=ignore_errors, onerror=rmtree_errorhandler)


def rmtree_errorhandler(func: Callable[..., Any], path: str, exc_info: ExcInfo) -> None:
    """On Windows, the files in .svn are read-only, so when rmtree() tries to
    remove them, an exception is thrown.  We catch that here, remove the
    read-only attribute, and hopefully continue without problems."""
    try:
        has_attr_readonly = not (os.stat(path).st_mode & stat.S_IWRITE)
    except OSError:
        # it's equivalent to os.path.exists
        return

    if has_attr_readonly:
        # convert to read/write
        os.chmod(path, stat.S_IWRITE)
        # use the original function to repeat the operation
        func(path)
        return
    else:
        # Bare re-raise: this relies on being invoked while the original
        # exception is still being handled (as an ``onerror`` callback).
        raise


def display_path(path: str) -> str:
    """Gives the display value for a given path, making it relative to cwd
    if possible."""
    path = os.path.normcase(os.path.abspath(path))
    cwd = os.getcwd()
    if path.startswith(cwd + os.path.sep):
        path = "." + path[len(cwd) :]
    return path


def backup_dir(dir: str, ext: str = ".bak") -> str:
    """Figure out the name of a directory to back up the given dir to
    (adding .bak, .bak2, etc)"""
    n = 1
    extension = ext
    while os.path.exists(dir + extension):
        n += 1
        extension = ext + str(n)
    return dir + extension


def ask_path_exists(message: str, options: Iterable[str]) -> str:
    """Return the action to take when a path already exists.

    The first whitespace-separated entry of ``$PIP_EXISTS_ACTION`` that is
    one of ``options`` wins; otherwise the user is asked interactively.
    """
    for action in os.environ.get("PIP_EXISTS_ACTION", "").split():
        if action in options:
            return action
    return ask(message, options)


def _check_no_input(message: str) -> None:
    """Raise an error if no input is allowed."""
    if os.environ.get("PIP_NO_INPUT"):
        raise Exception(
            f"No input was expected ($PIP_NO_INPUT set); question: {message}"
        )


def ask(message: str, options: Iterable[str]) -> str:
    """Ask the message interactively, with the given possible responses"""
    while True:
        _check_no_input(message)
        response = input(message)
        response = response.strip().lower()
        if response not in options:
            print(
                "Your response ({!r}) was not one of the expected responses: "
                "{}".format(response, ", ".join(options))
            )
        else:
            return response


def ask_input(message: str) -> str:
    """Ask for input interactively."""
    _check_no_input(message)
    return input(message)


def ask_password(message: str) -> str:
    """Ask for a password interactively."""
    _check_no_input(message)
    return getpass.getpass(message)


def strtobool(val: str) -> int:
    """Convert a string representation of truth to true (1) or false (0).

    True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values
    are 'n', 'no', 'f', 'false', 'off', and '0'.  Raises ValueError if
    'val' is anything else.
    """
    val = val.lower()
    if val in ("y", "yes", "t", "true", "on", "1"):
        return 1
    elif val in ("n", "no", "f", "false", "off", "0"):
        return 0
    else:
        raise ValueError(f"invalid truth value {val!r}")


def format_size(bytes: float) -> str:
    """Format a byte count as a short string using decimal (1000) units."""
    if bytes > 1000 * 1000:
        return "{:.1f} MB".format(bytes / 1000.0 / 1000)
    elif bytes > 10 * 1000:
        return "{} kB".format(int(bytes / 1000))
    elif bytes > 1000:
        return "{:.1f} kB".format(bytes / 1000.0)
    else:
        return "{} bytes".format(int(bytes))


def tabulate(rows: Iterable[Iterable[Any]]) -> Tuple[List[str], List[int]]:
    """Return a list of formatted rows and a list of column sizes.

    For example::

    >>> tabulate([['foobar', 2000], [0xdeadbeef]])
    (['foobar     2000', '3735928559'], [10, 4])
    """
    rows = [tuple(map(str, row)) for row in rows]
    # zip_longest pads short rows with "" so every column has a width.
    sizes = [max(map(len, col)) for col in zip_longest(*rows, fillvalue="")]
    table = [" ".join(map(str.ljust, row, sizes)).rstrip() for row in rows]
    return table, sizes


def is_installable_dir(path: str) -> bool:
    """Is path a directory containing pyproject.toml or setup.py?

    If pyproject.toml exists, this is a PEP 517 project. Otherwise we look for
    a legacy setuptools layout by identifying setup.py. We don't check for the
    setup.cfg because using it without setup.py is only available for PEP 517
    projects, which are already covered by the pyproject.toml check.
    """
    if not os.path.isdir(path):
        return False
    if os.path.isfile(os.path.join(path, "pyproject.toml")):
        return True
    if os.path.isfile(os.path.join(path, "setup.py")):
        return True
    return False


def read_chunks(
    file: BinaryIO, size: int = io.DEFAULT_BUFFER_SIZE
) -> Generator[bytes, None, None]:
    """Yield pieces of data from a file-like object until EOF."""
    while True:
        chunk = file.read(size)
        if not chunk:
            break
        yield chunk


def normalize_path(path: str, resolve_symlinks: bool = True) -> str:
    """
    Convert a path to its canonical, case-normalized, absolute version.

    """
    path = os.path.expanduser(path)
    if resolve_symlinks:
        path = os.path.realpath(path)
    else:
        path = os.path.abspath(path)
    return os.path.normcase(path)


def splitext(path: str) -> Tuple[str, str]:
    """Like os.path.splitext, but take off .tar too"""
    base, ext = posixpath.splitext(path)
    if base.lower().endswith(".tar"):
        ext = base[-4:] + ext
        base = base[:-4]
    return base, ext


def renames(old: str, new: str) -> None:
    """Like os.renames(), but handles renaming across devices."""
    # Implementation borrowed from os.renames().
    head, tail = os.path.split(new)
    if head and tail and not os.path.exists(head):
        os.makedirs(head)

    shutil.move(old, new)

    head, tail = os.path.split(old)
    if head and tail:
        try:
            os.removedirs(head)
        except OSError:
            # Best effort: the old parent directories may not be empty.
            pass


def is_local(path: str) -> bool:
    """
    Return True if path is within sys.prefix, if we're running in a virtualenv.

    If we're not in a virtualenv, all paths are considered "local."

    Caution: this function assumes the head of path has been normalized
    with normalize_path.
    """
    if not running_under_virtualenv():
        return True
    return path.startswith(normalize_path(sys.prefix))


def write_output(msg: Any, *args: Any) -> None:
    """Log ``msg`` (with lazy %-style ``args``) at INFO level."""
    logger.info(msg, *args)


class StreamWrapper(StringIO):
    """A StringIO that reports the encoding of the stream it stands in for."""

    orig_stream: TextIO = None

    @classmethod
    def from_stream(cls, orig_stream: TextIO) -> "StreamWrapper":
        # NOTE: the original stream is stored on the class, not the
        # instance, so it is shared by every StreamWrapper.
        cls.orig_stream = orig_stream
        return cls()

    # compileall.compile_dir() needs stdout.encoding to print to stdout
    # https://github.com/python/mypy/issues/4125
    @property
    def encoding(self):  # type: ignore
        return self.orig_stream.encoding


@contextlib.contextmanager
def captured_output(stream_name: str) -> Generator[StreamWrapper, None, None]:
    """Return a context manager used by captured_stdout/stdin/stderr
    that temporarily replaces the sys stream *stream_name* with a StringIO.

    Taken from Lib/support/__init__.py in the CPython repo.
    """
    orig_stdout = getattr(sys, stream_name)
    setattr(sys, stream_name, StreamWrapper.from_stream(orig_stdout))
    try:
        yield getattr(sys, stream_name)
    finally:
        # Always restore the real stream, even if the body raised.
        setattr(sys, stream_name, orig_stdout)


def captured_stdout() -> ContextManager[StreamWrapper]:
    """Capture the output of sys.stdout:

       with captured_stdout() as stdout:
           print('hello')
       self.assertEqual(stdout.getvalue(), 'hello\n')

    Taken from Lib/support/__init__.py in the CPython repo.
    """
    return captured_output("stdout")


def captured_stderr() -> ContextManager[StreamWrapper]:
    """
    See captured_stdout().
    """
    return captured_output("stderr")


# Simulates an enum
def enum(*sequential: Any, **named: Any) -> Type[Any]:
    """Build an ad-hoc ``Enum`` class.

    Positional names are numbered from 0; keyword arguments supply explicit
    values. The class also gets a ``reverse_mapping`` dict (value -> name).
    """
    enums = dict(zip(sequential, range(len(sequential))), **named)
    reverse = {value: key for key, value in enums.items()}
    enums["reverse_mapping"] = reverse
    return type("Enum", (), enums)


def build_netloc(host: str, port: Optional[int]) -> str:
    """
    Build a netloc from a host-port pair
    """
    if port is None:
        return host
    if ":" in host:
        # Only wrap host with square brackets when it is IPv6
        host = f"[{host}]"
    return f"{host}:{port}"


def build_url_from_netloc(netloc: str, scheme: str = "https") -> str:
    """
    Build a full URL from a netloc.
    """
    if netloc.count(":") >= 2 and "@" not in netloc and "[" not in netloc:
        # It must be a bare IPv6 address, so wrap it with brackets.
        netloc = f"[{netloc}]"
    return f"{scheme}://{netloc}"


def parse_netloc(netloc: str) -> Tuple[str, Optional[int]]:
    """
    Return the host-port pair from a netloc.
    """
    url = build_url_from_netloc(netloc)
    parsed = urllib.parse.urlparse(url)
    return parsed.hostname, parsed.port


def split_auth_from_netloc(netloc: str) -> NetlocTuple:
    """
    Parse out and remove the auth information from a netloc.

    Returns: (netloc, (username, password)).
    """
    if "@" not in netloc:
        return netloc, (None, None)

    # Split from the right because that's how urllib.parse.urlsplit()
    # behaves if more than one @ is present (which can be checked using
    # the password attribute of urlsplit()'s return value).
    auth, netloc = netloc.rsplit("@", 1)
    pw: Optional[str] = None
    if ":" in auth:
        # Split from the left because that's how urllib.parse.urlsplit()
        # behaves if more than one : is present (which again can be checked
        # using the password attribute of the return value)
        user, pw = auth.split(":", 1)
    else:
        user, pw = auth, None

    user = urllib.parse.unquote(user)
    if pw is not None:
        pw = urllib.parse.unquote(pw)

    return netloc, (user, pw)


def redact_netloc(netloc: str) -> str:
    """
    Replace the sensitive data in a netloc with "****", if it exists.

    For example:
        - "user:pass@example.com" returns "user:****@example.com"
        - "accesstoken@example.com" returns "****@example.com"
    """
    netloc, (user, password) = split_auth_from_netloc(netloc)
    if user is None:
        return netloc
    if password is None:
        # A lone "user" part is redacted entirely.
        user = "****"
        password = ""
    else:
        user = urllib.parse.quote(user)
        password = ":****"
    return "{user}{password}@{netloc}".format(
        user=user, password=password, netloc=netloc
    )


def _transform_url(
    url: str, transform_netloc: Callable[[str], Tuple[Any, ...]]
) -> Tuple[str, NetlocTuple]:
    """Transform and replace netloc in a url.

    transform_netloc is a function taking the netloc and returning a
    tuple. The first element of this tuple is the new netloc. The
    entire tuple is returned.

    Returns a tuple containing the transformed url as item 0 and the
    original tuple returned by transform_netloc as item 1.
    """
    purl = urllib.parse.urlsplit(url)
    netloc_tuple = transform_netloc(purl.netloc)
    # stripped url
    url_pieces = (purl.scheme, netloc_tuple[0], purl.path, purl.query, purl.fragment)
    surl = urllib.parse.urlunsplit(url_pieces)
    return surl, cast("NetlocTuple", netloc_tuple)


def _get_netloc(netloc: str) -> NetlocTuple:
    """Netloc transform that strips the auth part and returns it separately."""
    return split_auth_from_netloc(netloc)


def _redact_netloc(netloc: str) -> Tuple[str]:
    """Netloc transform that redacts the auth part (as a 1-tuple)."""
    return (redact_netloc(netloc),)


def split_auth_netloc_from_url(url: str) -> Tuple[str, str, Tuple[str, str]]:
    """
    Parse a url into separate netloc, auth, and url with no auth.

    Returns: (url_without_auth, netloc, (username, password))
    """
    url_without_auth, (netloc, auth) = _transform_url(url, _get_netloc)
    return url_without_auth, netloc, auth


def remove_auth_from_url(url: str) -> str:
    """Return a copy of url with 'username:password@' removed."""
    # username/pass params are passed to subversion through flags
    # and are not recognized in the url.
    return _transform_url(url, _get_netloc)[0]


def redact_auth_from_url(url: str) -> str:
    """Replace the password in a given url with ****."""
    return _transform_url(url, _redact_netloc)[0]


class HiddenText:
    """A secret string paired with the redacted form shown by str()/repr()."""

    def __init__(self, secret: str, redacted: str) -> None:
        self.secret = secret
        self.redacted = redacted

    def __repr__(self) -> str:
        return "<HiddenText {!r}>".format(str(self))

    def __str__(self) -> str:
        return self.redacted

    # This is useful for testing.
    # NOTE: defining __eq__ without __hash__ makes instances unhashable.
    def __eq__(self, other: Any) -> bool:
        if type(self) != type(other):
            return False

        # The string being used for redaction doesn't also have to match,
        # just the raw, original string.
        return self.secret == other.secret


def hide_value(value: str) -> HiddenText:
    """Wrap ``value`` so that it displays as ``****``."""
    return HiddenText(value, redacted="****")


def hide_url(url: str) -> HiddenText:
    """Wrap ``url`` so that it displays with its auth part redacted."""
    redacted = redact_auth_from_url(url)
    return HiddenText(url, redacted=redacted)


def protect_pip_from_modification_on_windows(modifying_pip: bool) -> None:
    """Protection of pip.exe from modification on Windows

    On Windows, any operation modifying pip should be run as:
        python -m pip ...

    Raises CommandError with the suggested command when pip is being
    modified on Windows and was started through one of the pip names.
    """
    pip_names = [
        "pip",
        f"pip{sys.version_info.major}",
        f"pip{sys.version_info.major}.{sys.version_info.minor}",
    ]

    # See https://github.com/pypa/pip/issues/1299 for more discussion
    should_show_use_python_msg = (
        modifying_pip and WINDOWS and os.path.basename(sys.argv[0]) in pip_names
    )

    if should_show_use_python_msg:
        new_command = [sys.executable, "-m", "pip"] + sys.argv[1:]
        raise CommandError(
            "To modify pip, please run the following command:\n{}".format(
                " ".join(new_command)
            )
        )


def is_console_interactive() -> bool:
    """Is this console interactive?"""
    return sys.stdin is not None and sys.stdin.isatty()


def hash_file(path: str, blocksize: int = 1 << 20) -> Tuple[Any, int]:
    """Return (hash, length) for path using hashlib.sha256()"""

    h = hashlib.sha256()
    length = 0
    with open(path, "rb") as f:
        for block in read_chunks(f, size=blocksize):
            length += len(block)
            h.update(block)
    return h, length


def is_wheel_installed() -> bool:
    """
    Return whether the wheel package is installed.
    """
    try:
        import wheel  # noqa: F401
    except ImportError:
        return False

    return True


def pairwise(iterable: Iterable[Any]) -> Iterator[Tuple[Any, Any]]:
    """
    Return paired elements.

    For example:
        s -> (s0, s1), (s2, s3), (s4, s5), ...
    """
    # Both zip_longest arguments are the same iterator, so each output
    # tuple consumes two consecutive items; an odd tail is padded with None.
    iterable = iter(iterable)
    return zip_longest(iterable, iterable)


def partition(
    pred: Callable[[T], bool],
    iterable: Iterable[T],
) -> Tuple[Iterable[T], Iterable[T]]:
    """
    Use a predicate to partition entries into false entries and true entries,
    like

        partition(is_odd, range(10)) --> 0 2 4 6 8   and  1 3 5 7 9
    """
    t1, t2 = tee(iterable)
    return filterfalse(pred, t1), filter(pred, t2)


class ConfiguredPep517HookCaller(Pep517HookCaller):
    """Pep517HookCaller that takes config settings from ``config_holder``.

    Every hook below ignores its own ``config_settings`` argument and passes
    ``self.config_holder.config_settings`` to the parent implementation.
    """

    def __init__(
        self,
        config_holder: Any,
        source_dir: str,
        build_backend: str,
        backend_path: Optional[str] = None,
        runner: Optional[Callable[..., None]] = None,
        python_executable: Optional[str] = None,
    ):
        super().__init__(
            source_dir, build_backend, backend_path, runner, python_executable
        )
        self.config_holder = config_holder

    def build_wheel(
        self,
        wheel_directory: str,
        config_settings: Optional[Dict[str, str]] = None,
        metadata_directory: Optional[str] = None,
    ) -> str:
        cs = self.config_holder.config_settings
        return super().build_wheel(
            wheel_directory, config_settings=cs, metadata_directory=metadata_directory
        )

    def build_sdist(
        self, sdist_directory: str, config_settings: Optional[Dict[str, str]] = None
    ) -> str:
        cs = self.config_holder.config_settings
        return super().build_sdist(sdist_directory, config_settings=cs)

    def build_editable(
        self,
        wheel_directory: str,
        config_settings: Optional[Dict[str, str]] = None,
        metadata_directory: Optional[str] = None,
    ) -> str:
        cs = self.config_holder.config_settings
        return super().build_editable(
            wheel_directory, config_settings=cs, metadata_directory=metadata_directory
        )

    def get_requires_for_build_wheel(
        self, config_settings: Optional[Dict[str, str]] = None
    ) -> List[str]:
        cs = self.config_holder.config_settings
        return super().get_requires_for_build_wheel(config_settings=cs)

    def get_requires_for_build_sdist(
        self, config_settings: Optional[Dict[str, str]] = None
    ) -> List[str]:
        cs = self.config_holder.config_settings
        return super().get_requires_for_build_sdist(config_settings=cs)

    def get_requires_for_build_editable(
        self, config_settings: Optional[Dict[str, str]] = None
    ) -> List[str]:
        cs = self.config_holder.config_settings
        return super().get_requires_for_build_editable(config_settings=cs)

    def prepare_metadata_for_build_wheel(
        self,
        metadata_directory: str,
        config_settings: Optional[Dict[str, str]] = None,
        _allow_fallback: bool = True,
    ) -> str:
        cs = self.config_holder.config_settings
        return super().prepare_metadata_for_build_wheel(
            metadata_directory=metadata_directory,
            config_settings=cs,
            _allow_fallback=_allow_fallback,
        )

    def prepare_metadata_for_build_editable(
        self,
        metadata_directory: str,
        config_settings: Optional[Dict[str, str]] = None,
        _allow_fallback: bool = True,
    ) -> str:
        cs = self.config_holder.config_settings
        return super().prepare_metadata_for_build_editable(
            metadata_directory=metadata_directory,
            config_settings=cs,
            _allow_fallback=_allow_fallback,
        )