// Path: blob/master/src/packages/conat/persist/client.ts
// 1710 views
import {
  type Message as ConatMessage,
  type Client,
  type MessageData,
  ConatError,
} from "@cocalc/conat/core/client";
import { type ConatSocketClient } from "@cocalc/conat/socket";
import { EventIterator } from "@cocalc/util/event-iterator";
import type {
  StorageOptions,
  Configuration,
  SetOperation,
  DeleteOperation,
  StoredMessage,
  PartialInventory,
} from "./storage";
export { StoredMessage, StorageOptions };
import { persistSubject, SERVICE, type User } from "./util";
import { assertHasWritePermission as assertHasWritePermission0 } from "./auth";
import { refCacheSync } from "@cocalc/util/refcache";
import { EventEmitter } from "events";
import { getLogger } from "@cocalc/conat/client";
import { until } from "@cocalc/util/async-utils";

// Delay in ms between a socket disconnect/close and the next call to init().
let DEFAULT_RECONNECT_DELAY = 1500;

/**
 * Override the delay (in milliseconds) used before a PersistStreamClient
 * tries to reconnect after its socket is disconnected or closed.
 */
export function setDefaultReconnectDelay(delay: number) {
  DEFAULT_RECONNECT_DELAY = delay;
}

/** Options accepted by getAll and getAllIter. */
interface GetAllOpts {
  start_seq?: number;
  end_seq?: number;
  timeout?: number;
  maxWait?: number;
}

const logger = getLogger("persist:client");

export type ChangefeedEvent = (SetOperation | DeleteOperation)[];
export type Changefeed = EventIterator<ChangefeedEvent>;

export { type PersistStreamClient };
/**
 * Client for one persistent stream, talking to the persist service over a
 * conat socket.
 *
 * Events emitted by this object (see the code below):
 *   - "changefeed": a non-empty array of set/delete operations
 *   - "error": a ConatError, when the server sends headers with no data
 *   - "closed": when close() is called
 */
class PersistStreamClient extends EventEmitter {
  public socket: ConatSocketClient;
  // iterators handed out by changefeed(); closed in close()
  private changefeeds: any[] = [];
  private state: "ready" | "closed" = "ready";
  // largest seq of a non-delete update emitted so far (see changefeedEmit)
  private lastSeq?: number;
  // set once the socket has disconnected/closed; makes the next init()
  // call getMissed()
  private reconnecting = false;
  // while true, incoming updates are buffered in changesWhenGettingMissed
  // instead of being emitted immediately
  private gettingMissed = false;
  private changesWhenGettingMissed: ChangefeedEvent[] = [];

  constructor(
    private client: Client,
    private storage: StorageOptions,
    private user: User,
    private service = SERVICE,
  ) {
    super();
    this.setMaxListeners(100);
    // paths.add(this.storage.path);
    logger.debug("constructor", this.storage);
    this.init();
  }

  /**
   * (Re)connect the socket to the persist service and install the
   * handlers for reconnecting and for incoming data.  Called from the
   * constructor and again, after DEFAULT_RECONNECT_DELAY, whenever the
   * socket is disconnected or closed.
   */
  private init = () => {
    if (this.client.state == "closed") {
      this.close();
      return;
    }
    if (this.isClosed()) {
      return;
    }
    // close the previous socket, if there is one
    this.socket?.close();
    const subject = persistSubject({ ...this.user, service: this.service });
    this.socket = this.client.socket.connect(subject, {
      desc: `persist: ${this.storage.path}`,
      // reconnecting is handled here (by calling init again), not by the socket
      reconnection: false,
    });
    logger.debug("init", this.storage.path, "connecting to ", subject);
    this.socket.write({
      storage: this.storage,
      changefeed: this.changefeeds.length > 0,
    });

    // get any messages from the stream that we missed while offline;
    // this only matters if there are changefeeds.
    if (this.reconnecting) {
      this.getMissed();
    }

    // Both handlers remove all listeners from the socket, so at most one
    // of them schedules the next init().
    this.socket.once("disconnected", () => {
      this.reconnecting = true;
      this.socket.removeAllListeners();
      setTimeout(this.init, DEFAULT_RECONNECT_DELAY);
    });
    this.socket.once("closed", () => {
      this.reconnecting = true;
      this.socket.removeAllListeners();
      setTimeout(this.init, DEFAULT_RECONNECT_DELAY);
    });

    this.socket.on("data", (updates, headers) => {
      if (updates == null && headers != null) {
        // has to be an error
        this.emit(
          "error",
          new ConatError(headers?.error, { code: headers?.code }),
        );
        this.close();
        return;
      }
      if (this.gettingMissed) {
        // hold these back until getMissed has emitted the older messages
        this.changesWhenGettingMissed.push(updates);
      } else {
        this.changefeedEmit(updates);
      }
    });
  };

