Path: blob/master/src/packages/conat/sync/astream.ts
1710 views
/*
Asynchronous Memory Efficient Access to Core Stream.

This provides access to the same data as dstream, except it doesn't download any
data to the client until you actually call get. The calls to get and
set are thus async.

There is no need to close this because it is stateless.

[ ] TODO: efficiently get or set many values at once in a single call. This will be
very useful, e.g., for jupyter notebook timetravel browsing.

DEVELOPMENT:

~/cocalc/src/packages/backend$ node

a = await require("@cocalc/backend/conat/sync").dstream({name:'test'})


b = require("@cocalc/backend/conat/sync").astream({name:'test'})
// push resolves to an ARRAY with one result per pushed value, so destructure
// the first entry:
const [{seq}] = await b.push('x')

a.get() // ['x']

await b.get(seq) // 'x'

*/

import {
  type StorageOptions,
  type PersistStreamClient,
  stream,
} from "@cocalc/conat/persist/client";
import { type DStreamOptions } from "./dstream";
import {
  type Headers,
  messageData,
  type Client,
  Message,
  decode,
} from "@cocalc/conat/core/client";
import { storagePath, type User } from "./core-stream";
import { connect } from "@cocalc/conat/core/client";
import { type Configuration } from "@cocalc/conat/persist/storage";

// Thin async wrapper around a persist stream client. Every read or write
// below is delegated to this.stream; nothing is cached on this object.
export class AStream<T = any> {
  private storage: StorageOptions;
  private user: User;
  private stream: PersistStreamClient;
  private client: Client;

  // Derives the user and the storage path from the given options, uses
  // options.client if provided (otherwise calls connect()), and creates the
  // persist stream client that all other methods go through.
  constructor(options: DStreamOptions) {
    this.user = {
      account_id: options.account_id,
      project_id: options.project_id,
    };
    this.storage = { path: storagePath(options) };
    this.client = options.client ?? connect();
    this.stream = stream({
      client: this.client,
      user: this.user,
      storage: this.storage,
      service: options.service,
    });
  }

  // Closes the underlying persist stream client.
  close = () => {
    this.stream.close();
  };

  // Fetch the full message addressed either by sequence number (number
  // argument) or by key (string argument). The optional timeout is passed
  // through to the stream client unchanged.
  getMessage = async (
    seq_or_key: number | string,
    { timeout }: { timeout?: number } = {},
  ): Promise<Message<T> | undefined> => {
    return await this.stream.get({
      ...opt(seq_or_key),
      timeout,
    });
  };

  // Same lookup as getMessage, but resolves to just the message's data
  // (undefined when getMessage resolves to undefined).
  get = async (
    seq_or_key: number | string,
    opts?: { timeout?: number },
  ): Promise<T | undefined> => {
    return (await this.getMessage(seq_or_key, opts))?.data;
  };

  // Same lookup as getMessage, but resolves to just the message's headers
  // (undefined when getMessage resolves to undefined).
  headers = async (
    seq_or_key: number | string,
    opts?: { timeout?: number },
  ): Promise<Headers | undefined> => {
    return (await this.getMessage(seq_or_key, opts))?.headers;
  };

  // This is an async iterator so you can iterate over the
  // data without having to have it all in RAM at once.
  // Of course, you can put it all in a single list if you want.
  // It is NOT guaranteed to always work if the load is very heavy
  // or network is flaky, but will return all data properly in order
  // then throw an exception with code 503 rather than returning data
  // with something skipped.
  //
  // opts is forwarded unchanged to this.stream.getAllIter.
  async *getAll(opts?): AsyncGenerator<
    {
      mesg: T;
      headers?: Headers;
      seq: number;
      time: number;
      key?: string;
    },
    void,
    unknown
  > {
    // The stream client hands back batches of raw messages; decode and
    // yield them one at a time.
    for await (const messages of this.stream.getAllIter(opts)) {
      for (const { seq, time, key, encoding, raw, headers } of messages) {
        const mesg = decode({ encoding, data: raw });
        yield { mesg, headers, seq, time, key };
      }
    }
  }

  // Async iterator over updates from the stream client's changefeed.
  // Delete events are yielded exactly as received; every other event is
  // decoded and yielded with op "set".
  async *changefeed(): AsyncGenerator<
    | {
        op: "set";
        mesg: T;
        headers?: Headers;
        seq: number;
        time: number;
        key?: string;
      }
    | { op: "delete"; seqs: number[] },
    void,
    unknown
  > {
    const cf = await this.stream.changefeed();
    for await (const updates of cf) {
      for (const event of updates) {
        if (event.op === "delete") {
          yield event;
        } else {
          const { seq, time, key, encoding, raw, headers } = event;
          const mesg = decode({ encoding, data: raw });
          yield { op: "set", mesg, headers, seq, time, key };
        }
      }
    }
  }

  // Forwards the given options unchanged to the stream client's delete and
  // resolves to its result.
  delete = async (opts: {
    timeout?: number;
    seq?: number;
    last_seq?: number;
    all?: boolean;
  }): Promise<{ seqs: number[] }> => {
    return await this.stream.delete(opts);
  };

  // Write a single value. headers (if any) are packed into the message
  // together with the value via messageData; all remaining options are
  // passed to the stream client's set as-is.
  publish = async (
    value: T,
    options?: {
      headers?: Headers;
      previousSeq?: number;
      timeout?: number;
      key?: string;
      ttl?: number;
      msgID?: string;
    },
  ): Promise<{ seq: number; time: number }> => {
    const { headers, ...options0 } = options ?? {};
    return await this.stream.set({
      messageData: messageData(value, { headers }),
      ...options0,
    });
  };

  // Write several values with a single setMany call. Resolves to an array
  // whose entries are either { seq, time } or { error }.
  push = async (
    ...args: T[]
  ): Promise<({ seq: number; time: number } | { error: string })[]> => {
    // [ ] TODO: should break this up into chunks with a limit on size.
    const ops = args.map((mesg) => {
      return { messageData: messageData(mesg) };
    });
    return await this.stream.setMany(ops);
  };

  // Passes the (possibly empty) partial configuration to the stream
  // client's config and resolves to the Configuration it returns.
  config = async (
    config: Partial<Configuration> = {},
  ): Promise<Configuration> => {
    if (this.storage == null) {
      throw Error("bug -- storage must be set");
    }
    return await this.stream.config({ config });
  };

  // Forwards statement, params and timeout to the stream client's sqlite
  // method and resolves to whatever it returns.
  // NOTE(review): presumably this runs the statement against the stream's
  // backing sqlite storage -- confirm against the persist client.
  sqlite = async (
    statement: string,
    params?: any[],
    { timeout }: { timeout?: number } = {},
  ): Promise<any[]> => {
    return await this.stream.sqlite({
      timeout,
      statement,
      params,
    });
  };
}

// Convenience factory: equivalent to `new AStream<T>(opts)`.
export function astream<T>(opts: DStreamOptions) {
  return new AStream<T>(opts);
}

// Turn a "sequence number or key" argument into the matching lookup object:
// a number becomes { seq }, a string becomes { key }. Anything else (only
// possible when called from untyped code) throws.
function opt(seq_or_key: number | string): { seq: number } | { key: string } {
  if (typeof seq_or_key === "number") {
    return { seq: seq_or_key };
  }
  if (typeof seq_or_key === "string") {
    return { key: seq_or_key };
  }
  throw Error(`arg must be number or string`);
}