Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/database/postgres-user-queries.coffee
5688 views
1
#########################################################################
2
# This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
# License: MS-RSL – see LICENSE.md for details
4
#########################################################################
5
6
"""
7
User (and project) client queries
8
9
COPYRIGHT : (c) 2017 SageMath, Inc.
10
LICENSE : MS-RSL
11
"""
12
13
MAX_CHANGEFEEDS_PER_CLIENT = 2000
14
15
# Reject all patches that have timestamp that is more than 3 minutes in the future.
16
MAX_PATCH_FUTURE_MS = 1000*60*3
17
18
EventEmitter = require('events')
19
async = require('async')
20
lodash = require('lodash')
21
22
{one_result, all_results, count_result, pg_type, quote_field} = require('./postgres-base')
23
24
{UserQueryQueue} = require('./postgres-user-query-queue')
25
26
{defaults} = misc = require('@cocalc/util/misc')
27
required = defaults.required
28
29
{SCHEMA, OPERATORS, isToOperand} = require('@cocalc/util/schema')
30
{queryIsCmp, userGetQueryFilter} = require("./user-query/user-get-query")
31
32
{updateRetentionData} = require('./postgres/retention')
33
{sanitizeManageUsersOwnerOnly} = require('./postgres/project/manage-users-owner-only')
34
{sanitizeUserSetQueryProjectUsers} = require('./postgres/project/user-set-query-project-users')
35
36
{ checkProjectName } = require("@cocalc/util/db-schema/name-rules");
37
{callback2} = require('@cocalc/util/async-utils')
38
39
40
exports.extend_PostgreSQL = (ext) -> class PostgreSQL extends ext
41
# Drop every still-queued query belonging to the given client.
# options:
#   client_id : required -- client whose pending queries are discarded
# No-op when no query has ever been queued (the queue is created lazily).
cancel_user_queries: (options) =>
    options = defaults options,
        client_id : required
    queue = @_user_query_queue
    queue?.cancel_user_queries(options)
46
47
# Entry point for all account/project client queries.  Handles admin
# "sudo" (querying as another user), then either runs the query directly
# or routes it through a per-client UserQueryQueue to bound concurrency.
#
# opts:
#   client_id  : if given, used to control number of queries at once by one client
#   priority   : (NOT IMPLEMENTED) priority for this query (an integer [-10,...,19] like in UNIX)
#   account_id : account making the query (admin sudo only applies when this is set)
#   project_id : project making the query
#   query      : required -- the query object (or array of queries)
#   options    : array of option objects, possibly containing {account_id:...} for sudo
#   changes    : changefeed id, if this is a changefeed query
#   cb         : cb(err, result) -- called multiple times for changefeeds
user_query: (opts) =>
    opts = defaults opts,
        client_id  : undefined  # if given, uses to control number of queries at once by one client.
        priority   : undefined  # (NOT IMPLEMENTED) priority for this query (an integer [-10,...,19] like in UNIX)
        account_id : undefined
        project_id : undefined
        query      : required
        options    : []
        changes    : undefined
        cb         : undefined

    if opts.account_id?
        # Check for "sudo" by admin to query as a different user, which is done by specifying
        # options = [..., {account_id:'uuid'}, ...].
        for x in opts.options
            if x.account_id?
                # Check user is an admin, then change opts.account_id.
                # NOTE: only the *first* options entry with account_id is acted
                # on per pass; the recursive @user_query call below re-runs with
                # all account_id sudo entries stripped from options.
                @get_account
                    columns    : ['groups']
                    account_id : opts.account_id
                    cb         : (err, r) =>
                        if err
                            opts.cb?(err)
                        else if r['groups']? and 'admin' in r['groups']
                            opts.account_id = x.account_id
                            opts.options = (y for y in opts.options when not y['account_id']?)
                            # now do query with new opts and options not including account_id sudo.
                            @user_query(opts)
                        else
                            opts.cb?('user must be admin to sudo')
                # async branch above takes over; do not fall through to the queue.
                return

    if not opts.client_id?
        # No client_id given, so do not use query queue.
        delete opts.priority
        delete opts.client_id
        @_user_query(opts)
        return

    # Lazily create the per-client query queue on first use.
    if not @_user_query_queue?
        o =
            do_query   : @_user_query
            dbg        : @_dbg('user_query_queue')
            concurrent : @concurrent
        @_user_query_queue ?= new UserQueryQueue(o)

    @_user_query_queue.user_query(opts)
94
95
# Core dispatcher for a single user query: validates the query shape,
# substitutes {account_id}/{project_id}/{now} placeholders, decides
# whether this is a set or a get query, and delegates to
# user_set_query / user_get_query (or _user_query_array for arrays).
#
# opts:
#   account_id : account making the query
#   project_id : project making the query
#   query      : required -- object with exactly one key (the table name)
#   options    : used for initial query; **IGNORED** by changefeed!
#                 - Use [{set:true}] or [{set:false}] to force get or set query
#                 - For a set query, use {delete:true} to delete instead of set. This is the only way
#                   to delete a record, and won't work unless delete:true is set in the schema
#                   for the table to explicitly allow deleting.
#   changes    : id of change feed
#   cb         : cb(err, result)  # WARNING -- this *will* get called multiple times when changes is true!
_user_query: (opts) =>
    opts = defaults opts,
        account_id : undefined
        project_id : undefined
        query      : required
        options    : []
        changes    : undefined
        cb         : undefined
    id = misc.uuid().slice(0,6)
    dbg = @_dbg("_user_query(id=#{id})")
    dbg(misc.to_json(opts.query))
    if misc.is_array(opts.query)
        dbg('array query instead')
        @_user_query_array(opts)
        return

    # Placeholder substitutions applied to keys of the query object.
    subs =
        '{account_id}' : opts.account_id
        '{project_id}' : opts.project_id
        '{now}'        : new Date()

    if opts.changes?
        changes =
            id : opts.changes
            cb : opts.cb

    v = misc.keys(opts.query)
    # BUGFIX: was `v.length > 1`, which let an *empty* query object ({}) slip
    # past this check and fail later with a misleading "invalid table" error.
    # The contract (and the dbg message) is that there is exactly one key.
    if v.length != 1
        dbg("FATAL no key")
        opts.cb?('FATAL: must specify exactly one key in the query')
        return
    table = v[0]
    query = opts.query[table]
    if misc.is_array(query)
        if query.length > 1
            dbg("FATAL not implemented")
            opts.cb?("FATAL: array of length > 1 not yet implemented")
            return
        multi = true
        query = query[0]
    else
        multi = false
    is_set_query = undefined
    if opts.options?
        if not misc.is_array(opts.options)
            dbg("FATAL options")
            opts.cb?("FATAL: options (=#{misc.to_json(opts.options)}) must be an array")
            return
        # {set:true/false} forces the query type; all other options pass through.
        for x in opts.options
            if x.set?
                is_set_query = !!x.set
        options = (x for x in opts.options when not x.set?)
    else
        options = []

    if misc.is_object(query)
        query = misc.deep_copy(query)
        misc.obj_key_subs(query, subs)
        if not is_set_query?
            # Heuristic: a query with a null leaf is a get (the nulls are the
            # requested fields); a fully-specified object is a set.
            is_set_query = not misc.has_null_leaf(query)
        if is_set_query
            dbg("do a set query")
            if changes
                dbg("FATAL: changefeed")
                opts.cb?("FATAL: changefeeds only for read queries")
                return
            if not opts.account_id? and not opts.project_id?
                dbg("FATAL: anon set")
                opts.cb?("FATAL: no anonymous set queries")
                return
            dbg("user_set_query")
            @user_set_query
                account_id : opts.account_id
                project_id : opts.project_id
                table      : table
                query      : query
                options    : opts.options
                cb         : (err, x) =>
                    dbg("returned #{err}")
                    opts.cb?(err, {"#{table}":x})
        else
            # do a get query
            if changes and not multi
                dbg("FATAL: changefeed multi")
                opts.cb?("FATAL: changefeeds only implemented for multi-document queries")
                return

            if changes
                err = @_inc_changefeed_count(opts.account_id, opts.project_id, table, changes.id)
                if err
                    dbg("err changefeed count -- #{err}")
                    opts.cb?(err)
                    return

            dbg("user_get_query")
            @user_get_query
                account_id : opts.account_id
                project_id : opts.project_id
                table      : table
                query      : query
                options    : options
                multi      : multi
                changes    : changes
                cb         : (err, x) =>
                    dbg("returned #{err}")
                    if err and changes
                        # didn't actually make the changefeed, so don't count it.
                        @_dec_changefeed_count(changes.id, table)
                    opts.cb?(err, if not err then {"#{table}":x})
    else
        dbg("FATAL - invalid table")
        opts.cb?("FATAL: invalid user_query of '#{table}' -- query must be an object")
