Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/src/packages/terminal/lib/terminal.ts
Views: 687
import type {
  ClientCommand,
  IPty,
  PrimusChannel,
  PrimusWithChannels,
  Options,
} from "./types";
import { getChannelName, getRemotePtyChannelName } from "./util";
import { console_init_filename, len, path_split } from "@cocalc/util/misc";
import { getLogger } from "@cocalc/backend/logger";
import { envForSpawn } from "@cocalc/backend/misc";
import { getCWD } from "./util";
import { readlink, realpath, readFile, writeFile } from "node:fs/promises";
import { spawn } from "node-pty";
import { throttle } from "lodash";
import { exists } from "@cocalc/backend/misc/async-utils-node";
import { isEqual } from "lodash";
import type { Spark } from "primus";
import { join } from "path";

const logger = getLogger("terminal:terminal");

// Delay before (re)checking whether a burst of output has ended.
const CHECK_INTERVAL_MS = 5 * 1000;
// Once the in-memory history reaches this many characters it is cut down
// to its most recent half.
export const MAX_HISTORY_LENGTH = 1000 * 1000;
// Two truncations at most this far apart are treated as an output burst.
const TRUNCATE_THRESH_MS = 500;
// Sentinel "no size seen yet" value used when computing the minimum size.
const INFINITY = 999999;
const DEFAULT_COMMAND = "/bin/bash";

// Shown to clients when the underlying process exits.
const EXIT_MESSAGE = "\r\n\r\n[Process completed - press any key]\r\n\r\n";

export const REMOTE_TERMINAL_HEARTBEAT_INTERVAL_MS = 7.5 * 1000;

type MessagesState = "none" | "reading";
type State = "init" | "ready" | "closed";

/**
 * Server side of a single terminal session.
 *
 * Browser clients connect over one primus channel; the terminal process is
 * either a locally spawned pty (node-pty) or a remote pty that connects over
 * a second primus channel. Output from whichever pty is active is appended
 * to an in-memory history, periodically saved to disk, and broadcast to all
 * connected clients.
 */
export class Terminal {
  private state: State = "init";
  private options: Options;
  private channel: PrimusChannel;
  private remotePtyChannel: PrimusChannel;
  private history: string = "";
  private path: string;
  // Last size reported by each client, keyed by spark id.
  private client_sizes: Record<string, { rows: number; cols: number }> = {};
  private last_truncate_time: number = Date.now();
  // Nonzero while output is arriving faster than we forward it (a "burst");
  // accumulates the number of characters received during the burst.
  private truncating: number = 0;
  // Most recently computed terminal size (see getSize).
  private size?: { rows: number; cols: number };
  private backendMessagesBuffer = "";
  private backendMessagesState: MessagesState = "none";
  // two different ways of providing the backend support -- local or remote
  private localPty?: IPty;
  private remotePty?: Spark;
  private computeServerId: number = 0;
  private remotePtyHeartbeatInterval: ReturnType<typeof setInterval>;

  /**
   * Create the client and remote-pty channels for `path` and start the
   * remote-pty heartbeat. No pty is spawned until `init` (or client
   * activity) triggers `initLocalPty`.
   *
   * @param primus - primus server used to create the two channels
   * @param path - path of the terminal file; also determines channel names
   * @param options - terminal options; `command` defaults to /bin/bash
   */
  constructor(primus: PrimusWithChannels, path: string, options: Options = {}) {
    this.options = { command: DEFAULT_COMMAND, ...options };
    this.path = path;
    this.channel = primus.channel(getChannelName(path));
    this.channel.on("connection", this.handleClientConnection);
    this.remotePtyChannel = primus.channel(getRemotePtyChannelName(path));
    this.remotePtyChannel.on("connection", (conn) => {
      logger.debug("new remote terminal connection");
      this.handleRemotePtyConnection(conn);
    });
    this.remotePtyHeartbeatInterval = setInterval(() => {
      // we always do this (basically a no-op) even if there
      // is no remote pty.
      this.remotePty?.write({});
    }, REMOTE_TERMINAL_HEARTBEAT_INTERVAL_MS);
  }

  /** Spawn the local pty (unless a remote or local one already exists). */
  init = async () => {
    await this.initLocalPty();
  };

