Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
mikf
GitHub Repository: mikf/gallery-dl
Path: blob/master/test/test_results.py
5457 views
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
4
# Copyright 2015-2025 Mike Fährmann
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License version 2 as
8
# published by the Free Software Foundation.
9
10
import os
11
import sys
12
import unittest
13
14
import re
15
import json
16
import hashlib
17
import datetime
18
import collections
19
20
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
21
from gallery_dl import \
22
extractor, util, job, config, exception, formatter # noqa E402
23
24
25
RESULTS = os.environ.get("GDL_TEST_RESULTS")
26
if RESULTS:
27
results = util.import_file(RESULTS)
28
else:
29
from test import results
30
31
32
# temporary issues, etc.
# NOTE(review): BROKEN is an empty dict literal and is not referenced by any
# code visible in this file - confirm whether other modules still use it.
BROKEN = {
}

# Baseline settings for test runs. setup_test_config() merges this mapping
# into the gallery-dl config before each test; load_test_config() extends it
# with the contents of 'archive/config.json' when that file exists.
CONFIG = {
    "cache": {
        "file": None,
    },
    "downloader": {
        "adjust-extensions": False,
        "part": False,
    },
}


# Categories whose tests are treated as requiring authentication whenever a
# result does not carry its own '#auth' value
# (see TestExtractorResults._run_test).
AUTH_REQUIRED = {
    "pixiv",
    "nijie",
    "horne",
    "reddit",
    "seiga",
    "fantia",
    "instagram",
    "twitter",
}

