Path: blob/main/src/vs/platform/agentHost/node/sessionDatabase.ts
13394 views
/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/

import * as fs from 'fs';
import { SequencerByKey } from '../../../base/common/async.js';
import type { Database, RunResult } from '@vscode/sqlite3';
import type { IFileEditContent, IFileEditRecord, ISessionDatabase } from '../common/sessionDataService.js';
import { dirname } from '../../../base/common/path.js';

/**
 * A single numbered migration. Migrations are applied in order of
 * {@link version} and tracked via `PRAGMA user_version`.
 */
export interface ISessionDatabaseMigration {
	/** Monotonically-increasing version number (1-based). */
	readonly version: number;
	/** SQL to execute for this migration. */
	readonly sql: string;
}

/**
 * The set of migrations that define the current session database schema.
 * New migrations should be **appended** to this array with the next version
 * number. Never reorder or mutate existing entries.
 */
export const sessionDatabaseMigrations: readonly ISessionDatabaseMigration[] = [
	{
		version: 1,
		sql: [
			`CREATE TABLE IF NOT EXISTS turns (
				id TEXT PRIMARY KEY NOT NULL
			)`,
			`CREATE TABLE IF NOT EXISTS file_edits (
				turn_id TEXT NOT NULL REFERENCES turns(id) ON DELETE CASCADE,
				tool_call_id TEXT NOT NULL,
				file_path TEXT NOT NULL,
				before_content BLOB NOT NULL,
				after_content BLOB NOT NULL,
				added_lines INTEGER,
				removed_lines INTEGER,
				PRIMARY KEY (tool_call_id, file_path)
			)`,
		].join(';\n'),
	},
	{
		version: 2,
		sql: `CREATE TABLE IF NOT EXISTS session_metadata (
			key TEXT PRIMARY KEY NOT NULL,
			value TEXT NOT NULL
		)`,
	},
	{
		version: 3,
		sql: [
			// Recreate file_edits with new columns: edit_type, original_path,
			// and nullable before_content/after_content.
			`CREATE TABLE file_edits_v3 (
				turn_id TEXT NOT NULL REFERENCES turns(id) ON DELETE CASCADE,
				tool_call_id TEXT NOT NULL,
				file_path TEXT NOT NULL,
				edit_type TEXT NOT NULL DEFAULT 'edit',
				original_path TEXT,
				before_content BLOB,
				after_content BLOB,
				added_lines INTEGER,
				removed_lines INTEGER,
				PRIMARY KEY (tool_call_id, file_path)
			)`,
			`INSERT INTO file_edits_v3 (turn_id, tool_call_id, file_path, edit_type, before_content, after_content, added_lines, removed_lines)
				SELECT turn_id, tool_call_id, file_path, 'edit', before_content, after_content, added_lines, removed_lines FROM file_edits`,
			`DROP TABLE file_edits`,
			`ALTER TABLE file_edits_v3 RENAME TO file_edits`,
		].join(';\n'),
	},
	{
		version: 4,
		sql: [
			`ALTER TABLE turns ADD COLUMN event_id TEXT`,
			`CREATE INDEX IF NOT EXISTS idx_turns_event_id ON turns(event_id)`,
		].join(';\n'),
	},
];

// ---- Promise wrappers around callback-based @vscode/sqlite3 API -----------

/** Executes one or more SQL statements that take no parameters and return no rows. */
function dbExec(db: Database, sql: string): Promise<void> {
	return new Promise((resolve, reject) => {
		db.exec(sql, err => err ? reject(err) : resolve());
	});
}

/**
 * Runs a single parameterized statement and resolves with the `changes` and
 * `lastID` values that the driver reports on the callback's `this`.
 */
function dbRun(db: Database, sql: string, params: unknown[]): Promise<{ changes: number; lastID: number }> {
	return new Promise((resolve, reject) => {
		// Must be a `function` (not an arrow) so the driver can bind `this`.
		db.run(sql, params, function (this: RunResult, err: Error | null) {
			if (err) {
				return reject(err);
			}
			resolve({ changes: this.changes, lastID: this.lastID });
		});
	});
}

/** Runs a parameterized query and resolves with the first row, or `undefined` if there is none. */
function dbGet(db: Database, sql: string, params: unknown[]): Promise<Record<string, unknown> | undefined> {
	return new Promise((resolve, reject) => {
		db.get(sql, params, (err: Error | null, row: Record<string, unknown> | undefined) => {
			if (err) {
				return reject(err);
			}
			resolve(row);
		});
	});
}

