Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/sync/inventory.ts
1710 views
1
/*
Inventory of all streams and key:value stores in a specific project or account.

DEVELOPMENT:

i = await require('@cocalc/backend/conat/sync').inventory({project_id:'00847397-d6a8-4cb0-96a8-6ef64ac3e6cf'})

i.ls()

*/
11
12
import { dkv, type DKV } from "./dkv";
13
import { dstream, type DStream } from "./dstream";
14
import { dko, type DKO } from "./dko";
15
import getTime from "@cocalc/conat/time";
16
import refCache from "@cocalc/util/refcache";
17
import type { JSONValue } from "@cocalc/util/types";
18
import {
19
human_readable_size as humanReadableSize,
20
trunc_middle,
21
} from "@cocalc/util/misc";
22
import { DKO_PREFIX } from "./dko";
23
import { waitUntilTimeAvailable } from "@cocalc/conat/time";
24
import {
25
type Configuration,
26
type PartialInventory,
27
} from "@cocalc/conat/persist/storage";
28
import { AsciiTable3 } from "ascii-table3";
29
30
// Interval associated with inventory updates; it is not used in this file.
// NOTE(review): presumably milliseconds (i.e. 90 seconds) -- confirm at the call sites.
export const INVENTORY_UPDATE_INTERVAL = 90_000;

// Name of the dkv in which an Inventory keeps its records (see Inventory.init).
export const INVENTORY_NAME = "CoCalc-Inventory";
32
33
// Fields an inventory listing can be ordered by.
type SortField = "last" | "created" | "count" | "bytes" | "name" | "type";

// A sort specification: a field name for ascending order, or the same name
// prefixed with "-" for descending order.
type Sort = SortField | `-${SortField}`;
46
47
// Location whose stores an Inventory tracks.  These fields are passed through
// unchanged to dkv / dstream / dko whenever the Inventory opens a store.
interface Options {
  project_id?: string;
  account_id?: string;
  service?: string;
}
52
53
// Kind of store an inventory entry describes: a key:value store or a stream.
type StoreType = "kv" | "stream";
54
55
/**
 * Record stored in the inventory for one stream or key:value store.
 *
 * Inventory.set() stamps `last` with the current time on every call and keeps
 * the `created` value of an already-existing record.  Both are handed to
 * `new Date(...)` when listing.
 */
export interface InventoryItem extends PartialInventory {
  /** When the store was created. */
  created: number;
  /** Last time the store was updated. */
  last: number;
  /** Optional description; can be any JSON value. */
  desc?: JSONValue;
}
63
64
// An inventory record together with the store name and type that identify it
// (these two are encoded in the record's key rather than in its value).
interface FullItem extends InventoryItem {
  name: string;
  type: StoreType;
}
68
69
/**
 * Inventory of the streams and key:value stores of one location (the
 * account_id / project_id / service given in `options`).
 *
 * Records are kept in a dkv named INVENTORY_NAME, one entry per store, keyed
 * by the JSON encoding of `{ name, type }`.  `init()` must be awaited before
 * any other method is used; before that (and after `close()`) the accessors
 * throw Error("not initialized").
 */
export class Inventory {
  public options: Options;
  private dkv?: DKV<InventoryItem>;

  constructor(options: Options) {
    this.options = options;
  }

  /** Opens the backing dkv and waits until the time source is available. */
  init = async (): Promise<void> => {
    this.dkv = await dkv({
      name: INVENTORY_NAME,
      ...this.options,
    });
    await waitUntilTimeAvailable();
  };

  /** Returns the backing dkv, or throws if init() has not completed. */
  private getDkv = (): DKV<InventoryItem> => {
    if (this.dkv == null) {
      throw Error("not initialized");
    }
    return this.dkv;
  };

  /**
   * Creates or overwrites the record for the store `{ name, type }`.
   *
   * `last` is set to the current time; `created` is kept from an existing
   * record (otherwise it equals `last`); an omitted `desc` falls back to the
   * existing record's `desc`.
   *
   * Set but with NO LIMITS and no MERGE conflict algorithm. Use with care!
   */
  set = ({
    type,
    name,
    bytes,
    count,
    desc,
    limits,
    seq,
  }: {
    type: StoreType;
    name: string;
    bytes: number;
    count: number;
    limits: Partial<Configuration>;
    desc?: JSONValue;
    seq: number;
  }): void => {
    const store = this.getDkv();
    const last = getTime();
    const key = this.encodeKey({ name, type });
    const cur = store.get(key);
    const created = cur?.created ?? last;
    store.set(key, {
      desc: desc ?? cur?.desc,
      last,
      created,
      bytes,
      count,
      limits,
      seq,
    });
  };

