CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place. Commercial Alternative to JupyterHub.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/util/db-schema/messages.ts
Views: 791
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
This is a table to support a simple messages system in cocalc, to support sending and replying to
8
messages between these three classes of entities:
9
10
- cocalc system
11
- projects
12
- users
13
14
A message has a subject and body.
15
16
When it is read can be set, and a message can also be saved for later.
17
18
That's it! This is meant to be just enough to support things like:
19
20
- the system sending messages to users, e.g., reminders about notifications
21
- a user replying and sending a message to the system (which admins could see).
22
- users sending messages to each other (and replying)
23
- users send message to system
24
- system send message to user
25
- project sending messages to users (e.g., about something happening)
26
27
For simplicity there are no tags or any extra metadata -- put that in the markdown
28
in the body of the message.
29
30
On purpose, all messages are sent/received in one central place in the UI, NOT associated
31
to particular files/directories/projects. Again, use links in the message for that.
32
*/
33
34
import { Table } from "./types";
35
import { ID } from "./crm";
36
import throttle from "@cocalc/util/api/throttle";
37
import { SCHEMA } from "./index";
38
import { isEqual } from "lodash";
39
import { isValidUUID } from "@cocalc/util/misc";
40
41
// make this a bit big initially -- we'll add a feature to "load more", hopefully before
42
// this limit is a problem
43
// Used as the `limit` option of the user_query `get` rule of the "messages"
// table defined further down in this file.
export const NUM_MESSAGES = 1000;
44
45
/**
 * Fields common to every view of a message, i.e. everything except the
 * per-user status flags (which are typed differently by the interfaces
 * extending this one).
 */
interface Message0 {
  id: number;
  /** a uuid */
  from_id: string;
  /** array of uuid's */
  to_ids: string[];
  subject: string;
  body: string;
  /** Only set after the message is sent. */
  sent?: Date;
  /** Used for replies. */
  thread_id?: number;

  /**
   * Optionally included, mainly in frontend code when working with a list of
   * messages.  This is NOT in the database!
   */
  index?: number;
}
60
61
/**
 * Names of the message fields that are ordinary values, as opposed to the
 * per-user status bitsets.
 */
export const NON_BITSET_FIELDS = [
  "id",
  "subject",
  "body",
  "from_id",
  "to_ids",
  "sent",
  "thread_id",
];
70
71
/**
 * A message with its status fields in raw form: each status is a string
 * (the corresponding columns of the "messages" table are declared with
 * type "string" / pg_type "bit varying").
 *
 * There is one optional property per status column of the table; `liked`
 * was previously missing here even though the table and MessageMe have it.
 */
export interface Message extends Message0 {
  read?: string;
  saved?: string;
  starred?: string;
  liked?: string;
  deleted?: string;
  expire?: string;
}
78
79
/**
 * A message whose status fields are plain booleans instead of bitset strings.
 * NOTE(review): presumably each flag is the value for one particular user
 * ("me"), e.g. as selected with pgBitField/pgBitFieldSelf -- confirm.
 */
export interface MessageMe extends Message0 {
  read?: boolean;
  saved?: boolean;
  starred?: boolean;
  liked?: boolean;
  deleted?: boolean;
  expire?: boolean;
}
87
88
// NOTE(review): presumably the largest `limit` a caller may request (compare
// ApiMessagesGet.limit); how it is enforced is not visible here -- confirm.
export const MAX_LIMIT = 500;
89
90
/**
 * Names of the per-user status fields of a message.  Declared `as const` so
 * that the BitSetField union below can be derived from it.
 */
export const BITSET_FIELDS = [
  "read",
  "saved",
  "starred",
  "liked",
  "deleted",
  "expire",
] as const;

/** Union of the literal names listed in BITSET_FIELDS. */
export type BitSetField = (typeof BITSET_FIELDS)[number];

/**
 * Type guard: true exactly when `x` is a string equal to one of the names in
 * BITSET_FIELDS.  Anything that is not a string yields false.
 */
export function isBitSetField(x): x is BitSetField {
  if (typeof x != "string") {
    return false;
  }
  const names: readonly string[] = BITSET_FIELDS;
  return names.indexOf(x) != -1;
}
104
105
/** Shape of the options describing a request to get messages. */
export interface ApiMessagesGet {
  account_id: string;
  limit?: number;
  offset?: number;
  /**
   * Which messages are wanted:
   *   - received = all messages to you (the default)
   *   - sent = all messages you sent
   *   - new = to you and not read or saved -- these are what the counter counts
   */
  type?: "received" | "sent" | "new" | "starred" | "liked";
  /** Only messages strictly newer than this cutoff. */
  cutoff?: Date;
}
116
117
/**
 * Position of a user's bit inside the status bitsets of a message.
 *
 * @param account_id - the user whose bit is wanted
 * @param to_ids - the recipients of the message
 * @param from_id - the sender of the message
 * @returns 1 + (index of account_id in to_ids) when the user is a recipient,
 *   otherwise 0 when the user is the sender.  Being a recipient takes
 *   precedence if the user is both.
 * @throws Error if the user is neither a recipient nor the sender.
 */
