Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
allendowney
GitHub Repository: allendowney/cpython
Path: blob/main/Lib/asyncio/events.py
12 views
1
"""Event loop and event loop policy."""
2
3
__all__ = (
4
'AbstractEventLoopPolicy',
5
'AbstractEventLoop', 'AbstractServer',
6
'Handle', 'TimerHandle',
7
'get_event_loop_policy', 'set_event_loop_policy',
8
'get_event_loop', 'set_event_loop', 'new_event_loop',
9
'get_child_watcher', 'set_child_watcher',
10
'_set_running_loop', 'get_running_loop',
11
'_get_running_loop',
12
)
13
14
import contextvars
15
import os
16
import signal
17
import socket
18
import subprocess
19
import sys
20
import threading
21
22
from . import format_helpers
23
24
25
class Handle:
26
"""Object returned by callback registration methods."""
27
28
__slots__ = ('_callback', '_args', '_cancelled', '_loop',
29
'_source_traceback', '_repr', '__weakref__',
30
'_context')
31
32
def __init__(self, callback, args, loop, context=None):
33
if context is None:
34
context = contextvars.copy_context()
35
self._context = context
36
self._loop = loop
37
self._callback = callback
38
self._args = args
39
self._cancelled = False
40
self._repr = None
41
if self._loop.get_debug():
42
self._source_traceback = format_helpers.extract_stack(
43
sys._getframe(1))
44
else:
45
self._source_traceback = None
46
47
def _repr_info(self):
48
info = [self.__class__.__name__]
49
if self._cancelled:
50
info.append('cancelled')
51
if self._callback is not None:
52
info.append(format_helpers._format_callback_source(
53
self._callback, self._args))
54
if self._source_traceback:
55
frame = self._source_traceback[-1]
56
info.append(f'created at {frame[0]}:{frame[1]}')
57
return info
58
59
def __repr__(self):
60
if self._repr is not None:
61
return self._repr
62
info = self._repr_info()
63
return '<{}>'.format(' '.join(info))
64
65
def get_context(self):
66
return self._context
67
68
def cancel(self):
69
if not self._cancelled:
70
self._cancelled = True
71
if self._loop.get_debug():
72
# Keep a representation in debug mode to keep callback and
73
# parameters. For example, to log the warning
74
# "Executing <Handle...> took 2.5 second"
75
self._repr = repr(self)
76
self._callback = None
77
self._args = None
78
79
def cancelled(self):
80
return self._cancelled
81
82
def _run(self):
83
try:
84
self._context.run(self._callback, *self._args)
85
except (SystemExit, KeyboardInterrupt):
86
raise
87
except BaseException as exc:
88
cb = format_helpers._format_callback_source(
89
self._callback, self._args)
90
msg = f'Exception in callback {cb}'
91
context = {
92
'message': msg,
93
'exception': exc,
94
'handle': self,
95
}
96
if self._source_traceback:
97
context['source_traceback'] = self._source_traceback
98
self._loop.call_exception_handler(context)
99
self = None # Needed to break cycles when an exception occurs.
100
101
102
class TimerHandle(Handle):
103
"""Object returned by timed callback registration methods."""
104
105
__slots__ = ['_scheduled', '_when']
106
107
def __init__(self, when, callback, args, loop, context=None):
108
super().__init__(callback, args, loop, context)
109
if self._source_traceback:
110
del self._source_traceback[-1]
111
self._when = when
112
self._scheduled = False
113
114
def _repr_info(self):
115
info = super()._repr_info()
116
pos = 2 if self._cancelled else 1
117
info.insert(pos, f'when={self._when}')
118
return info
119
120
def __hash__(self):
121
return hash(self._when)
122
123
def __lt__(self, other):
124
if isinstance(other, TimerHandle):
125
return self._when < other._when
126
return NotImplemented
127
128
def __le__(self, other):
129
if isinstance(other, TimerHandle):
130
return self._when < other._when or self.__eq__(other)
131
return NotImplemented
132
133
def __gt__(self, other):
134
if isinstance(other, TimerHandle):
135
return self._when > other._when
136
return NotImplemented
137
138
def __ge__(self, other):
139
if isinstance(other, TimerHandle):
140
return self._when > other._when or self.__eq__(other)
141
return NotImplemented
142
143
def __eq__(self, other):
144
if isinstance(other, TimerHandle):
145
return (self._when == other._when and
146
self._callback == other._callback and
147
self._args == other._args and
148
self._cancelled == other._cancelled)
149
return NotImplemented
150
151
def cancel(self):
152
if not self._cancelled:
153
self._loop._timer_handle_cancelled(self)
154
super().cancel()
155
156
def when(self):
157
"""Return a scheduled callback time.
158
159
The time is an absolute timestamp, using the same time
160
reference as loop.time().
161
"""
162
return self._when
163
164
165
class AbstractServer:
166
"""Abstract server returned by create_server()."""
167
168
def close(self):
169
"""Stop serving. This leaves existing connections open."""
170
raise NotImplementedError
171
172
def get_loop(self):
173
"""Get the event loop the Server object is attached to."""
174
raise NotImplementedError
175
176
def is_serving(self):
177
"""Return True if the server is accepting connections."""
178
raise NotImplementedError
179
180
async def start_serving(self):
181
"""Start accepting connections.
182
183
This method is idempotent, so it can be called when
184
the server is already being serving.
185
"""
186
raise NotImplementedError
187
188
async def serve_forever(self):
189
"""Start accepting connections until the coroutine is cancelled.
190
191
The server is closed when the coroutine is cancelled.
192
"""
193
raise NotImplementedError
194
195
async def wait_closed(self):
196
"""Coroutine to wait until service is closed."""
197
raise NotImplementedError
198
199
async def __aenter__(self):
200
return self
201
202
async def __aexit__(self, *exc):
203
self.close()
204
await self.wait_closed()
205
206
207
class AbstractEventLoop:
208
"""Abstract event loop."""
209
210
# Running and stopping the event loop.
211
212
def run_forever(self):
213
"""Run the event loop until stop() is called."""
214
raise NotImplementedError
215
216
def run_until_complete(self, future):
217
"""Run the event loop until a Future is done.
218
219
Return the Future's result, or raise its exception.
220
"""
221
raise NotImplementedError
222
223
def stop(self):
224
"""Stop the event loop as soon as reasonable.
225
226
Exactly how soon that is may depend on the implementation, but
227
no more I/O callbacks should be scheduled.
228
"""
229
raise NotImplementedError
230
231
def is_running(self):
232
"""Return whether the event loop is currently running."""
233
raise NotImplementedError
234
235
def is_closed(self):
236
"""Returns True if the event loop was closed."""
237
raise NotImplementedError
238
239
def close(self):
240
"""Close the loop.
241
242
The loop should not be running.
243
244
This is idempotent and irreversible.
245
246
No other methods should be called after this one.
247
"""
248
raise NotImplementedError
249
250
async def shutdown_asyncgens(self):
251
"""Shutdown all active asynchronous generators."""
252
raise NotImplementedError
253
254
async def shutdown_default_executor(self):
255
"""Schedule the shutdown of the default executor."""
256
raise NotImplementedError
257
258
# Methods scheduling callbacks. All these return Handles.
259
260
def _timer_handle_cancelled(self, handle):
261
"""Notification that a TimerHandle has been cancelled."""
262
raise NotImplementedError
263
264
def call_soon(self, callback, *args, context=None):
265
return self.call_later(0, callback, *args, context=context)
266
267
def call_later(self, delay, callback, *args, context=None):
268
raise NotImplementedError
269
270
def call_at(self, when, callback, *args, context=None):
271
raise NotImplementedError
272
273
def time(self):
274
raise NotImplementedError
275
276
def create_future(self):
277
raise NotImplementedError
278
279
# Method scheduling a coroutine object: create a task.
280
281
def create_task(self, coro, *, name=None, context=None):
282
raise NotImplementedError
283
284
# Methods for interacting with threads.
285
286
def call_soon_threadsafe(self, callback, *args, context=None):
287
raise NotImplementedError
288
289
def run_in_executor(self, executor, func, *args):
290
raise NotImplementedError
291
292
def set_default_executor(self, executor):
293
raise NotImplementedError
294
295
# Network I/O methods returning Futures.
296
297
async def getaddrinfo(self, host, port, *,
298
family=0, type=0, proto=0, flags=0):
299
raise NotImplementedError
300
301
async def getnameinfo(self, sockaddr, flags=0):
302
raise NotImplementedError
303
304
async def create_connection(
305
self, protocol_factory, host=None, port=None,
306
*, ssl=None, family=0, proto=0,
307
flags=0, sock=None, local_addr=None,
308
server_hostname=None,
309
ssl_handshake_timeout=None,
310
ssl_shutdown_timeout=None,
311
happy_eyeballs_delay=None, interleave=None):
312
raise NotImplementedError
313
314
async def create_server(
315
self, protocol_factory, host=None, port=None,
316
*, family=socket.AF_UNSPEC,
317
flags=socket.AI_PASSIVE, sock=None, backlog=100,
318
ssl=None, reuse_address=None, reuse_port=None,
319
ssl_handshake_timeout=None,
320
ssl_shutdown_timeout=None,
321
start_serving=True):
322
"""A coroutine which creates a TCP server bound to host and port.
323
324
The return value is a Server object which can be used to stop
325
the service.
326
327
If host is an empty string or None all interfaces are assumed
328
and a list of multiple sockets will be returned (most likely
329
one for IPv4 and another one for IPv6). The host parameter can also be
330
a sequence (e.g. list) of hosts to bind to.
331
332
family can be set to either AF_INET or AF_INET6 to force the
333
socket to use IPv4 or IPv6. If not set it will be determined
334
from host (defaults to AF_UNSPEC).
335
336
flags is a bitmask for getaddrinfo().
337
338
sock can optionally be specified in order to use a preexisting
339
socket object.
340
341
backlog is the maximum number of queued connections passed to
342
listen() (defaults to 100).
343
344
ssl can be set to an SSLContext to enable SSL over the
345
accepted connections.
346
347
reuse_address tells the kernel to reuse a local socket in
348
TIME_WAIT state, without waiting for its natural timeout to
349
expire. If not specified will automatically be set to True on
350
UNIX.
351
352
reuse_port tells the kernel to allow this endpoint to be bound to
353
the same port as other existing endpoints are bound to, so long as
354
they all set this flag when being created. This option is not
355
supported on Windows.
356
357
ssl_handshake_timeout is the time in seconds that an SSL server
358
will wait for completion of the SSL handshake before aborting the
359
connection. Default is 60s.
360
361
ssl_shutdown_timeout is the time in seconds that an SSL server
362
will wait for completion of the SSL shutdown procedure
363
before aborting the connection. Default is 30s.
364
365
start_serving set to True (default) causes the created server
366
to start accepting connections immediately. When set to False,
367
the user should await Server.start_serving() or Server.serve_forever()
368
to make the server to start accepting connections.
369
"""
370
raise NotImplementedError
371
372
async def sendfile(self, transport, file, offset=0, count=None,
373
*, fallback=True):
374
"""Send a file through a transport.
375
376
Return an amount of sent bytes.
377
"""
378
raise NotImplementedError
379
380
async def start_tls(self, transport, protocol, sslcontext, *,
381
server_side=False,
382
server_hostname=None,
383
ssl_handshake_timeout=None,
384
ssl_shutdown_timeout=None):
385
"""Upgrade a transport to TLS.
386
387
Return a new transport that *protocol* should start using
388
immediately.
389
"""
390
raise NotImplementedError
391
392
async def create_unix_connection(
393
self, protocol_factory, path=None, *,
394
ssl=None, sock=None,
395
server_hostname=None,
396
ssl_handshake_timeout=None,
397
ssl_shutdown_timeout=None):
398
raise NotImplementedError
399
400
async def create_unix_server(
401
self, protocol_factory, path=None, *,
402
sock=None, backlog=100, ssl=None,
403
ssl_handshake_timeout=None,
404
ssl_shutdown_timeout=None,
405
start_serving=True):
406
"""A coroutine which creates a UNIX Domain Socket server.
407
408
The return value is a Server object, which can be used to stop
409
the service.
410
411
path is a str, representing a file system path to bind the
412
server socket to.
413
414
sock can optionally be specified in order to use a preexisting
415
socket object.
416
417
backlog is the maximum number of queued connections passed to
418
listen() (defaults to 100).
419
420
ssl can be set to an SSLContext to enable SSL over the
421
accepted connections.
422
423
ssl_handshake_timeout is the time in seconds that an SSL server
424
will wait for the SSL handshake to complete (defaults to 60s).
425
426
ssl_shutdown_timeout is the time in seconds that an SSL server
427
will wait for the SSL shutdown to finish (defaults to 30s).
428
429
start_serving set to True (default) causes the created server
430
to start accepting connections immediately. When set to False,
431
the user should await Server.start_serving() or Server.serve_forever()
432
to make the server to start accepting connections.
433
"""
434
raise NotImplementedError
435
436
async def connect_accepted_socket(
437
self, protocol_factory, sock,
438
*, ssl=None,
439
ssl_handshake_timeout=None,
440
ssl_shutdown_timeout=None):
441
"""Handle an accepted connection.
442
443
This is used by servers that accept connections outside of
444
asyncio, but use asyncio to handle connections.
445
446
This method is a coroutine. When completed, the coroutine
447
returns a (transport, protocol) pair.
448
"""
449
raise NotImplementedError
450
451
async def create_datagram_endpoint(self, protocol_factory,
452
local_addr=None, remote_addr=None, *,
453
family=0, proto=0, flags=0,
454
reuse_address=None, reuse_port=None,
455
allow_broadcast=None, sock=None):
456
"""A coroutine which creates a datagram endpoint.
457
458
This method will try to establish the endpoint in the background.
459
When successful, the coroutine returns a (transport, protocol) pair.
460
461
protocol_factory must be a callable returning a protocol instance.
462
463
socket family AF_INET, socket.AF_INET6 or socket.AF_UNIX depending on
464
host (or family if specified), socket type SOCK_DGRAM.
465
466
reuse_address tells the kernel to reuse a local socket in
467
TIME_WAIT state, without waiting for its natural timeout to
468
expire. If not specified it will automatically be set to True on
469
UNIX.
470
471
reuse_port tells the kernel to allow this endpoint to be bound to
472
the same port as other existing endpoints are bound to, so long as
473
they all set this flag when being created. This option is not
474
supported on Windows and some UNIX's. If the
475
:py:data:`~socket.SO_REUSEPORT` constant is not defined then this
476
capability is unsupported.
477
478
allow_broadcast tells the kernel to allow this endpoint to send
479
messages to the broadcast address.
480
481
sock can optionally be specified in order to use a preexisting
482
socket object.
483
"""
484
raise NotImplementedError
485
486
# Pipes and subprocesses.
487
488
async def connect_read_pipe(self, protocol_factory, pipe):
489
"""Register read pipe in event loop. Set the pipe to non-blocking mode.
490
491
protocol_factory should instantiate object with Protocol interface.
492
pipe is a file-like object.
493
Return pair (transport, protocol), where transport supports the
494
ReadTransport interface."""
495
# The reason to accept file-like object instead of just file descriptor
496
# is: we need to own pipe and close it at transport finishing
497
# Can got complicated errors if pass f.fileno(),
498
# close fd in pipe transport then close f and vice versa.
499
raise NotImplementedError
500
501
async def connect_write_pipe(self, protocol_factory, pipe):
502
"""Register write pipe in event loop.
503
504
protocol_factory should instantiate object with BaseProtocol interface.
505
Pipe is file-like object already switched to nonblocking.
506
Return pair (transport, protocol), where transport support
507
WriteTransport interface."""
508
# The reason to accept file-like object instead of just file descriptor
509
# is: we need to own pipe and close it at transport finishing
510
# Can got complicated errors if pass f.fileno(),
511
# close fd in pipe transport then close f and vice versa.
512
raise NotImplementedError
513
514
async def subprocess_shell(self, protocol_factory, cmd, *,
515
stdin=subprocess.PIPE,
516
stdout=subprocess.PIPE,
517
stderr=subprocess.PIPE,
518
**kwargs):
519
raise NotImplementedError
520
521
async def subprocess_exec(self, protocol_factory, *args,
522
stdin=subprocess.PIPE,
523
stdout=subprocess.PIPE,
524
stderr=subprocess.PIPE,
525
**kwargs):
526
raise NotImplementedError
527
528
# Ready-based callback registration methods.
529
# The add_*() methods return None.
530
# The remove_*() methods return True if something was removed,
531
# False if there was nothing to delete.
532
533
def add_reader(self, fd, callback, *args):
534
raise NotImplementedError
535
536
def remove_reader(self, fd):
537
raise NotImplementedError
538
539
def add_writer(self, fd, callback, *args):
540
raise NotImplementedError
541
542
def remove_writer(self, fd):
543
raise NotImplementedError
544
545
# Completion based I/O methods returning Futures.
546
547
async def sock_recv(self, sock, nbytes):
548
raise NotImplementedError
549
550
async def sock_recv_into(self, sock, buf):
551
raise NotImplementedError
552
553
async def sock_recvfrom(self, sock, bufsize):
554
raise NotImplementedError
555
556
async def sock_recvfrom_into(self, sock, buf, nbytes=0):
557
raise NotImplementedError
558
559
async def sock_sendall(self, sock, data):
560
raise NotImplementedError
561
562
async def sock_sendto(self, sock, data, address):
563
raise NotImplementedError
564
565
async def sock_connect(self, sock, address):
566
raise NotImplementedError
567
568
async def sock_accept(self, sock):
569
raise NotImplementedError
570
571
async def sock_sendfile(self, sock, file, offset=0, count=None,
572
*, fallback=None):
573
raise NotImplementedError
574
575
# Signal handling.
576
577
def add_signal_handler(self, sig, callback, *args):
578
raise NotImplementedError
579
580
def remove_signal_handler(self, sig):
581
raise NotImplementedError
582
583
# Task factory.
584
585
def set_task_factory(self, factory):
586
raise NotImplementedError
587
588
def get_task_factory(self):
589
raise NotImplementedError
590
591
# Error handlers.
592
593
def get_exception_handler(self):
594
raise NotImplementedError
595
596
def set_exception_handler(self, handler):
597
raise NotImplementedError
598
599
def default_exception_handler(self, context):
600
raise NotImplementedError
601
602
def call_exception_handler(self, context):
603
raise NotImplementedError
604
605
# Debug flag management.
606
607
def get_debug(self):
608
raise NotImplementedError
609
610
def set_debug(self, enabled):
611
raise NotImplementedError
612
613
614
class AbstractEventLoopPolicy:
615
"""Abstract policy for accessing the event loop."""
616
617
def get_event_loop(self):
618
"""Get the event loop for the current context.
619
620
Returns an event loop object implementing the BaseEventLoop interface,
621
or raises an exception in case no event loop has been set for the
622
current context and the current policy does not specify to create one.
623
624
It should never return None."""
625
raise NotImplementedError
626
627
def set_event_loop(self, loop):
628
"""Set the event loop for the current context to loop."""
629
raise NotImplementedError
630
631
def new_event_loop(self):
632
"""Create and return a new event loop object according to this
633
policy's rules. If there's need to set this loop as the event loop for
634
the current context, set_event_loop must be called explicitly."""
635
raise NotImplementedError
636
637
# Child processes handling (Unix only).
638
639
def get_child_watcher(self):
640
"Get the watcher for child processes."
641
raise NotImplementedError
642
643
def set_child_watcher(self, watcher):
644
"""Set the watcher for child processes."""
645
raise NotImplementedError
646
647
648
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
649
"""Default policy implementation for accessing the event loop.
650
651
In this policy, each thread has its own event loop. However, we
652
only automatically create an event loop by default for the main
653
thread; other threads by default have no event loop.
654
655
Other policies may have different rules (e.g. a single global
656
event loop, or automatically creating an event loop per thread, or
657
using some other notion of context to which an event loop is
658
associated).
659
"""
660
661
_loop_factory = None
662
663
class _Local(threading.local):
664
_loop = None
665
_set_called = False
666
667
def __init__(self):
668
self._local = self._Local()
669
670
def get_event_loop(self):
671
"""Get the event loop for the current context.
672
673
Returns an instance of EventLoop or raises an exception.
674
"""
675
if (self._local._loop is None and
676
not self._local._set_called and
677
threading.current_thread() is threading.main_thread()):
678
stacklevel = 2
679
try:
680
f = sys._getframe(1)
681
except AttributeError:
682
pass
683
else:
684
# Move up the call stack so that the warning is attached
685
# to the line outside asyncio itself.
686
while f:
687
module = f.f_globals.get('__name__')
688
if not (module == 'asyncio' or module.startswith('asyncio.')):
689
break
690
f = f.f_back
691
stacklevel += 1
692
import warnings
693
warnings.warn('There is no current event loop',
694
DeprecationWarning, stacklevel=stacklevel)
695
self.set_event_loop(self.new_event_loop())
696
697
if self._local._loop is None:
698
raise RuntimeError('There is no current event loop in thread %r.'
699
% threading.current_thread().name)
700
701
return self._local._loop
702
703
def set_event_loop(self, loop):
704
"""Set the event loop."""
705
self._local._set_called = True
706
if loop is not None and not isinstance(loop, AbstractEventLoop):
707
raise TypeError(f"loop must be an instance of AbstractEventLoop or None, not '{type(loop).__name__}'")
708
self._local._loop = loop
709
710
def new_event_loop(self):
711
"""Create a new event loop.
712
713
You must call set_event_loop() to make this the current event
714
loop.
715
"""
716
return self._loop_factory()
717
718
719
# Event loop policy. The policy itself is always global, even if the
720
# policy's rules say that there is an event loop per thread (or other
721
# notion of context). The default policy is installed by the first
722
# call to get_event_loop_policy().
723
_event_loop_policy = None
724
725
# Lock for protecting the on-the-fly creation of the event loop policy.
726
_lock = threading.Lock()
727
728
729
# A TLS for the running event loop, used by _get_running_loop.
730
class _RunningLoop(threading.local):
731
loop_pid = (None, None)
732
733
734
_running_loop = _RunningLoop()
735
736
737
def get_running_loop():
738
"""Return the running event loop. Raise a RuntimeError if there is none.
739
740
This function is thread-specific.
741
"""
742
# NOTE: this function is implemented in C (see _asynciomodule.c)
743
loop = _get_running_loop()
744
if loop is None:
745
raise RuntimeError('no running event loop')
746
return loop
747
748
749
def _get_running_loop():
750
"""Return the running event loop or None.
751
752
This is a low-level function intended to be used by event loops.
753
This function is thread-specific.
754
"""
755
# NOTE: this function is implemented in C (see _asynciomodule.c)
756
running_loop, pid = _running_loop.loop_pid
757
if running_loop is not None and pid == os.getpid():
758
return running_loop
759
760
761
def _set_running_loop(loop):
762
"""Set the running event loop.
763
764
This is a low-level function intended to be used by event loops.
765
This function is thread-specific.
766
"""
767
# NOTE: this function is implemented in C (see _asynciomodule.c)
768
_running_loop.loop_pid = (loop, os.getpid())
769
770
771
def _init_event_loop_policy():
772
global _event_loop_policy
773
with _lock:
774
if _event_loop_policy is None: # pragma: no branch
775
from . import DefaultEventLoopPolicy
776
_event_loop_policy = DefaultEventLoopPolicy()
777
778
779
def get_event_loop_policy():
780
"""Get the current event loop policy."""
781
if _event_loop_policy is None:
782
_init_event_loop_policy()
783
return _event_loop_policy
784
785
786
def set_event_loop_policy(policy):
787
"""Set the current event loop policy.
788
789
If policy is None, the default policy is restored."""
790
global _event_loop_policy
791
if policy is not None and not isinstance(policy, AbstractEventLoopPolicy):
792
raise TypeError(f"policy must be an instance of AbstractEventLoopPolicy or None, not '{type(policy).__name__}'")
793
_event_loop_policy = policy
794
795
796
def get_event_loop():
797
"""Return an asyncio event loop.
798
799
When called from a coroutine or a callback (e.g. scheduled with call_soon
800
or similar API), this function will always return the running event loop.
801
802
If there is no running event loop set, the function will return
803
the result of `get_event_loop_policy().get_event_loop()` call.
804
"""
805
# NOTE: this function is implemented in C (see _asynciomodule.c)
806
current_loop = _get_running_loop()
807
if current_loop is not None:
808
return current_loop
809
return get_event_loop_policy().get_event_loop()
810
811
812
def set_event_loop(loop):
813
"""Equivalent to calling get_event_loop_policy().set_event_loop(loop)."""
814
get_event_loop_policy().set_event_loop(loop)
815
816
817
def new_event_loop():
818
"""Equivalent to calling get_event_loop_policy().new_event_loop()."""
819
return get_event_loop_policy().new_event_loop()
820
821
822
def get_child_watcher():
823
"""Equivalent to calling get_event_loop_policy().get_child_watcher()."""
824
return get_event_loop_policy().get_child_watcher()
825
826
827
def set_child_watcher(watcher):
828
"""Equivalent to calling
829
get_event_loop_policy().set_child_watcher(watcher)."""
830
return get_event_loop_policy().set_child_watcher(watcher)
831
832
833
# Alias pure-Python implementations for testing purposes.
834
_py__get_running_loop = _get_running_loop
835
_py__set_running_loop = _set_running_loop
836
_py_get_running_loop = get_running_loop
837
_py_get_event_loop = get_event_loop
838
839
840
try:
841
# get_event_loop() is one of the most frequently called
842
# functions in asyncio. Pure Python implementation is
843
# about 4 times slower than C-accelerated.
844
from _asyncio import (_get_running_loop, _set_running_loop,
845
get_running_loop, get_event_loop)
846
except ImportError:
847
pass
848
else:
849
# Alias C implementations for testing purposes.
850
_c__get_running_loop = _get_running_loop
851
_c__set_running_loop = _set_running_loop
852
_c_get_running_loop = get_running_loop
853
_c_get_event_loop = get_event_loop
854
855
856
if hasattr(os, 'fork'):
857
def on_fork():
858
# Reset the loop and wakeupfd in the forked child process.
859
if _event_loop_policy is not None:
860
_event_loop_policy._local = BaseDefaultEventLoopPolicy._Local()
861
_set_running_loop(None)
862
signal.set_wakeup_fd(-1)
863
864
os.register_at_fork(after_in_child=on_fork)
865
866