CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Sign Up | Sign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/database/postgres/schema/sync.ts
Views: 687
1
import { getClient, Client } from "@cocalc/database/pool";
2
import type { DBSchema, TableSchema } from "./types";
3
import { quoteField } from "./util";
4
import { pgType } from "./pg-type";
5
import { createIndexesQueries } from "./indexes";
6
import { createTable } from "./table";
7
import getLogger from "@cocalc/backend/logger";
8
import { SCHEMA } from "@cocalc/util/schema";
9
import { dropDeprecatedTables } from "./drop-deprecated-tables";
10
import { primaryKeys } from "./table";
11
import { isEqual } from "lodash";
12
13
// Module logger tagged "db:schema:sync"; every call in this module logs at debug level.
const log = getLogger("db:schema:sync");
14
15
/**
 * Bring one existing table in line with its schema definition.
 *
 * Columns are synced first, then indexes, then the primary key (indexes and
 * keys may refer to columns that only get added in the first step).
 * Virtual tables have no backing table, so nothing is done for them.
 */
async function syncTableSchema(db: Client, schema: TableSchema): Promise<void> {
  const dbg = (...args) => log.debug("syncTableSchema", schema.name, ...args);
  dbg();
  if (!schema.virtual) {
    const steps = [
      syncTableSchemaColumns,
      syncTableSchemaIndexes,
      syncTableSchemaPrimaryKeys,
    ];
    for (const step of steps) {
      await step(db, schema);
    }
    return;
  }
  dbg("nothing to do -- table is virtual");
}
26
27
/**
 * Map each column of `table` to its current type, as reported by
 * information_schema.columns.
 *
 * Any column that has a character_maximum_length (e.g. char(N) as well as
 * varchar(N)) is reported as `varchar(N)`. Every other column is reported by
 * its information_schema `data_type` text.
 *
 * NOTE(review): the query does not filter on table_schema, so same-named
 * tables in other schemas would have their columns merged in here.
 */
async function getColumnTypeInfo(
  db: Client,
  table: string,
): Promise<{ [column_name: string]: string }> {
  const sql =
    "SELECT column_name, data_type, character_maximum_length FROM information_schema.columns WHERE table_name=$1";
  const { rows } = await db.query(sql, [table]);

  const typeByColumn: { [column_name: string]: string } = {};
  for (const {
    column_name,
    data_type,
    character_maximum_length: maxLen,
  } of rows) {
    typeByColumn[column_name] = maxLen ? `varchar(${maxLen})` : data_type;
  }
  return typeByColumn;
}
49
50
/**
 * Add a column to the table described by `schema`, or change the type of an
 * existing column, using that column's definition in `schema.fields`.
 *
 * Note: changing column ordering is NOT supported in PostgreSQL, so it's
 * critical to not depend on it!
 * https://wiki.postgresql.org/wiki/Alter_column_position
 *
 * @param action "add" creates the column, including UNIQUE and `pg_check` if
 *   they are declared. "alter" only changes the column's type:
 *   `ALTER COLUMN ... TYPE` accepts no constraint clauses, and PostgreSQL
 *   converts existing constraints/indexes on the column to the new type.
 * @throws if `column` is not in `schema.fields`, or `action` is unknown.
 */
async function alterColumnOfTable(
  db: Client,
  schema: TableSchema,
  action: "alter" | "add",
  column: string,
): Promise<void> {
  const qTable = quoteField(schema.name);

  const info = schema.fields[column];
  if (info == null) throw Error(`invalid column ${column}`);
  const col = quoteField(column);
  const type = pgType(info);

  if (action == "alter") {
    log.debug(
      "alterColumnOfTable",
      schema.name,
      "alter this column's type:",
      col,
    );
    // Only the bare type is allowed here -- appending UNIQUE or a CHECK clause
    // (as is done for "add") would make this statement a syntax error.
    await db.query(
      `ALTER TABLE ${qTable} ALTER COLUMN ${col} TYPE ${type} USING ${col}::${type}`,
    );
  } else if (action == "add") {
    let desc = type;
    if (info.unique) {
      desc += " UNIQUE";
    }
    if (info.pg_check) {
      desc += " " + info.pg_check;
    }
    log.debug("alterColumnOfTable", schema.name, "add this column:", col);
    await db.query(`ALTER TABLE ${qTable} ADD COLUMN ${col} ${desc}`);
  } else {
    throw Error(`unknown action '${action}'`);
  }
}
89
90
/**
 * Make the columns of an existing table match `schema.fields`.
 *
 * A column that is in the schema but missing from the database is added. A
 * column whose type differs is altered. Only the first word of each
 * lower-cased type is compared, so e.g. information_schema's
 * "timestamp without time zone" is compared as "timestamp".
 *
 * Serial columns are never added or altered. Goal types starting with "char"
 * are treated as "varchar...". Array and varchar goal types are never altered.
 * Columns present in the database but absent from the schema are left alone.
 */
