Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
allendowney
GitHub Repository: allendowney/cpython
Path: blob/main/Lib/asyncio/unix_events.py
12 views
1
"""Selector event loop for Unix with signal handling."""
2
3
import errno
4
import io
5
import itertools
6
import os
7
import selectors
8
import signal
9
import socket
10
import stat
11
import subprocess
12
import sys
13
import threading
14
import warnings
15
16
from . import base_events
17
from . import base_subprocess
18
from . import constants
19
from . import coroutines
20
from . import events
21
from . import exceptions
22
from . import futures
23
from . import selector_events
24
from . import tasks
25
from . import transports
26
from .log import logger
27
28
29
# Names exported by ``from <this module> import *``.
__all__ = (
    'SelectorEventLoop',
    'AbstractChildWatcher',
    'SafeChildWatcher',
    'FastChildWatcher',
    'PidfdChildWatcher',
    'MultiLoopChildWatcher',
    'ThreadedChildWatcher',
    'DefaultEventLoopPolicy',
)


# Importing this module on Windows fails immediately.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
40
41
42
def _sighandler_noop(signum, frame):
    """Signal handler that deliberately does nothing.

    It accepts the ``(signum, frame)`` pair that ``signal.signal()``
    handlers receive, ignores both and returns None.
    """
    return None
45
46
47
def waitstatus_to_exitcode(status):
    """Translate a wait status into an exit code.

    Delegates to ``os.waitstatus_to_exitcode()``.  If that function rejects
    the status with ValueError, the raw *status* is returned unchanged.
    """
    try:
        exitcode = os.waitstatus_to_exitcode(status)
    except ValueError:
        # The child exited, but its status could not be interpreted.
        # That is not expected to happen; hand back the raw status so the
        # caller at least has something to debug with.
        exitcode = status
    return exitcode
