GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/database/postgres-base.coffee

#########################################################################
# This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
# License: MS-RSL – see LICENSE.md for details
#########################################################################

# PostgreSQL -- basic queries and database interface

exports.DEBUG = true

# If the database connection is non-responsive but no error is raised directly
# by the db client, then we will know and fix it, rather than just sitting there...
DEFAULT_TIMEOUT_MS = 60000

# Do not test for non-responsiveness until a while after the initial connection is
# established, since things tend to work initially, *but* may also be much
# slower, due to tons of clients simultaneously connecting to the DB.
DEFAULT_TIMEOUT_DELAY_MS = DEFAULT_TIMEOUT_MS * 4

QUERY_ALERT_THRESH_MS = 5000

consts = require('./consts')
DEFAULT_STATEMENT_TIMEOUT_MS = consts.STATEMENT_TIMEOUT_MS

EventEmitter = require('events')

fs = require('fs')
async = require('async')
escapeString = require('sql-string-escape')
validator = require('validator')
{callback2} = require('@cocalc/util/async-utils')

LRU = require('lru-cache')

pg = require('pg')

winston = require('@cocalc/backend/logger').getLogger('postgres')
{do_query_with_pg_params} = require('./postgres/set-pg-params')

{ syncSchema } = require('./postgres/schema')
{ pgType } = require('./postgres/schema/pg-type')
{ quoteField } = require('./postgres/schema/util')
{ primaryKey, primaryKeys } = require('./postgres/schema/table')

misc_node = require('@cocalc/backend/misc_node')
{ sslConfigToPsqlEnv, pghost, pgdatabase, pguser, pgssl } = require("@cocalc/backend/data")

{defaults} = misc = require('@cocalc/util/misc')
required = defaults.required

{SCHEMA, client_db} = require('@cocalc/util/schema')

metrics = require('@cocalc/backend/metrics')

exports.PUBLIC_PROJECT_COLUMNS = ['project_id', 'last_edited', 'title', 'description', 'deleted', 'created', 'env']
exports.PROJECT_COLUMNS = ['users'].concat(exports.PUBLIC_PROJECT_COLUMNS)

dbPassword = require('@cocalc/database/pool/password').default

# Emits a 'connect' event whenever we successfully connect to the database,
# and 'disconnect' when the connection to postgres fails.
class exports.PostgreSQL extends EventEmitter
    constructor: (opts) ->

        super()
        opts = defaults opts,
            host             : pghost     # DEPRECATED: or 'hostname:port' or 'host1,host2,...' (multiple hosts) -- TODO -- :port only works for one host.
            database         : pgdatabase
            user             : pguser
            ssl              : pgssl
            debug            : exports.DEBUG
            connect          : true
            password         : undefined
            cache_expiry     : 5000   # expire cached queries after this many milliseconds;
                                      # keep this very short -- it's just meant to reduce the impact of a bunch of
                                      # identical permission checks in a single user query.
            cache_size       : 300    # cache this many queries; use @_query(cache:true, ...) to cache a result
            concurrent_warn  : 500
            concurrent_heavily_loaded : 70 # when concurrent hits this, consider load "heavy"; this changes how some queries behave, to be faster but provide less info
            ensure_exists    : true   # ensure database exists on startup (runs psql in a shell)
            timeout_ms       : DEFAULT_TIMEOUT_MS # **IMPORTANT: if *any* query takes this long, the entire connection is terminated and recreated!**
            timeout_delay_ms : DEFAULT_TIMEOUT_DELAY_MS # Only reconnect on timeout this many ms after connect. Motivation: on initial startup queries may take much longer due to competition with other clients.
        @setMaxListeners(0)  # because of a potentially large number of changefeeds
        @_state = 'init'
        @_debug = opts.debug
        @_timeout_ms = opts.timeout_ms
        @_timeout_delay_ms = opts.timeout_delay_ms
        @_ensure_exists = opts.ensure_exists
        @_init_test_query()
        dbg = @_dbg("constructor")  # must be after setting @_debug above
        dbg(opts)
        i = opts.host.indexOf(':')
        if i != -1
            @_host = opts.host.slice(0, i)
            @_port = parseInt(opts.host.slice(i+1))
        else
            @_host = opts.host
            @_port = 5432
        @_concurrent_warn = opts.concurrent_warn
        @_concurrent_heavily_loaded = opts.concurrent_heavily_loaded
        @_user = opts.user
        @_database = opts.database
        @_ssl = opts.ssl
        @_password = opts.password ? dbPassword()
        @_init_metrics()

        if opts.cache_expiry and opts.cache_size
            @_query_cache = new LRU({max:opts.cache_size, ttl: opts.cache_expiry})
        if opts.connect
            @connect()  # start trying to connect
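
    # Usage sketch (illustrative only, not part of the API -- assumes the
    # standard PG* environment variables, or pass host/database/user explicitly):
    #
    #   {PostgreSQL} = require('./postgres-base')
    #   db = new PostgreSQL(connect:false)
    #   db.connect
    #       max_time : 30000    # give up after 30s instead of retrying forever
    #       cb       : (err) ->
    #           if err then console.warn("db is down -- #{err}")
    #           else console.log("connected; engine=#{db.engine()}")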

    clear_cache: =>
        @_query_cache?.clear()

    close: =>
        if @_state == 'closed'
            return  # nothing to do
        @_close_test_query()
        @_state = 'closed'
        @emit('close')
        @removeAllListeners()
        if @_clients?
            for client in @_clients
                client.removeAllListeners()
                client.end()
            delete @_clients

    ###
    If @_timeout_ms is set, then we periodically do a simple test query,
    to ensure that the database connection is working and responding to queries.
    If the query below times out, then the connection will get recreated.
    ###
    _do_test_query: =>
        dbg = @_dbg('test_query')
        dbg('starting')
        @_query
            query : 'SELECT NOW()'
            cb    : (err, result) =>
                dbg("finished", err, result)

    _init_test_query: =>
        if not @_timeout_ms
            return
        @_test_query = setInterval(@_do_test_query, @_timeout_ms)

    _close_test_query: =>
        if @_test_query?
            clearInterval(@_test_query)
            delete @_test_query

    engine: -> 'postgresql'

    connect: (opts) =>
        opts = defaults opts,
            max_time : undefined   # set to something shorter to not try forever
                                   # Only the first max_time is used.
            cb       : undefined
        if @_state == 'closed'
            opts.cb?("closed")
            return
        dbg = @_dbg("connect")
        if @_clients?
            dbg("already connected")
            opts.cb?()
            return
        if @_connecting?
            dbg('already trying to connect')
            @_connecting.push(opts.cb)
            # keep several times the db-concurrent-warn limit of callbacks
            max_connecting = 5 * @_concurrent_warn
            while @_connecting.length > max_connecting
                @_connecting.shift()
                dbg("WARNING: still no DB available, dropping old callbacks (limit: #{max_connecting})")
            return
        dbg('will try to connect')
        @_state = 'init'
        if opts.max_time
            dbg("for up to #{opts.max_time}ms")
        else
            dbg("until successful")
        @_connecting = [opts.cb]
        misc.retry_until_success
            f           : @_connect
            max_delay   : 10000
            max_time    : opts.max_time
            start_delay : 500 + 500*Math.random()
            log         : dbg
            cb          : (err) =>
                v = @_connecting
                delete @_connecting
                for cb in v
                    cb?(err)
                if not err
                    @_state = 'connected'
                    @emit('connect')

    disconnect: () =>
        if @_clients?
            for client in @_clients
                client.end()
                client.removeAllListeners()
            delete @_clients

    is_connected: () =>
        return @_clients? and @_clients.length > 0

    _connect: (cb) =>
        dbg = @_dbg("_connect")
        dbg("connect to #{@_host}")
        @_clear_listening_state()   # definitely not listening
        if @_clients?
            @disconnect()
        locals =
            clients : []
            hosts   : []
        @_connect_time = 0
        @_concurrent_queries = 0  # can't be any going on now.
        async.series([
            (cb) =>
                if @_ensure_exists
                    dbg("first make sure db exists")
                    @_ensure_database_exists(cb)
                else
                    dbg("assuming database exists")
                    cb()
            (cb) =>
                if not @_host   # undefined if @_host=''
                    locals.hosts = [undefined]
                    cb()
                    return
                if @_host.indexOf('/') != -1
                    dbg("using a local socket file (not a hostname)")
                    locals.hosts = [@_host]
                    cb()
                    return
                f = (host, cb) =>
                    hostname = host.split(':')[0]
                    winston.debug("Looking up ip addresses of #{hostname}")
                    require('dns').lookup hostname, {all:true}, (err, ips) =>
                        if err
                            winston.debug("Got #{hostname} --> err=#{err}")
                            # NON-FATAL -- we just don't include these and hope to
                            # have at least one working host in total...
                            cb()
                        else
                            winston.debug("Got #{hostname} --> #{JSON.stringify(ips)}")
                            # In kubernetes the stateful set service just has
                            # lots of ip addresses.  We connect to *all* of them,
                            # and spread queries across them equally.
                            for x in ips
                                locals.hosts.push(x.address)
                            cb()
                async.map(@_host.split(','), f, (err) => cb(err))
            (cb) =>
                dbg("connecting to #{JSON.stringify(locals.hosts)}...")
                if locals.hosts.length == 0
                    dbg("locals.hosts has length 0 -- no available db")
                    cb("no databases available")
                    return

                dbg("create client and start connecting...")
                locals.clients = []

                # Use a function to initialize the client, to avoid any issues with the scope of "client" below.
                # Ref: https://node-postgres.com/apis/client
                init_client = (host) =>
                    client = new pg.Client
                        user     : @_user
                        host     : host
                        port     : @_port
                        password : @_password
                        database : @_database
                        ssl      : @_ssl
                        statement_timeout: DEFAULT_STATEMENT_TIMEOUT_MS  # we set a statement_timeout, to avoid queries locking up PG
                    if @_notification?
                        client.on('notification', @_notification)
                    onError = (err) =>
                        # only listen once for error; after that we've
                        # killed the connection and don't care.
                        client.removeListener('error', onError)
                        if @_state == 'init'
                            # already started connecting
                            return
                        @emit('disconnect')
                        dbg("error -- #{err}")
                        @disconnect()
                        @connect()  # start trying to reconnect
                    client.on('error', onError)
                    client.setMaxListeners(0)  # there is one emitter for each concurrent query... (see query_cb)
                    locals.clients.push(client)

                for host in locals.hosts
                    init_client(host)

                # Connect the clients.  If at least one succeeds, we use this.
                # If none succeed, we declare failure.
                # Obviously, this is NOT optimal -- it's just hopefully sufficiently robust/works.
                # I'm going to redo this with experience.
                locals.clients_that_worked = []
                locals.errors = []
                f = (client, c) =>
                    try
                        await client.connect()
                        locals.clients_that_worked.push(client)
                    catch err
                        locals.errors.push(err)
                    c()
                async.map locals.clients, f, () =>
                    if locals.clients_that_worked.length == 0
                        console.warn("ALL clients failed", locals.errors)
                        dbg("ALL clients failed", locals.errors)
                        cb("ALL clients failed to connect")
                    else
                        # take what we got
                        if locals.clients.length == locals.clients_that_worked.length
                            dbg("ALL clients worked")
                        else
                            dbg("ONLY #{locals.clients_that_worked.length} clients worked")
                        locals.clients = locals.clients_that_worked
                        dbg("cb = ", cb)
                        cb()

            (cb) =>
                @_connect_time = new Date()
                locals.i = 0

                # Weird and unfortunate fact -- this query can and does **HANG**, never returning,
                # in some edge cases.  That's why we have to be paranoid about this entire _connect
                # function...
                f = (client, cb) =>
                    it_hung = =>
                        cb?("hung")
                        cb = undefined
                    timeout = setTimeout(it_hung, 15000)
                    dbg("now connected; checking if we can actually query the DB via client #{locals.i}")
                    locals.i += 1
                    client.query "SELECT NOW()", (err) =>
                        clearTimeout(timeout)
                        cb?(err)
                async.map(locals.clients, f, cb)
            (cb) =>
                dbg("checking if ANY db server is in recovery, i.e., we are doing standby queries only")
                @is_standby = false
                f = (client, cb) =>
                    # Is this a read/write or read-only connection?
                    client.query "SELECT pg_is_in_recovery()", (err, resp) =>
                        if err
                            cb(err)
                        else
                            # True if ANY db connection is read only.
                            if resp.rows[0].pg_is_in_recovery
                                @is_standby = true
                            cb()
                async.map(locals.clients, f, cb)
        ], (err) =>
            if err
                mesg = "Failed to connect to database -- #{err}"
                dbg(mesg)
                console.warn(mesg)  # make it clear for interactive users with debugging off -- a common mistake when the env is not set up right.
                cb?(err)
            else
                @_clients = locals.clients
                @_concurrent_queries = 0
                dbg("connected!")
                cb?(undefined, @)
        )

    # Return a native pg client connection.  This will
    # round-robin through all connections.  It returns
    # undefined if there are no connections.
    _client: =>
        if not @_clients?
            return
        if @_clients.length <= 1
            return @_clients[0]
        @_client_index ?= -1
        @_client_index = @_client_index + 1
        if @_client_index >= @_clients.length
            @_client_index = 0
        return @_clients[@_client_index]

    # Return the query function of a database connection.
    get_db_query: =>
        db = @_client()
        return db?.query.bind(db)

    _dbg: (f) =>
        if @_debug
            return (m) => winston.debug("PostgreSQL.#{f}: #{misc.trunc_middle(JSON.stringify(m), 250)}")
        else
            return ->

    _init_metrics: =>
        # initialize metrics
        try
            @query_time_histogram = metrics.newHistogram('db', 'query_ms_histogram', 'db queries'
                buckets : [1, 5, 10, 20, 50, 100, 200, 500, 1000, 5000, 10000]
                labels  : ['table']
            )
            @concurrent_counter = metrics.newCounter('db', 'concurrent_total',
                'Concurrent queries (started and finished)',
                ['state']
            )
        catch err
            @_dbg("_init_metrics")("WARNING -- #{err}")

    async_query: (opts) =>
        return await callback2(@_query.bind(@), opts)

    _query: (opts) =>
        opts = defaults opts,
            query       : undefined  # can give select and table instead
            select      : undefined  # if given, should be a string or array of column names  -| can give these
            table       : undefined  # if given, name of table                                 -| two instead of query
            params      : []
            cache       : false      # Will cache results for a few seconds or use the cache.  Use this
                                     # when speed is very important, and results that are a few seconds
                                     # out of date are fine.
            where       : undefined  # Used for SELECT: If given, can be
                                     #  - a map whose keys are clauses with $::TYPE (not $1::TYPE!) and whose values
                                     #    are the corresponding params.  Also, WHERE must not be in the query already.
                                     #    If where[cond] is undefined, then cond is completely **ignored**.
                                     #  - a string, which is inserted as is as a normal WHERE condition.
                                     #  - an array of maps or strings.
            set         : undefined  # Appends a SET clause to the query; same format as values.
            values      : undefined  # Used for INSERT: If given, then params and where must not be given.  Values is a map
                                     #     {'field1::type1':value, 'field2::type2':value2, ...} which gets converted to
                                     #     ' (field1, field2, ...) VALUES ($1::type1, $2::type2, ...) '
                                     # with the corresponding params set.  Undefined valued fields are ignored and types may
                                     # be omitted.  Javascript null is not ignored and converts to PostgreSQL NULL.
            conflict    : undefined  # If given, then values must also be given; appends this to the query:
                                     #     ON CONFLICT (name) DO UPDATE SET value=EXCLUDED.value
                                     # Or, if conflict starts with "ON CONFLICT", then just include as is, e.g.,
                                     # "ON CONFLICT DO NOTHING"
            jsonb_set   : undefined  # Used for setting a field that contains a JSONB javascript map.
                                     # NOTE: This does some merging!  If you just want to replace the whole thing use the normal set above.
                                     # Give as input an object
                                     #
                                     #     { field1:{key1:val1, key2:val2, ...}, field2:{key3:val3,...}, ...}
                                     #
                                     # In each field, every key has the corresponding value set, unless val is undefined/null, in which
                                     # case that key is deleted from the JSONB object fieldi.  Simple as that!  This is much, much
                                     # cleaner to use than SQL.  Also, if the value of the field itself is NULL, it gets
                                     # created automatically.
            jsonb_merge : undefined  # Exactly like jsonb_set, but when val1 (say) is an object, it merges that object in,
                                     # *instead of* setting field1[key1]=val1.  So after this field1[key1] has what was in it
                                     # and also what is in val1.  Obviously field1[key1] had better have been an object or NULL.
            order_by    : undefined
            limit       : undefined
            offset      : undefined
            safety_check: true
            retry_until_success : undefined  # if given, should be options to misc.retry_until_success
            pg_params   : undefined  # key/value map of postgres parameters, which will be set for the query in a single transaction
            timeout_s   : undefined  # by default, a "statement_timeout" is set; set this to 0 to disable it, or to a number of seconds
            cb          : undefined

        # quick check for a write query against a read-only connection
        if @is_standby and (opts.set? or opts.jsonb_set? or opts.jsonb_merge?)
            opts.cb?("set queries against standby not allowed")
            return

        if opts.retry_until_success
            @_query_retry_until_success(opts)
            return

        if not @is_connected()
            dbg = @_dbg("_query")
            dbg("connecting first...")
            # 2022-06: below there was {max_time: 45000} set with the note
            # "don't try forever; queries could pile up."
            # but I think this is rather harmful, since the hub could stop
            # trying to connect to the database altogether.
            # Rather, hub/health-checks::checkDBConnectivity will
            # mark the hub as being bad if it can't connect to the database.
            @connect
                cb : (err) =>
                    if err
                        dbg("FAILED to connect -- #{err}")
                        opts.cb?("database is down (please try later)")
                    else
                        dbg("connected, now doing query")
                        @__do_query(opts)
        else
            @__do_query(opts)
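
    # Usage sketch for _query (illustrative only -- the table and field names
    # below are hypothetical, not part of this module):
    #
    #   # SELECT via table/select/where; the $::TYPE placeholder gets numbered
    #   # automatically and the value is pushed onto params:
    #   db._query
    #       table  : 'accounts'
    #       select : ['account_id', 'email_address']
    #       where  : {'email_address = $::TEXT' : 'user@example.com'}
    #       limit  : 1
    #       cb     : (err, result) -> console.log(err, result?.rows)
    #
    #   # INSERT ... ON CONFLICT via values/conflict:
    #   db._query
    #       query    : 'INSERT INTO counts'
    #       values   : {'name::TEXT' : 'hits', 'value::INTEGER' : 1}
    #       conflict : 'name'
    #       cb       : (err) -> console.log(err)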

    _query_retry_until_success: (opts) =>
        retry_opts = opts.retry_until_success
        orig_cb = opts.cb
        delete opts.retry_until_success

        # f just calls @_query, but with a different cb (same opts)
        args = undefined
        f = (cb) =>
            opts.cb = (args0...) =>
                args = args0
                cb(args[0])
            @_query(opts)

        retry_opts.f = f
        # When misc.retry_until_success finishes, it calls this, which just
        # calls the original cb.
        retry_opts.cb = (err) =>
            if err
                orig_cb?(err)
            else
                orig_cb?(args...)

        # OK, now start attempting.
        misc.retry_until_success(retry_opts)

    __do_query: (opts) =>
        dbg = @_dbg("__do_query('#{misc.trunc(opts.query?.replace(/\n/g, " "),250)}',id='#{misc.uuid().slice(0,6)}')")
        if not @is_connected()
            # TODO: should also check that the client is connected.
            opts.cb?("client not yet initialized")
            return
        if opts.params? and not misc.is_array(opts.params)
            opts.cb?("params must be an array")
            return
        if not opts.query?
            if not opts.table?
                opts.cb?("if query is not given, then table must be given")
                return
            if not opts.select?
                opts.select = '*'
            if misc.is_array(opts.select)
                opts.select = (quote_field(field) for field in opts.select).join(',')
            opts.query = "SELECT #{opts.select} FROM \"#{opts.table}\""
            delete opts.select

        push_param = (param, type) ->
            if type?.toUpperCase() == 'JSONB'
                param = misc.to_json(param)  # I don't understand why this is needed by the driver....
            opts.params.push(param)
            return opts.params.length

        if opts.jsonb_merge?
            if opts.jsonb_set?
                opts.cb?("if jsonb_merge is set then jsonb_set must not be set")
                return
            opts.jsonb_set = opts.jsonb_merge

        SET = []
        if opts.jsonb_set?
            # This little piece of very hard to write (and clever?) code
            # makes it so we can set or **merge in at any nested level** (!)
            # arbitrary JSON objects.  We can also delete any key at any
            # level by making the value null or undefined!  This is amazingly
            # easy to use in queries -- basically making JSONB with postgres
            # as expressive as RethinkDB ReQL (even better in some ways).
            set = (field, data, path) =>
                obj = "COALESCE(#{field}#>'{#{path.join(',')}}', '{}'::JSONB)"
                for key, val of data
                    if not val?
                        # remove key from object
                        obj = "(#{obj} - '#{key}')"
                    else
                        if opts.jsonb_merge? and (typeof(val) == 'object' and not misc.is_date(val))
                            subobj = set(field, val, path.concat([key]))
                            obj = "JSONB_SET(#{obj}, '{#{key}}', #{subobj})"
                        else
                            # completely replace field[key] with val.
                            obj = "JSONB_SET(#{obj}, '{#{key}}', $#{push_param(val, 'JSONB')}::JSONB)"
                return obj
            v = ("#{field}=#{set(field, data, [])}" for field, data of opts.jsonb_set)
            SET.push(v...)
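        # For example (hypothetical field and keys), jsonb_set: {users: {foo: {x:1}}}
        # contributes a SET clause shaped roughly like
        #     users=JSONB_SET(COALESCE(users#>'{}', '{}'::JSONB), '{foo}', $1::JSONB)
        # with $1 bound to '{"x":1}'; jsonb_merge instead recurses into {x:1}, so
        # existing keys inside users.foo are preserved rather than replaced.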

        if opts.values?
            #dbg("values = #{misc.to_json(opts.values)}")
            if opts.where?
                opts.cb?("where must not be defined if opts.values is defined")
                return

            if misc.is_array(opts.values)
                # An array of separate objects that we will insert all at once.
                # Determine the fields, which are the union of the keys of all values.
                fields = {}
                for x in opts.values
                    if not misc.is_object(x)
                        opts.cb?("if values is an array, every entry must be an object")
                        return
                    for k, p of x
                        fields[k] = true
                # convert to array
                fields = misc.keys(fields)
                fields_to_index = {}
                n = 0
                for field in fields
                    fields_to_index[field] = n
                    n += 1
                values = []
                for x in opts.values
                    value = []
                    for field, param of x
                        if field.indexOf('::') != -1
                            [field, type] = field.split('::')
                            type = type.trim()
                            y = "$#{push_param(param, type)}::#{type}"
                        else
                            y = "$#{push_param(param)}"
                        value[fields_to_index[field]] = y
                    values.push(value)
            else
                # A single entry that we'll insert.

                fields = []
                values = []
                for field, param of opts.values
                    if param == undefined
                        # ignore undefined fields -- makes code cleaner (and makes sense)
                        continue
                    if field.indexOf('::') != -1
                        [field, type] = field.split('::')
                        fields.push(quote_field(field.trim()))
                        type = type.trim()
                        values.push("$#{push_param(param, type)}::#{type}")
                        continue
                    else
                        fields.push(quote_field(field))
                        values.push("$#{push_param(param)}")
                values = [values]  # just one

            if values.length > 0
                opts.query += " (#{(quote_field(field) for field in fields).join(',')}) VALUES " + (" (#{value.join(',')}) " for value in values).join(',')

        if opts.set?
            v = []
            for field, param of opts.set
                if field.indexOf('::') != -1
                    [field, type] = field.split('::')
                    type = type.trim()
                    v.push("#{quote_field(field.trim())}=$#{push_param(param, type)}::#{type}")
                    continue
                else
                    v.push("#{quote_field(field.trim())}=$#{push_param(param)}")
            if v.length > 0
                SET.push(v...)

        if opts.conflict?
            if misc.is_string(opts.conflict) and misc.startswith(opts.conflict.toLowerCase().trim(), 'on conflict')
                # Straight string inclusion
                opts.query += ' ' + opts.conflict + ' '
            else
                if not opts.values?
                    opts.cb?("if conflict is specified then values must also be specified")
                    return
                if not misc.is_array(opts.conflict)
                    if typeof(opts.conflict) != 'string'
                        opts.cb?("conflict (='#{misc.to_json(opts.conflict)}') must be a string (the field name), for now")
                        return
                    else
                        conflict = [opts.conflict]
                else
                    conflict = opts.conflict
                v = ("#{quote_field(field)}=EXCLUDED.#{field}" for field in fields when field not in conflict)
                SET.push(v...)
                if SET.length == 0
                    opts.query += " ON CONFLICT (#{conflict.join(',')}) DO NOTHING "
                else
                    opts.query += " ON CONFLICT (#{conflict.join(',')}) DO UPDATE "

        if SET.length > 0
            opts.query += " SET " + SET.join(' , ')

        WHERE = []
        push_where = (x) =>
            if typeof(x) == 'string'
                WHERE.push(x)
            else if misc.is_array(x)
                for v in x
                    push_where(v)
            else if misc.is_object(x)
                for cond, param of x
                    if typeof(cond) != 'string'
                        opts.cb?("each condition must be a string but '#{cond}' isn't")
                        return
                    if not param?   # *IGNORE* where conditions where the value is explicitly undefined
                        continue
                    if cond.indexOf('$') == -1
                        # where condition is missing its $ parameter -- default to equality
                        cond += " = $"
                    WHERE.push(cond.replace('$', "$#{push_param(param)}"))

        if opts.where?
            push_where(opts.where)

        if WHERE.length > 0
            if opts.values?
                opts.cb?("values must not be given if where clause given")
                return
            opts.query += " WHERE #{WHERE.join(' AND ')}"

        if opts.order_by?
            if opts.order_by.indexOf("'") >= 0
                err = "ERROR -- detected ' apostrophe in order_by='#{opts.order_by}'"
                dbg(err)
                opts.cb?(err)
                return
            opts.query += " ORDER BY #{opts.order_by}"

        if opts.limit?
            if not validator.isInt('' + opts.limit, min:0)
                err = "ERROR -- opts.limit = '#{opts.limit}' is not an integer"
                dbg(err)
                opts.cb?(err)
                return
            opts.query += " LIMIT #{opts.limit} "

        if opts.offset?
            if not validator.isInt('' + opts.offset, min:0)
                err = "ERROR -- opts.offset = '#{opts.offset}' is not an integer"
                dbg(err)
                opts.cb?(err)
                return
            opts.query += " OFFSET #{opts.offset} "

        if opts.safety_check
            safety_check = opts.query.toLowerCase().trim()
            if (safety_check.startsWith('update') or safety_check.startsWith('delete')) and (safety_check.indexOf('where') == -1 and safety_check.indexOf('trigger') == -1 and safety_check.indexOf('insert') == -1 and safety_check.indexOf('create') == -1)
                # This is always a bug.
                err = "ERROR -- Dangerous UPDATE or DELETE without a WHERE, TRIGGER, or INSERT: query='#{opts.query}'"
                dbg(err)
                opts.cb?(err)
                return

        if opts.cache and @_query_cache?
            # check for a cached result
            full_query_string = JSON.stringify([opts.query, opts.params])
            if (x = @_query_cache.get(full_query_string))?
                dbg("using cache for '#{opts.query}'")
                opts.cb?(x...)
                return

        # params can easily be huge, e.g., a blob.  But this may be
        # needed at some point for debugging.
        #dbg("query='#{opts.query}', params=#{misc.to_json(opts.params)}")
        client = @_client()
        if not client?
            opts.cb?("not connected")
            return
        @_concurrent_queries ?= 0
        @_concurrent_queries += 1
        dbg("query='#{opts.query}' (concurrent=#{@_concurrent_queries})")

        @concurrent_counter?.labels('started').inc(1)
        try
            start = new Date()
            if @_timeout_ms and @_timeout_delay_ms
                # Create a timer, so that if the query doesn't return within
                # timeout_ms time, then the entire connection is destroyed.
                # It then gets recreated automatically.  I tested this,
                # and all outstanding queries also get an error when this happens.
                timeout_error = =>
                    # Only disconnect with a timeout error if it has been sufficiently long
                    # since connecting.  This way when an error is triggered, all the
                    # outstanding timers at the moment of the error will just get ignored
                    # when they fire (since @_connect_time is 0 or too recent).
                    if @_connect_time and new Date() - @_connect_time > @_timeout_delay_ms
                        client.emit('error', 'timeout')
                timer = setTimeout(timeout_error, @_timeout_ms)

            # PAINFUL FACT: In client.query below, if the client is closed/killed/errored
            # (especially via client.emit above), then none of the callbacks from
            # client.query are called!
            finished = false
            error_listener = ->
                dbg("error_listener fired")
                query_cb('error')
            client.once('error', error_listener)
            query_cb = (err, result) =>
                if finished   # ensure that, no matter what, query_cb is called at most once.
                    dbg("called when finished (ignoring)")
                    return
                finished = true
                client.removeListener('error', error_listener)

                if @_timeout_ms
                    clearTimeout(timer)
                query_time_ms = new Date() - start
                @_concurrent_queries -= 1
                @query_time_histogram?.observe({table:opts.table ? ''}, query_time_ms)
                @concurrent_counter?.labels('ended').inc(1)
                if err
                    dbg("done (concurrent=#{@_concurrent_queries}), (query_time_ms=#{query_time_ms}) -- error: #{err}")
                    ## DANGER
                    # Only uncomment this for low level debugging!
                    #### dbg("params = #{JSON.stringify(opts.params)}")
                    ##
                    err = 'postgresql ' + err
                else
                    dbg("done (concurrent=#{@_concurrent_queries}) (query_time_ms=#{query_time_ms}) -- success")
                if opts.cache and @_query_cache?
                    @_query_cache.set(full_query_string, [err, result])
                opts.cb?(err, result)
                if query_time_ms >= QUERY_ALERT_THRESH_MS
                    dbg("QUERY_ALERT_THRESH: query_time_ms=#{query_time_ms}\nQUERY_ALERT_THRESH: query='#{opts.query}'\nQUERY_ALERT_THRESH: params='#{misc.to_json(opts.params)}'")

            # set a timeout for one specific query (there is a default when creating the pg.Client, see @_connect)
            if opts.timeout_s? and typeof opts.timeout_s == 'number' and opts.timeout_s >= 0
                dbg("set query timeout to #{opts.timeout_s}secs")
                opts.pg_params ?= {}
                # the actual param is in milliseconds
                # https://postgresqlco.nf/en/doc/param/statement_timeout/
                opts.pg_params.statement_timeout = 1000 * opts.timeout_s

            if opts.pg_params?
                dbg("run query with specific postgres parameters in a transaction")
                do_query_with_pg_params(client: client, query: opts.query, params: opts.params, pg_params:opts.pg_params, cb: query_cb)
            else
                client.query(opts.query, opts.params, query_cb)

        catch e
            # this should never ever happen
            dbg("EXCEPTION in client.query: #{e}")
            opts.cb?(e)
            @_concurrent_queries -= 1
            @concurrent_counter?.labels('ended').inc(1)
        return

    # Special case of query for counting entries in a table.
    _count: (opts) =>
        opts = defaults opts,
            table : required
            where : undefined  # as in _query
            cb    : required
        @_query
            query : "SELECT COUNT(*) FROM #{opts.table}"
            where : opts.where
            cb    : count_result(opts.cb)

    _validate_opts: (opts) =>
        for k, v of opts
            if k.slice(k.length-2) == 'id'
                if v? and not misc.is_valid_uuid_string(v)
                    opts.cb?("invalid #{k} -- #{v}")
                    return false
            if k.slice(k.length-3) == 'ids'
                for w in v
                    if not misc.is_valid_uuid_string(w)
                        opts.cb?("invalid uuid #{w} in #{k} -- #{misc.to_json(v)}")
                        return false
            if k == 'group' and v not in misc.PROJECT_GROUPS
                opts.cb?("unknown project group '#{v}'"); return false
            if k == 'groups'
                for w in v
                    if w not in misc.PROJECT_GROUPS
                        opts.cb?("unknown project group '#{w}' in groups"); return false

        return true

    _ensure_database_exists: (cb) =>
        dbg = @_dbg("_ensure_database_exists")
        dbg("ensure database '#{@_database}' exists")
        args = ['--user', @_user, '--host', @_host.split(',')[0], '--port', @_port, '--list', '--tuples-only']
        sslEnv = sslConfigToPsqlEnv(@_ssl)
        dbg("psql #{args.join(' ')}")
        misc_node.execute_code
            command : 'psql'
            args    : args
            env     : Object.assign sslEnv,
                PGPASSWORD : @_password
            cb      : (err, output) =>
                if err
                    cb(err)
                    return
                databases = (x.split('|')[0].trim() for x in output.stdout.split('\n') when x)
                if @_database in databases
                    dbg("database '#{@_database}' already exists")
                    cb()
                    return
                dbg("creating database '#{@_database}'")
                misc_node.execute_code
                    command : 'createdb'
                    args    : ['--host', @_host, '--port', @_port, @_database]
                    env     :
                        PGPASSWORD : @_password
                    cb      : cb

    _confirm_delete: (opts) =>
        opts = defaults opts,
            confirm : 'no'
            cb      : required
        dbg = @_dbg("confirm")
        if opts.confirm != 'yes'
            err = "Really delete all data? -- you must explicitly pass in confirm='yes' (but confirm:'#{opts.confirm}')"
            dbg(err)
            opts.cb(err)
            return false
        else
            return true

    set_random_password: (opts) =>
        throw Error("NotImplementedError")

    # This will fail if any other clients have the db open.
    # This function is very important for automated testing.
    delete_entire_database: (opts) =>
        dbg = @_dbg("delete_entire_database")
        dbg("deleting database '#{@_database}'")
        if not @_confirm_delete(opts)
            dbg("failed confirmation")
            return
        async.series([
            (cb) =>
                dbg("disconnect from db")
                @disconnect()
                cb()
            (cb) =>
                misc_node.execute_code
                    command : 'dropdb'
                    args    : ['--host', @_host, '--port', @_port, @_database]
                    cb      : cb
        ], opts.cb)

    # Deletes all the contents of the tables in the database.  It doesn't
    # delete anything about the schema itself: indexes or tables.
    delete_all: (opts) =>
        dbg = @_dbg("delete_all")
        dbg("deleting all contents of tables in '#{@_database}'")
        if not @_confirm_delete(opts)
            return

        # If the cache is enabled, be sure to also clear it.
        @clear_cache()

        tables = undefined

        # Delete anything cached in the db object.  Obviously, not putting something here
        # is a natural place in which to cause bugs... but they will probably all be bugs
        # of the form "the test suite fails", so we'll find them.
        delete @_stats_cached

        # Actually delete tables
        async.series([
            (cb) =>
                @_get_tables (err, t) =>
                    tables = t; cb(err)
            (cb) =>
                f = (table, cb) =>
                    @_query
                        query        : "DELETE FROM #{table}"
                        safety_check : false
                        cb           : cb
                async.map(tables, f, cb)
        ], opts.cb)

    # return a list of tables in the database
    _get_tables: (cb) =>
        @_query
            query : "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'"
            cb    : (err, result) =>
                if err
                    cb(err)
                else
                    cb(undefined, (row.table_name for row in result.rows))

    # Return a list of columns in a given table
    _get_columns: (table, cb) =>
        @_query
            query : "SELECT column_name FROM information_schema.columns"
            where :
                "table_name = $::text" : table
            cb    : (err, result) =>
                if err
                    cb(err)
                else
                    cb(undefined, (row.column_name for row in result.rows))

    _primary_keys: (table) =>
        return primaryKeys(table)

    # Return *the* primary key, assuming it is unique; otherwise, raise an exception.
    _primary_key: (table) =>
        return primaryKey(table)

    _throttle: (name, time_s, key...) =>
        key = misc.to_json(key)
        x = "_throttle_#{name}"
        @[x] ?= {}
        if @[x][key]
            return true
        @[x][key] = true
        setTimeout((()=>delete @[x]?[key]), time_s*1000)
        return false
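
    # Usage sketch (hypothetical): rate-limit an expensive per-project operation
    # to at most once per 60 seconds.  The first call returns false (not
    # throttled); repeated calls with the same key within 60s return true:
    #
    #   if @_throttle('stats', 60, project_id)
    #       return   # too soon -- skip the work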

    # Ensure that the actual schema in the database matches the one defined in SCHEMA.
    # This creates the initial schema, adds new columns, and in a VERY LIMITED
    # range of cases, *might* be able to change the data type of a column.
    update_schema: (opts) =>
        try
            await syncSchema(SCHEMA)
            opts.cb?()
        catch err
            opts.cb?(err)

    # Return the number of outstanding concurrent queries.
    concurrent: =>
        return @_concurrent_queries ? 0

    is_heavily_loaded: =>
        return @_concurrent_queries >= @_concurrent_heavily_loaded

    # Compute the sha1 hash (in hex) of the input arguments, which are
    # converted to strings (via json) if they are not strings, then concatenated.
    # This is used for computing compound primary keys in a way that is relatively
    # safe, and in situations where if there were a highly unlikely collision, it
    # wouldn't be the end of the world.  There is a similar client-only slower version
    # of this function (in schema.coffee), so don't change it willy-nilly.
    sha1: (args...) ->
        v = ((if typeof(x) == 'string' then x else JSON.stringify(x)) for x in args).join('')
        return misc_node.sha1(v)

    # Go through every table in the schema with a column called "expire", and
    # delete every entry where expire is <= right now.
    # Note: this ignores rows where expire is NULL, because comparisons with NULL are NULL.
    delete_expired: (opts) =>
        opts = defaults opts,
            count_only : false      # if true, only count the number of rows that would be deleted
            table      : undefined  # only delete from this table
            cb         : required
        dbg = @_dbg("delete_expired(...)")
        dbg()
        f = (table, cb) =>
            dbg("table='#{table}'")
            if opts.count_only
                @_query
                    query : "SELECT COUNT(*) FROM #{table} WHERE expire <= NOW()"
                    cb    : (err, result) =>
                        if not err
                            dbg("COUNT for table #{table} is #{result.rows[0].count}")
                        cb(err)
            else
                dbg("deleting expired entries from '#{table}'")
                @_query
                    query : "DELETE FROM #{table} WHERE expire <= NOW()"
                    cb    : (err) =>
                        dbg("finished deleting expired entries from '#{table}' -- #{err}")
                        cb(err)
        if opts.table
            tables = [opts.table]
        else
            tables = (k for k, v of SCHEMA when v.fields?.expire? and not v.virtual)
        async.map(tables, f, opts.cb)

    # count the number of entries in a table
    count: (opts) =>
        opts = defaults opts,
            table : required
            cb    : required
        @_query
            query : "SELECT COUNT(*) FROM #{opts.table}"
            cb    : count_result(opts.cb)

    # sanitize strings before inserting them into a query string
    sanitize: (s) =>
        escapeString(s)

###
Other misc functions
###

exports.pg_type = pg_type = (info) ->
    return pgType(info)

exports.quote_field = quote_field = (field) ->
    return quoteField(field)

# Timestamp the given number of seconds **in the future**.
exports.expire_time = expire_time = (ttl) ->
    if ttl then new Date((new Date() - 0) + ttl*1000)
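
# For example, expire_time(60) is a Date one minute in the future, suitable for
# an "expire" column; expire_time(0) and expire_time(undefined) return undefined.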

# Returns a function that takes as input the output of doing a SQL query.
# If there are no results, returns undefined.
# If there is exactly one result, what is returned depends on pattern:
#     'a_field' --> returns the value of this field in the result
# If there is more than one result, an error is returned.
exports.one_result = one_result = (pattern, cb) ->
    if not cb? and typeof(pattern) == 'function'
        cb = pattern
        pattern = undefined
    if not cb?
        return ->  # do nothing -- return a function that ignores the result
    return (err, result) ->
        if err
            cb(err)
            return
        if not result?.rows?
            cb()
            return
        switch result.rows.length
            when 0
                cb()
            when 1
                obj = misc.map_without_undefined_and_null(result.rows[0])
                if not pattern?
                    cb(undefined, obj)
                    return
                switch typeof(pattern)
                    when 'string'
                        x = obj[pattern]
                        if not x?   # null or undefined -- SQL returns null, but we want undefined
                            cb()
                        else
                            if obj.expire? and new Date() >= obj.expire
                                cb()
                            else
                                cb(undefined, x)
                    when 'object'
                        x = {}
                        for p in pattern
                            if obj[p]?
                                x[p] = obj[p]
                        cb(undefined, x)
                    else
                        cb("BUG: unknown pattern -- #{pattern}")
            else
                cb("more than one result")

exports.all_results = all_results = (pattern, cb) ->
    if not cb? and typeof(pattern) == 'function'
        cb = pattern
        pattern = undefined
    if not cb?
        return ->  # do nothing -- return a function that ignores the result
    return (err, result) ->
        if err
            cb(err)
        else
            rows = result.rows
            if not pattern?
                # TODO: we use stupid (?) misc.copy to unwrap from the pg driver type -- investigate better!
                # Maybe this is fine.  I don't know.
                cb(undefined, (misc.copy(x) for x in rows))
            else if typeof(pattern) == 'string'
                cb(undefined, ((x[pattern] ? undefined) for x in rows))
            else
                cb("unsupported pattern type '#{typeof(pattern)}'")

exports.count_result = count_result = (cb) ->
    if not cb?
        return ->  # do nothing -- return a function that ignores the result
    return (err, result) ->
        if err
            cb(err)
        else
            cb(undefined, parseInt(result?.rows?[0]?.count))
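
# Usage sketch: node-postgres returns COUNT(*) as a string (it is a bigint),
# so count_result parses it into an integer, e.g.,
#
#   db._query
#       query : "SELECT COUNT(*) FROM projects"
#       cb    : count_result((err, n) -> console.log(err, n))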