Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
quarto-dev
GitHub Repository: quarto-dev/quarto-cli
Path: blob/main/src/resources/jupyter/jupyter.py
12921 views
1
import os
2
import sys
3
import re
4
import json
5
import stat
6
import pprint
7
import uuid
8
import signal
9
import subprocess
10
11
from socketserver import TCPServer, StreamRequestHandler
12
13
try:
    from socketserver import UnixStreamServer
except ImportError:
    # socketserver does not provide UnixStreamServer on non-Unix platforms;
    # only the "tcp" server type can be used there
    pass
17
18
19
from log import log_init, log, log_error, trace
20
from notebook import notebook_execute, RestartKernel
21
from nbclient.exceptions import CellExecutionError
22
23
import asyncio
24
25
# Windows only: make asyncio use the selector event loop policy
if sys.platform == "win32":
    from asyncio.windows_events import *

    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
29
30
31
class ExecuteHandler(StreamRequestHandler):
    """Handle a single request sent to the notebook execution server.

    A request is one line of UTF-8 encoded JSON carrying a "secret" and a
    "command" (plus "options" for commands that execute a notebook).
    Progress and results are streamed back to the client with ``message``.
    """

    def handle(self):
        """Read one request, validate it and execute the requested command."""
        try:
            trace("handling server request")

            # read input
            request = json.loads(str(self.rfile.readline().strip(), "utf-8"))

            # validate secret
            if not self.server.validate_secret(request["secret"]):
                trace("invalid secret (exiting server)")
                self.server.request_exit()
                return

            # a "file" command names a file that holds the actual request
            if request["command"] == "file":
                request = self._read_request_file(request["options"]["file"])

            # if this is an abort command then request exit
            command = request["command"]
            if command == "abort":
                trace("abort command received (exiting server)")
                self.server.request_exit()
                return

            # options
            options = request["options"]

            # stream status back to client
            def status(msg):
                self.message("status", msg)

            # execute the notebook
            trace("executing notebook")
            persist = notebook_execute(options, status)
            if not persist:
                trace("notebook not persistable (exiting server)")
                self.server.request_exit()
            else:
                self.server.record_success()
        except RestartKernel:
            trace("notebook restart request recived (exiting server)")
            self.message("restart")
            self.server.request_exit()
        except Exception as e:
            # report the failure to the client and let the server count it
            self.message("error", "\n\n" + str(e))
            self.server.record_error(e)

    @staticmethod
    def _read_request_file(filename):
        """Return the JSON request stored in ``filename`` and delete the file.

        The file is left in place when its content cannot be parsed.
        """
        # close the file before unlinking it: removing a file that is still
        # in use raises an exception on Windows
        with open(filename, "r", encoding="utf8") as request_file:
            request = json.load(request_file)
        os.unlink(filename)
        return request

    # write a message back to the client
    def message(self, type, data=""):
        """Send ``{"type": type, "data": data}`` to the client as one JSON line."""
        payload = json.dumps({"type": type, "data": data}) + "\n"
        self.wfile.write(payload.encode("utf-8"))
        self.wfile.flush()
85
86
87
def execute_server(options):
    """Create the server that executes notebooks on request.

    ``options`` must provide:
      - "type": "tcp" selects a TCP server bound to localhost on a free
        port; any other value selects a unix domain socket server
      - "transport": path of the file the port and secret are written to
        (tcp) or path of the socket itself (unix)
      - "timeout": stored as the server's ``timeout`` attribute, which
        socketserver uses as the time ``handle_request()`` waits before
        calling ``handle_timeout()``

    Returns the constructed server instance.
    """
    # determine server type (UnixStreamServer is only looked up when needed,
    # since it is not defined on every platform)
    is_tcp = options["type"] == "tcp"
    base = TCPServer if is_tcp else UnixStreamServer

    class ExecuteServer(base):
        allow_reuse_address = True
        exit_pending = False
        consecutive_errors = 0
        # number of consecutive failed requests after which the server exits
        max_consecutive_errors = 5

        def __init__(self, options):
            trace(
                f"creating notebook server ({options['type']}: {options['transport']})"
            )

            # set secret for tcp (unix sockets use an empty secret)
            self.secret = str(uuid.uuid4()) if is_tcp else ""

            # server params
            self.transport = options["transport"]
            self.timeout = options["timeout"]

            # initialize with address (based on server type) and handler
            server_address = ("localhost", 0) if is_tcp else self.transport
            super().__init__(server_address, ExecuteHandler)

            # if we are a tcp server then get the port number and write it
            # to the transport file. change file permissions to user r/w
            # for both tcp and unix domain sockets
            owner_read_write = stat.S_IRUSR | stat.S_IWUSR
            if is_tcp:
                port = self.socket.getsockname()[1]
                trace("notebook server bound to port " + str(port))
                # create the file empty and restrict its permissions before
                # the secret is written to it
                with open(self.transport, "w") as file:
                    file.write("")
                os.chmod(self.transport, owner_read_write)
                with open(self.transport, "w") as file:
                    file.write(json.dumps({"port": port, "secret": self.secret}))
            else:
                os.chmod(self.transport, owner_read_write)

        def handle_request(self):
            # a handler asked for shutdown while serving the previous request
            if self.exit_pending:
                self.exit()
            super().handle_request()

        def handle_timeout(self):
            trace("request timeout (exiting server)")
            self.exit()

        def validate_secret(self, secret):
            return self.secret == secret

        def record_success(self):
            self.consecutive_errors = 0

        def record_error(self, e):
            # exit once too many requests have failed in a row
            self.consecutive_errors += 1
            if self.consecutive_errors >= self.max_consecutive_errors:
                self.exit()

        def request_exit(self):
            # the exit itself happens at the next handle_request() call
            self.exit_pending = True

        def exit(self):
            try:
                trace("cleaning up server resources")
                self.server_close()
                self.remove_transport()
            finally:
                # always terminate the process, even if cleanup failed
                trace("exiting server")
                sys.exit(0)

        def remove_transport(self):
            # best effort: the transport may already be gone or may not be
            # removable; only filesystem errors are ignored
            try:
                os.remove(self.transport)
            except OSError:
                pass

    return ExecuteServer(options)
