Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/sync/dko.ts
5801 views
1
/*
2
Distributed eventually consistent key:object store, where changes propagate sparsely.
3
4
The "values" MUST be objects and no keys or fields of objects can contain the
5
sep character, which is '|' by default.
6
7
NOTE: Whenever you do a set, the lodash isEqual function is used to see which fields
8
you are setting are actually different, and only those get sync'd out.
9
This takes more resources on each client, but less on the network and servers.
10
It also means that if two clients write to an object at the same time but to
11
different fields (a merge conflict), then the result gets merged together properly
12
with last write wins per field.
13
14
DEVELOPMENT:
15
16
~/cocalc/src/packages/backend n
17
> t = await require("@cocalc/backend/conat/sync").dko({name:'test'})
18
19
*/
20
21
import { EventEmitter } from "events";
22
import { dkv as createDKV, DKV, DKVOptions } from "./dkv";
23
import { is_object } from "@cocalc/util/misc";
24
import refCache from "@cocalc/util/refcache";
25
import jsonStableStringify from "json-stable-stringify";
26
import { isEqual } from "lodash";
27
28
/**
 * Build the cache key identifying a store: a stable JSON encoding of the
 * options with the `client` entry left out.
 *
 * @param options - store options; `name` is required.
 * @returns the stable-stringified options (excluding `client`).
 * @throws Error if `options.name` is missing or empty.
 */
export function userKvKey(options: DKVOptions) {
  const hasName = !!options.name;
  if (!hasName) {
    throw Error("name must be specified");
  }
  // Drop `client` so it never becomes part of the key.
  const { client: _client, ...keyFields } = options;
  const encoded = jsonStableStringify(keyFields)!;
  return encoded;
}
35
36
/**
 * Key:object store layered on top of a DKV.
 *
 * Each object is stored as several DKV entries:
 *   - `key` itself holds the list of the object's field names, and
 *   - `toPath(key, field)` holds the value of one field.
 *
 * `set` only writes the entries that actually differ (per lodash isEqual).
 *
 * The constructor returns a Proxy, so besides the explicit methods one can
 * also write `store.foo`, `store.foo = obj` and `delete store.foo`; these are
 * routed to `get`, `set` and `delete`.
 *
 * Events emitted:
 *   - "change": `{ key }` or `{ key, field, value }`
 *   - "reject": `{ key }` or `{ key, field, value }`
 *   - "closed": when `close()` finishes
 */
export class DKO<T = any> extends EventEmitter {
  dkv?: DKV; // can't type this

  constructor(private opts: DKVOptions) {
    super();
    return new Proxy(this, {
      // `delete store.key` removes the object stored under a string key.
      deleteProperty(target, prop) {
        if (typeof prop == "string") {
          target.delete(prop);
          return true;
        }
        return false;
      },
      set(target, prop, value) {
        if (typeof prop == "symbol") {
          // Symbols are never store keys; keep them on the instance itself
          // rather than stringifying them into a bogus key.
          return Reflect.set(target, prop, value);
        }
        if (prop == "_eventsCount" || prop == "_events" || prop == "close") {
          // these must remain assignable on the instance itself
          // (_events/_eventsCount are EventEmitter bookkeeping).
          target[prop] = value;
          return true;
        }
        if (target[prop] != null) {
          throw Error(`method name '${prop}' is read only`);
        }
        target.set(prop, value);
        return true;
      },
      get(target, prop) {
        if (typeof prop == "symbol") {
          // Read symbol properties from the instance instead of looking up
          // the stringified symbol in the store.
          return Reflect.get(target, prop);
        }
        // Instance members win; anything else is looked up in the store.
        return target[prop] ?? target.get(prop);
      },
    });
  }

