CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/database/postgres-user-queries.coffee
Views: 687
1
#########################################################################
2
# This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
# License: MS-RSL – see LICENSE.md for details
4
#########################################################################
5
6
"""
7
User (and project) client queries
8
9
COPYRIGHT : (c) 2017 SageMath, Inc.
10
LICENSE : MS-RSL
11
"""
12
13
MAX_CHANGEFEEDS_PER_CLIENT = 2000
14
15
# Reject all patches that have timestamp that is more than 3 minutes in the future.
16
MAX_PATCH_FUTURE_MS = 1000*60*3
17
18
EventEmitter = require('events')
19
async = require('async')
20
lodash = require('lodash')
21
22
{one_result, all_results, count_result, pg_type, quote_field} = require('./postgres-base')
23
24
{UserQueryQueue} = require('./postgres-user-query-queue')
25
26
{defaults} = misc = require('@cocalc/util/misc')
27
required = defaults.required
28
29
{PROJECT_UPGRADES, SCHEMA, OPERATORS, isToOperand} = require('@cocalc/util/schema')
30
{queryIsCmp, userGetQueryFilter} = require("./user-query/user-get-query")
31
32
{file_use_times} = require('./postgres/file-use-times')
33
{updateRetentionData} = require('./postgres/retention')
34
35
{ checkProjectName } = require("@cocalc/util/db-schema/name-rules");
36
{callback2} = require('@cocalc/util/async-utils')
37
38
39
exports.extend_PostgreSQL = (ext) -> class PostgreSQL extends ext
40
# Cancel every query that is still queued up for the given client.
# Does nothing when the lazily-created user query queue does not exist yet.
cancel_user_queries: (opts) =>
    opts = defaults opts,
        client_id : required  # id of the client whose pending queries get dropped
    # The queue is only created on demand in user_query, hence the ?. guard.
    @_user_query_queue?.cancel_user_queries(opts)
45
46
# Entry point for a user (or project) query.  Handles admin "sudo"
# (options containing {account_id:...}), then either runs the query
# directly (no client_id) or routes it through the per-client queue.
user_query: (opts) =>
    opts = defaults opts,
        client_id  : undefined  # if given, uses to control number of queries at once by one client.
        priority   : undefined  # (NOT IMPLEMENTED) priority for this query (an integer [-10,...,19] like in UNIX)
        account_id : undefined
        project_id : undefined
        query      : required
        options    : []
        changes    : undefined
        cb         : undefined

    if opts.account_id?
        # Check for "sudo" by admin to query as a different user, which is done by specifying
        # options = [..., {account_id:'uuid'}, ...].
        for x in opts.options
            if x.account_id?
                # Check user is an admin, then change opts.account_id
                @get_account
                    columns    : ['groups']
                    account_id : opts.account_id
                    cb         : (err, r) =>
                        if err
                            opts.cb?(err)
                        else if r['groups']? and 'admin' in r['groups']
                            # Caller is an admin: impersonate the requested account and
                            # strip the sudo option before re-dispatching.
                            opts.account_id = x.account_id
                            opts.options = (y for y in opts.options when not y['account_id']?)
                            # now do query with new opts and options not including account_id sudo.
                            @user_query(opts)
                        else
                            opts.cb?('user must be admin to sudo')
                # Only the first {account_id:...} option is considered.
                return

    if not opts.client_id?
        # No client_id given, so do not use query queue.
        delete opts.priority
        delete opts.client_id
        @_user_query(opts)
        return

    # Lazily create the queue that throttles concurrent queries per client.
    if not @_user_query_queue?
        o =
            do_query   : @_user_query
            dbg        : @_dbg('user_query_queue')
            concurrent : @concurrent
        @_user_query_queue ?= new UserQueryQueue(o)

    @_user_query_queue.user_query(opts)
93
94
# Core dispatcher for a single user query: decides between array query,
# set query, and get query (possibly a changefeed), after substituting
# the {account_id}/{project_id}/{now} placeholders in the query keys.
_user_query: (opts) =>
    opts = defaults opts,
        account_id : undefined
        project_id : undefined
        query      : required
        options    : []         # used for initial query; **IGNORED** by changefeed!;
                                #  - Use [{set:true}] or [{set:false}] to force get or set query
                                #  - For a set query, use {delete:true} to delete instead of set.  This is the only way
                                #    to delete a record, and won't work unless delete:true is set in the schema
                                #    for the table to explicitly allow deleting.
        changes    : undefined  # id of change feed
        cb         : undefined  # cb(err, result)  # WARNING -- this *will* get called multiple times when changes is true!
    # Short random id used only to correlate debug log lines for this query.
    id = misc.uuid().slice(0,6)
    dbg = @_dbg("_user_query(id=#{id})")
    dbg(misc.to_json(opts.query))
    if misc.is_array(opts.query)
        dbg('array query instead')
        @_user_query_array(opts)
        return

    # Placeholder substitutions applied to keys of the query object below.
    subs =
        '{account_id}' : opts.account_id
        '{project_id}' : opts.project_id
        '{now}'        : new Date()

    if opts.changes?
        changes =
            id : opts.changes
            cb : opts.cb

    v = misc.keys(opts.query)
    # NOTE(review): this only rejects >1 keys; a 0-key query falls through
    # with table undefined and ends in the "invalid table" branch below.
    if v.length > 1
        dbg("FATAL no key")
        opts.cb?('FATAL: must specify exactly one key in the query')
        return
    table = v[0]
    query = opts.query[table]
    if misc.is_array(query)
        if query.length > 1
            dbg("FATAL not implemented")
            opts.cb?("FATAL: array of length > 1 not yet implemented")
            return
        # A length-1 array means "multi-document" get query.
        multi = true
        query = query[0]
    else
        multi = false
    is_set_query = undefined
    if opts.options?
        if not misc.is_array(opts.options)
            dbg("FATAL options")
            opts.cb?("FATAL: options (=#{misc.to_json(opts.options)}) must be an array")
            return
        # An explicit {set:true}/{set:false} option forces the query type.
        for x in opts.options
            if x.set?
                is_set_query = !!x.set
        options = (x for x in opts.options when not x.set?)
    else
        options = []

    if misc.is_object(query)
        query = misc.deep_copy(query)
        misc.obj_key_subs(query, subs)
        if not is_set_query?
            # Default heuristic: a query with some null leaf is a get query;
            # a fully-specified object is a set query.
            is_set_query = not misc.has_null_leaf(query)
        if is_set_query
            dbg("do a set query")
            if changes
                dbg("FATAL: changefeed")
                opts.cb?("FATAL: changefeeds only for read queries")
                return
            if not opts.account_id? and not opts.project_id?
                dbg("FATAL: anon set")
                opts.cb?("FATAL: no anonymous set queries")
                return
            dbg("user_set_query")
            @user_set_query
                account_id : opts.account_id
                project_id : opts.project_id
                table      : table
                query      : query
                options    : opts.options
                cb         : (err, x) =>
                    dbg("returned #{err}")
                    opts.cb?(err, {"#{table}":x})
        else
            # do a get query
            if changes and not multi
                dbg("FATAL: changefeed multi")
                opts.cb?("FATAL: changefeeds only implemented for multi-document queries")
                return

            if changes
                # (Currently a no-op -- see _inc_changefeed_count below.)
                err = @_inc_changefeed_count(opts.account_id, opts.project_id, table, changes.id)
                if err
                    dbg("err changefeed count -- #{err}")
                    opts.cb?(err)
                    return

            dbg("user_get_query")
            @user_get_query
                account_id : opts.account_id
                project_id : opts.project_id
                table      : table
                query      : query
                options    : options
                multi      : multi
                changes    : changes
                cb         : (err, x) =>
                    dbg("returned #{err}")
                    if err and changes
                        # didn't actually make the changefeed, so don't count it.
                        @_dec_changefeed_count(changes.id, table)
                    opts.cb?(err, if not err then {"#{table}":x})
    else
        dbg("FATAL - invalid table")
        opts.cb?("FATAL: invalid user_query of '#{table}' -- query must be an object")
210
211
###
TRACK CHANGEFEED COUNTS

_inc and dec below are evidently broken, in that it's CRITICAL that they match up exactly, or users will be
locked out until they just happen to switch to another hub with different tracking, which is silly.

TODO: DISABLED FOR NOW!
###

# Increment a count of the number of changefeeds by a given client so we can cap it.
# Returns an error string when the client is over the cap, false otherwise.
_inc_changefeed_count: (account_id, project_id, table, changefeed_id) =>
    # DISABLED: this bare return makes the whole method a no-op (see note above).
    return
    client_name = "#{account_id}-#{project_id}"
    cnt = @_user_get_changefeed_counts ?= {}
    ids = @_user_get_changefeed_id_to_user ?= {}
    if not cnt[client_name]?
        cnt[client_name] = 1
    else if cnt[client_name] >= MAX_CHANGEFEEDS_PER_CLIENT
        return "user may create at most #{MAX_CHANGEFEEDS_PER_CLIENT} changefeeds; please close files, refresh browser, restart project"
    else
        # increment before successfully making get_query to prevent huge bursts causing trouble!
        cnt[client_name] += 1
    @_dbg("_inc_changefeed_count(table='#{table}')")("{#{client_name}:#{cnt[client_name]} ...}")
    ids[changefeed_id] = client_name
    return false
