CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Sign Up | Sign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/project/sage_session.ts
Views: 687
1
//########################################################################
2
// This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
// License: MS-RSL – see LICENSE.md for details
4
//########################################################################
5
6
/*
7
Start the Sage server and also get a new socket connection to it.
8
*/
9
10
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
11
import { getLogger } from "@cocalc/backend/logger";
12
import processKill from "@cocalc/backend/misc/process-kill";
13
import { abspath } from "@cocalc/backend/misc_node";
14
import type {
15
Type as TCPMesgType,
16
Message as TCPMessage,
17
} from "@cocalc/backend/tcp/enable-messaging-protocol";
18
import { CoCalcSocket } from "@cocalc/backend/tcp/enable-messaging-protocol";
19
import * as message from "@cocalc/util/message";
20
import {
21
path_split,
22
to_json,
23
trunc,
24
trunc_middle,
25
uuid,
26
} from "@cocalc/util/misc";
27
import { CB } from "@cocalc/util/types/callback";
28
import { ISageSession, SageCallOpts } from "@cocalc/util/types/sage";
29
import { Client } from "./client";
30
import { get_sage_socket } from "./sage_socket";
31
32
// import { ExecuteCodeOutput } from "@cocalc/util/types/execute-code";
33
34
const winston = getLogger("sage-session");
35
36
//##############################################
37
// Direct Sage socket session -- used internally in local hub, e.g., to assist CodeMirror editors...
38
//##############################################
39
40
// we have to make sure to only export the type to avoid error TS4094
41
// Instance type of the (non-exported) SageSession class; this is what sage_session() returns.
export type SageSessionType = InstanceType<typeof SageSession>;
42
43
/** Options accepted by sage_session() and the SageSession constructor. */
interface SageSessionOpts {
  /** Project client; the session uses it to save blobs received from Sage. */
  client: Client;
  /** The path to the *worksheet* file. */
  path: string;
}
47
48
// Sessions keyed by worksheet path: entries are added by sage_session() and
// removed again when the corresponding session's close() is called.
const cache: { [path: string]: SageSessionType } = {};
49
50
/**
 * Get the Sage session for the worksheet at `opts.path`.
 *
 * An already cached session for that path is reused; otherwise a new one is
 * constructed from `opts`. Either way the result is (re)stored in the cache.
 */
export function sage_session(opts: Readonly<SageSessionOpts>): SageSessionType {
  const worksheetPath = opts.path;
  let session = cache[worksheetPath];
  if (session == null) {
    // nothing cached for this worksheet yet
    session = new SageSession(opts);
  }
  cache[worksheetPath] = session;
  return session;
}
55
// TODO for project-info/server we need a function that returns a path to a sage worksheet for a given PID
56
//export function get_sage_path(pid) {}
57
// return path
58
// }
59
60
/*
61
Sage Session object
62
63
No socket is opened until you actually try to call it.
64
*/
65
/**
 * A session with a Sage server process on behalf of one worksheet file.
 *
 * Constructing the object does not open a socket. The socket is created on
 * demand by init_socket(), which call() invokes when a message has to be sent
 * and no socket exists yet.
 */
class SageSession implements ISageSession {
  private _path: string;
  private _client: Client;
  // Callbacks waiting for responses from Sage, keyed by the id of the message sent.
  private _output_cb: {
    [key: string]: CB<{ done: boolean; error: string }, any>;
  } = {};
  private _socket: CoCalcSocket | undefined;
  // _init_socket wrapped in reuseInFlight (see constructor).
  public init_socket: () => Promise<void>;

  constructor(opts: Readonly<SageSessionOpts>) {
    // Assign state first: dbg() interpolates this._path, so logging before
    // this point would report path='undefined'.
    this._path = opts.path;
    this._client = opts.client;
    this._output_cb = {};

    this.dbg = this.dbg.bind(this);
    this.close = this.close.bind(this);
    this.is_running = this.is_running.bind(this);
    this._init_socket = this._init_socket.bind(this);
    this.init_socket = reuseInFlight(this._init_socket).bind(this);
    this._init_path = this._init_path.bind(this);
    this.call = this.call.bind(this);
    this._handle_mesg_blob = this._handle_mesg_blob.bind(this);
    this._handle_mesg_json = this._handle_mesg_json.bind(this);

    this.dbg("constructor")();
  }

