Path: blob/master/src/packages/conat/persist/load-balancer.ts
2210 views
/*
The persist load balancer listens for requests on the subject

   {SERVICE}.{scope}.id

- SERVICE (e.g., 'persist')
- scope is typically hub or account-... or project-....
- id is literally that.

It then responds with a persist server id from the ones that got started.
The persist server id is just a function of the scope, i.e., we
just shard accounts and projects across the persist servers, and that's
it. The most important thing is that this assignment never changes
(unless you restart servers), because if two clients both fetch the
id for the same scope, they must get something on the same persist
server, since otherwise things could be out of sync.

If somehow two clients got different ids, that wouldn't corrupt data on disk.
We always run different persist servers on the same machine with the same disk,
so since data is written with sqlite, nothing will literally be out of sync,
since sqlite has locks and allows multiple processes to write to the same file.
However, the *change* events will not properly get sent out, and that will break
collaborative editing badly.
*/

import { type Client } from "@cocalc/conat/core/client";
import { getLogger } from "@cocalc/conat/client";
import { SERVICE } from "./util";
import { hash_string } from "@cocalc/util/misc";
import { delay } from "awaiting";

const logger = getLogger("persist:load-balancer");

/**
 * Start the persist load balancer in the background.
 *
 * Subscribes to `${service}.*.id` and answers every message with the
 * persist server id that getId computes from the message's subject.
 * The subscription runs in a detached async loop; if subscribing or
 * handling messages throws, the subscription is closed and recreated
 * after a 3 second pause.  This function returns immediately.
 *
 * @param client - conat client used to create the subscription
 * @param ids - the persist server ids to shard across; must be nonempty
 * @param service - first segment of the subject; defaults to SERVICE
 * @throws Error if `ids` is empty
 */
export function initLoadBalancer({
  client,
  ids,
  service = SERVICE,
}: {
  client: Client;
  ids: string[];
  service?: string;
}): void {
  if (ids.length === 0) {
    throw new Error("there must be at least 1 id");
  }

  const subject = `${service}.*.id`;

  // I don't think subscriptions ever randomly throw errors, but this
  // is so important I'm making it extra paranoid.  `void` marks the
  // promise as intentionally fire-and-forget.
  void (async () => {
    while (true) {
      // NOTE(review): left as `any` because the subscription type returned
      // by client.subscribe is not visible from this file.
      let sub: any = undefined;
      try {
        logger.debug("creating persist load balancer: ", { subject, ids });
        sub = await client.subscribe(subject);
        for await (const mesg of sub) {
          mesg.respondSync(getId(ids, mesg.subject));
        }
      } catch (err) {
        try {
          sub?.close();
        } catch (closeErr) {
          // A failing close() must not escape: it would reject this detached
          // async function and permanently stop the restart loop.
          logger.debug("ERROR closing subscription -- ", closeErr);
        }
        logger.debug("ERROR (restarting) -- ", err);
      }
      await delay(3000);
    }
  })();
}

/**
 * Map a subject to one of the given persist server ids.
 *
 * We use a hash so that this takes NO memory, but the assignment
 * lasts forever, which means our sharding by server doesn't
 * take into account load at all. This keeps things much, much
 * simpler, and should be fine in practice.
 *
 * Only the second dot-separated segment of the subject (the scope) is
 * hashed, so all subjects with the same scope map to the same id for a
 * fixed `ids` array.
 *
 * @param ids - candidate persist server ids; must be nonempty
 * @param subject - subject of the form `{service}.{scope}...`
 * @returns the id at index `|hash(scope)| % ids.length`
 * @throws Error if `ids` is empty (otherwise the index would be NaN and
 *   the result silently undefined)
 */
export function getId(ids: string[], subject: string): string {
  if (ids.length === 0) {
    throw new Error("there must be at least 1 id");
  }
  const scope = subject.split(".")[1];
  const h = Math.abs(hash_string(scope));
  const id = ids[h % ids.length];
  //logger.debug("getId", { ids, subject, id });
  return id;
}

/**
 * Ask the load balancer which persist server id handles a subject.
 *
 * Sends a request with a null payload to `{first}.{second}.id`, where
 * first and second are the first two dot-separated segments of `subject`.
 *
 * @param client - conat client used to make the request
 * @param subject - any subject starting with `{service}.{scope}`
 * @returns the `data` field of the response (presumably the id string
 *   sent by initLoadBalancer -- confirm against the client's types)
 */
export async function getPersistServerId({
  client,
  subject,
}: {
  client: Client;
  subject: string;
}) {
  // take only first two segments of subject, since it could have a bunch more
  // that we better ignore (e.g., from the client)
  const s = subject.split(".").slice(0, 2).join(".") + ".id";
  const resp = await client.request(s, null);
  return resp.data;
}