Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
keewenaw
GitHub Repository: keewenaw/ethereum-wallet-cracker
Path: blob/main/test/lib/python3.9/site-packages/pip/_internal/cli/spinners.py
4804 views
1
import contextlib
2
import itertools
3
import logging
4
import sys
5
import time
6
from typing import IO, Generator
7
8
from pip._internal.utils.compat import WINDOWS
9
from pip._internal.utils.logging import get_indentation
10
11
logger = logging.getLogger(__name__)
12
13
14
class SpinnerInterface:
15
def spin(self) -> None:
16
raise NotImplementedError()
17
18
def finish(self, final_status: str) -> None:
19
raise NotImplementedError()
20
21
22
class InteractiveSpinner(SpinnerInterface):
23
def __init__(
24
self,
25
message: str,
26
file: IO[str] = None,
27
spin_chars: str = "-\\|/",
28
# Empirically, 8 updates/second looks nice
29
min_update_interval_seconds: float = 0.125,
30
):
31
self._message = message
32
if file is None:
33
file = sys.stdout
34
self._file = file
35
self._rate_limiter = RateLimiter(min_update_interval_seconds)
36
self._finished = False
37
38
self._spin_cycle = itertools.cycle(spin_chars)
39
40
self._file.write(" " * get_indentation() + self._message + " ... ")
41
self._width = 0
42
43
def _write(self, status: str) -> None:
44
assert not self._finished
45
# Erase what we wrote before by backspacing to the beginning, writing
46
# spaces to overwrite the old text, and then backspacing again
47
backup = "\b" * self._width
48
self._file.write(backup + " " * self._width + backup)
49
# Now we have a blank slate to add our status
50
self._file.write(status)
51
self._width = len(status)
52
self._file.flush()
53
self._rate_limiter.reset()
54
55
def spin(self) -> None:
56
if self._finished:
57
return
58
if not self._rate_limiter.ready():
59
return
60
self._write(next(self._spin_cycle))
61
62
def finish(self, final_status: str) -> None:
63
if self._finished:
64
return
65
self._write(final_status)
66
self._file.write("\n")
67
self._file.flush()
68
self._finished = True
69
70
71
# Used for dumb terminals, non-interactive installs (no tty), etc.
72
# We still print updates occasionally (once every 60 seconds by default) to
73
# act as a keep-alive for systems like Travis-CI that take lack-of-output as
74
# an indication that a task has frozen.
75
class NonInteractiveSpinner(SpinnerInterface):
76
def __init__(self, message: str, min_update_interval_seconds: float = 60.0) -> None:
77
self._message = message
78
self._finished = False
79
self._rate_limiter = RateLimiter(min_update_interval_seconds)
80
self._update("started")
81
82
def _update(self, status: str) -> None:
83
assert not self._finished
84
self._rate_limiter.reset()
85
logger.info("%s: %s", self._message, status)
86
87
def spin(self) -> None:
88
if self._finished:
89
return
90
if not self._rate_limiter.ready():
91
return
92
self._update("still running...")
93
94
def finish(self, final_status: str) -> None:
95
if self._finished:
96
return
97
self._update(f"finished with status '{final_status}'")
98
self._finished = True
99
100
101
class RateLimiter:
102
def __init__(self, min_update_interval_seconds: float) -> None:
103
self._min_update_interval_seconds = min_update_interval_seconds
104
self._last_update: float = 0
105
106
def ready(self) -> bool:
107
now = time.time()
108
delta = now - self._last_update
109
return delta >= self._min_update_interval_seconds
110
111
def reset(self) -> None:
112
self._last_update = time.time()
113
114
115
@contextlib.contextmanager
116
def open_spinner(message: str) -> Generator[SpinnerInterface, None, None]:
117
# Interactive spinner goes directly to sys.stdout rather than being routed
118
# through the logging system, but it acts like it has level INFO,
119
# i.e. it's only displayed if we're at level INFO or better.
120
# Non-interactive spinner goes through the logging system, so it is always
121
# in sync with logging configuration.
122
if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO:
123
spinner: SpinnerInterface = InteractiveSpinner(message)
124
else:
125
spinner = NonInteractiveSpinner(message)
126
try:
127
with hidden_cursor(sys.stdout):
128
yield spinner
129
except KeyboardInterrupt:
130
spinner.finish("canceled")
131
raise
132
except Exception:
133
spinner.finish("error")
134
raise
135
else:
136
spinner.finish("done")
137
138
139
HIDE_CURSOR = "\x1b[?25l"
140
SHOW_CURSOR = "\x1b[?25h"
141
142
143
@contextlib.contextmanager
144
def hidden_cursor(file: IO[str]) -> Generator[None, None, None]:
145
# The Windows terminal does not support the hide/show cursor ANSI codes,
146
# even via colorama. So don't even try.
147
if WINDOWS:
148
yield
149
# We don't want to clutter the output with control characters if we're
150
# writing to a file, or if the user is running with --quiet.
151
# See https://github.com/pypa/pip/issues/3418
152
elif not file.isatty() or logger.getEffectiveLevel() > logging.INFO:
153
yield
154
else:
155
file.write(HIDE_CURSOR)
156
try:
157
yield
158
finally:
159
file.write(SHOW_CURSOR)
160
161