Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
mikf
GitHub Repository: mikf/gallery-dl
Path: blob/master/test/test_postprocessor.py
8753 views
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
4
# Copyright 2019-2025 Mike Fährmann
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License version 2 as
8
# published by the Free Software Foundation.
9
10
import os
11
import sys
12
import unittest
13
from unittest.mock import Mock, mock_open, patch, call
14
15
import shutil
16
import logging
17
import zipfile
18
import tempfile
19
import collections
20
from datetime import datetime
21
22
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
23
from gallery_dl import extractor, output, path, util, exception # noqa E402
24
from gallery_dl import postprocessor, config, archive, dt # noqa E402
25
from gallery_dl.postprocessor.common import PostProcessor # noqa E402
26
27
28
class MockPostprocessorModule(Mock):
29
__postprocessor__ = "mock"
30
31
32
class FakeJob():
33
34
def __init__(self, extr=extractor.find("generic:https://example.org/")):
35
extr.directory_fmt = ("{category}",)
36
self.extractor = extr
37
self.pathfmt = path.PathFormat(extr)
38
self.out = output.NullOutput()
39
self.get_logger = logging.getLogger
40
self.hooks = collections.defaultdict(list)
41
42
def register_hooks(self, hooks, options=None):
43
for hook, callback in hooks.items():
44
self.hooks[hook].append(callback)
45
46
47
class TestPostprocessorModule(unittest.TestCase):
48
49
def setUp(self):
50
postprocessor._cache.clear()
51
52
def test_find(self):
53
for name in (postprocessor.modules):
54
cls = postprocessor.find(name)
55
self.assertEqual(cls.__name__, f"{name.capitalize()}PP")
56
self.assertIs(cls.__base__, PostProcessor)
57
58
self.assertEqual(postprocessor.find("foo"), None)
59
self.assertEqual(postprocessor.find(1234) , None)
60
self.assertEqual(postprocessor.find(None) , None)
61
62
@patch("builtins.__import__")
63
def test_cache(self, import_module):
64
import_module.return_value = MockPostprocessorModule()
65
66
for name in (postprocessor.modules):
67
postprocessor.find(name)
68
self.assertEqual(import_module.call_count, len(postprocessor.modules))
69
70
# no new calls to import_module
71
for name in (postprocessor.modules):
72
postprocessor.find(name)
73
self.assertEqual(import_module.call_count, len(postprocessor.modules))
74
75
76
class BasePostprocessorTest(unittest.TestCase):
77
78
@classmethod
79
def setUpClass(cls):
80
cls.dir = tempfile.TemporaryDirectory()
81
config.clear()
82
config.set((), "base-directory", cls.dir.name)
83
cls.job = FakeJob()
84
85
@classmethod
86
def tearDownClass(cls):
87
cls.dir.cleanup()
88
config.clear()
89
90
def tearDown(self):
91
self.job.hooks.clear()
92
93
def _create(self, options=None, data=None):
94
kwdict = {"category": "test", "filename": "file", "extension": "ext"}
95
if options is None:
96
options = {}
97
if data is not None:
98
kwdict.update(data)
99
100
self.pathfmt = self.job.pathfmt
101
self.pathfmt.set_directory(kwdict)
102
self.pathfmt.set_filename(kwdict)
103
self.pathfmt.build_path()
104
105
pp = postprocessor.find(self.__class__.__name__[:-4].lower())
106
return pp(self.job, options)
107
108
def _trigger(self, events=None):
109
for event in (events or ("prepare", "file")):
110
for callback in self.job.hooks[event]:
111
callback(self.pathfmt)
112
113
114
class ClassifyTest(BasePostprocessorTest):
115
116
def test_classify_default(self):
117
pp = self._create()
118
119
self.assertEqual(pp.mapping, {
120
ext: directory
121
for directory, exts in pp.DEFAULT_MAPPING.items()
122
for ext in exts
123
})
124
125
self.assertEqual(pp.directory, "")
126
self._trigger(("post",))
127
self.assertEqual(pp.directory, self.pathfmt.directory)
128
129
self.pathfmt.set_extension("jpg")
130
self._trigger(("prepare",))
131
self.pathfmt.build_path()
132
path = os.path.join(self.dir.name, "test", "Pictures")
133
self.assertEqual(self.pathfmt.path, f"{path}/file.jpg")
134
self.assertEqual(self.pathfmt.realpath, f"{path}/file.jpg")
135
136
self.pathfmt.set_extension("mp4")
137
self._trigger(("prepare",))
138
self.pathfmt.build_path()
139
path = os.path.join(self.dir.name, "test", "Video")
140
self.assertEqual(self.pathfmt.path, f"{path}/file.mp4")
141
self.assertEqual(self.pathfmt.realpath, f"{path}/file.mp4")
142
143
def test_classify_noop(self):
144
pp = self._create()
145
rp = self.pathfmt.realpath
146
147
self.assertEqual(pp.directory, "")
148
self._trigger(("post",))
149
self._trigger(("prepare",))
150
151
self.assertEqual(pp.directory, self.pathfmt.directory)
152
self.assertEqual(self.pathfmt.path, rp)
153
self.assertEqual(self.pathfmt.realpath, rp)
154
155
def test_classify_custom(self):
156
pp = self._create({"mapping": {
157
"foo/bar": ["foo", "bar"],
158
}})
159
160
self.assertEqual(pp.mapping, {
161
"foo": "foo/bar",
162
"bar": "foo/bar",
163
})
164
165
self.assertEqual(pp.directory, "")
166
self._trigger(("post",))
167
self.assertEqual(pp.directory, self.pathfmt.directory)
168
169
self.pathfmt.set_extension("foo")
170
self._trigger(("prepare",))
171
self.pathfmt.build_path()
172
path = os.path.join(self.dir.name, "test", "foo", "bar")
173
self.assertEqual(self.pathfmt.path, f"{path}/file.foo")
174
self.assertEqual(self.pathfmt.realpath, f"{path}/file.foo")
175
176
177
class DirectoryTest(BasePostprocessorTest):
178
179
def test_default(self):
180
self._create()
181
182
path = os.path.join(self.dir.name, "test")
183
self.assertEqual(self.pathfmt.realdirectory, f"{path}/")
184
self.assertEqual(self.pathfmt.realpath, f"{path}/file.ext")
185
186
self.pathfmt.kwdict["category"] = "custom"
187
self._trigger()
188
189
path = os.path.join(self.dir.name, "custom")
190
self.assertEqual(self.pathfmt.realdirectory, f"{path}/")
191
self.pathfmt.build_path()
192
self.assertEqual(self.pathfmt.realpath, f"{path}/file.ext")
193
194
195
class ExecTest(BasePostprocessorTest):
196
197
def test_command_string(self):
198
self._create({
199
"command": "echo {} {_path} {_temppath} {_directory} {_filename} "
200
"&& rm {};",
201
})
202
203
with patch("gallery_dl.util.Popen") as p:
204
i = Mock()
205
i.wait.return_value = 0
206
p.return_value = i
207
self._trigger(("after",))
208
209
p.assert_called_once_with(
210
(f"echo "
211
f"{self.pathfmt.realpath} "
212
f"{self.pathfmt.realpath} "
213
f"{self.pathfmt.temppath} "
214
f"{self.pathfmt.realdirectory} "
215
f"{self.pathfmt.filename} "
216
f"&& rm {self.pathfmt.realpath};"),
217
shell=True,
218
creationflags=0,
219
start_new_session=False,
220
)
221
i.wait.assert_called_once_with()
222
223
def test_command_list(self):
224
self._create({
225
"command": ["~/script.sh", "{category}",
226
"\fE _directory.upper()"],
227
})
228
229
with patch("gallery_dl.util.Popen") as p:
230
i = Mock()
231
i.wait.return_value = 0
232
p.return_value = i
233
self._trigger(("after",))
234
235
p.assert_called_once_with(
236
[
237
os.path.expanduser("~/script.sh"),
238
self.pathfmt.kwdict["category"],
239
self.pathfmt.realdirectory.upper(),
240
],
241
shell=False,
242
creationflags=0,
243
start_new_session=False,
244
)
245
246
def test_command_many(self):
247
self._create({
248
"commands": [
249
"echo {} {_path} {_temppath} {_directory} {_filename} "
250
"&& rm {};",
251
["~/script.sh", "{category}", "\fE _directory.upper()"],
252
]
253
})
254
255
with patch("gallery_dl.util.Popen") as p:
256
i = Mock()
257
i.wait.return_value = 0
258
p.return_value = i
259
self._trigger(("after",))
260
261
self.assertEqual(p.call_args_list, [
262
call(
263
(f"echo "
264
f"{self.pathfmt.realpath} "
265
f"{self.pathfmt.realpath} "
266
f"{self.pathfmt.temppath} "
267
f"{self.pathfmt.realdirectory} "
268
f"{self.pathfmt.filename} "
269
f"&& rm {self.pathfmt.realpath};"),
270
shell=True,
271
creationflags=0,
272
start_new_session=False,
273
),
274
call(
275
[
276
os.path.expanduser("~/script.sh"),
277
self.pathfmt.kwdict["category"],
278
self.pathfmt.realdirectory.upper(),
279
],
280
shell=False,
281
creationflags=0,
282
start_new_session=False,
283
),
284
])
285
286
def test_command_returncode(self):
287
self._create({
288
"command": "echo {}",
289
})
290
291
with patch("gallery_dl.util.Popen") as p:
292
i = Mock()
293
i.wait.return_value = 123
294
p.return_value = i
295
296
with self.assertLogs() as log:
297
self._trigger(("after",))
298
299
msg = (f"WARNING:postprocessor.exec:"
300
f"'echo {self.pathfmt.realpath}' "
301
f"returned with non-zero exit status (123)")
302
self.assertEqual(log.output[0], msg)
303
304
def test_async(self):
305
self._create({
306
"async" : True,
307
"command": "echo {}",
308
})
309
310
with patch("gallery_dl.util.Popen") as p:
311
i = Mock()
312
p.return_value = i
313
self._trigger(("after",))
314
315
self.assertTrue(p.called)
316
self.assertFalse(i.wait.called)
317
318
@unittest.skipIf(util.WINDOWS, "not POSIX")
319
def test_session_posix(self):
320
self._create({
321
"session": True,
322
"command": ["echo", "foobar"],
323
})
324
325
with patch("gallery_dl.util.Popen") as p:
326
i = Mock()
327
i.wait.return_value = 0
328
p.return_value = i
329
self._trigger(("after",))
330
331
p.assert_called_once_with(
332
["echo", "foobar"],
333
shell=False,
334
creationflags=0,
335
start_new_session=True,
336
)
337
i.wait.assert_called_once_with()
338
339
@unittest.skipIf(not util.WINDOWS, "not Windows")
340
def test_session_windows(self):
341
self._create({
342
"session": True,
343
"command": ["echo", "foobar"],
344
})
345
346
with patch("gallery_dl.util.Popen") as p:
347
i = Mock()
348
i.wait.return_value = 0
349
p.return_value = i
350
self._trigger(("after",))
351
352
import subprocess
353
p.assert_called_once_with(
354
["echo", "foobar"],
355
shell=False,
356
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP,
357
start_new_session=False,
358
)
359
i.wait.assert_called_once_with()
360
361
def test_archive(self):
362
pp = self._create({
363
"command": ["echo", "foobar"],
364
"archive": ":memory:",
365
"event" : "finalize",
366
})
367
368
self.assertIsInstance(pp.archive, archive.DownloadArchive)
369
370
with patch.object(pp.archive, "add") as m_aa, \
371
patch.object(pp.archive, "close") as m_ac:
372
self._trigger(("finalize",))
373
pp.archive.close()
374
375
m_aa.assert_called_once_with(self.pathfmt.kwdict)
376
m_ac.assert_called_once()
377
378
def test_verbose_string(self):
379
self._create({
380
"command": "echo foo bar",
381
"verbose": False,
382
})
383
384
with patch("gallery_dl.util.Popen") as p, \
385
self.assertLogs(level=10) as log_info:
386
i = Mock()
387
i.wait.return_value = 123
388
p.return_value = i
389
self._trigger(("after",))
390
391
msg = "DEBUG:postprocessor.exec:Running 'echo'"
392
self.assertEqual(log_info.output[0], msg)
393
self.assertIn("'echo' returned with non-zero ", log_info.output[1])
394
395
def test_verbose_list(self):
396
self._create({
397
"command": ["echo", "foo", "bar"],
398
"verbose": False,
399
})
400
401
with patch("gallery_dl.util.Popen") as p, \
402
self.assertLogs(level=10) as log_info:
403
i = Mock()
404
i.wait.return_value = 123
405
p.return_value = i
406
self._trigger(("after",))
407
408
msg = "DEBUG:postprocessor.exec:Running 'echo'"
409
self.assertEqual(log_info.output[0], msg)
410
self.assertIn("'echo' returned with non-zero ", log_info.output[1])
411
412
413
class HashTest(BasePostprocessorTest):
414
415
def test_default(self):
416
self._create({})
417
418
with self.pathfmt.open() as fp:
419
fp.write(b"Foo Bar\n")
420
421
self._trigger()
422
423
kwdict = self.pathfmt.kwdict
424
self.assertEqual(
425
"35c9c9c7c90ad764bae9e2623f522c24", kwdict["md5"], "md5")
426
self.assertEqual(
427
"14d3d804494ef4e57d72de63e4cfee761240471a", kwdict["sha1"], "sha1")
428
429
def test_custom_hashes(self):
430
self._create({"hashes": "sha256:a,sha512:b"})
431
432
with self.pathfmt.open() as fp:
433
fp.write(b"Foo Bar\n")
434
435
self._trigger()
436
437
kwdict = self.pathfmt.kwdict
438
self.assertEqual(
439
"4775b55be17206445d7015a5fc7656f38a74b880670523c3b175455f885f2395",
440
kwdict["a"], "sha256")
441
self.assertEqual(
442
"6028f9e6957f4ca929941318c4bba6258713fd5162f9e33bd10e1c456d252700"
443
"3e1095b50736c4fd1e2deea152e3c8ecd5993462a747208e4d842659935a1c62",
444
kwdict["b"], "sha512")
445
446
def test_custom_hashes_dict(self):
447
self._create({"hashes": {"a": "sha256", "b": "sha512"}})
448
449
with self.pathfmt.open() as fp:
450
fp.write(b"Foo Bar\n")
451
452
self._trigger()
453
454
kwdict = self.pathfmt.kwdict
455
self.assertEqual(
456
"4775b55be17206445d7015a5fc7656f38a74b880670523c3b175455f885f2395",
457
kwdict["a"], "sha256")
458
self.assertEqual(
459
"6028f9e6957f4ca929941318c4bba6258713fd5162f9e33bd10e1c456d252700"
460
"3e1095b50736c4fd1e2deea152e3c8ecd5993462a747208e4d842659935a1c62",
461
kwdict["b"], "sha512")
462
463
464
class MetadataTest(BasePostprocessorTest):
465
466
def test_metadata_default(self):
467
pp = self._create()
468
469
# default arguments
470
self.assertEqual(pp.write , pp._write_json)
471
self.assertEqual(pp.extension, "json")
472
self.assertTrue(callable(pp._json_encode))
473
474
def test_metadata_json(self):
475
pp = self._create({
476
"mode" : "json",
477
"extension" : "JSON",
478
}, {
479
"public" : "hello ワールド",
480
"_private" : "foo バー",
481
})
482
483
self.assertEqual(pp.write , pp._write_json)
484
self.assertEqual(pp.extension, "JSON")
485
self.assertTrue(callable(pp._json_encode))
486
487
with patch("builtins.open", mock_open()) as m:
488
self._trigger()
489
490
path = f"{self.pathfmt.realpath}.JSON"
491
m.assert_called_once_with(path, "w", encoding="utf-8", newline=None)
492
493
self.assertEqual(self._output(m), """{
494
"category": "test",
495
"filename": "file",
496
"extension": "ext",
497
"public": "hello ワールド"
498
}
499
""")
500
501
def test_metadata_json_options(self):
502
pp = self._create({
503
"mode" : "json",
504
"ascii" : True,
505
"sort" : True,
506
"separators": [",", " : "],
507
"private" : True,
508
"indent" : None,
509
"open" : "a",
510
"encoding" : "UTF-8",
511
"newline" : "\r\n",
512
"extension" : "JSON",
513
}, {
514
"public" : "hello ワールド",
515
"_private" : "foo バー",
516
})
517
518
self.assertEqual(pp.write , pp._write_json)
519
self.assertEqual(pp.extension, "JSON")
520
self.assertTrue(callable(pp._json_encode))
521
522
with patch("builtins.open", mock_open()) as m:
523
self._trigger()
524
525
path = f"{self.pathfmt.realpath}.JSON"
526
m.assert_called_once_with(path, "a", encoding="UTF-8", newline='\r\n')
527
# Since we mocked the call to open,
528
# we don't actually see the effect of setting newline.
529
self.assertEqual(self._output(m), """{\
530
"_private" : "foo \\u30d0\\u30fc",\
531
"category" : "test",\
532
"extension" : "ext",\
533
"filename" : "file",\
534
"public" : "hello \\u30ef\\u30fc\\u30eb\\u30c9"}
535
""")
536
537
def test_metadata_tags(self):
538
pp = self._create(
539
{"mode": "tags"},
540
{"tags": ["foo", "bar", "baz"]},
541
)
542
self.assertEqual(pp.write, pp._write_tags)
543
self.assertEqual(pp.extension, "txt")
544
545
with patch("builtins.open", mock_open()) as m:
546
self._trigger()
547
548
path = f"{self.pathfmt.realpath}.txt"
549
m.assert_called_once_with(path, "w", encoding="utf-8", newline=None)
550
self.assertEqual(self._output(m), "foo\nbar\nbaz\n")
551
552
def test_metadata_tags_split_1(self):
553
self._create(
554
{"mode": "tags"},
555
{"tags": "foo, bar, baz"},
556
)
557
with patch("builtins.open", mock_open()) as m:
558
self._trigger()
559
self.assertEqual(self._output(m), "foo\nbar\nbaz\n")
560
561
def test_metadata_tags_split_2(self):
562
self._create(
563
{"mode": "tags"},
564
{"tags": "foobar1 foobar2 foobarbaz"},
565
)
566
with patch("builtins.open", mock_open()) as m:
567
self._trigger()
568
self.assertEqual(self._output(m), "foobar1\nfoobar2\nfoobarbaz\n")
569
570
def test_metadata_tags_tagstring(self):
571
self._create(
572
{"mode": "tags"},
573
{"tag_string": "foo, bar, baz"},
574
)
575
with patch("builtins.open", mock_open()) as m:
576
self._trigger()
577
self.assertEqual(self._output(m), "foo\nbar\nbaz\n")
578
579
def test_metadata_tags_dict(self):
580
self._create(
581
{"mode": "tags"},
582
{"tags": {"g": ["foobar1", "foobar2"], "m": ["foobarbaz"]}},
583
)
584
with patch("builtins.open", mock_open()) as m:
585
self._trigger()
586
self.assertEqual(self._output(m), "foobar1\nfoobar2\nfoobarbaz\n")
587
588
def test_metadata_tags_list_of_dict(self):
589
self._create(
590
{"mode": "tags"},
591
{"tags": [
592
{"g": "foobar1", "m": "foobar2", "u": True},
593
{"g": None, "m": "foobarbaz", "u": [3, 4]},
594
]},
595
)
596
with patch("builtins.open", mock_open()) as m:
597
self._trigger()
598
self.assertEqual(self._output(m), "foobar1\nfoobar2\nfoobarbaz\n")
599
600
def test_metadata_custom(self):
601
def test(pp_info):
602
pp = self._create(pp_info, {"foo": "bar"})
603
self.assertEqual(pp.write, pp._write_custom)
604
self.assertEqual(pp.extension, "txt")
605
self.assertTrue(pp._content_fmt)
606
607
with patch("builtins.open", mock_open()) as m:
608
self._trigger()
609
self.assertEqual(self._output(m), "bar\nNone\n")
610
self.job.hooks.clear()
611
612
test({"mode": "custom", "content-format": "{foo}\n{missing}\n"})
613
test({"mode": "custom", "content-format": ["{foo}", "{missing}"]})
614
test({"mode": "custom", "format": "{foo}\n{missing}\n"})
615
test({"format": "{foo}\n{missing}\n"})
616
617
def test_metadata_mode_print(self):
618
self._create(
619
{"mode": "print", "format": "{foo}\n{missing}"},
620
{"foo": "bar"},
621
)
622
623
with patch("sys.stdout", Mock()) as m:
624
self._trigger()
625
626
self.assertEqual(self._output(m), "bar\nNone\n")
627
628
def test_metadata_extfmt(self):
629
pp = self._create({
630
"extension" : "ignored",
631
"extension-format": "json",
632
})
633
634
self.assertEqual(pp._filename, pp._filename_extfmt)
635
636
with patch("builtins.open", mock_open()) as m:
637
self._trigger()
638
639
path = f"{self.pathfmt.realdirectory}file.json"
640
m.assert_called_once_with(path, "w", encoding="utf-8", newline=None)
641
642
def test_metadata_extfmt_2(self):
643
self._create({
644
"extension-format": "{extension!u}-data:{category:Res/ES/}",
645
})
646
647
self.pathfmt.prefix = "2."
648
with patch("builtins.open", mock_open()) as m:
649
self._trigger()
650
651
path = f"{self.pathfmt.realdirectory}file.2.EXT-data:tESt"
652
m.assert_called_once_with(path, "w", encoding="utf-8", newline=None)
653
654
def test_metadata_directory(self):
655
self._create({
656
"directory": "metadata",
657
})
658
659
with patch("builtins.open", mock_open()) as m:
660
self._trigger()
661
662
path = f"{self.pathfmt.realdirectory}metadata/file.ext.json"
663
m.assert_called_once_with(path, "w", encoding="utf-8", newline=None)
664
665
def test_metadata_directory_2(self):
666
self._create({
667
"directory" : "metadata////",
668
"extension-format": "json",
669
})
670
671
with patch("builtins.open", mock_open()) as m:
672
self._trigger()
673
674
path = f"{self.pathfmt.realdirectory}metadata/file.json"
675
m.assert_called_once_with(path, "w", encoding="utf-8", newline=None)
676
677
def test_metadata_directory_format(self):
678
self._create(
679
{"directory": ["..", "json", "\fE str(id // 500 * 500 + 500)"]},
680
{"id": 12345},
681
)
682
683
with patch("builtins.open", mock_open()) as m:
684
self._trigger()
685
686
path = f"{self.pathfmt.realdirectory}../json/12500/file.ext.json"
687
m.assert_called_once_with(path, "w", encoding="utf-8", newline=None)
688
689
def test_metadata_directory_empty(self):
690
self._create(
691
{"directory": []},
692
)
693
694
with patch("builtins.open", mock_open()) as m:
695
self._trigger()
696
697
path = f"{self.pathfmt.realdirectory}./file.ext.json"
698
m.assert_called_once_with(path, "w", encoding="utf-8", newline=None)
699
700
def test_metadata_basedirectory(self):
701
self._create({"base-directory": True})
702
703
with patch("builtins.open", mock_open()) as m:
704
self._trigger()
705
706
path = f"{self.pathfmt.basedirectory}file.ext.json"
707
m.assert_called_once_with(path, "w", encoding="utf-8", newline=None)
708
709
def test_metadata_basedirectory_custom(self):
710
self._create({
711
"base-directory": "/home/test",
712
"directory": "meta",
713
})
714
715
with patch("builtins.open", mock_open()) as m:
716
self._trigger()
717
718
path = "/home/test/meta/file.ext.json"
719
m.assert_called_once_with(path, "w", encoding="utf-8", newline=None)
720
721
def test_metadata_filename(self):
722
self._create({
723
"filename" : "{category}_{filename}_/meta/\n\r.data",
724
"extension-format": "json",
725
})
726
727
with patch("builtins.open", mock_open()) as m:
728
self._trigger()
729
730
path = f"{self.pathfmt.realdirectory}test_file__meta_.data"
731
m.assert_called_once_with(path, "w", encoding="utf-8", newline=None)
732
733
def test_metadata_meta_path(self):
734
self._create({
735
"metadata-path": "_meta_path",
736
})
737
738
self._trigger()
739
740
self.assertEqual(self.pathfmt.kwdict["_meta_path"],
741
f"{self.pathfmt.realpath}.json")
742
743
def test_metadata_stdout(self):
744
self._create({"filename": "-", "indent": None, "sort": True})
745
746
with patch("sys.stdout", Mock()) as m:
747
self._trigger()
748
749
self.assertEqual(self._output(m), """\
750
{"category": "test", "extension": "ext", "filename": "file"}
751
""")
752
753
def test_metadata_modify(self):
754
kwdict = {"foo": 0, "bar": {"bax": 1, "bay": 2, "baz": 3, "ba2": {}}}
755
self._create({
756
"mode": "modify",
757
"fields": {
758
"foo" : "{filename}-{foo!s}",
759
"foo2" : "\fE bar['bax'] + 122",
760
"bar[\"baz\"]" : "{_now}",
761
"bar['ba2'][a]": "test",
762
},
763
}, kwdict)
764
765
pdict = self.pathfmt.kwdict
766
self.assertIsNot(kwdict, pdict)
767
self.assertEqual(pdict["foo"], kwdict["foo"])
768
self.assertEqual(pdict["bar"], kwdict["bar"])
769
770
self._trigger()
771
772
self.assertEqual(pdict["foo"] , "file-0")
773
self.assertEqual(pdict["foo2"], 123)
774
self.assertEqual(pdict["bar"]["ba2"]["a"], "test")
775
self.assertIsInstance(pdict["bar"]["baz"], datetime)
776
777
def test_metadata_delete(self):
778
kwdict = {
779
"foo": 0,
780
"bar": {
781
"bax": 1,
782
"bay": 2,
783
"baz": {"a": 3, "b": 4},
784
},
785
}
786
self._create({
787
"mode": "delete",
788
"fields": ["foo", "bar['bax']", "bar[\"baz\"][a]"],
789
}, kwdict)
790
791
pdict = self.pathfmt.kwdict
792
self.assertIsNot(kwdict, pdict)
793
794
self.assertEqual(pdict["foo"], kwdict["foo"])
795
self.assertEqual(pdict["bar"], kwdict["bar"])
796
797
self._trigger()
798
799
self.assertNotIn("foo", pdict)
800
self.assertNotIn("bax", pdict["bar"])
801
self.assertNotIn("a", pdict["bar"]["baz"])
802
803
# no errors for deleted/undefined fields
804
self._trigger()
805
self.assertNotIn("foo", pdict)
806
self.assertNotIn("bax", pdict["bar"])
807
self.assertNotIn("a", pdict["bar"]["baz"])
808
809
def test_metadata_option_skip(self):
810
self._create({"skip": True})
811
812
with patch("builtins.open", mock_open()) as m, \
813
patch("os.path.exists") as e:
814
e.return_value = True
815
self._trigger()
816
817
self.assertTrue(e.called)
818
self.assertTrue(not m.called)
819
self.assertTrue(not len(self._output(m)))
820
821
with patch("builtins.open", mock_open()) as m, \
822
patch("os.path.exists") as e:
823
e.return_value = False
824
self._trigger()
825
826
self.assertTrue(e.called)
827
self.assertTrue(m.called)
828
self.assertGreater(len(self._output(m)), 0)
829
830
path = f"{self.pathfmt.realdirectory}file.ext.json"
831
m.assert_called_once_with(path, "w", encoding="utf-8", newline=None)
832
833
def test_metadata_option_skip_false(self):
834
self._create({"skip": False})
835
836
with patch("builtins.open", mock_open()) as m, \
837
patch("os.path.exists") as e:
838
self._trigger()
839
840
self.assertTrue(not e.called)
841
self.assertTrue(m.called)
842
843
def test_metadata_option_newline(self):
844
self._create({
845
"newline": "\r\n",
846
"filename" : "data.json",
847
"directory" : "",
848
"base-directory": self.dir.name,
849
})
850
851
self._trigger()
852
853
path = os.path.join(self.dir.name, "data.json")
854
with open(path, newline="") as fp:
855
content = fp.read()
856
857
self.assertEqual(content, """\
858
{\r\n\
859
"category": "test",\r\n\
860
"filename": "file",\r\n\
861
"extension": "ext"\r\n\
862
}\r\n\
863
""")
864
865
def test_metadata_option_include(self):
866
self._create(
867
{"include": ["_private", "filename", "foo"], "sort": True},
868
{"public": "hello ワールド", "_private": "foo バー"},
869
)
870
871
with patch("builtins.open", mock_open()) as m:
872
self._trigger()
873
874
self.assertEqual(self._output(m), """{
875
"_private": "foo バー",
876
"filename": "file"
877
}
878
""")
879
880
def test_metadata_option_exclude(self):
881
self._create(
882
{"exclude": ["category", "filename", "foo"], "sort": True},
883
{"public": "hello ワールド", "_private": "foo バー"},
884
)
885
886
with patch("builtins.open", mock_open()) as m:
887
self._trigger()
888
889
self.assertEqual(self._output(m), """{
890
"extension": "ext",
891
"public": "hello ワールド"
892
}
893
""")
894
895
def test_archive(self):
896
pp = self._create({
897
"archive": ":memory:",
898
"event" : "finalize",
899
})
900
901
self.assertIsInstance(pp.archive, archive.DownloadArchive)
902
903
with patch.object(pp.archive, "add") as m_aa, \
904
patch.object(pp.archive, "close") as m_ac:
905
self._trigger(("finalize",))
906
pp.archive.close()
907
908
m_aa.assert_called_once_with(self.pathfmt.kwdict)
909
m_ac.assert_called_once()
910
911
def _output(self, mock):
912
return "".join(
913
call[1][0]
914
for call in mock.mock_calls
915
if call[0].endswith("write")
916
)
917
918
919
class MtimeTest(BasePostprocessorTest):
920
921
def test_mtime_datetime(self):
922
self._create(None, {"date": datetime(1980, 1, 1)})
923
self._trigger()
924
self.assertEqual(self.pathfmt.kwdict["_mtime_meta"], 315532800)
925
926
def test_mtime_timestamp(self):
927
self._create(None, {"date": 315532800})
928
self._trigger()
929
self.assertEqual(self.pathfmt.kwdict["_mtime_meta"], 315532800)
930
931
def test_mtime_none(self):
932
self._create(None, {"date": None})
933
self._trigger()
934
self.assertFalse(self.pathfmt.kwdict["_mtime_meta"])
935
936
def test_mtime_none_dt(self):
937
self._create(None, {"date": dt.NONE})
938
self._trigger()
939
self.assertFalse(self.pathfmt.kwdict["_mtime_meta"])
940
941
def test_mtime_undefined(self):
942
self._create(None, {})
943
self._trigger()
944
self.assertFalse(self.pathfmt.kwdict["_mtime_meta"])
945
946
def test_mtime_invalid(self):
947
self._create(None, {"date": "foobar"})
948
self._trigger()
949
self.assertFalse(self.pathfmt.kwdict["_mtime_meta"])
950
951
def test_mtime_key(self):
952
self._create({"key": "foo"}, {"foo": 315532800})
953
self._trigger()
954
self.assertEqual(self.pathfmt.kwdict["_mtime_meta"], 315532800)
955
956
def test_mtime_value(self):
957
self._create({"value": "{foo}"}, {"foo": 315532800})
958
self._trigger()
959
self.assertEqual(self.pathfmt.kwdict["_mtime_meta"], 315532800)
960
961
962
class PythonTest(BasePostprocessorTest):
963
964
def test_module(self):
965
path = os.path.join(self.dir.name, "module.py")
966
self._write_module(path)
967
968
sys.path.insert(0, self.dir.name)
969
try:
970
self._create({"function": "module:calc"}, {"_value": 123})
971
finally:
972
del sys.path[0]
973
974
self.assertNotIn("_result", self.pathfmt.kwdict)
975
self._trigger()
976
self.assertEqual(self.pathfmt.kwdict["_result"], 246)
977
978
def test_path(self):
979
path = os.path.join(self.dir.name, "module.py")
980
self._write_module(path)
981
982
self._create({"function": f"{path}:calc"}, {"_value": 12})
983
984
self.assertNotIn("_result", self.pathfmt.kwdict)
985
self._trigger()
986
self.assertEqual(self.pathfmt.kwdict["_result"], 24)
987
988
def test_eval(self):
989
self._create({"mode": "eval", "expression": "abort()"})
990
991
with self.assertRaises(exception.StopExtraction):
992
self._trigger()
993
994
def test_eval_auto(self):
995
self._create({"expression": "abort()"})
996
997
with self.assertRaises(exception.StopExtraction):
998
self._trigger()
999
1000
def test_archive(self):
1001
pp = self._create({
1002
"expression": "True",
1003
"archive" : ":memory:",
1004
"event" : "finalize",
1005
})
1006
1007
self.assertIsInstance(pp.archive, archive.DownloadArchive)
1008
1009
with patch.object(pp.archive, "add") as m_aa, \
1010
patch.object(pp.archive, "close") as m_ac:
1011
self._trigger(("finalize",))
1012
pp.archive.close()
1013
1014
m_aa.assert_called_once_with(self.pathfmt.kwdict)
1015
m_ac.assert_called_once()
1016
1017
def _write_module(self, path):
1018
with open(path, "w") as fp:
1019
fp.write("""
1020
def calc(kwdict):
1021
kwdict["_result"] = kwdict["_value"] * 2
1022
""")
1023
1024
1025
class RenameTest(BasePostprocessorTest):
1026
1027
def _prepare(self, filename):
1028
path = self.pathfmt.realdirectory
1029
shutil.rmtree(path, ignore_errors=True)
1030
os.makedirs(path, exist_ok=True)
1031
1032
with open(path + filename, "w"):
1033
pass
1034
1035
return path
1036
1037
def test_rename_from(self):
1038
self._create({"from": "{id}.{extension}"}, {"id": 12345})
1039
path = self._prepare("12345.ext")
1040
1041
self._trigger()
1042
1043
self.assertEqual(os.listdir(path), ["file.ext"])
1044
1045
def test_rename_to(self):
1046
self._create({"to": "{id}.{extension}"}, {"id": 12345})
1047
path = self._prepare("file.ext")
1048
1049
self._trigger(("skip",))
1050
1051
self.assertEqual(os.listdir(path), ["12345.ext"])
1052
1053
def test_rename_from_to(self):
1054
self._create({"from": "name", "to": "{id}"}, {"id": 12345})
1055
path = self._prepare("name")
1056
1057
self._trigger()
1058
1059
self.assertEqual(os.listdir(path), ["12345"])
1060
1061
def test_rename_noopt(self):
1062
with self.assertRaises(ValueError):
1063
self._create({})
1064
1065
def test_rename_skip(self):
1066
self._create({"from": "{id}.{extension}"}, {"id": 12345})
1067
path = self._prepare("12345.ext")
1068
with open(f"{path}file.ext", "w"):
1069
pass
1070
1071
with self.assertLogs("postprocessor.rename", level="WARNING") as cm:
1072
self._trigger()
1073
self.assertTrue(cm.output[0].startswith(
1074
"WARNING:postprocessor.rename:Not renaming "
1075
"'12345.ext' to 'file.ext'"))
1076
self.assertEqual(sorted(os.listdir(path)), ["12345.ext", "file.ext"])
1077
1078
1079
class ZipTest(BasePostprocessorTest):
1080
1081
def test_zip_default(self):
1082
pp = self._create()
1083
self.assertEqual(self.job.hooks["file"][0], pp.write_fast)
1084
self.assertEqual(pp.path, self.pathfmt.realdirectory[:-1])
1085
self.assertEqual(pp.delete, True)
1086
self.assertEqual(pp.args, (
1087
f"{pp.path}.zip", "a", zipfile.ZIP_STORED, True,
1088
))
1089
self.assertTrue(pp.args[0].endswith("/test.zip"))
1090
1091
def test_zip_safe(self):
1092
pp = self._create({"mode": "safe"})
1093
self.assertEqual(self.job.hooks["file"][0], pp.write_safe)
1094
self.assertEqual(pp.path, self.pathfmt.realdirectory[:-1])
1095
self.assertEqual(pp.delete, True)
1096
self.assertEqual(pp.args, (
1097
f"{pp.path}.zip", "a", zipfile.ZIP_STORED, True,
1098
))
1099
self.assertTrue(pp.args[0].endswith("/test.zip"))
1100
1101
def test_zip_options(self):
1102
pp = self._create({
1103
"keep-files": True,
1104
"compression": "zip",
1105
"extension": "cbz",
1106
})
1107
self.assertEqual(pp.delete, False)
1108
self.assertEqual(pp.args, (
1109
f"{pp.path}.cbz", "a", zipfile.ZIP_DEFLATED, True,
1110
))
1111
self.assertTrue(pp.args[0].endswith("/test.cbz"))
1112
1113
def test_zip_write(self):
1114
with tempfile.NamedTemporaryFile("w", dir=self.dir.name) as file:
1115
pp = self._create({"files": [file.name, "_info_.json"],
1116
"keep-files": True})
1117
1118
filename = os.path.basename(file.name)
1119
file.write("foobar\n")
1120
1121
# write dummy file with 3 different names
1122
for i in range(3):
1123
name = f"file{i}.ext"
1124
self.pathfmt.temppath = file.name
1125
self.pathfmt.filename = name
1126
1127
self._trigger()
1128
1129
nti = pp.zfile.NameToInfo
1130
self.assertEqual(len(nti), i+2)
1131
self.assertIn(name, nti)
1132
1133
# check file contents
1134
self.assertEqual(len(nti), 4)
1135
self.assertIn("file0.ext", nti)
1136
self.assertIn("file1.ext", nti)
1137
self.assertIn("file2.ext", nti)
1138
self.assertIn(filename, nti)
1139
1140
# write the last file a second time (will be skipped)
1141
self._trigger()
1142
self.assertEqual(len(pp.zfile.NameToInfo), 4)
1143
1144
# close file
1145
self._trigger(("finalize",))
1146
1147
# reopen to check persistence
1148
with zipfile.ZipFile(pp.zfile.filename) as file:
1149
nti = file.NameToInfo
1150
self.assertEqual(len(pp.zfile.NameToInfo), 4)
1151
self.assertIn("file0.ext", nti)
1152
self.assertIn("file1.ext", nti)
1153
self.assertIn("file2.ext", nti)
1154
self.assertIn(filename, nti)
1155
1156
os.unlink(pp.zfile.filename)
1157
1158
def test_zip_write_mock(self):
1159
1160
def side_effect(_, name):
1161
pp.zfile.NameToInfo.add(name)
1162
1163
pp = self._create()
1164
pp.zfile = Mock()
1165
pp.zfile.NameToInfo = set()
1166
pp.zfile.write.side_effect = side_effect
1167
1168
# write 3 files
1169
for i in range(3):
1170
self.pathfmt.temppath = f"{self.pathfmt.realdirectory}file.ext"
1171
self.pathfmt.filename = f"file{i}.ext"
1172
self._trigger()
1173
1174
# write the last file a second time (should be skipped)
1175
self._trigger()
1176
1177
# close file
1178
self._trigger(("finalize",))
1179
1180
self.assertEqual(pp.zfile.write.call_count, 3)
1181
for call_args in pp.zfile.write.call_args_list:
1182
args, kwargs = call_args
1183
self.assertEqual(len(args), 2)
1184
self.assertEqual(len(kwargs), 0)
1185
self.assertEqual(args[0], self.pathfmt.temppath)
1186
self.assertRegex(args[1], r"file\d\.ext")
1187
self.assertEqual(pp.zfile.close.call_count, 1)
1188
1189
1190
if __name__ == "__main__":
1191
unittest.main()
1192
1193