Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
hhhrrrttt222111
GitHub Repository: hhhrrrttt222111/Dorkify
Path: blob/master/venv/Lib/site-packages/pip/_internal/vcs/versioncontrol.py
811 views
1
"""Handles all VCS (version control) support"""
2
3
from __future__ import absolute_import
4
5
import errno
6
import logging
7
import os
8
import shutil
9
import sys
10
11
from pip._vendor import pkg_resources
12
from pip._vendor.six.moves.urllib import parse as urllib_parse
13
14
from pip._internal.exceptions import BadCommand, InstallationError
15
from pip._internal.utils.compat import samefile
16
from pip._internal.utils.misc import (
17
ask_path_exists,
18
backup_dir,
19
display_path,
20
hide_url,
21
hide_value,
22
rmtree,
23
)
24
from pip._internal.utils.subprocess import call_subprocess, make_command
25
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
26
from pip._internal.utils.urls import get_url_scheme
27
28
if MYPY_CHECK_RUNNING:
29
from typing import (
30
Any, Dict, Iterable, Iterator, List, Mapping, Optional, Text, Tuple,
31
Type, Union
32
)
33
from pip._internal.cli.spinners import SpinnerInterface
34
from pip._internal.utils.misc import HiddenText
35
from pip._internal.utils.subprocess import CommandArgs
36
37
AuthInfo = Tuple[Optional[str], Optional[str]]
38
39
40
__all__ = ['vcs']
41
42
43
logger = logging.getLogger(__name__)
44
45
46
def is_url(name):
47
# type: (Union[str, Text]) -> bool
48
"""
49
Return true if the name looks like a URL.
50
"""
51
scheme = get_url_scheme(name)
52
if scheme is None:
53
return False
54
return scheme in ['http', 'https', 'file', 'ftp'] + vcs.all_schemes
55
56
57
def make_vcs_requirement_url(repo_url, rev, project_name, subdir=None):
58
# type: (str, str, str, Optional[str]) -> str
59
"""
60
Return the URL for a VCS requirement.
61
62
Args:
63
repo_url: the remote VCS url, with any needed VCS prefix (e.g. "git+").
64
project_name: the (unescaped) project name.
65
"""
66
egg_project_name = pkg_resources.to_filename(project_name)
67
req = '{}@{}#egg={}'.format(repo_url, rev, egg_project_name)
68
if subdir:
69
req += '&subdirectory={}'.format(subdir)
70
71
return req
72
73
74
def find_path_to_setup_from_repo_root(location, repo_root):
75
# type: (str, str) -> Optional[str]
76
"""
77
Find the path to `setup.py` by searching up the filesystem from `location`.
78
Return the path to `setup.py` relative to `repo_root`.
79
Return None if `setup.py` is in `repo_root` or cannot be found.
80
"""
81
# find setup.py
82
orig_location = location
83
while not os.path.exists(os.path.join(location, 'setup.py')):
84
last_location = location
85
location = os.path.dirname(location)
86
if location == last_location:
87
# We've traversed up to the root of the filesystem without
88
# finding setup.py
89
logger.warning(
90
"Could not find setup.py for directory %s (tried all "
91
"parent directories)",
92
orig_location,
93
)
94
return None
95
96
if samefile(repo_root, location):
97
return None
98
99
return os.path.relpath(location, repo_root)
100
101
102
class RemoteNotFoundError(Exception):
103
pass
104
105
106
class RevOptions(object):
107
108
"""
109
Encapsulates a VCS-specific revision to install, along with any VCS
110
install options.
111
112
Instances of this class should be treated as if immutable.
113
"""
114
115
def __init__(
116
self,
117
vc_class, # type: Type[VersionControl]
118
rev=None, # type: Optional[str]
119
extra_args=None, # type: Optional[CommandArgs]
120
):
121
# type: (...) -> None
122
"""
123
Args:
124
vc_class: a VersionControl subclass.
125
rev: the name of the revision to install.
126
extra_args: a list of extra options.
127
"""
128
if extra_args is None:
129
extra_args = []
130
131
self.extra_args = extra_args
132
self.rev = rev
133
self.vc_class = vc_class
134
self.branch_name = None # type: Optional[str]
135
136
def __repr__(self):
137
# type: () -> str
138
return '<RevOptions {}: rev={!r}>'.format(self.vc_class.name, self.rev)
139
140
@property
141
def arg_rev(self):
142
# type: () -> Optional[str]
143
if self.rev is None:
144
return self.vc_class.default_arg_rev
145
146
return self.rev
147
148
def to_args(self):
149
# type: () -> CommandArgs
150
"""
151
Return the VCS-specific command arguments.
152
"""
153
args = [] # type: CommandArgs
154
rev = self.arg_rev
155
if rev is not None:
156
args += self.vc_class.get_base_rev_args(rev)
157
args += self.extra_args
158
159
return args
160
161
def to_display(self):
162
# type: () -> str
163
if not self.rev:
164
return ''
165
166
return ' (to revision {})'.format(self.rev)
167
168
def make_new(self, rev):
169
# type: (str) -> RevOptions
170
"""
171
Make a copy of the current instance, but with a new rev.
172
173
Args:
174
rev: the name of the revision for the new object.
175
"""
176
return self.vc_class.make_rev_options(rev, extra_args=self.extra_args)
177
178
179
class VcsSupport(object):
180
_registry = {} # type: Dict[str, VersionControl]
181
schemes = ['ssh', 'git', 'hg', 'bzr', 'sftp', 'svn']
182
183
def __init__(self):
184
# type: () -> None
185
# Register more schemes with urlparse for various version control
186
# systems
187
urllib_parse.uses_netloc.extend(self.schemes)
188
# Python >= 2.7.4, 3.3 doesn't have uses_fragment
189
if getattr(urllib_parse, 'uses_fragment', None):
190
urllib_parse.uses_fragment.extend(self.schemes)
191
super(VcsSupport, self).__init__()
192
193
def __iter__(self):
194
# type: () -> Iterator[str]
195
return self._registry.__iter__()
196
197
@property
198
def backends(self):
199
# type: () -> List[VersionControl]
200
return list(self._registry.values())
201
202
@property
203
def dirnames(self):
204
# type: () -> List[str]
205
return [backend.dirname for backend in self.backends]
206
207
@property
208
def all_schemes(self):
209
# type: () -> List[str]
210
schemes = [] # type: List[str]
211
for backend in self.backends:
212
schemes.extend(backend.schemes)
213
return schemes
214
215
def register(self, cls):
216
# type: (Type[VersionControl]) -> None
217
if not hasattr(cls, 'name'):
218
logger.warning('Cannot register VCS %s', cls.__name__)
219
return
220
if cls.name not in self._registry:
221
self._registry[cls.name] = cls()
222
logger.debug('Registered VCS backend: %s', cls.name)
223
224
def unregister(self, name):
225
# type: (str) -> None
226
if name in self._registry:
227
del self._registry[name]
228
229
def get_backend_for_dir(self, location):
230
# type: (str) -> Optional[VersionControl]
231
"""
232
Return a VersionControl object if a repository of that type is found
233
at the given directory.
234
"""
235
vcs_backends = {}
236
for vcs_backend in self._registry.values():
237
repo_path = vcs_backend.get_repository_root(location)
238
if not repo_path:
239
continue
240
logger.debug('Determine that %s uses VCS: %s',
241
location, vcs_backend.name)
242
vcs_backends[repo_path] = vcs_backend
243
244
if not vcs_backends:
245
return None
246
247
# Choose the VCS in the inner-most directory. Since all repository
248
# roots found here would be either `location` or one of its
249
# parents, the longest path should have the most path components,
250
# i.e. the backend representing the inner-most repository.
251
inner_most_repo_path = max(vcs_backends, key=len)
252
return vcs_backends[inner_most_repo_path]
253
254
def get_backend_for_scheme(self, scheme):
255
# type: (str) -> Optional[VersionControl]
256
"""
257
Return a VersionControl object or None.
258
"""
259
for vcs_backend in self._registry.values():
260
if scheme in vcs_backend.schemes:
261
return vcs_backend
262
return None
263
264
def get_backend(self, name):
265
# type: (str) -> Optional[VersionControl]
266
"""
267
Return a VersionControl object or None.
268
"""
269
name = name.lower()
270
return self._registry.get(name)
271
272
273
vcs = VcsSupport()
274
275
276
class VersionControl(object):
277
name = ''
278
dirname = ''
279
repo_name = ''
280
# List of supported schemes for this Version Control
281
schemes = () # type: Tuple[str, ...]
282
# Iterable of environment variable names to pass to call_subprocess().
283
unset_environ = () # type: Tuple[str, ...]
284
default_arg_rev = None # type: Optional[str]
285
286
@classmethod
287
def should_add_vcs_url_prefix(cls, remote_url):
288
# type: (str) -> bool
289
"""
290
Return whether the vcs prefix (e.g. "git+") should be added to a
291
repository's remote url when used in a requirement.
292
"""
293
return not remote_url.lower().startswith('{}:'.format(cls.name))
294
295
@classmethod
296
def get_subdirectory(cls, location):
297
# type: (str) -> Optional[str]
298
"""
299
Return the path to setup.py, relative to the repo root.
300
Return None if setup.py is in the repo root.
301
"""
302
return None
303
304
@classmethod
305
def get_requirement_revision(cls, repo_dir):
306
# type: (str) -> str
307
"""
308
Return the revision string that should be used in a requirement.
309
"""
310
return cls.get_revision(repo_dir)
311
312
@classmethod
313
def get_src_requirement(cls, repo_dir, project_name):
314
# type: (str, str) -> Optional[str]
315
"""
316
Return the requirement string to use to redownload the files
317
currently at the given repository directory.
318
319
Args:
320
project_name: the (unescaped) project name.
321
322
The return value has a form similar to the following:
323
324
{repository_url}@{revision}#egg={project_name}
325
"""
326
repo_url = cls.get_remote_url(repo_dir)
327
if repo_url is None:
328
return None
329
330
if cls.should_add_vcs_url_prefix(repo_url):
331
repo_url = '{}+{}'.format(cls.name, repo_url)
332
333
revision = cls.get_requirement_revision(repo_dir)
334
subdir = cls.get_subdirectory(repo_dir)
335
req = make_vcs_requirement_url(repo_url, revision, project_name,
336
subdir=subdir)
337
338
return req
339
340
@staticmethod
341
def get_base_rev_args(rev):
342
# type: (str) -> List[str]
343
"""
344
Return the base revision arguments for a vcs command.
345
346
Args:
347
rev: the name of a revision to install. Cannot be None.
348
"""
349
raise NotImplementedError
350
351
def is_immutable_rev_checkout(self, url, dest):
352
# type: (str, str) -> bool
353
"""
354
Return true if the commit hash checked out at dest matches
355
the revision in url.
356
357
Always return False, if the VCS does not support immutable commit
358
hashes.
359
360
This method does not check if there are local uncommitted changes
361
in dest after checkout, as pip currently has no use case for that.
362
"""
363
return False
364
365
@classmethod
366
def make_rev_options(cls, rev=None, extra_args=None):
367
# type: (Optional[str], Optional[CommandArgs]) -> RevOptions
368
"""
369
Return a RevOptions object.
370
371
Args:
372
rev: the name of a revision to install.
373
extra_args: a list of extra options.
374
"""
375
return RevOptions(cls, rev, extra_args=extra_args)
376
377
@classmethod
378
def _is_local_repository(cls, repo):
379
# type: (str) -> bool
380
"""
381
posix absolute paths start with os.path.sep,
382
win32 ones start with drive (like c:\\folder)
383
"""
384
drive, tail = os.path.splitdrive(repo)
385
return repo.startswith(os.path.sep) or bool(drive)
386
387
def export(self, location, url):
388
# type: (str, HiddenText) -> None
389
"""
390
Export the repository at the url to the destination location
391
i.e. only download the files, without vcs informations
392
393
:param url: the repository URL starting with a vcs prefix.
394
"""
395
raise NotImplementedError
396
397
@classmethod
398
def get_netloc_and_auth(cls, netloc, scheme):
399
# type: (str, str) -> Tuple[str, Tuple[Optional[str], Optional[str]]]
400
"""
401
Parse the repository URL's netloc, and return the new netloc to use
402
along with auth information.
403
404
Args:
405
netloc: the original repository URL netloc.
406
scheme: the repository URL's scheme without the vcs prefix.
407
408
This is mainly for the Subversion class to override, so that auth
409
information can be provided via the --username and --password options
410
instead of through the URL. For other subclasses like Git without
411
such an option, auth information must stay in the URL.
412
413
Returns: (netloc, (username, password)).
414
"""
415
return netloc, (None, None)
416
417
@classmethod
418
def get_url_rev_and_auth(cls, url):
419
# type: (str) -> Tuple[str, Optional[str], AuthInfo]
420
"""
421
Parse the repository URL to use, and return the URL, revision,
422
and auth info to use.
423
424
Returns: (url, rev, (username, password)).
425
"""
426
scheme, netloc, path, query, frag = urllib_parse.urlsplit(url)
427
if '+' not in scheme:
428
raise ValueError(
429
"Sorry, {!r} is a malformed VCS url. "
430
"The format is <vcs>+<protocol>://<url>, "
431
"e.g. svn+http://myrepo/svn/MyApp#egg=MyApp".format(url)
432
)
433
# Remove the vcs prefix.
434
scheme = scheme.split('+', 1)[1]
435
netloc, user_pass = cls.get_netloc_and_auth(netloc, scheme)
436
rev = None
437
if '@' in path:
438
path, rev = path.rsplit('@', 1)
439
if not rev:
440
raise InstallationError(
441
"The URL {!r} has an empty revision (after @) "
442
"which is not supported. Include a revision after @ "
443
"or remove @ from the URL.".format(url)
444
)
445
url = urllib_parse.urlunsplit((scheme, netloc, path, query, ''))
446
return url, rev, user_pass
447
448
@staticmethod
449
def make_rev_args(username, password):
450
# type: (Optional[str], Optional[HiddenText]) -> CommandArgs
451
"""
452
Return the RevOptions "extra arguments" to use in obtain().
453
"""
454
return []
455
456
def get_url_rev_options(self, url):
457
# type: (HiddenText) -> Tuple[HiddenText, RevOptions]
458
"""
459
Return the URL and RevOptions object to use in obtain() and in
460
some cases export(), as a tuple (url, rev_options).
461
"""
462
secret_url, rev, user_pass = self.get_url_rev_and_auth(url.secret)
463
username, secret_password = user_pass
464
password = None # type: Optional[HiddenText]
465
if secret_password is not None:
466
password = hide_value(secret_password)
467
extra_args = self.make_rev_args(username, password)
468
rev_options = self.make_rev_options(rev, extra_args=extra_args)
469
470
return hide_url(secret_url), rev_options
471
472
@staticmethod
473
def normalize_url(url):
474
# type: (str) -> str
475
"""
476
Normalize a URL for comparison by unquoting it and removing any
477
trailing slash.
478
"""
479
return urllib_parse.unquote(url).rstrip('/')
480
481
@classmethod
482
def compare_urls(cls, url1, url2):
483
# type: (str, str) -> bool
484
"""
485
Compare two repo URLs for identity, ignoring incidental differences.
486
"""
487
return (cls.normalize_url(url1) == cls.normalize_url(url2))
488
489
def fetch_new(self, dest, url, rev_options):
490
# type: (str, HiddenText, RevOptions) -> None
491
"""
492
Fetch a revision from a repository, in the case that this is the
493
first fetch from the repository.
494
495
Args:
496
dest: the directory to fetch the repository to.
497
rev_options: a RevOptions object.
498
"""
499
raise NotImplementedError
500
501
def switch(self, dest, url, rev_options):
502
# type: (str, HiddenText, RevOptions) -> None
503
"""
504
Switch the repo at ``dest`` to point to ``URL``.
505
506
Args:
507
rev_options: a RevOptions object.
508
"""
509
raise NotImplementedError
510
511
def update(self, dest, url, rev_options):
512
# type: (str, HiddenText, RevOptions) -> None
513
"""
514
Update an already-existing repo to the given ``rev_options``.
515
516
Args:
517
rev_options: a RevOptions object.
518
"""
519
raise NotImplementedError
520
521
@classmethod
522
def is_commit_id_equal(cls, dest, name):
523
# type: (str, Optional[str]) -> bool
524
"""
525
Return whether the id of the current commit equals the given name.
526
527
Args:
528
dest: the repository directory.
529
name: a string name.
530
"""
531
raise NotImplementedError
532
533
def obtain(self, dest, url):
534
# type: (str, HiddenText) -> None
535
"""
536
Install or update in editable mode the package represented by this
537
VersionControl object.
538
539
:param dest: the repository directory in which to install or update.
540
:param url: the repository URL starting with a vcs prefix.
541
"""
542
url, rev_options = self.get_url_rev_options(url)
543
544
if not os.path.exists(dest):
545
self.fetch_new(dest, url, rev_options)
546
return
547
548
rev_display = rev_options.to_display()
549
if self.is_repository_directory(dest):
550
existing_url = self.get_remote_url(dest)
551
if self.compare_urls(existing_url, url.secret):
552
logger.debug(
553
'%s in %s exists, and has correct URL (%s)',
554
self.repo_name.title(),
555
display_path(dest),
556
url,
557
)
558
if not self.is_commit_id_equal(dest, rev_options.rev):
559
logger.info(
560
'Updating %s %s%s',
561
display_path(dest),
562
self.repo_name,
563
rev_display,
564
)
565
self.update(dest, url, rev_options)
566
else:
567
logger.info('Skipping because already up-to-date.')
568
return
569
570
logger.warning(
571
'%s %s in %s exists with URL %s',
572
self.name,
573
self.repo_name,
574
display_path(dest),
575
existing_url,
576
)
577
prompt = ('(s)witch, (i)gnore, (w)ipe, (b)ackup ',
578
('s', 'i', 'w', 'b'))
579
else:
580
logger.warning(
581
'Directory %s already exists, and is not a %s %s.',
582
dest,
583
self.name,
584
self.repo_name,
585
)
586
# https://github.com/python/mypy/issues/1174
587
prompt = ('(i)gnore, (w)ipe, (b)ackup ', # type: ignore
588
('i', 'w', 'b'))
589
590
logger.warning(
591
'The plan is to install the %s repository %s',
592
self.name,
593
url,
594
)
595
response = ask_path_exists('What to do? {}'.format(
596
prompt[0]), prompt[1])
597
598
if response == 'a':
599
sys.exit(-1)
600
601
if response == 'w':
602
logger.warning('Deleting %s', display_path(dest))
603
rmtree(dest)
604
self.fetch_new(dest, url, rev_options)
605
return
606
607
if response == 'b':
608
dest_dir = backup_dir(dest)
609
logger.warning(
610
'Backing up %s to %s', display_path(dest), dest_dir,
611
)
612
shutil.move(dest, dest_dir)
613
self.fetch_new(dest, url, rev_options)
614
return
615
616
# Do nothing if the response is "i".
617
if response == 's':
618
logger.info(
619
'Switching %s %s to %s%s',
620
self.repo_name,
621
display_path(dest),
622
url,
623
rev_display,
624
)
625
self.switch(dest, url, rev_options)
626
627
def unpack(self, location, url):
628
# type: (str, HiddenText) -> None
629
"""
630
Clean up current location and download the url repository
631
(and vcs infos) into location
632
633
:param url: the repository URL starting with a vcs prefix.
634
"""
635
if os.path.exists(location):
636
rmtree(location)
637
self.obtain(location, url=url)
638
639
@classmethod
640
def get_remote_url(cls, location):
641
# type: (str) -> str
642
"""
643
Return the url used at location
644
645
Raises RemoteNotFoundError if the repository does not have a remote
646
url configured.
647
"""
648
raise NotImplementedError
649
650
@classmethod
651
def get_revision(cls, location):
652
# type: (str) -> str
653
"""
654
Return the current commit id of the files at the given location.
655
"""
656
raise NotImplementedError
657
658
@classmethod
659
def run_command(
660
cls,
661
cmd, # type: Union[List[str], CommandArgs]
662
show_stdout=True, # type: bool
663
cwd=None, # type: Optional[str]
664
on_returncode='raise', # type: str
665
extra_ok_returncodes=None, # type: Optional[Iterable[int]]
666
command_desc=None, # type: Optional[str]
667
extra_environ=None, # type: Optional[Mapping[str, Any]]
668
spinner=None, # type: Optional[SpinnerInterface]
669
log_failed_cmd=True # type: bool
670
):
671
# type: (...) -> Text
672
"""
673
Run a VCS subcommand
674
This is simply a wrapper around call_subprocess that adds the VCS
675
command name, and checks that the VCS is available
676
"""
677
cmd = make_command(cls.name, *cmd)
678
try:
679
return call_subprocess(cmd, show_stdout, cwd,
680
on_returncode=on_returncode,
681
extra_ok_returncodes=extra_ok_returncodes,
682
command_desc=command_desc,
683
extra_environ=extra_environ,
684
unset_environ=cls.unset_environ,
685
spinner=spinner,
686
log_failed_cmd=log_failed_cmd)
687
except OSError as e:
688
# errno.ENOENT = no such file or directory
689
# In other words, the VCS executable isn't available
690
if e.errno == errno.ENOENT:
691
raise BadCommand(
692
'Cannot find command {cls.name!r} - do you have '
693
'{cls.name!r} installed and in your '
694
'PATH?'.format(**locals()))
695
else:
696
raise # re-raise exception if a different error occurred
697
698
@classmethod
699
def is_repository_directory(cls, path):
700
# type: (str) -> bool
701
"""
702
Return whether a directory path is a repository directory.
703
"""
704
logger.debug('Checking in %s for %s (%s)...',
705
path, cls.dirname, cls.name)
706
return os.path.exists(os.path.join(path, cls.dirname))
707
708
@classmethod
709
def get_repository_root(cls, location):
710
# type: (str) -> Optional[str]
711
"""
712
Return the "root" (top-level) directory controlled by the vcs,
713
or `None` if the directory is not in any.
714
715
It is meant to be overridden to implement smarter detection
716
mechanisms for specific vcs.
717
718
This can do more than is_repository_directory() alone. For
719
example, the Git override checks that Git is actually available.
720
"""
721
if cls.is_repository_directory(location):
722
return location
723
return None
724
725