Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
allendowney
GitHub Repository: allendowney/cpython
Path: blob/main/Lib/asyncio/base_events.py
12 views
1
"""Base implementation of event loop.
2
3
The event loop can be broken up into a multiplexer (the part
4
responsible for notifying us of I/O events) and the event loop proper,
5
which wraps a multiplexer with functionality for scheduling callbacks,
6
immediately or at a given time in the future.
7
8
Whenever a public API takes a callback, subsequent positional
9
arguments will be passed to the callback if/when it is called. This
10
avoids the proliferation of trivial lambdas implementing closures.
11
Keyword arguments for the callback are not supported; this is a
12
conscious design decision, leaving the door open for keyword arguments
13
to modify the meaning of the API call itself.
14
"""
15
16
import collections
17
import collections.abc
18
import concurrent.futures
19
import functools
20
import heapq
21
import itertools
22
import os
23
import socket
24
import stat
25
import subprocess
26
import threading
27
import time
28
import traceback
29
import sys
30
import warnings
31
import weakref
32
33
try:
34
import ssl
35
except ImportError: # pragma: no cover
36
ssl = None
37
38
from . import constants
39
from . import coroutines
40
from . import events
41
from . import exceptions
42
from . import futures
43
from . import protocols
44
from . import sslproto
45
from . import staggered
46
from . import tasks
47
from . import transports
48
from . import trsock
49
from .log import logger
50
51
52
__all__ = 'BaseEventLoop','Server',
53
54
55
# Minimum number of _scheduled timer handles before cleanup of
56
# cancelled handles is performed.
57
_MIN_SCHEDULED_TIMER_HANDLES = 100
58
59
# Minimum fraction of _scheduled timer handles that are cancelled
60
# before cleanup of cancelled handles is performed.
61
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
62
63
64
_HAS_IPv6 = hasattr(socket, 'AF_INET6')
65
66
# Maximum timeout passed to select to avoid OS limitations
67
MAXIMUM_SELECT_TIMEOUT = 24 * 3600
68
69
70
def _format_handle(handle):
    """Return a human-readable description of *handle* for log messages."""
    callback = handle._callback
    owner = getattr(callback, '__self__', None)
    if isinstance(owner, tasks.Task):
        # The callback is a bound method of a Task: showing the task itself
        # is far more informative than the bare handle.
        return repr(owner)
    return str(handle)
77
78
79
def _format_pipe(fd):
80
if fd == subprocess.PIPE:
81
return '<pipe>'
82
elif fd == subprocess.STDOUT:
83
return '<stdout>'
84
else:
85
return repr(fd)
86
87
88
def _set_reuseport(sock):
89
if not hasattr(socket, 'SO_REUSEPORT'):
90
raise ValueError('reuse_port not supported by socket module')
91
else:
92
try:
93
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
94
except OSError:
95
raise ValueError('reuse_port not supported by socket module, '
96
'SO_REUSEPORT defined but not implemented.')
97
98
99
def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
    """Return a getaddrinfo()-style 5-tuple if *host* is an IP literal.

    Users may have done name resolution themselves and pass in resolved
    IPs; in that case the expensive getaddrinfo() call can be skipped.
    Returns None whenever the inputs cannot be validated locally.
    """
    # Without inet_pton() we cannot validate literals at all.
    if not hasattr(socket, 'inet_pton'):
        return

    if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
            host is None:
        return None

    if type == socket.SOCK_STREAM:
        proto = socket.IPPROTO_TCP
    elif type == socket.SOCK_DGRAM:
        proto = socket.IPPROTO_UDP
    else:
        return None

    # Normalize the port: None/empty means 0; service names like "http"
    # must still go through getaddrinfo().
    if port is None or (isinstance(port, (bytes, str)) and not port):
        port = 0
    else:
        try:
            port = int(port)
        except (TypeError, ValueError):
            return None

    if family == socket.AF_UNSPEC:
        candidates = [socket.AF_INET]
        if _HAS_IPv6:
            candidates.append(socket.AF_INET6)
    else:
        candidates = [family]

    if isinstance(host, bytes):
        host = host.decode('idna')
    if '%' in host:
        # Linux's inet_pton doesn't accept an IPv6 zone index after host,
        # like '::1%lo0'.
        return None

    for af in candidates:
        try:
            socket.inet_pton(af, host)
        except OSError:
            continue
        # The literal parsed: the host is already resolved.
        if _HAS_IPv6 and af == socket.AF_INET6:
            return af, type, proto, '', (host, port, flowinfo, scopeid)
        return af, type, proto, '', (host, port)

    # "host" is not an IP address.
    return None
156
157
158
def _interleave_addrinfos(addrinfos, first_address_family_count=1):
159
"""Interleave list of addrinfo tuples by family."""
160
# Group addresses by family
161
addrinfos_by_family = collections.OrderedDict()
162
for addr in addrinfos:
163
family = addr[0]
164
if family not in addrinfos_by_family:
165
addrinfos_by_family[family] = []
166
addrinfos_by_family[family].append(addr)
167
addrinfos_lists = list(addrinfos_by_family.values())
168
169
reordered = []
170
if first_address_family_count > 1:
171
reordered.extend(addrinfos_lists[0][:first_address_family_count - 1])
172
del addrinfos_lists[0][:first_address_family_count - 1]
173
reordered.extend(
174
a for a in itertools.chain.from_iterable(
175
itertools.zip_longest(*addrinfos_lists)
176
) if a is not None)
177
return reordered
178
179
180
def _run_until_complete_cb(fut):
    """Done-callback attached by run_until_complete() to stop the loop."""
    if fut.cancelled():
        futures._get_loop(fut).stop()
        return
    if isinstance(fut.exception(), (SystemExit, KeyboardInterrupt)):
        # Issue #22429: run_forever() already finished, no need to
        # stop it.
        return
    futures._get_loop(fut).stop()
188
189
190
if hasattr(socket, 'TCP_NODELAY'):
191
def _set_nodelay(sock):
192
if (sock.family in {socket.AF_INET, socket.AF_INET6} and
193
sock.type == socket.SOCK_STREAM and
194
sock.proto == socket.IPPROTO_TCP):
195
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
196
else:
197
def _set_nodelay(sock):
198
pass
199
200
201
def _check_ssl_socket(sock):
202
if ssl is not None and isinstance(sock, ssl.SSLSocket):
203
raise TypeError("Socket cannot be of type SSLSocket")
204
205
206
class _SendfileFallbackProtocol(protocols.Protocol):
    """Temporary protocol swapped onto a transport during sendfile fallback.

    While a sendfile emulation is writing directly to the transport, the
    user's protocol must neither receive data nor flow-control events.
    This protocol saves the original protocol and the transport's
    reading/writing state in __init__, and restore() puts everything back.
    """

    def __init__(self, transp):
        if not isinstance(transp, transports._FlowControlMixin):
            raise TypeError("transport should be _FlowControlMixin instance")
        self._transport = transp
        # The user's protocol, reinstated by restore().
        self._proto = transp.get_protocol()
        # Remember reading/writing state so restore() can reproduce it.
        self._should_resume_reading = transp.is_reading()
        self._should_resume_writing = transp._protocol_paused
        transp.pause_reading()
        transp.set_protocol(self)
        if self._should_resume_writing:
            # Transport is above its high-water mark: drain() must wait.
            self._write_ready_fut = self._transport._loop.create_future()
        else:
            self._write_ready_fut = None

    async def drain(self):
        """Wait until the transport is ready to accept more data."""
        if self._transport.is_closing():
            raise ConnectionError("Connection closed by peer")
        fut = self._write_ready_fut
        if fut is None:
            return
        await fut

    def connection_made(self, transport):
        raise RuntimeError("Invalid state: "
                           "connection should have been established already.")

    def connection_lost(self, exc):
        if self._write_ready_fut is not None:
            # Never happens if peer disconnects after sending the whole content
            # Thus disconnection is always an exception from user perspective
            if exc is None:
                self._write_ready_fut.set_exception(
                    ConnectionError("Connection is closed by peer"))
            else:
                self._write_ready_fut.set_exception(exc)
        self._proto.connection_lost(exc)

    def pause_writing(self):
        # High-water mark reached: make the next drain() wait.
        if self._write_ready_fut is not None:
            return
        self._write_ready_fut = self._transport._loop.create_future()

    def resume_writing(self):
        # Buffer drained: release any pending drain().
        if self._write_ready_fut is None:
            return
        self._write_ready_fut.set_result(False)
        self._write_ready_fut = None

    def data_received(self, data):
        raise RuntimeError("Invalid state: reading should be paused")

    def eof_received(self):
        raise RuntimeError("Invalid state: reading should be paused")

    async def restore(self):
        """Reinstate the user's protocol and the saved transport state."""
        self._transport.set_protocol(self._proto)
        if self._should_resume_reading:
            self._transport.resume_reading()
        if self._write_ready_fut is not None:
            # Cancel the future.
            # Basically it has no effect because protocol is switched back,
            # no code should wait for it anymore.
            self._write_ready_fut.cancel()
        if self._should_resume_writing:
            self._proto.resume_writing()