211
212
###
213
TRACK CHANGEFEED COUNTS
214
215
_inc and dec below are evidently broken, in that it's CRITICAL that they match up exactly, or users will be
216
locked out until they just happen to switch to another hub with different tracking, which is silly.
217
218
TODO: DISABLED FOR NOW!
219
###
220
221
# Increment a count of the number of changefeeds by a given client so we can cap it.
# NOTE: deliberately DISABLED -- the bare `return` on the first line short-circuits
# the whole method (see the TRACK CHANGEFEED COUNTS comment block in this file).
# Everything below it is dead code, kept for when the inc/dec bookkeeping is
# fixed to match up exactly.
# Returns false on success, or an error string when the client is at the cap.
_inc_changefeed_count: (account_id, project_id, table, changefeed_id) =>
    return
    client_name = "#{account_id}-#{project_id}"
    cnt = @_user_get_changefeed_counts ?= {}
    ids = @_user_get_changefeed_id_to_user ?= {}
    if not cnt[client_name]?
        cnt[client_name] = 1
    else if cnt[client_name] >= MAX_CHANGEFEEDS_PER_CLIENT
        return "user may create at most #{MAX_CHANGEFEEDS_PER_CLIENT} changefeeds; please close files, refresh browser, restart project"
    else
        # increment before successfully making get_query to prevent huge bursts causing trouble!
        cnt[client_name] += 1
    @_dbg("_inc_changefeed_count(table='#{table}')")("{#{client_name}:#{cnt[client_name]} ...}")
    ids[changefeed_id] = client_name
    return false
237
238
# Corresponding decrement of count of the number of changefeeds by a given client.
# NOTE: deliberately DISABLED via the bare `return` below, matching
# _inc_changefeed_count (see the TRACK CHANGEFEED COUNTS comment block).
_dec_changefeed_count: (id, table) =>
    return
    client_name = @_user_get_changefeed_id_to_user[id]
    if client_name?
        @_user_get_changefeed_counts?[client_name] -= 1
        delete @_user_get_changefeed_id_to_user[id]
        cnt = @_user_get_changefeed_counts
        if table?
            t = "(table='#{table}')"
        else
            t = ""
        @_dbg("_dec_changefeed_count#{t}")("counts={#{client_name}:#{cnt[client_name]} ...}")
251
252
# Handle user_query when opts.query is an array: run each query through
# user_query in series and return the array of results.  opts are as for
# user_query.  Changefeeds are only supported for a single-element array.
_user_query_array: (opts) =>
    if opts.changes and opts.query.length > 1
        opts.cb?("FATAL: changefeeds only implemented for single table")
        return
    result = []
    f = (query, cb) =>
        @user_query
            account_id : opts.account_id
            project_id : opts.project_id
            query      : query
            options    : opts.options
            cb         : (err, x) =>
                result.push(x); cb(err)
    # BUGFIX: opts.cb is optional (see user_query defaults), so use the ?.
    # soak here like everywhere else instead of crashing when cb is undefined.
    async.mapSeries(opts.query, f, (err) => opts.cb?(err, result))
267
268
# Cancel the changefeed with the given id, if it is still registered:
# decrement its (currently disabled) count, deregister it, and close it.
# Safe to call repeatedly -- a second call is a logged no-op.
# opts:
#   id : required -- changefeed id
#   cb : optional -- invoked synchronously once done (not really asynchronous)
user_query_cancel_changefeed: (opts) =>
    opts = defaults opts,
        id : required
        cb : undefined   # not really asynchronous
    dbg = @_dbg("user_query_cancel_changefeed(id='#{opts.id}')")
    feed = @_changefeeds?[opts.id]
    if not feed?
        dbg("already canceled before (no such feed)")
        opts.cb?()
        return
    dbg("actually canceling feed")
    @_dec_changefeed_count(opts.id)
    delete @_changefeeds[opts.id]
    feed.close()
    opts.cb?()
282
283
# Return the column names requested by a get query: the keys of the query
# object, minus any names listed in remove_from_query (an array of strings).
_user_get_query_columns: (query, remove_from_query) =>
    columns = misc.keys(query)
    if not remove_from_query?
        return columns
    return lodash.difference(columns, remove_from_query)
290
291
# Call cb() only if the given account is an admin; otherwise cb(err).
# A missing account_id fails immediately without hitting the database.
_require_is_admin: (account_id, cb) =>
    if not account_id?
        cb("FATAL: user must be an admin")
        return
    @is_admin
        account_id : account_id
        cb         : (err, is_admin) =>
            if err
                cb(err)
                return
            if is_admin
                cb()
            else
                cb("FATAL: user must be an admin")
304
305
# Ensure that each project_id in project_ids is such that the account is in one of the given
# groups for the project, or that the account is an admin.  If not, cb(err).
# Also rejects the query outright if any of the project_ids does not exist.
_require_project_ids_in_groups: (account_id, project_ids, groups, cb) =>
    # BUGFIX/cleanup: removed the unused local `s = {"#{account_id}": true}`
    # that was never read anywhere in this method.
    require_admin = false
    @_query
        query : "SELECT project_id, users#>'{#{account_id}}' AS user FROM projects"
        where : "project_id = ANY($)":project_ids
        cache : true
        cb    : all_results (err, x) =>
            if err
                cb(err)
            else
                known_project_ids = {}  # we use this to ensure that each of the given project_ids exists.
                for p in x
                    known_project_ids[p.project_id] = true
                    if p.user?.group not in groups
                        require_admin = true
                # If any of the project_ids don't exist, reject the query.
                for project_id in project_ids
                    if not known_project_ids[project_id]
                        cb("FATAL: unknown project_id '#{misc.trunc(project_id,100)}'")
                        return
                if require_admin
                    @_require_is_admin(account_id, cb)
                else
                    cb()
332
333
# Parse the array of get-query option objects into one flat object
# {only_changes?, limit, slice?, order_by?, err?}.  Multiple order_by
# options are concatenated; a leading '-' means descending.  Unknown
# option names set r.err.  The limit is always clamped to MAX_LIMIT.
_query_parse_options: (options) =>
    r = {}
    for x in options
        for name, value of x
            switch name
                when 'only_changes'
                    r.only_changes = !!value
                when 'limit'
                    # BUGFIX: always pass the radix -- without it, a string
                    # like '0x10' would be parsed as hexadecimal.
                    r.limit = parseInt(value, 10)
                when 'slice'
                    r.slice = value
                when 'order_by'
                    if value[0] == '-'
                        value = value.slice(1) + " DESC "
                    if r.order_by
                        r.order_by = r.order_by + ', ' + value
                    else
                        r.order_by = value
                when 'delete'
                    null
                    # ignore delete here - is parsed elsewhere
                when 'heartbeat'
                    @_dbg("_query_parse_options")("TODO/WARNING -- ignoring heartbeat option from old client")
                else
                    r.err = "unknown option '#{name}'"
    # Guard rails: no matter what, all queries are capped with a limit of 100000.
    # TODO: If somehow somebody has, e.g., more than 100K projects, or maybe more
    # than 100K edits of a single file, they could hit this and not realize it.  I
    # had this set at 1000 for a few minutes and it caused me to randomly not have
    # some of my projects.
    MAX_LIMIT = 100000
    try
        if not isFinite(r.limit)
            r.limit = MAX_LIMIT
        else if r.limit > MAX_LIMIT
            r.limit = MAX_LIMIT
    catch
        r.limit = MAX_LIMIT
    return r