# Config keys that _run_test() probes through extr.config() to decide whether
# credentials are available, and that it resets to None when a result sets
# '#auth' to a false value.
AUTH_KEYS = (
    "username",
    "cookies",
    "api-key",
    "client-id",
    "access-token",
    "refresh-token",
)
66
67
68
class TestExtractorResults(unittest.TestCase):
    """Run extractors against recorded result descriptions.

    The 'test_*' methods are not written out here; generate_tests() attaches
    one per result dict, each delegating to _run_test().
    """

    def setUp(self):
        """Apply the baseline CONFIG before every test."""
        setup_test_config()

    def tearDown(self):
        """Drop all config values set during the test."""
        config.clear()

    @classmethod
    def setUpClass(cls):
        """Create the shared list of (url, reason) pairs for skipped tests."""
        cls._skipped = []

    @classmethod
    def tearDownClass(cls):
        """Print a summary of all skipped tests, if there were any."""
        if cls._skipped:
            sys.stdout.write("\n\nSkipped tests:\n")
            for url, reason in cls._skipped:
                sys.stdout.write(f'- {url} ("{reason}")\n')

    def assertRange(self, value, range, msg=None):
        """Assert that 'value' is covered by the given range object.

        Ranges with a step greater than 1 require exact membership.
        Otherwise the value only has to lie between 'start' and 'stop',
        where 'stop' is accepted as well (inclusive upper bound).
        """
        if range.step > 1:
            self.assertIn(value, range, msg=msg)
        else:
            self.assertLessEqual(value, range.stop, msg=msg)
            self.assertGreaterEqual(value, range.start, msg=msg)

    def assertLogEqual(self, expected, output):
        """Compare expected log entries with assertLogs() output lines.

        'expected' is a single message string or a sequence of entries.
        Each entry is either a message string, a (level, message) pair or
        a (level, name, message) triple. 'output' holds strings in
        'LEVEL:name:message' form.
        """
        if isinstance(expected, str):
            expected = (expected,)
        self.assertEqual(len(expected), len(output), "#log/count")

        for exp, out in zip(expected, output):
            # messages may contain ':' themselves, so split at most twice
            level, name, message = out.split(":", 2)

            if isinstance(exp, str):
                self.assertEqual(exp, message, "#log")
                # keep going: the remaining entries must be checked as well
                continue

            self.assertEqual(exp[0].lower(), level.lower(), "#log/level")
            if len(exp) < 3:
                self.assertEqual(exp[1], message, "#log/message")
            else:
                self.assertEqual(exp[1], name, "#log/name")
                self.assertEqual(exp[2], message, "#log/message")

    def _run_test(self, result):
        """Execute one result description and verify everything it lists.

        Keys starting with '#' control the test itself (URL, extractor
        class, options, expected hashes, counts, patterns, ...). All other
        keys are metadata expectations checked against each collected
        kwdict. 'result' is modified in place: control keys such as
        '#fail', '#comment', '#category', '#auth' and '#skip' are removed.
        """
        if result.get("#fail"):
            # expected failure: the same test without '#fail' must raise
            del result["#fail"]
            try:
                self._run_test(result)
            except AssertionError:
                return
            else:
                self.fail("Test did not fail")

        _, cat, _ = result_categories(result)
        result.pop("#comment", None)
        result.pop("#category", None)
        auth = result.pop("#auth", None)

        extr_url = extractor.find(result["#url"])
        self.assertTrue(extr_url, "extractor by URL/find")
        extr_cls = extr = result["#class"].from_url(result["#url"])
        # check the object returned by from_url(), not 'extr_url' again
        self.assertTrue(extr_cls, "extractor by cls.from_url()")
        self.assertIs(extr_url.__class__, extr_cls.__class__)

        if len(result) <= 2:
            return  # only matching

        skip = result.pop("#skip", False)
        if skip:
            return self._skipped.append((result["#url"], skip))

        if auth is None:
            auth = (cat in AUTH_REQUIRED)
        elif not auth:
            # auth explicitly disabled
            for key in AUTH_KEYS:
                config.set((), key, None)

        if auth and not any(extr.config(key) for key in AUTH_KEYS):
            self._skipped.append((result["#url"], "no auth"))
            self.skipTest("no auth")

        if "#options" in result:
            for key, value in result["#options"].items():
                # 'a.b.c' -> config path ('a', 'b') and option name 'c'
                key = key.split(".")
                config.set(key[:-1], key[-1], value)
        if "#range" in result:
            config.set((), "image-range", result["#range"])
            config.set((), "chapter-range", result["#range"])

        tjob = ResultJob(extr,
                         content=("#sha1_content" in result),
                         format=(result.get("#metadata") != "post"))

        if "#exception" in result:
            with self.assertRaises(result["#exception"], msg="#exception"), \
                    self.assertLogs() as log_info:
                tjob.run()
            if "#log" in result:
                self.assertLogEqual(result["#log"], log_info.output)
            return

        try:
            if "#log" in result:
                with self.assertLogs() as log_info:
                    tjob.run()
            else:
                tjob.run()
        except exception.StopExtraction:
            pass
        except exception.HttpError as exc:
            exc = str(exc)
            # server-side errors and read timeouts skip the test;
            # every other HTTP error is re-raised
            if re.match(r"'5\d\d ", exc) or \
                    re.search(r"\bRead timed out\b", exc):
                self._skipped.append((result["#url"], exc))
                self.skipTest(exc)
            raise

        if "#log" in result:
            self.assertLogEqual(result["#log"], log_info.output)

        if result.get("#archive", True):
            self.assertEqual(
                len(set(tjob.archive_list)),
                len(tjob.archive_list),
                msg="archive-id uniqueness")

        if tjob.queue:
            # test '_extractor' entries
            for url, kwdict in zip(tjob.url_list, tjob.kwdict_list):
                if "_extractor" in kwdict:
                    extr = kwdict["_extractor"].from_url(url)
                    if extr is None and not result.get("#extractor", True):
                        continue
                    self.assertIsInstance(extr, kwdict["_extractor"])
                    self.assertEqual(extr.url, url)
        else:
            # test 'extension' entries
            for kwdict in tjob.kwdict_list:
                self.assertIn("extension", kwdict, msg="#extension")

        # test extraction results
        if "#sha1_url" in result:
            self.assertEqual(
                result["#sha1_url"],
                tjob.url_hash.hexdigest(),
                msg="#sha1_url")

        if "#sha1_content" in result:
            expected = result["#sha1_content"]
            digest = tjob.content_hash.hexdigest()
            if isinstance(expected, str):
                self.assertEqual(expected, digest, msg="#sha1_content")
            else:  # iterable
                self.assertIn(digest, expected, msg="#sha1_content")

        if "#sha1_metadata" in result:
            self.assertEqual(
                result["#sha1_metadata"],
                tjob.kwdict_hash.hexdigest(),
                "#sha1_metadata")

        if "#count" in result:
            count = result["#count"]
            len_urls = len(tjob.url_list)
            if isinstance(count, str):
                self.assertRegex(
                    count, r"^ *(==|!=|<|<=|>|>=) *\d+ *$", msg="#count")
                expr = f"{len_urls} {count}"
                # eval() is only reached after the regex above limited
                # 'count' to a comparison operator followed by digits
                self.assertTrue(eval(expr), msg=expr)
            elif isinstance(count, range):
                self.assertRange(len_urls, count, msg="#count")
            else:  # assume integer
                self.assertEqual(len_urls, count, msg="#count")

        if "#pattern" in result:
            self.assertGreater(len(tjob.url_list), 0, msg="#pattern")
            pattern = result["#pattern"]
            if isinstance(pattern, str):
                for url in tjob.url_list:
                    self.assertRegex(url, pattern, msg="#pattern")
            else:
                # pairwise comparison; zip() stops at the shorter sequence
                for url, pat in zip(tjob.url_list, pattern):
                    self.assertRegex(url, pat, msg="#pattern")

        if "#results" in result:
            expected = result["#results"]
            if isinstance(expected, str):
                self.assertTrue(tjob.url_list, msg="#results")
                self.assertEqual(
                    tjob.url_list[0], expected, msg="#results")
            else:
                self.assertSequenceEqual(
                    tjob.url_list, expected, msg="#results")

        # everything not starting with '#' is a metadata expectation
        metadata = {k: v for k, v in result.items() if k[0] != "#"}
        if metadata:
            if result.get("#metadata") == "post":
                kwdicts = tjob.kwdict_post
            else:
                kwdicts = tjob.kwdict_list
            for kwdict in kwdicts:
                self._test_kwdict(kwdict, metadata)

    def _test_kwdict(self, kwdict, tests, parent=None):
        """Check one metadata dict against a mapping of expectations.

        Key syntax understood here:
          '?key'    - only tested when 'key' is present in 'kwdict'
          'key[*]'  - the expectation applies to every item of the value
          '!key'    - 'key' must NOT be present in 'kwdict'
        'parent' is the dotted path of the enclosing dict and is only used
        to build assertion messages.
        """
        for key, test in tests.items():

            if key.startswith("?"):
                key = key[1:]
                if key not in kwdict:
                    continue

            if key.endswith("[*]"):
                key = key[:-3]
                subtest = True
            else:
                subtest = False

            path = f"{parent}.{key}" if parent else key

            if key.startswith("!"):
                self.assertNotIn(key[1:], kwdict, msg=path)
                continue

            self.assertIn(key, kwdict, msg=path)
            value = kwdict[key]

            if subtest:
                self.assertNotIsInstance(value, str, msg=path)
                for idx, item in enumerate(value):
                    subpath = f"{path}[{idx}]"
                    self._test_kwdict_value(item, test, subpath)
            else:
                self._test_kwdict_value(value, test, path)

    def _test_kwdict_value(self, value, test, path):
        """Check a single metadata value against its expectation 'test'.

        The kind of check depends on the type of 'test':
          dict  - recurse into the value with _test_kwdict()
          type  - isinstance() check
          range - assertRange()
          set   - value, or failing that its type, must be a member
          list  - dict items are matched by index; a list without any
                  dict item must equal the value
          str   - optional prefixes 're:', 'dt:', 'type:', 'len:' and
                  'iso:' select special checks; otherwise plain equality
          other - plain equality
        'path' is used as the assertion message.
        """
        if isinstance(test, dict):
            self._test_kwdict(value, test, path)
        elif isinstance(test, type):
            self.assertIsInstance(value, test, msg=path)
        elif isinstance(test, range):
            self.assertRange(value, test, msg=path)
        elif isinstance(test, set):
            try:
                self.assertIn(value, test, msg=path)
            except (AssertionError, TypeError):
                # TypeError: unhashable values (lists, dicts) cannot be
                # looked up in a set, but their type still can
                self.assertIn(type(value), test, msg=path)
        elif isinstance(test, list):
            subtest = False
            for idx, item in enumerate(test):
                if isinstance(item, dict):
                    subtest = True
                    subpath = f"{path}[{idx}]"
                    try:
                        obj = value[idx]
                    except Exception as exc:
                        self.fail(f"'{exc.__class__.__name__}: {exc}' "
                                  f"when accessing {subpath}")
                    self._test_kwdict(obj, item, subpath)
            if not subtest:
                self.assertEqual(test, value, msg=path)
        elif isinstance(test, str):
            if test.startswith("re:"):
                self.assertIsInstance(value, str, msg=path)
                self.assertRegex(value, test[3:], msg=path)
            elif test.startswith("dt:"):
                self.assertIsInstance(value, datetime.datetime, msg=path)
                self.assertEqual(test[3:], str(value), msg=path)
            elif test.startswith("type:"):
                self.assertEqual(test[5:], type(value).__name__, msg=path)
            elif test.startswith("len:"):
                # 'len:N' or 'len:typename:N'
                cls, _, length = test[4:].rpartition(":")
                if cls:
                    self.assertEqual(
                        cls, type(value).__name__, msg=f"{path}/type")
                try:
                    len_value = len(value)
                except Exception:
                    # no len() support: count the items by iterating
                    len_value = 0
                    for _ in value:
                        len_value += 1
                self.assertEqual(int(length), len_value, msg=path)
            elif test.startswith("iso:"):
                iso = test[4:]
                if iso in ("dt", "datetime", "8601"):
                    msg = f"{path} / ISO 8601"
                    try:
                        dt = datetime.datetime.fromisoformat(value)
                    except Exception as exc:
                        self.fail(f"Invalid datetime '{value}': {exc} {msg}")
                    self.assertIsInstance(dt, datetime.datetime, msg=msg)
                elif iso in ("lang", "639", "639-1"):
                    msg = f"{path} / ISO 639-1"
                    self.assertIsInstance(value, str, msg=msg)
                    self.assertRegex(value, r"^[a-z]{2}(-\w+)?$", msg=msg)
                elif iso in ("uuid", "11578", "11578:1996", "4122"):
                    msg = f"{path} / ISO 11578:1996"
                    pat = (r"(?i)[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-"
                           r"[0-9a-f]{4}-[0-9a-f]{12}")
                    self.assertIsInstance(value, str, msg=msg)
                    self.assertRegex(value, pat, msg=msg)
                else:
                    self.fail(f"Unsupported ISO test '{test}'")
            else:
                self.assertEqual(test, value, msg=path)
        else:
            self.assertEqual(test, value, msg=path)
