Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
quarto-dev
GitHub Repository: quarto-dev/quarto-cli
Path: blob/main/src/execute/jupyter/jupyter-kernel.ts
6460 views
1
/*
2
* jupyter-kernel.ts
3
*
4
* Copyright (C) 2020-2022 Posit Software, PBC
5
*/
6
7
import { existsSync, safeRemoveSync } from "../../deno_ral/fs.ts";
8
import { join } from "../../deno_ral/path.ts";
9
import { error, info, warning } from "../../deno_ral/log.ts";
10
11
import { sleep } from "../../core/async.ts";
12
import {
13
JupyterCapabilities,
14
JupyterKernelspec,
15
} from "../../core/jupyter/types.ts";
16
import { getQuartoAPI } from "../../core/api/index.ts";
17
import type { ProcessResult } from "../../core/process-types.ts";
18
19
import {
20
kExecuteDaemon,
21
kExecuteDaemonRestart,
22
kExecuteDebug,
23
} from "../../config/constants.ts";
24
25
import { ExecuteOptions } from "../types.ts";
26
import { isWindows } from "../../deno_ral/platform.ts";
27
28
/**
 * Execute options extended with the extra settings used when running a
 * document through a Jupyter kernel.
 */
export interface JupyterExecuteOptions extends ExecuteOptions {
  /** Kernelspec of the kernel to execute with (its `name` is shown when a kernel is started). */
  kernelspec: JupyterKernelspec;
  /** Python command line. Not read in this file; presumably argv-style -- confirm with callers. */
  python_cmd: string[];
  /** Optional supervisor process id. Not read in this file; semantics are defined by callers. */
  supervisor_pid?: number;
}
33
34
/**
 * Execute the notebook with a throwaway kernel: any keepalive kernel that
 * exists for the target is aborted first, then a single "execute" command is
 * run through the Jupyter helper script.
 *
 * @param options Execute options (including the kernelspec) for the target.
 * @returns A promise that resolves on success and rejects (with no value)
 *   when the execution reports failure.
 */
export async function executeKernelOneshot(
  options: JupyterExecuteOptions,
): Promise<void> {
  const { kernelspec } = options;

  // make sure no keepalive kernel is left running for this document
  await abortKernel(options);

  // announce the kernel start unless output is suppressed
  if (!options.quiet) {
    messageStartingKernel(kernelspec);
  }

  trace(options, "Executing notebook with oneshot kernel");

  // debug can be requested by the format or through the environment
  const debug = Boolean(options.format.execute[kExecuteDebug]) ||
    Boolean(Deno.env.get("QUARTO_JUPYTER_DEBUG"));

  // run the notebook (it is saved back in place)
  const outcome = await execJupyter(
    "execute",
    { ...options, debug },
    kernelspec,
  );
  if (outcome.success) {
    return;
  }
  return Promise.reject();
}
58
59
/**
 * Execute the notebook against a persistent ("keepalive") kernel server,
 * starting the server first if one is not already reachable for this input.
 *
 * The server answers with newline-delimited JSON messages of the form
 * `{ type, data }`:
 *  - `"error"`   -> the error and diagnostics are printed and the call rejects
 *  - `"restart"` -> this function is re-run from scratch
 *  - otherwise   -> `data` is forwarded to the log as progress output
 *
 * @param options Execute options (including the kernelspec) for the target.
 * @returns A promise that resolves once the server closes the connection. It
 *   rejects with no value when the server reports an error or no kernel
 *   connection could be established, and re-throws any exception raised
 *   while talking to the server (after removing the transport file).
 */
