Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/src/vs/platform/agentHost/node/sessionDatabase.ts
13394 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
import * as fs from 'fs';
7
import { SequencerByKey } from '../../../base/common/async.js';
8
import type { Database, RunResult } from '@vscode/sqlite3';
9
import type { IFileEditContent, IFileEditRecord, ISessionDatabase } from '../common/sessionDataService.js';
10
import { dirname } from '../../../base/common/path.js';
11
12
/**
 * Describes one numbered step of the session database schema history.
 * Steps are applied in ascending {@link version} order, and the version
 * reached so far is recorded in `PRAGMA user_version`.
 */
export interface ISessionDatabaseMigration {
	/** 1-based version number; every later migration uses a larger value. */
	readonly version: number;
	/** The SQL script that is executed to apply this migration. */
	readonly sql: string;
}
22
23
/** Combines individual SQL statements into a single script, separated by `;` and a newline. */
function joinSqlStatements(statements: readonly string[]): string {
	return statements.join(';\n');
}

/**
 * All migrations that together produce the current session database
 * schema. To evolve the schema, **append** a new entry carrying the next
 * version number. Existing entries must never be reordered or modified.
 */
export const sessionDatabaseMigrations: readonly ISessionDatabaseMigration[] = [
	// v1: base tables for turns and the file edits recorded against them.
	{
		version: 1,
		sql: joinSqlStatements([
			`CREATE TABLE IF NOT EXISTS turns (
				id TEXT PRIMARY KEY NOT NULL
			)`,
			`CREATE TABLE IF NOT EXISTS file_edits (
				turn_id TEXT NOT NULL REFERENCES turns(id) ON DELETE CASCADE,
				tool_call_id TEXT NOT NULL,
				file_path TEXT NOT NULL,
				before_content BLOB NOT NULL,
				after_content BLOB NOT NULL,
				added_lines INTEGER,
				removed_lines INTEGER,
				PRIMARY KEY (tool_call_id, file_path)
			)`,
		]),
	},
	// v2: key/value store for per-session metadata.
	{
		version: 2,
		sql: `CREATE TABLE IF NOT EXISTS session_metadata (
			key TEXT PRIMARY KEY NOT NULL,
			value TEXT NOT NULL
		)`,
	},
	// v3: rebuild file_edits so that it gains edit_type and original_path
	// and so that before_content/after_content may be NULL. Existing rows
	// are copied over with edit_type 'edit'.
	{
		version: 3,
		sql: joinSqlStatements([
			`CREATE TABLE file_edits_v3 (
				turn_id TEXT NOT NULL REFERENCES turns(id) ON DELETE CASCADE,
				tool_call_id TEXT NOT NULL,
				file_path TEXT NOT NULL,
				edit_type TEXT NOT NULL DEFAULT 'edit',
				original_path TEXT,
				before_content BLOB,
				after_content BLOB,
				added_lines INTEGER,
				removed_lines INTEGER,
				PRIMARY KEY (tool_call_id, file_path)
			)`,
			`INSERT INTO file_edits_v3 (turn_id, tool_call_id, file_path, edit_type, before_content, after_content, added_lines, removed_lines)
				SELECT turn_id, tool_call_id, file_path, 'edit', before_content, after_content, added_lines, removed_lines FROM file_edits`,
			`DROP TABLE file_edits`,
			`ALTER TABLE file_edits_v3 RENAME TO file_edits`,
		]),
	},
	// v4: optional event id per turn, indexed for lookups.
	{
		version: 4,
		sql: joinSqlStatements([
			`ALTER TABLE turns ADD COLUMN event_id TEXT`,
			`CREATE INDEX IF NOT EXISTS idx_turns_event_id ON turns(event_id)`,
		]),
	},
];
85
86
// ---- Promise wrappers around callback-based @vscode/sqlite3 API -----------
87
88
/**
 * Promise-returning form of `db.exec`: runs `sql` and resolves once the
 * callback reports success, or rejects with the error it reports.
 */
