Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
mikf
GitHub Repository: mikf/gallery-dl
Path: blob/master/test/test_job.py
8830 views
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
4
# Copyright 2021-2026 Mike Fährmann
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License version 2 as
8
# published by the Free Software Foundation.
9
10
import os
11
import sys
12
import unittest
13
from unittest.mock import patch
14
15
import io
16
17
# Put the parent of this file's directory first on sys.path so that the
# gallery_dl imports below are looked up there before anywhere else.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
18
from gallery_dl import job, config, text # noqa E402
19
from gallery_dl.extractor.common import Extractor, Message # noqa E402
20
21
22
class TestJob(unittest.TestCase):
    """Shared base for the job test cases in this module.

    ``_capture_stdout`` reads ``self.jobclass``, which this class does not
    define; it is expected to be provided by subclasses.
    """

    def tearDown(self):
        """Call ``config.clear()`` after every test."""
        config.clear()

    def _capture_stdout(self, extr_or_job):
        """Run a job and return the text it wrote to ``sys.stdout``.

        ``extr_or_job`` may be an ``Extractor`` instance, in which case it
        is wrapped in ``self.jobclass`` first, or any other object, which
        is used as the job directly and must provide ``run()``.
        """
        target = extr_or_job
        if isinstance(target, Extractor):
            target = self.jobclass(target)

        with io.StringIO() as captured:
            previous, sys.stdout = sys.stdout, captured
            try:
                target.run()
            finally:
                # always put the real stream back, even if run() raised
                sys.stdout = previous

            # must be read before the 'with' block closes the buffer
            return captured.getvalue()
42
43
44
class TestDownloadJob(TestJob):
    """Tests running against ``job.DownloadJob``."""
    jobclass = job.DownloadJob

    def _check_extractor_filter(self, tjob, extractor, parent, noop):
        # Build a fresh filter function from the current config and compare
        # its result for each of the three test extractor classes.
        extr_filter = tjob._build_extractor_filter()
        self.assertEqual(extr_filter(TestExtractor), extractor)
        self.assertEqual(extr_filter(TestExtractorParent), parent)
        self.assertEqual(extr_filter(TestExtractorNoop), noop)

    def _init_noop_job(self, init):
        # Configure 'init', an in-memory archive and a postprocessor, then
        # create a job for the no-op extractor and call its _init().
        options = (
            ("init", init),
            ("archive", ":memory:"),
            ("postprocessors", "directory"),
        )
        for key, value in options:
            config.set((), key, value)

        tjob = self.jobclass(TestExtractorNoop.from_url("test:noop"))
        tjob._init()
        return tjob

    def test_extractor_filter(self):
        tjob = self.jobclass(TestExtractor.from_url("test:"))

        # neither 'blacklist' nor 'whitelist' set
        self._check_extractor_filter(tjob, False, False, True)

        config.set((), "blacklist", ":test_subcategory")
        self._check_extractor_filter(tjob, False, True, False)

        # 'blacklist' from above is still set at this point
        config.set((), "whitelist", "test_category:test_subcategory")
        self._check_extractor_filter(tjob, True, False, False)

    def test_opt_init(self):
        tjob = self._init_noop_job(True)

        self.assertTrue(tjob.pathfmt)
        self.assertTrue(tjob.archive)
        self.assertTrue(tjob.hooks)

    def test_opt_init_false(self):
        tjob = self._init_noop_job(False)

        self.assertFalse(tjob.pathfmt)
        self.assertFalse(tjob.archive)
        self.assertFalse(tjob.hooks)

    def test_parent_metadata_extractor(self):
        print_num = {
            "name": "metadata/print@init",
            "format": "{num}",
        }

        config.set((), "parent-metadata", True)

        config.set(("output",), "mode", False)
        config.set((), "download", False)

        config.set((), "postprocessors", [print_num])

        captured = self._capture_stdout(
            TestExtractorParent.from_url("test:parent:3"))
        # no output if '_extractor' is overwritten (#8958)
        self.assertEqual(captured, "11\n")
109
110
111
class TestKeywordJob(TestJob):
    """Tests running against ``job.KeywordJob``."""
    jobclass = job.KeywordJob

    # NOTE(review): this file was recovered from an extraction that dropped
    # leading whitespace. Any indentation inside the expected-output literal
    # below could not be recovered -- verify it against real KeywordJob
    # output before relying on this test.

    def test_default(self):
        self.maxDiff = None
        expected = """\
Keywords for directory names:
-----------------------------
author['id']
123
author['name']
test
author['self']
<circular reference>
category
test_category
subcategory
test_subcategory
user['id']
123
user['name']
test
user['self']
<circular reference>

Keywords for filenames and --filter:
------------------------------------
author['id']
123
author['name']
test
author['self']
<circular reference>
category
test_category
extension
jpg
filename
1
num
1
subcategory
test_subcategory
tags[N]
0 foo
1 bar
2 テスト
user['id']
123
user['name']
test
user['self']
<circular reference>
"""
        # "test:self" makes the extractor's user dict reference itself
        extr = TestExtractor.from_url("test:self")
        self.assertEqual(self._capture_stdout(extr), expected)

    def test_opt_init(self):
        # only checks that _init() runs without raising when 'init' is set
        config.set((), "init", True)

        tjob = self.jobclass(TestExtractorNoop.from_url("test:noop"))
        tjob._init()
