Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/sync/astream.ts
1710 views
1
/*
2
Asynchronous Memory Efficient Access to Core Stream.
3
4
This provides access to the same data as dstream, except it doesn't download any
5
data to the client until you actually call get. The calls to get and
6
set are thus async.
7
8
There is no need to close this because it is stateless.
9
10
[ ] TODO: efficiently get or set many values at once in a single call. This will be
11
very useful, e.g., for jupyter notebook timetravel browsing.
12
13
DEVELOPMENT:
14
15
~/cocalc/src/packages/backend$ node
16
17
a = await require("@cocalc/backend/conat/sync").dstream({name:'test'})
18
19
20
b = require("@cocalc/backend/conat/sync").astream({name:'test'})
21
const [{seq}] = await b.push('x')   // push returns an array with one result per pushed value
22
23
a.get() // ['x']
24
25
await b.get(seq) // 'x'
26
27
*/
28
29
import {
30
type StorageOptions,
31
type PersistStreamClient,
32
stream,
33
} from "@cocalc/conat/persist/client";
34
import { type DStreamOptions } from "./dstream";
35
import {
36
type Headers,
37
messageData,
38
type Client,
39
Message,
40
decode,
41
} from "@cocalc/conat/core/client";
42
import { storagePath, type User } from "./core-stream";
43
import { connect } from "@cocalc/conat/core/client";
44
import { type Configuration } from "@cocalc/conat/persist/storage";
45
46
/**
 * Asynchronous, on-demand access to a persistent stream.
 *
 * Every method forwards to the PersistStreamClient created in the
 * constructor; this object itself keeps no copy of the stream's messages.
 * Methods are arrow-function properties, so they stay bound to the instance
 * even when passed around as bare callbacks.
 */
export class AStream<T = any> {
  private storage: StorageOptions;
  private user: User;
  private stream: PersistStreamClient;
  private client: Client;

  /**
   * @param options - identifies the stream (account/project, storage path via
   *   storagePath(options), service) and optionally supplies the client to
   *   use; when no client is given, connect() is called to obtain one.
   */
  constructor(options: DStreamOptions) {
    const user: User = {
      account_id: options.account_id,
      project_id: options.project_id,
    };
    this.user = user;

    const storage: StorageOptions = { path: storagePath(options) };
    this.storage = storage;

    const client = options.client ?? connect();
    this.client = client;

    this.stream = stream({
      client,
      user,
      storage,
      service: options.service,
    });
  }

  /** Close the underlying persist stream client. */
  close = () => {
    this.stream.close();
  };

  /**
   * Fetch one full message, addressed either by sequence number or by key.
   *
   * @param seq_or_key - a number is treated as a seq, a string as a key
   * @returns whatever the underlying stream's get() resolves to
   */
  getMessage = async (
    seq_or_key: number | string,
    { timeout }: { timeout?: number } = {},
  ): Promise<Message<T> | undefined> => {
    const selector = opt(seq_or_key);
    return await this.stream.get({ ...selector, timeout });
  };

  /** Like getMessage, but resolves to only the message's `data`. */
  get = async (
    seq_or_key: number | string,
    opts?: { timeout?: number },
  ): Promise<T | undefined> => {
    const message = await this.getMessage(seq_or_key, opts);
    return message?.data;
  };

  /** Like getMessage, but resolves to only the message's `headers`. */
  headers = async (
    seq_or_key: number | string,
    opts?: { timeout?: number },
  ): Promise<Headers | undefined> => {
    const message = await this.getMessage(seq_or_key, opts);
    return message?.headers;
  };

  // Async iterator over the stream's messages, so the data can be walked
  // without holding all of it in RAM at once (collect it into a list yourself
  // if you do want everything).
  // Per the original author's note: this is NOT guaranteed to always succeed
  // under very heavy load or on a flaky network, but in that case it yields
  // the data properly in order and then throws an exception with code 503
  // rather than silently skipping anything.
  async *getAll(opts?): AsyncGenerator<
    {
      mesg: T;
      headers?: Headers;
      seq: number;
      time: number;
      key?: string;
    },
    void,
    unknown
  > {
    for await (const batch of this.stream.getAllIter(opts)) {
      for (const stored of batch) {
        const { seq, time, key, encoding, raw, headers } = stored;
        // Messages arrive raw; decode each one just before handing it out.
        yield {
          mesg: decode({ encoding, data: raw }),
          headers,
          seq,
          time,
          key,
        };
      }
    }
  }