236
237
# Corresponding decrement of count of the number of changefeeds by a given client.
_dec_changefeed_count: (id, table) =>
    # DISABLED: no-op, matching the disabled _inc_changefeed_count above.
    return
    client_name = @_user_get_changefeed_id_to_user[id]
    if client_name?
        @_user_get_changefeed_counts?[client_name] -= 1
        delete @_user_get_changefeed_id_to_user[id]
        cnt = @_user_get_changefeed_counts
        if table?
            t = "(table='#{table}')"
        else
            t = ""
        @_dbg("_dec_changefeed_count#{t}")("counts={#{client_name}:#{cnt[client_name]} ...}")
250
251
# Handle user_query when opts.query is an array.  opts below are as for user_query.
# Runs the sub-queries sequentially and calls opts.cb with the array of results.
_user_query_array: (opts) =>
    if opts.changes and opts.query.length > 1
        opts.cb?("FATAL: changefeeds only implemented for single table")
        return
    result = []
    f = (query, cb) =>
        @user_query
            account_id : opts.account_id
            project_id : opts.project_id
            query      : query
            options    : opts.options
            cb         : (err, x) =>
                result.push(x); cb(err)
    # mapSeries preserves the order of results relative to opts.query.
    async.mapSeries(opts.query, f, (err) => opts.cb(err, result))
266
267
# Close and forget the changefeed with the given id, if it exists.
# Safe to call repeatedly; opts.cb is always invoked (synchronously).
user_query_cancel_changefeed: (opts) =>
    opts = defaults opts,
        id : required
        cb : undefined  # not really asynchronous
    dbg = @_dbg("user_query_cancel_changefeed(id='#{opts.id}')")
    feed = @_changefeeds?[opts.id]
    if feed?
        dbg("actually canceling feed")
        @_dec_changefeed_count(opts.id)
        delete @_changefeeds[opts.id]
        feed.close()
    else
        dbg("already canceled before (no such feed)")
    opts.cb?()
281
282
# Columns of a user get-query: the keys of the query object, optionally
# excluding the names listed in remove_from_query (an array of strings).
_user_get_query_columns: (query, remove_from_query) =>
    columns = misc.keys(query)
    if not remove_from_query?
        return columns
    return lodash.difference(columns, remove_from_query)
289
290
# Call cb() only when account_id belongs to an admin; otherwise cb(err).
# A missing account_id is rejected immediately without a database hit.
_require_is_admin: (account_id, cb) =>
    if not account_id?
        cb("FATAL: user must be an admin")
        return
    @is_admin
        account_id : account_id
        cb         : (err, is_admin) =>
            if err
                cb(err)
            else if not is_admin
                cb("FATAL: user must be an admin")
            else
                cb()
303
304
# Ensure that each project_id in project_ids is such that the account is in one of the given
# groups for the project, or that the account is an admin.  If not, cb(err).
_require_project_ids_in_groups: (account_id, project_ids, groups, cb) =>
    # NOTE(review): s appears unused below -- looks like leftover code; confirm.
    s = {"#{account_id}": true}
    require_admin = false
    @_query
        query : "SELECT project_id, users#>'{#{account_id}}' AS user FROM projects"
        where : "project_id = ANY($)":project_ids
        cache : true
        cb    : all_results (err, x) =>
            if err
                cb(err)
            else
                known_project_ids = {}  # we use this to ensure that each of the given project_ids exists.
                for p in x
                    known_project_ids[p.project_id] = true
                    if p.user?.group not in groups
                        # Account is not in an allowed group for this project,
                        # so admin rights are required instead.
                        require_admin = true
                # If any of the project_ids don't exist, reject the query.
                for project_id in project_ids
                    if not known_project_ids[project_id]
                        cb("FATAL: unknown project_id '#{misc.trunc(project_id,100)}'")
                        return
                if require_admin
                    @_require_is_admin(account_id, cb)
                else
                    cb()
331
332
# Translate an array of user-supplied query options ({limit:...}, {order_by:...},
# etc.) into a single options object r; unknown options set r.err.
_query_parse_options: (options) =>
    r = {}
    for x in options
        for name, value of x
            switch name
                when 'only_changes'
                    r.only_changes = !!value
                when 'limit'
                    r.limit = parseInt(value)
                when 'slice'
                    r.slice = value
                when 'order_by'
                    # A leading '-' means descending order on that column.
                    if value[0] == '-'
                        value = value.slice(1) + " DESC "
                    if r.order_by
                        r.order_by = r.order_by + ', ' + value
                    else
                        r.order_by = value
                when 'delete'
                    null
                    # ignore delete here - is parsed elsewhere
                when 'heartbeat'
                    @_dbg("_query_parse_options")("TODO/WARNING -- ignoring heartbeat option from old client")
                else
                    r.err = "unknown option '#{name}'"
    # Guard rails: no matter what, all queries are capped with a limit of 100000.
    # TODO: If somehow somebody has, e.g., more than 100K projects, or maybe more
    # than 100K edits of a single file, they could hit this and not realize it.  I
    # had this set at 1000 for a few minutes and it caused me to randomly not have
    # some of my projects.
    MAX_LIMIT = 100000
    try
        if not isFinite(r.limit)
            r.limit = MAX_LIMIT
        else if r.limit > MAX_LIMIT
            r.limit = MAX_LIMIT
    catch
        r.limit = MAX_LIMIT
    return r
371
372
###
SET QUERIES
###
# Validate and normalize a user set query into a plan object r: resolves the
# db table, fills in schema-driven fields, records required permissions
# (admin / project write / project owner), collects hooks, and merges options.
# Returns undefined when there is nothing to do, or {err:...} on rejection.
_parse_set_query_opts: (opts) =>
    r = {}

    if opts.project_id?
        dbg = r.dbg = @_dbg("user_set_query(project_id='#{opts.project_id}', table='#{opts.table}')")
    else if opts.account_id?
        dbg = r.dbg = @_dbg("user_set_query(account_id='#{opts.account_id}', table='#{opts.table}')")
    else
        return {err:"FATAL: account_id or project_id must be specified"}

    if not SCHEMA[opts.table]?
        return {err:"FATAL: table '#{opts.table}' does not exist"}

    dbg(misc.to_json(opts.query))

    if opts.options
        dbg("options=#{misc.to_json(opts.options)}")

    r.query      = misc.copy(opts.query)
    r.table      = opts.table
    # Virtual tables map to an underlying real table.
    r.db_table   = SCHEMA[opts.table].virtual ? opts.table
    r.account_id = opts.account_id
    r.project_id = opts.project_id

    s = SCHEMA[opts.table]

    # Accounts and projects use different query specs from the schema.
    if opts.account_id?
        r.client_query = s?.user_query
    else
        r.client_query = s?.project_query

    if not r.client_query?.set?.fields?
        return {err:"FATAL: user set queries not allowed for table '#{opts.table}'"}

    if not @_mod_fields(opts.query, r.client_query)
        dbg("shortcut -- no fields will be modified, so nothing to do")
        return

    # Fill in / check each settable field according to the schema spec.
    for field in misc.keys(r.client_query.set.fields)
        if r.client_query.set.fields[field] == undefined
            return {err: "FATAL: user set query not allowed for #{opts.table}.#{field}"}
        val = r.client_query.set.fields[field]

        if typeof(val) == 'function'
            # Schema-provided function computes the value from the query.
            try
                r.query[field] = val(r.query, @)
            catch err
                return {err:"FATAL: error setting '#{field}' -- #{err}"}
        else
            switch val
                when 'account_id'
                    if not r.account_id?
                        return {err: "FATAL: account_id must be specified -- make sure you are signed in"}
                    r.query[field] = r.account_id
                when 'project_id'
                    if not r.project_id?
                        return {err: "FATAL: project_id must be specified"}
                    r.query[field] = r.project_id
                when 'time_id'
                    # NOTE(review): uuid is not required in the visible imports
                    # of this file -- confirm this branch is reachable/working.
                    r.query[field] = uuid.v1()
                when 'project_write'
                    if not r.query[field]?
                        return {err: "FATAL: must specify #{opts.table}.#{field}"}
                    r.require_project_ids_write_access = [r.query[field]]
                when 'project_owner'
                    if not r.query[field]?
                        return {err:"FATAL: must specify #{opts.table}.#{field}"}
                    r.require_project_ids_owner = [r.query[field]]

    if r.client_query.set.admin
        r.require_admin = true

    r.primary_keys = @_primary_keys(r.db_table)

    r.json_fields = @_json_fields(r.db_table, r.query)

    # Reject any field in the query that is neither a primary key nor a
    # settable field; admin-settable fields flip on require_admin instead.
    for k, v of r.query
        if k in r.primary_keys
            continue
        if r.client_query?.set?.fields?[k] != undefined
            continue
        if s.admin_query?.set?.fields?[k] != undefined
            r.require_admin = true
            continue
        return {err: "FATAL: changing #{r.table}.#{k} not allowed"}

    # HOOKS which allow for running arbitrary code in response to
    # user set queries.  In each case, new_val below is only the part
    # of the object that the user requested to change.

    # 0. CHECK: Runs before doing any further processing; has callback, so this
    # provides a generic way to quickly check whether or not this query is allowed
    # for things that can't be done declaratively.  The check_hook can also
    # mutate the obj (the user query), e.g., to enforce limits on input size.
    r.check_hook = r.client_query.set.check_hook

    # 1. BEFORE: If before_change is set, it is called with input
    #   (database, old_val, new_val, account_id, cb)
    # before the actual change to the database is made.
    r.before_change_hook = r.client_query.set.before_change

    # 2. INSTEAD OF: If instead_of_change is set, then instead_of_change_hook
    # is called with input
    #   (database, old_val, new_val, account_id, cb)
    # *instead* of actually doing the update/insert to
    # the database.  This makes it possible to run arbitrary
    # code whenever the user does a certain type of set query.
    # Obviously, if that code doesn't set the new_val in the
    # database, then new_val won't be the new val.
    r.instead_of_change_hook = r.client_query.set.instead_of_change

    # 3. AFTER:  If set, the on_change_hook is called with
    #   (database, old_val, new_val, account_id, cb)
    # after everything the database has been modified.
    r.on_change_hook = r.client_query.set.on_change

    # 4. instead of query
    r.instead_of_query = r.client_query.set.instead_of_query

    #dbg("on_change_hook=#{on_change_hook?}, #{misc.to_json(misc.keys(client_query.set))}")

    # Set the query options -- order doesn't matter for set queries (unlike for get), so we
    # just merge the options into a single dictionary.
    # NOTE: As I write this, there is just one supported option: {delete:true}.
    r.options = {}
    if r.client_query.set.options?
        for x in r.client_query.set.options
            for y, z of x
                r.options[y] = z
    if opts.options?
        for x in opts.options
            for y, z of x
                r.options[y] = z
    dbg("options = #{misc.to_json(r.options)}")

    if r.options.delete and not r.client_query.set.delete
        # delete option is set, but deletes aren't explicitly allowed on this table.  ERROR.
        return {err: "FATAL: delete from #{r.table} not allowed"}

    return r