372
373
###
374
SET QUERIES
375
###
376
# Validate and normalize a user set query against the table's schema,
# producing the record `r` consumed by user_set_query and its helpers.
# Returns:
#   - {err: "..."}  on any validation failure,
#   - undefined     when no settable field would change (shortcut no-op),
#   - otherwise r with: dbg, query (copy, with functional fields filled in),
#     table, db_table, account_id, project_id, client_query, primary_keys,
#     json_fields, options (merged), require_admin / require_project_ids_*,
#     and the check/before/instead_of/on_change hook references.
_parse_set_query_opts: (opts) =>
    r = {}

    if opts.project_id?
        dbg = r.dbg = @_dbg("user_set_query(project_id='#{opts.project_id}', table='#{opts.table}')")
    else if opts.account_id?
        dbg = r.dbg = @_dbg("user_set_query(account_id='#{opts.account_id}', table='#{opts.table}')")
    else
        return {err:"FATAL: account_id or project_id must be specified to set query on table='#{opts.table}'"}

    if not SCHEMA[opts.table]?
        return {err:"FATAL: table '#{opts.table}' does not exist"}

    dbg(misc.to_json(opts.query))

    if opts.options
        dbg("options=#{misc.to_json(opts.options)}")

    r.query      = misc.copy(opts.query)
    r.table      = opts.table
    # Virtual tables map onto a real underlying db table.
    r.db_table   = SCHEMA[opts.table].virtual ? opts.table
    r.account_id = opts.account_id
    r.project_id = opts.project_id

    s = SCHEMA[opts.table]

    # The applicable query spec depends on who is asking: user vs project.
    if opts.account_id?
        r.client_query = s?.user_query
    else
        r.client_query = s?.project_query

    if not r.client_query?.set?.fields?
        return {err:"FATAL: user set queries not allowed for table '#{opts.table}'"}

    if not @_mod_fields(opts.query, r.client_query)
        dbg("shortcut -- no fields will be modified, so nothing to do")
        return

    # For each settable field: functional specs compute the value; string
    # specs are special markers that substitute ids or impose access checks.
    for field in misc.keys(r.client_query.set.fields)
        if r.client_query.set.fields[field] == undefined
            return {err: "FATAL: user set query not allowed for #{opts.table}.#{field}"}
        val = r.client_query.set.fields[field]

        if typeof(val) == 'function'
            try
                r.query[field] = val(r.query, @)
            catch err
                return {err:"FATAL: error setting '#{field}' -- #{err}"}
        else
            switch val
                when 'account_id'
                    if not r.account_id?
                        return {err: "FATAL: account_id must be specified -- make sure you are signed in"}
                    r.query[field] = r.account_id
                when 'project_id'
                    if not r.project_id?
                        return {err: "FATAL: project_id must be specified"}
                    r.query[field] = r.project_id
                when 'time_id'
                    # NOTE(review): no `uuid = require(...)` is visible at the
                    # top of this file -- confirm uuid is in scope, otherwise
                    # this branch throws a ReferenceError when reached.
                    r.query[field] = uuid.v1()
                when 'project_write'
                    if not r.query[field]?
                        return {err: "FATAL: must specify #{opts.table}.#{field}"}
                    r.require_project_ids_write_access = [r.query[field]]
                when 'project_owner'
                    if not r.query[field]?
                        return {err:"FATAL: must specify #{opts.table}.#{field}"}
                    r.require_project_ids_owner = [r.query[field]]

    if r.client_query.set.admin
        r.require_admin = true

    r.primary_keys = @_primary_keys(r.db_table)

    r.json_fields = @_json_fields(r.db_table, r.query)

    # Reject any field not allowed by the user spec; fields allowed only by
    # the admin spec escalate the query to require_admin instead.
    for k, v of r.query
        if k in r.primary_keys
            continue
        if r.client_query?.set?.fields?[k] != undefined
            continue
        if s.admin_query?.set?.fields?[k] != undefined
            r.require_admin = true
            continue
        return {err: "FATAL: changing #{r.table}.#{k} not allowed"}

    # HOOKS which allow for running arbitrary code in response to
    # user set queries.  In each case, new_val below is only the part
    # of the object that the user requested to change.

    # 0. CHECK: Runs before doing any further processing; has callback, so this
    # provides a generic way to quickly check whether or not this query is allowed
    # for things that can't be done declaratively.  The check_hook can also
    # mutate the obj (the user query), e.g., to enforce limits on input size.
    r.check_hook = r.client_query.set.check_hook

    # 1. BEFORE: If before_change is set, it is called with input
    #   (database, old_val, new_val, account_id, cb)
    # before the actual change to the database is made.
    r.before_change_hook = r.client_query.set.before_change

    # 2. INSTEAD OF: If instead_of_change is set, then instead_of_change_hook
    # is called with input
    #   (database, old_val, new_val, account_id, cb)
    # *instead* of actually doing the update/insert to
    # the database.  This makes it possible to run arbitrary
    # code whenever the user does a certain type of set query.
    # Obviously, if that code doesn't set the new_val in the
    # database, then new_val won't be the new val.
    r.instead_of_change_hook = r.client_query.set.instead_of_change

    # 3. AFTER: If set, the on_change_hook is called with
    #   (database, old_val, new_val, account_id, cb)
    # after everything the database has been modified.
    r.on_change_hook = r.client_query.set.on_change

    # 4. instead of query
    r.instead_of_query = r.client_query.set.instead_of_query

    #dbg("on_change_hook=#{on_change_hook?}, #{misc.to_json(misc.keys(client_query.set))}")

    # Set the query options -- order doesn't matter for set queries (unlike for get), so we
    # just merge the options into a single dictionary.
    # NOTE: As I write this, there is just one supported option: {delete:true}.
    r.options = {}
    if r.client_query.set.options?
        for x in r.client_query.set.options
            for y, z of x
                r.options[y] = z
    if opts.options?
        for x in opts.options
            for y, z of x
                r.options[y] = z
    dbg("options = #{misc.to_json(r.options)}")

    if r.options.delete and not r.client_query.set.delete
        # delete option is set, but deletes aren't explicitly allowed on this table.  ERROR.
        return {err: "FATAL: delete from #{r.table} not allowed"}

    return r
516
517
# Enforce the access requirements recorded on r by _parse_set_query_opts:
# admin membership, write access to referenced projects, and/or project
# ownership.  All three checks run in parallel; cb(err) on first failure.
_user_set_query_enforce_requirements: (r, cb) =>
    check_admin = (cb) =>
        if r.require_admin
            @_require_is_admin(r.account_id, cb)
        else
            cb()

    check_write_access = (cb) =>
        if not r.require_project_ids_write_access?
            cb()
            return
        if r.project_id?
            # A project may only touch itself, never another project.
            err = undefined
            for x in r.require_project_ids_write_access
                if x != r.project_id
                    err = "FATAL: can only query same project"
                    break
            cb(err)
        else
            @_require_project_ids_in_groups(r.account_id, r.require_project_ids_write_access,\
                     ['owner', 'collaborator'], cb)

    check_ownership = (cb) =>
        if r.require_project_ids_owner?
            @_require_project_ids_in_groups(r.account_id, r.require_project_ids_owner,\
                     ['owner'], cb)
        else
            cb()

    async.parallel([check_admin, check_write_access, check_ownership], cb)
545
546
# Build the WHERE clause object for a set query: one equality condition per
# primary key of the table, with values cast to their declared pg types
# (unless the field is marked noCoerce).
_user_set_query_where: (r) =>
    clauses = {}
    for pk in @_primary_keys(r.db_table)
        val  = r.query[pk]
        spec = SCHEMA[r.db_table].fields[pk]
        if spec.noCoerce
            clauses["#{pk}=$"] = val
            continue
        pgtype = pg_type(spec)
        if pgtype == 'TIMESTAMP' and not misc.is_date(val)
            # Javascript is better at parsing its own dates than PostgreSQL;
            # the isNaN test keeps strings like NOW(), etc. working.
            parsed = new Date(val)
            val = parsed if not isNaN(parsed)
        clauses["#{pk}=$::#{pgtype}"] = val
    return clauses
562
563
# Build the values object for an INSERT from r.query, annotating each key
# with its pg type cast when one is known (and coercion is allowed); date
# strings destined for TIMESTAMP columns are pre-parsed in Javascript.
_user_set_query_values: (r) =>
    out = {}
    schema = SCHEMA[r.db_table]
    for field, val of r.query
        pgtype = pg_type(schema?.fields?[field])
        if not (val? and pgtype? and not schema?.fields?[field]?.noCoerce)
            out[field] = val
            continue
        if pgtype == 'TIMESTAMP' and not misc.is_date(val)
            # (as in _user_set_query_where) Javascript is better at parsing
            # its own dates than PostgreSQL; isNaN keeps NOW() etc. intact.
            parsed = new Date(val)
            val = parsed if not isNaN(parsed)
        out["#{field}::#{pgtype}"] = val
    return out
578
579
# If any change hook is registered for this set query, fetch the current
# row into r.old_val so hooks can compare old vs new.  Skips the fetch when
# no hook needs it, or when a primary key is missing (new object with a
# generated id -- old_val simply stays undefined).
_user_set_query_hooks_prepare: (r, cb) =>
    wants_old_val = r.on_change_hook? or r.before_change_hook? or r.instead_of_change_hook?
    if not wants_old_val
        cb()
        return
    for pk in r.primary_keys
        if not r.query[pk]?
            # Fine -- old_val just isn't defined, e.g., when creating a new
            # object whose primary key is a generated id.
            cb()
            return
    # Get the old value before changing it.
    # TODO: optimization -- can we restrict columns below?
    @_query
        query : "SELECT * FROM #{r.db_table}"
        where : @_user_set_query_where(r)
        cb    : one_result (err, x) =>
            r.old_val = x
            cb(err)
596
597
# Count rows matching the primary key(s) of r.query (0 or 1); cb(err, n).
_user_query_set_count: (r, cb) =>
    q =
        query : "SELECT COUNT(*) FROM #{r.db_table}"
        where : @_user_set_query_where(r)
        cb    : count_result(cb)
    @_query(q)
602
603
# Delete the row identified by the primary key(s) in r.query; cb(err).
_user_query_set_delete: (r, cb) =>
    q =
        query : "DELETE FROM #{r.db_table}"
        where : @_user_set_query_where(r)
        cb    : cb
    @_query(q)
608
609
# The ON CONFLICT target for upserts on this table: its primary key columns.
_user_set_query_conflict: (r) =>
    r.primary_keys
611
612
# Insert the row described by r.query, updating in place on primary-key
# conflict (upsert); cb(err).
_user_query_set_upsert: (r, cb) =>
    q =
        query    : "INSERT INTO #{r.db_table}"
        values   : @_user_set_query_values(r)
        conflict : @_user_set_query_conflict(r)
        cb       : cb
    @_query(q)
619
620
# Record is already in the DB, so update it.  This handles the case that
# involves both a jsonb_merge (for jsonb document fields) and a plain
# UPDATE ... SET for all remaining non-primary-key fields.
_user_query_set_upsert_and_jsonb_merge: (r, cb) =>
    to_merge = {}
    for field of r.json_fields
        given = r.query[field]
        to_merge[field] = given if given?
    plain_set = {}
    for field, given of r.query
        continue if field in r.primary_keys
        continue if to_merge[field]?
        plain_set[field] = given
    @_query
        query       : "UPDATE #{r.db_table}"
        jsonb_merge : to_merge
        set         : plain_set
        where       : @_user_set_query_where(r)
        cb          : cb