  /**
   * Create a debug logger for the method named `f`.
   *
   * @returns a function that logs its (optional) message, prefixed with this
   *          session's path and `f`.
   */
  private dbg(f: string) {
    return (m?: string) =>
      winston.debug(`SageSession(path='${this._path}').${f}: ${m}`);
  }

  /**
   * Shut the session down: send signal 9 to the socket's process (if its pid
   * is known), end the socket, tell every pending callback that the
   * computation was killed, and remove this session from the cache.
   */
  public close(): void {
    if (this._socket != null) {
      const pid = this._socket.pid;
      if (pid != null) processKill(pid, 9);
    }
    this._socket?.end();
    delete this._socket;
    for (const id in this._output_cb) {
      const cb = this._output_cb[id];
      cb({ done: true, error: "killed" });
    }
    this._output_cb = {};
    delete cache[this._path];
  }

  // return true if there is a socket connection to a sage server process
  is_running(): boolean {
    return this._socket != null;
  }

  // NOTE: There can be many simultaneous init_socket calls at the same time,
  // if e.g., the socket doesn't exist and there are a bunch of calls to @call
  // at the same time.
  // See https://github.com/sagemathinc/cocalc/issues/3506
  // wrapped in reuseInFlight !
  /**
   * Get a new socket from get_sage_socket(), install the "end" and "mesg"
   * handlers on it, and then run _init_path() over it.
   *
   * @throws whatever (truthy) error get_sage_socket() or _init_path() fails with.
   */
  private async _init_socket(): Promise<void> {
    const dbg = this.dbg("init_socket()");
    dbg();
    try {
      const socket: CoCalcSocket = await get_sage_socket();

      dbg("successfully opened a sage session");
      this._socket = socket;

      socket.on("end", () => {
        // Only drop the reference if it still points at *this* socket. After
        // a restart a newer socket may already be installed, and a late "end"
        // event from the old one must not discard it.
        if (this._socket === socket) {
          delete this._socket;
        }
        return dbg("codemirror session terminated");
      });

      // CRITICAL: we must define this handler before @_init_path below,
      // or @_init_path can't possibly work... since it would wait for
      // this handler to get the response message!
      socket.on("mesg", (type: TCPMesgType, mesg: TCPMessage) => {
        dbg(`sage session: received message ${type}`);
        switch (type) {
          case "json":
            this._handle_mesg_json(mesg);
            break;
          case "blob":
            this._handle_mesg_blob(mesg);
            break;
        }
      });

      await this._init_path();
    } catch (err) {
      if (err) {
        dbg(`fail -- ${err}.`);
        throw err;
      }
    }
  }

  /**
   * Execute code in the Sage process that changes into the directory of the
   * worksheet and sets __file__ to the worksheet's absolute path.
   *
   * Resolves when the final (done) response arrives and no stderr output was
   * received; rejects with the stderr text otherwise.
   */
  private async _init_path(): Promise<void> {
    const dbg = this.dbg("_init_path()");
    dbg();
    return new Promise<void>((resolve, reject) => {
      // Kept outside the callback on purpose: the callback runs once per
      // response message, and stderr may arrive in a message that precedes
      // the final `done` one. It must still be known when `done` arrives.
      let err: string | undefined = undefined;
      this.call({
        input: {
          event: "execute_code",
          code: "os.chdir(salvus.data['path']);__file__=salvus.data['file']",
          data: {
            path: abspath(path_split(this._path).head),
            file: abspath(this._path),
          },
          preparse: false,
        },
        cb: (resp) => {
          if (resp.stderr) {
            err = resp.stderr;
            dbg(`error '${err}'`);
          }
          if (resp.done) {
            if (err) {
              reject(err);
            } else {
              resolve();
            }
          }
        },
      });
    });
  }

