Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/project/project-status/server.ts
5751 views
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
Project status server, doing the heavy lifting of telling the client
8
what's going on in the project, especially if there is a problem.
9
10
Under the hood, it subscribes to the ProjectInfoServer, which updates
various statistics at a high frequency. This server filters that
information down to a low-frequency, low-volume stream of important
status updates.
14
15
Hence in particular, information like cpu, memory and disk are smoothed out and throttled.
16
*/
17
18
import { delay } from "awaiting";
19
import { EventEmitter } from "events";
20
import { isEqual } from "lodash";
21
22
import {
23
ALERT_DISK_FREE,
24
ALERT_HIGH_PCT /* ALERT_MEDIUM_PCT */,
25
RAISE_ALERT_AFTER_MIN,
26
STATUS_UPDATES_INTERVAL_S,
27
} from "@cocalc/comm/project-status/const";
28
import {
29
Alert,
30
AlertType,
31
ComponentName,
32
ProjectStatus,
33
} from "@cocalc/comm/project-status/types";
34
import { cgroup_stats } from "@cocalc/comm/project-status/utils";
35
import { createPublisher } from "@cocalc/conat/project/project-status";
36
import { compute_server_id, project_id } from "@cocalc/project/data";
37
import { getLogger } from "@cocalc/project/logger";
38
import { how_long_ago_m, round1 } from "@cocalc/util/misc";
39
import { version as smcVersion } from "@cocalc/util/smc-version";
40
import { ProjectInfo } from "@cocalc/util/types/project-info/types";
41
import { get_ProjectInfoServer, ProjectInfoServer } from "../project-info";
42
43
// TODO: only return the "next" value, if it is significantly different from "prev"
44
//function threshold(prev?: number, next?: number): number | undefined {
45
// return next;
46
//}
47
48
// Logger for this module; all debug output of the status server and of init() goes through it.
const logger = getLogger("project-status:server");
49
50
/**
 * Round `val` UP to the next multiple of 10^order, e.g. `quantize(123, 1) === 130`.
 * Coarsening values like this reduces the number of distinct status updates.
 *
 * @param val   the value to quantize
 * @param order power of ten of the step size; negative orders are clamped to 0
 *              (step 1), since a step of 0 would otherwise produce NaN
 * @returns     an integer multiple of the step size that is >= val
 */
function quantize(val: number, order: number): number {
  const q = Math.round(Math.pow(10, Math.max(0, order)));
  return Math.round(q * Math.ceil(val / q));
}
54
55
// For each resource-level alert type, the info timestamp at which we first saw
// an elevated value. It is reset to null as soon as the value is back below its
// threshold. The keys are exactly the alert types that update_alerts() sets and
// alerts() reports (note: "cpu-cgroup", not "cpu").
interface Elevated {
  "cpu-cgroup": number | null;
  disk: number | null;
  memory: number | null;
}

type ElevatedType = keyof Elevated;

// Order in which elevated resource alerts are reported by alerts().
const ELEVATED_TYPES: readonly ElevatedType[] = ["cpu-cgroup", "disk", "memory"];

export class ProjectStatusServer extends EventEmitter {
  private running = false;
  private readonly testing: boolean;
  private readonly project_info: ProjectInfoServer;
  private info?: ProjectInfo;
  // status computed by the most recent update()
  private status?: ProjectStatus;
  // status computed by the update() before that (used by fake_data)
  private last?: ProjectStatus;
  // status most recently emitted by doEmit(), used to detect changes
  private lastEmitted?: ProjectStatus;
  private elevated: Elevated = {
    "cpu-cgroup": null,
    disk: null,
    memory: null,
  };
  // pid -> info timestamp when that process was first seen above ALERT_HIGH_PCT
  private elevated_cpu_procs: { [pid: string]: number } = {};
  private disk_mb?: number;
  private cpu_pct?: number;
  private cpu_tot?: number; // total time in seconds
  private mem_pct?: number;
  private mem_rss?: number;
  private mem_tot?: number;
  // component name -> timestamp when the problem was first reported
  private components: { [name in ComponentName]?: number | undefined } = {};
  private lastEmit: number = 0; // timestamp, when status was emitted last

  // Listener for the ProjectInfoServer's "info" events. It is a field (not an
  // inline closure) so that stop() can unsubscribe exactly this function.
  private readonly onInfo = (info: ProjectInfo): void => {
    this.info = info;
    this.update();
    this.emitInfo();
  };

  constructor(testing = false) {
    super();
    this.testing = testing;
    this.project_info = get_ProjectInfoServer();
  }