515
516
# Enforce, in parallel, the permission requirements recorded on r by
# _parse_set_query_opts: admin rights, write access, and ownership.
_user_set_query_enforce_requirements: (r, cb) =>
    async.parallel([
        (cb) =>
            if r.require_admin
                @_require_is_admin(r.account_id, cb)
            else
                cb()
        (cb) =>
            if r.require_project_ids_write_access?
                if r.project_id?
                    # A project can only touch rows of its own project.
                    err = undefined
                    for x in r.require_project_ids_write_access
                        if x != r.project_id
                            err = "FATAL: can only query same project"
                            break
                    cb(err)
                else
                    @_require_project_ids_in_groups(r.account_id, r.require_project_ids_write_access,\
                                     ['owner', 'collaborator'], cb)
            else
                cb()
        (cb) =>
            if r.require_project_ids_owner?
                @_require_project_ids_in_groups(r.account_id, r.require_project_ids_owner,\
                                 ['owner'], cb)
            else
                cb()
    ], cb)
544
545
# Build the WHERE clause object for a set query from the table's primary
# keys, coercing values to the schema's postgres type unless noCoerce.
_user_set_query_where: (r) =>
    where = {}
    for primary_key in @_primary_keys(r.db_table)
        value = r.query[primary_key]
        if SCHEMA[r.db_table].fields[primary_key].noCoerce
            where["#{primary_key}=$"] = value
        else
            type = pg_type(SCHEMA[r.db_table].fields[primary_key])
            if type == 'TIMESTAMP' and not misc.is_date(value)
                # Javascript is better at parsing its own dates than PostgreSQL
                # isNaN test so NOW(), etc. work still
                x = new Date(value)
                if not isNaN(x)
                    value = x
            where["#{primary_key}=$::#{type}"] = value
    return where
561
562
# Build the VALUES object for an insert/upsert from r.query, tagging each
# key with its postgres type (for casting) unless noCoerce or type unknown.
_user_set_query_values: (r) =>
    values = {}
    s = SCHEMA[r.db_table]
    for key, value of r.query
        type = pg_type(s?.fields?[key])
        if value? and type? and not s?.fields?[key]?.noCoerce
            if type == 'TIMESTAMP' and not misc.is_date(value)
                # (as above) Javascript is better at parsing its own dates than PostgreSQL
                x = new Date(value)
                if not isNaN(x)
                    value = x
            values["#{key}::#{type}"] = value
        else
            values[key] = value
    return values
577
578
# When any change hook is configured, load the row's current value into
# r.old_val (hooks receive it as old_val); requires all primary keys set.
_user_set_query_hooks_prepare: (r, cb) =>
    if r.on_change_hook? or r.before_change_hook? or r.instead_of_change_hook?
        for primary_key in r.primary_keys
            if not r.query[primary_key]?
                cb("FATAL: query must specify (primary) key '#{primary_key}'")
                return
        # get the old value before changing it
        # TODO: optimization -- can we restrict columns below?
        @_query
            query : "SELECT * FROM #{r.db_table}"
            where : @_user_set_query_where(r)
            cb    : one_result (err, x) =>
                r.old_val = x; cb(err)
    else
        cb()
593
594
# Count the rows of r.db_table matching the primary-key where clause of r;
# cb(err, n) with n the count.
_user_query_set_count: (r, cb) =>
    where = @_user_set_query_where(r)
    @_query
        query : "SELECT COUNT(*) FROM #{r.db_table}"
        where : where
        cb    : count_result(cb)
599
600
# Delete the rows of r.db_table matching the primary-key where clause of r.
_user_query_set_delete: (r, cb) =>
    where = @_user_set_query_where(r)
    @_query
        query : "DELETE FROM #{r.db_table}"
        where : where
        cb    : cb
605
606
# ON CONFLICT target for the upsert: the table's primary key columns.
_user_set_query_conflict: (r) =>
    r.primary_keys
608
609
# Insert r.query into r.db_table as an upsert (ON CONFLICT on the primary keys).
_user_query_set_upsert: (r, cb) =>
    # r.dbg("_user_query_set_upsert #{JSON.stringify(r.query)}")
    values = @_user_set_query_values(r)
    @_query
        query    : "INSERT INTO #{r.db_table}"
        values   : values
        conflict : @_user_set_query_conflict(r)
        cb       : cb
616
617
# Record is already in DB, so we update it:
# this function handles a case that involves both
# a jsonb_merge and an update.
_user_query_set_upsert_and_jsonb_merge: (r, cb) =>
    # Split r.query into jsonb fields (merged) and scalar fields (set).
    jsonb_merge = {}
    for k of r.json_fields
        v = r.query[k]
        if v?
            jsonb_merge[k] = v
    set = {}
    for k, v of r.query
        if k not in r.primary_keys and not jsonb_merge[k]?
            set[k] = v
    @_query
        query       : "UPDATE #{r.db_table}"
        jsonb_merge : jsonb_merge
        set         : set
        where       : @_user_set_query_where(r)
        cb          : cb
636
637
# Perform the actual database write for a set query: delete, hook,
# plain upsert, or the count-then-update dance for jsonb-merge fields.
_user_set_query_main_query: (r, cb) =>
    r.dbg("_user_set_query_main_query")

    if not r.client_query.set.allow_field_deletes
        # allow_field_deletes not set, so remove any null/undefined
        # fields from the query
        for key of r.query
            if not r.query[key]?
                delete r.query[key]

    if r.options.delete
        for primary_key in r.primary_keys
            if not r.query[primary_key]?
                cb("FATAL: delete query must set primary key")
                return
        r.dbg("delete based on primary key")
        @_user_query_set_delete(r, cb)
        return
    if r.instead_of_change_hook?
        # Hook completely replaces the database write.
        r.instead_of_change_hook(@, r.old_val, r.query, r.account_id, cb)
    else
        if misc.len(r.json_fields) == 0
            # easy case -- there are no jsonb merge fields; just do an upsert.
            @_user_query_set_upsert(r, cb)
            return
        # HARD CASE -- there are json_fields... so we are doing an insert
        # if the object isn't already in the database, and an update
        # if it is.  This is ugly because I don't know how to do both
        # a JSON merge as an upsert.
        cnt = undefined  # will equal number of records having the primary key (so 0 or 1)
        async.series([
            (cb) =>
                @_user_query_set_count r, (err, n) =>
                    cnt = n; cb(err)
            (cb) =>
                r.dbg("do the set query")
                if cnt == 0
                    # Just insert (do as upsert to avoid error in case of race)
                    @_user_query_set_upsert(r, cb)
                else
                    # Do as an update -- record is definitely already in db since cnt > 0.
                    # This would fail in the unlikely (but possible) case that somebody deletes
                    # the record between the above count and when we do the UPDATE.
                    # Using a transaction could avoid this.
                    # Maybe such an error is reasonable and it's good to report it as such.
                    @_user_query_set_upsert_and_jsonb_merge(r, cb)
        ], cb)
