Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
allendowney
GitHub Repository: allendowney/cpython
Path: blob/main/Lib/asyncio/selector_events.py
12 views
1
"""Event loop using a selector and related classes.
2
3
A selector is a "notify-when-ready" multiplexer. For a subclass which
4
also includes support for signal handling, see the unix_events sub-module.
5
"""
6
7
__all__ = 'BaseSelectorEventLoop',
8
9
import collections
10
import errno
11
import functools
12
import itertools
13
import os
14
import selectors
15
import socket
16
import warnings
17
import weakref
18
try:
19
import ssl
20
except ImportError: # pragma: no cover
21
ssl = None
22
23
from . import base_events
24
from . import constants
25
from . import events
26
from . import futures
27
from . import protocols
28
from . import sslproto
29
from . import transports
30
from . import trsock
31
from .log import logger
32
33
_HAS_SENDMSG = hasattr(socket.socket, 'sendmsg')
34
35
if _HAS_SENDMSG:
36
try:
37
SC_IOV_MAX = os.sysconf('SC_IOV_MAX')
38
except OSError:
39
# Fallback to send
40
_HAS_SENDMSG = False
41
42
def _test_selector_event(selector, fd, event):
43
# Test if the selector is monitoring 'event' events
44
# for the file descriptor 'fd'.
45
try:
46
key = selector.get_key(fd)
47
except KeyError:
48
return False
49
else:
50
return bool(key.events & event)
51
52
53
class BaseSelectorEventLoop(base_events.BaseEventLoop):
54
"""Selector event loop.
55
56
See events.EventLoop for API specification.
57
"""
58
59
def __init__(self, selector=None):
60
super().__init__()
61
62
if selector is None:
63
selector = selectors.DefaultSelector()
64
logger.debug('Using selector: %s', selector.__class__.__name__)
65
self._selector = selector
66
self._make_self_pipe()
67
self._transports = weakref.WeakValueDictionary()
68
69
def _make_socket_transport(self, sock, protocol, waiter=None, *,
70
extra=None, server=None):
71
self._ensure_fd_no_transport(sock)
72
return _SelectorSocketTransport(self, sock, protocol, waiter,
73
extra, server)
74
75
def _make_ssl_transport(
76
self, rawsock, protocol, sslcontext, waiter=None,
77
*, server_side=False, server_hostname=None,
78
extra=None, server=None,
79
ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
80
ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT,
81
):
82
self._ensure_fd_no_transport(rawsock)
83
ssl_protocol = sslproto.SSLProtocol(
84
self, protocol, sslcontext, waiter,
85
server_side, server_hostname,
86
ssl_handshake_timeout=ssl_handshake_timeout,
87
ssl_shutdown_timeout=ssl_shutdown_timeout
88
)
89
_SelectorSocketTransport(self, rawsock, ssl_protocol,
90
extra=extra, server=server)
91
return ssl_protocol._app_transport
92
93
def _make_datagram_transport(self, sock, protocol,
94
address=None, waiter=None, extra=None):
95
self._ensure_fd_no_transport(sock)
96
return _SelectorDatagramTransport(self, sock, protocol,
97
address, waiter, extra)
98
99
def close(self):
100
if self.is_running():
101
raise RuntimeError("Cannot close a running event loop")
102
if self.is_closed():
103
return
104
self._close_self_pipe()
105
super().close()
106
if self._selector is not None:
107
self._selector.close()
108
self._selector = None
109
110
def _close_self_pipe(self):
111
self._remove_reader(self._ssock.fileno())
112
self._ssock.close()
113
self._ssock = None
114
self._csock.close()
115
self._csock = None
116
self._internal_fds -= 1
117
118
def _make_self_pipe(self):
119
# A self-socket, really. :-)
120
self._ssock, self._csock = socket.socketpair()
121
self._ssock.setblocking(False)
122
self._csock.setblocking(False)
123
self._internal_fds += 1
124
self._add_reader(self._ssock.fileno(), self._read_from_self)
125
126
def _process_self_data(self, data):
127
pass
128
129
def _read_from_self(self):
130
while True:
131
try:
132
data = self._ssock.recv(4096)
133
if not data:
134
break
135
self._process_self_data(data)
136
except InterruptedError:
137
continue
138
except BlockingIOError:
139
break
140
141
def _write_to_self(self):
142
# This may be called from a different thread, possibly after
143
# _close_self_pipe() has been called or even while it is
144
# running. Guard for self._csock being None or closed. When
145
# a socket is closed, send() raises OSError (with errno set to
146
# EBADF, but let's not rely on the exact error code).
147
csock = self._csock
148
if csock is None:
149
return
150
151
try:
152
csock.send(b'\0')
153
except OSError:
154
if self._debug:
155
logger.debug("Fail to write a null byte into the "
156
"self-pipe socket",
157
exc_info=True)
158
159
def _start_serving(self, protocol_factory, sock,
160
sslcontext=None, server=None, backlog=100,
161
ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
162
ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
163
self._add_reader(sock.fileno(), self._accept_connection,
164
protocol_factory, sock, sslcontext, server, backlog,
165
ssl_handshake_timeout, ssl_shutdown_timeout)
166
167
def _accept_connection(
168
self, protocol_factory, sock,
169
sslcontext=None, server=None, backlog=100,
170
ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
171
ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
172
# This method is only called once for each event loop tick where the
173
# listening socket has triggered an EVENT_READ. There may be multiple
174
# connections waiting for an .accept() so it is called in a loop.
175
# See https://bugs.python.org/issue27906 for more details.
176
for _ in range(backlog):
177
try:
178
conn, addr = sock.accept()
179
if self._debug:
180
logger.debug("%r got a new connection from %r: %r",
181
server, addr, conn)
182
conn.setblocking(False)
183
except (BlockingIOError, InterruptedError, ConnectionAbortedError):
184
# Early exit because the socket accept buffer is empty.
185
return None
186
except OSError as exc:
187
# There's nowhere to send the error, so just log it.
188
if exc.errno in (errno.EMFILE, errno.ENFILE,
189
errno.ENOBUFS, errno.ENOMEM):
190
# Some platforms (e.g. Linux keep reporting the FD as
191
# ready, so we remove the read handler temporarily.
192
# We'll try again in a while.
193
self.call_exception_handler({
194
'message': 'socket.accept() out of system resource',
195
'exception': exc,
196
'socket': trsock.TransportSocket(sock),
197
})
198
self._remove_reader(sock.fileno())
199
self.call_later(constants.ACCEPT_RETRY_DELAY,
200
self._start_serving,
201
protocol_factory, sock, sslcontext, server,
202
backlog, ssl_handshake_timeout,
203
ssl_shutdown_timeout)
204
else:
205
raise # The event loop will catch, log and ignore it.
206
else:
207
extra = {'peername': addr}
208
accept = self._accept_connection2(
209
protocol_factory, conn, extra, sslcontext, server,
210
ssl_handshake_timeout, ssl_shutdown_timeout)
211
self.create_task(accept)
212
213
async def _accept_connection2(
214
self, protocol_factory, conn, extra,
215
sslcontext=None, server=None,
216
ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
217
ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
218
protocol = None
219
transport = None
220
try:
221
protocol = protocol_factory()
222
waiter = self.create_future()
223
if sslcontext:
224
transport = self._make_ssl_transport(
225
conn, protocol, sslcontext, waiter=waiter,
226
server_side=True, extra=extra, server=server,
227
ssl_handshake_timeout=ssl_handshake_timeout,
228
ssl_shutdown_timeout=ssl_shutdown_timeout)
229
else:
230
transport = self._make_socket_transport(
231
conn, protocol, waiter=waiter, extra=extra,
232
server=server)
233
234
try:
235
await waiter
236
except BaseException:
237
transport.close()
238
raise
239
# It's now up to the protocol to handle the connection.
240
241
except (SystemExit, KeyboardInterrupt):
242
raise
243
except BaseException as exc:
244
if self._debug:
245
context = {
246
'message':
247
'Error on transport creation for incoming connection',
248
'exception': exc,
249
}
250
if protocol is not None:
251
context['protocol'] = protocol
252
if transport is not None:
253
context['transport'] = transport
254
self.call_exception_handler(context)
255
256
def _ensure_fd_no_transport(self, fd):
257
fileno = fd
258
if not isinstance(fileno, int):
259
try:
260
fileno = int(fileno.fileno())
261
except (AttributeError, TypeError, ValueError):
262
# This code matches selectors._fileobj_to_fd function.
263
raise ValueError(f"Invalid file object: {fd!r}") from None
264
try:
265
transport = self._transports[fileno]
266
except KeyError:
267
pass
268
else:
269
if not transport.is_closing():
270
raise RuntimeError(
271
f'File descriptor {fd!r} is used by transport '
272
f'{transport!r}')
273
274
def _add_reader(self, fd, callback, *args):
275
self._check_closed()
276
handle = events.Handle(callback, args, self, None)
277
try:
278
key = self._selector.get_key(fd)
279
except KeyError:
280
self._selector.register(fd, selectors.EVENT_READ,
281
(handle, None))
282
else:
283
mask, (reader, writer) = key.events, key.data
284
self._selector.modify(fd, mask | selectors.EVENT_READ,
285
(handle, writer))
286
if reader is not None:
287
reader.cancel()
288
return handle
289
290
def _remove_reader(self, fd):
291
if self.is_closed():
292
return False
293
try:
294
key = self._selector.get_key(fd)
295
except KeyError:
296
return False
297
else:
298
mask, (reader, writer) = key.events, key.data
299
mask &= ~selectors.EVENT_READ
300
if not mask:
301
self._selector.unregister(fd)
302
else:
303
self._selector.modify(fd, mask, (None, writer))
304
305
if reader is not None:
306
reader.cancel()
307
return True
308
else:
309
return False
310
311
def _add_writer(self, fd, callback, *args):
312
self._check_closed()
313
handle = events.Handle(callback, args, self, None)
314
try:
315
key = self._selector.get_key(fd)
316
except KeyError:
317
self._selector.register(fd, selectors.EVENT_WRITE,
318
(None, handle))
319
else:
320
mask, (reader, writer) = key.events, key.data
321
self._selector.modify(fd, mask | selectors.EVENT_WRITE,
322
(reader, handle))
323
if writer is not None:
324
writer.cancel()
325
return handle
326
327
def _remove_writer(self, fd):
328
"""Remove a writer callback."""
329
if self.is_closed():
330
return False
331
try:
332
key = self._selector.get_key(fd)
333
except KeyError:
334
return False
335
else:
336
mask, (reader, writer) = key.events, key.data
337
# Remove both writer and connector.
338
mask &= ~selectors.EVENT_WRITE
339
if not mask:
340
self._selector.unregister(fd)
341
else:
342
self._selector.modify(fd, mask, (reader, None))
343
344
if writer is not None:
345
writer.cancel()
346
return True
347
else:
348
return False
349
350
def add_reader(self, fd, callback, *args):
351
"""Add a reader callback."""
352
self._ensure_fd_no_transport(fd)
353
self._add_reader(fd, callback, *args)
354
355
def remove_reader(self, fd):
356
"""Remove a reader callback."""
357
self._ensure_fd_no_transport(fd)
358
return self._remove_reader(fd)
359
360
def add_writer(self, fd, callback, *args):
361
"""Add a writer callback.."""
362
self._ensure_fd_no_transport(fd)
363
self._add_writer(fd, callback, *args)
364
365
def remove_writer(self, fd):
366
"""Remove a writer callback."""
367
self._ensure_fd_no_transport(fd)
368
return self._remove_writer(fd)
369
370
async def sock_recv(self, sock, n):
371
"""Receive data from the socket.
372
373
The return value is a bytes object representing the data received.
374
The maximum amount of data to be received at once is specified by
375
nbytes.
376
"""
377
base_events._check_ssl_socket(sock)
378
if self._debug and sock.gettimeout() != 0:
379
raise ValueError("the socket must be non-blocking")
380
try:
381
return sock.recv(n)
382
except (BlockingIOError, InterruptedError):
383
pass
384
fut = self.create_future()
385
fd = sock.fileno()
386
self._ensure_fd_no_transport(fd)
387
handle = self._add_reader(fd, self._sock_recv, fut, sock, n)
388
fut.add_done_callback(
389
functools.partial(self._sock_read_done, fd, handle=handle))
390
return await fut
391
392
def _sock_read_done(self, fd, fut, handle=None):
393
if handle is None or not handle.cancelled():
394
self.remove_reader(fd)
395
396
def _sock_recv(self, fut, sock, n):
397
# _sock_recv() can add itself as an I/O callback if the operation can't
398
# be done immediately. Don't use it directly, call sock_recv().
399
if fut.done():
400
return
401
try:
402
data = sock.recv(n)
403
except (BlockingIOError, InterruptedError):
404
return # try again next time
405
except (SystemExit, KeyboardInterrupt):
406
raise
407
except BaseException as exc:
408
fut.set_exception(exc)
409
else:
410
fut.set_result(data)
411
412
async def sock_recv_into(self, sock, buf):
413
"""Receive data from the socket.
414
415
The received data is written into *buf* (a writable buffer).
416
The return value is the number of bytes written.
417
"""
418
base_events._check_ssl_socket(sock)
419
if self._debug and sock.gettimeout() != 0:
420
raise ValueError("the socket must be non-blocking")
421
try:
422
return sock.recv_into(buf)
423
except (BlockingIOError, InterruptedError):
424
pass
425
fut = self.create_future()
426
fd = sock.fileno()
427
self._ensure_fd_no_transport(fd)
428
handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf)
429
fut.add_done_callback(
430
functools.partial(self._sock_read_done, fd, handle=handle))
431
return await fut
432
433
def _sock_recv_into(self, fut, sock, buf):
434
# _sock_recv_into() can add itself as an I/O callback if the operation
435
# can't be done immediately. Don't use it directly, call
436
# sock_recv_into().
437
if fut.done():
438
return
439
try:
440
nbytes = sock.recv_into(buf)
441
except (BlockingIOError, InterruptedError):
442
return # try again next time
443
except (SystemExit, KeyboardInterrupt):
444
raise
445
except BaseException as exc:
446
fut.set_exception(exc)
447
else:
448
fut.set_result(nbytes)
449
450
async def sock_recvfrom(self, sock, bufsize):
451
"""Receive a datagram from a datagram socket.
452
453
The return value is a tuple of (bytes, address) representing the
454
datagram received and the address it came from.
455
The maximum amount of data to be received at once is specified by
456
nbytes.
457
"""
458
base_events._check_ssl_socket(sock)
459
if self._debug and sock.gettimeout() != 0:
460
raise ValueError("the socket must be non-blocking")
461
try:
462
return sock.recvfrom(bufsize)
463
except (BlockingIOError, InterruptedError):
464
pass
465
fut = self.create_future()
466
fd = sock.fileno()
467
self._ensure_fd_no_transport(fd)
468
handle = self._add_reader(fd, self._sock_recvfrom, fut, sock, bufsize)
469
fut.add_done_callback(
470
functools.partial(self._sock_read_done, fd, handle=handle))
471
return await fut
472
473
def _sock_recvfrom(self, fut, sock, bufsize):
474
# _sock_recvfrom() can add itself as an I/O callback if the operation
475
# can't be done immediately. Don't use it directly, call
476
# sock_recvfrom().
477
if fut.done():
478
return
479
try:
480
result = sock.recvfrom(bufsize)
481
except (BlockingIOError, InterruptedError):
482
return # try again next time
483
except (SystemExit, KeyboardInterrupt):
484
raise
485
except BaseException as exc:
486
fut.set_exception(exc)
487
else:
488
fut.set_result(result)
489
490
async def sock_recvfrom_into(self, sock, buf, nbytes=0):
491
"""Receive data from the socket.
492
493
The received data is written into *buf* (a writable buffer).
494
The return value is a tuple of (number of bytes written, address).
495
"""
496
base_events._check_ssl_socket(sock)
497
if self._debug and sock.gettimeout() != 0:
498
raise ValueError("the socket must be non-blocking")
499
if not nbytes:
500
nbytes = len(buf)
501
502
try:
503
return sock.recvfrom_into(buf, nbytes)
504
except (BlockingIOError, InterruptedError):
505
pass
506
fut = self.create_future()
507
fd = sock.fileno()
508
self._ensure_fd_no_transport(fd)
509
handle = self._add_reader(fd, self._sock_recvfrom_into, fut, sock, buf,
510
nbytes)
511
fut.add_done_callback(
512
functools.partial(self._sock_read_done, fd, handle=handle))
513
return await fut
514
515
def _sock_recvfrom_into(self, fut, sock, buf, bufsize):
516
# _sock_recv_into() can add itself as an I/O callback if the operation
517
# can't be done immediately. Don't use it directly, call
518
# sock_recv_into().
519
if fut.done():
520
return
521
try:
522
result = sock.recvfrom_into(buf, bufsize)
523
except (BlockingIOError, InterruptedError):
524
return # try again next time
525
except (SystemExit, KeyboardInterrupt):
526
raise
527
except BaseException as exc:
528
fut.set_exception(exc)
529
else:
530
fut.set_result(result)
531
532
async def sock_sendall(self, sock, data):
533
"""Send data to the socket.
534
535
The socket must be connected to a remote socket. This method continues
536
to send data from data until either all data has been sent or an
537
error occurs. None is returned on success. On error, an exception is
538
raised, and there is no way to determine how much data, if any, was
539
successfully processed by the receiving end of the connection.
540
"""
541
base_events._check_ssl_socket(sock)
542
if self._debug and sock.gettimeout() != 0:
543
raise ValueError("the socket must be non-blocking")
544
try:
545
n = sock.send(data)
546
except (BlockingIOError, InterruptedError):
547
n = 0
548
549
if n == len(data):
550
# all data sent
551
return
552
553
fut = self.create_future()
554
fd = sock.fileno()
555
self._ensure_fd_no_transport(fd)
556
# use a trick with a list in closure to store a mutable state
557
handle = self._add_writer(fd, self._sock_sendall, fut, sock,
558
memoryview(data), [n])
559
fut.add_done_callback(
560
functools.partial(self._sock_write_done, fd, handle=handle))
561
return await fut
562
563
def _sock_sendall(self, fut, sock, view, pos):
564
if fut.done():
565
# Future cancellation can be scheduled on previous loop iteration
566
return
567
start = pos[0]
568
try:
569
n = sock.send(view[start:])
570
except (BlockingIOError, InterruptedError):
571
return
572
except (SystemExit, KeyboardInterrupt):
573
raise
574
except BaseException as exc:
575
fut.set_exception(exc)
576
return
577
578
start += n
579
580
if start == len(view):
581
fut.set_result(None)
582
else:
583
pos[0] = start
584
585
async def sock_sendto(self, sock, data, address):
586
"""Send data to the socket.
587
588
The socket must be connected to a remote socket. This method continues
589
to send data from data until either all data has been sent or an
590
error occurs. None is returned on success. On error, an exception is
591
raised, and there is no way to determine how much data, if any, was
592
successfully processed by the receiving end of the connection.
593
"""
594
base_events._check_ssl_socket(sock)
595
if self._debug and sock.gettimeout() != 0:
596
raise ValueError("the socket must be non-blocking")
597
try:
598
return sock.sendto(data, address)
599
except (BlockingIOError, InterruptedError):
600
pass
601
602
fut = self.create_future()
603
fd = sock.fileno()
604
self._ensure_fd_no_transport(fd)
605
# use a trick with a list in closure to store a mutable state
606
handle = self._add_writer(fd, self._sock_sendto, fut, sock, data,
607
address)
608
fut.add_done_callback(
609
functools.partial(self._sock_write_done, fd, handle=handle))
610
return await fut
611
612
def _sock_sendto(self, fut, sock, data, address):
613
if fut.done():
614
# Future cancellation can be scheduled on previous loop iteration
615
return
616
try:
617
n = sock.sendto(data, 0, address)
618
except (BlockingIOError, InterruptedError):
619
return
620
except (SystemExit, KeyboardInterrupt):
621
raise
622
except BaseException as exc:
623
fut.set_exception(exc)
624
else:
625
fut.set_result(n)
626
627
async def sock_connect(self, sock, address):
628
"""Connect to a remote socket at address.
629
630
This method is a coroutine.
631
"""
632
base_events._check_ssl_socket(sock)
633
if self._debug and sock.gettimeout() != 0:
634
raise ValueError("the socket must be non-blocking")
635
636
if sock.family == socket.AF_INET or (
637
base_events._HAS_IPv6 and sock.family == socket.AF_INET6):
638
resolved = await self._ensure_resolved(
639
address, family=sock.family, type=sock.type, proto=sock.proto,
640
loop=self,
641
)
642
_, _, _, _, address = resolved[0]
643
644
fut = self.create_future()
645
self._sock_connect(fut, sock, address)
646
try:
647
return await fut
648
finally:
649
# Needed to break cycles when an exception occurs.
650
fut = None
651
652
def _sock_connect(self, fut, sock, address):
653
fd = sock.fileno()
654
try:
655
sock.connect(address)
656
except (BlockingIOError, InterruptedError):
657
# Issue #23618: When the C function connect() fails with EINTR, the
658
# connection runs in background. We have to wait until the socket
659
# becomes writable to be notified when the connection succeed or
660
# fails.
661
self._ensure_fd_no_transport(fd)
662
handle = self._add_writer(
663
fd, self._sock_connect_cb, fut, sock, address)
664
fut.add_done_callback(
665
functools.partial(self._sock_write_done, fd, handle=handle))
666
except (SystemExit, KeyboardInterrupt):
667
raise
668
except BaseException as exc:
669
fut.set_exception(exc)
670
else:
671
fut.set_result(None)
672
finally:
673
fut = None
674
675
def _sock_write_done(self, fd, fut, handle=None):
676
if handle is None or not handle.cancelled():
677
self.remove_writer(fd)
678
679
def _sock_connect_cb(self, fut, sock, address):
680
if fut.done():
681
return
682
683
try:
684
err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
685
if err != 0:
686
# Jump to any except clause below.
687
raise OSError(err, f'Connect call failed {address}')
688
except (BlockingIOError, InterruptedError):
689
# socket is still registered, the callback will be retried later
690
pass
691
except (SystemExit, KeyboardInterrupt):
692
raise
693
except BaseException as exc:
694
fut.set_exception(exc)
695
else:
696
fut.set_result(None)
697
finally:
698
fut = None
699
700
async def sock_accept(self, sock):
701
"""Accept a connection.
702
703
The socket must be bound to an address and listening for connections.
704
The return value is a pair (conn, address) where conn is a new socket
705
object usable to send and receive data on the connection, and address
706
is the address bound to the socket on the other end of the connection.
707
"""
708
base_events._check_ssl_socket(sock)
709
if self._debug and sock.gettimeout() != 0:
710
raise ValueError("the socket must be non-blocking")
711
fut = self.create_future()
712
self._sock_accept(fut, sock)
713
return await fut
714
715
def _sock_accept(self, fut, sock):
716
fd = sock.fileno()
717
try:
718
conn, address = sock.accept()
719
conn.setblocking(False)
720
except (BlockingIOError, InterruptedError):
721
self._ensure_fd_no_transport(fd)
722
handle = self._add_reader(fd, self._sock_accept, fut, sock)
723
fut.add_done_callback(
724
functools.partial(self._sock_read_done, fd, handle=handle))
725
except (SystemExit, KeyboardInterrupt):
726
raise
727
except BaseException as exc:
728
fut.set_exception(exc)
729
else:
730
fut.set_result((conn, address))
731
732
async def _sendfile_native(self, transp, file, offset, count):
733
del self._transports[transp._sock_fd]
734
resume_reading = transp.is_reading()
735
transp.pause_reading()
736
await transp._make_empty_waiter()
737
try:
738
return await self.sock_sendfile(transp._sock, file, offset, count,
739
fallback=False)
740
finally:
741
transp._reset_empty_waiter()
742
if resume_reading:
743
transp.resume_reading()
744
self._transports[transp._sock_fd] = transp
745
746
def _process_events(self, event_list):
747
for key, mask in event_list:
748
fileobj, (reader, writer) = key.fileobj, key.data
749
if mask & selectors.EVENT_READ and reader is not None:
750
if reader._cancelled:
751
self._remove_reader(fileobj)
752
else:
753
self._add_callback(reader)
754
if mask & selectors.EVENT_WRITE and writer is not None:
755
if writer._cancelled:
756
self._remove_writer(fileobj)
757
else:
758
self._add_callback(writer)
759
760
def _stop_serving(self, sock):
761
self._remove_reader(sock.fileno())
762
sock.close()
763
764
765
class _SelectorTransport(transports._FlowControlMixin,
766
transports.Transport):
767
768
max_size = 256 * 1024 # Buffer size passed to recv().
769
770
# Attribute used in the destructor: it must be set even if the constructor
771
# is not called (see _SelectorSslTransport which may start by raising an
772
# exception)
773
_sock = None
774
775
def __init__(self, loop, sock, protocol, extra=None, server=None):
776
super().__init__(extra, loop)
777
self._extra['socket'] = trsock.TransportSocket(sock)
778
try:
779
self._extra['sockname'] = sock.getsockname()
780
except OSError:
781
self._extra['sockname'] = None
782
if 'peername' not in self._extra:
783
try:
784
self._extra['peername'] = sock.getpeername()
785
except socket.error:
786
self._extra['peername'] = None
787
self._sock = sock
788
self._sock_fd = sock.fileno()
789
790
self._protocol_connected = False
791
self.set_protocol(protocol)
792
793
self._server = server
794
self._buffer = collections.deque()
795
self._conn_lost = 0 # Set when call to connection_lost scheduled.
796
self._closing = False # Set when close() called.
797
self._paused = False # Set when pause_reading() called
798
799
if self._server is not None:
800
self._server._attach()
801
loop._transports[self._sock_fd] = self
802
803
def __repr__(self):
804
info = [self.__class__.__name__]
805
if self._sock is None:
806
info.append('closed')
807
elif self._closing:
808
info.append('closing')
809
info.append(f'fd={self._sock_fd}')
810
# test if the transport was closed
811
if self._loop is not None and not self._loop.is_closed():
812
polling = _test_selector_event(self._loop._selector,
813
self._sock_fd, selectors.EVENT_READ)
814
if polling:
815
info.append('read=polling')
816
else:
817
info.append('read=idle')
818
819
polling = _test_selector_event(self._loop._selector,
820
self._sock_fd,
821
selectors.EVENT_WRITE)
822
if polling:
823
state = 'polling'
824
else:
825
state = 'idle'
826
827
bufsize = self.get_write_buffer_size()
828
info.append(f'write=<{state}, bufsize={bufsize}>')
829
return '<{}>'.format(' '.join(info))
830
831
def abort(self):
832
self._force_close(None)
833
834
def set_protocol(self, protocol):
835
self._protocol = protocol
836
self._protocol_connected = True
837
838
def get_protocol(self):
839
return self._protocol
840
841
def is_closing(self):
842
return self._closing
843
844
def is_reading(self):
845
return not self.is_closing() and not self._paused
846
847
def pause_reading(self):
848
if not self.is_reading():
849
return
850
self._paused = True
851
self._loop._remove_reader(self._sock_fd)
852
if self._loop.get_debug():
853
logger.debug("%r pauses reading", self)
854
855
def resume_reading(self):
856
if self._closing or not self._paused:
857
return
858
self._paused = False
859
self._add_reader(self._sock_fd, self._read_ready)
860
if self._loop.get_debug():
861
logger.debug("%r resumes reading", self)
862
863
def close(self):
864
if self._closing:
865
return
866
self._closing = True
867
self._loop._remove_reader(self._sock_fd)
868
if not self._buffer:
869
self._conn_lost += 1
870
self._loop._remove_writer(self._sock_fd)
871
self._loop.call_soon(self._call_connection_lost, None)
872
873
def __del__(self, _warn=warnings.warn):
874
if self._sock is not None:
875
_warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
876
self._sock.close()
877
878
def _fatal_error(self, exc, message='Fatal error on transport'):
879
# Should be called from exception handler only.
880
if isinstance(exc, OSError):
881
if self._loop.get_debug():
882
logger.debug("%r: %s", self, message, exc_info=True)
883
else:
884
self._loop.call_exception_handler({
885
'message': message,
886
'exception': exc,
887
'transport': self,
888
'protocol': self._protocol,
889
})
890
self._force_close(exc)
891
892
def _force_close(self, exc):
893
if self._conn_lost:
894
return
895
if self._buffer:
896
self._buffer.clear()
897
self._loop._remove_writer(self._sock_fd)
898
if not self._closing:
899
self._closing = True
900
self._loop._remove_reader(self._sock_fd)
901
self._conn_lost += 1
902
self._loop.call_soon(self._call_connection_lost, exc)
903
904
def _call_connection_lost(self, exc):
905
try:
906
if self._protocol_connected:
907
self._protocol.connection_lost(exc)
908
finally:
909
self._sock.close()
910
self._sock = None
911
self._protocol = None
912
self._loop = None
913
server = self._server
914
if server is not None:
915
server._detach()
916
self._server = None
917
918
def get_write_buffer_size(self):
919
return sum(map(len, self._buffer))
920
921
def _add_reader(self, fd, callback, *args):
922
if not self.is_reading():
923
return
924
self._loop._add_reader(fd, callback, *args)
925
926
927
class _SelectorSocketTransport(_SelectorTransport):
928
929
_start_tls_compatible = True
930
_sendfile_compatible = constants._SendfileMode.TRY_NATIVE
931
932
def __init__(self, loop, sock, protocol, waiter=None,
933
extra=None, server=None):
934
935
self._read_ready_cb = None
936
super().__init__(loop, sock, protocol, extra, server)
937
self._eof = False
938
self._empty_waiter = None
939
if _HAS_SENDMSG:
940
self._write_ready = self._write_sendmsg
941
else:
942
self._write_ready = self._write_send
943
# Disable the Nagle algorithm -- small writes will be
944
# sent without waiting for the TCP ACK. This generally
945
# decreases the latency (in some cases significantly.)
946
base_events._set_nodelay(self._sock)
947
948
self._loop.call_soon(self._protocol.connection_made, self)
949
# only start reading when connection_made() has been called
950
self._loop.call_soon(self._add_reader,
951
self._sock_fd, self._read_ready)
952
if waiter is not None:
953
# only wake up the waiter when connection_made() has been called
954
self._loop.call_soon(futures._set_result_unless_cancelled,
955
waiter, None)
956
957
def set_protocol(self, protocol):
958
if isinstance(protocol, protocols.BufferedProtocol):
959
self._read_ready_cb = self._read_ready__get_buffer
960
else:
961
self._read_ready_cb = self._read_ready__data_received
962
963
super().set_protocol(protocol)
964
965
def _read_ready(self):
966
self._read_ready_cb()
967
968
def _read_ready__get_buffer(self):
969
if self._conn_lost:
970
return
971
972
try:
973
buf = self._protocol.get_buffer(-1)
974
if not len(buf):
975
raise RuntimeError('get_buffer() returned an empty buffer')
976
except (SystemExit, KeyboardInterrupt):
977
raise
978
except BaseException as exc:
979
self._fatal_error(
980
exc, 'Fatal error: protocol.get_buffer() call failed.')
981
return
982
983
try:
984
nbytes = self._sock.recv_into(buf)
985
except (BlockingIOError, InterruptedError):
986
return
987
except (SystemExit, KeyboardInterrupt):
988
raise
989
except BaseException as exc:
990
self._fatal_error(exc, 'Fatal read error on socket transport')
991
return
992
993
if not nbytes:
994
self._read_ready__on_eof()
995
return
996
997
try:
998
self._protocol.buffer_updated(nbytes)
999
except (SystemExit, KeyboardInterrupt):
1000
raise
1001
except BaseException as exc:
1002
self._fatal_error(
1003
exc, 'Fatal error: protocol.buffer_updated() call failed.')
1004
1005
def _read_ready__data_received(self):
1006
if self._conn_lost:
1007
return
1008
try:
1009
data = self._sock.recv(self.max_size)
1010
except (BlockingIOError, InterruptedError):
1011
return
1012
except (SystemExit, KeyboardInterrupt):
1013
raise
1014
except BaseException as exc:
1015
self._fatal_error(exc, 'Fatal read error on socket transport')
1016
return
1017
1018
if not data:
1019
self._read_ready__on_eof()
1020
return
1021
1022
try:
1023
self._protocol.data_received(data)
1024
except (SystemExit, KeyboardInterrupt):
1025
raise
1026
except BaseException as exc:
1027
self._fatal_error(
1028
exc, 'Fatal error: protocol.data_received() call failed.')
1029
1030
def _read_ready__on_eof(self):
1031
if self._loop.get_debug():
1032
logger.debug("%r received EOF", self)
1033
1034
try:
1035
keep_open = self._protocol.eof_received()
1036
except (SystemExit, KeyboardInterrupt):
1037
raise
1038
except BaseException as exc:
1039
self._fatal_error(
1040
exc, 'Fatal error: protocol.eof_received() call failed.')
1041
return
1042
1043
if keep_open:
1044
# We're keeping the connection open so the
1045
# protocol can write more, but we still can't
1046
# receive more, so remove the reader callback.
1047
self._loop._remove_reader(self._sock_fd)
1048
else:
1049
self.close()
1050
1051
def write(self, data):
1052
if not isinstance(data, (bytes, bytearray, memoryview)):
1053
raise TypeError(f'data argument must be a bytes-like object, '
1054
f'not {type(data).__name__!r}')
1055
if self._eof:
1056
raise RuntimeError('Cannot call write() after write_eof()')
1057
if self._empty_waiter is not None:
1058
raise RuntimeError('unable to write; sendfile is in progress')
1059
if not data:
1060
return
1061
1062
if self._conn_lost:
1063
if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
1064
logger.warning('socket.send() raised exception.')
1065
self._conn_lost += 1
1066
return
1067
1068
if not self._buffer:
1069
# Optimization: try to send now.
1070
try:
1071
n = self._sock.send(data)
1072
except (BlockingIOError, InterruptedError):
1073
pass
1074
except (SystemExit, KeyboardInterrupt):
1075
raise
1076
except BaseException as exc:
1077
self._fatal_error(exc, 'Fatal write error on socket transport')
1078
return
1079
else:
1080
data = memoryview(data)[n:]
1081
if not data:
1082
return
1083
# Not all was written; register write handler.
1084
self._loop._add_writer(self._sock_fd, self._write_ready)
1085
1086
# Add it to the buffer.
1087
self._buffer.append(data)
1088
self._maybe_pause_protocol()
1089
1090
def _get_sendmsg_buffer(self):
1091
return itertools.islice(self._buffer, SC_IOV_MAX)
1092
1093
def _write_sendmsg(self):
1094
assert self._buffer, 'Data should not be empty'
1095
if self._conn_lost:
1096
return
1097
try:
1098
nbytes = self._sock.sendmsg(self._get_sendmsg_buffer())
1099
self._adjust_leftover_buffer(nbytes)
1100
except (BlockingIOError, InterruptedError):
1101
pass
1102
except (SystemExit, KeyboardInterrupt):
1103
raise
1104
except BaseException as exc:
1105
self._loop._remove_writer(self._sock_fd)
1106
self._buffer.clear()
1107
self._fatal_error(exc, 'Fatal write error on socket transport')
1108
if self._empty_waiter is not None:
1109
self._empty_waiter.set_exception(exc)
1110
else:
1111
self._maybe_resume_protocol() # May append to buffer.
1112
if not self._buffer:
1113
self._loop._remove_writer(self._sock_fd)
1114
if self._empty_waiter is not None:
1115
self._empty_waiter.set_result(None)
1116
if self._closing:
1117
self._call_connection_lost(None)
1118
elif self._eof:
1119
self._sock.shutdown(socket.SHUT_WR)
1120
1121
def _adjust_leftover_buffer(self, nbytes: int) -> None:
1122
buffer = self._buffer
1123
while nbytes:
1124
b = buffer.popleft()
1125
b_len = len(b)
1126
if b_len <= nbytes:
1127
nbytes -= b_len
1128
else:
1129
buffer.appendleft(b[nbytes:])
1130
break
1131
1132
def _write_send(self):
1133
assert self._buffer, 'Data should not be empty'
1134
if self._conn_lost:
1135
return
1136
try:
1137
buffer = self._buffer.popleft()
1138
n = self._sock.send(buffer)
1139
if n != len(buffer):
1140
# Not all data was written
1141
self._buffer.appendleft(buffer[n:])
1142
except (BlockingIOError, InterruptedError):
1143
pass
1144
except (SystemExit, KeyboardInterrupt):
1145
raise
1146
except BaseException as exc:
1147
self._loop._remove_writer(self._sock_fd)
1148
self._buffer.clear()
1149
self._fatal_error(exc, 'Fatal write error on socket transport')
1150
if self._empty_waiter is not None:
1151
self._empty_waiter.set_exception(exc)
1152
else:
1153
self._maybe_resume_protocol() # May append to buffer.
1154
if not self._buffer:
1155
self._loop._remove_writer(self._sock_fd)
1156
if self._empty_waiter is not None:
1157
self._empty_waiter.set_result(None)
1158
if self._closing:
1159
self._call_connection_lost(None)
1160
elif self._eof:
1161
self._sock.shutdown(socket.SHUT_WR)
1162
1163
def write_eof(self):
1164
if self._closing or self._eof:
1165
return
1166
self._eof = True
1167
if not self._buffer:
1168
self._sock.shutdown(socket.SHUT_WR)
1169
1170
def writelines(self, list_of_data):
1171
if self._eof:
1172
raise RuntimeError('Cannot call writelines() after write_eof()')
1173
if self._empty_waiter is not None:
1174
raise RuntimeError('unable to writelines; sendfile is in progress')
1175
if not list_of_data:
1176
return
1177
self._buffer.extend([memoryview(data) for data in list_of_data])
1178
self._write_ready()
1179
# If the entire buffer couldn't be written, register a write handler
1180
if self._buffer:
1181
self._loop._add_writer(self._sock_fd, self._write_ready)
1182
1183
def can_write_eof(self):
1184
return True
1185
1186
def _call_connection_lost(self, exc):
1187
super()._call_connection_lost(exc)
1188
if self._empty_waiter is not None:
1189
self._empty_waiter.set_exception(
1190
ConnectionError("Connection is closed by peer"))
1191
1192
def _make_empty_waiter(self):
1193
if self._empty_waiter is not None:
1194
raise RuntimeError("Empty waiter is already set")
1195
self._empty_waiter = self._loop.create_future()
1196
if not self._buffer:
1197
self._empty_waiter.set_result(None)
1198
return self._empty_waiter
1199
1200
def _reset_empty_waiter(self):
1201
self._empty_waiter = None
1202
1203
def close(self):
1204
self._read_ready_cb = None
1205
super().close()
1206
1207
1208
class _SelectorDatagramTransport(_SelectorTransport, transports.DatagramTransport):
1209
1210
_buffer_factory = collections.deque
1211
1212
def __init__(self, loop, sock, protocol, address=None,
1213
waiter=None, extra=None):
1214
super().__init__(loop, sock, protocol, extra)
1215
self._address = address
1216
self._buffer_size = 0
1217
self._loop.call_soon(self._protocol.connection_made, self)
1218
# only start reading when connection_made() has been called
1219
self._loop.call_soon(self._add_reader,
1220
self._sock_fd, self._read_ready)
1221
if waiter is not None:
1222
# only wake up the waiter when connection_made() has been called
1223
self._loop.call_soon(futures._set_result_unless_cancelled,
1224
waiter, None)
1225
1226
def get_write_buffer_size(self):
1227
return self._buffer_size
1228
1229
def _read_ready(self):
1230
if self._conn_lost:
1231
return
1232
try:
1233
data, addr = self._sock.recvfrom(self.max_size)
1234
except (BlockingIOError, InterruptedError):
1235
pass
1236
except OSError as exc:
1237
self._protocol.error_received(exc)
1238
except (SystemExit, KeyboardInterrupt):
1239
raise
1240
except BaseException as exc:
1241
self._fatal_error(exc, 'Fatal read error on datagram transport')
1242
else:
1243
self._protocol.datagram_received(data, addr)
1244
1245
def sendto(self, data, addr=None):
1246
if not isinstance(data, (bytes, bytearray, memoryview)):
1247
raise TypeError(f'data argument must be a bytes-like object, '
1248
f'not {type(data).__name__!r}')
1249
if not data:
1250
return
1251
1252
if self._address:
1253
if addr not in (None, self._address):
1254
raise ValueError(
1255
f'Invalid address: must be None or {self._address}')
1256
addr = self._address
1257
1258
if self._conn_lost and self._address:
1259
if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
1260
logger.warning('socket.send() raised exception.')
1261
self._conn_lost += 1
1262
return
1263
1264
if not self._buffer:
1265
# Attempt to send it right away first.
1266
try:
1267
if self._extra['peername']:
1268
self._sock.send(data)
1269
else:
1270
self._sock.sendto(data, addr)
1271
return
1272
except (BlockingIOError, InterruptedError):
1273
self._loop._add_writer(self._sock_fd, self._sendto_ready)
1274
except OSError as exc:
1275
self._protocol.error_received(exc)
1276
return
1277
except (SystemExit, KeyboardInterrupt):
1278
raise
1279
except BaseException as exc:
1280
self._fatal_error(
1281
exc, 'Fatal write error on datagram transport')
1282
return
1283
1284
# Ensure that what we buffer is immutable.
1285
self._buffer.append((bytes(data), addr))
1286
self._buffer_size += len(data)
1287
self._maybe_pause_protocol()
1288
1289
def _sendto_ready(self):
1290
while self._buffer:
1291
data, addr = self._buffer.popleft()
1292
self._buffer_size -= len(data)
1293
try:
1294
if self._extra['peername']:
1295
self._sock.send(data)
1296
else:
1297
self._sock.sendto(data, addr)
1298
except (BlockingIOError, InterruptedError):
1299
self._buffer.appendleft((data, addr)) # Try again later.
1300
self._buffer_size += len(data)
1301
break
1302
except OSError as exc:
1303
self._protocol.error_received(exc)
1304
return
1305
except (SystemExit, KeyboardInterrupt):
1306
raise
1307
except BaseException as exc:
1308
self._fatal_error(
1309
exc, 'Fatal write error on datagram transport')
1310
return
1311
1312
self._maybe_resume_protocol() # May append to buffer.
1313
if not self._buffer:
1314
self._loop._remove_writer(self._sock_fd)
1315
if self._closing:
1316
self._call_connection_lost(None)
1317
1318