Path: blob/main/test/lib/python3.9/site-packages/pip/_internal/vcs/git.py
4804 views
"""Git backend for pip's version-control support.

Defines the ``Git`` subclass of ``VersionControl`` and registers it with the
``vcs`` registry at import time.
"""

import logging
import os.path
import pathlib
import re
import urllib.parse
import urllib.request
from typing import List, Optional, Tuple

from pip._internal.exceptions import BadCommand, InstallationError
from pip._internal.utils.misc import HiddenText, display_path, hide_url
from pip._internal.utils.subprocess import make_command
from pip._internal.vcs.versioncontrol import (
    AuthInfo,
    RemoteNotFoundError,
    RemoteNotValidError,
    RevOptions,
    VersionControl,
    find_path_to_project_root_from_repo_root,
    vcs,
)

urlsplit = urllib.parse.urlsplit
urlunsplit = urllib.parse.urlunsplit


logger = logging.getLogger(__name__)


GIT_VERSION_REGEX = re.compile(
    r"^git version "  # Prefix.
    r"(\d+)"  # Major.
    r"\.(\d+)"  # Dot, minor.
    r"(?:\.(\d+))?"  # Optional dot, patch.
    r".*$"  # Suffix, including any pre- and post-release segments we don't care about.
)

HASH_REGEX = re.compile("^[a-fA-F0-9]{40}$")

# SCP (Secure copy protocol) shorthand. e.g. 'git@example.com:foo/bar.git'
SCP_REGEX = re.compile(
    r"""^
    # Optional user, e.g. 'git@'
    (\w+@)?
    # Server, e.g. 'github.com'.
    ([^/:]+):
    # The server-side path. e.g. 'user/project.git'. Must start with an
    # alphanumeric character so as not to be confusable with a Windows paths
    # like 'C:/foo/bar' or 'C:\foo\bar'.
    (\w[^:]*)
    $""",
    re.VERBOSE,
)


def looks_like_hash(sha: str) -> bool:
    """Return whether ``sha`` is a full 40-character hexadecimal hash."""
    return bool(HASH_REGEX.match(sha))