272
273
274
class Server(events.AbstractServer):
    """Listening server returned by loop.create_server()/create_unix_server().

    Tracks the listening sockets, the number of active client connections
    (via _attach/_detach, called by the loop's transports) and the futures
    created by wait_closed().
    """

    def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
                 ssl_handshake_timeout, ssl_shutdown_timeout=None):
        self._loop = loop
        # Listening sockets; set to None by close() to mark "closed".
        self._sockets = sockets
        # Number of live client connections.
        self._active_count = 0
        # Futures created by wait_closed(); becomes None once _wakeup() ran.
        self._waiters = []
        self._protocol_factory = protocol_factory
        self._backlog = backlog
        self._ssl_context = ssl_context
        self._ssl_handshake_timeout = ssl_handshake_timeout
        self._ssl_shutdown_timeout = ssl_shutdown_timeout
        self._serving = False
        # Future used by serve_forever(); also guards re-entrant calls.
        self._serving_forever_fut = None

    def __repr__(self):
        return f'<{self.__class__.__name__} sockets={self.sockets!r}>'

    def _attach(self):
        # A new client transport now references this server.
        assert self._sockets is not None
        self._active_count += 1

    def _detach(self):
        # A client transport went away; once the server is closed and the
        # last connection drops, release wait_closed() waiters.
        assert self._active_count > 0
        self._active_count -= 1
        if self._active_count == 0 and self._sockets is None:
            self._wakeup()

    def _wakeup(self):
        # Resolve all pending wait_closed() futures; runs at most once
        # because _waiters is replaced with None.
        waiters = self._waiters
        self._waiters = None
        for waiter in waiters:
            if not waiter.done():
                waiter.set_result(waiter)

    def _start_serving(self):
        if self._serving:
            return
        self._serving = True
        for sock in self._sockets:
            sock.listen(self._backlog)
            self._loop._start_serving(
                self._protocol_factory, sock, self._ssl_context,
                self, self._backlog, self._ssl_handshake_timeout,
                self._ssl_shutdown_timeout)

    def get_loop(self):
        return self._loop

    def is_serving(self):
        return self._serving

    @property
    def sockets(self):
        # Exposed as read-only TransportSocket wrappers.
        if self._sockets is None:
            return ()
        return tuple(trsock.TransportSocket(s) for s in self._sockets)

    def close(self):
        sockets = self._sockets
        if sockets is None:
            # Already closed: close() is idempotent.
            return
        self._sockets = None

        for sock in sockets:
            self._loop._stop_serving(sock)

        self._serving = False

        if (self._serving_forever_fut is not None and
                not self._serving_forever_fut.done()):
            self._serving_forever_fut.cancel()
            self._serving_forever_fut = None

        if self._active_count == 0:
            self._wakeup()

    async def start_serving(self):
        self._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.
        await tasks.sleep(0)

    async def serve_forever(self):
        if self._serving_forever_fut is not None:
            raise RuntimeError(
                f'server {self!r} is already being awaited on serve_forever()')
        if self._sockets is None:
            raise RuntimeError(f'server {self!r} is closed')

        self._start_serving()
        self._serving_forever_fut = self._loop.create_future()

        try:
            await self._serving_forever_fut
        except exceptions.CancelledError:
            try:
                # Cancellation triggers a full close before propagating.
                self.close()
                await self.wait_closed()
            finally:
                raise
        finally:
            self._serving_forever_fut = None

    async def wait_closed(self):
        # NOTE(review): this returns immediately when _active_count == 0
        # even if close() has not been called yet, and also right after
        # close() with no connections -- mirrors the historical upstream
        # behaviour of this version; confirm against current CPython
        # before changing.
        if self._waiters is None or self._active_count == 0:
            return
        waiter = self._loop.create_future()
        self._waiters.append(waiter)
        await waiter
385
386
387
class BaseEventLoop(events.AbstractEventLoop):
388
389
    def __init__(self):
        # Count of cancelled TimerHandles still sitting in _scheduled;
        # used to decide when to compact the heap.
        self._timer_cancelled_count = 0
        self._closed = False
        # Set by stop(); checked by run_forever() after each iteration.
        self._stopping = False
        # FIFO of Handles ready to run on the next loop iteration.
        self._ready = collections.deque()
        # heapq of TimerHandles ordered by deadline.
        self._scheduled = []
        self._default_executor = None
        self._internal_fds = 0
        # Identifier of the thread running the event loop, or None if the
        # event loop is not running
        self._thread_id = None
        self._clock_resolution = time.get_clock_info('monotonic').resolution
        self._exception_handler = None
        self.set_debug(coroutines._is_debug_mode())
        # In debug mode, if the execution of a callback or a step of a task
        # exceed this duration in seconds, the slow callback/task is logged.
        self.slow_callback_duration = 0.1
        self._current_handle = None
        self._task_factory = None
        self._coroutine_origin_tracking_enabled = False
        self._coroutine_origin_tracking_saved_depth = None

        # A weak set of all asynchronous generators that are
        # being iterated by the loop.
        self._asyncgens = weakref.WeakSet()
        # Set to True when `loop.shutdown_asyncgens` is called.
        self._asyncgens_shutdown_called = False
        # Set to True when `loop.shutdown_default_executor` is called.
        self._executor_shutdown_called = False
418
419
def __repr__(self):
420
return (
421
f'<{self.__class__.__name__} running={self.is_running()} '
422
f'closed={self.is_closed()} debug={self.get_debug()}>'
423
)
424
425
    def create_future(self):
        """Create a Future object attached to the loop."""
        # Subclasses may override this to return a faster implementation.
        return futures.Future(loop=self)
428
429
    def create_task(self, coro, *, name=None, context=None):
        """Schedule a coroutine object.

        Return a task object.

        *name* names the task; *context* supplies a contextvars.Context
        for the task to run in.
        """
        self._check_closed()
        if self._task_factory is None:
            task = tasks.Task(coro, loop=self, name=name, context=context)
            if task._source_traceback:
                # Drop this create_task() frame from the captured traceback.
                del task._source_traceback[-1]
        else:
            if context is None:
                # Use legacy API if context is not needed
                task = self._task_factory(self, coro)
            else:
                task = self._task_factory(self, coro, context=context)

            # Factories receive no name argument; apply it afterwards.
            task.set_name(name)

        return task
449
450
def set_task_factory(self, factory):
451
"""Set a task factory that will be used by loop.create_task().
452
453
If factory is None the default task factory will be set.
454
455
If factory is a callable, it should have a signature matching
456
'(loop, coro)', where 'loop' will be a reference to the active
457
event loop, 'coro' will be a coroutine object. The callable
458
must return a Future.
459
"""
460
if factory is not None and not callable(factory):
461
raise TypeError('task factory must be a callable or None')
462
self._task_factory = factory
463
464
    def get_task_factory(self):
        """Return a task factory, or None if the default one is in use."""
        # Set via set_task_factory().
        return self._task_factory
467
468
    def _make_socket_transport(self, sock, protocol, waiter=None, *,
                               extra=None, server=None):
        """Create socket transport.

        Implemented by selector/proactor event-loop subclasses.
        """
        raise NotImplementedError
472
473
    def _make_ssl_transport(
            self, rawsock, protocol, sslcontext, waiter=None,
            *, server_side=False, server_hostname=None,
            extra=None, server=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            call_connection_made=True):
        """Create SSL transport.

        Implemented by selector/proactor event-loop subclasses.
        """
        raise NotImplementedError
482
483
    def _make_datagram_transport(self, sock, protocol,
                                 address=None, waiter=None, extra=None):
        """Create datagram transport.

        Implemented by selector/proactor event-loop subclasses.
        """
        raise NotImplementedError
487
488
    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Create read pipe transport.

        Implemented by selector/proactor event-loop subclasses.
        """
        raise NotImplementedError
492
493
    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Create write pipe transport.

        Implemented by selector/proactor event-loop subclasses.
        """
        raise NotImplementedError
