Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/src/packages/project/project-status/server.ts
Views: 687
/*
 *  This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
 *  License: MS-RSL – see LICENSE.md for details
 */

/*
Project status server, doing the heavy lifting of telling the client
what's going on in the project, especially if there is a problem.

Under the hood, it subscribes to the ProjectInfoServer, which updates
various statistics at a high-frequency. Therefore, this here filters
that information to a low-frequency low-volume stream of important
status updates.

Hence in particular, information like cpu, memory and disk are smoothed out and throttled.
*/

import { getLogger } from "@cocalc/project/logger";
import { how_long_ago_m, round1 } from "@cocalc/util/misc";
import { version as smcVersion } from "@cocalc/util/smc-version";
import { delay } from "awaiting";
import { EventEmitter } from "events";
import { isEqual } from "lodash";
import { get_ProjectInfoServer, ProjectInfoServer } from "../project-info";
import { ProjectInfo } from "@cocalc/util/types/project-info/types";
import {
  ALERT_DISK_FREE,
  ALERT_HIGH_PCT /* ALERT_MEDIUM_PCT */,
  RAISE_ALERT_AFTER_MIN,
  STATUS_UPDATES_INTERVAL_S,
} from "@cocalc/comm/project-status/const";
import {
  Alert,
  AlertType,
  ComponentName,
  ProjectStatus,
} from "@cocalc/comm/project-status/types";
import { cgroup_stats } from "@cocalc/comm/project-status/utils";

// TODO: only return the "next" value, if it is significantly different from "prev"
//function threshold(prev?: number, next?: number): number | undefined {
//  return next;
//}

const winston = getLogger("ProjectStatusServer");

/**
 * Round `val` *up* to the next multiple of 10^order
 * (e.g. order=1 → next multiple of 10).
 * Used to reduce the number of distinct values, hence the number of updates.
 */
function quantize(val: number, order: number): number {
  const q = Math.round(Math.pow(10, order));
  return Math.round(q * Math.ceil(val / q));
}

// Tracks when we first saw an elevated value for each resource.
// An entry is cleared (set to null) as soon as the value is below its threshold again.
// The keys are exactly the alert types written by update_alerts() and read by alerts().
interface Elevated {
  "cpu-cgroup": number | null; // timestamp
  memory: number | null; // timestamp
  disk: number | null; // timestamp
}

export class ProjectStatusServer extends EventEmitter {
  private readonly dbg: Function;
  private running = false;
  private readonly testing: boolean;
  private readonly project_info: ProjectInfoServer;
  private info?: ProjectInfo;
  private status?: ProjectStatus;
  // the previously *computed* status (e.g. the base for the fake_data random walk)
  private last?: ProjectStatus;
  // the most recently *emitted* status – changes are detected relative to this one,
  // otherwise a change happening inside the rate-limit window would be lost
  private lastEmitted?: ProjectStatus;
  private elevated: Elevated = {
    "cpu-cgroup": null,
    disk: null,
    memory: null,
  };
  private elevated_cpu_procs: { [pid: string]: number } = {};
  private disk_mb?: number;
  private cpu_pct?: number;
  private cpu_tot?: number; // total time in seconds
  private mem_pct?: number;
  private mem_rss?: number;
  private mem_tot?: number;
  private components: { [name in ComponentName]?: number | undefined } = {};
  private lastEmit: number = 0; // timestamp, when status was emitted last

  // Listener for "info" events of the ProjectInfoServer.
  // Kept as a field, such that stop() is able to detach it again.
  private readonly onInfo = (info: ProjectInfo): void => {
    //this.dbg(`got info timestamp=${info.timestamp}`);
    this.info = info;
    this.update();
    this.emitInfo();
  };

  constructor(testing = false) {
    super();
    this.testing = testing;
    this.dbg = (...msg) => winston.debug(...msg);
    this.project_info = get_ProjectInfoServer();
  }

  private async init(): Promise<void> {
    this.project_info.start();
    this.project_info.on("info", this.onInfo);
  }