  // Translate a "change" event of the underlying dkv (which is keyed by
  // path) into a DKO "change" event keyed by object key and field.
  private dkvOnChange = ({
    key: path,
    value,
  }: {
    key?: string;
    value?: any;
  }) => {
    if (path == null) {
      // TODO: could this happen?
      return;
    }
    const { key, field } = this.fromPath(path);
    if (!field) {
      // there is no field part of the path, which happens
      // only for delete of entire object, after setting all
      // the fields to null.
      this.emit("change", { key });
    } else {
      if (value === undefined && this.dkv?.get(key) == null) {
        // don't emit change setting fields to undefined if the
        // object was already deleted.
        return;
      }
      this.emit("change", { key, field, value });
    }
  };

  // Translate a "reject" event of the underlying dkv into a DKO "reject"
  // event keyed by object key and (if present) field.
  private dkvOnReject = ({
    key: path,
    value,
  }: {
    key?: string;
    value?: any;
  }) => {
    if (path == null) {
      // TODO: would this happen?
      return;
    }
    const { key, field } = this.fromPath(path);
    if (!field) {
      this.emit("reject", { key });
    } else {
      this.emit("reject", { key, field, value });
    }
  };

  private initialized = false;

  // Create the underlying dkv (named with the DKO prefix) and subscribe to
  // its "change" and "reject" events.  Throws if called more than once.
  init = async () => {
    if (this.initialized) {
      throw Error("init can only be called once");
    }
    this.initialized = true;
    this.dkv = await createDKV<{ [key: string]: any }>({
      ...this.opts,
      name: dkoPrefix(this.opts.name),
    });
    this.dkv.on("change", this.dkvOnChange);
    this.dkv.on("reject", this.dkvOnReject);
  };

  // Detach from and close the underlying dkv, emit "closed", then drop all
  // listeners.  Does nothing if there is no dkv (never initialized or
  // already closed).
  close = async () => {
    if (this.dkv == null) {
      return;
    }
    this.dkv.removeListener("change", this.dkvOnChange);
    this.dkv.removeListener("reject", this.dkvOnReject);
    await this.dkv.close();
    delete this.dkv;
    // @ts-ignore
    delete this.opts;
    this.emit("closed");
    this.removeAllListeners();
  };

  // WARNING: Do *NOT* change toPath and fromPath except in a backward
  // compatible way, since it would corrupt all user data involving this.
  private toPath = (key: string, field: string): string => {
    return JSON.stringify([key, field]);
  };

  // Inverse of toPath.  A path that is not a JSON array of length 2 is
  // treated as a plain object key (no field).
  private fromPath = (path: string): { key: string; field?: string } => {
    if (path.startsWith("[") && path.endsWith("]")) {
      // *might* be json encoded as above
      try {
        const v = JSON.parse(path);
        if (Array.isArray(v) && v.length == 2) {
          const [key, field] = v;
          return { key, field };
        }
      } catch {
        // it wasn't json encoded
        // see https://github.com/sagemathinc/cocalc/issues/8386
      }
      return { key: path };
    }
    // not encoded since no field -- the value of this one is the list of keys
    return { key: path };
  };

  // Delete the object stored under key: its field list and every field
  // entry named in that list.  No-op if the object does not exist.
  // Throws Error("closed") if there is no dkv.
  delete = (key: string) => {
    if (this.dkv == null) {
      throw Error("closed");
    }
    const fields = this.dkv.get(key);
    if (fields == null) {
      return;
    }
    this.dkv.delete(key);
    for (const field of fields) {
      this.dkv.delete(this.toPath(key, field));
    }
  };

  // Clear the underlying dkv (no-op if there is none).
  clear = () => {
    this.dkv?.clear();
  };

  // Assemble the object stored under key from its field list, or return
  // undefined if there is no such object (or assembling it fails).
  // Throws Error("closed") if there is no dkv.
  get = (key: string): T | undefined => {
    if (this.dkv == null) {
      throw Error("closed");
    }
    const fields = this.dkv.get(key);
    if (fields == null) {
      return undefined;
    }
    const x: any = {};
    try {
      for (const field of fields) {
        x[field] = this.dkv.get(this.toPath(key, field));
      }
      return x;
    } catch {
      // best effort: e.g. the value stored under key is not an iterable
      // list of fields.
      return undefined;
    }
  };

