Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
keewenaw
GitHub Repository: keewenaw/ethereum-wallet-cracker
Path: blob/main/test/lib/python3.9/site-packages/pip/_internal/utils/misc.py
4804 views
1
# The following comment should be removed at some point in the future.
2
# mypy: strict-optional=False
3
4
import contextlib
5
import errno
6
import getpass
7
import hashlib
8
import io
9
import logging
10
import os
11
import posixpath
12
import shutil
13
import stat
14
import sys
15
import urllib.parse
16
from io import StringIO
17
from itertools import filterfalse, tee, zip_longest
18
from types import TracebackType
19
from typing import (
20
Any,
21
BinaryIO,
22
Callable,
23
ContextManager,
24
Dict,
25
Generator,
26
Iterable,
27
Iterator,
28
List,
29
Optional,
30
TextIO,
31
Tuple,
32
Type,
33
TypeVar,
34
cast,
35
)
36
37
from pip._vendor.pep517 import Pep517HookCaller
38
from pip._vendor.tenacity import retry, stop_after_delay, wait_fixed
39
40
from pip import __version__
41
from pip._internal.exceptions import CommandError
42
from pip._internal.locations import get_major_minor_version
43
from pip._internal.utils.compat import WINDOWS
44
from pip._internal.utils.virtualenv import running_under_virtualenv
45
46
# Names exported by ``from <this module> import *``.
__all__ = [
    "rmtree",
    "display_path",
    "backup_dir",
    "ask",
    "splitext",
    "format_size",
    "is_installable_dir",
    "normalize_path",
    "renames",
    "get_prog",
    "captured_stdout",
    "ensure_dir",
    "remove_auth_from_url",
    "ConfiguredPep517HookCaller",
]
62
63
64
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Generic type variable for annotating helpers in this module.
T = TypeVar("T")
# (exception class, exception instance, traceback) triple.
ExcInfo = Tuple[Type[BaseException], BaseException, TracebackType]
# Three-component version tuple.
VersionInfo = Tuple[int, int, int]
# (netloc, (username, password)); either credential may be None.
NetlocTuple = Tuple[str, Tuple[Optional[str], Optional[str]]]
70
71
72
def get_pip_version() -> str:
    """Return a one-line description of pip's version and location.

    The location reported is the absolute form of the directory two levels
    above the directory that contains this file.
    """
    this_dir = os.path.dirname(__file__)
    pip_pkg_dir = os.path.abspath(os.path.join(this_dir, "..", ".."))
    python_version = get_major_minor_version()

    return "pip {} from {} (python {})".format(
        __version__, pip_pkg_dir, python_version
    )
81
82
83
def normalize_version_info(py_version_info: Tuple[int, ...]) -> Tuple[int, int, int]:
    """
    Convert a tuple of ints representing a Python version to one of length
    three.

    :param py_version_info: a tuple of ints representing a Python version.
        The tuple can have any length. ``None`` is not supported: ``len()``
        is applied to the argument unconditionally.

    :return: a tuple of length three. Shorter inputs are padded with
        trailing zeros; longer inputs are cut down to their first three
        items.
    """
    missing = 3 - len(py_version_info)
    if missing > 0:
        # Too short: pad with zeros, e.g. (3, 9) -> (3, 9, 0).
        py_version_info += missing * (0,)
    elif missing < 0:
        # Too long: keep only the first three components.
        py_version_info = py_version_info[:3]

    return cast("VersionInfo", py_version_info)
100
101
102
def ensure_dir(path: str) -> None:
    """Call os.makedirs(path), ignoring EEXIST (and ENOTEMPTY) failures."""
    tolerated = (errno.EEXIST, errno.ENOTEMPTY)
    try:
        os.makedirs(path)
    except OSError as exc:
        # Windows can raise spurious ENOTEMPTY errors. See #6426.
        if exc.errno in tolerated:
            return
        raise
110
111
112
def get_prog() -> str:
    """Return the name this program was invoked as.

    The base name of ``sys.argv[0]`` is used, except that ``__main__.py``
    and ``-c`` are reported as ``<sys.executable> -m pip``. When
    ``sys.argv`` cannot be read this way, plain ``"pip"`` is returned.
    """
    try:
        script = os.path.basename(sys.argv[0])
        run_as_module = script in ("__main__.py", "-c")
        return f"{sys.executable} -m pip" if run_as_module else script
    except (AttributeError, TypeError, IndexError):
        return "pip"
