Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
mikf
GitHub Repository: mikf/gallery-dl
Path: blob/master/gallery_dl/util.py
8803 views
1
# -*- coding: utf-8 -*-
2
3
# Copyright 2017-2026 Mike Fährmann
4
#
5
# This program is free software; you can redistribute it and/or modify
6
# it under the terms of the GNU General Public License version 2 as
7
# published by the Free Software Foundation.
8
9
"""Utility functions and classes"""
10
11
import os
12
import sys
13
import json
14
import time
15
import random
16
import getpass
17
import hashlib
18
import binascii
19
import functools
20
import itertools
21
import subprocess
22
import collections
23
import urllib.parse
24
from http.cookiejar import Cookie
25
from email.utils import mktime_tz, parsedate_tz
26
from . import text, dt, version, exception
27
28
29
def bencode(num, alphabet="0123456789"):
    """Encode a non-negative integer into a base-N encoded string

    N is len(alphabet); each digit value is mapped to the character
    at that index in 'alphabet'. Zero (or any falsy 'num') encodes
    to an empty string.

    Raises ValueError for negative numbers: floor division never
    brings a negative value to zero, so the loop would not terminate.
    """
    if num and num < 0:
        raise ValueError("cannot encode a negative number")

    data = ""
    base = len(alphabet)
    while num:
        num, remainder = divmod(num, base)
        data = alphabet[remainder] + data
    return data
37
38
39
def bdecode(data, alphabet="0123456789"):
    """Decode a base-N encoded string ( N = len(alphabet) )

    Each character contributes its index in 'alphabet' as one digit.
    Characters not found in 'alphabet' contribute -1 (str.find result).
    """
    radix = len(alphabet)
    digit_of = alphabet.find
    total = 0
    for char in data:
        total *= radix
        total += digit_of(char)
    return total
46
47
48
def decrypt_xor(encrypted, key, base64=True, fromhex=False):
    """XOR 'encrypted' with the repeating 'key' and return the text

    With 'base64' enabled the input is base64-decoded first; with
    'fromhex' enabled the (decoded) bytes are then read as hex digits.
    The XOR result is decoded to str with the default codec.
    """
    if base64:
        encrypted = binascii.a2b_base64(encrypted)
    if fromhex:
        encrypted = bytes.fromhex(encrypted.decode())

    keylen = len(key)
    plain = bytes(
        byte ^ key[pos % keylen]
        for pos, byte in enumerate(encrypted)
    )
    return plain.decode()
59
60
61
def advance(iterable, num):
    """Advance 'iterable' by 'num' steps

    Returns an iterator over 'iterable' with its first 'num'
    elements already consumed.
    """
    remaining = iter(iterable)
    # an empty slice [num:num] consumes 'num' items without yielding any
    skipper = itertools.islice(remaining, num, num)
    next(skipper, None)
    return remaining
66
67
68
def repeat(times):
    """Return an iterator that returns None

    It yields None 'times' times, or endlessly when 'times' is negative.
    """
    return (
        itertools.repeat(None)
        if times < 0 else
        itertools.repeat(None, times)
    )
73
74
75
def unique(iterable):
    """Yield unique elements from 'iterable' while preserving order

    Elements must be hashable; only the first occurrence is yielded.
    """
    known = set()
    for item in iterable:
        if item in known:
            continue
        known.add(item)
        yield item
83
84
85
def unique_sequence(iterable):
    """Yield sequentially unique elements from 'iterable'

    Consecutive runs of equal elements are collapsed into one element;
    equal elements that are not adjacent are all kept.
    """
    # private sentinel instead of None, so that a leading None element
    # is not mistaken for "nothing seen yet" and dropped
    unset = object()
    last = unset
    for element in iterable:
        if last is unset or element != last:
            last = element
            yield element
92
93
94
def contains(values, elements, separator=" "):
    """Returns True if at least one of 'elements' is contained in 'values'

    A string 'values' is split at 'separator' first, unless 'separator'
    is an empty value other than None. 'elements' may be a single
    value or a tuple/list of candidates.
    """
    if isinstance(values, str) and (separator or separator is None):
        values = values.split(separator)

    if isinstance(elements, (tuple, list)):
        return any(candidate in values for candidate in elements)
    return elements in values
106
107
108
def raises(cls):
    """Returns a function that raises 'cls' as exception

    Positional arguments given to the returned function are
    forwarded to the 'cls' constructor.
    """
    def wrap(*args):
        error = cls(*args)
        raise error
    return wrap
113
114
115
def identity(x, _=None):
    """Return the first argument unchanged; the second one is ignored"""
    return x
118
119
120
def true(_, __=None):
    """Return True regardless of the (one or two) arguments given"""
    return True
123
124
125
def false(_, __=None):
    """Return False regardless of the (one or two) arguments given"""
    return False
128
129
130
def noop(_=None):
    """Accept an optional argument, do nothing, and return None"""
    return None
132
133
134
def md5(s):
    """Generate MD5 hexdigest of 's'

    str input is encoded with the default codec; any falsy input
    is hashed as empty bytes.
    """
    if isinstance(s, str):
        # an empty str encodes to b"", same as the falsy fallback
        s = s.encode()
    elif not s:
        s = b""
    return hashlib.md5(s).hexdigest()
