Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemath
GitHub Repository: sagemath/sagecell
Path: blob/master/handlers.py
447 views
1
import asyncio
2
import base64
3
import collections
4
import json
5
import os.path
6
import re
7
import time
8
import uuid
9
import zlib
10
11
from tornado.escape import url_escape
12
import tornado.gen
13
import tornado.ioloop
14
import tornado.web
15
import tornado.websocket
16
import sockjs.tornado
17
from zmq.utils import jsonapi
18
19
from log import StatsMessage, logger, stats_logger
20
21
22
# Static tab-completion word lists for the non-Python interfaces, keyed by
# the cell "mode" name.  When Sage cannot be imported the table is left
# empty, so lookups for these modes simply find nothing.
try:
    from sage.all import gap, gp, maxima, r, singular
    tab_completion = {
        name: interface._tab_completion()
        for name, interface in (
            ("gap", gap),
            ("gp", gp),
            ("maxima", maxima),
            ("r", r),
            ("singular", singular),
        )
    }
except ImportError:
    tab_completion = {}
33
34
import misc
35
# Module-wide configuration object; the handlers below read settings from it
# via ``config.get(<name>)`` (e.g. "requires_tos", "max_timeout").
config = misc.Config()
36
37
38
class RootHandler(tornado.web.RequestHandler):
    """
    Request handler for the root URL.

    Renders templates/root.html, optionally handing it code that should
    be preloaded into the page.

    Preloaded code can currently be specified in three ways:

    ``<root_url>?c=<code>`` gives the code as plain text
    ``<root_url>?z=<base64>`` gives zlib-compressed, URL-safe base64 code
        (an ``interacts`` argument in the same encoding is also accepted)
    ``<root_url>?q=<uuid>`` loads the code from a database based upon a
        unique identifying permalink (uuid4-based)
    """

    @staticmethod
    def _inflate(raw):
        """
        Decode a URL-safe base64, zlib-compressed query value to text.

        ``raw`` is the bytes value of the query argument.  Users may strip
        the trailing ``=`` padding so the URL needs no escaping; the
        padding is restored here before decoding.
        """
        padded = raw + b"=" * (-len(raw) % 4)
        return zlib.decompress(base64.urlsafe_b64decode(padded)).decode("utf8")

    async def get(self):
        """
        Render the root page, preloading code named by ``c``, ``z`` or ``q``.

        Responds with 400 when the compressed arguments cannot be decoded
        and with 404 when the permalink identifier is unknown.
        """
        logger.debug('RootHandler.get')
        query = self.request.arguments
        lang = self.get_argument("lang", None)
        code = None
        interacts = None

        if "c" in query:
            # Plain-text code given directly in the URL.
            code = self.get_argument("c")
        elif "z" in query:
            # Compressed code; only the last occurrence of each argument
            # is used.
            try:
                code = self._inflate(query["z"][-1])
                if "interacts" in query:
                    interacts = self._inflate(query["interacts"][-1])
            except Exception as e:
                self.set_status(400)
                self.finish("Invalid zipped code: %s" % e)
                return
        elif "q" in query:
            # Code stored under a permalink identifier.
            q = self.get_argument("q")
            try:
                code, lang, interacts = await self.application.db.get(q)
            except LookupError:
                logger.warning("ID not found in permalink database %s", q)
                self.set_status(404)
                self.finish("ID not found in permalink database")
                return

        if code is not None:
            code = url_escape(code, plus=False)
        if interacts is not None:
            interacts = url_escape(interacts, plus=False)

        # Evaluate automatically by default only when some code was preloaded.
        default_autoeval = "true" if code is not None else "false"
        autoeval = self.get_argument("autoeval", default_autoeval)
        self.render(
            "root.html",
            code=code,
            lang=lang,
            interacts=interacts,
            autoeval=autoeval,
        )

    def options(self):
        """Answer OPTIONS requests with a bare 200 status."""
        self.set_status(200)