122
123
124
# Retry policy: wait half a second between attempts and stop after 3 seconds.
# Tenacity raises RetryError by default; reraise=True makes it raise the
# original exception instead.
@retry(reraise=True, stop=stop_after_delay(3), wait=wait_fixed(0.5))
def rmtree(dir: str, ignore_errors: bool = False) -> None:
    """Delete the tree at *dir* with shutil.rmtree().

    Errors reported by shutil.rmtree() are routed to rmtree_errorhandler.
    """
    shutil.rmtree(
        dir,
        ignore_errors=ignore_errors,
        onerror=rmtree_errorhandler,
    )
129
130
131
def rmtree_errorhandler(func: Callable[..., Any], path: str, exc_info: ExcInfo) -> None:
    """Error callback for rmtree(): retry after clearing a read-only flag.

    On Windows, the files in .svn are read-only, so when rmtree() tries to
    remove them, an exception is thrown. Here the read-only attribute is
    removed and the failed operation is repeated.

    :param func: the function that failed; it is called again as func(path).
    :param path: the path that func was operating on.
    :param exc_info: the exception triple for the failure (not inspected).
    """
    try:
        mode = os.stat(path).st_mode
    except OSError:
        # stat() failing is handled like a missing path: nothing to retry.
        return

    if mode & stat.S_IWRITE:
        # The path was already writable, so a read-only flag was not the
        # problem; re-raise the exception currently being handled.
        raise

    # convert to read/write
    os.chmod(path, stat.S_IWRITE)
    # use the original function to repeat the operation
    func(path)
149
150
151
def display_path(path: str) -> str:
    """Gives the display value for a given path, making it relative to cwd
    if possible.

    :param path: an absolute or relative filesystem path.
    :return: the absolute, case-normalized path, with a leading cwd
        replaced by "." when the path lies below the current directory.
    """
    path = os.path.normcase(os.path.abspath(path))
    # Normalize the cwd the same way as *path*. Otherwise, where normcase()
    # changes the string (e.g. lower-casing on Windows), the prefix test
    # below would compare a normalized path to a raw cwd and never match.
    cwd = os.path.normcase(os.getcwd())
    if path.startswith(cwd + os.path.sep):
        path = "." + path[len(cwd):]
    return path
158
159
160
def backup_dir(dir: str, ext: str = ".bak") -> str:
    """Pick a not-yet-existing backup name for *dir*.

    Candidates are tried in order: dir + ext, then dir + ext + "2",
    dir + ext + "3", and so on. The first that does not exist is returned.
    """
    candidate = dir + ext
    counter = 1
    while os.path.exists(candidate):
        counter += 1
        candidate = dir + ext + str(counter)
    return candidate
169
170
171
def ask_path_exists(message: str, options: Iterable[str]) -> str:
    """Choose an action from *options*, preferring $PIP_EXISTS_ACTION.

    The environment variable is split on whitespace and the first entry
    that is one of *options* is returned. If none matches, the question is
    passed on to ask().
    """
    preset_actions = os.environ.get("PIP_EXISTS_ACTION", "").split()
    chosen = next((act for act in preset_actions if act in options), None)
    if chosen is not None:
        return chosen
    return ask(message, options)
176
177
178
def _check_no_input(message: str) -> None:
    """Raise an Exception quoting *message* when $PIP_NO_INPUT is set (non-empty)."""
    if not os.environ.get("PIP_NO_INPUT"):
        return
    raise Exception(
        f"No input was expected ($PIP_NO_INPUT set); question: {message}"
    )
184
185
186
def ask(message: str, options: Iterable[str]) -> str:
    """Prompt with *message* until the reply is one of *options*.

    Each reply is stripped and lower-cased before it is compared. The
    no-input check runs before every prompt.
    """
    while True:
        _check_no_input(message)
        answer = input(message).strip().lower()
        if answer in options:
            return answer
        print(
            "Your response ({!r}) was not one of the expected responses: "
            "{}".format(answer, ", ".join(options))
        )
