# Path: blob/master/venv/Lib/site-packages/pip/_internal/vcs/versioncontrol.py
# 811 views
"""Handles all VCS (version control) support"""12from __future__ import absolute_import34import errno5import logging6import os7import shutil8import sys910from pip._vendor import pkg_resources11from pip._vendor.six.moves.urllib import parse as urllib_parse1213from pip._internal.exceptions import BadCommand, InstallationError14from pip._internal.utils.compat import samefile15from pip._internal.utils.misc import (16ask_path_exists,17backup_dir,18display_path,19hide_url,20hide_value,21rmtree,22)23from pip._internal.utils.subprocess import call_subprocess, make_command24from pip._internal.utils.typing import MYPY_CHECK_RUNNING25from pip._internal.utils.urls import get_url_scheme2627if MYPY_CHECK_RUNNING:28from typing import (29Any, Dict, Iterable, Iterator, List, Mapping, Optional, Text, Tuple,30Type, Union31)32from pip._internal.cli.spinners import SpinnerInterface33from pip._internal.utils.misc import HiddenText34from pip._internal.utils.subprocess import CommandArgs3536AuthInfo = Tuple[Optional[str], Optional[str]]373839__all__ = ['vcs']404142logger = logging.getLogger(__name__)434445def is_url(name):46# type: (Union[str, Text]) -> bool47"""48Return true if the name looks like a URL.49"""50scheme = get_url_scheme(name)51if scheme is None:52return False53return scheme in ['http', 'https', 'file', 'ftp'] + vcs.all_schemes545556def make_vcs_requirement_url(repo_url, rev, project_name, subdir=None):57# type: (str, str, str, Optional[str]) -> str58"""59Return the URL for a VCS requirement.6061Args:62repo_url: the remote VCS url, with any needed VCS prefix (e.g. 
"git+").63project_name: the (unescaped) project name.64"""65egg_project_name = pkg_resources.to_filename(project_name)66req = '{}@{}#egg={}'.format(repo_url, rev, egg_project_name)67if subdir:68req += '&subdirectory={}'.format(subdir)6970return req717273def find_path_to_setup_from_repo_root(location, repo_root):74# type: (str, str) -> Optional[str]75"""76Find the path to `setup.py` by searching up the filesystem from `location`.77Return the path to `setup.py` relative to `repo_root`.78Return None if `setup.py` is in `repo_root` or cannot be found.79"""80# find setup.py81orig_location = location82while not os.path.exists(os.path.join(location, 'setup.py')):83last_location = location84location = os.path.dirname(location)85if location == last_location:86# We've traversed up to the root of the filesystem without87# finding setup.py88logger.warning(89"Could not find setup.py for directory %s (tried all "90"parent directories)",91orig_location,92)93return None9495if samefile(repo_root, location):96return None9798return os.path.relpath(location, repo_root)99100101class RemoteNotFoundError(Exception):102pass103104105class RevOptions(object):106107"""108Encapsulates a VCS-specific revision to install, along with any VCS109install options.110111Instances of this class should be treated as if immutable.112"""113114def __init__(115self,116vc_class, # type: Type[VersionControl]117rev=None, # type: Optional[str]118extra_args=None, # type: Optional[CommandArgs]119):120# type: (...) 
-> None121"""122Args:123vc_class: a VersionControl subclass.124rev: the name of the revision to install.125extra_args: a list of extra options.126"""127if extra_args is None:128extra_args = []129130self.extra_args = extra_args131self.rev = rev132self.vc_class = vc_class133self.branch_name = None # type: Optional[str]134135def __repr__(self):136# type: () -> str137return '<RevOptions {}: rev={!r}>'.format(self.vc_class.name, self.rev)138139@property140def arg_rev(self):141# type: () -> Optional[str]142if self.rev is None:143return self.vc_class.default_arg_rev144145return self.rev146147def to_args(self):148# type: () -> CommandArgs149"""150Return the VCS-specific command arguments.151"""152args = [] # type: CommandArgs153rev = self.arg_rev154if rev is not None:155args += self.vc_class.get_base_rev_args(rev)156args += self.extra_args157158return args159160def to_display(self):161# type: () -> str162if not self.rev:163return ''164165return ' (to revision {})'.format(self.rev)166167def make_new(self, rev):168# type: (str) -> RevOptions169"""170Make a copy of the current instance, but with a new rev.171172Args:173rev: the name of the revision for the new object.174"""175return self.vc_class.make_rev_options(rev, extra_args=self.extra_args)176177178class VcsSupport(object):179_registry = {} # type: Dict[str, VersionControl]180schemes = ['ssh', 'git', 'hg', 'bzr', 'sftp', 'svn']181182def __init__(self):183# type: () -> None184# Register more schemes with urlparse for various version control185# systems186urllib_parse.uses_netloc.extend(self.schemes)187# Python >= 2.7.4, 3.3 doesn't have uses_fragment188if getattr(urllib_parse, 'uses_fragment', None):189urllib_parse.uses_fragment.extend(self.schemes)190super(VcsSupport, self).__init__()191192def __iter__(self):193# type: () -> Iterator[str]194return self._registry.__iter__()195196@property197def backends(self):198# type: () -> List[VersionControl]199return list(self._registry.values())200201@property202def 
dirnames(self):203# type: () -> List[str]204return [backend.dirname for backend in self.backends]205206@property207def all_schemes(self):208# type: () -> List[str]209schemes = [] # type: List[str]210for backend in self.backends:211schemes.extend(backend.schemes)212return schemes213214def register(self, cls):215# type: (Type[VersionControl]) -> None216if not hasattr(cls, 'name'):217logger.warning('Cannot register VCS %s', cls.__name__)218return219if cls.name not in self._registry:220self._registry[cls.name] = cls()221logger.debug('Registered VCS backend: %s', cls.name)222223def unregister(self, name):224# type: (str) -> None225if name in self._registry:226del self._registry[name]227228def get_backend_for_dir(self, location):229# type: (str) -> Optional[VersionControl]230"""231Return a VersionControl object if a repository of that type is found232at the given directory.233"""234vcs_backends = {}235for vcs_backend in self._registry.values():236repo_path = vcs_backend.get_repository_root(location)237if not repo_path:238continue239logger.debug('Determine that %s uses VCS: %s',240location, vcs_backend.name)241vcs_backends[repo_path] = vcs_backend242243if not vcs_backends:244return None245246# Choose the VCS in the inner-most directory. Since all repository247# roots found here would be either `location` or one of its248# parents, the longest path should have the most path components,249# i.e. 
the backend representing the inner-most repository.250inner_most_repo_path = max(vcs_backends, key=len)251return vcs_backends[inner_most_repo_path]252253def get_backend_for_scheme(self, scheme):254# type: (str) -> Optional[VersionControl]255"""256Return a VersionControl object or None.257"""258for vcs_backend in self._registry.values():259if scheme in vcs_backend.schemes:260return vcs_backend261return None262263def get_backend(self, name):264# type: (str) -> Optional[VersionControl]265"""266Return a VersionControl object or None.267"""268name = name.lower()269return self._registry.get(name)270271272vcs = VcsSupport()273274275class VersionControl(object):276name = ''277dirname = ''278repo_name = ''279# List of supported schemes for this Version Control280schemes = () # type: Tuple[str, ...]281# Iterable of environment variable names to pass to call_subprocess().282unset_environ = () # type: Tuple[str, ...]283default_arg_rev = None # type: Optional[str]284285@classmethod286def should_add_vcs_url_prefix(cls, remote_url):287# type: (str) -> bool288"""289Return whether the vcs prefix (e.g. 
"git+") should be added to a290repository's remote url when used in a requirement.291"""292return not remote_url.lower().startswith('{}:'.format(cls.name))293294@classmethod295def get_subdirectory(cls, location):296# type: (str) -> Optional[str]297"""298Return the path to setup.py, relative to the repo root.299Return None if setup.py is in the repo root.300"""301return None302303@classmethod304def get_requirement_revision(cls, repo_dir):305# type: (str) -> str306"""307Return the revision string that should be used in a requirement.308"""309return cls.get_revision(repo_dir)310311@classmethod312def get_src_requirement(cls, repo_dir, project_name):313# type: (str, str) -> Optional[str]314"""315Return the requirement string to use to redownload the files316currently at the given repository directory.317318Args:319project_name: the (unescaped) project name.320321The return value has a form similar to the following:322323{repository_url}@{revision}#egg={project_name}324"""325repo_url = cls.get_remote_url(repo_dir)326if repo_url is None:327return None328329if cls.should_add_vcs_url_prefix(repo_url):330repo_url = '{}+{}'.format(cls.name, repo_url)331332revision = cls.get_requirement_revision(repo_dir)333subdir = cls.get_subdirectory(repo_dir)334req = make_vcs_requirement_url(repo_url, revision, project_name,335subdir=subdir)336337return req338339@staticmethod340def get_base_rev_args(rev):341# type: (str) -> List[str]342"""343Return the base revision arguments for a vcs command.344345Args:346rev: the name of a revision to install. 
Cannot be None.347"""348raise NotImplementedError349350def is_immutable_rev_checkout(self, url, dest):351# type: (str, str) -> bool352"""353Return true if the commit hash checked out at dest matches354the revision in url.355356Always return False, if the VCS does not support immutable commit357hashes.358359This method does not check if there are local uncommitted changes360in dest after checkout, as pip currently has no use case for that.361"""362return False363364@classmethod365def make_rev_options(cls, rev=None, extra_args=None):366# type: (Optional[str], Optional[CommandArgs]) -> RevOptions367"""368Return a RevOptions object.369370Args:371rev: the name of a revision to install.372extra_args: a list of extra options.373"""374return RevOptions(cls, rev, extra_args=extra_args)375376@classmethod377def _is_local_repository(cls, repo):378# type: (str) -> bool379"""380posix absolute paths start with os.path.sep,381win32 ones start with drive (like c:\\folder)382"""383drive, tail = os.path.splitdrive(repo)384return repo.startswith(os.path.sep) or bool(drive)385386def export(self, location, url):387# type: (str, HiddenText) -> None388"""389Export the repository at the url to the destination location390i.e. only download the files, without vcs informations391392:param url: the repository URL starting with a vcs prefix.393"""394raise NotImplementedError395396@classmethod397def get_netloc_and_auth(cls, netloc, scheme):398# type: (str, str) -> Tuple[str, Tuple[Optional[str], Optional[str]]]399"""400Parse the repository URL's netloc, and return the new netloc to use401along with auth information.402403Args:404netloc: the original repository URL netloc.405scheme: the repository URL's scheme without the vcs prefix.406407This is mainly for the Subversion class to override, so that auth408information can be provided via the --username and --password options409instead of through the URL. 
For other subclasses like Git without410such an option, auth information must stay in the URL.411412Returns: (netloc, (username, password)).413"""414return netloc, (None, None)415416@classmethod417def get_url_rev_and_auth(cls, url):418# type: (str) -> Tuple[str, Optional[str], AuthInfo]419"""420Parse the repository URL to use, and return the URL, revision,421and auth info to use.422423Returns: (url, rev, (username, password)).424"""425scheme, netloc, path, query, frag = urllib_parse.urlsplit(url)426if '+' not in scheme:427raise ValueError(428"Sorry, {!r} is a malformed VCS url. "429"The format is <vcs>+<protocol>://<url>, "430"e.g. svn+http://myrepo/svn/MyApp#egg=MyApp".format(url)431)432# Remove the vcs prefix.433scheme = scheme.split('+', 1)[1]434netloc, user_pass = cls.get_netloc_and_auth(netloc, scheme)435rev = None436if '@' in path:437path, rev = path.rsplit('@', 1)438if not rev:439raise InstallationError(440"The URL {!r} has an empty revision (after @) "441"which is not supported. 
Include a revision after @ "442"or remove @ from the URL.".format(url)443)444url = urllib_parse.urlunsplit((scheme, netloc, path, query, ''))445return url, rev, user_pass446447@staticmethod448def make_rev_args(username, password):449# type: (Optional[str], Optional[HiddenText]) -> CommandArgs450"""451Return the RevOptions "extra arguments" to use in obtain().452"""453return []454455def get_url_rev_options(self, url):456# type: (HiddenText) -> Tuple[HiddenText, RevOptions]457"""458Return the URL and RevOptions object to use in obtain() and in459some cases export(), as a tuple (url, rev_options).460"""461secret_url, rev, user_pass = self.get_url_rev_and_auth(url.secret)462username, secret_password = user_pass463password = None # type: Optional[HiddenText]464if secret_password is not None:465password = hide_value(secret_password)466extra_args = self.make_rev_args(username, password)467rev_options = self.make_rev_options(rev, extra_args=extra_args)468469return hide_url(secret_url), rev_options470471@staticmethod472def normalize_url(url):473# type: (str) -> str474"""475Normalize a URL for comparison by unquoting it and removing any476trailing slash.477"""478return urllib_parse.unquote(url).rstrip('/')479480@classmethod481def compare_urls(cls, url1, url2):482# type: (str, str) -> bool483"""484Compare two repo URLs for identity, ignoring incidental differences.485"""486return (cls.normalize_url(url1) == cls.normalize_url(url2))487488def fetch_new(self, dest, url, rev_options):489# type: (str, HiddenText, RevOptions) -> None490"""491Fetch a revision from a repository, in the case that this is the492first fetch from the repository.493494Args:495dest: the directory to fetch the repository to.496rev_options: a RevOptions object.497"""498raise NotImplementedError499500def switch(self, dest, url, rev_options):501# type: (str, HiddenText, RevOptions) -> None502"""503Switch the repo at ``dest`` to point to ``URL``.504505Args:506rev_options: a RevOptions object.507"""508raise 
NotImplementedError509510def update(self, dest, url, rev_options):511# type: (str, HiddenText, RevOptions) -> None512"""513Update an already-existing repo to the given ``rev_options``.514515Args:516rev_options: a RevOptions object.517"""518raise NotImplementedError519520@classmethod521def is_commit_id_equal(cls, dest, name):522# type: (str, Optional[str]) -> bool523"""524Return whether the id of the current commit equals the given name.525526Args:527dest: the repository directory.528name: a string name.529"""530raise NotImplementedError531532def obtain(self, dest, url):533# type: (str, HiddenText) -> None534"""535Install or update in editable mode the package represented by this536VersionControl object.537538:param dest: the repository directory in which to install or update.539:param url: the repository URL starting with a vcs prefix.540"""541url, rev_options = self.get_url_rev_options(url)542543if not os.path.exists(dest):544self.fetch_new(dest, url, rev_options)545return546547rev_display = rev_options.to_display()548if self.is_repository_directory(dest):549existing_url = self.get_remote_url(dest)550if self.compare_urls(existing_url, url.secret):551logger.debug(552'%s in %s exists, and has correct URL (%s)',553self.repo_name.title(),554display_path(dest),555url,556)557if not self.is_commit_id_equal(dest, rev_options.rev):558logger.info(559'Updating %s %s%s',560display_path(dest),561self.repo_name,562rev_display,563)564self.update(dest, url, rev_options)565else:566logger.info('Skipping because already up-to-date.')567return568569logger.warning(570'%s %s in %s exists with URL %s',571self.name,572self.repo_name,573display_path(dest),574existing_url,575)576prompt = ('(s)witch, (i)gnore, (w)ipe, (b)ackup ',577('s', 'i', 'w', 'b'))578else:579logger.warning(580'Directory %s already exists, and is not a %s %s.',581dest,582self.name,583self.repo_name,584)585# https://github.com/python/mypy/issues/1174586prompt = ('(i)gnore, (w)ipe, (b)ackup ', # type: ignore587('i', 'w', 
'b'))588589logger.warning(590'The plan is to install the %s repository %s',591self.name,592url,593)594response = ask_path_exists('What to do? {}'.format(595prompt[0]), prompt[1])596597if response == 'a':598sys.exit(-1)599600if response == 'w':601logger.warning('Deleting %s', display_path(dest))602rmtree(dest)603self.fetch_new(dest, url, rev_options)604return605606if response == 'b':607dest_dir = backup_dir(dest)608logger.warning(609'Backing up %s to %s', display_path(dest), dest_dir,610)611shutil.move(dest, dest_dir)612self.fetch_new(dest, url, rev_options)613return614615# Do nothing if the response is "i".616if response == 's':617logger.info(618'Switching %s %s to %s%s',619self.repo_name,620display_path(dest),621url,622rev_display,623)624self.switch(dest, url, rev_options)625626def unpack(self, location, url):627# type: (str, HiddenText) -> None628"""629Clean up current location and download the url repository630(and vcs infos) into location631632:param url: the repository URL starting with a vcs prefix.633"""634if os.path.exists(location):635rmtree(location)636self.obtain(location, url=url)637638@classmethod639def get_remote_url(cls, location):640# type: (str) -> str641"""642Return the url used at location643644Raises RemoteNotFoundError if the repository does not have a remote645url configured.646"""647raise NotImplementedError648649@classmethod650def get_revision(cls, location):651# type: (str) -> str652"""653Return the current commit id of the files at the given location.654"""655raise NotImplementedError656657@classmethod658def run_command(659cls,660cmd, # type: Union[List[str], CommandArgs]661show_stdout=True, # type: bool662cwd=None, # type: Optional[str]663on_returncode='raise', # type: str664extra_ok_returncodes=None, # type: Optional[Iterable[int]]665command_desc=None, # type: Optional[str]666extra_environ=None, # type: Optional[Mapping[str, Any]]667spinner=None, # type: Optional[SpinnerInterface]668log_failed_cmd=True # type: bool669):670# type: (...) 
-> Text671"""672Run a VCS subcommand673This is simply a wrapper around call_subprocess that adds the VCS674command name, and checks that the VCS is available675"""676cmd = make_command(cls.name, *cmd)677try:678return call_subprocess(cmd, show_stdout, cwd,679on_returncode=on_returncode,680extra_ok_returncodes=extra_ok_returncodes,681command_desc=command_desc,682extra_environ=extra_environ,683unset_environ=cls.unset_environ,684spinner=spinner,685log_failed_cmd=log_failed_cmd)686except OSError as e:687# errno.ENOENT = no such file or directory688# In other words, the VCS executable isn't available689if e.errno == errno.ENOENT:690raise BadCommand(691'Cannot find command {cls.name!r} - do you have '692'{cls.name!r} installed and in your '693'PATH?'.format(**locals()))694else:695raise # re-raise exception if a different error occurred696697@classmethod698def is_repository_directory(cls, path):699# type: (str) -> bool700"""701Return whether a directory path is a repository directory.702"""703logger.debug('Checking in %s for %s (%s)...',704path, cls.dirname, cls.name)705return os.path.exists(os.path.join(path, cls.dirname))706707@classmethod708def get_repository_root(cls, location):709# type: (str) -> Optional[str]710"""711Return the "root" (top-level) directory controlled by the vcs,712or `None` if the directory is not in any.713714It is meant to be overridden to implement smarter detection715mechanisms for specific vcs.716717This can do more than is_repository_directory() alone. For718example, the Git override checks that Git is actually available.719"""720if cls.is_repository_directory(location):721return location722return None723724725