Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/src/packages/sync/table/query-function.ts
Views: 687
/*
 *  This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
 *  License: MS-RSL – see LICENSE.md for details
 */

/*
Determine function that does query.
*/

const DISABLE_STANDBY: boolean = true; // if true, never use standby server at all.

const async = require("async");

import { delay } from "awaiting";
import { SCHEMA } from "@cocalc/util/schema";
import { copy } from "@cocalc/util/misc";

/**
 * Return the function that should be used to run queries against `table`.
 *
 * If standby use is disabled (DISABLE_STANDBY) or the table's schema entry
 * has no `db_standby` setting, `client_query` itself is returned unchanged.
 * Otherwise a wrapper is returned that splits one query into two calls to
 * `client_query`:
 *
 *   - an initial read with `standby: true, changes: false`, and
 *   - a changefeed with `standby: false, changes: true` and the extra
 *     option `{ only_changes: true }`.
 *
 * Changefeed messages that arrive before the initial read has been handed
 * to `opts.cb` are buffered and replayed afterwards, in arrival order.
 *
 * @param client_query - function that performs a query given an options object
 * @param table - name of a table in SCHEMA
 * @returns `client_query` or the wrapping query function described above
 * @throws Error if `table` is not in SCHEMA
 */
export function query_function(
  client_query: Function,
  table: string
): Function {
  const s = SCHEMA[table];
  if (s == null) {
    throw Error(`unknown table ${table}`);
  }
  const db_standby = s.db_standby;

  if (DISABLE_STANDBY || !db_standby) {
    // just use default client.query, which queries the master database.
    return client_query;
  }

  function do_query(opts: any): void {
    if (opts == null) {
      throw Error("opts must be an object");
    }

    // Becomes true once the initial read has been delivered without error.
    let read_done: boolean = false;
    // Changefeed messages received before they may be delivered to opts.cb.
    const change_queue: { err: any; change: any }[] = [];

    // Run the one-shot (non-changefeed) read with standby: true.
    function do_initial_read_query(cb: Function): void {
      const opts2 = copy(opts);
      opts2.standby = true;
      opts2.changes = false;
      let cb_called: boolean = false;
      opts2.cb = async function (err, resp): Promise<void> {
        opts.cb(err, resp);
        if (!err) {
          read_done = true;
          if (change_queue.length > 0) {
            // CRITICAL: delay, since these must be pushed out in a later event loop.
            // Without this delay, there will be many random failures.
            await delay(0);
            // Changes that arrived during the delay were appended to the
            // queue by start_changefeed (because the queue was non-empty),
            // so this loop emits everything in arrival order.
            while (change_queue.length > 0) {
              const x = change_queue.shift();
              if (x == null) break; // make typescript happy.
              const { err, change } = x;
              opts.cb(err, change);
            }
          }
        }
        if (!cb_called) {
          cb_called = true;
          cb(err);
        }
      };
      client_query(opts2);
    }

    // Start the changefeed with standby: false, asking only for changes.
    function start_changefeed(cb: Function): void {
      let first_resp: boolean = true;
      const opts2 = copy(opts);
      opts2.standby = false;
      opts2.changes = true;
      opts2.cb = function (err, change): void {
        // Deliver directly only when the initial read is done AND nothing is
        // still waiting in the queue; otherwise a new change could overtake
        // older queued changes while the flush above is waiting in delay(0).
        if (read_done && change_queue.length === 0) {
          opts.cb(err, change);
        } else {
          change_queue.push({ err, change });
        }
        if (first_resp) {
          first_resp = false;
          cb(err);
        }
      };
      // Tolerate callers that did not pass any options.
      const base_options = opts2.options == null ? [] : opts2.options;
      opts2.options = base_options.concat({ only_changes: true });
      client_query(opts2);
    }

    let f: Function;
    if (db_standby === "unsafe") {
      /* If db_standby == 'unsafe', then we do not even require
         the changefeed to be working before doing the full query.
         This will for sure miss all changes from when the query
         finishes until the changefeed starts.  For some
         tables this is fine; for others, not. */

      f = async.parallel;
    } else {
      // Otherwise, the query could miss a small amount of data,
      // but only for a tiny window of time.
      f = async.series;
    }

    f([do_initial_read_query, start_changefeed]);
  }

  return do_query;
}