# Path: blob/master/venv/Lib/site-packages/pip/_internal/cli/spinners.py
# 811 views
from __future__ import absolute_import, division12import contextlib3import itertools4import logging5import sys6import time78from pip._vendor.progress import HIDE_CURSOR, SHOW_CURSOR910from pip._internal.utils.compat import WINDOWS11from pip._internal.utils.logging import get_indentation12from pip._internal.utils.typing import MYPY_CHECK_RUNNING1314if MYPY_CHECK_RUNNING:15from typing import Iterator, IO1617logger = logging.getLogger(__name__)181920class SpinnerInterface(object):21def spin(self):22# type: () -> None23raise NotImplementedError()2425def finish(self, final_status):26# type: (str) -> None27raise NotImplementedError()282930class InteractiveSpinner(SpinnerInterface):31def __init__(self, message, file=None, spin_chars="-\\|/",32# Empirically, 8 updates/second looks nice33min_update_interval_seconds=0.125):34# type: (str, IO[str], str, float) -> None35self._message = message36if file is None:37file = sys.stdout38self._file = file39self._rate_limiter = RateLimiter(min_update_interval_seconds)40self._finished = False4142self._spin_cycle = itertools.cycle(spin_chars)4344self._file.write(" " * get_indentation() + self._message + " ... 
")45self._width = 04647def _write(self, status):48# type: (str) -> None49assert not self._finished50# Erase what we wrote before by backspacing to the beginning, writing51# spaces to overwrite the old text, and then backspacing again52backup = "\b" * self._width53self._file.write(backup + " " * self._width + backup)54# Now we have a blank slate to add our status55self._file.write(status)56self._width = len(status)57self._file.flush()58self._rate_limiter.reset()5960def spin(self):61# type: () -> None62if self._finished:63return64if not self._rate_limiter.ready():65return66self._write(next(self._spin_cycle))6768def finish(self, final_status):69# type: (str) -> None70if self._finished:71return72self._write(final_status)73self._file.write("\n")74self._file.flush()75self._finished = True767778# Used for dumb terminals, non-interactive installs (no tty), etc.79# We still print updates occasionally (once every 60 seconds by default) to80# act as a keep-alive for systems like Travis-CI that take lack-of-output as81# an indication that a task has frozen.82class NonInteractiveSpinner(SpinnerInterface):83def __init__(self, message, min_update_interval_seconds=60):84# type: (str, float) -> None85self._message = message86self._finished = False87self._rate_limiter = RateLimiter(min_update_interval_seconds)88self._update("started")8990def _update(self, status):91# type: (str) -> None92assert not self._finished93self._rate_limiter.reset()94logger.info("%s: %s", self._message, status)9596def spin(self):97# type: () -> None98if self._finished:99return100if not self._rate_limiter.ready():101return102self._update("still running...")103104def finish(self, final_status):105# type: (str) -> None106if self._finished:107return108self._update(109"finished with status '{final_status}'".format(**locals()))110self._finished = True111112113class RateLimiter(object):114def __init__(self, min_update_interval_seconds):115# type: (float) -> None116self._min_update_interval_seconds = 
min_update_interval_seconds117self._last_update = 0 # type: float118119def ready(self):120# type: () -> bool121now = time.time()122delta = now - self._last_update123return delta >= self._min_update_interval_seconds124125def reset(self):126# type: () -> None127self._last_update = time.time()128129130@contextlib.contextmanager131def open_spinner(message):132# type: (str) -> Iterator[SpinnerInterface]133# Interactive spinner goes directly to sys.stdout rather than being routed134# through the logging system, but it acts like it has level INFO,135# i.e. it's only displayed if we're at level INFO or better.136# Non-interactive spinner goes through the logging system, so it is always137# in sync with logging configuration.138if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO:139spinner = InteractiveSpinner(message) # type: SpinnerInterface140else:141spinner = NonInteractiveSpinner(message)142try:143with hidden_cursor(sys.stdout):144yield spinner145except KeyboardInterrupt:146spinner.finish("canceled")147raise148except Exception:149spinner.finish("error")150raise151else:152spinner.finish("done")153154155@contextlib.contextmanager156def hidden_cursor(file):157# type: (IO[str]) -> Iterator[None]158# The Windows terminal does not support the hide/show cursor ANSI codes,159# even via colorama. So don't even try.160if WINDOWS:161yield162# We don't want to clutter the output with control characters if we're163# writing to a file, or if the user is running with --quiet.164# See https://github.com/pypa/pip/issues/3418165elif not file.isatty() or logger.getEffectiveLevel() > logging.INFO:166yield167else:168file.write(HIDE_CURSOR)169try:170yield171finally:172file.write(SHOW_CURSOR)173174175