Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
mikf
GitHub Repository: mikf/gallery-dl
Path: blob/master/test/test_results.py
8830 views
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
4
# Copyright 2015-2026 Mike Fährmann
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License version 2 as
8
# published by the Free Software Foundation.
9
10
import os
11
import sys
12
import unittest
13
14
import re
15
import json
16
import hashlib
17
import datetime
18
import collections
19
20
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
21
from gallery_dl import \
22
extractor, util, job, config, exception, formatter # noqa E402
23
24
25
RESULTS = os.environ.get("GDL_TEST_RESULTS")
26
if RESULTS:
27
results = util.import_file(RESULTS)
28
else:
29
from test import results
30
31
32
# Placeholder for entries that are broken for temporary reasons
# (site outages and the like); currently empty.
BROKEN = {}

# Baseline configuration merged into gallery-dl's config for every test
# (see setup_test_config()).
# NOTE(review): "cache.file = None" presumably disables the on-disk
# cache -- confirm against gallery_dl's cache module.
CONFIG = {
    "cache": {"file": None},
    "downloader": {"adjust-extensions": False, "part": False},
}

# Extractor categories that are treated as needing credentials when a
# result does not specify '#auth' itself.
AUTH_REQUIRED = {
    "pixiv", "nijie", "horne",
    "reddit", "seiga", "fantia",
    "instagram", "twitter", "poipiku",
}