  /**
   * After a reconnect: re-enable the changefeed on the server, fetch
   * everything starting at lastSeq, emit it, and then emit any updates
   * that arrived on the socket in the meantime.  Retries (via `until`)
   * until it succeeds, there are no changefeeds, or this client is no
   * longer in the "ready" state.
   */
  private getMissed = async () => {
    if (this.changefeeds.length == 0 || this.state != "ready") {
      return;
    }
    try {
      this.gettingMissed = true;
      this.changesWhenGettingMissed.length = 0;

      await until(
        async () => {
          if (this.changefeeds.length == 0 || this.state != "ready") {
            return true;
          }
          try {
            await this.socket.waitUntilReady(15000);
            if (this.changefeeds.length == 0 || this.state != "ready") {
              return true;
            }
            const resp = await this.socket.request(null, {
              headers: {
                cmd: "changefeed",
              },
            });
            if (resp.headers?.error) {
              throw new ConatError(`${resp.headers?.error}`, {
                code: resp.headers?.code,
              });
            }
            if (this.changefeeds.length == 0 || this.state != "ready") {
              return true;
            }
            const updates = await this.getAll({
              start_seq: this.lastSeq,
              timeout: 15000,
            });
            this.changefeedEmit(updates);
            return true;
          } catch (err) {
            // best effort: report the failure and let `until` try again
            logger.debug("getMissed: attempt failed -- will retry", err);
            return false;
          }
        },
        { min: 2000, max: 15000 },
      );
    } finally {
      if (this.state != "ready") {
        return;
      }
      this.gettingMissed = false;
      // flush what was buffered by the "data" handler while we were fetching;
      // changefeedEmit drops anything with seq <= lastSeq.
      for (const updates of this.changesWhenGettingMissed) {
        this.changefeedEmit(updates);
      }
      this.changesWhenGettingMissed.length = 0;
    }
  };

  /**
   * Emit "changefeed" with the given updates, after dropping every
   * non-delete update whose seq is not larger than lastSeq.  Delete
   * operations are always kept.  Advances lastSeq as a side effect.
   * Nothing is emitted if no updates remain.
   */
  private changefeedEmit = (updates: ChangefeedEvent) => {
    updates = updates.filter((update) => {
      if (update.op == "delete") {
        return true;
      } else {
        if (update.seq > (this.lastSeq ?? 0)) {
          this.lastSeq = update.seq;
          return true;
        }
      }
      return false;
    });
    if (updates.length == 0) {
      return;
    }
    this.emit("changefeed", updates);
  };

  private isClosed = () => this.state == "closed";

  /**
   * Mark this client closed, emit "closed", close every changefeed
   * iterator and close the socket.
   */
  close = () => {
    logger.debug("close", this.storage);
    // paths.delete(this.storage.path);
    this.state = "closed";
    this.emit("closed");
    // Detach the full list first and then close each iterator.  Truncating
    // the array inside the loop would end the iteration after the first
    // element and leave the remaining iterators open.
    const feeds = this.changefeeds.splice(0);
    for (const iter of feeds) {
      iter.close();
    }
    // init() may call close() before any socket was created (client already
    // closed), so the socket can be undefined here.
    this.socket?.close();
  };

  // The changefeed is *guaranteed* to deliver every message
  // in the stream **exactly once and in order**, even if there
  // are disconnects, failovers, etc.  Dealing with dropped messages,
  // duplicates, etc., is NOT the responsibility of clients.
  changefeed = async (): Promise<Changefeed> => {
    // activate changefeed mode (so server publishes updates -- this is idempotent)
    const resp = await this.socket.request(null, {
      headers: {
        cmd: "changefeed",
      },
    });
    if (resp.headers?.error) {
      throw new ConatError(`${resp.headers?.error}`, {
        code: resp.headers?.code,
      });
    }
    // an iterator over any updates that are published.
    const iter = new EventIterator<ChangefeedEvent>(this, "changefeed", {
      map: (args) => args[0],
    });
    this.changefeeds.push(iter);
    return iter;
  };

