Path: blob/master/src/packages/project/project-status/server.ts
5751 views
/*
 *  This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
 *  License: MS-RSL – see LICENSE.md for details
 */

/*
Project status server, doing the heavy lifting of telling the client
what's going on in the project, especially if there is a problem.

Under the hood, it subscribes to the ProjectInfoServer, which updates
various statistics at a high-frequency. Therefore, this here filters
that information to a low-frequency low-volume stream of important
status updates.

Hence in particular, information like cpu, memory and disk are smoothed out and throttled.
*/

import { delay } from "awaiting";
import { EventEmitter } from "events";
import { isEqual } from "lodash";

import {
  ALERT_DISK_FREE,
  ALERT_HIGH_PCT /* ALERT_MEDIUM_PCT */,
  RAISE_ALERT_AFTER_MIN,
  STATUS_UPDATES_INTERVAL_S,
} from "@cocalc/comm/project-status/const";
import {
  Alert,
  AlertType,
  ComponentName,
  ProjectStatus,
} from "@cocalc/comm/project-status/types";
import { cgroup_stats } from "@cocalc/comm/project-status/utils";
import { createPublisher } from "@cocalc/conat/project/project-status";
import { compute_server_id, project_id } from "@cocalc/project/data";
import { getLogger } from "@cocalc/project/logger";
import { how_long_ago_m, round1 } from "@cocalc/util/misc";
import { version as smcVersion } from "@cocalc/util/smc-version";
import { ProjectInfo } from "@cocalc/util/types/project-info/types";
import { get_ProjectInfoServer, ProjectInfoServer } from "../project-info";

// TODO: only return the "next" value, if it is significantly different from "prev"
//function threshold(prev?: number, next?: number): number | undefined {
//  return next;
//}

const logger = getLogger("project-status:server");

/**
 * Round `val` up to the next multiple of 10^order (e.g. order=1 → 10, 20, 30, …).
 * Coarser values mean fewer distinct states and hence fewer status updates.
 * NOTE: `order` is expected to be >= 0; for negative orders `q` rounds to 0 or 1.
 */
function quantize(val: number, order: number): number {
  const q = Math.round(Math.pow(10, order));
  return Math.round(q * Math.ceil(val / q));
}

// For each resource, the timestamp when we first saw an elevated value,
// or null when the value is currently below its alert threshold.
// The keys are exactly the AlertType values that update_alerts() tracks.
interface Elevated {
  "cpu-cgroup": number | null;
  memory: number | null;
  disk: number | null;
}

type ElevatedType = keyof Elevated;

export class ProjectStatusServer extends EventEmitter {
  private readonly dbg: (...msg: unknown[]) => void;
  private running = false;
  private readonly testing: boolean;
  private readonly project_info: ProjectInfoServer;
  private info?: ProjectInfo;
  // the most recently computed status
  private status?: ProjectStatus;
  // the status computed right before `status` (used to random-walk fake data)
  private last?: ProjectStatus;
  // the status that was last emitted, i.e. what listeners currently know
  private lastEmitted?: ProjectStatus;
  private elevated: Elevated = {
    "cpu-cgroup": null,
    disk: null,
    memory: null,
  };
  private elevated_cpu_procs: { [pid: string]: number } = {};
  private disk_mb?: number;
  private cpu_pct?: number;
  private cpu_tot?: number; // total time in seconds
  private mem_pct?: number;
  private mem_rss?: number;
  private mem_tot?: number;
  private components: Partial<Record<ComponentName, number>> = {};
  private lastEmit: number = 0; // timestamp, when status was emitted last
  // the "info" listener registered on project_info; kept so stop() can remove it
  private infoListener?: (info: ProjectInfo) => void;

  constructor(testing = false) {
    super();
    this.testing = testing;
    this.dbg = (...msg) => logger.debug(...msg);
    this.project_info = get_ProjectInfoServer();
  }

  // Starts the (shared) ProjectInfoServer and subscribes to its "info" events.
  // The listener is registered at most once, even across stop()/start() cycles.
  private async init(): Promise<void> {
    this.project_info.start();
    if (this.infoListener == null) {
      this.infoListener = (info: ProjectInfo) => {
        //this.dbg(`got info timestamp=${info.timestamp}`);
        this.info = info;
        this.update();
        this.emitInfo();
      };
      this.project_info.on("info", this.infoListener);
    }
  }