export function getBitPosition({
  account_id,
  to_ids,
  from_id,
}: {
  account_id: string;
  to_ids;
  from_id: string;
}): number {
  const recipientIndex = to_ids.indexOf(account_id);
  if (recipientIndex == -1) {
    // not a recipient, so the only remaining valid possibility is the sender
    if (account_id != from_id) {
      throw Error(
        `invalid user -- ${account_id}, to_ids=${JSON.stringify(to_ids)}, from_id=${from_id}`,
      );
    }
    return 0;
  }
  // bit 0 is reserved for the sender, so recipients are shifted by one
  return recipientIndex + 1;
}
137
138
// The "messages" table: messages as seen by their *recipients*.  The get rule
// only selects sent messages that have the querying account in to_ids, and the
// set rule only lets a recipient change their own bit in the status bitsets.
Table({
  name: "messages",
  fields: {
    id: ID,
    sent: {
      type: "timestamp",
      desc: "When this message was actually sent. A draft is a message where sent has not yet been set.",
    },
    from_id: {
      type: "uuid",
      desc: "A project_id when from='project' and an account_id when from='account'. For type='system', haven't decided what this is yet (maybe some hardcoded uuid's for different components of the system?).",
      not_null: true,
      render: { type: "account" },
    },
    to_ids: {
      type: "array",
      pg_type: "UUID[]",
      desc: "array of uuid's of account that the message is being sent to",
      not_null: true,
      render: { type: "accounts" },
    },
    subject: {
      type: "string",
      desc: "Subject of the message.",
      not_null: true,
    },
    body: {
      type: "string",
      desc: "Body of the message (should be formatted as markdown).",
      not_null: true,
    },
    thread_id: {
      type: "number",
      desc: "If this message is in a thread, this is the id of the root message.",
    },
    // The rest are status bitsets, with bit 0 corresponding to from_id, and bits 1 to n
    // corresponding to the users receiving the message, according to the ids in to_ids.
    read: {
      type: "string",
      pg_type: "bit varying",
      desc: "User read this message.",
    },
    saved: {
      type: "string",
      pg_type: "bit varying",
      desc: "Users that saved this message for later (so no longer in inbox)",
    },
    starred: {
      type: "string",
      pg_type: "bit varying",
      desc: "Users that starred this message so they can easily find it later",
    },
    liked: {
      type: "string",
      pg_type: "bit varying",
      desc: "Users liked this message, indicate to sender and other uses that is good. Thumbs up.",
    },
    deleted: {
      type: "string",
      pg_type: "bit varying",
      desc: "If user deleted this message (so in the trash).",
    },
    expire: {
      type: "string",
      pg_type: "bit varying",
      desc: "User permanently deleted this message. ",
    },
  },
  rules: {
    primary_key: "id",
    changefeed_keys: ["to_ids", "sent"],
    pg_indexes: ["USING GIN (to_ids)", "sent"],
    user_query: {
      get: {
        // only messages addressed to the account, and never drafts
        pg_where: [
          { "$::UUID = ANY(to_ids)": "account_id" },
          "sent IS NOT null",
        ],
        options: [{ order_by: "-id" }, { limit: NUM_MESSAGES }],
        fields: {
          id: null,
          sent: null,
          from_id: null,
          to_ids: null,
          subject: null,
          body: null,
          thread_id: null,
          read: null,
          saved: null,
          starred: null,
          liked: null,
          deleted: null,
          expire: null,
        },
      },
      set: {
        fields: {
          id: true,
          read: true,
          saved: true,
          starred: true,
          liked: true,
          deleted: true,
          expire: true,
        },
        /**
         * Apply a recipient's change to the status bitsets of an existing
         * message.  Only the bit belonging to account_id is written, for each
         * bitset field whose value differs between new_val and old_val; all
         * other bits are left untouched.
         *
         * The outcome is always reported through cb: cb() on success (or when
         * nothing changed), cb(message) on any failure -- including when
         * account_id is not in to_ids or the requested bit is not '0'/'1'.
         * Creating messages through this table is refused.
         */
        async instead_of_change(
          database,
          old_val,
          new_val,
          account_id,
          cb,
        ): Promise<void> {
          const client = database._client();
          if (client == null) {
            cb("database not connected -- try again later");
            return;
          }
          if (old_val != null) {
            // It took me a long time to figure out that this is the way to flip bits without changing what is there, which
            // we need to do in order to avoid a race condition, where two users say both mark a message read at almost the
            // same time, and they both write out 01 and 10 for the read bitset... with last write wins, the database would
            // end up with either 01 or 10, and one person's value is lost. That's silly. With just directly changing *only*
            // the user's bit, we always end up with 11. And this code illustrates how to change one bit. Here "20" is
            // the number of users (so number of recipients + 1), and 3 is the position to flip (+1 since it is 1-indexed in postgres),
            // and it's `'x'::bit(1),3+1` to set the bit to x (=0 or 1), i.e., 0 in this example:
            //
            // smc=# update messages set saved=overlay(coalesce(saved,'0'::bit(1))::bit(20) PLACING '0'::bit(1) FROM 3+1) where id=61; select saved from messages where id=61;

            const ids = new_val.to_ids ?? old_val.to_ids ?? [];
            const numUsers = ids.length + 1;
            let userIndex = -1;
            // Returns the SQL assignment that sets this user's bit of `field`.
            // Throws if the user is not a recipient or the bit is malformed.
            const setBit = (field: BitSetField, value: string) => {
              if (userIndex == -1) {
                // compute it first time, if needed
                const n = ids.indexOf(account_id);
                if (n == -1) {
                  throw Error(
                    "you do not have permission to edit this message",
                  );
                }
                userIndex = n + 1; // +1 to account for from_id
              }
              // ignore everything in value except the userIndex position.
              const bit = value[userIndex] ?? "0";
              if (bit != "0" && bit != "1") {
                // be especially careful to avoid sql injection attack.
                throw Error(`invalid bit '${bit}'`);
              }
              return `${field} = overlay(coalesce(${field},'0'::bit(1))::bit(${numUsers}) PLACING '${bit}'::bit(1) FROM ${userIndex}+1)`;
            };

            const v: string[] = [];
            try {
              for (const field of BITSET_FIELDS) {
                if (new_val[field] != null && new_val[field] != old_val[field]) {
                  v.push(setBit(field, new_val[field]));
                }
              }
            } catch (err) {
              // setBit throws on a permission or validation problem.  This
              // function is async, so an uncaught throw here would only reject
              // the returned promise and cb would never be called; report the
              // problem via cb like every other failure.
              cb(`${err}`);
              return;
            }

            if (v.length == 0) {
              // nothing changed
              cb();
              return;
            }

            try {
              // the to_ids condition is repeated in the WHERE clause so that
              // only messages addressed to this account can be modified
              const query = `UPDATE messages SET ${v.join(",")} WHERE $1=ANY(to_ids) AND id=$2`;
              const params = [account_id, parseInt(old_val.id)];
              await client.query(query, params);
              await database.updateUnreadMessageCount({ account_id });
              cb();
            } catch (err) {
              cb(`${err}`);
            }
          } else {
            cb(`use the sent_messages table to create a new message`);
          }
        },
      },
    },
  },
});
321
322
// Virtual table over "messages" giving the *sender's* view: the get rule
// selects rows whose from_id is the querying account, and the set rule lets
// the sender edit a draft, or (after sending) only their own status bit.
Table({
  // this should be called "messages_from_me" because it also includes drafts that have not been sent yet
  name: "sent_messages",
  fields: SCHEMA.messages.fields,
  rules: {
    primary_key: SCHEMA.messages.primary_key,
    changefeed_keys: ["from_id"],
    virtual: "messages",
    user_query: {
      get: {
        // same fields/options as the "messages" get rule, but selected by sender
        ...SCHEMA.messages.user_query?.get!,
        pg_where: [{ "from_id = $::UUID": "account_id" }],
      },
      set: {
        fields: {
          id: true,
          to_ids: true,
          subject: true,
          body: true,
          sent: true,
          thread_id: true,
          saved: true,
          starred: true,
          liked: true,
          read: true,
          deleted: true,
          expire: true,
        },
        /**
         * Apply the sender's change to an existing message.
         *
         * - If the message was already sent, every non-bitset field is
         *   dropped from new_val (new_val is mutated), so only status bits
         *   can change.
         * - If to_ids changes, all target accounts must exist.
         * - For the bitsets only bit 0 (the sender's bit) is written.
         *
         * The outcome is reported through cb: cb() on success, cb(message)
         * on failure.  Creating messages through this table is refused.
         */
        async instead_of_change(
          database,
          old_val,
          new_val,
          account_id,
          cb,
        ): Promise<void> {
          const client = database._client();
          if (client == null) {
            cb("database not connected -- try again later");
            return;
          }
          if (old_val != null) {
            try {
              if (old_val.sent) {
                // once a message is sent, the ONLY thing you can change are BITSET_FIELDS.
                for (const field in new_val) {
                  if (!isBitSetField(field)) {
                    delete new_val[field];
                  }
                }
                // TODO: we might later have a notion of editing messages after they are sent, but this will
                // be by adding one or more patches, so the edit history is clear.
              }
              if (
                new_val.to_ids != null &&
                !isEqual(new_val.to_ids, old_val.to_ids)
              ) {
                await assertToIdsAreValid({ client, to_ids: new_val.to_ids });
              }

              // width of the bitsets: one bit for the sender plus one per recipient
              const numUsers =
                1 + (new_val.to_ids ?? old_val.to_ids ?? []).length;
              // Returns the SQL assignment that sets the sender's bit (the
              // first one) of `field`; throws if the bit is not '0' or '1',
              // since it is spliced directly into the SQL text.
              const setBit = (field: BitSetField, value: string) => {
                const bit = value[0] ?? "0";
                if (bit != "0" && bit != "1") {
                  throw Error(`invalid bit '${bit}'`);
                }
                return `${field} = overlay(coalesce(${field},'0'::bit(1))::bit(${numUsers}) PLACING '${bit}'::bit(1) FROM 1)`;
              };
              const v: string[] = [];
              for (const field of BITSET_FIELDS) {
                if (
                  new_val[field] != null &&
                  new_val[field] != old_val[field]
                ) {
                  v.push(setBit(field, new_val[field]));
                }
              }
              const bitsets = v.length == 0 ? "" : "," + v.join(",");

              // user is allowed to change a lot about messages *from* them only.
              // putting from_id in the query specifically as an extra security measure, so user can't change
              // message with id they don't own.
              const query = `UPDATE messages SET to_ids=$3,subject=$4,body=$5,sent=$6,thread_id=$7 ${bitsets} WHERE from_id=$1 AND id=$2`;
              const params = [
                account_id,
                parseInt(old_val.id),
                new_val.to_ids ?? old_val.to_ids,
                new_val.subject ?? old_val.subject,
                new_val.body ?? old_val.body,
                new_val.sent ?? old_val.sent,
                new_val.thread_id ?? old_val.thread_id,
              ];
              await client.query(query, params);
              const to_ids = new_val.to_ids ?? old_val.to_ids;
              if (to_ids && (new_val.sent ?? old_val.sent)) {
                // the message is (now) sent, so refresh every recipient's unread count
                for (const account_id of to_ids) {
                  await database.updateUnreadMessageCount({
                    account_id,
                  });
                }
              }
              cb();
            } catch (err) {
              cb(`${err}`);
            }
          } else {
            // create a new message:
            cb("use the create_message virtual table to create messages");
          }
        },
      },
    },
  },
});
437
438
/**
 * Verify that every id in to_ids is the account_id of a row of the accounts
 * table.
 *
 * @param client - object with an async `query(sql, params)` method resolving
 *   to `{ rows }`
 * @param to_ids - the account ids to check
 * @throws Error listing the ids for which no account row was returned.
 *   Repeated ids in to_ids are not treated as an error.
 */
