CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Sign Up · Sign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/sync/listings/index.ts
Views: 687
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
import { delay } from "awaiting";
7
import { once } from "@cocalc/util/async-utils";
8
import { SyncTable, SyncTableState } from "@cocalc/sync/table";
9
import type { TypedMap } from "@cocalc/util/types/typed-map";
10
import {
11
close,
12
merge,
13
path_split,
14
startswith,
15
field_cmp,
16
seconds_ago,
17
} from "@cocalc/util/misc";
18
import type { Listing } from "@cocalc/util/db-schema/listings";
19
import {
20
WATCH_TIMEOUT_MS,
21
MAX_FILES_PER_PATH,
22
} from "@cocalc/util/db-schema/listings";
23
import type { EventEmitter } from "events";
24
import { DirectoryListingEntry } from "@cocalc/util/types";
25
26
// Update a directory listing only after file changes have stopped for at
// least this many milliseconds. This is important since we don't want to
// fire off dozens of changes per second, e.g., if a logfile is being updated.
// Can be overridden via the COCALC_FS_WATCH_DEBOUNCE_MS environment variable;
// a value that does not parse as a non-negative base-10 integer falls back to
// the default rather than passing NaN (or a negative delay) to the watchers.
const DEFAULT_WATCH_DEBOUNCE_MS = 500;
const WATCH_DEBOUNCE_MS: number = (() => {
  const raw = process.env.COCALC_FS_WATCH_DEBOUNCE_MS;
  if (raw == null) return DEFAULT_WATCH_DEBOUNCE_MS;
  const ms = parseInt(raw, 10);
  return Number.isNaN(ms) || ms < 0 ? DEFAULT_WATCH_DEBOUNCE_MS : ms;
})();
32
33
// Pause briefly after a watcher reports a change before recomputing the
// listing. A single "change" can really be a delete immediately followed by
// re-creating the same file, and reacting too fast confuses our file deletion
// detection (see https://github.com/sagemathinc/cocalc/issues/4623). A short
// pause is acceptable for this application; no heuristic like this is ever
// perfect, of course.
const DELAY_ON_CHANGE_MS = 50; // milliseconds
40
41
// A directory is only watched while some client has registered interest in
// it within this many seconds (WATCH_TIMEOUT_MS converted from ms to seconds).
const INTEREST_THRESH_SECONDS = WATCH_TIMEOUT_MS / 1000;
43
44
// Maximum number of paths to keep in listings tables for this project.
45
// Periodically, info about older paths beyond this number will be purged
46
// from the database. NOTE that synctable.delete is "barely" implemented,
47
// so there may be some issues with this working.
48
import { MAX_PATHS } from "@cocalc/util/db-schema/listings";
49
50
// Immutable (TypedMap) view of one row of the listings table.
export type ImmutableListing = TypedMap<Listing>;

// A directory watcher. ListingsTable subscribes to its "change" event and
// calls close() when it stops watching the directory.
export interface Watcher extends EventEmitter {
  close(): void;
}

// Dependencies injected into ListingsTable, so that the table logic does not
// depend directly on the filesystem or on a particular logger.
interface Options {
  table: SyncTable;
  project_id: string;
  // Optional; both ListingsTable and registerListingsTable default it to 0.
  compute_server_id?: number;
  // Compute the listing of `path`; ListingsTable always passes hidden=true.
  getListing: (
    path: string,
    hidden?: boolean,
  ) => Promise<DirectoryListingEntry[]>;
  createWatcher: (path: string, debounceMs: number) => Watcher;
  // Called by setDeleted after a file has been recorded as deleted.
  onDeletePath: (path: string) => Promise<void>;
  existsSync: (path: string) => boolean;
  // Only the `debug` method of the returned logger is used.
  getLogger: (name: string) => { debug: (...args: unknown[]) => void };
}
66
67
/**
 * Maintains the listings SyncTable for one project and compute server.
 *
 * For each path whose `interest` timestamp is recent (newer than
 * INTEREST_THRESH_SECONDS ago), it computes the directory listing, watches the
 * directory, and recomputes the listing whenever the watcher reports a change.
 * It also records deleted files per directory, periodically stops watchers for
 * paths that lost interest, and trims old rows once the table exceeds
 * MAX_PATHS.
 */