/** Runs a parameterized query and resolves with every resulting row. */
function dbAll(db: Database, sql: string, params: unknown[]): Promise<Record<string, unknown>[]> {
	return new Promise((resolve, reject) => {
		db.all(sql, params, (err: Error | null, rows: Record<string, unknown>[]) => {
			if (err) {
				return reject(err);
			}
			resolve(rows);
		});
	});
}

/** Closes the connection; settles only once the driver invokes the close callback. */
function dbClose(db: Database): Promise<void> {
	return new Promise((resolve, reject) => {
		db.close(err => err ? reject(err) : resolve());
	});
}

/**
 * Lazily loads `@vscode/sqlite3` and opens a database at {@link path}.
 * Rejects if the module cannot be loaded or the open callback reports an error.
 */
async function dbOpen(path: string): Promise<Database> {
	const sqlite3 = await import('@vscode/sqlite3');
	return new Promise<Database>((resolve, reject) => {
		const db = new sqlite3.default.Database(path, (err: Error | null) => {
			if (err) {
				return reject(err);
			}
			resolve(db);
		});
	});
}

/**
 * Issues a best-effort `ROLLBACK`. Any rollback failure is intentionally
 * ignored: this is only called from error paths, where the caller is about
 * to rethrow the error that caused the rollback. Letting a secondary
 * rollback failure (e.g. because the transaction is no longer active)
 * propagate would hide that original, more useful error.
 */
async function rollbackIgnoringErrors(db: Database): Promise<void> {
	try {
		await dbExec(db, 'ROLLBACK');
	} catch {
		// Deliberately ignored — see the doc comment above.
	}
}

/**
 * Applies any pending {@link ISessionDatabaseMigration migrations} to a
 * database. Migrations whose version is greater than the current
 * `PRAGMA user_version` are run, in ascending version order, inside a
 * single transaction. `PRAGMA user_version` is bumped after each migration
 * within that transaction, so on success it ends at the highest applied
 * version and on failure the whole batch is rolled back.
 *
 * @param db An open database connection.
 * @param migrations The full list of known migrations (any order).
 * @throws The error raised by the failing statement, after attempting a rollback.
 */
export async function runMigrations(db: Database, migrations: readonly ISessionDatabaseMigration[]): Promise<void> {
	// Enable foreign key enforcement — must be set outside a transaction
	// and every time a connection is opened.
	await dbExec(db, 'PRAGMA foreign_keys = ON');

	const row = await dbGet(db, 'PRAGMA user_version', []);
	const currentVersion = (row?.user_version as number | undefined) ?? 0;

	// `filter` returns a fresh array, so the in-place sort never touches the input.
	const pending = migrations
		.filter(m => m.version > currentVersion)
		.sort((a, b) => a.version - b.version);

	if (pending.length === 0) {
		return;
	}

	await dbExec(db, 'BEGIN TRANSACTION');
	try {
		for (const migration of pending) {
			await dbExec(db, migration.sql);
			// PRAGMA cannot be parameterized; the version is a trusted literal.
			await dbExec(db, `PRAGMA user_version = ${migration.version}`);
		}
		await dbExec(db, 'COMMIT');
	} catch (err) {
		// Roll back without letting a rollback failure mask the real error.
		await rollbackIgnoringErrors(db);
		throw err;
	}
}

/**
 * Converts a `file_edits` row (without the content columns) into an
 * {@link IFileEditRecord}. SQL `NULL`s in the optional columns are
 * normalized to `undefined`, and a missing `edit_type` falls back to `'edit'`.
 */
function toFileEditRecord(row: Record<string, unknown>): IFileEditRecord {
	return {
		turnId: row.turn_id as string,
		toolCallId: row.tool_call_id as string,
		filePath: row.file_path as string,
		kind: (row.edit_type as IFileEditRecord['kind']) ?? 'edit',
		originalPath: row.original_path as string | undefined ?? undefined,
		addedLines: row.added_lines as number | undefined ?? undefined,
		removedLines: row.removed_lines as number | undefined ?? undefined,
	};
}

/**
 * A wrapper around a `@vscode/sqlite3` {@link Database} instance with
 * lazy initialisation.
 *
 * The underlying connection is opened on the first async method call
 * (not at construction time), allowing the object to be created
 * synchronously and shared via a {@link ReferenceCollection}.
 *
 * Calling {@link dispose} closes the connection.
 */
