Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/jupyter/stateless-api/kernel.ts
5796 views
1
import { kernel as createKernel } from "@cocalc/jupyter/kernel";
2
import type { JupyterKernelInterface } from "@cocalc/jupyter/types/project-interface";
3
import { run_cell } from "@cocalc/jupyter/nbgrader/jupyter-run";
4
import { mkdtemp } from "fs/promises";
5
import { rmSync } from "fs";
6
import { tmpdir } from "os";
7
import { join } from "path";
8
import getLogger from "@cocalc/backend/logger";
9
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
10
import { type Limits } from "@cocalc/util/jupyter/nbgrader-types";
11
import { closeAll as closeAllLaunches } from "@cocalc/jupyter/kernel/launch-kernel";
12
13
// Module logger (namespace "jupyter:stateless-api:kernel"); used below for
// debug output on failed kernel pre-initialisation and warnings during cleanup.
const log = getLogger("jupyter:stateless-api:kernel");
14
15
/** Pool size used by `Kernel.getFromPool` when the caller does not pass `size`. */
export const DEFAULT_POOL_SIZE = 2;

/** Idle period, in seconds, after which a pool is shrunk (one hour). */
const DEFAULT_POOL_TIMEOUT_S = 60 * 60;

/**
 * Floor for idle shrinking: when a pool times out it is reduced to this many
 * kernels rather than emptied, so it never goes to 0.
 */
const MIN_POOL_SIZE = 1;

/**
 * ulimit flags applied to kernels that have no per-kernel override:
 *
 *   -n 1000       max open files
 *   -f 10485760   max size of data the kernel may *write* to disk
 *   -t 30         max CPU time, in seconds
 *   -v 3000000    max virtual memory (intended as roughly 3GB)
 *
 * NOTE(review): the units of -f and -v are whatever the shell's `ulimit`
 * builtin uses (often blocks / kilobytes rather than bytes) -- confirm the
 * effective limits match the intent.
 */
const DEFAULT_ULIMIT = "-n 1000 -f 10485760 -t 30 -v 3000000";
26
27
/**
 * A throw-away Jupyter kernel used by the stateless code-execution API.
 *
 * Each instance owns one kernel process plus a private temporary directory
 * (which holds the fake notebook path the kernel is started with). Instances
 * are normally obtained through `Kernel.getFromPool`, which keeps a few
 * pre-started kernels per kernel name so that requests do not have to wait
 * for a kernel to boot.
 *
 * Every instance is tracked in the module-level `kernels` registry from
 * construction until `close()`, so that anything still open can be cleaned
 * up when the process terminates.
 */
export default class Kernel {
  // Kernels waiting to be handed out, per kernel name, oldest first.
  private static pools: { [kernelName: string]: Kernel[] } = {};
  // Timestamp (ms) of the most recent pooled request, per kernel name.
  private static last_active: { [kernelName: string]: number } = {};
  // Per-kernel-name ulimit overrides; see setUlimit.
  private static ulimit: { [kernelName: string]: string } = {};
  // The single pending idle-shrink timer per kernel name.
  private static idleTimers: {
    [kernelName: string]: ReturnType<typeof setTimeout>;
  } = {};

  private kernel?: JupyterKernelInterface;
  private tempDir?: string;
  // Set to "closed" by close(); a closed instance is never re-initialised.
  private state?: "closed" | undefined = undefined;

  constructor(private kernelName: string) {
    kernels.add(this);
  }

  /** Returns the pool for `kernelName`, creating an empty one if needed. */
  private static getPool(kernelName: string) {
    let pool = Kernel.pools[kernelName];
    if (pool == null) {
      pool = Kernel.pools[kernelName] = [];
    }
    return pool;
  }

  /** Drops `k` from its pool (if it is still in one) without closing it. */
  private static removeFromPool(k: Kernel) {
    const pool = Kernel.pools[k.kernelName];
    if (pool == null) {
      return;
    }
    const idx = pool.indexOf(k);
    if (idx !== -1) {
      pool.splice(idx, 1);
    }
  }