async function syncTableSchemaColumns(
  db: Client,
  schema: TableSchema,
): Promise<void> {
  log.debug("syncTableSchemaColumns", "table = ", schema.name);
  const columnTypeInfo = await getColumnTypeInfo(db, schema.name);

  for (const column in schema.fields) {
    const currentType = columnTypeInfo[column]?.toLowerCase().split(" ")[0];
    let goalType = pgType(schema.fields[column]).toLowerCase().split(" ")[0];

    // Serial columns: we could handle them, but it's way too complicated.
    if (goalType === "serial") continue;

    // Changing between fixed-length and variable-length strings is NOT
    // supported, so compare fixed-length types as their varchar counterpart.
    if (goalType.startsWith("char")) goalType = `var${goalType}`;

    if (currentType == null) {
      // The column is in our schema, but not in the actual database.
      await alterColumnOfTable(db, schema, "add", column);
      continue;
    }
    if (currentType === goalType) continue;

    // NO support for array or varchar schema changes (even detecting)!
    const unsupported = goalType.includes("[]") || goalType.includes("varchar");
    if (unsupported) continue;

    await alterColumnOfTable(db, schema, "alter", column);
  }
}
124
125
/**
 * Names of every index defined on the relation(s) named `table`, read from the
 * pg_index / pg_class catalogs. This includes primary-key indexes.
 */
async function getCurrentIndexes(
  db: Client,
  table: string,
): Promise<Set<string>> {
  const result = await db.query(
    "SELECT c.relname AS name FROM pg_class AS a JOIN pg_index AS b ON (a.oid = b.indrelid) JOIN pg_class AS c ON (c.oid = b.indexrelid) WHERE a.relname=$1",
    [table],
  );
  return new Set<string>(result.rows.map((row) => row.name));
}
141
142
// There is also code in database/postgres/schema/indexes.ts that creates indexes.

/**
 * Create or drop a single index on `table`.
 *
 * @param action "create" runs `CREATE [UNIQUE] INDEX <name> ON <table> <query>`;
 *   "delete" runs `DROP INDEX <name>`.
 * @param name the index name (used unquoted in the SQL).
 * @param query the remainder of the CREATE INDEX statement after `ON <table>`,
 *   as produced by createIndexesQueries. Required for "create".
 * @param unique whether to create a UNIQUE index ("create" only).
 * @throws if "create" is requested without a `query`, or `action` is unknown.
 */
async function updateIndex(
  db: Client,
  table: string,
  action: "create" | "delete",
  name: string,
  query?: string,
  unique?: boolean,
): Promise<void> {
  log.debug("updateIndex", { table, action, name });
  if (action == "create") {
    if (!query) {
      // Without this guard the statement would end in "ON <table> undefined".
      throw Error(`BUG: missing query to create index ${name} on ${table}`);
    }
    // ATTN: if you consider adding CONCURRENTLY here, note that
    // CREATE INDEX CONCURRENTLY cannot run inside a transaction block.
    // NOTE(review): the original comment pointed to a "note earlier above"
    // that is not in this file -- presumably it lives in indexes.ts; confirm.
    await db.query(
      `CREATE ${unique ? "UNIQUE " : ""}INDEX ${name} ON ${table} ${query}`,
    );
  } else if (action == "delete") {
    await db.query(`DROP INDEX ${name}`);
  } else {
    // typescript would catch this, but just in case:
    throw Error(`BUG: unknown action ${action}`);
  }
}
163
164
/**
 * Make the indexes of an existing table match what createIndexesQueries()
 * says it should have.
 *
 * Missing goal indexes are created. Existing indexes whose names end in
 * "_idx" but are no longer wanted are dropped. Other indexes (e.g. "_pkey"
 * primary-key indexes) are never touched.
 */