182
183
184
def run_server(options):
    """Create the notebook server and process requests until the process exits.

    The request loop has no exit condition of its own; the server ends the
    process through ``sys.exit`` (a SystemExit is not caught here). Any other
    exception is logged.
    """
    try:
        server = execute_server(options)
        with server:
            while True:
                server.handle_request()
    except Exception as err:
        log_error("Unable to run server", exc_info=err)
191
192
193
# run a server as a detached subprocess
def run_server_subprocess(options, status):
    """Re-launch this script with the "serve" command as a detached process.

    ``options`` is JSON encoded into the QUARTO_JUPYTER_OPTIONS environment
    variable of the current process, which the child inherits. ``status`` is
    not used by this function.
    """
    interpreter = sys.executable
    creation_flags = 0

    # detached process flags for windows
    if sys.platform == "win32":
        interpreter = re.sub("python\\.exe$", "pythonw.exe", interpreter)
        detached_process = 0x00000008
        create_new_process_group = 0x00000200
        create_no_window = 0x08000000
        create_breakaway_from_job = 0x01000000
        creation_flags = (
            detached_process
            | create_new_process_group
            | create_no_window
            | create_breakaway_from_job
        )

    # forward options via env vars
    os.environ["QUARTO_JUPYTER_OPTIONS"] = json.dumps(options)

    # create subprocess: no inherited stdio, own session
    command_line = [interpreter, *sys.argv, "serve"]
    subprocess.Popen(
        command_line,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        creationflags=creation_flags,
        close_fds=True,
        start_new_session=True,
    )
220
221
222
# run a notebook directly (not a server)
def run_notebook(options, status):
    """Execute the notebook once in the current process.

    If execution raises, the error text is passed to ``status`` and the
    process exits with code 1. Exceptions are not logged here because
    failures in notebook cells are an expected outcome.
    """
    try:
        trace("Running notebook_execute")
        notebook_execute(options, status)
    except Exception as err:
        trace(f"run_notebook caught exception: {type(err).__name__}")

        # a CellExecutionError raised at the terminal is preceded by stack
        # frames internal to this script: keep only what follows the marker
        marker = "nbclient.exceptions.CellExecutionError: "
        text = str(err)
        _, found, remainder = text.partition(marker)
        if found:
            text = remainder

        status(f"\n\n{text}\n")
        sys.exit(1)
241
242
243
if __name__ == "__main__":
    # stream status to stderr
    def status(msg):
        sys.stderr.write(msg)
        sys.stderr.flush()

    try:
        # read command from cmd line if it's there (in that case
        # options are passed via environment variable)
        if len(sys.argv) > 1:
            command = sys.argv[1]
            # pop() reads and clears the variable in one step, and raises a
            # KeyError naming the variable when it was not provided
            options = json.loads(os.environ.pop("QUARTO_JUPYTER_OPTIONS"))
        # otherwise read from stdin
        else:
            sys.stdin.reconfigure(encoding="utf-8")
            request = json.load(sys.stdin)
            command = request["command"]
            options = request["options"]

        # initialize log
        log_init(options["log"], options["debug"])

        # start the server (creates a new detached process, we implement this here
        # only b/c Deno doesn't currently support detaching spawned processes)
        if command == "start":
            trace("starting notebook server subprocess")
            run_server_subprocess(options, status)

        # serve a notebook (invoked by run_server_subprocess)
        elif command == "serve":
            trace("running notebook server subprocess")
            run_server(options)

        # execute a notebook and then quit
        elif command == "execute":
            trace("running notebook without keepalive")
            run_notebook(options, status)

    except Exception as e:
        log_error("Unable to run notebook", exc_info=e)
        sys.exit(1)
285
286