141
142
143
def sha1(s):
    """Generate SHA1 hexdigest of 's'

    str input is encoded with the default codec; any falsy input
    is hashed as empty bytes.
    """
    if isinstance(s, str):
        # an empty str encodes to b"", same as the falsy fallback
        s = s.encode()
    elif not s:
        s = b""
    return hashlib.sha1(s).hexdigest()
150
151
152
def generate_token(size=16):
    """Generate a random token with hexadecimal digits

    The token represents 'size' random bytes, i.e. 2 * size hex digits,
    drawn from the 'random' module's generator.
    """
    number = random.getrandbits(size * 8)
    raw = number.to_bytes(size, "big")
    return raw.hex()
155
156
157
def format_value(value, suffixes="kMGTPEZY"):
    """Shorten the decimal representation of 'value' with a unit suffix

    Values with fewer than four characters are returned as plain
    strings. Longer ones keep 1-3 leading digits, two further digits
    after a '.', and the suffix for their magnitude (1234 -> '1.23k').
    """
    digits = str(value)
    count = len(digits)
    if count < 4:
        return digits

    head = (count - 1) % 3 + 1
    suffix = suffixes[(count - 4) // 3]
    return f"{digits[:head]}.{digits[head:head+2]}{suffix}"
166
167
168
def combine_dict(a, b):
    """Recursively combine the contents of 'b' into 'a'

    Nested dicts present on both sides are merged key by key; every
    other value from 'b' overwrites the one in 'a'. 'a' is modified
    in place and also returned.
    """
    for key, incoming in b.items():
        both_dicts = (
            isinstance(incoming, dict) and
            isinstance(a.get(key), dict)
        )
        if both_dicts:
            combine_dict(a[key], incoming)
        else:
            a[key] = incoming
    return a
176
177
178
def transform_dict(a, func):
    """Recursively apply 'func' to all values in 'a'

    Nested dicts are descended into; all other values are replaced
    in place by func(value). Nothing is returned.
    """
    for key in a:
        current = a[key]
        if isinstance(current, dict):
            transform_dict(current, func)
        else:
            a[key] = func(current)
185
186
187
def filter_dict(a):
    """Return a copy of 'a' without "private" entries

    Entries are private when their key starts with an underscore.
    """
    # slice instead of index so that an empty key is kept, not an IndexError
    return {k: v for k, v in a.items() if k[:1] != "_"}
190
191
192
def delete_items(obj, keys):
    """Remove all 'keys' from 'obj'

    Keys not present in 'obj' are skipped.
    """
    for key in keys:
        if key not in obj:
            continue
        del obj[key]
197
198
199
def enumerate_reversed(iterable, start=0, length=None):
    """Enumerate 'iterable' and return its elements in reverse order

    Indices count up from 'start' in the original order. 'length'
    defaults to len(iterable) and must be given for inputs without
    len(). Reversible inputs return a lazy zip object; all others are
    collected into a list that is reversed afterwards.
    """
    if length is None:
        length = len(iterable)

    try:
        indices = range(start - 1 + length, start - 1, -1)
        backwards = reversed(iterable)
    except TypeError:
        # not reversible: enumerate forwards, then flip the result
        pairs = list(zip(range(start, start + length), iterable))
        pairs.reverse()
        return pairs

    return zip(indices, backwards)
211
212
213
def number_to_string(value, numbers=(int, float)):
    """Convert numbers (int, float) to string; Return everything else as is.

    Only the exact classes listed in 'numbers' are converted;
    subclasses such as bool are returned unchanged.
    """
    if value.__class__ in numbers:
        return str(value)
    return value
216
217
218
def to_string(value):
    """str() with "better" defaults

    Falsy values become an empty string and lists are joined with
    ", " (stringifying their elements if needed).
    """
    if not value:
        return ""
    if value.__class__ is not list:
        return str(value)

    try:
        return ", ".join(value)
    except Exception:
        # at least one element is not a str
        return ", ".join(map(str, value))
228
229
230
def json_default(obj):
    """JSON 'default' hook for objects the encoder cannot handle

    CustomNone instances are serialized as null,
    everything else as its str() representation.
    """
    return None if isinstance(obj, CustomNone) else str(obj)
234
235
236
# Shortcuts bound once at import time:
# 'json_loads' is the decode() method of the json module's shared
#   default decoder object (a private attribute of the 'json' module)
# 'json_dumps' is the encode() method of an encoder producing compact
#   output (no whitespace after separators), without circular reference
#   checks, using json_default() for otherwise unserializable objects
json_loads = json._default_decoder.decode
json_dumps = json.JSONEncoder(
    check_circular=False,
    separators=(",", ":"),
    default=json_default,
).encode
242
243
244
def dump_json(obj, fp=sys.stdout, ensure_ascii=True, indent=4):
    """Serialize 'obj' as JSON and write it to 'fp'

    Keys are sorted, unserializable objects go through json_default(),
    and a trailing newline is written after the document.
    """
    options = {
        "ensure_ascii": ensure_ascii,
        "indent": indent,
        "default": json_default,
        "sort_keys": True,
    }
    json.dump(obj, fp, **options)
    fp.write("\n")
254
255
256
def dump_response(response, fp, headers=False, content=True, hide_auth=True):
    """Write the contents of 'response' into a file-like object

    'fp' receives bytes. With 'headers' enabled, a text report with
    request line, status, request headers, request body (if any) and
    response headers is written first. With 'content' enabled, the
    response body follows. 'hide_auth' masks the values of the
    Authorization, Cookie and Set-Cookie headers in that report.
    """

    if headers:
        request = response.request
        # copies, so that masking does not alter the original headers
        req_headers = request.headers.copy()
        res_headers = response.headers.copy()

        if hide_auth:
            if authorization := req_headers.get("Authorization"):
                # keep the scheme ("Basic", "Bearer", ...) when there is one
                atype, sep, _ = str(authorization).partition(" ")
                req_headers["Authorization"] = f"{atype} ***" if sep else "***"

            if cookie := req_headers.get("Cookie"):
                # keep cookie names, mask their values
                req_headers["Cookie"] = ";".join(
                    c.partition("=")[0] + "=***"
                    for c in cookie.split(";")
                )

            if set_cookie := res_headers.get("Set-Cookie"):
                # 're' is the module-level alias for text.re, not the
                # stdlib module; its result is used like a compiled pattern
                res_headers["Set-Cookie"] = re(r"(^|, )([^ =]+)=[^,;]*").sub(
                    r"\1\2=***", set_cookie)

        request_headers = "\n".join(
            f"{name}: {value}"
            for name, value in req_headers.items()
        )
        response_headers = "\n".join(
            f"{name}: {value}"
            for name, value in res_headers.items()
        )

        # the report text starts at column 0 on purpose:
        # everything inside the triple quotes is written verbatim
        output = f"""\
{request.method} {request.url}
Status: {response.status_code} {response.reason}

Request Headers
---------------
{request_headers}
"""
        if request.body:
            output = f"""{output}
Request Body
------------
{request.body}
"""
        output = f"""{output}
Response Headers
----------------
{response_headers}
"""
        fp.write(output.encode())

    if content:
        if headers:
            # separate the report from the response body
            fp.write(b"\nContent\n-------\n")
        fp.write(response.content)
313
314
315
def extract_headers(response):
    """Return the headers of 'response' as dict with extra metadata

    A quoted 'filename' in Content-Disposition is passed to
    text.nameext_from_url() together with the result dict, and a
    parsable Last-Modified header is stored as datetime under 'date'
    (its first six fields; the timezone offset is not applied).
    """
    headers = response.headers
    data = dict(headers)

    if hcd := headers.get("content-disposition"):
        if name := text.extr(hcd, 'filename="', '"'):
            text.nameext_from_url(name, data)

    if hlm := headers.get("last-modified"):
        # parsedate_tz() returns None for values it cannot parse;
        # skip 'date' in that case instead of failing on None[:6]
        if parsed := parsedate_tz(hlm):
            data["date"] = dt.datetime(*parsed[:6])

    return data
327
328
329
def detect_challenge(response):
    """Return a description of the anti-bot challenge in 'response'

    Recognizes Cloudflare (status 403/503) and DDoS-Guard (status 403)
    by the 'server' header plus markers in headers or body.
    Returns None when no challenge is detected.
    """
    server = response.headers.get("server")
    if not server:
        return None

    if server.startswith("cloudflare"):
        if response.status_code not in (403, 503):
            return None

        mitigated = response.headers.get("cf-mitigated")
        if mitigated and mitigated.lower() == "challenge":
            return "Cloudflare challenge"

        body = response.content
        if b"_cf_chl_opt" in body or b"jschl-answer" in body:
            return "Cloudflare challenge"
        if b'name="captcha-bypass"' in body:
            return "Cloudflare CAPTCHA"
        return None

    if server.startswith("ddos-guard"):
        blocked = response.status_code == 403
        if blocked and b"/ddos-guard/js-challenge/" in response.content:
            return "DDoS-Guard challenge"

    return None
352
353
354
@functools.lru_cache(maxsize=None)
def git_head():
    """Return the abbreviated commit hash of the surrounding git checkout

    Runs 'git rev-parse --short HEAD' in this file's directory.
    Returns None if git cannot be started, prints anything to stderr,
    or produces no output. The result is cached after the first call.
    """
    command = ("git", "rev-parse", "--short", "HEAD")
    try:
        process = Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=os.path.dirname(os.path.abspath(__file__)),
        )
        out, err = process.communicate()
        if out and not err:
            return out.decode().rstrip()
    except (OSError, subprocess.SubprocessError):
        pass
    return None
