Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
mikf
GitHub Repository: mikf/gallery-dl
Path: blob/master/gallery_dl/path.py
8778 views
1
# -*- coding: utf-8 -*-
2
3
# Copyright 2021-2025 Mike Fährmann
4
#
5
# This program is free software; you can redistribute it and/or modify
6
# it under the terms of the GNU General Public License version 2 as
7
# published by the Free Software Foundation.
8
9
"""Filesystem path handling"""
10
11
import os
12
import shutil
13
import functools
14
from . import util, formatter, exception
15
16
WINDOWS = util.WINDOWS

# Default filename-extension normalization: alternative JPEG extensions
# are mapped to "jpg". PathFormat uses this table whenever the
# "extension-map" option is not set.
EXTENSION_MAP = {
    "jpeg": "jpg",
    "jpe" : "jpg",
    "jfif": "jpg",
    "jif" : "jpg",
    "jfi" : "jpg",
}
24
25
26
class PathFormat():
    """Build and manage the filesystem paths of downloaded files

    An instance combines a base directory, directory segments and a
    filename, all generated from format strings applied to a metadata
    dict ('kwdict'), and keeps track of both the final target path
    ('realpath') and the path data is written to first ('temppath').
    """

    def __init__(self, extractor):
        """Read all path-related options of 'extractor'

        Raises FilenameFormatError / DirectoryFormatError when the
        respective format strings or conditions cannot be compiled.
        """
        config = extractor.config
        kwdefault = config("keywords-default")
        if kwdefault is None:
            kwdefault = util.NONE

        self.filename_conditions = self.directory_conditions = None

        filename_fmt = config("filename")
        try:
            if filename_fmt is None:
                filename_fmt = extractor.filename_fmt
            elif isinstance(filename_fmt, dict):
                # conditional filenames: {"<filter expr>": "<format>"}
                # the entry with an empty key is the default format
                self.filename_conditions = [
                    (util.compile_filter(expr),
                     formatter.parse(fmt, kwdefault).format_map)
                    for expr, fmt in filename_fmt.items() if expr
                ]
                filename_fmt = filename_fmt.get("", extractor.filename_fmt)

            self.filename_formatter = formatter.parse(
                filename_fmt, kwdefault).format_map
        except Exception as exc:
            raise exception.FilenameFormatError(exc) from exc

        directory_fmt = config("directory")
        try:
            if directory_fmt is None:
                directory_fmt = extractor.directory_fmt
            elif isinstance(directory_fmt, dict):
                # conditional directories: {"<filter expr>": [<formats>]}
                # the entry with an empty key is the default format list
                self.directory_conditions = [
                    (util.compile_filter(expr), [
                        formatter.parse(fmt, kwdefault).format_map
                        for fmt in fmts
                    ])
                    for expr, fmts in directory_fmt.items() if expr
                ]
                directory_fmt = directory_fmt.get("", extractor.directory_fmt)

            self.directory_formatters = [
                formatter.parse(dirfmt, kwdefault).format_map
                for dirfmt in directory_fmt
            ]
        except Exception as exc:
            raise exception.DirectoryFormatError(exc) from exc

        self.kwdict = {}
        self.delete = False
        self.prefix = ""
        self.filename = ""
        self.extension = ""
        self.directory = ""
        self.realdirectory = ""
        self.path = ""
        self.realpath = ""
        self.temppath = ""

        extension_map = config("extension-map")
        if extension_map is None:
            extension_map = EXTENSION_MAP
        self.extension_map = extension_map.get

        # characters to replace inside a single path segment
        restrict = config("path-restrict", "auto")
        replace = config("path-replace", "_")
        conv = config("path-convert")
        if restrict == "auto":
            restrict = "\\\\|/<>:\"?*" if WINDOWS else "/"
        elif restrict == "unix":
            restrict = "/"
        elif restrict == "windows":
            restrict = "\\\\|/<>:\"?*"
        elif restrict == "ascii":
            restrict = "^0-9A-Za-z_."
        elif restrict == "ascii+":
            restrict = "^0-9@-[\\]-{ #-)+-.;=!}~"
        self.clean_segment = _build_cleanfunc(restrict, replace, conv)

        # characters to remove from a complete path
        remove = config("path-remove", "\x00-\x1f\x7f")
        self.clean_path = _build_cleanfunc(remove, "")

        # trailing characters to strip from directory segments
        strip = config("path-strip", "auto")
        if strip == "auto":
            strip = ". " if WINDOWS else ""
        elif strip == "unix":
            strip = ""
        elif strip == "windows":
            strip = ". "
        self.strip = strip

        if WINDOWS:
            self.extended = config("path-extended", True)

        self.basedirectory_conditions = None
        basedir = extractor._parentdir
        if not basedir:
            basedir = config("base-directory")
            if basedir and isinstance(basedir, dict):
                # conditional base directories: {"<filter expr>": "<path>"}
                # the entry with an empty key is the default path
                self.basedirectory_conditions = conds = []
                default = None
                for expr, bdir in basedir.items():
                    if expr:
                        conds.append((util.compile_filter(expr),
                                      self._prepare_basedirectory(bdir)))
                    else:
                        default = bdir
                # never leave 'basedir' bound to the dict itself;
                # without a default entry, use the regular default below
                basedir = default

            if basedir is None:
                basedir = self.clean_path(f".{os.sep}gallery-dl{os.sep}")
            elif basedir:
                basedir = self._prepare_basedirectory(basedir)
        self.basedirectory = basedir

    def _prepare_basedirectory(self, basedir):
        """Expand 'basedir', normalize its separators, and clean it

        The result ends with 'os.sep', except when the expanded path
        is empty, in which case it stays empty.
        """
        basedir = util.expand_path(basedir)
        if os.altsep and os.altsep in basedir:
            basedir = basedir.replace(os.altsep, os.sep)
        # 'basedir' may be empty here; indexing it would raise IndexError
        if basedir and basedir[-1] != os.sep:
            basedir += os.sep
        return self.clean_path(basedir)

    def __str__(self):
        return self.realpath

    def open(self, mode="wb"):
        """Open file and return a corresponding file object"""
        try:
            return open(self.temppath, mode)
        except FileNotFoundError:
            if "r" in mode:
                # '.part' file no longer exists
                return util.NullContext()
            # target directory does not exist yet: create it and retry
            os.makedirs(self.realdirectory)
            return open(self.temppath, mode)

    def exists(self):
        """Return True if the file exists on disk"""
        if self.extension:
            try:
                os.lstat(self.realpath)  # raises OSError if file doesn't exist
                return self.check_file()
            except OSError:
                pass
        return False

    def check_file(self):
        """Hook called by exists() for a file found on disk"""
        return True

    def _enum_file(self):
        """Select the first numbered filename that is not yet in use

        Tries the extension prefixes "1.", "2.", ... until the resulting
        'realpath' does not exist, stores that prefix in 'self.prefix',
        and returns False.
        """
        num = 1
        try:
            while True:
                prefix = format(num) + "."
                self.kwdict["extension"] = prefix + self.extension
                self.build_path()
                os.lstat(self.realpath)  # raises OSError if file doesn't exist
                num += 1
        except OSError:
            pass
        self.prefix = prefix
        return False

    def set_directory(self, kwdict):
        """Build the directory paths for 'kwdict'

        Only the path strings are computed here; the directory itself
        is created on demand by open() and finalize().
        """
        self.kwdict = kwdict

        if self.basedirectory_conditions is None:
            basedir = self.basedirectory
        else:
            for condition, basedir in self.basedirectory_conditions:
                if condition(kwdict):
                    break
            else:
                basedir = self.basedirectory

        if segments := self.build_directory(kwdict):
            self.directory = directory = \
                f"{basedir}{self.clean_path(os.sep.join(segments))}{os.sep}"
        else:
            self.directory = directory = basedir

        if WINDOWS and self.extended:
            directory = self._extended_path(directory)
        self.realdirectory = directory

    def _extended_path(self, path):
        """Return 'path' as absolute path with an extended-length prefix"""
        # Enable longer-than-260-character paths
        path = os.path.abspath(path)
        if not path.startswith("\\\\"):
            path = "\\\\?\\" + path
        elif not path.startswith("\\\\?\\"):
            path = "\\\\?\\UNC\\" + path[2:]

        # abspath() in Python 3.7+ removes trailing path separators (#402)
        if path[-1] != os.sep:
            return path + os.sep
        return path

    def set_filename(self, kwdict):
        """Set general filename data"""
        self.kwdict = kwdict
        self.filename = self.temppath = self.prefix = ""

        ext = kwdict["extension"]
        kwdict["extension"] = self.extension = self.extension_map(ext, ext)

    def set_extension(self, extension, real=True):
        """Set filename extension"""
        # 'real' is not used here; it is kept as part of the signature
        self.extension = extension = self.extension_map(extension, extension)
        self.kwdict["extension"] = self.prefix + extension

    def fix_extension(self, _=None):
        """Fix filenames without a given filename extension"""
        try:
            if not self.extension:
                self.kwdict["extension"] = \
                    self.prefix + self.extension_map("", "")
                self.build_path()
                # drop the dot left over from an empty extension
                if self.path[-1] == ".":
                    self.path = self.path[:-1]
                    self.temppath = self.realpath = self.realpath[:-1]
            elif not self.temppath:
                self.build_path()
        except exception.GalleryDLException:
            raise
        except Exception:
            # any other failure: fall back to a "?" placeholder path
            self.path = self.directory + "?"
            self.realpath = self.temppath = self.realdirectory + "?"
        return True

    def build_filename(self, kwdict):
        """Apply 'kwdict' to filename format string"""
        try:
            if self.filename_conditions is None:
                fmt = self.filename_formatter
            else:
                for condition, fmt in self.filename_conditions:
                    if condition(kwdict):
                        break
                else:
                    fmt = self.filename_formatter
            return self.clean_path(self.clean_segment(fmt(kwdict)))
        except Exception as exc:
            raise exception.FilenameFormatError(exc) from exc

    def build_directory(self, kwdict):
        """Apply 'kwdict' to directory format strings

        Returns a list of cleaned, non-empty path segments.
        """
        try:
            if self.directory_conditions is None:
                formatters = self.directory_formatters
            else:
                for condition, formatters in self.directory_conditions:
                    if condition(kwdict):
                        break
                else:
                    formatters = self.directory_formatters

            segments = []
            strip = self.strip
            for fmt in formatters:
                result = fmt(kwdict)
                # a formatter yields one string or an iterable of strings
                parts = (result,) if result.__class__ is str else result
                for segment in parts:
                    segment = segment.strip()
                    # "." and ".." are kept as they are
                    if strip and segment not in {".", ".."}:
                        segment = segment.rstrip(strip)
                    if segment:
                        segments.append(self.clean_segment(segment))
            return segments
        except Exception as exc:
            raise exception.DirectoryFormatError(exc) from exc

    def build_path(self):
        """Combine directory and filename to full paths"""
        self.filename = filename = self.build_filename(self.kwdict)
        self.path = self.directory + filename
        self.realpath = self.realdirectory + filename
        if not self.temppath:
            self.temppath = self.realpath

    def part_enable(self, part_directory=None):
        """Enable .part file usage

        'part_directory' is either a directory path or a list of
        (condition, directory) pairs, of which the first one whose
        condition matches 'self.kwdict' gets used.
        """
        if self.extension:
            self.temppath += ".part"
        else:
            self.kwdict["extension"] = self.prefix + self.extension_map(
                "part", "part")
            self.build_path()

        if part_directory is not None:
            if isinstance(part_directory, list):
                for condition, part_directory in part_directory:
                    if condition(self.kwdict):
                        break
                else:
                    # no condition matched: keep the current 'temppath'
                    return

            self.temppath = os.path.join(
                part_directory,
                os.path.basename(self.temppath),
            )

    def part_size(self):
        """Return size of .part file"""
        try:
            return os.stat(self.temppath).st_size
        except OSError:
            pass
        return 0

    def set_mtime(self, path=None):
        """Set the modification time of 'path' (default: 'realpath')

        Uses the '_mtime_meta' or '_mtime_http' value of 'self.kwdict'
        and does nothing when neither is set.
        """
        if (mtime := (self.kwdict.get("_mtime_meta") or
                      self.kwdict.get("_mtime_http"))):
            util.set_mtime(self.realpath if path is None else path, mtime)

    def finalize(self):
        """Move tempfile to its target location

        Returns False when the tempfile could not be moved because it
        does not exist, and None otherwise.
        """
        if self.delete:
            self.delete = False
            os.unlink(self.temppath)
            return

        if self.temppath != self.realpath:
            # Move temp file to its actual location
            while True:
                try:
                    os.replace(self.temppath, self.realpath)
                except FileNotFoundError:
                    try:
                        # delayed directory creation
                        os.makedirs(self.realdirectory)
                    except FileExistsError:
                        # file at self.temppath does not exist
                        return False
                    continue
                except OSError:
                    # move across different filesystems
                    try:
                        shutil.copyfile(self.temppath, self.realpath)
                    except FileNotFoundError:
                        try:
                            os.makedirs(self.realdirectory)
                        except FileExistsError:
                            return False
                        shutil.copyfile(self.temppath, self.realpath)
                    os.unlink(self.temppath)
                break

        self.set_mtime()