55
56
57
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
58
"""Unix event loop.
59
60
Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
61
"""
62
63
def __init__(self, selector=None):
64
super().__init__(selector)
65
self._signal_handlers = {}
66
67
def close(self):
68
super().close()
69
if not sys.is_finalizing():
70
for sig in list(self._signal_handlers):
71
self.remove_signal_handler(sig)
72
else:
73
if self._signal_handlers:
74
warnings.warn(f"Closing the loop {self!r} "
75
f"on interpreter shutdown "
76
f"stage, skipping signal handlers removal",
77
ResourceWarning,
78
source=self)
79
self._signal_handlers.clear()
80
81
def _process_self_data(self, data):
82
for signum in data:
83
if not signum:
84
# ignore null bytes written by _write_to_self()
85
continue
86
self._handle_signal(signum)
87
88
def add_signal_handler(self, sig, callback, *args):
89
"""Add a handler for a signal. UNIX only.
90
91
Raise ValueError if the signal number is invalid or uncatchable.
92
Raise RuntimeError if there is a problem setting up the handler.
93
"""
94
if (coroutines.iscoroutine(callback) or
95
coroutines.iscoroutinefunction(callback)):
96
raise TypeError("coroutines cannot be used "
97
"with add_signal_handler()")
98
self._check_signal(sig)
99
self._check_closed()
100
try:
101
# set_wakeup_fd() raises ValueError if this is not the
102
# main thread. By calling it early we ensure that an
103
# event loop running in another thread cannot add a signal
104
# handler.
105
signal.set_wakeup_fd(self._csock.fileno())
106
except (ValueError, OSError) as exc:
107
raise RuntimeError(str(exc))
108
109
handle = events.Handle(callback, args, self, None)
110
self._signal_handlers[sig] = handle
111
112
try:
113
# Register a dummy signal handler to ask Python to write the signal
114
# number in the wakeup file descriptor. _process_self_data() will
115
# read signal numbers from this file descriptor to handle signals.
116
signal.signal(sig, _sighandler_noop)
117
118
# Set SA_RESTART to limit EINTR occurrences.
119
signal.siginterrupt(sig, False)
120
except OSError as exc:
121
del self._signal_handlers[sig]
122
if not self._signal_handlers:
123
try:
124
signal.set_wakeup_fd(-1)
125
except (ValueError, OSError) as nexc:
126
logger.info('set_wakeup_fd(-1) failed: %s', nexc)
127
128
if exc.errno == errno.EINVAL:
129
raise RuntimeError(f'sig {sig} cannot be caught')
130
else:
131
raise
132
133
def _handle_signal(self, sig):
134
"""Internal helper that is the actual signal handler."""
135
handle = self._signal_handlers.get(sig)
136
if handle is None:
137
return # Assume it's some race condition.
138
if handle._cancelled:
139
self.remove_signal_handler(sig) # Remove it properly.
140
else:
141
self._add_callback_signalsafe(handle)
142
143
def remove_signal_handler(self, sig):
144
"""Remove a handler for a signal. UNIX only.
145
146
Return True if a signal handler was removed, False if not.
147
"""
148
self._check_signal(sig)
149
try:
150
del self._signal_handlers[sig]
151
except KeyError:
152
return False
153
154
if sig == signal.SIGINT:
155
handler = signal.default_int_handler
156
else:
157
handler = signal.SIG_DFL
158
159
try:
160
signal.signal(sig, handler)
161
except OSError as exc:
162
if exc.errno == errno.EINVAL:
163
raise RuntimeError(f'sig {sig} cannot be caught')
164
else:
165
raise
166
167
if not self._signal_handlers:
168
try:
169
signal.set_wakeup_fd(-1)
170
except (ValueError, OSError) as exc:
171
logger.info('set_wakeup_fd(-1) failed: %s', exc)
172
173
return True
174
175
def _check_signal(self, sig):
176
"""Internal helper to validate a signal.
177
178
Raise ValueError if the signal number is invalid or uncatchable.
179
Raise RuntimeError if there is a problem setting up the handler.
180
"""
181
if not isinstance(sig, int):
182
raise TypeError(f'sig must be an int, not {sig!r}')
183
184
if sig not in signal.valid_signals():
185
raise ValueError(f'invalid signal number {sig}')
186
187
def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
188
extra=None):
189
return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
190
191
def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
192
extra=None):
193
return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
194
195
async def _make_subprocess_transport(self, protocol, args, shell,
196
stdin, stdout, stderr, bufsize,
197
extra=None, **kwargs):
198
with warnings.catch_warnings():
199
warnings.simplefilter('ignore', DeprecationWarning)
200
watcher = events.get_child_watcher()
201
202
with watcher:
203
if not watcher.is_active():
204
# Check early.
205
# Raising exception before process creation
206
# prevents subprocess execution if the watcher
207
# is not ready to handle it.
208
raise RuntimeError("asyncio.get_child_watcher() is not activated, "
209
"subprocess support is not installed.")
210
waiter = self.create_future()
211
transp = _UnixSubprocessTransport(self, protocol, args, shell,
212
stdin, stdout, stderr, bufsize,
213
waiter=waiter, extra=extra,
214
**kwargs)
215
watcher.add_child_handler(transp.get_pid(),
216
self._child_watcher_callback, transp)
217
try:
218
await waiter
219
except (SystemExit, KeyboardInterrupt):
220
raise
221
except BaseException:
222
transp.close()
223
await transp._wait()
224
raise
225
226
return transp
227
228
def _child_watcher_callback(self, pid, returncode, transp):
229
# Skip one iteration for callbacks to be executed
230
self.call_soon_threadsafe(self.call_soon, transp._process_exited, returncode)
231
232
async def create_unix_connection(
233
self, protocol_factory, path=None, *,
234
ssl=None, sock=None,
235
server_hostname=None,
236
ssl_handshake_timeout=None,
237
ssl_shutdown_timeout=None):
238
assert server_hostname is None or isinstance(server_hostname, str)
239
if ssl:
240
if server_hostname is None:
241
raise ValueError(
242
'you have to pass server_hostname when using ssl')
243
else:
244
if server_hostname is not None:
245
raise ValueError('server_hostname is only meaningful with ssl')
246
if ssl_handshake_timeout is not None:
247
raise ValueError(
248
'ssl_handshake_timeout is only meaningful with ssl')
249
if ssl_shutdown_timeout is not None:
250
raise ValueError(
251
'ssl_shutdown_timeout is only meaningful with ssl')
252
253
if path is not None:
254
if sock is not None:
255
raise ValueError(
256
'path and sock can not be specified at the same time')
257
258
path = os.fspath(path)
259
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
260
try:
261
sock.setblocking(False)
262
await self.sock_connect(sock, path)
263
except:
264
sock.close()
265
raise
266
267
else:
268
if sock is None:
269
raise ValueError('no path and sock were specified')
270
if (sock.family != socket.AF_UNIX or
271
sock.type != socket.SOCK_STREAM):
272
raise ValueError(
273
f'A UNIX Domain Stream Socket was expected, got {sock!r}')
274
sock.setblocking(False)
275
276
transport, protocol = await self._create_connection_transport(
277
sock, protocol_factory, ssl, server_hostname,
278
ssl_handshake_timeout=ssl_handshake_timeout,
279
ssl_shutdown_timeout=ssl_shutdown_timeout)
280
return transport, protocol
281
282
async def create_unix_server(
283
self, protocol_factory, path=None, *,
284
sock=None, backlog=100, ssl=None,
285
ssl_handshake_timeout=None,
286
ssl_shutdown_timeout=None,
287
start_serving=True):
288
if isinstance(ssl, bool):
289
raise TypeError('ssl argument must be an SSLContext or None')
290
291
if ssl_handshake_timeout is not None and not ssl:
292
raise ValueError(
293
'ssl_handshake_timeout is only meaningful with ssl')
294
295
if ssl_shutdown_timeout is not None and not ssl:
296
raise ValueError(
297
'ssl_shutdown_timeout is only meaningful with ssl')
298
299
if path is not None:
300
if sock is not None:
301
raise ValueError(
302
'path and sock can not be specified at the same time')
303
304
path = os.fspath(path)
305
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
306
307
# Check for abstract socket. `str` and `bytes` paths are supported.
308
if path[0] not in (0, '\x00'):
309
try:
310
if stat.S_ISSOCK(os.stat(path).st_mode):
311
os.remove(path)
312
except FileNotFoundError:
313
pass
314
except OSError as err:
315
# Directory may have permissions only to create socket.
316
logger.error('Unable to check or remove stale UNIX socket '
317
'%r: %r', path, err)
318
319
try:
320
sock.bind(path)
321
except OSError as exc:
322
sock.close()
323
if exc.errno == errno.EADDRINUSE:
324
# Let's improve the error message by adding
325
# with what exact address it occurs.
326
msg = f'Address {path!r} is already in use'
327
raise OSError(errno.EADDRINUSE, msg) from None
328
else:
329
raise
330
except:
331
sock.close()
332
raise
333
else:
334
if sock is None:
335
raise ValueError(
336
'path was not specified, and no sock specified')
337
338
if (sock.family != socket.AF_UNIX or
339
sock.type != socket.SOCK_STREAM):
340
raise ValueError(
341
f'A UNIX Domain Stream Socket was expected, got {sock!r}')
342
343
sock.setblocking(False)
344
server = base_events.Server(self, [sock], protocol_factory,
345
ssl, backlog, ssl_handshake_timeout,
346
ssl_shutdown_timeout)
347
if start_serving:
348
server._start_serving()
349
# Skip one loop iteration so that all 'loop.add_reader'
350
# go through.
351
await tasks.sleep(0)
352
353
return server
354
355
async def _sock_sendfile_native(self, sock, file, offset, count):
356
try:
357
os.sendfile
358
except AttributeError:
359
raise exceptions.SendfileNotAvailableError(
360
"os.sendfile() is not available")
361
try:
362
fileno = file.fileno()
363
except (AttributeError, io.UnsupportedOperation) as err:
364
raise exceptions.SendfileNotAvailableError("not a regular file")
365
try:
366
fsize = os.fstat(fileno).st_size
367
except OSError:
368
raise exceptions.SendfileNotAvailableError("not a regular file")
369
blocksize = count if count else fsize
370
if not blocksize:
371
return 0 # empty file
372
373
fut = self.create_future()
374
self._sock_sendfile_native_impl(fut, None, sock, fileno,
375
offset, count, blocksize, 0)
376
return await fut
377
378
def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
379
offset, count, blocksize, total_sent):
380
fd = sock.fileno()
381
if registered_fd is not None:
382
# Remove the callback early. It should be rare that the
383
# selector says the fd is ready but the call still returns
384
# EAGAIN, and I am willing to take a hit in that case in
385
# order to simplify the common case.
386
self.remove_writer(registered_fd)
387
if fut.cancelled():
388
self._sock_sendfile_update_filepos(fileno, offset, total_sent)
389
return
390
if count:
391
blocksize = count - total_sent
392
if blocksize <= 0:
393
self._sock_sendfile_update_filepos(fileno, offset, total_sent)
394
fut.set_result(total_sent)
395
return
396
397
try:
398
sent = os.sendfile(fd, fileno, offset, blocksize)
399
except (BlockingIOError, InterruptedError):
400
if registered_fd is None:
401
self._sock_add_cancellation_callback(fut, sock)
402
self.add_writer(fd, self._sock_sendfile_native_impl, fut,
403
fd, sock, fileno,
404
offset, count, blocksize, total_sent)
405
except OSError as exc:
406
if (registered_fd is not None and
407
exc.errno == errno.ENOTCONN and
408
type(exc) is not ConnectionError):
409
# If we have an ENOTCONN and this isn't a first call to
410
# sendfile(), i.e. the connection was closed in the middle
411
# of the operation, normalize the error to ConnectionError
412
# to make it consistent across all Posix systems.
413
new_exc = ConnectionError(
414
"socket is not connected", errno.ENOTCONN)
415
new_exc.__cause__ = exc
416
exc = new_exc
417
if total_sent == 0:
418
# We can get here for different reasons, the main
419
# one being 'file' is not a regular mmap(2)-like
420
# file, in which case we'll fall back on using
421
# plain send().
422
err = exceptions.SendfileNotAvailableError(
423
"os.sendfile call failed")
424
self._sock_sendfile_update_filepos(fileno, offset, total_sent)
425
fut.set_exception(err)
426
else:
427
self._sock_sendfile_update_filepos(fileno, offset, total_sent)
428
fut.set_exception(exc)
429
except (SystemExit, KeyboardInterrupt):
430
raise
431
except BaseException as exc:
432
self._sock_sendfile_update_filepos(fileno, offset, total_sent)
433
fut.set_exception(exc)
434
else:
435
if sent == 0:
436
# EOF
437
self._sock_sendfile_update_filepos(fileno, offset, total_sent)
438
fut.set_result(total_sent)
439
else:
440
offset += sent
441
total_sent += sent
442
if registered_fd is None:
443
self._sock_add_cancellation_callback(fut, sock)
444
self.add_writer(fd, self._sock_sendfile_native_impl, fut,
445
fd, sock, fileno,
446
offset, count, blocksize, total_sent)
447
448
def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
449
if total_sent > 0:
450
os.lseek(fileno, offset, os.SEEK_SET)
451
452
def _sock_add_cancellation_callback(self, fut, sock):
453
def cb(fut):
454
if fut.cancelled():
455
fd = sock.fileno()
456
if fd != -1:
457
self.remove_writer(fd)
458
fut.add_done_callback(cb)
459
460
461
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a FIFO, socket or character device.

    The file descriptor is switched to non-blocking mode and incoming data
    is handed to the protocol's data_received().
    """

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._closing = False
        self._paused = False

        st_mode = os.fstat(self._fileno).st_mode
        supported = (stat.S_ISFIFO(st_mode) or
                     stat.S_ISSOCK(st_mode) or
                     stat.S_ISCHR(st_mode))
        if not supported:
            # Forget the pipe so that __del__ neither warns nor closes it.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is for pipes/sockets only.")

        os.set_blocking(self._fileno, False)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def _add_reader(self, fd, callback):
        """Register *callback* for *fd* unless reading is paused or closing."""
        if self.is_reading():
            self._loop._add_reader(fd, callback)

    def is_reading(self):
        """Return True unless the transport is paused or closing."""
        return not (self._paused or self._closing)

    def __repr__(self):
        parts = [self.__class__.__name__]
        if self._pipe is None:
            parts.append('closed')
        elif self._closing:
            parts.append('closing')
        parts.append(f'fd={self._fileno}')
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is None:
            parts.append('closed')
        elif selector is None:
            parts.append('open')
        else:
            polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_READ)
            parts.append('polling' if polling else 'idle')
        return '<{}>'.format(' '.join(parts))

    def _read_ready(self):
        """Read once from the fd and dispatch data, EOF or a fatal error."""
        try:
            chunk = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            return
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
            return

        if chunk:
            self._protocol.data_received(chunk)
            return

        # An empty read means the other end was closed.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._protocol.eof_received)
        self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the fd; a no-op if already paused or closing."""
        if not self.is_reading():
            return
        self._paused = True
        self._loop._remove_reader(self._fileno)
        if self._loop.get_debug():
            logger.debug("%r pauses reading", self)

    def resume_reading(self):
        """Watch the fd again; a no-op if closing or not paused."""
        if self._closing or not self._paused:
            return
        self._paused = False
        self._loop._add_reader(self._fileno, self._read_ready)
        if self._loop.get_debug():
            logger.debug("%r resumes reading", self)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Start closing the transport unless that is already under way."""
        if self._closing:
            return
        self._close(None)

    def __del__(self, _warn=warnings.warn):
        # A pipe still attached at finalization was never closed properly.
        if self._pipe is not None:
            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
            self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        # should be called by exception handler only
        is_eio = isinstance(exc, OSError) and exc.errno == errno.EIO
        if is_eio:
            # EIO is only logged, and only in debug mode.
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            context = {
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            }
            self._loop.call_exception_handler(context)
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
603
604
605
class _UnixWritePipeTransport(transports._FlowControlMixin,
                              transports.WriteTransport):
    """Write transport over a FIFO, socket or character device.

    Data that cannot be written immediately is kept in an internal buffer
    and flushed from a writer callback.
    """

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra, loop)
        self._extra['pipe'] = pipe
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._buffer = bytearray()
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        st_mode = os.fstat(self._fileno).st_mode
        is_char = stat.S_ISCHR(st_mode)
        is_fifo = stat.S_ISFIFO(st_mode)
        is_socket = stat.S_ISSOCK(st_mode)
        if not (is_char or is_fifo or is_socket):
            # Forget the pipe so that __del__ neither warns nor closes it.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")

        os.set_blocking(self._fileno, False)
        self._loop.call_soon(self._protocol.connection_made, self)

        # On AIX, the reader trick (to be notified when the read end of the
        # socket is closed) only works for sockets. On other platforms it
        # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or (is_fifo and not sys.platform.startswith("aix")):
            # only start reading when connection_made() has been called
            self._loop.call_soon(self._loop._add_reader,
                                 self._fileno, self._read_ready)

        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        parts = [self.__class__.__name__]
        if self._pipe is None:
            parts.append('closed')
        elif self._closing:
            parts.append('closing')
        parts.append(f'fd={self._fileno}')
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is None:
            parts.append('closed')
        elif selector is None:
            parts.append('open')
        else:
            polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_WRITE)
            parts.append('polling' if polling else 'idle')

            bufsize = self.get_write_buffer_size()
            parts.append(f'bufsize={bufsize}')
        return '<{}>'.format(' '.join(parts))

    def get_write_buffer_size(self):
        """Return the number of bytes waiting in the write buffer."""
        return len(self._buffer)

    def _read_ready(self):
        # Pipe was closed by peer.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        if self._buffer:
            # Unsent data is pending, so report the closure as an error.
            self._close(BrokenPipeError())
        else:
            self._close()

    def write(self, data):
        """Write *data*, buffering whatever cannot be sent right away."""
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                written = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                written = 0
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._conn_lost += 1
                self._fatal_error(exc, 'Fatal write error on pipe transport')
                return
            if written == len(data):
                return
            elif written > 0:
                data = memoryview(data)[written:]
            self._loop._add_writer(self._fileno, self._write_ready)

        self._buffer += data
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Writer callback: flush as much of the buffer as possible."""
        assert self._buffer, 'Data should not be empty'

        try:
            written = os.write(self._fileno, self._buffer)
        except (BlockingIOError, InterruptedError):
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._buffer.clear()
            self._conn_lost += 1
            # Remove the writer here; _fatal_error() won't do it
            # because _buffer is empty by now.
            self._loop._remove_writer(self._fileno)
            self._fatal_error(exc, 'Fatal write error on pipe transport')
            return

        if written == len(self._buffer):
            self._buffer.clear()
            self._loop._remove_writer(self._fileno)
            self._maybe_resume_protocol()  # May append to buffer.
            if self._closing:
                self._loop._remove_reader(self._fileno)
                self._call_connection_lost(None)
        elif written > 0:
            del self._buffer[:written]

    def can_write_eof(self):
        return True

    def write_eof(self):
        """Mark the transport closing; finish once the buffer is empty."""
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if not self._buffer:
            self._loop._remove_reader(self._fileno)
            self._loop.call_soon(self._call_connection_lost, None)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Close the transport; a no-op if already closed or closing."""
        if self._pipe is None or self._closing:
            return
        # write_eof is all what we needed to close the write pipe
        self.write_eof()

    def __del__(self, _warn=warnings.warn):
        # A pipe still attached at finalization was never closed properly.
        if self._pipe is not None:
            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
            self._pipe.close()

    def abort(self):
        """Close immediately, discarding any buffered data."""
        self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        # should be called by exception handler only
        if isinstance(exc, OSError):
            # OSErrors are only logged, and only in debug mode.
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            context = {
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            }
            self._loop.call_exception_handler(context)
        self._close(exc)

    def _close(self, exc=None):
        """Drop pending data, unregister callbacks, schedule connection_lost."""
        self._closing = True
        if self._buffer:
            # A writer is only registered while the buffer is non-empty.
            self._loop._remove_writer(self._fileno)
        self._buffer.clear()
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
808
809
810
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that launches the child via subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in ``self._proc``."""
        parent_end = None
        wants_pipe = stdin == subprocess.PIPE
        if wants_pipe and sys.platform.startswith('aix'):
            # Use a socket pair for stdin on AIX, since it does not
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).
            stdin, parent_end = socket.socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if parent_end is not None:
                # Close our copy of the child's end and expose ours as the
                # process's stdin file object.
                stdin.close()
                self._proc.stdin = open(parent_end.detach(), 'wb', buffering=bufsize)
                parent_end = None
        finally:
            # Still set only if something above failed: release both ends.
            if parent_end is not None:
                stdin.close()
                parent_end.close()