function dbExec(db: Database, sql: string): Promise<void> {
	return new Promise<void>((onDone, onFail) => {
		db.exec(sql, (error: Error | null) => {
			if (error) {
				onFail(error);
			} else {
				onDone();
			}
		});
	});
}
93
94
/**
 * Promise-returning form of `db.run`. Resolves with the `changes` and
 * `lastID` values that the driver exposes on the callback's `this`, or
 * rejects with the error passed to the callback.
 */
function dbRun(db: Database, sql: string, params: unknown[]): Promise<{ changes: number; lastID: number }> {
	return new Promise((onDone, onFail) => {
		// Must be a classic `function`: the run result is delivered via `this`.
		db.run(sql, params, function (this: RunResult, error: Error | null) {
			if (!error) {
				onDone({ changes: this.changes, lastID: this.lastID });
				return;
			}
			onFail(error);
		});
	});
}
104
105
/**
 * Promise-returning form of `db.get`. Resolves with the row handed to the
 * callback (which may be `undefined`), or rejects with the reported error.
 */
function dbGet(db: Database, sql: string, params: unknown[]): Promise<Record<string, unknown> | undefined> {
	return new Promise((onRow, onFail) => {
		db.get(sql, params, (error: Error | null, record: Record<string, unknown> | undefined) => {
			error ? onFail(error) : onRow(record);
		});
	});
}
115
116
/**
 * Promise-returning form of `db.all`. Resolves with the rows handed to the
 * callback, or rejects with the reported error.
 */
function dbAll(db: Database, sql: string, params: unknown[]): Promise<Record<string, unknown>[]> {
	return new Promise((onRows, onFail) => {
		db.all(sql, params, (error: Error | null, records: Record<string, unknown>[]) => {
			error ? onFail(error) : onRows(records);
		});
	});
}
126
127
/**
 * Promise-returning form of `db.close`: resolves once the callback reports
 * success, or rejects with the error it reports.
 */
function dbClose(db: Database): Promise<void> {
	return new Promise<void>((onDone, onFail) => {
		db.close((error: Error | null) => {
			if (error) {
				onFail(error);
			} else {
				onDone();
			}
		});
	});
}
132
133
/**
 * Lazily loads `@vscode/sqlite3` and opens a database connection at `path`.
 *
 * Resolves with the connection once the driver's open callback reports
 * success. Rejects when the module cannot be loaded, when the open callback
 * reports an error, or when constructing the `Database` throws
 * synchronously. Previously a synchronous throw inside the `import().then`
 * success handler was not routed to `reject`, which left the returned
 * promise pending forever.
 */
async function dbOpen(path: string): Promise<Database> {
	const sqlite3 = await import('@vscode/sqlite3');
	// A throw inside the executor (e.g. from the constructor) rejects the promise.
	return new Promise<Database>((resolve, reject) => {
		const db = new sqlite3.default.Database(path, (err: Error | null) => {
			if (err) {
				return reject(err);
			}
			resolve(db);
		});
	});
}
145
146
/**
 * Applies any pending {@link ISessionDatabaseMigration migrations} to a
 * database.
 *
 * Foreign key enforcement is switched on first. Migrations whose version is
 * greater than the current `PRAGMA user_version` are then executed in
 * ascending version order inside a single transaction, and `user_version`
 * is set to each migration's version right after its SQL has run. When no
 * migration is pending no transaction is started.
 *
 * @param db The open database connection to migrate.
 * @param migrations The known migrations; the array itself is not modified.
 * @throws The error of the first failing statement. The transaction is
 * rolled back (best effort) before the error is rethrown.
 */