684
685
# Public entry point for a set query: parse/validate opts, then run the
# pipeline of permission checks, hooks, and the main write in series.
user_set_query: (opts) =>
    opts = defaults opts,
        account_id : undefined
        project_id : undefined
        table      : required
        query      : required
        options    : undefined   # options=[{delete:true}] is the only supported nontrivial option here.
        cb         : required    # cb(err)

    # TODO: it would be nice to return the primary key part of the created object on creation.
    # That's not implemented and will be somewhat nontrivial, and will use the RETURNING clause
    # of postgres's INSERT - https://www.postgresql.org/docs/current/sql-insert.html

    if @is_standby
        opts.cb("set queries against standby not allowed")
        return
    r = @_parse_set_query_opts(opts)

    # Only uncomment for debugging -- too big/verbose/dangerous
    # r.dbg("parsed query opts = #{JSON.stringify(r)}")

    if not r?  # nothing to do
        opts.cb()
        return
    if r.err
        opts.cb(r.err)
        return

    async.series([
        (cb) =>
            @_user_set_query_enforce_requirements(r, cb)
        (cb) =>
            if r.check_hook?
                r.check_hook(@, r.query, r.account_id, r.project_id, cb)
            else
                cb()
        (cb) =>
            @_user_set_query_hooks_prepare(r, cb)
        (cb) =>
            if r.before_change_hook?
                # before_change may signal (via stop) that everything is
                # already handled, which sets r.done and skips later steps.
                r.before_change_hook @, r.old_val, r.query, r.account_id, (err, stop) =>
                    r.done = stop
                    cb(err)
            else
                cb()
        (cb) =>
            if r.done
                cb()
                return
            if r.instead_of_query?
                opts1 = misc.copy_without(opts, ['cb', 'changes', 'table'])
                r.instead_of_query(@, opts1, cb)
            else
                @_user_set_query_main_query(r, cb)
        (cb) =>
            if r.done
                cb()
                return
            if r.on_change_hook?
                r.on_change_hook(@, r.old_val, r.query, r.account_id, cb)
            else
                cb()
    ], (err) => opts.cb(err))
748
749
# mod_fields counts the fields in query that might actually get modified
# in the database when we do the query; e.g., account_id won't since it gets
# filled in with the user's account_id, and project_write won't since it must
# refer to an existing project.  We use mod_field **only** to skip doing
# no-op queries.  It's just an optimization.
_mod_fields: (query, client_query) =>
    # Fields whose values are filled in automatically, so they never count
    # as a real modification.
    auto_filled = ['account_id', 'project_write']
    for field in misc.keys(query)
        if client_query.set.fields[field] not in auto_filled
            return true
    return false
759
760
# obj is an object returned from the database via a query.
# Postgres JSONB doesn't support timestamps, so we convert
# every json leaf node of obj that looks like JSON of a timestamp
# to a Javascript Date, in place, guided by the per-key spec in fields.
_user_get_query_json_timestamps: (obj, fields) =>
    for key, value of obj
        spec = fields[key]
        if spec
            obj[key] = misc.fix_json_dates(value, spec)
768
769
# fill in the default values for obj using the client_query spec.
# obj may be a single row or an array of rows; defaults that are plain
# objects are merged key-by-key into existing object values.
_user_get_query_set_defaults: (client_query, obj, fields) =>
    if not misc.is_array(obj)
        obj = [obj]
    else if obj.length == 0
        return
    s = client_query?.get?.fields ? {}
    for k in fields
        v = s[k]
        if v?
            # k is a field for which a default value (=v) is provided in the schema
            for x in obj
                # For each obj pulled from the database that is defined...
                if x?
                    # We check to see if the field k was set on that object.
                    y = x[k]
                    if not y?
                        # It was NOT set, so we deep copy the default value for the field k.
                        x[k] = misc.deep_copy(v)
                    else if typeof(v) == 'object' and typeof(y) == 'object' and not misc.is_array(v)
                        # y *is* defined and is an object, so we merge in the provided defaults.
                        for k0, v0 of v
                            if not y[k0]?
                                y[k0] = v0
793
794
# Validate the users map of a project set query and return the sanitized map.
# - ensures all keys of users are valid uuid's (though not that they are valid users).
# - and format is:
#    {group:'owner' or 'collaborator', hide:bool, upgrades:{a map}}
# with valid upgrade fields.
# Throws Error on any invalid field; returns undefined when obj.users is not set.
_user_set_query_project_users: (obj, account_id) =>
    dbg = @_dbg("_user_set_query_project_users")
    if not obj.users?
        # nothing to do -- not changing users.
        return
    ##dbg("disabled")
    ##return obj.users
    upgrade_fields = PROJECT_UPGRADES.params
    users = {}
    # TODO: we obviously should check that a user is only changing the part
    # of this object involving themselves... or adding/removing collaborators.
    # That is not currently done below.  TODO TODO TODO SECURITY.
    for id, x of obj.users
        # Entries with non-uuid keys are silently dropped from the result.
        if misc.is_valid_uuid_string(id)
            for key in misc.keys(x)
                if key not in ['group', 'hide', 'upgrades', 'ssh_keys']
                    # FIX: error message previously lacked the closing quote.
                    throw Error("unknown field '#{key}'")
            if x.group? and (x.group not in ['owner', 'collaborator'])
                throw Error("invalid value for field 'group'")
            if x.hide? and typeof(x.hide) != 'boolean'
                throw Error("invalid type for field 'hide'")
            if x.upgrades?
                if not misc.is_object(x.upgrades)
                    throw Error("invalid type for field 'upgrades'")
                for k,_ of x.upgrades
                    if not upgrade_fields[k]
                        throw Error("invalid upgrades field '#{k}'")
            if x.ssh_keys
                # do some checks.
                if not misc.is_object(x.ssh_keys)
                    throw Error("ssh_keys must be an object")
                for fingerprint, key of x.ssh_keys
                    if not key  # deleting
                        continue
                    if not misc.is_object(key)
                        throw Error("each key in ssh_keys must be an object")
                    for k, v of key
                        # the two dates are just numbers not actual timestamps...
                        if k not in ['title', 'value', 'creation_date', 'last_use_date']
                            throw Error("invalid ssh_keys field '#{k}'")
            users[id] = x
    return users
840
841
# Execute a project action request ({action:'start'|'stop'|'restart', time:...}):
# records started/finished (and err) timestamps on the action_request stored in
# the projects row, and drives the action via @projectControl.
project_action: (opts) =>
    opts = defaults opts,
        project_id     : required
        action_request : required   # action is object {action:?, time:?}
        cb             : required
    if opts.action_request.action == 'test'
        # used for testing -- shouldn't trigger anything to happen.
        opts.cb()
        return
    dbg = @_dbg("project_action(project_id='#{opts.project_id}',action_request=#{misc.to_json(opts.action_request)})")
    dbg()
    project = undefined
    action_request = misc.copy(opts.action_request)
    # Persist the (mutated) action_request back onto the projects row.
    set_action_request = (cb) =>
        dbg("set action_request to #{misc.to_json(action_request)}")
        @_query
            query     : "UPDATE projects"
            where     : 'project_id = $::UUID':opts.project_id
            jsonb_set : {action_request : action_request}
            cb        : cb
    async.series([
        (cb) =>
            action_request.started = new Date()
            set_action_request(cb)
        (cb) =>
            dbg("get project")
            try
                project = await @projectControl(opts.project_id)
                cb()
            catch err
                cb(err)
        (cb) =>
            dbg("doing action")
            try
                switch action_request.action
                    when 'restart'
                        await project.restart()
                    when 'stop'
                        await project.stop()
                    when 'start'
                        await project.start()
                    else
                        throw Error("FATAL: action '#{opts.action_request.action}' not implemented")
                cb()
            catch err
                cb(err)
    ], (err) =>
        # Always record completion (and any error) before calling back.
        if err
            action_request.err = err
        action_request.finished = new Date()
        dbg("finished!")
        set_action_request(opts.cb)
    )
