// Path: blob/master/src/packages/conat/persist/server.ts
// 1710 views
/*
Persist server: serves PersistentStream storage operations over conat sockets.

Example node REPL session for trying this out by hand:

CONAT_SERVER=http://localhost:3000 node

// making a server from scratch

// initialize persist context

require('@cocalc/backend/conat/persist');

// a conat server and client
s = require('@cocalc/conat/core/server').init({port:4567, getUser:()=>{return {hub_id:'hub'}}}); client = s.client();

// persist server
p = require('@cocalc/conat/persist/server').server({client}); 0;


// a client for persist server

c = require('@cocalc/conat/persist/client').stream({client, user:{hub_id:'hub'}, storage:{path:'b.txt'}});

for await (x of await c.getAll()) { console.log(x) }


await c.set({messageData:client.message(123)})

for await (x of await c.getAll()) { console.log(x) }

[ { seq: 1, time: 1750218209211, encoding: 0, raw: <Buffer 7b> } ]

(await c.get({seq:5})).data

await c.set({key:'foo', messageData:client.message('bar')})
(await c.get({key:'foo'})).data

await c.delete({seq:6})


client = await require('@cocalc/backend/conat').conat(); kv = require('@cocalc/backend/conat/sync').akv({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'a.txt', client})

client = await require('@cocalc/backend/conat').conat(); s = require('@cocalc/backend/conat/sync').astream({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'b.txt', client})

client = await require('@cocalc/backend/conat').conat(); s = await require('@cocalc/backend/conat/sync').dstream({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'ds2.txt', client})


client = await require('@cocalc/backend/conat').conat(); kv = require('@cocalc/backend/conat/sync').akv({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'a.txt', client})


client = await require('@cocalc/backend/conat').conat(); kv = await require('@cocalc/backend/conat/sync').dkv({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'a1', client})


client = await require('@cocalc/backend/conat').conat(); s = await require('@cocalc/conat/sync/core-stream').cstream({name:'d.txt',client})


*/

import { type Client, ConatError } from "@cocalc/conat/core/client";
import {
  type ConatSocketServer,
  type ServerSocket,
} from "@cocalc/conat/socket";
import { getLogger } from "@cocalc/conat/client";
import type {
  StoredMessage,
  PersistentStream,
  StorageOptions,
} from "./storage";
import { getStream, SERVICE, MAX_PER_USER, MAX_GLOBAL, RESOURCE } from "./util";
import { throttle } from "lodash";
import { type SetOptions } from "./client";
import { once } from "@cocalc/util/async-utils";
import { UsageMonitor } from "@cocalc/conat/monitor/usage";

const logger = getLogger("persist:server");

// When sending a large number of messages for
// getAll or change updates, we combine together messages
// until hitting this size, then send them all at once.
// This bound is to avoid potentially using a huge amount of RAM
// when streaming a large saved database to the client.
// Note: if a single message is larger than this, it still
// gets sent, just individually.
const DEFAULT_MESSAGES_THRESH = 20 * 1e6;
//const DEFAULT_MESSAGES_THRESH = 1e5;

// I added an experimental way to run any sqlite query...
// ...but it is disabled, since of course there are major DOS and
// security concerns.
const ENABLE_SQLITE_GENERAL_QUERIES = false;

// Wait (in ms) passed to lodash throttle for flushing queued changefeed
// messages to a socket.
const SEND_THROTTLE = 30;

// Best-effort read of the `code` property of a thrown value. A thrown value
// can be anything, including null or undefined, in which case a plain
// `err.code` would itself throw from inside the catch block.
// Returns `any` so the result can be passed on wherever the previous
// untyped `err.code` was used.
function getErrorCode(err: unknown): any {
  return (err as { code?: unknown } | null | undefined)?.code;
}

/**
 * Create a persist server: listen for socket connections on `${service}.*`
 * and serve stream storage commands (set, setMany, get, delete, getAll,
 * keys, config, inventory, changefeed, serverId, sqlite) on each socket.
 *
 * The first "data" message on a socket initializes the stream for that
 * socket (using its `storage` and `changefeed` fields); requests that arrive
 * earlier wait for the "stream-initialized" event.
 *
 * @param client - conat client whose `socket.listen` is used to accept connections.
 * @param messagesThresh - batching threshold handed to getAll and the
 *   changefeed; batches are flushed once the accumulated `raw` length
 *   reaches it.
 * @param service - subject prefix to listen under.
 * @returns the listening socket server.
 * @throws Error if `client` is null or undefined.
 */