639
640
# Execute the main database mutation for a validated set query r:
# either a delete, the instead_of_change hook, a plain upsert, or (when
# jsonb document fields are involved) a count-then-insert-or-merge.
_user_set_query_main_query: (r, cb) =>
    r.dbg("_user_set_query_main_query")

    if not r.client_query.set.allow_field_deletes
        # allow_field_deletes not set, so remove any null/undefined
        # fields from the query
        for key of r.query
            if not r.query[key]?
                delete r.query[key]

    if r.options.delete
        for primary_key in r.primary_keys
            if not r.query[primary_key]?
                cb("FATAL: delete query must set primary key")
                return
        r.dbg("delete based on primary key")
        @_user_query_set_delete(r, cb)
        return
    if r.instead_of_change_hook?
        # Hook fully replaces the database write.
        r.instead_of_change_hook(@, r.old_val, r.query, r.account_id, cb)
    else
        if misc.len(r.json_fields) == 0
            # easy case -- there are no jsonb merge fields; just do an upsert.
            @_user_query_set_upsert(r, cb)
            return
        # HARD CASE -- there are json_fields... so we are doing an insert
        # if the object isn't already in the database, and an update
        # if it is.  This is ugly because I don't know how to do both
        # a JSON merge as an upsert.
        cnt = undefined  # will equal number of records having the primary key (so 0 or 1)
        async.series([
            (cb) =>
                @_user_query_set_count r, (err, n) =>
                    cnt = n; cb(err)
            (cb) =>
                r.dbg("do the set query")
                if cnt == 0
                    # Just insert (do as upsert to avoid error in case of race)
                    @_user_query_set_upsert(r, cb)
                else
                    # Do as an update -- record is definitely already in db since cnt > 0.
                    # This would fail in the unlikely (but possible) case that somebody deletes
                    # the record between the above count and when we do the UPDATE.
                    # Using a transaction could avoid this.
                    # Maybe such an error is reasonable and it's good to report it as such.
                    @_user_query_set_upsert_and_jsonb_merge(r, cb)
        ], cb)
687
688
# Top-level set query: parse/validate via _parse_set_query_opts, then run
# the hook pipeline in order -- enforce requirements, check_hook, fetch
# old_val, before_change (which may stop the pipeline via its second cb
# argument), the main mutation (or instead_of_query), then on_change.
#
# opts:
#   account_id : account making the change (or)
#   project_id : project making the change
#   table      : required
#   query      : required
#   options    : options=[{delete:true}] is the only supported nontrivial option here.
#   cb         : required -- cb(err)
user_set_query: (opts) =>
    opts = defaults opts,
        account_id : undefined
        project_id : undefined
        table      : required
        query      : required
        options    : undefined   # options=[{delete:true}] is the only supported nontrivial option here.
        cb         : required    # cb(err)

    # TODO: it would be nice to return the primary key part of the created object on creation.
    # That's not implemented and will be somewhat nontrivial, and will use the RETURNING clause
    # of postgres's INSERT - https://www.postgresql.org/docs/current/sql-insert.html

    if @is_standby
        opts.cb("set queries against standby not allowed")
        return
    r = @_parse_set_query_opts(opts)

    # Only uncomment for debugging -- too big/verbose/dangerous
    # r.dbg("parsed query opts = #{JSON.stringify(r)}")

    if not r?  # nothing to do -- no settable field would actually change
        opts.cb()
        return
    if r.err
        opts.cb(r.err)
        return

    async.series([
        (cb) =>
            @_user_set_query_enforce_requirements(r, cb)
        (cb) =>
            if r.check_hook?
                r.check_hook(@, r.query, r.account_id, r.project_id, cb)
            else
                cb()
        (cb) =>
            @_user_set_query_hooks_prepare(r, cb)
        (cb) =>
            if r.before_change_hook?
                # A truthy `stop` from the hook marks the query done, skipping
                # the main mutation and the on_change hook below.
                r.before_change_hook @, r.old_val, r.query, r.account_id, (err, stop) =>
                    r.done = stop
                    cb(err)
            else
                cb()
        (cb) =>
            if r.done
                cb()
                return
            if r.instead_of_query?
                opts1 = misc.copy_without(opts, ['cb', 'changes', 'table'])
                r.instead_of_query(@, opts1, cb)
            else
                @_user_set_query_main_query(r, cb)
        (cb) =>
            if r.done
                cb()
                return
            if r.on_change_hook?
                r.on_change_hook(@, r.old_val, r.query, r.account_id, cb)
            else
                cb()
    ], (err) => opts.cb(err))
751
752
# mod_fields reports whether the query contains at least one field that might
# actually get modified in the database; e.g., account_id won't since it gets
# filled in with the user's account_id, and project_write won't since it must
# refer to an existing project.  Used **only** to skip doing no-op queries --
# purely an optimization.
_mod_fields: (query, client_query) =>
    auto_filled = ['account_id', 'project_write']
    for field in misc.keys(query)
        spec = client_query.set.fields[field]
        return true if spec not in auto_filled
    return false
762
763
# obj is an object returned from the database via a query.  Postgres JSONB
# doesn't support timestamps, so convert every JSON leaf of obj that looks
# like a serialized timestamp (per the fields spec) into a Javascript Date,
# in place.
_user_get_query_json_timestamps: (obj, fields) =>
    for key, val of obj
        spec = fields[key]
        if spec
            obj[key] = misc.fix_json_dates(val, spec)
771
772
# Fill in the default values for obj using the client_query spec:
# for each requested field with a schema-declared default, set it on every
# returned row that lacks it (deep copy), and for object-valued defaults
# merge in any missing sub-keys.  Mutates obj (or each element of obj) in place.
_user_get_query_set_defaults: (client_query, obj, fields) =>
    if not misc.is_array(obj)
        obj = [obj]
    else if obj.length == 0
        return
    s = client_query?.get?.fields ? {}
    for k in fields
        v = s[k]
        if v?
            # k is a field for which a default value (=v) is provided in the schema
            for x in obj
                # For each obj pulled from the database that is defined...
                if x?
                    # We check to see if the field k was set on that object.
                    y = x[k]
                    if not y?
                        # It was NOT set, so we deep copy the default value for the field k.
                        x[k] = misc.deep_copy(v)
                    else if typeof(v) == 'object' and typeof(y) == 'object' and not misc.is_array(v)
                        # y *is* defined and is an object, so we merge in the provided defaults.
                        for k0, v0 of v
                            if not y[k0]?
                                y[k0] = v0
796
797
# Synchronous sanitizer for the 'users' field of a project set query;
# delegates to sanitizeUserSetQueryProjectUsers
# (./postgres/project/user-set-query-project-users).
_user_set_query_project_users: (obj, account_id) =>
    return sanitizeUserSetQueryProjectUsers(obj, account_id)
799
800
# Synchronous sanitizer for the manage_users_owner_only flag of a project
# set query.  This hook is called from the schema functional substitution to
# validate the flag.  It must be synchronous -- async validation (permission
# checks) is done in the check_hook instead.  Just does basic type validation
# and sanitization here, via sanitizeManageUsersOwnerOnly
# (./postgres/project/manage-users-owner-only).
_user_set_query_project_manage_users_owner_only: (obj, account_id) =>
    return sanitizeManageUsersOwnerOnly(obj.manage_users_owner_only)
806
807
# Perform a project control action (start/stop/restart) requested via a
# set query: record the request (started timestamp) in the projects row,
# run the action through @projectControl, then record the outcome
# (finished timestamp and any error) back into the row.
#
# opts:
#   project_id     : required
#   action_request : required  # object {action:?, time:?}
#   cb             : required
project_action: (opts) =>
    opts = defaults opts,
        project_id     : required
        action_request : required   # action is object {action:?, time:?}
        cb             : required
    if opts.action_request.action == 'test'
        # used for testing -- shouldn't trigger anything to happen.
        opts.cb()
        return
    dbg = @_dbg("project_action(project_id='#{opts.project_id}',action_request=#{misc.to_json(opts.action_request)})")
    dbg()
    project = undefined
    action_request = misc.copy(opts.action_request)
    # Persist the (mutated) action_request into the project's jsonb column.
    set_action_request = (cb) =>
        dbg("set action_request to #{misc.to_json(action_request)}")
        @_query
            query     : "UPDATE projects"
            where     : 'project_id = $::UUID':opts.project_id
            jsonb_set : {action_request : action_request}
            cb        : cb
    async.series([
        (cb) =>
            # Mark the action as started so clients see progress immediately.
            action_request.started = new Date()
            set_action_request(cb)
        (cb) =>
            dbg("get project")
            try
                project = await @projectControl(opts.project_id)
                cb()
            catch err
                cb(err)
        (cb) =>
            dbg("doing action")
            try
                switch action_request.action
                    when 'restart'
                        await project.restart()
                    when 'stop'
                        await project.stop()
                    when 'start'
                        await project.start()
                    else
                        throw Error("FATAL: action '#{opts.action_request.action}' not implemented")
                cb()
            catch err
                cb(err)
    ], (err) =>
        # Success or failure, always record the finished state (and error).
        if err
            action_request.err = err
        action_request.finished = new Date()
        dbg("finished!")
        set_action_request(opts.cb)
    )
