CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Sign Up · Sign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/project/project-status/server.ts
Views: 687
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
Project status server, doing the heavy lifting of telling the client
8
what's going on in the project, especially if there is a problem.
9
10
Under the hood, it subscribes to the ProjectInfoServer, which updates
11
various statistics at a high frequency. Therefore, this server filters
12
that information to a low-frequency low-volume stream of important
13
status updates.
14
15
Hence in particular, information like cpu, memory and disk are smoothed out and throttled.
16
*/
17
18
import { getLogger } from "@cocalc/project/logger";
19
import { how_long_ago_m, round1 } from "@cocalc/util/misc";
20
import { version as smcVersion } from "@cocalc/util/smc-version";
21
import { delay } from "awaiting";
22
import { EventEmitter } from "events";
23
import { isEqual } from "lodash";
24
import { get_ProjectInfoServer, ProjectInfoServer } from "../project-info";
25
import { ProjectInfo } from "@cocalc/util/types/project-info/types";
26
import {
27
ALERT_DISK_FREE,
28
ALERT_HIGH_PCT /* ALERT_MEDIUM_PCT */,
29
RAISE_ALERT_AFTER_MIN,
30
STATUS_UPDATES_INTERVAL_S,
31
} from "@cocalc/comm/project-status/const";
32
import {
33
Alert,
34
AlertType,
35
ComponentName,
36
ProjectStatus,
37
} from "@cocalc/comm/project-status/types";
38
import { cgroup_stats } from "@cocalc/comm/project-status/utils";
39
40
// TODO: only return the "next" value, if it is significantly different from "prev"
41
//function threshold(prev?: number, next?: number): number | undefined {
42
// return next;
43
//}
44
45
// Module-level logger obtained from getLogger under the name "ProjectStatusServer";
// the class below forwards its dbg(...) calls to winston.debug.
const winston = getLogger("ProjectStatusServer");
46
47
/**
 * Round `val` UP to the next multiple of 10^order (e.g. order=1 → steps of 10).
 *
 * Used to coarsen values so that small fluctuations do not produce a
 * different number on every update.
 *
 * @param val   the value to coarsen
 * @param order power of ten that defines the step size
 * @returns the smallest multiple of the step that is >= val
 */
function quantize(val: number, order: number): number {
  // The step is rounded to an integer; for order < 0 that would round to 0
  // and the division below would produce NaN. Clamp it so the smallest
  // possible step is 1 (results for order >= 0 are unaffected).
  const step = Math.max(1, Math.round(Math.pow(10, order)));
  return Math.round(step * Math.ceil(val / step));
}
51
52
/**
 * Bookkeeping for resource usage that is currently above its threshold.
 *
 * Each member holds the timestamp at which the value was seen elevated for
 * the first time, or `null` while the value is "in the clear" (i.e. below
 * the threshold again).
 */
interface Elevated {
  /** timestamp of the first elevated cpu reading, or null */
  cpu: number | null;
  /** timestamp of the first elevated memory reading, or null */
  memory: number | null;
  /** timestamp of the first elevated disk reading, or null */
  disk: number | null;
}
59
60
/**
 * Condenses the high-frequency "info" events of the ProjectInfoServer into a
 * low-frequency stream of "status" events (alerts + rounded usage numbers).
 *
 * Emits:
 *   "status" (ProjectStatus | undefined) — on start, whenever the set of
 *   alerts changes, and otherwise at most once per STATUS_UPDATES_INTERVAL_S
 *   seconds if the status changed.
 */