  /**
   * Sets the ulimit flags for kernels named `kernelName`.
   * Changing ulimit only impacts NEWLY **created** kernels.
   */
  static setUlimit(kernelName: string, ulimit: string) {
    Kernel.ulimit[kernelName] = ulimit;
  }

  /**
   * (Re)arms the idle timer for the pool of `kernelName`: if no further
   * pooled request arrives within `timeout_s` seconds, the pool is shrunk
   * to MIN_POOL_SIZE by closing its oldest kernels.
   *
   * A falsy `timeout_s` (0) means "no timeout" and leaves any pending timer
   * untouched. Only one timer per kernel name is ever pending; each request
   * replaces the previous one instead of stacking a new long-lived timer.
   */
  private static setIdleTimeout(kernelName: string, timeout_s: number) {
    if (!timeout_s) {
      // 0 = no timeout
      return;
    }
    const now = Date.now();
    Kernel.last_active[kernelName] = now;

    const pending = Kernel.idleTimers[kernelName];
    if (pending != null) {
      clearTimeout(pending);
    }
    Kernel.idleTimers[kernelName] = setTimeout(() => {
      delete Kernel.idleTimers[kernelName];
      if ((Kernel.last_active[kernelName] ?? 0) > now) {
        // kernel was requested after now.
        return;
      }
      // No recent request for kernelName: shrink the pool down to
      // MIN_POOL_SIZE instead of closing and deleting everything.
      const poolToShrink = Kernel.pools[kernelName] ?? [];
      const numToClose = poolToShrink.length - MIN_POOL_SIZE;
      if (numToClose <= 0) {
        return;
      }
      // The front of the pool holds the oldest kernels; close those first.
      for (const stale of poolToShrink.splice(0, numToClose)) {
        stale.close();
      }
    }, timeout_s * 1000);
  }

  /**
   * Returns an initialised kernel for `kernelName`.
   *
   * @param kernelName name of the Jupyter kernel to start
   * @param size number of spare kernels to keep ready; `<= 0` disables
   *   pooling and simply creates a fresh kernel
   * @param timeout_s idle time in seconds after which the pool is shrunk;
   *   0 disables the idle timeout
   * @throws whatever kernel start-up throws; in that case the kernel that
   *   failed has already been closed and its temporary directory removed
   */
  static async getFromPool(
    kernelName: string,
    {
      size = DEFAULT_POOL_SIZE,
      timeout_s = DEFAULT_POOL_TIMEOUT_S,
    }: { size?: number; timeout_s?: number } = {},
  ): Promise<Kernel> {
    if (size <= 0) {
      // not using a pool -- just create and return kernel
      const k = new Kernel(kernelName);
      await k.init();
      return k;
    }
    Kernel.setIdleTimeout(kernelName, timeout_s);
    const pool = Kernel.getPool(kernelName);
    let i = 1;
    while (pool.length <= size) {
      // <= since going to remove one below
      const k = new Kernel(kernelName);
      pool.push(k);
      // we cause this kernel to get init'd soon, but NOT immediately, since starting
      // several at once just makes them all take much longer exactly when the user
      // most wants to use their new kernel
      setTimeout(
        async () => {
          try {
            await k.init();
          } catch (err) {
            log.debug("Failed to pre-init Jupyter kernel -- ", kernelName, err);
            // init() already closed k; make sure it is never handed out.
            Kernel.removeFromPool(k);
          }
        },
        // stagger startup by a few seconds, though kernels that are needed will start ASAP.
        Math.random() * 3000 * i,
      );
      i += 1;
    }
    const k = pool.shift() as Kernel;
    // it's ok to call again due to reuseInFlight and that no-op after init.
    await k.init();
    return k;
  }