368
369
370
def expand_path(path):
    """Expand environment variables and tildes (~)

    Falsy input is returned unchanged. Non-string input is treated
    as a sequence of path components and joined first.
    """
    if not path:
        return path

    joined = path if isinstance(path, str) else os.path.join(*path)
    with_home = os.path.expanduser(joined)
    return os.path.expandvars(with_home)
377
378
379
def remove_file(path):
    """Delete the file at 'path', ignoring any OSError"""
    try:
        os.unlink(path)
    except OSError:
        return None
384
385
386
def remove_directory(path):
    """Remove the directory at 'path', ignoring any OSError"""
    try:
        os.rmdir(path)
    except OSError:
        return None
391
392
393
def set_mtime(path, mtime):
    """Set the modification time of 'path' to 'mtime'

    'mtime' is a timestamp or an RFC 2822 style date string. The access
    time is set to the current time. Any failure is silently ignored.
    """
    try:
        timestamp = (
            mktime_tz(parsedate_tz(mtime))
            if isinstance(mtime, str) else
            mtime
        )
        os.utime(path, (time.time(), timestamp))
    except Exception:
        pass
400
401
402
def cookiestxt_load(fp):
    """Parse a Netscape cookies.txt file and return its Cookies

    'fp' is iterated line by line. Blank lines and lines starting
    with '#' or '$' are skipped, except for the '#HttpOnly_' prefix,
    which is stripped. Every remaining line must consist of exactly
    seven tab-separated fields. Returns a list of Cookie objects.
    """
    cookies = []

    for raw in fp:
        entry = raw.lstrip(" ")

        # strip '#HttpOnly_'
        if entry.startswith("#HttpOnly_"):
            entry = entry[10:]
        # ignore empty lines and comments
        if not entry or entry.startswith(("#", "$", "\n")):
            continue
        # strip trailing '\n'
        if entry.endswith("\n"):
            entry = entry[:-1]

        (domain, domain_specified, path,
         secure, expires, name, value) = entry.split("\t")

        # a cookie without a value is stored with an empty name field
        if not name:
            name, value = value, None

        initial_dot = domain[0] == "." if domain else False
        expiry = None if expires == "0" or not expires else expires

        cookies.append(Cookie(
            0, name, value,
            None, False,
            domain,
            domain_specified == "TRUE",
            initial_dot,
            path, False,
            secure == "TRUE",
            expiry,
            False, None, None, {},
        ))

    return cookies