  /**
   * Spawn a local pty running the configured command, unless a remote or
   * local pty is already active.
   *
   * @returns the new pty, or undefined if none was spawned
   * @throws Error if the terminal is already closed when called
   */
  private initLocalPty = async (): Promise<IPty | undefined> => {
    if (this.state == "closed") {
      throw Error("terminal is closed");
    }
    const dbg = (...args: unknown[]) => {
      logger.debug("initLocalPty: ", ...args);
    };
    if (this.remotePty != null) {
      dbg("don't init local pty since there is a remote one.");
      return;
    }
    if (this.localPty != null) {
      dbg("don't init local pty since there is already a local one.");
      return;
    }

    const args: string[] = [];

    const { options } = this;
    if (options.args != null) {
      for (const arg of options.args) {
        if (typeof arg === "string") {
          args.push(arg);
        } else {
          dbg("WARNING -- discarding invalid non-string arg ", arg);
        }
      }
    } else {
      const initFilename: string = console_init_filename(this.path);
      if (await exists(initFilename)) {
        args.push("--init-file");
        args.push(path_split(initFilename).tail);
      }
    }
    if (this.remotePty) {
      // switched to a different remote so don't finish initializing a local one
      // (we check after each async call)
      return;
    }

    const { head: pathHead, tail: pathTail } = path_split(this.path);
    const env = {
      COCALC_TERMINAL_FILENAME: pathTail,
      ...envForSpawn(),
      ...options.env,
    };
    if (env["TMUX"]) {
      // If TMUX was set for some reason in the environment that setup
      // a cocalc project (e.g., start hub in dev mode from tmux), then
      // TMUX is set even though terminal hasn't started tmux yet, which
      // confuses our open command. So we explicitly unset it here.
      // https://unix.stackexchange.com/questions/10689/how-can-i-tell-if-im-in-a-tmux-session-from-a-bash-script
      delete env["TMUX"];
    }

    const { command } = options;
    if (command == null) {
      throw Error("bug");
    }
    const cwd = getCWD(pathHead, options.cwd);

    try {
      this.history = (await readFile(this.path)).toString();
    } catch (err) {
      dbg("WARNING: failed to load", this.path, err);
    }
    if (this.remotePty) {
      // switched to a different remote, so don't finish initializing a local one
      return;
    }
    if ((this.state as State) == "closed") {
      // close() was called while we were awaiting above; spawning now would
      // leave a process that nothing ever cleans up.
      dbg("terminal was closed during initialization -- not spawning");
      return;
    }
    if (this.localPty != null) {
      // a concurrent call finished spawning while we were awaiting above.
      dbg("don't init local pty since there is already a local one.");
      return;
    }

    this.setComputeServerId(0);
    dbg("spawn", {
      command,
      args,
      cwd,
      size: this.size ? this.size : "size not defined",
    });
    const localPty = spawn(command, args, {
      cwd,
      env,
      rows: this.size?.rows,
      cols: this.size?.cols,
    }) as IPty;
    dbg("pid=", localPty.pid, { command, args });
    this.localPty = localPty;

    localPty.onData(this.handleDataFromTerminal);
    localPty.onExit((exitInfo) => {
      dbg("exited with code ", exitInfo);
      this.handleDataFromTerminal(EXIT_MESSAGE);
      // Only clear the field if it still refers to the pty that exited;
      // otherwise we would discard a newer pty spawned after a restart.
      if (this.localPty === localPty) {
        delete this.localPty;
      }
    });
    // if (command == "/bin/bash") {
    // localPty.write("\nreset;history -d $(history 1)\n");
    // }
    this.state = "ready";
    return localPty;
  };

  /**
   * Kill the active pty, destroy both channels and stop the heartbeat.
   * Calling this on an already closed terminal does nothing.
   */
  close = () => {
    logger.debug("close");
    if ((this.state as State) == "closed") {
      return;
    }
    this.state = "closed";
    this.killPty();
    this.localPty?.destroy();
    this.channel.destroy();
    this.remotePtyChannel.destroy();
    clearInterval(this.remotePtyHeartbeatInterval);
    delete this.localPty;
    delete this.remotePty;
  };

  /** @returns pid of the local pty process, or undefined if there is none */
  getPid = (): number | undefined => {
    return this.localPty?.pid;
  };

  // original path
  getPath = () => {
    return this.options.path;
  };

  /** @returns the command currently configured for this terminal */
  getCommand = () => {
    return this.options.command;
  };