# Config option names that count as "credentials".
# Must remain a plain 'set': _has_auth() dispatches on isinstance(auth, set).
AUTH_KEYS = {
    "username", "cookies",
    "api-key", "client-id",
    "access-token", "refresh-token",
}
67
68
69
class TestExtractorResults(unittest.TestCase):
    """Run extractor result definitions as unit tests.

    The 'test_*' methods are not written out here; generate_tests()
    attaches one per result dict, each of which calls _run_test().

    Keys of a result dict starting with '#' are test directives
    ('#url', '#class', '#count', '#pattern', ...); all remaining keys
    are expectations on the metadata dicts produced by the extractor.
    """

    def setUp(self):
        """Install the shared test configuration before each test."""
        setup_test_config()

    def tearDown(self):
        """Discard every config value set while the test was running."""
        config.clear()

    @classmethod
    def setUpClass(cls):
        """Create the list collecting (url, reason) pairs of skipped tests."""
        cls._skipped = []

    @classmethod
    def tearDownClass(cls):
        """Write a summary of all skipped tests to stdout."""
        if cls._skipped:
            sys.stdout.write("\n\nSkipped tests:\n")
            for url, reason in cls._skipped:
                sys.stdout.write(f'- {url} ("{reason}")\n')

    def assertRange(self, value, range, msg=None):
        """Assert that 'value' lies within 'range'.

        A range with a step greater than 1 is tested by membership.
        Otherwise only the bounds are compared, and 'range.stop' is
        accepted as a valid value (i.e. the upper bound is inclusive).
        """
        if range.step > 1:
            self.assertIn(value, range, msg=msg)
        else:
            self.assertLessEqual(value, range.stop, msg=msg)
            self.assertGreaterEqual(value, range.start, msg=msg)

    def assertLogEqual(self, expected, output):
        """Compare expected log entries against captured log output.

        'output' is a sequence of 'LEVEL:name:message' strings.
        'expected' is a single message string or a sequence whose items
        are each one of
          - 'message'
          - (level, message)
          - (level, name, message)
        Levels are compared case-insensitively.
        """
        if isinstance(expected, str):
            expected = (expected,)
        self.assertEqual(len(expected), len(output), "#log/count")

        for exp, out in zip(expected, output):
            level, name, message = out.split(":", 2)

            if isinstance(exp, str):
                # message-only entry; keep going so that the remaining
                # entries are verified as well
                self.assertEqual(exp, message, "#log")
                continue

            self.assertEqual(exp[0].lower(), level.lower(), "#log/level")
            if len(exp) < 3:
                self.assertEqual(exp[1], message, "#log/message")
            else:
                self.assertEqual(exp[1], name   , "#log/name")
                self.assertEqual(exp[2], message, "#log/message")

    def _run_test(self, result):
        """Execute one result definition and check all its expectations.

        Note that 'result' is modified in place: '#fail', '#comment',
        '#category', '#auth' and '#skip' are removed from it.
        """
        if result.get("#fail"):
            # expected failure: the test passes only if it fails
            # with an AssertionError
            del result["#fail"]
            try:
                self._run_test(result)
            except AssertionError:
                return
            else:
                self.fail("Test did not fail")

        base, cat, sub = result_categories(result)
        result.pop("#comment", None)
        result.pop("#category", None)
        auth = result.pop("#auth", None)

        # the URL has to be matched both by the global lookup
        # and by the expected extractor class itself
        extr_url = extractor.find(result["#url"])
        self.assertTrue(extr_url, "extractor by URL/find")
        extr_cls = extr = result["#class"].from_url(result["#url"])
        self.assertTrue(extr_cls, "extractor by cls.from_url()")
        self.assertIs(extr_url.__class__, extr_cls.__class__)

        if len(result) <= 2:
            return  # only matching

        skip = result.pop("#skip", False)
        if skip:
            return self._skipped.append((result["#url"], skip))

        if auth is None:
            auth = (cat in AUTH_REQUIRED)
        elif not auth:
            # auth explicitly disabled
            for key in AUTH_KEYS:
                config.set((), key, None)

        if auth and not self._has_auth(extr, auth):
            self._skipped.append((result["#url"], "no auth"))
            self.skipTest("no auth")

        if "#options" in result:
            for key, value in result["#options"].items():
                # dotted names address nested config sections
                key = key.split(".")
                config.set(key[:-1], key[-1], value)
        if "#range" in result:
            config.set((), "image-range"  , result["#range"])
            config.set((), "chapter-range", result["#range"])

        tjob = ResultJob(extr,
                         content=("#sha1_content" in result),
                         format=(result.get("#metadata") != "post"))

        if "#exception" in result:
            exc = result["#exception"]
            if isinstance(exc, str):
                exc = getattr(exception, exc, None)
            with self.assertRaises(exc, msg="#exception"), \
                    self.assertLogs() as log_info:
                tjob.run()
            if "#log" in result:
                self.assertLogEqual(result["#log"], log_info.output)
            return

        try:
            if "#log" in result:
                with self.assertLogs() as log_info:
                    tjob.run()
            else:
                tjob.run()
        except exception.StopExtraction:
            pass
        except exception.HttpError as exc:
            # server-side errors (5xx) and read timeouts are
            # recorded as skipped instead of failed
            exc = str(exc)
            if re.match(r"'5\d\d ", exc) or \
                    re.search(r"\bRead timed out\b", exc):
                self._skipped.append((result["#url"], exc))
                self.skipTest(exc)
            raise

        if "#log" in result:
            self.assertLogEqual(result["#log"], log_info.output)

        if result.get("#archive", True):
            self.assertEqual(
                len(set(tjob.archive_list)),
                len(tjob.archive_list),
                msg="archive-id uniqueness")

        if tjob.queue:
            # test '_extractor' entries
            for url, kwdict in zip(tjob.url_list, tjob.kwdict_list):
                if "_extractor" in kwdict:
                    extr = kwdict["_extractor"].from_url(url)
                    if extr is None and not result.get("#extractor", True):
                        continue
                    self.assertIsInstance(extr, kwdict["_extractor"], msg=url)
                    self.assertEqual(extr.url, url)
        else:
            # test 'extension' entries
            for kwdict in tjob.kwdict_list:
                self.assertIn("extension", kwdict, msg="#extension")

        # test extraction results
        if "#sha1_url" in result:
            self.assertEqual(
                result["#sha1_url"],
                tjob.url_hash.hexdigest(),
                msg="#sha1_url")

        if "#sha1_content" in result:
            expected = result["#sha1_content"]
            digest = tjob.content_hash.hexdigest()
            if isinstance(expected, str):
                self.assertEqual(expected, digest, msg="#sha1_content")
            else:  # iterable
                self.assertIn(digest, expected, msg="#sha1_content")

        if "#sha1_metadata" in result:
            self.assertEqual(
                result["#sha1_metadata"],
                tjob.kwdict_hash.hexdigest(),
                "#sha1_metadata")

        if "#count" in result:
            count = result["#count"]
            len_urls = len(tjob.url_list)
            if isinstance(count, str):
                # the regex restricts 'count' to '<operator> <digits>',
                # which is what makes the eval() below acceptable
                self.assertRegex(
                    count, r"^ *(==|!=|<|<=|>|>=) *\d+ *$", msg="#count")
                expr = f"{len_urls} {count}"
                self.assertTrue(eval(expr), msg=expr)
            elif isinstance(count, range):
                self.assertRange(len_urls, count, msg="#count")
            else:  # assume integer
                self.assertEqual(len_urls, count, msg="#count")

        if "#pattern" in result:
            self.assertGreater(len(tjob.url_list), 0, msg="#pattern")
            pattern = result["#pattern"]
            if isinstance(pattern, str):
                for url in tjob.url_list:
                    self.assertRegex(url, pattern, msg="#pattern")
            else:
                # one pattern per URL; zip() stops at the shorter sequence
                for url, pat in zip(tjob.url_list, pattern):
                    self.assertRegex(url, pat, msg="#pattern")

        if "#results" in result:
            expected = result["#results"]
            if isinstance(expected, str):
                self.assertTrue(tjob.url_list, msg="#results")
                self.assertEqual(
                    tjob.url_list[0], expected, msg="#results")
            else:
                self.assertSequenceEqual(
                    tjob.url_list, expected, msg="#results")

        # everything not starting with '#' is a metadata expectation
        metadata = {k: v for k, v in result.items() if k[0] != "#"}
        if metadata:
            if result.get("#metadata") == "post":
                kwdicts = tjob.kwdict_post
            else:
                kwdicts = tjob.kwdict_list
            for kwdict in kwdicts:
                self._test_kwdict(kwdict, metadata)

    def _has_auth(self, extr, auth):
        """Return a truthy value if 'extr' has the requested credentials.

        'auth' may be
          - True:       any of AUTH_KEYS is configured
          - str:        this config option is set
          - set:        at least one member is satisfied
          - tuple/list: all members are satisfied
        Any other value fails the test.
        """
        if auth is True:
            auth = AUTH_KEYS

        if isinstance(auth, str):
            return extr.config(auth)
        if isinstance(auth, set):
            return any(self._has_auth(extr, a) for a in auth)
        if isinstance(auth, (tuple, list)):
            return all(self._has_auth(extr, k) for k in auth)

        self.fail(f"Invalid '#auth' value: {auth!r}")

    def _test_kwdict(self, kwdict, tests, parent=None):
        """Check the metadata dict 'kwdict' against the 'tests' mapping.

        Supported key decorations:
          '?key'    only tested when 'key' is present
          'key[*]'  the test is applied to every item of the value
          '!key'    'key' must not be present
        'parent' is the dotted path of the enclosing value and is only
        used for assertion messages.
        """
        for key, test in tests.items():

            if key.startswith("?"):
                key = key[1:]
                if key not in kwdict:
                    continue

            if key.endswith("[*]"):
                key = key[:-3]
                subtest = True
            else:
                subtest = False

            path = f"{parent}.{key}" if parent else key

            if key.startswith("!"):
                self.assertNotIn(key[1:], kwdict, msg=path)
                continue

            self.assertIn(key, kwdict, msg=path)
            value = kwdict[key]

            if subtest:
                # iterating a string would silently test its characters
                self.assertNotIsInstance(value, str, msg=path)
                for idx, item in enumerate(value):
                    subpath = f"{path}[{idx}]"
                    self._test_kwdict_value(item, test, subpath)
            else:
                self._test_kwdict_value(value, test, path)

    def _test_kwdict_value(self, value, test, path):
        """Check a single metadata 'value' against 'test'.

        The kind of check depends on the type of 'test':
          dict   nested expectations, see _test_kwdict()
          type   isinstance() check
          range  see assertRange()
          set    'value' must equal one member or be an instance of
                 one member that is a type
          list   dict items are tested against the element at the same
                 index; without any dict item the lists must be equal
          str    prefixes 're:', 'dt:', 'type:', 'len:', 'hash:' and
                 'iso:' select special checks; otherwise equality
        Everything else is compared for equality. 'path' is used for
        assertion messages only.
        """
        if isinstance(test, dict):
            self._test_kwdict(value, test, path)
        elif isinstance(test, type):
            self.assertIsInstance(value, test, msg=path)
        elif isinstance(test, range):
            self.assertRange(value, test, msg=path)
        elif isinstance(test, set):
            if isinstance(value, list):
                # set members are hashable, so an expected
                # sequence can only be given as a tuple
                value = tuple(value)
            for item in test:
                if isinstance(item, type) and isinstance(value, item) or \
                        value == item:
                    break
            else:
                # keep the failure message readable for long values
                v = type(value) if len(str(value)) > 64 else value
                self.fail(f"{v!r} not in {test}: {path}")
        elif isinstance(test, list):
            subtest = False
            for idx, item in enumerate(test):
                if isinstance(item, dict):
                    subtest = True
                    subpath = f"{path}[{idx}]"
                    try:
                        obj = value[idx]
                    except Exception as exc:
                        self.fail(f"'{exc.__class__.__name__}: {exc}' "
                                  f"when accessing {subpath}")
                    self._test_kwdict(obj, item, subpath)
            if not subtest:
                self.assertEqual(test, value, msg=path)
        elif isinstance(test, str):
            if test.startswith("re:"):
                self.assertIsInstance(value, str, msg=path)
                self.assertRegex(value, test[3:], msg=path)
            elif test.startswith("dt:"):
                self.assertIsInstance(value, datetime.datetime, msg=path)
                self.assertEqual(test[3:], str(value), msg=path)
            elif test.startswith("type:"):
                self.assertEqual(test[5:], type(value).__name__, msg=path)
            elif test.startswith("len:"):
                # 'len:<N>' or 'len:<typename>:<N>'
                cls, _, length = test[4:].rpartition(":")
                if cls:
                    self.assertEqual(
                        cls, type(value).__name__, msg=f"{path}/type")
                try:
                    len_value = len(value)
                except Exception:
                    # no len() support (e.g. iterators): count the items
                    len_value = 0
                    for _ in value:
                        len_value += 1
                self.assertEqual(int(length), len_value, msg=path)
            elif test.startswith("hash:"):
                digest = test[5:].lower()
                msg = f"{path} / {digest}"
                # NOTE(review): an unrecognized digest name passes
                # without any check -- confirm this is intended
                if digest == "md5":
                    self.assertRegex(value, r"^[0-9a-fA-F]{32}$", msg)
                elif digest == "sha1":
                    self.assertRegex(value, r"^[0-9a-fA-F]{40}$", msg)
                elif digest == "sha256":
                    self.assertRegex(value, r"^[0-9a-fA-F]{64}$", msg)
                elif digest == "sha512":
                    self.assertRegex(value, r"^[0-9a-fA-F]{128}$", msg)
            elif test.startswith("iso:"):
                iso = test[4:]
                if iso in ("dt", "datetime", "8601"):
                    msg = f"{path} / ISO 8601"
                    try:
                        dt = datetime.datetime.fromisoformat(value)
                    except Exception as exc:
                        self.fail(f"Invalid datetime '{value}': {exc} {msg}")
                    self.assertIsInstance(dt, datetime.datetime, msg=msg)
                elif iso in ("lang", "639", "639-1"):
                    msg = f"{path} / ISO 639-1"
                    self.assertIsInstance(value, str, msg=msg)
                    self.assertRegex(value, r"^[a-z]{2}(-\w+)?$", msg=msg)
                elif iso in ("uuid", "11578", "11578:1996", "4122"):
                    msg = f"{path} / ISO 11578:1996"
                    pat = (r"(?i)[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-"
                           r"[0-9a-f]{4}-[0-9a-f]{12}")
                    self.assertIsInstance(value, str, msg=msg)
                    self.assertRegex(value, pat, msg=msg)
                else:
                    self.fail(f"Unsupported ISO test '{test}'")
            else:
                self.assertEqual(test, value, msg=path)
        else:
            self.assertEqual(test, value, msg=path)