439
440
441
def cookiestxt_store(fp, cookies):
    """Write 'cookies' in Netscape cookies.txt format to 'fp'

    Cookies without a domain are skipped. A cookie whose value is
    None is written with an empty name field and its name as value.
    A missing expiration time is written as '0'.
    """
    fp.write("# Netscape HTTP Cookie File\n\n")

    for cookie in cookies:
        domain = cookie.domain
        if not domain:
            continue

        if cookie.value is None:
            name, value = "", cookie.name
        else:
            name, value = cookie.name, cookie.value

        subdomains = "TRUE" if domain[0] == "." else "FALSE"
        secure = "TRUE" if cookie.secure else "FALSE"
        expires = "0" if cookie.expires is None else str(cookie.expires)

        fp.write(
            f"{domain}\t{subdomains}\t{cookie.path}\t"
            f"{secure}\t{expires}\t{name}\t{value}\n"
        )
466
467
468
def code_to_language(code, default=None):
    """Map an ISO 639-1 language code to its actual name

    The lookup in CODES ignores case; unknown or empty codes
    yield 'default'.
    """
    key = (code or "").lower()
    return CODES.get(key, default)
471
472
473
def language_to_code(lang, default=None):
    """Map a language name to its ISO 639-1 code

    'lang' is capitalized before it is compared against the names
    in CODES; None or an unknown name yields 'default'.
    """
    if lang is None:
        return default

    wanted = lang.capitalize()
    matches = (
        code
        for code, language in CODES.items()
        if language == wanted
    )
    return next(matches, default)
482
483
484
# Two-letter language code -> capitalized English language name
# (lookup table for code_to_language() and language_to_code())
CODES = {
    "ar": "Arabic",
    "bg": "Bulgarian",
    "bn": "Bengali",
    "ca": "Catalan",
    "cs": "Czech",
    "da": "Danish",
    "de": "German",
    "el": "Greek",
    "en": "English",
    "es": "Spanish",
    "fa": "Persian",
    "fi": "Finnish",
    "fr": "French",
    "he": "Hebrew",
    "hi": "Hindi",
    "hu": "Hungarian",
    "id": "Indonesian",
    "it": "Italian",
    "ja": "Japanese",
    "ko": "Korean",
    "ms": "Malay",
    "nl": "Dutch",
    "no": "Norwegian",
    "pl": "Polish",
    "pt": "Portuguese",
    "ro": "Romanian",
    "ru": "Russian",
    "sk": "Slovak",
    "sl": "Slovenian",
    "sr": "Serbian",
    "sv": "Swedish",
    "th": "Thai",
    "tr": "Turkish",
    "uk": "Ukrainian",
    "vi": "Vietnamese",
    "zh": "Chinese",
}
522
523
524
def HTTPBasicAuth(username, password):
    """Return a callable that adds HTTP Basic authentication to a request

    The 'Authorization' header value is computed once, as bytes, from
    'username:password' encoded as latin1. The returned function sets
    that header on the request it is given and returns the request.
    """
    credentials = f"{username}:{password}".encode("latin1")
    # drop the plain-text names right after they have been used
    del username, password
    authorization = b"Basic " + binascii.b2a_base64(
        credentials, newline=False)
    del credentials

    def _apply(request):
        request.headers["Authorization"] = authorization
        return request
    return _apply
533
534
535
class ModuleProxy():
    """Access modules by name through item or attribute lookup

    A module that is not loaded yet is imported on first access.
    When that import fails, NONE is stored in the module table under
    the requested name and returned instead.
    """
    __slots__ = ()

    def __getitem__(self, key, modules=sys.modules):
        # fast path: module is already loaded
        try:
            return modules[key]
        except KeyError:
            pass

        try:
            __import__(key)
        except ImportError:
            placeholder = modules[key] = NONE
            return placeholder
        else:
            return modules[key]

    __getattr__ = __getitem__
551
552
553
class LazyPrompt():
    """Object whose str() conversion prompts for a password

    The getpass prompt is shown each time the object is converted
    to a string; nothing is stored.
    """
    __slots__ = ()

    def __str__(self):
        return getpass.getpass()
