// Path: blob/master/src/packages/conat/persist/server.ts
// 5819 views
/*
CONAT_SERVER=http://localhost:3000 node

// making a server from scratch

// initialize persist context

require('@cocalc/backend/conat/persist');

// a conat server and client
s = require('@cocalc/conat/core/server').init({port:4567, getUser:()=>{return {hub_id:'hub'}}}); client = s.client();

// persist server
p = require('@cocalc/conat/persist/server').server({client}); 0;



// a client for persist server

c = require('@cocalc/conat/persist/client').stream({client, user:{hub_id:'hub'}, storage:{path:'b.txt'}});

for await (x of await c.getAll()) { console.log(x) }


await c.set({messageData:client.message(123)})

for await (x of await c.getAll()) { console.log(x) }

[ { seq: 1, time: 1750218209211, encoding: 0, raw: <Buffer 7b> } ]

(await c.get({seq:5})).data

await c.set({key:'foo', messageData:client.message('bar')})
(await c.get({key:'foo'})).data

await c.delete({seq:6})


client = await require('@cocalc/backend/conat').conat(); kv = require('@cocalc/backend/conat/sync').akv({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'a.txt', client})

client = await require('@cocalc/backend/conat').conat(); s = require('@cocalc/backend/conat/sync').astream({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'b.txt', client})

client = await require('@cocalc/backend/conat').conat(); s = await require('@cocalc/backend/conat/sync').dstream({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'ds2.txt', client})


client = await require('@cocalc/backend/conat').conat(); kv = require('@cocalc/backend/conat/sync').akv({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'a.txt', client})


client = await require('@cocalc/backend/conat').conat(); kv = await require('@cocalc/backend/conat/sync').dkv({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'a1', client})


client = await require('@cocalc/backend/conat').conat(); s = await require('@cocalc/conat/sync/core-stream').cstream({name:'d.txt',client})


*/

import { type Client, ConatError } from "@cocalc/conat/core/client";
import {
  type ConatSocketServer,
  type ServerSocket,
} from "@cocalc/conat/socket";
import type {
  StoredMessage,
  PersistentStream,
  StorageOptions,
} from "./storage";
import { getStream, SERVICE, MAX_PER_USER, MAX_GLOBAL, RESOURCE } from "./util";
import { throttle } from "lodash";
import { type SetOptions } from "./client";
import { once } from "@cocalc/util/async-utils";
import { UsageMonitor } from "@cocalc/conat/monitor/usage";
import { getLogger } from "@cocalc/conat/client";
import { initLoadBalancer } from "./load-balancer";

const logger = getLogger("persist:server");

// When sending a large number of messages for
// getAll or change updates, we combine together messages
// until hitting this size, then send them all at once.
// This bound is to avoid potentially using a huge amount of RAM
// when streaming a large saved database to the client.
// Note: if a single message is larger than this, it still
// gets sent, just individually.
const DEFAULT_MESSAGES_THRESH = 20 * 1e6;
//const DEFAULT_MESSAGES_THRESH = 1e5;

// I added an experimental way to run any sqlite query... but it is disabled
// since of course there are major DOS and security concerns.
const ENABLE_SQLITE_GENERAL_QUERIES = false;

// Wait (in ms) passed to lodash throttle for flushing queued changefeed messages.
const SEND_THROTTLE = 30;

/**
 * Create a persist server that listens for socket connections on the
 * subject `${service}.*`.
 *
 * For each connection, the first "data" message supplies the storage options
 * (and whether a changefeed is wanted); the backing stream is opened then.
 * After that, "request" messages are dispatched on `headers.cmd`:
 * set, setMany, delete, config, inventory, get, keys, sqlite (disabled),
 * serverId, getAll and changefeed.  Failures are reported to the client via
 * `error`/`code` response headers rather than thrown.
 *
 * @param client - conat client used to listen for sockets (required)
 * @param messagesThresh - byte threshold at which batched messages are flushed
 * @param service - service name used as the subject prefix
 * @param id - id of this server, passed to `client.socket.listen`
 * @param clusterMode - if falsy, a load balancer that only knows about this
 *   server is started alongside it
 * @returns the listening socket server
 * @throws Error if `client` is not specified
 */