497
498
    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Create subprocess transport.

        Implemented by platform-specific event-loop subclasses.
        """
        raise NotImplementedError
503
504
    def _write_to_self(self):
        """Write a byte to self-pipe, to wake up the event loop.

        This may be called from a different thread.

        The subclass is responsible for implementing the self-pipe.
        """
        raise NotImplementedError
512
513
    def _process_events(self, event_list):
        """Process selector events.

        Implemented by selector/proactor event-loop subclasses.
        """
        raise NotImplementedError
516
517
    def _check_closed(self):
        # Guard used by every scheduling API: a closed loop is unusable.
        if self._closed:
            raise RuntimeError('Event loop is closed')
520
521
    def _check_default_executor(self):
        # The default executor cannot be reused after
        # shutdown_default_executor() has been called.
        if self._executor_shutdown_called:
            raise RuntimeError('Executor shutdown has been called')
524
525
    def _asyncgen_finalizer_hook(self, agen):
        # Interpreter hook invoked when an async generator is about to be
        # garbage-collected; schedule its aclose() on this loop.  Uses the
        # thread-safe variant because finalization may run in any thread.
        self._asyncgens.discard(agen)
        if not self.is_closed():
            self.call_soon_threadsafe(self.create_task, agen.aclose())
529
530
    def _asyncgen_firstiter_hook(self, agen):
        # Interpreter hook invoked on the first iteration of an async
        # generator; track it so shutdown_asyncgens() can close it later.
        if self._asyncgens_shutdown_called:
            warnings.warn(
                f"asynchronous generator {agen!r} was scheduled after "
                f"loop.shutdown_asyncgens() call",
                ResourceWarning, source=self)

        self._asyncgens.add(agen)
538
539
    async def shutdown_asyncgens(self):
        """Shutdown all active asynchronous generators."""
        self._asyncgens_shutdown_called = True

        if not len(self._asyncgens):
            # If Python version is <3.6 or we don't have any asynchronous
            # generators alive.
            return

        # Snapshot and clear first: closing a generator removes it from
        # the WeakSet, so iterating the live set would be unsafe.
        closing_agens = list(self._asyncgens)
        self._asyncgens.clear()

        results = await tasks.gather(
            *[ag.aclose() for ag in closing_agens],
            return_exceptions=True)

        for result, agen in zip(results, closing_agens):
            if isinstance(result, Exception):
                self.call_exception_handler({
                    'message': f'an error occurred during closing of '
                               f'asynchronous generator {agen!r}',
                    'exception': result,
                    'asyncgen': agen
                })
563
564
async def shutdown_default_executor(self, timeout=None):
565
"""Schedule the shutdown of the default executor.
566
567
The timeout parameter specifies the amount of time the executor will
568
be given to finish joining. The default value is None, which means
569
that the executor will be given an unlimited amount of time.
570
"""
571
self._executor_shutdown_called = True
572
if self._default_executor is None:
573
return
574
future = self.create_future()
575
thread = threading.Thread(target=self._do_shutdown, args=(future,))
576
thread.start()
577
try:
578
await future
579
finally:
580
thread.join(timeout)
581
582
if thread.is_alive():
583
warnings.warn("The executor did not finishing joining "
584
f"its threads within {timeout} seconds.",
585
RuntimeWarning, stacklevel=2)
586
self._default_executor.shutdown(wait=False)
587
588
    def _do_shutdown(self, future):
        # Runs in a helper thread (see shutdown_default_executor): join the
        # executor's workers, then report the outcome back to the loop
        # thread-safely.
        try:
            self._default_executor.shutdown(wait=True)
            if not self.is_closed():
                self.call_soon_threadsafe(future.set_result, None)
        except Exception as ex:
            if not self.is_closed():
                self.call_soon_threadsafe(future.set_exception, ex)
596
597
    def _check_running(self):
        # Refuse to (re)start: neither this loop nor any other loop may
        # already be running in the current thread.
        if self.is_running():
            raise RuntimeError('This event loop is already running')
        if events._get_running_loop() is not None:
            raise RuntimeError(
                'Cannot run the event loop while another loop is running')
603
604
    def run_forever(self):
        """Run until stop() is called."""
        self._check_closed()
        self._check_running()
        self._set_coroutine_origin_tracking(self._debug)

        # Save the interpreter's async-generator hooks so they can be
        # restored when this loop stops.
        old_agen_hooks = sys.get_asyncgen_hooks()
        try:
            self._thread_id = threading.get_ident()
            sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                                   finalizer=self._asyncgen_finalizer_hook)

            events._set_running_loop(self)
            while True:
                self._run_once()
                # stop() merely sets the flag; the current iteration always
                # completes before the loop exits.
                if self._stopping:
                    break
        finally:
            self._stopping = False
            self._thread_id = None
            events._set_running_loop(None)
            self._set_coroutine_origin_tracking(False)
            sys.set_asyncgen_hooks(*old_agen_hooks)
627
628
    def run_until_complete(self, future):
        """Run until the Future is done.

        If the argument is a coroutine, it is wrapped in a Task.

        WARNING: It would be disastrous to call run_until_complete()
        with the same coroutine twice -- it would wrap it in two
        different Tasks and that can't be good.

        Return the Future's result, or raise its exception.
        """
        self._check_closed()
        self._check_running()

        new_task = not futures.isfuture(future)
        future = tasks.ensure_future(future, loop=self)
        if new_task:
            # An exception is raised if the future didn't complete, so there
            # is no need to log the "destroy pending task" message
            future._log_destroy_pending = False

        # The callback stops the loop when the future completes.
        future.add_done_callback(_run_until_complete_cb)
        try:
            self.run_forever()
        except:
            # Deliberate bare except: BaseExceptions (KeyboardInterrupt,
            # SystemExit) must also take this path, and are re-raised below.
            if new_task and future.done() and not future.cancelled():
                # The coroutine raised a BaseException. Consume the exception
                # to not log a warning, the caller doesn't have access to the
                # local task.
                future.exception()
            raise
        finally:
            future.remove_done_callback(_run_until_complete_cb)
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')

        return future.result()
665
666
    def stop(self):
        """Stop running the event loop.

        Every callback already scheduled will still run. This simply informs
        run_forever to stop looping after a complete iteration.
        """
        # Checked by run_forever() after each _run_once() pass.
        self._stopping = True
673
674
def close(self):
675
"""Close the event loop.
676
677
This clears the queues and shuts down the executor,
678
but does not wait for the executor to finish.
679
680
The event loop must not be running.
681
"""
682
if self.is_running():
683
raise RuntimeError("Cannot close a running event loop")
684
if self._closed:
685
return
686
if self._debug:
687
logger.debug("Close %r", self)
688
self._closed = True
689
self._ready.clear()
690
self._scheduled.clear()
691
self._executor_shutdown_called = True
692
executor = self._default_executor
693
if executor is not None:
694
self._default_executor = None
695
executor.shutdown(wait=False)
696
697
    def is_closed(self):
        """Returns True if the event loop was closed."""
        # Set by close(); a closed loop cannot be restarted.
        return self._closed
700
701
    def __del__(self, _warn=warnings.warn):
        # _warn is bound at definition time so the warning function stays
        # reachable even during interpreter shutdown.
        if not self.is_closed():
            _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
            if not self.is_running():
                self.close()
706
707
    def is_running(self):
        """Returns True if the event loop is running."""
        # _thread_id is set only while run_forever() is executing.
        return (self._thread_id is not None)
710
711
    def time(self):
        """Return the time according to the event loop's clock.

        This is a float expressed in seconds since an epoch, but the
        epoch, precision, accuracy and drift are unspecified and may
        differ per event loop.
        """
        # Monotonic, so scheduled timers are immune to wall-clock changes.
        return time.monotonic()
719
720
def call_later(self, delay, callback, *args, context=None):
721
"""Arrange for a callback to be called at a given time.
722
723
Return a Handle: an opaque object with a cancel() method that
724
can be used to cancel the call.
725
726
The delay can be an int or float, expressed in seconds. It is
727
always relative to the current time.
728
729
Each callback will be called exactly once. If two callbacks
730
are scheduled for exactly the same time, it undefined which
731
will be called first.
732
733
Any positional arguments after the callback will be passed to
734
the callback when it is called.
735
"""
736
if delay is None:
737
raise TypeError('delay must not be None')
738
timer = self.call_at(self.time() + delay, callback, *args,
739
context=context)
740
if timer._source_traceback:
741
del timer._source_traceback[-1]
742
return timer
743
744
    def call_at(self, when, callback, *args, context=None):
        """Like call_later(), but uses an absolute time.

        Absolute time corresponds to the event loop's time() method.
        """
        if when is None:
            raise TypeError("when cannot be None")
        self._check_closed()
        if self._debug:
            self._check_thread()
            self._check_callback(callback, 'call_at')
        timer = events.TimerHandle(when, callback, args, self, context)
        if timer._source_traceback:
            # Hide this call_at() frame from the captured traceback.
            del timer._source_traceback[-1]
        # Timers live in a heap ordered by deadline.
        heapq.heappush(self._scheduled, timer)
        timer._scheduled = True
        return timer
761
762
    def call_soon(self, callback, *args, context=None):
        """Arrange for a callback to be called as soon as possible.

        This operates as a FIFO queue: callbacks are called in the
        order in which they are registered. Each callback will be
        called exactly once.

        Any positional arguments after the callback will be passed to
        the callback when it is called.
        """
        self._check_closed()
        if self._debug:
            # Debug-only sanity checks; skipped otherwise for speed.
            self._check_thread()
            self._check_callback(callback, 'call_soon')
        handle = self._call_soon(callback, args, context)
        if handle._source_traceback:
            # Hide this call_soon() frame from the captured traceback.
            del handle._source_traceback[-1]
        return handle
780
781
    def _check_callback(self, callback, method):
        # Debug-mode validation: reject coroutines (a common mistake --
        # they must be wrapped in a task, not scheduled directly) and
        # anything that is not callable.
        if (coroutines.iscoroutine(callback) or
                coroutines.iscoroutinefunction(callback)):
            raise TypeError(
                f"coroutines cannot be used with {method}()")
        if not callable(callback):
            raise TypeError(
                f'a callable object was expected by {method}(), '
                f'got {callback!r}')
790
791
    def _call_soon(self, callback, args, context):
        # Internal fast path shared by call_soon()/call_soon_threadsafe():
        # no thread or debug checks here; callers are responsible.
        handle = events.Handle(callback, args, self, context)
        if handle._source_traceback:
            # Hide this internal frame from the captured traceback.
            del handle._source_traceback[-1]
        self._ready.append(handle)
        return handle
797
798
def _check_thread(self):
799
"""Check that the current thread is the thread running the event loop.
800
801
Non-thread-safe methods of this class make this assumption and will
802
likely behave incorrectly when the assumption is violated.
803
804
Should only be called when (self._debug == True). The caller is
805
responsible for checking this condition for performance reasons.
806
"""
807
if self._thread_id is None:
808
return
809
thread_id = threading.get_ident()
810
if thread_id != self._thread_id:
811
raise RuntimeError(
812
"Non-thread-safe operation invoked on an event loop other "
813
"than the current one")
814
815
    def call_soon_threadsafe(self, callback, *args, context=None):
        """Like call_soon(), but thread-safe."""
        self._check_closed()
        if self._debug:
            self._check_callback(callback, 'call_soon_threadsafe')
        handle = self._call_soon(callback, args, context)
        if handle._source_traceback:
            # Hide this wrapper frame from the captured traceback.
            del handle._source_traceback[-1]
        # Wake the selector so the loop notices the new callback even if it
        # is currently blocked waiting for I/O in another thread.
        self._write_to_self()
        return handle
825
826
    def run_in_executor(self, executor, func, *args):
        """Run func(*args) in *executor*, returning an asyncio future.

        When *executor* is None, a shared default ThreadPoolExecutor is
        created lazily on first use.
        """
        self._check_closed()
        if self._debug:
            self._check_callback(func, 'run_in_executor')
        if executor is None:
            executor = self._default_executor
            # Only check when the default executor is being used
            self._check_default_executor()
            if executor is None:
                executor = concurrent.futures.ThreadPoolExecutor(
                    thread_name_prefix='asyncio'
                )
                self._default_executor = executor
        # Bridge the concurrent.futures.Future into an asyncio future.
        return futures.wrap_future(
            executor.submit(func, *args), loop=self)
841
842
    def set_default_executor(self, executor):
        """Replace the default executor used by run_in_executor().

        Only ThreadPoolExecutor instances are accepted.
        """
        if not isinstance(executor, concurrent.futures.ThreadPoolExecutor):
            raise TypeError('executor must be ThreadPoolExecutor instance')
        self._default_executor = executor
846
847
def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
848
msg = [f"{host}:{port!r}"]
849
if family:
850
msg.append(f'family={family!r}')
851
if type:
852
msg.append(f'type={type!r}')
853
if proto:
854
msg.append(f'proto={proto!r}')
855
if flags:
856
msg.append(f'flags={flags!r}')
857
msg = ', '.join(msg)
858
logger.debug('Get address info %s', msg)
859
860
t0 = self.time()
861
addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
862
dt = self.time() - t0
863
864
msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}'
865
if dt >= self.slow_callback_duration:
866
logger.info(msg)
867
else:
868
logger.debug(msg)
869
return addrinfo
870
871
async def getaddrinfo(self, host, port, *,
872
family=0, type=0, proto=0, flags=0):
873
if self._debug:
874
getaddr_func = self._getaddrinfo_debug
875
else:
876
getaddr_func = socket.getaddrinfo
877
878
return await self.run_in_executor(
879
None, getaddr_func, host, port, family, type, proto, flags)
880
881
    async def getnameinfo(self, sockaddr, flags=0):
        """Asynchronous getnameinfo(), executed in the default executor."""
        return await self.run_in_executor(
            None, socket.getnameinfo, sockaddr, flags)
884
885
    async def sock_sendfile(self, sock, file, offset=0, count=None,
                            *, fallback=True):
        """Send *file* over *sock*, preferring the native sendfile syscall.

        When the syscall is unavailable and *fallback* is true, a
        read/sendall loop is used instead.  Returns the number of bytes
        sent.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        _check_ssl_socket(sock)
        self._check_sendfile_params(sock, file, offset, count)
        try:
            return await self._sock_sendfile_native(sock, file,
                                                    offset, count)
        except exceptions.SendfileNotAvailableError as exc:
            if not fallback:
                raise
            return await self._sock_sendfile_fallback(sock, file,
                                                      offset, count)