export async function runMigrations(db: Database, migrations: readonly ISessionDatabaseMigration[]): Promise<void> {
	// Enable foreign key enforcement — must be set outside a transaction
	// and every time a connection is opened.
	await dbExec(db, 'PRAGMA foreign_keys = ON');

	const row = await dbGet(db, 'PRAGMA user_version', []);
	const currentVersion = (row?.user_version as number | undefined) ?? 0;

	// `filter` returns a fresh array, so sorting does not touch the caller's list.
	const pending = migrations
		.filter(m => m.version > currentVersion)
		.sort((a, b) => a.version - b.version);

	if (pending.length === 0) {
		return;
	}

	await dbExec(db, 'BEGIN TRANSACTION');
	try {
		for (const migration of pending) {
			await dbExec(db, migration.sql);
			// PRAGMA cannot be parameterized; the version is a trusted literal.
			await dbExec(db, `PRAGMA user_version = ${migration.version}`);
		}
		await dbExec(db, 'COMMIT');
	} catch (err) {
		try {
			await dbExec(db, 'ROLLBACK');
		} catch {
			// Rollback can itself fail (for instance when the transaction is
			// no longer active). That must not replace the error that
			// actually caused the migration to fail.
		}
		throw err;
	}
}
181
182
/**
 * A wrapper around a `@vscode/sqlite3` {@link Database} instance with
 * lazy initialisation.
 *
 * The underlying connection is opened on the first async method call
 * (not at construction time), allowing the object to be created
 * synchronously and shared via a {@link ReferenceCollection}.
 *
 * Calling {@link dispose} closes the connection.
 */
export class SessionDatabase implements ISessionDatabase {

	/**
	 * Memoised "open and migrate" promise. `undefined` until the first
	 * {@link _ensureDb} call, and reset to `undefined` when running the
	 * migrations fails so that a later call can try again.
	 */
	protected _dbPromise: Promise<Database> | undefined;

	/**
	 * Set by the first {@link close} call: the promise of the pending close,
	 * or `true` when no connection had been requested at that point.
	 */
	protected _closed: Promise<void> | true | undefined;

	/** Serializes {@link storeFileEdit} and {@link readFileEditContent} per file path. */
	private readonly _fileEditSequencer = new SequencerByKey<string>();

	/**
	 * Serializes `setMetadata` writes per key. `@vscode/sqlite3` runs in
	 * parallelized mode, so two `db.run()` calls on the same connection
	 * can be dispatched to the libuv thread pool and complete out of
	 * submission order. For "last writer wins" keys (notably `configValues`
	 * via {@link setMetadata}), that meant a fast-following second write
	 * could be overtaken by the first and silently lose its value — see
	 * the "Session Config persistence across restarts" integration test.
	 * Sequencing by key preserves intra-key order while still allowing
	 * writes for different keys to run concurrently.
	 */
	private readonly _metadataSequencer = new SequencerByKey<string>();

	/**
	 * In-flight write operations. Tracked so {@link whenIdle} can await them
	 * before the process exits — without this, a `SIGTERM` arriving between
	 * a fire-and-forget mutating call (e.g. `setMetadata`) being invoked and
	 * its underlying SQLite query completing would silently drop the write.
	 * Every public mutating method routes its returned promise through
	 * {@link _track}; reads (`getMetadata`, `getFileEdits`, ...) skip
	 * tracking since shutdown does not need to wait for them.
	 */
	private readonly _pendingWrites = new Set<Promise<unknown>>();

	/**
	 * @param _path Location of the SQLite database file. The file's parent
	 * directory is created on first use if it does not exist.
	 * @param _migrations Migrations to apply when the connection is opened.
	 */
	constructor(
		private readonly _path: string,
		private readonly _migrations: readonly ISessionDatabaseMigration[] = sessionDatabaseMigrations,
	) { }

	/**
	 * Opens (or creates) a SQLite database at {@link path} and applies
	 * any pending migrations. Only used in tests where synchronous
	 * construction + immediate readiness is desired.
	 */
	static async open(path: string, migrations: readonly ISessionDatabaseMigration[] = sessionDatabaseMigrations): Promise<SessionDatabase> {
		const inst = new SessionDatabase(path, migrations);
		await inst._ensureDb();
		return inst;
	}