199
200
201
def ask_input(message: str) -> str:
    """Prompt with *message* and return the line typed, after the no-input check."""
    _check_no_input(message)
    reply = input(message)
    return reply
205
206
207
def ask_password(message: str) -> str:
    """Prompt for a password via getpass, after the no-input check."""
    _check_no_input(message)
    secret = getpass.getpass(message)
    return secret
211
212
213
def strtobool(val: str) -> int:
    """Map a truth-value word to 1 or 0, ignoring case.

    'y', 'yes', 't', 'true', 'on' and '1' give 1; 'n', 'no', 'f', 'false',
    'off' and '0' give 0. Any other value raises ValueError.
    """
    truthy = ("y", "yes", "t", "true", "on", "1")
    falsy = ("n", "no", "f", "false", "off", "0")

    normalized = val.lower()
    if normalized in truthy:
        return 1
    if normalized in falsy:
        return 0
    raise ValueError(f"invalid truth value {normalized!r}")
227
228
229
def format_size(bytes: float) -> str:
    """Render a byte count as text using decimal (1000-based) units.

    Above 1,000,000 the value is shown in MB with one decimal; above
    10,000 in whole kB; above 1,000 in kB with one decimal; otherwise as a
    whole number of bytes.
    """
    kilobytes = bytes / 1000.0
    if bytes > 1000 * 1000:
        return "{:.1f} MB".format(kilobytes / 1000)
    if bytes > 10 * 1000:
        return "{} kB".format(int(kilobytes))
    if bytes > 1000:
        return "{:.1f} kB".format(kilobytes)
    return "{} bytes".format(int(bytes))
238
239
240
def tabulate(rows: Iterable[Iterable[Any]]) -> Tuple[List[str], List[int]]:
    """Return a list of formatted rows and a list of column sizes.

    Every cell is converted with str(). Each column is as wide as its
    longest cell, cells are left-justified and joined by one space, and
    trailing whitespace is stripped from each row.

    For example::

        >>> tabulate([['foobar', 2000], [0xdeadbeef]])
        (['foobar     2000', '3735928559'], [10, 4])
    """
    text_rows = [tuple(str(cell) for cell in row) for row in rows]
    # zip_longest transposes rows into columns; short rows count as "".
    columns = zip_longest(*text_rows, fillvalue="")
    sizes = [max(len(cell) for cell in column) for column in columns]
    table = [
        " ".join(cell.ljust(width) for cell, width in zip(row, sizes)).rstrip()
        for row in text_rows
    ]
    return table, sizes
252
253
254
def is_installable_dir(path: str) -> bool:
    """Is path a directory containing pyproject.toml or setup.py?

    If pyproject.toml exists, this is a PEP 517 project. Otherwise we look for
    a legacy setuptools layout by identifying setup.py. We don't check for the
    setup.cfg because using it without setup.py is only available for PEP 517
    projects, which are already covered by the pyproject.toml check.
    """
    if not os.path.isdir(path):
        return False
    return any(
        os.path.isfile(os.path.join(path, marker))
        for marker in ("pyproject.toml", "setup.py")
    )
269
270
271
def read_chunks(
    file: BinaryIO, size: int = io.DEFAULT_BUFFER_SIZE
) -> Generator[bytes, None, None]:
    """Yield successive file.read(size) results until one is empty."""
    chunk = file.read(size)
    while chunk:
        yield chunk
        chunk = file.read(size)
280
281
282
def normalize_path(path: str, resolve_symlinks: bool = True) -> str:
    """
    Convert a path to its canonical, case-normalized, absolute version.

    "~" is expanded first. The path is then made absolute with
    os.path.realpath() when resolve_symlinks is true and with
    os.path.abspath() otherwise, and finally passed through
    os.path.normcase().
    """
    expanded = os.path.expanduser(path)
    make_absolute = os.path.realpath if resolve_symlinks else os.path.abspath
    return os.path.normcase(make_absolute(expanded))
293
294
295
def splitext(path: str) -> Tuple[str, str]:
    """Like os.path.splitext, but treat a trailing ".tar" as part of the extension.

    For example "pkg.tar.gz" splits into ("pkg", ".tar.gz"). The ".tar"
    test is case-insensitive and the original casing is kept.
    """
    base, ext = posixpath.splitext(path)
    if not base.lower().endswith(".tar"):
        return base, ext
    # Move the 4-character ".tar" suffix from the base onto the extension.
    return base[:-4], base[-4:] + ext
