Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
allendowney
GitHub Repository: allendowney/cpython
Path: blob/main/Lib/asyncio/base_subprocess.py
12 views
1
import collections
2
import subprocess
3
import warnings
4
5
from . import protocols
6
from . import transports
7
from .log import logger
8
9
10
class BaseSubprocessTransport(transports.SubprocessTransport):
    """Transport wrapping one child process and its stdin/stdout/stderr pipes.

    Subclasses must override ``_start()`` so that it creates the child
    process and stores it in ``self._proc``.  Protocol callbacks produced
    before the pipes are connected are queued in ``self._pending_calls``
    and replayed once ``_connect_pipes()`` has scheduled
    ``connection_made()``.
    """

    def __init__(self, loop, protocol, args, shell,
                 stdin, stdout, stderr, bufsize,
                 waiter=None, extra=None, **kwargs):
        """Start the child process and schedule connection of its pipes.

        *waiter*, when given, is a future that ``_connect_pipes()`` completes
        with ``None`` on success or with the exception that prevented the
        pipes from being connected.  Remaining keyword arguments are passed
        through to ``_start()``.
        """
        super().__init__(extra)
        self._closed = False
        self._protocol = protocol
        self._loop = loop
        self._proc = None
        self._pid = None
        self._returncode = None
        self._exit_waiters = []
        # Callbacks are queued here until _connect_pipes() sets this to None.
        self._pending_calls = collections.deque()
        # Maps fd (0, 1, 2) to its pipe protocol; None until connected.
        self._pipes = {}
        self._finished = False

        if stdin == subprocess.PIPE:
            self._pipes[0] = None
        if stdout == subprocess.PIPE:
            self._pipes[1] = None
        if stderr == subprocess.PIPE:
            self._pipes[2] = None

        # Create the child process: set the _proc attribute
        try:
            self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
                        stderr=stderr, bufsize=bufsize, **kwargs)
        except BaseException:
            # Mark the transport closed before propagating any failure
            # (the exception is always re-raised).
            self.close()
            raise

        self._pid = self._proc.pid
        self._extra['subprocess'] = self._proc

        if self._loop.get_debug():
            if isinstance(args, (bytes, str)):
                program = args
            else:
                program = args[0]
            logger.debug('process %r created: pid %s',
                         program, self._pid)

        self._loop.create_task(self._connect_pipes(waiter))

    def __repr__(self):
        """Return a summary of the process state and connected pipes."""
        info = [self.__class__.__name__]
        if self._closed:
            info.append('closed')
        if self._pid is not None:
            info.append(f'pid={self._pid}')
        if self._returncode is not None:
            info.append(f'returncode={self._returncode}')
        elif self._pid is not None:
            info.append('running')
        else:
            info.append('not started')

        stdin = self._pipes.get(0)
        if stdin is not None:
            info.append(f'stdin={stdin.pipe}')

        stdout = self._pipes.get(1)
        stderr = self._pipes.get(2)
        if stdout is not None and stderr is stdout:
            info.append(f'stdout=stderr={stdout.pipe}')
        else:
            if stdout is not None:
                info.append(f'stdout={stdout.pipe}')
            if stderr is not None:
                info.append(f'stderr={stderr.pipe}')

        return '<{}>'.format(' '.join(info))

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Create the child process and assign it to ``self._proc``.

        Must be overridden by subclasses; this base version always raises
        NotImplementedError.
        """
        raise NotImplementedError

    def set_protocol(self, protocol):
        """Replace the protocol that receives this transport's callbacks."""
        self._protocol = protocol

    def get_protocol(self):
        """Return the current protocol."""
        return self._protocol

    def is_closing(self):
        """Return True once close() has been called."""
        return self._closed

    def close(self):
        """Close the connected pipes and kill the child if still running.

        Calling close() more than once is a no-op.
        """
        if self._closed:
            return
        self._closed = True

        for proto in self._pipes.values():
            if proto is None:
                # Pipe was requested but never connected: nothing to close.
                continue
            proto.pipe.close()

        if (self._proc is not None and
                # has the child process finished?
                self._returncode is None and
                # the child process has finished, but the
                # transport hasn't been notified yet?
                self._proc.poll() is None):

            if self._loop.get_debug():
                logger.warning('Close running child process: kill %r', self)

            try:
                self._proc.kill()
            except (ProcessLookupError, PermissionError):
                # The process may have already exited, or it may be running
                # setuid so that we are not allowed to signal it; closing
                # the transport is best-effort in both cases.
                pass

            # Don't clear the _proc reference yet: _post_init() may still run

    def __del__(self, _warn=warnings.warn):
        """Emit a ResourceWarning and close if the transport was left open."""
        if not self._closed:
            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
            self.close()

    def get_pid(self):
        """Return the child's pid, or None if it was never started."""
        return self._pid

    def get_returncode(self):
        """Return the child's return code, or None if not yet recorded."""
        return self._returncode

    def get_pipe_transport(self, fd):
        """Return the pipe transport for *fd* (0, 1 or 2).

        Returns None when *fd* was not requested as a pipe, or when the pipe
        has been requested but is not connected yet.
        """
        proto = self._pipes.get(fd)
        if proto is None:
            return None
        return proto.pipe

    def _check_proc(self):
        """Raise ProcessLookupError if there is no process object."""
        if self._proc is None:
            raise ProcessLookupError()

    def send_signal(self, signal):
        """Send *signal* to the child; raise ProcessLookupError if gone."""
        self._check_proc()
        self._proc.send_signal(signal)

    def terminate(self):
        """Terminate the child; raise ProcessLookupError if gone."""
        self._check_proc()
        self._proc.terminate()

    def kill(self):
        """Kill the child; raise ProcessLookupError if gone."""
        self._check_proc()
        self._proc.kill()

    async def _connect_pipes(self, waiter):
        """Connect the child's pipes to the loop and flush queued callbacks.

        On success *waiter* (if given and not cancelled) is completed with
        None; on failure it receives the exception.  SystemExit and
        KeyboardInterrupt are re-raised instead of being stored.
        """
        try:
            proc = self._proc
            loop = self._loop

            # connect_*_pipe() returns (transport, protocol); the protocol
            # instance is what gets stored in self._pipes.
            if proc.stdin is not None:
                _, pipe = await loop.connect_write_pipe(
                    lambda: WriteSubprocessPipeProto(self, 0),
                    proc.stdin)
                self._pipes[0] = pipe

            if proc.stdout is not None:
                _, pipe = await loop.connect_read_pipe(
                    lambda: ReadSubprocessPipeProto(self, 1),
                    proc.stdout)
                self._pipes[1] = pipe

            if proc.stderr is not None:
                _, pipe = await loop.connect_read_pipe(
                    lambda: ReadSubprocessPipeProto(self, 2),
                    proc.stderr)
                self._pipes[2] = pipe

            assert self._pending_calls is not None

            # connection_made() is scheduled first so the protocol sees it
            # before any of the callbacks queued while connecting.
            loop.call_soon(self._protocol.connection_made, self)
            for callback, data in self._pending_calls:
                loop.call_soon(callback, *data)
            # From now on _call() schedules callbacks directly.
            self._pending_calls = None
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            if waiter is not None and not waiter.cancelled():
                waiter.set_exception(exc)
        else:
            if waiter is not None and not waiter.cancelled():
                waiter.set_result(None)

    def _call(self, cb, *data):
        """Schedule ``cb(*data)``, queueing it while pipes are connecting."""
        if self._pending_calls is not None:
            self._pending_calls.append((cb, data))
        else:
            self._loop.call_soon(cb, *data)

    def _pipe_connection_lost(self, fd, exc):
        """Forward a lost pipe to the protocol, then check for completion."""
        self._call(self._protocol.pipe_connection_lost, fd, exc)
        self._try_finish()

    def _pipe_data_received(self, fd, data):
        """Forward data read from pipe *fd* to the protocol."""
        self._call(self._protocol.pipe_data_received, fd, data)

    def _process_exited(self, returncode):
        """Record the child's return code and notify the protocol."""
        assert returncode is not None, returncode
        assert self._returncode is None, self._returncode
        if self._loop.get_debug():
            logger.info('%r exited with return code %r', self, returncode)
        self._returncode = returncode
        if self._proc.returncode is None:
            # asyncio uses a child watcher: copy the status into the Popen
            # object. On Python 3.6, it is required to avoid a ResourceWarning.
            self._proc.returncode = returncode
        self._call(self._protocol.process_exited)

        self._try_finish()

    async def _wait(self):
        """Wait until the process exit and return the process return code.

        This method is a coroutine."""
        if self._returncode is not None:
            return self._returncode

        waiter = self._loop.create_future()
        self._exit_waiters.append(waiter)
        return await waiter

    def _try_finish(self):
        """Schedule connection_lost once the child exited and pipes closed.

        Does nothing until a return code has been recorded and every
        requested pipe is connected and marked disconnected.
        """
        assert not self._finished
        if self._returncode is None:
            return
        if all(p is not None and p.disconnected
               for p in self._pipes.values()):
            self._finished = True
            self._call(self._call_connection_lost, None)

    def _call_connection_lost(self, exc):
        """Notify the protocol, wake ``_wait()`` callers, drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            # wake up futures waiting for wait()
            for waiter in self._exit_waiters:
                if not waiter.cancelled():
                    waiter.set_result(self._returncode)
            self._exit_waiters = None
            self._loop = None
            self._proc = None
            self._protocol = None
253
254
255
class WriteSubprocessPipeProto(protocols.BaseProtocol):
    """Protocol for one pipe of a child process, tied to an owner object.

    *proc* is the owning object whose ``_pipe_connection_lost()`` and
    ``_protocol`` are used here; *fd* is the file descriptor number this
    pipe stands for.
    """

    def __init__(self, proc, fd):
        self.proc = proc
        self.fd = fd
        # Set by connection_made() to the pipe transport.
        self.pipe = None
        # Becomes True once connection_lost() has been called.
        self.disconnected = False

    def connection_made(self, transport):
        """Remember the pipe transport."""
        self.pipe = transport

    def __repr__(self):
        return f'<{self.__class__.__name__} fd={self.fd} pipe={self.pipe!r}>'

    def connection_lost(self, exc):
        """Mark the pipe disconnected, tell the owner, and release it."""
        self.disconnected = True
        self.proc._pipe_connection_lost(self.fd, exc)
        # Drop the reference to the owner once it has been notified.
        self.proc = None

    def pause_writing(self):
        """Relay flow control to the owner's protocol."""
        self.proc._protocol.pause_writing()

    def resume_writing(self):
        """Relay flow control to the owner's protocol."""
        self.proc._protocol.resume_writing()
279
280
281
class ReadSubprocessPipeProto(WriteSubprocessPipeProto,
                              protocols.Protocol):
    """Pipe protocol that also forwards received data to its owner."""

    def data_received(self, data):
        """Pass *data* to the owner together with this pipe's fd."""
        self.proc._pipe_data_received(self.fd, data)
286
287