Path: blob/main/src/execute/jupyter/jupyter-kernel.ts
6460 views
/*1* jupyter-kernel.ts2*3* Copyright (C) 2020-2022 Posit Software, PBC4*/56import { existsSync, safeRemoveSync } from "../../deno_ral/fs.ts";7import { join } from "../../deno_ral/path.ts";8import { error, info, warning } from "../../deno_ral/log.ts";910import { sleep } from "../../core/async.ts";11import {12JupyterCapabilities,13JupyterKernelspec,14} from "../../core/jupyter/types.ts";15import { getQuartoAPI } from "../../core/api/index.ts";16import type { ProcessResult } from "../../core/process-types.ts";1718import {19kExecuteDaemon,20kExecuteDaemonRestart,21kExecuteDebug,22} from "../../config/constants.ts";2324import { ExecuteOptions } from "../types.ts";25import { isWindows } from "../../deno_ral/platform.ts";2627export interface JupyterExecuteOptions extends ExecuteOptions {28kernelspec: JupyterKernelspec;29python_cmd: string[];30supervisor_pid?: number;31}3233export async function executeKernelOneshot(34options: JupyterExecuteOptions,35): Promise<void> {36// abort any existing keepalive kernel37await abortKernel(options);3839// execute the notebook (save back in place)40if (!options.quiet) {41messageStartingKernel(options.kernelspec);42}4344trace(options, "Executing notebook with oneshot kernel");45const debug = !!options.format.execute[kExecuteDebug] ||46(!!Deno.env.get("QUARTO_JUPYTER_DEBUG"));47const result = await execJupyter(48"execute",49{ ...options, debug },50options.kernelspec,51);5253if (!result.success) {54return Promise.reject();55}56}5758export async function executeKernelKeepalive(59options: JupyterExecuteOptions,60): Promise<void> {61// if we are in debug mode then tail follow the log file62let serverLogProcess: Deno.ChildProcess | undefined;63if (options.format.execute[kExecuteDebug]) {64if (!isWindows) {65serverLogProcess = new Deno.Command("tail", {66args: ["-F", "-n", "0", kernelLogFile()],67}).spawn();68}69}7071// if we have a restart request then abort before proceeding72if (options.format.execute[kExecuteDaemonRestart]) {73await abortKernel(options);74}7576trace(options, "Connecting to kernel");77const [conn, transport] = await connectToKernel(options);78trace(options, "Kernel connection successful");79try {80trace(options, "Sending execute command to kernel");81await writeKernelCommand(82conn,83"execute",84transport.secret,85{ ...options },86);87trace(options, "Execute command sent, reading response");88let leftover = "";89while (true) {90const buffer = new Uint8Array(512);9192const bytesRead = await conn.read(buffer);93if (bytesRead === null) {94break;95}9697if (bytesRead > 0) {98const payload = new TextDecoder().decode(99buffer.slice(0, bytesRead),100);101102const jsonMessages = payload.split("\n");103104for (let jsonMessage of jsonMessages) {105if (!jsonMessage) {106continue;107}108if (leftover) {109jsonMessage = leftover + jsonMessage;110leftover = "";111}112try {113const msg: { type: string; data: string } = JSON.parse(114jsonMessage,115);116if (msg.type === "error") {117trace(options, "Error response received");118error(msg.data, { colorize: false });119printExecDiagnostics(options.kernelspec, msg.data);120return Promise.reject();121} else if (msg.type == "restart") {122trace(options, "Restart request received");123return executeKernelKeepalive(options);124} else {125info(msg.data, { newline: false });126}127} catch {128leftover = jsonMessage;129}130}131}132}133trace(options, "Server request complete\n\n");134} catch (e) {135trace(options, "Error occurred receiving response from server");136// likely this is not our server! (as it's not producing/consuming the expected json)137// in that case remove the connection file and re-throw the exception138const transportFile = kernelTransportFile(options.target.input);139if (existsSync(transportFile)) {140safeRemoveSync(transportFile);141}142throw e;143} finally {144conn.close();145146serverLogProcess?.kill("SIGKILL");147}148}149150async function abortKernel(options: JupyterExecuteOptions) {151// connect to kernel if it exists and send abort command152try {153trace(options, "Checking for existing kernel");154const [conn, transport] = await connectToKernel(options, false);155trace(options, "Existing kernel found");156try {157trace(options, "Sending kernel abort request");158await writeKernelCommand(conn, "abort", transport.secret, {});159trace(options, "Abort request successful");160} finally {161const transportFile = kernelTransportFile(options.target.input);162if (existsSync(transportFile)) {163safeRemoveSync(transportFile);164}165conn.close();166}167} catch {168trace(options, "No existing kernel found");169}170}171172async function execJupyter(173command: string,174options: Record<string, unknown>,175kernelspec: JupyterKernelspec,176): Promise<ProcessResult> {177const quarto = getQuartoAPI();178try {179const cmd = await quarto.jupyter.pythonExec(kernelspec);180const result = await quarto.system.execProcess(181{182cmd: cmd[0],183args: [184...cmd.slice(1),185quarto.path.resource("jupyter", "jupyter.py"),186],187env: {188// Force default matplotlib backend. something simillar is done here:189// https://github.com/ipython/ipykernel/blob/d7339c2c70115bbe6042880d29eeb273b5a2e350/ipykernel/kernelapp.py#L549-L554190// however this respects existing environment variables, which we've seen in at least191// one case result in an inability to render due to the iTerm2 backend being configured192// (see https://github.com/quarto-dev/quarto-cli/issues/502). Our current position is193// that the way to use a different backend w/ Quarto is to call the matplotlib.use()194// function within the notebook195"MPLBACKEND": "module://matplotlib_inline.backend_inline",196"PYDEVD_DISABLE_FILE_VALIDATION": "1",197},198stdout: "piped",199},200kernelCommand(command, "", options),201);202if (!result.success) {203// forward error (print some diagnostics if python and/or jupyter couldn't be found)204await printExecDiagnostics(kernelspec, result.stderr);205}206return result;207} catch (e) {208if (!(e instanceof Error)) throw e;209if (e?.message) {210info("");211error(e.message);212}213await printExecDiagnostics(kernelspec);214return Promise.reject();215}216}217218export async function printExecDiagnostics(219kernelspec: JupyterKernelspec,220stderr?: string,221) {222const quarto = getQuartoAPI();223const caps = await quarto.jupyter.capabilities(kernelspec);224if (caps && !caps.jupyter_core) {225info("Python 3 installation:");226info(quarto.jupyter.capabilitiesMessage(caps, " "));227info("");228info(quarto.jupyter.installationMessage(caps));229info("");230maybePrintUnactivatedEnvMessage(caps);231} else if (caps && !haveRequiredPython(caps)) {232info(pythonVersionMessage());233info(quarto.jupyter.capabilitiesMessage(caps, " "));234} else if (!caps) {235info(quarto.jupyter.pythonInstallationMessage());236info("");237} else if (stderr && (stderr.indexOf("ModuleNotFoundError") !== -1)) {238maybePrintUnactivatedEnvMessage(caps);239}240}241242function haveRequiredPython(caps: JupyterCapabilities) {243return caps.versionMajor >= 3 && caps.versionMinor >= 6;244}245246function pythonVersionMessage() {247return `Quarto requires Python version 3.6 (or greater). Detected version is:`;248}249250function maybePrintUnactivatedEnvMessage(caps: JupyterCapabilities) {251const quarto = getQuartoAPI();252const envMessage = quarto.jupyter.unactivatedEnvMessage(caps);253if (envMessage) {254info(envMessage);255info("");256}257}258259async function writeKernelCommand(260conn: Deno.Conn,261command: string,262secret: string,263options: Record<string, unknown>,264) {265let messageBytes = new TextEncoder().encode(266kernelCommand(command, secret, options) + "\n",267);268269// don't send the message if it's big.270// Instead, write it to a file and send the file path271// This is disappointing, but something is deeply wrong with Deno.Conn:272// https://github.com/quarto-dev/quarto-cli/issues/7737#issuecomment-1830665357273if (messageBytes.length > 1024) {274const tempFile = Deno.makeTempFileSync();275Deno.writeFileSync(tempFile, messageBytes);276const msg = kernelCommand("file", secret, { file: tempFile }) + "\n";277messageBytes = new TextEncoder().encode(msg);278}279280const bytesWritten = await conn.write(messageBytes);281if (bytesWritten !== messageBytes.length) {282throw new Error("Internal Error");283}284}285286function kernelCommand(287command: string,288secret: string,289options: Record<string, unknown>,290) {291return JSON.stringify(292{ command, secret, options: { ...options, log: kernelLogFile() } },293);294}295296interface KernelTransport {297port: number | string;298secret: string;299type: "tcp" | "unix";300}301302function kernelTransportFile(target: string) {303const quarto = getQuartoAPI();304let transportsDir: string;305306try {307transportsDir = quarto.path.runtime("jt");308} catch (e) {309console.error("Could not create runtime directory for jupyter transport.");310console.error(311"This is possibly a permission issue in the environment Quarto is running in.",312);313console.error(314"Please consult the following documentation for more information:",315);316console.error(317"https://github.com/quarto-dev/quarto-cli/issues/4594#issuecomment-1619177667",318);319throw e;320}321const targetFile = quarto.path.absolute(target);322const hash = quarto.crypto.md5Hash(targetFile).slice(0, 20);323return join(transportsDir, hash);324}325326function kernelLogFile() {327const quarto = getQuartoAPI();328const logsDir = quarto.path.dataDir("logs");329const kernelLog = join(logsDir, "jupyter-kernel.log");330if (!existsSync(kernelLog)) {331Deno.writeTextFileSync(kernelLog, "");332}333return kernelLog;334}335336function readKernelTransportFile(337transportFile: string,338type: "tcp" | "unix",339): KernelTransport | null {340if (existsSync(transportFile)) {341if (type === "tcp") {342try {343const transport = JSON.parse(Deno.readTextFileSync(transportFile));344if (transport.port && transport.secret) {345return {346...transport,347type,348};349} else {350throw new Error("Invalid file format");351}352} catch (e) {353if (!(e instanceof Error)) throw e;354error(355"Error reading kernel transport file: " + e.toString() +356"(removing file)",357);358safeRemoveSync(transportFile);359return null;360}361} else {362return {363port: transportFile,364secret: "",365type,366};367}368} else {369return null;370}371}372373async function connectToKernel(374options: JupyterExecuteOptions,375startIfRequired = true,376): Promise<[Deno.Conn, KernelTransport]> {377// see if we are in debug mode378const debug = !!options.format.execute[kExecuteDebug];379380// derive the file path for this connection381const transportFile = kernelTransportFile(options.target.input);382383// determine connection type -- for now we are going to *always* use tcp because we observed384// periodic hanging on osx with attempting to connect to domain sockets. note also that we385// have to fall back to tcp anyway when transportFile path is > 100, see here for details:386// https://unix.stackexchange.com/questions/367008/why-is-socket-path-length-limited-to-a-hundred-chars387// note also that the entire preview subsystem requires the ability to bind to tcp ports388// so this isn't really taking us into new compatibility waters389/*390const type = isWindows || transportFile.length >= 100391? "tcp"392: "unix";393*/394const type = "tcp";395396// get the transport397const transport = readKernelTransportFile(transportFile, type);398399// if there is a transport then try to connect to it400if (transport) {401try {402return await denoConnectToKernel(transport);403} catch {404// remove the transport file405if (existsSync(transportFile)) {406safeRemoveSync(transportFile);407}408}409}410411// we are done if there is no startIfRequired request412if (!startIfRequired) {413return Promise.reject();414}415416// start the kernel417if (!options.quiet) {418messageStartingKernel(options.kernelspec);419}420421// determine timeout422const kDefaultTimeout = 300;423const keepAlive = options.format.execute[kExecuteDaemon];424const timeout =425keepAlive === true || keepAlive === null || keepAlive === undefined426? kDefaultTimeout427: keepAlive === false428? 0429: keepAlive;430431// try to start the server432const result = await execJupyter("start", {433transport: transportFile,434timeout,435type,436debug,437}, options.kernelspec);438if (!result.success) {439return Promise.reject();440}441442// poll for the transport file and connect once we have it443for (let i = 1; i < 20; i++) {444await sleep(i * 100);445const kernelTransport = readKernelTransportFile(transportFile, type);446if (kernelTransport) {447try {448return await denoConnectToKernel(kernelTransport);449} catch (e) {450if (!(e instanceof Error)) throw e;451// remove the transport file452safeRemoveSync(transportFile);453error("Error connecting to Jupyter kernel: " + e.toString());454return Promise.reject();455}456}457}458459warning("Unable to start Jupyter kernel for " + options.target.input);460return Promise.reject();461}462463async function denoConnectToKernel(464transport: KernelTransport,465): Promise<[Deno.Conn, KernelTransport]> {466if (transport.type === "tcp") {467const tcpConnectOptions = {468transport: transport.type,469hostname: "127.0.0.1",470port: transport.port as number,471};472return [473await Deno.connect(474tcpConnectOptions,475),476transport,477];478} else {479const unixConnectOptions = {480transport: transport.type,481path: transport.port as string,482};483return [484await Deno.connect(485unixConnectOptions,486),487transport,488];489}490}491492function messageStartingKernel(kernelspec: JupyterKernelspec) {493info(`\nStarting ${kernelspec.name} kernel...`, { newline: false });494}495496function trace(options: ExecuteOptions, msg: string) {497if (options.format.execute[kExecuteDebug]) {498info("- " + msg, { bold: true });499}500}501502503