Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
mikf
GitHub Repository: mikf/gallery-dl
Path: blob/master/test/test_postprocessor.py
5457 views
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
4
# Copyright 2019-2025 Mike Fährmann
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License version 2 as
8
# published by the Free Software Foundation.
9
10
import os
11
import sys
12
import unittest
13
from unittest.mock import Mock, mock_open, patch, call
14
15
import shutil
16
import logging
17
import zipfile
18
import tempfile
19
import collections
20
from datetime import datetime
21
22
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
23
from gallery_dl import extractor, output, path, util # noqa E402
24
from gallery_dl import postprocessor, config # noqa E402
25
from gallery_dl.postprocessor.common import PostProcessor # noqa E402
26
27
28
class MockPostprocessorModule(Mock):
29
__postprocessor__ = "mock"
30
31
32
class FakeJob():
33
34
def __init__(self, extr=extractor.find("generic:https://example.org/")):
35
extr.directory_fmt = ("{category}",)
36
self.extractor = extr
37
self.pathfmt = path.PathFormat(extr)
38
self.out = output.NullOutput()
39
self.get_logger = logging.getLogger
40
self.hooks = collections.defaultdict(list)
41
42
def register_hooks(self, hooks, options):
43
for hook, callback in hooks.items():
44
self.hooks[hook].append(callback)
45
46
47
class TestPostprocessorModule(unittest.TestCase):
48
49
def setUp(self):
50
postprocessor._cache.clear()
51
52
def test_find(self):
53
for name in (postprocessor.modules):
54
cls = postprocessor.find(name)
55
self.assertEqual(cls.__name__, f"{name.capitalize()}PP")
56
self.assertIs(cls.__base__, PostProcessor)
57
58
self.assertEqual(postprocessor.find("foo"), None)
59
self.assertEqual(postprocessor.find(1234) , None)
60
self.assertEqual(postprocessor.find(None) , None)
61
62
@patch("builtins.__import__")
63
def test_cache(self, import_module):
64
import_module.return_value = MockPostprocessorModule()
65
66
for name in (postprocessor.modules):
67
postprocessor.find(name)
68
self.assertEqual(import_module.call_count, len(postprocessor.modules))
69
70
# no new calls to import_module
71
for name in (postprocessor.modules):
72
postprocessor.find(name)
73
self.assertEqual(import_module.call_count, len(postprocessor.modules))
74
75
76
class BasePostprocessorTest(unittest.TestCase):
77
78
@classmethod
79
def setUpClass(cls):
80
cls.dir = tempfile.TemporaryDirectory()
81
config.set((), "base-directory", cls.dir.name)
82
cls.job = FakeJob()
83
84
@classmethod
85
def tearDownClass(cls):
86
cls.dir.cleanup()
87
config.clear()
88
89
def tearDown(self):
90
self.job.hooks.clear()
91
92
def _create(self, options=None, data=None):
93
kwdict = {"category": "test", "filename": "file", "extension": "ext"}
94
if options is None:
95
options = {}
96
if data is not None:
97
kwdict.update(data)
98
99
self.pathfmt = self.job.pathfmt
100
self.pathfmt.set_directory(kwdict)
101
self.pathfmt.set_filename(kwdict)
102
self.pathfmt.build_path()
103
104
pp = postprocessor.find(self.__class__.__name__[:-4].lower())
105
return pp(self.job, options)
106
107
def _trigger(self, events=None):
108
for event in (events or ("prepare", "file")):
109
for callback in self.job.hooks[event]:
110
callback(self.pathfmt)
111
112
113
class ClassifyTest(BasePostprocessorTest):
114
115
def test_classify_default(self):
116
pp = self._create()
117
118
self.assertEqual(pp.mapping, {
119
ext: directory
120
for directory, exts in pp.DEFAULT_MAPPING.items()
121
for ext in exts
122
})
123
124
self.assertEqual(pp.directory, "")
125
self._trigger(("post",))
126
self.assertEqual(pp.directory, self.pathfmt.directory)
127
128
self.pathfmt.set_extension("jpg")
129
self._trigger(("prepare",))
130
self.pathfmt.build_path()
131
path = os.path.join(self.dir.name, "test", "Pictures")
132
self.assertEqual(self.pathfmt.path, f"{path}/file.jpg")
133
self.assertEqual(self.pathfmt.realpath, f"{path}/file.jpg")
134
135
self.pathfmt.set_extension("mp4")
136
self._trigger(("prepare",))
137
self.pathfmt.build_path()
138
path = os.path.join(self.dir.name, "test", "Video")
139
self.assertEqual(self.pathfmt.path, f"{path}/file.mp4")
140
self.assertEqual(self.pathfmt.realpath, f"{path}/file.mp4")
141
142
def test_classify_noop(self):
143
pp = self._create()
144
rp = self.pathfmt.realpath
145
146
self.assertEqual(pp.directory, "")
147
self._trigger(("post",))
148
self._trigger(("prepare",))
149
150
self.assertEqual(pp.directory, self.pathfmt.directory)
151
self.assertEqual(self.pathfmt.path, rp)
152
self.assertEqual(self.pathfmt.realpath, rp)
153
154
def test_classify_custom(self):
155
pp = self._create({"mapping": {
156
"foo/bar": ["foo", "bar"],
157
}})
158
159
self.assertEqual(pp.mapping, {
160
"foo": "foo/bar",
161
"bar": "foo/bar",
162
})
163
164
self.assertEqual(pp.directory, "")
165
self._trigger(("post",))
166
self.assertEqual(pp.directory, self.pathfmt.directory)
167
168
self.pathfmt.set_extension("foo")
169
self._trigger(("prepare",))
170
self.pathfmt.build_path()
171
path = os.path.join(self.dir.name, "test", "foo", "bar")
172
self.assertEqual(self.pathfmt.path, f"{path}/file.foo")
173
self.assertEqual(self.pathfmt.realpath, f"{path}/file.foo")
174
175
176
class DirectoryTest(BasePostprocessorTest):
177
178
def test_default(self):
179
self._create()
180
181
path = os.path.join(self.dir.name, "test")
182
self.assertEqual(self.pathfmt.realdirectory, f"{path}/")
183
self.assertEqual(self.pathfmt.realpath, f"{path}/file.ext")
184
185
self.pathfmt.kwdict["category"] = "custom"
186
self._trigger()
187
188
path = os.path.join(self.dir.name, "custom")
189
self.assertEqual(self.pathfmt.realdirectory, f"{path}/")
190
self.pathfmt.build_path()
191
self.assertEqual(self.pathfmt.realpath, f"{path}/file.ext")
192
193
194
class ExecTest(BasePostprocessorTest):
195
196
def test_command_string(self):
197
self._create({
198
"command": "echo {} {_path} {_directory} {_filename} && rm {};",
199
})
200
201
with patch("gallery_dl.util.Popen") as p:
202
i = Mock()
203
i.wait.return_value = 0
204
p.return_value = i
205
self._trigger(("after",))
206
207
p.assert_called_once_with(
208
(f"echo "
209
f"{self.pathfmt.realpath} "
210
f"{self.pathfmt.realpath} "
211
f"{self.pathfmt.realdirectory} "
212
f"{self.pathfmt.filename} "
213
f"&& rm {self.pathfmt.realpath};"),
214
shell=True,
215
creationflags=0,
216
start_new_session=False,
217
)
218
i.wait.assert_called_once_with()
219
220
def test_command_list(self):
221
self._create({
222
"command": ["~/script.sh", "{category}",
223
"\fE _directory.upper()"],
224
})
225
226
with patch("gallery_dl.util.Popen") as p:
227
i = Mock()
228
i.wait.return_value = 0
229
p.return_value = i
230
self._trigger(("after",))
231
232
p.assert_called_once_with(
233
[
234
os.path.expanduser("~/script.sh"),
235
self.pathfmt.kwdict["category"],
236
self.pathfmt.realdirectory.upper(),
237
],
238
shell=False,
239
creationflags=0,
240
start_new_session=False,
241
)
242
243
def test_command_many(self):
244
self._create({
245
"commands": [
246
"echo {} {_path} {_directory} {_filename} && rm {};",
247
["~/script.sh", "{category}", "\fE _directory.upper()"],
248
]
249
})
250
251
with patch("gallery_dl.util.Popen") as p:
252
i = Mock()
253
i.wait.return_value = 0
254
p.return_value = i
255
self._trigger(("after",))
256
257
self.assertEqual(p.call_args_list, [
258
call(
259
(f"echo "
260
f"{self.pathfmt.realpath} "
261
f"{self.pathfmt.realpath} "
262
f"{self.pathfmt.realdirectory} "
263
f"{self.pathfmt.filename} "
264
f"&& rm {self.pathfmt.realpath};"),
265
shell=True,
266
creationflags=0,
267
start_new_session=False,
268
),
269
call(
270
[
271
os.path.expanduser("~/script.sh"),
272
self.pathfmt.kwdict["category"],
273
self.pathfmt.realdirectory.upper(),
274
],
275
shell=False,
276
creationflags=0,
277
start_new_session=False,
278
),
279
])
280
281
def test_command_returncode(self):
282
self._create({
283
"command": "echo {}",
284
})
285
286
with patch("gallery_dl.util.Popen") as p:
287
i = Mock()
288
i.wait.return_value = 123
289
p.return_value = i
290
291
with self.assertLogs() as log:
292
self._trigger(("after",))
293
294
msg = (f"WARNING:postprocessor.exec:"
295
f"'echo {self.pathfmt.realpath}' "
296
f"returned with non-zero exit status (123)")
297
self.assertEqual(log.output[0], msg)
298
299
def test_async(self):
300
self._create({
301
"async" : True,
302
"command": "echo {}",
303
})
304
305
with patch("gallery_dl.util.Popen") as p:
306
i = Mock()
307
p.return_value = i
308
self._trigger(("after",))
309
310
self.assertTrue(p.called)
311
self.assertFalse(i.wait.called)
312
313
@unittest.skipIf(util.WINDOWS, "not POSIX")
314
def test_session_posix(self):
315
self._create({
316
"session": True,
317
"command": ["echo", "foobar"],
318
})
319
320
with patch("gallery_dl.util.Popen") as p:
321
i = Mock()
322
i.wait.return_value = 0
323
p.return_value = i
324
self._trigger(("after",))
325
326
p.assert_called_once_with(
327
["echo", "foobar"],
328
shell=False,
329
creationflags=0,
330
start_new_session=True,
331
)
332
i.wait.assert_called_once_with()
333
334
@unittest.skipIf(not util.WINDOWS, "not Windows")
335
def test_session_windows(self):
336
self._create({
337
"session": True,
338
"command": ["echo", "foobar"],
339
})
340
341
with patch("gallery_dl.util.Popen") as p:
342
i = Mock()
343
i.wait.return_value = 0
344
p.return_value = i
345
self._trigger(("after",))
346
347
import subprocess
348
p.assert_called_once_with(
349
["echo", "foobar"],
350
shell=False,
351
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP,
352
start_new_session=False,
353
)
354
i.wait.assert_called_once_with()
355
356
357
class HashTest(BasePostprocessorTest):
358
359
def test_default(self):
360
self._create({})
361
362
with self.pathfmt.open() as fp:
363
fp.write(b"Foo Bar\n")
364
365
self._trigger()
366
367
kwdict = self.pathfmt.kwdict
368
self.assertEqual(
369
"35c9c9c7c90ad764bae9e2623f522c24", kwdict["md5"], "md5")
370
self.assertEqual(
371
"14d3d804494ef4e57d72de63e4cfee761240471a", kwdict["sha1"], "sha1")
372
373
def test_custom_hashes(self):
374
self._create({"hashes": "sha256:a,sha512:b"})
375
376
with self.pathfmt.open() as fp:
377
fp.write(b"Foo Bar\n")
378
379
self._trigger()
380
381
kwdict = self.pathfmt.kwdict
382
self.assertEqual(
383
"4775b55be17206445d7015a5fc7656f38a74b880670523c3b175455f885f2395",
384
kwdict["a"], "sha256")
385
self.assertEqual(
386
"6028f9e6957f4ca929941318c4bba6258713fd5162f9e33bd10e1c456d252700"
387
"3e1095b50736c4fd1e2deea152e3c8ecd5993462a747208e4d842659935a1c62",
388
kwdict["b"], "sha512")
389
390
def test_custom_hashes_dict(self):
391
self._create({"hashes": {"a": "sha256", "b": "sha512"}})
392
393
with self.pathfmt.open() as fp:
394
fp.write(b"Foo Bar\n")
395
396
self._trigger()
397
398
kwdict = self.pathfmt.kwdict
399
self.assertEqual(
400
"4775b55be17206445d7015a5fc7656f38a74b880670523c3b175455f885f2395",
401
kwdict["a"], "sha256")
402
self.assertEqual(
403
"6028f9e6957f4ca929941318c4bba6258713fd5162f9e33bd10e1c456d252700"
404
"3e1095b50736c4fd1e2deea152e3c8ecd5993462a747208e4d842659935a1c62",
405
kwdict["b"], "sha512")
406
407
408
class MetadataTest(BasePostprocessorTest):
409
410
def test_metadata_default(self):
411
pp = self._create()
412
413
# default arguments
414
self.assertEqual(pp.write , pp._write_json)
415
self.assertEqual(pp.extension, "json")
416
self.assertTrue(callable(pp._json_encode))
417
418
def test_metadata_json(self):
419
pp = self._create({
420
"mode" : "json",
421
"extension" : "JSON",
422
}, {
423
"public" : "hello ワールド",
424
"_private" : "foo バー",
425
})
426
427
self.assertEqual(pp.write , pp._write_json)
428
self.assertEqual(pp.extension, "JSON")
429
self.assertTrue(callable(pp._json_encode))
430
431
with patch("builtins.open", mock_open()) as m:
432
self._trigger()
433
434
path = f"{self.pathfmt.realpath}.JSON"
435
m.assert_called_once_with(path, "w", encoding="utf-8")
436
437
self.assertEqual(self._output(m), """{
438
"category": "test",
439
"filename": "file",
440
"extension": "ext",
441
"public": "hello ワールド"
442
}
443
""")
444
445
def test_metadata_json_options(self):
446
pp = self._create({
447
"mode" : "json",
448
"ascii" : True,
449
"sort" : True,
450
"separators": [",", " : "],
451
"private" : True,
452
"indent" : None,
453
"open" : "a",
454
"encoding" : "UTF-8",
455
"extension" : "JSON",
456
}, {
457
"public" : "hello ワールド",
458
"_private" : "foo バー",
459
})
460
461
self.assertEqual(pp.write , pp._write_json)
462
self.assertEqual(pp.extension, "JSON")
463
self.assertTrue(callable(pp._json_encode))
464
465
with patch("builtins.open", mock_open()) as m:
466
self._trigger()
467
468
path = f"{self.pathfmt.realpath}.JSON"
469
m.assert_called_once_with(path, "a", encoding="UTF-8")
470
self.assertEqual(self._output(m), """{\
471
"_private" : "foo \\u30d0\\u30fc",\
472
"category" : "test",\
473
"extension" : "ext",\
474
"filename" : "file",\
475
"public" : "hello \\u30ef\\u30fc\\u30eb\\u30c9"}
476
""")
477
478
def test_metadata_tags(self):
479
pp = self._create(
480
{"mode": "tags"},
481
{"tags": ["foo", "bar", "baz"]},
482
)
483
self.assertEqual(pp.write, pp._write_tags)
484
self.assertEqual(pp.extension, "txt")
485
486
with patch("builtins.open", mock_open()) as m:
487
self._trigger()
488
489
path = f"{self.pathfmt.realpath}.txt"
490
m.assert_called_once_with(path, "w", encoding="utf-8")
491
self.assertEqual(self._output(m), "foo\nbar\nbaz\n")
492
493
def test_metadata_tags_split_1(self):
494
self._create(
495
{"mode": "tags"},
496
{"tags": "foo, bar, baz"},
497
)
498
with patch("builtins.open", mock_open()) as m:
499
self._trigger()
500
self.assertEqual(self._output(m), "foo\nbar\nbaz\n")
501
502
def test_metadata_tags_split_2(self):
503
self._create(
504
{"mode": "tags"},
505
{"tags": "foobar1 foobar2 foobarbaz"},
506
)
507
with patch("builtins.open", mock_open()) as m:
508
self._trigger()
509
self.assertEqual(self._output(m), "foobar1\nfoobar2\nfoobarbaz\n")
510
511
def test_metadata_tags_tagstring(self):
512
self._create(
513
{"mode": "tags"},
514
{"tag_string": "foo, bar, baz"},
515
)
516
with patch("builtins.open", mock_open()) as m:
517
self._trigger()
518
self.assertEqual(self._output(m), "foo\nbar\nbaz\n")
519
520
def test_metadata_tags_dict(self):
521
self._create(
522
{"mode": "tags"},
523
{"tags": {"g": ["foobar1", "foobar2"], "m": ["foobarbaz"]}},
524
)
525
with patch("builtins.open", mock_open()) as m:
526
self._trigger()
527
self.assertEqual(self._output(m), "foobar1\nfoobar2\nfoobarbaz\n")
528
529
def test_metadata_tags_list_of_dict(self):
530
self._create(
531
{"mode": "tags"},
532
{"tags": [
533
{"g": "foobar1", "m": "foobar2", "u": True},
534
{"g": None, "m": "foobarbaz", "u": [3, 4]},
535
]},
536
)
537
with patch("builtins.open", mock_open()) as m:
538
self._trigger()
539
self.assertEqual(self._output(m), "foobar1\nfoobar2\nfoobarbaz\n")
540
541
def test_metadata_custom(self):
542
def test(pp_info):
543
pp = self._create(pp_info, {"foo": "bar"})
544
self.assertEqual(pp.write, pp._write_custom)
545
self.assertEqual(pp.extension, "txt")
546
self.assertTrue(pp._content_fmt)
547
548
with patch("builtins.open", mock_open()) as m:
549
self._trigger()
550
self.assertEqual(self._output(m), "bar\nNone\n")
551
self.job.hooks.clear()
552
553
test({"mode": "custom", "content-format": "{foo}\n{missing}\n"})
554
test({"mode": "custom", "content-format": ["{foo}", "{missing}"]})
555
test({"mode": "custom", "format": "{foo}\n{missing}\n"})
556
test({"format": "{foo}\n{missing}\n"})
557
558
def test_metadata_extfmt(self):
559
pp = self._create({
560
"extension" : "ignored",
561
"extension-format": "json",
562
})
563
564
self.assertEqual(pp._filename, pp._filename_extfmt)
565
566
with patch("builtins.open", mock_open()) as m:
567
self._trigger()
568
569
path = f"{self.pathfmt.realdirectory}file.json"
570
m.assert_called_once_with(path, "w", encoding="utf-8")
571
572
def test_metadata_extfmt_2(self):
573
self._create({
574
"extension-format": "{extension!u}-data:{category:Res/ES/}",
575
})
576
577
self.pathfmt.prefix = "2."
578
with patch("builtins.open", mock_open()) as m:
579
self._trigger()
580
581
path = f"{self.pathfmt.realdirectory}file.2.EXT-data:tESt"
582
m.assert_called_once_with(path, "w", encoding="utf-8")
583
584
def test_metadata_directory(self):
585
self._create({
586
"directory": "metadata",
587
})
588
589
with patch("builtins.open", mock_open()) as m:
590
self._trigger()
591
592
path = f"{self.pathfmt.realdirectory}metadata/file.ext.json"
593
m.assert_called_once_with(path, "w", encoding="utf-8")
594
595
def test_metadata_directory_2(self):
596
self._create({
597
"directory" : "metadata////",
598
"extension-format": "json",
599
})
600
601
with patch("builtins.open", mock_open()) as m:
602
self._trigger()
603
604
path = f"{self.pathfmt.realdirectory}metadata/file.json"
605
m.assert_called_once_with(path, "w", encoding="utf-8")
606
607
def test_metadata_directory_format(self):
608
self._create(
609
{"directory": ["..", "json", "\fE str(id // 500 * 500 + 500)"]},
610
{"id": 12345},
611
)
612
613
with patch("builtins.open", mock_open()) as m:
614
self._trigger()
615
616
path = f"{self.pathfmt.realdirectory}../json/12500/file.ext.json"
617
m.assert_called_once_with(path, "w", encoding="utf-8")
618
619
def test_metadata_directory_empty(self):
620
self._create(
621
{"directory": []},
622
)
623
624
with patch("builtins.open", mock_open()) as m:
625
self._trigger()
626
627
path = f"{self.pathfmt.realdirectory}./file.ext.json"
628
m.assert_called_once_with(path, "w", encoding="utf-8")
629
630
def test_metadata_basedirectory(self):
631
self._create({"base-directory": True})
632
633
with patch("builtins.open", mock_open()) as m:
634
self._trigger()
635
636
path = f"{self.pathfmt.basedirectory}file.ext.json"
637
m.assert_called_once_with(path, "w", encoding="utf-8")
638
639
def test_metadata_basedirectory_custom(self):
640
self._create({
641
"base-directory": "/home/test",
642
"directory": "meta",
643
})
644
645
with patch("builtins.open", mock_open()) as m:
646
self._trigger()
647
648
path = "/home/test/meta/file.ext.json"
649
m.assert_called_once_with(path, "w", encoding="utf-8")
650
651
def test_metadata_filename(self):
652
self._create({
653
"filename" : "{category}_{filename}_/meta/\n\r.data",
654
"extension-format": "json",
655
})
656
657
with patch("builtins.open", mock_open()) as m:
658
self._trigger()
659
660
path = f"{self.pathfmt.realdirectory}test_file__meta_.data"
661
m.assert_called_once_with(path, "w", encoding="utf-8")
662
663
def test_metadata_meta_path(self):
664
self._create({
665
"metadata-path": "_meta_path",
666
})
667
668
self._trigger()
669
670
self.assertEqual(self.pathfmt.kwdict["_meta_path"],
671
f"{self.pathfmt.realpath}.json")
672
673
def test_metadata_stdout(self):
674
self._create({"filename": "-", "indent": None, "sort": True})
675
676
with patch("sys.stdout", Mock()) as m:
677
self._trigger()
678
679
self.assertEqual(self._output(m), """\
680
{"category": "test", "extension": "ext", "filename": "file"}
681
""")
682
683
def test_metadata_modify(self):
684
kwdict = {"foo": 0, "bar": {"bax": 1, "bay": 2, "baz": 3, "ba2": {}}}
685
self._create({
686
"mode": "modify",
687
"fields": {
688
"foo" : "{filename}-{foo!s}",
689
"foo2" : "\fE bar['bax'] + 122",
690
"bar[\"baz\"]" : "{_now}",
691
"bar['ba2'][a]": "test",
692
},
693
}, kwdict)
694
695
pdict = self.pathfmt.kwdict
696
self.assertIsNot(kwdict, pdict)
697
self.assertEqual(pdict["foo"], kwdict["foo"])
698
self.assertEqual(pdict["bar"], kwdict["bar"])
699
700
self._trigger()
701
702
self.assertEqual(pdict["foo"] , "file-0")
703
self.assertEqual(pdict["foo2"], 123)
704
self.assertEqual(pdict["bar"]["ba2"]["a"], "test")
705
self.assertIsInstance(pdict["bar"]["baz"], datetime)
706
707
def test_metadata_delete(self):
708
kwdict = {
709
"foo": 0,
710
"bar": {
711
"bax": 1,
712
"bay": 2,
713
"baz": {"a": 3, "b": 4},
714
},
715
}
716
self._create({
717
"mode": "delete",
718
"fields": ["foo", "bar['bax']", "bar[\"baz\"][a]"],
719
}, kwdict)
720
721
pdict = self.pathfmt.kwdict
722
self.assertIsNot(kwdict, pdict)
723
724
self.assertEqual(pdict["foo"], kwdict["foo"])
725
self.assertEqual(pdict["bar"], kwdict["bar"])
726
727
self._trigger()
728
729
self.assertNotIn("foo", pdict)
730
self.assertNotIn("bax", pdict["bar"])
731
self.assertNotIn("a", pdict["bar"]["baz"])
732
733
# no errors for deleted/undefined fields
734
self._trigger()
735
self.assertNotIn("foo", pdict)
736
self.assertNotIn("bax", pdict["bar"])
737
self.assertNotIn("a", pdict["bar"]["baz"])
738
739
def test_metadata_option_skip(self):
740
self._create({"skip": True})
741
742
with patch("builtins.open", mock_open()) as m, \
743
patch("os.path.exists") as e:
744
e.return_value = True
745
self._trigger()
746
747
self.assertTrue(e.called)
748
self.assertTrue(not m.called)
749
self.assertTrue(not len(self._output(m)))
750
751
with patch("builtins.open", mock_open()) as m, \
752
patch("os.path.exists") as e:
753
e.return_value = False
754
self._trigger()
755
756
self.assertTrue(e.called)
757
self.assertTrue(m.called)
758
self.assertGreater(len(self._output(m)), 0)
759
760
path = f"{self.pathfmt.realdirectory}file.ext.json"
761
m.assert_called_once_with(path, "w", encoding="utf-8")
762
763
def test_metadata_option_skip_false(self):
764
self._create({"skip": False})
765
766
with patch("builtins.open", mock_open()) as m, \
767
patch("os.path.exists") as e:
768
self._trigger()
769
770
self.assertTrue(not e.called)
771
self.assertTrue(m.called)
772
773
def test_metadata_option_include(self):
774
self._create(
775
{"include": ["_private", "filename", "foo"], "sort": True},
776
{"public": "hello ワールド", "_private": "foo バー"},
777
)
778
779
with patch("builtins.open", mock_open()) as m:
780
self._trigger()
781
782
self.assertEqual(self._output(m), """{
783
"_private": "foo バー",
784
"filename": "file"
785
}
786
""")
787
788
def test_metadata_option_exclude(self):
789
self._create(
790
{"exclude": ["category", "filename", "foo"], "sort": True},
791
{"public": "hello ワールド", "_private": "foo バー"},
792
)
793
794
with patch("builtins.open", mock_open()) as m:
795
self._trigger()
796
797
self.assertEqual(self._output(m), """{
798
"extension": "ext",
799
"public": "hello ワールド"
800
}
801
""")
802
803
def _output(self, mock):
804
return "".join(
805
call[1][0]
806
for call in mock.mock_calls
807
if call[0].endswith("write")
808
)
809
810
811
class MtimeTest(BasePostprocessorTest):
812
813
def test_mtime_datetime(self):
814
self._create(None, {"date": datetime(1980, 1, 1)})
815
self._trigger()
816
self.assertEqual(self.pathfmt.kwdict["_mtime_meta"], 315532800)
817
818
def test_mtime_timestamp(self):
819
self._create(None, {"date": 315532800})
820
self._trigger()
821
self.assertEqual(self.pathfmt.kwdict["_mtime_meta"], 315532800)
822
823
def test_mtime_none(self):
824
self._create(None, {"date": None})
825
self._trigger()
826
self.assertNotIn("_mtime_meta", self.pathfmt.kwdict)
827
828
def test_mtime_undefined(self):
829
self._create(None, {})
830
self._trigger()
831
self.assertNotIn("_mtime_meta", self.pathfmt.kwdict)
832
833
def test_mtime_key(self):
834
self._create({"key": "foo"}, {"foo": 315532800})
835
self._trigger()
836
self.assertEqual(self.pathfmt.kwdict["_mtime_meta"], 315532800)
837
838
def test_mtime_value(self):
839
self._create({"value": "{foo}"}, {"foo": 315532800})
840
self._trigger()
841
self.assertEqual(self.pathfmt.kwdict["_mtime_meta"], 315532800)
842
843
844
class PythonTest(BasePostprocessorTest):
845
846
def test_module(self):
847
path = os.path.join(self.dir.name, "module.py")
848
self._write_module(path)
849
850
sys.path.insert(0, self.dir.name)
851
try:
852
self._create({"function": "module:calc"}, {"_value": 123})
853
finally:
854
del sys.path[0]
855
856
self.assertNotIn("_result", self.pathfmt.kwdict)
857
self._trigger()
858
self.assertEqual(self.pathfmt.kwdict["_result"], 246)
859
860
def test_path(self):
861
path = os.path.join(self.dir.name, "module.py")
862
self._write_module(path)
863
864
self._create({"function": f"{path}:calc"}, {"_value": 12})
865
866
self.assertNotIn("_result", self.pathfmt.kwdict)
867
self._trigger()
868
self.assertEqual(self.pathfmt.kwdict["_result"], 24)
869
870
def _write_module(self, path):
871
with open(path, "w") as fp:
872
fp.write("""
873
def calc(kwdict):
874
kwdict["_result"] = kwdict["_value"] * 2
875
""")
876
877
878
class RenameTest(BasePostprocessorTest):
879
880
def _prepare(self, filename):
881
path = self.pathfmt.realdirectory
882
shutil.rmtree(path, ignore_errors=True)
883
os.makedirs(path, exist_ok=True)
884
885
with open(path + filename, "w"):
886
pass
887
888
return path
889
890
def test_rename_from(self):
891
self._create({"from": "{id}.{extension}"}, {"id": 12345})
892
path = self._prepare("12345.ext")
893
894
self._trigger()
895
896
self.assertEqual(os.listdir(path), ["file.ext"])
897
898
def test_rename_to(self):
899
self._create({"to": "{id}.{extension}"}, {"id": 12345})
900
path = self._prepare("file.ext")
901
902
self._trigger(("skip",))
903
904
self.assertEqual(os.listdir(path), ["12345.ext"])
905
906
def test_rename_from_to(self):
907
self._create({"from": "name", "to": "{id}"}, {"id": 12345})
908
path = self._prepare("name")
909
910
self._trigger()
911
912
self.assertEqual(os.listdir(path), ["12345"])
913
914
def test_rename_noopt(self):
915
with self.assertRaises(ValueError):
916
self._create({})
917
918
def test_rename_skip(self):
919
self._create({"from": "{id}.{extension}"}, {"id": 12345})
920
path = self._prepare("12345.ext")
921
with open(f"{path}file.ext", "w"):
922
pass
923
924
with self.assertLogs("postprocessor.rename", level="WARNING") as cm:
925
self._trigger()
926
self.assertTrue(cm.output[0].startswith(
927
"WARNING:postprocessor.rename:Not renaming "
928
"'12345.ext' to 'file.ext'"))
929
self.assertEqual(sorted(os.listdir(path)), ["12345.ext", "file.ext"])
930
931
932
class ZipTest(BasePostprocessorTest):
933
934
def test_zip_default(self):
935
pp = self._create()
936
self.assertEqual(self.job.hooks["file"][0], pp.write_fast)
937
self.assertEqual(pp.path, self.pathfmt.realdirectory[:-1])
938
self.assertEqual(pp.delete, True)
939
self.assertEqual(pp.args, (
940
f"{pp.path}.zip", "a", zipfile.ZIP_STORED, True,
941
))
942
self.assertTrue(pp.args[0].endswith("/test.zip"))
943
944
def test_zip_safe(self):
945
pp = self._create({"mode": "safe"})
946
self.assertEqual(self.job.hooks["file"][0], pp.write_safe)
947
self.assertEqual(pp.path, self.pathfmt.realdirectory[:-1])
948
self.assertEqual(pp.delete, True)
949
self.assertEqual(pp.args, (
950
f"{pp.path}.zip", "a", zipfile.ZIP_STORED, True,
951
))
952
self.assertTrue(pp.args[0].endswith("/test.zip"))
953
954
def test_zip_options(self):
955
pp = self._create({
956
"keep-files": True,
957
"compression": "zip",
958
"extension": "cbz",
959
})
960
self.assertEqual(pp.delete, False)
961
self.assertEqual(pp.args, (
962
f"{pp.path}.cbz", "a", zipfile.ZIP_DEFLATED, True,
963
))
964
self.assertTrue(pp.args[0].endswith("/test.cbz"))
965
966
def test_zip_write(self):
967
with tempfile.NamedTemporaryFile("w", dir=self.dir.name) as file:
968
pp = self._create({"files": [file.name, "_info_.json"],
969
"keep-files": True})
970
971
filename = os.path.basename(file.name)
972
file.write("foobar\n")
973
974
# write dummy file with 3 different names
975
for i in range(3):
976
name = f"file{i}.ext"
977
self.pathfmt.temppath = file.name
978
self.pathfmt.filename = name
979
980
self._trigger()
981
982
nti = pp.zfile.NameToInfo
983
self.assertEqual(len(nti), i+2)
984
self.assertIn(name, nti)
985
986
# check file contents
987
self.assertEqual(len(nti), 4)
988
self.assertIn("file0.ext", nti)
989
self.assertIn("file1.ext", nti)
990
self.assertIn("file2.ext", nti)
991
self.assertIn(filename, nti)
992
993
# write the last file a second time (will be skipped)
994
self._trigger()
995
self.assertEqual(len(pp.zfile.NameToInfo), 4)
996
997
# close file
998
self._trigger(("finalize",))
999
1000
# reopen to check persistence
1001
with zipfile.ZipFile(pp.zfile.filename) as file:
1002
nti = file.NameToInfo
1003
self.assertEqual(len(pp.zfile.NameToInfo), 4)
1004
self.assertIn("file0.ext", nti)
1005
self.assertIn("file1.ext", nti)
1006
self.assertIn("file2.ext", nti)
1007
self.assertIn(filename, nti)
1008
1009
os.unlink(pp.zfile.filename)
1010
1011
def test_zip_write_mock(self):
1012
1013
def side_effect(_, name):
1014
pp.zfile.NameToInfo.add(name)
1015
1016
pp = self._create()
1017
pp.zfile = Mock()
1018
pp.zfile.NameToInfo = set()
1019
pp.zfile.write.side_effect = side_effect
1020
1021
# write 3 files
1022
for i in range(3):
1023
self.pathfmt.temppath = f"{self.pathfmt.realdirectory}file.ext"
1024
self.pathfmt.filename = f"file{i}.ext"
1025
self._trigger()
1026
1027
# write the last file a second time (should be skipped)
1028
self._trigger()
1029
1030
# close file
1031
self._trigger(("finalize",))
1032
1033
self.assertEqual(pp.zfile.write.call_count, 3)
1034
for call_args in pp.zfile.write.call_args_list:
1035
args, kwargs = call_args
1036
self.assertEqual(len(args), 2)
1037
self.assertEqual(len(kwargs), 0)
1038
self.assertEqual(args[0], self.pathfmt.temppath)
1039
self.assertRegex(args[1], r"file\d\.ext")
1040
self.assertEqual(pp.zfile.close.call_count, 1)
1041
1042
1043
if __name__ == "__main__":
1044
unittest.main()
1045
1046