  /**
   * Send a "set" request carrying messageData; resolves to the data of
   * the response.  Throws ConatError if the response headers contain an
   * error or code.
   */
  set = async ({
    key,
    ttl,
    previousSeq,
    msgID,
    messageData,
    timeout,
  }: SetOptions & { timeout?: number }): Promise<{
    seq: number;
    time: number;
  }> => {
    return this.checkForError(
      await this.socket.request(null, {
        raw: messageData.raw,
        encoding: messageData.encoding,
        headers: {
          headers: messageData.headers,
          cmd: "set",
          key,
          ttl,
          previousSeq,
          msgID,
          timeout,
        },
        timeout,
      }),
    );
  };

  /**
   * Send several set operations in one "setMany" request; resolves to
   * the data of the response (one entry per operation, per the declared
   * return type).
   */
  setMany = async (
    ops: SetOptions[],
    { timeout }: { timeout?: number } = {},
  ): Promise<
    ({ seq: number; time: number } | { error: string; code?: any })[]
  > => {
    return this.checkForError(
      await this.socket.request(ops, {
        headers: {
          cmd: "setMany",
          timeout,
        },
        timeout,
      }),
    );
  };

  /**
   * Send a "delete" request with the given selectors (seq, seqs,
   * last_seq, all); resolves to the data of the response.
   */
  delete = async ({
    timeout,
    seq,
    seqs,
    last_seq,
    all,
  }: {
    timeout?: number;
    seq?: number;
    seqs?: number[];
    last_seq?: number;
    all?: boolean;
  }): Promise<{ seqs: number[] }> => {
    return this.checkForError(
      await this.socket.request(null, {
        headers: {
          cmd: "delete",
          seq,
          seqs,
          last_seq,
          all,
          timeout,
        },
        timeout,
      }),
    );
  };

  /**
   * Send a "config" request, optionally with a partial configuration;
   * resolves to the data of the response.
   */
  config = async ({
    config,
    timeout,
  }: {
    config?: Partial<Configuration>;
    timeout?: number;
  } = {}): Promise<Configuration> => {
    return this.checkForError(
      await this.socket.request(null, {
        headers: {
          cmd: "config",
          config,
          timeout,
        } as any,
        timeout,
      }),
    );
  };

  /** Send an "inventory" request; resolves to the data of the response. */
  inventory = async (timeout?: number): Promise<PartialInventory> => {
    return this.checkForError(
      await this.socket.request(null, {
        headers: {
          cmd: "inventory",
        } as any,
        timeout,
      }),
    );
  };

  /**
   * Send a "get" request for a single message, addressed by seq or by
   * key.  Resolves to the whole response message, or undefined when the
   * response has no headers.
   */
  get = async ({
    seq,
    key,
    timeout,
  }: {
    timeout?: number;
  } & (
    | { seq: number; key?: undefined }
    | { key: string; seq?: undefined }
  )): Promise<ConatMessage | undefined> => {
    const resp = await this.socket.request(null, {
      headers: { cmd: "get", seq, key, timeout } as any,
      timeout,
    });
    this.checkForError(resp, true);
    if (resp.headers == null) {
      return undefined;
    }
    return resp;
  };

  // returns async iterator over arrays of stored messages.
  // It's much safer to use getAll below, but less memory
  // efficient.
  async *getAllIter({
    start_seq,
    end_seq,
    timeout,
    maxWait,
  }: GetAllOpts = {}): AsyncGenerator<StoredMessage[], void, unknown> {
    if (this.isClosed()) {
      // done
      return;
    }
    const sub = await this.socket.requestMany(null, {
      headers: {
        cmd: "getAll",
        start_seq,
        end_seq,
        timeout,
      } as any,
      timeout,
      maxWait,
    });
    if (this.isClosed()) {
      // done with this
      return;
    }
    let seq = 0; // next expected seq number for the sub (not the data)
    for await (const { data, headers } of sub) {
      if (headers?.error) {
        throw new ConatError(`${headers.error}`, { code: headers.code });
      }
      if (data == null || this.socket.state == "closed") {
        // done
        return;
      }
      if (typeof headers?.seq != "number" || headers?.seq != seq) {
        throw new ConatError(
          `data dropped, probably due to load -- please try again; expected seq=${seq}, but got ${headers?.seq}`,
          {
            code: 503,
          },
        );
      } else {
        seq = headers?.seq + 1;
      }
      yield data;
    }
  }

