Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/src/packages/database/postgres/schema/sync.ts
Views: 687
import { getClient, Client } from "@cocalc/database/pool";
import type { DBSchema, TableSchema } from "./types";
import { quoteField } from "./util";
import { pgType } from "./pg-type";
import { createIndexesQueries } from "./indexes";
import { createTable } from "./table";
import getLogger from "@cocalc/backend/logger";
import { SCHEMA } from "@cocalc/util/schema";
import { dropDeprecatedTables } from "./drop-deprecated-tables";
import { primaryKeys } from "./table";
import { isEqual } from "lodash";

const log = getLogger("db:schema:sync");

/**
 * Bring one already-existing table in line with its schema definition:
 * columns first, then indexes, then the primary key. Virtual tables are skipped.
 */
async function syncTableSchema(db: Client, schema: TableSchema): Promise<void> {
  const dbg = (...args) => log.debug("syncTableSchema", schema.name, ...args);
  dbg();
  if (schema.virtual) {
    dbg("nothing to do -- table is virtual");
    return;
  }
  await syncTableSchemaColumns(db, schema);
  await syncTableSchemaIndexes(db, schema);
  await syncTableSchemaPrimaryKeys(db, schema);
}

/**
 * Map each column of `table` to its current type as reported by
 * information_schema.columns. Columns that have a character_maximum_length
 * are reported as `varchar(N)`; all others use the raw `data_type` string.
 *
 * NOTE(review): the query filters on table_name only (not table_schema), so a
 * same-named table in another schema would also contribute columns.
 */
async function getColumnTypeInfo(
  db: Client,
  table: string,
): Promise<{ [column_name: string]: string }> {
  // map from column name to type info
  const columns: { [column_name: string]: string } = {};

  const { rows } = await db.query(
    "SELECT column_name, data_type, character_maximum_length FROM information_schema.columns WHERE table_name=$1",
    [table],
  );

  for (const y of rows) {
    if (y.character_maximum_length) {
      columns[y.column_name] = `varchar(${y.character_maximum_length})`;
    } else {
      columns[y.column_name] = y.data_type;
    }
  }

  return columns;
}

/**
 * Add a column to the table, or change an existing column's type, so that it
 * matches `schema.fields[column]` (type plus optional UNIQUE / pg_check suffix).
 *
 * @throws Error if `column` is not in the schema or `action` is unknown.
 */
async function alterColumnOfTable(
  db: Client,
  schema: TableSchema,
  action: "alter" | "add",
  column: string,
): Promise<void> {
  // Note: changing column ordering is NOT supported in PostgreSQL, so
  // it's critical to not depend on it!
  // https://wiki.postgresql.org/wiki/Alter_column_position
  const qTable = quoteField(schema.name);

  const info = schema.fields[column];
  if (info == null) throw Error(`invalid column ${column}`);
  const col = quoteField(column);
  const type = pgType(info);
  let desc = type;
  if (info.unique) {
    desc += " UNIQUE";
  }
  if (info.pg_check) {
    desc += " " + info.pg_check;
  }
  if (action === "alter") {
    log.debug(
      "alterColumnOfTable",
      schema.name,
      "alter this column's type:",
      col,
    );
    // USING casts the existing data to the new type.
    await db.query(
      `ALTER TABLE ${qTable} ALTER COLUMN ${col} TYPE ${desc} USING ${col}::${type}`,
    );
  } else if (action === "add") {
    log.debug("alterColumnOfTable", schema.name, "add this column:", col);
    await db.query(`ALTER TABLE ${qTable} ADD COLUMN ${col} ${desc}`);
  } else {
    throw Error(`unknown action '${action}'`);
  }
}

/**
 * Add any schema columns missing from the database, and change the type of
 * columns whose type differs from the schema. Only the first word of each
 * type is compared. Serial, array, and varchar columns are never altered.
 */
