Path: blob/master/venv/Lib/site-packages/pip/_internal/utils/misc.py
811 views
# The following comment should be removed at some point in the future.1# mypy: strict-optional=False2# mypy: disallow-untyped-defs=False34from __future__ import absolute_import56import contextlib7import errno8import getpass9import hashlib10import io11import logging12import os13import posixpath14import shutil15import stat16import sys17from collections import deque1819from pip._vendor import pkg_resources20# NOTE: retrying is not annotated in typeshed as on 2017-07-17, which is21# why we ignore the type on this import.22from pip._vendor.retrying import retry # type: ignore23from pip._vendor.six import PY2, text_type24from pip._vendor.six.moves import input, map, zip_longest25from pip._vendor.six.moves.urllib import parse as urllib_parse26from pip._vendor.six.moves.urllib.parse import unquote as urllib_unquote2728from pip import __version__29from pip._internal.exceptions import CommandError30from pip._internal.locations import (31get_major_minor_version,32site_packages,33user_site,34)35from pip._internal.utils.compat import (36WINDOWS,37expanduser,38stdlib_pkgs,39str_to_display,40)41from pip._internal.utils.typing import MYPY_CHECK_RUNNING, cast42from pip._internal.utils.virtualenv import (43running_under_virtualenv,44virtualenv_no_global,45)4647if PY2:48from io import BytesIO as StringIO49else:50from io import StringIO5152if MYPY_CHECK_RUNNING:53from typing import (54Any, AnyStr, Container, Iterable, Iterator, List, Optional, Text,55Tuple, Union,56)57from pip._vendor.pkg_resources import Distribution5859VersionInfo = Tuple[int, int, int]606162__all__ = ['rmtree', 'display_path', 'backup_dir',63'ask', 'splitext',64'format_size', 'is_installable_dir',65'normalize_path',66'renames', 'get_prog',67'captured_stdout', 'ensure_dir',68'get_installed_version', 'remove_auth_from_url']697071logger = logging.getLogger(__name__)727374def get_pip_version():75# type: () -> str76pip_pkg_dir = os.path.join(os.path.dirname(__file__), "..", "..")77pip_pkg_dir = os.path.abspath(pip_pkg_dir)7879return (80'pip {} from {} (python {})'.format(81__version__, pip_pkg_dir, get_major_minor_version(),82)83)848586def normalize_version_info(py_version_info):87# type: (Tuple[int, ...]) -> Tuple[int, int, int]88"""89Convert a tuple of ints representing a Python version to one of length90three.9192:param py_version_info: a tuple of ints representing a Python version,93or None to specify no version. The tuple can have any length.9495:return: a tuple of length three if `py_version_info` is non-None.96Otherwise, return `py_version_info` unchanged (i.e. None).97"""98if len(py_version_info) < 3:99py_version_info += (3 - len(py_version_info)) * (0,)100elif len(py_version_info) > 3:101py_version_info = py_version_info[:3]102103return cast('VersionInfo', py_version_info)104105106def ensure_dir(path):107# type: (AnyStr) -> None108"""os.path.makedirs without EEXIST."""109try:110os.makedirs(path)111except OSError as e:112# Windows can raise spurious ENOTEMPTY errors. See #6426.113if e.errno != errno.EEXIST and e.errno != errno.ENOTEMPTY:114raise115116117def get_prog():118# type: () -> str119try:120prog = os.path.basename(sys.argv[0])121if prog in ('__main__.py', '-c'):122return "{} -m pip".format(sys.executable)123else:124return prog125except (AttributeError, TypeError, IndexError):126pass127return 'pip'128129130# Retry every half second for up to 3 seconds131@retry(stop_max_delay=3000, wait_fixed=500)132def rmtree(dir, ignore_errors=False):133# type: (str, bool) -> None134shutil.rmtree(dir, ignore_errors=ignore_errors,135onerror=rmtree_errorhandler)136137138def rmtree_errorhandler(func, path, exc_info):139"""On Windows, the files in .svn are read-only, so when rmtree() tries to140remove them, an exception is thrown. We catch that here, remove the141read-only attribute, and hopefully continue without problems."""142try:143has_attr_readonly = not (os.stat(path).st_mode & stat.S_IWRITE)144except (IOError, OSError):145# it's equivalent to os.path.exists146return147148if has_attr_readonly:149# convert to read/write150os.chmod(path, stat.S_IWRITE)151# use the original function to repeat the operation152func(path)153return154else:155raise156157158def path_to_display(path):159# type: (Optional[Union[str, Text]]) -> Optional[Text]160"""161Convert a bytes (or text) path to text (unicode in Python 2) for display162and logging purposes.163164This function should never error out. Also, this function is mainly needed165for Python 2 since in Python 3 str paths are already text.166"""167if path is None:168return None169if isinstance(path, text_type):170return path171# Otherwise, path is a bytes object (str in Python 2).172try:173display_path = path.decode(sys.getfilesystemencoding(), 'strict')174except UnicodeDecodeError:175# Include the full bytes to make troubleshooting easier, even though176# it may not be very human readable.177if PY2:178# Convert the bytes to a readable str representation using179# repr(), and then convert the str to unicode.180# Also, we add the prefix "b" to the repr() return value both181# to make the Python 2 output look like the Python 3 output, and182# to signal to the user that this is a bytes representation.183display_path = str_to_display('b{!r}'.format(path))184else:185# Silence the "F821 undefined name 'ascii'" flake8 error since186# in Python 3 ascii() is a built-in.187display_path = ascii(path) # noqa: F821188189return display_path190191192def display_path(path):193# type: (Union[str, Text]) -> str194"""Gives the display value for a given path, making it relative to cwd195if possible."""196path = os.path.normcase(os.path.abspath(path))197if sys.version_info[0] == 2:198path = path.decode(sys.getfilesystemencoding(), 'replace')199path = path.encode(sys.getdefaultencoding(), 'replace')200if path.startswith(os.getcwd() + os.path.sep):201path = '.' + path[len(os.getcwd()):]202return path203204205def backup_dir(dir, ext='.bak'):206# type: (str, str) -> str207"""Figure out the name of a directory to back up the given dir to208(adding .bak, .bak2, etc)"""209n = 1210extension = ext211while os.path.exists(dir + extension):212n += 1213extension = ext + str(n)214return dir + extension215216217def ask_path_exists(message, options):218# type: (str, Iterable[str]) -> str219for action in os.environ.get('PIP_EXISTS_ACTION', '').split():220if action in options:221return action222return ask(message, options)223224225def _check_no_input(message):226# type: (str) -> None227"""Raise an error if no input is allowed."""228if os.environ.get('PIP_NO_INPUT'):229raise Exception(230'No input was expected ($PIP_NO_INPUT set); question: {}'.format(231message)232)233234235def ask(message, options):236# type: (str, Iterable[str]) -> str237"""Ask the message interactively, with the given possible responses"""238while 1:239_check_no_input(message)240response = input(message)241response = response.strip().lower()242if response not in options:243print(244'Your response ({!r}) was not one of the expected responses: '245'{}'.format(response, ', '.join(options))246)247else:248return response249250251def ask_input(message):252# type: (str) -> str253"""Ask for input interactively."""254_check_no_input(message)255return input(message)256257258def ask_password(message):259# type: (str) -> str260"""Ask for a password interactively."""261_check_no_input(message)262return getpass.getpass(message)263264265def format_size(bytes):266# type: (float) -> str267if bytes > 1000 * 1000:268return '{:.1f} MB'.format(bytes / 1000.0 / 1000)269elif bytes > 10 * 1000:270return '{} kB'.format(int(bytes / 1000))271elif bytes > 1000:272return '{:.1f} kB'.format(bytes / 1000.0)273else:274return '{} bytes'.format(int(bytes))275276277def tabulate(rows):278# type: (Iterable[Iterable[Any]]) -> Tuple[List[str], List[int]]279"""Return a list of formatted rows and a list of column sizes.280281For example::282283>>> tabulate([['foobar', 2000], [0xdeadbeef]])284(['foobar 2000', '3735928559'], [10, 4])285"""286rows = [tuple(map(str, row)) for row in rows]287sizes = [max(map(len, col)) for col in zip_longest(*rows, fillvalue='')]288table = [" ".join(map(str.ljust, row, sizes)).rstrip() for row in rows]289return table, sizes290291292def is_installable_dir(path):293# type: (str) -> bool294"""Is path is a directory containing setup.py or pyproject.toml?295"""296if not os.path.isdir(path):297return False298setup_py = os.path.join(path, 'setup.py')299if os.path.isfile(setup_py):300return True301pyproject_toml = os.path.join(path, 'pyproject.toml')302if os.path.isfile(pyproject_toml):303return True304return False305306307def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE):308"""Yield pieces of data from a file-like object until EOF."""309while True:310chunk = file.read(size)311if not chunk:312break313yield chunk314315316def normalize_path(path, resolve_symlinks=True):317# type: (str, bool) -> str318"""319Convert a path to its canonical, case-normalized, absolute version.320321"""322path = expanduser(path)323if resolve_symlinks:324path = os.path.realpath(path)325else:326path = os.path.abspath(path)327return os.path.normcase(path)328329330def splitext(path):331# type: (str) -> Tuple[str, str]332"""Like os.path.splitext, but take off .tar too"""333base, ext = posixpath.splitext(path)334if base.lower().endswith('.tar'):335ext = base[-4:] + ext336base = base[:-4]337return base, ext338339340def renames(old, new):341# type: (str, str) -> None342"""Like os.renames(), but handles renaming across devices."""343# Implementation borrowed from os.renames().344head, tail = os.path.split(new)345if head and tail and not os.path.exists(head):346os.makedirs(head)347348shutil.move(old, new)349350head, tail = os.path.split(old)351if head and tail:352try:353os.removedirs(head)354except OSError:355pass356357358def is_local(path):359# type: (str) -> bool360"""361Return True if path is within sys.prefix, if we're running in a virtualenv.362363If we're not in a virtualenv, all paths are considered "local."364365Caution: this function assumes the head of path has been normalized366with normalize_path.367"""368if not running_under_virtualenv():369return True370return path.startswith(normalize_path(sys.prefix))371372373def dist_is_local(dist):374# type: (Distribution) -> bool375"""376Return True if given Distribution object is installed locally377(i.e. within current virtualenv).378379Always True if we're not in a virtualenv.380381"""382return is_local(dist_location(dist))383384385def dist_in_usersite(dist):386# type: (Distribution) -> bool387"""388Return True if given Distribution is installed in user site.389"""390return dist_location(dist).startswith(normalize_path(user_site))391392393def dist_in_site_packages(dist):394# type: (Distribution) -> bool395"""396Return True if given Distribution is installed in397sysconfig.get_python_lib().398"""399return dist_location(dist).startswith(normalize_path(site_packages))400401402def dist_is_editable(dist):403# type: (Distribution) -> bool404"""405Return True if given Distribution is an editable install.406"""407for path_item in sys.path:408egg_link = os.path.join(path_item, dist.project_name + '.egg-link')409if os.path.isfile(egg_link):410return True411return False412413414def get_installed_distributions(415local_only=True, # type: bool416skip=stdlib_pkgs, # type: Container[str]417include_editables=True, # type: bool418editables_only=False, # type: bool419user_only=False, # type: bool420paths=None # type: Optional[List[str]]421):422# type: (...) -> List[Distribution]423"""424Return a list of installed Distribution objects.425426If ``local_only`` is True (default), only return installations427local to the current virtualenv, if in a virtualenv.428429``skip`` argument is an iterable of lower-case project names to430ignore; defaults to stdlib_pkgs431432If ``include_editables`` is False, don't report editables.433434If ``editables_only`` is True , only report editables.435436If ``user_only`` is True , only report installations in the user437site directory.438439If ``paths`` is set, only report the distributions present at the440specified list of locations.441"""442if paths:443working_set = pkg_resources.WorkingSet(paths)444else:445working_set = pkg_resources.working_set446447if local_only:448local_test = dist_is_local449else:450def local_test(d):451return True452453if include_editables:454def editable_test(d):455return True456else:457def editable_test(d):458return not dist_is_editable(d)459460if editables_only:461def editables_only_test(d):462return dist_is_editable(d)463else:464def editables_only_test(d):465return True466467if user_only:468user_test = dist_in_usersite469else:470def user_test(d):471return True472473return [d for d in working_set474if local_test(d) and475d.key not in skip and476editable_test(d) and477editables_only_test(d) and478user_test(d)479]480481482def egg_link_path(dist):483# type: (Distribution) -> Optional[str]484"""485Return the path for the .egg-link file if it exists, otherwise, None.486487There's 3 scenarios:4881) not in a virtualenv489try to find in site.USER_SITE, then site_packages4902) in a no-global virtualenv491try to find in site_packages4923) in a yes-global virtualenv493try to find in site_packages, then site.USER_SITE494(don't look in global location)495496For #1 and #3, there could be odd cases, where there's an egg-link in 2497locations.498499This method will just return the first one found.500"""501sites = []502if running_under_virtualenv():503sites.append(site_packages)504if not virtualenv_no_global() and user_site:505sites.append(user_site)506else:507if user_site:508sites.append(user_site)509sites.append(site_packages)510511for site in sites:512egglink = os.path.join(site, dist.project_name) + '.egg-link'513if os.path.isfile(egglink):514return egglink515return None516517518def dist_location(dist):519# type: (Distribution) -> str520"""521Get the site-packages location of this distribution. Generally522this is dist.location, except in the case of develop-installed523packages, where dist.location is the source code location, and we524want to know where the egg-link file is.525526The returned location is normalized (in particular, with symlinks removed).527"""528egg_link = egg_link_path(dist)529if egg_link:530return normalize_path(egg_link)531return normalize_path(dist.location)532533534def write_output(msg, *args):535# type: (str, str) -> None536logger.info(msg, *args)537538539class FakeFile(object):540"""Wrap a list of lines in an object with readline() to make541ConfigParser happy."""542def __init__(self, lines):543self._gen = (l for l in lines)544545def readline(self):546try:547try:548return next(self._gen)549except NameError:550return self._gen.next()551except StopIteration:552return ''553554def __iter__(self):555return self._gen556557558class StreamWrapper(StringIO):559560@classmethod561def from_stream(cls, orig_stream):562cls.orig_stream = orig_stream563return cls()564565# compileall.compile_dir() needs stdout.encoding to print to stdout566@property567def encoding(self):568return self.orig_stream.encoding569570571@contextlib.contextmanager572def captured_output(stream_name):573"""Return a context manager used by captured_stdout/stdin/stderr574that temporarily replaces the sys stream *stream_name* with a StringIO.575576Taken from Lib/support/__init__.py in the CPython repo.577"""578orig_stdout = getattr(sys, stream_name)579setattr(sys, stream_name, StreamWrapper.from_stream(orig_stdout))580try:581yield getattr(sys, stream_name)582finally:583setattr(sys, stream_name, orig_stdout)584585586def captured_stdout():587"""Capture the output of sys.stdout:588589with captured_stdout() as stdout:590print('hello')591self.assertEqual(stdout.getvalue(), 'hello\n')592593Taken from Lib/support/__init__.py in the CPython repo.594"""595return captured_output('stdout')596597598def captured_stderr():599"""600See captured_stdout().601"""602return captured_output('stderr')603604605class cached_property(object):606"""A property that is only computed once per instance and then replaces607itself with an ordinary attribute. Deleting the attribute resets the608property.609610Source: https://github.com/bottlepy/bottle/blob/0.11.5/bottle.py#L175611"""612613def __init__(self, func):614self.__doc__ = getattr(func, '__doc__')615self.func = func616617def __get__(self, obj, cls):618if obj is None:619# We're being accessed from the class itself, not from an object620return self621value = obj.__dict__[self.func.__name__] = self.func(obj)622return value623624625def get_installed_version(dist_name, working_set=None):626"""Get the installed version of dist_name avoiding pkg_resources cache"""627# Create a requirement that we'll look for inside of setuptools.628req = pkg_resources.Requirement.parse(dist_name)629630if working_set is None:631# We want to avoid having this cached, so we need to construct a new632# working set each time.633working_set = pkg_resources.WorkingSet()634635# Get the installed distribution from our working set636dist = working_set.find(req)637638# Check to see if we got an installed distribution or not, if we did639# we want to return it's version.640return dist.version if dist else None641642643def consume(iterator):644"""Consume an iterable at C speed."""645deque(iterator, maxlen=0)646647648# Simulates an enum649def enum(*sequential, **named):650enums = dict(zip(sequential, range(len(sequential))), **named)651reverse = {value: key for key, value in enums.items()}652enums['reverse_mapping'] = reverse653return type('Enum', (), enums)654655656def build_netloc(host, port):657# type: (str, Optional[int]) -> str658"""659Build a netloc from a host-port pair660"""661if port is None:662return host663if ':' in host:664# Only wrap host with square brackets when it is IPv6665host = '[{}]'.format(host)666return '{}:{}'.format(host, port)667668669def build_url_from_netloc(netloc, scheme='https'):670# type: (str, str) -> str671"""672Build a full URL from a netloc.673"""674if netloc.count(':') >= 2 and '@' not in netloc and '[' not in netloc:675# It must be a bare IPv6 address, so wrap it with brackets.676netloc = '[{}]'.format(netloc)677return '{}://{}'.format(scheme, netloc)678679680def parse_netloc(netloc):681# type: (str) -> Tuple[str, Optional[int]]682"""683Return the host-port pair from a netloc.684"""685url = build_url_from_netloc(netloc)686parsed = urllib_parse.urlparse(url)687return parsed.hostname, parsed.port688689690def split_auth_from_netloc(netloc):691"""692Parse out and remove the auth information from a netloc.693694Returns: (netloc, (username, password)).695"""696if '@' not in netloc:697return netloc, (None, None)698699# Split from the right because that's how urllib.parse.urlsplit()700# behaves if more than one @ is present (which can be checked using701# the password attribute of urlsplit()'s return value).702auth, netloc = netloc.rsplit('@', 1)703if ':' in auth:704# Split from the left because that's how urllib.parse.urlsplit()705# behaves if more than one : is present (which again can be checked706# using the password attribute of the return value)707user_pass = auth.split(':', 1)708else:709user_pass = auth, None710711user_pass = tuple(712None if x is None else urllib_unquote(x) for x in user_pass713)714715return netloc, user_pass716717718def redact_netloc(netloc):719# type: (str) -> str720"""721Replace the sensitive data in a netloc with "****", if it exists.722723For example:724- "user:[email protected]" returns "user:****@example.com"725- "[email protected]" returns "****@example.com"726"""727netloc, (user, password) = split_auth_from_netloc(netloc)728if user is None:729return netloc730if password is None:731user = '****'732password = ''733else:734user = urllib_parse.quote(user)735password = ':****'736return '{user}{password}@{netloc}'.format(user=user,737password=password,738netloc=netloc)739740741def _transform_url(url, transform_netloc):742"""Transform and replace netloc in a url.743744transform_netloc is a function taking the netloc and returning a745tuple. The first element of this tuple is the new netloc. The746entire tuple is returned.747748Returns a tuple containing the transformed url as item 0 and the749original tuple returned by transform_netloc as item 1.750"""751purl = urllib_parse.urlsplit(url)752netloc_tuple = transform_netloc(purl.netloc)753# stripped url754url_pieces = (755purl.scheme, netloc_tuple[0], purl.path, purl.query, purl.fragment756)757surl = urllib_parse.urlunsplit(url_pieces)758return surl, netloc_tuple759760761def _get_netloc(netloc):762return split_auth_from_netloc(netloc)763764765def _redact_netloc(netloc):766return (redact_netloc(netloc),)767768769def split_auth_netloc_from_url(url):770# type: (str) -> Tuple[str, str, Tuple[str, str]]771"""772Parse a url into separate netloc, auth, and url with no auth.773774Returns: (url_without_auth, netloc, (username, password))775"""776url_without_auth, (netloc, auth) = _transform_url(url, _get_netloc)777return url_without_auth, netloc, auth778779780def remove_auth_from_url(url):781# type: (str) -> str782"""Return a copy of url with 'username:password@' removed."""783# username/pass params are passed to subversion through flags784# and are not recognized in the url.785return _transform_url(url, _get_netloc)[0]786787788def redact_auth_from_url(url):789# type: (str) -> str790"""Replace the password in a given url with ****."""791return _transform_url(url, _redact_netloc)[0]792793794class HiddenText(object):795def __init__(796self,797secret, # type: str798redacted, # type: str799):800# type: (...) -> None801self.secret = secret802self.redacted = redacted803804def __repr__(self):805# type: (...) -> str806return '<HiddenText {!r}>'.format(str(self))807808def __str__(self):809# type: (...) -> str810return self.redacted811812# This is useful for testing.813def __eq__(self, other):814# type: (Any) -> bool815if type(self) != type(other):816return False817818# The string being used for redaction doesn't also have to match,819# just the raw, original string.820return (self.secret == other.secret)821822# We need to provide an explicit __ne__ implementation for Python 2.823# TODO: remove this when we drop PY2 support.824def __ne__(self, other):825# type: (Any) -> bool826return not self == other827828829def hide_value(value):830# type: (str) -> HiddenText831return HiddenText(value, redacted='****')832833834def hide_url(url):835# type: (str) -> HiddenText836redacted = redact_auth_from_url(url)837return HiddenText(url, redacted=redacted)838839840def protect_pip_from_modification_on_windows(modifying_pip):841# type: (bool) -> None842"""Protection of pip.exe from modification on Windows843844On Windows, any operation modifying pip should be run as:845python -m pip ...846"""847pip_names = [848"pip.exe",849"pip{}.exe".format(sys.version_info[0]),850"pip{}.{}.exe".format(*sys.version_info[:2])851]852853# See https://github.com/pypa/pip/issues/1299 for more discussion854should_show_use_python_msg = (855modifying_pip and856WINDOWS and857os.path.basename(sys.argv[0]) in pip_names858)859860if should_show_use_python_msg:861new_command = [862sys.executable, "-m", "pip"863] + sys.argv[1:]864raise CommandError(865'To modify pip, please run the following command:\n{}'866.format(" ".join(new_command))867)868869870def is_console_interactive():871# type: () -> bool872"""Is this console interactive?873"""874return sys.stdin is not None and sys.stdin.isatty()875876877def hash_file(path, blocksize=1 << 20):878# type: (str, int) -> Tuple[Any, int]879"""Return (hash, length) for path using hashlib.sha256()880"""881882h = hashlib.sha256()883length = 0884with open(path, 'rb') as f:885for block in read_chunks(f, size=blocksize):886length += len(block)887h.update(block)888return h, length889890891def is_wheel_installed():892"""893Return whether the wheel package is installed.894"""895try:896import wheel # noqa: F401897except ImportError:898return False899900return True901902903def pairwise(iterable):904# type: (Iterable[Any]) -> Iterator[Tuple[Any, Any]]905"""906Return paired elements.907908For example:909s -> (s0, s1), (s2, s3), (s4, s5), ...910"""911iterable = iter(iterable)912return zip_longest(iterable, iterable)913914915