  getAll = async (opts: GetAllOpts = {}): Promise<StoredMessage[]> => {
    // NOTE: We check messages.headers.seq (which has nothing to do with the stream seq numbers!)
    // and make sure it counts from 0 up until done, and that nothing was missed.
    // ONLY once that is done and we have everything do we call processPersistentMessages.
    // Otherwise, just wait and try again from scratch.  There's no socket or
    // any other guarantees that messages aren't dropped since this is requestMany,
    // and under load DEFINITELY messages can be dropped.
    // This throws with code=503 if something goes wrong due to sequence numbers.
    let messages: StoredMessage[] = [];
    const sub = await this.getAllIter(opts);
    if (this.isClosed()) {
      throw Error("closed");
    }
    for await (const value of sub) {
      messages = messages.concat(value);
    }
    if (this.isClosed()) {
      throw Error("closed");
    }
    return messages;
  };

  /** Send a "keys" request; resolves to the data of the response. */
  keys = async ({ timeout }: { timeout?: number } = {}): Promise<string[]> => {
    return this.checkForError(
      await this.socket.request(null, {
        headers: { cmd: "keys", timeout } as any,
        timeout,
      }),
    );
  };

  /**
   * Send a "sqlite" request with a statement and optional params;
   * resolves to the data of the response.
   */
  sqlite = async ({
    timeout,
    statement,
    params,
  }: {
    timeout?: number;
    statement: string;
    params?: any[];
  }): Promise<any[]> => {
    return this.checkForError(
      await this.socket.request(null, {
        headers: {
          cmd: "sqlite",
          statement,
          params,
        } as any,
        timeout,
      }),
    );
  };

  /**
   * Throw a ConatError if the response headers contain a truthy `error`
   * or `code`; otherwise return mesg.data (or nothing when noReturn is
   * true).
   */
  private checkForError = (mesg, noReturn = false) => {
    if (mesg.headers != null) {
      const { error, code } = mesg.headers;
      if (error || code) {
        throw new ConatError(error ?? "error", { code });
      }
    }
    if (!noReturn) {
      return mesg.data;
    }
  };

  // id of the remote server we're connected to
  serverId = async () => {
    return this.checkForError(
      await this.socket.request(null, {
        headers: { cmd: "serverId" },
      }),
    );
  };
}

/** Arguments describing one set operation (see set and setMany). */
export interface SetOptions {
  messageData: MessageData;
  key?: string;
  ttl?: number;
  previousSeq?: number;
  msgID?: string;
  timeout?: number;
}

interface Options {
  client: Client;
  // who is accessing persistent storage
  user: User;
  // what storage they are accessing
  storage: StorageOptions;
  noCache?: boolean;
  service?: string;
}

/**
 * Reference-cached factory for PersistStreamClient, keyed on
 * (user, storage, client.id, service).  Write permission is asserted
 * client side before a new client object is constructed.
 */
export const stream = refCacheSync<Options, PersistStreamClient>({
  name: "persistent-stream-client",
  createKey: ({ user, storage, client, service = SERVICE }: Options) => {
    return JSON.stringify([user, storage, client.id, service]);
  },
  createObject: ({ client, user, storage, service = SERVICE }: Options) => {
    // avoid wasting server resources, etc., by always checking permissions client side first
    assertHasWritePermission({ user, storage, service });
    return new PersistStreamClient(client, storage, user, service);
  },
});

let permissionChecks = true;
/**
 * Turn off the client-side permission check done when creating a stream
 * client.  Throws unless process.env.COCALC_TEST_MODE is set.
 */
export function disablePermissionCheck() {
  if (!process.env.COCALC_TEST_MODE) {
    throw Error("disabling permission check only allowed in test mode");
  }
  permissionChecks = false;
}

// Client-side write permission check for the given user/storage/service;
// a no-op when permission checks have been disabled.
const assertHasWritePermission = ({ user, storage, service }) => {
  if (!permissionChecks) {
    // should only be used for unit testing, since otherwise would
    // make clients slower and possibly increase server load.
    return;
  }
  const subject = persistSubject({ ...user, service });
  assertHasWritePermission0({ subject, path: storage.path, service });
};