832
833
834
class AbstractChildWatcher:
    """Abstract base class for monitoring child processes.

    Objects derived from this class monitor a collection of subprocesses and
    report their termination or interruption by a signal.

    New callbacks are registered with .add_child_handler(). Starting a new
    process must be done within a 'with' block to allow the watcher to suspend
    its activity until the new process is fully registered (this is needed to
    prevent a race condition in some implementations).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def __init_subclass__(cls) -> None:
        # Subclasses defined in this module are exempt from the warning.
        if cls.__module__ == __name__:
            return
        warnings._deprecated("AbstractChildWatcher",
                             "{name!r} is deprecated as of Python 3.12 and will be "
                             "removed in Python {remove}.",
                             remove=(3, 14))

    def add_child_handler(self, pid, callback, *args):
        """Register a new child handler.

        Arrange for callback(pid, returncode, *args) to be called when
        process 'pid' terminates. Specifying another callback for the same
        process replaces the previous handler.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Remove the handler for process 'pid'.

        Return True if the handler was successfully removed,
        False if there was nothing to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        If the watcher was previously attached to an event loop, then it is
        first detached before attaching to the new loop.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        This must be called to make sure that any underlying resource is freed.
        """
        raise NotImplementedError

    def is_active(self):
        """Return ``True`` if the watcher is active and is used by the event loop.

        Return True if the watcher is installed and ready to handle process exit
        notifications.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError
918
919
920
class PidfdChildWatcher(AbstractChildWatcher):
    """Child watcher implementation using Linux's pid file descriptors.

    This child watcher polls process file descriptors (pidfds) to await child
    process termination. In some respects, PidfdChildWatcher is a "Goldilocks"
    child watcher implementation. It doesn't require signals or threads, doesn't
    interfere with any processes launched outside the event loop, and scales
    linearly with the number of subprocesses launched by the event loop. The
    main disadvantage is that pidfds are specific to Linux, and only work on
    recent (5.3+) kernels.
    """

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Nothing to release when leaving the context."""

    def is_active(self):
        """Always report the watcher as active."""
        return True

    def close(self):
        """Nothing to close."""

    def attach_loop(self, loop):
        """Ignored; handlers are registered on the running loop instead."""

    def add_child_handler(self, pid, callback, *args):
        """Watch a pidfd for *pid* on the running loop."""
        running_loop = events.get_running_loop()
        pidfd = os.pidfd_open(pid)
        running_loop._add_reader(
            pidfd, self._do_wait, pid, pidfd, callback, args)

    def _do_wait(self, pid, pidfd, callback, args):
        """Reap *pid*, close its pidfd and invoke the callback."""
        events.get_running_loop()._remove_reader(pidfd)
        try:
            _, status = os.waitpid(pid, 0)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            returncode = 255
            logger.warning(
                "child process pid %d exit status already read: "
                " will report returncode 255",
                pid)
        else:
            returncode = waitstatus_to_exitcode(status)

        os.close(pidfd)
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        # asyncio never calls remove_child_handler() !!!
        # The method is no-op but is implemented because
        # abstract base classes require it.
        return True
976
977
978
class BaseChildWatcher(AbstractChildWatcher):
    """Common base for watchers that react to SIGCHLD on an attached loop.

    Subclasses implement _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        self._loop = None
        self._callbacks = {}

    def close(self):
        """Detach from the current loop, if any."""
        self.attach_loop(None)

    def is_active(self):
        """Return True when a loop is attached and currently running."""
        loop = self._loop
        if loop is None:
            return False
        return loop.is_running()

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError

    def _do_waitpid_all(self):
        raise NotImplementedError

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        *loop* may be None, which only detaches.  A RuntimeWarning is
        issued when detaching while handlers are still registered.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            if loop is None and self._callbacks:
                warnings.warn(
                    'A loop is being detached '
                    'from a child watcher with pending handlers',
                    RuntimeWarning)
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: poll children, routing errors to the loop."""
        try:
            self._do_waitpid_all()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            }
            self._loop.call_exception_handler(context)
1029
1030
1031
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    Rather than calling os.waitpid(-1), the SIGCHLD handler polls each
    registered process explicitly, so it does not disrupt other code that
    spawns processes.

    The price is a significant overhead when many children are watched:
    every SIGCHLD costs O(n) in the number of registered processes.
    """

    def __init__(self):
        super().__init__()
        warnings._deprecated("SafeChildWatcher",
                             "{name!r} is deprecated as of Python 3.12 and will be "
                             "removed in Python {remove}.",
                             remove=(3, 14))

    def close(self):
        """Forget all registered callbacks, then detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        # Nothing to release when leaving the context.
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register *callback* (with *args*) to run when *pid* exits."""
        self._callbacks[pid] = (callback, args)

        # The child may already have terminated; poll right away so the
        # callback is not lost to that race.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Drop the handler for *pid*; return True if one was registered."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        else:
            return True

    def _do_waitpid_all(self):
        """Poll every registered pid once."""
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for watched_pid in tuple(self._callbacks):
            self._do_waitpid(watched_pid)

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* without blocking and fire its callback if done."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid, returncode = expected_pid, 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = waitstatus_to_exitcode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            handler, handler_args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
            return

        handler(pid, returncode, *handler_args)