async function assertToIdsAreValid({ client, to_ids }) {
  const result = await client.query(
    "SELECT account_id FROM accounts WHERE account_id=ANY($1)",
    [to_ids],
  );
  const found = result.rows;
  if (found.length == to_ids.length) {
    // one row per requested id, so nothing can be missing
    return;
  }
  // The counts differ: either ids are repeated or some do not exist.
  const known = new Set();
  for (const row of found) {
    known.add(row.account_id);
  }
  const missing = to_ids.filter((id) => !known.has(id));
  if (missing.length == 0) {
    return;
  }
  throw Error(
    `every target account_id must exist -- these accounts do not exist: ${JSON.stringify(missing)}`,
  );
}
453
454
// See comment in groups -- for create_groups.
//
// Virtual table over "messages" whose "get" query actually *creates* a
// message: the querying account becomes from_id and the inserted row is
// returned.
Table({
  name: "create_message",
  rules: {
    virtual: "messages",
    primary_key: "id",
    user_query: {
      get: {
        fields: {
          id: null,
          to_ids: null,
          subject: null,
          body: null,
          sent: null,
          thread_id: null,
        },
        /**
         * Insert a new message from opts.account_id using the values in
         * opts.query, and pass the inserted row to cb.  Any failure
         * (throttling, missing database connection, nonexistent target
         * account, database error) is reported as cb(message).
         */
        async instead_of_query(database, opts, cb): Promise<void> {
          try {
            const { account_id } = opts;
            throttle({
              endpoint: "user_query-create_message",
              account_id,
            });
            const client = database._client();
            if (client == null) {
              // same explicit check and message as the other tables in this
              // file, instead of failing later with an opaque TypeError
              cb("database not connected -- try again later");
              return;
            }
            const query = opts.query ?? {};
            // remove duplicate recipients
            const to_ids = Array.from(new Set(query.to_ids));
            await assertToIdsAreValid({ client, to_ids });
            const { rows } = await client.query(
              `INSERT INTO messages(from_id,to_ids,subject,body,thread_id,sent)
               VALUES($1::UUID,$2::UUID[],$3,$4,$5,$6) RETURNING *
              `,
              [
                account_id,
                to_ids,
                query.subject,
                query.body,
                query.thread_id,
                query.sent,
              ],
            );
            if (query.sent) {
              // created already sent (not a draft), so refresh each
              // recipient's unread count
              for (const account_id of to_ids) {
                await database.updateUnreadMessageCount({
                  account_id,
                });
              }
            }
            cb(undefined, rows[0]);
          } catch (err) {
            cb(`${err}`);
          }
        },
      },
    },
  },
  // This table is virtual over "messages", so it uses the fields of the
  // messages table (this previously pointed at SCHEMA.groups.fields).
  fields: SCHEMA.messages.fields,
});
511
512
// Virtual table over "messages" for admins: it exposes the same fields as
// the "messages" get rule, but without that rule's pg_where restriction.
Table({
  name: "crm_messages",
  fields: SCHEMA.messages.fields,
  rules: {
    primary_key: "id",
    virtual: "messages",
    user_query: {
      get: {
        fields: SCHEMA.messages.user_query?.get?.fields ?? {},
        // only admins can do get queries on this table
        admin: true,
      },
    },
  },
});
526
527
// Helper function for database queries.
528
529
/**
 * Build a SQL boolean expression that is true when the bit of `field`
 * belonging to the recipient `account_id` is 1.  The bit is located via the
 * position of account_id in the to_ids column, shifted by one; a missing
 * bit counts as 0.
 *
 * @param field - name of a bitset field; validated, since it is spliced into
 *   the SQL text
 * @param account_id - must be a valid uuid; validated for the same reason
 * @param as - column alias: when null/undefined the field name is used, when
 *   the empty string no alias is added.  NOTE: this value is NOT validated
 *   before being spliced into the SQL text.
 * @returns the SQL fragment (it ends with a space when there is no alias)
 * @throws Error if field is not a bitset field or account_id is not valid
 */
