Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/sync/dkv.ts
5870 views
1
/*
2
Eventually Consistent Distributed Key:Value Store
3
4
- You give one subject and general-dkv provides a synchronous eventually consistent
5
"multimaster" distributed way to work with the KV store of keys matching that subject,
6
inside of the named KV store.
7
8
- You may define a 3-way merge function, which is used to automatically resolve all
9
conflicting writes. The default is to use our local version, i.e., "last write
10
to remote wins". The function is run locally so can have access to any state.
11
12
- All set/get/delete operations are synchronous.
13
14
- The state gets sync'd in the backend to persistent storage on Conat as soon as possible,
15
and there is an async save function.
16
17
This class is based on top of the Consistent Centralized Key:Value Store defined in kv.ts.
18
You can use the same key:value store at the same time via both interfaces, and if the store
19
is a DKV, you can also access the underlying KV via "store.kv".
20
21
- You must explicitly call "await store.init()" to initialize this before using it.
22
23
- The store emits an event ('change', key) whenever anything changes.
24
25
- Calling "store.getAll()" provides ALL the data, and "store.get(key)" gets one value.
26
27
- Use "store.set(key,value)" or "store.set({key:value, key2:value2, ...})" to set data,
28
with the following semantics:
29
30
- in the background, changes propagate to Conat. You do not do anything explicitly and
31
this should never raise an exception.
32
33
- you can call "store.hasUnsavedChanges()" to see if there are any unsaved changes.
34
35
- call "store.unsavedChanges()" to see the unsaved keys.
36
37
- The 3-way merge function takes as input {local,remote,prev,key}, where
38
- key = the key where there's a conflict
39
- local = your version of the value
40
- remote = the remote value, which conflicts in that isEqual(local,remote) is false.
41
- prev = a known common prev of local and remote.
42
43
(any of local, remote or prev can be undefined, e.g., no previous value or a key was deleted)
44
45
You can do anything synchronously you want to resolve such conflicts, i.e., there are no
46
axioms that have to be satisfied. If the 3-way merge function throws an exception (or is
47
not specified) we silently fall back to "last write wins".
48
49
50
DEVELOPMENT:
51
52
~/cocalc/src/packages/backend$ node
53
54
s = await require("@cocalc/backend/conat/sync").dkv({name:'test', merge:({local,remote})=>{return {...remote,...local}}});
55
56
57
In the browser console:
58
59
> s = await cc.client.conat_client.dkv({filter:['foo.>'],merge:({local,remote})=>{return {...remote,...local}}})
60
61
# NOTE that the name is account-{account_id} or project-{project_id},
62
# and if not given defaults to the account-{user's account id}
63
> s.kv.name
64
'account-6aae57c6-08f1-4bb5-848b-3ceb53e61ede'
65
66
> s.on('change',(key)=>console.log(key));0;
67
68
*/
69
70
import { EventEmitter } from "events";
71
import {
72
CoreStream,
73
type Configuration,
74
type ChangeEvent,
75
} from "./core-stream";
76
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
77
import { isEqual } from "lodash";
78
import { delay, map as awaitMap } from "awaiting";
79
import {
80
type Client,
81
ConatError,
82
type Headers,
83
} from "@cocalc/conat/core/client";
84
import refCache from "@cocalc/util/refcache";
85
import { type JSONValue } from "@cocalc/util/types";
86
import { conat } from "@cocalc/conat/client";
87
import { asyncThrottle, until } from "@cocalc/util/async-utils";
88
import {
89
inventory,
90
type Inventory,
91
INVENTORY_UPDATE_INTERVAL,
92
} from "./inventory";
93
94
// Sentinel stored in DKV's local state for a key that was deleted locally.
export const TOMBSTONE = Symbol("tombstone");

// Concurrency limit handed to awaitMap by attemptToSaveParallel.
const MAX_PARALLEL = 250;

// Set to true to have the save routines log their progress to the console.
const DEBUG = false;
98
99
/**
 * Signature of a 3-way merge conflict resolver.
 *
 * It receives the conflicting `key`, the `local` value, the `remote` value and
 * a common previous value `prev`, and returns the value to keep.
 */