	/**
	 * Returns the (memoised) promise of the opened and migrated connection,
	 * starting the open on first use. Rejects when the database has been
	 * closed, including when {@link close} is called while the open is
	 * still in progress.
	 */
	protected _ensureDb(): Promise<Database> {
		if (this._closed) {
			return Promise.reject(new Error('SessionDatabase has been disposed'));
		}
		if (!this._dbPromise) {
			this._dbPromise = (async () => {
				// Ensure the parent directory exists before SQLite tries to
				// create the database file.
				await fs.promises.mkdir(dirname(this._path), { recursive: true });
				const db = await dbOpen(this._path);
				try {
					await runMigrations(db, this._migrations);
				} catch (err) {
					// Best-effort close: a failure to close must neither mask
					// the migration error nor skip the reset below, which is
					// what lets a later call retry.
					await dbClose(db).catch(() => { });
					this._dbPromise = undefined;
					throw err;
				}
				// If dispose() was called while we were opening, close immediately.
				if (this._closed) {
					await dbClose(db);
					throw new Error('SessionDatabase has been disposed');
				}
				return db;
			})();
		}
		return this._dbPromise;
	}

	/**
	 * Returns the names of all user-created tables in the database.
	 * Useful for testing migration behavior.
	 */
	async getAllTables(): Promise<string[]> {
		const db = await this._ensureDb();
		const rows = await dbAll(db, `SELECT name FROM sqlite_master WHERE type='table' ORDER BY name`, []);
		return rows.map(r => r.name as string);
	}

	// ---- Turns ----------------------------------------------------------

	/** Inserts a turn row for `turnId`; does nothing if it already exists. */
	createTurn(turnId: string): Promise<void> {
		return this._track(async () => {
			const db = await this._ensureDb();
			await dbRun(db, 'INSERT OR IGNORE INTO turns (id) VALUES (?)', [turnId]);
		});
	}

	/** Deletes the turn row for `turnId`. File edits cascade-delete via the foreign key constraint. */
	deleteTurn(turnId: string): Promise<void> {
		return this._track(async () => {
			const db = await this._ensureDb();
			await dbRun(db, 'DELETE FROM turns WHERE id = ?', [turnId]);
		});
	}

	/**
	 * Records `eventId` for the turn, creating the turn row if needed.
	 * An event ID that is already set is left untouched.
	 */
	setTurnEventId(turnId: string, eventId: string): Promise<void> {
		return this._track(async () => {
			const db = await this._ensureDb();
			await dbRun(db, 'INSERT OR IGNORE INTO turns (id) VALUES (?)', [turnId]);
			// Only set the event ID if not already set — steering messages
			// trigger additional user.message events within the same turn,
			// and we must preserve the first (boundary) event ID.
			await dbRun(db, 'UPDATE turns SET event_id = ? WHERE id = ? AND event_id IS NULL', [eventId, turnId]);
		});
	}

	/** Returns the event ID stored for `turnId`, or `undefined` when the turn is unknown or has none. */
	async getTurnEventId(turnId: string): Promise<string | undefined> {
		const db = await this._ensureDb();
		const row = await dbGet(db, 'SELECT event_id FROM turns WHERE id = ?', [turnId]);
		return row?.event_id as string | undefined ?? undefined;
	}

	/**
	 * Returns the event ID of the first turn inserted after `turnId` (by
	 * rowid order), or `undefined` when no such row is found or it has no
	 * event ID.
	 */
	async getNextTurnEventId(turnId: string): Promise<string | undefined> {
		const db = await this._ensureDb();
		const row = await dbGet(
			db,
			`SELECT event_id FROM turns WHERE rowid > (SELECT rowid FROM turns WHERE id = ?) ORDER BY rowid LIMIT 1`,
			[turnId],
		);
		return row?.event_id as string | undefined ?? undefined;
	}

	/** Returns the event ID of the first turn (by rowid order), or `undefined` when there is none. */
	async getFirstTurnEventId(): Promise<string | undefined> {
		const db = await this._ensureDb();
		const row = await dbGet(db, 'SELECT event_id FROM turns ORDER BY rowid LIMIT 1', []);
		return row?.event_id as string | undefined ?? undefined;
	}