export class SessionDatabase implements ISessionDatabase {

	protected _dbPromise: Promise<Database> | undefined;
	protected _closed: Promise<void> | true | undefined;
	private readonly _fileEditSequencer = new SequencerByKey<string>();

	/**
	 * Serializes `setMetadata` writes per key. `@vscode/sqlite3` runs in
	 * parallelized mode, so two `db.run()` calls on the same connection
	 * can be dispatched to the libuv thread pool and complete out of
	 * submission order. For "last writer wins" keys (notably `configValues`
	 * via {@link setMetadata}), that meant a fast-following second write
	 * could be overtaken by the first and silently lose its value — see
	 * the "Session Config persistence across restarts" integration test.
	 * Sequencing by key preserves intra-key order while still allowing
	 * writes for different keys to run concurrently.
	 */
	private readonly _metadataSequencer = new SequencerByKey<string>();

	/**
	 * In-flight write operations. Tracked so {@link whenIdle} can await them
	 * before the process exits — without this, a `SIGTERM` arriving between
	 * a fire-and-forget mutating call (e.g. `setMetadata`) being invoked and
	 * its underlying SQLite query completing would silently drop the write.
	 * Every public mutating method routes its returned promise through
	 * {@link _track}; reads (`getMetadata`, `getFileEdits`, ...) skip
	 * tracking since shutdown does not need to wait for them.
	 */
	private readonly _pendingWrites = new Set<Promise<unknown>>();

	constructor(
		private readonly _path: string,
		private readonly _migrations: readonly ISessionDatabaseMigration[] = sessionDatabaseMigrations,
	) { }

	/**
	 * Opens (or creates) a SQLite database at {@link path} and applies
	 * any pending migrations. Only used in tests where synchronous
	 * construction + immediate readiness is desired.
	 */
	static async open(path: string, migrations: readonly ISessionDatabaseMigration[] = sessionDatabaseMigrations): Promise<SessionDatabase> {
		const inst = new SessionDatabase(path, migrations);
		await inst._ensureDb();
		return inst;
	}

	/**
	 * Returns the (memoized) promise for the open, migrated connection,
	 * starting the open on first use. Rejects once the instance has been
	 * closed. A migration failure closes the connection and clears the
	 * memoized promise so that a later call can retry.
	 */
	protected _ensureDb(): Promise<Database> {
		if (this._closed) {
			return Promise.reject(new Error('SessionDatabase has been disposed'));
		}
		if (!this._dbPromise) {
			this._dbPromise = (async () => {
				// Ensure the parent directory exists before SQLite tries to
				// create the database file.
				await fs.promises.mkdir(dirname(this._path), { recursive: true });
				const db = await dbOpen(this._path);
				try {
					await runMigrations(db, this._migrations);
				} catch (err) {
					// Close best-effort: a failing close must neither hide the
					// migration error nor prevent the retry reset below.
					await dbClose(db).catch(() => { });
					this._dbPromise = undefined;
					throw err;
				}
				// If dispose() was called while we were opening, close immediately.
				if (this._closed) {
					await dbClose(db);
					throw new Error('SessionDatabase has been disposed');
				}
				return db;
			})();
		}
		return this._dbPromise;
	}

	/**
	 * Returns the names of all user-created tables in the database.
	 * Useful for testing migration behavior.
	 */
	async getAllTables(): Promise<string[]> {
		const db = await this._ensureDb();
		const rows = await dbAll(db, `SELECT name FROM sqlite_master WHERE type='table' ORDER BY name`, []);
		return rows.map(r => r.name as string);
	}

	// ---- Turns ----------------------------------------------------------

	/** Inserts a turn row for {@link turnId}; a no-op if it already exists. */
	createTurn(turnId: string): Promise<void> {
		return this._track(async () => {
			const db = await this._ensureDb();
			await dbRun(db, 'INSERT OR IGNORE INTO turns (id) VALUES (?)', [turnId]);
		});
	}

	/** Deletes the turn row with the given id, if present. */
	deleteTurn(turnId: string): Promise<void> {
		return this._track(async () => {
			const db = await this._ensureDb();
			await dbRun(db, 'DELETE FROM turns WHERE id = ?', [turnId]);
		});
	}