export async function executeKernelKeepalive(
  options: JupyterExecuteOptions,
): Promise<void> {
  // if we are in debug mode then tail follow the log file (no `tail` on windows)
  let serverLogProcess: Deno.ChildProcess | undefined;
  if (options.format.execute[kExecuteDebug] && !isWindows) {
    serverLogProcess = new Deno.Command("tail", {
      args: ["-F", "-n", "0", kernelLogFile()],
    }).spawn();
  }

  // everything after the spawn lives inside this try so that the log
  // follower is also terminated when aborting/connecting fails
  try {
    // if we have a restart request then abort before proceeding
    if (options.format.execute[kExecuteDaemonRestart]) {
      await abortKernel(options);
    }

    trace(options, "Connecting to kernel");
    const [conn, transport] = await connectToKernel(options);
    trace(options, "Kernel connection successful");
    try {
      trace(options, "Sending execute command to kernel");
      await writeKernelCommand(
        conn,
        "execute",
        transport.secret,
        { ...options },
      );
      trace(options, "Execute command sent, reading response");

      // one streaming decoder for the whole response: a multi-byte UTF-8
      // sequence split across two reads is then decoded correctly instead
      // of both halves being replaced with U+FFFD
      const decoder = new TextDecoder();
      const buffer = new Uint8Array(512);
      let leftover = "";
      while (true) {
        const bytesRead = await conn.read(buffer);
        if (bytesRead === null) {
          break;
        }
        if (bytesRead === 0) {
          continue;
        }

        const payload = decoder.decode(
          buffer.subarray(0, bytesRead),
          { stream: true },
        );

        for (let jsonMessage of payload.split("\n")) {
          if (!jsonMessage) {
            continue;
          }
          if (leftover) {
            jsonMessage = leftover + jsonMessage;
            leftover = "";
          }

          // a parse failure means the message was cut off by the read
          // boundary: stash it and prepend it to the next fragment. only
          // the parse is guarded so failures while *handling* a message
          // are not mistaken for an incomplete message.
          let msg: { type: string; data: string };
          try {
            msg = JSON.parse(jsonMessage);
          } catch {
            leftover = jsonMessage;
            continue;
          }

          if (msg.type === "error") {
            trace(options, "Error response received");
            error(msg.data, { colorize: false });
            // wait for the diagnostics so they are printed before we reject
            await printExecDiagnostics(options.kernelspec, msg.data);
            return Promise.reject();
          } else if (msg.type === "restart") {
            trace(options, "Restart request received");
            return executeKernelKeepalive(options);
          } else {
            info(msg.data, { newline: false });
          }
        }
      }
      trace(options, "Server request complete\n\n");
    } catch (e) {
      trace(options, "Error occurred receiving response from server");
      // likely this is not our server! (as it's not producing/consuming the expected json)
      // in that case remove the connection file and re-throw the exception
      const transportFile = kernelTransportFile(options.target.input);
      if (existsSync(transportFile)) {
        safeRemoveSync(transportFile);
      }
      throw e;
    } finally {
      conn.close();
    }
  } finally {
    serverLogProcess?.kill("SIGKILL");
  }
}
150
151
/**
 * Best-effort shutdown of an already running keepalive kernel for the target.
 *
 * Connects without starting a new server, sends an "abort" command, and then
 * always removes the transport file and closes the connection. Any failure
 * along the way (including there being no kernel to connect to) is reported
 * only through the debug trace.
 *
 * @param options Execute options identifying the target document.
 */
async function abortKernel(options: JupyterExecuteOptions) {
  const requestAbort = async () => {
    trace(options, "Checking for existing kernel");
    const [connection, kernelTransport] = await connectToKernel(
      options,
      false,
    );
    trace(options, "Existing kernel found");
    try {
      trace(options, "Sending kernel abort request");
      await writeKernelCommand(
        connection,
        "abort",
        kernelTransport.secret,
        {},
      );
      trace(options, "Abort request successful");
    } finally {
      // whether or not the abort went through, forget about this kernel
      const staleTransportFile = kernelTransportFile(options.target.input);
      if (existsSync(staleTransportFile)) {
        safeRemoveSync(staleTransportFile);
      }
      connection.close();
    }
  };

  try {
    await requestAbort();
  } catch {
    trace(options, "No existing kernel found");
  }
}
172
173
/**
 * Run Quarto's `jupyter.py` helper script with the python resolved for the
 * given kernelspec, handing it a JSON kernel command.
 *
 * @param command Command name understood by the helper (this file uses
 *   "execute" and "start").
 * @param options Options serialized into the command payload.
 * @param kernelspec Kernelspec used to resolve python and to print diagnostics.
 * @returns The process result. Diagnostics are printed when the result is
 *   unsuccessful. If launching throws an `Error`, its message and
 *   diagnostics are printed and the promise rejects with no value; anything
 *   else that is thrown is re-thrown unchanged.
 */