async function syncTableSchemaColumns(
  db: Client,
  schema: TableSchema,
): Promise<void> {
  log.debug("syncTableSchemaColumns", "table = ", schema.name);
  const columnTypeInfo = await getColumnTypeInfo(db, schema.name);

  for (const column in schema.fields) {
    const info = schema.fields[column];
    let curType = columnTypeInfo[column]?.toLowerCase();
    if (curType != null) {
      curType = curType.split(" ")[0];
    }
    let goalType = pgType(info).toLowerCase().split(" ")[0];
    if (goalType === "serial") {
      // We can't do anything with this (or we could, but it's way too complicated).
      continue;
    }
    if (goalType.startsWith("char")) {
      // We do NOT support changing between fixed-length and variable-length
      // strings; information_schema reports both via varchar(N) above.
      goalType = "var" + goalType;
    }
    if (curType == null) {
      // column is in our schema, but not in the actual database
      await alterColumnOfTable(db, schema, "add", column);
    } else if (curType !== goalType) {
      if (goalType.includes("[]") || goalType.includes("varchar")) {
        // NO support for array or varchar schema changes (even detecting)!
        continue;
      }
      await alterColumnOfTable(db, schema, "alter", column);
    }
  }
}

/** Names of all indexes (including primary-key indexes) defined on `table`. */
async function getCurrentIndexes(
  db: Client,
  table: string,
): Promise<Set<string>> {
  const { rows } = await db.query(
    "SELECT c.relname AS name FROM pg_class AS a JOIN pg_index AS b ON (a.oid = b.indrelid) JOIN pg_class AS c ON (c.oid = b.indexrelid) WHERE a.relname=$1",
    [table],
  );
  return new Set<string>(rows.map(({ name }) => name));
}

// There is also code in database/postgres/schema/indexes.ts that creates indexes.

/**
 * Create (with the given query suffix and optional UNIQUE) or drop a single index.
 *
 * @throws Error if `action` is neither "create" nor "delete".
 */
async function updateIndex(
  db: Client,
  table: string,
  action: "create" | "delete",
  name: string,
  query?: string,
  unique?: boolean,
): Promise<void> {
  log.debug("updateIndex", { table, action, name });
  if (action === "create") {
    // ATTN: if you consider adding CONCURRENTLY to CREATE INDEX, first find
    // and read the existing note about it. NOTE(review): that note is not in
    // this file -- possibly it is with the other index code in indexes.ts.
    await db.query(
      `CREATE ${unique ? "UNIQUE " : ""}INDEX ${name} ON ${table} ${query}`,
    );
  } else if (action === "delete") {
    await db.query(`DROP INDEX ${name}`);
  } else {
    // typescript would catch this, but just in case:
    throw Error(`BUG: unknown action ${action}`);
  }
}

/**
 * Create every index the schema asks for that does not yet exist, and drop
 * every existing `*_idx` index the schema no longer asks for.
 */
async function syncTableSchemaIndexes(
  db: Client,
  schema: TableSchema,
): Promise<void> {
  const dbg = (...args) =>
    log.debug("syncTableSchemaIndexes", "table = ", schema.name, ...args);
  dbg();

  const curIndexes = await getCurrentIndexes(db, schema.name);
  dbg("curIndexes", curIndexes);

  // these are the indexes we are supposed to have
  const goalIndexes = createIndexesQueries(schema);
  dbg("goalIndexes", goalIndexes);
  const goalIndexNames = new Set<string>();
  for (const x of goalIndexes) {
    goalIndexNames.add(x.name);
    if (!curIndexes.has(x.name)) {
      await updateIndex(db, schema.name, "create", x.name, x.query, x.unique);
    }
  }
  for (const name of curIndexes) {
    // only delete indexes that end with _idx; don't want to delete, e.g., pkey primary key indexes.
    if (name.endsWith("_idx") && !goalIndexNames.has(name)) {
      await updateIndex(db, schema.name, "delete", name);
    }
  }
}

// Names of all tables owned by the current user.
async function getAllTables(db: Client): Promise<Set<string>> {
  const { rows } = await db.query(
    "SELECT tablename FROM pg_tables WHERE tableowner = current_user",
  );
  return new Set<string>(rows.map(({ tablename }) => tablename));
}

// Determine names of all tables that are in our schema but not in the
// actual database (ignoring virtual, external and ephemeral tables).
function getMissingTables(
  dbSchema: DBSchema,
  allTables: Set<string>,
): Set<string> {
  const missing = new Set<string>();
  for (const table in dbSchema) {
    const s = dbSchema[table];
    if (
      !allTables.has(table) &&
      !s.virtual &&
      !s.external &&
      s.durability !== "ephemeral"
    ) {
      missing.add(table);
    }
  }
  return missing;
}

/**
 * Synchronize the actual database with `dbSchema`:
 * - drop deprecated tables;
 * - create missing tables;
 * - update columns, indexes and primary keys of existing tables.
 *
 * @param dbSchema - schema to sync to (defaults to the global SCHEMA).
 * @param role - if given, `SET ROLE` to it on the dedicated connection first.
 *   It is interpolated verbatim into SQL, so it must be trusted.
 * @throws rethrows any error encountered during the sync.
 */