173
174
175
class TestUrlJob(TestJob):
    """Tests running against ``job.UrlJob``."""
    jobclass = job.UrlJob

    # NOTE(review): this file was recovered from an extraction that dropped
    # leading whitespace. If any expected-output line below originally
    # started with spaces, that indentation is missing -- verify.

    def test_default(self):
        expected = """\
https://example.org/1.jpg
https://example.org/2.jpg
https://example.org/3.jpg
"""
        extr = TestExtractor.from_url("test:")
        self.assertEqual(self._capture_stdout(extr), expected)

    def test_fallback(self):
        expected = """\
https://example.org/1.jpg
| https://example.org/alt/1.jpg
https://example.org/2.jpg
| https://example.org/alt/2.jpg
https://example.org/3.jpg
| https://example.org/alt/3.jpg
"""
        tjob = self.jobclass(TestExtractor.from_url("test:"))
        # swap in the fallback-aware URL handler on this instance only
        tjob.handle_url = tjob.handle_url_fallback

        self.assertEqual(self._capture_stdout(tjob), expected)

    def test_parent(self):
        expected = """\
test:child
test:child
test:child
"""
        extr = TestExtractorParent.from_url("test:parent")
        self.assertEqual(self._capture_stdout(extr), expected)

    def test_child(self):
        urls = """\
https://example.org/1.jpg
https://example.org/2.jpg
https://example.org/3.jpg
"""
        tjob = self.jobclass(
            TestExtractorParent.from_url("test:parent"), depth=0)
        # the same three URLs are expected three times over
        self.assertEqual(self._capture_stdout(tjob), 3 * urls)

    def test_opt_init(self):
        # only checks that _init() runs without raising when 'init' is set
        config.set((), "init", True)

        tjob = self.jobclass(TestExtractorNoop.from_url("test:noop"))
        tjob._init()

    def test_opt_follow(self):
        expected = """\
https://example.org/1.jpg
https://example.org/2.jpg
https://example.org/3.jpg
https://example1.org/content/abc
https://example2.org/content?query=123
https://example3.org/content/#frag
"""
        config.set((), "follow", "{user[bio]}")

        # "test:urls" gives the extractor's user a 'bio' containing URLs
        tjob = self.jobclass(TestExtractor.from_url("test:urls"))
        self.assertEqual(self._capture_stdout(tjob), expected)
237
238
239
class TestInfoJob(TestJob):
    """Tests running against ``job.InfoJob``."""
    jobclass = job.InfoJob

    # NOTE(review): this file was recovered from an extraction that dropped
    # leading whitespace. Any indentation inside the expected-output
    # literals below could not be recovered -- verify them against real
    # InfoJob output before relying on these tests.

    def test_default(self):
        expected = """\
Category / Subcategory
"test_category" / "test_subcategory"

Filename format (default):
"test_{filename}.{extension}"

Directory format (default):
["{category}"]

"""
        extr = TestExtractor.from_url("test:")
        self.assertEqual(self._capture_stdout(extr), expected)

    def test_custom(self):
        expected = """\
Category / Subcategory
"test_category" / "test_subcategory"

Filename format (custom):
"custom"
Filename format (default):
"test_{filename}.{extension}"

Directory format (custom):
["custom"]
Directory format (default):
["{category}"]

Request interval (custom):
321
Request interval (default):
123.456

"""
        overrides = (
            ("filename", "custom"),
            ("directory", ("custom",)),
            ("sleep-request", 321),
        )
        for key, value in overrides:
            config.set((), key, value)

        extr = TestExtractor.from_url("test:")
        # set on the instance; expected above as the "default" interval
        extr.request_interval = 123.456

        self.assertEqual(self._capture_stdout(extr), expected)

    def test_base_category(self):
        expected = """\
Category / Subcategory / Basecategory
"test_category" / "test_subcategory" / "test_basecategory"

Filename format (default):
"test_{filename}.{extension}"

Directory format (default):
["{category}"]

"""
        extr = TestExtractor.from_url("test:")
        extr.basecategory = "test_basecategory"
        extr.basesubcategory = "test_basesubcategory"

        self.assertEqual(self._capture_stdout(extr), expected)

    def test_opt_init(self):
        # only checks that _init() runs without raising when 'init' is set
        config.set((), "init", True)

        tjob = self.jobclass(TestExtractorNoop.from_url("test:noop"))
        tjob._init()