899
900
    async def _sock_sendfile_native(self, sock, file, offset, count):
        """Base implementation: always reports the syscall as unavailable.

        Overridden by subclasses on platforms with os.sendfile() support.
        """
        # NB: sendfile syscall is not supported for SSL sockets and
        # non-mmap files even if sendfile is supported by OS
        raise exceptions.SendfileNotAvailableError(
            f"syscall sendfile is not available for socket {sock!r} "
            f"and file {file!r} combination")
906
907
    async def _sock_sendfile_fallback(self, sock, file, offset, count):
        # Pure-Python fallback: read chunks of the file in the executor
        # and sock_sendall() each chunk.
        if offset:
            file.seek(offset)
        blocksize = (
            min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
            if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
        )
        buf = bytearray(blocksize)
        total_sent = 0
        try:
            while True:
                if count:
                    blocksize = min(count - total_sent, blocksize)
                    if blocksize <= 0:
                        break
                # memoryview slice: read into the buffer without copying.
                view = memoryview(buf)[:blocksize]
                read = await self.run_in_executor(None, file.readinto, view)
                if not read:
                    break  # EOF
                await self.sock_sendall(sock, view[:read])
                total_sent += read
            return total_sent
        finally:
            if total_sent > 0 and hasattr(file, 'seek'):
                # Leave the file position just past the data actually sent.
                file.seek(offset + total_sent)
932
933
def _check_sendfile_params(self, sock, file, offset, count):
934
if 'b' not in getattr(file, 'mode', 'b'):
935
raise ValueError("file should be opened in binary mode")
936
if not sock.type == socket.SOCK_STREAM:
937
raise ValueError("only SOCK_STREAM type sockets are supported")
938
if count is not None:
939
if not isinstance(count, int):
940
raise TypeError(
941
"count must be a positive integer (got {!r})".format(count))
942
if count <= 0:
943
raise ValueError(
944
"count must be a positive integer (got {!r})".format(count))
945
if not isinstance(offset, int):
946
raise TypeError(
947
"offset must be a non-negative integer (got {!r})".format(
948
offset))
949
if offset < 0:
950
raise ValueError(
951
"offset must be a non-negative integer (got {!r})".format(
952
offset))
953
954
    async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
        """Create, bind and connect one socket.

        Errors for this attempt are appended to a sub-list of *exceptions*
        so the caller can report them per address.
        """
        my_exceptions = []
        exceptions.append(my_exceptions)
        family, type_, proto, _, address = addr_info
        sock = None
        try:
            sock = socket.socket(family=family, type=type_, proto=proto)
            sock.setblocking(False)
            if local_addr_infos is not None:
                for lfamily, _, _, _, laddr in local_addr_infos:
                    # skip local addresses of different family
                    if lfamily != family:
                        continue
                    try:
                        sock.bind(laddr)
                        break
                    except OSError as exc:
                        # Re-wrap with a more descriptive message.
                        msg = (
                            f'error while attempting to bind on '
                            f'address {laddr!r}: '
                            f'{exc.strerror.lower()}'
                        )
                        exc = OSError(exc.errno, msg)
                        my_exceptions.append(exc)
                else:  # all bind attempts failed
                    if my_exceptions:
                        raise my_exceptions.pop()
                    else:
                        raise OSError(f"no matching local address with {family=} found")
            await self.sock_connect(sock, address)
            return sock
        except OSError as exc:
            my_exceptions.append(exc)
            if sock is not None:
                sock.close()
            raise
        except:
            # Bare except (incl. CancelledError): never leak the socket.
            if sock is not None:
                sock.close()
            raise
        finally:
            # Drop local references to break traceback reference cycles.
            exceptions = my_exceptions = None
