Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
mikf
GitHub Repository: mikf/gallery-dl
Path: blob/master/gallery_dl/postprocessor/exec.py
8764 views
1
# -*- coding: utf-8 -*-
2
3
# Copyright 2018-2023 Mike Fährmann
4
#
5
# This program is free software; you can redistribute it and/or modify
6
# it under the terms of the GNU General Public License version 2 as
7
# published by the Free Software Foundation.
8
9
"""Execute processes"""
10
11
from .common import PostProcessor
12
from .. import util, formatter
13
import subprocess
14
import os
15
16
17
if util.WINDOWS:
    def quote(s):
        """Wrap 's' in double quotes, backslash-escaping embedded ones."""
        escaped = s.replace('"', '\\"')
        return '"' + escaped + '"'
else:
    # Everywhere else, use the standard library's shell quoting.
    from shlex import quote
23
24
25
def trim(args):
    """Return only the leading part of a command.

    For a string this is everything before the first space;
    for any other value it is the element at index 0.
    """
    if isinstance(args, str):
        head, _, _ = args.partition(" ")
        return head
    return args[0]
27
28
29
class ExecPP(PostProcessor):
    """Post processor that runs external commands for job events.

    Recognized options (as read in __init__):
      "commands" - list of commands; run in order, stopping at the first
                   one that yields a truthy (non-zero) return value
      "command"  - single command; required when "commands" is absent/empty
      "async"    - single-command mode only: start the process
                   without waiting for it to finish
      "verbose"  - log the full command (default) instead of only its
                   leading part
      "session"  - start processes in a new process group (Windows)
                   or a new session (elsewhere)
      "event"    - event name(s) to hook into; a comma-separated string
                   or an iterable of names; defaults to ("after",)

    A command given as a string is run through the shell after its
    {}, {_path}, {_temppath}, {_directory}, and {_filename} placeholders
    are replaced with quoted path values. A command given as a list has
    each element parsed as a format string and is run without a shell.
    """

    def __init__(self, job, options):
        PostProcessor.__init__(self, job)

        if cmds := options.get("commands"):
            # list of (executor, args) pairs consumed by exec_many()
            self.cmds = [self._prepare_cmd(c) for c in cmds]
            execute = self.exec_many
        else:
            execute, self.args = self._prepare_cmd(options["command"])
            if options.get("async", False):
                # shadow _exec() on this instance so that commands are
                # started but never waited for
                self._exec = self._popen

        self.verbose = options.get("verbose", True)
        self.session = False
        self.creationflags = 0
        if options.get("session"):
            if util.WINDOWS:
                self.creationflags = subprocess.CREATE_NEW_PROCESS_GROUP
            else:
                self.session = True

        events = options.get("event")
        if events is None:
            events = ("after",)
        elif isinstance(events, str):
            events = events.split(",")
        job.register_hooks({event: execute for event in events}, options)

        if self._archive_init(job, options):
            self._archive_register(job)

    def _prepare_cmd(self, cmd):
        """Return an '(executor, args)' pair for a command.

        A string command is paired with exec_string() and kept as is;
        any other value is treated as an iterable of arguments, each of
        which is parsed with formatter.parse(), and paired with
        exec_list().
        """
        if isinstance(cmd, str):
            # substitution function for the path placeholders
            # supported in shell command strings
            self._sub = util.re(
                r"\{(_directory|_filename|_(?:temp)?path|)\}").sub
            return self.exec_string, cmd
        else:
            return self.exec_list, [formatter.parse(arg) for arg in cmd]

    def exec_list(self, pathfmt):
        """Run 'self.args' as an argument list (no shell).

        Returns None without running anything when the archive already
        contains the current file; otherwise returns the result of
        'self._exec()' (the exit status, or the process object when the
        "async" option replaced '_exec').
        """
        archive = self.archive
        kwdict = pathfmt.kwdict

        if archive and archive.check(kwdict):
            return

        # expose path information to the argument format strings
        # (this adds the keys to the pathfmt's own kwdict)
        kwdict["_directory"] = pathfmt.realdirectory
        kwdict["_filename"] = pathfmt.filename
        kwdict["_temppath"] = pathfmt.temppath
        kwdict["_path"] = pathfmt.realpath

        args = [arg.format_map(kwdict) for arg in self.args]
        # allow '~' in the program path
        args[0] = os.path.expanduser(args[0])
        retcode = self._exec(args, False)

        if archive:
            archive.add(kwdict)
        return retcode

    def exec_string(self, pathfmt):
        """Run 'self.args' as a shell command string.

        Path placeholders are replaced by _replace() before execution.
        Returns None without running anything when the archive already
        contains the current file; otherwise returns the result of
        'self._exec()'.
        """
        archive = self.archive
        if archive and archive.check(pathfmt.kwdict):
            return

        # _replace() reads the current pathfmt from the instance
        self.pathfmt = pathfmt
        args = self._sub(self._replace, self.args)
        retcode = self._exec(args, True)

        if archive:
            archive.add(pathfmt.kwdict)
        return retcode

    def exec_many(self, pathfmt):
        """Run all commands in 'self.cmds' in order.

        Stops at the first command whose executor returns a truthy
        (non-zero) value and returns that value; returns 0 when all
        commands succeed (or there are none), and None when the archive
        already contains the current file.

        The archive is consulted and updated once for the whole command
        sequence instead of once per command.
        """
        if archive := self.archive:
            if archive.check(pathfmt.kwdict):
                return
            # keep the individual executors from checking/updating
            # the archive themselves
            self.archive = False

        retcode = 0
        try:
            for execute, args in self.cmds:
                self.args = args
                if retcode := execute(pathfmt):
                    # non-zero exit status
                    break
        finally:
            # always restore the archive, even when a command raised,
            # so it does not stay disabled for later invocations
            if archive:
                self.archive = archive

        if archive:
            archive.add(pathfmt.kwdict)
        return retcode

    def _exec(self, args, shell):
        """Run a command, wait for it, and return its exit status.

        Logs a warning when the exit status is non-zero.
        """
        if retcode := self._popen(args, shell).wait():
            self.log.warning("'%s' returned with non-zero exit status (%d)",
                             args if self.verbose else trim(args), retcode)
        return retcode

    def _popen(self, args, shell):
        """Start a command and return the object created by util.Popen.

        The process is not waited for here.
        """
        self.log.debug("Running '%s'", args if self.verbose else trim(args))
        return util.Popen(
            args,
            shell=shell,
            creationflags=self.creationflags,
            start_new_session=self.session,
        )

    def _replace(self, match):
        """Return the quoted path value for a matched placeholder.

        '{_directory}', '{_filename}', and '{_temppath}' map to the
        corresponding pathfmt values; everything else the pattern can
        match ('{}' and '{_path}') maps to the real path.
        """
        name = match[1]
        if name == "_directory":
            return quote(self.pathfmt.realdirectory)
        if name == "_filename":
            return quote(self.pathfmt.filename)
        if name == "_temppath":
            return quote(self.pathfmt.temppath)
        return quote(self.pathfmt.realpath)
144
145
146
# Module-level alias for the post processor class defined in this file.
# NOTE(review): presumably the name the post processor loader looks up
# when importing this module -- confirm against the package's loader.
__postprocessor__ = ExecPP
147
148