Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
mikf
GitHub Repository: mikf/gallery-dl
Path: blob/master/gallery_dl/util.py
5457 views
1
# -*- coding: utf-8 -*-
2
3
# Copyright 2017-2025 Mike Fährmann
4
#
5
# This program is free software; you can redistribute it and/or modify
6
# it under the terms of the GNU General Public License version 2 as
7
# published by the Free Software Foundation.
8
9
"""Utility functions and classes"""
10
11
import os
12
import sys
13
import json
14
import time
15
import random
16
import getpass
17
import hashlib
18
import binascii
19
import datetime
20
import functools
21
import itertools
22
import subprocess
23
import collections
24
import urllib.parse
25
from http.cookiejar import Cookie
26
from email.utils import mktime_tz, parsedate_tz
27
from . import text, version, exception
28
29
30
def bencode(num, alphabet="0123456789"):
    """Encode a non-negative integer into a base-N encoded string

    N is len(alphabet); a digit with value i is written as alphabet[i].
    The most significant digit comes first.

    Returns an empty string when 'num' is 0 (or otherwise falsy).
    Raises ValueError when 'num' is negative.
    """
    if num and num < 0:
        # divmod() floors towards negative infinity, so a negative 'num'
        # never reaches 0 and the loop below would run forever
        raise ValueError("cannot encode a negative number")

    base = len(alphabet)
    digits = []
    while num:
        num, remainder = divmod(num, base)
        digits.append(alphabet[remainder])

    # digits were collected least-significant first
    digits.reverse()
    return "".join(digits)
38
39
40
def bdecode(data, alphabet="0123456789"):
    """Decode a base-N encoded string ( N = len(alphabet) )

    Each character of 'data' is mapped to its index in 'alphabet'
    via str.find(), i.e. characters not in 'alphabet' count as -1.
    """
    radix = len(alphabet)
    index_of = alphabet.find

    total = 0
    for char in data:
        total *= radix
        total += index_of(char)
    return total
47
48
49
def decrypt_xor(encrypted, key, base64=True, fromhex=False):
    """XOR 'encrypted' with a repeating 'key' and return the decoded text

    When 'base64' is true, 'encrypted' is base64-decoded first.
    When 'fromhex' is true, the (possibly base64-decoded) bytes are then
    interpreted as a string of hexadecimal digits.
    """
    payload = encrypted
    if base64:
        payload = binascii.a2b_base64(payload)
    if fromhex:
        payload = bytes.fromhex(payload.decode())

    key_len = len(key)
    plain = bytes([
        byte ^ key[pos % key_len]
        for pos, byte in enumerate(payload)
    ])
    return plain.decode()
60
61
62
def advance(iterable, num):
    """Advance 'iterable' by 'num' steps

    Returns an iterator over 'iterable'
    with its first 'num' elements already consumed.
    """
    iterator = iter(iterable)
    # feeding the slice into a zero-length deque consumes 'num' items
    # without storing any of them
    collections.deque(itertools.islice(iterator, num), maxlen=0)
    return iterator
67
68
69
def repeat(times):
    """Return an iterator that returns None

    A negative 'times' yields None endlessly;
    otherwise None is yielded exactly 'times' times.
    """
    if times >= 0:
        return itertools.repeat(None, times)
    return itertools.repeat(None)
74
75
76
def unique(iterable):
    """Yield unique elements from 'iterable' while preserving order

    Elements must be hashable; only the first occurrence is yielded.
    """
    encountered = set()
    for item in iterable:
        if item in encountered:
            continue
        encountered.add(item)
        yield item
84
85
86
def unique_sequence(iterable):
    """Yield sequentially unique elements from 'iterable'

    Consecutive runs of equal elements are collapsed into one element;
    equal elements separated by a different one are all kept.
    """
    # Start from a private sentinel instead of None, so that a leading
    # None element (or run of None elements) is yielded like any other value
    last = object()
    for element in iterable:
        if element != last:
            last = element
            yield element
93
94
95
def contains(values, elements, separator=" "):
    """Returns True if at least one of 'elements' is contained in 'values'

    A string 'values' is split on 'separator' first, unless 'separator'
    is a falsy value other than None (e.g. an empty string).
    An 'elements' argument that is neither tuple nor list
    is tested as a single element.
    """
    should_split = separator or separator is None
    if should_split and isinstance(values, str):
        values = values.split(separator)

    if isinstance(elements, (tuple, list)):
        return any(element in values for element in elements)
    return elements in values
107
108
109
def raises(cls):
    """Returns a function that raises 'cls' as exception

    The returned function forwards its positional arguments
    to the 'cls' constructor.
    """
    def raiser(*args):
        error = cls(*args)
        raise error
    return raiser
114
115
116
def identity(x, _=None):
    """Returns its argument

    The optional second argument is accepted and ignored.
    """
    return x


def true(_, __=None):
    """Always returns True

    Accepts one or two arguments and ignores them.
    """
    return True


def false(_, __=None):
    """Always returns False

    Accepts one or two arguments and ignores them.
    """
    return False


def noop(_=None):
    """Does nothing

    Accepts an optional argument, ignores it, and returns None.
    """
133
134
135
def md5(s):
    """Generate MD5 hexdigest of 's'

    Falsy input is hashed as empty bytes;
    str input is encoded with str.encode() first.
    """
    if not s:
        payload = b""
    elif isinstance(s, str):
        payload = s.encode()
    else:
        payload = s
    digest = hashlib.md5(payload)
    return digest.hexdigest()