376
377
378
class ResultJob(job.DownloadJob):
    """Generate test-results for extractor runs"""

    def __init__(self, url, parent=None, content=False, format=True):
        super().__init__(url, parent)
        self.queue = False
        self.content = content

        # collected values plus a running SHA1 digest for each kind
        self.url_list = []
        self.kwdict_list = []
        self.kwdict_post = []
        self.archive_list = []
        self.url_hash = hashlib.sha1()
        self.kwdict_hash = hashlib.sha1()
        self.archive_hash = hashlib.sha1()
        self.content_hash = hashlib.sha1()

        if content:
            # downloaded bytes are fed into content_hash
            self.fileobj = TestPathfmt(self.content_hash)
        else:
            # shadow the method with a no-op so nothing is downloaded
            def _no_content(url, kwdict):
                return None
            self._update_content = _no_content

        if format:
            extr = self.extractor
            self.format_directory = TestFormatter(
                "".join(extr.directory_fmt)).format_map
            self.format_filename = TestFormatter(
                extr.filename_fmt).format_map
            self.format_archive = TestFormatter(
                extr.archive_fmt).format_map
        else:
            # one shared stand-in that formats everything to ""
            def _blank(kwdict):
                return ""
            self.format_directory = _blank
            self.format_filename = _blank
            self.format_archive = _blank

    def run(self):
        """Initialize the job and dispatch every extractor message."""
        self._init()
        for message in self.extractor:
            self.dispatch(message)

    def handle_url(self, url, kwdict, fallback=None):
        """Record URL, metadata, archive ID and (optionally) content."""
        self._update_url(url)
        self._update_kwdict(kwdict)
        self._update_archive(kwdict)
        self._update_content(url, kwdict)
        self.format_filename(kwdict)

    def handle_directory(self, kwdict):
        """Record post-level metadata and format the directory."""
        self._update_kwdict(kwdict, False)
        self.format_directory(kwdict)

    def handle_metadata(self, kwdict):
        """Ignore metadata messages."""
        pass

    def handle_queue(self, url, kwdict):
        """Mark the job as queue-producing and record URL and metadata."""
        self.queue = True
        self._update_url(url)
        self._update_kwdict(kwdict)

    def _update_url(self, url):
        """Append 'url' to url_list and feed it into url_hash."""
        self.url_list.append(url)
        self.url_hash.update(url.encode())

    def _update_kwdict(self, kwdict, to_list=True):
        """Store a copy of 'kwdict' and hash its filtered JSON form.

        The copy goes to kwdict_list when 'to_list' is true and to
        kwdict_post otherwise.
        """
        target = self.kwdict_list if to_list else self.kwdict_post
        target.append(kwdict.copy())

        filtered = util.filter_dict(kwdict)
        serialized = json.dumps(filtered, sort_keys=True, default=str)
        self.kwdict_hash.update(serialized.encode())

    def _update_archive(self, kwdict):
        """Format the archive ID, remember it and add it to archive_hash."""
        entry = self.format_archive(kwdict)
        self.archive_list.append(entry)
        self.archive_hash.update(entry.encode())

    def _update_content(self, url, kwdict):
        """Download 'url' into fileobj, trying '_fallback' URLs on failure."""
        self.fileobj.kwdict = kwdict

        scheme = url.partition(":")[0]
        if self.get_downloader(scheme).download(url, self.fileobj):
            return

        fallback_urls = kwdict.get("_fallback") or ()
        for attempt, alt_url in enumerate(fallback_urls, 1):
            self.log.warning("Trying fallback URL #%d", attempt)
            alt_scheme = alt_url.partition(":")[0]
            if self.get_downloader(alt_scheme).download(
                    alt_url, self.fileobj):
                return