997
998
async def create_connection(
        self, protocol_factory, host=None, port=None,
        *, ssl=None, family=0,
        proto=0, flags=0, sock=None,
        local_addr=None, server_hostname=None,
        ssl_handshake_timeout=None,
        ssl_shutdown_timeout=None,
        happy_eyeballs_delay=None, interleave=None,
        all_errors=False):
    """Connect to a TCP server.

    Create a streaming transport connection to a given internet host and
    port: socket family AF_INET or socket.AF_INET6 depending on host (or
    family if specified), socket type SOCK_STREAM. protocol_factory must be
    a callable returning a protocol instance.

    This method is a coroutine which will try to establish the connection
    in the background. When successful, the coroutine returns a
    (transport, protocol) pair.
    """
    # --- validate mutually-dependent keyword arguments ---
    if server_hostname is not None and not ssl:
        raise ValueError('server_hostname is only meaningful with ssl')

    if server_hostname is None and ssl:
        # Use host as default for server_hostname. It is an error
        # if host is empty or not set, e.g. when an
        # already-connected socket was passed or when only a port
        # is given. To avoid this error, you can pass
        # server_hostname='' -- this will bypass the hostname
        # check. (This also means that if host is a numeric
        # IP/IPv6 address, we will attempt to verify that exact
        # address; this will probably fail, but it is possible to
        # create a certificate for a specific IP address, so we
        # don't judge it here.)
        if not host:
            raise ValueError('You must set server_hostname '
                             'when using ssl without a host')
        server_hostname = host

    if ssl_handshake_timeout is not None and not ssl:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    if ssl_shutdown_timeout is not None and not ssl:
        raise ValueError(
            'ssl_shutdown_timeout is only meaningful with ssl')

    if sock is not None:
        _check_ssl_socket(sock)

    if happy_eyeballs_delay is not None and interleave is None:
        # If using happy eyeballs, default to interleave addresses by family
        interleave = 1

    if host is not None or port is not None:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        infos = await self._ensure_resolved(
            (host, port), family=family,
            type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
        if not infos:
            raise OSError('getaddrinfo() returned empty list')

        if local_addr is not None:
            laddr_infos = await self._ensure_resolved(
                local_addr, family=family,
                type=socket.SOCK_STREAM, proto=proto,
                flags=flags, loop=self)
            if not laddr_infos:
                raise OSError('getaddrinfo() returned empty list')
        else:
            laddr_infos = None

        if interleave:
            infos = _interleave_addrinfos(infos, interleave)

        exceptions = []
        if happy_eyeballs_delay is None:
            # not using happy eyeballs: try each address sequentially
            for addrinfo in infos:
                try:
                    sock = await self._connect_sock(
                        exceptions, addrinfo, laddr_infos)
                    break
                except OSError:
                    continue
        else:  # using happy eyeballs: race staggered attempts
            sock, _, _ = await staggered.staggered_race(
                (functools.partial(self._connect_sock,
                                   exceptions, addrinfo, laddr_infos)
                 for addrinfo in infos),
                happy_eyeballs_delay, loop=self)

        if sock is None:
            # Flatten the per-attempt sub-lists built by _connect_sock().
            exceptions = [exc for sub in exceptions for exc in sub]
            try:
                if all_errors:
                    raise ExceptionGroup("create_connection failed", exceptions)
                if len(exceptions) == 1:
                    raise exceptions[0]
                else:
                    # If they all have the same str(), raise one.
                    model = str(exceptions[0])
                    if all(str(exc) == model for exc in exceptions):
                        raise exceptions[0]
                    # Raise a combined exception so the user can see all
                    # the various error messages.
                    raise OSError('Multiple exceptions: {}'.format(
                        ', '.join(str(exc) for exc in exceptions)))
            finally:
                # Break reference cycles through stored tracebacks.
                exceptions = None

    else:
        if sock is None:
            raise ValueError(
                'host and port was not specified and no sock specified')
        if sock.type != socket.SOCK_STREAM:
            # We allow AF_INET, AF_INET6, AF_UNIX as long as they
            # are SOCK_STREAM.
            # We support passing AF_UNIX sockets even though we have
            # a dedicated API for that: create_unix_connection.
            # Disallowing AF_UNIX in this method, breaks backwards
            # compatibility.
            raise ValueError(
                f'A Stream Socket was expected, got {sock!r}')

    transport, protocol = await self._create_connection_transport(
        sock, protocol_factory, ssl, server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout,
        ssl_shutdown_timeout=ssl_shutdown_timeout)
    if self._debug:
        # Get the socket from the transport because SSL transport closes
        # the old socket and creates a new SSL socket
        sock = transport.get_extra_info('socket')
        logger.debug("%r connected to %s:%r: (%r, %r)",
                     sock, host, port, transport, protocol)
    return transport, protocol
async def _create_connection_transport(
        self, sock, protocol_factory, ssl,
        server_hostname, server_side=False,
        ssl_handshake_timeout=None,
        ssl_shutdown_timeout=None):
    """Wrap a connected *sock* in a (transport, protocol) pair.

    When *ssl* is truthy the socket is wrapped in an SSL transport;
    waits until the transport signals readiness before returning.
    """
    sock.setblocking(False)

    protocol = protocol_factory()
    waiter = self.create_future()
    if not ssl:
        transport = self._make_socket_transport(sock, protocol, waiter)
    else:
        # ``ssl`` may be True (use a default context) or an SSLContext.
        sslcontext = ssl if not isinstance(ssl, bool) else None
        transport = self._make_ssl_transport(
            sock, protocol, sslcontext, waiter,
            server_side=server_side, server_hostname=server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout,
            ssl_shutdown_timeout=ssl_shutdown_timeout)

    try:
        await waiter
    except BaseException:
        # Connection setup failed: make sure the transport is closed.
        transport.close()
        raise

    return transport, protocol
async def sendfile(self, transport, file, offset=0, count=None,
                   *, fallback=True):
    """Send a file to transport.

    Return the total number of bytes which were sent.

    The method uses high-performance os.sendfile if available.

    file must be a regular file object opened in binary mode.

    offset tells from where to start reading the file. If specified,
    count is the total number of bytes to transmit as opposed to
    sending the file until EOF is reached. File position is updated on
    return or also in case of error in which case file.tell()
    can be used to figure out the number of bytes
    which were sent.

    fallback set to True makes asyncio to manually read and send
    the file when the platform does not support the sendfile syscall
    (e.g. Windows or SSL socket on Unix).

    Raise SendfileNotAvailableError if the system does not support
    sendfile syscall and fallback is False.
    """
    if transport.is_closing():
        raise RuntimeError("Transport is closing")
    # Transports advertise their sendfile capability via this attribute.
    mode = getattr(transport, '_sendfile_compatible',
                   constants._SendfileMode.UNSUPPORTED)
    if mode is constants._SendfileMode.UNSUPPORTED:
        raise RuntimeError(
            f"sendfile is not supported for transport {transport!r}")
    if mode is constants._SendfileMode.TRY_NATIVE:
        # Fix: the exception was previously bound to an unused local
        # (``as exc``); the binding is dropped.
        try:
            return await self._sendfile_native(transport, file,
                                               offset, count)
        except exceptions.SendfileNotAvailableError:
            # Native sendfile unavailable at runtime: fall through to
            # the userspace fallback below unless the caller forbids it.
            if not fallback:
                raise

    if not fallback:
        raise RuntimeError(
            f"fallback is disabled and native sendfile is not "
            f"supported for transport {transport!r}")

    return await self._sendfile_fallback(transport, file,
                                         offset, count)
async def _sendfile_native(self, transp, file, offset, count):
    """Native sendfile() hook.

    The base implementation always reports the syscall as missing;
    platform-specific event loops override this method.
    """
    raise exceptions.SendfileNotAvailableError(
        "sendfile syscall is not supported")
async def _sendfile_fallback(self, transp, file, offset, count):
    """Userspace copy loop used when native sendfile is unavailable.

    Reads *file* in chunks via an executor (file I/O is blocking) and
    writes each chunk to the transport, honoring its flow control
    through the _SendfileFallbackProtocol wrapper.
    """
    if offset:
        file.seek(offset)
    # Cap the chunk size at 16 KiB, or at *count* if smaller.
    blocksize = min(count, 16384) if count else 16384
    buf = bytearray(blocksize)
    total_sent = 0
    proto = _SendfileFallbackProtocol(transp)
    try:
        while True:
            if count:
                # Shrink the final read so we never send more than count.
                blocksize = min(count - total_sent, blocksize)
                if blocksize <= 0:
                    return total_sent
            view = memoryview(buf)[:blocksize]
            read = await self.run_in_executor(None, file.readinto, view)
            if not read:
                return total_sent  # EOF
            # Wait for the transport's write buffer to drain first.
            await proto.drain()
            transp.write(view[:read])
            total_sent += read
    finally:
        # Leave the file positioned just after the last byte sent so
        # the caller can resume, then restore the transport's original
        # protocol and reading state.
        if total_sent > 0 and hasattr(file, 'seek'):
            file.seek(offset + total_sent)
        await proto.restore()
async def start_tls(self, transport, protocol, sslcontext, *,
                    server_side=False,
                    server_hostname=None,
                    ssl_handshake_timeout=None,
                    ssl_shutdown_timeout=None):
    """Upgrade transport to TLS.

    Return a new transport that *protocol* should start using
    immediately.

    Raises RuntimeError if the ssl module is unavailable, and
    TypeError for an invalid *sslcontext* or an incompatible
    *transport*.
    """
    if ssl is None:
        raise RuntimeError('Python ssl module is not available')

    if not isinstance(sslcontext, ssl.SSLContext):
        raise TypeError(
            f'sslcontext is expected to be an instance of ssl.SSLContext, '
            f'got {sslcontext!r}')

    if not getattr(transport, '_start_tls_compatible', False):
        raise TypeError(
            f'transport {transport!r} is not supported by start_tls()')

    waiter = self.create_future()
    ssl_protocol = sslproto.SSLProtocol(
        self, protocol, sslcontext, waiter,
        server_side, server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout,
        ssl_shutdown_timeout=ssl_shutdown_timeout,
        call_connection_made=False)

    # Pause early so that "ssl_protocol.data_received()" doesn't
    # have a chance to get called before "ssl_protocol.connection_made()".
    transport.pause_reading()

    transport.set_protocol(ssl_protocol)
    # connection_made() is scheduled before resume_reading() so data
    # cannot arrive at an uninitialized protocol.
    conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
    resume_cb = self.call_soon(transport.resume_reading)

    try:
        # waiter completes once the TLS handshake has finished.
        await waiter
    except BaseException:
        # Handshake failed or was cancelled: tear everything down and
        # cancel the callbacks that may not have run yet.
        transport.close()
        conmade_cb.cancel()
        resume_cb.cancel()
        raise

    return ssl_protocol._app_transport
async def create_datagram_endpoint(self, protocol_factory,
                                   local_addr=None, remote_addr=None, *,
                                   family=0, proto=0, flags=0,
                                   reuse_port=None,
                                   allow_broadcast=None, sock=None):
    """Create datagram connection.

    Either an existing datagram *sock* is used as-is, or a new UDP
    socket is created from the resolved *local_addr*/*remote_addr*.
    Returns a (transport, protocol) pair.
    """
    if sock is not None:
        if sock.type != socket.SOCK_DGRAM:
            raise ValueError(
                f'A UDP Socket was expected, got {sock!r}')
        if (local_addr or remote_addr or
                family or proto or flags or
                reuse_port or allow_broadcast):
            # show the problematic kwargs in exception msg
            opts = dict(local_addr=local_addr, remote_addr=remote_addr,
                        family=family, proto=proto, flags=flags,
                        reuse_port=reuse_port,
                        allow_broadcast=allow_broadcast)
            problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
            raise ValueError(
                f'socket modifier keyword arguments can not be used '
                f'when sock is specified. ({problems})')
        sock.setblocking(False)
        r_addr = None
    else:
        if not (local_addr or remote_addr):
            if family == 0:
                raise ValueError('unexpected address family')
            addr_pairs_info = (((family, proto), (None, None)),)
        elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
            for addr in (local_addr, remote_addr):
                if addr is not None and not isinstance(addr, str):
                    raise TypeError('string is expected')

            # Abstract-namespace addresses (leading NUL) never leave
            # stale filesystem entries, so only real paths are checked.
            if local_addr and local_addr[0] not in (0, '\x00'):
                try:
                    if stat.S_ISSOCK(os.stat(local_addr).st_mode):
                        os.remove(local_addr)
                except FileNotFoundError:
                    pass
                except OSError as err:
                    # Directory may have permissions only to create socket.
                    logger.error('Unable to check or remove stale UNIX '
                                 'socket %r: %r',
                                 local_addr, err)

            addr_pairs_info = (((family, proto),
                                (local_addr, remote_addr)), )
        else:
            # join address by (family, protocol)
            addr_infos = {}  # Using order preserving dict
            for idx, addr in ((0, local_addr), (1, remote_addr)):
                if addr is not None:
                    if not (isinstance(addr, tuple) and len(addr) == 2):
                        raise TypeError('2-tuple is expected')

                    infos = await self._ensure_resolved(
                        addr, family=family, type=socket.SOCK_DGRAM,
                        proto=proto, flags=flags, loop=self)
                    if not infos:
                        raise OSError('getaddrinfo() returned empty list')

                    for fam, _, pro, _, address in infos:
                        key = (fam, pro)
                        if key not in addr_infos:
                            addr_infos[key] = [None, None]
                        addr_infos[key][idx] = address

            # each addr has to have info for each (family, proto) pair
            addr_pairs_info = [
                (key, addr_pair) for key, addr_pair in addr_infos.items()
                if not ((local_addr and addr_pair[0] is None) or
                        (remote_addr and addr_pair[1] is None))]

            if not addr_pairs_info:
                raise ValueError('can not get address information')

        exceptions = []

        # Try each candidate (family, proto) pair until one socket can
        # be created, bound and (optionally) connected.
        for ((family, proto),
             (local_address, remote_address)) in addr_pairs_info:
            sock = None
            r_addr = None
            try:
                sock = socket.socket(
                    family=family, type=socket.SOCK_DGRAM, proto=proto)
                if reuse_port:
                    _set_reuseport(sock)
                if allow_broadcast:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                sock.setblocking(False)

                if local_addr:
                    sock.bind(local_address)
                if remote_addr:
                    # Broadcast endpoints stay unconnected so they can
                    # send to any peer.
                    if not allow_broadcast:
                        await self.sock_connect(sock, remote_address)
                    r_addr = remote_address
            except OSError as exc:
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except:
                if sock is not None:
                    sock.close()
                raise
            else:
                break
        else:
            # Every candidate failed; surface the first error.
            raise exceptions[0]

    protocol = protocol_factory()
    waiter = self.create_future()
    transport = self._make_datagram_transport(
        sock, protocol, r_addr, waiter)
    if self._debug:
        if local_addr:
            logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
                        "created: (%r, %r)",
                        local_addr, remote_addr, transport, protocol)
        else:
            logger.debug("Datagram endpoint remote_addr=%r created: "
                         "(%r, %r)",
                         remote_addr, transport, protocol)

    try:
        await waiter
    except:
        transport.close()
        raise

    return transport, protocol