409
410
411
class ResultJob(job.DownloadJob):
    """Job that records extractor output for the result tests

    URLs, metadata dicts and archive IDs are collected in lists and
    additionally fed into running SHA1 hashes. File content is only
    fetched (and hashed) when 'content' is enabled.
    """

    def __init__(self, url, parent=None, content=False, format=True):
        super().__init__(url, parent)

        # set to True as soon as a queue message is handled
        self.queue = False
        self.content = content
        # tri-state: True/False until handle_directory() has set up
        # the format functions, None afterwards
        self.format = format

        # collected results
        self.url_list = []
        self.kwdict_list = []
        self.kwdict_post = []
        self.archive_list = []

        # running digests over the collected results
        self.url_hash = hashlib.sha1()
        self.kwdict_hash = hashlib.sha1()
        self.archive_hash = hashlib.sha1()
        self.content_hash = hashlib.sha1()

        if not content:
            # replace the method with a no-op on this instance
            def _skip_content(url, kwdict):
                return None
            self._update_content = _skip_content
        else:
            self.fileobj = TestPathfmt(self.content_hash)

    def run(self):
        """Initialize the job and process all extractor messages"""
        self._init()
        self.dispatch(self.extractor)

    def handle_url(self, url, kwdict, fallback=None):
        """Record a file URL with its metadata, archive ID and content"""
        self._update_url(url)
        self._update_kwdict(kwdict)
        self._update_archive(kwdict)
        self._update_content(url, kwdict)
        # result is discarded; this only verifies the format string works
        self.format_filename(kwdict)

    def handle_directory(self, kwdict):
        """Record post metadata; set up format functions on first call"""
        use_formatters = self.format
        if use_formatters is not None:
            if use_formatters:
                directory_fmt = "".join(self.extractor.directory_fmt)
                self.format_directory = \
                    TestFormatter(directory_fmt).format_map
                self.format_filename = \
                    TestFormatter(self.extractor.filename_fmt).format_map
                self.format_archive = \
                    TestFormatter(self.extractor.archive_fmt).format_map
            else:
                # formatting disabled: everything formats to ""
                def _blank(kwdict):
                    return ""
                self.format_directory = _blank
                self.format_filename = _blank
                self.format_archive = _blank
            # mark setup as done
            self.format = None

        self._update_kwdict(kwdict, False)
        self.format_directory(kwdict)

    def handle_metadata(self, kwdict):
        """Ignore metadata messages"""

    def handle_queue(self, url, kwdict):
        """Record a queued URL and remember that a queue was seen"""
        self.queue = True
        self._update_url(url)
        self._update_kwdict(kwdict)

    def _update_url(self, url):
        """Append 'url' to the URL list and hash"""
        self.url_list.append(url)
        self.url_hash.update(url.encode())

    def _update_kwdict(self, kwdict, to_list=True):
        """Store a copy of 'kwdict' and add it to the metadata hash

        The copy goes to 'kwdict_list' by default and to 'kwdict_post'
        when 'to_list' is false.
        """
        target = self.kwdict_list if to_list else self.kwdict_post
        target.append(kwdict.copy())

        filtered = util.filter_dict(kwdict)
        serialized = json.dumps(filtered, sort_keys=True, default=str)
        self.kwdict_hash.update(serialized.encode())

    def _update_archive(self, kwdict):
        """Format the archive ID for 'kwdict' and record it"""
        archive_id = self.format_archive(kwdict)
        self.archive_list.append(archive_id)
        self.archive_hash.update(archive_id.encode())

    def _update_content(self, url, kwdict):
        """Download 'url' into the hashing file object

        If the download fails, the URLs in kwdict["_fallback"] are
        tried in order until one succeeds.
        """
        self.fileobj.kwdict = kwdict

        scheme = url.partition(":")[0]
        if self.get_downloader(scheme).download(url, self.fileobj):
            return

        fallbacks = kwdict.get("_fallback") or ()
        for num, fallback_url in enumerate(fallbacks, 1):
            self.log.warning("Trying fallback URL #%d", num)
            scheme = fallback_url.partition(":")[0]
            downloader = self.get_downloader(scheme)
            if downloader.download(fallback_url, self.fileobj):
                return