466
467
468
class TestPathfmt:
    """File-like stand-in that hashes written data instead of storing it."""

    def __init__(self, hashobj):
        # hash object that receives every chunk passed to write()
        self.hashobj = hashobj
        self.kwdict = {}
        self.extension = "jpg"
        self.path = ""
        self.size = 0

    def __enter__(self):
        """Allow use as a context manager; yields the object itself."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Nothing to release; exceptions are not suppressed."""
        return None

    def open(self, mode):
        """Reset the byte counter and return this object as the 'file'."""
        self.size = 0
        return self

    def write(self, content):
        """Update SHA1 hash"""
        self.size += len(content)
        self.hashobj.update(content)

    def tell(self):
        """Return the number of bytes written since the last open()."""
        return self.size

    def part_size(self):
        """Always report 0 bytes of existing partial data."""
        return 0
497
498
499
class TestFormatter(formatter.StringFormatter):
    """StringFormatter whose 'extension' and optional fields may be absent.

    A field counts as lenient when its key is 'extension' or when the
    repr() of its format function contains '_parse_optional.'. All other
    fields raise as usual when the key is missing.
    """

    def _apply_simple(self, key, fmt):
        """Build the accessor for a field without conversion functions."""
        lenient = (key == "extension" or "_parse_optional." in repr(fmt))

        if not lenient:
            return lambda obj: fmt(obj[key])

        def wrap(obj):
            try:
                return fmt(obj[key])
            except KeyError:
                return ""
        return wrap

    def _apply(self, key, funcs, fmt):
        """Build the accessor for a field with conversion functions."""
        lenient = (key == "extension" or "_parse_optional." in repr(fmt))

        def convert(value):
            # run the value through every function, then format it
            for func in funcs:
                value = func(value)
            return fmt(value)

        if lenient:
            def wrap(obj):
                return convert(obj[key] if key in obj else "")
        else:
            def wrap(obj):
                return convert(obj[key])
        return wrap