142
143
144
def sha1(s):
    """Generate SHA1 hexdigest of 's'

    Falsy input is hashed as empty bytes;
    str input is encoded with str.encode() first.
    """
    if not s:
        payload = b""
    elif isinstance(s, str):
        payload = s.encode()
    else:
        payload = s
    digest = hashlib.sha1(payload)
    return digest.hexdigest()
151
152
153
def generate_token(size=16):
    """Generate a random token with hexadecimal digits

    'size' is the number of random bytes; the returned string
    has twice as many characters. Randomness comes from the
    'random' module.
    """
    bits = random.getrandbits(size * 8)
    raw = bits.to_bytes(size, "big")
    return raw.hex()
156
157
158
def format_value(value, suffixes="kMGTPEZY"):
    """Abbreviate 'value' with a magnitude suffix

    Values whose string form has fewer than 4 characters are returned
    unchanged (as string). Longer ones are shortened to their leading
    1-3 digits, a dot, the next two digits, and one of 'suffixes'.
    """
    digits = str(value)
    count = len(digits)
    if count < 4:
        return digits

    # number of digits in front of the decimal point (1, 2, or 3)
    lead = (count - 1) % 3 + 1
    suffix = suffixes[(count - 4) // 3]
    return f"{digits[:lead]}.{digits[lead:lead+2]}{suffix}"
167
168
169
def combine_dict(a, b):
    """Recursively combine the contents of 'b' into 'a'

    Values of 'b' overwrite those of 'a', except when both sides hold
    a dict for the same key; those are merged recursively.
    'a' is modified in place and returned.
    """
    for key, incoming in b.items():
        mergeable = (
            key in a and
            isinstance(incoming, dict) and
            isinstance(a[key], dict)
        )
        if not mergeable:
            a[key] = incoming
            continue
        combine_dict(a[key], incoming)
    return a
177
178
179
def transform_dict(a, func):
    """Recursively apply 'func' to all values in 'a'

    Nested dicts are descended into; every other value is replaced by
    func(value). 'a' is modified in place and nothing is returned.
    """
    for key, current in a.items():
        if not isinstance(current, dict):
            a[key] = func(current)
            continue
        transform_dict(current, func)
186
187
188
def filter_dict(a):
    """Return a copy of 'a' without "private" entries

    An entry is "private" when its key starts with an underscore.
    """
    public = {}
    for key, value in a.items():
        if key[0] == "_":
            continue
        public[key] = value
    return public
191
192
193
def delete_items(obj, keys):
    """Remove all 'keys' from 'obj'

    Keys that are not present in 'obj' are skipped.
    """
    for key in keys:
        if key not in obj:
            continue
        del obj[key]
198
199
200
def enumerate_reversed(iterable, start=0, length=None):
    """Enumerate 'iterable' and return its elements in reverse order

    Yields the same (index, element) pairs as enumerate(iterable, start),
    last pair first. 'length' must be given for iterables without len().
    """
    if length is None:
        length = len(iterable)

    try:
        backwards = reversed(iterable)
    except TypeError:
        # not reversible: enumerate forwards, then flip the result
        pairs = list(zip(range(start, start + length), iterable))
        pairs.reverse()
        return pairs

    highest = start + length - 1
    return zip(range(highest, start - 1, -1), backwards)
212
213
214
def number_to_string(value, numbers=(int, float)):
    """Convert numbers (int, float) to string; Return everything else as is.

    The check is done on the exact class of 'value',
    so instances of subclasses (e.g. bool) are returned unchanged.
    """
    if value.__class__ in numbers:
        return str(value)
    return value
217
218
219
def to_string(value):
    """str() with "better" defaults

    Falsy values become an empty string and lists are joined with ", ";
    everything else goes through str().
    """
    if not value:
        return ""
    if value.__class__ is not list:
        return str(value)

    try:
        return ", ".join(value)
    except Exception:
        # list contains non-string elements
        return ", ".join(map(str, value))
229
230
231
def to_datetime(value):
    """Convert 'value' to a datetime object

    - falsy values yield EPOCH
    - datetime objects are returned unchanged
    - strings are parsed with datetime.fromisoformat(); microseconds are
      dropped and timezone-aware results are converted to naive UTC
    - everything else, including strings that fail to parse,
      is handed to text.parse_timestamp() with EPOCH as default
    """
    if not value:
        return EPOCH
    if isinstance(value, datetime.datetime):
        return value

    if isinstance(value, str):
        try:
            if value.endswith("Z"):
                # compat for Python < 3.11
                value = value[:-1]
            parsed = datetime.datetime.fromisoformat(value)

            if parsed.tzinfo is not None:
                # convert to naive UTC
                utc = parsed.astimezone(datetime.timezone.utc)
                parsed = utc.replace(microsecond=0, tzinfo=None)
            elif parsed.microsecond:
                parsed = parsed.replace(microsecond=0)
            return parsed
        except Exception:
            pass

    return text.parse_timestamp(value, EPOCH)
257
258
259
def datetime_to_timestamp(dt):
    """Convert naive UTC datetime to Unix timestamp

    The result of the true division by SECOND is returned as is,
    so fractional seconds are kept.
    """
    elapsed = dt - EPOCH
    return elapsed / SECOND
262
263
264
def datetime_to_timestamp_string(dt):
    """Convert naive UTC datetime to Unix timestamp string

    Only whole seconds are kept (floor division by SECOND).
    Returns an empty string if the conversion fails for any reason.
    """
    try:
        elapsed = dt - EPOCH
        return str(elapsed // SECOND)
    except Exception:
        return ""
270
271
272
if sys.hexversion >= 0x30c0000:
    # Python >= 3.12
    def datetime_from_timestamp(ts=None):
        """Convert Unix timestamp to naive UTC datetime

        Without 'ts' (or with None), the current time is used.
        """
        # first six struct_time fields: year, month, day, hour, min, sec
        return datetime.datetime(*time.gmtime(ts)[:6])

    datetime_utcfromtimestamp = datetime_from_timestamp
    datetime_utcnow = datetime_from_timestamp
else:
    # Python <= 3.11
    datetime_utcfromtimestamp = datetime.datetime.utcfromtimestamp
    datetime_utcnow = datetime.datetime.utcnow
    datetime_from_timestamp = datetime_utcfromtimestamp
286
287
288
def json_default(obj):
    """Fallback serializer for objects the json module cannot encode

    CustomNone instances are serialized as null,
    everything else as its str() representation.
    """
    return None if isinstance(obj, CustomNone) else str(obj)
292
293
294
# JSON decoding through the json module's shared default decoder
json_loads = json._default_decoder.decode

# JSON encoding with compact separators, no circular-reference check,
# and 'json_default' as fallback for otherwise unserializable objects
json_dumps = json.JSONEncoder(
    default=json_default,
    separators=(",", ":"),
    check_circular=False,
).encode
300
301
302
def dump_json(obj, fp=sys.stdout, ensure_ascii=True, indent=4):
    """Serialize 'obj' as JSON and write it to 'fp'

    Keys are sorted, objects unknown to the json module are converted
    by 'json_default', and a trailing newline is written afterwards.
    """
    options = {
        "ensure_ascii": ensure_ascii,
        "indent"      : indent,
        "default"     : json_default,
        "sort_keys"   : True,
    }
    json.dump(obj, fp, **options)
    fp.write("\n")
312
313
314
def dump_response(response, fp, headers=False, content=True, hide_auth=True):
    """Write the contents of 'response' into a file-like object

    'fp' must accept bytes.

    headers:   also write request line, status, and request/response
               headers (plus the request body if there is one)
    content:   write the response body
    hide_auth: replace the values of 'Authorization', 'Cookie', and
               'Set-Cookie' headers with '***' (only applies with 'headers')
    """
    if headers:
        request = response.request
        # work on copies to leave the original header mappings untouched
        outgoing = request.headers.copy()
        incoming = response.headers.copy()

        if hide_auth:
            authorization = outgoing.get("Authorization")
            if authorization:
                # keep the auth scheme, hide the credentials
                scheme, space, _ = str(authorization).partition(" ")
                outgoing["Authorization"] = \
                    f"{scheme} ***" if space else "***"

            cookie = outgoing.get("Cookie")
            if cookie:
                masked = [
                    pair.partition("=")[0] + "=***"
                    for pair in cookie.split(";")
                ]
                outgoing["Cookie"] = ";".join(masked)

            set_cookie = incoming.get("Set-Cookie")
            if set_cookie:
                pattern = re(r"(^|, )([^ =]+)=[^,;]*")
                incoming["Set-Cookie"] = pattern.sub(
                    r"\1\2=***", set_cookie)

        request_headers = "\n".join(
            f"{name}: {value}"
            for name, value in outgoing.items()
        )
        response_headers = "\n".join(
            f"{name}: {value}"
            for name, value in incoming.items()
        )

        sections = [
            f"{request.method} {request.url}\n"
            f"Status: {response.status_code} {response.reason}\n"
            "\n"
            "Request Headers\n"
            "---------------\n"
            f"{request_headers}\n"
        ]
        if request.body:
            sections.append(
                "\n"
                "Request Body\n"
                "------------\n"
                f"{request.body}\n"
            )
        sections.append(
            "\n"
            "Response Headers\n"
            "----------------\n"
            f"{response_headers}\n"
        )
        fp.write("".join(sections).encode())

    if content:
        if headers:
            fp.write(b"\nContent\n-------\n")
        fp.write(response.content)
371
372
373
def extract_headers(response):
    """Collect metadata from the headers of 'response'

    Returns a new dict with all response headers, plus
    - filename info added by text.nameext_from_url(), when the
      'content-disposition' header has a quoted 'filename' parameter
    - a 'date' datetime built from the 'last-modified' header,
      when that header holds a parsable date
    """
    headers = response.headers
    data = dict(headers)

    if hcd := headers.get("content-disposition"):
        if name := text.extr(hcd, 'filename="', '"'):
            text.nameext_from_url(name, data)

    if hlm := headers.get("last-modified"):
        # parsedate_tz() returns None for values it cannot parse;
        # skip 'date' in that case instead of failing with a TypeError
        if parsed := parsedate_tz(hlm):
            try:
                data["date"] = datetime.datetime(*parsed[:6])
            except ValueError:
                # syntactically valid, but out-of-range date fields
                pass

    return data
385
386
387
def detect_challenge(response):
    """Check whether 'response' is a Cloudflare or DDoS-Guard challenge page

    Returns a short description of the detected challenge,
    or None if nothing was detected.
    """
    server = response.headers.get("server")
    if not server:
        return None

    if server.startswith("cloudflare"):
        if response.status_code not in (403, 503):
            return None

        mitigated = response.headers.get("cf-mitigated")
        if mitigated and mitigated.lower() == "challenge":
            return "Cloudflare challenge"

        body = response.content
        if b"_cf_chl_opt" in body or b"jschl-answer" in body:
            return "Cloudflare challenge"
        if b'name="captcha-bypass"' in body:
            return "Cloudflare CAPTCHA"
        return None

    if server.startswith("ddos-guard"):
        if response.status_code == 403 and \
                b"/ddos-guard/js-challenge/" in response.content:
            return "DDoS-Guard challenge"

    return None
410
411
412
@functools.lru_cache(maxsize=None)
def git_head():
    """Return the abbreviated commit hash of the current git HEAD

    'git rev-parse --short HEAD' is run in the directory of this file.
    Returns None if git cannot be started, prints nothing to stdout,
    or prints anything to stderr. The result is cached.
    """
    command = ("git", "rev-parse", "--short", "HEAD")
    try:
        process = Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=os.path.dirname(os.path.abspath(__file__)),
        )
        out, err = process.communicate()
        if out and not err:
            return out.decode().rstrip()
    except (OSError, subprocess.SubprocessError):
        pass
    return None
426
427
428
def expand_path(path):
    """Expand environment variables and tildes (~)

    Falsy values are returned unchanged. A non-string 'path' is treated
    as a sequence of path segments and combined with os.path.join() first.
    """
    if not path:
        return path

    joined = path if isinstance(path, str) else os.path.join(*path)
    with_home = os.path.expanduser(joined)
    return os.path.expandvars(with_home)
435
436
437
def remove_file(path):
    """Delete the file at 'path'; OSErrors are silently ignored"""
    try:
        os.remove(path)
    except OSError:
        # best effort: a missing or undeletable file is not an error here
        return
442
443
444
def remove_directory(path):
    """Delete the directory at 'path'; OSErrors are silently ignored

    Uses os.rmdir(), i.e. only empty directories get removed.
    """
    try:
        os.rmdir(path)
    except OSError:
        # best effort: missing or non-empty directories are left alone
        return
449
450
451
def set_mtime(path, mtime):
    """Set the modification time of 'path' to 'mtime'

    'mtime' is either a timestamp or a date string understood by
    email.utils.parsedate_tz(). The access time is set to the current time.
    All errors are silently ignored.
    """
    try:
        timestamp = mtime
        if isinstance(timestamp, str):
            timestamp = mktime_tz(parsedate_tz(timestamp))
        atime = time.time()
        os.utime(path, (atime, timestamp))
    except Exception:
        # best effort
        pass
458
459
460
def cookiestxt_load(fp):
    """Parse a Netscape cookies.txt file and return its Cookies

    'fp' is iterated line by line. Empty lines and lines starting with
    '#' or '$' are skipped, except for the '#HttpOnly_' prefix, which
    gets stripped. Every other line must consist of exactly 7
    tab-separated fields; a ValueError is raised otherwise.

    Returns a list of http.cookiejar.Cookie objects.
    """
    cookies = []

    for line in fp:
        line = line.lstrip(" ")

        # strip '#HttpOnly_'
        if line.startswith("#HttpOnly_"):
            line = line[10:]

        # ignore empty lines and comments
        if not line or line[0] in "#$\n":
            continue

        # strip trailing '\n'
        if line.endswith("\n"):
            line = line[:-1]

        (domain, domain_specified, path,
         secure, expires, name, value) = line.split("\t")

        if not name:
            # cookie without a name: its "value" field holds the name
            name, value = value, None

        cookies.append(Cookie(
            version=0,
            name=name,
            value=value,
            port=None,
            port_specified=False,
            domain=domain,
            domain_specified=(domain_specified == "TRUE"),
            domain_initial_dot=domain.startswith("."),
            path=path,
            path_specified=False,
            secure=(secure == "TRUE"),
            expires=(expires if expires and expires != "0" else None),
            discard=False,
            comment=None,
            comment_url=None,
            rest={},
        ))

    return cookies
497
498
499
def cookiestxt_store(fp, cookies):
    """Write 'cookies' in Netscape cookies.txt format to 'fp'

    Cookies without a domain are skipped. A cookie whose value is None
    is written with an empty name field and its name as value.
    """
    fp.write("# Netscape HTTP Cookie File\n\n")
    template = "{}\t{}\t{}\t{}\t{}\t{}\t{}\n"

    for cookie in cookies:
        domain = cookie.domain
        if not domain:
            continue

        if cookie.value is not None:
            name, value = cookie.name, cookie.value
        else:
            name, value = "", cookie.name

        expires = cookie.expires
        fp.write(template.format(
            domain,
            "TRUE" if domain[0] == "." else "FALSE",
            cookie.path,
            "TRUE" if cookie.secure else "FALSE",
            "0" if expires is None else str(expires),
            name,
            value,
        ))
524
525
526
def code_to_language(code, default=None):
    """Map an ISO 639-1 language code to its actual name

    The lookup in CODES is case-insensitive.
    Returns 'default' for unknown or falsy codes.
    """
    key = code.lower() if code else ""
    return CODES.get(key, default)
529
530
531
def language_to_code(lang, default=None):
    """Map a language name to its ISO 639-1 code

    'lang' is normalized with str.capitalize() before searching CODES.
    Returns 'default' if 'lang' is None or not found.
    """
    if lang is None:
        return default

    wanted = lang.capitalize()
    matches = (
        code
        for code, language in CODES.items()
        if language == wanted
    )
    return next(matches, default)
540
541
542
# language codes and their English language names,
# used by code_to_language() and language_to_code()
CODES = {
    "ar": "Arabic",
    "bg": "Bulgarian",
    "ca": "Catalan",
    "cs": "Czech",
    "da": "Danish",
    "de": "German",
    "el": "Greek",
    "en": "English",
    "es": "Spanish",
    "fi": "Finnish",
    "fr": "French",
    "he": "Hebrew",
    "hu": "Hungarian",
    "id": "Indonesian",
    "it": "Italian",
    "ja": "Japanese",
    "ko": "Korean",
    "ms": "Malay",
    "nl": "Dutch",
    "no": "Norwegian",
    "pl": "Polish",
    "pt": "Portuguese",
    "ro": "Romanian",
    "ru": "Russian",
    "sv": "Swedish",
    "th": "Thai",
    "tr": "Turkish",
    "vi": "Vietnamese",
    "zh": "Chinese",
}
573
574
575
class HTTPBasicAuth():
    """Callable that adds a Basic 'Authorization' header to a request"""
    __slots__ = ("authorization",)

    def __init__(self, username, password):
        # the header value is computed once and stored as bytes
        credentials = f"{username}:{password}".encode("latin1")
        token = binascii.b2a_base64(credentials, newline=False)
        self.authorization = b"Basic " + token

    def __call__(self, request):
        """Set the 'Authorization' header of 'request' and return it"""
        request.headers["Authorization"] = self.authorization
        return request
585
586
587
class ModuleProxy():
    """Access modules by name via item or attribute lookup

    Modules that are not yet in sys.modules get imported on first access.
    When an import fails, NONE is stored in sys.modules under that name
    and returned.
    """
    __slots__ = ()

    def __getitem__(self, key, modules=sys.modules):
        if key in modules:
            return modules[key]

        try:
            __import__(key)
        except ImportError:
            # remember the failure so the import is not retried
            modules[key] = NONE
            return NONE
        return modules[key]

    __getattr__ = __getitem__
603
604
605
class LazyPrompt():
    """Object that prompts for a password when converted to string

    Every str() call triggers a new getpass.getpass() prompt.
    """
    __slots__ = ()

    def __str__(self):
        entered = getpass.getpass()
        return entered
610
611
612
class NullContext():
    """Context manager that does nothing

    Entering yields None and exceptions are not suppressed on exit.
    """
    __slots__ = ()

    def __enter__(self):
        return None

    def __exit__(self, exc_type, exc_value, traceback):
        return None
620
621
622
class NullResponse():
    """Placeholder response object with status code 900 and empty content

    Only 'url' and 'reason' are stored per instance;
    all other attributes are class-level constants.
    """
    __slots__ = ("url", "reason")

    ok = False
    is_redirect = False
    is_permanent_redirect = False

    # NOTE: one single dict object shared by all four names and instances
    cookies = {}
    headers = history = links = cookies

    encoding = "utf-8"
    apparent_encoding = "utf-8"
    content = b""
    text = ""
    status_code = 900
    close = noop

    def __init__(self, url, reason=""):
        self.url = url
        self.reason = str(reason)

    def __str__(self):
        return "900 " + self.reason

    def json(self):
        """Return an empty dict"""
        return {}
642
643
644
class CustomNone():
    """None-style type that supports more operations than regular None

    Attribute access, item access, calls, and arithmetic/bitwise
    operations all return the instance itself. Iterating yields nothing.
    Instances are falsy, have length 0, compare equal to None,
    and are rendered as "None".
    """
    __slots__ = ()

    # attribute/item access and iter() return the instance itself
    __getattribute__ = identity
    __getitem__ = identity
    __iter__ = identity

    def __call__(self, *args, **kwargs):
        return self

    def __next__(self):
        # empty iterator
        raise StopIteration

    def __eq__(self, other):
        return other is None or other is self

    def __ne__(self, other):
        return not (other is None or other is self)

    # ordering: less than (or equal to) everything, greater than nothing
    __lt__ = true
    __le__ = true
    __gt__ = false
    __ge__ = false
    __bool__ = false

    # binary arithmetic operators
    __add__ = identity
    __sub__ = identity
    __mul__ = identity
    __matmul__ = identity
    __truediv__ = identity
    __floordiv__ = identity
    __mod__ = identity

    # reflected binary arithmetic operators
    __radd__ = identity
    __rsub__ = identity
    __rmul__ = identity
    __rmatmul__ = identity
    __rtruediv__ = identity
    __rfloordiv__ = identity
    __rmod__ = identity

    # bitwise operators
    __lshift__ = identity
    __rshift__ = identity
    __and__ = identity
    __xor__ = identity
    __or__ = identity

    # reflected bitwise operators
    __rlshift__ = identity
    __rrshift__ = identity
    __rand__ = identity
    __rxor__ = identity
    __ror__ = identity

    # unary operators
    __neg__ = identity
    __pos__ = identity
    __abs__ = identity
    __invert__ = identity

    def __len__(self):
        return 0

    # int(), hash(), and index conversions all evaluate to 0 as well
    __int__ = __len__
    __hash__ = __len__
    __index__ = __len__

    def __format__(self, _):
        return "None"

    def __str__(self):
        return "None"

    __repr__ = __str__
717
718
719
class Flags():
    """Container for pending FILE, POST, CHILD, and DOWNLOAD flags"""

    def __init__(self):
        self.FILE = None
        self.POST = None
        self.CHILD = None
        self.DOWNLOAD = None

    def process(self, flag):
        """Reset 'flag' to None and raise the exception for its old value

        'abort', 'terminate', and 'restart' raise AbortExtraction,
        TerminateExtraction, and RestartExtraction respectively;
        any other value raises StopExtraction.
        """
        state = self.__dict__
        value = state[flag]
        state[flag] = None

        if value == "abort":
            raise exception.AbortExtraction()
        elif value == "terminate":
            raise exception.TerminateExtraction()
        elif value == "restart":
            raise exception.RestartExtraction()
        raise exception.StopExtraction()
735
736
737
# v137.0 release of Firefox on 2025-04-01 has ordinal 739342
738
# 735506 == 739342 - 137 * 28
739
# v135.0 release of Chrome on 2025-04-01 has ordinal 739342
740
# 735562 == 739342 - 135 * 28
741
# _ord_today = datetime.date.today().toordinal()
742
# _ff_ver = (_ord_today - 735506) // 28
743
# _ch_ver = (_ord_today - 735562) // 28
744
745
# estimated current Firefox major version: one release every 28 days
_ff_ver = (datetime.date.today().toordinal() - 735506) // 28
# _ch_ver = _ff_ver - 2

# shortcuts to the regex helpers of the 'text' module
re = text.re
re_compile = text.re_compile

NONE = CustomNone()
FLAGS = Flags()
EPOCH = datetime.datetime(1970, 1, 1)
SECOND = datetime.timedelta(seconds=1)
WINDOWS = os.name == "nt"
SENTINEL = object()
EXECUTABLE = getattr(sys, "frozen", False)
SPECIAL_EXTRACTORS = {"oauth", "recursive", "generic"}

# filename extensions by file type
EXTS_IMAGE = {
    "jpg", "jpeg", "png", "gif", "bmp", "svg", "psd", "ico",
    "webp", "avif", "heic", "heif",
}
EXTS_VIDEO = {
    "mp4", "m4v", "mov", "webm", "mkv", "ogv", "flv", "avi", "wmv",
}
EXTS_ARCHIVE = {
    "zip", "rar", "7z", "tar", "gz", "bz2", "lzma", "xz",
}

USERAGENT = "gallery-dl/" + version.__version__
USERAGENT_FIREFOX = (
    f"Mozilla/5.0 (Windows NT 10.0; Win64; x64; "
    f"rv:{_ff_ver}.0) Gecko/20100101 Firefox/{_ff_ver}.0"
)
# the Chrome version is derived as Firefox version minus 2
USERAGENT_CHROME = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    f"Chrome/{_ff_ver - 2}.0.0.0 Safari/537.36"
)
771
772
# default globals for expressions compiled by the compile_expression_*()
# functions, i.e. the names such expressions can use directly
GLOBALS = {
    # helper functions and types
    "contains" : contains,
    "parse_int": text.parse_int,
    "urlsplit" : urllib.parse.urlsplit,
    "datetime" : datetime.datetime,
    "timedelta": datetime.timedelta,
    # functions raising control flow exceptions
    "abort"    : raises(exception.StopExtraction),
    "error"    : raises(exception.AbortExtraction),
    "terminate": raises(exception.TerminateExtraction),
    "restart"  : raises(exception.RestartExtraction),
    # hashing
    "hash_sha1": sha1,
    "hash_md5" : md5,
    # module access
    "std"      : ModuleProxy(),
    "re"       : text.re_module,
    # filename extension sets
    "exts_image"  : EXTS_IMAGE,
    "exts_video"  : EXTS_VIDEO,
    "exts_archive": EXTS_ARCHIVE,
}
790
791
792
if EXECUTABLE and hasattr(sys, "_MEIPASS"):
    # https://github.com/pyinstaller/pyinstaller/blob/develop/doc
    # /runtime-information.rst#ld_library_path--libpath-considerations
    _popen_env = os.environ.copy()

    # restore each library path variable from its '*_ORIG' backup,
    # or remove it when there is no backup
    for _name in ("LD_LIBRARY_PATH", "DYLD_LIBRARY_PATH"):
        orig = _popen_env.get(_name + "_ORIG")
        if orig is None:
            _popen_env.pop(_name, None)
        else:
            _popen_env[_name] = orig

    del _name, orig

    class Popen(subprocess.Popen):
        """subprocess.Popen that always runs with the restored environment"""

        def __init__(self, args, **kwargs):
            kwargs["env"] = _popen_env
            subprocess.Popen.__init__(self, args, **kwargs)
