Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
mikf
GitHub Repository: mikf/gallery-dl
Path: blob/master/test/test_job.py
5457 views
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
4
# Copyright 2021-2025 Mike Fährmann
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License version 2 as
8
# published by the Free Software Foundation.
9
10
import os
11
import sys
12
import unittest
13
from unittest.mock import patch
14
15
import io
16
17
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
18
from gallery_dl import job, config, text # noqa E402
19
from gallery_dl.extractor.common import Extractor, Message # noqa E402
20
21
22
class TestJob(unittest.TestCase):
    """Common base for the job tests; subclasses set ``jobclass``."""

    def tearDown(self):
        # Tests set options through config.set(); clear them after each test.
        config.clear()

    def _capture_stdout(self, extr_or_job):
        """Run a job and return what it wrote to ``sys.stdout``.

        An ``Extractor`` instance is wrapped in ``self.jobclass`` first;
        any other argument is treated as a ready-made job and run as-is.
        """
        target = extr_or_job
        if isinstance(target, Extractor):
            target = self.jobclass(target)

        original = sys.stdout
        with io.StringIO() as captured:
            sys.stdout = captured
            try:
                target.run()
            finally:
                # Put the real stream back even when run() raises.
                sys.stdout = original

            # Read the buffer before the 'with' block closes it.
            return captured.getvalue()
42
43
44
class TestDownloadJob(TestJob):
    """Tests for job.DownloadJob."""
    jobclass = job.DownloadJob

    def test_extractor_filter(self):
        # The filter is rebuilt after every config change and probed with
        # the same three extractor classes, always in the same order.
        tjob = self.jobclass(TestExtractor.from_url("test:"))
        candidates = (TestExtractor, TestExtractorParent, TestExtractorAlt)

        def check(*expected):
            accept = tjob._build_extractor_filter()
            for extr_cls, verdict in zip(candidates, expected):
                self.assertEqual(accept(extr_cls), verdict)

        # no blacklist/whitelist configured
        check(False, False, True)

        config.set((), "blacklist", ":test_subcategory")
        check(False, True, False)

        config.set((), "whitelist", "test_category:test_subcategory")
        check(True, False, False)
67
68
69
class TestKeywordJob(TestJob):
# Tests for job.KeywordJob; output is collected with TestJob._capture_stdout().
# NOTE(review): in this copy all leading whitespace is missing and bare
# line-number lines are interleaved with the code, including inside the
# expected-output literal of test_default. That literal is compared with
# assertEqual, so its exact indentation matters; restore it from the
# upstream file before running this test. TODO confirm.
70
jobclass = job.KeywordJob
71
72
def test_default(self):
# maxDiff = None: have unittest print the complete diff on a mismatch.
73
self.maxDiff = None
74
# "test:self" makes TestExtractor store a self-reference in its user dict,
# which is what the '<circular reference>' entries below correspond to.
extr = TestExtractor.from_url("test:self")
75
# The captured stdout must equal the literal below: one listing for
# directory names, then one for filenames and --filter.
self.assertEqual(self._capture_stdout(extr), """\
76
Keywords for directory names:
77
-----------------------------
78
author['id']
79
123
80
author['name']
81
test
82
author['self']
83
<circular reference>
84
category
85
test_category
86
subcategory
87
test_subcategory
88
user['id']
89
123
90
user['name']
91
test
92
user['self']
93
<circular reference>
94
95
Keywords for filenames and --filter:
96
------------------------------------
97
author['id']
98
123
99
author['name']
100
test
101
author['self']
102
<circular reference>
103
category
104
test_category
105
extension
106
jpg
107
filename
108
1
109
num
110
1
111
subcategory
112
test_subcategory
113
tags[N]
114
0 foo
115
1 bar
116
2 テスト
117
user['id']
118
123
119
user['name']
120
test
121
user['self']
122
<circular reference>
123
""")
124
125
126
class TestUrlJob(TestJob):
    """Tests for job.UrlJob."""
    jobclass = job.UrlJob

    def test_default(self):
        # One line per file URL produced by TestExtractor.
        expected = """\
https://example.org/1.jpg
https://example.org/2.jpg
https://example.org/3.jpg
"""
        output = self._capture_stdout(TestExtractor.from_url("test:"))
        self.assertEqual(output, expected)

    def test_fallback(self):
        # With the fallback handler installed, every URL is followed by
        # its '_fallback' entry, prefixed with "| ".
        tjob = self.jobclass(TestExtractor.from_url("test:"))
        tjob.handle_url = tjob.handle_url_fallback

        expected = """\
https://example.org/1.jpg
| https://example.org/alt/1.jpg
https://example.org/2.jpg
| https://example.org/alt/2.jpg
https://example.org/3.jpg
| https://example.org/alt/3.jpg
"""
        self.assertEqual(self._capture_stdout(tjob), expected)

    def test_parent(self):
        # A job built with default arguments prints the three queued
        # child URLs themselves.
        expected = """\
test:child
test:child
test:child
"""
        output = self._capture_stdout(TestExtractorParent.from_url("test:parent"))
        self.assertEqual(output, expected)

    def test_child(self):
        # With depth=0 the expected output is the child listing three
        # times, once per queued "test:child" URL.
        parent = TestExtractorParent.from_url("test:parent")
        tjob = job.UrlJob(parent, depth=0)

        per_child = """\
https://example.org/1.jpg
https://example.org/2.jpg
https://example.org/3.jpg
"""
        self.assertEqual(self._capture_stdout(tjob), 3 * per_child)