104
105
106
class HelpHandler(tornado.web.RequestHandler):
    """
    Render templates/help.html.
    """
    def get(self):
        """Respond to GET by rendering the ``help.html`` template."""
        self.render("help.html")
112
113
114
class KernelHandler(tornado.web.RequestHandler):
    """
    Kernel startup request handler.

    This starts up an IPython kernel on an untrusted account
    and returns the associated kernel id and a url to request
    websocket connections for a websocket-ZMQ bridge back to
    the kernel in a JSON-compatible message.

    The returned websocket url is not entirely complete, in
    that it is the base url to be used for two different
    websocket connections (corresponding to the shell and
    iopub streams) of the IPython kernel. It is the
    responsibility of the client to request the correct URLs
    for these websockets based on the following pattern:

    ``<ws_url>/iopub`` is the expected iopub stream url
    ``<ws_url>/shell`` is the expected shell stream url
    """

    async def post(self, *args, **kwargs):
        """
        Start a kernel, or dispatch to ``delete``/``options``.

        The ``method`` request argument lets a client tunnel DELETE and
        OPTIONS through POST.  Otherwise a kernel is requested from the
        application's kernel dealer and its id and base websocket URL are
        written as the response.

        Responds with 403 when terms of service are required but not
        accepted, and with 400 when ``timeout`` is not a number.
        """
        method = self.get_argument("method", "POST")
        if method == "DELETE":
            self.delete(*args, **kwargs)
        elif method == "OPTIONS":
            self.options(*args, **kwargs)
        else:
            if config.get("requires_tos") and \
                    self.get_argument("accepted_tos", "false") != "true":
                self.set_status(403)
                self.finish()
                return
            # A malformed ``timeout`` is a client error; without this guard
            # float() would raise and the request would fail with a 500.
            try:
                requested_timeout = float(self.get_argument("timeout", 0))
            except ValueError:
                self.set_status(400)
                self.finish("Invalid timeout value")
                return
            logger.info('starting kernel for session '
                        + self.get_argument('CellSessionID', '(no ID)'))
            proto = self.request.protocol.replace("http", "ws", 1)
            host = self.request.host
            ws_url = "%s://%s/" % (proto, host)
            # Clients may ask for less than the configured maximum, never more.
            timeout = min(requested_timeout, config.get("max_timeout"))
            kernel = await self.application.kernel_dealer.get_kernel(
                rlimits=config.get("provider_settings")["preforked_rlimits"],
                lifespan=config.get("max_lifespan"),
                timeout=timeout)
            # Remembered on the kernel so that later execute requests can be
            # attributed in the statistics log.
            kernel.referer = self.request.headers.get('Referer', '')
            kernel.remote_ip = self.request.remote_ip
            data = {"ws_url": ws_url, "id": kernel.id}
            self.set_header("Jupyter-Kernel-ID", kernel.id)
            self.write(self.permissions(data))
            self.finish()

    def delete(self, kernel_id):
        """Stop the kernel ``kernel_id``; unknown ids are only logged."""
        try:
            self.application.kernel_dealer.kernel(kernel_id).stop()
        except KeyError:
            logger.debug("DELETE for non-existing kernel %s", kernel_id)
        self.permissions()
        self.finish()

    def options(self, kernel_id=None):
        """Answer an OPTIONS request with the cross-origin headers."""
        self.permissions()
        self.finish()

    def permissions(self, data=None):
        """
        Prepare cross-origin access for the response and return its body.

        If the request has a ``frame`` argument, ``data`` is wrapped in an
        HTML ``<script>`` that posts it to the parent window and the
        content type is set to HTML.  Otherwise, if the request carries an
        ``Origin`` header, CORS headers echoing that origin are set and
        ``data`` is returned unchanged.
        """
        if "frame" in self.request.arguments:
            data = '<script>parent.postMessage(%r,"*");</script>' % json.dumps(data)
            self.set_header("Content-Type", "text/html")
        elif "Origin" in self.request.headers:
            self.set_header(
                "Access-Control-Allow-Origin", self.request.headers["Origin"])
            self.set_header(
                "Access-Control-Allow-Credentials", "true")
            self.set_header(
                "Access-Control-Allow-Methods", "POST, GET, OPTIONS, DELETE")
            self.set_header(
                "Access-Control-Allow-Headers", "X-XSRFToken, Content-Type")
        return data