  /** Builds the dkv key for a store: the JSON encoding of `{ name, type }`. */
  private encodeKey = ({
    name,
    type,
  }: {
    name: string;
    type: StoreType;
  }): string => JSON.stringify({ name, type });

  /** Inverse of encodeKey: parses a key back into its JSON value. */
  private decodeKey = (key: string) => JSON.parse(key);

  /** Removes the record for the store `{ name, type }`. */
  delete = ({ name, type }: { name: string; type: StoreType }): void => {
    this.getDkv().delete(this.encodeKey({ name, type }));
  };

  /**
   * Looks up one record.
   *
   * @param x either `{ name, type }`, or just a name, in which case the type
   *   is guessed: "kv" is tried first, then "stream".
   * @returns the record together with its name and type, or undefined if
   *   there is no such record.
   */
  get = (
    x: { name: string; type: StoreType } | string,
  ): (InventoryItem & { type: StoreType; name: string }) | undefined => {
    const store = this.getDkv();
    let name: string;
    let type: StoreType;
    let cur: InventoryItem | undefined;
    if (typeof x == "string") {
      // just the name -- we infer/guess the type
      name = x;
      type = "kv";
      cur = store.get(this.encodeKey({ name, type }));
      if (cur == null) {
        type = "stream";
        cur = store.get(this.encodeKey({ name, type }));
      }
    } else {
      // Take BOTH fields from the argument; previously `type` was left
      // unassigned here, so the result had `type: undefined`.
      ({ name, type } = x);
      cur = store.get(this.encodeKey(x));
    }
    if (cur == null) {
      return;
    }
    return { ...cur, type, name };
  };

  /**
   * Opens the store behind every record that matches `filter`, in `sort`
   * order: "kv" records whose name starts with DKO_PREFIX are opened with
   * dko, other "kv" records with dkv, and "stream" records with dstream.
   * Stores are opened one after another.
   *
   * @throws Error if a record has an unknown store type.
   */
  getStores = async ({
    filter,
    sort = "-last",
  }: { filter?: string; sort?: Sort } = {}): Promise<
    (DKV | DStream | DKO)[]
  > => {
    const stores: (DKV | DStream | DKO)[] = [];
    const all = this.getAll({ filter });
    // `all` is an array, so the "keys" returned by sortedKeys are its indices.
    for (const key of this.sortedKeys(all, sort)) {
      const { desc, name, type } = all[key];
      if (type == "kv") {
        if (name.startsWith(DKO_PREFIX)) {
          stores.push(await dko<any>({ name, ...this.options, desc }));
        } else {
          stores.push(await dkv({ name, ...this.options, desc }));
        }
      } else if (type == "stream") {
        stores.push(await dstream({ name, ...this.options, desc }));
      } else {
        throw Error(`unknown store type '${type}'`);
      }
    }
    return stores;
  };

  /**
   * Returns every record, each extended with the name and type decoded from
   * its key.  With `filter`, only records whose JSON-encoded desc or name
   * contains the filter (case-insensitively) are returned.
   */
  getAll = ({ filter }: { filter?: string } = {}): FullItem[] => {
    const all = this.getDkv().getAll();
    const needle = filter ? filter.toLowerCase() : undefined;
    const items: FullItem[] = [];
    for (const key of Object.keys(all)) {
      const { name, type } = this.decodeKey(key);
      if (needle) {
        const { desc } = all[key];
        const haystack =
          `${desc ? JSON.stringify(desc) : ""} ${name}`.toLowerCase();
        if (!haystack.includes(needle)) {
          continue;
        }
      }
      items.push({ ...all[key], name, type });
    }
    return items;
  };

  /** Closes the backing dkv (if open); the inventory is unusable afterwards. */
  close = async (): Promise<void> => {
    await this.dkv?.close();
    delete this.dkv;
  };

  /**
   * Returns the keys of `all`, ordered by the field named in `sort0`
   * (descending when it starts with "-").  Each value is merged with whatever
   * its key decodes to, so `name` and `type` are sortable for dkv-style keys.
   * Descending order is produced by reversing the ascending result.
   */
  private sortedKeys = (all: Record<string, any>, sort0: Sort): string[] => {
    const reverse = sort0[0] == "-";
    const sort: string = reverse ? sort0.slice(1) : sort0;
    const entries: { k: string; v: any }[] = [];
    for (const k in all) {
      entries.push({ k, v: { ...all[k], ...this.decodeKey(k) } });
    }
    entries.sort((a, b) => {
      const a0 = a.v[sort];
      const b0 = b.v[sort];
      if (a0 < b0) {
        return -1;
      }
      if (a0 > b0) {
        return 1;
      }
      return 0;
    });
    const keys = entries.map(({ k }) => k);
    if (reverse) {
      keys.reverse();
    }
    return keys;
  };