167
168
169
class TestInfoJob(TestJob):
# Tests for job.InfoJob; output is collected with TestJob._capture_stdout().
# NOTE(review): in this copy all leading whitespace is missing and bare
# line-number lines are interleaved with the code, including inside the
# expected-output literals. Those literals are compared with assertEqual,
# so their exact indentation matters; restore it from the upstream file
# before running these tests. TODO confirm.
170
jobclass = job.InfoJob
171
172
def test_default(self):
# Without any options set, the expected output shows the category pair
# plus TestExtractor's own filename_fmt and directory_fmt as "(default)".
173
extr = TestExtractor.from_url("test:")
174
self.assertEqual(self._capture_stdout(extr), """\
175
Category / Subcategory
176
"test_category" / "test_subcategory"
177
178
Filename format (default):
179
"test_{filename}.{extension}"
180
181
Directory format (default):
182
["{category}"]
183
184
""")
185
186
def test_custom(self):
# Sets the "filename", "directory" and "sleep-request" options and gives
# the extractor a request_interval; the expected output then lists a
# "(custom)" value followed by the "(default)" value for each of them.
187
config.set((), "filename", "custom")
188
config.set((), "directory", ("custom",))
189
config.set((), "sleep-request", 321)
190
extr = TestExtractor.from_url("test:")
191
extr.request_interval = 123.456
192
193
self.assertEqual(self._capture_stdout(extr), """\
194
Category / Subcategory
195
"test_category" / "test_subcategory"
196
197
Filename format (custom):
198
"custom"
199
Filename format (default):
200
"test_{filename}.{extension}"
201
202
Directory format (custom):
203
["custom"]
204
Directory format (default):
205
["{category}"]
206
207
Request interval (custom):
208
321
209
Request interval (default):
210
123.456
211
212
""")
213
214
def test_base_category(self):
# With basecategory set on the extractor, the expected header gains a
# third "Basecategory" column.
215
extr = TestExtractor.from_url("test:")
216
extr.basecategory = "test_basecategory"
217
218
self.assertEqual(self._capture_stdout(extr), """\
219
Category / Subcategory / Basecategory
220
"test_category" / "test_subcategory" / "test_basecategory"
221
222
Filename format (default):
223
"test_{filename}.{extension}"
224
225
Directory format (default):
226
["{category}"]
227
228
""")
229
230
231
class TestDataJob(TestJob):
    """Tests for job.DataJob: collected messages, error entries, dump output."""
    jobclass = job.DataJob

    @staticmethod
    def _strip_indent(text):
        """Return 'text' with the leading spaces of every line removed."""
        return "".join(line.lstrip(" ") for line in text.splitlines(True))

    def test_default(self):
        extr = TestExtractor.from_url("test:")
        tjob = self.jobclass(extr, file=io.StringIO())
        user = {"id": 123, "name": "test"}

        tjob.run()

        # One Directory entry, then one Url entry per file (num 1..3).
        # No '_fallback' key is expected in the collected metadata here.
        expected = [
            (Message.Directory, {
                "category"   : "test_category",
                "subcategory": "test_subcategory",
                "user"       : user,
                "author"     : user,
            }),
        ]
        expected.extend(
            (Message.Url, f"https://example.org/{num}.jpg", {
                "category"   : "test_category",
                "subcategory": "test_subcategory",
                "filename"   : str(num),
                "extension"  : "jpg",
                "num"        : num,
                "tags"       : ["foo", "bar", "テスト"],
                "user"       : user,
                "author"     : user,
            })
            for num in range(1, 4)
        )
        self.assertEqual(tjob.data, expected)

    def test_exception(self):
        # TestExtractorException.items() raises ZeroDivisionError; the last
        # collected entry must be a (-1, {...}) tuple describing it.
        extr = TestExtractorException.from_url("test:exception")
        tjob = self.jobclass(extr, file=io.StringIO())
        tjob.run()
        self.assertEqual(
            tjob.data[-1],
            (-1, {
                "error"  : "ZeroDivisionError",
                "message": "division by zero",
            })
        )

    def test_private(self):
        # With output.private enabled, the '_fallback' entry must be
        # present in the metadata of all three Url entries.
        config.set(("output",), "private", True)
        extr = TestExtractor.from_url("test:")
        tjob = self.jobclass(extr, file=io.StringIO())

        tjob.run()

        for i in range(1, 4):
            self.assertEqual(
                tjob.data[i][2]["_fallback"],
                (f"https://example.org/alt/{i}.jpg",),
            )

    def test_sleep(self):
        extr = TestExtractor.from_url("test:")
        tjob = self.jobclass(extr, file=io.StringIO())

        # time.sleep is patched so the test never actually waits.
        config.set((), "sleep-extractor", 123)
        with patch("time.sleep") as sleep:
            tjob.run()
            sleep.assert_called_once_with(123)

        config.set((), "sleep-extractor", 0)
        with patch("time.sleep") as sleep:
            tjob.run()
            sleep.assert_not_called()

    def test_ascii(self):
        # Checks how non-ASCII text is written to the job's output file.
        # Leading spaces are stripped from the output before comparing, so
        # the check does not depend on the indent width of the dump.
        extr = TestExtractor.from_url("test:")
        tjob = self.jobclass(extr)

        # first run: job left as constructed -> escaped as \uXXXX
        tjob.file = buffer = io.StringIO()
        tjob.run()
        self.assertIn("""\
"tags": [
"foo",
"bar",
"\\u30c6\\u30b9\\u30c8"
],
""", self._strip_indent(buffer.getvalue()))

        # second run: ascii = False -> written verbatim
        tjob.file = buffer = io.StringIO()
        tjob.ascii = False
        tjob.run()
        self.assertIn("""\
"tags": [
"foo",
"bar",
"テスト"
],
""", self._strip_indent(buffer.getvalue()))

    def test_num_string(self):
        extr = TestExtractor.from_url("test:")
        tjob = self.jobclass(extr, file=io.StringIO())

        # option unset: the conversion helper must never be called
        with patch("gallery_dl.util.number_to_string") as nts:
            tjob.run()
        self.assertEqual(nts.call_count, 0)

        # option set: 72 calls are expected for this extractor's data
        config.set(("output",), "num-to-str", True)
        with patch("gallery_dl.util.number_to_string") as nts:
            tjob.run()
        self.assertEqual(nts.call_count, 72)

        # unpatched run: numeric values show up as strings
        tjob.run()
        self.assertEqual(tjob.data[-1][0], Message.Url)
        self.assertEqual(tjob.data[-1][2]["num"], "3")