else:
    Popen = subprocess.Popen
817
818
819
def compile_expression_raw(expr, name="<expr>", globals=None):
    """Compile 'expr' into a function evaluating it

    The returned function takes a 'locals' mapping as argument and
    evaluates 'expr' with 'globals' (or GLOBALS if that is empty or None).
    Exceptions raised during evaluation are not handled.
    """
    code_object = compile(expr, name, "eval")
    namespace = globals or GLOBALS
    return functools.partial(eval, code_object, namespace)
822
823
824
def compile_expression_defaultdict(expr, name="<expr>", globals=None):
    """Compile 'expr' with globals that return NONE for unknown names

    The first call builds GLOBALS_DEFAULT and then rebinds this
    module-level name to compile_expression_defaultdict_impl,
    so that this setup happens only once.
    """
    global GLOBALS_DEFAULT, compile_expression_defaultdict

    def missing(n=NONE):
        return n

    if isinstance(__builtins__, dict):
        # cpython
        namespace = collections.defaultdict(missing, GLOBALS)
    else:
        # pypy3 - insert __builtins__ symbols into globals dict
        namespace = collections.defaultdict(missing, __builtins__.__dict__)
        namespace.update(GLOBALS)
    GLOBALS_DEFAULT = namespace

    compile_expression_defaultdict = compile_expression_defaultdict_impl
    return compile_expression_defaultdict_impl(expr, name, globals)