302
303
304
def renames(old: str, new: str) -> None:
    """Like os.renames(), but handles renaming across devices.

    Missing parent directories of *new* are created, *old* is moved with
    shutil.move(), and the directories left empty above *old* are then
    pruned on a best-effort basis.
    """
    # Implementation borrowed from os.renames().
    new_parent, new_leaf = os.path.split(new)
    if new_parent and new_leaf and not os.path.exists(new_parent):
        os.makedirs(new_parent)

    shutil.move(old, new)

    old_parent, old_leaf = os.path.split(old)
    if not (old_parent and old_leaf):
        return
    try:
        os.removedirs(old_parent)
    except OSError:
        # Pruning is best-effort: a failure here is deliberately ignored.
        pass
319
320
321
def is_local(path: str) -> bool:
    """
    Return True if path is within sys.prefix, if we're running in a virtualenv.

    If we're not in a virtualenv, all paths are considered "local."

    Caution: this function assumes the head of path has been normalized
    with normalize_path.
    """
    if running_under_virtualenv():
        # NOTE(review): this is a plain string-prefix test, so a sibling
        # directory whose name merely starts with the prefix also matches.
        # Confirm whether callers rely on that.
        return path.startswith(normalize_path(sys.prefix))
    return True
333
334
335
def write_output(msg: Any, *args: Any) -> None:
    """Log *msg* at INFO level on this module's logger, forwarding *args* unchanged."""
    logger.info(msg, *args)
337
338
339
class StreamWrapper(StringIO):
    """A StringIO that reports the encoding of the stream it stands in for."""

    # The wrapped stream. The class-level default is None; from_stream()
    # sets the real value on each instance it creates.
    orig_stream: TextIO = None

    @classmethod
    def from_stream(cls, orig_stream: TextIO) -> "StreamWrapper":
        """Create a wrapper associated with *orig_stream*.

        :param orig_stream: the stream whose ``encoding`` the wrapper reports.
        :return: a new, empty wrapper instance.
        """
        wrapper = cls()
        # Store the stream on the instance, not the class: a class attribute
        # is shared, so wrapping a second stream would silently change the
        # encoding reported by every existing wrapper.
        wrapper.orig_stream = orig_stream
        return wrapper

    # compileall.compile_dir() needs stdout.encoding to print to stdout
    # https://github.com/python/mypy/issues/4125
    @property
    def encoding(self):  # type: ignore
        """Encoding of the wrapped stream."""
        return self.orig_stream.encoding
352
353
354
@contextlib.contextmanager
def captured_output(stream_name: str) -> Generator[StreamWrapper, None, None]:
    """Temporarily replace the sys stream *stream_name* with a StreamWrapper.

    The replacement is yielded to the ``with`` body, and the original
    stream is put back on exit even if the body raises.

    Taken from Lib/support/__init__.py in the CPython repo.
    """
    original = getattr(sys, stream_name)
    replacement = StreamWrapper.from_stream(original)
    setattr(sys, stream_name, replacement)
    try:
        yield replacement
    finally:
        setattr(sys, stream_name, original)
367
368
369
def captured_stdout() -> ContextManager[StreamWrapper]:
    """Return a context manager that captures sys.stdout.

    Usage::

        with captured_stdout() as stdout:
            print('hello')
        self.assertEqual(stdout.getvalue(), 'hello\\n')

    Taken from Lib/support/__init__.py in the CPython repo.
    """
    stream_name = "stdout"
    return captured_output(stream_name)
379
380
381
def captured_stderr() -> ContextManager[StreamWrapper]:
    """Return a context manager that captures sys.stderr.

    Works like captured_stdout(), but for the "stderr" stream.
    """
    stream_name = "stderr"
    return captured_output(stream_name)
386
387
388
# Simulates an enum
def enum(*sequential: Any, **named: Any) -> Type[Any]:
    """Build a class named "Enum" whose attributes are the given members.

    Positional names are numbered 0, 1, 2, ... in order, and keyword
    arguments supply explicit values. The class also has a
    ``reverse_mapping`` dict from each value back to its name.
    """
    members = {name: index for index, name in enumerate(sequential)}
    members.update(named)
    # The right-hand side is built before the key is added, so the reverse
    # mapping does not contain an entry for itself.
    members["reverse_mapping"] = {value: name for name, value in members.items()}
    return type("Enum", (), members)