894
895
# This hook is called *before* the user commits a change to a project in the database
# via a user set query.
# TODO: Add a pre-check here as well that total upgrade isn't going to be exceeded.
# This will avoid a possible subtle edge case if user is cheating and always somehow
# crashes server...?
_user_set_query_project_change_before: (old_val, new_val, account_id, cb) =>
    dbg = @_dbg("_user_set_query_project_change_before #{account_id}, #{misc.to_json(old_val)} --> #{misc.to_json(new_val)}")
    dbg()

    if new_val?.name and (new_val?.name != old_val?.name)
        # Changing or setting the name of the project to something nontrivial.
        try
            checkProjectName(new_val.name);
        catch err
            cb(err.toString())
            return
        if new_val.name
            # Setting name to something nontrivial, so we must check uniqueness
            # among all projects this user owns.
            result = await callback2 @_query,
                query : 'SELECT COUNT(*) FROM projects'
                where :
                    "users#>>'{#{account_id},group}' = $::TEXT" : 'owner'
                    "project_id != $::UUID" : new_val.project_id
                    "LOWER(name) = $::TEXT":new_val.name.toLowerCase()
            if result.rows[0].count > 0
                cb("There is already a project with the same owner as this project and name='#{new_val.name}'. Names are not case sensitive.")
                return
            # A second constraint is that only the project owner can change the project name.
            result = await callback2 @_query,
                query : 'SELECT COUNT(*) FROM projects'
                where :
                    "users#>>'{#{account_id},group}' = $::TEXT" : 'owner'
                    "project_id = $::UUID" : new_val.project_id
            if result.rows[0].count == 0
                cb("Only the owner of the project can currently change the project name.")
                return

    if new_val?.action_request? and JSON.stringify(new_val.action_request.time) != JSON.stringify(old_val?.action_request?.time)
        # Requesting an action, e.g., save, restart, etc.
        dbg("action_request -- #{misc.to_json(new_val.action_request)}")
        #
        # WARNING: Above, we take the difference of times below, since != doesn't work as we want with
        # separate Date objects, as it will say equal dates are not equal. Example:
        # coffee> x = JSON.stringify(new Date()); {from_json}=require('misc'); a=from_json(x); b=from_json(x); [a!=b, a-b]
        # [ true, 0 ]

        # Launch the action -- success or failure communicated back to all clients through changes to state.
        # Also, we don't have to worry about permissions here; that this function got called at all means
        # the user has write access to the projects table entry with given project_id, which gives them permission
        # to do any action with the project.
        @project_action
            project_id     : new_val.project_id
            action_request : misc.copy_with(new_val.action_request, ['action', 'time'])
            cb             : (err) =>
                dbg("action_request #{misc.to_json(new_val.action_request)} completed -- #{err}")
                # true means -- do nothing further. We don't want to the user to
                # set this same thing since we already dealt with it properly.
                cb(err, true)
        return

    if not new_val.users?  # not changing users
        cb(); return
    old_val = old_val?.users ? {}
    new_val = new_val?.users ? {}
    # FIX: previously this concatenated the new_val *object* itself (one bogus
    # element) instead of its keys, so allocations of users present only in
    # new_val were never checked below.  Iterate over the ids from both maps.
    for id in misc.keys(old_val).concat(misc.keys(new_val))
        if account_id != id
            # make sure user doesn't change anybody else's allocation
            if not lodash.isEqual(old_val?[id]?.upgrades, new_val?[id]?.upgrades)
                err = "FATAL: user '#{account_id}' tried to change user '#{id}' allocation toward a project"
                dbg(err)
                cb(err)
                return
    cb()
969
970
# This hook is called *after* the user commits a change to a project in the database
# via a user set query. It could undo changes the user isn't allowed to make, which
# might require doing various async calls, or take actions (e.g., setting quotas,
# starting projects, etc.).
_user_set_query_project_change_after: (old_val, new_val, account_id, cb) =>
    # Compares the upgrade allocation this particular user contributed to the
    # project before and after the commit; if it changed, re-validate the
    # user's total upgrade spending and re-apply quotas to the project.
    dbg = @_dbg("_user_set_query_project_change_after #{account_id}, #{misc.to_json(old_val)} --> #{misc.to_json(new_val)}")
    dbg()
    old_upgrades = old_val.users?[account_id]?.upgrades
    new_upgrades = new_val.users?[account_id]?.upgrades
    if new_upgrades? and not lodash.isEqual(old_upgrades, new_upgrades)
        dbg("upgrades changed for #{account_id} from #{misc.to_json(old_upgrades)} to #{misc.to_json(new_upgrades)}")
        project = undefined
        async.series([
            (cb) =>
                # Undo any allocation beyond what this user is actually entitled to.
                @ensure_user_project_upgrades_are_valid
                    account_id : account_id
                    cb         : cb
            (cb) =>
                # projectControl is optional (e.g., not configured in some deployments).
                if not @projectControl?
                    cb()
                else
                    dbg("get project")
                    try
                        project = await @projectControl(new_val.project_id)
                        cb()
                    catch err
                        cb(err)
            (cb) =>
                if not project?
                    cb()
                else
                    dbg("determine total quotas and apply")
                    try
                        await project.setAllQuotas()
                        cb()
                    catch err
                        cb(err)
        ], cb)
    else
        cb()
1010
1011
###
1012
GET QUERIES
1013
###
1014
1015
# Apply any functional substitutions declared in the schema: a field whose
# schema value is a function is replaced by the result of calling it with
# (query, db).  Note: mutates `query` in place.
_user_get_query_functional_subs: (query, fields) =>
    return if not fields?
    for field, val of fields when typeof(val) == 'function'
        query[field] = val(query, @)
1022
1023
_parse_get_query_opts: (opts) =>
    # Validate and normalize a user get query against the SCHEMA whitelist.
    # Returns {err} on any policy violation, otherwise an object with:
    # client_query, table (virtual tables resolved), primary_keys,
    # require_admin, json_fields.  May mutate opts.query (functional subs)
    # and opts.options (schema default options appended).
    if opts.changes? and not opts.changes.cb?
        return {err: "FATAL: user_get_query -- if opts.changes is specified, then opts.changes.cb must also be specified"}

    r = {}
    # get data about user queries on this table -- projects and user accounts
    # have separate query specs in the schema.
    if opts.project_id?
        r.client_query = SCHEMA[opts.table]?.project_query
    else
        r.client_query = SCHEMA[opts.table]?.user_query

    if not r.client_query?.get?
        return {err: "FATAL: get queries not allowed for table '#{opts.table}'"}

    # anonymous (no account and no project) access only for tables marked anonymous
    if not opts.account_id? and not opts.project_id? and not SCHEMA[opts.table].anonymous
        return {err: "FATAL: anonymous get queries not allowed for table '#{opts.table}'"}

    # virtual tables map onto a real underlying table
    r.table = SCHEMA[opts.table].virtual ? opts.table

    r.primary_keys = @_primary_keys(opts.table)

    # Are only admins allowed any get access to this table?
    r.require_admin = !!r.client_query.get.admin

    # Verify that all requested fields may be read by users
    for field in misc.keys(opts.query)
        if r.client_query.get.fields?[field] == undefined
            return {err: "FATAL: user get query not allowed for #{opts.table}.#{field}"}

    # Functional substitutions defined by schema
    @_user_get_query_functional_subs(opts.query, r.client_query.get?.fields)

    # instead_of_query tables bypass all of the SQL machinery below
    if r.client_query.get?.instead_of_query?
        return r

    # Sanity check: make sure there is something in the query
    # that gets only things in this table that this user
    # is allowed to see, or at least a check_hook. This is not required
    # for admins.
    if not r.client_query.get.pg_where? and not r.client_query.get.check_hook? and not r.require_admin
        return {err: "FATAL: user get query not allowed for #{opts.table} (no getAll filter - pg_where or check_hook)"}

    # Apply default options to the get query (don't impact changefeed)
    # The user can override these, e.g., if they were to want to explicitly increase a limit
    # to get more file use history.
    user_options = {}
    for x in opts.options
        for y, z of x
            user_options[y] = true

    get_options = undefined
    if @is_heavily_loaded() and r.client_query.get.options_load?
        get_options = r.client_query.get.options_load
    else if r.client_query.get.options?
        get_options = r.client_query.get.options
    if get_options?
        # complicated since options is a list of {opt:val} !
        for x in get_options
            for y, z of x
                if not user_options[y]
                    opts.options.push(x)
                    break

    r.json_fields = @_json_fields(opts.table, opts.query)
    return r
1088
1089
# _json_fields: map from JSONB/map field names (that appear in the query) to
# the array of subfields that should be parsed as timestamps.
# The keys of this map are also used by _user_query_set_upsert_and_jsonb_merge
# to determine JSON deep merging for set queries.
_json_fields: (table, query) =>
    result = {}
    for field, info of SCHEMA[table].fields
        continue if not (query[field]? or query[field] == null)
        if info.type == 'map' or info.pg_type == 'JSONB'
            result[field] = info.date ? []
    return result