527
528
529
def setup_test_config():
    """Merge the module-level CONFIG mapping into 'config._config'."""
    active = config._config
    active.update(CONFIG)
531
532
533
def load_test_config():
    """Extend CONFIG with the contents of '<repo>/archive/config.json'.

    The path is resolved relative to the parent directory of this file's
    directory. A missing file is silently ignored. Any other problem
    (unreadable file, invalid JSON, ...) terminates the process through
    sys.exit() with a message naming the file and the error.
    """
    # computed outside the 'try' so the error message below can always
    # refer to 'path'
    path = os.path.join(
        os.path.dirname(os.path.dirname(__file__)),
        "archive", "config.json")
    try:
        # read as UTF-8 rather than the platform's default encoding
        with open(path, encoding="utf-8") as fp:
            CONFIG.update(json.load(fp))
    except FileNotFoundError:
        pass
    except Exception as exc:
        sys.exit(f"Error when loading {path}: {exc.__class__.__name__}: {exc}")
544
545
546
def result_categories(result):
    """Return the (basecategory, category, subcategory) of a result.

    A truthy '#category' entry is returned unchanged; otherwise the three
    values are read from the extractor class stored under '#class'.
    """
    explicit = result.get("#category")
    if not explicit:
        extr_cls = result["#class"]
        return (extr_cls.basecategory,
                extr_cls.category,
                extr_cls.subcategory)
    return explicit
