Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
mikf
GitHub Repository: mikf/gallery-dl
Path: blob/master/gallery_dl/formatter.py
8837 views
1
# -*- coding: utf-8 -*-
2
3
# Copyright 2021-2026 Mike Fährmann
4
#
5
# This program is free software; you can redistribute it and/or modify
6
# it under the terms of the GNU General Public License version 2 as
7
# published by the Free Software Foundation.
8
9
"""String formatters"""
10
11
import os
12
import sys
13
import time
14
import string
15
import _string
16
import operator
17
from . import text, util, dt
18
19
NONE = util.NONE
20
21
22
def parse(format_string, default=NONE, fmt=format):
    """Return a (cached) formatter object for 'format_string'

    A leading form feed ("\\f") marks a typed format string: the text
    between it and the first space selects a class from _FORMATTERS and
    the remainder is handed to that class. Anything else is handled by
    StringFormatter.

    Construction errors are logged and re-raised.
    """
    key = format_string, default, fmt

    formatter = _CACHE.get(key)
    if formatter is not None:
        return formatter

    cls = StringFormatter
    if format_string and format_string[0] == "\f":
        kind, _, format_string = format_string.partition(" ")
        cls = _FORMATTERS.get(kind[1:])
        if cls is None:
            import logging
            logging.getLogger("formatter").error(
                "Invalid formatter type '%s'", kind[1:])
            # unknown type: treat the remainder as a regular format string
            cls = StringFormatter

    try:
        formatter = cls(format_string, default, fmt)
    except Exception as exc:
        import logging
        logging.getLogger("formatter").error(
            "Invalid format string '%s' (%s: %s)",
            format_string, exc.__class__.__name__, exc)
        raise

    # only successfully built formatters end up in the cache
    _CACHE[key] = formatter
    return formatter
