Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/src/packages/database/postgres/changefeed-query.ts
Views: 687
/*
 *  This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
 *  License: MS-RSL – see LICENSE.md for details
 */

/*
Does the queries to update changefeeds, deduplicating across
both all changefeeds and a small interval of time.
*/

// set to false to completely disable for debugging/testing
const THROTTLE: boolean = true;

// 10ms when running unit tests, still throttle, but make it quick.
// Otherwise, we default to 500ms, which is enough to be massively
// useful, but also not noticed by user.
let THROTTLE_MS: number = process.env.SMC_TEST ? 10 : 500;

// THROTTLE_MS can be overridden by the POSTGRES_THROTTLE_CHANGEFEED_MS
// environment variable.
const throttle_override = process.env.POSTGRES_THROTTLE_CHANGEFEED_MS;
if (throttle_override != null) {
  const parsed = parseInt(throttle_override, 10);
  // Only accept a usable delay.  An empty or non-numeric value parses to
  // NaN, which must not reach setTimeout; in that case (and for negative
  // values) we keep the default chosen above.
  if (Number.isFinite(parsed) && parsed >= 0) {
    THROTTLE_MS = parsed;
  }
}

import { EventEmitter } from "events";

import { callback } from "awaiting";

import { once } from "@cocalc/util/async-utils";
import { close, copy } from "@cocalc/util/misc";

const { one_result, all_results } = require("../postgres-base");

import { PostgreSQL, QueryWhere } from "./types";

// Options accepted by the exported query function below.
interface QueryOpts {
  db: PostgreSQL;
  select: string[];
  table: string;
  where: QueryWhere;
  // true: run through one_query (and the throttled queue when THROTTLE is
  // set); false: run through all_query immediately.
  one: boolean;
}

// What is stored in a ThrottledTableQueue; the table itself is fixed
// per queue, so only select/where vary.
interface TableQuery {
  select: string[];
  where: QueryWhere;
}

// Every event name used to deliver a query result starts with this prefix.
const KEY_PREFIX = "query-";

/*
Name of the event on which the result of the given query is emitted.
It is derived from the JSON serialization of obj, so two queries that
serialize identically share one key; that is what deduplicates them.
*/
function key(obj: { [key: string]: any }): string {
  return `${KEY_PREFIX}${JSON.stringify(obj)}`;
}

type State = "ready" | "closed";

/*
Collects the queries for one table over a window of interval_ms and then
runs them one after another.  For each query, the result is emitted on
the event named by the string that enqueue returned:

   emit(k, undefined, result)   on success
   emit(k, err, undefined)      if the query failed
   emit(k, "closed")            if the queue was closed first

A "closed" event (no arguments) is also emitted when the queue is closed.
*/
class ThrottledTableQueue extends EventEmitter {
  private table: string;
  private queue: { [key: string]: TableQuery } = {};
  private db: PostgreSQL;
  private process_timer: any;
  private interval_ms: number;
  private state: State = "ready";

  constructor(db: PostgreSQL, table: string, interval_ms: number) {
    super();
    this.db = db;
    this.table = table;
    this.interval_ms = interval_ms;

    // client listens for results of query -- if queries pile up, and
    // there are many tables, the default of 10 can easily be exceeded.
    this.setMaxListeners(100);
  }

  // Logger from db._dbg, labelled with this queue's table and the method f.
  private dbg(f: string): Function {
    return this.db._dbg(`ThrottledTableQueue('${this.table}').${f}`);
  }

  /*
  Shut the queue down; calling it again is a no-op.  Everything still
  waiting for a result gets "closed" as the error argument.
  */
  public close(): void {
    if (this.state == "closed") {
      return;
    }
    if (this.process_timer != null) {
      clearTimeout(this.process_timer);
    }
    // Queries that were enqueued but not yet picked up by process_queue.
    for (const k in this.queue) {
      this.emit(k, "closed");
    }
    // Queries that process_queue already removed from this.queue are no
    // longer listed there, and process_queue emits nothing once we are
    // closed.  Notify whoever is still listening for one of them, since
    // otherwise they would wait forever after removeAllListeners below.
    for (const name of this.eventNames()) {
      if (typeof name == "string" && name.startsWith(KEY_PREFIX)) {
        this.emit(name, "closed");
      }
    }
    this.emit("closed");
    this.removeAllListeners();
    close(this);
    // set last, after close(this) has been applied to this object.
    this.state = "closed";
  }

