Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
ulixee
GitHub Repository: ulixee/secret-agent
Path: blob/main/commons/SqliteTable.ts
1028 views
1
import type { Database as SqliteDatabase, Statement } from 'better-sqlite3';
2
3
/** Column type keywords that may be used when declaring a table's columns. */
type SqliteTypes =
  | 'INTEGER'
  | 'TEXT'
  | 'BLOB';

/** The values of a single row, held positionally (one entry per column). */
type IRecord = Array<string | number | Buffer>;
5
6
/**
 * Base class wrapping a single SQLite table.
 *
 * Rows are handled as positional value arrays whose order matches `columns`.
 * Inserts can either be executed immediately (`insertNow`) or queued in
 * `pendingInserts` and flushed later (`runPendingInserts`). A single subscriber
 * can be registered to receive inserted rows (as objects keyed by column name)
 * in throttled batches.
 *
 * When the database is opened read-only, the table is not created and
 * `insertStatement` is never prepared, so the insert methods must not be used.
 */
export default abstract class SqliteTable<T> {
  /** Longest time (ms) buffered rows may wait since the last publish before being sent at once. */
  private static readonly maxPublishDelayMs = 500;
  /** Quiet period (ms) after the most recent insert before buffered rows are published. */
  private static readonly publishDebounceMs = 100;

  /** Prepared insert statement; only assigned when `db` is not read-only. */
  protected readonly insertStatement: Statement;
  /** Optional `ORDER BY` expression applied by `all()`. */
  protected defaultSortOrder?: string;
  /** Current subscriber, if any. */
  protected insertCallbackFn?: (records: T[]) => void;

  /** Rows queued by `queuePendingInsert` that have not yet been written. */
  protected pendingInserts: IRecord[] = [];

  /** Rows (as objects) waiting to be delivered to the subscriber. */
  private insertSubscriptionRecords: T[] = [];
  private subscriptionThrottle: NodeJS.Timeout;
  private lastSubscriptionPublishTime: Date;

  /**
   * @param db - database handle; if it is not read-only the table is created
   *   (when missing) and the insert statement is prepared.
   * @param tableName - name of the table.
   * @param columns - `[name, type, extraDefinition?]` tuples, in row order.
   * @param insertOrReplace - when true, inserts use `INSERT OR REPLACE`.
   */
  protected constructor(
    readonly db: SqliteDatabase,
    readonly tableName: string,
    readonly columns: [keyof T, SqliteTypes, string?][],
    private insertOrReplace = false,
  ) {
    if (!db.readonly) {
      this.db.exec(this.createTableStatement());
      this.insertStatement = this.db.prepare(this.buildInsertStatement());
    }
  }

  /** Returns the queued (not yet written) raw rows accepted by `cb`. */
  public findPendingInserts(cb: (record: IRecord) => boolean): IRecord[] {
    return this.pendingInserts.filter(cb);
  }

  /**
   * Returns the queued (not yet written) rows accepted by `cb`, converted to
   * objects keyed by column name. Unlike published rows, null values are kept.
   */
  public findPendingRecords(cb: (record: IRecord) => boolean): T[] {
    return this.pendingInserts.filter(cb).map(pending => {
      const result: any = {};
      for (let i = 0; i < pending.length; i += 1) {
        const col = this.columns[i];
        result[col[0]] = pending[i];
      }
      return result;
    });
  }

  /**
   * Registers `callbackFn` as the subscriber. On the next tick it receives a
   * snapshot of every stored row plus every queued row; afterwards it receives
   * newly inserted rows in throttled batches.
   */
  public subscribe(callbackFn: (records: T[]) => void): void {
    // Anything buffered for a previous subscriber is already part of the
    // snapshot below; dropping it prevents it from being delivered twice.
    this.resetSubscriptionBuffer();
    this.insertCallbackFn = callbackFn;
    const pendingRecords = this.pendingInserts.map(x => this.insertToObject(x));
    this.lastSubscriptionPublishTime = new Date();
    process.nextTick(callbackFn, this.all().concat(pendingRecords));
  }

  /** Removes the subscriber and discards any rows still buffered for it. */
  public unsubscribe(): void {
    this.insertCallbackFn = null;
    this.resetSubscriptionBuffer();
  }

