Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
hhhrrrttt222111
GitHub Repository: hhhrrrttt222111/Dorkify
Path: blob/master/venv/Lib/site-packages/pip/_internal/cli/spinners.py
811 views
1
from __future__ import absolute_import, division
2
3
import contextlib
4
import itertools
5
import logging
6
import sys
7
import time
8
9
from pip._vendor.progress import HIDE_CURSOR, SHOW_CURSOR
10
11
from pip._internal.utils.compat import WINDOWS
12
from pip._internal.utils.logging import get_indentation
13
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
14
15
if MYPY_CHECK_RUNNING:
16
from typing import Iterator, IO
17
18
logger = logging.getLogger(__name__)
19
20
21
class SpinnerInterface(object):
    """Abstract interface shared by every spinner implementation.

    Subclasses must override both methods; the versions here only raise
    ``NotImplementedError``.
    """

    def spin(self):
        # type: () -> None
        """Signal that work is still in progress."""
        raise NotImplementedError()

    def finish(self, final_status):
        # type: (str) -> None
        """Report *final_status* as the outcome of the operation."""
        raise NotImplementedError()
29
30
31
class InteractiveSpinner(SpinnerInterface):
    """Spinner that redraws its status in place on an output stream.

    The indented message followed by " ... " is written once by the
    constructor. Every later update erases the previously written status
    with backspaces and writes the new status in its place.
    """

    def __init__(self, message, file=None, spin_chars="-\\|/",
                 # Empirically, 8 updates/second looks nice
                 min_update_interval_seconds=0.125):
        # type: (str, IO[str], str, float) -> None
        self._message = message
        # Fall back to standard output when no stream is supplied.
        self._file = sys.stdout if file is None else file
        self._rate_limiter = RateLimiter(min_update_interval_seconds)
        self._finished = False
        self._spin_cycle = itertools.cycle(spin_chars)

        indent = " " * get_indentation()
        self._file.write(indent + self._message + " ... ")
        # Length of the status text currently displayed after the message.
        self._width = 0

    def _write(self, status):
        # type: (str) -> None
        """Replace the currently displayed status with *status*."""
        assert not self._finished
        # Wipe the old status: step back over it, blank it out with
        # spaces, then step back again to where it started.
        rewind = "\b" * self._width
        blanks = " " * self._width
        self._file.write(rewind + blanks + rewind)
        # The cursor now sits on a clean slate; draw the new status.
        self._file.write(status)
        self._width = len(status)
        self._file.flush()
        self._rate_limiter.reset()

    def spin(self):
        # type: () -> None
        """Advance the animation by one frame, subject to rate limiting."""
        if not self._finished and self._rate_limiter.ready():
            self._write(next(self._spin_cycle))

    def finish(self, final_status):
        # type: (str) -> None
        """Show *final_status*, end the line, and mark the spinner done.

        Calling this again after the spinner has finished does nothing.
        """
        if not self._finished:
            self._write(final_status)
            self._file.write("\n")
            self._file.flush()
            self._finished = True
77
78
79
# Used for dumb terminals, non-interactive installs (no tty), etc.
# We still print updates occasionally (once every 60 seconds by default) to
# act as a keep-alive for systems like Travis-CI that take lack-of-output as
# an indication that a task has frozen.
class NonInteractiveSpinner(SpinnerInterface):
    """Spinner that reports progress as INFO log records.

    A "started" record is logged on construction, "still running..."
    records are logged from ``spin()`` when the rate limiter allows it,
    and a final record carrying the status is logged by ``finish()``.
    """

    def __init__(self, message, min_update_interval_seconds=60):
        # type: (str, float) -> None
        self._message = message
        self._finished = False
        self._rate_limiter = RateLimiter(min_update_interval_seconds)
        self._update("started")

    def _update(self, status):
        # type: (str) -> None
        """Log *status* for this spinner and restart the rate limiter."""
        assert not self._finished
        self._rate_limiter.reset()
        logger.info("%s: %s", self._message, status)

    def spin(self):
        # type: () -> None
        """Log a keep-alive record if enough time has passed."""
        if not self._finished and self._rate_limiter.ready():
            self._update("still running...")

    def finish(self, final_status):
        # type: (str) -> None
        """Log the final status once; later calls are ignored."""
        if not self._finished:
            summary = "finished with status '{final_status}'".format(
                final_status=final_status)
            self._update(summary)
            self._finished = True
112
113
114
class RateLimiter(object):
    """Tell callers whether a minimum interval has passed since a reset.

    The stored timestamp starts at 0, so the elapsed time is measured from
    the ``time.time()`` origin until ``reset()`` is first called.
    """

    def __init__(self, min_update_interval_seconds):
        # type: (float) -> None
        self._last_update = 0  # type: float
        self._min_update_interval_seconds = min_update_interval_seconds

    def ready(self):
        # type: () -> bool
        """Return True once the minimum interval has elapsed."""
        elapsed = time.time() - self._last_update
        return elapsed >= self._min_update_interval_seconds

    def reset(self):
        # type: () -> None
        """Record the current time as the moment of the last update."""
        self._last_update = time.time()
129
130
131
@contextlib.contextmanager
def open_spinner(message):
    # type: (str) -> Iterator[SpinnerInterface]
    """Yield a spinner for *message* and finish it when the block exits.

    The spinner is finished with "done" on normal exit, "canceled" on
    ``KeyboardInterrupt`` and "error" on any other ``Exception``; in the
    latter two cases the exception is re-raised.
    """
    # Interactive spinner goes directly to sys.stdout rather than being routed
    # through the logging system, but it acts like it has level INFO,
    # i.e. it's only displayed if we're at level INFO or better.
    # Non-interactive spinner goes through the logging system, so it is always
    # in sync with logging configuration.
    interactive = (
        sys.stdout.isatty() and
        logger.getEffectiveLevel() <= logging.INFO
    )
    if not interactive:
        spinner = NonInteractiveSpinner(message)  # type: SpinnerInterface
    else:
        spinner = InteractiveSpinner(message)

    try:
        with hidden_cursor(sys.stdout):
            yield spinner
    except KeyboardInterrupt:
        spinner.finish("canceled")
        raise
    except Exception:
        spinner.finish("error")
        raise
    else:
        spinner.finish("done")
154
155
156
@contextlib.contextmanager
def hidden_cursor(file):
    # type: (IO[str]) -> Iterator[None]
    """Hide the terminal cursor on *file* for the duration of the block.

    Nothing is written when running on Windows, when *file* is not a tty,
    or when the effective log level is above INFO.
    """
    # The Windows terminal does not support the hide/show cursor ANSI codes,
    # even via colorama. So don't even try.
    # We also don't want to clutter the output with control characters if
    # we're writing to a file, or if the user is running with --quiet.
    # See https://github.com/pypa/pip/issues/3418
    skip_control_codes = (
        WINDOWS or
        not file.isatty() or
        logger.getEffectiveLevel() > logging.INFO
    )
    if skip_control_codes:
        yield
        return

    file.write(HIDE_CURSOR)
    try:
        yield
    finally:
        # Always restore the cursor, even if the block raised.
        file.write(SHOW_CURSOR)
174
175