Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
keewenaw
GitHub Repository: keewenaw/ethereum-wallet-cracker
Path: blob/main/test/lib/python3.9/site-packages/pip/_internal/utils/logging.py
4804 views
1
import contextlib
2
import errno
3
import logging
4
import logging.handlers
5
import os
6
import sys
7
import threading
8
from dataclasses import dataclass
9
from io import TextIOWrapper
10
from logging import Filter
11
from typing import Any, ClassVar, Generator, List, Optional, TextIO, Type
12
13
from pip._vendor.rich.console import (
14
Console,
15
ConsoleOptions,
16
ConsoleRenderable,
17
RenderResult,
18
)
19
from pip._vendor.rich.highlighter import NullHighlighter
20
from pip._vendor.rich.logging import RichHandler
21
from pip._vendor.rich.segment import Segment
22
from pip._vendor.rich.style import Style
23
24
from pip._internal.utils._log import VERBOSE, getLogger
25
from pip._internal.utils.compat import WINDOWS
26
from pip._internal.utils.deprecation import DEPRECATION_MSG_PREFIX
27
from pip._internal.utils.misc import ensure_dir
28
29
# Thread-local storage: indent_log() and get_indentation() keep the current
# indentation in its ``indentation`` attribute.
_log_state = threading.local()
30
# Logger whose name setup_logging() uses to send its records to a dedicated
# console handler and to exclude them from the other console handlers.
subprocess_logger = getLogger("pip.subprocessor")
31
32
33
class BrokenStdoutLoggingError(Exception):
    """
    Signals that a BrokenPipeError was hit on the stdout stream while a log
    record was being written.
    """
37
38
39
def _is_broken_pipe_error(exc_class: Type[BaseException], exc: BaseException) -> bool:
40
if exc_class is BrokenPipeError:
41
return True
42
43
# On Windows, a broken pipe can show up as EINVAL rather than EPIPE:
44
# https://bugs.python.org/issue19612
45
# https://bugs.python.org/issue30418
46
if not WINDOWS:
47
return False
48
49
return isinstance(exc, OSError) and exc.errno in (errno.EINVAL, errno.EPIPE)
50
51
52
@contextlib.contextmanager
def indent_log(num: int = 2) -> Generator[None, None, None]:
    """
    Context manager that indents every log message emitted inside it.

    :param num: Number of spaces added to the current indentation while the
        block is active; the same amount is removed again on exit.
    """
    # The level is kept on the thread-local ``_log_state``. Reading it via
    # get_indentation() makes a thread without a stored value start from 0.
    _log_state.indentation = get_indentation() + num
    try:
        yield
    finally:
        _log_state.indentation -= num
65
66
67
def get_indentation() -> int:
    """Return the indentation stored for the current thread, or 0 if unset."""
    try:
        return _log_state.indentation
    except AttributeError:
        # No indentation has been stored on this thread yet.
        return 0
69
70
71
class IndentingFormatter(logging.Formatter):
    """Formatter that applies the indent_log() indentation to every line."""

    default_time_format = "%Y-%m-%dT%H:%M:%S"

    def __init__(
        self,
        *args: Any,
        add_timestamp: bool = False,
        **kwargs: Any,
    ) -> None:
        """
        Build a logging.Formatter that honours the indent_log() context
        manager.

        :param add_timestamp: When true, every output line is prefixed with
            the timestamp of the record it belongs to.
        """
        self.add_timestamp = add_timestamp
        super().__init__(*args, **kwargs)

    def get_message_start(self, formatted: str, levelno: int) -> str:
        """
        Choose the severity label placed in front of the formatted message
        (this is separate from the prefix added to each line).
        """
        # Nothing is added below WARNING. Deprecation messages already carry
        # their own prefix, and we don't want "WARNING: DEPRECATION: ....".
        if levelno < logging.WARNING or formatted.startswith(DEPRECATION_MSG_PREFIX):
            return ""

        return "WARNING: " if levelno < logging.ERROR else "ERROR: "

    def format(self, record: logging.LogRecord) -> str:
        """
        Run the standard formatter, then put the optional timestamp and the
        current indentation in front of every line of the result.
        """
        text = super().format(record)
        text = self.get_message_start(text, record.levelno) + text

        timestamp = f"{self.formatTime(record)} " if self.add_timestamp else ""
        line_prefix = timestamp + " " * get_indentation()
        # keepends=True so the original line endings survive the re-join.
        return "".join(line_prefix + line for line in text.splitlines(True))
