Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
mikf
GitHub Repository: mikf/gallery-dl
Path: blob/master/gallery_dl/postprocessor/metadata.py
8753 views
1
# -*- coding: utf-8 -*-
2
3
# Copyright 2019-2025 Mike Fährmann
4
#
5
# This program is free software; you can redistribute it and/or modify
6
# it under the terms of the GNU General Public License version 2 as
7
# published by the Free Software Foundation.
8
9
"""Write metadata to external files"""
10
11
from .common import PostProcessor
12
from .. import util, formatter
13
import json
14
import sys
15
import os
16
17
18
class MetadataPP(PostProcessor):
19
20
def __init__(self, job, options):
    """Set up the post processor according to 'options'.

    Parameters:
        job: job object; handed to PostProcessor.__init__ and used to
            register the run hooks and the optional archive.
        options: dict-like object holding the configuration values.

    Options read directly here: "mode", "content-format" / "format",
    "fields", "base-directory", "directory", "filename",
    "extension-format", "extension", "event", "mtime", "open",
    "encoding", "newline", "skip" and "metadata-path".
    _make_filter(), _make_encoder() and _archive_init() read further
    options on their own.
    """
    PostProcessor.__init__(self, job)

    mode = options.get("mode")
    cfmt = options.get("content-format") or options.get("format")
    omode = "w"
    filename = None

    # Each mode selects the writer (self.write) or replaces self.run
    # entirely, and picks the default file extension 'ext'.
    if mode == "tags":
        self.write = self._write_tags
        ext = "txt"
    elif mode == "modify":
        self.run = self._run_modify
        self.fields = {
            name: formatter.parse(value, None, util.identity).format_map
            for name, value in options.get("fields").items()
        }
        ext = None
    elif mode == "delete":
        self.run = self._run_delete
        self.fields = options.get("fields")
        ext = None
    elif mode == "custom" or not mode and cfmt:
        self.write = self._write_custom
        if isinstance(cfmt, list):
            cfmt = "\n".join(cfmt) + "\n"
        self._content_fmt = formatter.parse(cfmt).format_map
        ext = "txt"
    elif mode == "print":
        nl = "\n"
        if isinstance(cfmt, list):
            cfmt = f"{nl.join(cfmt)}{nl}"
        # Append a trailing newline unless the format already ends with
        # one or starts with "\f" followed by something other than "F"
        # (presumably a special formatter-type prefix -- confirm).
        # startswith/endswith/slicing keep this safe for "" and "\f",
        # which would raise IndexError with plain indexing.
        if not cfmt.endswith(nl) and (
                not cfmt.startswith("\f") or cfmt[1:2] == "F"):
            cfmt = f"{cfmt}{nl}"
        self.write = self._write_custom
        self._content_fmt = formatter.parse(cfmt).format_map
        filename = "-"
        # Only used when "filename" is overridden with a falsy value;
        # without it 'ext' would be unbound further down.
        ext = "txt"
    elif mode == "jsonl":
        self.write = self._write_json
        self._json_encode = self._make_encoder(options).encode
        omode = "a"
        filename = "data.jsonl"
        # Same fallback as for "print": keeps 'ext' bound when the
        # default filename gets overridden with a falsy value.
        ext = "jsonl"
    else:
        self.write = self._write_json
        self._json_encode = self._make_encoder(options, 4).encode
        ext = "json"

    # "base-directory": True -> use pathfmt.basedirectory,
    # any other truthy value -> use that fixed path (with trailing sep).
    if base_directory := options.get("base-directory"):
        if base_directory is True:
            self._base = lambda p: p.basedirectory
        else:
            sep = os.sep
            altsep = os.altsep
            base_directory = util.expand_path(base_directory)
            if altsep and altsep in base_directory:
                base_directory = base_directory.replace(altsep, sep)
            if base_directory[-1] != sep:
                base_directory += sep
            self._base = lambda p: base_directory

    # "directory": list -> format strings evaluated per file,
    # other truthy value -> fixed sub-directory below the base.
    directory = options.get("directory")
    if isinstance(directory, list):
        self._directory = self._directory_format
        self._directory_formatters = [
            formatter.parse(dirfmt, util.NONE).format_map
            for dirfmt in directory
        ]
    elif directory:
        self._directory = self._directory_custom
        sep = os.sep + (os.altsep or "")
        self._metadir = util.expand_path(directory).rstrip(sep) + os.sep

    # Filename strategy: explicit "filename" ("-" means stdout),
    # then "extension-format", then plain "<name>.<extension>".
    filename = options.get("filename", filename)
    extfmt = options.get("extension-format")
    if filename:
        if filename == "-":
            self.run = self._run_stdout
        else:
            self._filename = self._filename_custom
            self._filename_fmt = formatter.parse(filename).format_map
    elif extfmt:
        self._filename = self._filename_extfmt
        self._extension_fmt = formatter.parse(extfmt).format_map
    else:
        self.extension = options.get("extension", ext)

    # Register self.run (as selected above) for every requested event.
    events = options.get("event")
    if events is None:
        events = ("file",)
    elif isinstance(events, str):
        events = events.split(",")
    job.register_hooks({event: self.run for event in events}, options)

    if self._archive_init(job, options, "_MD_"):
        self._archive_register(job)

    self.filter = self._make_filter(options)
    self.mtime = options.get("mtime")
    self.omode = options.get("open", omode)
    self.encoding = options.get("encoding", "utf-8")
    self.newline = options.get("newline")
    self.skip = options.get("skip", False)
    self.meta_path = options.get("metadata-path")