export function server({
  client,
  messagesThresh = DEFAULT_MESSAGES_THRESH,
  service = SERVICE,
  id = "0",
  clusterMode,
}: {
  client: Client;
  messagesThresh?: number;
  service?: string;
  id?: string;
  // if false, runs its own internal load balancer that always returns this server
  clusterMode?: boolean;
}): ConatSocketServer {
  const log = (...args) => {
    logger.debug(id, service, ...args);
  };
  log("server: creating persist server");
  if (client == null) {
    throw Error("client must be specified");
  }
  const subject = `${service}.*`;
  const server: ConatSocketServer = client.socket.listen(subject, { id });
  log("server listening", { subject, id });
  if (!clusterMode) {
    log("persist server not in cluster mode, so starting own load balancer");
    initLoadBalancer({ service, ids: [id], client });
  }
  const usage = new UsageMonitor({
    maxPerUser: MAX_PER_USER,
    max: MAX_GLOBAL,
    resource: RESOURCE,
    log: (...args) => {
      log(RESOURCE, ...args);
    },
  });

  server.on("close", () => {
    log("stopping persist server", { id, service });
    usage.close();
  });

  server.on("connection", (socket: ServerSocket) => {
    log("server: got new connection", {
      id,
      service,
      subject: socket.subject,
    });
    // console.log(new Date(), "persist server got connection", {
    //   persist: client.info.id,
    //   subject: socket.subject,
    // });
    let error = "";
    let errorCode: any = undefined;
    let changefeed = false;
    let storage: undefined | StorageOptions = undefined;
    let stream: undefined | PersistentStream = undefined;
    let user = "";
    // true once usage.add(user) succeeded, so it is balanced by exactly one delete
    let added = false;
    // true while getStream is in flight, so a second "data" message cannot
    // start a second, concurrent initialization
    let initializing = false;
    // true once the socket emitted "closed"
    let closed = false;

    socket.on("data", async (data) => {
      // log("server: got data ", data);
      if (stream != null || initializing || closed) {
        return;
      }
      initializing = true;
      storage = data.storage;
      changefeed = data.changefeed;
      try {
        user = socket.subject.split(".")[1];
        if (!added) {
          usage.add(user);
          added = true;
        }
        const opened = await getStream({
          subject: socket.subject,
          storage,
          service,
        });
        if (closed) {
          // The socket closed while the stream was being opened; the "closed"
          // handler saw no stream, so release it here instead of leaking it.
          opened.close();
          return;
        }
        stream = opened;
        if (changefeed) {
          startChangefeed({ socket, stream, messagesThresh });
        }
        socket.emit("stream-initialized");
      } catch (err) {
        error = `${err}`;
        errorCode = err.code;
        socket.write(null, { headers: { error, code: errorCode } });
      } finally {
        initializing = false;
      }
    });

    socket.on("closed", () => {
      log("socket closed", id, socket.subject);
      closed = true;
      storage = undefined;
      stream?.close();
      stream = undefined;
      if (added) {
        usage.delete(user);
      }
    });

    socket.on("request", async (mesg) => {
      const request = mesg.headers;
      // log("got request", request);

      try {
        if (error) {
          throw new ConatError(error, { code: errorCode });
        }
        if (stream == null) {
          // requests may arrive before the first "data" message has finished
          // opening the stream, so wait for initialization
          await once(socket, "stream-initialized", request.timeout ?? 30000);
        }
        if (stream == null) {
          throw Error("bug");
        }
        if (request.cmd == "set") {
          mesg.respondSync(
            stream.set({
              key: request.key,
              previousSeq: request.previousSeq,
              raw: mesg.raw,
              ttl: request.ttl,
              encoding: mesg.encoding,
              headers: request.headers,
              msgID: request.msgID,
            }),
          );
        } else if (request.cmd == "setMany") {
          // just like set except the main data of the mesg
          // has an array of set operations
          const resp: (
            | { seq: number; time: number }
            | { error: string; code?: any }
          )[] = [];
          for (const {
            key,
            previousSeq,
            ttl,
            msgID,
            messageData,
          } of mesg.data as SetOptions[]) {
            try {
              resp.push(
                stream.set({
                  key,
                  previousSeq,
                  ttl,
                  headers: messageData.headers,
                  msgID,
                  raw: messageData.raw,
                  encoding: messageData.encoding,
                }),
              );
            } catch (err) {
              // one failed set is reported in place; the others still run
              resp.push({ error: `${err}`, code: err.code });
            }
          }
          mesg.respondSync(resp);
        } else if (request.cmd == "delete") {
          mesg.respondSync(stream.delete(request));
        } else if (request.cmd == "config") {
          mesg.respondSync(stream.config(request.config));
        } else if (request.cmd == "inventory") {
          mesg.respondSync(stream.inventory());
        } else if (request.cmd == "get") {
          const resp = stream.get({ key: request.key, seq: request.seq });
          //console.log("got resp = ", resp);
          if (resp == null) {
            mesg.respondSync(null);
          } else {
            const { raw, encoding, headers, seq, time, key } = resp;
            mesg.respondSync(null, {
              raw,
              encoding,
              headers: { ...headers, seq, time, key },
            });
          }
        } else if (request.cmd == "keys") {
          const resp = stream.keys();
          mesg.respondSync(resp);
        } else if (request.cmd == "sqlite") {
          if (!ENABLE_SQLITE_GENERAL_QUERIES) {
            throw Error("sqlite command not currently supported");
          }
          const resp = stream.sqlite(request.statement, request.params);
          mesg.respondSync(resp);
        } else if (request.cmd == "serverId") {
          mesg.respondSync(server.id);
        } else if (request.cmd == "getAll") {
          log("getAll", { subject: socket.subject, request });
          // getAll uses requestMany which responds with all matching messages,
          // so no call to mesg.respond here.
          getAll({ stream, mesg, request, messagesThresh });
        } else if (request.cmd == "changefeed") {
          log("changefeed", changefeed);
          if (!changefeed) {
            changefeed = true;
            startChangefeed({ socket, stream, messagesThresh });
          }
          mesg.respondSync("created");
        } else {
          mesg.respondSync(null, {
            headers: { error: `unknown command ${request.cmd}`, code: 404 },
          });
        }
      } catch (err) {
        mesg.respondSync(null, {
          headers: { error: `${err}`, code: err.code },
        });
      }
    });
  });

  return server;
}

