Path: blob/master/src/packages/conat/persist/client.ts
5805 views
import {1type Message as ConatMessage,2type Client,3type MessageData,4ConatError,5} from "@cocalc/conat/core/client";6import { type ConatSocketClient } from "@cocalc/conat/socket";7import { EventIterator } from "@cocalc/util/event-iterator";8import type {9StorageOptions,10Configuration,11SetOperation,12DeleteOperation,13StoredMessage,14PartialInventory,15} from "./storage";16export { StoredMessage, StorageOptions };17import { persistSubject, SERVICE, type User } from "./util";18import { assertHasWritePermission as assertHasWritePermission0 } from "./auth";19import { refCacheSync } from "@cocalc/util/refcache";20import { EventEmitter } from "events";21import { getLogger } from "@cocalc/conat/client";22import { until } from "@cocalc/util/async-utils";23import { getPersistServerId } from "./load-balancer";2425let DEFAULT_RECONNECT_DELAY = 1500;2627export function setDefaultReconnectDelay(delay) {28DEFAULT_RECONNECT_DELAY = delay;29}3031interface GetAllOpts {32start_seq?: number;33end_seq?: number;34timeout?: number;35maxWait?: number;36}3738const logger = getLogger("persist:client");3940export type ChangefeedEvent = (SetOperation | DeleteOperation)[];41export type Changefeed = EventIterator<ChangefeedEvent>;4243export { type PersistStreamClient };44class PersistStreamClient extends EventEmitter {45public socket: ConatSocketClient;46private changefeeds: any[] = [];47private state: "ready" | "closed" = "ready";48private lastSeq?: number;49private reconnecting = false;50private gettingMissed = false;51private changesWhenGettingMissed: ChangefeedEvent[] = [];52private reconnectTimer?: ReturnType<typeof setTimeout>;5354constructor(55private client: Client,56private storage: StorageOptions,57private user: User,58private service = SERVICE,59) {60super();61this.setMaxListeners(100);62// paths.add(this.storage.path);63logger.debug("constructor", this.storage);64this.init();65}6667private init = () => {68if (this.client.state == "closed") {69this.close();70return;71}72if (this.isClosed()) {73return;74}75this.socket?.close();76const subject = persistSubject({ ...this.user, service: this.service });77this.socket = this.client.socket.connect(subject, {78desc: `persist: ${this.storage.path}`,79reconnection: false,80loadBalancer: async (subject: string) =>81await getPersistServerId({ client: this.client, subject }),82});83logger.debug("init", this.storage.path, "connecting to ", subject);84this.socket.write({85storage: this.storage,86changefeed: this.changefeeds.length > 0,87});8889// get any messages from the stream that we missed while offline;90// this only matters if there are changefeeds.91if (this.reconnecting) {92this.getMissed();93}9495this.socket.once("disconnected", () => {96this.reconnecting = true;97this.socket.removeAllListeners();98this.scheduleReconnect();99});100this.socket.once("closed", () => {101this.reconnecting = true;102this.socket.removeAllListeners();103this.scheduleReconnect();104});105106this.socket.on("data", (updates, headers) => {107if (updates == null && headers != null) {108// has to be an error109this.emit(110"error",111new ConatError(headers?.error, { code: headers?.code }),112);113this.close();114return;115}116if (this.gettingMissed) {117this.changesWhenGettingMissed.push(updates);118} else {119this.changefeedEmit(updates);120}121});122};123124private getMissed = async () => {125if (this.changefeeds.length == 0 || this.state != "ready") {126return;127}128try {129this.gettingMissed = true;130this.changesWhenGettingMissed.length = 0;131132await until(133async () => {134if (this.changefeeds.length == 0 || this.state != "ready") {135return true;136}137try {138await this.socket.waitUntilReady(15000);139if (this.changefeeds.length == 0 || this.state != "ready") {140return true;141}142const resp = await this.socket.request(null, {143headers: {144cmd: "changefeed",145},146});147if (resp.headers?.error) {148throw new ConatError(`${resp.headers?.error}`, {149code: resp.headers?.code,150});151}152if (this.changefeeds.length == 0 || this.state != "ready") {153return true;154}155const updates = await this.getAll({156start_seq: this.lastSeq,157timeout: 15000,158});159this.changefeedEmit(updates);160return true;161} catch {162return false;163}164},165{ min: 2000, max: 15000 },166);167} finally {168if (this.state != "ready") {169return;170}171this.gettingMissed = false;172for (const updates of this.changesWhenGettingMissed) {173this.changefeedEmit(updates);174}175this.changesWhenGettingMissed.length = 0;176}177};178179private changefeedEmit = (updates: ChangefeedEvent) => {180updates = updates.filter((update) => {181if (update.op == "delete") {182return true;183} else {184if (update.seq > (this.lastSeq ?? 0)) {185this.lastSeq = update.seq;186return true;187}188}189return false;190});191if (updates.length == 0) {192return;193}194this.emit("changefeed", updates);195};196197private isClosed = () => this.state == "closed";198199private scheduleReconnect = () => {200if (this.state == "closed") {201return;202}203if (this.reconnectTimer != null) {204clearTimeout(this.reconnectTimer);205}206this.reconnectTimer = setTimeout(this.init, DEFAULT_RECONNECT_DELAY);207this.reconnectTimer.unref?.();208};209210close = () => {211logger.debug("close", this.storage);212// paths.delete(this.storage.path);213this.state = "closed";214this.emit("closed");215if (this.reconnectTimer != null) {216clearTimeout(this.reconnectTimer);217this.reconnectTimer = undefined;218}219for (const iter of this.changefeeds) {220iter.close();221this.changefeeds.length = 0;222}223this.socket.close();224};225226// The changefeed is *guaranteed* to deliver every message227// in the stream **exactly once and in order**, even if there228// are disconnects, failovers, etc. Dealing with dropped messages,229// duplicates, etc., is NOT the responsibility of clients.230changefeed = async (): Promise<Changefeed> => {231// activate changefeed mode (so server publishes updates -- this is idempotent)232const resp = await this.socket.request(null, {233headers: {234cmd: "changefeed",235},236});237if (resp.headers?.error) {238throw new ConatError(`${resp.headers?.error}`, {239code: resp.headers?.code,240});241}242// an iterator over any updates that are published.243const iter = new EventIterator<ChangefeedEvent>(this, "changefeed", {244map: (args) => args[0],245});246this.changefeeds.push(iter);247return iter;248};249250set = async ({251key,252ttl,253previousSeq,254msgID,255messageData,256timeout,257}: SetOptions & { timeout?: number }): Promise<{258seq: number;259time: number;260}> => {261return this.checkForError(262await this.socket.request(null, {263raw: messageData.raw,264encoding: messageData.encoding,265headers: {266headers: messageData.headers,267cmd: "set",268key,269ttl,270previousSeq,271msgID,272timeout,273},274timeout,275}),276);277};278279setMany = async (280ops: SetOptions[],281{ timeout }: { timeout?: number } = {},282): Promise<283({ seq: number; time: number } | { error: string; code?: any })[]284> => {285return this.checkForError(286await this.socket.request(ops, {287headers: {288cmd: "setMany",289timeout,290},291timeout,292}),293);294};295296delete = async ({297timeout,298seq,299seqs,300last_seq,301all,302}: {303timeout?: number;304seq?: number;305seqs?: number[];306last_seq?: number;307all?: boolean;308}): Promise<{ seqs: number[] }> => {309return this.checkForError(310await this.socket.request(null, {311headers: {312cmd: "delete",313seq,314seqs,315last_seq,316all,317timeout,318},319timeout,320}),321);322};323324config = async ({325config,326timeout,327}: {328config?: Partial<Configuration>;329timeout?: number;330} = {}): Promise<Configuration> => {331return this.checkForError(332await this.socket.request(null, {333headers: {334cmd: "config",335config,336timeout,337} as any,338timeout,339}),340);341};342343inventory = async (timeout?): Promise<PartialInventory> => {344return this.checkForError(345await this.socket.request(null, {346headers: {347cmd: "inventory",348} as any,349timeout,350}),351);352};353354get = async ({355seq,356key,357timeout,358}: {359timeout?: number;360} & (361| { seq: number; key?: undefined }362| { key: string; seq?: undefined }363)): Promise<ConatMessage | undefined> => {364const resp = await this.socket.request(null, {365headers: { cmd: "get", seq, key, timeout } as any,366timeout,367});368this.checkForError(resp, true);369if (resp.headers == null) {370return undefined;371}372return resp;373};374375// returns async iterator over arrays of stored messages.376// It's must safer to use getAll below, but less memory377// efficient.378async *getAllIter({379start_seq,380end_seq,381timeout,382maxWait,383}: GetAllOpts = {}): AsyncGenerator<StoredMessage[], void, unknown> {384if (this.isClosed()) {385// done386return;387}388const sub = await this.socket.requestMany(null, {389headers: {390cmd: "getAll",391start_seq,392end_seq,393timeout,394} as any,395timeout,396maxWait,397});398if (this.isClosed()) {399// done with this400return;401}402let seq = 0; // next expected seq number for the sub (not the data)403for await (const { data, headers } of sub) {404if (headers?.error) {405throw new ConatError(`${headers.error}`, { code: headers.code });406}407if (data == null || this.socket.state == "closed") {408// done409return;410}411if (typeof headers?.seq != "number" || headers?.seq != seq) {412throw new ConatError(413`data dropped, probably due to load -- please try again; expected seq=${seq}, but got ${headers?.seq}`,414{415code: 503,416},417);418} else {419seq = headers?.seq + 1;420}421yield data;422}423}424425getAll = async (opts: GetAllOpts = {}): Promise<StoredMessage[]> => {426// NOTE: We check messages.headers.seq (which has nothing to do with the stream seq numbers!)427// and make sure it counts from 0 up until done, and that nothing was missed.428// ONLY once that is done and we have everything do we call processPersistentMessages.429// Otherwise, just wait and try again from scratch. There's no socket or430// any other guarantees that messages aren't dropped since this is requestMany,431// and under load DEFINITELY messages can be dropped.432// This throws with code=503 if something goes wrong due to sequence numbers.433let messages: StoredMessage[] = [];434const sub = await this.getAllIter(opts);435if (this.isClosed()) {436throw Error("closed");437}438for await (const value of sub) {439messages = messages.concat(value);440}441if (this.isClosed()) {442throw Error("closed");443}444return messages;445};446447keys = async ({ timeout }: { timeout?: number } = {}): Promise<string[]> => {448return this.checkForError(449await this.socket.request(null, {450headers: { cmd: "keys", timeout } as any,451timeout,452}),453);454};455456sqlite = async ({457timeout,458statement,459params,460}: {461timeout?: number;462statement: string;463params?: any[];464}): Promise<any[]> => {465return this.checkForError(466await this.socket.request(null, {467headers: {468cmd: "sqlite",469statement,470params,471} as any,472timeout,473}),474);475};476477private checkForError = (mesg, noReturn = false) => {478if (mesg.headers != null) {479const { error, code } = mesg.headers;480if (error || code) {481throw new ConatError(error ?? "error", { code });482}483}484if (!noReturn) {485return mesg.data;486}487};488489// id of the remote server we're connected to490serverId = async () => {491return this.checkForError(492await this.socket.request(null, {493headers: { cmd: "serverId" },494}),495);496};497}498499export interface SetOptions {500messageData: MessageData;501key?: string;502ttl?: number;503previousSeq?: number;504msgID?: string;505timeout?: number;506}507508interface Options {509client: Client;510// who is accessing persistent storage511user: User;512// what storage they are accessing513storage: StorageOptions;514noCache?: boolean;515service?: string;516}517518export const stream = refCacheSync<Options, PersistStreamClient>({519name: "persistent-stream-client",520createKey: ({ user, storage, client, service = SERVICE }: Options) => {521return JSON.stringify([user, storage, client.id, service]);522},523createObject: ({ client, user, storage, service = SERVICE }: Options) => {524// avoid wasting server resources, etc., by always checking permissions client side first525assertHasWritePermission({ user, storage, service });526return new PersistStreamClient(client, storage, user, service);527},528});529530let permissionChecks = true;531export function disablePermissionCheck() {532if (!process.env.COCALC_TEST_MODE) {533throw Error("disabling permission check only allowed in test mode");534}535permissionChecks = false;536}537538const assertHasWritePermission = ({ user, storage, service }) => {539if (!permissionChecks) {540// should only be used for unit testing, since otherwise would541// make clients slower and possibly increase server load.542return;543}544const subject = persistSubject({ ...user, service });545assertHasWritePermission0({ subject, path: storage.path, service });546};547548549