Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/hub/changefeeds/server.ts
5928 views
1
import { type Client, type ConatSocketServer } from "@cocalc/conat/core/client";
2
import { uuid } from "@cocalc/util/misc";
3
import { UsageMonitor } from "@cocalc/conat/monitor/usage";
4
import { getLogger } from "@cocalc/conat/client";
5
import { isValidUUID } from "@cocalc/util/misc";
6
import {
7
SUBJECT,
8
MAX_PER_ACCOUNT,
9
MAX_GLOBAL,
10
SERVER_KEEPALIVE,
11
KEEPALIVE_TIMEOUT,
12
RESOURCE,
13
} from "./util";
14
// Type-only re-export of ConatSocketServer, which is the return type of
// changefeedServer below.
export { type ConatSocketServer };
15
16
// Logger shared by everything in this module, created under the name
// "hub:changefeeds:server".
const logger = getLogger("hub:changefeeds:server");
17
18
/**
 * Start the changefeed socket server listening on `SUBJECT`.
 *
 * For every incoming connection:
 *  - the second dot-separated segment of the socket subject must have the
 *    form `account-<uuid>`; otherwise the client is sent an error and the
 *    socket is closed;
 *  - the connection is registered with a `UsageMonitor` configured with
 *    `MAX_PER_ACCOUNT` / `MAX_GLOBAL`; if registration throws, the error is
 *    sent to the client and the socket is closed;
 *  - the first message received starts a query via `userQuery`; a second
 *    message on the same connection is rejected and the socket is closed;
 *  - each `cb(error, update)` invocation from `userQuery` is forwarded to the
 *    client as `{ error, update }`; any error closes the socket;
 *  - when the socket closes, the usage registration is removed and
 *    `cancelQuery` is called with the id that was given to `userQuery` as
 *    `changes`.
 *
 * @param client - client whose `socket.listen` is used to create the server
 * @param userQuery - starts the query; invoked at most once per connection
 * @param cancelQuery - invoked with the connection's changefeed id on close
 * @returns the socket server returned by `client.socket.listen`
 */
export function changefeedServer({
  client,
  userQuery,
  cancelQuery,
}: {
  client: Client;

  userQuery: (opts: {
    query: object;
    options?: object[];
    account_id: string;
    changes: string;
    cb: Function;
  }) => void;

  cancelQuery: (uuid: string) => void;
}): ConatSocketServer {
  const usage = new UsageMonitor({
    maxPerUser: MAX_PER_ACCOUNT,
    max: MAX_GLOBAL,
    resource: RESOURCE,
    log: (...args) => {
      logger.debug(RESOURCE, ...args);
    },
  });

  const server = client.socket.listen(SUBJECT, {
    keepAlive: SERVER_KEEPALIVE,
    keepAliveTimeout: KEEPALIVE_TIMEOUT,
  });
  logger.debug("created changefeed server with id", server.id);

  server.on("connection", (socket) => {
    // Best-effort notification followed by an unconditional close.
    // socket.write can throw (the query callback below already guards
    // against that), and a failed write must never leave the socket open or
    // escape from an event listener.
    const reject = (payload: object): void => {
      try {
        socket.write(payload);
      } catch {
        // nothing more can be done for this client; still close below
      }
      socket.close();
    };

    const v = socket.subject.split(".")[1];
    logger.debug(server.id, "connection from ", v);
    if (!v?.startsWith("account-")) {
      logger.debug(
        "socket.close: due to changefeed request from non-account subject",
        socket.subject,
      );
      reject({ error: "only account users can create changefeeds" });
      return;
    }
    const account_id = v.slice("account-".length);
    if (!isValidUUID(account_id)) {
      logger.debug(
        "socket.close: due to invalid uuid",
        socket.subject,
        account_id,
      );
      reject({
        error: `invalid account_id -- '${account_id}', subject=${socket.subject}`,
      });
      return;
    }
    let added = false;
    try {
      usage.add(account_id);
      added = true;
    } catch (err) {
      logger.debug(
        "socket.close: due to usage error (limit exceeded?)",
        socket.subject,
        err,
      );
      reject({ error: `${err}`, code: err.code });
      return;
    }

    // Id passed to userQuery as `changes` and later to cancelQuery.
    const changes = uuid();

    socket.on("closed", () => {
      logger.debug(
        "socket.close: cleaning up since socket closed for some external reason (timeout?)",
        socket.subject,
      );
      if (added) {
        usage.delete(account_id);
      }
      cancelQuery(changes);
    });

    let running = false;
    socket.on("data", (data) => {
      if (running) {
        logger.debug(
          "socket.close: due to attempt to run more than one query",
          socket.subject,
        );
        reject({ error: "exactly one query per connection" });
        return;
      }
      running = true;
      try {
        // Destructure inside the try: `data` comes from the remote client,
        // and destructuring null/undefined throws a TypeError. Here that is
        // reported to the client instead of escaping the listener.
        const { query, options } = data;
        userQuery({
          query,
          options,
          changes,
          account_id,
          cb: (error, update) => {
            // logger.debug("got: ", { error, update });
            if (error) {
              error = `error from postgres: "${error}"`;
            }
            try {
              socket.write({ error, update });
            } catch (err) {
              if (`${err}`.includes("closed")) {
                // expected behavior when other side closed it
                socket.close();
                return;
              }
              // happens if buffer is full. we just close the socket for now. (TODO?)
              error = `${error ? error + "; " : ""}unable to send (buffer may be full -- closing) `;
            }
            if (error) {
              logger.debug(error, socket.subject);
              socket.close();
            }
          },
        });
      } catch (err) {
        logger.debug(
          "socket.close: due to error creating query",
          socket.subject,
          err,
        );
        reject({ error: `${err}` });
      }
    });
  });
  server.on("closed", () => {
    logger.debug("shutting down changefeed server");
    usage.close();
  });

  return server;
}
164
165