190
191
192
class Completer(object):
    """
    Answers tab-completion requests.

    Requests in "sage" or "python" mode are forwarded to a dedicated kernel
    and its reply is relayed back to the requester.  Requests in any other
    mode are answered directly from the static ``tab_completion`` tables.
    """

    # Identifier immediately preceding the cursor (end of the given text).
    name_pattern = re.compile(r"\b[a-z_]\w*$", re.IGNORECASE)

    def __init__(self, kernel_dealer):
        """Request a kernel from ``kernel_dealer`` to serve completions."""
        # Maps the msg_id of a forwarded request to the connection awaiting
        # the reply.
        self.waiting = {}

        def cb(task):
            self.kernel = task.result()
            self.kernel.channels["shell"].on_recv(self.on_recv)
            logger.info("completer kernel ready")

        asyncio.ensure_future(kernel_dealer.get_kernel()).add_done_callback(cb)

    @staticmethod
    def _prefix_at_cursor(line, cursor_pos):
        """
        Find the identifier that ends at ``cursor_pos`` in ``line``.

        Returns ``(prefix, start)``, where ``start`` is the index at which
        the identifier begins.  When no identifier ends at the cursor,
        returns ``(None, cursor_pos)``.
        """
        match = Completer.name_pattern.search(line[:cursor_pos])
        if match is None:
            return None, cursor_pos
        return match.group(), match.start()

    def registerRequest(self, addr, msg):
        """
        Handle the completion request ``msg`` received from ``addr``.

        ``addr`` is the connection to reply on (its ``send`` method is
        used).  "sage"/"python" requests are forwarded to the completer
        kernel; other modes get an immediate ``complete_reply`` built from
        ``tab_completion``.
        """
        content = msg["content"]
        mode = content.get("mode", "sage")
        if mode in ("sage", "python"):
            self.waiting[msg["header"]["msg_id"]] = addr
            self.kernel.session.send(self.kernel.channels["shell"], msg)
            return
        prefix, start = self._prefix_at_cursor(
            content["line"], content["cursor_pos"])
        if prefix is None:
            # Nothing to complete (e.g. the cursor follows whitespace or an
            # operator); reply with no matches instead of failing.
            matches = []
        else:
            matches = [t for t in tab_completion.get(mode, [])
                       if t.startswith(prefix)]
        response = {
            "channel": "shell",
            "header": {
                "msg_id": str(uuid.uuid4()),
                "username": "",
                "session": self.kernel.id,
                "msg_type": "complete_reply"
            },
            "parent_header": msg["header"],
            "metadata": {},
            "content": {
                "matches": matches,
                "cursor_start": start,
            },
        }
        # jsonapi.dumps returns bytes, so the prefix must be bytes as well
        # (same form as in on_recv).
        addr.send(b"complete," + jsonapi.dumps(response))

    def on_recv(self, msg):
        """Relay a reply from the completer kernel to whoever asked for it."""
        msg = self.kernel.session.feed_identities(msg)[1]
        msg = self.kernel.session.deserialize(msg)
        addr = self.waiting.pop(msg["parent_header"]["msg_id"])
        addr.send(b"complete," + jsonapi.dumps(msg, default=misc.sage_json))
