Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/sync/akv.ts
1710 views
1
/*
2
Asynchronous Memory-Efficient Access to Key:Value Store
3
4
This provides access to the same data as dkv, except it doesn't download any
5
data to the client until you actually call get. The calls to get and
6
set are thus async.
7
8
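No data is cached locally, so there is nothing to flush; however, each instance does hold a persist stream client, and close() closes that client when you are done with it.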
9
10
[ ] TODO: efficiently get or set many values at once in a single call. This will be
11
very useful, e.g., for jupyter notebook timetravel browsing.
12
13
DEVELOPMENT:
14
15
~/cocalc/src/packages/backend$ node
16
17
a = await require("@cocalc/backend/conat/sync").dkv({name:'test'}); a.set('x',5)
18
19
20
b = require("@cocalc/backend/conat/sync").akv({name:'test'})
21
await b.set('x',10)
22
23
a.get('x')
24
25
await b.get('x')
26
27
*/
28
29
import {
30
type StorageOptions,
31
type PersistStreamClient,
32
stream,
33
} from "@cocalc/conat/persist/client";
34
import { type DKVOptions } from "./dkv";
35
import {
36
type Headers,
37
messageData,
38
type Message,
39
} from "@cocalc/conat/core/client";
40
import { storagePath, type User, COCALC_TOMBSTONE_HEADER } from "./core-stream";
41
import { connect } from "@cocalc/conat/core/client";
42
43
/**
 * Asynchronous accessor for a key:value store, backed by a persist stream
 * client.  Every read and write is forwarded to the stream client on demand;
 * nothing is cached on this object.
 *
 * A key counts as deleted when its stored message carries the
 * COCALC_TOMBSTONE_HEADER header: `delete` writes such a message and
 * `getMessage` (and everything built on it) reports it as `undefined`.
 *
 * All members are arrow-function properties, so they stay bound to the
 * instance even when passed around detached.
 */
export class AKV<T = any> {
  private storage: StorageOptions;
  private user: User;
  private stream: PersistStreamClient;

  /**
   * @param options - identifies the store (account/project, storage path
   *   inputs, service) and optionally supplies the client to use; when no
   *   client is given, `connect()` provides one.
   */
  constructor(options: DKVOptions) {
    const { account_id, project_id } = options;
    this.user = { account_id, project_id };
    this.storage = { path: storagePath(options) };
    this.stream = stream({
      client: options.client ?? connect(),
      user: this.user,
      storage: this.storage,
      service: options.service,
    });
  }

  /** Close the underlying persist stream client. */
  close = () => {
    this.stream.close();
  };

  /**
   * Fetch the full message stored under `key`.
   *
   * @returns the message, or `undefined` if the stream client returned
   *   nothing or the message is a tombstone (i.e. the key was deleted).
   */
  getMessage = async (
    key: string,
    { timeout }: { timeout?: number } = {},
  ): Promise<Message<T> | undefined> => {
    const mesg = await this.stream.get({ key, timeout });
    const isTombstone = mesg?.headers?.[COCALC_TOMBSTONE_HEADER];
    return isTombstone ? undefined : mesg;
  };

  /**
   * Get just one value asynchronously, rather than the entire dkv.
   * If the timeout option is given and the value of key is not set,
   * will wait until that many ms to get the key.
   */
  get = async (
    key: string,
    opts?: { timeout?: number },
  ): Promise<T | undefined> => {
    const mesg = await this.getMessage(key, opts);
    return mesg?.data;
  };

  /** Headers of the message stored under `key`, or `undefined` if absent/deleted. */
  headers = async (
    key: string,
    opts?: { timeout?: number },
  ): Promise<Headers | undefined> => {
    const mesg = await this.getMessage(key, opts);
    return mesg?.headers;
  };

  /**
   * The `time` header of the message stored under `key`, wrapped in a Date.
   * Returns `undefined` when there is no message or no `time` header.
   */
  time = async (
    key: string,
    opts?: { timeout?: number },
  ): Promise<Date | undefined> => {
    const mesg = await this.getMessage(key, opts);
    const stamp = mesg?.headers?.time;
    if (stamp === undefined) {
      return undefined;
    }
    return new Date(stamp as number);
  };

  /**
   * Delete `key` by overwriting it with a null value flagged with the
   * tombstone header.
   */
  delete = async (key: string, opts?: { timeout?: number }): Promise<void> => {
    const tombstone: Headers = { [COCALC_TOMBSTONE_HEADER]: true };
    await this.set(key, null as any, { ...opts, headers: tombstone });
  };

  /** The `seq` header of the message stored under `key`, if any. */
  seq = async (
    key: string,
    opts?: { timeout?: number },
  ): Promise<number | undefined> => {
    const mesg = await this.getMessage(key, opts);
    return mesg?.headers?.seq as number | undefined;
  };

  /**
   * Store `value` under `key`.
   *
   * `options.headers` is attached to the message data; every other option
   * (previousSeq, timeout, ttl, msgID) is handed to the stream client's
   * `set` unchanged.
   *
   * @returns the `{ seq, time }` reported by the stream client.
   */
  set = async (
    key: string,
    value: T,
    options?: {
      headers?: Headers;
      previousSeq?: number;
      timeout?: number;
      ttl?: number;
      msgID?: string;
    },
  ): Promise<{ seq: number; time: number }> => {
    const { headers, ...streamOptions } = options ?? {};
    const data = messageData(value, { headers });
    return await this.stream.set({
      key,
      messageData: data,
      ...streamOptions,
    });
  };

  /** List the keys as reported by the stream client. */
  keys = async (opts: { timeout?: number } = {}): Promise<string[]> => {
    const { timeout } = opts;
    return await this.stream.keys({ timeout });
  };

  // WARNING/TODO: this getAll implementation is not at all clever, so it is
  // left disabled:
  //
  //   getAll = async () => {
  //     const v: { [key: string]: T } = {};
  //     for (const key of await this.keys()) {
  //       v[key] = await this.get(key);
  //     }
  //     return v;
  //   };

  /**
   * Forward an SQL statement, with optional parameters, to the stream
   * client's `sqlite` call and return the rows it yields.
   */
  sqlite = async (
    statement: string,
    params?: any[],
    opts: { timeout?: number } = {},
  ): Promise<any[]> => {
    const { timeout } = opts;
    return await this.stream.sqlite({
      timeout,
      statement,
      params,
    });
  };
}
165
166
/**
 * Convenience factory: build an AKV for the store described by `opts`.
 * Equivalent to calling `new AKV<T>(opts)` directly.
 */
export function akv<T>(opts: DKVOptions): AKV<T> {
  const store = new AKV<T>(opts);
  return store;
}
169
170