123
124
def open(self, path):
    """Open 'path' with the configured mode, encoding and newline.

    Returns the file object produced by the built-in open().
    """
    mode = self.omode
    text_settings = {
        "encoding": self.encoding,
        "newline": self.newline,
    }
    return open(path, mode, **text_settings)
128
129
def run(self, pathfmt):
    """Write the metadata file for the file described by 'pathfmt'.

    Does nothing when the archive already contains the entry, or when
    "skip" is enabled and the target file exists.  A missing target
    directory is created on demand and the write is retried once.
    """
    archive = self.archive
    if archive and archive.check(pathfmt.kwdict):
        return

    use_extended = util.WINDOWS and pathfmt.extended
    directory = self._directory(pathfmt)
    if use_extended:
        directory = pathfmt._extended_path(directory)
    path = directory + self._filename(pathfmt)

    meta_key = self.meta_path
    if meta_key is not None:
        # expose the metadata file's path to later consumers of kwdict
        pathfmt.kwdict[meta_key] = path

    if self.skip and os.path.exists(path):
        return

    def emit():
        with self.open(path) as fp:
            self.write(fp, pathfmt.kwdict)

    try:
        emit()
    except FileNotFoundError:
        # the directory did not exist yet: create it and try again
        os.makedirs(directory, exist_ok=True)
        emit()

    if archive:
        archive.add(pathfmt.kwdict)

    if self.mtime:
        pathfmt.set_mtime(path)