236
237
238
class SockJSHandler(sockjs.tornado.SockJSConnection):
    """
    SockJS connection multiplexing messages for kernels and the completer.

    Incoming messages have the form ``<target>[/...],<json>``, where
    ``<target>`` is either a kernel id or the literal ``complete``.
    """

    def on_open(self, request):
        """Start with no kernel channels attached to this connection."""
        self.channels = {}

    def on_message(self, message):
        """Route one incoming message to the completer or to its kernel."""
        prefix, payload = message.split(",", 1)
        target = prefix.split("/", 1)[0]
        payload = jsonapi.loads(payload)
        logger.debug("SockJSHandler.on_message: %s", payload)
        msg_type = payload["header"]["msg_type"]
        app = self.session.handler.application

        if target == "complete":
            # Only completion-style requests are served; anything else
            # addressed to the completer is dropped.
            if msg_type in ("complete_request", "object_info_request"):
                app.completer.registerRequest(self, payload)
            return

        try:
            kernel = app.kernel_dealer.kernel(target)
        except KeyError:
            # Ignore messages to nonexistent or killed kernels.
            logger.warning("%s sent to nonexistent kernel %s", msg_type, target)
            return

        if target not in self.channels:
            # First message for this kernel on this connection: bridge it.
            bridge = SockJSChannelsHandler(self.send)
            self.channels[target] = bridge
            bridge.connect(kernel)

        if msg_type == "execute_request":
            stats_logger.info(StatsMessage(
                kernel_id=target,
                remote_ip=kernel.remote_ip,
                referer=kernel.referer,
                code=payload["content"]["code"],
                execute_type="request"))

        self.channels[target].send(payload)

    def on_close(self):
        """Disconnect every kernel bridge opened through this connection."""
        while self.channels:
            _, bridge = self.channels.popitem()
            bridge.disconnect()
275
276
277
# SockJS router serving SockJSHandler connections under the "/sockjs" prefix.
KernelRouter = sockjs.tornado.SockJSRouter(SockJSHandler, "/sockjs")
278
279
280
class TOSHandler(tornado.web.RequestHandler):
    """
    Handler for ``/tos.html``

    The terms-of-service text is read once, when the class is defined, and
    only if the ``requires_tos`` setting is enabled.
    """
    tos = config.get("requires_tos")
    if tos:
        path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static", "tos.html")
        with open(path) as f:
            tos_html = f.read()
            tos_json = json.dumps(tos_html)
    else:
        tos_html = "No Terms of Service Required"
        tos_json = json.dumps(tos_html)

    # A JSONP callback must be a plain JavaScript identifier path such as
    # ``cb`` or ``ns.cb``.  The value is echoed into a script response, so
    # anything else is rejected rather than reflected back to the client.
    _callback_pattern = re.compile(
        r"[A-Za-z_$][\w$]*(?:\.[A-Za-z_$][\w$]*)*", re.ASCII)

    def post(self):
        """
        Return the terms of service as HTML, or as JSONP if ``callback`` is given.

        Without ``callback`` the HTML text is written (or status 204 is set
        when no terms are required) and the ``Origin`` header, if any, is
        echoed back in CORS headers.  With ``callback`` the response is
        ``<callback>(<json string>);``.

        Raises ``tornado.web.HTTPError`` (400) when ``callback`` is not a
        valid JavaScript identifier path.
        """
        callbacks = self.get_arguments("callback")
        if not callbacks:
            if self.tos:
                self.write(self.tos_html)
            else:
                self.set_status(204)
            if "Origin" in self.request.headers:
                self.set_header("Access-Control-Allow-Origin",
                                self.request.headers["Origin"])
                self.set_header("Access-Control-Allow-Credentials", "true")
            self.set_header("Content-Type", "text/html")
        else:
            # The last value wins, as with get_argument().
            callback = callbacks[-1]
            if self._callback_pattern.fullmatch(callback) is None:
                raise tornado.web.HTTPError(400, 'Invalid callback name')
            resp = self.tos_json if self.tos else '""'
            self.write("%s(%s);" % (callback, resp))
            self.set_header("Content-Type", "application/javascript")

    def get(self):
        """Return the terms of service as HTML, or 404 if none are required."""
        if self.tos:
            self.write(self.tos_html)
        else:
            raise tornado.web.HTTPError(404, 'No Terms of Service Required')