async def _ensure_resolved(self, address, *,
                           family=0, type=socket.SOCK_STREAM,
                           proto=0, flags=0, loop):
    """Resolve *address* into a list of addrinfo tuples.

    When the host part is already a numeric IP address the
    (potentially blocking) getaddrinfo() call is skipped and a single
    synthesized addrinfo entry is returned instead.
    """
    host, port = address[:2]
    info = _ipaddr_info(host, port, family, type, proto, *address[2:])
    if info is None:
        return await loop.getaddrinfo(host, port, family=family, type=type,
                                      proto=proto, flags=flags)
    # "host" is already a resolved IP address.
    return [info]
async def _create_server_getaddrinfo(self, host, port, family, flags):
    """Resolve one (host, port) pair for create_server().

    Raises OSError when resolution yields no usable addresses.
    """
    results = await self._ensure_resolved((host, port), family=family,
                                          type=socket.SOCK_STREAM,
                                          flags=flags, loop=self)
    if not results:
        raise OSError(f'getaddrinfo({host!r}) returned empty list')
    return results
async def create_server(
        self, protocol_factory, host=None, port=None,
        *,
        family=socket.AF_UNSPEC,
        flags=socket.AI_PASSIVE,
        sock=None,
        backlog=100,
        ssl=None,
        reuse_address=None,
        reuse_port=None,
        ssl_handshake_timeout=None,
        ssl_shutdown_timeout=None,
        start_serving=True):
    """Create a TCP server.

    The host parameter can be a string, in that case the TCP server is
    bound to host and port.

    The host parameter can also be a sequence of strings and in that case
    the TCP server is bound to all hosts of the sequence. If a host
    appears multiple times (possibly indirectly e.g. when hostnames
    resolve to the same IP address), the server is only bound once to that
    host.

    Return a Server object which can be used to stop the service.

    This method is a coroutine.
    """
    if isinstance(ssl, bool):
        raise TypeError('ssl argument must be an SSLContext or None')

    if ssl_handshake_timeout is not None and ssl is None:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    if ssl_shutdown_timeout is not None and ssl is None:
        raise ValueError(
            'ssl_shutdown_timeout is only meaningful with ssl')

    if sock is not None:
        _check_ssl_socket(sock)

    if host is not None or port is not None:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        if reuse_address is None:
            # SO_REUSEADDR is safe on POSIX but has different (unsafe)
            # semantics on Windows/cygwin, so it is off there.
            reuse_address = os.name == "posix" and sys.platform != "cygwin"
        sockets = []
        if host == '':
            hosts = [None]
        elif (isinstance(host, str) or
              not isinstance(host, collections.abc.Iterable)):
            hosts = [host]
        else:
            hosts = host

        # Resolve all hosts concurrently; set() deduplicates addrinfo
        # entries shared by several host names.
        fs = [self._create_server_getaddrinfo(host, port, family=family,
                                              flags=flags)
              for host in hosts]
        infos = await tasks.gather(*fs)
        infos = set(itertools.chain.from_iterable(infos))

        completed = False
        try:
            for res in infos:
                af, socktype, proto, canonname, sa = res
                try:
                    sock = socket.socket(af, socktype, proto)
                except socket.error:
                    # Assume it's a bad family/type/protocol combination.
                    if self._debug:
                        logger.warning('create_server() failed to create '
                                       'socket.socket(%r, %r, %r)',
                                       af, socktype, proto, exc_info=True)
                    continue
                sockets.append(sock)
                if reuse_address:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
                if reuse_port:
                    _set_reuseport(sock)
                # Disable IPv4/IPv6 dual stack support (enabled by
                # default on Linux) which makes a single socket
                # listen on both address families.
                if (_HAS_IPv6 and
                        af == socket.AF_INET6 and
                        hasattr(socket, 'IPPROTO_IPV6')):
                    sock.setsockopt(socket.IPPROTO_IPV6,
                                    socket.IPV6_V6ONLY,
                                    True)
                try:
                    sock.bind(sa)
                except OSError as err:
                    raise OSError(err.errno, 'error while attempting '
                                  'to bind on address %r: %s'
                                  % (sa, err.strerror.lower())) from None
            completed = True
        finally:
            # On any failure close every socket opened so far.
            if not completed:
                for sock in sockets:
                    sock.close()
    else:
        if sock is None:
            raise ValueError('Neither host/port nor sock were specified')
        if sock.type != socket.SOCK_STREAM:
            raise ValueError(f'A Stream Socket was expected, got {sock!r}')
        sockets = [sock]

    for sock in sockets:
        sock.setblocking(False)

    server = Server(self, sockets, protocol_factory,
                    ssl, backlog, ssl_handshake_timeout,
                    ssl_shutdown_timeout)
    if start_serving:
        server._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.
        await tasks.sleep(0)

    if self._debug:
        logger.info("%r is serving", server)
    return server