async function syncTableSchemaIndexes(
  db: Client,
  schema: TableSchema,
): Promise<void> {
  const dbg = (...args) =>
    log.debug("syncTableSchemaIndexes", "table = ", schema.name, ...args);
  dbg();

  const curIndexes = await getCurrentIndexes(db, schema.name);
  dbg("curIndexes", curIndexes);

  // The indexes we are supposed to have.
  const goalIndexes = createIndexesQueries(schema);
  dbg("goalIndexes", goalIndexes);

  const wanted = new Set<string>();
  for (const { name, query, unique } of goalIndexes) {
    wanted.add(name);
    if (curIndexes.has(name)) continue;
    await updateIndex(db, schema.name, "create", name, query, unique);
  }

  // Only our own "_idx" indexes are candidates for removal.
  const stale = [...curIndexes].filter(
    (name) => name.endsWith("_idx") && !wanted.has(name),
  );
  for (const name of stale) {
    await updateIndex(db, schema.name, "delete", name);
  }
}
193
194
/**
 * Names of all tables listed in pg_tables whose owner is the current user.
 * The query does not filter on schemaname, so this covers every schema.
 */
async function getAllTables(db: Client): Promise<Set<string>> {
  const result = await db.query(
    "SELECT tablename FROM pg_tables WHERE tableowner = current_user",
  );
  return new Set<string>(result.rows.map(({ tablename }) => tablename));
}
205
206
/**
 * Names of the tables in `dbSchema` that do not exist in the database yet
 * (i.e. are not in `allTables`) and that this module should create.
 *
 * Virtual, external and ephemeral tables are never reported as missing.
 */
function getMissingTables(
  dbSchema: DBSchema,
  allTables: Set<string>,
): Set<string> {
  const toCreate = new Set<string>();
  for (const name in dbSchema) {
    if (allTables.has(name)) continue; // already in the database
    const tableSchema = dbSchema[name];
    const notCreatedHere =
      tableSchema.virtual ||
      tableSchema.external ||
      tableSchema.durability == "ephemeral";
    if (!notCreatedHere) {
      toCreate.add(name);
    }
  }
  return toCreate;
}
226
227
/**
 * Bring the actual database in line with `dbSchema`.
 *
 * Steps: drop deprecated tables, create any tables that are missing, then
 * sync the columns, indexes and primary keys of tables that already exist.
 * Virtual, external and ephemeral tables are never created. Existing tables
 * that are not in `dbSchema`, or are marked external, are ignored.
 *
 * @param dbSchema the schema to sync to (defaults to the global SCHEMA).
 * @param role if given, `SET ROLE` to this role on the dedicated connection
 *   before doing anything else, so the work is done as that role.
 * @throws rethrows any error encountered, after logging it. The connection is
 *   always closed before this function settles.
 */
export async function syncSchema(
  dbSchema: DBSchema = SCHEMA,
  role?: string,
): Promise<void> {
  const dbg = (...args) => log.debug("syncSchema", { role }, ...args);
  dbg();

  // We use a single connection for the schema update so that it's possible
  // to set the role for that connection without causing any side effects
  // elsewhere.
  const db = getClient();
  try {
    await db.connect();
    if (role) {
      // change to that user for the rest of this connection.
      await db.query(`SET ROLE ${role}`);
    }
    dbg("dropping any deprecated tables");
    await dropDeprecatedTables(db);

    const allTables = await getAllTables(db);
    dbg("allTables", allTables);

    // Create from scratch any missing tables -- usually this creates all
    // tables and indexes the first time around.
    const missingTables = getMissingTables(dbSchema, allTables);
    dbg("missingTables", missingTables);
    for (const table of missingTables) {
      dbg("create missing table", table);
      const schema = dbSchema[table];
      if (schema == null) {
        throw Error("BUG -- inconsistent schema");
      }
      await createTable(db, schema);
    }

    // For each table that already exists and is in the schema, ensure that
    // the columns are correct, have the correct type, and all indexes exist.
    for (const table of allTables) {
      if (missingTables.has(table)) {
        // Defensive only: allTables was read before the missing tables were
        // created, so it should never contain any of them.
        continue;
      }
      const schema = dbSchema[table];
      if (schema == null || schema.external) {
        // table not in our schema at all or managed externally -- ignore
        continue;
      }
      // not newly created and in the schema so check if anything changed
      dbg("sync existing table", table);
      await syncTableSchema(db, schema);
    }
  } catch (err) {
    dbg("FAILED to sync schema ", { role }, err);
    throw err;
  } finally {
    // Await the close so the connection is really released before we settle.
    // Don't let a failure here mask an error thrown above.
    try {
      await db.end();
    } catch (endErr) {
      dbg("error closing connection", endErr);
    }
  }
}
286
287
/**
 * Make the primary key of an existing table match the schema.
 *
 * Does nothing if the set of key columns already equals
 * `primaryKeys(schema.name)`. Otherwise, for every goal key column that is not
 * currently in the key, rows where that column is NULL are filled in from
 * `schema.default_primary_key_value[key]`. Then the existing primary-key
 * constraint (if any) is dropped, and a new one is added over the goal
 * columns in sorted order.
 *
 * The backfill and both ALTERs run in one transaction. A failure (e.g.
 * duplicate key values) therefore leaves the table as it was, instead of
 * leaving it with no primary key at all.
 *
 * @throws if a column joining the key has no `default_primary_key_value`
 *   (checked before any change is made), or if any statement fails (after
 *   rolling back).
 */