313
314
315
class ServiceHandler(tornado.web.RequestHandler):
    """
    Implements a blocking (to the client) web service to execute a single
    computation on the server. This should be non-blocking to Tornado.

    The code to be executed is given in the code request parameter.

    This handler is currently not production-ready. But it is used for health
    checks...
    """

    async def post(self):
        """
        Execute the submitted ``code`` in a fresh kernel and reply with the
        collected output.

        Responds with 403 when terms of service are required but not
        accepted, and with 413 when the code exceeds 65000 characters.
        Otherwise the response is completed later by ``finish_request``,
        either when the kernel has replied and is idle or when the
        30 second timeout fires.
        """
        # Echo the caller's origin so cross-origin clients can read the reply.
        if 'Origin' in self.request.headers:
            self.set_header(
                'Access-Control-Allow-Origin', self.request.headers['Origin'])
            self.set_header('Access-Control-Allow-Credentials', 'true')
        if (config.get('requires_tos')
                and self.get_argument('accepted_tos', 'false') != 'true'):
            self.set_status(403)
            self.finish(
                'When evaluating code, you must acknowledge your acceptance '
                'of the terms of service at /static/tos.html by passing the '
                'parameter accepted_tos=true\n')
            return
        # All ``code`` arguments are concatenated, without stripping whitespace.
        code = ''.join(self.get_arguments('code', strip=False))
        if len(code) > 65000:
            self.set_status(413)
            self.finish('Max code size is 65000 characters')
            return
        remote_ip = self.request.remote_ip
        referer = self.request.headers.get('Referer', '')
        self.kernel = await self.application.kernel_dealer.get_kernel(
            rlimits=config.get("provider_settings")["preforked_rlimits"],
            lifespan=config.get("max_lifespan"),
            timeout=0)
        sm = StatsMessage(
            kernel_id=self.kernel.id,
            remote_ip=remote_ip,
            referer=referer,
            code=code,
            execute_type='service')
        # Requests from localhost are logged at debug level only.
        if remote_ip == '127.0.0.1':
            stats_logger.debug(sm)
        else:
            stats_logger.info(sm)
        self.zmq_handler = ZMQServiceHandler()
        # ``streams`` accumulates the kernel output and becomes the response
        # body (see finish_request).
        streams = self.zmq_handler.streams
        self.zmq_handler.connect(self.kernel)
        loop = tornado.ioloop.IOLoop.current()

        def kernel_callback(msg):
            # Invoked for messages coming from the kernel.
            if msg['msg_type'] == 'execute_reply':
                # The kernel answered in time: cancel the timeout and record
                # the outcome.
                loop.remove_timeout(self.timeout_handle)
                streams['success'] = msg['content']['status'] == 'ok'
                streams['execute_reply'] = msg['content']
            # Finish only once the reply has been recorded AND the kernel
            # reports itself idle.
            if self.kernel.status == "idle" and 'success' in streams:
                logger.debug('service request finished for %s', self.kernel.id)
                loop.add_callback(self.finish_request)

        self.zmq_handler.msg_from_kernel_callbacks.append(kernel_callback)

        def timeout_callback():
            # No execute_reply arrived in time: stop the kernel and report
            # failure.
            logger.debug('service request timed out for %s', self.kernel.id)
            self.kernel.stop()
            self.zmq_handler.streams['success'] = False
            loop.add_callback(self.finish_request)

        self.timeout_handle = loop.call_later(30, timeout_callback)
        # NOTE(review): ``user_expressions`` is parsed below, after the kernel
        # has been acquired and the timeout scheduled; invalid JSON would
        # raise at that point -- consider validating it earlier.
        exec_message = {
            'channel': 'shell',
            'parent_header': {},
            'header': {
                'msg_id': str(uuid.uuid4()),
                'username': '',
                'session': self.kernel.id,
                'msg_type': 'execute_request',
            },
            'content': {
                'code': code,
                'silent': False,
                'user_expressions':
                    jsonapi.loads(self.get_argument('user_expressions', '{}')),
                'allow_stdin': False,
            },
            'metadata': {},
        }
        self.zmq_handler.send(exec_message)
        # Presumably keeps Tornado from finishing the response when this
        # coroutine returns; it is completed by finish_request instead.
        self._auto_finish = False

    def finish_request(self):
        """Send the accumulated streams as the response and finish it."""
        self.finish(self.zmq_handler.streams)