307
308
309
class TestDataJob(TestJob):
    """Tests running against ``job.DataJob``."""
    jobclass = job.DataJob

    def test_default(self):
        extr = TestExtractor.from_url("test:")
        tjob = self.jobclass(extr, file=io.StringIO())

        user = {"id": 123, "name": "test"}
        # entries expected in the metadata of every collected message
        shared = {
            "category": "test_category",
            "subcategory": "test_subcategory",
            "user": user,
            "author": user,
        }

        def url_message(num):
            return (Message.Url, f"https://example.org/{num}.jpg", {
                **shared,
                "filename": str(num),
                "extension": "jpg",
                "num": num,
                "tags": ["foo", "bar", "テスト"],
            })

        expected = [(Message.Directory, shared)]
        expected.extend(url_message(num) for num in (1, 2, 3))

        tjob.run()

        self.assertEqual(tjob.data, expected)

    def test_exception(self):
        extr = TestExtractorException.from_url("test:exception")
        tjob = self.jobclass(extr, file=io.StringIO())
        tjob.run()

        # the last collected entry is expected to describe the error
        error_entry = (-1, {
            "error": "ZeroDivisionError",
            "message": "division by zero",
        })
        self.assertEqual(tjob.data[-1], error_entry)

    def test_private(self):
        config.set(("output",), "private", True)
        extr = TestExtractor.from_url("test:")
        tjob = self.jobclass(extr, file=io.StringIO())

        tjob.run()

        # data[0] is skipped; entries 1..3 are checked for '_fallback'
        for num in (1, 2, 3):
            metadata = tjob.data[num][2]
            self.assertEqual(
                metadata["_fallback"],
                (f"https://example.org/alt/{num}.jpg",),
            )

    def test_sleep(self):
        extr = TestExtractor.from_url("test:")
        tjob = self.jobclass(extr, file=io.StringIO())

        # the option is set after the job object already exists
        config.set((), "sleep-extractor", 123)
        with patch("time.sleep") as fake_sleep:
            tjob.run()
            fake_sleep.assert_called_once_with(123)

        config.set((), "sleep-extractor", 0)
        with patch("time.sleep") as fake_sleep:
            tjob.run()
            fake_sleep.assert_not_called()

    def test_ascii(self):
        # NOTE(review): this file was recovered from an extraction that
        # dropped leading whitespace. The JSON indentation inside the two
        # expected fragments below could not be recovered -- verify.
        escaped = """\
"tags": [
"foo",
"bar",
"\\u30c6\\u30b9\\u30c8"
],
"""
        unescaped = """\
"tags": [
"foo",
"bar",
"テスト"
],
"""
        extr = TestExtractor.from_url("test:")
        tjob = self.jobclass(extr)

        tjob.file = output = io.StringIO()
        tjob.run()
        self.assertIn(escaped, output.getvalue())

        tjob.file = output = io.StringIO()
        tjob.ascii = False
        tjob.run()
        self.assertIn(unescaped, output.getvalue())

    def test_num_string(self):
        extr = TestExtractor.from_url("test:")
        tjob = self.jobclass(extr, file=io.StringIO())

        with patch("gallery_dl.util.number_to_string") as nts:
            tjob.run()
        self.assertEqual(nts.call_count, 0)

        config.set(("output",), "num-to-str", True)
        with patch("gallery_dl.util.number_to_string") as nts:
            tjob.run()
        self.assertEqual(nts.call_count, 72)

        # run again with the real function to check an actual value
        tjob.run()
        last = tjob.data[-1]
        self.assertEqual(last[0], Message.Url)
        self.assertEqual(last[2]["num"], "3")

    def test_jsonl(self):
        extr = TestExtractor.from_url("test:")
        tjob = self.jobclass(extr, file=io.StringIO())
        with patch("gallery_dl.job.DataJob.out") as out:
            tjob.run()
        self.assertEqual(out.call_count, 0)

        config.set(("output",), "jsonl", True)
        extr = TestExtractor.from_url("test:")
        sink = io.StringIO()
        tjob = self.jobclass(extr, file=sink)
        with patch("gallery_dl.job.DataJob.out") as out:
            tjob.run()
        self.assertEqual(out.call_count, 4)

        # run again with the real 'out' and check each whitespace-separated
        # chunk of what was written
        tjob.run()
        line_pattern = r"""^\[[23],("http[^"]+",)?\{.+\}\]$"""
        for line in sink.getvalue().split():
            self.assertRegex(line, line_pattern)

    def test_opt_init(self):
        # only checks that _init() runs without raising when 'init' is set
        config.set((), "init", True)

        tjob = self.jobclass(TestExtractorNoop.from_url("test:noop"))
        tjob._init()

    def test_opt_follow(self):
        config.set((), "follow", "{user[bio]!R}")

        # "test:urls" gives the extractor's user a 'bio' containing URLs
        tjob = self.jobclass(TestExtractor.from_url("test:urls"), file=None)
        tjob.run()

        expected_urls = [
            "https://example.org/1.jpg",
            "https://example.org/2.jpg",
            "https://example.org/3.jpg",
            "https://example1.org/content/abc",
            "https://example2.org/content?query=123",
            "https://example3.org/content/#frag",
        ]
        self.assertEqual(tjob.data_urls, expected_urls)

    def test_resolve(self):
        def resolved_urls(url, resolve):
            # run a fresh job for 'url' and hand back its collected URLs
            extr = TestExtractorParent.from_url(url)
            tjob = self.jobclass(extr, file=None, resolve=resolve)
            tjob.run()
            return tjob.data_urls

        urls = resolved_urls("test:parent:3", 0)
        self.assertEqual(len(urls), 3)
        for url in urls:
            self.assertEqual(url, "test:parent:2")

        urls = resolved_urls("test:parent:3", 1)
        self.assertEqual(len(urls), 9)
        for url in urls:
            self.assertEqual(url, "test:parent:1")

        urls = resolved_urls("test:parent", 64)
        self.assertEqual(len(urls), 9)
        for url in urls:
            self.assertRegex(url, r"^https://example.org/\d\.jpg$")

        # each parent level queues three children, so counts grow by x3
        self.assertEqual(len(resolved_urls("test:parent:1", 64)), 27)

        self.assertEqual(len(resolved_urls("test:parent:2", 64)), 81)