class Git(VersionControl):
    """``VersionControl`` implementation that drives the ``git`` executable."""

    name = "git"
    dirname = ".git"
    repo_name = "clone"
    schemes = (
        "git+http",
        "git+https",
        "git+ssh",
        "git+git",
        "git+file",
    )
    # Prevent the user's environment variables from interfering with pip:
    # https://github.com/pypa/pip/issues/1130
    unset_environ = ("GIT_DIR", "GIT_WORK_TREE")
    default_arg_rev = "HEAD"

    @staticmethod
    def get_base_rev_args(rev: str) -> List[str]:
        """Return the command-line arguments that select ``rev``."""
        return [rev]

    def is_immutable_rev_checkout(self, url: str, dest: str) -> bool:
        """
        Return whether the revision in ``url`` is a plain commit hash that is
        currently checked out in ``dest``.

        Returns False when no revision was requested, when the checked-out
        commit differs from the requested revision, or when the revision also
        names a branch or tag.
        """
        _, rev_options = self.get_url_rev_options(hide_url(url))
        if not rev_options.rev:
            return False
        if not self.is_commit_id_equal(dest, rev_options.rev):
            # the current commit is different from rev,
            # which means rev was something else than a commit hash
            return False
        # return False in the rare case rev is both a commit hash
        # and a tag or a branch; we don't want to cache in that case
        # because that branch/tag could point to something else in the future
        is_tag_or_branch = bool(self.get_revision_sha(dest, rev_options.rev)[0])
        return not is_tag_or_branch

    def get_git_version(self) -> Tuple[int, ...]:
        """
        Return the version reported by ``git version`` as a tuple of ints.

        The result is ``(major, minor)`` or ``(major, minor, patch)``
        depending on whether the output has a patch component. If the output
        cannot be parsed, a warning is logged and ``()`` is returned.
        """
        version = self.run_command(
            ["version"],
            command_desc="git version",
            show_stdout=False,
            stdout_only=True,
        )
        match = GIT_VERSION_REGEX.match(version)
        if not match:
            logger.warning("Can't parse git version: %s", version)
            return ()
        # The patch group is optional and is None when it did not take part
        # in the match (e.g. "git version 2.17"); skip it instead of letting
        # int(None) raise a TypeError.
        return tuple(int(part) for part in match.groups() if part is not None)

    @classmethod
    def get_current_branch(cls, location: str) -> Optional[str]:
        """
        Return the current branch, or None if HEAD isn't at a branch
        (e.g. detached HEAD).
        """
        # git-symbolic-ref exits with empty stdout if "HEAD" is a detached
        # HEAD rather than a symbolic ref.  In addition, the -q causes the
        # command to exit with status code 1 instead of 128 in this case
        # and to suppress the message to stderr.
        args = ["symbolic-ref", "-q", "HEAD"]
        output = cls.run_command(
            args,
            extra_ok_returncodes=(1,),
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        )
        ref = output.strip()

        if ref.startswith("refs/heads/"):
            return ref[len("refs/heads/") :]

        return None

    @classmethod
    def get_revision_sha(cls, dest: str, rev: str) -> Tuple[Optional[str], bool]:
        """
        Return (sha_or_none, is_branch), where sha_or_none is a commit hash
        if the revision names a remote branch or tag, otherwise None.

        Args:
          dest: the repository directory.
          rev: the revision name.

        Raises:
          ValueError: if a line of ``git show-ref`` output cannot be split
            into exactly a hash and a ref name.
        """
        # Pass rev to pre-filter the list.
        output = cls.run_command(
            ["show-ref", rev],
            cwd=dest,
            show_stdout=False,
            stdout_only=True,
            on_returncode="ignore",
        )
        refs = {}
        # NOTE: We do not use splitlines here since that would split on other
        #       unicode separators, which can be maliciously used to install a
        #       different revision.
        for line in output.strip().split("\n"):
            line = line.rstrip("\r")
            if not line:
                continue
            try:
                ref_sha, ref_name = line.split(" ", maxsplit=2)
            except ValueError:
                # Include the offending line to simplify troubleshooting if
                # this error ever occurs.
                raise ValueError(f"unexpected show-ref line: {line!r}")

            refs[ref_name] = ref_sha

        branch_ref = f"refs/remotes/origin/{rev}"
        tag_ref = f"refs/tags/{rev}"

        # A remote branch takes precedence over a tag of the same name.
        sha = refs.get(branch_ref)
        if sha is not None:
            return (sha, True)

        sha = refs.get(tag_ref)

        return (sha, False)

    @classmethod
    def _should_fetch(cls, dest: str, rev: str) -> bool:
        """
        Return true if rev is a ref or is a commit that we don't have locally.

        Branches and tags are not considered in this method because they are
        assumed to be always available locally (which is a normal outcome of
        ``git clone`` and ``git fetch --tags``).
        """
        if rev.startswith("refs/"):
            # Always fetch remote refs.
            return True

        if not looks_like_hash(rev):
            # Git fetch would fail with abbreviated commits.
            return False

        if cls.has_commit(dest, rev):
            # Don't fetch if we have the commit locally.
            return False

        return True

    @classmethod
    def resolve_revision(
        cls, dest: str, url: HiddenText, rev_options: RevOptions
    ) -> RevOptions:
        """
        Resolve a revision to a new RevOptions object with the SHA1 of the
        branch, tag, or ref if found.

        Args:
          dest: the repository directory.
          url: the remote to fetch from when the revision is not available
            locally.
          rev_options: a RevOptions object.

        Returns:
          A new RevOptions pointing at the resolved SHA, or ``rev_options``
          itself when nothing was resolved or fetched.
        """
        rev = rev_options.arg_rev
        # The arg_rev property's implementation for Git ensures that the
        # rev return value is always non-None.
        assert rev is not None

        sha, is_branch = cls.get_revision_sha(dest, rev)

        if sha is not None:
            rev_options = rev_options.make_new(sha)
            # Remember the branch name so fetch_new() can check out a
            # tracking branch instead of a detached commit.
            rev_options.branch_name = rev if is_branch else None

            return rev_options

        # Do not show a warning for the common case of something that has
        # the form of a Git commit hash.
        if not looks_like_hash(rev):
            logger.warning(
                "Did not find branch or tag '%s', assuming revision or ref.",
                rev,
            )

        if not cls._should_fetch(dest, rev):
            return rev_options

        # fetch the requested revision
        cls.run_command(
            make_command("fetch", "-q", url, rev_options.to_args()),
            cwd=dest,
        )
        # Change the revision to the SHA of the ref we fetched
        sha = cls.get_revision(dest, rev="FETCH_HEAD")
        rev_options = rev_options.make_new(sha)

        return rev_options

    @classmethod
    def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
        """
        Return whether the current commit hash equals the given name.

        Args:
          dest: the repository directory.
          name: a string name.
        """
        if not name:
            # Then avoid an unnecessary subprocess call.
            return False

        return cls.get_revision(dest) == name

    def fetch_new(
        self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
    ) -> None:
        """
        Clone ``url`` into ``dest`` and check out the requested revision.

        Args:
          dest: the directory to clone into.
          url: the repository URL.
          rev_options: the revision to check out; when it has no ``rev`` the
            commit left checked out by the clone is used.
          verbosity: ``<= 0`` clones quietly, ``1`` uses git's default output
            and anything higher adds ``--verbose --progress``.
        """
        rev_display = rev_options.to_display()
        logger.info("Cloning %s%s to %s", url, rev_display, display_path(dest))
        if verbosity <= 0:
            flags: Tuple[str, ...] = ("--quiet",)
        elif verbosity == 1:
            flags = ()
        else:
            flags = ("--verbose", "--progress")
        if self.get_git_version() >= (2, 17):
            # Git added support for partial clone in 2.17
            # https://git-scm.com/docs/partial-clone
            # Speeds up cloning by functioning without a complete copy of repository
            filter_flags: Tuple[str, ...] = ("--filter=blob:none",)
        else:
            filter_flags = ()
        self.run_command(make_command("clone", *filter_flags, *flags, url, dest))

        if rev_options.rev:
            # Then a specific revision was requested.
            rev_options = self.resolve_revision(dest, url, rev_options)
            branch_name = getattr(rev_options, "branch_name", None)
            logger.debug("Rev options %s, branch_name %s", rev_options, branch_name)
            if branch_name is None:
                # Only do a checkout if the current commit id doesn't match
                # the requested revision.
                if not self.is_commit_id_equal(dest, rev_options.rev):
                    cmd_args = make_command(
                        "checkout",
                        "-q",
                        rev_options.to_args(),
                    )
                    self.run_command(cmd_args, cwd=dest)
            elif self.get_current_branch(dest) != branch_name:
                # Then a specific branch was requested, and that branch
                # is not yet checked out.
                track_branch = f"origin/{branch_name}"
                cmd_args = [
                    "checkout",
                    "-b",
                    branch_name,
                    "--track",
                    track_branch,
                ]
                self.run_command(cmd_args, cwd=dest)
        else:
            # No revision requested: record the commit the clone checked out.
            sha = self.get_revision(dest)
            rev_options = rev_options.make_new(sha)

        logger.info("Resolved %s to commit %s", url, rev_options.rev)

        #: repo may contain submodules
        self.update_submodules(dest)

    def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
        """
        Point the ``origin`` remote of ``dest`` at ``url``, check out the
        requested revision and update submodules.
        """
        self.run_command(
            make_command("config", "remote.origin.url", url),
            cwd=dest,
        )
        cmd_args = make_command("checkout", "-q", rev_options.to_args())
        self.run_command(cmd_args, cwd=dest)

        self.update_submodules(dest)

    def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
        """
        Fetch from the default remote, hard-reset ``dest`` to the requested
        revision and update submodules.
        """
        # First fetch changes from the default remote
        if self.get_git_version() >= (1, 9):
            # fetch tags in addition to everything else
            self.run_command(["fetch", "-q", "--tags"], cwd=dest)
        else:
            self.run_command(["fetch", "-q"], cwd=dest)
        # Then reset to wanted revision (maybe even origin/master)
        rev_options = self.resolve_revision(dest, url, rev_options)
        cmd_args = make_command("reset", "--hard", "-q", rev_options.to_args())
        self.run_command(cmd_args, cwd=dest)
        #: update submodules
        self.update_submodules(dest)

    @classmethod
    def get_remote_url(cls, location: str) -> str:
        """
        Return URL of the first remote encountered.

        The ``origin`` remote is preferred when it is configured.

        Raises RemoteNotFoundError if the repository does not have a remote
        url configured.
        """
        # We need to pass 1 for extra_ok_returncodes since the command
        # exits with return code 1 if there are no matching lines.
        stdout = cls.run_command(
            ["config", "--get-regexp", r"remote\..*\.url"],
            extra_ok_returncodes=(1,),
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        )
        remotes = stdout.splitlines()
        if not remotes:
            raise RemoteNotFoundError

        found_remote = remotes[0]
        for remote in remotes:
            if remote.startswith("remote.origin.url "):
                found_remote = remote
                break
        # Each line is "<key> <value>". Split only on the first space so a
        # URL or local path that itself contains spaces is kept intact.
        url = found_remote.split(" ", 1)[1]
        return cls._git_remote_to_pip_url(url.strip())

    @staticmethod
    def _git_remote_to_pip_url(url: str) -> str:
        """
        Convert a remote url from what git uses to what pip accepts.

        There are 3 legal forms **url** may take:

            1. A fully qualified url: ssh://git@example.com/foo/bar.git
            2. A local project.git folder: /path/to/bare/repository.git
            3. SCP shorthand for form 1: git@example.com:foo/bar.git

        Form 1 is output as-is. Form 2 must be converted to URI and form 3 must
        be converted to form 1.

        See the corresponding test test_git_remote_url_to_pip() for examples of
        sample inputs/outputs.

        Raises RemoteNotValidError if **url** matches none of the forms.
        """
        if re.match(r"\w+://", url):
            # This is already valid. Pass it though as-is.
            return url
        if os.path.exists(url):
            # A local bare remote (git clone --mirror).
            # Needs a file:// prefix.
            return pathlib.PurePath(url).as_uri()
        scp_match = SCP_REGEX.match(url)
        if scp_match:
            # Add an ssh:// prefix and replace the ':' with a '/'.
            return scp_match.expand(r"ssh://\1\2/\3")
        # Otherwise, bail out.
        raise RemoteNotValidError(url)

    @classmethod
    def has_commit(cls, location: str, rev: str) -> bool:
        """
        Check if rev is a commit that is available in the local repository.
        """
        try:
            cls.run_command(
                # "<rev>^{commit}" makes rev-parse peel the name to a commit
                # object, so --verify fails unless that commit really exists
                # locally. (The previous '"sha^" + rev' spelling is not a
                # valid revision expression.)
                ["rev-parse", "-q", "--verify", rev + "^{commit}"],
                cwd=location,
                log_failed_cmd=False,
            )
        except InstallationError:
            return False
        else:
            return True

    @classmethod
    def get_revision(cls, location: str, rev: Optional[str] = None) -> str:
        """
        Return the stripped output of ``git rev-parse`` for ``rev``.

        Args:
          location: the repository directory.
          rev: the revision to resolve; defaults to ``HEAD``.
        """
        if rev is None:
            rev = "HEAD"
        current_rev = cls.run_command(
            ["rev-parse", rev],
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        )
        return current_rev.strip()

    @classmethod
    def get_subdirectory(cls, location: str) -> Optional[str]:
        """
        Return the path to Python project root, relative to the repo root.
        Return None if the project root is in the repo root.
        """
        # find the repo root
        git_dir = cls.run_command(
            ["rev-parse", "--git-dir"],
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        ).strip()
        if not os.path.isabs(git_dir):
            git_dir = os.path.join(location, git_dir)
        # The repo root is taken to be the parent of the git directory.
        repo_root = os.path.abspath(os.path.join(git_dir, ".."))
        return find_path_to_project_root_from_repo_root(location, repo_root)

    @classmethod
    def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]:
        """
        Prefixes stub URLs like 'user@hostname:user/repo.git' with 'ssh://'.
        That's required because although they use SSH they sometimes don't
        work with a ssh:// scheme (e.g. GitHub). But we need a scheme for
        parsing. Hence we remove it again afterwards and return it as a stub.
        """
        # Works around an apparent Git bug
        # (see https://article.gmane.org/gmane.comp.version-control.git/146500)
        scheme, netloc, path, query, fragment = urlsplit(url)
        if scheme.endswith("file"):
            initial_slashes = path[: -len(path.lstrip("/"))]
            newpath = initial_slashes + urllib.request.url2pathname(path).replace(
                "\\", "/"
            ).lstrip("/")
            # Keep the "git+" part of the scheme outside urlunsplit().
            after_plus = scheme.find("+") + 1
            url = scheme[:after_plus] + urlunsplit(
                (scheme[after_plus:], netloc, newpath, query, fragment),
            )

        if "://" not in url:
            assert "file:" not in url
            url = url.replace("git+", "git+ssh://")
            url, rev, user_pass = super().get_url_rev_and_auth(url)
            url = url.replace("ssh://", "")
        else:
            url, rev, user_pass = super().get_url_rev_and_auth(url)

        return url, rev, user_pass

    @classmethod
    def update_submodules(cls, location: str) -> None:
        """
        Initialise and recursively update submodules in ``location``.

        Does nothing when ``location`` has no ``.gitmodules`` file.
        """
        if not os.path.exists(os.path.join(location, ".gitmodules")):
            return
        cls.run_command(
            ["submodule", "update", "--init", "--recursive", "-q"],
            cwd=location,
        )

    @classmethod
    def get_repository_root(cls, location: str) -> Optional[str]:
        """
        Return the root of the repository containing ``location``.

        The base-class lookup is tried first; if it finds nothing,
        ``git rev-parse --show-toplevel`` is used. Returns None when that
        command raises ``BadCommand`` or ``InstallationError``.
        """
        loc = super().get_repository_root(location)
        if loc:
            return loc
        try:
            r = cls.run_command(
                ["rev-parse", "--show-toplevel"],
                cwd=location,
                show_stdout=False,
                stdout_only=True,
                on_returncode="raise",
                log_failed_cmd=False,
            )
        except BadCommand:
            logger.debug(
                "could not determine if %s is under git control "
                "because git is not available",
                location,
            )
            return None
        except InstallationError:
            return None
        return os.path.normpath(r.rstrip("\r\n"))

    @staticmethod
    def should_add_vcs_url_prefix(repo_url: str) -> bool:
        """In either https or ssh form, requirements must be prefixed with git+."""
        return True


vcs.register(Git)