52
53
54
class StringFormatter():
    """Custom, extended version of string.Formatter

    This string formatter implementation is a mostly performance-optimized
    variant of the original string.Formatter class. Unnecessary features have
    been removed (positional arguments, unused argument check) and new
    formatting options have been added.

    Extra Conversions:
    - "l": calls str.lower on the target value
    - "u": calls str.upper
    - "c": calls str.capitalize
    - "C": calls string.capwords
    - "g": calls text.slugify()
    - "j": calls json.dumps
    - "t": calls str.strip
    - "T": calls dt.to_ts_string()
    - "d": calls dt.parse_ts()
    - "s": calls str()
    - "S": calls util.to_string()
    - "U": calls text.unescape()
    - "r": calls repr()
    - "a": calls ascii()
    - Example: {f!l} -> "example"; {f!u} -> "EXAMPLE"

    # See _CONVERSIONS and _FORMAT_SPECIFIERS below for the full list, or read:
    # https://github.com/mikf/gallery-dl/blob/master/docs/formatting.md

    Extra Format Specifiers:
    - "?<before>/<after>/":
        Adds <before> and <after> to the actual value if it evaluates to True.
        Otherwise the whole replacement field becomes an empty string.
        Example: {f:?-+/+-/} -> "-+Example+-" (if "f" contains "Example")
                             -> ""            (if "f" is None, 0, "")

    - "L<maxlen>/<replacement>/":
        Replaces the output with <replacement> if its length (in characters)
        exceeds <maxlen>. Otherwise everything is left as is.
        Example: {f:L5/too long/} -> "foo"      (if "f" is "foo")
                                  -> "too long" (if "f" is "foobar")

    - "J<separator>/":
        Joins elements of a list (or string) using <separator>
        Example: {f:J - /} -> "a - b - c" (if "f" is ["a", "b", "c"])

    - "R<old>/<new>/":
        Replaces all occurrences of <old> with <new>
        Example: {f:R /_/} -> "f_o_o_b_a_r" (if "f" is "f o o b a r")
    """

    def __init__(self, format_string, default=NONE, fmt=format):
        """Pre-compile 'format_string' into literal chunks and field getters

        'default' replaces values that are missing or fail to resolve;
        'fmt' is the fallback formatting callable for plain fields.
        """
        self.default = default
        self.format = fmt
        # output template: literal chunks plus one "" slot per field
        self.result = []
        # (index into self.result, callable(kwdict) -> value) pairs
        self.fields = []

        for literal_text, field_name, format_spec, conv in \
                _string.formatter_parser(format_string):
            if literal_text:
                self.result.append(literal_text)
            if field_name:
                self.fields.append((
                    len(self.result),
                    self._field_access(field_name, format_spec, conv),
                ))
                self.result.append("")

        if len(self.result) == 1:
            # fast paths: shadow the format_map() method on this instance
            if self.fields:
                self.format_map = self.fields[0][1]
            else:
                # Return the *parsed* literal, not the raw format string:
                # the parser has already collapsed escaped braces, so
                # "abc{{" must render as "abc{".
                literal = self.result[0]
                self.format_map = lambda _: literal
            del self.result, self.fields

    def format_map(self, kwdict):
        """Apply 'kwdict' to the initial format_string and return its result"""
        # self.result is reused (and overwritten) on every call
        result = self.result
        for index, func in self.fields:
            result[index] = func(kwdict)
        return "".join(result)

    def _field_access(self, field_name, format_spec, conversion):
        """Build the callable that resolves and formats one field"""
        fmt = self._parse_format_spec(format_spec, conversion)

        if "|" in field_name:
            # "a|b|c": alternatives, tried from left to right
            return self._apply_list([
                parse_field_name(fn)
                for fn in field_name.split("|")
            ], fmt)
        else:
            key, funcs = parse_field_name(field_name)
            return self._apply(key, funcs, fmt)

    def _apply(self, key, funcs, fmt):
        """Return a getter for a single field name"""
        if key in _GLOBALS:
            def wrap(_):
                try:
                    obj = gobj()
                    for func in funcs:
                        obj = func(obj)
                except Exception:
                    obj = self.default
                return fmt(obj)
            gobj = _GLOBALS[key]
        elif funcs:
            def wrap(kwdict):
                try:
                    obj = kwdict[key]
                    for func in funcs:
                        obj = func(obj)
                except Exception:
                    obj = self.default
                return fmt(obj)
        else:
            def wrap(kwdict):
                return fmt(kwdict[key] if key in kwdict else self.default)
            # this variant of wrap() does not use the accessor list
            del funcs
        return wrap

    def _apply_list(self, lst, fmt):
        """Return a getter that yields the first truthy alternative"""
        def wrap(kwdict):
            for key, funcs in lst:
                try:
                    obj = _GLOBALS[key]() if key in _GLOBALS else kwdict[key]
                    for func in funcs:
                        obj = func(obj)
                    if obj:
                        break
                except Exception:
                    obj = None
            else:
                # no alternative was truthy; a falsy non-None value from
                # the last alternative is kept as is
                if obj is None:
                    obj = self.default
            return fmt(obj)
        return wrap

    def _parse_format_spec(self, format_spec, conversion):
        """Combine '!conversion' and ':format_spec' into one callable"""
        fmt = _build_format_func(format_spec, self.format)
        if not conversion:
            return fmt

        conversion = _CONVERSIONS[conversion]
        if fmt is self.format:
            # no format spec given: the conversion result is used directly
            return conversion
        else:
            return lambda obj: fmt(conversion(obj))
200
201
202
class ExpressionFormatter():
    """Generate text by evaluating a Python expression"""

    def __init__(self, expression, default=NONE, fmt=None):
        # 'default' and 'fmt' are accepted for a uniform constructor
        # signature but are not used here
        compiled = util.compile_expression(expression)
        self.format_map = compiled
207
208
209
class FStringFormatter():
    """Generate text by evaluating an f-string literal"""

    def __init__(self, fstring, default=NONE, fmt=None):
        # wrap the text in a triple-quoted f-string literal and compile
        # that literal as an expression
        source = f'f"""{fstring}"""'
        self.format_map = util.compile_expression(source)