860
861
# This hook is called *before* the user commits a change to a project in the database
# via a user set query.  Validates project renames (name rules, uniqueness among the
# owner's projects, owner-only), dispatches action requests, and ensures a user
# can't change anybody else's upgrade allocation.
# TODO: Add a pre-check here as well that total upgrade isn't going to be exceeded.
# This will avoid a possible subtle edge case if user is cheating and always somehow
# crashes server...?
_user_set_query_project_change_before: (old_val, new_val, account_id, cb) =>
    #dbg = @_dbg("_user_set_query_project_change_before #{account_id}, #{misc.to_json(old_val)} --> #{misc.to_json(new_val)}")
    # I've seen MASSIVE OUTPUT from this, e.g., when setting avatar.
    dbg = @_dbg("_user_set_query_project_change_before #{account_id}")
    dbg()

    if new_val?.name and (new_val?.name != old_val?.name)
        # Changing or setting the name of the project to something nontrivial.
        try
            checkProjectName(new_val.name);
        catch err
            cb(err.toString())
            return
        if new_val.name
            # Setting name to something nontrivial, so we must check uniqueness
            # among all projects this user owns.
            result = await callback2 @_query,
                query : 'SELECT COUNT(*) FROM projects'
                where :
                    "users#>>'{#{account_id},group}' = $::TEXT" : 'owner'
                    "project_id != $::UUID" : new_val.project_id
                    "LOWER(name) = $::TEXT":new_val.name.toLowerCase()
            if result.rows[0].count > 0
                cb("There is already a project with the same owner as this project and name='#{new_val.name}'. Names are not case sensitive.")
                return
            # A second constraint is that only the project owner can change the project name.
            result = await callback2 @_query,
                query : 'SELECT COUNT(*) FROM projects'
                where :
                    "users#>>'{#{account_id},group}' = $::TEXT" : 'owner'
                    "project_id = $::UUID" : new_val.project_id
            if result.rows[0].count == 0
                cb("Only the owner of the project can currently change the project name.")
                return

    if new_val?.manage_users_owner_only? and new_val.manage_users_owner_only != old_val?.manage_users_owner_only
        # Permission is enforced in the set-field interceptor; nothing to do here.
        # Leaving this block for clarity and to avoid silent bypass if future callers
        # modify manage_users_owner_only via another path.
        dbg("manage_users_owner_only change requested")

    if new_val?.action_request? and JSON.stringify(new_val.action_request.time) != JSON.stringify(old_val?.action_request?.time)
        # Requesting an action, e.g., save, restart, etc.
        dbg("action_request -- #{misc.to_json(new_val.action_request)}")
        #
        # WARNING: Above, we take the difference of times below, since != doesn't work as we want with
        # separate Date objects, as it will say equal dates are not equal. Example:
        # coffee> x = JSON.stringify(new Date()); {from_json}=require('misc'); a=from_json(x); b=from_json(x); [a!=b, a-b]
        # [ true, 0 ]

        # Launch the action -- success or failure communicated back to all clients through changes to state.
        # Also, we don't have to worry about permissions here; that this function got called at all means
        # the user has write access to the projects table entry with given project_id, which gives them permission
        # to do any action with the project.
        @project_action
            project_id     : new_val.project_id
            action_request : misc.copy_with(new_val.action_request, ['action', 'time'])
            cb             : (err) =>
                dbg("action_request #{misc.to_json(new_val.action_request)} completed -- #{err}")
                # true means -- do nothing further.  We don't want to the user to
                # set this same thing since we already dealt with it properly.
                cb(err, true)
        return

    if not new_val.users?  # not changing users
        cb(); return
    old_val = old_val?.users ? {}
    new_val = new_val?.users ? {}
    # BUGFIX: was `misc.keys(old_val).concat(new_val)`, which appended the
    # new_val *object* itself as a single array element instead of its keys,
    # so upgrade allocations for users present only in new_val were never
    # checked (indexing by the object yields undefined on both sides).
    for id in misc.keys(old_val).concat(misc.keys(new_val))
        if account_id != id
            # make sure user doesn't change anybody else's allocation
            if not lodash.isEqual(old_val?[id]?.upgrades, new_val?[id]?.upgrades)
                err = "FATAL: user '#{account_id}' tried to change user '#{id}' allocation toward a project"
                dbg(err)
                cb(err)
                return
    cb()
943
944
    # This hook is called *after* the user commits a change to a project in the database
    # via a user set query. It could undo changes the user isn't allowed to make, which
    # might require doing various async calls, or take actions (e.g., setting quotas,
    # starting projects, etc.).
    #
    # Only reacts when the *caller's own* upgrade allocation actually changed;
    # any other change is a no-op here.
    _user_set_query_project_change_after: (old_val, new_val, account_id, cb) =>
        dbg = @_dbg("_user_set_query_project_change_after #{account_id}, #{misc.to_json(old_val)} --> #{misc.to_json(new_val)}")
        dbg()
        old_upgrades = old_val.users?[account_id]?.upgrades
        new_upgrades = new_val.users?[account_id]?.upgrades
        if new_upgrades? and not lodash.isEqual(old_upgrades, new_upgrades)
            dbg("upgrades changed for #{account_id} from #{misc.to_json(old_upgrades)} to #{misc.to_json(new_upgrades)}")
            project = undefined
            async.series([
                (cb) =>
                    # Server-side validation: clamp/undo any over-allocation the user
                    # committed (the commit already happened -- this is the "after" hook).
                    @ensure_user_project_upgrades_are_valid
                        account_id : account_id
                        cb : cb
                (cb) =>
                    # @projectControl is optional (e.g., not set in some deployments);
                    # without it we skip applying quotas.
                    if not @projectControl?
                        cb()
                    else
                        dbg("get project")
                        try
                            project = await @projectControl(new_val.project_id)
                            cb()
                        catch err
                            cb(err)
                (cb) =>
                    if not project?
                        cb()
                    else
                        dbg("determine total quotas and apply")
                        try
                            await project.setAllQuotas()
                            cb()
                        catch err
                            cb(err)
            ], cb)
        else
            cb()
984
985
###
986
GET QUERIES
987
###
988
989
# Make any functional substitutions defined by the schema.
990
# This may mutate query in place.
991
_user_get_query_functional_subs: (query, fields) =>
992
if fields?
993
for field, val of fields
994
if typeof(val) == 'function'
995
query[field] = val(query, @)
996
997
    # Parse and validate the options of a user get query against the SCHEMA
    # whitelist.  Returns {err:...} on any validation/permission problem;
    # otherwise returns {client_query, table, primary_keys, require_admin,
    # json_fields}.  NOTE: may mutate opts.query (functional subs) and
    # opts.options (schema default options).
    _parse_get_query_opts: (opts) =>
        if opts.changes? and not opts.changes.cb?
            return {err: "FATAL: user_get_query -- if opts.changes is specified, then opts.changes.cb must also be specified"}

        r = {}
        # get data about user queries on this table -- project queries and user
        # (browser client) queries have separate whitelists in the schema.
        if opts.project_id?
            r.client_query = SCHEMA[opts.table]?.project_query
        else
            r.client_query = SCHEMA[opts.table]?.user_query

        if not r.client_query?.get?
            return {err: "FATAL: get queries not allowed for table '#{opts.table}'"}

        if not opts.account_id? and not opts.project_id? and not SCHEMA[opts.table].anonymous
            return {err: "FATAL: anonymous get queries not allowed for table '#{opts.table}'"}

        # Virtual tables are queried against the underlying real table.
        r.table = SCHEMA[opts.table].virtual ? opts.table

        r.primary_keys = @_primary_keys(opts.table)

        # Are only admins allowed any get access to this table?
        r.require_admin = !!r.client_query.get.admin

        # Verify that all requested fields may be read by users
        for field in misc.keys(opts.query)
            if r.client_query.get.fields?[field] == undefined
                return {err: "FATAL: user get query not allowed for #{opts.table}.#{field}"}

        # Functional substitutions defined by schema
        @_user_get_query_functional_subs(opts.query, r.client_query.get?.fields)

        # Tables with instead_of_query skip the pg_where / options machinery below.
        if r.client_query.get?.instead_of_query?
            return r

        # Sanity check: make sure there is something in the query
        # that gets only things in this table that this user
        # is allowed to see, or at least a check_hook. This is not required
        # for admins.
        if not r.client_query.get.pg_where? and not r.client_query.get.check_hook? and not r.require_admin
            return {err: "FATAL: user get query not allowed for #{opts.table} (no getAll filter - pg_where or check_hook)"}

        # Apply default options to the get query (don't impact changefeed)
        # The user can override these, e.g., if they were to want to explicitly increase a limit
        # to get more file use history.
        user_options = {}
        for x in opts.options
            for y, z of x
                user_options[y] = true

        # Under heavy load a table may define lighter-weight default options.
        get_options = undefined
        if @is_heavily_loaded() and r.client_query.get.options_load?
            get_options = r.client_query.get.options_load
        else if r.client_query.get.options?
            get_options = r.client_query.get.options
        if get_options?
            # complicated since options is a list of {opt:val} !
            for x in get_options
                for y, z of x
                    if not user_options[y]
                        opts.options.push(x)
                        break

        r.json_fields = @_json_fields(opts.table, opts.query)
        return r