	/**
	 * Associates {@link eventId} with the turn, creating the turn row if
	 * needed. An event id that is already set is never overwritten.
	 */
	setTurnEventId(turnId: string, eventId: string): Promise<void> {
		return this._track(async () => {
			const db = await this._ensureDb();
			await dbRun(db, 'INSERT OR IGNORE INTO turns (id) VALUES (?)', [turnId]);
			// Only set the event ID if not already set — steering messages
			// trigger additional user.message events within the same turn,
			// and we must preserve the first (boundary) event ID.
			await dbRun(db, 'UPDATE turns SET event_id = ? WHERE id = ? AND event_id IS NULL', [eventId, turnId]);
		});
	}

	/** Returns the event id stored for the turn, or `undefined` if the turn or its event id is missing. */
	async getTurnEventId(turnId: string): Promise<string | undefined> {
		const db = await this._ensureDb();
		const row = await dbGet(db, 'SELECT event_id FROM turns WHERE id = ?', [turnId]);
		return row?.event_id as string | undefined ?? undefined;
	}

	/**
	 * Returns the event id of the first turn whose rowid is greater than
	 * that of {@link turnId}, or `undefined` if there is no such turn or it
	 * has no event id.
	 */
	async getNextTurnEventId(turnId: string): Promise<string | undefined> {
		const db = await this._ensureDb();
		const row = await dbGet(
			db,
			`SELECT event_id FROM turns WHERE rowid > (SELECT rowid FROM turns WHERE id = ?) ORDER BY rowid LIMIT 1`,
			[turnId],
		);
		return row?.event_id as string | undefined ?? undefined;
	}

	/** Returns the event id of the turn with the lowest rowid, or `undefined` if none is available. */
	async getFirstTurnEventId(): Promise<string | undefined> {
		const db = await this._ensureDb();
		const row = await dbGet(db, 'SELECT event_id FROM turns ORDER BY rowid LIMIT 1', []);
		return row?.event_id as string | undefined ?? undefined;
	}

	/** Deletes the given turn and every turn with a greater rowid. */
	truncateFromTurn(turnId: string): Promise<void> {
		return this._track(async () => {
			const db = await this._ensureDb();
			// Delete the target turn and all turns inserted after it (by rowid order).
			// File edits cascade-delete via the foreign key constraint.
			await dbRun(db,
				`DELETE FROM turns WHERE rowid >= (SELECT rowid FROM turns WHERE id = ?)`,
				[turnId],
			);
		});
	}

	/** Deletes every turn with a rowid greater than the given turn's, keeping that turn. */
	deleteTurnsAfter(turnId: string): Promise<void> {
		return this._track(async () => {
			const db = await this._ensureDb();
			// Delete all turns inserted after the given turn (by rowid order),
			// keeping the given turn itself.
			// File edits cascade-delete via the foreign key constraint.
			await dbRun(db,
				`DELETE FROM turns WHERE rowid > (SELECT rowid FROM turns WHERE id = ?)`,
				[turnId],
			);
		});
	}

	/** Deletes every row from the `turns` table. */
	deleteAllTurns(): Promise<void> {
		return this._track(async () => {
			const db = await this._ensureDb();
			await dbExec(db, 'DELETE FROM turns');
		});
	}

	// ---- File edits -----------------------------------------------------