214
215
216
def _init_jinja():
    """Create the shared Jinja environment and store it on JinjaFormatter

    Reads the "jinja" config section; without it a default Environment
    is used. Otherwise "environment" supplies constructor keyword
    arguments, "policies" updates env.policies, and "filters" / "tests"
    name Python files whose contents are registered on the environment.
    """
    import jinja2
    from . import config

    opts = config.get((), "jinja")
    if not opts:
        JinjaFormatter.env = jinja2.Environment()
        return

    env = jinja2.Environment(**(opts.get("environment") or {}))
    JinjaFormatter.env = env

    policies = opts.get("policies")
    if policies:
        env.policies.update(policies)

    # a module may restrict its exports with a __filters__ / __tests__
    # mapping; otherwise its whole namespace is registered
    for option, registry, export in (
        ("filters", env.filters, "__filters__"),
        ("tests", env.tests, "__tests__"),
    ):
        path = opts.get(option)
        if path:
            namespace = util.import_file(path).__dict__
            registry.update(
                namespace[export] if export in namespace else namespace)
239
240
241
class JinjaFormatter():
    """Generate text by evaluating a Jinja template string"""
    # shared environment, created by _init_jinja() on first use
    env = None

    def __init__(self, source, default=NONE, fmt=None):
        if self.env is None:
            _init_jinja()
        template = self.env.from_string(source)
        self.format_map = template.render
249
250
251
class ModuleFormatter():
    """Generate text by calling an external function"""

    def __init__(self, function_spec, default=NONE, fmt=None):
        # "<module path>:<function name>", split at the LAST colon
        location, _, attribute = function_spec.rpartition(":")
        self.format_map = getattr(util.import_file(location), attribute)
258
259
260
class TemplateFormatter(StringFormatter):
    """Read format_string from file"""

    def __init__(self, path, default=NONE, fmt=format):
        filename = util.expand_path(path)
        with open(filename, encoding="utf-8") as fp:
            content = fp.read()
        super().__init__(content, default, fmt)
267
268
269
class TemplateFStringFormatter(FStringFormatter):
    """Read f-string from file"""

    def __init__(self, path, default=NONE, fmt=None):
        filename = util.expand_path(path)
        with open(filename, encoding="utf-8") as fp:
            content = fp.read()
        super().__init__(content, default, fmt)
276
277
278
class TemplateJinjaFormatter(JinjaFormatter):
    """Generate text by evaluating a Jinja template"""

    def __init__(self, path, default=NONE, fmt=None):
        filename = util.expand_path(path)
        with open(filename, encoding="utf-8") as fp:
            content = fp.read()
        super().__init__(content, default, fmt)
285
286
287
def parse_field_name(field_name):
    """Split 'field_name' into its base key and a sequence of accessors

    Returns a (key, accessors) pair, where each accessor is a callable
    taking the current object and returning the next one.
    A quoted name ('...') is turned into a lookup on the "_lit" global.
    """
    if field_name[0] == "'":
        return "_lit", (operator.itemgetter(field_name[1:-1]),)

    first, rest = _string.formatter_field_name_split(field_name)
    funcs = []

    for is_attr, key in rest:
        if is_attr:
            # ".name" access
            funcs.append(_attrgetter(key))
            continue

        # "[...]" access
        getter = operator.itemgetter
        try:
            if ":" in key:
                if key[0] == "b":
                    # "[b<start>:<stop>]" slices the encoded bytes
                    getter = _bytesgetter
                    key = _slice(key[1:])
                else:
                    key = _slice(key)
            elif key[0] == "-":
                key = int(key)
            else:
                key = key.strip("\"'")
        except TypeError:
            pass  # key is an integer
        funcs.append(getter(key))

    return first, funcs