511
512
513
class TestExtractor(Extractor):
    """Extractor for "test:", "test:child", "test:self" and "test:urls".

    Yields one directory message followed by three URL messages.
    """
    category = "test_category"
    subcategory = "test_subcategory"
    directory_fmt = ("{category}",)
    filename_fmt = "test_{filename}.{extension}"
    pattern = r"test:(child|self|urls)?$"

    def __init__(self, match):
        super().__init__(match)
        self.user = {"id": 123, "name": "test"}

        variant = match[1]
        if variant == "self":
            # make the dict contain a reference to itself
            self.user["self"] = self.user
        elif variant == "urls":
            # NOTE(review): this file was recovered from an extraction that
            # dropped leading whitespace; indentation inside this literal,
            # if there was any, could not be recovered -- verify.
            self.user["bio"] = """
Site 1:
* https://example1.org/content/abc
Site 2:
* https://example2.org/content?query=123

<a href="https://example3.org/content/#frag">Site 3</a>
"""

    def items(self):
        """Yield a directory message, then URL messages for 1.jpg-3.jpg."""
        base = "https://example.org"
        owner = self.user

        yield Message.Directory, "", {"user": owner, "author": owner}

        for num in range(1, 4):
            url = f"{base}/{num}.jpg"
            metadata = {
                "num": num,
                "tags": ["foo", "bar", "テスト"],
                "user": owner,
                "author": owner,
                "_fallback": (f"{base}/alt/{num}.jpg",),
            }
            yield Message.Url, url, text.nameext_from_url(url, metadata)
553
554
555
class TestExtractorParent(Extractor):
    """Extractor for "test:parent" and "test:parent:<N>" URLs.

    Queues three child URLs: "test:child" when no level or level 0 is
    given, otherwise "test:parent:<N-1>".
    """
    category = "test_category"
    subcategory = "test_subcategory_parent"
    pattern = r"test:parent(:\d+)?"

    def items(self):
        """Yield three queue messages pointing at the next level down."""
        level = self.groups[0]
        if level is None or level == ":0":
            target, target_class = "test:child", TestExtractor
        else:
            # 'level' looks like ":3"; strip the colon and count down by one
            target = f"test:parent:{int(level[1:])-1}"
            target_class = TestExtractorParent

        for num in (11, 12, 13):
            metadata = {
                "num": num,
                "tags": ["abc", "def"],
                "_extractor": target_class,
            }
            yield Message.Queue, target, metadata
575
576
577
class TestExtractorException(Extractor):
    """Extractor for "test:exception" whose items() always raises."""
    category = "test_category"
    subcategory = "test_subcategory_exception"
    pattern = r"test:exception$"

    def items(self):
        """Raise ZeroDivisionError as soon as it is called."""
        # deliberate division by zero; nothing is ever returned
        return 1 / 0
584
585
586
class TestExtractorNoop(Extractor):
    """Extractor matching "test:noop" that defines no items() of its own."""
    category = "test_category_alt"
    subcategory = "test_subcategory"
    pattern = r"test:noop"
590
591
592
# Run the test cases in this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()
594
595