CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/jupyter/pool/pool.ts
Views: 687
1
/*
2
* This file is part of CoCalc: Copyright © 2023 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
Launching and managing Jupyter kernels in a pool for
8
performance.
9
*/
10
11
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
12
import { delay } from "awaiting";
13
import json from "json-stable-stringify";
14
import nodeCleanup from "node-cleanup";
15
import { unlinkSync } from "node:fs";
16
import { mkdir, readFile, writeFile } from "node:fs/promises";
17
import getLogger from "@cocalc/backend/logger";
18
import createChdirCommand from "@cocalc/util/jupyter-api/chdir-commands";
19
import createSetenvCommand from "@cocalc/util/jupyter-api/setenv-commands";
20
import { exists, unlink } from "@cocalc/backend/misc/async-utils-node";
21
import { getLanguage } from "@cocalc/jupyter/kernel/kernel-data";
22
import launchJupyterKernelNoPool, {
23
LaunchJupyterOpts,
24
SpawnedKernel,
25
} from "@cocalc/jupyter/kernel/launch-kernel";
26
import {
27
getConfig,
28
getConfigDir,
29
getLaunchDelayMS,
30
getSize,
31
getTimeoutS,
32
} from "./pool-params";
33
import { getAbsolutePathFromHome } from "@cocalc/jupyter/util/fs";
34
35
// Kernels whose lowercased name contains any of these substrings are never
// served from the pool. See https://github.com/sagemathinc/cocalc/issues/7041
const BLACKLIST = ["julia"];

// Returns true when the (case-insensitive) kernel name contains at least one
// BLACKLIST entry as a substring.
function isBlacklisted(kernel: string) {
  const lowered = kernel.toLowerCase();
  return BLACKLIST.some((fragment) => lowered.includes(fragment));
}
48
49
export type { LaunchJupyterOpts, SpawnedKernel };
50
51
// Logger shared by everything in this module, created under the name "jupyter:pool".
const log = getLogger("jupyter:pool");
52
53
/**
 * Best-effort write of `content` to the pool config file.
 *
 * Makes sure the config directory exists first. Any failure is logged at
 * debug level and swallowed, so this function never rejects.
 */
async function writeConfig(content: string): Promise<void> {
  try {
    // recursive mkdir is a no-op when the directory is already there
    const configDir = getConfigDir();
    await mkdir(configDir, { recursive: true });
    const configPath = getConfig();
    await writeFile(configPath, content);
  } catch (err) {
    log.debug("Error writeConfig -- ", err);
  }
}
62
63
/**
 * Read the pool config file and return its contents as a string.
 * Rejects if the file cannot be read; callers are expected to handle that.
 */
async function readConfig(): Promise<string> {
  const raw = await readFile(getConfig());
  return raw.toString();
}
66
67
// Idle, already-spawned kernels, grouped by the key produced by makeKey().
const POOL: Record<string, SpawnedKernel[]> = {};

// For each pool key, the timestamp (ms since epoch) after which the kernels
// pooled under that key are considered expired by maintainPool().
const EXPIRE: Record<string, number> = {};
69
70
// Build the cache key that identifies which pooled kernels are
// interchangeable for a given launch request. Parameters that are specific
// to a single notebook are left out, since they would prevent sharing:
//    - opts.cwd : current working directory
//    - opts.env.COCALC_JUPYTER_FILENAME : path of the notebook
// The key is a stable JSON string of { name, opts }, so it can be parsed
// back to recover the kernel name and the generic options.
// The opts object that is passed in is never modified.
function makeKey({ name, opts }) {
  // Peel off the per-notebook fields; the rest is what goes in the key.
  const { cwd: _cwd, ...genericOpts } = opts;
  const { COCALC_JUPYTER_FILENAME: _filename, ...env } = opts.env ?? {};
  return json({ name, opts: { ...genericOpts, env } });
}
83
84
/**
 * Start topping up the pool for `key` without waiting for it to finish.
 *
 * replenishPool rejects when launching a kernel fails. Since nobody awaits
 * this call, the rejection must be handled here; otherwise it would surface
 * as an unhandled promise rejection.
 */
function replenishPoolInBackground(
  key: string,
  size: number,
  timeout_s: number,
): void {
  void replenishPool(key, size, timeout_s).catch((error) => {
    log.debug("background replenishPool failed (ignored) -- ", error);
  });
}

