Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
allendowney
GitHub Repository: allendowney/cpython
Path: blob/main/Lib/asyncio/subprocess.py
12 views
1
__all__ = 'create_subprocess_exec', 'create_subprocess_shell'
2
3
import subprocess
4
5
from . import events
6
from . import protocols
7
from . import streams
8
from . import tasks
9
from .log import logger
10
11
12
PIPE = subprocess.PIPE
13
STDOUT = subprocess.STDOUT
14
DEVNULL = subprocess.DEVNULL
15
16
17
class SubprocessStreamProtocol(streams.FlowControlMixin,
18
protocols.SubprocessProtocol):
19
"""Like StreamReaderProtocol, but for a subprocess."""
20
21
def __init__(self, limit, loop):
22
super().__init__(loop=loop)
23
self._limit = limit
24
self.stdin = self.stdout = self.stderr = None
25
self._transport = None
26
self._process_exited = False
27
self._pipe_fds = []
28
self._stdin_closed = self._loop.create_future()
29
30
def __repr__(self):
31
info = [self.__class__.__name__]
32
if self.stdin is not None:
33
info.append(f'stdin={self.stdin!r}')
34
if self.stdout is not None:
35
info.append(f'stdout={self.stdout!r}')
36
if self.stderr is not None:
37
info.append(f'stderr={self.stderr!r}')
38
return '<{}>'.format(' '.join(info))
39
40
def connection_made(self, transport):
41
self._transport = transport
42
43
stdout_transport = transport.get_pipe_transport(1)
44
if stdout_transport is not None:
45
self.stdout = streams.StreamReader(limit=self._limit,
46
loop=self._loop)
47
self.stdout.set_transport(stdout_transport)
48
self._pipe_fds.append(1)
49
50
stderr_transport = transport.get_pipe_transport(2)
51
if stderr_transport is not None:
52
self.stderr = streams.StreamReader(limit=self._limit,
53
loop=self._loop)
54
self.stderr.set_transport(stderr_transport)
55
self._pipe_fds.append(2)
56
57
stdin_transport = transport.get_pipe_transport(0)
58
if stdin_transport is not None:
59
self.stdin = streams.StreamWriter(stdin_transport,
60
protocol=self,
61
reader=None,
62
loop=self._loop)
63
64
def pipe_data_received(self, fd, data):
65
if fd == 1:
66
reader = self.stdout
67
elif fd == 2:
68
reader = self.stderr
69
else:
70
reader = None
71
if reader is not None:
72
reader.feed_data(data)
73
74
def pipe_connection_lost(self, fd, exc):
75
if fd == 0:
76
pipe = self.stdin
77
if pipe is not None:
78
pipe.close()
79
self.connection_lost(exc)
80
if exc is None:
81
self._stdin_closed.set_result(None)
82
else:
83
self._stdin_closed.set_exception(exc)
84
# Since calling `wait_closed()` is not mandatory,
85
# we shouldn't log the traceback if this is not awaited.
86
self._stdin_closed._log_traceback = False
87
return
88
if fd == 1:
89
reader = self.stdout
90
elif fd == 2:
91
reader = self.stderr
92
else:
93
reader = None
94
if reader is not None:
95
if exc is None:
96
reader.feed_eof()
97
else:
98
reader.set_exception(exc)
99
100
if fd in self._pipe_fds:
101
self._pipe_fds.remove(fd)
102
self._maybe_close_transport()
103
104
def process_exited(self):
105
self._process_exited = True
106
self._maybe_close_transport()
107
108
def _maybe_close_transport(self):
109
if len(self._pipe_fds) == 0 and self._process_exited:
110
self._transport.close()
111
self._transport = None
112
113
def _get_close_waiter(self, stream):
114
if stream is self.stdin:
115
return self._stdin_closed
116
117
118
class Process:
119
def __init__(self, transport, protocol, loop):
120
self._transport = transport
121
self._protocol = protocol
122
self._loop = loop
123
self.stdin = protocol.stdin
124
self.stdout = protocol.stdout
125
self.stderr = protocol.stderr
126
self.pid = transport.get_pid()
127
128
def __repr__(self):
129
return f'<{self.__class__.__name__} {self.pid}>'
130
131
@property
132
def returncode(self):
133
return self._transport.get_returncode()
134
135
async def wait(self):
136
"""Wait until the process exit and return the process return code."""
137
return await self._transport._wait()
138
139
def send_signal(self, signal):
140
self._transport.send_signal(signal)
141
142
def terminate(self):
143
self._transport.terminate()
144
145
def kill(self):
146
self._transport.kill()
147
148
async def _feed_stdin(self, input):
149
debug = self._loop.get_debug()
150
if input is not None:
151
self.stdin.write(input)
152
if debug:
153
logger.debug(
154
'%r communicate: feed stdin (%s bytes)', self, len(input))
155
try:
156
await self.stdin.drain()
157
except (BrokenPipeError, ConnectionResetError) as exc:
158
# communicate() ignores BrokenPipeError and ConnectionResetError
159
if debug:
160
logger.debug('%r communicate: stdin got %r', self, exc)
161
162
if debug:
163
logger.debug('%r communicate: close stdin', self)
164
self.stdin.close()
165
166
async def _noop(self):
167
return None
168
169
async def _read_stream(self, fd):
170
transport = self._transport.get_pipe_transport(fd)
171
if fd == 2:
172
stream = self.stderr
173
else:
174
assert fd == 1
175
stream = self.stdout
176
if self._loop.get_debug():
177
name = 'stdout' if fd == 1 else 'stderr'
178
logger.debug('%r communicate: read %s', self, name)
179
output = await stream.read()
180
if self._loop.get_debug():
181
name = 'stdout' if fd == 1 else 'stderr'
182
logger.debug('%r communicate: close %s', self, name)
183
transport.close()
184
return output
185
186
async def communicate(self, input=None):
187
if self.stdin is not None:
188
stdin = self._feed_stdin(input)
189
else:
190
stdin = self._noop()
191
if self.stdout is not None:
192
stdout = self._read_stream(1)
193
else:
194
stdout = self._noop()
195
if self.stderr is not None:
196
stderr = self._read_stream(2)
197
else:
198
stderr = self._noop()
199
stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr)
200
await self.wait()
201
return (stdout, stderr)
202
203
204
async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
205
limit=streams._DEFAULT_LIMIT, **kwds):
206
loop = events.get_running_loop()
207
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
208
loop=loop)
209
transport, protocol = await loop.subprocess_shell(
210
protocol_factory,
211
cmd, stdin=stdin, stdout=stdout,
212
stderr=stderr, **kwds)
213
return Process(transport, protocol, loop)
214
215
216
async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
217
stderr=None, limit=streams._DEFAULT_LIMIT,
218
**kwds):
219
loop = events.get_running_loop()
220
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
221
loop=loop)
222
transport, protocol = await loop.subprocess_exec(
223
protocol_factory,
224
program, *args,
225
stdin=stdin, stdout=stdout,
226
stderr=stderr, **kwds)
227
return Process(transport, protocol, loop)
228
229