export type MergeFunction = (
  opts: { key: string } & Record<"prev" | "local" | "remote", any>,
) => any;
105
106
// Per-key options remembered by set() and forwarded when the key is saved.
interface SetOptions {
  // headers to attach to the saved message
  headers?: Headers;
}
109
110
/**
 * Options accepted by the DKV constructor and by the dkv() factory.
 */
export interface DKVOptions {
  // name of the store; forwarded to the underlying CoreStream
  name: string;
  account_id?: string;
  project_id?: string;
  // free-form description; reported when the inventory is updated
  desc?: JSONValue;
  // the DKV constructor throws if this is not set; the dkv() factory
  // fills it in via conat() when it is missing.
  client?: Client;
  // 3-way merge conflict resolution
  merge?: (opts: { key: string; prev?: any; local?: any; remote?: any }) => any;
  config?: Partial<Configuration>;

  // If noAutosave is set, local changes are never saved until you explicitly
  // call "await this.save()", which will try once to save.  Changes made during
  // the save may not be saved though.
  // CAUTION: noAutosave is really only meant for unit testing!  The save is
  // reuseInFlighted, so a save somewhere far away could be in progress, starting
  // before your call to save, and when it finishes that's it, so what you just
  // did is not saved.  Take care.
  noAutosave?: boolean;

  // forwarded unchanged to the underlying CoreStream
  ephemeral?: boolean;
  sync?: boolean;

