Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
allendowney
GitHub Repository: allendowney/cpython
Path: blob/main/Lib/asyncio/runners.py
12 views
1
__all__ = ('Runner', 'run')
2
3
import contextvars
4
import enum
5
import functools
6
import threading
7
import signal
8
from . import coroutines
9
from . import events
10
from . import exceptions
11
from . import tasks
12
from . import constants
13
14
class _State(enum.Enum):
    """Life-cycle stage of a Runner."""

    # Freshly constructed; no event loop has been created yet.
    CREATED = "created"
    # The event loop has been created and is ready to run coroutines.
    INITIALIZED = "initialized"
    # The event loop has been shut down; the runner refuses further use.
    CLOSED = "closed"
18
19
20
class Runner:
21
"""A context manager that controls event loop life cycle.
22
23
The context manager always creates a new event loop,
24
allows to run async functions inside it,
25
and properly finalizes the loop at the context manager exit.
26
27
If debug is True, the event loop will be run in debug mode.
28
If loop_factory is passed, it is used for new event loop creation.
29
30
asyncio.run(main(), debug=True)
31
32
is a shortcut for
33
34
with asyncio.Runner(debug=True) as runner:
35
runner.run(main())
36
37
The run() method can be called multiple times within the runner's context.
38
39
This can be useful for interactive console (e.g. IPython),
40
unittest runners, console tools, -- everywhere when async code
41
is called from existing sync framework and where the preferred single
42
asyncio.run() call doesn't work.
43
44
"""
45
46
# Note: the class is final, it is not intended for inheritance.
47
48
def __init__(self, *, debug=None, loop_factory=None):
49
self._state = _State.CREATED
50
self._debug = debug
51
self._loop_factory = loop_factory
52
self._loop = None
53
self._context = None
54
self._interrupt_count = 0
55
self._set_event_loop = False
56
57
def __enter__(self):
58
self._lazy_init()
59
return self
60
61
def __exit__(self, exc_type, exc_val, exc_tb):
62
self.close()
63
64
def close(self):
65
"""Shutdown and close event loop."""
66
if self._state is not _State.INITIALIZED:
67
return
68
try:
69
loop = self._loop
70
_cancel_all_tasks(loop)
71
loop.run_until_complete(loop.shutdown_asyncgens())
72
loop.run_until_complete(
73
loop.shutdown_default_executor(constants.THREAD_JOIN_TIMEOUT))
74
finally:
75
if self._set_event_loop:
76
events.set_event_loop(None)
77
loop.close()
78
self._loop = None
79
self._state = _State.CLOSED
80
81
def get_loop(self):
82
"""Return embedded event loop."""
83
self._lazy_init()
84
return self._loop
85
86
def run(self, coro, *, context=None):
87
"""Run a coroutine inside the embedded event loop."""
88
if not coroutines.iscoroutine(coro):
89
raise ValueError("a coroutine was expected, got {!r}".format(coro))
90
91
if events._get_running_loop() is not None:
92
# fail fast with short traceback
93
raise RuntimeError(
94
"Runner.run() cannot be called from a running event loop")
95
96
self._lazy_init()
97
98
if context is None:
99
context = self._context
100
task = self._loop.create_task(coro, context=context)
101
102
if (threading.current_thread() is threading.main_thread()
103
and signal.getsignal(signal.SIGINT) is signal.default_int_handler
104
):
105
sigint_handler = functools.partial(self._on_sigint, main_task=task)
106
try:
107
signal.signal(signal.SIGINT, sigint_handler)
108
except ValueError:
109
# `signal.signal` may throw if `threading.main_thread` does
110
# not support signals (e.g. embedded interpreter with signals
111
# not registered - see gh-91880)
112
sigint_handler = None
113
else:
114
sigint_handler = None
115
116
self._interrupt_count = 0
117
try:
118
return self._loop.run_until_complete(task)
119
except exceptions.CancelledError:
120
if self._interrupt_count > 0:
121
uncancel = getattr(task, "uncancel", None)
122
if uncancel is not None and uncancel() == 0:
123
raise KeyboardInterrupt()
124
raise # CancelledError
125
finally:
126
if (sigint_handler is not None
127
and signal.getsignal(signal.SIGINT) is sigint_handler
128
):
129
signal.signal(signal.SIGINT, signal.default_int_handler)
130
131
def _lazy_init(self):
132
if self._state is _State.CLOSED:
133
raise RuntimeError("Runner is closed")
134
if self._state is _State.INITIALIZED:
135
return
136
if self._loop_factory is None:
137
self._loop = events.new_event_loop()
138
if not self._set_event_loop:
139
# Call set_event_loop only once to avoid calling
140
# attach_loop multiple times on child watchers
141
events.set_event_loop(self._loop)
142
self._set_event_loop = True
143
else:
144
self._loop = self._loop_factory()
145
if self._debug is not None:
146
self._loop.set_debug(self._debug)
147
self._context = contextvars.copy_context()
148
self._state = _State.INITIALIZED
149
150
def _on_sigint(self, signum, frame, main_task):
151
self._interrupt_count += 1
152
if self._interrupt_count == 1 and not main_task.done():
153
main_task.cancel()
154
# wakeup loop if it is blocked by select() with long timeout
155
self._loop.call_soon_threadsafe(lambda: None)
156
return
157
raise KeyboardInterrupt()
158
159
160
def run(main, *, debug=None, loop_factory=None):
161
"""Execute the coroutine and return the result.
162
163
This function runs the passed coroutine, taking care of
164
managing the asyncio event loop, finalizing asynchronous
165
generators and closing the default executor.
166
167
This function cannot be called when another asyncio event loop is
168
running in the same thread.
169
170
If debug is True, the event loop will be run in debug mode.
171
172
This function always creates a new event loop and closes it at the end.
173
It should be used as a main entry point for asyncio programs, and should
174
ideally only be called once.
175
176
The executor is given a timeout duration of 5 minutes to shutdown.
177
If the executor hasn't finished within that duration, a warning is
178
emitted and the executor is closed.
179
180
Example:
181
182
async def main():
183
await asyncio.sleep(1)
184
print('hello')
185
186
asyncio.run(main())
187
"""
188
if events._get_running_loop() is not None:
189
# fail fast with short traceback
190
raise RuntimeError(
191
"asyncio.run() cannot be called from a running event loop")
192
193
with Runner(debug=debug, loop_factory=loop_factory) as runner:
194
return runner.run(main)
195
196
197
def _cancel_all_tasks(loop):
    """Cancel the tasks of *loop* and wait until all of them have finished.

    Every task returned by tasks.all_tasks(loop) is cancelled and the loop
    is run until the whole group is done.  A task that then ends with an
    exception, rather than being cancelled, is reported through the loop's
    exception handler.
    """
    pending = tasks.all_tasks(loop)
    if not pending:
        return

    for task in pending:
        task.cancel()

    # return_exceptions=True: collect the outcome of every task instead of
    # stopping at the first one that raises.
    loop.run_until_complete(tasks.gather(*pending, return_exceptions=True))

    for task in pending:
        if task.cancelled():
            continue
        error = task.exception()
        if error is None:
            continue
        loop.call_exception_handler({
            'message': 'unhandled exception during asyncio.run() shutdown',
            'exception': error,
            'task': task,
        })
216
217