	/** Deletes the given turn and every turn inserted after it. */
	truncateFromTurn(turnId: string): Promise<void> {
		return this._track(async () => {
			const db = await this._ensureDb();
			// Delete the target turn and all turns inserted after it (by rowid order).
			// File edits cascade-delete via the foreign key constraint.
			await dbRun(db,
				`DELETE FROM turns WHERE rowid >= (SELECT rowid FROM turns WHERE id = ?)`,
				[turnId],
			);
		});
	}

	/** Deletes every turn inserted after the given turn, keeping the given turn. */
	deleteTurnsAfter(turnId: string): Promise<void> {
		return this._track(async () => {
			const db = await this._ensureDb();
			// Delete all turns inserted after the given turn (by rowid order),
			// keeping the given turn itself.
			// File edits cascade-delete via the foreign key constraint.
			await dbRun(db,
				`DELETE FROM turns WHERE rowid > (SELECT rowid FROM turns WHERE id = ?)`,
				[turnId],
			);
		});
	}

	/** Deletes all turns. */
	deleteAllTurns(): Promise<void> {
		return this._track(async () => {
			const db = await this._ensureDb();
			await dbExec(db, 'DELETE FROM turns');
		});
	}

	// ---- File edits -----------------------------------------------------

	/**
	 * Stores a file edit together with its before/after content, replacing
	 * any existing row for the same tool call and file path. Writes for the
	 * same file path are serialized.
	 */
	storeFileEdit(edit: IFileEditRecord & IFileEditContent): Promise<void> {
		return this._track(() => this._fileEditSequencer.queue(edit.filePath, async () => {
			const db = await this._ensureDb();
			// Ensure the turn exists — lazily insert since the turn record
			// may not have been created by an explicit createTurn() call.
			await dbRun(db, 'INSERT OR IGNORE INTO turns (id) VALUES (?)', [edit.turnId]);
			await dbRun(
				db,
				`INSERT OR REPLACE INTO file_edits
					(turn_id, tool_call_id, file_path, edit_type, original_path, before_content, after_content, added_lines, removed_lines)
					VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
				[
					edit.turnId,
					edit.toolCallId,
					edit.filePath,
					edit.kind,
					edit.originalPath ?? null,
					edit.beforeContent ? Buffer.from(edit.beforeContent) : null,
					edit.afterContent ? Buffer.from(edit.afterContent) : null,
					edit.addedLines ?? null,
					edit.removedLines ?? null,
				],
			);
		}));
	}

	/**
	 * Returns the file edit records (without content) for the given tool
	 * call IDs, in rowid order. An empty input yields an empty result
	 * without touching the database.
	 */
	async getFileEdits(toolCallIds: string[]): Promise<IFileEditRecord[]> {
		if (toolCallIds.length === 0) {
			return [];
		}
		const placeholders = toolCallIds.map(() => '?').join(',');
		return this._queryFileEdits(`WHERE tool_call_id IN (${placeholders})`, toolCallIds);
	}

	/** Returns all file edit records (without content), in rowid order. */
	async getAllFileEdits(): Promise<IFileEditRecord[]> {
		return this._queryFileEdits('', []);
	}

	/** Returns the file edit records (without content) of one turn, in rowid order. */
	async getFileEditsByTurn(turnId: string): Promise<IFileEditRecord[]> {
		return this._queryFileEdits('WHERE turn_id = ?', [turnId]);
	}

	/**
	 * Shared implementation of the file edit queries: selects the record
	 * columns (no content blobs) restricted by `whereClause` and maps each
	 * row to an {@link IFileEditRecord}. SQL `NULL` values are surfaced as
	 * `undefined`, and a missing edit type falls back to `'edit'`.
	 *
	 * @param whereClause A complete `WHERE ...` clause using `?` placeholders,
	 * or an empty string for no restriction. Must not contain caller data.
	 * @param params Values bound to the placeholders in `whereClause`.
	 */
	private async _queryFileEdits(whereClause: string, params: unknown[]): Promise<IFileEditRecord[]> {
		const db = await this._ensureDb();
		const rows = await dbAll(
			db,
			`SELECT turn_id, tool_call_id, file_path, edit_type, original_path, added_lines, removed_lines
				FROM file_edits
				${whereClause}
				ORDER BY rowid`,
			params,
		);
		return rows.map(row => ({
			turnId: row.turn_id as string,
			toolCallId: row.tool_call_id as string,
			filePath: row.file_path as string,
			kind: (row.edit_type as IFileEditRecord['kind']) ?? 'edit',
			originalPath: row.original_path as string | undefined ?? undefined,
			addedLines: row.added_lines as number | undefined ?? undefined,
			removedLines: row.removed_lines as number | undefined ?? undefined,
		}));
	}

	/**
	 * Reads the stored before/after content of one file edit, or returns
	 * `undefined` when there is no matching row. Queued on the same
	 * per-file-path sequencer as {@link storeFileEdit}.
	 */
	async readFileEditContent(toolCallId: string, filePath: string): Promise<IFileEditContent | undefined> {
		return this._fileEditSequencer.queue(filePath, async () => {
			const db = await this._ensureDb();
			const row = await dbGet(
				db,
				`SELECT before_content, after_content
					FROM file_edits
					WHERE tool_call_id = ? AND file_path = ?`,
				[toolCallId, filePath],
			);
			if (!row) {
				return undefined;
			}
			return {
				beforeContent: row.before_content ? toUint8Array(row.before_content) : undefined,
				afterContent: row.after_content ? toUint8Array(row.after_content) : undefined,
			};
		});
	}

	// ---- Session metadata -----------------------------------------------

	/** Returns the metadata value stored under `key`, or `undefined` when there is none. */
	async getMetadata(key: string): Promise<string | undefined> {
		const db = await this._ensureDb();
		const row = await dbGet(db, 'SELECT value FROM session_metadata WHERE key = ?', [key]);
		return row?.value as string | undefined;
	}

	/**
	 * Reads several metadata values with a single query. The keys of `obj`
	 * select which entries to read (its values are ignored); the result has
	 * the same keys, each holding the stored value or `undefined`.
	 */
	async getMetadataObject<T extends Record<string, unknown>>(obj: T): Promise<{ [K in keyof T]: string | undefined }> {
		const keys = Object.keys(obj) as (keyof T & string)[];
		// eslint-disable-next-line local/code-no-dangerous-type-assertions
		const result = {} as { [K in keyof T]: string | undefined };
		if (keys.length === 0) {
			return result;
		}
		const db = await this._ensureDb();
		const placeholders = keys.map(() => '?').join(',');
		const rows = await dbAll(db, `SELECT key, value FROM session_metadata WHERE key IN (${placeholders})`, keys);
		for (const key of keys) {
			result[key] = undefined;
		}
		for (const row of rows) {
			result[row.key as keyof T] = row.value as string;
		}
		return result;
	}

	/** Stores `value` under `key`, replacing any previous value. Writes for the same key are serialized. */
	setMetadata(key: string, value: string): Promise<void> {
		return this._track(() => this._metadataSequencer.queue(key, async () => {
			const db = await this._ensureDb();
			await dbRun(db, 'INSERT OR REPLACE INTO session_metadata (key, value) VALUES (?, ?)', [key, value]);
		}));
	}

	/**
	 * Rewrites turn IDs according to `mapping` (old ID → new ID) inside one
	 * transaction. When the mapping is non-empty, turns whose ID is not a
	 * key of the mapping are deleted first.
	 *
	 * @throws The error of the first failing statement. The transaction is
	 * rolled back (best effort) before the error is rethrown.
	 */
	remapTurnIds(mapping: ReadonlyMap<string, string>): Promise<void> {
		return this._track(async () => {
			const db = await this._ensureDb();
			// Defer FK checks to commit time so we can update turns.id and
			// file_edits.turn_id in any order without mid-statement violations.
			// This pragma auto-resets after the transaction ends.
			await dbExec(db, 'PRAGMA defer_foreign_keys = ON');
			await dbExec(db, 'BEGIN TRANSACTION');
			try {
				// Delete turns not present in the mapping (e.g. turns beyond
				// the fork point). File edits cascade-delete via FK.
				const oldIds = [...mapping.keys()];
				if (oldIds.length > 0) {
					const placeholders = oldIds.map(() => '?').join(',');
					await dbRun(db,
						`DELETE FROM turns WHERE id NOT IN (${placeholders})`,
						oldIds,
					);
				}

				// Remap the remaining turn IDs to their new values
				for (const [oldId, newId] of mapping) {
					await dbRun(db, 'UPDATE turns SET id = ? WHERE id = ?', [newId, oldId]);
					await dbRun(db, 'UPDATE file_edits SET turn_id = ? WHERE turn_id = ?', [newId, oldId]);
				}
				await dbExec(db, 'COMMIT');
			} catch (err) {
				try {
					await dbExec(db, 'ROLLBACK');
				} catch {
					// A failing rollback must not replace the error that
					// actually caused the remap to fail.
				}
				throw err;
			}
		});
	}

	/**
	 * Resolves once all currently in-flight write operations have settled.
	 * Used by graceful shutdown to flush pending fire-and-forget writes
	 * before the process exits. Should be called from a path where no
	 * further writes are expected; loops until idle to also drain any
	 * writes that get queued while we're awaiting.
	 */
	async whenIdle(): Promise<void> {
		while (this._pendingWrites.size > 0) {
			await Promise.allSettled([...this._pendingWrites]);
		}
	}

	/** Runs `VACUUM INTO` with `targetPath` bound as the target. */
	async vacuumInto(targetPath: string): Promise<void> {
		const db = await this._ensureDb();
		await dbRun(db, 'VACUUM INTO ?', [targetPath]);
	}

	/**
	 * Wrap a mutating operation's promise so {@link whenIdle} can await it.
	 * Invoke at the **outermost** layer of every public mutating method so
	 * that any internal awaits (notably `_ensureDb()`) are covered too —
	 * tracking only the leaf `dbRun`/`dbExec` would miss the window
	 * between the method being called and the query actually being queued.
	 */
	private _track<T>(fn: () => Promise<T>): Promise<T> {
		const p = fn();
		this._pendingWrites.add(p);
		const untrack = () => { this._pendingWrites.delete(p); };
		p.then(untrack, untrack);
		return p;
	}

	/**
	 * Closes the connection, if one was ever requested, and marks this
	 * instance as closed so that later operations are rejected. Resolves
	 * once the driver has reported the close as finished; failures to open
	 * or close are swallowed. Repeated calls share the same close.
	 */
	async close(): Promise<void> {
		// `db.close()` is callback-based, so it has to go through `dbClose`
		// for the returned promise to cover the actual close.
		this._closed ??= this._dbPromise?.then(db => dbClose(db)).catch(() => { }) ?? true;
		await this._closed;
	}

	/** Starts closing the connection without waiting for it; see {@link close}. */
	dispose(): void {
		// Fire-and-forget: `close()` never rejects.
		void this.close();
	}
}
579
580
/**
 * Converts a value read from a BLOB column into a `Uint8Array`.
 *
 * - a `Buffer` becomes a plain `Uint8Array` view over the same memory
 * - any other `Uint8Array` is returned unchanged
 * - a string is encoded with `TextEncoder`
 * - everything else yields an empty array
 */
function toUint8Array(value: unknown): Uint8Array {
	if (typeof value === 'string') {
		return new TextEncoder().encode(value);
	}
	if (!(value instanceof Uint8Array)) {
		return new Uint8Array(0);
	}
	// `Buffer` is a `Uint8Array` subclass; hand out a plain view over its bytes.
	return value instanceof Buffer
		? new Uint8Array(value.buffer, value.byteOffset, value.byteLength)
		: value;
}
592
593