1062
1063
# _json_fields: map from field names to array of fields that should be parsed as timestamps
1064
# These keys of his map are also used by _user_query_set_upsert_and_jsonb_merge to determine
1065
# JSON deep merging for set queries.
1066
_json_fields: (table, query) =>
1067
json_fields = {}
1068
for field, info of SCHEMA[table].fields
1069
if (query[field]? or query[field] == null) and (info.type == 'map' or info.pg_type == 'JSONB')
1070
json_fields[field] = info.date ? []
1071
return json_fields
1072
1073
    # Compute the WHERE conditions (list of strings / parameterized objects,
    # as consumed by @_query) for a user get query.  Expands the 'projects'
    # shorthand, substitutes account_id / project_id parameters, and enforces
    # read access (project membership, or public paths for anonymous tables)
    # before handing back the final condition list via cb(err, pg_where).
    _user_get_query_where: (client_query, account_id, project_id, user_query, table, cb) =>
        dbg = @_dbg("_user_get_query_where")
        dbg()

        pg_where = client_query.get.pg_where

        if @is_heavily_loaded() and client_query.get.pg_where_load?
            # use a different query if load is heavy
            pg_where = client_query.get.pg_where_load

        if not pg_where?
            pg_where = []
        if pg_where == 'projects'
            pg_where = ['projects']

        # pg_where may be a function of the query; it must yield an array.
        if typeof(pg_where) == 'function'
            pg_where = pg_where(user_query, @)
        if not misc.is_array(pg_where)
            cb("FATAL: pg_where must be an array (of strings or objects)")
            return

        # Do NOT mutate the schema itself!
        pg_where = misc.deep_copy(pg_where)

        # expand 'projects' in query, depending on whether project_id is specified or not.
        # This is just a convenience to make the db schema simpler.
        for i in [0...pg_where.length]
            if pg_where[i] == 'projects'
                if user_query.project_id
                    pg_where[i] = {"project_id = $::UUID" : 'project_id'}
                else
                    pg_where[i] = {"project_id = ANY(select project_id from projects where users ? $::TEXT)" : 'account_id'}

        # Now we fill in all the parametrized substitutions in the pg_where list.
        # subs maps placeholder name -> actual value (initially name -> name).
        subs = {}
        for x in pg_where
            if misc.is_object(x)
                for _, value of x
                    subs[value] = value

        # Resolve one placeholder; this is where access control happens.
        sub_value = (value, cb) =>
            switch value
                when 'account_id'
                    if not account_id?
                        cb('FATAL: account_id must be given')
                        return
                    subs[value] = account_id
                    cb()
                when 'project_id'
                    if project_id?
                        # Query made *by* a project -- trust its own id.
                        subs[value] = project_id
                        cb()
                    else if not user_query.project_id
                        cb("FATAL: must specify project_id")
                    else if SCHEMA[table].anonymous
                        subs[value] = user_query.project_id
                        cb()
                    else
                        # Browser client: must be owner/collaborator of the project.
                        @user_is_in_project_group
                            account_id : account_id
                            project_id : user_query.project_id
                            groups : ['owner', 'collaborator']
                            cb : (err, in_group) =>
                                if err
                                    cb(err)
                                else if in_group
                                    subs[value] = user_query.project_id
                                    cb()
                                else
                                    cb("FATAL: you do not have read access to this project -- account_id=#{account_id}, project_id_=#{project_id}")
                when 'project_id-public'
                    if not user_query.project_id?
                        cb("FATAL: must specify project_id")
                    else
                        if SCHEMA[table].anonymous
                            # Anonymous access is only allowed if the project has
                            # at least one public path.
                            @has_public_path
                                project_id : user_query.project_id
                                cb : (err, has_public_path) =>
                                    if err
                                        cb(err)
                                    else if not has_public_path
                                        cb("project does not have any public paths")
                                    else
                                        subs[value] = user_query.project_id
                                        cb()
                        else
                            cb("FATAL: table must allow anonymous queries")
                else
                    # Unknown placeholder -- leave as-is (value substitutes itself).
                    cb()

        async.map misc.keys(subs), sub_value, (err) =>
            if err
                cb(err)
                return
            # Replace placeholders in the (deep-copied) where objects.
            for x in pg_where
                if misc.is_object(x)
                    for key, value of x
                        x[key] = subs[value]

            # impose further restrictions (more where conditions)
            pg_where.push(userGetQueryFilter(user_query, client_query))

            cb(undefined, pg_where)
1176
1177
_user_get_query_options: (options, multi, schema_options) =>
1178
r = {}
1179
1180
if schema_options?
1181
options = options.concat(schema_options)
1182
1183
# Parse option part of the query
1184
{limit, order_by, slice, only_changes, err} = @_query_parse_options(options)
1185
1186
if err
1187
return {err: err}
1188
if only_changes
1189
r.only_changes = true
1190
if limit?
1191
r.limit = limit
1192
else if not multi
1193
r.limit = 1
1194
if order_by?
1195
r.order_by = order_by
1196
if slice?
1197
return {err: "slice not implemented"}
1198
return r
1199
1200
    # Run the actual get query via @_query and post-process the rows:
    # convert schema-declared JSON timestamps to Date objects, collapse the
    # result to a single object when multi is false, fill in schema defaults,
    # and strip undefined/null fields before calling cb(err, result).
    _user_get_query_do_query: (query_opts, client_query, user_query, multi, json_fields, cb) =>
        query_opts.cb = all_results (err, x) =>
            if err
                cb(err)
            else
                if misc.len(json_fields) > 0
                    # Convert timestamps to Date objects, if **explicitly** specified in the schema
                    for obj in x
                        @_user_get_query_json_timestamps(obj, json_fields)

                if not multi
                    x = x[0]
                # Fill in default values and remove null's
                @_user_get_query_set_defaults(client_query, x, misc.keys(user_query))
                # Get rid of undefined fields -- that's the default and wastes memory and bandwidth
                # NOTE(review): when multi is false, x is a single object here, so this
                # `for obj in x` (array-style loop) does not iterate -- presumably the
                # null-stripping is intentionally skipped for single results; confirm.
                if x?
                    for obj in x
                        misc.map_mutate_out_undefined_and_null(obj)
                cb(undefined, x)
        @_query(query_opts)