/**
 * Launch a Jupyter kernel, handing out a pre-started one from the pool
 * when one is available.
 *
 * The pool is bypassed (a kernel is launched directly with `opts`) when the
 * kernel name is blacklisted, when the kernel's language cannot be
 * determined, or when the chdir/setenv initialization commands cannot be
 * generated for that language.
 *
 * A kernel taken from the pool was started without the caller's `cwd` and
 * `COCALC_JUPYTER_FILENAME`, so the returned object carries `initCode`:
 * commands that apply those settings. A kernel launched fresh gets `opts`
 * directly and therefore has no `initCode` added here.
 *
 * In both cases a background replenish of the pool for this key is kicked
 * off; failures of that background work never affect the returned kernel.
 *
 * @param name - name of the kernel
 * @param opts - launch options
 * @param size_arg - min number of these in the pool; defaults to getSize()
 * @param timeout_s_arg - pool expiry timeout in seconds; defaults to getTimeoutS()
 * @returns the spawned kernel
 * @throws rethrows (after logging) any error from launching the kernel
 */
export default async function launchJupyterKernel(
  name: string, // name of the kernel
  opts: LaunchJupyterOpts,
  size_arg?: number, // min number of these in the pool
  timeout_s_arg?: number,
): Promise<SpawnedKernel> {
  const size: number = size_arg ?? getSize();
  const timeout_s: number = timeout_s_arg ?? getTimeoutS();
  if (isBlacklisted(name)) {
    log.debug(`not using kernel pool for ${name} because it is blacklisted`);
    return await launchJupyterKernelNoPool(name, opts);
  }
  let language;
  try {
    language = await getLanguage(name);
  } catch (error) {
    log.error("Failed to get language of kernel -- not using pool", error);
    return await launchJupyterKernelNoPool(name, opts);
  }

  // Code a pooled kernel has to run to end up in the state the caller asked for.
  const initCode: string[] = [];
  if (opts.cwd) {
    try {
      const absPath = getAbsolutePathFromHome(opts.cwd);
      initCode.push(createChdirCommand(language, absPath));
    } catch (error) {
      log.error("Failed to get chdir command -- not using pool", error);
      return await launchJupyterKernelNoPool(name, opts);
    }
  }
  if (opts.env?.COCALC_JUPYTER_FILENAME) {
    try {
      initCode.push(
        createSetenvCommand(
          language,
          "COCALC_JUPYTER_FILENAME",
          opts.env.COCALC_JUPYTER_FILENAME,
        ),
      );
    } catch (error) {
      log.error("Failed to get setenv command -- not using pool", error);
      return await launchJupyterKernelNoPool(name, opts);
    }
  }

  const key = makeKey({ name, opts });
  log.debug("launchJupyterKernel", key);
  try {
    if (POOL[key] == null) {
      POOL[key] = [];
    }
    if (POOL[key].length > 0) {
      const kernel = POOL[key].shift();
      replenishPoolInBackground(key, size, timeout_s);
      return { ...(kernel as SpawnedKernel), initCode };
    }
    const kernel = await launchJupyterKernelNoPool(name, opts);

    // we don't start replenishing the pool until the kernel is initialized,
    // since we don't want to slow down creating the kernel itself!
    replenishPoolInBackground(key, size, timeout_s);

    // we do NOT include the initCode here; it's not needed since this kernel
    // isn't from the pool.
    return kernel;
  } catch (error) {
    log.error("Failed to launch Jupyter kernel", error);
    throw error;
  }
}
154
155
// Top up the pool for `key` until it holds at least `size` kernels.
//
// Wrapped in reuseInFlight keyed on `key` alone, so two replenish runs for
// the same key never overlap (which could make the pool a little too big).
//
// The kernel name and launch options are recovered by parsing `key`.
// Launch failures are logged and rethrown, so the returned promise can
// reject; callers must await it or attach a catch handler.
const replenishPool = reuseInFlight(
  async (key: string, size_arg?: number, timeout_s_arg?: number) => {
    const { name, opts } = JSON.parse(key);
    if (isBlacklisted(name)) {
      log.debug(
        "replenishPool",
        key,
        ` -- skipping since ${name} is blacklisted`,
      );
      return;
    }
    const size: number = size_arg ?? getSize();
    const timeout_s: number = timeout_s_arg ?? getTimeoutS();
    log.debug("replenishPool", key, { size, timeout_s });
    try {
      const kernels = POOL[key] ?? (POOL[key] = []);
      while (kernels.length < size) {
        log.debug("replenishPool - creating a kernel", key);
        // Record the key being pooled (not awaited; writeConfig never rejects).
        writeConfig(key);
        await delay(getLaunchDelayMS());
        const spawned = await launchJupyterKernelNoPool(name, opts);
        kernels.push(spawned);
        // Push the expiry out; never move it earlier than it already is.
        const expiresAt = Date.now() + 1000 * timeout_s;
        EXPIRE[key] = Math.max(EXPIRE[key] ?? 0, expiresAt);
      }
    } catch (err) {
      log.error("Failed to replenish Jupyter kernel pool", err);
      throw err;
    }
  },
  {
    createKey: (args) => args[0],
  },
);
193
194
/*
If every pool is empty, read the key stored in the pool config file (written
by replenishPool) and replenish the pool for that key using the default size
and timeout. Any error -- e.g., a missing or corrupt config file, or a kernel
that can no longer be launched -- is logged and otherwise ignored.
*/
async function fillWhenEmpty() {
  const somethingPooled = Object.values(POOL).some(
    (kernels) => kernels.length > 0,
  );
  if (somethingPooled) {
    // nothing to do
    return;
  }
  // pool is empty, so possibly put something in it.
  try {
    // this can throw, e.g., a corrupt file
    const key = await readConfig();
    if (!key) {
      return;
    }
    // this can definitely throw, e.g., change image and then available
    // kernels change. No need to crash the entire project in that case!
    await replenishPool(key);
  } catch (err) {
    console.log("fillWhenEmpty -- A non-fatal error occurred:", err);
    log.error("fillWhenEmpty -- A non-fatal error occurred:", err);
  }
}
220
221
/**
 * One maintenance pass: kill every pooled kernel whose key has passed its
 * expiry time, then kick off fillWhenEmpty() (not awaited) so an empty pool
 * gets refilled.
 */