316
317
318
def _slice(indices):
319
start, _, stop = indices.partition(":")
320
stop, _, step = stop.partition(":")
321
return slice(
322
int(start) if start else None,
323
int(stop) if stop else None,
324
int(step) if step else None,
325
)
326
327
328
def _attrgetter(key):
329
330
if key.isdecimal() or key[0] == "-":
331
try:
332
return operator.itemgetter(int(key))
333
except ValueError:
334
pass
335
336
def apply_key(obj):
337
try:
338
return obj[key]
339
except (TypeError, KeyError):
340
return getattr(obj, key)
341
return apply_key
342
343
344
def _bytesgetter(slice):
    """Return a function that slices the encoded form of a string

    The string is encoded with _ENCODING, sliced, and decoded again;
    bytes that no longer decode after slicing are dropped.
    """
    def apply_slice_bytes(obj):
        encoded = obj.encode(_ENCODING)
        return encoded[slice].decode(_ENCODING, "ignore")

    return apply_slice_bytes
350
351
352
def _build_format_func(format_spec, default):
    """Return the formatting callable for 'format_spec'

    The first character selects a builder from _FORMAT_SPECIFIERS;
    unknown characters fall back to _default_format. An empty spec
    yields 'default' itself.
    """
    if not format_spec:
        return default
    builder = _FORMAT_SPECIFIERS.get(format_spec[0], _default_format)
    return builder(format_spec, default)
357
358
359
def _parse_optional(format_spec, default):
    """Build the "?<before>/<after>/" specifier

    Truthy values are formatted and wrapped in <before> and <after>;
    falsy values produce an empty string.
    """
    before, after, format_spec = format_spec.split(_SEPARATOR, 2)
    before = before[1:]  # drop the leading "?"
    fmt = _build_format_func(format_spec, default)

    def optional(obj):
        if not obj:
            return ""
        return f"{before}{fmt(obj)}{after}"
    return optional
367
368
369
def _parse_slice(format_spec, default):
    """Build the "[<start>:<stop>]" / "[b<start>:<stop>]" specifier"""
    indices, _, format_spec = format_spec.partition("]")
    fmt = _build_format_func(format_spec, default)

    # indices[0] is the opening "["
    if indices[1] == "b":
        slice_bytes = _bytesgetter(_slice(indices[2:]))

        def apply_slice(obj):
            return fmt(slice_bytes(obj))
        return apply_slice

    bounds = _slice(indices[1:])

    def apply_slice(obj):
        return fmt(obj[bounds])
    return apply_slice
386
387
388
def _parse_arithmetic(format_spec, default):
    """Build the "A<op><value>/" specifier (op is one of + - *)

    An unrecognized operator leaves the value untouched.
    """
    op, _, format_spec = format_spec.partition(_SEPARATOR)
    fmt = _build_format_func(format_spec, default)

    # op is "A", the operator character, then an integer operand
    value = int(op[2:])
    operation = {
        "+": operator.add,
        "-": operator.sub,
        "*": operator.mul,
    }.get(op[1])

    if operation is None:
        return fmt
    return lambda obj: fmt(operation(obj, value))
403
404
405
def _parse_conversion(format_spec, default):
    """Build the "C<conversions>/" specifier

    Each character after the leading "C" names an entry of _CONVERSIONS;
    they are applied from left to right before the remaining format
    spec. An unknown character raises KeyError and an empty conversion
    list raises IndexError.
    """
    conversions, _, format_spec = format_spec.partition(_SEPARATOR)
    # resolve the conversion callables exactly once
    convs = [_CONVERSIONS[c] for c in conversions[1:]]
    fmt = _build_format_func(format_spec, default)

    if len(convs) <= 1:
        # single conversion: skip the loop
        conv = convs[0]

        def convert_one(obj):
            return fmt(conv(obj))
        return convert_one

    def convert_many(obj):
        for conv in convs:
            obj = conv(obj)
        return fmt(obj)
    return convert_many
423
424
425
def _parse_maxlen(format_spec, default):
    """Build the "L<maxlen>/<replacement>/" specifier

    The formatted value is swapped for <replacement> when it is longer
    than <maxlen>. With "Lb<maxlen>" the length is measured on the
    value encoded with _ENCODING.
    """
    maxlen, replacement, format_spec = format_spec.split(_SEPARATOR, 2)
    fmt = _build_format_func(format_spec, default)

    in_bytes = maxlen[1] == "b"
    limit = text.parse_int(maxlen[2:] if in_bytes else maxlen[1:])

    if in_bytes:
        def mlen(obj):
            formatted = fmt(obj)
            if len(formatted.encode(_ENCODING)) > limit:
                return replacement
            return formatted
    else:
        def mlen(obj):
            formatted = fmt(obj)
            if len(formatted) > limit:
                return replacement
            return formatted
    return mlen