120
121
122
@dataclass
class IndentedRenderable:
    """Wraps a renderable so each rendered line starts with ``indent`` spaces."""

    renderable: ConsoleRenderable
    indent: int

    def __rich_console__(
        self, console: Console, options: ConsoleOptions
    ) -> RenderResult:
        """
        Render the wrapped object and yield, for each of its lines, the
        indentation segment, the line's own segments, then a newline segment.
        """
        padding = " " * self.indent
        rendered = console.render(self.renderable, options)
        for line_segments in Segment.split_lines(rendered):
            yield Segment(padding)
            yield from line_segments
            yield Segment("\n")
136
137
138
class RichPipStreamHandler(RichHandler):
    """
    A RichHandler that prints records through a rich Console with the time,
    level and path columns disabled, colours warnings yellow and errors red,
    and turns a broken stdout pipe into BrokenStdoutLoggingError.
    """

    KEYWORDS: ClassVar[Optional[List[str]]] = []

    def __init__(self, stream: Optional[TextIO], no_color: bool) -> None:
        """
        :param stream: The file object the console writes to.
        :param no_color: Passed through to the rich Console as ``no_color``.
        """
        super().__init__(
            console=Console(file=stream, no_color=no_color, soft_wrap=True),
            show_time=False,
            show_level=False,
            show_path=False,
            highlighter=NullHighlighter(),
        )

    # Our custom override on Rich's logger, to make things work as we need them to.
    def emit(self, record: logging.LogRecord) -> None:
        """
        Print a record to the console.

        A record whose message is exactly "[present-rich] %s" and whose args
        are a one-element tuple is treated as a request to present that
        element (a rich renderable) with the current indentation. Every other
        record is formatted normally and styled by severity.
        """
        style: Optional[Style] = None

        # ``record.args`` is not guaranteed to be a tuple: the logging module
        # stores a lone non-empty mapping argument as the mapping itself, so
        # the tuple check belongs in the condition rather than in an
        # unconditional assert that would break ordinary log calls.
        args = record.args
        presents_rich_renderable = (
            record.msg == "[present-rich] %s"
            and isinstance(args, tuple)
            and len(args) == 1
        )

        # If we are given a diagnostic error to present, present it with indentation.
        if presents_rich_renderable:
            rich_renderable = args[0]
            assert isinstance(
                rich_renderable, ConsoleRenderable
            ), f"{rich_renderable} is not rich-console-renderable"

            renderable: ConsoleRenderable = IndentedRenderable(
                rich_renderable, indent=get_indentation()
            )
        else:
            message = self.format(record)
            renderable = self.render_message(record, message)
            if record.levelno is not None:
                if record.levelno >= logging.ERROR:
                    style = Style(color="red")
                elif record.levelno >= logging.WARNING:
                    style = Style(color="yellow")

        try:
            self.console.print(renderable, overflow="ignore", crop=False, style=style)
        except Exception:
            self.handleError(record)

    def handleError(self, record: logging.LogRecord) -> None:
        """
        Called when logging is unable to log some output.

        :raises BrokenStdoutLoggingError: If the failure being handled is a
            broken pipe and this handler's console writes to sys.stdout.
        """

        exc_class, exc = sys.exc_info()[:2]
        # If a broken pipe occurred while calling write() or flush() on the
        # stdout stream in logging's Handler.emit(), then raise our special
        # exception so we can handle it in main() instead of logging the
        # broken pipe error and continuing.
        if (
            exc_class
            and exc
            and self.console.file is sys.stdout
            and _is_broken_pipe_error(exc_class, exc)
        ):
            raise BrokenStdoutLoggingError()

        return super().handleError(record)
196
197
198
class BetterRotatingFileHandler(logging.handlers.RotatingFileHandler):
    """A RotatingFileHandler that calls ensure_dir() on the log file's directory before opening."""

    def _open(self) -> TextIOWrapper:
        """Ensure the directory of ``baseFilename``, then open as the base class does."""
        log_dir = os.path.dirname(self.baseFilename)
        ensure_dir(log_dir)
        return super()._open()
202
203
204
class MaxLevelFilter(Filter):
    """A logging Filter that lets through only records below a given level."""

    def __init__(self, level: int) -> None:
        """
        :param level: Exclusive upper bound; records whose ``levelno`` is at
            or above this value are rejected.
        """
        # Initialise the base Filter as well, so the attributes it defines
        # (``name`` and ``nlen``) exist on this instance.
        super().__init__()
        self.level = level

    def filter(self, record: logging.LogRecord) -> bool:
        """Return True when the record's level is strictly below ``level``."""
        return record.levelno < self.level