839
840
841
def compile_expression_defaultdict_impl(expr, name="<expr>", globals=None):
    """Compile 'expr' into a function evaluating it

    Same as compile_expression_raw(), except that GLOBALS_DEFAULT
    is used when 'globals' is empty or None.
    """
    code_object = compile(expr, name, "eval")
    namespace = globals or GLOBALS_DEFAULT
    return functools.partial(eval, code_object, namespace)
844
845
846
def compile_expression_tryexcept(expr, name="<expr>", globals=None):
    """Compile 'expr' into a function that returns NONE on errors

    The returned function takes an optional 'locals' mapping.
    GalleryDLExceptions raised during evaluation are passed through;
    any other Exception results in NONE being returned.
    """
    code_object = compile(expr, name, "eval")
    namespace = GLOBALS if globals is None else globals

    def evaluate(locals=None):
        try:
            return eval(code_object, namespace, locals)
        except exception.GalleryDLException:
            raise
        except Exception:
            return NONE

    return evaluate


# default implementation for compiling expressions
compile_expression = compile_expression_tryexcept
863
864
865
def compile_filter(expr, name="<filter>", globals=None):
    """Compile a filter expression with compile_expression()

    A non-string 'expr' is treated as a collection of expressions,
    which get combined into a single one by joining them with 'and'.
    """
    if not isinstance(expr, str):
        expr = "(" + ") and (".join(expr) + ")"
    return compile_expression(expr, name, globals)