442
443
444
def _parse_identity(format_spec, default):
    """Build the "I" specifier; both arguments are ignored"""
    identity = util.identity
    return identity
446
447
448
def _parse_join(format_spec, default):
    """Build the "J<separator>/" specifier

    Strings are passed through unchanged; any other value is joined
    with <separator> before further formatting.
    """
    separator, _, format_spec = format_spec.partition(_SEPARATOR)
    glue = separator[1:]  # drop the leading "J"
    fmt = _build_format_func(format_spec, default)

    def apply_join(obj):
        return fmt(obj if isinstance(obj, str) else glue.join(obj))
    return apply_join
458
459
460
def _parse_map(format_spec, default):
    """Build the "M<key>/" specifier

    Replaces every dict in an iterable with its value for <key>;
    non-dict items are kept. Falsy values and strings are passed to the
    next formatting step unchanged.
    """
    key, _, format_spec = format_spec.partition(_SEPARATOR)
    key = key[1:]  # drop the leading "M"
    fmt = _build_format_func(format_spec, default)

    def pick(item):
        if not isinstance(item, dict):
            return item
        value = item.get(key, ...)
        # NOTE(review): 'default' is the fallback format callable handed
        # to this builder, not a default *value* - confirm that using it
        # for dicts lacking <key> is intended
        return default if value is ... else value

    def map_(obj):
        if not obj or isinstance(obj, str):
            return fmt(obj)
        return fmt([pick(item) for item in obj])

    return map_
479
480
481
def _parse_replace(format_spec, default):
    """Build the "R<old>/<new>/" specifier"""
    needle, substitute, format_spec = format_spec.split(_SEPARATOR, 2)
    needle = needle[1:]  # drop the leading "R"
    fmt = _build_format_func(format_spec, default)

    def replace(obj):
        replaced = obj.replace(needle, substitute)
        return fmt(replaced)
    return replace
489
490
491
def _parse_datetime(format_spec, default):
    """Build the "D<format>/" specifier

    The value is handed to dt.parse() together with <format>.
    """
    head, _, format_spec = format_spec.partition(_SEPARATOR)
    dt_format = head[1:]  # drop the leading "D"
    fmt = _build_format_func(format_spec, default)

    def dt_parse(obj):
        parsed = dt.parse(obj, dt_format)
        return fmt(parsed)
    return dt_parse
499
500
501
def _parse_offset(format_spec, default):
    """Build the "O<offset>/" specifier

    <offset> is either empty / "local" (add the local UTC offset that
    applies at the value's own timestamp) or "<hours>[:<minutes>]",
    which is added to the value as a fixed timedelta.
    """
    offset, _, format_spec = format_spec.partition(_SEPARATOR)
    offset = offset[1:]  # drop the leading "O"
    fmt = _build_format_func(format_spec, default)

    if not offset or offset == "local":
        def off(dt_utc):
            local = time.localtime(dt.to_ts(dt_utc))
            return fmt(dt_utc + dt.timedelta(0, local.tm_gmtoff))
    else:
        hours, _, minutes = offset.partition(":")
        seconds = 3600 * int(hours)
        if minutes:
            # Take the sign from the hours *text*: the numeric value is
            # 0 for "+0:30" and "-0:30" alike, which would otherwise
            # turn "+0:30" into minus thirty minutes.
            negative = hours.lstrip().startswith("-")
            seconds += 60 * (-int(minutes) if negative else int(minutes))
        delta = dt.timedelta(0, seconds)

        def off(obj):
            return fmt(obj + delta)
    return off