class ListingsTable {
  private readonly table?: SyncTable; // will be removed by close()
  private project_id: string;
  private compute_server_id: number;
  private watchers: { [path: string]: Watcher } = {};
  private getListing: (
    path: string,
    hidden?: boolean,
  ) => Promise<DirectoryListingEntry[]>;
  private createWatcher: (path: string, debounceMs: number) => Watcher;
  private onDeletePath: (path: string) => Promise<void>;
  private existsSync: (path: string) => boolean;
  private log: (...args: unknown[]) => void;

  constructor(opts: Options) {
    this.log = opts.getLogger("sync:listings").debug;
    this.log("constructor");
    this.project_id = opts.project_id;
    this.compute_server_id = opts.compute_server_id ?? 0;
    this.table = opts.table;
    this.getListing = opts.getListing;
    this.createWatcher = opts.createWatcher;
    this.onDeletePath = opts.onDeletePath;
    this.existsSync = opts.existsSync;
    // Runs in the background; log failures instead of leaving an unhandled
    // promise rejection.
    this.setupWatchers().catch((err) =>
      this.log("WARNING, error setting up watchers -- ", err),
    );
  }

  /**
   * Stop all watchers, unsubscribe from the table and free this object.
   * Safe to call more than once.
   */
  close = () => {
    this.log("close");
    // Without this, a table change event after close would reach
    // handleChangeEvent -> get() -> getTable() and throw inside the emitter.
    this.table?.removeListener("change", this.handleChangeEvent);
    for (const path in this.watchers) {
      this.stopWatching(path);
    }
    close(this);
  };

  // Start watching any paths that have recent interest (so this is not
  // in response to a *change* after starting), then subscribe to table
  // changes and start the periodic stale-watcher cleanup.
  private setupWatchers = async () => {
    if (this.table == null) return; // closed
    if (this.table.get_state() == ("init" as SyncTableState)) {
      await once(this.table, "state");
    }
    if (this.table == null) return; // closed while waiting for "state"
    if (this.table.get_state() != ("connected" as SyncTableState)) {
      return; // game over
    }
    const threshold = seconds_ago(INTEREST_THRESH_SECONDS);
    this.table.get()?.forEach((val) => {
      const path = val.get("path");
      if (path == null) return;
      if (this.watchers[path] != null) return; // already watching -- shouldn't happen
      const interest = val.get("interest");
      if (interest != null && interest > threshold) {
        // One bad path must not prevent watching the others.
        try {
          this.startWatching(path);
        } catch (err) {
          this.log("failed to start watching", err);
        }
      }
    });
    this.table.on("change", this.handleChangeEvent);

    this.removeStaleWatchers().catch((err) =>
      this.log("WARNING, error removing stale watchers -- ", err),
    );
  };

  // Stop watchers whose path no longer has recent interest, trim old paths,
  // then schedule itself to run again after INTEREST_THRESH_SECONDS. The
  // cycle ends when the table is closed or no longer connected.
  private removeStaleWatchers = async () => {
    if (this.table == null) return; // closed
    if (this.table.get_state() == ("connected" as SyncTableState)) {
      this.table.get()?.forEach((val) => {
        const path = val.get("path");
        if (path == null) return;
        if (this.watchers[path] == null) return;
        const interest = val.get("interest");
        if (
          interest == null ||
          interest <= seconds_ago(INTEREST_THRESH_SECONDS)
        ) {
          this.stopWatching(path);
        }
      });
    }

    // Now get rid of any old paths that are no longer relevant
    // to reduce wasted database space, memory, and bandwidth for
    // client browsers that are using this project.
    try {
      await this.trimOldPaths();
    } catch (err) {
      this.log("WARNING, error trimming old paths -- ", err);
    }

    if (this.table == null) return; // closed
    if (this.table.get_state() == ("connected" as SyncTableState)) {
      await delay(1000 * INTEREST_THRESH_SECONDS);
      if (this.table == null) return; // closed
      if (this.table.get_state() != ("connected" as SyncTableState)) return;
      this.removeStaleWatchers().catch((err) =>
        this.log("WARNING, error removing stale watchers -- ", err),
      );
    }
  };

  private isReady = (): boolean => {
    return !!this.table?.is_ready();
  };