558
559
560
class NullContext():
    """Context manager that does nothing

    Entering yields None; exceptions raised inside the 'with' block
    are not suppressed.
    """
    __slots__ = ()

    def __enter__(self):
        return None

    def __exit__(self, exc_type, exc_value, traceback):
        return None
568
569
570
class NullResponse():
    """Placeholder response object with neutral values

    Only 'url' and 'reason' are stored per instance; all other
    attributes are class-level constants describing an empty,
    unsuccessful response with the made-up status code 900.
    """
    __slots__ = ("url", "reason")

    # class-level constants shared by all instances
    ok = is_redirect = is_permanent_redirect = False
    cookies = headers = history = links = {}
    encoding = apparent_encoding = "utf-8"
    content = b""
    text = ""
    status_code = 900
    close = noop

    def __init__(self, url, reason=""):
        self.url = url
        self.reason = str(reason)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        return None

    def __str__(self):
        return "900 " + self.reason

    def json(self):
        return dict()
596
597
598
class CustomNone():
    """None-style type that supports more operations than regular None"""
    __slots__ = ()

    # attribute access, indexing and iter() all return the object itself
    # (identity() returns its first argument, i.e. 'self')
    __getattribute__ = identity
    __getitem__ = identity
    __iter__ = identity

    def __call__(self, *args, **kwargs):
        # calling the object returns the object
        return self

    def __next__(self):
        # as an iterator, the object is always exhausted
        raise StopIteration

    def __eq__(self, other):
        # equal to itself and to the real None
        return other is self or other is None

    def __ne__(self, other):
        return other is not self and other is not None

    # ordering: always "less than (or equal to)" the other operand
    __lt__ = true
    __le__ = true
    __gt__ = false
    __ge__ = false
    __bool__ = false

    # binary arithmetic operators return the object itself
    __add__ = identity
    __sub__ = identity
    __mul__ = identity
    __matmul__ = identity
    __truediv__ = identity
    __floordiv__ = identity
    __mod__ = identity

    # reflected arithmetic operators
    __radd__ = identity
    __rsub__ = identity
    __rmul__ = identity
    __rmatmul__ = identity
    __rtruediv__ = identity
    __rfloordiv__ = identity
    __rmod__ = identity

    # bitwise operators
    __lshift__ = identity
    __rshift__ = identity
    __and__ = identity
    __xor__ = identity
    __or__ = identity

    # reflected bitwise operators
    __rlshift__ = identity
    __rrshift__ = identity
    __rand__ = identity
    __rxor__ = identity
    __ror__ = identity

    # unary operators
    __neg__ = identity
    __pos__ = identity
    __abs__ = identity
    __invert__ = identity

    def __len__(self):
        return 0

    # int(), hash() and index conversion all yield 0 as well
    __int__ = __len__
    __hash__ = __len__
    __index__ = __len__

    def __format__(self, _):
        # any format specification is ignored
        return "None"

    def __str__(self):
        return "None"

    __repr__ = __str__
671
672
673
class Flags():
    """Container for control flags (FILE, POST, CHILD, DOWNLOAD)

    Each flag is None while unset. process() turns the value
    of a flag into the action it stands for.
    """

    def __init__(self):
        for name in ("FILE", "POST", "CHILD", "DOWNLOAD"):
            setattr(self, name, None)

    def process(self, flag):
        """Act on the current value of 'flag'

        Returns "skip" if the value is False (the flag stays set).
        Any other value resets the flag to None and raises the
        matching exception: AbortExtraction for "abort",
        TerminateExtraction for "terminate", RestartExtraction for
        "restart", and StopExtraction for everything else.
        """
        state = vars(self)
        value = state[flag]
        if value is False:  # flag was set to "skip"
            return "skip"
        state[flag] = None

        if value == "abort":
            raise exception.AbortExtraction()
        elif value == "terminate":
            raise exception.TerminateExtraction()
        elif value == "restart":
            raise exception.RestartExtraction()
        raise exception.StopExtraction()
691
692
693
# Browser version estimates for the default User-Agent strings.
# Both formulas assume one major release every 28 days.
#
# Earlier calibration, kept for reference (superseded by the code below):
# v137.0 release of Firefox on 2025-04-01 has ordinal 739342
# 735506 == 739342 - 137 * 28
# v135.0 release of Chrome on 2025-04-01 has ordinal 739342
# 735562 == 739342 - 135 * 28
# _ord_today = dt.date.today().toordinal()
# _ff_ver = (_ord_today - 735506) // 28
# _ch_ver = (_ord_today - 735562) // 28

# computed once at import time from today's date
_ord_today = dt.date.today().toordinal()
_ff_ver = (_ord_today - 735_513) // 28  # 147 on 2026-01-13
_ch_ver = (_ord_today - 735_599) // 28  # 143 on 2025-12-18

# regex helpers re-exported from the 'text' module
# (note that 're' is text.re, not the standard library module)
re = text.re
re_compile = text.re_compile

NONE = CustomNone()    # shared CustomNone instance
FLAGS = Flags()        # shared Flags instance
WINDOWS = (os.name == "nt")
SENTINEL = object()    # unique marker object
# value of sys.frozen if that attribute exists, False otherwise
EXECUTABLE = getattr(sys, "frozen", False)
SPECIAL_EXTRACTORS = {"oauth", "recursive", "generic"}