  /** Writes every queued row with the prepared insert statement and empties the queue. */
  public runPendingInserts(): void {
    const records = [...this.pendingInserts];
    this.pendingInserts.length = 0;

    for (const record of records) {
      this.insertStatement.run(...record);
    }
  }

  /** Writes `record` immediately and schedules it for delivery to the subscriber. */
  public insertNow(record: IRecord): void {
    this.insertStatement.run(...record);
    this.addRecordToPublish(record);
  }

  /** Returns every stored row, ordered by `defaultSortOrder` when one is set. */
  public all(): T[] {
    const sort = this.defaultSortOrder ? ` ORDER BY ${this.defaultSortOrder}` : '';
    return this.db.prepare(`select * from ${this.tableName}${sort}`).all() as T[];
  }

  /** Queues `record` for a later `runPendingInserts` and schedules it for delivery to the subscriber. */
  protected queuePendingInsert(record: IRecord): void {
    this.pendingInserts.push(record);
    this.addRecordToPublish(record);
  }

  /** Builds the parameterized `INSERT` (or `INSERT OR REPLACE`) statement covering all columns. */
  protected buildInsertStatement(): string {
    const keys = this.columns.map(x => x[0]);
    const params = keys.map(() => '?').join(', ');
    const insertOrReplace = this.insertOrReplace ? ' OR REPLACE' : '';
    return `INSERT${insertOrReplace} INTO ${this.tableName} (${keys.join(', ')}) VALUES (${params})`;
  }

  /**
   * Buffers `record` for the subscriber. The buffer is published right away if
   * more than `maxPublishDelayMs` passed since the last publish; otherwise a
   * timer is (re)armed to publish after `publishDebounceMs`.
   */
  private addRecordToPublish(record: IRecord): void {
    if (!this.insertCallbackFn) return;
    this.insertSubscriptionRecords.push(this.insertToObject(record));

    // Always cancel the armed timer first so an immediate publish is not
    // followed by a stale timer firing with nothing to deliver.
    clearTimeout(this.subscriptionThrottle);

    // No recorded publish time (callback assigned without `subscribe`) is
    // treated as "long ago", which publishes immediately.
    const lastPublish = this.lastSubscriptionPublishTime?.getTime() ?? 0;
    if (Date.now() - lastPublish > SqliteTable.maxPublishDelayMs) {
      this.publishPendingRecords();
      return;
    }

    // unref() so a pending publish never keeps the process alive.
    this.subscriptionThrottle = setTimeout(
      () => this.publishPendingRecords(),
      SqliteTable.publishDebounceMs,
    ).unref();
  }

  /** Delivers all buffered rows to the subscriber and records the publish time. */
  private publishPendingRecords(): void {
    if (!this.insertCallbackFn) return;
    if (this.insertSubscriptionRecords.length === 0) return;
    const records = [...this.insertSubscriptionRecords];
    this.insertSubscriptionRecords.length = 0;
    this.lastSubscriptionPublishTime = new Date();
    this.insertCallbackFn(records);
  }

  /** Cancels any armed publish timer and drops rows buffered for the subscriber. */
  private resetSubscriptionBuffer(): void {
    clearTimeout(this.subscriptionThrottle);
    this.insertSubscriptionRecords.length = 0;
  }

  /** Builds the `CREATE TABLE IF NOT EXISTS` statement from `columns`. */
  private createTableStatement(): string {
    const definitions = this.columns.map(([name, type, extras]) => {
      const columnDef = `${String(name)} ${type}`;
      // Test the value rather than the tuple length so an explicit
      // `undefined` third entry does not end up in the SQL text.
      return extras ? `${columnDef} ${extras}` : columnDef;
    });
    return `CREATE TABLE IF NOT EXISTS ${this.tableName} (${definitions.join(',')})`;
  }

  /** Converts a positional row into an object keyed by column name, omitting null values. */
  private insertToObject(record: IRecord): T {
    const result: any = {};
    for (let i = 0; i < record.length; i += 1) {
      if (record[i] !== null) result[this.columns[i][0]] = record[i];
    }
    return result as T;
  }
}
123
124