Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
allendowney
GitHub Repository: allendowney/cpython
Path: blob/main/Lib/asyncio/__main__.py
12 views
1
import ast
2
import asyncio
3
import code
4
import concurrent.futures
5
import inspect
6
import sys
7
import threading
8
import types
9
import warnings
10
11
from . import futures
12
13
14
class AsyncIOInteractiveConsole(code.InteractiveConsole):
15
16
def __init__(self, locals, loop):
17
super().__init__(locals)
18
self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT
19
20
self.loop = loop
21
22
def runcode(self, code):
23
future = concurrent.futures.Future()
24
25
def callback():
26
global repl_future
27
global repl_future_interrupted
28
29
repl_future = None
30
repl_future_interrupted = False
31
32
func = types.FunctionType(code, self.locals)
33
try:
34
coro = func()
35
except SystemExit:
36
raise
37
except KeyboardInterrupt as ex:
38
repl_future_interrupted = True
39
future.set_exception(ex)
40
return
41
except BaseException as ex:
42
future.set_exception(ex)
43
return
44
45
if not inspect.iscoroutine(coro):
46
future.set_result(coro)
47
return
48
49
try:
50
repl_future = self.loop.create_task(coro)
51
futures._chain_future(repl_future, future)
52
except BaseException as exc:
53
future.set_exception(exc)
54
55
loop.call_soon_threadsafe(callback)
56
57
try:
58
return future.result()
59
except SystemExit:
60
raise
61
except BaseException:
62
if repl_future_interrupted:
63
self.write("\nKeyboardInterrupt\n")
64
else:
65
self.showtraceback()
66
67
68
class REPLThread(threading.Thread):
69
70
def run(self):
71
try:
72
banner = (
73
f'asyncio REPL {sys.version} on {sys.platform}\n'
74
f'Use "await" directly instead of "asyncio.run()".\n'
75
f'Type "help", "copyright", "credits" or "license" '
76
f'for more information.\n'
77
f'{getattr(sys, "ps1", ">>> ")}import asyncio'
78
)
79
80
console.interact(
81
banner=banner,
82
exitmsg='exiting asyncio REPL...')
83
finally:
84
warnings.filterwarnings(
85
'ignore',
86
message=r'^coroutine .* was never awaited$',
87
category=RuntimeWarning)
88
89
loop.call_soon_threadsafe(loop.stop)
90
91
92
if __name__ == '__main__':
93
loop = asyncio.new_event_loop()
94
asyncio.set_event_loop(loop)
95
96
repl_locals = {'asyncio': asyncio}
97
for key in {'__name__', '__package__',
98
'__loader__', '__spec__',
99
'__builtins__', '__file__'}:
100
repl_locals[key] = locals()[key]
101
102
console = AsyncIOInteractiveConsole(repl_locals, loop)
103
104
repl_future = None
105
repl_future_interrupted = False
106
107
try:
108
import readline # NoQA
109
except ImportError:
110
pass
111
112
repl_thread = REPLThread()
113
repl_thread.daemon = True
114
repl_thread.start()
115
116
while True:
117
try:
118
loop.run_forever()
119
except KeyboardInterrupt:
120
if repl_future and not repl_future.done():
121
repl_future.cancel()
122
repl_future_interrupted = True
123
continue
124
else:
125
break
126
127