  /**
   * Creates the temporary directory and starts the kernel. Safe to call
   * repeatedly: concurrent calls share one run and later calls are no-ops.
   *
   * If the instance is closed while start-up is in progress, whatever was
   * created in the meantime is torn down again. If any start-up step throws,
   * the instance is closed (kernel stopped, temporary directory removed)
   * before the error is re-thrown, so a failed kernel never leaks resources.
   */
  private init = reuseInFlight(async () => {
    if (this.kernel != null || this.state == "closed") {
      // already initialized
      return;
    }
    try {
      this.tempDir = await mkdtemp(join(tmpdir(), "cocalc"));
      if (this.state == "closed") {
        this.close();
        return;
      }
      const path = `${this.tempDir}/execute.ipynb`;
      const kernel = createKernel({
        name: this.kernelName,
        path,
        ulimit: Kernel.ulimit[this.kernelName] ?? DEFAULT_ULIMIT,
      });
      this.kernel = kernel;
      await kernel.ensure_running();
      if (this.state == "closed") {
        this.close();
        return;
      }
      await kernel.execute_code_now({ code: "" });
      if (this.state == "closed") {
        this.close();
        return;
      }
    } catch (err) {
      this.close();
      throw err;
    }
  });

  /** Closes every pooled kernel, cancels idle timers; empties all pools and does not refill. */
  static closeAll() {
    for (const timer of Object.values(Kernel.idleTimers)) {
      clearTimeout(timer);
    }
    Kernel.idleTimers = {};
    for (const pool of Object.values(Kernel.pools)) {
      for (const kernel of pool) {
        kernel.close();
      }
    }
    Kernel.pools = {};
    Kernel.last_active = {};
  }

  /**
   * Runs `code` as a single code cell and returns the cell's outputs.
   *
   * @param code source to execute
   * @param limits time/output limits passed to `run_cell`; `total_output`
   *   is set to 0 on the given object when it is missing
   * @throws Error("kernel already closed") if there is no running kernel
   */
  execute = async (
    code: string,
    limits: Partial<Limits> = {
      timeout_ms: 30000,
      timeout_ms_per_cell: 30000,
      max_output: 5000000,
      max_output_per_cell: 1000000,
      start_time: Date.now(),
      total_output: 0,
    },
  ) => {
    if (this.kernel == null) {
      throw Error("kernel already closed");
    }

    if (limits.total_output == null) {
      limits.total_output = 0;
    }
    const cell = { cell_type: "code", source: [code], outputs: [] };
    await run_cell(this.kernel, limits, cell);
    return cell.outputs;
  };

  /** Changes the kernel's working directory; a no-op when there is no kernel. */
  chdir = async (path: string) => {
    if (this.kernel == null) return;
    await this.kernel.chdir(path);
  };

  /**
   * Puts this (still open) kernel back at the end of its pool.
   * this is not used anywhere
   *
   * @throws Error("kernel already closed") if there is no running kernel
   */
  returnToPool = async (): Promise<void> => {
    if (this.kernel == null) {
      throw Error("kernel already closed");
    }
    const pool = Kernel.getPool(this.kernelName);
    pool.push(this);
  };

  /**
   * Stops the kernel, removes the temporary directory and drops this
   * instance from the cleanup registry. Idempotent; errors during cleanup
   * are logged, never thrown.
   */
  close = () => {
    this.state = "closed";
    // A closed kernel needs no further cleanup, so stop tracking it;
    // otherwise the registry would grow with every kernel ever created.
    kernels.delete(this);
    try {
      this.kernel?.close();
    } catch (err) {
      log.warn("Error closing kernel", err);
    } finally {
      delete this.kernel;
    }
    if (this.tempDir) {
      try {
        rmSync(this.tempDir, { force: true, recursive: true });
      } catch (err) {
        log.warn("Error cleaning up temporary directory", err);
      } finally {
        delete this.tempDir;
      }
    }
  };
}

// Clean up after any kernel created here.
// Registry of every Kernel that has been constructed and not yet closed.
// A Set so that close() can remove entries while closeAll() is iterating.
const kernels = new Set<Kernel>();

/** Closes all kernel launches and every kernel that is still open. */
function closeAll() {
  closeAllLaunches();
  for (const kernel of kernels) {
    kernel.close();
  }
  kernels.clear();
}
237
238
// Run the cleanup once when the process is about to exit.
process.once("exit", () => {
  closeAll();
});

// Also run the cleanup the first time any of these termination signals arrives.
// NOTE(review): registering a listener for a signal replaces Node's default
// "terminate on signal" behaviour, and nothing here exits the process --
// confirm that some other handler is responsible for actually exiting.
for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"]) {
  process.once(sig, () => {
    closeAll();
  });
}
247
248