1098
1099
_user_get_query_where: (client_query, account_id, project_id, user_query, table, cb) =>
    # Build the WHERE conditions (an array suitable for @_query) that restrict
    # a user get query to exactly the rows the given account/project may read,
    # based on the schema's pg_where spec.  Calls cb(err, pg_where).
    # NOTE: may mutate user_query via userGetQueryFilter (see its TODO).
    dbg = @_dbg("_user_get_query_where")
    dbg()

    pg_where = client_query.get.pg_where

    if @is_heavily_loaded() and client_query.get.pg_where_load?
        # use a different query if load is heavy
        pg_where = client_query.get.pg_where_load

    if not pg_where?
        pg_where = []
    if pg_where == 'projects'
        pg_where = ['projects']

    # pg_where may be a function of the query (and db) producing the array
    if typeof(pg_where) == 'function'
        pg_where = pg_where(user_query, @)
    if not misc.is_array(pg_where)
        cb("FATAL: pg_where must be an array (of strings or objects)")
        return

    # Do NOT mutate the schema itself!
    pg_where = misc.deep_copy(pg_where)

    # expand 'projects' in query, depending on whether project_id is specified or not.
    # This is just a convenience to make the db schema simpler.
    for i in [0...pg_where.length]
        if pg_where[i] == 'projects'
            if user_query.project_id
                pg_where[i] = {"project_id = $::UUID" : 'project_id'}
            else
                pg_where[i] = {"project_id = ANY(select project_id from projects where users ? $::TEXT)" : 'account_id'}

    # Now we fill in all the parametrized substitutions in the pg_where list.
    # subs maps each placeholder name ('account_id', 'project_id', ...) to the
    # concrete value to substitute; initially each maps to itself.
    subs = {}
    for x in pg_where
        if misc.is_object(x)
            for key, value of x
                subs[value] = value

    # Resolve one placeholder, doing the relevant access check as a side effect.
    sub_value = (value, cb) =>
        switch value
            when 'account_id'
                if not account_id?
                    cb('FATAL: account_id must be given')
                    return
                subs[value] = account_id
                cb()
            when 'project_id'
                if project_id?
                    # the caller *is* the project -- no further check needed
                    subs[value] = project_id
                    cb()
                else if not user_query.project_id
                    cb("FATAL: must specify project_id")
                else if SCHEMA[table].anonymous
                    subs[value] = user_query.project_id
                    cb()
                else
                    # a user querying a project's data must be owner/collaborator
                    @user_is_in_project_group
                        account_id : account_id
                        project_id : user_query.project_id
                        groups     : ['owner', 'collaborator']
                        cb         : (err, in_group) =>
                            if err
                                cb(err)
                            else if in_group
                                subs[value] = user_query.project_id
                                cb()
                            else
                                cb("FATAL: you do not have read access to this project")
            when 'project_id-public'
                # anonymous access allowed only if the project has a public path
                if not user_query.project_id?
                    cb("FATAL: must specify project_id")
                else
                    if SCHEMA[table].anonymous
                        @has_public_path
                            project_id : user_query.project_id
                            cb         : (err, has_public_path) =>
                                if err
                                    cb(err)
                                else if not has_public_path
                                    cb("project does not have any public paths")
                                else
                                    subs[value] = user_query.project_id
                                    cb()
                    else
                        cb("FATAL: table must allow anonymous queries")
            else
                cb()

    # Resolve all placeholders (in parallel), then substitute them in place
    # and append the user's own filter conditions.
    async.map misc.keys(subs), sub_value, (err) =>
        if err
            cb(err)
            return
        for x in pg_where
            if misc.is_object(x)
                for key, value of x
                    x[key] = subs[value]

        # impose further restrictions (more where conditions)
        pg_where.push(userGetQueryFilter(user_query, client_query))

        cb(undefined, pg_where)
1202
1203
# Parse the option part of a get query (limit / order_by / slice /
# only_changes), combining the user's options with any schema-level ones.
# Returns {limit?, order_by?, only_changes?} on success or {err} on failure.
_user_get_query_options: (options, multi, schema_options) =>
    all_options = if schema_options? then options.concat(schema_options) else options

    # Parse option part of the query
    {limit, order_by, slice, only_changes, err} = @_query_parse_options(all_options)

    return {err: err} if err
    parsed = {}
    parsed.only_changes = true if only_changes
    if limit?
        parsed.limit = limit
    else if not multi
        # a single-result query always gets LIMIT 1
        parsed.limit = 1
    parsed.order_by = order_by if order_by?
    return {err: "slice not implemented"} if slice?
    return parsed
1225
1226
_user_get_query_do_query: (query_opts, client_query, user_query, multi, json_fields, cb) =>
    # Run the actual SELECT for a user get query and post-process the rows:
    # parse JSONB timestamp fields into Date objects, fill in schema default
    # values, and strip null/undefined fields before calling cb(err, rows).
    query_opts.cb = all_results (err, x) =>
        if err
            cb(err)
        else
            if misc.len(json_fields) > 0
                # Convert timestamps to Date objects, if **explicitly** specified in the schema
                for obj in x
                    @_user_get_query_json_timestamps(obj, json_fields)

            if not multi
                x = x[0]
            # Fill in default values and remove null's
            @_user_get_query_set_defaults(client_query, x, misc.keys(user_query))
            # Get rid of undefined fields -- that's the default and wastes memory and bandwidth
            # NOTE(review): when multi is false, x is a single row object here,
            # so this `for obj in x` loop appears to be a no-op in that case --
            # confirm whether single results are meant to be cleaned too.
            if x?
                for obj in x
                    misc.map_mutate_out_undefined_and_null(obj)
            cb(undefined, x)
    @_query(query_opts)
1246
1247
# Build the "SELECT col1,col2,... FROM table" SQL text for a user get query.
_user_get_query_query: (table, user_query, remove_from_query) =>
    columns = @_user_get_query_columns(user_query, remove_from_query)
    return "SELECT #{(quote_field(col) for col in columns).join(',')} FROM #{table}"
1249
1250
_user_get_query_satisfied_by_obj: (user_query, obj, possible_time_fields) =>
    # Return true exactly when obj satisfies every constraint in user_query:
    # comparison operators ({'>=': ...} etc., per queryIsCmp) and plain
    # equality.  Used as an in-memory where-test for changefeed objects.
    #dbg = @_dbg("_user_get_query_satisfied_by_obj")
    #dbg(user_query, obj)
    for field, value of obj
        date_keys = possible_time_fields[field]
        if date_keys
            # normalize JSON-encoded dates so comparisons below behave
            value = misc.fix_json_dates(value, date_keys)
        if (q = user_query[field])?
            if (op = queryIsCmp(q))
                #dbg(value:value, op: op, q:q)
                x = q[op]
                switch op
                    when '=='
                        if value != x
                            return false
                    when '!='
                        if value == x
                            return false
                    when '>='
                        if value < x
                            return false
                    when '<='
                        if value > x
                            return false
                    when '>'
                        if value <= x
                            return false
                    when '<'
                        if value >= x
                            return false
            else if value != q
                # plain value in the query means exact equality
                return false
    return true
1283
1284
# Handle null fields on an update event.  If the schema opts in via
# allow_field_deletes, nulls pass through untouched; otherwise every
# null/undefined entry is stripped from new_val.  Right now we just can't
# support field deletes in general due to default values.  TODO: completely
# get rid of default values (?) or figure out how to implement this; the
# symptom of not stripping is that a normal user can e.g. delete the users
# field of their projects.  Not good.
_user_get_query_handle_field_deletes: (client_query, new_val) =>
    return if client_query.get.allow_field_deletes
    for k of new_val when not new_val[k]?
        delete new_val[k]
