Path: blob/master/src/packages/conat/core/cluster.ts
import { type Client, connect } from "./client";
import { Patterns } from "./patterns";
import {
  updateInterest,
  type InterestUpdate,
} from "@cocalc/conat/core/server";
import type { DStream } from "@cocalc/conat/sync/dstream";
import { once } from "@cocalc/util/async-utils";
import { server as createPersistServer } from "@cocalc/conat/persist/server";
import { getLogger } from "@cocalc/conat/client";
import { hash_string } from "@cocalc/util/misc";

const CREATE_LINK_TIMEOUT = 45_000;

const logger = getLogger("conat:core:cluster");

// Connect to another server in the cluster at the given address using the
// system account password, wait until the client is signed in, then return a
// ClusterLink that mirrors that server's interest state.
export async function clusterLink(
  address: string,
  systemAccountPassword: string,
  timeout = CREATE_LINK_TIMEOUT,
) {
  const client = connect({ address, systemAccountPassword });
  if (client.info == null) {
    try {
      await client.waitUntilSignedIn({
        timeout: timeout ?? CREATE_LINK_TIMEOUT,
      });
    } catch (err) {
      client.close();
      throw err;
    }
    if (client.info == null) {
      // this is impossible
      throw Error("BUG -- failed to sign in");
    }
  }
  const { id, clusterName } = client.info;
  if (!id) {
    throw Error("id must be specified");
  }
  if (!clusterName) {
    throw Error("clusterName must be specified");
  }
  const link = new ClusterLink(client, id, clusterName, address);
  await link.init();
  return link;
}

export type Interest = Patterns<{ [queue: string]: Set<string> }>;

export { type ClusterLink };

class ClusterLink {
  public interest: Interest = new Patterns();
  private streams: ClusterStreams;
  private state: "init" | "ready" | "closed" = "init";
  private clientStateChanged = Date.now(); // when client status last changed

  constructor(
    public readonly client: Client,
    public readonly id: string,
    public readonly clusterName: string,
    public readonly address: string,
  ) {
    if (!client) {
      throw Error("client must be specified");
    }
    if (!clusterName) {
      throw Error("clusterName must be specified");
    }
    if (!id) {
      throw Error("id must be specified");
    }
  }

  init = async () => {
    this.client.on("connected", this.handleClientStateChanged);
    this.client.on("disconnected", this.handleClientStateChanged);
    this.streams = await clusterStreams({
      client: this.client,
      id: this.id,
      clusterName: this.clusterName,
    });
    for (const update of this.streams.interest.getAll()) {
      updateInterest(update, this.interest);
    }
    // I have a slight concern about this because updates might not
    // arrive in order during automatic failover.  That said, maybe
    // automatic failover doesn't matter with these streams, since
    // it shouldn't really happen -- each stream is served from the server
    // it is about, and when that server goes down none of this state
    // matters anymore.
    this.streams.interest.on("change", this.handleInterestUpdate);
    this.state = "ready";
  };

  isConnected = () => {
    return this.client.state == "connected";
  };

  handleInterestUpdate = (update: InterestUpdate) => {
    updateInterest(update, this.interest);
  };

  private handleClientStateChanged = () => {
    this.clientStateChanged = Date.now();
  };

  // How long (in ms) the client has been disconnected; 0 if connected.
  howLongDisconnected = () => {
    if (this.isConnected()) {
      return 0;
    }
    return Date.now() - this.clientStateChanged;
  };

  close = () => {
    if (this.state == "closed") {
      return;
    }
    this.state = "closed";
    this.client.removeListener("connected", this.handleClientStateChanged);
    this.client.removeListener("disconnected", this.handleClientStateChanged);
    if (this.streams != null) {
      this.streams.interest.removeListener("change", this.handleInterestUpdate);
      this.streams.interest.close();
      // @ts-ignore
      delete this.streams;
    }
    this.client.close();
    // @ts-ignore
    delete this.client;
  };

  hasInterest = (subject: string) => {
    return this.interest.hasMatch(subject);
  };

