CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Sign Up | Sign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/sync/table/changefeed.ts
Views: 687
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
import { EventEmitter } from "events";
7
import { callback, delay } from "awaiting";
8
import { close } from "@cocalc/util/misc";
9
10
type State = "closed" | "disconnected" | "connecting" | "connected";
11
12
// A Changefeed wraps a single "changes: true" query made through the
// do_query function supplied by the caller.
//
// Lifecycle (see the State type): it starts "disconnected", becomes
// "connecting" when connect() is called, "connected" once the initial
// query result has been accepted, and "closed" after close().
//
// Events emitted:
//   - "update": an object with the action and, when truthy, the
//     new_val and old_val fields of one changefeed message;
//   - "close":  emitted once by close(), just before all listeners
//     are removed.
export class Changefeed extends EventEmitter {
  private query: any;
  private do_query: Function;
  private query_cancel: Function;
  private state: State = "disconnected";
  private table: string;
  // Id of the running query; only set after connect() succeeds.
  private id?: string;
  private options: any;
  // Updates that have been received but not yet dispatched, oldest first.
  private handle_update_queue: { err?: any; resp?: any }[] = [];

  // do_query     - called once by connect() with
  //                {query, changes: true, timeout, options, cb}; its cb
  //                is expected to be invoked for the initial result and
  //                then again for every later changefeed message.
  // query_cancel - called with the query id by close(); it is awaited and
  //                any error it throws is ignored.
  // options, query - passed through to do_query unchanged.
  // table        - key under which the initial result is looked up in
  //                resp.query.
  constructor({
    do_query,
    query_cancel,
    options,
    query,
    table,
  }: {
    do_query: Function;
    query_cancel: Function;
    options: any;
    table: string;
    query: any;
  }) {
    super();
    this.do_query = do_query;
    this.query_cancel = query_cancel;
    this.query = query;
    this.options = options;
    this.table = table;
  }

  // Query for state of the table, connects to the
  // changefeed, and return the initial state
  // of the table.  Throws an exception if anything
  // goes wrong: the state was not 'disconnected', the query
  // failed, the changefeed was closed while the query was running,
  // the query was canceled, or the response has no data for the table.
  //
  // NOTE: when this throws after the query was started, the state is
  // not reset to 'disconnected', so connect() cannot simply be
  // retried on the same object.
  public async connect(): Promise<any> {
    if (this.state !== "disconnected") {
      throw Error(
        `can only connect if state is 'disconnected' but it is ${this.state}`
      );
    }
    this.state = "connecting";
    const resp = await callback(this.run_the_query.bind(this));
    // close() may have been called while we were waiting for the response.
    if (this.state === ("closed" as State)) {
      throw Error("after running query, changefeed state is 'closed'");
    }
    if (resp != null && resp.event === "query_cancel") {
      throw Error("query-cancel");
    }
    // A missing response is reported the same way as a response
    // without data, rather than as a TypeError on a property access.
    if (resp == null || resp.query == null || resp.query[this.table] == null) {
      throw Error(`${this.table} changefeed init -- no error and no data`);
    }
    // Successfully completed query
    this.id = resp.id;
    this.state = "connected";
    // Intentionally not awaited: queued updates are delivered only after
    // the initial state has been returned to the caller.
    void this.process_queue_next_tick();
    return resp.query[this.table];
  }

  // Wait a tick, then process the queue of messages that
  // arrived during initialization.
  private async process_queue_next_tick(): Promise<void> {
    await delay(0);
    this.drain_update_queue();
  }

  // Dispatch queued updates, oldest first, for as long as we stay
  // connected.  The state is checked before the queue on every
  // iteration because close() deletes the queue itself.
  private drain_update_queue(): void {
    while (this.state === "connected" && this.handle_update_queue.length > 0) {
      const next = this.handle_update_queue.shift();
      if (next != null) {
        this.process_update(next.err, next.resp);
      }
    }
  }

  private run_the_query(cb: Function): void {
    // The callback passed to do_query gets called first with the
    // result of the initial query, then repeatedly with each
    // changefeed update.  The input function "cb" is called only for
    // the first of these; every later one goes to handle_update.
    let first_time = true;
    this.do_query({
      query: this.query,
      changes: true,
      timeout: 30,
      options: this.options,
      cb: (err, resp) => {
        if (first_time) {
          // Clear the flag before calling cb, so that cb can never be
          // invoked a second time even if it throws or re-enters.
          first_time = false;
          cb(err, resp);
        } else {
          this.handle_update(err, resp);
        }
      },
    });
  }

  // Entry point for every changefeed message after the initial result.
  private handle_update(err, resp): void {
    if (this.state === "closed") {
      // expected, since last updates after query cancel may get through...
      return;
    }
    // Always go through the queue.  Updates can and do arrive right
    // after the initial result and before connect() has finished; an
    // update that arrives once we are connected, but before those have
    // been dispatched, must wait its turn instead of overtaking them.
    this.handle_update_queue.push({ err, resp });
    if (this.state === "connected") {
      this.drain_update_queue();
    }
  }

  // Handle one changefeed message while connected: close on an error or
  // a query_cancel event, ignore messages without an action, and emit
  // "update" for everything else.
  private process_update(err, resp): void {
    if (resp == null && !err) {
      err = "resp must not be null for non-error";
    }
    if (err || resp.event === "query_cancel") {
      //if (err) console.warn("closing changefeed due to err", err);
      this.close();
      return;
    }
    if (resp.action == null) {
      // Not a changefeed message.  This happens, e.g., the first time
      // when we use the standby server to get the changefeed.
      return;
    }
    // Return just the new_val/old_val/action part of resp.
    // console.log("resp=", resp);
    const x: { new_val?: any; old_val?: any; action?: string } = {};
    if (resp.new_val) {
      x.new_val = resp.new_val;
    }
    if (resp.old_val) {
      x.old_val = resp.old_val;
    }
    x.action = resp.action;
    this.emit("update", x);
  }

  // Stop the changefeed: cancel the query (if connect() got far enough
  // to have an id), emit "close", remove all listeners and release the
  // object's references.
  public close(): void {
    this.state = "closed";
    if (this.id != null) {
      // stop listening for future updates
      this.cancel_query(this.id);
    }
    this.emit("close");
    this.removeAllListeners();
    close(this);
    // Set the state again after close(this).
    // NOTE(review): presumably close(this) clears the fields of this
    // object, including state -- confirm against @cocalc/util/misc.
    this.state = "closed";
  }

  private async cancel_query(id: string): Promise<void> {
    try {
      await this.query_cancel(id);
    } catch (err) {
      // ignore error, which might be due to disconnecting and isn't a big deal.
      // Basically anything that could cause an error would have also
      // canceled the changefeed anyways.
    }
  }

  // Current lifecycle state (one of the values of the State type).
  public get_state(): string {
    return this.state;
  }
}
170
171
//
172
173