869
870
871
def import_file(path):
    """Import a Python module from a filesystem path

    The module name is the final path component without its extension.
    With a directory part, that directory is put at the front of sys.path
    for the duration of the import. Without one, dashes in the module
    name are replaced by underscores before importing.
    """
    directory, filename = os.path.split(path)

    stem, dot, ext = filename.rpartition(".")
    module_name = stem if dot else ext

    if not directory:
        return __import__(module_name.replace("-", "_"))

    directory = expand_path(directory)
    sys.path.insert(0, directory)
    try:
        return __import__(module_name)
    finally:
        del sys.path[0]
888
889
890
def build_selection_func(value, min=0.0, conv=float):
    """Build a function that returns a value selected by 'value'

    value: a single number, a '<lower>-<upper>' string,
           or a (lower, upper) pair
    min:   smallest value to select
    conv:  function used to convert the bounds

    For a range, the result calls random.uniform() (float bounds) or
    random.randint() (other bounds); for a single number, it returns
    that number. A falsy 'value' results in a function returning 'min',
    or None if 'min' is falsy as well.
    """
    if not value:
        return (lambda: min) if min else None

    if isinstance(value, str):
        lower, _, upper = value.partition("-")
    else:
        try:
            lower, upper = value
        except TypeError:
            # not a pair: single value
            lower, upper = value, None
    lower = conv(lower)

    if not upper:
        if lower < min:
            lower = min
        return lambda: lower

    upper = conv(upper)
    if lower.__class__ is float:
        select = random.uniform
    else:
        select = random.randint
    if not lower > min:
        lower = min
    if not upper > min:
        upper = min
    return functools.partial(select, lower, upper)