  waitForInterest = async (
    subject: string,
    timeout: number,
    signal?: AbortSignal,
  ) => {
    const hasMatch = this.interest.hasMatch(subject);

    if (hasMatch || !timeout) {
      // NOTE: we never return the actual matches, since this is a
      // potential security vulnerability.
      // It could make it very easy to figure out private inboxes, etc.
      return hasMatch;
    }
    const start = Date.now();
    while (this.state != "closed" && !signal?.aborted) {
      if (Date.now() - start >= timeout) {
        throw Error("timeout");
      }
      await once(this.interest, "change");
      if ((this.state as any) == "closed" || signal?.aborted) {
        return false;
      }
      const hasMatch = this.interest.hasMatch(subject);
      if (hasMatch) {
        return true;
      }
    }

    return false;
  };

  hash = (): { interest: number } => {
    return {
      interest: hashInterest(this.interest),
    };
  };
}

function clusterStreamNames({
  clusterName,
  id,
}: {
  clusterName: string;
  id: string;
}) {
  return {
    interest: `cluster/${clusterName}/${id}/interest`,
  };
}

export function clusterService({
  id,
  clusterName,
}: {
  id: string;
  clusterName: string;
}) {
  return `persist:${clusterName}:${id}`;
}

export async function createClusterPersistServer({
  client,
  id,
  clusterName,
}: {
  client: Client;
  id: string;
  clusterName: string;
}) {
  const service = clusterService({ clusterName, id });
  logger.debug("createClusterPersistServer: ", { service });
  return await createPersistServer({ client, service });
}

export interface ClusterStreams {
  interest: DStream<InterestUpdate>;
}

export async function clusterStreams({
  client,
  clusterName,
  id,
}: {
  client: Client;
  clusterName: string;
  id: string;
}): Promise<ClusterStreams> {
  logger.debug("clusterStream: ", { clusterName, id });
  if (!clusterName) {
    throw Error("clusterName must be set");
  }
  const names = clusterStreamNames({ clusterName, id });
  const opts = {
    service: clusterService({ clusterName, id }),
    noCache: true,
    ephemeral: true,
  };
  const interest = await client.sync.dstream<InterestUpdate>({
    noInventory: true,
    name: names.interest,
    ...opts,
  });
  logger.debug("clusterStreams: got them", { clusterName });
  return { interest };
}

// Periodically delete unnecessary updates from the interest stream.
export async function trimClusterStreams(
  streams: ClusterStreams,
  data: {
    interest: Patterns<{ [queue: string]: Set<string> }>;
    links: { interest: Patterns<{ [queue: string]: Set<string> }> }[];
  },
  // don't delete anything that isn't at least minAge ms old.
  minAge: number,
): Promise<{ seqsInterest: number[] }> {
  const { interest } = streams;
  // First deal with interest:
  // we iterate over the interest stream checking for subjects
  // with no current interest at all; in such cases it is safe
  // to purge them entirely from the stream.
  const seqs: number[] = [];
  const now = Date.now();
  for (let n = 0; n < interest.length; n++) {
    const time = interest.time(n);
    if (time == null) continue;
    if (now - time.valueOf() <= minAge) {
      break;
    }
    const update = interest.get(n) as InterestUpdate;
    if (!data.interest.hasPattern(update.subject)) {
      const seq = interest.seq(n);
      if (seq != null) {
        seqs.push(seq);
      }
    }
  }
  if (seqs.length > 0) {
    // [ ] todo -- add to interest.delete a version where it takes an array of sequence numbers
    logger.debug("trimClusterStream: trimming interest", { seqs });
    await interest.delete({ seqs });
    logger.debug("trimClusterStream: successfully trimmed interest", { seqs });
  }
  return { seqsInterest: seqs };
}

function hashSet(X: Set<string>): number {
  let h = 0;
  for (const a of X) {
    h += hash_string(a); // integers, and not too many, so should commute
  }
  return h;
}

function hashInterestValue(X: { [queue: string]: Set<string> }): number {
  let h = 0;
  for (const queue in X) {
    h += hashSet(X[queue]); // integers, and not too many, so should commute
  }
  return h;
}

// Order-independent hash of an interest Patterns object.
export function hashInterest(
  interest: Patterns<{ [queue: string]: Set<string> }>,
): number {
  return interest.hash(hashInterestValue);
}