380
381
382
def _build_convertfunc(func, conv):
    """Wrap 'func' so that its result is passed through conversions

    'conv' is a string whose characters are keys of
    'formatter._CONVERSIONS'; the selected conversions are applied
    to the return value of 'func' in the given order.
    """
    if len(conv) <= 1:
        # single conversion: avoid the loop overhead
        conv = formatter._CONVERSIONS[conv]
        return lambda x: conv(func(x))

    convs = [formatter._CONVERSIONS[c] for c in conv]

    def convert_many(x):
        x = func(x)
        for conv in convs:
            x = conv(x)
        return x
    return convert_many
394
395
396
def _build_cleanfunc(chars, repl, conv=None):
397
if not chars:
398
func = util.identity
399
elif isinstance(chars, dict):
400
if 0 not in chars:
401
chars = _process_repl_dict(chars)
402
chars[0] = None
403
404
def func(x):
405
return x.translate(table)
406
table = str.maketrans(chars)
407
elif len(chars) == 1:
408
def func(x):
409
return x.replace(chars, repl)
410
else:
411
func = functools.partial(util.re(f"[{chars}]").sub, repl)
412
return _build_convertfunc(func, conv) if conv else func
413
414
415
def _process_repl_dict(chars):
416
# can't modify 'chars' while *directly* iterating over its keys
417
for char in [c for c in chars if len(c) > 1]:
418
if len(char) == 3 and char[1] == "-":
419
citer = range(ord(char[0]), ord(char[2])+1)
420
else:
421
citer = char
422
423
repl = chars.pop(char)
424
for c in citer:
425
chars[c] = repl
426
427
return chars
428
429