  // Async iterator over updates from the underlying stream's changefeed.
  // "delete" events are passed through untouched; every other event is
  // decoded and yielded as a "set".
  async *changefeed(): AsyncGenerator<
    | {
        op: "set";
        mesg: T;
        headers?: Headers;
        seq: number;
        time: number;
        key?: string;
      }
    | { op: "delete"; seqs: number[] },
    void,
    unknown
  > {
    const feed = await this.stream.changefeed();
    for await (const batch of feed) {
      for (const event of batch) {
        if (event.op == "delete") {
          yield event;
          continue;
        }
        const { seq, time, key, encoding, raw, headers } = event;
        yield {
          op: "set",
          mesg: decode({ encoding, data: raw }),
          headers,
          seq,
          time,
          key,
        };
      }
    }
  }

  /**
   * Forward a delete request to the underlying stream.
   *
   * @returns the `{ seqs }` object the underlying stream's delete() resolves to
   */
  delete = async (opts: {
    timeout?: number;
    seq?: number;
    last_seq?: number;
    all?: boolean;
  }): Promise<{ seqs: number[] }> => {
    const outcome = await this.stream.delete(opts);
    return outcome;
  };

  /**
   * Write a single value to the stream.
   *
   * `headers` is folded into the encoded message; all remaining options are
   * passed through to the underlying stream's set() unchanged.
   */
  publish = async (
    value: T,
    options?: {
      headers?: Headers;
      previousSeq?: number;
      timeout?: number;
      key?: string;
      ttl?: number;
      msgID?: string;
    },
  ): Promise<{ seq: number; time: number }> => {
    const { headers, ...passThrough } = options ?? {};
    const encoded = messageData(value, { headers });
    return await this.stream.set({
      messageData: encoded,
      ...passThrough,
    });
  };

  /**
   * Write several values with one setMany() call.
   *
   * @returns one entry per value, in the shape declared by the return type:
   *   either `{ seq, time }` or `{ error }`.
   */
  push = async (
    ...args: T[]
  ): Promise<({ seq: number; time: number } | { error: string })[]> => {
    // [ ] TODO: should break this up into chunks with a limit on size.
    const ops = args.map((mesg) => ({ messageData: messageData(mesg) }));
    return await this.stream.setMany(ops);
  };

  /**
   * Send a (possibly empty) partial configuration to the underlying stream
   * and resolve to the Configuration it returns.
   */
  config = async (
    config: Partial<Configuration> = {},
  ): Promise<Configuration> => {
    // Sanity check kept from the original implementation.
    if (this.storage == null) {
      throw Error("bug -- storage must be set");
    }
    return await this.stream.config({ config });
  };

  /**
   * Forward a statement (with optional bound params) to the underlying
   * stream's sqlite() call and resolve to its result.
   */
  sqlite = async (
    statement: string,
    params?: any[],
    { timeout }: { timeout?: number } = {},
  ): Promise<any[]> => {
    const request = { timeout, statement, params };
    return await this.stream.sqlite(request);
  };
}
206
207
/**
 * Convenience factory: builds an AStream<T> from the given options.
 * Equivalent to calling `new AStream<T>(opts)` directly.
 */
export function astream<T>(opts: DStreamOptions) {
  const instance = new AStream<T>(opts);
  return instance;
}
210
211
/**
 * Convert a sequence number or key into the selector object used for lookups.
 *
 * @param seq_or_key - a number selects by sequence number, a string by key
 * @returns `{ seq }` for a number, `{ key }` for a string
 * @throws Error if the argument is neither a number nor a string (reachable
 *   only when type checking is bypassed)
 */
function opt(seq_or_key: number | string): { seq: number } | { key: string } {
  switch (typeof seq_or_key) {
    case "number":
      return { seq: seq_or_key };
    case "string":
      return { key: seq_or_key };
    default:
      throw Error(`arg must be number or string`);
  }
}
220
221