CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/terminal/lib/terminal.ts
Views: 687
1
import type {
2
ClientCommand,
3
IPty,
4
PrimusChannel,
5
PrimusWithChannels,
6
Options,
7
} from "./types";
8
import { getChannelName, getRemotePtyChannelName } from "./util";
9
import { console_init_filename, len, path_split } from "@cocalc/util/misc";
10
import { getLogger } from "@cocalc/backend/logger";
11
import { envForSpawn } from "@cocalc/backend/misc";
12
import { getCWD } from "./util";
13
import { readlink, realpath, readFile, writeFile } from "node:fs/promises";
14
import { spawn } from "node-pty";
15
import { throttle } from "lodash";
16
import { exists } from "@cocalc/backend/misc/async-utils-node";
17
import { isEqual } from "lodash";
18
import type { Spark } from "primus";
19
import { join } from "path";
20
21
// Module-level logger; the debug/warn output in this file goes through it.
const logger = getLogger("terminal:terminal");
22
23
/** Delay (ms) before re-checking whether an output burst is still going on. */
const CHECK_INTERVAL_MS = 5000;

/** Once the in-memory history reaches this many characters it is cut down to half of it. */
export const MAX_HISTORY_LENGTH = 1000000;

/** Two history truncations at most this many ms apart are treated as an output burst. */
const TRUNCATE_THRESH_MS = 500;

/** Sentinel meaning "no client has reported a size" when computing the minimum size. */
const INFINITY = 999999;

/** Command that is spawned when the options do not specify one. */
const DEFAULT_COMMAND = "/bin/bash";

/** Text appended to the terminal output when the underlying process exits. */
const EXIT_MESSAGE = "\r\n\r\n[Process completed - press any key]\r\n\r\n";

/** Period (ms) of the heartbeat message written to a connected remote pty. */
export const REMOTE_TERMINAL_HEARTBEAT_INTERVAL_MS = 7500;

/** Whether a partial backend message is currently being accumulated. */
type MessagesState = "none" | "reading";