  // Return the table, throwing if it is closed or not ready.
  private getTable = (): SyncTable => {
    if (!this.isReady() || this.table == null) {
      throw Error("table not ready");
    }
    return this.table;
  };

  /**
   * Shallow-merge `obj` (plus this project_id and compute_server_id) into the
   * table and save it.
   * @throws Error("table not ready") if the table is closed or not ready.
   */
  set = async (obj: Listing) => {
    const table = this.getTable();
    table.set(
      merge(
        {
          project_id: this.project_id,
          compute_server_id: this.compute_server_id,
        },
        obj,
      ),
      "shallow",
    );
    await table.save();
  };

  // Like set(), but does not wait for the save; failures are logged together
  // with `context` instead of becoming unhandled promise rejections.
  private setInBackground = (obj: Listing, context: string): void => {
    this.set(obj).catch((err) =>
      this.log(`${context} -- error saving listing`, err),
    );
  };

  /**
   * Get the row for `path` ("." and "~" mean the home directory), or
   * undefined if there is none.
   * @throws Error("table not ready") if the table is closed or not ready.
   */
  get = (path: string): ImmutableListing | undefined => {
    path = canonicalPath(path);
    // NOTE: That we have to use JSON.stringify here is an ugly shortcoming
    // of the get method in @cocalc/sync/table/synctable.ts
    // that could probably be relatively easily fixed.
    const x = this.getTable().get(
      JSON.stringify([this.project_id, path, this.compute_server_id]),
    );
    if (x == null) return undefined;
    return x as unknown as ImmutableListing;
  };

  // Each key is the JSON-encoded primary key [project_id, path, compute_server_id].
  private handleChangeEvent = (keys: string[]) => {
    this.log("handleChangeEvent", JSON.stringify(keys));
    for (const key of keys) {
      this.handleChange(JSON.parse(key)[1]);
    }
  };

  // React to a change of the row for `path`: if a client recently showed
  // interest, make sure the directory is being watched.
  private handleChange = (path: string): void => {
    this.log("handleChange", path);
    if (!this.isReady()) return; // closed or not ready; get() would throw
    const cur = this.get(path);
    if (cur == null) return;
    let interest: undefined | Date = cur.get("interest");
    if (interest == null) return;
    if (interest >= seconds_ago(INTEREST_THRESH_SECONDS)) {
      // Ensure any possible client clock skew "issue" has no trivial bad impact.
      const time = new Date();
      if (interest > time) {
        interest = time;
        this.setInBackground({ path, interest }, "handleChange");
      }
      // Make sure we watch this path for updates, since there is genuine current interest.
      this.ensureWatching(path).catch((err) =>
        this.log("ensureWatching error", path, err),
      );
    }
  };

  // Compute the listing for `path` and start watching it, unless it is
  // already watched without a recorded error.
  private ensureWatching = async (path: string): Promise<void> => {
    path = canonicalPath(path);
    if (this.watchers[path] != null) {
      // We are already watching this path; only restart it if the last
      // listing attempt recorded an error.
      if (!this.get(path)?.get("error")) return;
      this.log("ensureWatching -- removing old watcher due to error", path);
      this.stopWatching(path);
    }

    // Fire off computing of directory listing for this path,
    // and start watching for changes.
    try {
      await this.computeListing(path);
    } catch (err) {
      this.log(
        "ensureWatching -- failed to compute listing so not starting watching",
        err,
      );
      return;
    }
    if (!this.isReady()) return; // closed while computing the listing
    try {
      this.startWatching(path);
    } catch (err) {
      this.log("failed to start watching", err);
    }
  };

