# Path: blob/main/src/resources/jupyter/jupyter.py
# 12921 views
import os
import sys
import re
import json
import stat
import pprint
import uuid
import signal
import subprocess

from socketserver import TCPServer, StreamRequestHandler

try:
    from socketserver import UnixStreamServer
except ImportError:
    # UnixStreamServer cannot be imported on this platform; only the
    # "tcp" server type is usable in that case
    pass


from log import log_init, log, log_error, trace
from notebook import notebook_execute, RestartKernel
from nbclient.exceptions import CellExecutionError

import asyncio

if sys.platform == "win32":
    from asyncio.windows_events import *

    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())


class ExecuteHandler(StreamRequestHandler):
    """Handle a single notebook execution request sent to the server.

    Each request is one line of JSON with "secret", "command" and
    "options" keys. Progress and failures are streamed back to the client
    as newline-delimited JSON messages (see `message`).
    """

    def handle(self):
        """Read one request, validate it and execute the notebook."""
        try:
            trace("handling server request")

            # read input
            request = str(self.rfile.readline().strip(), "utf-8")
            request = json.loads(request)

            # validate secret
            if not self.server.validate_secret(request["secret"]):
                trace("invalid secret (exiting server)")
                self.server.request_exit()
                return

            # a "file" command means the real request was written to a file:
            # load it, then remove the file. the file is closed explicitly
            # before unlinking so the removal does not depend on garbage
            # collection timing (an open file can't be deleted on windows)
            if request["command"] == "file":
                filename = request["options"]["file"]
                with open(filename, "r", encoding="utf8") as request_file:
                    request = json.load(request_file)
                os.unlink(filename)

            # if this is an abort command then request exit
            command = request["command"]
            if command == "abort":
                trace("abort command received (exiting server)")
                self.server.request_exit()
                return

            # options
            options = request["options"]

            # stream status back to client
            def status(msg):
                self.message("status", msg)

            # execute the notebook
            trace("executing notebook")
            persist = notebook_execute(options, status)
            if not persist:
                trace("notebook not persistable (exiting server)")
                self.server.request_exit()
            else:
                self.server.record_success()
        except RestartKernel:
            trace("notebook restart request recived (exiting server)")
            self.message("restart")
            self.server.request_exit()
        except Exception as e:
            # report the error to the client and let the server count it
            self.message("error", "\n\n" + str(e))
            self.server.record_error(e)

    # write a message back to the client
    def message(self, type, data=""):
        """Send a `{"type": ..., "data": ...}` JSON line to the client."""
        message = {"type": type, "data": data}
        self.wfile.write(bytearray(json.dumps(message) + "\n", "utf-8"))
        self.wfile.flush()


def execute_server(options):
    """Create the notebook execution server described by `options`.

    `options["type"]` selects the base class ("tcp" uses TCPServer, any
    other value uses UnixStreamServer), `options["transport"]` is a file
    path (the port/secret file for tcp, the socket address otherwise) and
    `options["timeout"]` is stored as the server's `timeout` attribute.

    Returns the constructed server instance.
    """
    # determine server type
    is_tcp = options["type"] == "tcp"
    base = TCPServer if is_tcp else UnixStreamServer

    class ExecuteServer(base):
        allow_reuse_address = True
        exit_pending = False
        consecutive_errors = 0

        def __init__(self, options):
            trace(
                "creating notebook server ("
                + options["type"]
                + ": "
                + options["transport"]
                + ")"
            )

            # set secret for tcp
            if is_tcp:
                self.secret = str(uuid.uuid4())
            else:
                self.secret = ""

            # server params
            self.transport = options["transport"]
            self.timeout = options["timeout"]

            # initialize with address (based on server type) and handler
            if is_tcp:
                server_address = ("localhost", 0)
            else:
                server_address = self.transport
            super().__init__(server_address, ExecuteHandler)

            # if we are a tcp server then get the port number and write it
            # to the transport file. change file permissions to user r/w
            # for both tcp and unix domain sockets
            if is_tcp:
                port = self.socket.getsockname()[1]
                trace("notebook server bound to port " + str(port))
                # create the file empty and restrict its permissions before
                # the secret is written into it
                with open(self.transport, "w") as file:
                    file.write("")
                os.chmod(self.transport, stat.S_IRUSR | stat.S_IWUSR)
                with open(self.transport, "w") as file:
                    file.write(json.dumps({"port": port, "secret": self.secret}))
            else:
                os.chmod(self.transport, stat.S_IRUSR | stat.S_IWUSR)

        def handle_request(self):
            """Exit if a handler requested it, otherwise serve one request."""
            if self.exit_pending:
                self.exit()
            super().handle_request()

        def handle_timeout(self):
            """Shut the server down on a request timeout."""
            trace("request timeout (exiting server)")
            self.exit()

        def validate_secret(self, secret):
            """Return True if `secret` equals this server's secret."""
            return self.secret == secret

        def record_success(self):
            """Reset the consecutive error counter."""
            self.consecutive_errors = 0

        def record_error(self, e):
            """Count a failed request and exit after too many in a row."""
            # exit for 5 consecutive errors
            self.consecutive_errors += 1
            if self.consecutive_errors >= 5:
                self.exit()

        def request_exit(self):
            """Flag the server to exit on the next `handle_request` call."""
            self.exit_pending = True

        def exit(self):
            """Release server resources and terminate the process.

            The process always exits with status 0, even if cleanup raises.
            """
            try:
                trace("cleaning up server resources")
                self.server_close()
                self.remove_transport()

            finally:
                trace("exiting server")
                sys.exit(0)

        def remove_transport(self):
            """Best-effort removal of the transport file."""
            try:
                if os.path.exists(self.transport):
                    os.remove(self.transport)
            except OSError:
                # removal is best effort: a filesystem error here must not
                # prevent shutdown
                pass

    return ExecuteServer(options)


