Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
mikf
GitHub Repository: mikf/gallery-dl
Path: blob/master/gallery_dl/path.py
5457 views
1
# -*- coding: utf-8 -*-
2
3
# Copyright 2021-2025 Mike Fährmann
4
#
5
# This program is free software; you can redistribute it and/or modify
6
# it under the terms of the GNU General Public License version 2 as
7
# published by the Free Software Foundation.
8
9
"""Filesystem path handling"""
10
11
import os
12
import shutil
13
import functools
14
from . import util, formatter, exception
15
16
# Platform flag taken from util; it selects the Windows-specific defaults
# for "path-restrict", "path-strip" and extended-length paths below.
WINDOWS = util.WINDOWS
17
# Default filename-extension replacements, used when the "extension-map"
# option is not set: every listed JPEG alias is rewritten to "jpg".
EXTENSION_MAP = {
    "jpeg": "jpg",
    "jpe" : "jpg",
    "jfif": "jpg",
    "jif" : "jpg",
    "jfi" : "jpg",
}
24
25
26
class PathFormat():
    """Build, sanitize and manage the filesystem paths of downloaded files

    An instance keeps the current metadata dict ('kwdict') together with the
    paths derived from it:

    directory / path          -- base directory + formatted segments
    realdirectory / realpath  -- same, but possibly rewritten as an
                                 extended-length path on Windows
    temppath                  -- path that is actually written to
                                 (a '.part' file when enabled)
    """

    def __init__(self, extractor):
        """Read all path-related options from 'extractor'

        Options are fetched through extractor.config(); format strings
        fall back to extractor.filename_fmt / extractor.directory_fmt.

        Raises exception.FilenameFormatError or
        exception.DirectoryFormatError when a format string or a
        condition expression cannot be compiled.
        """
        config = extractor.config
        kwdefault = config("keywords-default")
        if kwdefault is None:
            kwdefault = util.NONE

        # --- filename format ---
        filename_fmt = config("filename")
        try:
            if filename_fmt is None:
                filename_fmt = extractor.filename_fmt
            elif isinstance(filename_fmt, dict):
                # {condition-expression: format-string}
                # the empty-string key holds the fallback format
                self.filename_conditions = [
                    (util.compile_filter(expr),
                     formatter.parse(fmt, kwdefault).format_map)
                    for expr, fmt in filename_fmt.items() if expr
                ]
                self.build_filename = self.build_filename_conditional
                filename_fmt = filename_fmt.get("", extractor.filename_fmt)

            self.filename_formatter = formatter.parse(
                filename_fmt, kwdefault).format_map
        except Exception as exc:
            raise exception.FilenameFormatError(exc)

        # --- directory format ---
        directory_fmt = config("directory")
        self.directory_conditions = ()
        try:
            if directory_fmt is None:
                directory_fmt = extractor.directory_fmt
            elif isinstance(directory_fmt, dict):
                # {condition-expression: [segment-format, ...]}
                # the empty-string key holds the fallback segment list
                self.directory_conditions = [
                    (util.compile_filter(expr), [
                        formatter.parse(fmt, kwdefault).format_map
                        for fmt in fmts
                    ])
                    for expr, fmts in directory_fmt.items() if expr
                ]
                self.build_directory = self.build_directory_conditional
                directory_fmt = directory_fmt.get("", extractor.directory_fmt)

            self.directory_formatters = [
                formatter.parse(dirfmt, kwdefault).format_map
                for dirfmt in directory_fmt
            ]
        except Exception as exc:
            raise exception.DirectoryFormatError(exc)

        # --- per-file state ---
        self.kwdict = {}
        self.delete = False
        self.prefix = ""
        self.filename = ""
        self.extension = ""
        self.directory = ""
        self.realdirectory = ""
        self.path = ""
        self.realpath = ""
        self.temppath = ""

        # --- extension replacements ---
        extension_map = config("extension-map")
        if extension_map is None:
            extension_map = EXTENSION_MAP
        # only the bound .get is kept; it is called as extension_map(ext, ext)
        self.extension_map = extension_map.get

        # --- characters to replace inside a single path segment ---
        restrict = config("path-restrict", "auto")
        replace = config("path-replace", "_")
        conv = config("path-convert")
        if restrict == "auto":
            restrict = "\\\\|/<>:\"?*" if WINDOWS else "/"
        elif restrict == "unix":
            restrict = "/"
        elif restrict == "windows":
            restrict = "\\\\|/<>:\"?*"
        elif restrict == "ascii":
            restrict = "^0-9A-Za-z_."
        elif restrict == "ascii+":
            restrict = "^0-9@-[\\]-{ #-)+-.;=!}~"
        # any other value is passed to _build_cleanfunc() unchanged
        self.clean_segment = _build_cleanfunc(restrict, replace, conv)

        # --- characters to remove from complete paths ---
        remove = config("path-remove", "\x00-\x1f\x7f")
        self.clean_path = _build_cleanfunc(remove, "")

        # --- trailing characters to strip from directory segments ---
        strip = config("path-strip", "auto")
        if strip == "auto":
            strip = ". " if WINDOWS else ""
        elif strip == "unix":
            strip = ""
        elif strip == "windows":
            strip = ". "
        self.strip = strip

        if WINDOWS:
            # 'extended' only exists on Windows;
            # it is only read behind a 'WINDOWS and ...' check
            self.extended = config("path-extended", True)

        # --- base directory ---
        # NOTE(review): the nesting below is reconstructed from semantics
        # ('basedir[-1]' is only safe for a non-empty string) -- confirm
        # against the upstream file
        basedir = extractor._parentdir
        if not basedir:
            basedir = config("base-directory")
            sep = os.sep
            if basedir is None:
                basedir = f".{sep}gallery-dl{sep}"
            elif basedir:
                basedir = util.expand_path(basedir)
                altsep = os.altsep
                if altsep and altsep in basedir:
                    basedir = basedir.replace(altsep, sep)
                if basedir[-1] != sep:
                    basedir += sep
            basedir = self.clean_path(basedir)
        self.basedirectory = basedir

    def __str__(self):
        """Return the current 'realpath'"""
        return self.realpath

    def open(self, mode="wb"):
        """Open file and return a corresponding file object

        Opens 'temppath'. When that fails with FileNotFoundError in a
        read mode, util.NullContext() is returned instead; otherwise
        'realdirectory' is created and the open is retried once.
        """
        try:
            return open(self.temppath, mode)
        except FileNotFoundError:
            if "r" in mode:
                # '.part' file no longer exists
                return util.NullContext()
            os.makedirs(self.realdirectory)
            return open(self.temppath, mode)

    def exists(self):
        """Return True if the file exists on disk

        Only checked when an extension is set; the final result is
        delegated to check_file().
        """
        if self.extension and os.path.exists(self.realpath):
            return self.check_file()
        return False

    def check_file(self):
        """Decide whether an existing file counts as 'exists'; always True

        Presumably a hook that can be replaced (e.g. with _enum_file) --
        the replacement is not visible in this file.
        """
        return True

    def _enum_file(self):
        """Find the first unused '<num>.' extension prefix

        Rebuilds the path with prefixes "1.", "2.", ... until os.stat()
        fails for the resulting 'realpath', stores that prefix in
        'self.prefix' and returns False.
        """
        num = 1
        try:
            while True:
                prefix = format(num) + "."
                self.kwdict["extension"] = prefix + self.extension
                self.build_path()
                os.stat(self.realpath)  # raises OSError if file doesn't exist
                num += 1
        except OSError:
            pass
        self.prefix = prefix
        return False

    def set_directory(self, kwdict):
        """Build the directory paths for 'kwdict'

        Sets 'directory' and 'realdirectory'. Nothing is created on disk
        here; directory creation is delayed until open() or finalize().
        """
        self.kwdict = kwdict

        if segments := self.build_directory(kwdict):
            self.directory = directory = self.basedirectory + self.clean_path(
                os.sep.join(segments) + os.sep)
        else:
            self.directory = directory = self.basedirectory

        if WINDOWS and self.extended:
            directory = self._extended_path(directory)
        self.realdirectory = directory

    def _extended_path(self, path):
        """Return absolute 'path' in Windows extended-length form"""
        # Enable longer-than-260-character paths
        path = os.path.abspath(path)
        if not path.startswith("\\\\"):
            path = "\\\\?\\" + path
        elif not path.startswith("\\\\?\\"):
            # UNC path: replace the leading '\\' with '\\?\UNC\'
            path = "\\\\?\\UNC\\" + path[2:]

        # abspath() in Python 3.7+ removes trailing path separators (#402)
        if path[-1] != os.sep:
            return path + os.sep
        return path

    def set_filename(self, kwdict):
        """Set general filename data

        Resets filename, temppath and prefix, and normalizes
        kwdict["extension"] through the extension map.
        """
        self.kwdict = kwdict
        self.filename = self.temppath = self.prefix = ""

        ext = kwdict["extension"]
        kwdict["extension"] = self.extension = self.extension_map(ext, ext)

    def set_extension(self, extension, real=True):
        """Set filename extension

        'real' is accepted for interface compatibility but not used here.
        """
        self.extension = extension = self.extension_map(extension, extension)
        self.kwdict["extension"] = self.prefix + extension

    def fix_extension(self, _=None):
        """Fix filenames without a given filename extension

        Always returns True. Errors other than GalleryDLException while
        building the path result in placeholder paths ending in "?".
        """
        try:
            if not self.extension:
                self.kwdict["extension"] = \
                    self.prefix + self.extension_map("", "")
                self.build_path()
                if self.path[-1] == ".":
                    # drop the dangling '.' left by the empty extension
                    self.path = self.path[:-1]
                    self.temppath = self.realpath = self.realpath[:-1]
            elif not self.temppath:
                self.build_path()
        except exception.GalleryDLException:
            raise
        except Exception:
            self.path = self.directory + "?"
            self.realpath = self.temppath = self.realdirectory + "?"
        return True

    def build_filename(self, kwdict):
        """Apply 'kwdict' to filename format string"""
        try:
            return self.clean_path(self.clean_segment(
                self.filename_formatter(kwdict)))
        except Exception as exc:
            raise exception.FilenameFormatError(exc)

    def build_filename_conditional(self, kwdict):
        """Apply 'kwdict' to the first filename format whose condition holds

        Falls back to the default filename format when no condition
        matches.
        """
        try:
            for condition, fmt in self.filename_conditions:
                if condition(kwdict):
                    break
            else:
                fmt = self.filename_formatter
            return self.clean_path(self.clean_segment(fmt(kwdict)))
        except Exception as exc:
            raise exception.FilenameFormatError(exc)

    def _format_segments(self, formatters, kwdict):
        """Evaluate 'formatters' with 'kwdict'; return non-empty segments"""
        segments = []
        strip = self.strip

        for fmt in formatters:
            segment = fmt(kwdict).strip()
            if strip and segment not in {".", ".."}:
                # remove trailing dots and spaces (#647)
                # but keep the relative segments '.' and '..' intact
                segment = segment.rstrip(strip)
            if segment:
                segments.append(self.clean_segment(segment))
        return segments

    def build_directory(self, kwdict):
        """Apply 'kwdict' to directory format strings"""
        try:
            return self._format_segments(self.directory_formatters, kwdict)
        except Exception as exc:
            raise exception.DirectoryFormatError(exc)

    def build_directory_conditional(self, kwdict):
        """Apply 'kwdict' to the first directory format whose condition holds

        Falls back to the default directory format when no condition
        matches. Segments are processed exactly as in build_directory().
        """
        try:
            for condition, formatters in self.directory_conditions:
                if condition(kwdict):
                    break
            else:
                formatters = self.directory_formatters
            return self._format_segments(formatters, kwdict)
        except Exception as exc:
            raise exception.DirectoryFormatError(exc)

    def build_path(self):
        """Combine directory and filename to full paths"""
        self.filename = filename = self.build_filename(self.kwdict)
        self.path = self.directory + filename
        self.realpath = self.realdirectory + filename
        if not self.temppath:
            self.temppath = self.realpath

    def part_enable(self, part_directory=None):
        """Enable .part file usage

        With a known extension, ".part" is appended to 'temppath';
        otherwise "part" becomes the extension and the path is rebuilt.
        If 'part_directory' is given, the temp file is placed there.
        """
        if self.extension:
            self.temppath += ".part"
        else:
            self.kwdict["extension"] = self.prefix + self.extension_map(
                "part", "part")
            self.build_path()
        if part_directory:
            self.temppath = os.path.join(
                part_directory,
                os.path.basename(self.temppath),
            )

    def part_size(self):
        """Return size of .part file"""
        try:
            return os.stat(self.temppath).st_size
        except OSError:
            pass
        return 0

    def set_mtime(self, path=None):
        """Apply the '_mtime_meta' or '_mtime_http' value to 'path'

        Uses 'realpath' when 'path' is None; does nothing when neither
        key has a truthy value.
        """
        if (mtime := (self.kwdict.get("_mtime_meta") or
                      self.kwdict.get("_mtime_http"))):
            util.set_mtime(self.realpath if path is None else path, mtime)

    def finalize(self):
        """Move tempfile to its target location

        Returns None on success (or after deleting the temp file when
        'delete' is set) and False when the temp file does not exist.
        """
        if self.delete:
            self.delete = False
            os.unlink(self.temppath)
            return

        if self.temppath != self.realpath:
            # Move temp file to its actual location
            while True:
                try:
                    os.replace(self.temppath, self.realpath)
                except FileNotFoundError:
                    try:
                        # delayed directory creation
                        os.makedirs(self.realdirectory)
                    except FileExistsError:
                        # file at self.temppath does not exist
                        return False
                    continue
                except OSError:
                    # move across different filesystems
                    try:
                        shutil.copyfile(self.temppath, self.realpath)
                    except FileNotFoundError:
                        try:
                            os.makedirs(self.realdirectory)
                        except FileExistsError:
                            return False
                        shutil.copyfile(self.temppath, self.realpath)
                    os.unlink(self.temppath)
                break

        self.set_mtime()