# filename extensions (without leading dot) grouped by kind of file
EXTS_IMAGE = {"jpg", "jpeg", "png", "gif", "bmp", "svg", "psd", "ico",
              "webp", "avif", "heic", "heif"}
EXTS_VIDEO = {"mp4", "m4v", "mov", "webm", "mkv", "ogv", "flv", "avi", "wmv"}
EXTS_ARCHIVE = {"zip", "rar", "7z", "tar", "gz", "bz2", "lzma", "xz"}

# User-Agent strings; the browser ones embed the estimated versions
USERAGENT_GALLERYDL = "gallery-dl/" + version.__version__
USERAGENT_FIREFOX = (f"Mozilla/5.0 (Windows NT 10.0; Win64; x64; "
                     f"rv:{_ff_ver}.0) Gecko/20100101 Firefox/{_ff_ver}.0")
USERAGENT_CHROME = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    f"Chrome/{_ch_ver}.0.0.0 Safari/537.36")
726
727
# Default global namespace for expressions compiled by the
# compile_expression_*() functions below
GLOBALS = {
    "contains" : contains,
    "parse_int": text.parse_int,
    "urlsplit" : urllib.parse.urlsplit,
    "datetime" : dt.datetime,
    "timedelta": dt.timedelta,
    # note the naming: abort() raises StopExtraction,
    # error() raises AbortExtraction
    "abort"    : raises(exception.StopExtraction),
    "error"    : raises(exception.AbortExtraction),
    "terminate": raises(exception.TerminateExtraction),
    "restart"  : raises(exception.RestartExtraction),
    "hash_sha1": sha1,
    "hash_md5" : md5,
    # std.<name> / std["<name>"] gives access to importable modules
    "std"      : ModuleProxy(),
    "re"       : text.re_module,
    "exts_image"  : EXTS_IMAGE,
    "exts_video"  : EXTS_VIDEO,
    "exts_archive": EXTS_ARCHIVE,
}
745
746
747
# Select the Popen implementation for spawning subprocesses.
# In a bundle that sets sys._MEIPASS (PyInstaller), child processes get
# an environment with the library search paths restored to their original
# values; otherwise subprocess.Popen is used directly.
if EXECUTABLE and hasattr(sys, "_MEIPASS"):
    # https://github.com/pyinstaller/pyinstaller/blob/develop/doc
    # /runtime-information.rst#ld_library_path--libpath-considerations
    _popen_env = os.environ.copy()

    # restore the saved value, or drop the variable if none was saved
    orig = _popen_env.get("LD_LIBRARY_PATH_ORIG")
    if orig is None:
        _popen_env.pop("LD_LIBRARY_PATH", None)
    else:
        _popen_env["LD_LIBRARY_PATH"] = orig

    # same for the DYLD_ variant
    orig = _popen_env.get("DYLD_LIBRARY_PATH_ORIG")
    if orig is None:
        _popen_env.pop("DYLD_LIBRARY_PATH", None)
    else:
        _popen_env["DYLD_LIBRARY_PATH"] = orig

    del orig

    def Popen(args, **kwargs):
        """subprocess.Popen() with the cleaned-up environment

        Any 'env' passed by the caller is replaced by the environment
        snapshot taken at import time.
        """
        kwargs["env"] = _popen_env
        return subprocess.Popen(args, **kwargs)
else:
    Popen = subprocess.Popen
771
772
773
def compile_expression_raw(expr, name="<expr>", globals=None):
    """Compile 'expr' into a callable without any error handling

    The returned callable accepts an optional dict of local names and
    evaluates the expression with it. A falsy 'globals' selects the
    module-level GLOBALS namespace.
    """
    compiled = compile(expr, name, "eval")
    namespace = globals or GLOBALS
    return functools.partial(eval, compiled, namespace)
776
777
778
def compile_expression_defaultdict(expr, name="<expr>", globals=None):
    """Compile 'expr' with a globals dict that resolves unknown names to NONE

    First-call initializer: it builds the module-level GLOBALS_DEFAULT
    mapping, rebinds the name 'compile_expression_defaultdict' to
    compile_expression_defaultdict_impl(), so that later calls skip
    this setup, and then delegates to that implementation.
    """
    global GLOBALS_DEFAULT

    if isinstance(__builtins__, dict):
        # cpython
        GLOBALS_DEFAULT = collections.defaultdict(lambda n=NONE: n, GLOBALS)
    else:
        # pypy3 - insert __builtins__ symbols into globals dict
        GLOBALS_DEFAULT = collections.defaultdict(
            lambda n=NONE: n, __builtins__.__dict__)
        GLOBALS_DEFAULT.update(GLOBALS)

    # replace this function with the plain implementation
    global compile_expression_defaultdict
    compile_expression_defaultdict = compile_expression_defaultdict_impl
    return compile_expression_defaultdict_impl(expr, name, globals)
793
794
795
def compile_expression_defaultdict_impl(expr, name="<expr>", globals=None):
    """Compile 'expr' into a callable using GLOBALS_DEFAULT as fallback

    The returned callable accepts an optional dict of local names.
    A falsy 'globals' selects the module-level GLOBALS_DEFAULT mapping,
    which has to be initialized before this function is used that way.
    """
    compiled = compile(expr, name, "eval")
    namespace = globals or GLOBALS_DEFAULT
    return functools.partial(eval, compiled, namespace)