  // Subscribe to the (shared) ProjectInfoServer and make sure it is running.
  private async init(): Promise<void> {
    // subscribe first, so no early "info" event can be missed
    this.project_info.on("info", this.onInfo);
    this.project_info.start();
  }

  /**
   * Decide whether the current state (after update()) should be emitted:
   * - always on the very first info event,
   * - immediately whenever the alerts differ from the last emitted ones,
   * - otherwise at most once per STATUS_UPDATES_INTERVAL_S, and only if the
   *   status differs from the last *emitted* status (not merely from the
   *   previous tick, which would lose changes made inside the throttle window).
   */
  private emitInfo(): void {
    if (this.lastEmit === 0) {
      logger.debug("emitInfo[last=0]", this.status);
      this.doEmit();
      return;
    }

    if (!isEqual(this.lastEmitted?.alerts, this.status?.alerts)) {
      logger.debug("emitInfo[alert]", this.status);
      this.doEmit();
      return;
    }

    const recent =
      this.lastEmit + 1000 * STATUS_UPDATES_INTERVAL_S > Date.now();
    if (!recent && !isEqual(this.status, this.lastEmitted)) {
      logger.debug("emitInfo[changed]", this.status);
      this.doEmit();
    }
  }

  private doEmit(): void {
    this.emit("status", this.status);
    this.lastEmit = Date.now();
    this.lastEmitted = this.status;
  }

  public setComponentAlert(name: ComponentName) {
    // we set this to the time when we first got notified about the problem
    if (this.components[name] == null) {
      this.components[name] = Date.now();
    }
  }

  public clearComponentAlert(name: ComponentName) {
    delete this.components[name];
  }

  // Derive elevated levels and the (rounded) usage fields from this.info.
  private update_alerts(): void {
    if (this.info == null) return;
    const ts = this.info.timestamp;

    // start the clock the first time a value is bad; reset it once it's fine again
    const do_alert = (type: ElevatedType, is_bad: boolean): void => {
      if (!is_bad) {
        this.elevated[type] = null;
      } else if (this.elevated[type] == null) {
        this.elevated[type] = ts;
      }
    };

    const du = this.info.disk_usage.project;
    do_alert("disk", du.free < ALERT_DISK_FREE);
    this.disk_mb = du.usage;

    const cg = this.info.cgroup;
    const du_tmp = this.info.disk_usage.tmp;
    if (cg != null) {
      // we round/quantize values to reduce the number of updates
      // and also send less data with each update
      const cgStats = cgroup_stats(cg, du_tmp);
      this.mem_pct = Math.round(cgStats.mem_pct);
      this.cpu_pct = Math.round(cgStats.cpu_pct);
      this.cpu_tot = Math.round(cgStats.cpu_tot);
      this.mem_tot = quantize(cgStats.mem_tot, 1);
      this.mem_rss = quantize(cgStats.mem_rss, 1);
      do_alert("memory", cgStats.mem_pct > ALERT_HIGH_PCT);
      do_alert("cpu-cgroup", cgStats.cpu_pct > ALERT_HIGH_PCT);
    }
  }

  // Track processes with high CPU usage and return (sorted) the pids that have
  // been elevated for longer than RAISE_ALERT_AFTER_MIN.
  private alert_cpu_processes(): string[] {
    if (this.info == null) return [];
    const ts = this.info.timestamp;
    const ecp = this.elevated_cpu_procs;
    // pids we tracked before; whatever is left after the loop no longer exists
    const leftovers = new Set(Object.keys(ecp));

    for (const [pid, proc] of Object.entries(this.info.processes ?? {})) {
      leftovers.delete(pid);
      if (proc.cpu.pct > ALERT_HIGH_PCT) {
        if (ecp[pid] == null) {
          ecp[pid] = ts;
        }
      } else {
        delete ecp[pid];
      }
    }
    for (const pid of leftovers) {
      delete ecp[pid];
    }

    const pids: string[] = [];
    for (const [pid, since] of Object.entries(ecp)) {
      if (since != null && how_long_ago_m(since) > RAISE_ALERT_AFTER_MIN) {
        pids.push(pid);
      }
    }
    pids.sort(); // to make this stable across iterations
    return pids;
  }