  // Compute the listing of `path` and store it, together with any newly
  // deleted file names and, for big directories, how many entries were
  // dropped (keeping the MAX_FILES_PER_PATH most recently modified).
  // On failure the error is stored in the table and rethrown.
  private computeListing = async (path: string): Promise<void> => {
    path = canonicalPath(path);
    const time = new Date();
    let listing: DirectoryListingEntry[];
    try {
      listing = await this.getListing(path, true);
      if (!this.isReady()) return;
    } catch (err) {
      if (!this.isReady()) return;
      this.setInBackground({ path, time, error: `${err}` }, "computeListing");
      throw err;
    }

    const y = this.get(path);
    const previous_listing = y?.get("listing")?.toJS() as any;
    let deleted: any = y?.get("deleted")?.toJS() as any;
    if (previous_listing != null) {
      // Check to what extent the change in the listing is due to files being
      // deleted. For a directory with many files we only store (and thus know
      // about) the most recent ones, so deleting an older file isn't detected
      // here -- which is fine, since deletion tracking matters mainly for
      // recent files.
      const cur = new Set(listing.map((entry) => entry.name));
      for (const x of previous_listing) {
        if (cur.has(x.name)) continue;
        // x.name is suddenly gone... so deleted
        if (deleted == null) {
          deleted = [x.name];
        } else if (!deleted.includes(x.name)) {
          deleted.push(x.name);
        }
      }
    }

    // Shrink listing length, keeping the most recently modified entries.
    let missing: number | undefined = undefined;
    if (listing.length > MAX_FILES_PER_PATH) {
      listing.sort(field_cmp("mtime"));
      listing.reverse();
      missing = listing.length - MAX_FILES_PER_PATH;
      listing = listing.slice(0, MAX_FILES_PER_PATH);
    }
    // We want to clear the error, but just clearing it in the synctable doesn't
    // clear it in the database, so if there is an error, we set it to "", which
    // does save fine to the database. (TODO: this is just a workaround.)
    const error = y?.get("error") != null ? "" : undefined;

    this.setInBackground(
      { path, listing, time, missing, deleted, error },
      "computeListing",
    );
  };

  // Create a watcher for `path` (no-op if one exists) that recomputes the
  // listing DELAY_ON_CHANGE_MS after each "change" event.
  // @throws Error if the HOME environment variable is not defined.
  private startWatching = (path: string): void => {
    path = canonicalPath(path);
    if (this.watchers[path] != null) return;
    if (process.env.HOME == null) {
      throw Error("HOME env variable must be defined");
    }
    this.watchers[path] = this.createWatcher(path, WATCH_DEBOUNCE_MS);
    this.watchers[path].on("change", async () => {
      try {
        await delay(DELAY_ON_CHANGE_MS);
        if (!this.isReady()) return;
        await this.computeListing(path);
      } catch (err) {
        this.log(`computeListing("${path}") error: "${err}"`);
      }
    });
  };

  private stopWatching = (path: string): void => {
    path = canonicalPath(path);
    const w = this.watchers[path];
    if (w == null) return;
    delete this.watchers[path];
    w.close();
  };

  // If the table has more than MAX_PATHS rows, delete the unwatched paths
  // with the oldest `interest` (rows without interest sort first). Watched
  // paths are never removed; each one also reduces the number removed.
  private trimOldPaths = async (): Promise<void> => {
    this.log("trimOldPaths");
    if (!this.isReady()) return;
    const table = this.getTable();
    let num_to_remove = table.size() - MAX_PATHS;
    this.log("trimOldPaths", num_to_remove);
    if (num_to_remove <= 0) {
      // definitely nothing to do
      return;
    }

    const paths: { path: string; interest: Date }[] = [];
    table.get()?.forEach((val) => {
      const path = val.get("path");
      if (path == null) return; // nothing sensible to delete
      if (this.watchers[path] != null) {
        num_to_remove -= 1;
        // paths we are watching are not eligible to be removed.
        return;
      }
      const interest = val.get("interest", new Date(0));
      paths.push({ path, interest });
    });
    this.log("trimOldPaths", JSON.stringify(paths));
    this.log("trimOldPaths", num_to_remove);

    if (num_to_remove <= 0) return;
    paths.sort(field_cmp("interest"));
    // Remove the num_to_remove oldest paths (slice bounds it by paths.length).
    for (const { path } of paths.slice(0, num_to_remove)) {
      this.log("trimOldPaths -- removing", path);
      await this.removePath(path);
    }
  };

  private removePath = async (path: string): Promise<void> => {
    if (!this.isReady()) return;
    this.log("removePath", path);
    // compute_server_id is part of the primary key (see get()), so it must
    // be given for the delete to target this compute server's row.
    await this.getTable().delete({
      project_id: this.project_id,
      path,
      compute_server_id: this.compute_server_id,
    });
  };