798
799
800
def compile_expression_tryexcept(expr, name="<expr>", globals=None):
    """Compile 'expr' into a callable that returns NONE on errors

    The returned callable accepts an optional dict of local names.
    GalleryDLException raised during evaluation is propagated; any
    other Exception makes the call return NONE. When 'globals' is
    None, the module-level GLOBALS namespace is used.
    """
    compiled = compile(expr, name, "eval")
    namespace = GLOBALS if globals is None else globals

    def _eval(locals=None):
        try:
            return eval(compiled, namespace, locals)
        except exception.GalleryDLException:
            raise
        except Exception:
            return NONE

    return _eval
814
815
816
# default expression compiler: evaluation errors result in NONE
compile_expression = compile_expression_tryexcept
817
818
819
def compile_filter(expr, name="<filter>", globals=None):
    """Compile a filter expression with compile_expression()

    'expr' is a single expression string or an iterable of strings;
    multiple expressions are each parenthesized and joined with 'and'.
    """
    if isinstance(expr, str):
        source = expr
    else:
        source = "(" + ") and (".join(expr) + ")"
    return compile_expression(source, name, globals)
823
824
825
def import_file(path):
    """Import a Python module from a filesystem path

    The module name is the file name without its last extension.
    With a directory part, that directory is expanded and temporarily
    put at the front of sys.path for the import. Without one, the
    name is imported from the regular search path, with '-' replaced
    by '_'.
    """
    directory, filename = os.path.split(path)

    stem, dot, tail = filename.rpartition(".")
    # without a '.', rpartition() puts the whole name into 'tail'
    module_name = stem if dot else tail

    if not directory:
        return __import__(module_name.replace("-", "_"))

    directory = expand_path(directory)
    sys.path.insert(0, directory)
    try:
        return __import__(module_name)
    finally:
        del sys.path[0]
842
843
844
def build_selection_func(value, min=0.0, conv=float):
    """Build a function returning a fixed or randomly selected number

    'value' is a single number, a 'lower-upper' string, or a
    (lower, upper) pair. Results never go below 'min'.

    Returns None for a falsy 'value' and falsy 'min', a function
    returning a constant for single values, or a function picking a
    random number between lower and upper bound (random.uniform for
    float bounds, random.randint otherwise).
    """
    if not value:
        return (lambda: min) if min else None

    if isinstance(value, str):
        lower, _, upper = value.partition("-")
    else:
        try:
            lower, upper = value
        except TypeError:
            # not a pair: single value without upper bound
            lower, upper = value, None
    lower = conv(lower)

    if not upper:
        if lower < min:
            lower = min
        return lambda: lower

    upper = conv(upper)
    pick = random.uniform if lower.__class__ is float else random.randint
    return functools.partial(
        pick,
        lower if lower > min else min,
        upper if upper > min else min,
    )
870
871
872
# alias of build_selection_func()
build_duration_func = build_selection_func
873
874
875
def build_extractor_filter(categories, negate=True, special=None):
    """Build a function that takes an Extractor class as argument
    and returns True if that class is allowed by 'categories'

    'categories' is a comma-separated string or an iterable of
    'category', 'category:subcategory' or '*:subcategory' items.
    With 'negate' enabled the items form a blacklist, otherwise a
    whitelist. Names in 'special' are added to the category set.
    """
    if isinstance(categories, str):
        categories = categories.split(",")

    catset = set()  # set of categories / basecategories
    subset = set()  # set of subcategories
    catsub = []     # list of category-subcategory pairs

    for item in categories:
        category, _, subcategory = item.partition(":")
        has_category = bool(category) and category != "*"
        has_subcategory = bool(subcategory) and subcategory != "*"

        if has_category and has_subcategory:
            catsub.append((category, subcategory))
        elif has_category:
            catset.add(category)
        elif has_subcategory:
            subset.add(subcategory)

    if special:
        catset |= special
    elif not (catset or subset or catsub):
        # nothing to test against
        return true if negate else false

    tests = []

    if catset:
        if negate:
            tests.append(lambda extr:
                         extr.category not in catset and
                         extr.basecategory not in catset)
        else:
            tests.append(lambda extr:
                         extr.category in catset or
                         extr.basecategory in catset)

    if subset:
        if negate:
            tests.append(lambda extr: extr.subcategory not in subset)
        else:
            tests.append(lambda extr: extr.subcategory in subset)

    if catsub:
        def test(extr):
            for category, subcategory in catsub:
                if subcategory != extr.subcategory:
                    continue
                if category == extr.category or \
                        category == extr.basecategory:
                    return not negate
            return negate
        tests.append(test)

    if len(tests) == 1:
        return tests[0]
    combine = all if negate else any
    return lambda extr: combine(t(extr) for t in tests)
