CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/sync-fs/lib/handle-api-call.ts
Views: 687
1
/* This runs in the project and handles api calls from compute servers.
2
3
It mainly handles a persistent connection from the file system container,
4
and supports functions including moving files, syncing, executing code,
5
etc.
6
*/
7
8
import { fromCompressedJSON } from "./compressed-json";
9
import getLogger from "@cocalc/backend/logger";
10
import type { FilesystemState } from "./types";
11
import { metadataFile, mtimeDirTree, remove, writeFileLz4 } from "./util";
12
import { join } from "path";
13
import { mkdir, rename, readFile, writeFile } from "fs/promises";
14
import type { MesgSyncFSOptions } from "@cocalc/comm/websocket/types";
15
import { sha1 } from "@cocalc/backend/sha1";
16
//import type { Spark } from "primus";
type Spark = any; // for now

const log = getLogger("sync-fs:handle-api-call").debug;

// Maximum allowed clock skew (in ms) between a compute server and this
// project; sync requests with larger skew are rejected in handleApiCall.
const CLOCK_THRESH_MS = 5 * 1000;
22
23
export default async function handleApiCall({
24
computeStateJson,
25
exclude = [],
26
compute_server_id,
27
now, // time in ms since epoch on compute server
28
}: MesgSyncFSOptions) {
29
log("handleApiCall");
30
31
let computeState;
32
if (computeStateJson) {
33
computeState = fromCompressedJSON(await readFile(computeStateJson));
34
} else {
35
throw Error("not implemented");
36
}
37
if (!process.env.HOME) {
38
throw Error("HOME must be defined");
39
}
40
// This can also happen if the network connection breaks for a bit, e.g., when
41
// restarting the project.
42
const clockSkew = Math.abs((now ?? 0) - Date.now());
43
if (clockSkew >= CLOCK_THRESH_MS) {
44
throw Error(
45
`Compute server sync time is off by ${clockSkew}ms, which exceeds the ${CLOCK_THRESH_MS}ms threshhold. Try again and possibly double check your clock settings.`,
46
);
47
}
48
49
const meta = await metadataFile({ path: process.env.HOME, exclude });
50
const projectState = await getProjectState(meta, exclude);
51
52
const {
53
removeFromCompute,
54
removeFromProject,
55
copyFromProject,
56
copyFromCompute,
57
} = getOperations({ computeState, projectState });
58
59
if (removeFromProject.length > 0) {
60
await remove(removeFromProject, process.env.HOME);
61
}
62
63
await writeMetadataFile({ compute_server_id, meta });
64
65
return {
66
removeFromCompute,
67
copyFromCompute,
68
copyFromProjectTar:
69
copyFromProject.length > 0
70
? await createCopyFromProjectTar(copyFromProject, compute_server_id)
71
: undefined,
72
};
73
}
74
75
let lastMetadataFileHash: { [compute_server_id: number]: string } = {};
76
async function writeMetadataFile({ compute_server_id, meta }) {
77
let start = Date.now();
78
const hash = sha1(meta);
79
const path = join(getStateDir(compute_server_id), "meta");
80
const tmp = join(path, ".meta.lz4");
81
const target = join(path, "meta.lz4");
82
if (hash == lastMetadataFileHash[compute_server_id]) {
83
log(
84
`writeMetadataFile: not writing "${target}" since hash didn't change. Hash time =`,
85
Date.now() - start,
86
"ms",
87
);
88
return;
89
}
90
lastMetadataFileHash[compute_server_id] = hash;
91
await mkdir(path, { recursive: true });
92
await writeFileLz4(tmp, meta);
93
// ensure this is atomic
94
await rename(tmp, target);
95
log(
96
`writeMetadataFile: wrote out "${target}" atomically. Total time =`,
97
Date.now() - start,
98
"ms",
99
);
100
}
101
102
// Absolute path of the per-compute-server state directory inside HOME.
function getStateDir(compute_server_id): string {
  const home = process.env.HOME;
  if (!home) {
    throw Error("HOME env var must be set");
  }
  return join(home, ".compute-servers", `${compute_server_id}`);
}
108
109
// This is the path to a file with the names
110
// of the files to copy via tar, separated by NULL.
111
// **This is not an actual tarball.**
112
// We use NULL instead of newline so that filenames
113
// with newlines in them work, and this should be processed
114
// with tar using the --null option.
115
async function createCopyFromProjectTar(
116
paths: string[],
117
compute_server_id: number,
118
): Promise<string> {
119
if (!process.env.HOME) {
120
throw Error("HOME must be defined");
121
}
122
const stateDir = getStateDir(compute_server_id);
123
await mkdir(stateDir, { recursive: true });
124
const target = join(stateDir, "copy-from-project");
125
await writeFile(target, paths.join("\0"));
126
const i = target.lastIndexOf(stateDir);
127
return target.slice(i);
128
}
129
130
// we have to use separate cache/state for each exclude list, unfortunately. in practice,
131
// they should often be similar or the same, because people will rarely customize this (?).
132
let lastProjectState: { [exclude: string]: FilesystemState } = {};
133
async function getProjectState(meta, exclude): Promise<FilesystemState> {
134
const now = Math.floor(Date.now() / 1000); // in integers seconds
135
const key = JSON.stringify(exclude);
136
const lastState = lastProjectState[key] ?? {};
137
138
if (!process.env.HOME) {
139
throw Error("HOME must be defined");
140
}
141
const projectState = await mtimeDirTree({
142
path: process.env.HOME,
143
exclude,
144
metadataFile: meta,
145
});
146
147
// figure out what got deleted in the project
148
for (const path in lastState) {
149
if (projectState[path] === undefined) {
150
// it is currently deleted. If it was already marked deleted at a point in time,
151
// just stay with that time. If now, consider it deleted now (negative sign means "deleted").
152
// NOTE: it's impossible to know exactly when path was actually deleted.
153
projectState[path] = lastState[path] < 0 ? lastState[path] : -now;
154
}
155
}
156
157
lastProjectState[key] = projectState;
158
159
// // this is for DEBUGING ONLY!
160
// await writeFile(
161
// join(process.env.HOME, ".compute-servers", "project-state.json"),
162
// JSON.stringify(projectState),
163
// );
164
165
return projectState;
166
}
167
168
function getOperations({ computeState, projectState }): {
169
removeFromCompute: string[];
170
removeFromProject: string[];
171
copyFromProject: string[];
172
copyFromCompute: string[];
173
} {
174
const removeFromCompute: string[] = [];
175
const removeFromProject: string[] = [];
176
const copyFromProject: string[] = [];
177
const copyFromCompute: string[] = [];
178
179
const handlePath = (path) => {
180
const projectMtime = projectState[path];
181
const computeMtime = computeState[path];
182
if (projectMtime == computeMtime) {
183
// definitely nothing to do
184
return;
185
}
186
if (projectMtime !== undefined && computeMtime === undefined) {
187
// file is NOT stored on compute server, so no need to worry about it
188
return;
189
}
190
// something must be done! What:
191
if (projectMtime === undefined) {
192
if (computeMtime < 0) {
193
// it's supposed to be deleted and it's gone, so nothing to do.
194
return;
195
}
196
// it's definitely NOT on the project but it is on the compute server, so we need it.
197
copyFromCompute.push(path);
198
return;
199
}
200
201
// now both projectMtime and computeMtime are defined and different
202
// We use >= instead of > so that ties are broken in favor of the project,
203
// which is an arbitrary but consistent choice.
204
if (Math.abs(projectMtime) >= Math.abs(computeMtime)) {
205
// project version is newer
206
if (projectMtime > 0) {
207
// it was edited later on the project
208
copyFromProject.push(path);
209
} else {
210
// it was deleted from the project, so now need to delete on compute
211
removeFromCompute.push(path);
212
}
213
return;
214
} else {
215
// compute version is newer
216
if (computeMtime > 0) {
217
// edited on compute later
218
copyFromCompute.push(path);
219
} else {
220
// deleted on compute, so now also need to delete in project
221
removeFromProject.push(path);
222
}
223
}
224
};
225
226
for (const path in projectState) {
227
handlePath(path);
228
}
229
for (const path in computeState) {
230
if (projectState[path] === undefined) {
231
// NOT already handled above
232
handlePath(path);
233
}
234
}
235
236
return {
237
removeFromCompute,
238
removeFromProject,
239
copyFromProject,
240
copyFromCompute,
241
};
242
}
243
244
const sparks: { [compute_server_id: number]: Spark } = {};
245
246
export async function handleComputeServerSyncRegister(
247
{ compute_server_id },
248
spark,
249
) {
250
log("handleComputeServerSyncRegister -- registering ", {
251
compute_server_id,
252
spark_id: spark.id,
253
});
254
// save the connection so we can send a sync_request message later, and also handle the api
255
// calls for copying files back and forth, etc.
256
sparks[compute_server_id] = spark;
257
const remove = () => {
258
if (sparks[compute_server_id]?.id == spark.id) {
259
log(
260
"handleComputeServerSyncRegister: removing compute server connection due to disconnect -- ",
261
{ compute_server_id, spark_id: spark.id },
262
);
263
// the spark connection currently cached is this
264
// one, so we remove it. It could be replaced by
265
// a new one, in which case we better not remove it.
266
delete sparks[compute_server_id];
267
}
268
};
269
spark.on("end", remove);
270
spark.on("close", remove);
271
}
272
273
// User has requested that compute_server_id
// do sync right now via the browser websocket api.
export async function handleSyncFsRequestCall({ compute_server_id }) {
  const spark = sparks[compute_server_id];
  if (spark == null) {
    log("handleSyncFsRequestCall: fail");
    throw Error(`no connection to compute server -- please start it or restart it`);
  }
  log("handleSyncFsRequestCall: success");
  spark.write({ event: "compute_server_sync_request" });
  return { status: "ok" };
}
287
288
function callComputeServerApi(
289
compute_server_id,
290
mesg,
291
timeoutMs = 30000,
292
compute = false,
293
): Promise<any> {
294
const spark = compute
295
? computeSparks[compute_server_id]
296
: sparks[compute_server_id];
297
if (spark == null) {
298
log("callComputeServerApi: no connection");
299
throw Error(
300
`no connection to compute server -- please start or restart it`,
301
);
302
}
303
return new Promise((resolve, reject) => {
304
const id = Math.random();
305
spark.write({ ...mesg, id });
306
307
const handler = (data) => {
308
if (data?.id == id) {
309
spark.removeListener("data", handler);
310
clearTimeout(timeout);
311
if (data.error) {
312
reject(Error(data.error));
313
} else {
314
resolve(data.resp);
315
}
316
}
317
};
318
spark.addListener("data", handler);
319
320
const timeout = setTimeout(() => {
321
spark.removeListener("data", handler);
322
reject(Error(`timeout -- ${timeoutMs}ms`));
323
}, timeoutMs);
324
});
325
}
326
327
export async function handleCopy(opts: {
328
event: string;
329
compute_server_id: number;
330
paths: string[];
331
dest?: string;
332
timeout?: number;
333
}) {
334
log("handleCopy: ", opts);
335
const mesg = { event: opts.event, paths: opts.paths, dest: opts.dest };
336
return await callComputeServerApi(
337
opts.compute_server_id,
338
mesg,
339
(opts.timeout ?? 30) * 1000,
340
);
341
}
342
343
export async function handleSyncFsGetListing({
344
path,
345
hidden,
346
compute_server_id,
347
}) {
348
log("handleSyncFsGetListing: ", { path, hidden, compute_server_id });
349
const mesg = { event: "listing", path, hidden };
350
return await callComputeServerApi(compute_server_id, mesg, 15000);
351
}
352
353
export async function handleComputeServerFilesystemExec(opts) {
354
const { compute_server_id } = opts;
355
log("handleComputeServerFilesystemExec: ", opts);
356
const mesg = { event: "exec", opts };
357
return await callComputeServerApi(
358
compute_server_id,
359
mesg,
360
(opts.timeout ?? 10) * 1000,
361
);
362
}
363
364
export async function handleComputeServerDeleteFiles({
365
compute_server_id,
366
paths,
367
}) {
368
log("handleComputeServerDeleteFiles: ", { compute_server_id, paths });
369
const mesg = { event: "delete_files", paths };
370
return await callComputeServerApi(compute_server_id, mesg, 60 * 1000);
371
}
372
373
export async function handleComputeServerRenameFile({
374
compute_server_id,
375
src,
376
dest,
377
}) {
378
log("handleComputeServerRenameFile: ", { compute_server_id, src, dest });
379
const mesg = { event: "rename_file", src, dest };
380
return await callComputeServerApi(compute_server_id, mesg, 60 * 1000);
381
}
382
383
export async function handleComputeServerMoveFiles({
384
compute_server_id,
385
paths,
386
dest,
387
}) {
388
log("handleComputeServerMoveFiles: ", { compute_server_id, paths, dest });
389
const mesg = { event: "move_files", paths, dest };
390
return await callComputeServerApi(compute_server_id, mesg, 60 * 1000);
391
}
392
393
/*
Similar but for compute instead of filesystem:
*/