build_duration_func = build_selection_func
919
920
921
def build_extractor_filter(categories, negate=True, special=None):
    """Build a function that takes an Extractor class as argument
    and returns True if that class is allowed by 'categories'

    categories: list (or comma-separated string) of 'category',
                'category:subcategory', or ':subcategory' items;
                '*' and empty parts match everything
    negate:     treat 'categories' as blacklist (True) or whitelist (False)
    special:    set of additional categories
    """
    if isinstance(categories, str):
        categories = categories.split(",")

    catset = set()  # set of categories / basecategories
    subset = set()  # set of subcategories
    catsub = []     # list of category-subcategory pairs

    for item in categories:
        category, _, subcategory = item.partition(":")
        has_category = bool(category) and category != "*"
        has_subcategory = bool(subcategory) and subcategory != "*"

        if has_category and has_subcategory:
            catsub.append((category, subcategory))
        elif has_category:
            catset.add(category)
        elif has_subcategory:
            subset.add(subcategory)

    if special:
        catset |= special
    elif not (catset or subset or catsub):
        # nothing to filter by
        return true if negate else false

    tests = []

    if catset:
        if negate:
            def test_category(extr):
                return (extr.category not in catset and
                        extr.basecategory not in catset)
        else:
            def test_category(extr):
                return (extr.category in catset or
                        extr.basecategory in catset)
        tests.append(test_category)

    if subset:
        if negate:
            def test_subcategory(extr):
                return extr.subcategory not in subset
        else:
            def test_subcategory(extr):
                return extr.subcategory in subset
        tests.append(test_subcategory)

    if catsub:
        def test_pair(extr):
            matched = any(
                subcategory == extr.subcategory and (
                    category == extr.category or
                    category == extr.basecategory)
                for category, subcategory in catsub
            )
            return (not negate) if matched else negate
        tests.append(test_pair)

    if len(tests) == 1:
        return tests[0]

    # blacklist: every test must pass; whitelist: one is enough
    combine = all if negate else any
    return lambda extr: combine(t(extr) for t in tests)