501
502
503
class TestPathfmt:
    """File-like stand-in that hashes written data instead of storing it

    Serves as its own context manager and as the object returned by
    open(), so it can be handed to code expecting to write to it.
    """

    def __init__(self, hashobj):
        # hash object receiving all written data
        self.hashobj = hashobj
        self.path = ""
        self.kwdict = {}
        self.extension = "jpg"
        # number of bytes written since the last open()
        self.size = 0

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # nothing to release; exceptions are not suppressed
        return None

    def open(self, mode):
        """Reset the byte counter and return this object"""
        self.size = 0
        return self

    def write(self, content):
        """Update SHA1 hash"""
        self.size = self.size + len(content)
        self.hashobj.update(content)

    def tell(self):
        """Return the number of bytes written since open()"""
        return self.size

    def part_size(self):
        """Always report that no partial data exists"""
        return 0
532
533
534
class TestFormatter(formatter.StringFormatter):
    """StringFormatter variant used by the result tests

    Field accessors are built in three flavours, chosen by the field
    name and the repr() of the format function:
      - 'extension' and '_parse_optional' fields tolerate a missing key
      - fields formatted by the 'identity' function get their value
        passed through "".join(), which raises for non-string values
      - all other fields are looked up and formatted directly
    """

    def _apply_simple(self, key, fmt):
        fmt_repr = repr(fmt)

        if key == "extension" or "_parse_optional." in fmt_repr:
            def lenient(obj):
                try:
                    return fmt(obj[key])
                except KeyError:
                    return ""
            return lenient

        if "<function identity at " in fmt_repr:
            def joined(obj):
                return "".join(obj[key])
            return joined

        def strict(obj):
            return fmt(obj[key])
        return strict

    def _apply(self, key, funcs, fmt):
        fmt_repr = repr(fmt)

        if key == "extension" or "_parse_optional." in fmt_repr:
            def lenient(obj):
                value = obj[key] if key in obj else ""
                for func in funcs:
                    value = func(value)
                return fmt(value)
            return lenient

        if "<function identity at " in fmt_repr:
            # 'funcs' is not applied in this flavour
            def joined(obj):
                return "".join(obj[key])
            return joined

        def strict(obj):
            value = obj[key]
            for func in funcs:
                value = func(value)
            return fmt(value)
        return strict