export class ProjectStatusServer extends EventEmitter {
  private readonly dbg: Function;
  private running = false;
  // when true, _start() additionally re-emits the status every 5 seconds
  private readonly testing: boolean;
  private readonly project_info: ProjectInfoServer;
  // true once the "info" listener has been attached (done at most once)
  private listening = false;
  // most recent info object received from the ProjectInfoServer
  private info?: ProjectInfo;
  // status computed by the latest update() and the one computed before it
  private status?: ProjectStatus;
  private last?: ProjectStatus;
  private elevated: Elevated = {
    cpu: null,
    disk: null,
    memory: null,
  };
  // pid → timestamp at which that process was first seen above ALERT_HIGH_PCT
  private elevated_cpu_procs: { [pid: string]: number } = {};
  private disk_mb?: number;
  private cpu_pct?: number;
  private cpu_tot?: number; // total time in seconds
  private mem_pct?: number;
  private mem_rss?: number;
  private mem_tot?: number;
  // component name → Date.now() at which its problem was first reported
  private components: { [name in ComponentName]?: number | undefined } = {};
  private lastEmit: number = 0; // timestamp, when status was emitted last

  /**
   * @param testing if true, start() keeps emitting the status periodically
   *                (used by the stand-alone test driver of this file)
   */
  constructor(testing = false) {
    super();
    this.testing = testing;
    this.dbg = (...msg) => winston.debug(...msg);
    this.project_info = get_ProjectInfoServer();
  }

  /**
   * Starts the ProjectInfoServer and subscribes to its "info" events.
   *
   * The listener is attached only once, even if the server is stopped and
   * started again; otherwise every restart would add another listener and
   * each info event would be processed (and possibly emitted) several times.
   */
  private async init(): Promise<void> {
    this.project_info.start();
    if (this.listening) return;
    this.listening = true;
    this.project_info.on("info", (info) => {
      //this.dbg(`got info timestamp=${info.timestamp}`);
      // always remember the newest info, so a restart begins with fresh data
      this.info = info;
      // ... but derive and emit nothing while we are stopped
      if (!this.running) return;
      this.update();
      this.emitInfo();
    });
  }

  /**
   * Checks if the current state (after update()) should be emitted:
   * - always, if nothing was emitted via doEmit() so far,
   * - immediately, if the alerts changed,
   * - otherwise only if the status changed and the last emit is older than
   *   STATUS_UPDATES_INTERVAL_S seconds.
   */
  private emitInfo(): void {
    if (this.lastEmit === 0) {
      this.dbg("emitInfo[last=0]", this.status);
      this.doEmit();
      return;
    }

    // if alert changed, emit immediately
    if (!isEqual(this.last?.alerts, this.status?.alerts)) {
      this.dbg("emitInfo[alert]", this.status);
      this.doEmit();
      return;
    }

    // rate limit everything else
    const recent =
      this.lastEmit + 1000 * STATUS_UPDATES_INTERVAL_S > Date.now();
    if (recent) return;

    // deep comparison check via lodash
    if (!isEqual(this.status, this.last)) {
      this.dbg("emitInfo[changed]", this.status);
      this.doEmit();
    }
  }

  /** Emits the current status and records when that happened. */
  private doEmit(): void {
    this.emit("status", this.status);
    this.lastEmit = Date.now();
  }

  /**
   * Flags a component as having a problem. Only the first notification sets
   * the timestamp; repeated calls keep the original one.
   */
  public setComponentAlert(name: ComponentName) {
    if (this.components[name] == null) {
      this.components[name] = Date.now();
    }
  }

  /** Removes the problem flag of a component. */
  public clearComponentAlert(name: ComponentName) {
    delete this.components[name];
  }