520
521
522
def _parse_sort(format_spec, default):
    """Build the "S<flags>/" specifier

    A "d" or "r" among the flags sorts in descending order.
    """
    args, _, format_spec = format_spec.partition(_SEPARATOR)
    fmt = _build_format_func(format_spec, default)

    descending = "d" in args or "r" in args

    def sort(obj):
        return fmt(sorted(obj, reverse=descending))
    return sort
533
534
535
def _parse_limit(format_spec, default):
    """Build the "X<limit>/<hint>/" specifier

    Values longer than <limit> are cut so that value plus <hint> fit
    into <limit>. With "Xb<limit>" lengths are measured on the value
    encoded with _ENCODING.
    """
    limit, hint, format_spec = format_spec.split(_SEPARATOR, 2)
    fmt = _build_format_func(format_spec, default)

    if limit[1] == "b":
        hint = hint.encode(_ENCODING)
        limit = int(limit[2:])
        # Clamp at 0: a hint longer than the limit would otherwise give
        # a negative slice end and keep almost the whole value.
        limit_hint = max(limit - len(hint), 0)

        def apply_limit(obj):
            objb = obj.encode(_ENCODING)
            if len(objb) > limit:
                # "ignore" drops a multi-byte character cut in half
                obj = (objb[:limit_hint] + hint).decode(_ENCODING, "ignore")
            return fmt(obj)
    else:
        limit = int(limit[1:])
        limit_hint = max(limit - len(hint), 0)

        def apply_limit(obj):
            if len(obj) > limit:
                obj = obj[:limit_hint] + hint
            return fmt(obj)
    return apply_limit
558
559
560
def _default_format(format_spec, default):
561
def wrap(obj):
562
return format(obj, format_spec)
563
return wrap
564
565
566
class Literal():
    """Object whose item lookup returns the requested key itself"""
    # __getattr__, __getattribute__, and __class_getitem__
    # are all slower than regular __getitem__

    def __getitem__(self, key):
        return key
572
573
574
# shared instance behind the "_lit" global
_literal = Literal()

# formatter cache used by parse()
_CACHE = {}
_ENCODING = sys.getfilesystemencoding()
_SEPARATOR = "/"

# formatter classes selectable with a "\f<TYPE> " prefix
_FORMATTERS = {
    "E": ExpressionFormatter,
    "F": FStringFormatter,
    "J": JinjaFormatter,
    "M": ModuleFormatter,
    "S": StringFormatter,
    "T": TemplateFormatter,
    "TF": TemplateFStringFormatter,
    "FT": TemplateFStringFormatter,
    "TJ": TemplateJinjaFormatter,
    "JT": TemplateJinjaFormatter,
}

# field names resolved by calling a function instead of a kwdict lookup
_GLOBALS = {
    "_env": lambda: os.environ,
    "_lit": lambda: _literal,
    "_now": dt.datetime.now,
    "_nul": lambda: util.NONE,
}

# "!<char>" conversions
_CONVERSIONS = {
    "l": str.lower,
    "u": str.upper,
    "c": str.capitalize,
    "C": string.capwords,
    "j": util.json_dumps,
    "t": str.strip,
    "n": len,
    "L": util.code_to_language,
    "T": dt.to_ts_string,
    "d": dt.parse_ts,
    "D": dt.convert,
    "q": text.quote,
    "Q": text.unquote,
    "U": text.unescape,
    "H": lambda s: text.unescape(text.remove_html(s)),
    "g": text.slugify,
    "R": text.extract_urls,
    "W": text.sanitize_whitespace,
    "S": util.to_string,
    "s": str,
    "r": repr,
    "a": ascii,
    "i": int,
    "f": float,
}

# builders keyed by the first character of a format spec
_FORMAT_SPECIFIERS = {
    "?": _parse_optional,
    "[": _parse_slice,
    "A": _parse_arithmetic,
    "C": _parse_conversion,
    "D": _parse_datetime,
    "I": _parse_identity,
    "J": _parse_join,
    "L": _parse_maxlen,
    "M": _parse_map,
    "O": _parse_offset,
    "R": _parse_replace,
    "S": _parse_sort,
    "X": _parse_limit,
}
638
639