394
395
396
def build_netloc(host: str, port: Optional[int]) -> str:
    """
    Build a netloc from a host-port pair.

    With no port, the host is returned as is. Otherwise the result is
    "host:port", with the host wrapped in square brackets when it contains
    a colon (i.e. it is IPv6).
    """
    if port is None:
        return host
    # Only wrap host with square brackets when it is IPv6
    shown_host = f"[{host}]" if ":" in host else host
    return f"{shown_host}:{port}"
406
407
408
def build_url_from_netloc(netloc: str, scheme: str = "https") -> str:
    """
    Build a full URL ("scheme://netloc") from a netloc.

    A netloc with at least two colons and neither "@" nor "[" is taken to
    be a bare IPv6 address and is wrapped in square brackets first.
    """
    is_bare_ipv6 = (
        netloc.count(":") >= 2 and "@" not in netloc and "[" not in netloc
    )
    if is_bare_ipv6:
        netloc = f"[{netloc}]"
    return f"{scheme}://{netloc}"
416
417
418
def parse_netloc(netloc: str) -> Tuple[str, Optional[int]]:
    """
    Return the host-port pair from a netloc.

    The netloc is first turned into a URL with build_url_from_netloc(),
    then urllib.parse.urlparse() supplies the hostname and port.
    """
    parsed = urllib.parse.urlparse(build_url_from_netloc(netloc))
    return parsed.hostname, parsed.port
425
426
427
def split_auth_from_netloc(netloc: str) -> NetlocTuple:
    """
    Parse out and remove the auth information from a netloc.

    Returns: (netloc, (username, password)). Both credentials are
    percent-decoded; the password is None when the auth part has no ":",
    and both are None when the netloc has no "@".
    """
    if "@" not in netloc:
        return netloc, (None, None)

    # Split from the right because that's how urllib.parse.urlsplit()
    # behaves if more than one @ is present (which can be checked using
    # the password attribute of urlsplit()'s return value).
    auth, host = netloc.rsplit("@", 1)

    # Split from the left because that's how urllib.parse.urlsplit()
    # behaves if more than one : is present (which again can be checked
    # using the password attribute of the return value)
    user, colon, secret = auth.partition(":")

    password = urllib.parse.unquote(secret) if colon else None
    return host, (urllib.parse.unquote(user), password)
454
455
456
def redact_netloc(netloc: str) -> str:
    """
    Replace the sensitive data in a netloc with "****", if it exists.

    For example:
        - "user:pass@example.com" returns "user:****@example.com"
        - "accesstoken@example.com" returns "****@example.com"
    """
    host, (user, password) = split_auth_from_netloc(netloc)
    if user is None:
        return host

    if password is None:
        # A lone username is hidden in full.
        shown_user, shown_password = "****", ""
    else:
        # Keep the (re-quoted) username and hide only the password.
        shown_user, shown_password = urllib.parse.quote(user), ":****"

    return "{user}{password}@{netloc}".format(
        user=shown_user, password=shown_password, netloc=host
    )
476
477
478
def _transform_url(
    url: str, transform_netloc: Callable[[str], Tuple[Any, ...]]
) -> Tuple[str, NetlocTuple]:
    """Transform and replace netloc in a url.

    transform_netloc is a function taking the netloc and returning a
    tuple. The first element of this tuple is the new netloc. The
    entire tuple is returned.

    Returns a tuple containing the transformed url as item 0 and the
    original tuple returned by transform_netloc as item 1.
    """
    parts = urllib.parse.urlsplit(url)
    netloc_tuple = transform_netloc(parts.netloc)
    new_netloc = netloc_tuple[0]
    # Reassemble the URL with only the netloc component swapped out.
    rebuilt = urllib.parse.urlunsplit(
        (parts.scheme, new_netloc, parts.path, parts.query, parts.fragment)
    )
    return rebuilt, cast("NetlocTuple", netloc_tuple)