980
981
982
def build_proxy_map(proxies, log=None):
    """Generate a proxy map

    proxies: a single proxy URL (used for both 'http' and 'https')
             or a dict mapping schemes to proxy URLs
    log:     logger used to report invalid values

    Proxy URLs without scheme get 'http://' prepended. A dict argument
    is updated in place and returned. Returns None if 'proxies' is
    falsy or neither a string nor a dict.
    """
    def with_scheme(proxy):
        return "http://" + proxy.lstrip("/")

    if not proxies:
        return None

    if isinstance(proxies, str):
        url = proxies if "://" in proxies else with_scheme(proxies)
        return {"http": url, "https": url}

    if isinstance(proxies, dict):
        for scheme, proxy in proxies.items():
            if "://" in proxy:
                continue
            proxies[scheme] = with_scheme(proxy)
        return proxies

    if log is not None:
        log.warning("invalid proxy specifier: %s", proxies)
1000
1001
1002
def build_predicate(predicates):
    """Combine a list of predicates into a single predicate function

    No predicates: the always-True function 'true'.
    One predicate: that predicate itself.
    Several:       chain_predicates() bound to 'predicates'.
    """
    if not predicates:
        return true
    if len(predicates) == 1:
        return predicates[0]
    return functools.partial(chain_predicates, predicates)