  // checks if the current state (after update()) should be emitted
  private emitInfo(): void {
    if (this.lastEmit === 0) {
      this.dbg("emitInfo[last=0]", this.status);
      this.doEmit();
      return;
    }

    // if alert changed, emit immediately
    if (!isEqual(this.lastEmitted?.alerts, this.status?.alerts)) {
      this.dbg("emitInfo[alert]", this.status);
      this.doEmit();
    } else {
      // deep comparison check via lodash and we rate limit
      const recent =
        this.lastEmit + 1000 * STATUS_UPDATES_INTERVAL_S > Date.now();
      // compare against what the client actually got, not the previous computation
      const changed = !isEqual(this.status, this.lastEmitted);
      if (!recent && changed) {
        this.dbg("emitInfo[changed]", this.status);
        this.doEmit();
      }
    }
  }

  private doEmit(): void {
    this.emit("status", this.status);
    this.lastEmit = Date.now();
    this.lastEmitted = this.status;
  }

  public setComponentAlert(name: ComponentName) {
    // we set this to the time when we first got notified about the problem
    if (this.components[name] == null) {
      this.components[name] = Date.now();
    }
  }

  public clearComponentAlert(name: ComponentName) {
    delete this.components[name];
  }

  // this derives elevated levels from the project info object
  private update_alerts() {
    if (this.info == null) return;
    const du = this.info.disk_usage.project;
    const ts = this.info.timestamp;

    const do_alert = (type: keyof Elevated, is_bad: boolean) => {
      if (is_bad) {
        // if it isn't fine, set it once to the timestamp (and let it age)
        if (this.elevated[type] == null) {
          this.elevated[type] = ts;
        }
      } else {
        // unless it's fine again, then remove the timestamp
        this.elevated[type] = null;
      }
    };

    do_alert("disk", du.free < ALERT_DISK_FREE);
    this.disk_mb = du.usage;

    const cg = this.info.cgroup;
    const du_tmp = this.info.disk_usage.tmp;
    if (cg != null) {
      // we round/quantisize values to reduce the number of updates
      // and also send less data with each update
      const cgStats = cgroup_stats(cg, du_tmp);
      this.mem_pct = Math.round(cgStats.mem_pct);
      this.cpu_pct = Math.round(cgStats.cpu_pct);
      this.cpu_tot = Math.round(cgStats.cpu_tot);
      this.mem_tot = quantize(cgStats.mem_tot, 1);
      this.mem_rss = quantize(cgStats.mem_rss, 1);
      do_alert("memory", cgStats.mem_pct > ALERT_HIGH_PCT);
      do_alert("cpu-cgroup", cgStats.cpu_pct > ALERT_HIGH_PCT);
    }
  }

  // returns the sorted PIDs of processes with a persistently high CPU usage
  private alert_cpu_processes(): string[] {
    const pids: string[] = [];
    if (this.info == null) return [];
    const ts = this.info.timestamp;
    const ecp = this.elevated_cpu_procs;
    // we have to check if there aren't any processes left which no longer exist
    const leftovers = new Set(Object.keys(ecp));
    // bookkeeping of elevated process PIDS
    for (const [pid, proc] of Object.entries(this.info.processes ?? {})) {
      leftovers.delete(pid);
      if (proc.cpu.pct > ALERT_HIGH_PCT) {
        if (ecp[pid] == null) {
          ecp[pid] = ts;
        }
      } else {
        delete ecp[pid];
      }
    }
    for (const pid of leftovers) {
      delete ecp[pid];
    }
    // to actually fire alert when necessary
    for (const [pid, since] of Object.entries(ecp)) {
      if (since != null && how_long_ago_m(since) > RAISE_ALERT_AFTER_MIN) {
        pids.push(pid);
      }
    }
    pids.sort(); // to make this stable across iterations
    //this.dbg("alert_cpu_processes", pids, ecp);
    return pids;
  }