1220
1221
_user_get_query_query: (table, user_query, remove_from_query) =>
1222
return "SELECT #{(quote_field(field) for field in @_user_get_query_columns(user_query, remove_from_query)).join(',')} FROM #{table}"
1223
1224
_user_get_query_satisfied_by_obj: (user_query, obj, possible_time_fields) =>
1225
#dbg = @_dbg("_user_get_query_satisfied_by_obj)
1226
#dbg(user_query, obj)
1227
for field, value of obj
1228
date_keys = possible_time_fields[field]
1229
if date_keys
1230
value = misc.fix_json_dates(value, date_keys)
1231
if (q = user_query[field])?
1232
if (op = queryIsCmp(q))
1233
#dbg(value:value, op: op, q:q)
1234
x = q[op]
1235
switch op
1236
when '=='
1237
if value != x
1238
return false
1239
when '!='
1240
if value == x
1241
return false
1242
when '>='
1243
if value < x
1244
return false
1245
when '<='
1246
if value > x
1247
return false
1248
when '>'
1249
if value <= x
1250
return false
1251
when '<'
1252
if value >= x
1253
return false
1254
else if value != q
1255
return false
1256
return true
1257
1258
_user_get_query_handle_field_deletes: (client_query, new_val) =>
1259
if client_query.get.allow_field_deletes
1260
# leave in the nulls that might be in new_val
1261
return
1262
# remove all nulls from new_val. Right now we
1263
# just can't support this due to default values.
1264
# TODO: completely get rid of default values (?) or
1265
# maybe figure out how to implement this. The symptom
1266
# of not doing this is a normal user will do things like
1267
# delete the users field of their projects. Not good.
1268
for key of new_val
1269
if not new_val[key]?
1270
delete new_val[key]
1271
1272
_user_get_query_changefeed: (changes, table, primary_keys, user_query,
1273
where, json_fields, account_id, client_query, orig_table, cb) =>
1274
dbg = @_dbg("_user_get_query_changefeed(table='#{table}')")
1275
dbg()
1276
# WARNING: always call changes.cb! Do not do something like f = changes.cb, then call f!!!!
1277
# This is because the value of changes.cb may be changed by the caller.
1278
if not misc.is_object(changes)
1279
cb("FATAL: changes must be an object with keys id and cb")
1280
return
1281
if not misc.is_valid_uuid_string(changes.id)
1282
cb("FATAL: changes.id must be a uuid")
1283
return
1284
if typeof(changes.cb) != 'function'
1285
cb("FATAL: changes.cb must be a function")
1286
return
1287
for primary_key in primary_keys
1288
if not user_query[primary_key]? and user_query[primary_key] != null
1289
cb("FATAL: changefeed MUST include primary key (='#{primary_key}') in query")
1290
return
1291
watch = []
1292
select = {}
1293
init_tracker = tracker = free_tracker = undefined
1294
possible_time_fields = misc.deep_copy(json_fields)
1295
feed = undefined
1296
1297
changefeed_keys = SCHEMA[orig_table]?.changefeed_keys ? SCHEMA[table]?.changefeed_keys ? []
1298
for field, val of user_query
1299
type = pg_type(SCHEMA[table]?.fields?[field])
1300
if type == 'TIMESTAMP'
1301
possible_time_fields[field] = 'all'
1302
if val == null and field not in primary_keys and field not in changefeed_keys
1303
watch.push(field)
1304
else
1305
select[field] = type
1306
1307
if misc.len(possible_time_fields) > 0
1308
# Convert (likely) timestamps to Date objects; fill in defaults for inserts
1309
process = (x) =>
1310
if not x?
1311
return
1312
if x.new_val?
1313
@_user_get_query_json_timestamps(x.new_val, possible_time_fields)
1314
if x.action == 'insert' # do not do this for delete or update actions!
1315
@_user_get_query_set_defaults(client_query, x.new_val, misc.keys(user_query))
1316
else if x.action == 'update'
1317
@_user_get_query_handle_field_deletes(client_query, x.new_val)
1318
if x.old_val?
1319
@_user_get_query_json_timestamps(x.old_val, possible_time_fields)
1320
else
1321
process = (x) =>
1322
if not x?
1323
return
1324
if x.new_val?
1325
if x.action == 'insert' # do not do this for delete or update actions!
1326
@_user_get_query_set_defaults(client_query, x.new_val, misc.keys(user_query))
1327
else if x.action == 'update'
1328
@_user_get_query_handle_field_deletes(client_query, x.new_val)
1329
1330
async.series([
1331
(cb) =>
1332
# check for alternative where test for changefeed.
1333
pg_changefeed = client_query?.get?.pg_changefeed
1334
if not pg_changefeed?
1335
cb(); return
1336
1337
if pg_changefeed == 'projects'
1338
tracker_add = (project_id) => feed.insert({project_id:project_id})
1339
tracker_remove = (project_id) => feed.delete({project_id:project_id})
1340
1341
# Any tracker error means this changefeed is now broken and
1342
# has to be recreated.
1343
tracker_error = () => changes.cb("tracker error - ${err}")
1344
1345
pg_changefeed = (db, account_id) =>
1346
where : (obj) =>
1347
# Check that this is a project we have read access to
1348
if not db._project_and_user_tracker?.get_projects(account_id)[obj.project_id]
1349
return false
1350
# Now check our actual query conditions on the object.
1351
# This would normally be done by the changefeed, but since
1352
# we are passing in a custom where, we have to do it.
1353
if not @_user_get_query_satisfied_by_obj(user_query, obj, possible_time_fields)
1354
return false
1355
return true
1356
1357
select : {'project_id':'UUID'}
1358
1359
init_tracker : (tracker) =>
1360
tracker.on "add_user_to_project-#{account_id}", tracker_add
1361
tracker.on "remove_user_from_project-#{account_id}", tracker_remove
1362
tracker.once 'error', tracker_error
1363
1364
1365
free_tracker : (tracker) =>
1366
dbg("freeing project tracker events")
1367
tracker.removeListener("add_user_to_project-#{account_id}", tracker_add)
1368
tracker.removeListener("remove_user_from_project-#{account_id}", tracker_remove)
1369
tracker.removeListener("error", tracker_error)
1370
1371
1372
else if pg_changefeed == 'news'
1373
pg_changefeed = ->
1374
where : (obj) ->
1375
if obj.date?
1376
date_obj = new Date(obj.date)
1377
# we send future news items to the frontend, but filter it based on the server time
1378
return date_obj >= misc.months_ago(3)
1379
else
1380
return true
1381
select : {id: 'SERIAL UNIQUE', date: 'TIMESTAMP'}
1382
1383
else if pg_changefeed == 'one-hour'
1384
pg_changefeed = ->
1385
where : (obj) ->
1386
if obj.time?
1387
return new Date(obj.time) >= misc.hours_ago(1)
1388
else
1389
return true
1390
select : {id:'UUID', time:'TIMESTAMP'}
1391
1392
else if pg_changefeed == 'five-minutes'
1393
pg_changefeed = ->
1394
where : (obj) ->
1395
if obj.time?
1396
return new Date(obj.time) >= misc.minutes_ago(5)
1397
else
1398
return true
1399
select : {id:'UUID', time:'TIMESTAMP'}
1400
1401
else if pg_changefeed == 'collaborators'
1402
if not account_id?
1403
cb("FATAL: account_id must be given")
1404
return
1405
tracker_add = (collab_id) => feed.insert({account_id:collab_id})
1406
tracker_remove = (collab_id) => feed.delete({account_id:collab_id})
1407
tracker_error = () => changes.cb("tracker error - ${err}")
1408
pg_changefeed = (db, account_id) ->
1409
shared_tracker = undefined
1410
where : (obj) -> # test of "is a collab with me"
1411
return shared_tracker.get_collabs(account_id)?[obj.account_id]
1412
init_tracker : (tracker) =>
1413
shared_tracker = tracker
1414
tracker.on "add_collaborator-#{account_id}", tracker_add
1415
tracker.on "remove_collaborator-#{account_id}", tracker_remove
1416
tracker.once 'error', tracker_error
1417
free_tracker : (tracker) =>
1418
dbg("freeing collab tracker events")
1419
tracker.removeListener("add_collaborator-#{account_id}", tracker_add)
1420
tracker.removeListener("remove_collaborator-#{account_id}", tracker_remove)
1421
tracker.removeListener("error", tracker_error)
1422
1423
1424
x = pg_changefeed(@, account_id)
1425
if x.init_tracker?
1426
init_tracker = x.init_tracker
1427
if x.free_tracker?
1428
free_tracker = x.free_tracker
1429
if x.select?
1430
for k, v of x.select
1431
select[k] = v
1432
1433
if x.where? or x.init_tracker?
1434
where = x.where
1435
if not account_id?
1436
cb()
1437
return
1438
# initialize user tracker is needed for where tests...
1439
@project_and_user_tracker cb : (err, _tracker) =>
1440
if err
1441
cb(err)
1442
else
1443
tracker = _tracker
1444
try
1445
await tracker.register(account_id)
1446
cb()
1447
catch err
1448
cb(err)
1449
else
1450
cb()
1451
(cb) =>
1452
@changefeed
1453
table : table
1454
select : select
1455
where : where
1456
watch : watch
1457
cb : (err, _feed) =>
1458
# there *is* a glboal variable feed that we set here:
1459
feed = _feed
1460
if err
1461
cb(err)
1462
return
1463
feed.on 'change', (x) ->
1464
process(x)
1465
changes.cb(undefined, x)
1466
feed.on 'close', ->
1467
changes.cb(undefined, {action:'close'})
1468
dbg("feed close")
1469
if tracker? and free_tracker?
1470
dbg("free_tracker")
1471
free_tracker(tracker)
1472
else
1473
dbg("do NOT free_tracker")
1474
feed.on 'error', (err) ->
1475
changes.cb("feed error - #{err}")
1476
@_changefeeds ?= {}
1477
@_changefeeds[changes.id] = feed
1478
init_tracker?(tracker)
1479
cb()
1480
], cb)
1481
1482
    # Top-level entry point for a user get query (and optional changefeed).
    # Validates against SCHEMA, computes the WHERE clause, runs the query (or
    # the table's instead_of_query hook), and -- if opts.changes is given --
    # first establishes the changefeed while *queueing* its events, and only
    # delivers them after the initial query result has been sent (see the
    # long note at the bottom of this file for why the queueing matters).
    user_get_query: (opts) =>
        opts = defaults opts,
            account_id : undefined
            project_id : undefined
            table : required
            query : required
            multi : required
            options : required # used for initial query; **IGNORED** by changefeed,
                               # which ensures that *something* is sent every n minutes, in case no
                               # changes are coming out of the changefeed. This is an additional
                               # measure in case the client somehow doesn't get a "this changefeed died" message.
                               # Use [{delete:true}] to instead delete the selected records (must
                               # have delete:true in schema).
            changes : undefined # {id:?, cb:?}
            cb : required # cb(err, result)
        ###
        The general idea is that user get queries are of the form

            SELECT [columns] FROM table WHERE [get_all] AND [further restrictions] LIMIT/slice

        Using the whitelist rules specified in SCHEMA, we
        determine each of the above, then run the query.

        If no error in query, and changes is a given uuid, set up a change
        feed that calls opts.cb on changes as well.
        ###
        id = misc.uuid().slice(0,6)
        #dbg = @_dbg("user_get_query(id=#{id})")
        dbg = -> # Logging below is just too verbose, and turns out to not be useful...
        dbg("account_id='#{opts.account_id}', project_id='#{opts.project_id}', query=#{misc.to_json(opts.query)}, multi=#{opts.multi}, options=#{misc.to_json(opts.options)}, changes=#{misc.to_json(opts.changes)}")
        {err, table, client_query, require_admin, primary_keys, json_fields} = @_parse_get_query_opts(opts)

        if err
            dbg("error parsing query opts -- #{err}")
            opts.cb(err)
            return

        _query_opts = {}  # this will be the input to the @_query command.
        locals =
            result : undefined
            changes_cb : undefined

        async.series([
            (cb) =>
                # Optional per-table permission hook.
                if client_query.get.check_hook?
                    dbg("do check hook")
                    client_query.get.check_hook(@, opts.query, opts.account_id, opts.project_id, cb)
                else
                    cb()
            (cb) =>
                if require_admin
                    dbg('require admin')
                    @_require_is_admin(opts.account_id, cb)
                else
                    cb()
            (cb) =>
                # NOTE: _user_get_query_where may mutate opts.query (for 'null' params)
                # so it is important that this is called before @_user_get_query_query below.
                # See the TODO in userGetQueryFilter.
                dbg("get_query_where")
                @_user_get_query_where client_query, opts.account_id, opts.project_id, opts.query, opts.table, (err, where) =>
                    _query_opts.where = where
                    cb(err)
            (cb) =>
                if client_query.get.instead_of_query?
                    cb();
                    return
                _query_opts.query = @_user_get_query_query(table, opts.query, client_query.get.remove_from_query)
                x = @_user_get_query_options(opts.options, opts.multi, client_query.options)
                if x.err
                    dbg("error in get_query_options, #{x.err}")
                    cb(x.err)
                    return
                misc.merge(_query_opts, x)

                # Per-table Postgres planner tweaks from the schema.
                nestloop = SCHEMA[opts.table]?.pg_nestloop # true, false or undefined
                if typeof nestloop == 'boolean'
                    val = if nestloop then 'on' else 'off'
                    _query_opts.pg_params = {enable_nestloop : val}

                indexscan = SCHEMA[opts.table]?.pg_indexscan # true, false or undefined
                if typeof indexscan == 'boolean'
                    val = if indexscan then 'on' else 'off'
                    _query_opts.pg_params = {enable_indexscan : val}

                if opts.changes?
                    # Temporarily replace changes.cb with a queueing stub; the
                    # queued events are flushed after the initial result is sent.
                    locals.changes_cb = opts.changes.cb
                    locals.changes_queue = []
                    # see note about why we do the following at the bottom of this file
                    opts.changes.cb = (err, obj) ->
                        locals.changes_queue.push({err:err, obj:obj})
                    dbg("getting changefeed")
                    @_user_get_query_changefeed(opts.changes, table, primary_keys,
                                                opts.query, _query_opts.where, json_fields,
                                                opts.account_id, client_query, opts.table,
                                                cb)
                else
                    cb()

            (cb) =>
                if client_query.get.instead_of_query?
                    if opts.changes?
                        cb("changefeeds are not supported for querying this table")
                        return
                    # Custom version: instead of doing a full query, we instead
                    # call a function and that's it.
                    dbg("do instead_of_query instead")
                    opts1 = misc.copy_without(opts, ['cb', 'changes', 'table'])
                    client_query.get.instead_of_query @, opts1, (err, result) =>
                        locals.result = result
                        cb(err)
                    return

                if _query_opts.only_changes
                    dbg("skipping query")
                    locals.result = undefined
                    cb()
                else
                    dbg("finally doing query")
                    @_user_get_query_do_query _query_opts, client_query, opts.query, opts.multi, json_fields, (err, result) =>
                        if err
                            cb(err)
                            return
                        locals.result = result
                        cb()
        ], (err) =>
            if err
                dbg("series failed -- err=#{err}")
                opts.cb(err)
                return
            dbg("series succeeded")
            opts.cb(undefined, locals.result)
            if opts.changes?
                dbg("sending change queue")
                # Restore the real callback, then flush everything that was
                # queued while the initial query was running.
                opts.changes.cb = locals.changes_cb
                ##dbg("sending queued #{JSON.stringify(locals.changes_queue)}")
                for {err, obj} in locals.changes_queue
                    ##dbg("sending queued changes #{JSON.stringify([err, obj])}")
                    opts.changes.cb(err, obj)
        )