  // update alert levels and set alert states if they persist to be active
  private alerts(): Alert[] {
    this.update_alerts();
    const alerts: Alert[] = [];
    for (const k of ELEVATED_TYPES) {
      const since = this.elevated[k];
      if (since != null && how_long_ago_m(since) > RAISE_ALERT_AFTER_MIN) {
        alerts.push({ type: k } as Alert);
      }
    }

    const pids: string[] = this.alert_cpu_processes();
    if (pids.length > 0) alerts.push({ type: "cpu-process", pids });

    // component problems are reported without a delay
    const componentNames: ComponentName[] = [];
    for (const [k, ts] of Object.entries(this.components)) {
      if (ts == null) continue;
      componentNames.push(k as ComponentName);
    }
    // only send any alert if there is actually a problem!
    if (componentNames.length > 0) {
      alerts.push({ type: "component", names: componentNames });
    }
    return alerts;
  }

  // Random-walk usage data for development, continuing from the last status.
  private fake_data(): ProjectStatus["usage"] {
    const lastUsage = this.last?.["usage"];

    const next = (
      key: "disk_mb" | "mem_pct" | "cpu_pct",
      max: number,
    ): number => {
      const last = lastUsage?.[key] ?? max / 2;
      const dx = max / 50;
      const val = last + dx * Math.random() - dx / 2;
      return Math.round(Math.min(max, Math.max(0, val)));
    };

    const mem_tot = 3000;
    const mem_pct = next("mem_pct", 100);
    const mem_rss = Math.round((mem_tot * mem_pct) / 100);
    const cpu_tot = round1((lastUsage?.["cpu_tot"] ?? 0) + Math.random() / 10);

    return {
      // key must match the usage field name, otherwise the walk never continues
      disk_mb: next("disk_mb", 3000),
      mem_tot,
      mem_pct,
      cpu_pct: next("cpu_pct", 100),
      cpu_tot,
      mem_rss,
    };
  }

  // this function takes the "info" we have (+ more maybe?)
  // and derives various states from it.
  // It shouldn't really matter how often it is being called,
  // but still only emit new objects if it is either really necessary (new alert)
  // or after some time. This must be a low-frequency and low-volume stream of data.
  private update(): void {
    this.last = this.status;

    // alerts must come first, it updates usage status fields
    const alerts = this.alerts();

    // set this to true if you're developing (otherwise you don't get any data)
    const fake_data = false;

    // collect status fields in usage object
    const usage = fake_data
      ? this.fake_data()
      : {
          disk_mb: this.disk_mb,
          mem_pct: this.mem_pct,
          cpu_pct: this.cpu_pct,
          cpu_tot: this.cpu_tot,
          mem_rss: this.mem_rss,
          mem_tot: this.mem_tot,
        };

    this.status = { alerts, usage, version: smcVersion };
  }

  private async get_status(): Promise<ProjectStatus | undefined> {
    this.update();
    return this.status;
  }

  /**
   * Stop producing status updates. Unsubscribes from the ProjectInfoServer
   * (which is shared, so it is not stopped itself); otherwise its "info" events
   * would keep emitting status, and a later start() would add a second listener.
   */
  public stop(): void {
    this.running = false;
    this.project_info.removeListener("info", this.onInfo);
  }

  public async start(): Promise<void> {
    if (!this.running) {
      await this._start();
    }
  }

  private async _start(): Promise<void> {
    logger.debug("start");
    if (this.running) {
      throw Error("Cannot start ProjectStatusServer twice");
    }
    this.running = true;
    await this.init();

    const status = await this.get_status();
    this.emit("status", status);

    // in testing mode, emit periodically until stop() is called
    while (this.testing && this.running) {
      await delay(5000);
      if (!this.running) break;
      const status = await this.get_status();
      this.emit("status", status);
    }
  }
}
326
327
// singleton, we instantiate it when we need it
let status: ProjectStatusServer | undefined = undefined;

/**
 * Create the project status server singleton, attach a publisher to it and
 * start it. Idempotent: calling it again does not create a second publisher.
 */
export function init() {
  if (status != null) {
    logger.debug("project status server already initialized");
    return;
  }
  logger.debug("initializing project status server, and enabling publishing");
  const server = new ProjectStatusServer();
  createPublisher({
    projectStatusServer: server,
    compute_server_id,
    project_id,
  });
  // only mark as initialized once publishing is set up
  status = server;
  // start() is async; don't leave a rejected promise unhandled
  server.start().catch((err) => {
    logger.debug("ERROR starting project status server", err);
  });
}
342
343
// Manual test: `ts-node server.ts` starts a server in testing mode, prints
// the first two status objects it emits and then exits.
if (require.main === module) {
  const server = new ProjectStatusServer(true);
  server.start();
  let received = 0;
  const printStatus = (s) => {
    console.log(JSON.stringify(s, null, 2));
    received += 1;
    if (received >= 2) {
      process.exit();
    }
  };
  server.on("status", printStatus);
}
354
355