export async function syncSchema(
  dbSchema: DBSchema = SCHEMA,
  role?: string,
): Promise<void> {
  const dbg = (...args) => log.debug("syncSchema", { role }, ...args);
  dbg();

  // We use a single connection for the schema update so that it's possible
  // to set the role for that connection without causing any side effects
  // elsewhere.
  const db = getClient();
  try {
    await db.connect();
    if (role) {
      // change to that user for the rest of this connection.
      await db.query(`SET ROLE ${role}`);
    }
    dbg("dropping any deprecated tables");
    await dropDeprecatedTables(db);

    const allTables = await getAllTables(db);
    dbg("allTables", allTables);

    // Create from scratch any missing tables -- usually this creates all tables and
    // indexes the first time around.
    const missingTables = getMissingTables(dbSchema, allTables);
    dbg("missingTables", missingTables);
    for (const table of missingTables) {
      dbg("create missing table", table);
      const schema = dbSchema[table];
      if (schema == null) {
        throw Error("BUG -- inconsistent schema");
      }
      await createTable(db, schema);
    }
    // For each table that already exists and is in the schema,
    // ensure that the columns are correct,
    // have the correct type, and all indexes exist.
    for (const table of allTables) {
      if (missingTables.has(table)) {
        // already handled above -- we created this table just a moment ago
        continue;
      }
      const schema = dbSchema[table];
      if (schema == null || schema.external) {
        // table not in our schema at all or managed externally -- ignore
        continue;
      }
      // not newly created and in the schema so check if anything changed
      dbg("sync existing table", table);
      await syncTableSchema(db, schema);
    }
  } catch (err) {
    dbg("FAILED to sync schema ", { role }, err);
    throw err;
  } finally {
    // Await the close so a failure doesn't become an unhandled rejection, but
    // never let it mask an error thrown above.
    try {
      await db.end();
    } catch (err) {
      dbg("error closing schema sync connection", err);
    }
  }
}

/**
 * Make the table's primary key match `primaryKeys(schema.name)`.
 * Newly-added key columns are first filled with
 * `schema.default_primary_key_value[key]`. Then the old key is dropped and the
 * new one added in a single (atomic) ALTER TABLE statement.
 *
 * @throws Error if a new key column has no default_primary_key_value.
 */
async function syncTableSchemaPrimaryKeys(
  db: Client,
  schema: TableSchema,
): Promise<void> {
  log.debug("syncTableSchemaPrimaryKeys", "table = ", schema.name);
  const actualPrimaryKeys = (await getPrimaryKeys(db, schema.name)).sort();
  // Copy before sorting so we never mutate an array owned by primaryKeys().
  const goalPrimaryKeys = [...primaryKeys(schema.name)].sort();
  if (isEqual(actualPrimaryKeys, goalPrimaryKeys)) {
    return;
  }
  log.debug("syncTableSchemaPrimaryKeys", "table = ", schema.name, {
    actualPrimaryKeys,
    goalPrimaryKeys,
  });
  for (const key of goalPrimaryKeys) {
    if (!actualPrimaryKeys.includes(key)) {
      const defaultValue = schema.default_primary_key_value?.[key];
      if (defaultValue == null) {
        throw Error(
          `must specify default_primary_key_value for '${schema.name}' and key='${key}'`,
        );
      } else {
        await db.query(`update "${schema.name}" set "${key}"=$1`, [
          defaultValue,
        ]);
      }
    }
  }
  // One ALTER TABLE with both sub-commands is applied atomically, so a failing
  // ADD can't leave the table with no primary key. IF EXISTS covers tables
  // that currently have no primary key at all.
  await db.query(`
ALTER TABLE "${schema.name}"
  DROP CONSTRAINT IF EXISTS "${schema.name}_pkey",
  ADD PRIMARY KEY (${goalPrimaryKeys.map((name) => `"${name}"`).join(",")})
`);
}

/** Names of the columns making up the primary key of `table` ([] if none). */
async function getPrimaryKeys(db: Client, table: string): Promise<string[]> {
  const { rows } = await db.query(
    `
SELECT a.attname as name
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = $1::regclass
AND i.indisprimary
`,
    [table],
  );
  return rows.map((row) => row.name);
}