1008
1009
1010
def chain_predicates(predicates, url, kwdict):
    """Return True if all 'predicates' accept 'url' and 'kwdict'

    Predicates are called in order; evaluation stops at the first one
    returning a falsy value.
    """
    return all(pred(url, kwdict) for pred in predicates)
1015
1016
1017
class RangePredicate():
    """Predicate; True if the current index is in the given range(s)

    The index starts at 1 and increases by one with every call.
    """

    def __init__(self, rangespec):
        self.ranges = ranges = self._parse(rangespec)
        self.index = 0

        if not ranges:
            self.lower = self.upper = 0
            return

        # technically wrong, but good enough for now
        # and evaluating min/max for a large range is slow
        self.lower = min(r.start for r in ranges)
        self.upper = max(r.stop for r in ranges) - 1

    def __call__(self, _url, _kwdict):
        """Advance the index and test it against all ranges

        Raises StopExtraction once the index is greater than 'upper'.
        """
        self.index += 1
        index = self.index

        if index > self.upper:
            raise exception.StopExtraction()

        return any(index in r for r in self.ranges)

    def _parse(self, rangespec):
        """Parse an integer range string and return the resulting ranges

        'rangespec' is a comma-separated string or a list of groups,
        each of them being
        - a single number 'N'
        - 'A-B' with an inclusive end 'B'
        - a slice 'A:B' or 'A:B:S' with an exclusive end 'B'
        A missing start defaults to 1, a missing end to sys.maxsize.

        Examples:
            _parse("-2,4,6-8,10-") -> [range(1, 3), range(4, 5),
                                       range(6, 9), range(10, sys.maxsize)]
            _parse(" - 3 , 4- 4, 2-6") -> [range(1, 4), range(4, 5),
                                           range(2, 7)]
            _parse("1:2,4:8:2") -> [range(1, 2), range(4, 8, 2)]
        """
        def number(part, default):
            return int(part) if part.strip() else default

        if isinstance(rangespec, str):
            rangespec = rangespec.split(",")

        ranges = []
        for group in rangespec:
            if not group:
                continue

            if ":" in group:
                start, _, rest = group.partition(":")
                stop, _, step = rest.partition(":")
                parsed = range(
                    number(start, 1),
                    number(stop, sys.maxsize),
                    number(step, 1),
                )
            elif "-" in group:
                start, _, stop = group.partition("-")
                # the end of 'A-B' is inclusive
                end = int(stop) + 1 if stop.strip() else sys.maxsize
                parsed = range(number(start, 1), end)
            else:
                single = int(group)
                parsed = range(single, single + 1)
            ranges.append(parsed)

        return ranges
1082
1083
1084
class UniquePredicate():
    """Predicate; True if given URL has not been encountered before

    URLs starting with 'text:' are always accepted and never recorded.
    """

    def __init__(self):
        self.urls = set()

    def __call__(self, url, _):
        if url.startswith("text:"):
            return True

        seen = self.urls
        if url in seen:
            return False
        seen.add(url)
        return True
1096
1097
1098
class FilterPredicate():
    """Predicate; True if evaluating the given expression returns True

    'expr' is compiled with compile_filter() under the name
    '<{target} filter>'.
    """

    def __init__(self, expr, target="image"):
        self.expr = compile_filter(expr, f"<{target} filter>")

    def __call__(self, _, kwdict):
        """Evaluate the filter expression with 'kwdict' as its locals

        GalleryDLExceptions are passed through; any other Exception
        is wrapped in a FilterError.
        """
        try:
            result = self.expr(kwdict)
        except exception.GalleryDLException:
            raise
        except Exception as exc:
            raise exception.FilterError(exc)
        return result
1112
1113