async def connect_accepted_socket(
        self, protocol_factory, sock,
        *, ssl=None,
        ssl_handshake_timeout=None,
        ssl_shutdown_timeout=None):
    """Wrap an already-accepted connection in a transport/protocol pair.

    The returned transport acts as the server side of the connection.
    """
    if sock.type != socket.SOCK_STREAM:
        raise ValueError(f'A Stream Socket was expected, got {sock!r}')

    if ssl_handshake_timeout is not None and not ssl:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    if ssl_shutdown_timeout is not None and not ssl:
        raise ValueError(
            'ssl_shutdown_timeout is only meaningful with ssl')

    if sock is not None:
        _check_ssl_socket(sock)

    transport, protocol = await self._create_connection_transport(
        sock, protocol_factory, ssl, '', server_side=True,
        ssl_handshake_timeout=ssl_handshake_timeout,
        ssl_shutdown_timeout=ssl_shutdown_timeout)
    if self._debug:
        # The SSL transport replaces the raw socket with a new SSL
        # socket, so fetch the current one for logging.
        sock = transport.get_extra_info('socket')
        logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
    return transport, protocol
async def connect_read_pipe(self, protocol_factory, pipe):
    """Attach *pipe* for reading and return (transport, protocol)."""
    proto = protocol_factory()
    connected = self.create_future()
    transport = self._make_read_pipe_transport(pipe, proto, connected)

    try:
        await connected
    except BaseException:
        transport.close()
        raise

    if self._debug:
        logger.debug('Read pipe %r connected: (%r, %r)',
                     pipe.fileno(), transport, proto)
    return transport, proto
async def connect_write_pipe(self, protocol_factory, pipe):
    """Attach *pipe* for writing and return (transport, protocol)."""
    proto = protocol_factory()
    connected = self.create_future()
    transport = self._make_write_pipe_transport(pipe, proto, connected)

    try:
        await connected
    except BaseException:
        transport.close()
        raise

    if self._debug:
        logger.debug('Write pipe %r connected: (%r, %r)',
                     pipe.fileno(), transport, proto)
    return transport, proto
def _log_subprocess(self, msg, stdin, stdout, stderr):
    """Log a subprocess launch, describing how each stdio fd is wired."""
    pieces = [msg]
    if stdin is not None:
        pieces.append(f'stdin={_format_pipe(stdin)}')
    if stdout is not None and stderr == subprocess.STDOUT:
        # stdout and stderr share a single pipe.
        pieces.append(f'stdout=stderr={_format_pipe(stdout)}')
    else:
        if stdout is not None:
            pieces.append(f'stdout={_format_pipe(stdout)}')
        if stderr is not None:
            pieces.append(f'stderr={_format_pipe(stderr)}')
    logger.debug(' '.join(pieces))