  /**
   * Change the command (and arguments) the terminal runs. If they differ
   * from the current ones, a remote pty is told about the change and a
   * local pty is killed and respawned with the new command.
   */
  setCommand = (command: string, args?: string[]) => {
    if (this.state == "closed") return;
    if (command == this.options.command && isEqual(args, this.options.args)) {
      logger.debug("setCommand: no actual change.");
      return;
    }
    logger.debug(
      "setCommand",
      { command: this.options.command, args: this.options.args },
      "-->",
      { command, args },
    );
    // we track change
    this.options.command = command;
    this.options.args = args;
    if (this.remotePty != null) {
      // remote pty
      this.remotePty.write({ cmd: "set_command", command, args });
    } else if (this.localPty != null) {
      this.localPty.onExit(() => {
        // initLocalPty rejects if the terminal got closed in the meantime;
        // handle that here so it never becomes an unhandled rejection.
        this.initLocalPty().catch((err) => {
          logger.debug("setCommand: WARNING -- failed to restart pty", err);
        });
      });
      this.killLocalPty();
    }
  };

  /** Kill whichever pty is active, preferring the local one. */
  private killPty = () => {
    if (this.localPty != null) {
      this.killLocalPty();
    } else if (this.remotePty != null) {
      this.killRemotePty();
    }
  };

  /** SIGKILL and destroy the local pty (if any) and forget about it. */
  private killLocalPty = () => {
    if (this.localPty == null) return;
    logger.debug("killing ", this.localPty.pid);
    this.localPty.kill("SIGKILL");
    this.localPty.destroy();
    delete this.localPty;
  };

  /** Ask the remote pty (if any) to kill its process. */
  private killRemotePty = () => {
    if (this.remotePty == null) return;
    this.remotePty.write({ cmd: "kill" });
  };

  /** Apply a size to the active pty; note node-pty takes (cols, rows). */
  private setSizePty = (rows: number, cols: number) => {
    if (this.localPty != null) {
      this.localPty.resize(cols, rows);
    } else if (this.remotePty != null) {
      this.remotePty.write({ cmd: "size", rows, cols });
    }
  };

  /**
   * Write the in-memory history to disk (throttled). Failures are logged
   * and otherwise ignored.
   */
  private saveHistoryToDisk = throttle(async () => {
    const target = join(this.getHome(), this.path);
    try {
      await writeFile(target, this.history);
    } catch (err) {
      logger.debug(
        `WARNING: failed to save terminal history to '${target}'`,
        err,
      );
    }
  }, 15000);

  private resetBackendMessagesBuffer = () => {
    this.backendMessagesBuffer = "";
    this.backendMessagesState = "none";
  };

  /**
   * Handle output produced by the pty: scan it for backend messages, append
   * it to the history (truncating when too long), and forward it to clients
   * unless we are in the middle of an output burst.
   */
  private handleDataFromTerminal = (data: string) => {
    //console.log("handleDataFromTerminal", { data });
    if (this.state == "closed") return;
    //logger.debug("terminal: term --> browsers", data);
    this.handleBackendMessages(data);
    this.history += data;
    const n = this.history.length;
    if (n >= MAX_HISTORY_LENGTH) {
      logger.debug("terminal data -- truncating");
      this.history = this.history.slice(n - MAX_HISTORY_LENGTH / 2);
      const last = this.last_truncate_time;
      const now = Date.now();
      this.last_truncate_time = now;
      logger.debug(now, last, now - last, TRUNCATE_THRESH_MS);
      if (now - last <= TRUNCATE_THRESH_MS) {
        // getting a huge amount of data quickly.
        if (!this.truncating) {
          this.channel.write({ cmd: "burst" });
        }
        this.truncating += data.length;
        setTimeout(this.checkIfStillTruncating, CHECK_INTERVAL_MS);
        if (this.truncating >= 5 * MAX_HISTORY_LENGTH) {
          // only start sending control+c if output has been completely stuck
          // being truncated several times in a row -- it has to be a serious non-stop burst...
          this.localPty?.write("\u0003");
        }
        return;
      } else {
        this.truncating = 0;
      }
    }
    this.saveHistoryToDisk();
    if (!this.truncating) {
      this.channel.write(data);
    }
  };