  // Checks whether the current state (after update()) should be emitted.
  // Alert changes go out immediately; everything else is rate limited to at
  // most one emit per STATUS_UPDATES_INTERVAL_S.
  private emitInfo(): void {
    if (this.lastEmit === 0) {
      this.dbg("emitInfo[last=0]", this.status);
      this.doEmit();
      return;
    }

    // if alert changed, emit immediately
    if (!isEqual(this.lastEmitted?.alerts, this.status?.alerts)) {
      this.dbg("emitInfo[alert]", this.status);
      this.doEmit();
      return;
    }

    // Deep comparison against what was last *emitted* (not the previous
    // computation): a change that happened during the quiet period and then
    // stayed stable must still be sent once the period is over.
    const recent =
      this.lastEmit + 1000 * STATUS_UPDATES_INTERVAL_S > Date.now();
    const changed = !isEqual(this.status, this.lastEmitted);
    if (!recent && changed) {
      this.dbg("emitInfo[changed]", this.status);
      this.doEmit();
    }
  }

  // Emits the current status and remembers what and when we emitted.
  private doEmit(): void {
    this.emit("status", this.status);
    this.lastEmitted = this.status;
    this.lastEmit = Date.now();
  }

  public setComponentAlert(name: ComponentName) {
    // we set this to the time when we first got notified about the problem
    if (this.components[name] == null) {
      this.components[name] = Date.now();
    }
  }

  public clearComponentAlert(name: ComponentName) {
    delete this.components[name];
  }

  // Derives elevated levels from the project info object and refreshes the
  // usage fields (disk, memory, cpu) that update() reports.
  private update_alerts(): void {
    if (this.info == null) return;
    const du = this.info.disk_usage.project;
    const ts = this.info.timestamp;

    const do_alert = (type: ElevatedType, is_bad: boolean) => {
      if (is_bad) {
        // if it isn't fine, set it once to the timestamp (and let it age)
        if (this.elevated[type] == null) {
          this.elevated[type] = ts;
        }
      } else {
        // it's fine again, so remove the timestamp
        this.elevated[type] = null;
      }
    };

    do_alert("disk", du.free < ALERT_DISK_FREE);
    this.disk_mb = du.usage;

    const cg = this.info.cgroup;
    const du_tmp = this.info.disk_usage.tmp;
    if (cg != null) {
      // we round/quantize values to reduce the number of updates
      // and also send less data with each update
      const cgStats = cgroup_stats(cg, du_tmp);
      this.mem_pct = Math.round(cgStats.mem_pct);
      this.cpu_pct = Math.round(cgStats.cpu_pct);
      this.cpu_tot = Math.round(cgStats.cpu_tot);
      this.mem_tot = quantize(cgStats.mem_tot, 1);
      this.mem_rss = quantize(cgStats.mem_rss, 1);
      do_alert("memory", cgStats.mem_pct > ALERT_HIGH_PCT);
      do_alert("cpu-cgroup", cgStats.cpu_pct > ALERT_HIGH_PCT);
    }
  }

  // Tracks processes with high CPU usage and returns the (sorted) PIDs of
  // those that have stayed elevated for longer than RAISE_ALERT_AFTER_MIN.
  private alert_cpu_processes(): string[] {
    if (this.info == null) return [];
    const ts = this.info.timestamp;
    const ecp = this.elevated_cpu_procs;
    // we have to check if there aren't any processes left which no longer exist
    const leftovers = new Set(Object.keys(ecp));
    // bookkeeping of elevated process PIDS
    for (const [pid, proc] of Object.entries(this.info.processes ?? {})) {
      leftovers.delete(pid);
      if (proc.cpu.pct > ALERT_HIGH_PCT) {
        if (ecp[pid] == null) {
          ecp[pid] = ts;
        }
      } else {
        delete ecp[pid];
      }
    }
    for (const pid of leftovers) {
      delete ecp[pid];
    }
    // to actually fire alert when necessary
    const pids: string[] = [];
    for (const [pid, since] of Object.entries(ecp)) {
      if (since != null && how_long_ago_m(since) > RAISE_ALERT_AFTER_MIN) {
        pids.push(pid);
      }
    }
    pids.sort(); // to make this stable across iterations
    //this.dbg("alert_cpu_processes", pids, ecp);
    return pids;
  }