def run_server(options):
    """Create the server and handle requests until the process exits.

    Exceptions raised while creating or running the server are logged
    rather than propagated.
    """
    try:
        with execute_server(options) as server:
            while True:
                server.handle_request()
    except Exception as e:
        log_error("Unable to run server", exc_info=e)


# run a server as a detached subprocess
def run_server_subprocess(options, status):
    """Re-launch this script with a trailing "serve" argument, detached.

    `options` is passed to the child as JSON in the QUARTO_JUPYTER_OPTIONS
    environment variable. `status` is accepted but not used here.
    """
    # python executable
    python_exe = sys.executable

    # detached process flags for windows
    flags = 0
    if sys.platform == "win32":
        python_exe = re.sub("python\\.exe$", "pythonw.exe", python_exe)
        flags |= 0x00000008  # DETACHED_PROCESS
        flags |= 0x00000200  # CREATE_NEW_PROCESS_GROUP
        flags |= 0x08000000  # CREATE_NO_WINDOW
        flags |= 0x01000000  # CREATE_BREAKAWAY_FROM_JOB

    # forward options via env vars
    os.environ["QUARTO_JUPYTER_OPTIONS"] = json.dumps(options)

    # create subprocess
    subprocess.Popen(
        [python_exe] + sys.argv + ["serve"],
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        creationflags=flags,
        close_fds=True,
        start_new_session=True,
    )


# run a notebook directly (not a server)
def run_notebook(options, status):
    """Execute a notebook in this process and exit with status 1 on failure.

    `status` is called with the (trimmed) error message when execution
    raises.
    """
    # run notebook w/ some special exception handling. note that we don't
    # log exceptions here b/c they are considered normal course of execution
    # for errors that occur in notebook cells
    try:
        trace("Running notebook_execute")
        notebook_execute(options, status)
    except Exception as e:
        trace(f"run_notebook caught exception: {type(e).__name__}")
        # CellExecutionError for execution at the terminal includes a bunch
        # of extra stack frames internal to this script. remove them
        msg = str(e)
        kCellExecutionError = "nbclient.exceptions.CellExecutionError: "
        loc = msg.find(kCellExecutionError)
        if loc != -1:
            msg = msg[loc + len(kCellExecutionError) :]
        status("\n\n" + msg + "\n")
        sys.exit(1)


if __name__ == "__main__":
    # stream status to stderr
    def status(msg):
        sys.stderr.write(msg)
        sys.stderr.flush()

    try:
        # read command from cmd line if it's there (in that case
        # options are passed via environment variable)
        if len(sys.argv) > 1:
            command = sys.argv[1]
            options = json.loads(os.getenv("QUARTO_JUPYTER_OPTIONS"))
            del os.environ["QUARTO_JUPYTER_OPTIONS"]
        # otherwise read from stdin
        else:
            sys.stdin.reconfigure(encoding='utf-8')
            payload = json.load(sys.stdin)
            command = payload["command"]
            options = payload["options"]

        # initialize log
        log_init(options["log"], options["debug"])

        # start the server (creates a new detached process, we implement this here
        # only b/c Deno doesn't currently support detaching spawned processes)
        if command == "start":
            trace("starting notebook server subprocess")
            run_server_subprocess(options, status)

        # serve a notebook (invoked by run_server_subprocess)
        elif command == "serve":
            trace("running notebook server subprocess")
            run_server(options)

        # execute a notebook and then quit
        elif command == "execute":
            trace("running notebook without keepalive")
            run_notebook(options, status)

    except Exception as e:
        log_error("Unable to run notebook", exc_info=e)
        sys.exit(1)