async function execJupyter(
  command: string,
  options: Record<string, unknown>,
  kernelspec: JupyterKernelspec,
): Promise<ProcessResult> {
  const quarto = getQuartoAPI();
  try {
    const [python, ...pythonArgs] = await quarto.jupyter.pythonExec(
      kernelspec,
    );
    const jupyterScript = quarto.path.resource("jupyter", "jupyter.py");

    // Force the default matplotlib backend. Something similar is done here:
    // https://github.com/ipython/ipykernel/blob/d7339c2c70115bbe6042880d29eeb273b5a2e350/ipykernel/kernelapp.py#L549-L554
    // however that code respects existing environment variables, which we've
    // seen in at least one case result in an inability to render due to the
    // iTerm2 backend being configured
    // (see https://github.com/quarto-dev/quarto-cli/issues/502). Our current
    // position is that the way to use a different backend w/ Quarto is to
    // call the matplotlib.use() function within the notebook.
    const env = {
      "MPLBACKEND": "module://matplotlib_inline.backend_inline",
      "PYDEVD_DISABLE_FILE_VALIDATION": "1",
    };

    // the JSON command is handed to execProcess as its second argument
    const outcome = await quarto.system.execProcess(
      {
        cmd: python,
        args: [...pythonArgs, jupyterScript],
        env,
        stdout: "piped",
      },
      kernelCommand(command, "", options),
    );

    // forward error (print some diagnostics if python and/or jupyter couldn't be found)
    if (!outcome.success) {
      await printExecDiagnostics(kernelspec, outcome.stderr);
    }
    return outcome;
  } catch (e) {
    if (!(e instanceof Error)) {
      throw e;
    }
    if (e.message) {
      info("");
      error(e.message);
    }
    await printExecDiagnostics(kernelspec);
    return Promise.reject();
  }
}
218
219
/**
 * Print hints explaining why a Jupyter execution may have failed, based on
 * the capabilities detected for the kernelspec. At most one of these is
 * reported, checked in this order:
 *  1. no capabilities detected  -> python installation message
 *  2. `jupyter_core` missing    -> capabilities plus installation instructions
 *  3. python older than needed  -> version requirement plus capabilities
 *  4. `stderr` mentions `ModuleNotFoundError` -> unactivated environment hint
 *
 * @param kernelspec Kernelspec whose capabilities are inspected.
 * @param stderr Optional stderr text of the failed execution.
 */
export async function printExecDiagnostics(
  kernelspec: JupyterKernelspec,
  stderr?: string,
) {
  const quarto = getQuartoAPI();
  const caps = await quarto.jupyter.capabilities(kernelspec);

  if (!caps) {
    info(quarto.jupyter.pythonInstallationMessage());
    info("");
    return;
  }

  if (!caps.jupyter_core) {
    info("Python 3 installation:");
    info(quarto.jupyter.capabilitiesMessage(caps, " "));
    info("");
    info(quarto.jupyter.installationMessage(caps));
    info("");
    maybePrintUnactivatedEnvMessage(caps);
    return;
  }

  if (!haveRequiredPython(caps)) {
    info(pythonVersionMessage());
    info(quarto.jupyter.capabilitiesMessage(caps, " "));
    return;
  }

  if (stderr?.includes("ModuleNotFoundError")) {
    maybePrintUnactivatedEnvMessage(caps);
  }
}
242
243
/**
 * Check whether the detected Python is at least version 3.6.
 *
 * Major and minor are compared as a pair: any major version above 3 is
 * accepted regardless of its minor version (comparing the two components
 * independently would wrongly reject e.g. 4.0 because 0 < 6).
 *
 * @param caps Detected capabilities (`versionMajor` / `versionMinor` are read).
 * @returns `true` when the version is 3.6 or greater.
 */