async function syncTableSchemaPrimaryKeys(
  db: Client,
  schema: TableSchema,
): Promise<void> {
  log.debug("syncTableSchemaPrimaryKeys", "table = ", schema.name);
  // Sort copies: Array.prototype.sort works in place, and primaryKeys() may
  // return an array that belongs to the schema definition itself.
  const actualPrimaryKeys = [...(await getPrimaryKeys(db, schema.name))].sort();
  const goalPrimaryKeys = [...primaryKeys(schema.name)].sort();
  if (isEqual(actualPrimaryKeys, goalPrimaryKeys)) {
    return;
  }
  log.debug("syncTableSchemaPrimaryKeys", "table = ", schema.name, {
    actualPrimaryKeys,
    goalPrimaryKeys,
  });

  // Validate before touching the database. Primary-key columns cannot be
  // NULL, so every column joining the key needs a value for existing rows.
  const backfill: { key: string; value: unknown }[] = [];
  for (const key of goalPrimaryKeys) {
    if (actualPrimaryKeys.includes(key)) continue;
    const value = schema.default_primary_key_value?.[key];
    if (value == null) {
      throw Error(
        `must specify default_primary_key_value for '${schema.name}' and key='${key}'`,
      );
    }
    backfill.push({ key, value });
  }

  const table = `"${schema.name}"`;
  await db.query("BEGIN");
  try {
    for (const { key, value } of backfill) {
      // Only fill rows that have no value yet -- never overwrite existing data.
      await db.query(
        `UPDATE ${table} SET "${key}"=$1 WHERE "${key}" IS NULL`,
        [value],
      );
    }

    // Look up the real constraint name instead of assuming "<table>_pkey"
    // (it may differ, e.g. after a table rename). This also skips the drop
    // when the table has no primary key yet.
    const { rows } = await db.query(
      "SELECT conname FROM pg_constraint WHERE conrelid = $1::regclass AND contype = 'p'",
      [schema.name],
    );
    for (const { conname } of rows) {
      const quoted = `"${String(conname).replace(/"/g, '""')}"`;
      await db.query(`ALTER TABLE ${table} DROP CONSTRAINT ${quoted}`);
    }

    const columns = goalPrimaryKeys.map((name) => `"${name}"`).join(",");
    await db.query(`ALTER TABLE ${table} ADD PRIMARY KEY (${columns})`);
    await db.query("COMMIT");
  } catch (err) {
    try {
      await db.query("ROLLBACK");
    } catch (rollbackErr) {
      log.debug(
        "syncTableSchemaPrimaryKeys",
        "table = ",
        schema.name,
        "ROLLBACK failed",
        rollbackErr,
      );
    }
    throw err;
  }
}
324
325
/**
 * Names of the columns that make up `table`'s primary key, or an empty array
 * if it has none.
 *
 * The order is unspecified (the query has no ORDER BY), so callers should
 * sort before comparing. `table` is resolved through a regclass cast, i.e.
 * like an unqualified table name in SQL: it is looked up via search_path and
 * unquoted names are folded to lower case. The query fails if no such
 * relation exists.
 */
async function getPrimaryKeys(db: Client, table: string): Promise<string[]> {
  // The table name is a bind parameter rather than spliced into the SQL text,
  // so a name containing a quote cannot break (or inject into) the query.
  // This matches the other catalog queries in this module.
  const { rows } = await db.query(
    `SELECT a.attname AS name
       FROM pg_index i
       JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
      WHERE i.indrelid = $1::regclass
        AND i.indisprimary`,
    [table],
  );
  return rows.map((row) => row.name);
}
335
336