496
497
498
def _get_netloc(netloc: str) -> NetlocTuple:
    """Netloc transform that strips credentials; delegates to split_auth_from_netloc()."""
    result = split_auth_from_netloc(netloc)
    return result
500
501
502
def _redact_netloc(netloc: str) -> Tuple[str]:
    """Netloc transform that returns the redacted netloc as a 1-tuple."""
    redacted = redact_netloc(netloc)
    return (redacted,)
504
505
506
def split_auth_netloc_from_url(url: str) -> Tuple[str, str, Tuple[str, str]]:
    """
    Parse a url into separate netloc, auth, and url with no auth.

    Returns: (url_without_auth, netloc, (username, password))
    """
    url_without_auth, netloc_and_auth = _transform_url(url, _get_netloc)
    netloc, auth = netloc_and_auth
    return url_without_auth, netloc, auth
514
515
516
def remove_auth_from_url(url: str) -> str:
    """Return a copy of url with 'username:password@' removed."""
    # username/pass params are passed to subversion through flags
    # and are not recognized in the url.
    stripped_url, _ = _transform_url(url, _get_netloc)
    return stripped_url
521
522
523
def redact_auth_from_url(url: str) -> str:
    """Return a copy of url whose netloc credentials are replaced with ****."""
    redacted_url, _ = _transform_url(url, _redact_netloc)
    return redacted_url
526
527
528
class HiddenText:
    """A secret string paired with a redacted form that is safe to display.

    str() and repr() only ever show the redacted form.
    """

    def __init__(self, secret: str, redacted: str) -> None:
        # The raw value, and the stand-in shown whenever it is printed.
        self.secret = secret
        self.redacted = redacted

    def __repr__(self) -> str:
        shown = str(self)
        return "<HiddenText {!r}>".format(shown)

    def __str__(self) -> str:
        return self.redacted

    # This is useful for testing.
    def __eq__(self, other: Any) -> bool:
        same_type = type(self) == type(other)
        # The string being used for redaction doesn't also have to match,
        # just the raw, original string.
        return self.secret == other.secret if same_type else False
547
548
549
def hide_value(value: str) -> HiddenText:
    """Wrap *value* in a HiddenText that displays as "****"."""
    hidden = HiddenText(value, redacted="****")
    return hidden
551
552
553
def hide_url(url: str) -> HiddenText:
    """Wrap *url* in a HiddenText that displays the url with its auth redacted."""
    return HiddenText(url, redacted=redact_auth_from_url(url))
556
557
558
def protect_pip_from_modification_on_windows(modifying_pip: bool) -> None:
    """Protection of pip.exe from modification on Windows

    On Windows, any operation modifying pip should be run as:
        python -m pip ...

    Raises CommandError, suggesting that command line, when pip is being
    modified on Windows and the base name of sys.argv[0] is one of "pip",
    "pipX" or "pipX.Y" for the running Python version.
    """
    major, minor = sys.version_info.major, sys.version_info.minor
    pip_names = ["pip", f"pip{major}", f"pip{major}.{minor}"]

    # See https://github.com/pypa/pip/issues/1299 for more discussion
    if not (
        modifying_pip and WINDOWS and os.path.basename(sys.argv[0]) in pip_names
    ):
        return

    new_command = [sys.executable, "-m", "pip"] + sys.argv[1:]
    raise CommandError(
        "To modify pip, please run the following command:\n{}".format(
            " ".join(new_command)
        )
    )
582
583
584
def is_console_interactive() -> bool:
    """Return True when sys.stdin is set and reports being a tty."""
    stdin = sys.stdin
    if stdin is None:
        return False
    return stdin.isatty()
587
588
589
def hash_file(path: str, blocksize: int = 1 << 20) -> Tuple[Any, int]:
    """Return (hash, length) for path using hashlib.sha256()

    The file is read in binary mode, *blocksize* bytes at a time. The
    first item returned is the hash object itself, not a digest string.
    """
    digest = hashlib.sha256()
    total = 0
    with open(path, "rb") as stream:
        for chunk in read_chunks(stream, size=blocksize):
            digest.update(chunk)
            total += len(chunk)
    return digest, total
599
600
601
def is_wheel_installed() -> bool:
    """
    Return whether the wheel package is installed, judged by whether
    ``import wheel`` succeeds.
    """
    try:
        import wheel  # noqa: F401
    except ImportError:
        return False
    else:
        return True