358
359
360
def _build_convertfunc(func, conv):
    """Return a function applying 'func' followed by conversion(s) 'conv'

    'conv' is a string of conversion keys; each character is looked up in
    formatter._CONVERSIONS. A single key takes a shortcut that avoids the
    loop; several keys are applied left to right.
    """
    if len(conv) <= 1:
        conv = formatter._CONVERSIONS[conv]
        return lambda x: conv(func(x))

    convs = [formatter._CONVERSIONS[c] for c in conv]

    def convert_many(x):
        x = func(x)
        for conv in convs:
            x = conv(x)
        return x
    return convert_many
372
373
374
def _build_cleanfunc(chars, repl, conv=None):
375
if not chars:
376
func = util.identity
377
elif isinstance(chars, dict):
378
if 0 not in chars:
379
chars = _process_repl_dict(chars)
380
chars[0] = None
381
382
def func(x):
383
return x.translate(table)
384
table = str.maketrans(chars)
385
elif len(chars) == 1:
386
def func(x):
387
return x.replace(chars, repl)
388
else:
389
func = functools.partial(util.re(f"[{chars}]").sub, repl)
390
return _build_convertfunc(func, conv) if conv else func
391
392
393
def _process_repl_dict(chars):
394
# can't modify 'chars' while *directly* iterating over its keys
395
for char in [c for c in chars if len(c) > 1]:
396
if len(char) == 3 and char[1] == "-":
397
citer = range(ord(char[0]), ord(char[2])+1)
398
else:
399
citer = char
400
401
repl = chars.pop(char)
402
for c in citer:
403
chars[c] = repl
404
405
return chars
406
407