406
407
408
class ZMQChannelsHandler(object):
    """
    Bridge between a client connection and the ZMQ channels of a kernel.

    Messages received on the kernel's iopub and shell channels are passed
    to ``output_message`` (which subclasses provide), and ``send`` forwards
    client messages to the kernel's shell channel.  When the kernel stops,
    the client is told through a synthetic "dead" status message.
    """

    def _json_msg(self, msg):
        """
        Serialize a single message to JSON.

        Note that a ``buffers`` entry is removed from ``msg`` itself,
        because buffers cannot be encoded.
        """
        if "buffers" in msg:
            del msg["buffers"]
        # sage_json handles things like encoding dates and sage types
        return jsonapi.dumps(msg, default=misc.sage_json)

    def connect(self, kernel):
        """Attach to ``kernel`` and start receiving its messages."""
        self.kernel = kernel
        # Hooks run for every message from / to the kernel, respectively.
        self.msg_from_kernel_callbacks = []
        self.msg_to_kernel_callbacks = []
        for name in ("iopub", "shell"):
            kernel.channels[name].on_recv_stream(self.on_recv)
        kernel.on_stop(self.kernel_stopped)

    def disconnect(self):
        """Stop the attached kernel."""
        self.kernel.stop()

    def kernel_stopped(self):
        """Tell the client that the kernel is dead."""
        header = {
            'msg_type': 'status',
            'session': self.kernel.id,
            'msg_id': str(uuid.uuid4()),
            'username': '',
        }
        self.output_message({
            "channel": "iopub",
            'header': header,
            'parent_header': {},
            'metadata': {},
            'content': {'execution_state': 'dead'},
        })

    def on_recv(self, stream, msg_list):
        """
        Process one raw message received from the kernel on ``stream``.

        Updates the kernel's status, execution counter and timeout, and
        forwards everything except ``kernel_timeout`` messages to the
        registered callbacks and to ``output_message``.
        """
        kernel = self.kernel
        frames = kernel.session.feed_identities(msg_list)[1]
        msg = kernel.session.deserialize(frames)
        msg["channel"] = stream.channel
        # Logging every received message here is useful, but may be way too
        # verbose even for debugging.
        msg_type = msg["msg_type"]

        if msg_type == "status":
            kernel.status = msg["content"]["execution_state"]

        if msg_type in ("execute_reply",
                        "sagenb.interact.update_interact_reply"):
            kernel.executing -= 1
            logger.debug("decreased execution counter for %s to %s",
                         kernel.id, kernel.executing)

        if msg_type != "kernel_timeout":
            for hook in self.msg_from_kernel_callbacks:
                hook(msg)
            self.output_message(msg)
        else:
            # The kernel asks for a new timeout; it is applied only when
            # non-negative, and capped at the configured maximum.
            requested = float(msg["content"]["timeout"])
            logger.debug("reset timeout for %s to %f", kernel.id, requested)
            if requested >= 0:
                kernel.timeout = min(requested, config.get("max_timeout"))

        if kernel.timeout > 0:
            # Any message from the kernel pushes its deadline forward.
            kernel.deadline = time.time() + kernel.timeout
        elif kernel.executing == 0 and kernel.status == "idle":
            # No timeout and nothing left to run: the kernel is not needed.
            logger.debug("stopping on %s, %s", stream.channel, msg_type)
            kernel.stop()

    def send(self, msg):
        """Forward ``msg`` from the client to the kernel's shell channel."""
        # Logging every sent message here is useful, but may be way too
        # verbose even for debugging.
        for hook in self.msg_to_kernel_callbacks:
            hook(msg)
        kernel = self.kernel
        starts_execution = msg['header']['msg_type'] in (
            'execute_request', 'sagenb.interact.update_interact')
        if starts_execution:
            kernel.executing += 1
            logger.debug("increased execution counter for %s to %s",
                         kernel.id, kernel.executing)
        kernel.session.send(kernel.channels["shell"], msg)
