Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
keewenaw
GitHub Repository: keewenaw/ethereum-wallet-cracker
Path: blob/main/test/lib/python3.9/site-packages/pip/_internal/vcs/versioncontrol.py
4804 views
1
"""Handles all VCS (version control) support"""
2
3
import logging
4
import os
5
import shutil
6
import sys
7
import urllib.parse
8
from typing import (
9
TYPE_CHECKING,
10
Any,
11
Dict,
12
Iterable,
13
Iterator,
14
List,
15
Mapping,
16
Optional,
17
Tuple,
18
Type,
19
Union,
20
)
21
22
from pip._internal.cli.spinners import SpinnerInterface
23
from pip._internal.exceptions import BadCommand, InstallationError
24
from pip._internal.utils.misc import (
25
HiddenText,
26
ask_path_exists,
27
backup_dir,
28
display_path,
29
hide_url,
30
hide_value,
31
is_installable_dir,
32
rmtree,
33
)
34
from pip._internal.utils.subprocess import (
35
CommandArgs,
36
call_subprocess,
37
format_command_args,
38
make_command,
39
)
40
from pip._internal.utils.urls import get_url_scheme
41
42
if TYPE_CHECKING:
43
# Literal was introduced in Python 3.8.
44
#
45
# TODO: Remove `if TYPE_CHECKING` when dropping support for Python 3.7.
46
from typing import Literal
47
48
49
__all__ = ["vcs"]
50
51
52
logger = logging.getLogger(__name__)
53
54
AuthInfo = Tuple[Optional[str], Optional[str]]
55
56
57
def is_url(name: str) -> bool:
58
"""
59
Return true if the name looks like a URL.
60
"""
61
scheme = get_url_scheme(name)
62
if scheme is None:
63
return False
64
return scheme in ["http", "https", "file", "ftp"] + vcs.all_schemes
65
66
67
def make_vcs_requirement_url(
68
repo_url: str, rev: str, project_name: str, subdir: Optional[str] = None
69
) -> str:
70
"""
71
Return the URL for a VCS requirement.
72
73
Args:
74
repo_url: the remote VCS url, with any needed VCS prefix (e.g. "git+").
75
project_name: the (unescaped) project name.
76
"""
77
egg_project_name = project_name.replace("-", "_")
78
req = f"{repo_url}@{rev}#egg={egg_project_name}"
79
if subdir:
80
req += f"&subdirectory={subdir}"
81
82
return req
83
84
85
def find_path_to_project_root_from_repo_root(
86
location: str, repo_root: str
87
) -> Optional[str]:
88
"""
89
Find the the Python project's root by searching up the filesystem from
90
`location`. Return the path to project root relative to `repo_root`.
91
Return None if the project root is `repo_root`, or cannot be found.
92
"""
93
# find project root.
94
orig_location = location
95
while not is_installable_dir(location):
96
last_location = location
97
location = os.path.dirname(location)
98
if location == last_location:
99
# We've traversed up to the root of the filesystem without
100
# finding a Python project.
101
logger.warning(
102
"Could not find a Python project for directory %s (tried all "
103
"parent directories)",
104
orig_location,
105
)
106
return None
107
108
if os.path.samefile(repo_root, location):
109
return None
110
111
return os.path.relpath(location, repo_root)
112
113
114
class RemoteNotFoundError(Exception):
115
pass
116
117
118
class RemoteNotValidError(Exception):
119
def __init__(self, url: str):
120
super().__init__(url)
121
self.url = url
122
123
124
class RevOptions:
125
126
"""
127
Encapsulates a VCS-specific revision to install, along with any VCS
128
install options.
129
130
Instances of this class should be treated as if immutable.
131
"""
132
133
def __init__(
134
self,
135
vc_class: Type["VersionControl"],
136
rev: Optional[str] = None,
137
extra_args: Optional[CommandArgs] = None,
138
) -> None:
139
"""
140
Args:
141
vc_class: a VersionControl subclass.
142
rev: the name of the revision to install.
143
extra_args: a list of extra options.
144
"""
145
if extra_args is None:
146
extra_args = []
147
148
self.extra_args = extra_args
149
self.rev = rev
150
self.vc_class = vc_class
151
self.branch_name: Optional[str] = None
152
153
def __repr__(self) -> str:
154
return f"<RevOptions {self.vc_class.name}: rev={self.rev!r}>"
155
156
@property
157
def arg_rev(self) -> Optional[str]:
158
if self.rev is None:
159
return self.vc_class.default_arg_rev
160
161
return self.rev
162
163
def to_args(self) -> CommandArgs:
164
"""
165
Return the VCS-specific command arguments.
166
"""
167
args: CommandArgs = []
168
rev = self.arg_rev
169
if rev is not None:
170
args += self.vc_class.get_base_rev_args(rev)
171
args += self.extra_args
172
173
return args
174
175
def to_display(self) -> str:
176
if not self.rev:
177
return ""
178
179
return f" (to revision {self.rev})"
180
181
def make_new(self, rev: str) -> "RevOptions":
182
"""
183
Make a copy of the current instance, but with a new rev.
184
185
Args:
186
rev: the name of the revision for the new object.
187
"""
188
return self.vc_class.make_rev_options(rev, extra_args=self.extra_args)
189
190
191
class VcsSupport:
192
_registry: Dict[str, "VersionControl"] = {}
193
schemes = ["ssh", "git", "hg", "bzr", "sftp", "svn"]
194
195
def __init__(self) -> None:
196
# Register more schemes with urlparse for various version control
197
# systems
198
urllib.parse.uses_netloc.extend(self.schemes)
199
super().__init__()
200
201
def __iter__(self) -> Iterator[str]:
202
return self._registry.__iter__()
203
204
@property
205
def backends(self) -> List["VersionControl"]:
206
return list(self._registry.values())
207
208
@property
209
def dirnames(self) -> List[str]:
210
return [backend.dirname for backend in self.backends]
211
212
@property
213
def all_schemes(self) -> List[str]:
214
schemes: List[str] = []
215
for backend in self.backends:
216
schemes.extend(backend.schemes)
217
return schemes
218
219
def register(self, cls: Type["VersionControl"]) -> None:
220
if not hasattr(cls, "name"):
221
logger.warning("Cannot register VCS %s", cls.__name__)
222
return
223
if cls.name not in self._registry:
224
self._registry[cls.name] = cls()
225
logger.debug("Registered VCS backend: %s", cls.name)
226
227
def unregister(self, name: str) -> None:
228
if name in self._registry:
229
del self._registry[name]
230
231
def get_backend_for_dir(self, location: str) -> Optional["VersionControl"]:
232
"""
233
Return a VersionControl object if a repository of that type is found
234
at the given directory.
235
"""
236
vcs_backends = {}
237
for vcs_backend in self._registry.values():
238
repo_path = vcs_backend.get_repository_root(location)
239
if not repo_path:
240
continue
241
logger.debug("Determine that %s uses VCS: %s", location, vcs_backend.name)
242
vcs_backends[repo_path] = vcs_backend
243
244
if not vcs_backends:
245
return None
246
247
# Choose the VCS in the inner-most directory. Since all repository
248
# roots found here would be either `location` or one of its
249
# parents, the longest path should have the most path components,
250
# i.e. the backend representing the inner-most repository.
251
inner_most_repo_path = max(vcs_backends, key=len)
252
return vcs_backends[inner_most_repo_path]
253
254
def get_backend_for_scheme(self, scheme: str) -> Optional["VersionControl"]:
255
"""
256
Return a VersionControl object or None.
257
"""
258
for vcs_backend in self._registry.values():
259
if scheme in vcs_backend.schemes:
260
return vcs_backend
261
return None
262
263
def get_backend(self, name: str) -> Optional["VersionControl"]:
264
"""
265
Return a VersionControl object or None.
266
"""
267
name = name.lower()
268
return self._registry.get(name)
269
270
271
vcs = VcsSupport()
272
273
274
class VersionControl:
275
name = ""
276
dirname = ""
277
repo_name = ""
278
# List of supported schemes for this Version Control
279
schemes: Tuple[str, ...] = ()
280
# Iterable of environment variable names to pass to call_subprocess().
281
unset_environ: Tuple[str, ...] = ()
282
default_arg_rev: Optional[str] = None
283
284
@classmethod
285
def should_add_vcs_url_prefix(cls, remote_url: str) -> bool:
286
"""
287
Return whether the vcs prefix (e.g. "git+") should be added to a
288
repository's remote url when used in a requirement.
289
"""
290
return not remote_url.lower().startswith(f"{cls.name}:")
291
292
@classmethod
293
def get_subdirectory(cls, location: str) -> Optional[str]:
294
"""
295
Return the path to Python project root, relative to the repo root.
296
Return None if the project root is in the repo root.
297
"""
298
return None
299
300
@classmethod
301
def get_requirement_revision(cls, repo_dir: str) -> str:
302
"""
303
Return the revision string that should be used in a requirement.
304
"""
305
return cls.get_revision(repo_dir)
306
307
@classmethod
308
def get_src_requirement(cls, repo_dir: str, project_name: str) -> str:
309
"""
310
Return the requirement string to use to redownload the files
311
currently at the given repository directory.
312
313
Args:
314
project_name: the (unescaped) project name.
315
316
The return value has a form similar to the following:
317
318
{repository_url}@{revision}#egg={project_name}
319
"""
320
repo_url = cls.get_remote_url(repo_dir)
321
322
if cls.should_add_vcs_url_prefix(repo_url):
323
repo_url = f"{cls.name}+{repo_url}"
324
325
revision = cls.get_requirement_revision(repo_dir)
326
subdir = cls.get_subdirectory(repo_dir)
327
req = make_vcs_requirement_url(repo_url, revision, project_name, subdir=subdir)
328
329
return req
330
331
@staticmethod
332
def get_base_rev_args(rev: str) -> List[str]:
333
"""
334
Return the base revision arguments for a vcs command.
335
336
Args:
337
rev: the name of a revision to install. Cannot be None.
338
"""
339
raise NotImplementedError
340
341
def is_immutable_rev_checkout(self, url: str, dest: str) -> bool:
342
"""
343
Return true if the commit hash checked out at dest matches
344
the revision in url.
345
346
Always return False, if the VCS does not support immutable commit
347
hashes.
348
349
This method does not check if there are local uncommitted changes
350
in dest after checkout, as pip currently has no use case for that.
351
"""
352
return False
353
354
@classmethod
355
def make_rev_options(
356
cls, rev: Optional[str] = None, extra_args: Optional[CommandArgs] = None
357
) -> RevOptions:
358
"""
359
Return a RevOptions object.
360
361
Args:
362
rev: the name of a revision to install.
363
extra_args: a list of extra options.
364
"""
365
return RevOptions(cls, rev, extra_args=extra_args)
366
367
@classmethod
368
def _is_local_repository(cls, repo: str) -> bool:
369
"""
370
posix absolute paths start with os.path.sep,
371
win32 ones start with drive (like c:\\folder)
372
"""
373
drive, tail = os.path.splitdrive(repo)
374
return repo.startswith(os.path.sep) or bool(drive)
375
376
@classmethod
377
def get_netloc_and_auth(
378
cls, netloc: str, scheme: str
379
) -> Tuple[str, Tuple[Optional[str], Optional[str]]]:
380
"""
381
Parse the repository URL's netloc, and return the new netloc to use
382
along with auth information.
383
384
Args:
385
netloc: the original repository URL netloc.
386
scheme: the repository URL's scheme without the vcs prefix.
387
388
This is mainly for the Subversion class to override, so that auth
389
information can be provided via the --username and --password options
390
instead of through the URL. For other subclasses like Git without
391
such an option, auth information must stay in the URL.
392
393
Returns: (netloc, (username, password)).
394
"""
395
return netloc, (None, None)
396
397
@classmethod
398
def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]:
399
"""
400
Parse the repository URL to use, and return the URL, revision,
401
and auth info to use.
402
403
Returns: (url, rev, (username, password)).
404
"""
405
scheme, netloc, path, query, frag = urllib.parse.urlsplit(url)
406
if "+" not in scheme:
407
raise ValueError(
408
"Sorry, {!r} is a malformed VCS url. "
409
"The format is <vcs>+<protocol>://<url>, "
410
"e.g. svn+http://myrepo/svn/MyApp#egg=MyApp".format(url)
411
)
412
# Remove the vcs prefix.
413
scheme = scheme.split("+", 1)[1]
414
netloc, user_pass = cls.get_netloc_and_auth(netloc, scheme)
415
rev = None
416
if "@" in path:
417
path, rev = path.rsplit("@", 1)
418
if not rev:
419
raise InstallationError(
420
"The URL {!r} has an empty revision (after @) "
421
"which is not supported. Include a revision after @ "
422
"or remove @ from the URL.".format(url)
423
)
424
url = urllib.parse.urlunsplit((scheme, netloc, path, query, ""))
425
return url, rev, user_pass
426
427
@staticmethod
428
def make_rev_args(
429
username: Optional[str], password: Optional[HiddenText]
430
) -> CommandArgs:
431
"""
432
Return the RevOptions "extra arguments" to use in obtain().
433
"""
434
return []
435
436
def get_url_rev_options(self, url: HiddenText) -> Tuple[HiddenText, RevOptions]:
437
"""
438
Return the URL and RevOptions object to use in obtain(),
439
as a tuple (url, rev_options).
440
"""
441
secret_url, rev, user_pass = self.get_url_rev_and_auth(url.secret)
442
username, secret_password = user_pass
443
password: Optional[HiddenText] = None
444
if secret_password is not None:
445
password = hide_value(secret_password)
446
extra_args = self.make_rev_args(username, password)
447
rev_options = self.make_rev_options(rev, extra_args=extra_args)
448
449
return hide_url(secret_url), rev_options
450
451
@staticmethod
452
def normalize_url(url: str) -> str:
453
"""
454
Normalize a URL for comparison by unquoting it and removing any
455
trailing slash.
456
"""
457
return urllib.parse.unquote(url).rstrip("/")
458
459
@classmethod
460
def compare_urls(cls, url1: str, url2: str) -> bool:
461
"""
462
Compare two repo URLs for identity, ignoring incidental differences.
463
"""
464
return cls.normalize_url(url1) == cls.normalize_url(url2)
465
466
def fetch_new(
467
self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
468
) -> None:
469
"""
470
Fetch a revision from a repository, in the case that this is the
471
first fetch from the repository.
472
473
Args:
474
dest: the directory to fetch the repository to.
475
rev_options: a RevOptions object.
476
verbosity: verbosity level.
477
"""
478
raise NotImplementedError
479
480
def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
481
"""
482
Switch the repo at ``dest`` to point to ``URL``.
483
484
Args:
485
rev_options: a RevOptions object.
486
"""
487
raise NotImplementedError
488
489
def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
490
"""
491
Update an already-existing repo to the given ``rev_options``.
492
493
Args:
494
rev_options: a RevOptions object.
495
"""
496
raise NotImplementedError
497
498
@classmethod
499
def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
500
"""
501
Return whether the id of the current commit equals the given name.
502
503
Args:
504
dest: the repository directory.
505
name: a string name.
506
"""
507
raise NotImplementedError
508
509
def obtain(self, dest: str, url: HiddenText, verbosity: int) -> None:
510
"""
511
Install or update in editable mode the package represented by this
512
VersionControl object.
513
514
:param dest: the repository directory in which to install or update.
515
:param url: the repository URL starting with a vcs prefix.
516
:param verbosity: verbosity level.
517
"""
518
url, rev_options = self.get_url_rev_options(url)
519
520
if not os.path.exists(dest):
521
self.fetch_new(dest, url, rev_options, verbosity=verbosity)
522
return
523
524
rev_display = rev_options.to_display()
525
if self.is_repository_directory(dest):
526
existing_url = self.get_remote_url(dest)
527
if self.compare_urls(existing_url, url.secret):
528
logger.debug(
529
"%s in %s exists, and has correct URL (%s)",
530
self.repo_name.title(),
531
display_path(dest),
532
url,
533
)
534
if not self.is_commit_id_equal(dest, rev_options.rev):
535
logger.info(
536
"Updating %s %s%s",
537
display_path(dest),
538
self.repo_name,
539
rev_display,
540
)
541
self.update(dest, url, rev_options)
542
else:
543
logger.info("Skipping because already up-to-date.")
544
return
545
546
logger.warning(
547
"%s %s in %s exists with URL %s",
548
self.name,
549
self.repo_name,
550
display_path(dest),
551
existing_url,
552
)
553
prompt = ("(s)witch, (i)gnore, (w)ipe, (b)ackup ", ("s", "i", "w", "b"))
554
else:
555
logger.warning(
556
"Directory %s already exists, and is not a %s %s.",
557
dest,
558
self.name,
559
self.repo_name,
560
)
561
# https://github.com/python/mypy/issues/1174
562
prompt = ("(i)gnore, (w)ipe, (b)ackup ", ("i", "w", "b")) # type: ignore
563
564
logger.warning(
565
"The plan is to install the %s repository %s",
566
self.name,
567
url,
568
)
569
response = ask_path_exists("What to do? {}".format(prompt[0]), prompt[1])
570
571
if response == "a":
572
sys.exit(-1)
573
574
if response == "w":
575
logger.warning("Deleting %s", display_path(dest))
576
rmtree(dest)
577
self.fetch_new(dest, url, rev_options, verbosity=verbosity)
578
return
579
580
if response == "b":
581
dest_dir = backup_dir(dest)
582
logger.warning("Backing up %s to %s", display_path(dest), dest_dir)
583
shutil.move(dest, dest_dir)
584
self.fetch_new(dest, url, rev_options, verbosity=verbosity)
585
return
586
587
# Do nothing if the response is "i".
588
if response == "s":
589
logger.info(
590
"Switching %s %s to %s%s",
591
self.repo_name,
592
display_path(dest),
593
url,
594
rev_display,
595
)
596
self.switch(dest, url, rev_options)
597
598
def unpack(self, location: str, url: HiddenText, verbosity: int) -> None:
599
"""
600
Clean up current location and download the url repository
601
(and vcs infos) into location
602
603
:param url: the repository URL starting with a vcs prefix.
604
:param verbosity: verbosity level.
605
"""
606
if os.path.exists(location):
607
rmtree(location)
608
self.obtain(location, url=url, verbosity=verbosity)
609
610
@classmethod
611
def get_remote_url(cls, location: str) -> str:
612
"""
613
Return the url used at location
614
615
Raises RemoteNotFoundError if the repository does not have a remote
616
url configured.
617
"""
618
raise NotImplementedError
619
620
@classmethod
621
def get_revision(cls, location: str) -> str:
622
"""
623
Return the current commit id of the files at the given location.
624
"""
625
raise NotImplementedError
626
627
@classmethod
628
def run_command(
629
cls,
630
cmd: Union[List[str], CommandArgs],
631
show_stdout: bool = True,
632
cwd: Optional[str] = None,
633
on_returncode: 'Literal["raise", "warn", "ignore"]' = "raise",
634
extra_ok_returncodes: Optional[Iterable[int]] = None,
635
command_desc: Optional[str] = None,
636
extra_environ: Optional[Mapping[str, Any]] = None,
637
spinner: Optional[SpinnerInterface] = None,
638
log_failed_cmd: bool = True,
639
stdout_only: bool = False,
640
) -> str:
641
"""
642
Run a VCS subcommand
643
This is simply a wrapper around call_subprocess that adds the VCS
644
command name, and checks that the VCS is available
645
"""
646
cmd = make_command(cls.name, *cmd)
647
if command_desc is None:
648
command_desc = format_command_args(cmd)
649
try:
650
return call_subprocess(
651
cmd,
652
show_stdout,
653
cwd,
654
on_returncode=on_returncode,
655
extra_ok_returncodes=extra_ok_returncodes,
656
command_desc=command_desc,
657
extra_environ=extra_environ,
658
unset_environ=cls.unset_environ,
659
spinner=spinner,
660
log_failed_cmd=log_failed_cmd,
661
stdout_only=stdout_only,
662
)
663
except FileNotFoundError:
664
# errno.ENOENT = no such file or directory
665
# In other words, the VCS executable isn't available
666
raise BadCommand(
667
f"Cannot find command {cls.name!r} - do you have "
668
f"{cls.name!r} installed and in your PATH?"
669
)
670
except PermissionError:
671
# errno.EACCES = Permission denied
672
# This error occurs, for instance, when the command is installed
673
# only for another user. So, the current user don't have
674
# permission to call the other user command.
675
raise BadCommand(
676
f"No permission to execute {cls.name!r} - install it "
677
f"locally, globally (ask admin), or check your PATH. "
678
f"See possible solutions at "
679
f"https://pip.pypa.io/en/latest/reference/pip_freeze/"
680
f"#fixing-permission-denied."
681
)
682
683
@classmethod
684
def is_repository_directory(cls, path: str) -> bool:
685
"""
686
Return whether a directory path is a repository directory.
687
"""
688
logger.debug("Checking in %s for %s (%s)...", path, cls.dirname, cls.name)
689
return os.path.exists(os.path.join(path, cls.dirname))
690
691
@classmethod
692
def get_repository_root(cls, location: str) -> Optional[str]:
693
"""
694
Return the "root" (top-level) directory controlled by the vcs,
695
or `None` if the directory is not in any.
696
697
It is meant to be overridden to implement smarter detection
698
mechanisms for specific vcs.
699
700
This can do more than is_repository_directory() alone. For
701
example, the Git override checks that Git is actually available.
702
"""
703
if cls.is_repository_directory(location):
704
return location
705
return None
706
707