  // update alert levels and set alert states if they persist to be active
  private alerts(): Alert[] {
    this.update_alerts();
    const alerts: Alert[] = [];
    const alert_keys: ElevatedType[] = ["cpu-cgroup", "disk", "memory"];
    for (const k of alert_keys) {
      const ts = this.elevated[k];
      if (ts != null && how_long_ago_m(ts) > RAISE_ALERT_AFTER_MIN) {
        alerts.push({ type: k } as Alert);
      }
    }
    const pids: string[] = this.alert_cpu_processes();
    if (pids.length > 0) alerts.push({ type: "cpu-process", pids });

    const componentNames: ComponentName[] = [];
    for (const [k, ts] of Object.entries(this.components)) {
      if (ts == null) continue;
      // we alert without a delay
      componentNames.push(k as ComponentName);
    }
    // only send any alert if there is actually a problem!
    if (componentNames.length > 0) {
      alerts.push({ type: "component", names: componentNames });
    }
    return alerts;
  }

  // Development helper: a bounded random walk starting from the previously
  // computed usage values.
  private fake_data(): ProjectStatus["usage"] {
    const lastUsage = this.last?.["usage"];

    const next = (key: "disk_mb" | "mem_pct" | "cpu_pct", max: number) => {
      const last = lastUsage?.[key] ?? max / 2;
      const dx = max / 50;
      const val = last + dx * Math.random() - dx / 2;
      return Math.round(Math.min(max, Math.max(0, val)));
    };

    const mem_tot = 3000;
    const mem_pct = next("mem_pct", 100);
    const mem_rss = Math.round((mem_tot * mem_pct) / 100);
    const cpu_tot = round1((lastUsage?.["cpu_tot"] ?? 0) + Math.random() / 10);

    return {
      disk_mb: next("disk_mb", 3000),
      mem_tot,
      mem_pct,
      cpu_pct: next("cpu_pct", 100),
      cpu_tot,
      mem_rss,
    };
  }

  // this function takes the "info" we have (+ more maybe?)
  // and derives various states from it.
  // It shouldn't really matter how often it is being called,
  // but still only emit new objects if it is either really necessary (new alert)
  // or after some time. This must be a low-frequency and low-volume stream of data.
  private update(): void {
    this.last = this.status;

    // alerts must come first, it updates usage status fields
    const alerts = this.alerts();

    // set this to true if you're developing (otherwise you don't get any data)
    const fake_data = false;

    // collect status fields in usage object
    const usage = fake_data
      ? this.fake_data()
      : {
          disk_mb: this.disk_mb,
          mem_pct: this.mem_pct,
          cpu_pct: this.cpu_pct,
          cpu_tot: this.cpu_tot,
          mem_rss: this.mem_rss,
          mem_tot: this.mem_tot,
        };

    this.status = { alerts, usage, version: smcVersion };
  }

  private async get_status(): Promise<ProjectStatus | undefined> {
    this.update();
    return this.status;
  }

  // Stops reacting to project info updates (and ends the testing loop).
  // The shared ProjectInfoServer itself keeps running, since others may use it.
  public stop(): void {
    this.running = false;
    if (this.infoListener != null) {
      this.project_info.removeListener("info", this.infoListener);
      this.infoListener = undefined;
    }
  }

  public async start(): Promise<void> {
    if (!this.running) {
      await this._start();
    }
  }

  private async _start(): Promise<void> {
    this.dbg("start");
    if (this.running) {
      throw Error("Cannot start ProjectStatusServer twice");
    }
    this.running = true;
    await this.init();

    const status = await this.get_status();
    this.emit("status", status);

    // in testing mode, emit unconditionally every 5 seconds until stopped
    while (this.testing && this.running) {
      await delay(5000);
      const status = await this.get_status();
      this.emit("status", status);
    }
  }
}

// singleton, we instantiate it when we need it
let status: ProjectStatusServer | undefined = undefined;

export function init() {
  logger.debug("initializing project status server, and enabling publishing");
  if (status == null) {
    status = new ProjectStatusServer();
  }
  createPublisher({
    projectStatusServer: status,
    compute_server_id,
    project_id,
  });
  // start() is async; don't leave a rejection unhandled (it would crash node)
  status.start().catch((err) => {
    logger.debug("project status server failed to start", err);
  });
}

// testing: $ ts-node server.ts
if (require.main === module) {
  const pss = new ProjectStatusServer(true);
  let cnt = 0;
  pss.on("status", (status) => {
    console.log(JSON.stringify(status, null, 2));
    cnt += 1;
    if (cnt >= 2) process.exit();
  });
  pss.start();
}