494
495
496
class ZMQServiceHandler(ZMQChannelsHandler):
    """Channels handler that collects the kernel's stream output by name."""

    def __init__(self):
        super().__init__()
        # Accumulated text per stream name; missing names start out empty.
        self.streams = collections.defaultdict(str)

    def output_message(self, msg):
        """Append the text of iopub ``stream`` messages to ``self.streams``."""
        if msg["channel"] != "iopub" or msg["header"]["msg_type"] != "stream":
            return
        content = msg["content"]
        self.streams[content["name"]] += content["text"]
505
506
507
class SockJSChannelsHandler(ZMQChannelsHandler):
    """Channels handler that delivers kernel messages through a callback."""

    def __init__(self, callback):
        # Called with each outgoing message as a single string.
        self.callback = callback

    def output_message(self, msg):
        """Send ``msg`` to the callback as ``<kernel id>/channels,<json>``."""
        kernel_id = self.kernel.id
        payload = self._json_msg(msg).decode()
        self.callback("%s/channels,%s" % (kernel_id, payload))
515
516
517
class WebChannelsHandler(ZMQChannelsHandler,
                         tornado.websocket.WebSocketHandler):
    """Websocket handler bridging a single client websocket to one kernel."""

    def on_close(self):
        # Closing the websocket stops the kernel (see disconnect()).
        self.disconnect()

    def on_message(self, msg):
        # Incoming websocket messages are JSON; forward them to the kernel.
        self.send(jsonapi.loads(msg))

    def open(self, kernel_id):
        # Attach to the kernel named in the URL.  kernel_dealer.kernel() is
        # handled elsewhere in this file as raising KeyError for unknown ids;
        # that case is not caught here.
        self.connect(self.application.kernel_dealer.kernel(kernel_id))

    def output_message(self, msg):
        # Kernel messages are written to the websocket as JSON.
        self.write_message(self._json_msg(msg))
531
532
533
class StaticHandler(tornado.web.StaticFileHandler):
    """Static file handler that allows credentialed cross-origin requests."""

    def set_extra_headers(self, path):
        """Echo the request's ``Origin`` header, if any, in CORS headers."""
        headers = self.request.headers
        if "Origin" not in headers:
            return
        self.set_header("Access-Control-Allow-Origin", headers["Origin"])
        self.set_header("Access-Control-Allow-Credentials", "true")
541
542
543
class FileHandler(StaticHandler):
    """
    Handler for files belonging to a kernel.

    Takes a kernel id and a file path and returns the file found at
    ``<kernel_id>/<file_path>``.
    """

    def compute_etag(self):
        """Disable etags for kernel files."""
        # tornado.web.StaticFileHandler uses filenames for etag, but then
        # updated user files get the same one even if recomputed in linked
        # cells. Dropping etag still makes use of modification time.
        return None

    async def get(self, kernel_id, file_path):
        """Serve ``file_path`` from the directory of kernel ``kernel_id``."""
        relative_path = '%s/%s' % (kernel_id, file_path)
        await super().get(relative_path)

    def set_extra_headers(self, path):
        """Add ``Cache-Control: no-cache`` to the inherited headers."""
        super().set_extra_headers(path)
        self.set_header('Cache-Control', 'no-cache')
562
563