	/**
	 * Inserts or replaces the file edit identified by
	 * `(toolCallId, filePath)`, creating the owning turn row if needed.
	 * Calls are sequenced per `filePath`.
	 */
	storeFileEdit(edit: IFileEditRecord & IFileEditContent): Promise<void> {
		return this._track(() => this._fileEditSequencer.queue(edit.filePath, async () => {
			const db = await this._ensureDb();
			// Ensure the turn exists — lazily insert since the turn record
			// may not have been created by an explicit createTurn() call.
			await dbRun(db, 'INSERT OR IGNORE INTO turns (id) VALUES (?)', [edit.turnId]);
			await dbRun(
				db,
				`INSERT OR REPLACE INTO file_edits
					(turn_id, tool_call_id, file_path, edit_type, original_path, before_content, after_content, added_lines, removed_lines)
					VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
				[
					edit.turnId,
					edit.toolCallId,
					edit.filePath,
					edit.kind,
					edit.originalPath ?? null,
					edit.beforeContent ? Buffer.from(edit.beforeContent) : null,
					edit.afterContent ? Buffer.from(edit.afterContent) : null,
					edit.addedLines ?? null,
					edit.removedLines ?? null,
				],
			);
		}));
	}

	/**
	 * Returns the file edit records (without content) for the given tool
	 * call ids, ordered by rowid. Returns `[]` without touching the
	 * database when {@link toolCallIds} is empty.
	 */
	async getFileEdits(toolCallIds: string[]): Promise<IFileEditRecord[]> {
		if (toolCallIds.length === 0) {
			return [];
		}
		const db = await this._ensureDb();
		const placeholders = toolCallIds.map(() => '?').join(',');
		const rows = await dbAll(
			db,
			`SELECT turn_id, tool_call_id, file_path, edit_type, original_path, added_lines, removed_lines
				FROM file_edits
				WHERE tool_call_id IN (${placeholders})
				ORDER BY rowid`,
			toolCallIds,
		);
		return rows.map(toFileEditRecord);
	}

	/** Returns every file edit record (without content), ordered by rowid. */
	async getAllFileEdits(): Promise<IFileEditRecord[]> {
		const db = await this._ensureDb();
		const rows = await dbAll(
			db,
			`SELECT turn_id, tool_call_id, file_path, edit_type, original_path, added_lines, removed_lines
				FROM file_edits
				ORDER BY rowid`,
			[],
		);
		return rows.map(toFileEditRecord);
	}

	/** Returns the file edit records (without content) belonging to the given turn, ordered by rowid. */
	async getFileEditsByTurn(turnId: string): Promise<IFileEditRecord[]> {
		const db = await this._ensureDb();
		const rows = await dbAll(
			db,
			`SELECT turn_id, tool_call_id, file_path, edit_type, original_path, added_lines, removed_lines
				FROM file_edits
				WHERE turn_id = ?
				ORDER BY rowid`,
			[turnId],
		);
		return rows.map(toFileEditRecord);
	}

	/**
	 * Reads the stored before/after content of a single file edit, or
	 * `undefined` if no such edit exists. Queued on the same per-`filePath`
	 * sequencer as {@link storeFileEdit}.
	 */
	async readFileEditContent(toolCallId: string, filePath: string): Promise<IFileEditContent | undefined> {
		return this._fileEditSequencer.queue(filePath, async () => {
			const db = await this._ensureDb();
			const row = await dbGet(
				db,
				`SELECT before_content, after_content
					FROM file_edits
					WHERE tool_call_id = ? AND file_path = ?`,
				[toolCallId, filePath],
			);
			if (!row) {
				return undefined;
			}
			return {
				beforeContent: row.before_content ? toUint8Array(row.before_content) : undefined,
				afterContent: row.after_content ? toUint8Array(row.after_content) : undefined,
			};
		});
	}

	// ---- Session metadata -----------------------------------------------

	/** Returns the metadata value stored under {@link key}, or `undefined` if absent. */
	async getMetadata(key: string): Promise<string | undefined> {
		const db = await this._ensureDb();
		const row = await dbGet(db, 'SELECT value FROM session_metadata WHERE key = ?', [key]);
		return row?.value as string | undefined;
	}

	/**
	 * Looks up every own enumerable key of {@link obj} in one query and
	 * returns an object with the same keys mapped to the stored value, or
	 * `undefined` for keys that have no row. The values of {@link obj}
	 * itself are not read.
	 */
	async getMetadataObject<T extends Record<string, unknown>>(obj: T): Promise<{ [K in keyof T]: string | undefined }> {
		const keys = Object.keys(obj) as (keyof T & string)[];
		// eslint-disable-next-line local/code-no-dangerous-type-assertions
		const result = {} as { [K in keyof T]: string | undefined };
		if (keys.length === 0) {
			return result;
		}
		const db = await this._ensureDb();
		const placeholders = keys.map(() => '?').join(',');
		const rows = await dbAll(db, `SELECT key, value FROM session_metadata WHERE key IN (${placeholders})`, keys);
		// Seed every requested key so absent rows are reported as `undefined`.
		for (const key of keys) {
			result[key] = undefined;
		}
		for (const row of rows) {
			result[row.key as keyof T] = row.value as string;
		}
		return result;
	}

	/** Inserts or replaces the metadata value for {@link key}. Writes are sequenced per key. */
	setMetadata(key: string, value: string): Promise<void> {
		return this._track(() => this._metadataSequencer.queue(key, async () => {
			const db = await this._ensureDb();
			await dbRun(db, 'INSERT OR REPLACE INTO session_metadata (key, value) VALUES (?, ?)', [key, value]);
		}));
	}

	/**
	 * Within one transaction, deletes turns whose id is not a key of
	 * {@link mapping} (skipped when the mapping is empty) and then renames
	 * each remaining turn id — in both `turns` and `file_edits` — from the
	 * map key to the map value. Rolls back and rethrows on any failure.
	 */
	remapTurnIds(mapping: ReadonlyMap<string, string>): Promise<void> {
		return this._track(async () => {
			const db = await this._ensureDb();
			// Defer FK checks to commit time so we can update turns.id and
			// file_edits.turn_id in any order without mid-statement violations.
			// This pragma auto-resets after the transaction ends.
			await dbExec(db, 'PRAGMA defer_foreign_keys = ON');
			await dbExec(db, 'BEGIN TRANSACTION');
			try {
				// Delete turns not present in the mapping (e.g. turns beyond
				// the fork point). File edits cascade-delete via FK.
				const oldIds = [...mapping.keys()];
				if (oldIds.length > 0) {
					const placeholders = oldIds.map(() => '?').join(',');
					await dbRun(db,
						`DELETE FROM turns WHERE id NOT IN (${placeholders})`,
						oldIds,
					);
				}

				// Remap the remaining turn IDs to their new values
				for (const [oldId, newId] of mapping) {
					await dbRun(db, 'UPDATE turns SET id = ? WHERE id = ?', [newId, oldId]);
					await dbRun(db, 'UPDATE file_edits SET turn_id = ? WHERE turn_id = ?', [newId, oldId]);
				}
				await dbExec(db, 'COMMIT');
			} catch (err) {
				// Roll back without letting a rollback failure mask the real error.
				await rollbackIgnoringErrors(db);
				throw err;
			}
		});
	}

	/**
	 * Resolves once all currently in-flight write operations have settled.
	 * Used by graceful shutdown to flush pending fire-and-forget writes
	 * before the process exits. Should be called from a path where no
	 * further writes are expected; loops until idle to also drain any
	 * writes that get queued while we're awaiting.
	 */
	async whenIdle(): Promise<void> {
		while (this._pendingWrites.size > 0) {
			await Promise.allSettled([...this._pendingWrites]);
		}
	}

	/** Runs `VACUUM INTO` with {@link targetPath} as the destination file. */
	async vacuumInto(targetPath: string): Promise<void> {
		const db = await this._ensureDb();
		await dbRun(db, 'VACUUM INTO ?', [targetPath]);
	}

	/**
	 * Wrap a mutating operation's promise so {@link whenIdle} can await it.
	 * Invoke at the **outermost** layer of every public mutating method so
	 * that any internal awaits (notably `_ensureDb()`) are covered too —
	 * tracking only the leaf `dbRun`/`dbExec` would miss the window
	 * between the method being called and the query actually being queued.
	 */
	private _track<T>(fn: () => Promise<T>): Promise<T> {
		const p = fn();
		this._pendingWrites.add(p);
		const untrack = () => { this._pendingWrites.delete(p); };
		p.then(untrack, untrack);
		return p;
	}

	/**
	 * Marks the instance as closed and closes the connection if one was
	 * ever requested. Idempotent: repeated calls await the same close.
	 * Errors from opening or closing are swallowed.
	 *
	 * The close goes through {@link dbClose} so the returned promise only
	 * settles after the driver has reported the close via its callback;
	 * calling `db.close()` without a callback would return immediately.
	 */
	async close(): Promise<void> {
		if (this._closed === undefined) {
			this._closed = this._dbPromise
				? this._dbPromise.then(db => dbClose(db)).catch(() => { })
				: true;
		}
		await this._closed;
	}

	/** Synchronous disposal: starts {@link close} without waiting for it. */
	dispose(): void {
		// Fire-and-forget is safe: close() never rejects.
		void this.close();
	}
}

/**
 * Normalizes a value read from a BLOB column into a `Uint8Array`.
 * `Buffer`s are re-wrapped as a view over the same memory (no copy),
 * other `Uint8Array`s are returned as-is, strings are UTF-8 encoded, and
 * anything else yields an empty array.
 */
function toUint8Array(value: unknown): Uint8Array {
	if (value instanceof Buffer) {
		return new Uint8Array(value.buffer, value.byteOffset, value.byteLength);
	}
	if (value instanceof Uint8Array) {
		return value;
	}
	if (typeof value === 'string') {
		return new TextEncoder().encode(value);
	}
	return new Uint8Array(0);
}