553
554
555
def generate_tests():
    """Dynamically generate extractor unittests

    One 'test_<category>_<subcategory>_<n>' method per result dict is
    attached to TestExtractorResults. When this file is executed directly
    with an argument, that argument selects the results to use:

      CATEGORY          results of one category
      +BASECATEGORY     results whose basecategory matches (ignoring case)
      ...:SUBCATEGORY   additionally filter by subcategory
      ...:+TEXT         additionally filter by TEXT occurring in '#url'
      ...:~TEXT         additionally filter by TEXT occurring in
                        '#comment' (ignoring case)

    The argument is removed from sys.argv afterwards.
    """
    def _generate_method(result):
        def test(self):
            sys.stdout.write(f"\n{result['#url']}\n")
            if "#comment" in result:
                sys.stdout.write(f"# {result['#comment']}\n")

            try:
                self._run_test(result)
            except KeyboardInterrupt:
                # Ctrl+C during a test: ask how to proceed
                v = input("\n[e]xit | [f]ail | [S]kip ? ").strip().lower()
                if v in ("e", "exit"):
                    raise
                if v in ("f", "fail"):
                    self.fail("manual test failure")
                else:
                    self._skipped.append((result["#url"], "manual skip"))
                    # pass a readable reason instead of the exception object
                    self.skipTest("manual skip")
        return test

    # enable selective testing for direct calls
    if __name__ == "__main__" and len(sys.argv) > 1:
        category, _, subcategory = sys.argv[1].partition(":")
        del sys.argv[1:]

        if category.startswith("+"):
            basecategory = category[1:].lower()
            tests = [t for t in results.all()
                     if result_categories(t)[0].lower() == basecategory]
        else:
            tests = results.category(category)

        if subcategory:
            if subcategory.startswith("+"):
                url = subcategory[1:]
                tests = [t for t in tests if url in t["#url"]]
            elif subcategory.startswith("~"):
                # comments are lowercased for comparison, so the search
                # term has to be lowercased as well
                com = subcategory[1:].lower()
                tests = [t for t in tests
                         if "#comment" in t and com in t["#comment"].lower()]
            else:
                tests = [t for t in tests
                         if result_categories(t)[-1] == subcategory]
    else:
        tests = results.all()

    # add 'test_...' methods
    enum = collections.defaultdict(int)
    for result in tests:
        _, cat, sub = result_categories(result)
        name = f"{cat}_{sub}"
        # running counter keeps method names unique per category pair
        enum[name] += 1

        method = _generate_method(result)
        method.__doc__ = result["#url"]
        method.__name__ = f"test_{name}_{enum[name]}"
        setattr(TestExtractorResults, method.__name__, method)
613
614
615
# Runs on import as well as on direct execution: attaches the generated
# 'test_*' methods to TestExtractorResults.
generate_tests()
if __name__ == "__main__":
    # Direct execution only: extend CONFIG from 'archive/config.json'
    # before handing control to the unittest runner.
    load_test_config()
    unittest.main(warnings="ignore")
619
620