Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/sync/dkv.ts
1710 views
1
/*
2
Eventually Consistent Distributed Key:Value Store
3
4
- You give one subject and general-dkv provides a synchronous eventually consistent
5
"multimaster" distributed way to work with the KV store of keys matching that subject,
6
inside of the named KV store.
7
8
- You may define a 3-way merge function, which is used to automatically resolve all
9
conflicting writes. The default is to use our local version, i.e., "last write
10
to remote wins". The function is run locally so can have access to any state.
11
12
- All set/get/delete operations are synchronous.
13
14
- The state gets sync'd in the backend to persistent storage on Conat as soon as possible,
15
and there is an async save function.
16
17
This class is based on top of the Consistent Centralized Key:Value Store defined in kv.ts.
18
You can use the same key:value store at the same time via both interfaces, and if the store
19
is a DKV, you can also access the underlying KV via "store.kv".
20
21
- You must explicitly call "await store.init()" to initialize this before using it.
22
23
- The store emits an event ('change', {key, value, prev}) whenever anything changes.
24
25
- Calling "store.getAll()" provides ALL the data, and "store.get(key)" gets one value.
26
27
- Use "store.set(key,value)" or "store.set({key:value, key2:value2, ...})" to set data,
28
with the following semantics:
29
30
- in the background, changes propagate to Conat. You do not do anything explicitly and
31
this should never raise an exception.
32
33
- you can call "store.hasUnsavedChanges()" to see if there are any unsaved changes.
34
35
- call "store.unsavedChanges()" to see the unsaved keys.
36
37
- The 3-way merge function takes as input {local,remote,prev,key}, where
38
- key = the key where there's a conflict
39
- local = your version of the value
40
- remote = the remote value, which conflicts in that isEqual(local,remote) is false.
41
- prev = a known common prev of local and remote.
42
43
(any of local, remote or prev can be undefined, e.g., no previous value or a key was deleted)
44
45
You can do anything synchronously you want to resolve such conflicts, i.e., there are no
46
axioms that have to be satisfied. If the 3-way merge function throws an exception (or is
47
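not specified, or returns null/undefined) we fall back to "last write wins", i.e., the local value; an exception is also logged with console.warn. To resolve a conflict by deleting the key, return TOMBSTONE.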
48
49
50
DEVELOPMENT:
51
52
~/cocalc/src/packages/backend$ node
53
54
s = await require("@cocalc/backend/conat/sync").dkv({name:'test', merge:({local,remote})=>{return {...remote,...local}}});
55
56
57
In the browser console:
58
59
> s = await cc.client.conat_client.dkv({filter:['foo.>'],merge:({local,remote})=>{return {...remote,...local}}})
60
61
# NOTE that the name is account-{account_id} or project-{project_id},
62
# and if not given defaults to the account-{user's account id}
63
> s.kv.name
64
'account-6aae57c6-08f1-4bb5-848b-3ceb53e61ede'
65
66
> s.on('change',(key)=>console.log(key));0;
67
68
*/
69
70
import { EventEmitter } from "events";
71
import {
72
CoreStream,
73
type Configuration,
74
type ChangeEvent,
75
} from "./core-stream";
76
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
77
import { isEqual } from "lodash";
78
import { delay, map as awaitMap } from "awaiting";
79
import {
80
type Client,
81
ConatError,
82
type Headers,
83
} from "@cocalc/conat/core/client";
84
import refCache from "@cocalc/util/refcache";
85
import { type JSONValue } from "@cocalc/util/types";
86
import { conat } from "@cocalc/conat/client";
87
import { asyncThrottle, until } from "@cocalc/util/async-utils";
88
import {
89
inventory,
90
type Inventory,
91
INVENTORY_UPDATE_INTERVAL,
92
} from "./inventory";
93
94
/**
 * Sentinel stored as a local value to mean "this key is pending deletion".
 * A merge function may also return it to resolve a conflict by deleting.
 */
export const TOMBSTONE = Symbol("tombstone");

// Concurrency limit for the per-key saver (attemptToSaveParallel).
const MAX_PARALLEL = 250;

// Flip to true to log start/finish timing and per-key save failures.
const DEBUG = false;
98
99
/**
 * 3-way merge callback: given the conflicting `local` and `remote` values for
 * `key` (plus a known common ancestor `prev`), return the resolved value.
 */
export type MergeFunction = (args: {
  key: string;
  local: any;
  remote: any;
  prev: any;
}) => any;
105
106
// Per-key options accepted by DKV.set(); currently only message headers.
type SetOptions = {
  headers?: Headers;
};
109
110
/**
 * Options for constructing a DKV (directly or via the `dkv()` cache).
 */
export interface DKVOptions {
  // Name of the underlying CoreStream.
  name: string;
  account_id?: string;
  project_id?: string;
  // Free-form description; included in the inventory entry for this store.
  desc?: JSONValue;
  // Conat client. The DKV constructor throws without one; the cache fills it
  // in from conat() when omitted.
  client?: Client;
  // 3-way merge conflict resolution
  merge?: (opts: { key: string; prev?: any; local?: any; remote?: any }) => any;
  config?: Partial<Configuration>;

  // If noAutosave is set, local changes are never saved until you explicitly
  // call "await this.save()", which will try once to save. Changes made during
  // the save may not be saved though.
  // CAUTION: noAutosave is really only meant for unit testing! The save is
  // reuseInFlight'd, so a save started somewhere far away could already be in
  // progress before your call to save; when it finishes, that's it, so what
  // you just did is not saved. Take care.
  noAutosave?: boolean;

  // Passed through to CoreStream.
  ephemeral?: boolean;
  sync?: boolean;

  // NOTE(review): not read in this file; presumably consumed by refCache to
  // bypass the instance cache — confirm.
  noCache?: boolean;
  // If true, this store never records itself in the inventory.
  noInventory?: boolean;
  // Passed to CoreStream and used when updating the inventory.
  service?: string;
}
136
137
/**
 * Eventually consistent key:value store layered on a CoreStream.
 *
 * Writes land synchronously in `local` and are pushed to the stream in the
 * background by save() (unless noAutosave is set). Incoming remote changes are
 * reconciled with unsaved local values in handleRemoteChange, using the
 * optional 3-way `merge` function. The constructor returns a Proxy, so
 * `store[key]`, `store[key] = value` and `delete store[key]` act on the data.
 */
export class DKV<T = any> extends EventEmitter {
  private kv?: CoreStream<T>;
  private merge?: MergeFunction;
  // Unsaved local values; TOMBSTONE marks a pending delete.
  private local: { [key: string]: T | typeof TOMBSTONE } = {};
  // Per-key options (e.g. headers) given to set().
  private options: { [key: string]: SetOptions } = {};
  // NOTE(review): nothing in this class writes to `saved`, so
  // unsavedChanges() currently reports every key in `local`.
  private saved: { [key: string]: T | typeof TOMBSTONE } = {};
  // Keys modified since the current save attempt started.
  private changed: Set<string> = new Set();
  private noAutosave: boolean;
  public readonly name: string;
  public readonly desc?: JSONValue;
  // Set when a save attempt had errors; used to log once when fully saved.
  private saveErrors: boolean = false;
  // previousSeq values rejected with "wrong-last-sequence". Keys whose current
  // seq is in here are not resent until the changefeed brings a newer value.
  private invalidSeq = new Set<number>();
  private opts: DKVOptions;

  constructor(opts: DKVOptions) {
    super();
    if (opts.client == null) {
      throw Error("client must be specified");
    }
    this.opts = opts;
    const {
      name,
      project_id,
      account_id,
      desc,
      client,
      merge,
      config,
      noAutosave,
      ephemeral,
      sync,
      service,
    } = opts;
    this.name = name;
    this.desc = desc;
    this.merge = merge;
    this.noAutosave = !!noAutosave;
    this.kv = new CoreStream({
      name,
      project_id,
      account_id,
      client,
      config,
      ephemeral,
      sync,
      service,
    });

    return new Proxy(this, {
      deleteProperty(target, prop) {
        if (typeof prop === "symbol") {
          // Symbols are never data keys.
          return Reflect.deleteProperty(target, prop);
        }
        target.delete(prop);
        return true;
      },
      set(target, prop, value) {
        if (typeof prop === "symbol") {
          // Symbol-keyed fields (e.g. EventEmitter internals) must not be
          // stringified and written into the store.
          return Reflect.set(target, prop, value);
        }
        // EventEmitter bookkeeping (including setMaxListeners) and an
        // overridable close() are real properties, not data.
        if (
          prop == "_eventsCount" ||
          prop == "_events" ||
          prop == "_maxListeners" ||
          prop == "close"
        ) {
          target[prop] = value;
          return true;
        }
        if (target[prop] != null) {
          throw Error(`method name '${prop}' is read only`);
        }
        target.set(prop, value);
        return true;
      },
      get(target, prop) {
        if (typeof prop === "symbol") {
          return Reflect.get(target, prop);
        }
        // Real members win; anything else is looked up as a data key.
        return target[prop] ?? target.get(prop);
      },
    });
  }

  private initialized = false;
  /**
   * Start listening to the stream, initialize it, and enable per-message TTL
   * (used for deleting tombstones). Emits "connected". May only be called once.
   */
  init = async () => {
    if (this.initialized) {
      throw Error("init can only be called once");
    }
    this.initialized = true;
    if (this.kv == null) {
      throw Error("closed");
    }
    this.kv.on("change", this.handleRemoteChange);
    await this.kv.init();
    // allow_msg_ttl is used for deleting tombstones.
    await this.kv.config({ allow_msg_ttl: true });
    this.emit("connected");
  };

  isClosed = () => {
    return this.kv == null;
  };

  /**
   * Close the underlying stream, emit "closed", drop all listeners and local
   * state. Unsaved local changes are abandoned. Idempotent.
   */
  close = () => {
    if (this.isClosed()) {
      return;
    }
    const kv = this.kv;
    delete this.kv;
    if (kv != null) {
      kv.removeListener("change", this.handleRemoteChange);
      kv.close();
    }
    this.emit("closed");
    this.removeAllListeners();
    // @ts-ignore
    delete this.local;
    // @ts-ignore
    delete this.options;
    // @ts-ignore
    delete this.changed;
    delete this.merge;
    // @ts-ignore
    delete this.opts;
  };

  // Forget everything local about key; emits "stable" if nothing local remains.
  private discardLocalState = (key: string) => {
    delete this.local[key];
    delete this.options[key];
    delete this.saved[key];
    if (this.isStable()) {
      this.emit("stable");
    }
  };

  // stable = everything is saved *and* also echoed back from the server as confirmation.
  isStable = () => {
    for (const _ in this.local) {
      return false;
    }
    return true;
  };

  /**
   * Reconcile a change arriving from the stream with any unsaved local value
   * for the same key, then emit ("change", {key, value, prev}).
   *
   * - equal to the local value: the local copy is discarded (already saved);
   * - conflicting: `merge` picks the value (falling back to local if merge is
   *   missing, returns null/undefined, or throws); if the result equals the
   *   remote value the local copy is discarded, otherwise it is written back
   *   (TOMBSTONE means delete).
   * A pending local delete (TOMBSTONE) is treated as "no local value" here.
   */
  private handleRemoteChange = ({
    mesg: remote,
    key,
    prev,
  }: ChangeEvent<T>) => {
    if (key === undefined) {
      // not part of kv store data
      return;
    }
    const local = this.local[key] === TOMBSTONE ? undefined : this.local[key];
    let value: any = remote;
    if (local !== undefined) {
      if (isEqual(local, remote)) {
        // incoming remote value is equal to unsaved local value, so we can
        // just discard our local value (no need to save it).
        this.discardLocalState(key);
      } else {
        try {
          value = this.merge?.({ key, local, remote, prev }) ?? local;
        } catch (err) {
          console.warn("exception in merge conflict resolution", err);
          // user provided a merge function that throws an exception. We select local, since
          // it is the newest, i.e., "last write wins"
          value = local;
        }
        if (isEqual(value, remote)) {
          // no change, so forget our local value
          this.discardLocalState(key);
        } else if (value === TOMBSTONE) {
          // merge chose to delete the key
          this.delete(key);
        } else {
          this.set(key, value);
        }
      }
    }
    this.emit("change", { key, value, prev });
  };

  /** Value for key (local unsaved value first), or all data when no key. */
  get(key: string): T | undefined;
  get(): { [key: string]: T };
  get(key?: string): T | { [key: string]: T } | undefined {
    if (this.kv == null) {
      throw Error("closed");
    }
    if (key === undefined) {
      return this.getAll();
    }
    const local = this.local[key];
    if (local === TOMBSTONE) {
      return undefined;
    }
    if (local !== undefined) {
      return local;
    }
    return this.kv.getKv(key);
  }

  get length(): number {
    // not efficient
    return Object.keys(this.getAll()).length;
  }

  /** All data: stored values overlaid with unsaved local values, minus pending deletes. */
  getAll = (): { [key: string]: T } => {
    if (this.kv == null) {
      throw Error("closed");
    }
    const x = { ...this.kv.getAllKv(), ...this.local };
    for (const key in this.local) {
      if (this.local[key] === TOMBSTONE) {
        delete x[key];
      }
    }
    return x as { [key: string]: T };
  };

  /**
   * All live keys: stored keys plus unsaved local ones, excluding keys with a
   * pending local delete (consistent with has() and getAll()). Fast because
   * stored messages are not decoded. Returns [] when closed.
   */
  keys = (): string[] => {
    if (this.kv == null) {
      return [];
    }
    // Copy into a Set so the array returned by keysKv() is never mutated.
    const result = new Set<string>(this.kv.keysKv());
    for (const key in this.local) {
      if (this.local[key] === TOMBSTONE) {
        result.delete(key);
      } else {
        result.add(key);
      }
    }
    return Array.from(result);
  };

  has = (key: string): boolean => {
    if (this.kv == null) {
      throw Error("closed");
    }
    const a = this.local[key];
    if (a === TOMBSTONE) {
      return false;
    }
    if (a !== undefined) {
      return true;
    }
    return this.kv.hasKv(key);
  };

  // Stored time(s) from the stream; unsaved local values are not reflected.
  time = (key?: string): { [key: string]: Date } | Date | undefined => {
    if (this.kv == null) {
      throw Error("closed");
    }
    return this.kv.timeKv(key);
  };

  // Stored sequence number for key; unsaved local values are not reflected.
  seq = (key: string): number | undefined => {
    if (this.kv == null) {
      throw Error("closed");
    }
    return this.kv.seqKv(key);
  };

  // Mark key as pending deletion without triggering a save.
  private _delete = (key: string) => {
    this.local[key] = TOMBSTONE;
    this.changed.add(key);
  };

  delete = (key: string) => {
    this._delete(key);
    if (!this.noAutosave) {
      this.save();
    }
  };

  // Mark every stored and every local key for deletion.
  clear = () => {
    if (this.kv == null) {
      throw Error("closed");
    }
    for (const key in this.kv.getAllKv()) {
      this._delete(key);
    }
    for (const key in this.local) {
      this._delete(key);
    }
    if (!this.noAutosave) {
      this.save();
    }
  };

  // undefined is stored as TOMBSTONE, i.e., setting undefined deletes.
  private toValue = (obj) => {
    if (obj === undefined) {
      return TOMBSTONE;
    }
    return obj;
  };

  headers = (key: string): Headers | undefined => {
    if (this.options[key] != null) {
      return this.options[key]?.headers;
    } else {
      return this.kv?.headersKv(key);
    }
  };

  set = (key: string, value: T, options?: SetOptions) => {
    const obj = this.toValue(value);
    this.local[key] = obj;
    if (options != null) {
      this.options[key] = options;
    }
    this.changed.add(key);
    if (!this.noAutosave) {
      this.save();
    }
    this.updateInventory();
  };

  setMany = (obj) => {
    for (const key in obj) {
      this.local[key] = this.toValue(obj[key]);
      this.changed.add(key);
    }
    if (!this.noAutosave) {
      this.save();
    }
    this.updateInventory();
  };

  hasUnsavedChanges = () => {
    if (this.kv == null) {
      return false;
    }
    return this.unsavedChanges().length > 0;
  };

  unsavedChanges = (): string[] => {
    return Object.keys(this.local).filter(
      (key) => this.local[key] !== this.saved[key],
    );
  };

  /**
   * Push local changes to the stream and resolve with the status of the last
   * attempt ({unsaved, set, delete}), or undefined if closed mid-save.
   * With noAutosave: one attempt whose errors propagate. Otherwise: retry with
   * exponential backoff until nothing is unsaved or the store is closed.
   */
  save = reuseInFlight(async () => {
    if (this.noAutosave) {
      return await this.attemptToSave();
    }
    let status;
    await until(
      async () => {
        if (this.kv == null) {
          // closed -- nothing more can be saved
          return true;
        }
        try {
          status = await this.attemptToSave();
        } catch {
          // Failures (timeouts, conflicts, ...) are expected occasionally;
          // until() retries below with backoff.
        }
        return !this.hasUnsavedChanges();
      },
      { start: 150, decay: 1.3, max: 10000 },
    );
    return status;
  });

  // Single save attempt using the batched implementation. The per-key
  // attemptToSaveParallel is kept as an alternative.
  private attemptToSave = async () => {
    return await this.attemptToSaveMany();
  };

  /**
   * One batched save attempt: deletes are sent one at a time, then all sets in
   * a single setKvMany call. Throws if any result had an unexpected error or a
   * rejection, so that save() retries. "wrong-last-sequence" (a newer remote
   * value will arrive via the changefeed) and 408 timeouts (save() retries)
   * are not treated as errors.
   */
  private attemptToSaveMany = reuseInFlight(async () => {
    const start = Date.now();
    if (DEBUG) {
      console.log("attemptToSaveMany: start");
    }
    if (this.kv == null) {
      throw Error("closed");
    }
    this.changed.clear();
    const status = { unsaved: 0, set: 0, delete: 0 };
    const obj = { ...this.local };
    for (const key in obj) {
      if (obj[key] === TOMBSTONE) {
        status.unsaved += 1;
        await this.kv.deleteKv(key);
        if (this.kv == null) return;
        status.delete += 1;
        status.unsaved -= 1;
        delete obj[key];
        if (!this.changed.has(key)) {
          // successfully saved this and user didn't make a change *during* the set
          this.discardLocalState(key);
        }
      }
    }
    let errors = false;
    const x: {
      key: string;
      mesg: T;
      options?: {
        headers?: Headers;
        previousSeq?: number;
      };
    }[] = [];
    for (const key in obj) {
      const previousSeq = this.merge != null ? this.seq(key) : undefined;
      if (previousSeq && this.invalidSeq.has(previousSeq)) {
        // known to be stale; wait for the changefeed instead of resending
        continue;
      }
      status.unsaved += 1;
      x.push({
        key,
        mesg: obj[key] as T,
        options: {
          ...this.options[key],
          previousSeq,
        },
      });
    }
    const results = await this.kv.setKvMany(x);

    let i = 0;
    for (const resp of results) {
      // Read everything needed from x[i] *before* advancing i.
      const { key, options } = x[i];
      i++;
      if (this.kv == null) return;
      if (!(resp as any).error) {
        status.unsaved -= 1;
        status.set += 1;
        continue;
      }
      const { code, error } = resp as any;
      if (DEBUG) {
        console.log("kv store -- attemptToSave failed", this.desc, error, {
          key,
          value: obj[key],
          code: code,
        });
      }
      if (code == "wrong-last-sequence") {
        // This happens when another client has published a NEWER version of this key,
        // so the right thing is to just ignore this. In a moment there will be no
        // need to save anything, since we'll receive a message that overwrites this key.
        // It's very important that the changefeed actually be working, of course, which
        // is why the this.invalidSeq, so we never retry in this case, since it can't work.
        if (options?.previousSeq) {
          this.invalidSeq.add(options.previousSeq);
        }
        continue;
      }
      if (code == 408) {
        // timeout -- expected to happen periodically; save() tries again.
        if (!process.env.COCALC_TEST_MODE) {
          console.log("WARNING: timeout saving (will try again soon)");
        }
        continue;
      }
      errors = true;
      if (code == "reject") {
        const value = this.local[key];
        // can never save this.
        this.discardLocalState(key);
        status.unsaved -= 1;
        this.emit("reject", { key, value });
      }
      if (!process.env.COCALC_TEST_MODE) {
        console.warn(
          `WARNING: unexpected error saving dkv '${this.name}' -- ${error}`,
        );
      }
    }
    if (errors) {
      this.saveErrors = true;
      // throwing makes save() retry
      throw Error(`there were errors saving dkv '${this.name}'`);
    }
    if (
      !process.env.COCALC_TEST_MODE &&
      this.saveErrors &&
      status.unsaved == 0
    ) {
      this.saveErrors = false;
      console.log(`SUCCESS: dkv ${this.name} fully saved`);
    }
    if (DEBUG) {
      console.log("attemptToSaveMany: done", Date.now() - start);
    }
    return status;
  });

  /**
   * Alternative save attempt: deletes one at a time, then one setKv per key
   * with at most MAX_PARALLEL in flight. Throws if any key failed.
   */
  attemptToSaveParallel = reuseInFlight(async () => {
    const start = Date.now();
    if (DEBUG) {
      console.log("attemptToSaveParallel: start");
    }
    if (this.kv == null) {
      throw Error("closed");
    }
    this.changed.clear();
    const status = { unsaved: 0, set: 0, delete: 0 };
    const obj = { ...this.local };
    for (const key in obj) {
      if (obj[key] === TOMBSTONE) {
        status.unsaved += 1;
        await this.kv.deleteKv(key);
        if (this.kv == null) return;
        status.delete += 1;
        status.unsaved -= 1;
        delete obj[key];
        if (!this.changed.has(key)) {
          // successfully saved this and user didn't make a change *during* the set
          this.discardLocalState(key);
        }
      }
    }
    let errors = false;
    const f = async (key: string) => {
      if (this.kv == null) {
        // closed
        return;
      }
      const previousSeq = this.merge != null ? this.seq(key) : undefined;
      try {
        if (previousSeq && this.invalidSeq.has(previousSeq)) {
          throw new ConatError("waiting on new sequence via changefeed", {
            code: "wrong-last-sequence",
          });
        }
        status.unsaved += 1;
        await this.kv.setKv(key, obj[key] as T, {
          ...this.options[key],
          previousSeq,
        });
        if (this.kv == null) return;
        if (DEBUG) {
          console.log("kv store -- attemptToSave succeed", this.desc, {
            key,
            value: obj[key],
          });
        }
        status.unsaved -= 1;
        status.set += 1;
        // note that we CANNOT call this.discardLocalState(key) here, because
        // this.get(key) needs to work immediately after save, but if this.local[key]
        // is deleted, then this.get(key) would be undefined, because
        // this.kv.getKv(key) only has value in it once the value is
        // echoed back from the server.
      } catch (err) {
        if (DEBUG) {
          console.log("kv store -- attemptToSave failed", this.desc, err, {
            key,
            value: obj[key],
            code: err.code,
          });
        }
        errors = true;
        if (err.code == "reject") {
          const value = this.local[key];
          // can never save this.
          this.discardLocalState(key);
          status.unsaved -= 1;
          this.emit("reject", { key, value });
        }
        if (err.code == "wrong-last-sequence") {
          // Another client published a NEWER version of this key; the
          // changefeed will deliver it, so never retry with this sequence.
          if (previousSeq) {
            this.invalidSeq.add(previousSeq);
          }
          return;
        }
        if (err.code == 408) {
          // timeout -- expected to happen periodically, of course
          if (!process.env.COCALC_TEST_MODE) {
            console.log("WARNING: timeout saving (will try again soon)");
          }
          return;
        }
        if (!process.env.COCALC_TEST_MODE) {
          console.warn(
            `WARNING: unexpected error saving dkv '${this.name}' -- ${err}`,
          );
        }
      }
    };
    await awaitMap(Object.keys(obj), MAX_PARALLEL, f);
    if (errors) {
      this.saveErrors = true;
      // throwing makes save() retry
      throw Error(`there were errors saving dkv '${this.name}'`);
    }
    if (
      !process.env.COCALC_TEST_MODE &&
      this.saveErrors &&
      status.unsaved == 0
    ) {
      this.saveErrors = false;
      console.log(`SUCCESS: dkv ${this.name} fully saved`);
    }
    if (DEBUG) {
      console.log("attemptToSaveParallel: done", Date.now() - start);
    }
    return status;
  });

  stats = () => this.kv?.stats();

  // get or set config
  config = async (
    config: Partial<Configuration> = {},
  ): Promise<Configuration> => {
    if (this.kv == null) {
      throw Error("not initialized");
    }
    return await this.kv.config(config);
  };

  /**
   * Throttled: record this store's name, desc and stream inventory stats in
   * the inventory (skipped when closed or noInventory is set). Failures are
   * logged, never thrown.
   */
  private updateInventory = asyncThrottle(
    async () => {
      if (this.isClosed() || this.opts == null || this.opts.noInventory) {
        return;
      }
      await delay(500);
      if (this.isClosed() || this.kv == null) {
        return;
      }
      try {
        const { account_id, project_id, desc } = this.opts;
        const inv = await inventory({
          account_id,
          project_id,
          service: this.opts.service,
        });
        if (this.isClosed()) {
          return;
        }
        const status = {
          type: "kv" as "kv",
          name: this.opts.name,
          desc,
          ...(await this.kv.inventory()),
        };
        inv.set(status);
        // NOTE(review): `inv` is not closed here. The previous
        // `finally { inv?.close() }` never ran because `inv` was shadowed.
        // Closing immediately after inv.set() might abort its background
        // save, so confirm Inventory.close semantics before adding a close.
      } catch (err) {
        if (!process.env.COCALC_TEST_MODE) {
          // this.name survives close(), unlike this.opts
          console.log(
            `WARNING: unable to update inventory. name='${this.name} -- ${err}'`,
          );
        }
      }
    },
    INVENTORY_UPDATE_INTERVAL,
    { leading: true, trailing: true },
  );
}
826
827
/**
 * Reference-counted cache of initialized DKV instances, keyed by
 * (name, account_id, project_id, client id). A missing client is filled in
 * from conat() before construction.
 */
export const cache = refCache<DKVOptions, DKV>({
  name: "dkv",
  createKey: (opts) => {
    const { name, account_id, project_id, client } = opts;
    return JSON.stringify({ name, account_id, project_id, id: client?.id });
  },
  createObject: async (opts) => {
    const withClient =
      opts.client == null ? { ...opts, client: await conat() } : opts;
    const store = new DKV(withClient);
    await store.init();
    return store;
  },
});
840
841
/** Get (or create and initialize) the cached DKV described by `options`. */
export async function dkv<T>(options: DKVOptions): Promise<DKV<T>> {
  const store: DKV<T> = await cache(options);
  return store;
}
844
845