568
569
570
def setup_test_config():
    """Merge the CONFIG test defaults into gallery-dl's live config"""
    live_config = config._config
    live_config.update(CONFIG)
572
573
574
def load_test_config():
    """Merge '<project root>/archive/config.json' into CONFIG

    The path is resolved relative to the directory above this file's
    directory. A missing file is silently ignored; any other problem
    (unreadable file, invalid JSON, ...) terminates the process with
    an error message.
    """
    # built outside of 'try' so that 'path' is always defined
    # when the error message below is formatted
    path = os.path.join(
        os.path.dirname(os.path.dirname(__file__)),
        "archive", "config.json")

    try:
        # JSON is UTF-8; do not depend on the locale's default encoding
        with open(path, encoding="utf-8") as fp:
            CONFIG.update(json.load(fp))
    except FileNotFoundError:
        pass
    except Exception as exc:
        sys.exit(f"Error when loading {path}: {exc.__class__.__name__}: {exc}")
585
586
587
def result_categories(result):
    """Return the (basecategory, category, subcategory) of a result

    A truthy '#category' entry takes precedence and is returned as is;
    otherwise the values are read from the result's '#class'.
    """
    explicit = result.get("#category")
    if not explicit:
        extr = result["#class"]
        return (extr.basecategory, extr.category, extr.subcategory)
    return explicit
