Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
mikf
GitHub Repository: mikf/gallery-dl
Path: blob/master/scripts/generate_test_result.py
8773 views
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
4
# Copyright 2025-2026 Mike Fährmann
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License version 2 as
8
# published by the Free Software Foundation.
9
10
"""Generate test result data"""
11
12
import logging
13
import argparse
14
import json
15
import util
16
from pyprint import pyprint
17
from gallery_dl import extractor, job, config, exception
18
19
LOG = logging.getLogger("gen-test")
20
21
22
class LoggingCapture(logging.Handler):
23
24
def __init__(self, args):
25
logging.Handler.__init__(self)
26
27
if args.logging:
28
self.records = []
29
self.output = []
30
self.level = logging.INFO
31
else:
32
self.records = self.output = None
33
34
def __enter__(self):
35
if self.records is None:
36
return
37
38
logger = logging.getLogger(None)
39
logger.handlers.append(self)
40
return self
41
42
def __exit__(self, exc_type, exc_value, traceback):
43
pass
44
45
def flush(self):
46
pass
47
48
def emit(self, record):
49
self.records.append(record)
50
self.output.append(self.format(record))
51
52
53
def generate_test_result(args):
54
head = generate_head(args)
55
56
if args.only_matching:
57
opts = meta = None
58
else:
59
if args.auth:
60
cfg = util.path("archive", "config.json")
61
config.load((cfg,), strict=True)
62
if args.options:
63
args.options_parsed = options = {}
64
for opt in args.options:
65
key, _, value = opt.partition("=")
66
try:
67
value = json.loads(value)
68
except ValueError:
69
pass
70
options[key] = value
71
config.set((), key, value)
72
if args.range:
73
config.set((), "image-range" , args.range)
74
config.set((), "chapter-range", args.range)
75
76
djob = job.DataJob(args.extr, file=None)
77
djob.filter = dict.copy
78
with LoggingCapture(args) as log_info:
79
djob.run()
80
81
opts = generate_opts(
82
args, djob.data_urls, djob.data_meta, djob.exception, log_info)
83
ool = (len(opts) > 1 or "#options" in opts)
84
85
if args.metadata:
86
meta = generate_meta(args, djob.data_meta)
87
else:
88
meta = None
89
90
result = pyprint(head, oneline=False, lmin=9)
91
if opts:
92
result = result[:-2] + pyprint(opts, oneline=ool, lmin=9)[1:]
93
if meta:
94
result = result[:-1] + pyprint(meta, sort=sort_key)[1:]
95
return result + ",\n\n"
96
97
98
def generate_head(args):
99
head = {}
100
cls = args.cls
101
102
head["#url"] = args.extr.url
103
if args.comment is not None:
104
head["#comment"] = args.comment
105
if args.base or args.cat != cls.category or args.sub != cls.subcategory:
106
head["#category"] = (args.base, args.cat, args.sub)
107
head["#class"] = args.cls
108
109
return head
110
111
112
def generate_opts(args, urls, meta=(), exc=None, log=None):
113
opts = {}
114
115
if args.auth is not None:
116
opts["#auth"] = args.auth
117
118
if args.options:
119
opts["#options"] = args.options_parsed
120
121
if args.range:
122
opts["#range"] = args.range
123
124
if exc:
125
if isinstance(exc, exception.GalleryDLException):
126
opts["#exception"] = exc.__class__.__name__
127
else:
128
opts["#exception"] = exc.__class__
129
elif not urls:
130
opts["#count"] = 0
131
elif len(urls) == 1:
132
opts["#results"] = urls[0]
133
elif len(urls) < args.limit_urls:
134
opts["#results"] = tuple(urls)
135
else:
136
if meta and (extr := meta[0].get("_extractor")):
137
name = extr.__module__.rpartition(".")[2]
138
if name[0].isdecimal():
139
name = f"_{name}"
140
opts["#pattern"] = f"lit:{name}.{extr.__name__}.pattern"
141
else:
142
import re
143
opts["#pattern"] = re.escape(urls[0])
144
if "#range" in opts:
145
opts["#range"] = opts.pop("#range")
146
opts["#count"] = len(urls)
147
148
if log is not None:
149
if not log.records:
150
opts["#log"] = ()
151
elif len(log.records) == 1:
152
opts["#log"] = log.output[0]
153
else:
154
opts["#log"] = log.output
155
156
return opts
157
158
159
def generate_meta(args, data):
160
if not data:
161
return {}
162
163
for kwdict in data:
164
delete = ["category", "subcategory"]
165
for key in kwdict:
166
if not key or key[0] == "_":
167
delete.append(key)
168
for key in delete:
169
del kwdict[key]
170
171
return data[0]
172
173
174
def sort_key(key, value):
175
if not value:
176
return 0
177
if isinstance(value, str) and "\n" in value:
178
return 7000
179
if isinstance(value, list) and not small(value):
180
return 8000
181
if isinstance(value, dict) and not small(value):
182
return 9000
183
return 0
184
185
186
def small(obj):
187
if not obj:
188
return True
189
if isinstance(obj, list):
190
return False if len(obj) > 1 else small(obj[0])
191
if isinstance(obj, dict):
192
return False if len(obj) > 1 else small(next(iter(obj.values())))
193
return True
194
195
196
def insert_test_result(args, result, lines):
197
idx_block = None
198
flag = False
199
200
for idx, line in enumerate(lines):
201
line = line.lstrip()
202
if not line:
203
continue
204
elif line[0] == "{":
205
idx_block = idx
206
elif line.startswith('"#class"'):
207
if args.cls.__name__ in line:
208
flag = True
209
elif flag:
210
flag = None
211
break
212
213
if idx_block is None or flag is not None:
214
lines.insert(-1, result)
215
else:
216
lines.insert(idx_block, result)
217
218
219
def parse_args(args=None):
220
parser = argparse.ArgumentParser(args)
221
parser.add_argument("-a", "--auth", action="store_true", default=None)
222
parser.add_argument("-A", "--no-auth", action="store_false", dest="auth")
223
parser.add_argument("-c", "--comment", default=None)
224
parser.add_argument("-C", dest="comment", action="store_const", const="")
225
parser.add_argument("-g", "--git", action="store_true")
226
parser.add_argument("-l", "--logging", action="store_true")
227
parser.add_argument("-L", "--limit_urls", type=int, default=10)
228
parser.add_argument("-m", "--metadata", action="store_true")
229
parser.add_argument("-o", "--option", dest="options", action="append")
230
parser.add_argument("-O", "--only-matching", action="store_true")
231
parser.add_argument("-r", "--range")
232
parser.add_argument("URL")
233
234
return parser.parse_args()
235
236
237
def main():
238
args = parse_args()
239
args.url = args.URL
240
241
extr = extractor.find(args.url)
242
if extr is None:
243
LOG.error("Unsupported URL '%s'", args.url)
244
raise SystemExit(1)
245
246
args.extr = extr
247
args.cls = extr.__class__
248
args.cat = extr.category
249
args.sub = extr.subcategory
250
args.base = extr.basecategory
251
252
LOG.info("Collecting data for '%s'", args.url)
253
result = generate_test_result(args)
254
255
path = util.path("test", "results", f"{args.cat}.py")
256
path_tr = util.trim(path)
257
LOG.info("Writing '%s' results to '%s'", args.url, path_tr)
258
with util.lines(path) as lines:
259
insert_test_result(args, result, lines)
260
261
if args.git:
262
LOG.info("git add %s", path_tr)
263
util.git("add", "--", path_tr)
264
265
266
if __name__ == "__main__":
267
logging.basicConfig(
268
level=logging.DEBUG,
269
format="[%(levelname)s] %(message)s",
270
)
271
main()
272
273