  // True if the underlying dkv has an entry for key.
  // Throws Error("closed") if there is no dkv.
  has = (key: string): boolean => {
    if (this.dkv == null) {
      throw Error("closed");
    }
    return this.dkv.has(key);
  };

  // Return every object, assembled from all [key, field] entries of the
  // underlying dkv.  Throws Error("closed") if there is no dkv.
  getAll = (): { [key: string]: T } => {
    // get everything
    if (this.dkv == null) {
      throw Error("closed");
    }
    const all = this.dkv.getAll();
    const result: any = {};
    for (const x in all) {
      const { key, field } = this.fromPath(x);
      if (!field) {
        // field-list entry, not a value
        continue;
      }
      if (result[key] == null) {
        result[key] = { [field]: all[x] };
      } else {
        result[key][field] = all[x];
      }
    }
    return result;
  };

  debugStats = () => {
    return this.dkv?.debugStats();
  };

  // Store obj under key.  Only the entries that differ from what is
  // currently stored are written.  Setting null/undefined deletes the
  // object.  Throws Error("closed") if there is no dkv, and
  // Error("values must be objects") if obj is not an object.
  set = (key: string, obj: T) => {
    if (this.dkv == null) {
      throw Error("closed");
    }
    if (obj == null) {
      this.delete(key);
      return;
    }
    if (!is_object(obj)) {
      throw Error("values must be objects");
    }
    const fields = Object.keys(obj);
    const cur = this.dkv.get(key);
    if (!isEqual(cur, fields)) {
      this.dkv.set(key, fields);
      if (Array.isArray(cur)) {
        // Remove entries of fields that the new object no longer has.
        // Otherwise they would linger in the dkv: still reported by
        // getAll() and never removed by delete(key), which only walks the
        // current field list.
        const keep = new Set(fields);
        for (const field of cur) {
          if (!keep.has(field)) {
            this.dkv.delete(this.toPath(key, field));
          }
        }
      }
    }
    for (const field of fields) {
      const path = this.toPath(key, field);
      const value = obj[field];
      const cur = this.dkv.get(path);
      if (!isEqual(cur, value)) {
        this.dkv.set(path, value);
      }
    }
  };

  hasUnsavedChanges = (): boolean => {
    return !!this.dkv?.hasUnsavedChanges();
  };

  // The unsaved changes of the underlying dkv that refer to a field,
  // decoded into { key, field } pairs.  Empty if there is no dkv.
  unsavedChanges = (): { key: string; field: string }[] => {
    const dkv = this.dkv;
    if (dkv == null) {
      return [];
    }
    const v = dkv.unsavedChanges();
    const w: { key: string; field: string }[] = [];
    for (const path of v) {
      const { key, field } = this.fromPath(path);
      if (field) {
        w.push({ key, field });
      }
    }
    return w;
  };

  save = async () => {
    await this.dkv?.save();
  };
}
276
277
// Cache of DKO instances created through refCache.  Entries are keyed by
// userKvKey(options); createObject builds a new DKO and waits for its init()
// before handing it out.
export const cache = refCache<DKVOptions, DKO>({
  name: "dko",
  createKey: userKvKey,
  createObject: async (opts) => {
    const store = new DKO(opts);
    await store.init();
    return store;
  },
});
286
287
// WARNING: changing this will silently delete user data.
export const DKO_PREFIX = "__dko__";

// Name of the underlying dkv for the DKO called `name`: DKO_PREFIX followed
// by `name`.
function dkoPrefix(name: string): string {
  return DKO_PREFIX.concat(name);
}
293
294
/**
 * Get the DKO for the given options from the module-level `cache`.
 *
 * @param options - store options, passed unchanged to `cache`.
 * @returns the DKO that `cache` resolves to for these options.
 */
export async function dko<T>(options: DKVOptions): Promise<DKO<T>> {
  const store: DKO<T> = await cache(options);
  return store;
}
297
298