async function maintainPool() {
  log.debug("maintainPool", { EXPIRE });
  const now = Date.now();
  for (const key in EXPIRE) {
    if (!(EXPIRE[key] < now)) {
      // not expired yet
      continue;
    }
    log.debug("maintainPool -- expiring key=", key);
    const expired = POOL[key] ?? [];
    while (expired.length > 0) {
      const victim = expired.shift() as SpawnedKernel;
      try {
        await killKernel(victim);
      } catch (err) {
        // won't happen
        log.error("Failed to kill Jupyter kernel", err);
      }
    }
  }
  fillWhenEmpty();
}
241
242
/**
 * Start pool maintenance: run maintainPool() immediately and then again
 * every 30 seconds.
 *
 * Does nothing when NODE_ENV is "test"; creating the pool under jest
 * prevents tests from exiting cleanly.
 */
export function init() {
  if (process.env.NODE_ENV == "test") {
    // DO NOT create the pool if we're running under jest testing.
    return;
  }
  const intervalMs = 30 * 1000;
  setInterval(maintainPool, intervalMs);
  maintainPool();
}
250
251
// When this process exits, synchronously terminate every kernel still
// sitting in the pool and remove its connection file.
//
// The kill and the unlink are attempted independently: if signalling the
// process group fails (e.g., it already exited), the connection file must
// still be removed, otherwise it is leaked.
nodeCleanup(() => {
  for (const key in POOL) {
    for (const kernel of POOL[key]) {
      const pid = kernel.spawn?.pid;
      if (pid) {
        try {
          // negative pid => signal the whole process group
          process.kill(-pid, "SIGTERM");
        } catch (_) {
          // best effort -- the process group may already be gone
        }
      }
      try {
        unlinkSync(kernel.connectionFile);
      } catch (_) {
        // best effort -- the file may already have been removed
      }
    }
  }
});
261
262
/**
 * Shut down a spawned kernel and remove its connection file.
 *
 * Detaches all listeners from the spawned process, sends SIGTERM to the
 * negated pid (when a pid is available), calls the process's close() if it
 * has one, and deletes the connection file if it exists. Failures in the
 * kill and cleanup steps are logged rather than thrown.
 */
export async function killKernel(kernel: SpawnedKernel) {
  const child = kernel.spawn;
  child?.removeAllListeners();
  try {
    const pid = child?.pid;
    if (pid) {
      log.debug("killKernel pid=", pid);
      try {
        // negative pid => signal the whole process group
        process.kill(-pid, "SIGTERM");
      } catch (err) {
        log.error("Failed to send SIGTERM to Jupyter kernel", err);
      }
    }
    child?.close?.();
    const connectionFilePresent = await exists(kernel.connectionFile);
    if (connectionFilePresent) {
      try {
        await unlink(kernel.connectionFile);
      } catch (err) {
        log.error(
          `Failed to delete Jupyter kernel connection file ${kernel.connectionFile}`,
          err,
        );
      }
    }
  } catch (err) {
    log.error("Failed to kill Jupyter kernel", err);
  }
}
288
289