Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
mikf
GitHub Repository: mikf/gallery-dl
Path: blob/master/gallery_dl/output.py
5457 views
1
# -*- coding: utf-8 -*-
2
3
# Copyright 2015-2025 Mike Fährmann
4
#
5
# This program is free software; you can redistribute it and/or modify
6
# it under the terms of the GNU General Public License version 2 as
7
# published by the Free Software Foundation.
8
9
import os
10
import sys
11
import shutil
12
import logging
13
import unicodedata
14
from . import config, util, formatter
15
16
17
# --------------------------------------------------------------------
18
# Globals
19
20
try:
21
TTY_STDOUT = sys.stdout.isatty()
22
except Exception:
23
TTY_STDOUT = False
24
25
try:
26
TTY_STDERR = sys.stderr.isatty()
27
except Exception:
28
TTY_STDERR = False
29
30
try:
31
TTY_STDIN = sys.stdin.isatty()
32
except Exception:
33
TTY_STDIN = False
34
35
36
COLORS_DEFAULT = {}
37
COLORS = not os.environ.get("NO_COLOR")
38
if COLORS:
39
if TTY_STDOUT:
40
COLORS_DEFAULT["success"] = "1;32"
41
COLORS_DEFAULT["skip"] = "2"
42
if TTY_STDERR:
43
COLORS_DEFAULT["debug"] = "0;37"
44
COLORS_DEFAULT["info"] = "1;37"
45
COLORS_DEFAULT["warning"] = "1;33"
46
COLORS_DEFAULT["error"] = "1;31"
47
48
49
if util.WINDOWS:
50
ANSI = COLORS and os.environ.get("TERM") == "ANSI"
51
OFFSET = 1
52
CHAR_SKIP = "# "
53
CHAR_SUCCESS = "* "
54
CHAR_ELLIPSIES = "..."
55
else:
56
ANSI = COLORS
57
OFFSET = 0
58
CHAR_SKIP = "# "
59
CHAR_SUCCESS = "✔ "
60
CHAR_ELLIPSIES = "…"
61
62
63
# --------------------------------------------------------------------
64
# Logging
65
66
LOG_FORMAT = "[{name}][{levelname}] {message}"
67
LOG_FORMAT_DATE = "%Y-%m-%d %H:%M:%S"
68
LOG_LEVEL = logging.INFO
69
LOG_LEVELS = ("debug", "info", "warning", "error")
70
71
72
class Logger(logging.Logger):
73
"""Custom Logger that includes extra info in log records"""
74
75
def makeRecord(self, name, level, fn, lno, msg, args, exc_info,
76
func=None, extra=None, sinfo=None,
77
factory=logging._logRecordFactory):
78
rv = factory(name, level, fn, lno, msg, args, exc_info, func, sinfo)
79
if extra:
80
rv.__dict__.update(extra)
81
return rv
82
83
84
class LoggerAdapter():
85
"""Trimmed-down version of logging.LoggingAdapter"""
86
__slots__ = ("logger", "extra")
87
88
def __init__(self, logger, job):
89
self.logger = logger
90
self.extra = job._logger_extra
91
92
def debug(self, msg, *args, **kwargs):
93
if self.logger.isEnabledFor(logging.DEBUG):
94
kwargs["extra"] = self.extra
95
self.logger._log(logging.DEBUG, msg, args, **kwargs)
96
97
def info(self, msg, *args, **kwargs):
98
if self.logger.isEnabledFor(logging.INFO):
99
kwargs["extra"] = self.extra
100
self.logger._log(logging.INFO, msg, args, **kwargs)
101
102
def warning(self, msg, *args, **kwargs):
103
if self.logger.isEnabledFor(logging.WARNING):
104
kwargs["extra"] = self.extra
105
self.logger._log(logging.WARNING, msg, args, **kwargs)
106
107
def error(self, msg, *args, **kwargs):
108
if self.logger.isEnabledFor(logging.ERROR):
109
kwargs["extra"] = self.extra
110
self.logger._log(logging.ERROR, msg, args, **kwargs)
111
112
113
class PathfmtProxy():
114
__slots__ = ("job",)
115
116
def __init__(self, job):
117
self.job = job
118
119
def __getattribute__(self, name):
120
pathfmt = object.__getattribute__(self, "job").pathfmt
121
return getattr(pathfmt, name, None) if pathfmt else None
122
123
def __str__(self):
124
if pathfmt := object.__getattribute__(self, "job").pathfmt:
125
return pathfmt.path or pathfmt.directory
126
return ""
127
128
129
class KwdictProxy():
130
__slots__ = ("job",)
131
132
def __init__(self, job):
133
self.job = job
134
135
def __getattribute__(self, name):
136
pathfmt = object.__getattribute__(self, "job").pathfmt
137
return pathfmt.kwdict.get(name) if pathfmt else None
138
139
140
class Formatter(logging.Formatter):
141
"""Custom formatter that supports different formats per loglevel"""
142
143
def __init__(self, fmt, datefmt):
144
if isinstance(fmt, dict):
145
for key in LOG_LEVELS:
146
value = fmt[key] if key in fmt else LOG_FORMAT
147
fmt[key] = (formatter.parse(value).format_map,
148
"{asctime" in value)
149
else:
150
if fmt == LOG_FORMAT:
151
fmt = (fmt.format_map, False)
152
else:
153
fmt = (formatter.parse(fmt).format_map, "{asctime" in fmt)
154
fmt = {"debug": fmt, "info": fmt, "warning": fmt, "error": fmt}
155
156
self.formats = fmt
157
self.datefmt = datefmt
158
159
def format(self, record):
160
record.message = record.getMessage()
161
fmt, asctime = self.formats[record.levelname]
162
if asctime:
163
record.asctime = self.formatTime(record, self.datefmt)
164
msg = fmt(record.__dict__)
165
if record.exc_info and not record.exc_text:
166
record.exc_text = self.formatException(record.exc_info)
167
if record.exc_text:
168
msg = f"{msg}\n{record.exc_text}"
169
if record.stack_info:
170
msg = f"{msg}\n{record.stack_info}"
171
return msg
172
173
174
def initialize_logging(loglevel):
175
"""Setup basic logging functionality before configfiles have been loaded"""
176
# convert levelnames to lowercase
177
for level in (10, 20, 30, 40, 50):
178
name = logging.getLevelName(level)
179
logging.addLevelName(level, name.lower())
180
181
# register custom Logging class
182
logging.Logger.manager.setLoggerClass(Logger)
183
184
# setup basic logging to stderr
185
formatter = Formatter(LOG_FORMAT, LOG_FORMAT_DATE)
186
handler = logging.StreamHandler()
187
handler.setFormatter(formatter)
188
handler.setLevel(loglevel)
189
root = logging.getLogger()
190
root.setLevel(logging.NOTSET)
191
root.addHandler(handler)
192
193
return logging.getLogger("gallery-dl")
194
195
196
def configure_logging(loglevel):
197
root = logging.getLogger()
198
minlevel = loglevel
199
200
# stream logging handler
201
handler = root.handlers[0]
202
opts = config.interpolate(("output",), "log")
203
204
colors = config.interpolate(("output",), "colors")
205
if colors is None:
206
colors = COLORS_DEFAULT
207
if colors and not opts:
208
opts = LOG_FORMAT
209
210
if opts:
211
if isinstance(opts, str):
212
logfmt = opts
213
opts = {}
214
elif "format" in opts:
215
logfmt = opts["format"]
216
else:
217
logfmt = LOG_FORMAT
218
219
if not isinstance(logfmt, dict) and colors:
220
ansifmt = "\033[{}m{}\033[0m".format
221
lf = {}
222
for level in LOG_LEVELS:
223
c = colors.get(level)
224
lf[level] = ansifmt(c, logfmt) if c else logfmt
225
logfmt = lf
226
227
handler.setFormatter(Formatter(
228
logfmt, opts.get("format-date", LOG_FORMAT_DATE)))
229
230
if "level" in opts and handler.level == LOG_LEVEL:
231
handler.setLevel(opts["level"])
232
233
if minlevel > handler.level:
234
minlevel = handler.level
235
236
# file logging handler
237
if handler := setup_logging_handler("logfile", lvl=loglevel):
238
root.addHandler(handler)
239
if minlevel > handler.level:
240
minlevel = handler.level
241
242
root.setLevel(minlevel)
243
244
245
def setup_logging_handler(key, fmt=LOG_FORMAT, lvl=LOG_LEVEL, mode="w"):
246
"""Setup a new logging handler"""
247
opts = config.interpolate(("output",), key)
248
if not opts:
249
return None
250
if not isinstance(opts, dict):
251
opts = {"path": opts}
252
253
path = opts.get("path")
254
mode = opts.get("mode", mode)
255
encoding = opts.get("encoding", "utf-8")
256
try:
257
path = util.expand_path(path)
258
handler = logging.FileHandler(path, mode, encoding)
259
except FileNotFoundError:
260
os.makedirs(os.path.dirname(path))
261
handler = logging.FileHandler(path, mode, encoding)
262
except (OSError, ValueError) as exc:
263
logging.getLogger("gallery-dl").warning(
264
"%s: %s", key, exc)
265
return None
266
except TypeError as exc:
267
logging.getLogger("gallery-dl").warning(
268
"%s: missing or invalid path (%s)", key, exc)
269
return None
270
271
handler.setLevel(opts.get("level", lvl))
272
handler.setFormatter(Formatter(
273
opts.get("format", fmt),
274
opts.get("format-date", LOG_FORMAT_DATE),
275
))
276
return handler
277
278
279
# --------------------------------------------------------------------
280
# Utility functions
281
282
def stdout_write_flush(s):
283
sys.stdout.write(s)
284
sys.stdout.flush()
285
286
287
def stderr_write_flush(s):
288
sys.stderr.write(s)
289
sys.stderr.flush()
290
291
292
if getattr(sys.stdout, "line_buffering", None):
293
def stdout_write(s):
294
sys.stdout.write(s)
295
else:
296
stdout_write = stdout_write_flush
297
298
299
if getattr(sys.stderr, "line_buffering", None):
300
def stderr_write(s):
301
sys.stderr.write(s)
302
else:
303
stderr_write = stderr_write_flush
304
305
306
def configure_standard_streams():
307
for name in ("stdout", "stderr", "stdin"):
308
stream = getattr(sys, name, None)
309
if not stream:
310
continue
311
312
options = config.get(("output",), name)
313
if not options:
314
options = {"errors": "replace"}
315
elif isinstance(options, str):
316
options = {"errors": "replace", "encoding": options}
317
elif not options.get("errors"):
318
options["errors"] = "replace"
319
320
stream.reconfigure(**options)
321
322
323
# --------------------------------------------------------------------
324
# Downloader output
325
326
def select():
327
"""Select a suitable output class"""
328
mode = config.get(("output",), "mode")
329
330
if mode is None or mode == "auto":
331
try:
332
if TTY_STDOUT:
333
output = ColorOutput() if ANSI else TerminalOutput()
334
else:
335
output = PipeOutput()
336
except Exception:
337
output = PipeOutput()
338
elif isinstance(mode, dict):
339
output = CustomOutput(mode)
340
elif not mode:
341
output = NullOutput()
342
else:
343
output = {
344
"default" : PipeOutput,
345
"pipe" : PipeOutput,
346
"term" : TerminalOutput,
347
"terminal": TerminalOutput,
348
"color" : ColorOutput,
349
"null" : NullOutput,
350
}[mode.lower()]()
351
352
if not config.get(("output",), "skip", True):
353
output.skip = util.identity
354
return output
355
356
357
class NullOutput():
358
359
def start(self, path):
360
"""Print a message indicating the start of a download"""
361
362
def skip(self, path):
363
"""Print a message indicating that a download has been skipped"""
364
365
def success(self, path):
366
"""Print a message indicating the completion of a download"""
367
368
def progress(self, bytes_total, bytes_downloaded, bytes_per_second):
369
"""Display download progress"""
370
371
372
class PipeOutput(NullOutput):
373
374
def skip(self, path):
375
stdout_write(f"{CHAR_SKIP}{path}\n")
376
377
def success(self, path):
378
stdout_write(f"{path}\n")
379
380
381
class TerminalOutput():
382
383
def __init__(self):
384
if shorten := config.get(("output",), "shorten", True):
385
func = shorten_string_eaw if shorten == "eaw" else shorten_string
386
limit = shutil.get_terminal_size().columns - OFFSET
387
sep = CHAR_ELLIPSIES
388
self.shorten = lambda txt: func(txt, limit, sep)
389
else:
390
self.shorten = util.identity
391
392
def start(self, path):
393
stdout_write_flush(self.shorten(f" {path}"))
394
395
def skip(self, path):
396
stdout_write(f"{self.shorten(CHAR_SKIP + path)}\n")
397
398
def success(self, path):
399
stdout_write(f"\r{self.shorten(CHAR_SUCCESS + path)}\n")
400
401
def progress(self, bytes_total, bytes_downloaded, bytes_per_second):
402
bdl = util.format_value(bytes_downloaded)
403
bps = util.format_value(bytes_per_second)
404
if bytes_total is None:
405
stderr_write(f"\r{bdl:>7}B {bps:>7}B/s ")
406
else:
407
stderr_write(f"\r{bytes_downloaded * 100 // bytes_total:>3}% "
408
f"{bdl:>7}B {bps:>7}B/s ")
409
410
411
class ColorOutput(TerminalOutput):
412
413
def __init__(self):
414
TerminalOutput.__init__(self)
415
416
colors = config.interpolate(("output",), "colors")
417
if colors is None:
418
colors = COLORS_DEFAULT
419
420
self.color_skip = f"\x1b[{colors.get('skip', '2')}m"
421
self.color_success = f"\r\x1b[{colors.get('success', '1;32')}m"
422
423
def start(self, path):
424
stdout_write_flush(self.shorten(path))
425
426
def skip(self, path):
427
stdout_write(f"{self.color_skip}{self.shorten(path)}\x1b[0m\n")
428
429
def success(self, path):
430
stdout_write(f"{self.color_success}{self.shorten(path)}\x1b[0m\n")
431
432
433
class CustomOutput():
434
435
def __init__(self, options):
436
437
fmt_skip = options.get("skip")
438
fmt_start = options.get("start")
439
fmt_success = options.get("success")
440
off_skip = off_start = off_success = 0
441
442
if isinstance(fmt_skip, list):
443
off_skip, fmt_skip = fmt_skip
444
if isinstance(fmt_start, list):
445
off_start, fmt_start = fmt_start
446
if isinstance(fmt_success, list):
447
off_success, fmt_success = fmt_success
448
449
if shorten := config.get(("output",), "shorten", True):
450
func = shorten_string_eaw if shorten == "eaw" else shorten_string
451
width = shutil.get_terminal_size().columns
452
453
self._fmt_skip = self._make_func(
454
func, fmt_skip, width - off_skip)
455
self._fmt_start = self._make_func(
456
func, fmt_start, width - off_start)
457
self._fmt_success = self._make_func(
458
func, fmt_success, width - off_success)
459
else:
460
self._fmt_skip = fmt_skip.format
461
self._fmt_start = fmt_start.format
462
self._fmt_success = fmt_success.format
463
464
self._fmt_progress = (options.get("progress") or
465
"\r{0:>7}B {1:>7}B/s ").format
466
self._fmt_progress_total = (options.get("progress-total") or
467
"\r{3:>3}% {0:>7}B {1:>7}B/s ").format
468
469
def _make_func(self, shorten, format_string, limit):
470
fmt = format_string.format
471
return lambda txt: fmt(shorten(txt, limit, CHAR_ELLIPSIES))
472
473
def start(self, path):
474
stdout_write_flush(self._fmt_start(path))
475
476
def skip(self, path):
477
stdout_write(self._fmt_skip(path))
478
479
def success(self, path):
480
stdout_write(self._fmt_success(path))
481
482
def progress(self, bytes_total, bytes_downloaded, bytes_per_second):
483
bdl = util.format_value(bytes_downloaded)
484
bps = util.format_value(bytes_per_second)
485
if bytes_total is None:
486
stderr_write(self._fmt_progress(bdl, bps))
487
else:
488
stderr_write(self._fmt_progress_total(
489
bdl, bps, util.format_value(bytes_total),
490
bytes_downloaded * 100 // bytes_total))
491
492
493
class EAWCache(dict):
494
495
def __missing__(self, key):
496
width = self[key] = \
497
2 if unicodedata.east_asian_width(key) in "WF" else 1
498
return width
499
500
501
def shorten_string(txt, limit, sep="…"):
502
"""Limit width of 'txt'; assume all characters have a width of 1"""
503
if len(txt) <= limit:
504
return txt
505
limit -= len(sep)
506
return f"{txt[:limit // 2]}{sep}{txt[-((limit+1) // 2):]}"
507
508
509
def shorten_string_eaw(txt, limit, sep="…", cache=EAWCache()):
510
"""Limit width of 'txt'; check for east-asian characters with width > 1"""
511
char_widths = [cache[c] for c in txt]
512
text_width = sum(char_widths)
513
514
if text_width <= limit:
515
# no shortening required
516
return txt
517
518
limit -= len(sep)
519
if text_width == len(txt):
520
# all characters have a width of 1
521
return f"{txt[:limit // 2]}{sep}{txt[-((limit+1) // 2):]}"
522
523
# wide characters
524
left = 0
525
lwidth = limit // 2
526
while True:
527
lwidth -= char_widths[left]
528
if lwidth < 0:
529
break
530
left += 1
531
532
right = -1
533
rwidth = (limit+1) // 2 + (lwidth + char_widths[left])
534
while True:
535
rwidth -= char_widths[right]
536
if rwidth < 0:
537
break
538
right -= 1
539
540
return f"{txt[:left]}{sep}{txt[right+1:]}"
541
542