function haveRequiredPython(caps: JupyterCapabilities) {
  const kRequiredMajor = 3;
  const kRequiredMinor = 6;
  if (caps.versionMajor !== kRequiredMajor) {
    return caps.versionMajor > kRequiredMajor;
  }
  return caps.versionMinor >= kRequiredMinor;
}
246
247
/**
 * Lead-in line printed before the detected capabilities when the installed
 * Python is older than the required version.
 */
function pythonVersionMessage() {
  const requirement = "Quarto requires Python version 3.6 (or greater).";
  return requirement + " Detected version is:";
}
250
251
/**
 * Print the "unactivated environment" hint (followed by a blank line) when
 * the Quarto API produces one for the given capabilities; otherwise print
 * nothing.
 *
 * @param caps Detected Jupyter capabilities.
 */
function maybePrintUnactivatedEnvMessage(caps: JupyterCapabilities) {
  const hint = getQuartoAPI().jupyter.unactivatedEnvMessage(caps);
  if (!hint) {
    return;
  }
  info(hint);
  info("");
}
259
260
/**
 * Send a newline-terminated JSON command to the kernel server.
 *
 * Messages larger than 1024 bytes are not sent over the connection directly:
 * the message is written to a temporary file and a small "file" command
 * carrying that file's path is sent instead.
 *
 * @param conn Open connection to the kernel server.
 * @param command Command name (e.g. "execute", "abort").
 * @param secret Shared secret taken from the kernel transport.
 * @param options Options serialized into the command payload.
 * @throws Error when the connection stops accepting data before the whole
 *   message has been written.
 */
async function writeKernelCommand(
  conn: Deno.Conn,
  command: string,
  secret: string,
  options: Record<string, unknown>,
) {
  const encoder = new TextEncoder();
  let messageBytes = encoder.encode(
    kernelCommand(command, secret, options) + "\n",
  );

  // don't send the message if it's big.
  // Instead, write it to a file and send the file path
  // This is disappointing, but something is deeply wrong with Deno.Conn:
  // https://github.com/quarto-dev/quarto-cli/issues/7737#issuecomment-1830665357
  if (messageBytes.length > 1024) {
    // NOTE(review): the temp file is not removed here; presumably the
    // kernel server consumes (and cleans up) it -- confirm.
    const tempFile = Deno.makeTempFileSync();
    Deno.writeFileSync(tempFile, messageBytes);
    messageBytes = encoder.encode(
      kernelCommand("file", secret, { file: tempFile }) + "\n",
    );
  }

  // a single write() call may accept only part of the buffer, so keep
  // writing the remainder until everything has been handed over
  let offset = 0;
  while (offset < messageBytes.length) {
    const bytesWritten = await conn.write(messageBytes.subarray(offset));
    if (bytesWritten <= 0) {
      throw new Error(
        "Internal Error: unable to write command to Jupyter kernel connection",
      );
    }
    offset += bytesWritten;
  }
}
286
287
/**
 * Serialize a kernel command to JSON. The kernel log file path is always
 * added to the options as `log` (overriding any `log` key the caller passed).
 *
 * @param command Command name.
 * @param secret Shared secret ("" when none applies).
 * @param options Command options.
 * @returns The JSON text of `{ command, secret, options }`.
 */