/**
 * Respond to a getAll request with every stored message in the range
 * [request.start_seq, request.end_seq], sent as a sequence of batches.
 *
 * Each response carries a `seq` header counting responses from 0 (unrelated
 * to the seq of a StoredMessage).  A batch is flushed once the accumulated
 * raw size reaches `messagesThresh`.  A final response with no messages
 * signals successful completion; on failure a response whose `error` header
 * (and `code`, when the thrown error has one) is set is sent instead.
 */
async function getAll({ stream, mesg, request, messagesThresh }) {
  let seq = 0;
  const respond = (
    error?: string,
    messages?: StoredMessage[],
    code?: any,
  ) => {
    mesg.respondSync(messages, { headers: { error, seq, code } });
    seq += 1;
  };

  try {
    let batch: StoredMessage[] = [];
    let size = 0;
    for (const message of stream.getAll({
      start_seq: request.start_seq,
      end_seq: request.end_seq,
    })) {
      batch.push(message);
      // tolerate messages with no raw field, as the changefeed does
      size += message.raw?.length ?? 0;
      if (size >= messagesThresh) {
        respond(undefined, batch);
        // start a fresh array so a batch already handed off is never mutated
        batch = [];
        size = 0;
      }
    }

    if (batch.length > 0) {
      respond(undefined, batch);
    }
    // successful finish
    respond();
  } catch (err) {
    // the error is sent as a string, so its code has to be passed separately
    respond(`${err}`, undefined, err.code);
  }
}

/**
 * Forward "change" events of `stream` to `socket` as batched writes.
 *
 * Changes are queued and flushed by a throttled sender (at most once per
 * SEND_THROTTLE ms, on both leading and trailing edges); within a flush,
 * messages are grouped until their raw size reaches `messagesThresh`.
 * Each write carries a `seq` header counting writes from 0.
 *
 * When the socket emits "closed", the change listener is removed from the
 * stream, any pending flush is cancelled and the queue is dropped.
 * NOTE(review): this relies on `stream` being a Node EventEmitter
 * (it is already used via `stream.on`) -- confirm `removeListener` exists.
 */
function startChangefeed({ socket, stream, messagesThresh }) {
  logger.debug("startChangefeed", { subject: socket.subject });
  // this seq here has nothing to do with the seq of the StoredMessage!
  let seq = 0;
  const respond = (error?, messages?: StoredMessage[]) => {
    if (socket.state == "closed") {
      return;
    }
    //logger.debug("changefeed: writing messages to socket", { seq, messages });
    socket.write(messages, { headers: { error, seq } });
    seq += 1;
  };

  const unsentMessages: StoredMessage[] = [];
  const sendAllUnsentMessages = throttle(
    () => {
      let batch: StoredMessage[] = [];
      let size = 0;
      while (unsentMessages.length > 0 && socket.state != "closed") {
        const message = unsentMessages.shift()!;
        // e.g. op:'delete' messages have length 0 and no raw field
        size += message.raw?.length ?? 0;
        batch.push(message);
        if (size >= messagesThresh) {
          respond(undefined, batch);
          // start a fresh array so a batch already handed off is never mutated
          batch = [];
          size = 0;
        }
      }
      if (batch.length > 0) {
        respond(undefined, batch);
      }
    },
    SEND_THROTTLE,
    { leading: true, trailing: true },
  );

  const onChange = (message) => {
    if (socket.state == "closed") {
      return;
    }
    //console.log("stream change event", message);
    // logger.debug("changefeed got message", message, socket.state);
    unsentMessages.push(message);
    sendAllUnsentMessages();
  };
  stream.on("change", onChange);

  socket.on("closed", () => {
    // stop feeding a dead socket: detach from the stream and drop queued work
    stream.removeListener("change", onChange);
    sendAllUnsentMessages.cancel();
    unsentMessages.length = 0;
  });
}