Path: blob/master/src/packages/conat/hub/changefeeds/server.ts
5928 views
import { type Client, type ConatSocketServer } from "@cocalc/conat/core/client";
import { uuid } from "@cocalc/util/misc";
import { UsageMonitor } from "@cocalc/conat/monitor/usage";
import { getLogger } from "@cocalc/conat/client";
import { isValidUUID } from "@cocalc/util/misc";
import {
  SUBJECT,
  MAX_PER_ACCOUNT,
  MAX_GLOBAL,
  SERVER_KEEPALIVE,
  KEEPALIVE_TIMEOUT,
  RESOURCE,
} from "./util";
export { type ConatSocketServer };

const logger = getLogger("hub:changefeeds:server");

/**
 * Create the socket server that serves changefeeds to account users.
 *
 * The server listens on `SUBJECT`. For every incoming connection:
 *
 * 1. The second dot-separated segment of `socket.subject` must have the form
 *    `account-<uuid>`; otherwise an error message is written and the socket
 *    is closed.
 * 2. The account is registered with a `UsageMonitor` configured with
 *    `MAX_PER_ACCOUNT` and `MAX_GLOBAL`. If `usage.add` throws (presumably
 *    because a limit was exceeded), the error is reported to the client and
 *    the socket is closed.
 * 3. The first message received on the socket is treated as
 *    `{ query, options }` and passed to `userQuery` together with a fresh
 *    changefeed id. Every callback invocation is forwarded to the client as
 *    `{ error, update }`. A second message on the same socket is rejected
 *    and the socket is closed.
 * 4. When the socket closes, the usage entry is removed and
 *    `cancelQuery` is called with the changefeed id.
 *
 * @param client - conat client whose `socket.listen` is used to create the server
 * @param userQuery - starts a query; `changes` is the id of the changefeed and
 *   `cb(error, update)` is invoked with results
 * @param cancelQuery - called with the changefeed id when the socket closes
 * @returns the listening socket server
 */
export function changefeedServer({
  client,
  userQuery,
  cancelQuery,
}: {
  client: Client;

  userQuery: (opts: {
    query: object;
    options?: object[];
    account_id: string;
    changes: string;
    cb: Function;
  }) => void;

  cancelQuery: (uuid: string) => void;
}): ConatSocketServer {
  const usage = new UsageMonitor({
    maxPerUser: MAX_PER_ACCOUNT,
    max: MAX_GLOBAL,
    resource: RESOURCE,
    log: (...args) => {
      logger.debug(RESOURCE, ...args);
    },
  });

  const server = client.socket.listen(SUBJECT, {
    keepAlive: SERVER_KEEPALIVE,
    keepAliveTimeout: KEEPALIVE_TIMEOUT,
  });
  logger.debug("created changefeed server with id", server.id);

  server.on("connection", (socket) => {
    // The second segment of the subject identifies who is connecting.
    const v = socket.subject.split(".")[1];
    logger.debug(server.id, "connection from ", v);
    if (!v?.startsWith("account-")) {
      socket.write({ error: "only account users can create changefeeds" });
      logger.debug(
        "socket.close: due to changefeed request from non-account subject",
        socket.subject,
      );
      socket.close();
      return;
    }
    const account_id = v.slice("account-".length);
    if (!isValidUUID(account_id)) {
      logger.debug(
        "socket.close: due to invalid uuid",
        socket.subject,
        account_id,
      );
      socket.write({
        error: `invalid account_id -- '${account_id}', subject=${socket.subject}`,
      });
      socket.close();
      return;
    }
    // Tracks whether this connection holds a usage slot that must be
    // released when the socket closes.
    let added = false;
    try {
      usage.add(account_id);
      added = true;
    } catch (err) {
      socket.write({ error: `${err}`, code: err.code });
      logger.debug(
        "socket.close: due to usage error (limit exceeded?)",
        socket.subject,
        err,
      );
      socket.close();
      return;
    }

    // Unique id of the changefeed served over this socket; passed to
    // userQuery when the query starts and to cancelQuery on close.
    const changes = uuid();

    socket.on("closed", () => {
      logger.debug(
        "socket.close: cleaning up since socket closed for some external reason (timeout?)",
        socket.subject,
      );
      if (added) {
        usage.delete(account_id);
      }
      cancelQuery(changes);
    });

    // Only the first message on a connection may start a query.
    let running = false;
    socket.on("data", (data) => {
      if (running) {
        socket.write({ error: "exactly one query per connection" });
        logger.debug(
          "socket.close: due to attempt to run more than one query",
          socket.subject,
        );
        socket.close();
        return;
      }
      running = true;
      try {
        // Destructure inside the try: the payload comes from the client, and
        // a null/undefined message would otherwise throw out of this event
        // handler without the error being reported or the socket closed.
        const { query, options } = data;
        userQuery({
          query,
          options,
          changes,
          account_id,
          cb: (error, update) => {
            // logger.debug("got: ", { error, update });
            if (error) {
              error = `error from postgres: "${error}"`;
            }
            try {
              socket.write({ error, update });
            } catch (err) {
              if (`${err}`.includes("closed")) {
                // expected behavior when other side closed it
                socket.close();
                return;
              }
              // happens if buffer is full. we just close the socket for now. (TODO?)
              error = `${error ? error + "; " : ""}unable to send (buffer may be full -- closing) `;
            }
            // Any error (from the query or from sending) ends the changefeed.
            if (error) {
              logger.debug(error, socket.subject);
              socket.close();
            }
          },
        });
      } catch (err) {
        logger.debug(
          "socket.close: due to error creating query",
          socket.subject,
          err,
        );
        try {
          // Best effort: the socket may already be unusable.
          socket.write({ error: `${err}` });
        } catch {}
        socket.close();
      }
    });
  });
  server.on("closed", () => {
    logger.debug("shutting down changefeed server");
    usage.close();
  });

  return server;
}