  /**
   * Timer callback: once no truncation has happened for CHECK_INTERVAL_MS,
   * end the burst and send clients the tail of the history; otherwise
   * check again later.
   */
  private checkIfStillTruncating = () => {
    if (this.state == "closed") {
      // the channel is destroyed on close, so there is nobody to write to.
      return;
    }
    if (!this.truncating) {
      return;
    }
    if (Date.now() - this.last_truncate_time >= CHECK_INTERVAL_MS) {
      // turn off truncating, and send recent data.
      const { truncating, history } = this;
      this.channel.write(
        history.slice(Math.max(0, history.length - truncating)),
      );
      this.truncating = 0;
      this.channel.write({ cmd: "no-burst" });
    } else {
      setTimeout(this.checkIfStillTruncating, CHECK_INTERVAL_MS);
    }
  };

  private handleBackendMessages = (data: string) => {
    /* parse out messages like this:
            \x1b]49;"valid JSON string here"\x07
         and format and send them via our json channel.
         NOTE: such messages also get sent via the
         normal channel, but ignored by the client.
      */
    if (this.backendMessagesState === "none") {
      const i = data.indexOf("\x1b");
      if (i === -1) {
        return; // nothing to worry about
      }
      // start buffering from the escape character on
      this.backendMessagesState = "reading";
      this.backendMessagesBuffer = data.slice(i);
    } else {
      this.backendMessagesBuffer += data;
    }
    if (
      this.backendMessagesBuffer.length >= 5 &&
      this.backendMessagesBuffer.slice(1, 5) != "]49;"
    ) {
      // some other escape sequence -- not one of our messages
      this.resetBackendMessagesBuffer();
      return;
    }
    if (this.backendMessagesBuffer.length >= 6) {
      const i = this.backendMessagesBuffer.indexOf("\x07");
      if (i === -1) {
        // continue to wait... unless too long
        if (this.backendMessagesBuffer.length > 10000) {
          this.resetBackendMessagesBuffer();
        }
        return;
      }
      const s = this.backendMessagesBuffer.slice(5, i);
      this.resetBackendMessagesBuffer();
      logger.debug(
        `handle_backend_message: parsing JSON payload ${JSON.stringify(s)}`,
      );
      try {
        const payload = JSON.parse(s);
        this.channel.write({ cmd: "message", payload });
      } catch (err) {
        logger.warn(
          `handle_backend_message: error sending JSON payload ${JSON.stringify(
            s,
          )}, ${err}`,
        );
        // Otherwise, ignore...
      }
    }
  };

  /** Record the size reported by a client and resize the pty accordingly. */
  private setSize = (spark: Spark, newSize: { rows; cols }) => {
    this.client_sizes[spark.id] = newSize;
    try {
      this.resize();
    } catch (err) {
      // no-op -- can happen if terminal is restarting.
      logger.debug("WARNING: resizing terminal", this.path, err);
    }
  };

  /**
   * Compute the terminal size as the minimum rows and minimum cols over all
   * clients, ignoring zero values. The result is cached in `this.size`.
   *
   * @returns the size, or undefined if no client reported a usable
   *   rows value or no client reported a usable cols value
   */
  getSize = (): { rows: number; cols: number } | undefined => {
    const sizes = this.client_sizes;
    if (len(sizes) == 0) {
      return;
    }
    let rows: number = INFINITY;
    let cols: number = INFINITY;
    for (const id in sizes) {
      if (sizes[id].rows) {
        // if, since 0 rows or 0 columns means *ignore*.
        rows = Math.min(rows, sizes[id].rows);
      }
      if (sizes[id].cols) {
        cols = Math.min(cols, sizes[id].cols);
      }
    }
    if (rows === INFINITY || cols === INFINITY) {
      // no clients with known sizes currently visible
      return;
    }
    // ensure valid values: a pty needs at least one row and one column.
    rows = Math.max(1, rows);
    cols = Math.max(1, cols);
    // cache for future use.
    this.size = { rows, cols };
    return { rows, cols };
  };

  /**
   * Recompute the size from the connected clients, apply it to the active
   * pty and broadcast it to all clients.
   */
  private resize = () => {
    if (this.state == "closed") return;
    //logger.debug("resize");
    if (this.localPty == null && this.remotePty == null) {
      // nothing to do
      return;
    }
    const size = this.getSize();
    if (size == null) {
      return;
    }
    const { rows, cols } = size;
    logger.debug("resize", "new size", rows, cols);
    try {
      this.setSizePty(rows, cols);
      // broadcast out new size to all clients
      this.channel.write({ cmd: "size", rows, cols });
    } catch (err) {
      logger.debug("terminal channel -- WARNING: unable to resize term", err);
    }
  };

  /** Remember the compute server id and announce it to all clients. */
  private setComputeServerId = (id: number) => {
    this.computeServerId = id;
    this.channel.write({ cmd: "computeServerId", id });
  };