  // not read anywhere in this class -- presumably consumed by the caching layer
  noCache?: boolean;
  // if set, set()/setMany() do not update the inventory
  noInventory?: boolean;
  // forwarded to the CoreStream and to the inventory lookup
  service?: string;
}
136
137
export class DKV<T = any> extends EventEmitter {
138
// underlying stream-backed key:value store; undefined once closed
private kv?: CoreStream<T>;
// optional user supplied conflict resolver
private merge?: MergeFunction;
// values set/deleted locally that have not yet been echoed back by the server
private local: { [key: string]: T | typeof TOMBSTONE } = {};
// per-key options given to set(), used when the key is saved
private options: { [key: string]: SetOptions } = {};
// compared against local by unsavedChanges()
private saved: { [key: string]: T | typeof TOMBSTONE } = {};
// last value received from the server for each key
private remote: { [key: string]: T } = {};
// keys modified since the current save attempt started
private changed = new Set<string>();
private noAutosave: boolean;
public readonly name: string;
public readonly desc?: JSONValue;
// true after a save attempt had errors, until a later attempt fully succeeds
private saveErrors = false;
// sequence numbers the server rejected with "wrong-last-sequence"
private invalidSeq = new Set<number>();
private opts: DKVOptions;
151
152
/**
 * Create the store (call init() before using it).
 *
 * The returned object is a Proxy around the instance so the store can be used
 * like a plain object:
 *   - reading a property that is not a member falls back to get(prop),
 *   - assigning a property that is not a member calls set(prop, value),
 *   - `delete store[prop]` calls delete(prop).
 *
 * Throws if opts.client is not given.
 */
constructor(opts: DKVOptions) {
  super();
  if (opts.client == null) {
    throw Error("client must be specified");
  }
  this.opts = opts;
  const {
    name,
    project_id,
    account_id,
    desc,
    client,
    merge,
    config,
    noAutosave,
    ephemeral,
    sync,
    service,
  } = opts;
  this.name = name;
  this.desc = desc;
  this.merge = merge;
  this.noAutosave = !!noAutosave;
  this.kv = new CoreStream({
    name,
    project_id,
    account_id,
    client,
    config,
    ephemeral,
    sync,
    service,
  });

  return new Proxy(this, {
    deleteProperty(target, prop) {
      // only string keys are store keys; deleting always reports success
      if (typeof prop == "string") {
        target.delete(prop);
      }
      return true;
    },

    set(target, prop, value) {
      const name = String(prop);
      // EventEmitter internals and close must be assignable on the instance itself.
      const isInternal =
        name == "_eventsCount" || name == "_events" || name == "close";
      if (isInternal) {
        target[name] = value;
        return true;
      }
      // never let a store key shadow an existing member
      if (target[name] != null) {
        throw Error(`method name '${name}' is read only`);
      }
      target.set(name, value);
      return true;
    },

    get(target, prop) {
      const name = String(prop);
      // members win; anything else is looked up as a key in the store
      return target[name] ?? target.get(name);
    },
  });
}
210
211
private initialized = false;

// Initialize the underlying store and start listening for remote changes.
// Must be called exactly once; throws on a second call or if already closed.
// Emits 'connected' when done.
init = async () => {
  if (this.initialized) {
    throw Error("init can only be called once");
  }
  this.initialized = true;
  if (this.kv == null) {
    throw Error("closed");
  }
  // subscribe before initializing the underlying store
  this.kv.on("change", this.handleRemoteChange);
  await this.kv.init();
  // allow_msg_ttl is used for deleting tombstones.
  await this.kv.config({ allow_msg_ttl: true });
  this.emit("connected");
};
226
227
// The store counts as closed once the underlying kv has been dropped.
isClosed = () => this.kv == null;
230
231
// Close the store: detach from and close the underlying kv, emit 'closed',
// remove all listeners and drop local state.  Calling it again is a no-op.
close = () => {
  const kv = this.kv;
  if (kv == null) {
    // already closed
    return;
  }
  delete this.kv;
  kv.removeListener("change", this.handleRemoteChange);
  kv.close();
  // listeners must be notified before they are all removed
  this.emit("closed");
  this.removeAllListeners();
  // @ts-ignore
  delete this.local;
  // @ts-ignore
  delete this.options;
  // @ts-ignore
  delete this.changed;
  delete this.merge;
  // @ts-ignore
  delete this.opts;
};
253
254
// Forget everything tracked locally for key, and emit 'stable' if that
// leaves no local state at all.
private discardLocalState = (key: string) => {
  for (const state of [this.local, this.options, this.saved]) {
    delete state[key];
  }
  if (this.isStable()) {
    this.emit("stable");
  }
};
262
263
// stable = everything is saved *and* also echoed back from the server as confirmation,
// i.e., there is no key left in this.local.
isStable = () => {
  let pending = false;
  for (const _ in this.local) {
    // one key is enough to know we are not stable
    pending = true;
    break;
  }
  return !pending;
};
270
271
// Handle a change event from the underlying kv.  If there is a pending local
// value for the key that differs from the incoming remote value, the conflict
// is resolved with the merge function (falling back to the local value).
// Always records the remote value and emits ('change', {key, value, prev}).
private handleRemoteChange = ({
  mesg: remote,
  key,
  prev,
}: ChangeEvent<T>) => {
  if (key === undefined) {
    // not part of kv store data
    return;
  }
  const pending = this.local[key];
  // a pending delete is treated like "no local value" here
  const local = pending === TOMBSTONE ? undefined : pending;
  const prevResolved = prev ?? this.remote[key];
  let value: any = remote;

  if (local !== undefined && isEqual(local, remote)) {
    // The server already has exactly our pending value, so there is
    // nothing left to save for this key.
    this.discardLocalState(key);
  } else if (local !== undefined) {
    // Conflict: pending local value differs from the incoming remote value.
    try {
      value =
        this.merge?.({ key, local, remote, prev: prevResolved }) ?? local;
    } catch (err) {
      console.warn("exception in merge conflict resolution", err);
      // The merge function threw, so fall back to the local value, since
      // it is the newest, i.e., "last write wins".
      value = local;
    }
    if (isEqual(value, remote)) {
      // resolution equals what the server has, so forget our local value
      this.discardLocalState(key);
    } else if (value === TOMBSTONE) {
      // resolution is to delete the key
      this.delete(key);
    } else {
      this.set(key, value);
    }
  }

  // remember what the server last told us about this key
  if (remote === undefined) {
    delete this.remote[key];
  } else {
    this.remote[key] = remote;
  }
  this.emit("change", { key, value, prev: prevResolved });
};
333
334
// get(key) returns the value of one key (pending local changes take
// precedence; a pending delete yields undefined).  get() with no argument
// returns everything, like getAll().  Throws if the store is closed.
get(key: string): T | undefined;
get(): { [key: string]: T };
get(key?: string): T | { [key: string]: T } | undefined {
  if (this.kv == null) {
    throw Error("closed");
  }
  if (key === undefined) {
    return this.getAll();
  }
  const pending = this.local[key];
  if (pending === undefined) {
    // nothing pending locally, so use what the underlying kv has
    return this.kv.getKv(key);
  }
  return pending === TOMBSTONE ? undefined : pending;
}
352
353
// Number of keys in getAll().
get length(): number {
  // not efficient: materializes every value just to count the keys
  const everything = this.getAll();
  return Object.keys(everything).length;
}
357
358
// Return all key:value pairs: what the underlying kv has, overlaid with the
// pending local changes (pending deletes are removed).  Throws if closed.
getAll = (): { [key: string]: T } => {
  if (this.kv == null) {
    throw Error("closed");
  }
  const all = { ...this.kv.getAllKv() } as { [key: string]: T };
  for (const [key, pending] of Object.entries(this.local)) {
    if (pending === TOMBSTONE) {
      delete all[key];
    } else {
      all[key] = pending;
    }
  }
  return all;
};
370
371
// Gets all the keys; fast because it doesn't decode messages.
// Returns [] if the store is closed.
//
// The result is consistent with has()/get()/getAll(): keys that were set
// locally but are not yet on the server are included, and keys with a pending
// local delete (TOMBSTONE) are excluded.
keys = (): string[] => {
  if (this.kv == null) {
    return [];
  }
  // this is fast
  const remoteKeys = this.kv.keysKv();

  const pendingKeys = Object.keys(this.local);
  if (pendingKeys.length == 0) {
    // common case: nothing pending locally, so no extra work
    return remoteKeys;
  }

  // Overlay the pending local state.  A Set keeps insertion order, so remote
  // keys come first, followed by new local keys.  We build a new collection
  // rather than mutating the array that keysKv() returned.
  const merged = new Set<string>(remoteKeys);
  for (const key of pendingKeys) {
    if (this.local[key] === TOMBSTONE) {
      merged.delete(key);
    } else {
      merged.add(key);
    }
  }
  return Array.from(merged);
};
391
392
// True if key currently has a value, taking pending local changes into
// account (a pending delete means false).  Throws if closed.
has = (key: string): boolean => {
  if (this.kv == null) {
    throw Error("closed");
  }
  const pending = this.local[key];
  if (pending === undefined) {
    // nothing pending locally; ask the underlying kv
    return this.kv.hasKv(key);
  }
  return pending !== TOMBSTONE;
};
405
406
// Delegates to timeKv of the underlying kv: with a key, the time for that
// key; with no key, a map from keys to times.  Throws if closed.
time = (key?: string): { [key: string]: Date } | Date | undefined => {
  const kv = this.kv;
  if (kv == null) {
    throw Error("closed");
  }
  return kv.timeKv(key);
};
412
413
// Sequence number of key as reported by the underlying kv.  Throws if closed.
seq = (key: string): number | undefined => {
  const kv = this.kv;
  if (kv == null) {
    throw Error("closed");
  }
  return kv.seqKv(key);
};
419
420
// Mark key as deleted locally (without triggering a save).
private _delete = (key) => {
  this.changed.add(key);
  this.local[key] = TOMBSTONE;
};
424
425
// Delete key locally, and start saving in the background unless
// noAutosave is set.
delete = (key) => {
  this._delete(key);
  if (this.noAutosave) {
    return;
  }
  this.save();
};
431
432
// Delete every key: both those in the underlying kv and those only set
// locally.  Starts a background save unless noAutosave is set.
// Throws if closed.
clear = () => {
  const kv = this.kv;
  if (kv == null) {
    throw Error("closed");
  }
  // everything the underlying kv knows about
  for (const key in kv.getAllKv()) {
    this._delete(key);
  }
  // plus anything that only exists locally so far
  for (const key in this.local) {
    this._delete(key);
  }
  if (this.noAutosave) {
    return;
  }
  this.save();
};
446
447
// Convert a user supplied value to what is stored in this.local:
// undefined means "delete", which is represented by TOMBSTONE.
private toValue = (obj) => {
  return obj === undefined ? TOMBSTONE : obj;
};
453
454
// Headers for key: those given in a pending local set() if there are any
// options recorded for the key, otherwise those from the underlying kv.
headers = (key: string): Headers | undefined => {
  const pending = this.options[key];
  if (pending == null) {
    return this.kv?.headersKv(key);
  }
  return pending.headers;
};
461
462
// Summary of internal sizes for debugging.  When closed only the name and
// state are reported.
debugStats = () => {
  if (this.kv == null) {
    return { name: this.name, state: "closed" as const };
  }
  const kv = this.kv;
  // prefer the numbers reported by the kv itself, if it provides debugStats
  const kvStats = kv.debugStats?.();
  const rawLength = kvStats?.rawLength ?? kv.raw.length;
  const messagesLength = kvStats?.messagesLength ?? kv.messages.length;
  const kvLength = kvStats?.kvLength ?? kv.lengthKv;
  return {
    name: this.name,
    state: "open" as const,
    rawLength,
    messagesLength,
    kvLength,
    lastValueByKeySize: kvStats?.lastValueByKeySize,
    localKeys: Object.keys(this.local).length,
    changedKeys: this.changed.size,
  };
};
479
480
// Set key to value locally (undefined means delete), remember any options
// for the save, start a background save unless noAutosave is set, and
// schedule an inventory update.
set = (key: string, value: T, options?: SetOptions) => {
  this.local[key] = this.toValue(value);
  if (options != null) {
    this.options[key] = options;
  }
  this.changed.add(key);
  if (!this.noAutosave) {
    this.save();
  }
  this.updateInventory();
};
492
493
// Set several keys at once from an object {key:value, ...}, then start one
// background save (unless noAutosave) and schedule an inventory update.
setMany = (obj) => {
  for (const key in obj) {
    const value = this.toValue(obj[key]);
    this.local[key] = value;
    this.changed.add(key);
  }
  if (!this.noAutosave) {
    this.save();
  }
  this.updateInventory();
};
503
504
// True if open and at least one key has unsaved changes; always false
// once the store is closed.
hasUnsavedChanges = () => {
  return this.kv != null && this.unsavedChanges().length > 0;
};
510
511
// The keys whose local value differs from the recorded saved value.
unsavedChanges = (): string[] => {
  const unsaved: string[] = [];
  for (const key of Object.keys(this.local)) {
    if (this.local[key] !== this.saved[key]) {
      unsaved.push(key);
    }
  }
  return unsaved;
};
516
517
// Save local changes.  With noAutosave this makes exactly one attempt (and
// propagates its errors).  Otherwise it keeps retrying with backoff until
// there are no unsaved changes or the store is closed; failed attempts are
// swallowed so this never throws.  Concurrent calls share one save.
save = reuseInFlight(async () => {
  if (this.noAutosave) {
    return await this.attemptToSave();
  }
  let status;

  const attempt = async () => {
    if (this.kv == null) {
      // closed -- nothing more we can do
      return true;
    }
    try {
      status = await this.attemptToSave();
      //console.log("successfully saved");
    } catch (err) {
      // logging is deliberately disabled; the retry loop handles the failure
      if (false && !process.env.COCALC_TEST_MODE) {
        console.log(
          "WARNING: dkv attemptToSave failed -- ",
          this.name,
          this.kv?.name,
          err,
        );
      }
    }
    // done exactly when nothing is left to save
    return !this.hasUnsavedChanges();
  };

  await until(attempt, { start: 150, decay: 1.3, max: 10000 });
  return status;
});
547
548
// Make a single attempt to save all local changes and return the resulting
// status ({unsaved, set, delete} counts), or undefined if the attempt bailed
// out early.  Throws if the attempt had errors.
//
// The status used to be dropped here, so save() always resolved to undefined
// even though it goes to the trouble of returning it.
private attemptToSave = async () => {
  // hard-coded switch between the two save strategies; the batched one is used
  if (true) {
    return await this.attemptToSaveMany();
  } else {
    return await this.attemptToSaveParallel();
  }
};
555
556
// Make one attempt to save all local changes using a single batched
// setKvMany call.  Pending deletes are sent first, one at a time.
//
// Returns the status {unsaved, set, delete}, or undefined if the store was
// closed mid-save or the attempt stopped early on a "wrong-last-sequence" or
// timeout response.  Throws if closed at the start, or if any other
// error was reported.
//
// Successfully set keys are deliberately NOT discarded from this.local here;
// that happens when the value is echoed back (see handleRemoteChange).
private attemptToSaveMany = reuseInFlight(async () => {
  const start = Date.now();
  if (DEBUG) {
    console.log("attemptToSaveMany: start");
  }
  if (this.kv == null) {
    throw Error("closed");
  }
  // From here on, this.changed records keys modified *during* this attempt.
  this.changed.clear();
  const status = { unsaved: 0, set: 0, delete: 0 };
  // snapshot, so changes made while we await do not affect what we send
  const obj = { ...this.local };

  // 1. deletes
  for (const key in obj) {
    if (obj[key] === TOMBSTONE) {
      status.unsaved += 1;
      await this.kv.deleteKv(key);
      if (this.kv == null) return;
      status.delete += 1;
      status.unsaved -= 1;
      delete obj[key];
      if (!this.changed.has(key)) {
        // successfully saved this and user didn't make a change *during* the set
        this.discardLocalState(key);
      }
    }
  }

  // 2. sets, all in one batch
  let errors = false;
  const x: {
    key: string;
    mesg: T;
    options?: {
      headers?: Headers;
      previousSeq?: number;
    };
  }[] = [];
  for (const key in obj) {
    // only guard against overwriting newer data when conflicts can be merged
    const previousSeq = this.merge != null ? this.seq(key) : undefined;
    if (previousSeq && this.invalidSeq.has(previousSeq)) {
      // already known to be stale; wait for the changefeed instead of retrying
      continue;
    }
    status.unsaved += 1;
    x.push({
      key,
      mesg: obj[key] as T,
      options: {
        ...this.options[key],
        previousSeq,
      },
    });
  }
  const results = await this.kv.setKvMany(x);

  // 3. results[i] is the response for x[i]
  let i = 0;
  for (const resp of results) {
    // Grab everything we need from the request entry *before* advancing the
    // index.  Previously the options were read via x[i] after i++, i.e., from
    // the NEXT entry, so the wrong sequence number was marked invalid.
    const { key, options } = x[i];
    i += 1;
    if (this.kv == null) return;
    if (!(resp as any).error) {
      status.unsaved -= 1;
      status.set += 1;
    } else {
      const { code, error } = resp as any;
      if (DEBUG) {
        console.log("kv store -- attemptToSave failed", this.desc, error, {
          key,
          value: obj[key],
          code: code,
        });
      }
      errors = true;
      if (code == "reject") {
        const value = this.local[key];
        // can never save this.
        this.discardLocalState(key);
        status.unsaved -= 1;
        this.emit("reject", { key, value });
      }
      if (code == "wrong-last-sequence") {
        // This happens when another client has published a NEWER version of this key,
        // so the right thing is to just ignore this.  In a moment there will be no
        // need to save anything, since we'll receive a message that overwrites this key.
        // It's very important that the changefeed actually be working, of course, which
        // is why the this.invalidSeq, so we never retry in this case, since it can't work.
        if (options?.previousSeq) {
          this.invalidSeq.add(options.previousSeq);
        }
        // NOTE(review): this stops processing the remaining responses and
        // resolves without throwing -- kept as is; confirm this is intended.
        return;
      }
      if (code == 408) {
        // timeout -- expected to happen periodically, of course
        if (!process.env.COCALC_TEST_MODE) {
          console.log("WARNING: timeout saving (will try again soon)");
        }
        return;
      }
      if (!process.env.COCALC_TEST_MODE) {
        console.warn(
          `WARNING: unexpected error saving dkv '${this.name}' -- ${error}`,
        );
      }
    }
  }
  if (errors) {
    this.saveErrors = true;
    throw Error(`there were errors saving dkv '${this.name}'`);
    // so it retries
  } else {
    if (
      !process.env.COCALC_TEST_MODE &&
      this.saveErrors &&
      status.unsaved == 0
    ) {
      this.saveErrors = false;
      console.log(`SUCCESS: dkv ${this.name} fully saved`);
    }
  }
  if (DEBUG) {
    console.log("attemptToSaveMany: done", Date.now() - start);
  }

  return status;
});
677
678
// Make one attempt to save all local changes, sending each set as its own
// request with at most MAX_PARALLEL in flight.  Pending deletes are sent
// first, one at a time.
//
// Returns the status {unsaved, set, delete}, or undefined if the store was
// closed while deleting.  Throws if closed at the start, or if saving any key
// failed.
attemptToSaveParallel = reuseInFlight(async () => {
  const startedAt = Date.now();
  if (DEBUG) {
    console.log("attemptToSaveParallel: start");
  }
  if (this.kv == null) {
    throw Error("closed");
  }
  // From here on, this.changed records keys modified *during* this attempt.
  this.changed.clear();
  const status = { unsaved: 0, set: 0, delete: 0 };
  // snapshot of what we are going to save
  const obj = { ...this.local };

  // deletes first
  for (const key of Object.keys(obj)) {
    if (obj[key] !== TOMBSTONE) {
      continue;
    }
    status.unsaved += 1;
    await this.kv.deleteKv(key);
    if (this.kv == null) return;
    status.delete += 1;
    status.unsaved -= 1;
    delete obj[key];
    if (!this.changed.has(key)) {
      // successfully saved this and user didn't make a change *during* the set
      this.discardLocalState(key);
    }
  }

  let errors = false;

  // save one key; failures are recorded in `errors` rather than thrown
  const saveOne = async (key: string) => {
    if (this.kv == null) {
      // closed
      return;
    }
    // only guard against overwriting newer data when conflicts can be merged
    const previousSeq = this.merge != null ? this.seq(key) : undefined;
    try {
      if (previousSeq && this.invalidSeq.has(previousSeq)) {
        // known stale; handled below exactly like a server side rejection
        throw new ConatError("waiting on new sequence via changefeed", {
          code: "wrong-last-sequence",
        });
      }
      status.unsaved += 1;
      await this.kv.setKv(key, obj[key] as T, {
        ...this.options[key],
        previousSeq,
      });
      if (this.kv == null) return;
      if (DEBUG) {
        console.log("kv store -- attemptToSave succeed", this.desc, {
          key,
          value: obj[key],
        });
      }
      status.unsaved -= 1;
      status.set += 1;
      // note that we CANNOT call this.discardLocalState(key) here, because
      // this.get(key) needs to work immediately after save, but if this.local[key]
      // is deleted, then this.get(key) would be undefined, because
      // this.kv.getKv(key) only has value in it once the value is
      // echoed back from the server.
    } catch (err) {
      if (DEBUG) {
        console.log("kv store -- attemptToSave failed", this.desc, err, {
          key,
          value: obj[key],
          code: err.code,
        });
      }
      errors = true;
      if (err.code == "reject") {
        const value = this.local[key];
        // can never save this.
        this.discardLocalState(key);
        status.unsaved -= 1;
        this.emit("reject", { key, value });
      }
      if (err.code == "wrong-last-sequence") {
        // This happens when another client has published a NEWER version of this key,
        // so the right thing is to just ignore this.  In a moment there will be no
        // need to save anything, since we'll receive a message that overwrites this key.
        // It's very important that the changefeed actually be working, of course, which
        // is why the this.invalidSeq, so we never retry in this case, since it can't work.
        if (previousSeq) {
          this.invalidSeq.add(previousSeq);
        }
        return;
      }
      if (err.code == 408) {
        // timeout -- expected to happen periodically, of course
        if (!process.env.COCALC_TEST_MODE) {
          console.log("WARNING: timeout saving (will try again soon)");
        }
        return;
      }
      if (!process.env.COCALC_TEST_MODE) {
        console.warn(
          `WARNING: unexpected error saving dkv '${this.name}' -- ${err}`,
        );
      }
    }
  };

  await awaitMap(Object.keys(obj), MAX_PARALLEL, saveOne);

  if (errors) {
    this.saveErrors = true;
    throw Error(`there were errors saving dkv '${this.name}'`);
    // so it retries
  }
  const recovered =
    !process.env.COCALC_TEST_MODE && this.saveErrors && status.unsaved == 0;
  if (recovered) {
    this.saveErrors = false;
    console.log(`SUCCESS: dkv ${this.name} fully saved`);
  }
  if (DEBUG) {
    console.log("attemptToSaveParallel: done", Date.now() - startedAt);
  }

  return status;
});
797
798
// Stats reported by the underlying kv, or undefined once closed.
stats = () => {
  return this.kv?.stats();
};
799
800
// get or set config: delegates to config() of the underlying kv and returns
// its result.  Throws if there is no underlying kv.
config = async (
  config: Partial<Configuration> = {},
): Promise<Configuration> => {
  const kv = this.kv;
  if (kv == null) {
    throw Error("not initialized");
  }
  return await kv.config(config);
};
809
810
// Record this store (type, name, desc and whatever this.kv.inventory()
// reports) in the inventory.  Throttled to INVENTORY_UPDATE_INTERVAL; does
// nothing if the store is closed or the noInventory option is set.  Failures
// are only logged -- this never throws.
private updateInventory = asyncThrottle(
  async () => {
    if (this.isClosed() || this.opts == null || this.opts.noInventory) {
      return;
    }
    await delay(500);
    if (this.isClosed() || this.kv == null) {
      return;
    }
    // NOTE(review): this outer `inv` is never assigned, because the `const inv`
    // inside the try block shadows it.  So the `inv?.close()` in finally is a
    // no-op and the inventory obtained below is never closed here.  Left as is:
    // before assigning to the outer variable, confirm that closing the
    // inventory right after set() cannot drop the pending update.
    let inv: Inventory | undefined = undefined;
    try {
      const { account_id, project_id, desc } = this.opts;
      const inv = await inventory({
        account_id,
        project_id,
        service: this.opts.service,
      });
      if (this.isClosed()) {
        return;
      }
      const status = {
        type: "kv" as "kv",
        name: this.opts.name,
        desc,
        ...(await this.kv.inventory()),
      };
      inv.set(status);
    } catch (err) {
      if (!process.env.COCALC_TEST_MODE) {
        // Use this.name rather than this.opts.name: close() deletes this.opts,
        // and the store may well have been closed during one of the awaits
        // above, in which case reading this.opts.name would itself throw.
        console.log(
          `WARNING: unable to update inventory. name='${this.name}' -- ${err}`,
        );
      }
    } finally {
      // @ts-ignore
      inv?.close();
    }
  },
  INVENTORY_UPDATE_INTERVAL,
  { leading: true, trailing: true },
);
851
}
852
853
// Reference-counted cache of initialized DKV instances, keyed by
// name, account_id, project_id and the client id.
export const cache = refCache<DKVOptions, DKV>({
  name: "dkv",
  createKey: (opts) => {
    const { name, account_id, project_id, client } = opts;
    return JSON.stringify({ name, account_id, project_id, id: client?.id });
  },
  createObject: async (opts) => {
    // fall back to the default conat client when none was given
    const resolved =
      opts.client == null ? { ...opts, client: await conat() } : opts;
    const store = new DKV(resolved);
    await store.init();
    return store;
  },
});
866
867
/**
 * Get an initialized DKV for the given options, via the cache.
 */
export async function dkv<T>(options: DKVOptions): Promise<DKV<T>> {
  const store = await cache(options);
  return store;
}
870
871