  /**
   * Derives elevated levels from the project info object and refreshes the
   * cached usage fields (disk_mb, mem_*, cpu_*). Does nothing without info.
   */
  private update_alerts(): void {
    if (this.info == null) return;
    const du = this.info.disk_usage.project;
    const ts = this.info.timestamp;

    // Records when `key` became elevated for the first time (and lets that
    // timestamp age), or resets it as soon as the value is fine again.
    const do_alert = (key: keyof Elevated, is_bad: boolean): void => {
      if (!is_bad) {
        this.elevated[key] = null;
        return;
      }
      if (this.elevated[key] == null) {
        this.elevated[key] = ts;
      }
    };

    do_alert("disk", du.free < ALERT_DISK_FREE);
    this.disk_mb = du.usage;

    const cg = this.info.cgroup;
    const du_tmp = this.info.disk_usage.tmp;
    if (cg != null) {
      // we round/quantize values to reduce the number of updates
      // and also send less data with each update
      const cgStats = cgroup_stats(cg, du_tmp);
      this.mem_pct = Math.round(cgStats.mem_pct);
      this.cpu_pct = Math.round(cgStats.cpu_pct);
      this.cpu_tot = Math.round(cgStats.cpu_tot);
      this.mem_tot = quantize(cgStats.mem_tot, 1);
      this.mem_rss = quantize(cgStats.mem_rss, 1);
      do_alert("memory", cgStats.mem_pct > ALERT_HIGH_PCT);
      // tracked under "cpu"; alerts() reports it as alert type "cpu-cgroup"
      do_alert("cpu", cgStats.cpu_pct > ALERT_HIGH_PCT);
    }
  }

  /**
   * Tracks processes whose cpu.pct exceeds ALERT_HIGH_PCT.
   *
   * @returns the sorted PIDs of processes for which how_long_ago_m of their
   *          first elevated timestamp exceeds RAISE_ALERT_AFTER_MIN
   */
  private alert_cpu_processes(): string[] {
    const pids: string[] = [];
    if (this.info == null) return [];
    const ts = this.info.timestamp;
    const ecp = this.elevated_cpu_procs;
    // we have to check if there aren't any processes left which no longer exist
    const leftovers = new Set(Object.keys(ecp));
    // bookkeeping of elevated process PIDS
    for (const [pid, proc] of Object.entries(this.info.processes ?? {})) {
      leftovers.delete(pid);
      if (proc.cpu.pct > ALERT_HIGH_PCT) {
        if (ecp[pid] == null) {
          ecp[pid] = ts;
        }
      } else {
        delete ecp[pid];
      }
    }
    for (const pid of leftovers) {
      delete ecp[pid];
    }
    // to actually fire alert when necessary
    for (const [pid, since] of Object.entries(ecp)) {
      if (since != null && how_long_ago_m(since) > RAISE_ALERT_AFTER_MIN) {
        pids.push(pid);
      }
    }
    pids.sort(); // to make this stable across iterations
    //this.dbg("alert_cpu_processes", pids, ecp);
    return pids;
  }

  /**
   * Updates alert levels and returns the alerts that are active right now:
   * resource alerts (only once they persisted long enough), a "cpu-process"
   * alert listing offending PIDs, and a "component" alert (without delay).
   */
  private alerts(): Alert[] {
    this.update_alerts();
    const alerts: Alert[] = [];

    // reported alert type → field of `elevated` holding its first-seen timestamp
    const tracked: [AlertType, keyof Elevated][] = [
      ["cpu-cgroup", "cpu"],
      ["disk", "disk"],
      ["memory", "memory"],
    ];
    for (const [type, key] of tracked) {
      const since = this.elevated[key];
      if (since != null && how_long_ago_m(since) > RAISE_ALERT_AFTER_MIN) {
        alerts.push({ type } as Alert);
      }
    }

    const pids: string[] = this.alert_cpu_processes();
    if (pids.length > 0) alerts.push({ type: "cpu-process", pids });

    const componentNames: ComponentName[] = [];
    for (const [k, ts] of Object.entries(this.components)) {
      if (ts == null) continue;
      // we alert without a delay
      componentNames.push(k as ComponentName);
    }
    // only send any alert if there is actually a problem!
    if (componentNames.length > 0) {
      alerts.push({ type: "component", names: componentNames });
    }
    return alerts;
  }

