Path: blob/master/src/packages/conat/sync/inventory.ts
1710 views
/*1Inventory of all streams and key:value stores in a specific project or account.23DEVELOPMENT:45i = await require('@cocalc/backend/conat/sync').inventory({project_id:'00847397-d6a8-4cb0-96a8-6ef64ac3e6cf'})67i.ls()89*/1011import { dkv, type DKV } from "./dkv";12import { dstream, type DStream } from "./dstream";13import { dko, type DKO } from "./dko";14import getTime from "@cocalc/conat/time";15import refCache from "@cocalc/util/refcache";16import type { JSONValue } from "@cocalc/util/types";17import {18human_readable_size as humanReadableSize,19trunc_middle,20} from "@cocalc/util/misc";21import { DKO_PREFIX } from "./dko";22import { waitUntilTimeAvailable } from "@cocalc/conat/time";23import {24type Configuration,25type PartialInventory,26} from "@cocalc/conat/persist/storage";27import { AsciiTable3 } from "ascii-table3";2829export const INVENTORY_UPDATE_INTERVAL = 90000;30export const INVENTORY_NAME = "CoCalc-Inventory";3132type Sort =33| "last"34| "created"35| "count"36| "bytes"37| "name"38| "type"39| "-last"40| "-created"41| "-count"42| "-bytes"43| "-name"44| "-type";4546interface Options {47account_id?: string;48project_id?: string;49service?: string;50}5152type StoreType = "stream" | "kv";5354export interface InventoryItem extends PartialInventory {55// when it was created56created: number;57// last time this stream was updated58last: number;59// optional description, which can be anything60desc?: JSONValue;61}6263interface FullItem extends InventoryItem {64type: StoreType;65name: string;66}6768export class Inventory {69public options: Options;70private dkv?: DKV<InventoryItem>;7172constructor(options: {73account_id?: string;74project_id?: string;75service?: string;76}) {77this.options = options;78}7980init = async () => {81this.dkv = await dkv({82name: INVENTORY_NAME,83...this.options,84});85await waitUntilTimeAvailable();86};8788// Set but with NO LIMITS and no MERGE conflict algorithm. Use with care!89set = ({90type,91name,92bytes,93count,94desc,95limits,96seq,97}: {98type: StoreType;99name: string;100bytes: number;101count: number;102limits: Partial<Configuration>;103desc?: JSONValue;104seq: number;105}) => {106if (this.dkv == null) {107throw Error("not initialized");108}109const last = getTime();110const key = this.encodeKey({ name, type });111const cur = this.dkv.get(key);112const created = cur?.created ?? last;113desc = desc ?? cur?.desc;114this.dkv.set(key, {115desc,116last,117created,118bytes,119count,120limits,121seq,122});123};124125private encodeKey = ({ name, type }) => JSON.stringify({ name, type });126127private decodeKey = (key) => JSON.parse(key);128129delete = ({ name, type }: { name: string; type: StoreType }) => {130if (this.dkv == null) {131throw Error("not initialized");132}133this.dkv.delete(this.encodeKey({ name, type }));134};135136get = (137x: { name: string; type: StoreType } | string,138): (InventoryItem & { type: StoreType; name: string }) | undefined => {139if (this.dkv == null) {140throw Error("not initialized");141}142let cur;143let name, type;144if (typeof x == "string") {145// just the name -- we infer/guess the type146name = x;147type = "kv";148cur = this.dkv.get(this.encodeKey({ name, type }));149if (cur == null) {150type = "stream";151cur = this.dkv.get(this.encodeKey({ name, type }));152}153} else {154name = x.name;155cur = this.dkv.get(this.encodeKey(x));156}157if (cur == null) {158return;159}160return { ...cur, type, name };161};162163getStores = async ({164filter,165sort = "-last",166}: { filter?: string; sort?: Sort } = {}): Promise<167(DKV | DStream | DKO)[]168> => {169const v: (DKV | DStream | DKO)[] = [];170const all = this.getAll({ filter });171for (const key of this.sortedKeys(all, sort)) {172const x = all[key];173const { desc, name, type } = x;174if (type == "kv") {175if (name.startsWith(DKO_PREFIX)) {176v.push(await dko<any>({ name, ...this.options, desc }));177} else {178v.push(await dkv({ name, ...this.options, desc }));179}180} else if (type == "stream") {181v.push(await dstream({ name, ...this.options, desc }));182} else {183throw Error(`unknown store type '${type}'`);184}185}186return v;187};188189getAll = ({ filter }: { filter?: string } = {}): FullItem[] => {190if (this.dkv == null) {191throw Error("not initialized");192}193const all = this.dkv.getAll();194if (filter) {195filter = filter.toLowerCase();196}197const v: FullItem[] = [];198for (const key of Object.keys(all)) {199const { name, type } = this.decodeKey(key);200if (filter) {201const { desc } = all[key];202const s = `${desc ? JSON.stringify(desc) : ""} ${name}`.toLowerCase();203if (!s.includes(filter)) {204continue;205}206}207v.push({ ...all[key], name, type });208}209return v;210};211212close = async () => {213await this.dkv?.close();214delete this.dkv;215};216217private sortedKeys = (all, sort0: Sort) => {218let reverse: boolean, sort: string;219if (sort0[0] == "-") {220reverse = true;221sort = sort0.slice(1);222} else {223reverse = false;224sort = sort0;225}226// return keys of all, sorted as specified227const x: { k: string; v: any }[] = [];228for (const k in all) {229x.push({ k, v: { ...all[k], ...this.decodeKey(k) } });230}231x.sort((a, b) => {232const a0 = a.v[sort];233const b0 = b.v[sort];234if (a0 < b0) {235return -1;236}237if (a0 > b0) {238return 1;239}240return 0;241});242const y = x.map(({ k }) => k);243if (reverse) {244y.reverse();245}246return y;247};248249ls = ({250log = console.log,251filter,252noTrunc,253path: path0,254sort = "last",255noHelp,256}: {257log?: Function;258filter?: string;259noTrunc?: boolean;260path?: string;261sort?: Sort;262noHelp?: boolean;263} = {}) => {264if (this.dkv == null) {265throw Error("not initialized");266}267const all = this.dkv.getAll();268if (!noHelp) {269log(270"ls(opts: {filter?: string; noTrunc?: boolean; path?: string; sort?: 'last'|'created'|'count'|'bytes'|'name'|'type'|'-last'|...})",271);272}273274const rows: any[] = [];275for (const key of this.sortedKeys(all, sort)) {276const { last, created, count, bytes, desc, limits } = all[key];277if (path0 && desc?.["path"] != path0) {278continue;279}280let { name, type } = this.decodeKey(key);281if (name.startsWith(DKO_PREFIX)) {282type = "kvobject";283name = name.slice(DKO_PREFIX.length);284}285if (!noTrunc) {286name = trunc_middle(name, 50);287}288if (289filter &&290!`${desc ? JSON.stringify(desc) : ""} ${name}`291.toLowerCase()292.includes(filter.toLowerCase())293) {294continue;295}296rows.push([297type,298name,299dateToString(new Date(created)),300humanReadableSize(bytes),301count,302dateToString(new Date(last)),303desc ? JSON.stringify(desc) : "",304limits != null && Object.keys(limits).length > 0305? JSON.stringify(limits)306: "--",307]);308}309310const table = new AsciiTable3(311`Inventory for ${JSON.stringify(this.options)}`,312)313.setHeading(314"Type",315"Name",316"Created",317"Size",318"Count",319"Last Update",320"Desc",321"Limits",322)323.addRowMatrix(rows);324table.setStyle("unicode-round");325if (!noTrunc) {326table.setWidth(7, 50).setWrapped(1);327table.setWidth(8, 30).setWrapped(1);328}329log(table.toString());330};331}332333function dateToString(d: Date) {334return d.toISOString().replace("T", " ").replace("Z", "").split(".")[0];335}336337export const cache = refCache<Options & { noCache?: boolean }, Inventory>({338name: "inventory",339createObject: async (loc) => {340const k = new Inventory(loc);341await k.init();342return k;343},344});345346export async function inventory(options: Options = {}): Promise<Inventory> {347return await cache(options);348}349350351