async def subprocess_shell(self, protocol_factory, cmd, *,
                           stdin=subprocess.PIPE,
                           stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE,
                           universal_newlines=False,
                           shell=True, bufsize=0,
                           encoding=None, errors=None, text=None,
                           **kwargs):
    """Run *cmd* through the shell; return a (transport, protocol) pair.

    The text-mode options accepted by subprocess.Popen are rejected:
    asyncio subprocess pipes always carry bytes.
    """
    if not isinstance(cmd, (bytes, str)):
        raise ValueError("cmd must be a string")
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if not shell:
        raise ValueError("shell must be True")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")
    if text:
        raise ValueError("text must be False")
    if encoding is not None:
        raise ValueError("encoding must be None")
    if errors is not None:
        raise ValueError("errors must be None")

    protocol = protocol_factory()
    # Don't log the command's parameters: they may contain sensitive
    # information (passwords) and may be overly long.
    log_msg = None
    if self._debug:
        log_msg = f'run shell command {cmd!r}'
        self._log_subprocess(log_msg, stdin, stdout, stderr)
    transport = await self._make_subprocess_transport(
        protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
    if self._debug and log_msg is not None:
        logger.info('%s: %r', log_msg, transport)
    return transport, protocol
async def subprocess_exec(self, protocol_factory, program, *args,
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE, universal_newlines=False,
                          shell=False, bufsize=0,
                          encoding=None, errors=None, text=None,
                          **kwargs):
    """Execute *program* directly; return a (transport, protocol) pair.

    The text-mode options accepted by subprocess.Popen are rejected:
    asyncio subprocess pipes always carry bytes.
    """
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if shell:
        raise ValueError("shell must be False")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")
    if text:
        raise ValueError("text must be False")
    if encoding is not None:
        raise ValueError("encoding must be None")
    if errors is not None:
        raise ValueError("errors must be None")

    argv = (program,) + args
    protocol = protocol_factory()
    # Don't log the command's parameters: they may contain sensitive
    # information (passwords) and may be overly long.
    log_msg = None
    if self._debug:
        log_msg = f'execute program {program!r}'
        self._log_subprocess(log_msg, stdin, stdout, stderr)
    transport = await self._make_subprocess_transport(
        protocol, argv, False, stdin, stdout, stderr,
        bufsize, **kwargs)
    if self._debug and log_msg is not None:
        logger.info('%s: %r', log_msg, transport)
    return transport, protocol
def get_exception_handler(self):
    """Return an exception handler, or None if the default one is in use.
    """
    return self._exception_handler
def set_exception_handler(self, handler):
    """Set handler as the new event loop exception handler.

    If handler is None, the default exception handler will
    be set.

    If handler is a callable object, it should have a
    signature matching '(loop, context)', where 'loop'
    will be a reference to the active event loop, 'context'
    will be a dict object (see `call_exception_handler()`
    documentation for details about context).
    """
    # Only None or a callable is acceptable.
    if not (handler is None or callable(handler)):
        raise TypeError(f'A callable object or None is expected, '
                        f'got {handler!r}')
    self._exception_handler = handler
def default_exception_handler(self, context):
    """Default exception handler.

    This is called when an exception occurs and no exception
    handler is set, and can be called by a custom exception
    handler that wants to defer to the default behavior.

    This default handler logs the error message and other
    context-dependent information. In debug mode, a truncated
    stack trace is also appended showing where the given object
    (e.g. a handle or future or task) was created, if any.

    The context parameter has the same meaning as in
    `call_exception_handler()`.
    """
    message = context.get('message')
    if not message:
        message = 'Unhandled exception in event loop'

    exception = context.get('exception')
    if exception is not None:
        # Build a full (type, value, traceback) triple for the logger.
        exc_info = (type(exception), exception, exception.__traceback__)
    else:
        exc_info = False

    # Fall back to the currently-running handle's creation traceback
    # when the context itself carries none.
    if ('source_traceback' not in context and
            self._current_handle is not None and
            self._current_handle._source_traceback):
        context['handle_traceback'] = \
            self._current_handle._source_traceback

    log_lines = [message]
    # Render the remaining context entries deterministically (sorted),
    # formatting traceback lists specially.
    for key in sorted(context):
        if key in {'message', 'exception'}:
            continue
        value = context[key]
        if key == 'source_traceback':
            tb = ''.join(traceback.format_list(value))
            value = 'Object created at (most recent call last):\n'
            value += tb.rstrip()
        elif key == 'handle_traceback':
            tb = ''.join(traceback.format_list(value))
            value = 'Handle created at (most recent call last):\n'
            value += tb.rstrip()
        else:
            value = repr(value)
        log_lines.append(f'{key}: {value}')

    logger.error('\n'.join(log_lines), exc_info=exc_info)
def call_exception_handler(self, context):
    """Call the current event loop's exception handler.

    The context argument is a dict containing the following keys:

    - 'message': Error message;
    - 'exception' (optional): Exception object;
    - 'future' (optional): Future instance;
    - 'task' (optional): Task instance;
    - 'handle' (optional): Handle instance;
    - 'protocol' (optional): Protocol instance;
    - 'transport' (optional): Transport instance;
    - 'socket' (optional): Socket instance;
    - 'asyncgen' (optional): Asynchronous generator that caused
      the exception.

    New keys maybe introduced in the future.

    Note: do not overload this method in an event loop subclass.
    For custom exception handling, use the
    `set_exception_handler()` method.
    """
    if self._exception_handler is None:
        try:
            self.default_exception_handler(context)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            # Second protection layer for unexpected errors
            # in the default implementation, as well as for subclassed
            # event loops with overloaded "default_exception_handler".
            logger.error('Exception in default exception handler',
                         exc_info=True)
    else:
        try:
            # Run the custom handler inside the contextvars context of
            # the task/future/handle that produced the error, if any.
            ctx = None
            thing = context.get("task")
            if thing is None:
                # Even though Futures don't have a context,
                # Task is a subclass of Future,
                # and sometimes the 'future' key holds a Task.
                thing = context.get("future")
            if thing is None:
                # Handles also have a context.
                thing = context.get("handle")
            if thing is not None and hasattr(thing, "get_context"):
                ctx = thing.get_context()
            if ctx is not None and hasattr(ctx, "run"):
                ctx.run(self._exception_handler, self, context)
            else:
                self._exception_handler(self, context)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            # Exception in the user set custom exception handler.
            try:
                # Let's try default handler.
                self.default_exception_handler({
                    'message': 'Unhandled error in exception handler',
                    'exception': exc,
                    'context': context,
                })
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException:
                # Guard 'default_exception_handler' in case it is
                # overloaded.
                logger.error('Exception in default exception handler '
                             'while handling an unexpected error '
                             'in custom exception handler',
                             exc_info=True)
def _add_callback(self, handle):
1860
"""Add a Handle to _ready."""
1861
if not handle._cancelled:
1862
self._ready.append(handle)
1863
1864
def _add_callback_signalsafe(self, handle):
    """Like _add_callback() but called from a signal handler.

    After queueing the handle, write to the self-pipe so a loop that
    is blocked polling for I/O wakes up and processes it.
    """
    self._add_callback(handle)
    self._write_to_self()
def _timer_handle_cancelled(self, handle):
1870
"""Notification that a TimerHandle has been cancelled."""
1871
if handle._scheduled:
1872
self._timer_cancelled_count += 1
1873
1874
def _run_once(self):
    """Run one full iteration of the event loop.

    This calls all currently ready callbacks, polls for I/O,
    schedules the resulting callbacks, and finally schedules
    'call_later' callbacks.
    """

    sched_count = len(self._scheduled)
    if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
            self._timer_cancelled_count / sched_count >
            _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
        # Remove delayed calls that were cancelled if their number
        # is too high
        new_scheduled = []
        for handle in self._scheduled:
            if handle._cancelled:
                handle._scheduled = False
            else:
                new_scheduled.append(handle)

        # Rebuild the heap invariant after the filtering above.
        heapq.heapify(new_scheduled)
        self._scheduled = new_scheduled
        self._timer_cancelled_count = 0
    else:
        # Remove delayed calls that were cancelled from head of queue.
        while self._scheduled and self._scheduled[0]._cancelled:
            self._timer_cancelled_count -= 1
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False

    # Choose the poll timeout: zero when there is immediate work or a
    # stop request, otherwise sleep until the earliest timer is due.
    timeout = None
    if self._ready or self._stopping:
        timeout = 0
    elif self._scheduled:
        # Compute the desired timeout.
        when = self._scheduled[0]._when
        timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

    event_list = self._selector.select(timeout)
    self._process_events(event_list)
    # Needed to break cycles when an exception occurs.
    event_list = None

    # Handle 'later' callbacks that are ready.
    # The clock resolution margin catches timers that would become due
    # during this very iteration.
    end_time = self.time() + self._clock_resolution
    while self._scheduled:
        handle = self._scheduled[0]
        if handle._when >= end_time:
            break
        handle = heapq.heappop(self._scheduled)
        handle._scheduled = False
        self._ready.append(handle)

    # This is the only place where callbacks are actually *called*.
    # All other places just add them to ready.
    # Note: We run all currently scheduled callbacks, but not any
    # callbacks scheduled by callbacks run this time around --
    # they will be run the next time (after another I/O poll).
    # Use an idiom that is thread-safe without using locks.
    ntodo = len(self._ready)
    for i in range(ntodo):
        handle = self._ready.popleft()
        if handle._cancelled:
            continue
        if self._debug:
            try:
                self._current_handle = handle
                t0 = self.time()
                handle._run()
                dt = self.time() - t0
                # In debug mode, warn about callbacks that block the
                # loop for too long.
                if dt >= self.slow_callback_duration:
                    logger.warning('Executing %s took %.3f seconds',
                                   _format_handle(handle), dt)
            finally:
                self._current_handle = None
        else:
            handle._run()
    handle = None  # Needed to break cycles when an exception occurs.
def _set_coroutine_origin_tracking(self, enabled):
    """Toggle interpreter-wide coroutine origin tracking for debug mode.

    The previous depth is saved when tracking is enabled and restored
    when it is disabled, so the loop does not clobber a setting made
    elsewhere. No-op when the requested state matches the current one.
    """
    if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
        return

    if enabled:
        self._coroutine_origin_tracking_saved_depth = (
            sys.get_coroutine_origin_tracking_depth())
        sys.set_coroutine_origin_tracking_depth(
            constants.DEBUG_STACK_DEPTH)
    else:
        sys.set_coroutine_origin_tracking_depth(
            self._coroutine_origin_tracking_saved_depth)

    self._coroutine_origin_tracking_enabled = enabled
def get_debug(self):
    """Return True if the event loop is running in debug mode."""
    return self._debug
def set_debug(self, enabled):
    """Enable or disable debug mode on the event loop.

    If the loop is already running, coroutine origin tracking is
    toggled from inside the loop via call_soon_threadsafe() so the
    interpreter-wide setting is changed safely.
    """
    self._debug = enabled

    if self.is_running():
        self.call_soon_threadsafe(
            self._set_coroutine_origin_tracking, enabled)