Path: blob/main/test/lib/python3.9/site-packages/pip/_internal/vcs/versioncontrol.py
4804 views
"""Handles all VCS (version control) support"""12import logging3import os4import shutil5import sys6import urllib.parse7from typing import (8TYPE_CHECKING,9Any,10Dict,11Iterable,12Iterator,13List,14Mapping,15Optional,16Tuple,17Type,18Union,19)2021from pip._internal.cli.spinners import SpinnerInterface22from pip._internal.exceptions import BadCommand, InstallationError23from pip._internal.utils.misc import (24HiddenText,25ask_path_exists,26backup_dir,27display_path,28hide_url,29hide_value,30is_installable_dir,31rmtree,32)33from pip._internal.utils.subprocess import (34CommandArgs,35call_subprocess,36format_command_args,37make_command,38)39from pip._internal.utils.urls import get_url_scheme4041if TYPE_CHECKING:42# Literal was introduced in Python 3.8.43#44# TODO: Remove `if TYPE_CHECKING` when dropping support for Python 3.7.45from typing import Literal464748__all__ = ["vcs"]495051logger = logging.getLogger(__name__)5253AuthInfo = Tuple[Optional[str], Optional[str]]545556def is_url(name: str) -> bool:57"""58Return true if the name looks like a URL.59"""60scheme = get_url_scheme(name)61if scheme is None:62return False63return scheme in ["http", "https", "file", "ftp"] + vcs.all_schemes646566def make_vcs_requirement_url(67repo_url: str, rev: str, project_name: str, subdir: Optional[str] = None68) -> str:69"""70Return the URL for a VCS requirement.7172Args:73repo_url: the remote VCS url, with any needed VCS prefix (e.g. "git+").74project_name: the (unescaped) project name.75"""76egg_project_name = project_name.replace("-", "_")77req = f"{repo_url}@{rev}#egg={egg_project_name}"78if subdir:79req += f"&subdirectory={subdir}"8081return req828384def find_path_to_project_root_from_repo_root(85location: str, repo_root: str86) -> Optional[str]:87"""88Find the the Python project's root by searching up the filesystem from89`location`. Return the path to project root relative to `repo_root`.90Return None if the project root is `repo_root`, or cannot be found.91"""92# find project root.93orig_location = location94while not is_installable_dir(location):95last_location = location96location = os.path.dirname(location)97if location == last_location:98# We've traversed up to the root of the filesystem without99# finding a Python project.100logger.warning(101"Could not find a Python project for directory %s (tried all "102"parent directories)",103orig_location,104)105return None106107if os.path.samefile(repo_root, location):108return None109110return os.path.relpath(location, repo_root)111112113class RemoteNotFoundError(Exception):114pass115116117class RemoteNotValidError(Exception):118def __init__(self, url: str):119super().__init__(url)120self.url = url121122123class RevOptions:124125"""126Encapsulates a VCS-specific revision to install, along with any VCS127install options.128129Instances of this class should be treated as if immutable.130"""131132def __init__(133self,134vc_class: Type["VersionControl"],135rev: Optional[str] = None,136extra_args: Optional[CommandArgs] = None,137) -> None:138"""139Args:140vc_class: a VersionControl subclass.141rev: the name of the revision to install.142extra_args: a list of extra options.143"""144if extra_args is None:145extra_args = []146147self.extra_args = extra_args148self.rev = rev149self.vc_class = vc_class150self.branch_name: Optional[str] = None151152def __repr__(self) -> str:153return f"<RevOptions {self.vc_class.name}: rev={self.rev!r}>"154155@property156def arg_rev(self) -> Optional[str]:157if self.rev is None:158return self.vc_class.default_arg_rev159160return self.rev161162def to_args(self) -> CommandArgs:163"""164Return the VCS-specific command arguments.165"""166args: CommandArgs = []167rev = self.arg_rev168if rev is not None:169args += self.vc_class.get_base_rev_args(rev)170args += self.extra_args171172return args173174def to_display(self) -> str:175if not self.rev:176return ""177178return f" (to revision {self.rev})"179180def make_new(self, rev: str) -> "RevOptions":181"""182Make a copy of the current instance, but with a new rev.183184Args:185rev: the name of the revision for the new object.186"""187return self.vc_class.make_rev_options(rev, extra_args=self.extra_args)188189190class VcsSupport:191_registry: Dict[str, "VersionControl"] = {}192schemes = ["ssh", "git", "hg", "bzr", "sftp", "svn"]193194def __init__(self) -> None:195# Register more schemes with urlparse for various version control196# systems197urllib.parse.uses_netloc.extend(self.schemes)198super().__init__()199200def __iter__(self) -> Iterator[str]:201return self._registry.__iter__()202203@property204def backends(self) -> List["VersionControl"]:205return list(self._registry.values())206207@property208def dirnames(self) -> List[str]:209return [backend.dirname for backend in self.backends]210211@property212def all_schemes(self) -> List[str]:213schemes: List[str] = []214for backend in self.backends:215schemes.extend(backend.schemes)216return schemes217218def register(self, cls: Type["VersionControl"]) -> None:219if not hasattr(cls, "name"):220logger.warning("Cannot register VCS %s", cls.__name__)221return222if cls.name not in self._registry:223self._registry[cls.name] = cls()224logger.debug("Registered VCS backend: %s", cls.name)225226def unregister(self, name: str) -> None:227if name in self._registry:228del self._registry[name]229230def get_backend_for_dir(self, location: str) -> Optional["VersionControl"]:231"""232Return a VersionControl object if a repository of that type is found233at the given directory.234"""235vcs_backends = {}236for vcs_backend in self._registry.values():237repo_path = vcs_backend.get_repository_root(location)238if not repo_path:239continue240logger.debug("Determine that %s uses VCS: %s", location, vcs_backend.name)241vcs_backends[repo_path] = vcs_backend242243if not vcs_backends:244return None245246# Choose the VCS in the inner-most directory. Since all repository247# roots found here would be either `location` or one of its248# parents, the longest path should have the most path components,249# i.e. the backend representing the inner-most repository.250inner_most_repo_path = max(vcs_backends, key=len)251return vcs_backends[inner_most_repo_path]252253def get_backend_for_scheme(self, scheme: str) -> Optional["VersionControl"]:254"""255Return a VersionControl object or None.256"""257for vcs_backend in self._registry.values():258if scheme in vcs_backend.schemes:259return vcs_backend260return None261262def get_backend(self, name: str) -> Optional["VersionControl"]:263"""264Return a VersionControl object or None.265"""266name = name.lower()267return self._registry.get(name)268269270vcs = VcsSupport()271272273class VersionControl:274name = ""275dirname = ""276repo_name = ""277# List of supported schemes for this Version Control278schemes: Tuple[str, ...] = ()279# Iterable of environment variable names to pass to call_subprocess().280unset_environ: Tuple[str, ...] = ()281default_arg_rev: Optional[str] = None282283@classmethod284def should_add_vcs_url_prefix(cls, remote_url: str) -> bool:285"""286Return whether the vcs prefix (e.g. "git+") should be added to a287repository's remote url when used in a requirement.288"""289return not remote_url.lower().startswith(f"{cls.name}:")290291@classmethod292def get_subdirectory(cls, location: str) -> Optional[str]:293"""294Return the path to Python project root, relative to the repo root.295Return None if the project root is in the repo root.296"""297return None298299@classmethod300def get_requirement_revision(cls, repo_dir: str) -> str:301"""302Return the revision string that should be used in a requirement.303"""304return cls.get_revision(repo_dir)305306@classmethod307def get_src_requirement(cls, repo_dir: str, project_name: str) -> str:308"""309Return the requirement string to use to redownload the files310currently at the given repository directory.311312Args:313project_name: the (unescaped) project name.314315The return value has a form similar to the following:316317{repository_url}@{revision}#egg={project_name}318"""319repo_url = cls.get_remote_url(repo_dir)320321if cls.should_add_vcs_url_prefix(repo_url):322repo_url = f"{cls.name}+{repo_url}"323324revision = cls.get_requirement_revision(repo_dir)325subdir = cls.get_subdirectory(repo_dir)326req = make_vcs_requirement_url(repo_url, revision, project_name, subdir=subdir)327328return req329330@staticmethod331def get_base_rev_args(rev: str) -> List[str]:332"""333Return the base revision arguments for a vcs command.334335Args:336rev: the name of a revision to install. Cannot be None.337"""338raise NotImplementedError339340def is_immutable_rev_checkout(self, url: str, dest: str) -> bool:341"""342Return true if the commit hash checked out at dest matches343the revision in url.344345Always return False, if the VCS does not support immutable commit346hashes.347348This method does not check if there are local uncommitted changes349in dest after checkout, as pip currently has no use case for that.350"""351return False352353@classmethod354def make_rev_options(355cls, rev: Optional[str] = None, extra_args: Optional[CommandArgs] = None356) -> RevOptions:357"""358Return a RevOptions object.359360Args:361rev: the name of a revision to install.362extra_args: a list of extra options.363"""364return RevOptions(cls, rev, extra_args=extra_args)365366@classmethod367def _is_local_repository(cls, repo: str) -> bool:368"""369posix absolute paths start with os.path.sep,370win32 ones start with drive (like c:\\folder)371"""372drive, tail = os.path.splitdrive(repo)373return repo.startswith(os.path.sep) or bool(drive)374375@classmethod376def get_netloc_and_auth(377cls, netloc: str, scheme: str378) -> Tuple[str, Tuple[Optional[str], Optional[str]]]:379"""380Parse the repository URL's netloc, and return the new netloc to use381along with auth information.382383Args:384netloc: the original repository URL netloc.385scheme: the repository URL's scheme without the vcs prefix.386387This is mainly for the Subversion class to override, so that auth388information can be provided via the --username and --password options389instead of through the URL. For other subclasses like Git without390such an option, auth information must stay in the URL.391392Returns: (netloc, (username, password)).393"""394return netloc, (None, None)395396@classmethod397def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]:398"""399Parse the repository URL to use, and return the URL, revision,400and auth info to use.401402Returns: (url, rev, (username, password)).403"""404scheme, netloc, path, query, frag = urllib.parse.urlsplit(url)405if "+" not in scheme:406raise ValueError(407"Sorry, {!r} is a malformed VCS url. "408"The format is <vcs>+<protocol>://<url>, "409"e.g. svn+http://myrepo/svn/MyApp#egg=MyApp".format(url)410)411# Remove the vcs prefix.412scheme = scheme.split("+", 1)[1]413netloc, user_pass = cls.get_netloc_and_auth(netloc, scheme)414rev = None415if "@" in path:416path, rev = path.rsplit("@", 1)417if not rev:418raise InstallationError(419"The URL {!r} has an empty revision (after @) "420"which is not supported. Include a revision after @ "421"or remove @ from the URL.".format(url)422)423url = urllib.parse.urlunsplit((scheme, netloc, path, query, ""))424return url, rev, user_pass425426@staticmethod427def make_rev_args(428username: Optional[str], password: Optional[HiddenText]429) -> CommandArgs:430"""431Return the RevOptions "extra arguments" to use in obtain().432"""433return []434435def get_url_rev_options(self, url: HiddenText) -> Tuple[HiddenText, RevOptions]:436"""437Return the URL and RevOptions object to use in obtain(),438as a tuple (url, rev_options).439"""440secret_url, rev, user_pass = self.get_url_rev_and_auth(url.secret)441username, secret_password = user_pass442password: Optional[HiddenText] = None443if secret_password is not None:444password = hide_value(secret_password)445extra_args = self.make_rev_args(username, password)446rev_options = self.make_rev_options(rev, extra_args=extra_args)447448return hide_url(secret_url), rev_options449450@staticmethod451def normalize_url(url: str) -> str:452"""453Normalize a URL for comparison by unquoting it and removing any454trailing slash.455"""456return urllib.parse.unquote(url).rstrip("/")457458@classmethod459def compare_urls(cls, url1: str, url2: str) -> bool:460"""461Compare two repo URLs for identity, ignoring incidental differences.462"""463return cls.normalize_url(url1) == cls.normalize_url(url2)464465def fetch_new(466self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int467) -> None:468"""469Fetch a revision from a repository, in the case that this is the470first fetch from the repository.471472Args:473dest: the directory to fetch the repository to.474rev_options: a RevOptions object.475verbosity: verbosity level.476"""477raise NotImplementedError478479def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:480"""481Switch the repo at ``dest`` to point to ``URL``.482483Args:484rev_options: a RevOptions object.485"""486raise NotImplementedError487488def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:489"""490Update an already-existing repo to the given ``rev_options``.491492Args:493rev_options: a RevOptions object.494"""495raise NotImplementedError496497@classmethod498def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:499"""500Return whether the id of the current commit equals the given name.501502Args:503dest: the repository directory.504name: a string name.505"""506raise NotImplementedError507508def obtain(self, dest: str, url: HiddenText, verbosity: int) -> None:509"""510Install or update in editable mode the package represented by this511VersionControl object.512513:param dest: the repository directory in which to install or update.514:param url: the repository URL starting with a vcs prefix.515:param verbosity: verbosity level.516"""517url, rev_options = self.get_url_rev_options(url)518519if not os.path.exists(dest):520self.fetch_new(dest, url, rev_options, verbosity=verbosity)521return522523rev_display = rev_options.to_display()524if self.is_repository_directory(dest):525existing_url = self.get_remote_url(dest)526if self.compare_urls(existing_url, url.secret):527logger.debug(528"%s in %s exists, and has correct URL (%s)",529self.repo_name.title(),530display_path(dest),531url,532)533if not self.is_commit_id_equal(dest, rev_options.rev):534logger.info(535"Updating %s %s%s",536display_path(dest),537self.repo_name,538rev_display,539)540self.update(dest, url, rev_options)541else:542logger.info("Skipping because already up-to-date.")543return544545logger.warning(546"%s %s in %s exists with URL %s",547self.name,548self.repo_name,549display_path(dest),550existing_url,551)552prompt = ("(s)witch, (i)gnore, (w)ipe, (b)ackup ", ("s", "i", "w", "b"))553else:554logger.warning(555"Directory %s already exists, and is not a %s %s.",556dest,557self.name,558self.repo_name,559)560# https://github.com/python/mypy/issues/1174561prompt = ("(i)gnore, (w)ipe, (b)ackup ", ("i", "w", "b")) # type: ignore562563logger.warning(564"The plan is to install the %s repository %s",565self.name,566url,567)568response = ask_path_exists("What to do? {}".format(prompt[0]), prompt[1])569570if response == "a":571sys.exit(-1)572573if response == "w":574logger.warning("Deleting %s", display_path(dest))575rmtree(dest)576self.fetch_new(dest, url, rev_options, verbosity=verbosity)577return578579if response == "b":580dest_dir = backup_dir(dest)581logger.warning("Backing up %s to %s", display_path(dest), dest_dir)582shutil.move(dest, dest_dir)583self.fetch_new(dest, url, rev_options, verbosity=verbosity)584return585586# Do nothing if the response is "i".587if response == "s":588logger.info(589"Switching %s %s to %s%s",590self.repo_name,591display_path(dest),592url,593rev_display,594)595self.switch(dest, url, rev_options)596597def unpack(self, location: str, url: HiddenText, verbosity: int) -> None:598"""599Clean up current location and download the url repository600(and vcs infos) into location601602:param url: the repository URL starting with a vcs prefix.603:param verbosity: verbosity level.604"""605if os.path.exists(location):606rmtree(location)607self.obtain(location, url=url, verbosity=verbosity)608609@classmethod610def get_remote_url(cls, location: str) -> str:611"""612Return the url used at location613614Raises RemoteNotFoundError if the repository does not have a remote615url configured.616"""617raise NotImplementedError618619@classmethod620def get_revision(cls, location: str) -> str:621"""622Return the current commit id of the files at the given location.623"""624raise NotImplementedError625626@classmethod627def run_command(628cls,629cmd: Union[List[str], CommandArgs],630show_stdout: bool = True,631cwd: Optional[str] = None,632on_returncode: 'Literal["raise", "warn", "ignore"]' = "raise",633extra_ok_returncodes: Optional[Iterable[int]] = None,634command_desc: Optional[str] = None,635extra_environ: Optional[Mapping[str, Any]] = None,636spinner: Optional[SpinnerInterface] = None,637log_failed_cmd: bool = True,638stdout_only: bool = False,639) -> str:640"""641Run a VCS subcommand642This is simply a wrapper around call_subprocess that adds the VCS643command name, and checks that the VCS is available644"""645cmd = make_command(cls.name, *cmd)646if command_desc is None:647command_desc = format_command_args(cmd)648try:649return call_subprocess(650cmd,651show_stdout,652cwd,653on_returncode=on_returncode,654extra_ok_returncodes=extra_ok_returncodes,655command_desc=command_desc,656extra_environ=extra_environ,657unset_environ=cls.unset_environ,658spinner=spinner,659log_failed_cmd=log_failed_cmd,660stdout_only=stdout_only,661)662except FileNotFoundError:663# errno.ENOENT = no such file or directory664# In other words, the VCS executable isn't available665raise BadCommand(666f"Cannot find command {cls.name!r} - do you have "667f"{cls.name!r} installed and in your PATH?"668)669except PermissionError:670# errno.EACCES = Permission denied671# This error occurs, for instance, when the command is installed672# only for another user. So, the current user don't have673# permission to call the other user command.674raise BadCommand(675f"No permission to execute {cls.name!r} - install it "676f"locally, globally (ask admin), or check your PATH. "677f"See possible solutions at "678f"https://pip.pypa.io/en/latest/reference/pip_freeze/"679f"#fixing-permission-denied."680)681682@classmethod683def is_repository_directory(cls, path: str) -> bool:684"""685Return whether a directory path is a repository directory.686"""687logger.debug("Checking in %s for %s (%s)...", path, cls.dirname, cls.name)688return os.path.exists(os.path.join(path, cls.dirname))689690@classmethod691def get_repository_root(cls, location: str) -> Optional[str]:692"""693Return the "root" (top-level) directory controlled by the vcs,694or `None` if the directory is not in any.695696It is meant to be overridden to implement smarter detection697mechanisms for specific vcs.698699This can do more than is_repository_directory() alone. For700example, the Git override checks that Git is actually available.701"""702if cls.is_repository_directory(location):703return location704return None705706707