210
211
212
class ExcludeLoggerFilter(Filter):

    """
    A logging Filter that rejects records coming from a given logger or from
    any of its children.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        """Return the opposite of what the base Filter decides for ``record``."""
        # The base Filter accepts only records from the named logger (or its
        # children); inverting that answer excludes exactly those records.
        from_named_logger = super().filter(record)
        return not from_named_logger
222
223
224
def setup_logging(verbosity: int, no_color: bool, user_log_file: Optional[str]) -> int:
    """Configures and sets up all of the logging

    :param verbosity: Requested verbosity. ``>= 2`` selects DEBUG, ``1``
        VERBOSE, ``-1`` WARNING, ``-2`` ERROR, ``<= -3`` CRITICAL and any
        other value (i.e. ``0``) INFO.
    :param no_color: Passed to every console handler as ``no_color``.
    :param user_log_file: Path of an additional log file that receives
        DEBUG-level output with timestamps, or None for no such file.

    Returns the requested logging level, as its integer value.
    """
    # ``import logging`` does not load the ``logging.config`` submodule by
    # itself; import it explicitly so dictConfig() does not depend on some
    # other module having imported it first.
    import logging.config

    # Determine the level to be logging at.
    if verbosity >= 2:
        level_number = logging.DEBUG
    elif verbosity == 1:
        level_number = VERBOSE
    elif verbosity == -1:
        level_number = logging.WARNING
    elif verbosity == -2:
        level_number = logging.ERROR
    elif verbosity <= -3:
        level_number = logging.CRITICAL
    else:
        level_number = logging.INFO

    level = logging.getLevelName(level_number)

    # The "root" logger should match the "console" level *unless* we also need
    # to log to a user log file.
    include_user_log = user_log_file is not None
    if include_user_log:
        additional_log_file = user_log_file
        root_level = "DEBUG"
    else:
        # Placeholder path: in this case the "user_log" handler is still
        # defined below, but it is not attached to the root logger.
        additional_log_file = "/dev/null"
        root_level = level

    # Disable any logging besides WARNING unless we have DEBUG level logging
    # enabled for vendored libraries.
    vendored_log_level = "WARNING" if level in ["INFO", "ERROR"] else "DEBUG"

    # Shorthands for clarity
    log_streams = {
        "stdout": "ext://sys.stdout",
        "stderr": "ext://sys.stderr",
    }
    handler_classes = {
        "stream": "pip._internal.utils.logging.RichPipStreamHandler",
        "file": "pip._internal.utils.logging.BetterRotatingFileHandler",
    }
    handlers = ["console", "console_errors", "console_subprocess"] + (
        ["user_log"] if include_user_log else []
    )

    logging.config.dictConfig(
        {
            "version": 1,
            "disable_existing_loggers": False,
            "filters": {
                "exclude_warnings": {
                    "()": "pip._internal.utils.logging.MaxLevelFilter",
                    "level": logging.WARNING,
                },
                "restrict_to_subprocess": {
                    "()": "logging.Filter",
                    "name": subprocess_logger.name,
                },
                "exclude_subprocess": {
                    "()": "pip._internal.utils.logging.ExcludeLoggerFilter",
                    "name": subprocess_logger.name,
                },
            },
            "formatters": {
                "indent": {
                    "()": IndentingFormatter,
                    "format": "%(message)s",
                },
                "indent_with_timestamp": {
                    "()": IndentingFormatter,
                    "format": "%(message)s",
                    "add_timestamp": True,
                },
            },
            "handlers": {
                # Messages below WARNING go to stdout ...
                "console": {
                    "level": level,
                    "class": handler_classes["stream"],
                    "no_color": no_color,
                    "stream": log_streams["stdout"],
                    "filters": ["exclude_subprocess", "exclude_warnings"],
                    "formatter": "indent",
                },
                # ... while WARNING and above go to stderr.
                "console_errors": {
                    "level": "WARNING",
                    "class": handler_classes["stream"],
                    "no_color": no_color,
                    "stream": log_streams["stderr"],
                    "filters": ["exclude_subprocess"],
                    "formatter": "indent",
                },
                # A handler responsible for logging to the console messages
                # from the "subprocessor" logger.
                "console_subprocess": {
                    "level": level,
                    "class": handler_classes["stream"],
                    "stream": log_streams["stderr"],
                    "no_color": no_color,
                    "filters": ["restrict_to_subprocess"],
                    "formatter": "indent",
                },
                "user_log": {
                    "level": "DEBUG",
                    "class": handler_classes["file"],
                    "filename": additional_log_file,
                    "encoding": "utf-8",
                    "delay": True,
                    "formatter": "indent_with_timestamp",
                },
            },
            "root": {
                "level": root_level,
                "handlers": handlers,
            },
            "loggers": {"pip._vendor": {"level": vendored_log_level}},
        }
    )

    return level_number
347
348