  /**
   * Handle a request for this session.
   *
   * The events "ping", "status", "signal", "restart" and "raw_input" are
   * handled here directly. Any other input is written to the Sage socket
   * (opening it first if necessary) and `cb` is registered under the
   * message id so that it receives the responses.
   *
   * @param input the message; if it has no id, a random uuid is assigned to it.
   * @param cb    called with the response(s). Not called for "raw_input".
   *              Failures are reported through `cb` via an `error` field.
   */
  public async call({ input, cb }: Readonly<SageCallOpts>): Promise<void> {
    const dbg = this.dbg("call");
    dbg(`input='${trunc(to_json(input), 300)}'`);
    switch (input.event) {
      case "ping":
        cb({ pong: true });
        return;

      case "status":
        cb({ running: this.is_running() });
        return;

      case "signal":
        if (this._socket != null) {
          dbg(`sending signal ${input.signal} to process ${this._socket.pid}`);
          const pid = this._socket.pid;
          if (pid != null) processKill(pid, input.signal);
        }
        cb({});
        return;

      case "restart":
        dbg("restarting sage session");
        if (this._socket != null) {
          this.close();
        }
        try {
          await this.init_socket();
          cb({});
        } catch (err) {
          cb({ error: err });
        }
        return;

      case "raw_input":
        dbg("sending sage_raw_input event");
        this._socket?.write_mesg("json", {
          event: "sage_raw_input",
          value: input.value,
        });
        return;

      default:
        // send message over socket and get responses
        try {
          if (this._socket == null) {
            await this.init_socket();
          }

          if (input.id == null) {
            input.id = uuid();
            dbg(`generated new random uuid for input: '${input.id}' `);
          }

          if (this._socket == null) {
            throw new Error("no socket");
          }

          this._socket.write_mesg("json", input);

          this._output_cb[input.id] = cb; // this is when opts.cb will get called...
        } catch (err) {
          cb({ done: true, error: err });
        }
    }
  }

  /**
   * Handle a "blob" message from the socket: save the blob through the
   * client and write the result of that (or an error message) back to the
   * socket.
   */
  private _handle_mesg_blob(mesg: TCPMessage) {
    const { uuid } = mesg;
    let { blob } = mesg;
    const dbg = this.dbg(`_handle_mesg_blob(uuid='${uuid}')`);
    dbg();

    if (blob == null) {
      dbg("no blob -- dropping message");
      return;
    }

    // This should never happen, typing enforces this to be a Buffer
    if (typeof blob === "string") {
      dbg("blob is string -- converting to buffer");
      blob = Buffer.from(blob, "utf8");
    }

    this._client.save_blob({
      blob,
      uuid,
      cb: (err, resp) => {
        if (err) {
          resp = message.save_blob({
            error: err,
            sha1: uuid, // dumb - that sha1 should be called uuid...
          });
        }
        this._socket?.write_mesg("json", resp);
      },
    });
  }

  /**
   * Handle a "json" message from the socket by passing it to the callback
   * registered for its id. Messages without an id, or without a registered
   * callback, are ignored.
   */
  private _handle_mesg_json(mesg: TCPMessage) {
    const dbg = this.dbg("_handle_mesg_json");
    dbg(`mesg='${trunc_middle(to_json(mesg), 400)}'`);
    if (mesg == null) return; // should not happen
    const { id } = mesg;
    if (id == null) return; // should not happen
    const cb = this._output_cb[id];
    if (cb != null) {
      // Must do this check first since it uses done:false.
      // A message with `done` true or missing is the final one for this id:
      // unregister the callback and normalize done to true.
      if (mesg.done || mesg.done == null) {
        delete this._output_cb[id];
        mesg.done = true;
      }
      if (mesg.done != null && !mesg.done) {
        // waste of space to include done part of mesg if just false for everything else...
        delete mesg.done;
      }
      cb(mesg);
    }
  }
}
309
310