159
160
def _run_stdout(self, pathfmt):
161
self.write(sys.stdout, pathfmt.kwdict)
162
163
def _run_modify(self, pathfmt):
164
kwdict = pathfmt.kwdict
165
for key, func in self.fields.items():
166
obj = kwdict
167
try:
168
if "[" in key:
169
obj, key = _traverse(obj, key)
170
obj[key] = func(kwdict)
171
except Exception:
172
pass
173
174
def _run_delete(self, pathfmt):
175
kwdict = pathfmt.kwdict
176
for key in self.fields:
177
obj = kwdict
178
try:
179
if "[" in key:
180
obj, key = _traverse(obj, key)
181
del obj[key]
182
except Exception:
183
pass
184
185
def _base(self, pathfmt):
186
return pathfmt.realdirectory
187
188
def _directory(self, pathfmt):
189
return self._base(pathfmt)
190
191
def _directory_custom(self, pathfmt):
192
return os.path.join(self._base(pathfmt), self._metadir)
193
194
def _directory_format(self, pathfmt):
195
formatters = pathfmt.directory_formatters
196
conditions = pathfmt.directory_conditions
197
try:
198
pathfmt.directory_formatters = self._directory_formatters
199
pathfmt.directory_conditions = ()
200
if segments := pathfmt.build_directory(pathfmt.kwdict):
201
directory = pathfmt.clean_path(os.sep.join(segments) + os.sep)
202
else:
203
directory = "." + os.sep
204
return os.path.join(self._base(pathfmt), directory)
205
finally:
206
pathfmt.directory_conditions = conditions
207
pathfmt.directory_formatters = formatters
208
209
def _filename(self, pathfmt):
210
return (pathfmt.filename or "metadata") + "." + self.extension
211
212
def _filename_custom(self, pathfmt):
213
return pathfmt.clean_path(pathfmt.clean_segment(
214
self._filename_fmt(pathfmt.kwdict)))
215
216
def _filename_extfmt(self, pathfmt):
217
kwdict = pathfmt.kwdict
218
ext = kwdict.get("extension")
219
kwdict["extension"] = pathfmt.extension
220
kwdict["extension"] = pathfmt.prefix + self._extension_fmt(kwdict)
221
filename = pathfmt.build_filename(kwdict)
222
kwdict["extension"] = ext
223
return filename
224
225
def _write_custom(self, fp, kwdict):
226
fp.write(self._content_fmt(kwdict))
227
228
def _write_tags(self, fp, kwdict):
229
tags = kwdict.get("tags") or kwdict.get("tag_string")
230
231
if not tags:
232
return
233
234
if isinstance(tags, str):
235
taglist = tags.split(", ")
236
if len(taglist) < len(tags) / 16:
237
taglist = tags.split(" ")
238
tags = taglist
239
elif isinstance(tags, dict):
240
taglists = tags.values()
241
tags = []
242
extend = tags.extend
243
for taglist in taglists:
244
extend(taglist)
245
tags.sort()
246
elif all(isinstance(e, dict) for e in tags):
247
taglists = tags
248
tags = []
249
extend = tags.extend
250
for tagdict in taglists:
251
extend([x for x in tagdict.values() if isinstance(x, str)])
252
tags.sort()
253
254
fp.write("\n".join(tags) + "\n")
255
256
def _write_json(self, fp, kwdict):
257
if self.filter:
258
kwdict = self.filter(kwdict)
259
fp.write(self._json_encode(kwdict) + "\n")
260
261
def _make_filter(self, options):
262
if include := options.get("include"):
263
if isinstance(include, str):
264
include = include.split(",")
265
return lambda d: {k: d[k] for k in include if k in d}
266
267
exclude = options.get("exclude")
268
private = options.get("private")
269
if exclude:
270
if isinstance(exclude, str):
271
exclude = exclude.split(",")
272
exclude = set(exclude)
273
274
if private:
275
return lambda d: {k: v for k, v in d.items()
276
if k not in exclude}
277
return lambda d: {k: v for k, v in util.filter_dict(d).items()
278
if k not in exclude}
279
280
if not private:
281
return util.filter_dict
282
283
def _make_encoder(self, options, indent=None):
    """Create the json.JSONEncoder described by 'options'.

    Reads the "ascii", "sort", "separators" and "indent" options;
    'indent' is the fallback used when "indent" is not configured.
    Circular-reference checks are disabled and util.json_default is
    used for values the encoder cannot serialize on its own.
    """
    settings = {
        "ensure_ascii": options.get("ascii", False),
        "sort_keys": options.get("sort", False),
        "separators": options.get("separators"),
        "indent": options.get("indent", indent),
        "check_circular": False,
        "default": util.json_default,
    }
    return json.JSONEncoder(**settings)
292
293
294
def _traverse(obj, key):
295
name, _, key = key.partition("[")
296
obj = obj[name]
297
298
while "[" in key:
299
name, _, key = key.partition("[")
300
obj = obj[name.strip("\"']")]
301
302
return obj, key.strip("\"']")
303
304
305
# Module-level alias exposing MetadataPP under the name '__postprocessor__'
# (presumably the attribute the post-processor loader looks up -- confirm).
__postprocessor__ = MetadataPP
306
307