1297
1298
_user_get_query_changefeed: (changes, table, primary_keys, user_query,
                             where, json_fields, account_id, client_query, cb) =>
    # Set up a changefeed for a user get query: validate the changes spec,
    # decide which columns to watch vs select, expand any schema-defined
    # pg_changefeed behavior (custom in-memory where-test plus project /
    # collaborator trackers), then create the feed and register it under
    # changes.id in @_changefeeds.
    dbg = @_dbg("_user_get_query_changefeed(table='#{table}')")
    dbg()
    # WARNING: always call changes.cb! Do not do something like f = changes.cb, then call f!!!!
    # This is because the value of changes.cb may be changed by the caller.
    if not misc.is_object(changes)
        cb("FATAL: changes must be an object with keys id and cb")
        return
    if not misc.is_valid_uuid_string(changes.id)
        cb("FATAL: changes.id must be a uuid")
        return
    if typeof(changes.cb) != 'function'
        cb("FATAL: changes.cb must be a function")
        return
    for primary_key in primary_keys
        if not user_query[primary_key]? and user_query[primary_key] != null # TODO: this seems slightly off
            cb("FATAL: changefeed MUST include primary key (='#{primary_key}') in query")
            return
    watch  = []
    select = {}
    init_tracker = tracker = free_tracker = undefined
    possible_time_fields = misc.deep_copy(json_fields)
    feed = undefined

    # Fields queried with null (and not primary keys) are "watched" for
    # changes; everything else is selected directly.
    for field, val of user_query
        type = pg_type(SCHEMA[table]?.fields?[field])
        if type == 'TIMESTAMP'
            possible_time_fields[field] = 'all'
        if val == null and field not in primary_keys
            watch.push(field)
        else
            select[field] = type

    if misc.len(possible_time_fields) > 0
        # Convert (likely) timestamps to Date objects; fill in defaults for inserts
        process = (x) =>
            if not x?
                return
            if x.new_val?
                @_user_get_query_json_timestamps(x.new_val, possible_time_fields)
                if x.action == 'insert'  # do not do this for delete or update actions!
                    @_user_get_query_set_defaults(client_query, x.new_val, misc.keys(user_query))
                else if x.action == 'update'
                    @_user_get_query_handle_field_deletes(client_query, x.new_val)
            if x.old_val?
                @_user_get_query_json_timestamps(x.old_val, possible_time_fields)
    else
        process = (x) =>
            if not x?
                return
            if x.new_val?
                if x.action == 'insert'  # do not do this for delete or update actions!
                    @_user_get_query_set_defaults(client_query, x.new_val, misc.keys(user_query))
                else if x.action == 'update'
                    @_user_get_query_handle_field_deletes(client_query, x.new_val)

    async.series([
        (cb) =>
            # check for alternative where test for changefeed.
            pg_changefeed = client_query?.get?.pg_changefeed
            if not pg_changefeed?
                cb(); return

            if pg_changefeed == 'projects'
                tracker_add    = (project_id) => feed.insert({project_id:project_id})
                tracker_remove = (project_id) => feed.delete({project_id:project_id})
                pg_changefeed = (db, account_id) =>
                    where : (obj) =>
                        # Check that this is a project we have read access to
                        if not db._project_and_user_tracker?.get_projects(account_id)[obj.project_id]
                            return false
                        # Now check our actual query conditions on the object.
                        # This would normally be done by the changefeed, but since
                        # we are passing in a custom where, we have to do it.
                        if not @_user_get_query_satisfied_by_obj(user_query, obj, possible_time_fields)
                            return false
                        return true

                    select : {'project_id':'UUID'}

                    init_tracker : (tracker) =>
                        tracker.on "add_user_to_project-#{account_id}", tracker_add
                        tracker.on "remove_user_from_project-#{account_id}", tracker_remove

                    free_tracker : (tracker) =>
                        dbg("freeing project tracker events")
                        tracker.removeListener("add_user_to_project-#{account_id}", tracker_add)
                        tracker.removeListener("remove_user_from_project-#{account_id}", tracker_remove)

            else if pg_changefeed == 'news'
                pg_changefeed = ->
                    where : (obj) ->
                        if obj.date?
                            date_obj = new Date(obj.date)
                            # we send future news items to the frontend, but filter it based on the server time
                            return date_obj >= misc.months_ago(3)
                        else
                            return true
                    select : {id: 'SERIAL UNIQUE', date: 'TIMESTAMP'}

            else if pg_changefeed == 'one-hour'
                pg_changefeed = ->
                    where : (obj) ->
                        if obj.time?
                            return new Date(obj.time) >= misc.hours_ago(1)
                        else
                            return true
                    select : {id:'UUID', time:'TIMESTAMP'}

            else if pg_changefeed == 'five-minutes'
                pg_changefeed = ->
                    where : (obj) ->
                        if obj.time?
                            return new Date(obj.time) >= misc.minutes_ago(5)
                        else
                            return true
                    select : {id:'UUID', time:'TIMESTAMP'}

            else if pg_changefeed == 'collaborators'
                if not account_id?
                    cb("FATAL: account_id must be given")
                    return
                tracker_add    = (collab_id) => feed.insert({account_id:collab_id})
                tracker_remove = (collab_id) => feed.delete({account_id:collab_id})
                pg_changefeed = (db, account_id) ->
                    shared_tracker = undefined
                    where : (obj) ->  # test of "is a collab with me"
                        return shared_tracker.get_collabs(account_id)?[obj.account_id]
                    init_tracker : (tracker) =>
                        shared_tracker = tracker
                        tracker.on "add_collaborator-#{account_id}", tracker_add
                        tracker.on "remove_collaborator-#{account_id}", tracker_remove
                    free_tracker : (tracker) =>
                        dbg("freeing collab tracker events")
                        tracker.removeListener("add_collaborator-#{account_id}", tracker_add)
                        tracker.removeListener("remove_collaborator-#{account_id}", tracker_remove)

            x = pg_changefeed(@, account_id)
            if x.init_tracker?
                init_tracker = x.init_tracker
            if x.free_tracker?
                free_tracker = x.free_tracker
            if x.select?
                for k, v of x.select
                    select[k] = v

            if x.where? or x.init_tracker?
                where = x.where
                if not account_id?
                    cb()
                    return
                # initialize user tracker is needed for where tests...
                @project_and_user_tracker cb : (err, _tracker) =>
                    if err
                        cb(err)
                    else
                        tracker = _tracker
                        try
                            await tracker.register(account_id)
                            cb()
                        catch err
                            cb(err)
            else
                cb()
        (cb) =>
            @changefeed
                table  : table
                select : select
                where  : where
                watch  : watch
                cb     : (err, _feed) =>
                    if err
                        cb(err)
                        return
                    feed = _feed
                    feed.on 'change', (x) ->
                        process(x)
                        changes.cb(undefined, x)
                    feed.on 'close', ->
                        changes.cb(undefined, {action:'close'})
                        dbg("feed close")
                        # FIX: guard the "do NOT free_tracker" log with else so
                        # exactly one of the two messages is emitted per close.
                        if tracker? and free_tracker?
                            dbg("free_tracker")
                            free_tracker(tracker)
                        else
                            dbg("do NOT free_tracker")
                    feed.on 'error', (err) ->
                        changes.cb("feed error - #{err}")
                    @_changefeeds ?= {}
                    @_changefeeds[changes.id] = feed
                    init_tracker?(tracker)
                    # Any tracker error means this changefeed is now broken and
                    # has to be recreated.
                    tracker?.once 'error', (err) ->
                        changes.cb("tracker error - #{err}")
                    cb()
    ], cb)
1497
1498
user_get_query: (opts) =>
    opts = defaults opts,
        account_id : undefined
        project_id : undefined
        table      : required
        query      : required
        multi      : required
        options    : required   # used for initial query; **IGNORED** by changefeed,
                                # which ensures that *something* is sent every n minutes, in case no
                                # changes are coming out of the changefeed. This is an additional
                                # measure in case the client somehow doesn't get a "this changefeed died" message.
                                # Use [{delete:true}] to instead delete the selected records (must
                                # have delete:true in schema).
        changes    : undefined  # {id:?, cb:?}
        cb         : required   # cb(err, result)
    ###
    The general idea is that user get queries are of the form

        SELECT [columns] FROM table WHERE [get_all] AND [further restrictions] LIMIT/slice

    Using the whitelist rules specified in SCHEMA, we
    determine each of the above, then run the query.

    If no error in query, and changes is a given uuid, set up a change
    feed that calls opts.cb on changes as well.
    ###
    id = misc.uuid().slice(0,6)
    #dbg = @_dbg("user_get_query(id=#{id})")
    dbg = ->  # Logging below is just too verbose, and turns out to not be useful...
    dbg("account_id='#{opts.account_id}', project_id='#{opts.project_id}', query=#{misc.to_json(opts.query)}, multi=#{opts.multi}, options=#{misc.to_json(opts.options)}, changes=#{misc.to_json(opts.changes)}")
    {err, table, client_query, require_admin, primary_keys, json_fields} = @_parse_get_query_opts(opts)

    if err
        dbg("error parsing query opts -- #{err}")
        opts.cb(err)
        return

    _query_opts = {}  # this will be the input to the @_query command.
    locals =
        result     : undefined
        changes_cb : undefined

    async.series([
        (cb) =>
            # schema-defined access check, if any
            if client_query.get.check_hook?
                dbg("do check hook")
                client_query.get.check_hook(@, opts.query, opts.account_id, opts.project_id, cb)
            else
                cb()
        (cb) =>
            if require_admin
                dbg('require admin')
                @_require_is_admin(opts.account_id, cb)
            else
                cb()
        (cb) =>
            # NOTE: _user_get_query_where may mutate opts.query (for 'null' params)
            # so it is important that this is called before @_user_get_query_query below.
            # See the TODO in userGetQueryFilter.
            dbg("get_query_where")
            @_user_get_query_where client_query, opts.account_id, opts.project_id, opts.query, opts.table, (err, where) =>
                _query_opts.where = where
                cb(err)
        (cb) =>
            if client_query.get.instead_of_query?
                cb();
                return
            # assemble SELECT + options, then (optionally) start the changefeed
            _query_opts.query = @_user_get_query_query(table, opts.query, client_query.get.remove_from_query)
            x = @_user_get_query_options(opts.options, opts.multi, client_query.options)
            if x.err
                dbg("error in get_query_options, #{x.err}")
                cb(x.err)
                return
            misc.merge(_query_opts, x)

            nestloop = SCHEMA[opts.table]?.pg_nestloop  # true, false or undefined
            if typeof nestloop == 'boolean'
                val = if nestloop then 'on' else 'off'
                _query_opts.pg_params = {enable_nestloop : val}

            indexscan = SCHEMA[opts.table]?.pg_indexscan  # true, false or undefined
            # NOTE(review): if a table sets both pg_nestloop and pg_indexscan,
            # this assignment replaces the pg_params object set just above,
            # dropping enable_nestloop -- confirm whether that is intended.
            if typeof indexscan == 'boolean'
                val = if indexscan then 'on' else 'off'
                _query_opts.pg_params = {enable_indexscan : val}

            if opts.changes?
                # Queue up change events until the initial query result has
                # been delivered -- see the note at the bottom of this file.
                locals.changes_cb = opts.changes.cb
                locals.changes_queue = []
                # see note about why we do the following at the bottom of this file
                opts.changes.cb = (err, obj) ->
                    locals.changes_queue.push({err:err, obj:obj})
                dbg("getting changefeed")
                @_user_get_query_changefeed(opts.changes, table, primary_keys,
                                            opts.query, _query_opts.where, json_fields,
                                            opts.account_id, client_query, cb)
            else
                cb()

        (cb) =>
            if client_query.get.instead_of_query?
                if opts.changes?
                    cb("changefeeds are not supported for querying this table")
                    return
                # Custom version: instead of doing a full query, we instead
                # call a function and that's it.
                dbg("do instead_of_query instead")
                opts1 = misc.copy_without(opts, ['cb', 'changes', 'table'])
                client_query.get.instead_of_query @, opts1, (err, result) =>
                    locals.result = result
                    cb(err)
                return

            if _query_opts.only_changes
                dbg("skipping query")
                locals.result = undefined
                cb()
            else
                dbg("finally doing query")
                @_user_get_query_do_query _query_opts, client_query, opts.query, opts.multi, json_fields, (err, result) =>
                    if err
                        cb(err)
                        return
                    locals.result = result
                    cb()
    ], (err) =>
        if err
            dbg("series failed -- err=#{err}")
            opts.cb(err)
            return
        dbg("series succeeded")
        opts.cb(undefined, locals.result)
        if opts.changes?
            # restore the real callback and flush everything that arrived
            # while the initial query was running
            dbg("sending change queue")
            opts.changes.cb = locals.changes_cb
            ##dbg("sending queued #{JSON.stringify(locals.changes_queue)}")
            for {err, obj} in locals.changes_queue
                ##dbg("sending queued changes #{JSON.stringify([err, obj])}")
                opts.changes.cb(err, obj)
    )