export function server({
  client,
  messagesThresh = DEFAULT_MESSAGES_THRESH,
  service = SERVICE,
}: {
  client: Client;
  messagesThresh?: number;
  service?: string;
}): ConatSocketServer {
  logger.debug("server: creating persist server");
  if (client == null) {
    throw Error("client must be specified");
  }
  const subject = `${service}.*`;
  const server: ConatSocketServer = client.socket.listen(subject);
  logger.debug("server: listening on ", { subject });
  const usage = new UsageMonitor({
    maxPerUser: MAX_PER_USER,
    max: MAX_GLOBAL,
    resource: RESOURCE,
    log: (...args) => {
      logger.debug(RESOURCE, ...args);
    },
  });
  server.on("close", () => {
    usage.close();
  });

  server.on("connection", (socket: ServerSocket) => {
    logger.debug("server: got new connection", {
      id: socket.id,
      subject: socket.subject,
    });
    // Sticky initialization failure; once set, every request is answered
    // with this error.
    let error = "";
    let errorCode: any = undefined;
    let changefeed = false;
    let storage: undefined | StorageOptions = undefined;
    let stream: undefined | PersistentStream = undefined;
    let user = "";
    // true while this socket is counted in the usage monitor
    let added = false;
    // true while a getStream call for this socket is in flight
    let initializing = false;
    // true once the socket's "closed" event has fired
    let closed = false;

    socket.on("data", async (data) => {
      // Only one initialization may run at a time: a second "data" message
      // arriving while getStream is pending must not register usage or open
      // the stream a second time.
      if (stream != null || initializing || closed) {
        return;
      }
      initializing = true;
      try {
        // Reading the payload inside the try means a malformed (e.g. null)
        // message produces an error reply rather than an unhandled
        // rejection of this async listener.
        storage = data.storage;
        changefeed = data.changefeed;
        user = socket.subject.split(".")[1];
        if (!added) {
          usage.add(user);
          added = true;
        }
        const opened = await getStream({
          subject: socket.subject,
          storage,
          service,
        });
        // `closed` may have flipped while we were awaiting: the "closed"
        // handler has then already run and cannot close this stream, so do
        // it here.
        if (closed) {
          opened.close();
          return;
        }
        stream = opened;
        if (changefeed) {
          startChangefeed({ socket, stream, messagesThresh });
        }
        socket.emit("stream-initialized");
      } catch (err) {
        error = `${err}`;
        errorCode = getErrorCode(err);
        if (!closed) {
          socket.write(null, { headers: { error, code: errorCode } });
        }
      } finally {
        initializing = false;
      }
    });

    socket.on("closed", () => {
      logger.debug("socket closed", socket.subject);
      closed = true;
      storage = undefined;
      stream?.close();
      stream = undefined;
      if (added) {
        usage.delete(user);
        // never delete twice for one add
        added = false;
      }
    });

    socket.on("request", async (mesg) => {
      const request = mesg.headers;

      try {
        if (error) {
          throw new ConatError(error, { code: errorCode });
        }
        if (stream == null) {
          await once(socket, "stream-initialized", request.timeout ?? 30000);
        }
        if (stream == null) {
          throw Error("bug");
        }
        switch (request.cmd) {
          case "set":
            mesg.respondSync(
              stream.set({
                key: request.key,
                previousSeq: request.previousSeq,
                raw: mesg.raw,
                ttl: request.ttl,
                encoding: mesg.encoding,
                headers: request.headers,
                msgID: request.msgID,
              }),
            );
            break;

          case "setMany": {
            // just like set except the main data of the mesg
            // has an array of set operations; a failure of one operation
            // is reported in its slot and does not abort the others.
            const resp: (
              | { seq: number; time: number }
              | { error: string; code?: any }
            )[] = [];
            for (const {
              key,
              previousSeq,
              ttl,
              msgID,
              messageData,
            } of mesg.data as SetOptions[]) {
              try {
                resp.push(
                  stream.set({
                    key,
                    previousSeq,
                    ttl,
                    headers: messageData.headers,
                    msgID,
                    raw: messageData.raw,
                    encoding: messageData.encoding,
                  }),
                );
              } catch (err) {
                resp.push({ error: `${err}`, code: getErrorCode(err) });
              }
            }
            mesg.respondSync(resp);
            break;
          }

          case "delete":
            mesg.respondSync(stream.delete(request));
            break;

          case "config":
            mesg.respondSync(stream.config(request.config));
            break;

          case "inventory":
            mesg.respondSync(stream.inventory());
            break;

          case "get": {
            const resp = stream.get({ key: request.key, seq: request.seq });
            if (resp == null) {
              mesg.respondSync(null);
            } else {
              // the payload travels as raw bytes; seq/time/key ride along
              // in the headers
              const { raw, encoding, headers, seq, time, key } = resp;
              mesg.respondSync(null, {
                raw,
                encoding,
                headers: { ...headers, seq, time, key },
              });
            }
            break;
          }

          case "keys":
            mesg.respondSync(stream.keys());
            break;

          case "sqlite":
            if (!ENABLE_SQLITE_GENERAL_QUERIES) {
              throw Error("sqlite command not currently supported");
            }
            mesg.respondSync(stream.sqlite(request.statement, request.params));
            break;

          case "serverId":
            mesg.respondSync(server.id);
            break;

          case "getAll":
            logger.debug("getAll", { subject: socket.subject, request });
            // getAll uses requestMany which responds with all matching messages,
            // so no call to mesg.respond here.
            void getAll({ stream, mesg, request, messagesThresh });
            break;

          case "changefeed":
            logger.debug("changefeed", changefeed);
            if (!changefeed) {
              changefeed = true;
              startChangefeed({ socket, stream, messagesThresh });
            }
            mesg.respondSync("created");
            break;

          default:
            mesg.respondSync(null, {
              headers: { error: `unknown command ${request.cmd}`, code: 404 },
            });
        }
      } catch (err) {
        mesg.respondSync(null, {
          headers: { error: `${err}`, code: getErrorCode(err) },
        });
      }
    });
  });

  return server;
}