360
361
362
class TestExtractor(Extractor):
    """Extractor stub: one Directory message, then three image URLs"""
    category = "test_category"
    subcategory = "test_subcategory"
    directory_fmt = ("{category}",)
    filename_fmt = "test_{filename}.{extension}"
    pattern = r"test:(child|self)?$"

    def __init__(self, match):
        super().__init__(match)
        self.user = owner = {"id": 123, "name": "test"}
        if match[1] == "self":
            # "test:self": make the user dict contain itself
            owner["self"] = owner

    def items(self):
        base = "https://example.org"
        owner = self.user

        yield Message.Directory, {"user": owner, "author": owner}

        for num in range(1, 4):
            file_url = f"{base}/{num}.jpg"
            metadata = {
                "num"   : num,
                "tags"  : ["foo", "bar", "テスト"],
                "user"  : owner,
                "author": owner,
                "_fallback": (f"{base}/alt/{num}.jpg",),
            }
            # nameext_from_url() receives the dict and its result is
            # yielded as the metadata of this URL
            yield Message.Url, file_url, text.nameext_from_url(
                file_url, metadata)
393
394
395
class TestExtractorParent(Extractor):
    """Extractor stub that queues "test:child" three times"""
    category = "test_category"
    subcategory = "test_subcategory_parent"
    pattern = r"test:parent"

    def items(self):
        child_url = "test:child"

        for num in range(11, 14):
            # a fresh metadata dict per queued URL, handled by TestExtractor
            metadata = {
                "num"       : num,
                "tags"      : ["abc", "def"],
                "_extractor": TestExtractor,
            }
            yield Message.Queue, child_url, metadata
409
410
411
class TestExtractorException(Extractor):
    """Extractor stub whose items() raises ZeroDivisionError"""
    category = "test_category"
    subcategory = "test_subcategory_exception"
    pattern = r"test:exception$"

    def items(self):
        # deliberate failure: the division raises before anything is returned
        return 1 / 0
418
419
420
class TestExtractorAlt(Extractor):
    """Extractor stub with a different category but the same subcategory"""
    # only probed as a class by the extractor-filter test; defines no pattern
    category = "test_category_alt"
    subcategory = "test_subcategory"
423
424
425
if __name__ == "__main__":
    # Run the tests when this file is executed directly as a script.
    unittest.main()
427
428