1637
1638
###
Synchronized strings
###
_user_set_query_syncstring_change_after: (old_val, new_val, account_id, cb) =>
    # Post-commit hook for the syncstrings table.  Deliberately calls cb()
    # first; the project wake-up below is fire-and-forget in the background.
    dbg = @_dbg("_user_set_query_syncstring_change_after")
    cb() # return immediately -- stuff below can happen as side effect in the background.
    # Now do the following reactions to this syncstring change in the background:
    # 1. Awaken the relevant project.
    project_id = old_val?.project_id ? new_val?.project_id
    if project_id? and (new_val?.save?.state == 'requested' or (new_val?.last_active? and new_val?.last_active != old_val?.last_active))
        dbg("awakening project #{project_id}")
        awaken_project(@, project_id)
1650
1651
1652
# Verify that writing a patch is allowed: the patch timestamp must not be
# too far in the future, and the writer needs write access to the syncstring.
_user_set_query_patches_check: (obj, account_id, project_id, cb) =>
    skew = obj.time - new Date()
    if skew > MAX_PATCH_FUTURE_MS
        cb("clock") # this exact error is assumed in synctable!
    else
        @_syncstring_access_check(obj.string_id, account_id, project_id, cb)
1660
1661
# Verify that reading patches is allowed.
_user_get_query_patches_check: (obj, account_id, project_id, cb) =>
    # Write access (no notion of read only yet -- will be easy to add later)
    @_syncstring_access_check(obj.string_id, account_id, project_id, cb)
1665
1666
# Verify that writing cursor positions is allowed (same check as patches).
_user_set_query_cursors_check: (obj, account_id, project_id, cb) =>
    @_syncstring_access_check(obj.string_id, account_id, project_id, cb)
1669
1670
# Verify that reading cursor positions is allowed (same check as patches).
_user_get_query_cursors_check: (obj, account_id, project_id, cb) =>
    @_syncstring_access_check(obj.string_id, account_id, project_id, cb)
1673
1674
_syncstring_access_check: (string_id, account_id, project_id, cb) =>
    # Check that string_id is the id of a syncstring the given account_id or
    # project_id is allowed to write to. NOTE: We do not concern ourselves (for now at least)
    # with proof of identity (i.e., one user with full read/write access to a project
    # claiming they are another users of that SAME project), since our security model
    # is that any user of a project can edit anything there. In particular, the
    # synctable lets any user with write access to the project edit the users field.
    if string_id?.length != 40
        cb("FATAL: string_id (='#{string_id}') must be a string of length 40")
        return
    @_query
        query : "SELECT project_id FROM syncstrings"
        where : "string_id = $::CHAR(40)" : string_id
        cache : false  # *MUST* leave as false (not true), since unfortunately, if this returns no, due to FATAL below this would break opening the file until cache clears.
        cb    : one_result 'project_id', (err, x) =>
            if err
                cb(err)
            else if not x
                # There is no such syncstring with this id -- fail
                cb("FATAL: no such syncstring")
            else if account_id?
                # Attempt to read or write by a user browser client:
                # must be owner or collaborator on the owning project.
                @_require_project_ids_in_groups(account_id, [x], ['owner', 'collaborator'], cb)
            else if project_id?
                # Attempt to read or write by a *project*: only its own syncstrings.
                if project_id == x
                    cb()
                else
                    cb("FATAL: project not allowed to write to syncstring in different project")
1703
1704
1705
# Check permissions for querying for syncstrings in a project
_syncstrings_check: (obj, account_id, project_id, cb) =>
    #dbg = @dbg("_syncstrings_check")
    #dbg(misc.to_json([obj, account_id, project_id]))
    if not misc.is_valid_uuid_string(obj?.project_id)
        cb("FATAL: project_id (='#{obj?.project_id}') must be a valid uuid")
        return
    if project_id?
        if project_id == obj.project_id
            # The project can access its own syncstrings
            cb()
        else
            cb("FATAL: projects can only access their own syncstrings") # for now at least!
        return
    if account_id?
        # Access request by a client user: must be owner or collaborator.
        @_require_project_ids_in_groups(account_id, [obj.project_id], ['owner', 'collaborator'], cb)
    else
        cb("FATAL: only users and projects can access syncstrings")
1724
1725
# Other functions that are needed to implement various user queries,
# e.g., for virtual queries like file_use_times.
# ASYNC FUNCTION with no callback.
file_use_times: (opts) => # for docs, see where this is imported from.
    # Thin wrapper delegating to the imported file_use_times helper.
    return await file_use_times(@, opts)
1730
1731
# ASYNC FUNCTION with no callback -- thin wrapper delegating to the
# imported updateRetentionData helper.
updateRetentionData: (opts) =>
    return await updateRetentionData(opts)
1733
1734
# Module-level helper (not a class method): make sure the given project is
# running and that this hub has a connection to it.  cb is optional;
# cb(err) is called when done (or immediately on a throttled/skipped call).
_last_awaken_time = {}
awaken_project = (db, project_id, cb) ->
    # throttle so that this gets called *for a given project* at most once every 30s.
    now = new Date()
    if _last_awaken_time[project_id]? and now - _last_awaken_time[project_id] < 30000
        # FIX: don't silently drop an optional callback when throttled
        cb?()
        return
    _last_awaken_time[project_id] = now
    dbg = db._dbg("_awaken_project(project_id=#{project_id})")
    if not db.projectControl?
        dbg("skipping since no projectControl defined")
        # FIX: likewise report completion when there is nothing to do
        cb?()
        return
    dbg("doing it...")
    async.series([
        (cb) ->
            try
                project = db.projectControl(project_id)
                await project.start()
                cb()
            catch err
                cb("error starting project = #{err}")
        (cb) ->
            if not db.ensure_connection_to_project?
                cb()
                return
            dbg("also make sure there is a connection from hub to project")
            # This is so the project can find out that the user wants to save a file (etc.)
            db.ensure_connection_to_project(project_id, cb)
    ], (err) ->
        if err
            dbg("awaken project error -- #{err}")
        else
            dbg("success awakening project")
        cb?(err)
    )
1768
###
1769
Note about opts.changes.cb:
1770
1771
Regarding sync, what was happening I think is:
1772
- (a) https://github.com/sagemathinc/cocalc/blob/master/src/packages/hub/postgres-user-queries.coffee#L1384 starts sending changes
1773
- (b) https://github.com/sagemathinc/cocalc/blob/master/src/packages/hub/postgres-user-queries.coffee#L1393 sends the full table.
1774
1775
(a) could result in changes actually getting to the client before the table itself has been initialized. The client code assumes that it only gets changes *after* the table is initialized. The browser client seems to be smart enough that it detects this situation and resets itself, so the browser never gets messed up as a result.
1776
However, the project definitely does NOT do so well, and it can get messed up. Then it has a broken version of the table, missing some last minute change. It is broken until the project forgets about that table entirely, which can be a pretty long time (or project restart).
1777
1778
My fix is to queue up those changes on the server, then only start sending them to the client **after** the (b) query is done. I tested this by using setTimeout to manually delay (b) for a few seconds, and fully seeing the "file won't save problem". The other approach would make it so clients are more robust against getting changes first. However, it would take a long time for all clients to update (restart all projects), and it's an annoying assumption to make in general -- we may have entirely new clients later and they could make the same bad assumptions about order...
1779
###
1780
1781