934
935
936
def build_proxy_map(proxies, log=None):
    """Generate a proxy map

    'proxies' is a single proxy URL, used for both 'http' and 'https',
    or a dict mapping schemes to proxy URLs, which is updated in place.
    URLs without '://' get an 'http://' prefix.

    Returns the proxy dict, or None for empty or invalid input.
    If 'log' is given, the result is reported with log.debug() and
    invalid input with log.warning().
    """
    if not proxies:
        return None

    # remember the input: 'proxies' is overwritten for invalid values,
    # and the warning below needs to show what was actually given
    specifier = proxies

    if isinstance(proxies, str):
        if "://" not in proxies:
            proxies = "http://" + proxies.lstrip("/")
        proxies = {"http": proxies, "https": proxies}
    elif isinstance(proxies, dict):
        for scheme, proxy in proxies.items():
            if "://" not in proxy:
                proxies[scheme] = "http://" + proxy.lstrip("/")
    else:
        proxies = None

    if log is not None:
        if proxies is None:
            log.warning("Invalid proxy specifier: %r", specifier)
        else:
            log.debug("Proxy Map: %s", proxies)

    return proxies
959
960
961
def predicate_build(predicates):
    """Combine 'predicates' into a single predicate function

    No predicates yield the always-True function, a single predicate
    is returned as is, and several predicates are chained so that
    the result is True only if every one of them returns a truthy
    value (evaluation stops at the first failure).
    """
    if not predicates:
        return true

    if len(predicates) == 1:
        return predicates[0]

    def chain(url, kwdict):
        return all(pred(url, kwdict) for pred in predicates)
    return chain
974
975
976
def predicate_unique():
    """Predicate; True if given URL has not been encountered before

    URLs starting with 'text:' always pass and are not remembered.
    Each predicate returned by this function has its own history.
    """
    seen = set()

    def _pred(url, _):
        if url.startswith("text:"):
            return True
        if url in seen:
            return False
        seen.add(url)
        return True

    return _pred
987
988
989
def predicate_filter(expr, target="image"):
    """Predicate; True if evaluating the given expression returns True

    'expr' is compiled once with compile_filter(), using
    '<TARGET filter>' as its name. GalleryDLException raised during
    evaluation is propagated; any other Exception is wrapped in
    exception.FilterError.
    """
    compiled = compile_filter(expr, f"<{target} filter>")

    def _pred(_, kwdict):
        try:
            return compiled(kwdict)
        except exception.GalleryDLException:
            raise
        except Exception as exc:
            raise exception.FilterError(exc)

    return _pred
1000
1001
1002
def predicate_range(ranges, skip=None, flag=None):
    """Predicate; True if the current index is in the given range(s)

    'ranges' is a range specification for predicate_range_parse().
    The returned predicate counts its calls, starting at 1, and raises
    exception.StopExtraction once the largest index has been passed.

    'skip', if given, is called once with the smallest range start
    (when that is greater than 1); its return value becomes the
    initial call counter.
    'flag', if given, names a FLAGS attribute (case-insensitive) that
    is set to "stop" when the largest index is reached.
    """
    if ranges := predicate_range_parse(ranges):
        # technically wrong for 'step > 2', but good enough for now
        # and evaluating min/max for a large range is slow
        upper = max(r.stop for r in ranges) - 1
        lower = min(r.start for r in ranges)
        index = 0 if skip is None or lower <= 1 else skip(lower)
        del lower
    else:
        # no ranges: the predicate stops on its first call
        index = upper = 0

    if flag is None:
        def _pred(_url, _kwdict):
            nonlocal index

            # stop on the first call after the last wanted index
            if index >= upper:
                raise exception.StopExtraction()
            index += 1

            for range in ranges:
                if index in range:
                    return True
            return False
    else:
        def _pred(_url, _kwdict):
            nonlocal index

            index += 1
            if index >= upper:
                if index > upper:
                    raise exception.StopExtraction()
                # last wanted index: set the flag, but still test this item
                FLAGS.__dict__[flag.upper()] = "stop"

            for range in ranges:
                if index in range:
                    return True
            return False
    return _pred
1041
1042
1043
def predicate_range_parse(rangespec):
    """Parse an integer range string and return the resulting ranges

    'rangespec' is a comma-separated string, a single int, or an
    iterable of group strings. Each group is a single index 'N', an
    inclusive range 'A-B', or a slice-like 'A:B[:STEP]' with exclusive
    stop. A missing start defaults to 1, a missing stop to sys.maxsize.
    Empty and whitespace-only groups are ignored.

    Returns a list of range objects.

    Examples (shown as range arguments):
        _parse("-2,4,6-8,10-")      -> [(1,3), (4,5), (6,9), (10,INTMAX)]
        _parse(" - 3 , 4-  4, 2-6") -> [(1,4), (4,5), (2,7)]
        _parse("1:2,4:8:2")         -> [(1,2), (4,8,2)]
    """
    ranges = []

    if isinstance(rangespec, str):
        rangespec = rangespec.split(",")
    elif isinstance(rangespec, int):
        rangespec = (str(rangespec),)

    for group in rangespec:
        # treat whitespace-only groups ("1, ,3") like empty ones
        # instead of failing on int(" ")
        if not group or (isinstance(group, str) and group.isspace()):
            continue

        elif ":" in group:
            start, _, stop = group.partition(":")
            stop, _, step = stop.partition(":")
            ranges.append(range(
                int(start) if start.strip() else 1,
                int(stop) if stop.strip() else sys.maxsize,
                int(step) if step.strip() else 1,
            ))

        elif "-" in group:
            start, _, stop = group.partition("-")
            ranges.append(range(
                int(start) if start.strip() else 1,
                # 'A-B' includes B
                int(stop) + 1 if stop.strip() else sys.maxsize,
            ))

        else:
            start = int(group)
            ranges.append(range(start, start+1))

    return ranges
1083
1084