Path: blob/main/test/lib/python3.9/site-packages/pip/_internal/cli/spinners.py
4804 views
import contextlib1import itertools2import logging3import sys4import time5from typing import IO, Generator67from pip._internal.utils.compat import WINDOWS8from pip._internal.utils.logging import get_indentation910logger = logging.getLogger(__name__)111213class SpinnerInterface:14def spin(self) -> None:15raise NotImplementedError()1617def finish(self, final_status: str) -> None:18raise NotImplementedError()192021class InteractiveSpinner(SpinnerInterface):22def __init__(23self,24message: str,25file: IO[str] = None,26spin_chars: str = "-\\|/",27# Empirically, 8 updates/second looks nice28min_update_interval_seconds: float = 0.125,29):30self._message = message31if file is None:32file = sys.stdout33self._file = file34self._rate_limiter = RateLimiter(min_update_interval_seconds)35self._finished = False3637self._spin_cycle = itertools.cycle(spin_chars)3839self._file.write(" " * get_indentation() + self._message + " ... ")40self._width = 04142def _write(self, status: str) -> None:43assert not self._finished44# Erase what we wrote before by backspacing to the beginning, writing45# spaces to overwrite the old text, and then backspacing again46backup = "\b" * self._width47self._file.write(backup + " " * self._width + backup)48# Now we have a blank slate to add our status49self._file.write(status)50self._width = len(status)51self._file.flush()52self._rate_limiter.reset()5354def spin(self) -> None:55if self._finished:56return57if not self._rate_limiter.ready():58return59self._write(next(self._spin_cycle))6061def finish(self, final_status: str) -> None:62if self._finished:63return64self._write(final_status)65self._file.write("\n")66self._file.flush()67self._finished = True686970# Used for dumb terminals, non-interactive installs (no tty), etc.71# We still print updates occasionally (once every 60 seconds by default) to72# act as a keep-alive for systems like Travis-CI that take lack-of-output as73# an indication that a task has frozen.74class NonInteractiveSpinner(SpinnerInterface):75def __init__(self, message: str, min_update_interval_seconds: float = 60.0) -> None:76self._message = message77self._finished = False78self._rate_limiter = RateLimiter(min_update_interval_seconds)79self._update("started")8081def _update(self, status: str) -> None:82assert not self._finished83self._rate_limiter.reset()84logger.info("%s: %s", self._message, status)8586def spin(self) -> None:87if self._finished:88return89if not self._rate_limiter.ready():90return91self._update("still running...")9293def finish(self, final_status: str) -> None:94if self._finished:95return96self._update(f"finished with status '{final_status}'")97self._finished = True9899100class RateLimiter:101def __init__(self, min_update_interval_seconds: float) -> None:102self._min_update_interval_seconds = min_update_interval_seconds103self._last_update: float = 0104105def ready(self) -> bool:106now = time.time()107delta = now - self._last_update108return delta >= self._min_update_interval_seconds109110def reset(self) -> None:111self._last_update = time.time()112113114@contextlib.contextmanager115def open_spinner(message: str) -> Generator[SpinnerInterface, None, None]:116# Interactive spinner goes directly to sys.stdout rather than being routed117# through the logging system, but it acts like it has level INFO,118# i.e. it's only displayed if we're at level INFO or better.119# Non-interactive spinner goes through the logging system, so it is always120# in sync with logging configuration.121if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO:122spinner: SpinnerInterface = InteractiveSpinner(message)123else:124spinner = NonInteractiveSpinner(message)125try:126with hidden_cursor(sys.stdout):127yield spinner128except KeyboardInterrupt:129spinner.finish("canceled")130raise131except Exception:132spinner.finish("error")133raise134else:135spinner.finish("done")136137138HIDE_CURSOR = "\x1b[?25l"139SHOW_CURSOR = "\x1b[?25h"140141142@contextlib.contextmanager143def hidden_cursor(file: IO[str]) -> Generator[None, None, None]:144# The Windows terminal does not support the hide/show cursor ANSI codes,145# even via colorama. So don't even try.146if WINDOWS:147yield148# We don't want to clutter the output with control characters if we're149# writing to a file, or if the user is running with --quiet.150# See https://github.com/pypa/pip/issues/3418151elif not file.isatty() or logger.getEffectiveLevel() > logging.INFO:152yield153else:154file.write(HIDE_CURSOR)155try:156yield157finally:158file.write(SHOW_CURSOR)159160161