1110
1111
1112
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    Every terminated process is reaped by calling os.waitpid(-1) directly,
    which may break other code that spawns processes and waits for their
    termination itself.

    There is no noticeable overhead when handling a big number of children
    (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        self._lock = threading.Lock()
        # pid -> returncode for children reaped before being registered.
        self._zombies = {}
        # Number of currently active "with watcher:" blocks.
        self._forks = 0
        warnings._deprecated("FastChildWatcher",
                             "{name!r} is deprecated as of Python 3.12 and will be "
                             "removed in Python {remove}.",
                             remove=(3, 14))

    def close(self):
        """Discard callbacks and recorded zombies, then detach from the loop."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1

        return self

    def __exit__(self, a, b, c):
        with self._lock:
            self._forks -= 1

            # Keep the zombies while another context is still open, and
            # stay silent when there is nothing to report.
            if self._forks or not self._zombies:
                return

            collateral_victims = str(self._zombies)
            self._zombies.clear()

        # Logging happens after the lock has been released.
        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        """Register *callback* for *pid*, or fire it now if *pid* was reaped."""
        assert self._forks, "Must use the context manager"

        with self._lock:
            try:
                returncode = self._zombies.pop(pid)
            except KeyError:
                # The child is running.
                self._callbacks[pid] = callback, args
                return

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Drop the handler for *pid*; return True if one was registered."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        else:
            return True

    def _do_waitpid_all(self):
        """Reap children with os.waitpid(-1) until none is left to reap."""
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return

            if pid == 0:
                # A child process is still alive.
                return

            returncode = waitstatus_to_exitcode(status)

            with self._lock:
                try:
                    callback, args = self._callbacks.pop(pid)
                except KeyError:
                    # unknown child
                    if self._forks:
                        # It may not be registered yet.
                        self._zombies[pid] = returncode
                        if self._loop.get_debug():
                            logger.debug('unknown process %s exited '
                                         'with returncode %s',
                                         pid, returncode)
                        continue
                    callback = None
                else:
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)

            # The callback (or the warning) runs outside the lock.
            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
1219
1220
1221
class MultiLoopChildWatcher(AbstractChildWatcher):
    """A watcher that doesn't require running loop in the main thread.

    A process-wide SIGCHLD handler is installed through signal.signal()
    the first time attach_loop() is called, which may conflict with other
    code that installs its own handler for this signal.

    The solution is safe but it has a significant overhead when
    handling a big number of processes (*O(n)* each time a
    SIGCHLD is received).
    """

    # Implementation note:
    # The class stays compatible with the AbstractChildWatcher ABC.
    # To do so, attach_loop() does not store the loop, and
    # add_child_handler()/remove_child_handler() take no explicit loop
    # argument; the current loop is obtained with get_running_loop().

    def __init__(self):
        # pid -> (loop, callback, args)
        self._callbacks = {}
        # Handler that was in place before ours; None while not installed.
        self._saved_sighandler = None
        warnings._deprecated("MultiLoopChildWatcher",
                             "{name!r} is deprecated as of Python 3.12 and will be "
                             "removed in Python {remove}.",
                             remove=(3, 14))

    def is_active(self):
        """Return True once the SIGCHLD handler has been installed."""
        return self._saved_sighandler is not None

    def close(self):
        """Forget all callbacks and put back the saved SIGCHLD handler."""
        self._callbacks.clear()
        if self._saved_sighandler is None:
            return

        current = signal.getsignal(signal.SIGCHLD)
        if current != self._sig_chld:
            # Someone else replaced our handler; leave theirs in place.
            logger.warning("SIGCHLD handler was changed by outside code")
        else:
            signal.signal(signal.SIGCHLD, self._saved_sighandler)
        self._saved_sighandler = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Nothing to release when leaving the context.
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register *callback* for *pid*, bound to the running loop."""
        running_loop = events.get_running_loop()
        self._callbacks[pid] = (running_loop, callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Drop the handler for *pid*; return True if one was registered."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        else:
            return True

    def attach_loop(self, loop):
        """Install the SIGCHLD handler on first call; *loop* is not stored."""
        # Initialization is done here rather than in __init__ because
        # attach_loop() is called from the unix policy only for the main
        # thread, and the main thread is required to subscribe to SIGCHLD.
        if self._saved_sighandler is not None:
            return

        previous = signal.signal(signal.SIGCHLD, self._sig_chld)
        if previous is None:
            logger.warning("Previous SIGCHLD handler was set by non-Python code, "
                           "restore to default handler on watcher close.")
            previous = signal.SIG_DFL
        self._saved_sighandler = previous

        # Set SA_RESTART to limit EINTR occurrences.
        signal.siginterrupt(signal.SIGCHLD, False)

    def _do_waitpid_all(self):
        """Poll every registered pid once."""
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for watched_pid in tuple(self._callbacks):
            self._do_waitpid(watched_pid)

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* and schedule its callback on the owning loop."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid, returncode = expected_pid, 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
            debug_log = False
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = waitstatus_to_exitcode(status)
            debug_log = True

        try:
            loop, callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            logger.warning("Child watcher got an unexpected pid: %r",
                           pid, exc_info=True)
            return

        if loop.is_closed():
            logger.warning("Loop %r that handles pid %r is closed", loop, pid)
            return

        if debug_log and loop.get_debug():
            logger.debug('process %s exited with returncode %s',
                         expected_pid, returncode)
        loop.call_soon_threadsafe(callback, pid, returncode, *args)

    def _sig_chld(self, signum, frame):
        """Signal handler: reap children and log unexpected errors."""
        try:
            self._do_waitpid_all()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)
1348
1349
1350
class ThreadedChildWatcher(AbstractChildWatcher):
    """Threaded child watcher implementation.

    One thread is started per watched process and blocks in os.waitpid()
    until that process finishes.

    No POSIX signal subscription is needed, but creating a thread is not
    free.

    The watcher has O(1) complexity, its performance doesn't depend
    on amount of spawn processes.
    """

    def __init__(self):
        # Source of the numeric suffix used in worker thread names.
        self._pid_counter = itertools.count(0)
        # pid -> thread waiting on that pid
        self._threads = {}

    def is_active(self):
        """This watcher always reports itself as active."""
        return True

    def close(self):
        """Join the worker threads selected by _join_threads()."""
        self._join_threads()

    def _join_threads(self):
        """Internal: Join all non-daemon threads"""
        snapshot = list(self._threads.values())
        joinable = [worker for worker in snapshot
                    if worker.is_alive() and not worker.daemon]
        for worker in joinable:
            worker.join()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Nothing to release when leaving the context.
        pass

    def __del__(self, _warn=warnings.warn):
        # Emit a ResourceWarning if any worker thread is still waiting.
        snapshot = list(self._threads.values())
        if any(worker.is_alive() for worker in snapshot):
            _warn(f"{self.__class__} has registered but not finished child processes",
                  ResourceWarning,
                  source=self)

    def add_child_handler(self, pid, callback, *args):
        """Start a daemon thread that waits for *pid* and then runs *callback*."""
        loop = events.get_running_loop()
        worker = threading.Thread(target=self._do_waitpid,
                                  name=f"waitpid-{next(self._pid_counter)}",
                                  args=(loop, pid, callback, args),
                                  daemon=True)
        # Record the thread before starting it.
        self._threads[pid] = worker
        worker.start()

    def remove_child_handler(self, pid):
        """Report success without doing anything.

        asyncio never calls remove_child_handler(); the no-op exists only
        because the abstract base class requires the method.
        """
        return True

    def attach_loop(self, loop):
        # No loop is stored here; add_child_handler() looks up the
        # running loop itself.
        pass

    def _do_waitpid(self, loop, expected_pid, callback, args):
        """Thread target: block until *expected_pid* exits, then notify *loop*."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, 0)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid, returncode = expected_pid, 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            returncode = waitstatus_to_exitcode(status)
            if loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        if not loop.is_closed():
            # Hand the result over to the loop's own thread.
            loop.call_soon_threadsafe(callback, pid, returncode, *args)
        else:
            logger.warning("Loop %r that handles pid %r is closed", loop, pid)

        self._threads.pop(expected_pid)
1437
1438
def can_use_pidfd():
    """Return True if os.pidfd_open() exists and works in this process.

    The check opens a pidfd for the current process and closes it again.
    An OSError during that probe yields False.
    """
    pidfd_open = getattr(os, 'pidfd_open', None)
    if pidfd_open is None:
        return False

    try:
        os.close(pidfd_open(os.getpid(), 0))
    except OSError:
        # blocked by security policy like SECCOMP
        return False
    else:
        return True
1448
1449
1450
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""

    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Created lazily by _init_watcher() or set via set_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default child watcher if none has been set yet."""
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                # Use the pidfd-based watcher when it is usable,
                # otherwise fall back to the thread-based one.
                watcher_cls = (PidfdChildWatcher if can_use_pidfd()
                               else ThreadedChildWatcher)
                self._watcher = watcher_cls()
                if threading.current_thread() is threading.main_thread():
                    self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        Side effect: when a child watcher is already set and this is
        called from the main thread, .attach_loop(loop) is also called
        on that watcher.
        """

        super().set_event_loop(loop)

        watcher = self._watcher
        if watcher is None:
            return
        if threading.current_thread() is threading.main_thread():
            watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a watcher is created automatically: a
        PidfdChildWatcher when can_use_pidfd() reports support, otherwise
        a ThreadedChildWatcher.  A deprecation warning is emitted on
        every call.
        """
        if self._watcher is None:
            self._init_watcher()

        warnings._deprecated("get_child_watcher",
                             "{name!r} is deprecated as of Python 3.12 and will be "
                             "removed in Python {remove}.", remove=(3, 14))
        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes.

        Any previously set watcher is closed first.  A deprecation
        warning is emitted after the new watcher has been stored.
        """

        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        previous = self._watcher
        if previous is not None:
            previous.close()

        self._watcher = watcher
        warnings._deprecated("set_child_watcher",
                             "{name!r} is deprecated as of Python 3.12 and will be "
                             "removed in Python {remove}.", remove=(3, 14))
1507
1508
1509
# Public names (both listed in __all__) bound to the Unix implementations.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
1511
1512