  /** Send the terminal's current working directory to one client. */
  private sendCurrentWorkingDirectory = async (spark: Spark) => {
    if (this.localPty != null) {
      await this.sendCurrentWorkingDirectoryLocalPty(spark);
    } else if (this.remotePty != null) {
      await this.sendCurrentWorkingDirectoryRemotePty(spark);
    }
  };

  private getHome = () => {
    return process.env.HOME ?? "/home/user";
  };

  /**
   * Look up the cwd of the local pty process via /proc and send it to the
   * client, relative to the home directory when it lies inside it.
   */
  private sendCurrentWorkingDirectoryLocalPty = async (spark: Spark) => {
    if (this.localPty == null) {
      return;
    }
    // we reply with the current working directory of the underlying terminal process,
    // which is why we use readlink and proc below.
    const pid = this.localPty.pid;
    // [hsy/dev] wrapping in realpath, because I had the odd case, where the project's
    // home included a symlink, hence the "startsWith" below didn't remove the home dir.
    const home = await realpath(this.getHome());
    const cwd = await readlink(`/proc/${pid}/cwd`);
    // try to send back a relative path, because the webapp does not
    // understand absolute paths. Match on a directory boundary so that a
    // sibling such as "/home/user2" is not mistaken for being inside
    // "/home/user".
    const homePrefix = home.endsWith("/") ? home : `${home}/`;
    let path: string;
    if (cwd === home) {
      path = "";
    } else if (cwd.startsWith(homePrefix)) {
      path = cwd.slice(homePrefix.length);
    } else {
      path = cwd;
    }
    logger.debug("terminal cwd sent back", { path });
    spark.write({ cmd: "cwd", payload: path });
  };

  /**
   * Ask the remote pty for its cwd and forward the first cmd:'cwd'
   * response to the client.
   */
  private sendCurrentWorkingDirectoryRemotePty = async (spark: Spark) => {
    // Hold on to this particular connection so the listener is removed from
    // the same spark it was added to, even if this.remotePty changes.
    const remotePty = this.remotePty;
    if (remotePty == null) {
      return;
    }
    // Listen for a cmd:'cwd' response and forward it to the spark,
    // then write the cwd command.
    const handle = (mesg) => {
      if (typeof mesg == "object" && mesg != null && mesg.cmd == "cwd") {
        spark.write(mesg);
        remotePty.removeListener("data", handle);
      }
    };
    remotePty.addListener("data", handle);
    remotePty.write({ cmd: "cwd" });
  };

  /**
   * Make `spark` the only client: drop the recorded sizes of all other
   * clients, resend the current size to `spark`, and tell every other
   * client to close.
   */
  private bootAllOtherClients = (spark: Spark) => {
    // delete all sizes except this one, so at least kick resets
    // the sizes no matter what.
    for (const id in this.client_sizes) {
      if (id !== spark.id) {
        delete this.client_sizes[id];
      }
    }
    // next tell this client to go fullsize.
    if (this.size != null) {
      const { rows, cols } = this.size;
      if (rows && cols) {
        spark.write({ cmd: "size", rows, cols });
      }
    }
    // broadcast message to all other clients telling them to close.
    this.channel.forEach((spark0, id, _) => {
      if (id !== spark.id) {
        spark0.write({ cmd: "close" });
      }
    });
  };

  /**
   * Send client input to the active pty. If no pty exists, spawn a local
   * one and forward everything after the first character.
   */
  private writeToPty = async (data: string) => {
    if (this.state == "closed") return;
    // only for VERY low level debugging:
    // logger.debug("writeToPty", { data });
    if (this.localPty != null) {
      this.localPty.write(data);
    } else if (this.remotePty != null) {
      this.remotePty.write(data);
    } else {
      logger.debug("no pty active, but got data, so let's spawn one locally");
      const pty = await this.initLocalPty();
      if (pty != null) {
        // we delete first character since it is the "any key"
        // user hit to get terminal going.
        pty.write(data.slice(1));
      }
    }
  };

  /**
   * Dispatch a message from a client: strings are terminal input, objects
   * are control commands. Anything else (including null) is ignored.
   */
  private handleDataFromClient = async (
    spark: Spark,
    data: string | ClientCommand,
  ) => {
    //logger.debug("terminal: browser --> term", name, JSON.stringify(data));
    if (typeof data === "string") {
      // awaited so that a failure to spawn a pty reaches the caller's
      // error handling instead of becoming an unhandled rejection.
      await this.writeToPty(data);
    } else if (typeof data === "object" && data != null) {
      await this.handleCommandFromClient(spark, data);
    }
  };