1622
1623
###
1624
Synchronized strings
1625
###
1626
_user_set_query_syncstring_change_after: (old_val, new_val, account_id, cb) =>
1627
dbg = @_dbg("_user_set_query_syncstring_change_after")
1628
cb() # return immediately -- stuff below can happen as side effect in the background.
1629
# Now do the following reactions to this syncstring change in the background:
1630
# 1. Awaken the relevant project.
1631
project_id = old_val?.project_id ? new_val?.project_id
1632
if project_id? and (new_val?.save?.state == 'requested' or (new_val?.last_active? and new_val?.last_active != old_val?.last_active))
1633
dbg("awakening project #{project_id}")
1634
awaken_project(@, project_id)
1635
1636
1637
# Verify that writing a patch is allowed.
1638
_user_set_query_patches_check: (obj, account_id, project_id, cb) =>
1639
# Reject any patch that is too new
1640
if obj.time - new Date() > MAX_PATCH_FUTURE_MS
1641
cb("clock") # this exact error is assumed in synctable!
1642
return
1643
# Write access
1644
@_syncstring_access_check(obj.string_id, account_id, project_id, cb)
1645
1646
    # Verify that reading patches is allowed.
    _user_get_query_patches_check: (obj, account_id, project_id, cb) =>
        # Write access (no notion of read only yet -- will be easy to add later)
        @_syncstring_access_check(obj.string_id, account_id, project_id, cb)
1650
1651
    # Verify that writing cursors is allowed (same access rule as patches).
    _user_set_query_cursors_check: (obj, account_id, project_id, cb) =>
        @_syncstring_access_check(obj.string_id, account_id, project_id, cb)
1654
1655
    # Verify that reading cursors is allowed (same access rule as patches).
    _user_get_query_cursors_check: (obj, account_id, project_id, cb) =>
        @_syncstring_access_check(obj.string_id, account_id, project_id, cb)
1658
1659
    # Check that string_id is the id of a syncstring the given account_id or
    # project_id is allowed to write to. NOTE: We do not concern ourselves (for now at least)
    # with proof of identity (i.e., one user with full read/write access to a project
    # claiming they are another users of that SAME project), since our security model
    # is that any user of a project can edit anything there. In particular, the
    # synctable lets any user with write access to the project edit the users field.
    # Calls cb() on success, cb(err-string) on denial or failure.
    _syncstring_access_check: (string_id, account_id, project_id, cb) =>
        if string_id?.length != 40
            cb("FATAL: string_id (='#{string_id}') must be a string of length 40")
            return
        # Look up which project owns this syncstring.
        @_query
            query : "SELECT project_id FROM syncstrings"
            where : "string_id = $::CHAR(40)" : string_id
            cache : false # *MUST* leave as false (not true), since unfortunately, if this returns no, due to FATAL below this would break opening the file until cache clears.
            cb : one_result 'project_id', (err, x) =>
                if err
                    cb(err)
                else if not x
                    # There is no such syncstring with this id -- fail
                    cb("FATAL: no such syncstring")
                else if account_id?
                    # Attempt to read or write by a user browser client
                    @_require_project_ids_in_groups(account_id, [x], ['owner', 'collaborator'], cb)
                else if project_id?
                    # Attempt to read or write by a *project*
                    if project_id == x
                        cb()
                    else
                        cb("FATAL: project not allowed to write to syncstring in different project")
1688
1689
1690
# Check permissions for querying for syncstrings in a project
1691
_syncstrings_check: (obj, account_id, project_id, cb) =>
1692
#dbg = @dbg("_syncstrings_check")
1693
#dbg(misc.to_json([obj, account_id, project_id]))
1694
if not misc.is_valid_uuid_string(obj?.project_id)
1695
cb("FATAL: project_id (='#{obj?.project_id}') must be a valid uuid")
1696
return
1697
if project_id?
1698
if project_id == obj.project_id
1699
# The project can access its own syncstrings
1700
cb()
1701
else
1702
cb("FATAL: projects can only access their own syncstrings") # for now at least!
1703
return
1704
if account_id?
1705
# Access request by a client user
1706
@_require_project_ids_in_groups(account_id, [obj.project_id], ['owner', 'collaborator'], cb)
1707
else
1708
cb("FATAL: only users and projects can access syncstrings")
1709
1710
    # Other functions that are needed to implement various use queries,
    # e.g., for virtual queries like file_use_times.
    # ASYNC FUNCTION with no callback.
    # Thin wrapper delegating to updateRetentionData from ./postgres/retention.
    updateRetentionData: (opts) =>
        return await updateRetentionData(opts)
1715
1716
# Module-level throttle state: project_id -> Date of last awaken attempt.
_last_awaken_time = {}

# Start the given project and (if supported) ensure a hub->project connection,
# throttled to at most once every 30s per project.  Module-level helper used
# by _user_set_query_syncstring_change_after; fire-and-forget (cb optional).
# NOTE(review): when throttled, or when db.projectControl is not defined, cb
# is never called -- callers appear to rely on fire-and-forget; confirm before
# passing a cb that must run.
awaken_project = (db, project_id, cb) ->
    # throttle so that this gets called *for a given project* at most once every 30s.
    now = new Date()
    if _last_awaken_time[project_id]? and now - _last_awaken_time[project_id] < 30000
        return
    _last_awaken_time[project_id] = now
    dbg = db._dbg("_awaken_project(project_id=#{project_id})")
    if not db.projectControl?
        dbg("skipping since no projectControl defined")
        return
    dbg("doing it...")
    async.series([
        (cb) ->
            try
                project = db.projectControl(project_id)
                await project.start()
                cb()
            catch err
                cb("error starting project = #{err}")
        (cb) ->
            if not db.ensure_connection_to_project?
                cb()
                return
            dbg("also make sure there is a connection from hub to project")
            # This is so the project can find out that the user wants to save a file (etc.)
            db.ensure_connection_to_project(project_id, cb)
    ], (err) ->
        if err
            dbg("awaken project error -- #{err}")
        else
            dbg("success awakening project")
        cb?(err)
    )
1750
###
1751
Note about opts.changes.cb:
1752
1753
Regarding sync, what was happening I think is:
1754
- (a) https://github.com/sagemathinc/cocalc/blob/master/src/packages/hub/postgres-user-queries.coffee#L1384 starts sending changes
1755
- (b) https://github.com/sagemathinc/cocalc/blob/master/src/packages/hub/postgres-user-queries.coffee#L1393 sends the full table.
1756
1757
(a) could result in changes actually getting to the client before the table itself has been initialized. The client code assumes that it only gets changes *after* the table is initialized. The browser client seems to be smart enough that it detects this situation and resets itself, so the browser never gets messed up as a result.
1758
However, the project definitely does NOT do so well, and it can get messed up. Then it has a broken version of the table, missing some last minute change. It is broken until the project forgets about that table entirely, which can be a pretty long time (or project restart).
1759
1760
My fix is to queue up those changes on the server, then only start sending them to the client **after** the (b) query is done. I tested this by using setTimeout to manually delay (b) for a few seconds, and fully seeing the "file won't save problem". The other approach would make it so clients are more robust against getting changes first. However, it would take a long time for all clients to update (restart all projects), and it's an annoying assumption to make in general -- we may have entirely new clients later and they could make the same bad assumptions about order...
1761
###
1762
1763