import os
import sys
import shutil
import logging
import unicodedata
from . import config, util, formatter
try:
TTY_STDOUT = sys.stdout.isatty()
except Exception:
TTY_STDOUT = False
try:
TTY_STDERR = sys.stderr.isatty()
except Exception:
TTY_STDERR = False
try:
TTY_STDIN = sys.stdin.isatty()
except Exception:
TTY_STDIN = False
COLORS_DEFAULT = {}
COLORS = not os.environ.get("NO_COLOR")
if COLORS:
if TTY_STDOUT:
COLORS_DEFAULT["success"] = "1;32"
COLORS_DEFAULT["skip"] = "2"
if TTY_STDERR:
COLORS_DEFAULT["debug"] = "0;37"
COLORS_DEFAULT["info"] = "1;37"
COLORS_DEFAULT["warning"] = "1;33"
COLORS_DEFAULT["error"] = "1;31"
if util.WINDOWS:
ANSI = COLORS and os.environ.get("TERM") == "ANSI"
OFFSET = 1
CHAR_SKIP = "# "
CHAR_SUCCESS = "* "
CHAR_ELLIPSIES = "..."
else:
ANSI = COLORS
OFFSET = 0
CHAR_SKIP = "# "
CHAR_SUCCESS = "✔ "
CHAR_ELLIPSIES = "…"
LOG_FORMAT = "[{name}][{levelname}] {message}"
LOG_FORMAT_DATE = "%Y-%m-%d %H:%M:%S"
LOG_LEVEL = logging.INFO
LOG_LEVELS = ("debug", "info", "warning", "error")
class Logger(logging.Logger):
"""Custom Logger that includes extra info in log records"""
def makeRecord(self, name, level, fn, lno, msg, args, exc_info,
func=None, extra=None, sinfo=None,
factory=logging._logRecordFactory):
rv = factory(name, level, fn, lno, msg, args, exc_info, func, sinfo)
if extra:
rv.__dict__.update(extra)
return rv
class LoggerAdapter():
"""Trimmed-down version of logging.LoggingAdapter"""
__slots__ = ("logger", "extra")
def __init__(self, logger, job):
self.logger = logger
self.extra = job._logger_extra
def debug(self, msg, *args, **kwargs):
if self.logger.isEnabledFor(logging.DEBUG):
kwargs["extra"] = self.extra
self.logger._log(logging.DEBUG, msg, args, **kwargs)
def info(self, msg, *args, **kwargs):
if self.logger.isEnabledFor(logging.INFO):
kwargs["extra"] = self.extra
self.logger._log(logging.INFO, msg, args, **kwargs)
def warning(self, msg, *args, **kwargs):
if self.logger.isEnabledFor(logging.WARNING):
kwargs["extra"] = self.extra
self.logger._log(logging.WARNING, msg, args, **kwargs)
def error(self, msg, *args, **kwargs):
if self.logger.isEnabledFor(logging.ERROR):
kwargs["extra"] = self.extra
self.logger._log(logging.ERROR, msg, args, **kwargs)
class PathfmtProxy():
__slots__ = ("job",)
def __init__(self, job):
self.job = job
def __getattribute__(self, name):
pathfmt = object.__getattribute__(self, "job").pathfmt
return getattr(pathfmt, name, None) if pathfmt else None
def __str__(self):
if pathfmt := object.__getattribute__(self, "job").pathfmt:
return pathfmt.path or pathfmt.directory
return ""
class KwdictProxy():
__slots__ = ("job",)
def __init__(self, job):
self.job = job
def __getattribute__(self, name):
pathfmt = object.__getattribute__(self, "job").pathfmt
return pathfmt.kwdict.get(name) if pathfmt else None
class Formatter(logging.Formatter):
"""Custom formatter that supports different formats per loglevel"""
def __init__(self, fmt, datefmt):
if isinstance(fmt, dict):
for key in LOG_LEVELS:
value = fmt[key] if key in fmt else LOG_FORMAT
fmt[key] = (formatter.parse(value).format_map,
"{asctime" in value)
else:
if fmt == LOG_FORMAT:
fmt = (fmt.format_map, False)
else:
fmt = (formatter.parse(fmt).format_map, "{asctime" in fmt)
fmt = {"debug": fmt, "info": fmt, "warning": fmt, "error": fmt}
self.formats = fmt
self.datefmt = datefmt
def format(self, record):
record.message = record.getMessage()
fmt, asctime = self.formats[record.levelname]
if asctime:
record.asctime = self.formatTime(record, self.datefmt)
msg = fmt(record.__dict__)
if record.exc_info and not record.exc_text:
record.exc_text = self.formatException(record.exc_info)
if record.exc_text:
msg = f"{msg}\n{record.exc_text}"
if record.stack_info:
msg = f"{msg}\n{record.stack_info}"
return msg
def initialize_logging(loglevel):
"""Setup basic logging functionality before configfiles have been loaded"""
for level in (10, 20, 30, 40, 50):
name = logging.getLevelName(level)
logging.addLevelName(level, name.lower())
logging.Logger.manager.setLoggerClass(Logger)
formatter = Formatter(LOG_FORMAT, LOG_FORMAT_DATE)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
handler.setLevel(loglevel)
root = logging.getLogger()
root.setLevel(logging.NOTSET)
root.addHandler(handler)
return logging.getLogger("gallery-dl")
def configure_logging(loglevel):
root = logging.getLogger()
minlevel = loglevel
handler = root.handlers[0]
opts = config.interpolate(("output",), "log")
colors = config.interpolate(("output",), "colors")
if colors is None:
colors = COLORS_DEFAULT
if colors and not opts:
opts = LOG_FORMAT
if opts:
if isinstance(opts, str):
logfmt = opts
opts = {}
elif "format" in opts:
logfmt = opts["format"]
else:
logfmt = LOG_FORMAT
if not isinstance(logfmt, dict) and colors:
ansifmt = "\033[{}m{}\033[0m".format
lf = {}
for level in LOG_LEVELS:
c = colors.get(level)
lf[level] = ansifmt(c, logfmt) if c else logfmt
logfmt = lf
handler.setFormatter(Formatter(
logfmt, opts.get("format-date", LOG_FORMAT_DATE)))
if "level" in opts and handler.level == LOG_LEVEL:
handler.setLevel(opts["level"])
if minlevel > handler.level:
minlevel = handler.level
if handler := setup_logging_handler("logfile", lvl=loglevel):
root.addHandler(handler)
if minlevel > handler.level:
minlevel = handler.level
root.setLevel(minlevel)
def setup_logging_handler(key, fmt=LOG_FORMAT, lvl=LOG_LEVEL, mode="w"):
"""Setup a new logging handler"""
opts = config.interpolate(("output",), key)
if not opts:
return None
if not isinstance(opts, dict):
opts = {"path": opts}
path = opts.get("path")
mode = opts.get("mode", mode)
encoding = opts.get("encoding", "utf-8")
try:
path = util.expand_path(path)
handler = logging.FileHandler(path, mode, encoding)
except FileNotFoundError:
os.makedirs(os.path.dirname(path))
handler = logging.FileHandler(path, mode, encoding)
except (OSError, ValueError) as exc:
logging.getLogger("gallery-dl").warning(
"%s: %s", key, exc)
return None
except TypeError as exc:
logging.getLogger("gallery-dl").warning(
"%s: missing or invalid path (%s)", key, exc)
return None
handler.setLevel(opts.get("level", lvl))
handler.setFormatter(Formatter(
opts.get("format", fmt),
opts.get("format-date", LOG_FORMAT_DATE),
))
return handler
def stdout_write_flush(s):
sys.stdout.write(s)
sys.stdout.flush()
def stderr_write_flush(s):
sys.stderr.write(s)
sys.stderr.flush()
if getattr(sys.stdout, "line_buffering", None):
def stdout_write(s):
sys.stdout.write(s)
else:
stdout_write = stdout_write_flush
if getattr(sys.stderr, "line_buffering", None):
def stderr_write(s):
sys.stderr.write(s)
else:
stderr_write = stderr_write_flush
def configure_standard_streams():
for name in ("stdout", "stderr", "stdin"):
stream = getattr(sys, name, None)
if not stream:
continue
options = config.get(("output",), name)
if not options:
options = {"errors": "replace"}
elif isinstance(options, str):
options = {"errors": "replace", "encoding": options}
elif not options.get("errors"):
options["errors"] = "replace"
stream.reconfigure(**options)
def select():
"""Select a suitable output class"""
mode = config.get(("output",), "mode")
if mode is None or mode == "auto":
try:
if TTY_STDOUT:
output = ColorOutput() if ANSI else TerminalOutput()
else:
output = PipeOutput()
except Exception:
output = PipeOutput()
elif isinstance(mode, dict):
output = CustomOutput(mode)
elif not mode:
output = NullOutput()
else:
output = {
"default" : PipeOutput,
"pipe" : PipeOutput,
"term" : TerminalOutput,
"terminal": TerminalOutput,
"color" : ColorOutput,
"null" : NullOutput,
}[mode.lower()]()
if not config.get(("output",), "skip", True):
output.skip = util.identity
return output
class NullOutput():
def start(self, path):
"""Print a message indicating the start of a download"""
def skip(self, path):
"""Print a message indicating that a download has been skipped"""
def success(self, path):
"""Print a message indicating the completion of a download"""
def progress(self, bytes_total, bytes_downloaded, bytes_per_second):
"""Display download progress"""
class PipeOutput(NullOutput):
def skip(self, path):
stdout_write(f"{CHAR_SKIP}{path}\n")
def success(self, path):
stdout_write(f"{path}\n")
class TerminalOutput():
def __init__(self):
if shorten := config.get(("output",), "shorten", True):
func = shorten_string_eaw if shorten == "eaw" else shorten_string
limit = shutil.get_terminal_size().columns - OFFSET
sep = CHAR_ELLIPSIES
self.shorten = lambda txt: func(txt, limit, sep)
else:
self.shorten = util.identity
def start(self, path):
stdout_write_flush(self.shorten(f" {path}"))
def skip(self, path):
stdout_write(f"{self.shorten(CHAR_SKIP + path)}\n")
def success(self, path):
stdout_write(f"\r{self.shorten(CHAR_SUCCESS + path)}\n")
def progress(self, bytes_total, bytes_downloaded, bytes_per_second):
bdl = util.format_value(bytes_downloaded)
bps = util.format_value(bytes_per_second)
if bytes_total is None:
stderr_write(f"\r{bdl:>7}B {bps:>7}B/s ")
else:
stderr_write(f"\r{bytes_downloaded * 100 // bytes_total:>3}% "
f"{bdl:>7}B {bps:>7}B/s ")
class ColorOutput(TerminalOutput):
def __init__(self):
TerminalOutput.__init__(self)
colors = config.interpolate(("output",), "colors")
if colors is None:
colors = COLORS_DEFAULT
self.color_skip = f"\x1b[{colors.get('skip', '2')}m"
self.color_success = f"\r\x1b[{colors.get('success', '1;32')}m"
def start(self, path):
stdout_write_flush(self.shorten(path))
def skip(self, path):
stdout_write(f"{self.color_skip}{self.shorten(path)}\x1b[0m\n")
def success(self, path):
stdout_write(f"{self.color_success}{self.shorten(path)}\x1b[0m\n")
class CustomOutput():
def __init__(self, options):
fmt_skip = options.get("skip")
fmt_start = options.get("start")
fmt_success = options.get("success")
off_skip = off_start = off_success = 0
if isinstance(fmt_skip, list):
off_skip, fmt_skip = fmt_skip
if isinstance(fmt_start, list):
off_start, fmt_start = fmt_start
if isinstance(fmt_success, list):
off_success, fmt_success = fmt_success
if shorten := config.get(("output",), "shorten", True):
func = shorten_string_eaw if shorten == "eaw" else shorten_string
width = shutil.get_terminal_size().columns
self._fmt_skip = self._make_func(
func, fmt_skip, width - off_skip)
self._fmt_start = self._make_func(
func, fmt_start, width - off_start)
self._fmt_success = self._make_func(
func, fmt_success, width - off_success)
else:
self._fmt_skip = fmt_skip.format
self._fmt_start = fmt_start.format
self._fmt_success = fmt_success.format
self._fmt_progress = (options.get("progress") or
"\r{0:>7}B {1:>7}B/s ").format
self._fmt_progress_total = (options.get("progress-total") or
"\r{3:>3}% {0:>7}B {1:>7}B/s ").format
def _make_func(self, shorten, format_string, limit):
fmt = format_string.format
return lambda txt: fmt(shorten(txt, limit, CHAR_ELLIPSIES))
def start(self, path):
stdout_write_flush(self._fmt_start(path))
def skip(self, path):
stdout_write(self._fmt_skip(path))
def success(self, path):
stdout_write(self._fmt_success(path))
def progress(self, bytes_total, bytes_downloaded, bytes_per_second):
bdl = util.format_value(bytes_downloaded)
bps = util.format_value(bytes_per_second)
if bytes_total is None:
stderr_write(self._fmt_progress(bdl, bps))
else:
stderr_write(self._fmt_progress_total(
bdl, bps, util.format_value(bytes_total),
bytes_downloaded * 100 // bytes_total))
class EAWCache(dict):
def __missing__(self, key):
width = self[key] = \
2 if unicodedata.east_asian_width(key) in "WF" else 1
return width
def shorten_string(txt, limit, sep="…"):
"""Limit width of 'txt'; assume all characters have a width of 1"""
if len(txt) <= limit:
return txt
limit -= len(sep)
return f"{txt[:limit // 2]}{sep}{txt[-((limit+1) // 2):]}"
def shorten_string_eaw(txt, limit, sep="…", cache=EAWCache()):
"""Limit width of 'txt'; check for east-asian characters with width > 1"""
char_widths = [cache[c] for c in txt]
text_width = sum(char_widths)
if text_width <= limit:
return txt
limit -= len(sep)
if text_width == len(txt):
return f"{txt[:limit // 2]}{sep}{txt[-((limit+1) // 2):]}"
left = 0
lwidth = limit // 2
while True:
lwidth -= char_widths[left]
if lwidth < 0:
break
left += 1
right = -1
rwidth = (limit+1) // 2 + (lwidth + char_widths[left])
while True:
rwidth -= char_widths[right]
if rwidth < 0:
break
right -= 1
return f"{txt[:left]}{sep}{txt[right+1:]}"