  // Given a "filename", add it to deleted if there is already a record
  // for the containing path in the database. (TODO: we may change this
  // to create the record if it doesn't exist.) Then call onDeletePath,
  // unless the file was already recorded as deleted.
  setDeleted = async (filename: string): Promise<void> => {
    this.log("setDeleted:", filename);
    if (!this.isReady()) {
      // setDeleted is a convenience, so dropping it in case of a project
      // with no network is OK.
      this.log(`setDeleted: skipping since not ready`);
      return;
    }
    if (filename[0] == "/") {
      // absolute path -- only paths inside HOME can be made relative to it.
      const home = process.env.HOME;
      if (home == null) {
        // can't do anything with this.
        return;
      }
      // Compare against "HOME/" so that e.g. "/home/user2/x" is not treated
      // as being inside HOME="/home/user".
      const prefix = home.endsWith("/") ? home : `${home}/`;
      if (filename != home && !startswith(filename, prefix)) {
        // can't do anything with this.
        return;
      }
      filename = filename.slice(prefix.length);
    }
    const { head, tail } = path_split(filename);
    const x = this.get(head);
    if (x != null) {
      // TODO/edge case: if x is null we *could* create the path here...
      let deleted: any = x.get("deleted");
      if (deleted == null) {
        deleted = [tail];
      } else {
        if (deleted.indexOf(tail) != -1) return;
        deleted = deleted.toJS();
        deleted.push(tail);
      }
      this.log(`setDeleted: recording "${deleted}" in "${head}"`);
      await this.set({ path: head, deleted });
      if (!this.isReady()) return;
    }

    await this.onDeletePath(filename);
  };

  // Returns true if definitely known to be deleted.
  // Returns false if definitely known to not be deleted.
  // Returns null if we don't know for sure, e.g., not in listing table or
  // listings not ready.
  isDeleted = (filename: string): boolean | null => {
    if (!this.isReady()) {
      // in case that listings are not available, return null -- we don't know.
      return null;
    }
    const { head, tail } = path_split(filename);
    if (head != "" && this.isDeleted(head)) {
      // recursively check if filename is contained in a
      // directory tree that got deleted.
      return true;
    }
    const x = this.get(head);
    if (x == null) {
      // we don't know.
      return null;
    }
    const deleted = x.get("deleted");
    if (deleted == null) {
      // we don't know
      return null;
    }
    // table is available and has deleted info for the directory -- let's see:
    if (deleted.indexOf(tail) != -1) {
      // it was explicitly deleted at some point.
      // It *might* still be deleted. Check on disk now
      // via a synchronous check.
      if (this.existsSync(filename)) {
        // it now exists -- return false but also update the table since
        // path is no longer deleted
        this.setInBackground(
          {
            path: head,
            deleted: deleted.toJS().filter((x) => x != tail),
          },
          "isDeleted",
        );
        return false;
      } else {
        // definitely explicitly deleted and not back on disk for some reason,
        return true;
      }
    }
    return false;
  };
}
463
464
// One ListingsTable per compute server id. The binding itself is never
// reassigned (entries are replaced in place), so it is const.
const listingsTable: { [compute_server_id: number]: ListingsTable } = {};

/**
 * Create the ListingsTable for `opts.compute_server_id` (default 0), closing
 * and replacing any table previously registered for that id.
 */
export function registerListingsTable(opts: Options): void {
  const { compute_server_id = 0 } = opts;
  if (listingsTable[compute_server_id] != null) {
    // There was one sitting around wasting space so clean it up
    // before making a new one.
    listingsTable[compute_server_id].close();
  }
  listingsTable[compute_server_id] = new ListingsTable(opts);
}
474
475
/**
 * Look up the ListingsTable registered for a compute server.
 * @param compute_server_id - which compute server; 0 when omitted.
 * @returns the registered table, or undefined if none was registered.
 */
export function getListingsTable(
  compute_server_id: number = 0,
): ListingsTable | undefined {
  const registered = listingsTable[compute_server_id];
  return registered;
}
480
481
// Minimal path normalization: "." and "~" both refer to the home directory,
// which is keyed as ""; every other path is returned unchanged.
function canonicalPath(path: string): string {
  const isHomeAlias = path === "." || path === "~";
  return isHomeAlias ? "" : path;
}
488
489