export function pgBitField(
  field: BitSetField,
  account_id: string,
  as?: string,
) {
  // be extra careful due to possibility of SQL injection.
  if (!isBitSetField(field)) {
    throw Error(`field ${field} must be a bitset field`);
  }
  if (!isValidUUID(account_id)) {
    throw Error("account_id must be valid");
  }
  const alias = as == null ? ` AS ${field}` : as ? ` AS ${as}` : as;
  const position = `array_position(to_ids,'${account_id}')+1`;
  const bit = `substring(${field},${position},1)`;
  return `coalesce(${bit},'0'::bit(1)) = '1'::bit(1) ${alias}`;
}
548
549
/**
 * Build a SQL boolean expression that is true when the first bit of `field`
 * (the one corresponding to from_id) is 1; a missing bit counts as 0.
 *
 * @param field - name of a bitset field; validated, since it is spliced into
 *   the SQL text
 * @param as - column alias: when null/undefined the field name is used, when
 *   the empty string no alias is added.  NOTE: this value is NOT validated
 *   before being spliced into the SQL text.
 * @returns the SQL fragment (it ends with a space when there is no alias)
 * @throws Error if field is not a bitset field
 */
export function pgBitFieldSelf(field: BitSetField, as?: string) {
  // be extra careful due to possibility of SQL injection.
  if (!isBitSetField(field)) {
    throw Error(`field ${field} must be a bitset field`);
  }
  const alias = as == null ? ` AS ${field}` : as ? ` AS ${as}` : as;
  const bit = `substring(${field},1,1)`;
  return `coalesce(${bit},'0'::bit(1)) = '1'::bit(1) ${alias}`;
}
561
562