594
595
596
def generate_tests():
    """Dynamically generate extractor unittests

    One 'test_<category>_<subcategory>_<n>' method per result is added
    to TestExtractorResults.

    When run as a script, the first command-line argument selects the
    results to test and is then removed from sys.argv:
      CATEGORY           results of that category
      +BASECATEGORY      results of that base category
      ...:SUBCATEGORY    additionally filter by subcategory
      ...:+TEXT          additionally require TEXT in the URL
      ...:~TEXT          additionally require TEXT in the lowercased
                         '#comment'
    """

    def _make_test(result):
        """Build the test method for a single result"""
        def test(self):
            sys.stdout.write(f"\n{result['#url']}\n")
            if "#comment" in result:
                sys.stdout.write(f"# {result['#comment']}\n")

            try:
                self._run_test(result)
            except KeyboardInterrupt as exc:
                # Ctrl+C: let the user decide what to do with this test
                answer = input("\n[e]xit | [f]ail | [S]kip ? ")
                answer = answer.strip().lower()
                if answer in ("e", "exit"):
                    raise
                if answer in ("f", "fail"):
                    self.fail("manual test failure")
                self._skipped.append((result["#url"], "manual skip"))
                self.skipTest(exc)
        return test

    def _select(argument):
        """Return the results matching a 'category[:subcategory]' spec"""
        category, _, subcategory = argument.partition(":")

        if category.startswith("+"):
            wanted = category[1:].lower()
            selected = [
                t for t in results.all()
                if result_categories(t)[0].lower() == wanted
            ]
        else:
            selected = results.category(category)

        if not subcategory:
            return selected

        needle = subcategory[1:]
        if subcategory.startswith("+"):
            return [t for t in selected if needle in t["#url"]]
        if subcategory.startswith("~"):
            return [
                t for t in selected
                if "#comment" in t and needle in t["#comment"].lower()
            ]
        return [
            t for t in selected
            if result_categories(t)[-1] == subcategory
        ]

    # enable selective testing for direct calls
    if __name__ == "__main__" and len(sys.argv) > 1:
        argument = sys.argv[1]
        # hide the selector from unittest's own argument parsing
        del sys.argv[1:]
        tests = _select(argument)
    else:
        tests = results.all()

    # add 'test_...' methods
    counters = collections.Counter()
    for result in tests:
        _, cat, sub = result_categories(result)
        name = f"{cat}_{sub}"
        counters[name] += 1

        method = _make_test(result)
        method.__doc__ = result["#url"]
        method.__name__ = f"test_{name}_{counters[name]}"
        setattr(TestExtractorResults, method.__name__, method)
654
655
656
# Attach the generated test methods on import as well, so they exist
# no matter how this module gets loaded.
generate_tests()

if __name__ == "__main__":
    # direct invocation: merge the optional local config file first
    load_test_config()
    unittest.main(warnings="ignore")
660
661