  /*
  Add a query to the current batch and return the event name on which its
  result will be emitted.  Enqueueing a query whose key is already in the
  batch just replaces that entry, so it is only run once.

  Throws an Error if the queue has been closed.
  */
  public enqueue(query: TableQuery): string {
    if (this.state == "closed") {
      throw Error("trying to enqueue after close");
    }
    const k = key(query);
    this.queue[k] = query;
    if (this.process_timer == null) {
      // No batch is scheduled yet, so schedule one.
      this.dbg("enqueue")(`will process queue in ${this.interval_ms}ms...`);
      this.process_timer = setTimeout(
        this.process_queue.bind(this),
        this.interval_ms
      );
    }
    return k;
  }

  // Timer callback: run everything that is queued right now.
  private async process_queue(): Promise<void> {
    const dbg = this.dbg("process_queue");
    delete this.process_timer; // it just fired
    // For now we just do them one at a time.
    // FURTHER WORK: we could instead do ALL queries simultaneously as
    // a single query, or at least do them in parallel...

    // Make a copy of whatever is queued up at this moment in time,
    // then clear the queue.  We are responsible for handling all
    // queries in the queue now, but nothing further.  If queries are
    // slow, it can easily be the case that process_queue is
    // called several times at once.  However, that's fine, since
    // each call is responsible for only the part of the queue that
    // existed when it was called.
    const queue = copy(this.queue);
    this.queue = {};

    for (const k in queue) {
      dbg(k);
      const { select, where } = queue[k];

      try {
        const result = await callback(
          one_query,
          this.db,
          select,
          this.table,
          where
        );
        // close() may have run while we were waiting; it has already
        // notified the listeners, so there is nothing left to do.
        if (this.state == "closed") return;
        dbg("success", k);
        this.emit(k, undefined, result);
      } catch (err) {
        if (this.state == "closed") return;
        dbg("fail", k);
        this.emit(k, err, undefined);
      }
    }
  }
}

// One queue per table name, created on first use.
const throttled_table_queues: { [table: string]: ThrottledTableQueue } = {};

/*
Return the queue for the given table, creating it if there is none.
db and interval_ms are only used when a new queue is created; an
existing queue is returned unchanged.
*/
function throttled_table_queue(
  db: PostgreSQL,
  table: string,
  interval_ms: number
): ThrottledTableQueue {
  const existing = throttled_table_queues[table];
  if (existing != null) {
    return existing;
  }
  const queue = new ThrottledTableQueue(db, table, interval_ms);
  throttled_table_queues[table] = queue;
  // Forget the queue once it is closed, so that the next query on this
  // table gets a fresh queue rather than one whose enqueue throws.
  queue.once("closed", () => {
    if (throttled_table_queues[table] === queue) {
      delete throttled_table_queues[table];
    }
  });
  return queue;
}

/**
 * Run a select query against `opts.table`.
 *
 * When `opts.one` is true and throttling is enabled, the query goes through
 * the throttled queue of that table: it runs at most THROTTLE_MS later,
 * and queries with the same select/where queued in that window share a
 * single database query.  Otherwise the query is sent right away, through
 * `one_query` or `all_query` depending on `opts.one`.
 *
 * @returns whatever the underlying query passes to its callback as result.
 * @throws the error reported by the query, or the string "closed" if the
 *   queue is closed before the result arrives.
 */
export async function query(opts: QueryOpts): Promise<any> {
  if (THROTTLE && opts.one) {
    const Table = throttled_table_queue(opts.db, opts.table, THROTTLE_MS);
    const k: string = Table.enqueue({ select: opts.select, where: opts.where });
    const [err, result] = await once(Table, k);
    if (err != null) {
      throw err;
    }
    return result;
  }
  return await callback(
    opts.one ? one_query : all_query,
    opts.db,
    opts.select,
    opts.table,
    opts.where
  );
}

// Callback-style query whose cb is wrapped with all_results from
// ../postgres-base (presumably yielding all matching rows -- confirm there).
function all_query(db, select, table, where, cb): void {
  db._query({ select, table, where, cb: all_results(cb) });
}

// Callback-style query whose cb is wrapped with one_result from
// ../postgres-base (presumably yielding a single row -- confirm there).
function one_query(db, select, table, where, cb): void {
  db._query({ select, table, where, cb: one_result(cb) });
}