Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/src/packages/sync/table/changefeed.ts
Views: 687
/*1* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.2* License: MS-RSL – see LICENSE.md for details3*/45import { EventEmitter } from "events";6import { callback, delay } from "awaiting";7import { close } from "@cocalc/util/misc";89type State = "closed" | "disconnected" | "connecting" | "connected";1011export class Changefeed extends EventEmitter {12private query: any;13private do_query: Function;14private query_cancel: Function;15private state: State = "disconnected";16private table: string;17private id: string;18private options: any;19private handle_update_queue: { err?: any; resp?: any }[] = [];2021constructor({22do_query,23query_cancel,24options,25query,26table,27}: {28do_query: Function;29query_cancel: Function;30options: any;31table: string;32query: any;33}) {34super();35this.do_query = do_query;36this.query_cancel = query_cancel;37this.query = query;38this.options = options;39this.table = table;40}4142// Query for state of the table, connects to the43// changefeed, and return the initial state44// of the table. Throws an exception if anything45// goes wrong.46public async connect(): Promise<any> {47if (this.state != "disconnected") {48throw Error(49`can only connect if state is 'disconnected' but it is ${this.state}`50);51}52this.state = "connecting";53const resp = await callback(this.run_the_query.bind(this));54if (this.state === ("closed" as State)) {55throw Error("after running query, changefeed state is 'closed'");56}57if (resp.event === "query_cancel") {58throw Error("query-cancel");59}60if (resp.query == null || resp.query[this.table] == null) {61throw Error(`${this.table} changefeed init -- no error and no data`);62}63// Successfully completed query64this.id = resp.id;65this.state = "connected";66this.process_queue_next_tick();67return resp.query[this.table];68}6970// Wait a tick, then process the queue of messages that71// arrived during initialization.72private async process_queue_next_tick(): Promise<void> {73await delay(0);74while (this.state != "closed" && this.handle_update_queue.length > 0) {75const x = this.handle_update_queue.shift();76if (x != null) {77this.handle_update(x.err, x.resp);78}79}80}8182private run_the_query(cb: Function): void {83// This query_function gets called first on the84// initial query, then repeatedly with each changefeed85// update. The input function "cb" will be called86// precisely once, and the method handle_changefeed_update87// may get called if there are additional88// changefeed updates.89let first_time: boolean = true;90this.do_query({91query: this.query,92changes: true,93timeout: 30,94options: this.options,95cb: (err, resp) => {96if (first_time) {97cb(err, resp);98first_time = false;99} else {100this.handle_update(err, resp);101}102},103});104}105106private handle_update(err, resp): void {107if (this.state != "connected") {108if (this.state == "closed") {109// expected, since last updates after query cancel may get through...110return;111}112// This can and does happen when updates appear immediately113// after the first initial state is set (in run_the_query).114this.handle_update_queue.push({ err, resp });115return;116}117if (resp == null && err == null) {118err = "resp must not be null for non-error";119}120if (err || resp.event === "query_cancel") {121//if (err) console.warn("closing changefeed due to err", err);122this.close();123return;124}125if (resp.action == null) {126// Not a changefeed message. This happens, e.g., the first time127// when we use the standby server to get the changefeed.128return;129}130// Return just the new_val/old_val/action part of resp.131// console.log("resp=", resp);132const x: { new_val?: any; old_val?: any; action?: string } = {};133if (resp.new_val) {134x.new_val = resp.new_val;135}136if (resp.old_val) {137x.old_val = resp.old_val;138}139x.action = resp.action;140this.emit("update", x);141}142143public close(): void {144this.state = "closed";145if (this.id != null) {146// stop listening for future updates147this.cancel_query(this.id);148}149this.emit("close");150this.removeAllListeners();151close(this);152this.state = "closed";153}154155private async cancel_query(id: string): Promise<void> {156try {157await this.query_cancel(id);158} catch (err) {159// ignore error, which might be due to disconnecting and isn't a big deal.160// Basically anything that could cause an error would have also161// canceled the changefeed anyways.162}163}164165public get_state(): string {166return this.state;167}168}169170//171172173