  // update alert levels and set alert states if they persist to be active
  private alerts(): Alert[] {
    this.update_alerts();
    const alerts: Alert[] = [];
    const alert_keys: (keyof Elevated)[] = ["cpu-cgroup", "disk", "memory"];
    for (const k of alert_keys) {
      const ts = this.elevated[k];
      if (ts != null && how_long_ago_m(ts) > RAISE_ALERT_AFTER_MIN) {
        alerts.push({ type: k } as Alert);
      }
    }
    const pids: string[] = this.alert_cpu_processes();
    if (pids.length > 0) alerts.push({ type: "cpu-process", pids });

    const componentNames: ComponentName[] = [];
    for (const [k, ts] of Object.entries(this.components)) {
      if (ts == null) continue;
      // we alert without a delay
      componentNames.push(k as ComponentName);
    }
    // only send any alert if there is actually a problem!
    if (componentNames.length > 0) {
      alerts.push({ type: "component", names: componentNames });
    }
    return alerts;
  }

  // development helper: a bounded random walk starting from the previous usage values
  private fake_data(): ProjectStatus["usage"] {
    const lastUsage = this.last?.["usage"];

    const next = (key, max) => {
      const last = lastUsage?.[key] ?? max / 2;
      const dx = max / 50;
      const val = last + dx * Math.random() - dx / 2;
      return Math.round(Math.min(max, Math.max(0, val)));
    };

    const mem_tot = 3000;
    const mem_pct = next("mem_pct", 100);
    const mem_rss = Math.round((mem_tot * mem_pct) / 100);
    const cpu_tot = round1((lastUsage?.["cpu_tot"] ?? 0) + Math.random() / 10);

    return {
      // key must match the usage field name, otherwise the walk never progresses
      disk_mb: next("disk_mb", 3000),
      mem_tot,
      mem_pct,
      cpu_pct: next("cpu_pct", 100),
      cpu_tot,
      mem_rss,
    };
  }

  // this function takes the "info" we have (+ more maybe?)
  // and derives various states from it.
  // It shouldn't really matter how often it is being called,
  // but still only emit new objects if it is either really necessary (new alert)
  // or after some time. This must be a low-frequency and low-volume stream of data.
  private update(): void {
    this.last = this.status;

    // alerts must come first, it updates usage status fields
    const alerts = this.alerts();

    // set this to true if you're developing (otherwise you don't get any data)
    const fake_data = false;

    // collect status fields in usage object
    const usage = fake_data
      ? this.fake_data()
      : {
          disk_mb: this.disk_mb,
          mem_pct: this.mem_pct,
          cpu_pct: this.cpu_pct,
          cpu_tot: this.cpu_tot,
          mem_rss: this.mem_rss,
          mem_tot: this.mem_tot,
        };

    this.status = { alerts, usage, version: smcVersion };
  }

  private async get_status(): Promise<ProjectStatus | undefined> {
    this.update();
    return this.status;
  }

  /**
   * Stop processing updates: detaches from the ProjectInfoServer and ends the
   * testing loop. A later start() re-attaches exactly one listener.
   */
  public stop(): void {
    this.running = false;
    this.project_info.removeListener("info", this.onInfo);
  }

  public async start(): Promise<void> {
    if (this.running) {
      this.dbg(
        "project-status/server: already running, cannot be started twice",
      );
    } else {
      await this._start();
    }
  }

  private async _start(): Promise<void> {
    this.dbg("start");
    if (this.running) {
      throw Error("Cannot start ProjectStatusServer twice");
    }
    this.running = true;
    await this.init();

    const status = await this.get_status();
    this.emit("status", status);

    // in testing mode, periodically emit the status until stop() is called
    while (this.testing && this.running) {
      await delay(5000);
      const status = await this.get_status();
      this.emit("status", status);
    }
  }
}

// singleton, we instantiate it when we need it
let _status: ProjectStatusServer | undefined = undefined;

export function get_ProjectStatusServer(): ProjectStatusServer {
  if (_status != null) return _status;
  _status = new ProjectStatusServer();
  return _status;
}

// testing: $ ts-node server.ts
if (require.main === module) {
  const pss = new ProjectStatusServer(true);
  pss.start().catch((err) => {
    console.error(err);
    process.exit(1);
  });
  let cnt = 0;
  pss.on("status", (status) => {
    console.log(JSON.stringify(status, null, 2));
    cnt += 1;
    if (cnt >= 2) process.exit();
  });
}