  /** Execute a control command sent by a client. */
  private handleCommandFromClient = async (
    spark: Spark,
    data: ClientCommand,
  ) => {
    // control message
    //logger.debug("terminal channel control message", JSON.stringify(data));
    if (this.localPty == null && this.remotePty == null) {
      await this.initLocalPty();
    }
    switch (data.cmd) {
      case "size":
        this.setSize(spark, { rows: data.rows, cols: data.cols });
        break;

      case "set_command":
        this.setCommand(data.command, data.args);
        break;

      case "kill":
        // send kill signal
        this.killPty();
        break;

      case "cwd":
        try {
          await this.sendCurrentWorkingDirectory(spark);
        } catch (err) {
          logger.debug(
            "WARNING -- issue getting current working directory",
            err,
          );
          // TODO: the terminal protocol doesn't even have a way
          // to report that an error occurred, so this silently
          // fails. It's just for displaying the current working
          // directory, so not too critical.
        }
        break;

      case "boot": {
        this.bootAllOtherClients(spark);
        break;
      }
    }
  };

  /**
   * Set up a newly connected client: send it the current size, compute
   * server id, burst state and history, then start handling its messages.
   */
  private handleClientConnection = (spark: Spark) => {
    logger.debug(
      this.path,
      `new client connection from ${spark.address.ip} -- ${spark.id}`,
    );

    // send current size info
    if (this.size != null) {
      const { rows, cols } = this.size;
      spark.write({ cmd: "size", rows, cols });
    }

    spark.write({ cmd: "computeServerId", id: this.computeServerId });

    // send burst info
    if (this.truncating) {
      spark.write({ cmd: "burst" });
    }

    // send history
    spark.write(this.history);

    // have history, so do not ignore commands now.
    spark.write({ cmd: "no-ignore" });

    spark.on("end", () => {
      if (this.state == "closed") return;
      delete this.client_sizes[spark.id];
      this.resize();
    });

    spark.on("data", async (data) => {
      if ((this.state as State) == "closed") return;
      try {
        await this.handleDataFromClient(spark, data);
      } catch (err) {
        if (this.state != "closed") {
          spark.write(`${err}`);
        }
      }
    });
  };

  // inform remote pty client of the exact options that are current here.
  private initRemotePty = () => {
    if (this.remotePty == null) return;
    this.remotePty.write({
      cmd: "init",
      options: this.options,
      size: this.getSize(),
    });
  };

  /**
   * Switch to a newly connected remote pty: drop any previous remote
   * connection, route the new connection's output through the normal
   * output path, and kill the local pty. When the remote connection ends
   * we fall back to a local pty.
   */
  private handleRemotePtyConnection = (remotePty: Spark) => {
    logger.debug(
      this.path,
      `new pty connection from ${remotePty.address.ip} -- ${remotePty.id}`,
    );
    if (this.remotePty != null) {
      // already an existing remote connection
      // Remove listeners and end it. We have to
      // remove listeners or calling end will trigger
      // the remotePty.on("end",...) below, which messes
      // up everything.
      this.remotePty.removeAllListeners();
      this.remotePty.end();
    }

    remotePty.on("end", async () => {
      if (this.state == "closed") return;
      logger.debug("ending existing remote terminal");
      delete this.remotePty;
      try {
        await this.initLocalPty();
      } catch (err) {
        // nothing awaits this event handler, so an error must be dealt
        // with here rather than left as an unhandled rejection.
        logger.debug(
          "WARNING: failed to init local pty after remote pty ended",
          err,
        );
      }
    });

    remotePty.on("data", (data) => {
      if ((this.state as State) == "closed") return;
      if (typeof data == "string") {
        this.handleDataFromTerminal(data);
      } else {
        if (this.localPty != null) {
          // already switched back to local
          return;
        }
        if (typeof data == "object" && data != null) {
          switch (data.cmd) {
            case "setComputeServerId":
              this.setComputeServerId(data.id);
              break;
            case "exit": {
              this.handleDataFromTerminal(EXIT_MESSAGE);
              break;
            }
          }
        }
      }
    });

    this.remotePty = remotePty;
    this.initRemotePty();
    this.killLocalPty();
  };
}