  /**
   * Prints a table of the inventory through `log`.
   *
   * @param log      output function (default console.log)
   * @param filter   keep only rows whose JSON-encoded desc or displayed name
   *                 contains this text (case-insensitive); the name is
   *                 matched BEFORE it is truncated
   * @param noTrunc  do not shorten long names or limit column widths
   * @param path     keep only rows whose desc has this "path"
   * @param sort     sort order (default "last")
   * @param noHelp   do not print the usage line first
   */
  ls = ({
    log = console.log,
    filter,
    noTrunc,
    path: path0,
    sort = "last",
    noHelp,
  }: {
    log?: Function;
    filter?: string;
    noTrunc?: boolean;
    path?: string;
    sort?: Sort;
    noHelp?: boolean;
  } = {}) => {
    const all = this.getDkv().getAll();
    if (!noHelp) {
      log(
        "ls(opts: {filter?: string; noTrunc?: boolean; path?: string; sort?: 'last'|'created'|'count'|'bytes'|'name'|'type'|'-last'|...})",
      );
    }

    const needle = filter ? filter.toLowerCase() : undefined;
    const rows: any[] = [];
    for (const key of this.sortedKeys(all, sort)) {
      const { last, created, count, bytes, desc, limits } = all[key];
      if (path0 && desc?.["path"] != path0) {
        continue;
      }
      let { name, type } = this.decodeKey(key);
      if (name.startsWith(DKO_PREFIX)) {
        // kv stores with the dko prefix are shown as "kvobject", without the prefix
        type = "kvobject";
        name = name.slice(DKO_PREFIX.length);
      }
      const descText = desc ? JSON.stringify(desc) : "";
      // Filter on the full name: filtering after truncation would miss
      // matches that fall in the part trunc_middle removes.
      if (needle && !`${descText} ${name}`.toLowerCase().includes(needle)) {
        continue;
      }
      if (!noTrunc) {
        name = trunc_middle(name, 50);
      }
      rows.push([
        type,
        name,
        dateToString(new Date(created)),
        humanReadableSize(bytes),
        count,
        dateToString(new Date(last)),
        descText,
        limits != null && Object.keys(limits).length > 0
          ? JSON.stringify(limits)
          : "--",
      ]);
    }

    const table = new AsciiTable3(
      `Inventory for ${JSON.stringify(this.options)}`,
    )
      .setHeading(
        "Type",
        "Name",
        "Created",
        "Size",
        "Count",
        "Last Update",
        "Desc",
        "Limits",
      )
      .addRowMatrix(rows);
    table.setStyle("unicode-round");
    if (!noTrunc) {
      // Limit the width of the Desc (7) and Limits (8) columns.
      // NOTE(review): setWrapped(1) targets column 1, not 7/8 -- confirm
      // whether wrapping of the limited columns was intended.
      table.setWidth(7, 50).setWrapped(1);
      table.setWidth(8, 30).setWrapped(1);
    }
    log(table.toString());
  };
}
333
334
// Formats a date as its ISO-8601 (UTC) string with the "T" separator replaced
// by a space and the fractional seconds and trailing "Z" dropped,
// e.g. "2024-01-02 03:04:05".
function dateToString(d: Date) {
  const iso = d.toISOString();
  // everything before the "." is the date and time down to whole seconds
  const wholeSeconds = iso.slice(0, iso.indexOf("."));
  return wholeSeconds.replace("T", " ");
}
337
338
// refCache-backed factory for Inventory objects (cache name "inventory").
// When it needs a new object it constructs an Inventory from the given
// options and awaits init() before handing it out.
export const cache = refCache<Options & { noCache?: boolean }, Inventory>({
  name: "inventory",
  async createObject(loc) {
    const created = new Inventory(loc);
    await created.init();
    return created;
  },
});
346
347
/**
 * Returns the initialized Inventory for the given location, obtained through
 * `cache`.
 *
 * @param options account_id / project_id / service identifying the location
 */
export async function inventory(options: Options = {}): Promise<Inventory> {
  const inv = await cache(options);
  return inv;
}
350
351