function kernelCommand(
  command: string,
  secret: string,
  options: Record<string, unknown>,
) {
  const payload = {
    command,
    secret,
    options: { ...options, log: kernelLogFile() },
  };
  return JSON.stringify(payload);
}
296
297
/** Connection details for a running kernel server. */
interface KernelTransport {
  /** TCP port number for "tcp" transports; the socket path for "unix" transports. */
  port: number | string;
  /** Secret sent along with every command (empty for "unix" transports). */
  secret: string;
  /** Kind of socket used to reach the server. */
  type: "tcp" | "unix";
}
302
303
/**
 * Compute the path of the transport file for a target document: a file in
 * the "jt" runtime directory named after the first 20 characters of the MD5
 * hash of the target's absolute path.
 *
 * @param target Path of the input document.
 * @returns Path of the transport file (the file itself may not exist).
 * @throws Re-throws whatever obtaining the runtime directory throws, after
 *   printing troubleshooting guidance to the console.
 */
function kernelTransportFile(target: string) {
  const quarto = getQuartoAPI();

  let runtimeDir: string;
  try {
    runtimeDir = quarto.path.runtime("jt");
  } catch (e) {
    const guidance = [
      "Could not create runtime directory for jupyter transport.",
      "This is possibly a permission issue in the environment Quarto is running in.",
      "Please consult the following documentation for more information:",
      "https://github.com/quarto-dev/quarto-cli/issues/4594#issuecomment-1619177667",
    ];
    for (const line of guidance) {
      console.error(line);
    }
    throw e;
  }

  const absoluteTarget = quarto.path.absolute(target);
  const fileName = quarto.crypto.md5Hash(absoluteTarget).slice(0, 20);
  return join(runtimeDir, fileName);
}
326
327
/**
 * Path of the Jupyter kernel log file inside Quarto's "logs" data directory.
 * An empty log file is created when none exists yet.
 *
 * @returns Path of `jupyter-kernel.log`.
 */
function kernelLogFile() {
  const logPath = join(
    getQuartoAPI().path.dataDir("logs"),
    "jupyter-kernel.log",
  );
  const missing = !existsSync(logPath);
  if (missing) {
    Deno.writeTextFileSync(logPath, "");
  }
  return logPath;
}
336
337
/**
 * Read the connection details advertised in a kernel transport file.
 *
 * For "tcp" the file must contain JSON with truthy `port` and `secret`
 * fields. A file that cannot be read, parsed, or validated is reported,
 * removed, and treated as absent. For any other type the file path itself
 * is the socket address and no secret is used.
 *
 * @param transportFile Path of the transport file.
 * @param type Transport type to interpret the file as.
 * @returns The transport, or `null` when the file does not exist or is invalid.
 */
function readKernelTransportFile(
  transportFile: string,
  type: "tcp" | "unix",
): KernelTransport | null {
  if (!existsSync(transportFile)) {
    return null;
  }

  // non-tcp transports are addressed by the file path itself
  if (type !== "tcp") {
    return {
      port: transportFile,
      secret: "",
      type,
    };
  }

  try {
    const transport = JSON.parse(Deno.readTextFileSync(transportFile));
    if (transport.port && transport.secret) {
      return {
        ...transport,
        type,
      };
    }
    throw new Error("Invalid file format");
  } catch (e) {
    if (!(e instanceof Error)) throw e;
    error(
      "Error reading kernel transport file: " + e.toString() +
        " (removing file)",
    );
    safeRemoveSync(transportFile);
    return null;
  }
}
373
374
/**
 * Connect to the kernel server for the target document, optionally starting
 * one when no usable server is found.
 *
 * An existing transport file is tried first; if connecting to it fails the
 * file is removed. When starting is allowed, the server is launched through
 * the Jupyter helper and the transport file is polled (with growing delays)
 * until it appears and a connection succeeds.
 *
 * @param options Execute options identifying the target and kernelspec.
 * @param startIfRequired When `false`, reject instead of starting a server.
 * @returns The open connection together with the transport it was made from.
 *   Rejects with no value when no connection could be established.
 */