611
612
613
def pairwise(iterable: Iterable[Any]) -> Iterator[Tuple[Any, Any]]:
    """
    Return non-overlapping pairs of consecutive elements.

    For example:
        s -> (s0, s1), (s2, s3), (s4, s5), ...

    With an odd number of elements the last pair is padded with None.
    """
    # Both zip arguments are the same iterator, so each pair consumes two
    # consecutive items from the input.
    shared = [iter(iterable)] * 2
    return zip_longest(*shared)
622
623
624
def partition(
    pred: Callable[[T], bool],
    iterable: Iterable[T],
) -> Tuple[Iterable[T], Iterable[T]]:
    """
    Use a predicate to partition entries into false entries and true entries,
    like

        partition(is_odd, range(10)) --> 0 2 4 6 8   and  1 3 5 7 9

    The two results are lazy iterators; the false entries come first.
    """
    # tee() gives each filter its own independent view of the input.
    for_rejected, for_accepted = tee(iterable)
    rejected = filterfalse(pred, for_rejected)
    accepted = filter(pred, for_accepted)
    return rejected, accepted
636
637
638
class ConfiguredPep517HookCaller(Pep517HookCaller):
    """Pep517HookCaller that takes its config settings from a holder object.

    Every hook method below ignores its own ``config_settings`` argument
    and passes ``config_holder.config_settings`` to the parent
    implementation instead. The holder is read at call time.
    """

    def __init__(
        self,
        config_holder: Any,
        source_dir: str,
        build_backend: str,
        backend_path: Optional[str] = None,
        runner: Optional[Callable[..., None]] = None,
        python_executable: Optional[str] = None,
    ):
        super().__init__(
            source_dir, build_backend, backend_path, runner, python_executable
        )
        # Object whose ``config_settings`` attribute is read by every hook.
        self.config_holder = config_holder

    def build_wheel(
        self,
        wheel_directory: str,
        config_settings: Optional[Dict[str, str]] = None,
        metadata_directory: Optional[str] = None,
    ) -> str:
        return super().build_wheel(
            wheel_directory,
            config_settings=self.config_holder.config_settings,
            metadata_directory=metadata_directory,
        )

    def build_sdist(
        self, sdist_directory: str, config_settings: Optional[Dict[str, str]] = None
    ) -> str:
        return super().build_sdist(
            sdist_directory,
            config_settings=self.config_holder.config_settings,
        )

    def build_editable(
        self,
        wheel_directory: str,
        config_settings: Optional[Dict[str, str]] = None,
        metadata_directory: Optional[str] = None,
    ) -> str:
        return super().build_editable(
            wheel_directory,
            config_settings=self.config_holder.config_settings,
            metadata_directory=metadata_directory,
        )

    def get_requires_for_build_wheel(
        self, config_settings: Optional[Dict[str, str]] = None
    ) -> List[str]:
        return super().get_requires_for_build_wheel(
            config_settings=self.config_holder.config_settings
        )

    def get_requires_for_build_sdist(
        self, config_settings: Optional[Dict[str, str]] = None
    ) -> List[str]:
        return super().get_requires_for_build_sdist(
            config_settings=self.config_holder.config_settings
        )

    def get_requires_for_build_editable(
        self, config_settings: Optional[Dict[str, str]] = None
    ) -> List[str]:
        return super().get_requires_for_build_editable(
            config_settings=self.config_holder.config_settings
        )

    def prepare_metadata_for_build_wheel(
        self,
        metadata_directory: str,
        config_settings: Optional[Dict[str, str]] = None,
        _allow_fallback: bool = True,
    ) -> str:
        return super().prepare_metadata_for_build_wheel(
            metadata_directory=metadata_directory,
            config_settings=self.config_holder.config_settings,
            _allow_fallback=_allow_fallback,
        )

    def prepare_metadata_for_build_editable(
        self,
        metadata_directory: str,
        config_settings: Optional[Dict[str, str]] = None,
        _allow_fallback: bool = True,
    ) -> str:
        return super().prepare_metadata_for_build_editable(
            metadata_directory=metadata_directory,
            config_settings=self.config_holder.config_settings,
            _allow_fallback=_allow_fallback,
        )
724
725