/** Lifecycle of a Terminal instance. */
type State = "init" | "ready" | "closed";
35
36
export class Terminal {
37
// Lifecycle state: set to "ready" once a local pty has been spawned and to
// "closed" by close().
private state: State = "init";
// Spawn options; the constructor fills in DEFAULT_COMMAND when none is given.
private options: Options;
// Channel that terminal clients connect to.
private channel: PrimusChannel;
// Channel on which a remote pty can connect.
private remotePtyChannel: PrimusChannel;
// Accumulated terminal output; sent to every newly connected client.
private history = "";
private path: string;
// Most recently reported size of each client, keyed by spark id.
private client_sizes = {};
// Time (ms since epoch) of the most recent history truncation.
private last_truncate_time = Date.now();
// Number of characters received during the current output burst (0 = no burst).
private truncating = 0;
// Size most recently computed by getSize().
private size?: { rows: number; cols: number };
// State used by handleBackendMessages to carry an incomplete message
// from one chunk of output to the next.
private backendMessagesBuffer = "";
private backendMessagesState: MessagesState = "none";
// two different ways of providing the backend support -- local or remote
private localPty?: IPty;
private remotePty?: Spark;
// Id broadcast to clients as "computeServerId"; reset to 0 when a local pty is spawned.
private computeServerId = 0;
// Handle of the heartbeat timer started in the constructor and cleared in close().
private remotePtyHeartbeatInterval;
54
55
/**
 * Set up the client channel and the remote-pty channel for `path` and start
 * the remote-pty heartbeat timer.  No pty is spawned here; see init().
 *
 * @param primus  source of the two channels
 * @param path    terminal file path; also determines the channel names
 * @param options spawn options; `command` defaults to DEFAULT_COMMAND
 */
constructor(primus: PrimusWithChannels, path: string, options: Options = {}) {
  this.options = { command: DEFAULT_COMMAND, ...options };
  this.path = path;

  const clientChannelName = getChannelName(path);
  this.channel = primus.channel(clientChannelName);
  this.channel.on("connection", this.handleClientConnection);

  const remoteChannelName = getRemotePtyChannelName(path);
  this.remotePtyChannel = primus.channel(remoteChannelName);
  this.remotePtyChannel.on("connection", (conn) => {
    logger.debug("new remote terminal connection");
    this.handleRemotePtyConnection(conn);
  });

  // The heartbeat timer always runs; when no remote pty is connected the
  // optional call below simply does nothing.
  const heartbeat = () => {
    this.remotePty?.write({});
  };
  this.remotePtyHeartbeatInterval = setInterval(
    heartbeat,
    REMOTE_TERMINAL_HEARTBEAT_INTERVAL_MS,
  );
}
71
72
/**
 * Public entry point for starting the terminal: waits for initLocalPty()
 * and discards its result.
 */
init = async (): Promise<void> => {
  await this.initLocalPty();
};
75
76
/**
 * Spawn the local pty process, unless a remote pty is connected or a local
 * pty already exists.
 *
 * Because this function awaits file-system calls, the remote/local/closed
 * conditions are re-checked after the awaits so that concurrent callers
 * cannot spawn more than one process and nothing is spawned after close().
 *
 * @returns the newly spawned pty, or undefined if nothing was spawned
 * @throws Error("terminal is closed") if called when already closed
 */
private initLocalPty = async () => {
  if (this.state == "closed") {
    throw Error("terminal is closed");
  }
  const dbg = (...args) => {
    logger.debug("initLocalPty: ", ...args);
  };
  if (this.remotePty != null) {
    dbg("don't init local pty since there is a remote one.");
    return;
  }
  if (this.localPty != null) {
    dbg("don't init local pty since there is already a local one.");
    return;
  }

  const args: string[] = [];

  const { options } = this;
  if (options.args != null) {
    for (const arg of options.args) {
      if (typeof arg === "string") {
        args.push(arg);
      } else {
        dbg("WARNING -- discarding invalid non-string arg ", arg);
      }
    }
  } else {
    // No explicit args: use the init file associated with this terminal, if it exists.
    const initFilename: string = console_init_filename(this.path);
    if (await exists(initFilename)) {
      args.push("--init-file");
      args.push(path_split(initFilename).tail);
    }
  }
  if (this.remotePty) {
    // switched to a different remote so don't finish initializing a local one
    // (we check after each async call)
    return;
  }

  const { head: pathHead, tail: pathTail } = path_split(this.path);
  const env = {
    COCALC_TERMINAL_FILENAME: pathTail,
    ...envForSpawn(),
    ...options.env,
  };
  if (env["TMUX"]) {
    // If TMUX was set for some reason in the environment that setup
    // a cocalc project (e.g., start hub in dev mode from tmux), then
    // TMUX is set even though terminal hasn't started tmux yet, which
    // confuses our open command.  So we explicitly unset it here.
    // https://unix.stackexchange.com/questions/10689/how-can-i-tell-if-im-in-a-tmux-session-from-a-bash-script
    delete env["TMUX"];
  }

  const { command } = options;
  if (command == null) {
    throw Error("bug");
  }
  const cwd = getCWD(pathHead, options.cwd);

  try {
    this.history = (await readFile(this.path)).toString();
  } catch (err) {
    dbg("WARNING: failed to load", this.path, err);
  }
  if (this.remotePty) {
    // switched to a different remote, so don't finish initializing a local one
    return;
  }
  if ((this.state as State) == "closed") {
    // close() ran while we were awaiting; spawning now would leak a process.
    dbg("terminal was closed during initialization, so not spawning.");
    return;
  }
  if (this.localPty != null) {
    // a concurrent call finished first; do not spawn a second process.
    dbg("don't init local pty since there is already a local one.");
    return;
  }

  this.setComputeServerId(0);
  dbg("spawn", {
    command,
    args,
    cwd,
    size: this.size ? this.size : "size not defined",
  });
  const localPty = spawn(command, args, {
    cwd,
    env,
    rows: this.size?.rows,
    cols: this.size?.cols,
  }) as IPty;
  dbg("pid=", localPty.pid, { command, args });
  this.localPty = localPty;

  localPty.onData(this.handleDataFromTerminal);
  localPty.onExit(async (exitInfo) => {
    dbg("exited with code ", exitInfo);
    this.handleDataFromTerminal(EXIT_MESSAGE);
    // Only forget the pty if it is still the one that exited; a newer pty
    // may already have been spawned by the time this event arrives.
    if (this.localPty === localPty) {
      delete this.localPty;
    }
  });
  //     if (command == "/bin/bash") {
  //       localPty.write("\nreset;history -d $(history 1)\n");
  //     }
  this.state = "ready";
  return localPty;
};
175
176
/**
 * Shut the terminal down: kill the active pty, destroy both channels, stop
 * the heartbeat timer and drop the pty references.  Calling it again after
 * the first time does nothing beyond logging.
 */
close = () => {
  logger.debug("close");
  const alreadyClosed = (this.state as State) == "closed";
  if (alreadyClosed) return;

  this.state = "closed";

  // processes
  this.killPty();
  this.localPty?.destroy();

  // communication
  this.channel.destroy();
  this.remotePtyChannel.destroy();
  clearInterval(this.remotePtyHeartbeatInterval);

  // references
  delete this.localPty;
  delete this.remotePty;
};
190
191
/** Pid of the local pty process, or undefined when there is no local pty. */
getPid = (): number | undefined => this.localPty?.pid;
194
195
/** The original path, i.e. the `path` entry of the options (not `this.path`). */
getPath = () => this.options.path;
199
200
/** The command currently configured in the options. */
getCommand = () => this.options.command;
203
204
/**
 * Change the command (and args) that the terminal runs.
 *
 * Does nothing when closed or when command and args are unchanged.
 * Otherwise the options are updated and then:
 *  - with a remote pty, the change is forwarded as a "set_command" message;
 *  - with a local pty, the process is killed and a new one is spawned once
 *    the old one has exited.
 *
 * @param command new command
 * @param args    new arguments, if any
 */
setCommand = (command: string, args?: string[]) => {
  if (this.state == "closed") return;
  if (command == this.options.command && isEqual(args, this.options.args)) {
    logger.debug("setCommand: no actual change.");
    return;
  }
  logger.debug(
    "setCommand",
    { command: this.options.command, args: this.options.args },
    "-->",
    { command, args },
  );
  // we track change
  this.options.command = command;
  this.options.args = args;
  if (this.remotePty != null) {
    // remote pty
    this.remotePty.write({ cmd: "set_command", command, args });
  } else if (this.localPty != null) {
    this.localPty.onExit(() => {
      // initLocalPty rejects if the terminal was closed in the meantime;
      // handle that here so it does not become an unhandled rejection.
      this.initLocalPty().catch((err) => {
        logger.debug("setCommand: failed to restart local pty", err);
      });
    });
    this.killLocalPty();
  }
};
229
230
/** Kill the local pty if there is one; otherwise ask the remote pty (if any) to kill itself. */
private killPty = () => {
  if (this.localPty != null) {
    this.killLocalPty();
    return;
  }
  if (this.remotePty != null) {
    this.killRemotePty();
  }
};
237
238
/** Send SIGKILL to the local pty, destroy it and forget it.  No-op without a local pty. */
private killLocalPty = () => {
  const pty = this.localPty;
  if (pty == null) return;
  logger.debug("killing ", pty.pid);
  pty.kill("SIGKILL");
  pty.destroy();
  delete this.localPty;
};
245
246
/** Send a "kill" command to the remote pty.  No-op without a remote pty. */
private killRemotePty = () => {
  this.remotePty?.write({ cmd: "kill" });
};
250
251
/**
 * Apply a size to whichever pty is active: resize the local pty directly,
 * or send a "size" command to the remote one.
 */
private setSizePty = (rows: number, cols: number) => {
  const { localPty, remotePty } = this;
  if (localPty != null) {
    // note the argument order: columns first
    localPty.resize(cols, rows);
    return;
  }
  if (remotePty != null) {
    remotePty.write({ cmd: "size", rows, cols });
  }
};
258
259
/**
 * Write the in-memory history to disk; throttled with a 15000 ms wait.
 * Failures are logged and otherwise ignored.
 *
 * NOTE(review): the target is the home directory joined with this.path,
 * whereas initLocalPty reads the history back from this.path as-is --
 * confirm both resolve to the same file.
 */
private saveHistoryToDisk = throttle(async () => {
  const home = this.getHome();
  const target = join(home, this.path);
  try {
    await writeFile(target, this.history);
  } catch (err) {
    const warning = `WARNING: failed to save terminal history to '${target}'`;
    logger.debug(warning, err);
  }
}, 15000);
270
271
/** Discard any partially accumulated backend message and go back to the "none" state. */
private resetBackendMessagesBuffer = () => {
  this.backendMessagesState = "none";
  this.backendMessagesBuffer = "";
};
275
276
/**
 * Process a chunk of output produced by the pty (local or remote).
 *
 * The chunk is scanned for backend messages, appended to the history and,
 * unless a burst is in progress, forwarded to all clients.  When the history
 * reaches MAX_HISTORY_LENGTH it is cut to the most recent half; two such
 * truncations within TRUNCATE_THRESH_MS start (or continue) a "burst",
 * during which output is not forwarded and, after enough data, control+c is
 * written to the local pty.
 */
private handleDataFromTerminal = (data) => {
  if (this.state == "closed") return;
  this.handleBackendMessages(data);
  this.history += data;

  const total = this.history.length;
  if (total >= MAX_HISTORY_LENGTH) {
    logger.debug("terminal data -- truncating");
    this.history = this.history.slice(total - MAX_HISTORY_LENGTH / 2);

    const previous = this.last_truncate_time;
    const current = Date.now();
    this.last_truncate_time = current;
    const elapsed = current - previous;
    logger.debug(current, previous, elapsed, TRUNCATE_THRESH_MS);

    if (elapsed > TRUNCATE_THRESH_MS) {
      // truncations are far enough apart: not (or no longer) a burst
      this.truncating = 0;
    } else {
      // getting a huge amount of data quickly.
      if (!this.truncating) {
        this.channel.write({ cmd: "burst" });
      }
      this.truncating += data.length;
      setTimeout(this.checkIfStillTruncating, CHECK_INTERVAL_MS);
      // only start sending control+c if output has been completely stuck
      // being truncated several times in a row -- it has to be a serious
      // non-stop burst...
      if (this.truncating >= 5 * MAX_HISTORY_LENGTH) {
        this.localPty?.write("\u0003");
      }
      return;
    }
  }

  this.saveHistoryToDisk();
  if (!this.truncating) {
    this.channel.write(data);
  }
};
312
313
/**
 * Timer callback scheduled while an output burst is in progress.
 *
 * If no truncation has happened for CHECK_INTERVAL_MS the burst is over:
 * the most recent output is sent to the clients followed by "no-burst".
 * Otherwise the check is rescheduled.  Does nothing once the terminal has
 * been closed, since the channel has been destroyed by then.
 */
private checkIfStillTruncating = () => {
  if (this.state == "closed") {
    // a timer scheduled before close() may still fire afterwards
    return;
  }
  if (!this.truncating) {
    return;
  }
  if (Date.now() - this.last_truncate_time >= CHECK_INTERVAL_MS) {
    // turn off truncating, and send recent data.
    const { truncating, history } = this;
    this.channel.write(
      history.slice(Math.max(0, history.length - truncating)),
    );
    this.truncating = 0;
    this.channel.write({ cmd: "no-burst" });
  } else {
    setTimeout(this.checkIfStillTruncating, CHECK_INTERVAL_MS);
  }
};
329
330
/**
 * Parse out messages like this:
 *
 *     \x1b]49;"valid JSON string here"\x07
 *
 * and send each one via our json channel as { cmd: "message", payload }.
 * NOTE: such messages also get sent via the normal channel, but are
 * ignored by the client.
 *
 * Every escape character in the data is examined, so a message is found
 * even when it follows other escape sequences in the same chunk, and
 * several messages in one chunk are all delivered.  A message (or a
 * possible message prefix) that is cut off at the end of the chunk is
 * buffered and completed with the next chunk; a buffered message that
 * grows beyond 10000 characters without a terminator is dropped.
 */
private handleBackendMessages = (data: string) => {
  const ESC = "\x1b";
  const PREFIX = "\x1b]49;";
  const TERMINATOR = "\x07";
  const MAX_PENDING_LENGTH = 10000;

  // Prepend whatever was left over from the previous chunk.
  const buf =
    this.backendMessagesState === "reading"
      ? this.backendMessagesBuffer + data
      : data;
  this.resetBackendMessagesBuffer();

  let start = buf.indexOf(ESC);
  while (start !== -1) {
    if (buf.length - start < PREFIX.length) {
      // Too short to decide.  Keep it only if it could still turn into
      // the message prefix once more data arrives.
      const rest = buf.slice(start);
      if (PREFIX.startsWith(rest)) {
        this.backendMessagesState = "reading";
        this.backendMessagesBuffer = rest;
        return;
      }
      start = buf.indexOf(ESC, start + 1);
      continue;
    }
    if (!buf.startsWith(PREFIX, start)) {
      // some other escape sequence; keep looking
      start = buf.indexOf(ESC, start + 1);
      continue;
    }
    const end = buf.indexOf(TERMINATOR, start + PREFIX.length);
    if (end === -1) {
      // continue to wait... unless too long
      if (buf.length - start <= MAX_PENDING_LENGTH) {
        this.backendMessagesState = "reading";
        this.backendMessagesBuffer = buf.slice(start);
      }
      return;
    }
    const s = buf.slice(start + PREFIX.length, end);
    logger.debug(
      `handle_backend_message: parsing JSON payload ${JSON.stringify(s)}`,
    );
    try {
      const payload = JSON.parse(s);
      this.channel.write({ cmd: "message", payload });
    } catch (err) {
      logger.warn(
        `handle_backend_message: error sending JSON payload ${JSON.stringify(
          s,
        )}, ${err}`,
      );
      // Otherwise, ignore...
    }
    start = buf.indexOf(ESC, end + 1);
  }
};
382
383
/**
 * Record the size reported by one client and recompute the terminal size.
 * Errors thrown while resizing are logged and ignored.
 */
private setSize = (spark: Spark, newSize: { rows; cols }) => {
  const clientId = spark.id;
  this.client_sizes[clientId] = newSize;
  try {
    this.resize();
  } catch (err) {
    // no-op -- can happen if terminal is restarting.
    logger.debug("WARNING: resizing terminal", this.path, err);
  }
};
392
393
/**
 * Compute the terminal size as the minimum rows and minimum cols over all
 * clients that have reported a size.  A value of 0 (or otherwise falsy)
 * for rows or cols means that client's value is ignored.
 *
 * The result is clamped to at least 1x1 and cached in `this.size`.
 *
 * @returns the size, or undefined when no client has reported usable
 *          values for both rows and cols
 */
getSize = (): { rows: number; cols: number } | undefined => {
  const sizes = this.client_sizes;
  if (len(sizes) == 0) {
    return;
  }
  let rows: number = INFINITY;
  let cols: number = INFINITY;
  for (const id in sizes) {
    if (sizes[id].rows) {
      // if, since 0 rows or 0 columns means *ignore*.
      rows = Math.min(rows, sizes[id].rows);
    }
    if (sizes[id].cols) {
      cols = Math.min(cols, sizes[id].cols);
    }
  }
  if (rows === INFINITY || cols === INFINITY) {
    // no clients with known sizes currently visible
    return;
  }
  // ensure valid values: a negative value reported by a client must not
  // reach the pty.
  rows = Math.max(rows, 1);
  cols = Math.max(cols, 1);
  // cache for future use.
  this.size = { rows, cols };
  return { rows, cols };
};
420
421
/**
 * Recompute the size from the connected clients, apply it to the active pty
 * and broadcast it to all clients.  Does nothing when closed, when no pty
 * exists, or when no size is known.
 */
private resize = () => {
  if (this.state == "closed") return;
  //logger.debug("resize");
  const hasPty = this.localPty != null || this.remotePty != null;
  if (!hasPty) {
    // nothing to do
    return;
  }
  const size = this.getSize();
  if (size == null) return;

  const { rows, cols } = size;
  logger.debug("resize", "new size", rows, cols);
  try {
    this.setSizePty(rows, cols);
    // broadcast out new size to all clients
    this.channel.write({ cmd: "size", rows, cols });
  } catch (err) {
    logger.debug("terminal channel -- WARNING: unable to resize term", err);
  }
};
442
443
/** Remember the compute server id and broadcast it to all clients. */
private setComputeServerId = (id: number) => {
  this.computeServerId = id;
  const mesg = { cmd: "computeServerId", id };
  this.channel.write(mesg);
};
447
448
/**
 * Send the current working directory to `spark`, using the local pty when
 * there is one and the remote pty otherwise.  Does nothing without a pty.
 */
private sendCurrentWorkingDirectory = async (spark: Spark) => {
  if (this.localPty != null) {
    await this.sendCurrentWorkingDirectoryLocalPty(spark);
    return;
  }
  if (this.remotePty != null) {
    await this.sendCurrentWorkingDirectoryRemotePty(spark);
  }
};
455
456
/** Home directory: the HOME environment variable, or "/home/user" if it is not set. */
private getHome = () => process.env.HOME ?? "/home/user";
459
460
/**
 * Reply to `spark` with the current working directory of the local pty
 * process, as { cmd: "cwd", payload: path }.
 *
 * The directory is read from /proc/<pid>/cwd.  When it is the home
 * directory or lies inside it, the path is sent relative to home ("" for
 * home itself); any other directory is sent as an absolute path.
 *
 * Rejects if realpath or readlink fail.
 */
private sendCurrentWorkingDirectoryLocalPty = async (spark: Spark) => {
  if (this.localPty == null) {
    return;
  }
  // we reply with the current working directory of the underlying terminal process,
  // which is why we use readlink and proc below.
  const pid = this.localPty.pid;
  // [hsy/dev] wrapping in realpath, because I had the odd case, where the project's
  // home included a symlink, hence the "startsWith" below didn't remove the home dir.
  const home = await realpath(this.getHome());
  const cwd = await readlink(`/proc/${pid}/cwd`);
  // try to send back a relative path, because the webapp does not
  // understand absolute paths.  The prefix must end at a path separator:
  // otherwise e.g. "/home/user2" would wrongly count as inside "/home/user".
  const prefix = home.endsWith("/") ? home : `${home}/`;
  let path = cwd;
  if (cwd === home) {
    path = "";
  } else if (cwd.startsWith(prefix)) {
    path = cwd.slice(prefix.length);
  }
  logger.debug("terminal cwd sent back", { path });
  spark.write({ cmd: "cwd", payload: path });
};
477
478
/**
 * Ask the remote pty for its current working directory and forward the
 * first { cmd: "cwd", ... } message it sends to `spark`.
 *
 * This returns as soon as the request has been written; it does not wait
 * for the response.
 */
private sendCurrentWorkingDirectoryRemotePty = async (spark: Spark) => {
  // Hold on to this particular connection, so that the listener is later
  // removed from the same object it was added to even if this.remotePty
  // has been replaced in the meantime.
  const remotePty = this.remotePty;
  if (remotePty == null) {
    return;
  }
  const handle = (mesg) => {
    if (mesg != null && typeof mesg == "object" && mesg.cmd == "cwd") {
      spark.write(mesg);
      remotePty.removeListener("data", handle);
    }
  };
  // Listen first, then write the cwd command, so the response cannot be missed.
  remotePty.addListener("data", handle);
  remotePty.write({ cmd: "cwd" });
};
493
494
/**
 * Make `spark` the only client: forget the sizes of all other clients,
 * send `spark` the cached size, and tell every other client to close.
 */
private bootAllOtherClients = (spark: Spark) => {
  const keepId = spark.id;

  // delete all sizes except this one, so at least kick resets
  // the sizes no matter what.
  for (const id in this.client_sizes) {
    if (id === keepId) continue;
    delete this.client_sizes[id];
  }

  // next tell this client to go fullsize.
  const cached = this.size;
  if (cached != null && cached.rows && cached.cols) {
    spark.write({ cmd: "size", rows: cached.rows, cols: cached.cols });
  }

  // broadcast message to all other clients telling them to close.
  this.channel.forEach((other, otherId, _) => {
    if (otherId === keepId) return;
    other.write({ cmd: "close" });
  });
};
516
517
/**
 * Write client input to the active pty.  When no pty exists a local one is
 * spawned, and the input minus its first character is written to it.
 */
private writeToPty = async (data) => {
  if (this.state == "closed") return;
  // only for VERY low level debugging:
  // logger.debug("writeToPty", { data });
  if (this.localPty != null) {
    this.localPty.write(data);
    return;
  }
  if (this.remotePty != null) {
    this.remotePty.write(data);
    return;
  }
  logger.debug("no pty active, but got data, so let's spawn one locally");
  const spawned = await this.initLocalPty();
  // we delete first character since it is the "any key"
  // user hit to get terminal going.
  spawned?.write(data.slice(1));
};
535
536
/**
 * Dispatch data received from a client: strings are terminal input and are
 * written to the pty; objects are control commands.
 *
 * Both paths are awaited, so a failure (for example while spawning a pty)
 * rejects the returned promise and reaches the caller's error handling
 * instead of becoming an unhandled rejection.
 */
private handleDataFromClient = async (
  spark,
  data: string | ClientCommand,
) => {
  //logger.debug("terminal: browser --> term", name, JSON.stringify(data));
  if (typeof data === "string") {
    await this.writeToPty(data);
  } else if (typeof data === "object") {
    await this.handleCommandFromClient(spark, data);
  }
};
547
548
/**
 * Handle a control message from a client.  A local pty is spawned first if
 * no pty exists.  Commands handled: "size", "set_command", "kill", "cwd"
 * and "boot"; anything else is ignored.
 */
private handleCommandFromClient = async (
  spark: Spark,
  data: ClientCommand,
) => {
  // control message
  //logger.debug("terminal channel control message", JSON.stringify(data));
  const noPty = this.localPty == null && this.remotePty == null;
  if (noPty) {
    await this.initLocalPty();
  }

  switch (data.cmd) {
    case "size": {
      this.setSize(spark, { rows: data.rows, cols: data.cols });
      break;
    }

    case "set_command": {
      this.setCommand(data.command, data.args);
      break;
    }

    case "kill": {
      // send kill signal
      this.killPty();
      break;
    }

    case "cwd": {
      try {
        await this.sendCurrentWorkingDirectory(spark);
      } catch (err) {
        logger.debug(
          "WARNING -- issue getting current working directory",
          err,
        );
        // TODO: the terminal protocol doesn't even have a way
        // to report that an error occurred, so this silently
        // fails.  It's just for displaying the current working
        // directory, so not too critical.
      }
      break;
    }

    case "boot": {
      this.bootAllOtherClients(spark);
      break;
    }
  }
};
592
593
/**
 * Set up a newly connected client: send it the cached size, the compute
 * server id, the burst status and the full history, then "no-ignore", and
 * install its "end" and "data" handlers.
 */
private handleClientConnection = (spark: Spark) => {
  logger.debug(
    this.path,
    `new client connection from ${spark.address.ip} -- ${spark.id}`,
  );

  // send current size info
  const cached = this.size;
  if (cached != null) {
    spark.write({ cmd: "size", rows: cached.rows, cols: cached.cols });
  }

  spark.write({ cmd: "computeServerId", id: this.computeServerId });

  // send burst info
  if (this.truncating) {
    spark.write({ cmd: "burst" });
  }

  // send history
  spark.write(this.history);

  // have history, so do not ignore commands now.
  spark.write({ cmd: "no-ignore" });

  // When the client goes away its size no longer constrains the terminal.
  const onEnd = () => {
    if (this.state == "closed") return;
    delete this.client_sizes[spark.id];
    this.resize();
  };

  // Errors while handling client data are reported back to that client as text.
  const onData = async (data) => {
    if ((this.state as State) == "closed") return;
    try {
      await this.handleDataFromClient(spark, data);
    } catch (err) {
      if (this.state != "closed") {
        spark.write(`${err}`);
      }
    }
  };

  spark.on("end", onEnd);
  spark.on("data", onData);
};
635
636
/**
 * Inform the remote pty client of the exact options that are current here,
 * together with the current size.  No-op without a remote pty.
 */
private initRemotePty = () => {
  const remote = this.remotePty;
  if (remote == null) return;
  remote.write({
    cmd: "init",
    options: this.options,
    size: this.getSize(),
  });
};
645
646
/**
 * Switch the terminal over to a newly connected remote pty.
 *
 * Any previous remote connection is ended, handlers are installed on the
 * new one, it is sent the current options, and the local pty (if any) is
 * killed.  When the remote connection ends, a local pty is started again.
 */
private handleRemotePtyConnection = (remotePty: Spark) => {
  logger.debug(
    this.path,
    `new pty connection from ${remotePty.address.ip} -- ${remotePty.id}`,
  );

  const previous = this.remotePty;
  if (previous != null) {
    // already an existing remote connection
    // Remove listeners and end it.  We have to
    // remove listeners or calling end will trigger
    // the remotePty.on("end",...) below, which messes
    // up everything.
    previous.removeAllListeners();
    previous.end();
  }

  remotePty.on("end", async () => {
    if (this.state == "closed") return;
    logger.debug("ending existing remote terminal");
    delete this.remotePty;
    await this.initLocalPty();
  });

  remotePty.on("data", async (data) => {
    if ((this.state as State) == "closed") return;

    // strings are terminal output
    if (typeof data == "string") {
      this.handleDataFromTerminal(data);
      return;
    }
    if (this.localPty != null) {
      // already switched back to local
      return;
    }
    if (typeof data != "object") return;

    // objects are control messages from the remote pty
    switch (data.cmd) {
      case "setComputeServerId":
        this.setComputeServerId(data.id);
        break;
      case "exit":
        this.handleDataFromTerminal(EXIT_MESSAGE);
        break;
    }
  });

  this.remotePty = remotePty;
  this.initRemotePty();
  this.killLocalPty();
};
695
}
696
697