async function connectToKernel(
  options: JupyterExecuteOptions,
  startIfRequired = true,
): Promise<[Deno.Conn, KernelTransport]> {
  // see if we are in debug mode
  const debug = !!options.format.execute[kExecuteDebug];

  // derive the file path for this connection
  const transportFile = kernelTransportFile(options.target.input);

  // determine connection type -- for now we are going to *always* use tcp because we observed
  // periodic hanging on osx with attempting to connect to domain sockets. note also that we
  // have to fall back to tcp anyway when transportFile path is > 100, see here for details:
  // https://unix.stackexchange.com/questions/367008/why-is-socket-path-length-limited-to-a-hundred-chars
  // note also that the entire preview subsystem requires the ability to bind to tcp ports
  // so this isn't really taking us into new compatibility waters
  /*
  const type = isWindows || transportFile.length >= 100
    ? "tcp"
    : "unix";
  */
  const type = "tcp";

  // reuse an already running server when its transport file is still valid
  const existingTransport = readKernelTransportFile(transportFile, type);
  if (existingTransport) {
    try {
      return await denoConnectToKernel(existingTransport);
    } catch {
      // the advertised server is unreachable: drop its transport file
      if (existsSync(transportFile)) {
        safeRemoveSync(transportFile);
      }
    }
  }

  // we are done if there is no startIfRequired request
  if (!startIfRequired) {
    return Promise.reject();
  }

  // start the kernel
  if (!options.quiet) {
    messageStartingKernel(options.kernelspec);
  }

  // determine timeout: `false` maps to 0, `true`/unset maps to the default,
  // and any other value is passed through as-is
  const kDefaultTimeout = 300;
  const keepAlive = options.format.execute[kExecuteDaemon];
  const timeout = keepAlive === false
    ? 0
    : (keepAlive === true || keepAlive === null || keepAlive === undefined)
    ? kDefaultTimeout
    : keepAlive;

  // try to start the server
  const startResult = await execJupyter(
    "start",
    { transport: transportFile, timeout, type, debug },
    options.kernelspec,
  );
  if (!startResult.success) {
    return Promise.reject();
  }

  // poll for the transport file and connect once we have it
  for (let attempt = 1; attempt < 20; attempt++) {
    await sleep(attempt * 100);

    const startedTransport = readKernelTransportFile(transportFile, type);
    if (!startedTransport) {
      continue;
    }

    try {
      return await denoConnectToKernel(startedTransport);
    } catch (e) {
      if (!(e instanceof Error)) throw e;
      // remove the transport file
      safeRemoveSync(transportFile);
      error("Error connecting to Jupyter kernel: " + e.toString());
      return Promise.reject();
    }
  }

  warning("Unable to start Jupyter kernel for " + options.target.input);
  return Promise.reject();
}
463
464
/**
 * Open a connection to a kernel server described by a transport.
 *
 * "tcp" transports connect to the transport's port on 127.0.0.1; all other
 * transports connect to a unix socket whose path is stored in `port`.
 *
 * @param transport Connection details of the server.
 * @returns The open connection paired with the transport that was used.
 */
async function denoConnectToKernel(
  transport: KernelTransport,
): Promise<[Deno.Conn, KernelTransport]> {
  if (transport.type === "tcp") {
    const tcpConn = await Deno.connect({
      transport: transport.type,
      hostname: "127.0.0.1",
      port: transport.port as number,
    });
    return [tcpConn, transport];
  }

  const unixConn = await Deno.connect({
    transport: transport.type,
    path: transport.port as string,
  });
  return [unixConn, transport];
}
492
493
/**
 * Announce that a kernel is being started. The message is written without a
 * trailing newline.
 *
 * @param kernelspec Kernelspec whose `name` appears in the message.
 */
function messageStartingKernel(kernelspec: JupyterKernelspec) {
  const announcement = `\nStarting ${kernelspec.name} kernel...`;
  info(announcement, { newline: false });
}
496
497
/**
 * Emit a bold, "- "-prefixed progress line, but only when the format has
 * execute debugging enabled.
 *
 * @param options Execute options whose format is checked for the debug flag.
 * @param msg Text to log.
 */
function trace(options: ExecuteOptions, msg: string) {
  const debugEnabled = options.format.execute[kExecuteDebug];
  if (!debugEnabled) {
    return;
  }
  info(`- ${msg}`, { bold: true });
}
502
503