// Persistent connections from each compute server's compute container,
// keyed by compute server id (cf. `sparks` for the file system container).
const computeSparks: { [compute_server_id: number]: Spark } = {};
398
399
export async function handleComputeServerComputeRegister(
400
{ compute_server_id },
401
spark,
402
) {
403
log("handleComputeServerComputeRegister -- registering ", {
404
compute_server_id,
405
spark_id: spark.id,
406
});
407
// save the connection so we can send a sync_request message later, and also handle the api
408
// calls for copying files back and forth, etc.
409
computeSparks[compute_server_id] = spark;
410
const remove = () => {
411
if (computeSparks[compute_server_id]?.id == spark.id) {
412
log(
413
"handleComputeServerComputeRegister: removing compute server connection due to disconnect -- ",
414
{ compute_server_id, spark_id: spark.id },
415
);
416
// the spark connection currently cached is this
417
// one, so we remove it. It could be replaced by
418
// a new one, in which case we better not remove it.
419
delete computeSparks[compute_server_id];
420
}
421
};
422
spark.on("end", remove);
423
spark.on("close", remove);
424
}
425
426
export async function handleComputeServerComputeExec(opts) {
427
const { compute_server_id } = opts;
428
log("handleComputeServerComputeExec: ", opts);
429
const mesg = { event: "exec", opts };
430
return await callComputeServerApi(
431
compute_server_id,
432
mesg,
433
(opts.timeout ?? 10) * 1000,
434
true,
435
);
436
}
437
438