  /**
   * Development helper: produces made-up usage numbers. The values produced
   * via next() do a small random walk starting from the previous status
   * (or from the middle of their range if there is none).
   */
  private fake_data(): ProjectStatus["usage"] {
    const lastUsage = this.last?.usage;

    // previous value of `key` ± up to 1% of max, clamped to [0, max], rounded
    const next = (
      key: "disk_mb" | "mem_pct" | "cpu_pct",
      max: number,
    ): number => {
      const last = lastUsage?.[key] ?? max / 2;
      const dx = max / 50;
      const val = last + dx * Math.random() - dx / 2;
      return Math.round(Math.min(max, Math.max(0, val)));
    };

    const mem_tot = 3000;
    const mem_pct = next("mem_pct", 100);
    const mem_rss = Math.round((mem_tot * mem_pct) / 100);
    const cpu_tot = round1((lastUsage?.cpu_tot ?? 0) + Math.random() / 10);

    return {
      // must read the previous value under the same key it is stored as
      disk_mb: next("disk_mb", 3000),
      mem_tot,
      mem_pct,
      cpu_pct: next("cpu_pct", 100),
      cpu_tot,
      mem_rss,
    };
  }

  // this function takes the "info" we have (+ more maybe?)
  // and derives various states from it.
  // It shouldn't really matter how often it is being called,
  // but still only emit new objects if it is either really necessary (new alert)
  // or after some time. This must be a low-frequency and low-volume stream of data.
  private update(): void {
    this.last = this.status;

    // alerts must come first, it updates usage status fields
    const alerts = this.alerts();

    // set this to true if you're developing (otherwise you don't get any data)
    const fake_data = false;

    // collect status fields in usage object
    const usage = fake_data
      ? this.fake_data()
      : {
          disk_mb: this.disk_mb,
          mem_pct: this.mem_pct,
          cpu_pct: this.cpu_pct,
          cpu_tot: this.cpu_tot,
          mem_rss: this.mem_rss,
          mem_tot: this.mem_tot,
        };

    this.status = { alerts, usage, version: smcVersion };
  }

  /** Recomputes the status and returns it. */
  private async get_status(): Promise<ProjectStatus | undefined> {
    this.update();
    return this.status;
  }

  /**
   * Stops the server: incoming info events are no longer turned into status
   * events and the periodic loop of testing mode ends. start() may be
   * called again afterwards.
   */
  public stop(): void {
    this.running = false;
  }

  /** Starts the server; a second call while running is only logged. */
  public async start(): Promise<void> {
    if (this.running) {
      this.dbg(
        "project-status/server: already running, cannot be started twice",
      );
    } else {
      await this._start();
    }
  }

  /**
   * Subscribes to the info server and emits the initial status. In testing
   * mode it then re-emits the status every 5 seconds until stop() is called.
   *
   * @throws Error if the server is already running
   */
  private async _start(): Promise<void> {
    this.dbg("start");
    if (this.running) {
      throw Error("Cannot start ProjectStatusServer twice");
    }
    this.running = true;
    await this.init();

    const status = await this.get_status();
    this.emit("status", status);

    while (this.testing && this.running) {
      await delay(5000);
      // stop() may have been called while we were sleeping
      if (!this.running) break;
      const status = await this.get_status();
      this.emit("status", status);
    }
  }
}
327
328
// Lazily created singleton: nothing is instantiated until somebody asks for it.
let _status: ProjectStatusServer | undefined = undefined;

/**
 * Returns the shared ProjectStatusServer, creating it on the first call.
 * Every later call returns that same instance.
 */
export function get_ProjectStatusServer(): ProjectStatusServer {
  if (_status == null) {
    _status = new ProjectStatusServer();
  }
  return _status;
}
336
337
// testing: $ ts-node server.ts
// Starts a server in testing mode, prints the first two status objects as
// JSON and exits.
if (require.main === module) {
  const pss = new ProjectStatusServer(true);
  let cnt = 0;
  // attach the listener before starting, so no status event can be missed
  pss.on("status", (status) => {
    console.log(JSON.stringify(status, null, 2));
    cnt += 1;
    if (cnt >= 2) process.exit();
  });
  // start() is fire-and-forget here (in testing mode it keeps looping), so
  // report a failed start explicitly instead of leaving a floating promise
  void pss.start().catch((err: unknown) => {
    console.error(err);
    process.exit(1);
  });
}
348
349