// Answer a "getAll" request with a sequence of responses: zero or more
// batches of stored messages selected by request.start_seq / request.end_seq,
// followed by one final response with no messages (success) or with an
// `error` header (failure). Every response carries an increasing `seq`
// header, which is unrelated to the seq of the stored messages.
async function getAll({ stream, mesg, request, messagesThresh }) {
  let seq = 0;
  const respond = (
    messages?: StoredMessage[],
    error?: string,
    code?: unknown,
  ) => {
    mesg.respondSync(messages, { headers: { error, seq, code } });
    seq += 1;
  };

  try {
    let batch: StoredMessage[] = [];
    let size = 0;
    for (const message of stream.getAll({
      start_seq: request.start_seq,
      end_seq: request.end_seq,
    })) {
      batch.push(message);
      // tolerate messages without a raw payload, as the changefeed does
      size += message.raw?.length ?? 0;
      if (size >= messagesThresh) {
        respond(batch);
        // start a fresh array rather than truncating the one just handed off
        batch = [];
        size = 0;
      }
    }

    if (batch.length > 0) {
      respond(batch);
    }
    // successful finish
    respond();
  } catch (err) {
    // forward the error's code too, like every other error reply in this file
    respond(undefined, `${err}`, getErrorCode(err));
  }
}

// Forward every "change" event of the stream to the socket. Changes are
// queued and flushed by a throttled function (leading and trailing edge),
// grouped into batches that are written once their accumulated raw size
// reaches messagesThresh. Nothing is written once socket.state is "closed".
function startChangefeed({ socket, stream, messagesThresh }) {
  logger.debug("startChangefeed", { subject: socket.subject });
  // this seq here has nothing to do with the seq of the StoredMessage!
  let seq = 0;
  const respond = (messages: StoredMessage[]) => {
    if (socket.state == "closed") {
      return;
    }
    socket.write(messages, { headers: { error: undefined, seq } });
    seq += 1;
  };

  const unsentMessages: StoredMessage[] = [];
  const sendAllUnsentMessages = throttle(
    () => {
      let batch: StoredMessage[] = [];
      let size = 0;
      while (unsentMessages.length > 0 && socket.state != "closed") {
        const message = unsentMessages.shift();
        if (message == null) {
          continue;
        }
        // e.g. op:'delete' messages have length 0 and no raw field
        size += message.raw?.length ?? 0;
        batch.push(message);
        if (size >= messagesThresh) {
          respond(batch);
          // start a fresh array rather than truncating the one just written
          batch = [];
          size = 0;
        }
      }
      if (batch.length > 0) {
        respond(batch);
      }
    },
    SEND_THROTTLE,
    { leading: true, trailing: true },
  );

  stream.on("change", (message) => {
    if (socket.state == "closed") {
      return;
    }
    unsentMessages.push(message);
    sendAllUnsentMessages();
  });
}