Path: blob/master/src/packages/database/postgres-base.coffee

#########################################################################
# This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
# License: MS-RSL – see LICENSE.md for details
#########################################################################

# PostgreSQL -- basic queries and database interface

exports.DEBUG = true

# If the database connection is non-responsive, but no error is raised directly
# by the db client, then we will know and fix it, rather than just sitting there...
DEFAULT_TIMEOUT_MS = 60000

# Do not test for non-responsiveness until a while after the initial connection is
# established, since things tend to work initially, *but* may also be much
# slower, due to tons of clients simultaneously connecting to the DB.
DEFAULT_TIMEOUT_DELAY_MS = DEFAULT_TIMEOUT_MS * 4

QUERY_ALERT_THRESH_MS = 5000

consts = require('./consts')
DEFAULT_STATEMENT_TIMEOUT_MS = consts.STATEMENT_TIMEOUT_MS

EventEmitter = require('events')

fs = require('fs')
async = require('async')
escapeString = require('sql-string-escape')
validator = require('validator')
{callback2} = require('@cocalc/util/async-utils')

LRU = require('lru-cache')

pg = require('pg')

winston = require('@cocalc/backend/logger').getLogger('postgres')
{do_query_with_pg_params} = require('./postgres/set-pg-params')

{ syncSchema } = require('./postgres/schema')
{ pgType } = require('./postgres/schema/pg-type')
{ quoteField } = require('./postgres/schema/util')
{ primaryKey, primaryKeys } = require('./postgres/schema/table')

misc_node = require('@cocalc/backend/misc_node')
{ sslConfigToPsqlEnv, pghost, pgdatabase, pguser, pgssl } = require("@cocalc/backend/data")

{defaults} = misc = require('@cocalc/util/misc')
required = defaults.required

{SCHEMA, client_db} = require('@cocalc/util/schema')

metrics = require('@cocalc/backend/metrics')

exports.PUBLIC_PROJECT_COLUMNS = ['project_id', 'last_edited', 'title', 'description', 'deleted', 'created', 'env']
exports.PROJECT_COLUMNS = ['users'].concat(exports.PUBLIC_PROJECT_COLUMNS)

dbPassword = require('@cocalc/database/pool/password').default

# Emits a 'connect' event whenever we successfully connect to the database,
# and 'disconnect' when the connection to postgres fails.
class exports.PostgreSQL extends EventEmitter
    constructor: (opts) ->
        super()
        opts = defaults opts,
            host                      : pghost       # DEPRECATED: or 'hostname:port' or 'host1,host2,...' (multiple hosts) -- TODO -- :port only works for one host.
            database                  : pgdatabase
            user                      : pguser
            ssl                       : pgssl
            debug                     : exports.DEBUG
            connect                   : true
            password                  : undefined
            cache_expiry              : 5000         # expire cached queries after this many milliseconds;
                                                     # keep this very short -- it's just meant to reduce the impact of
                                                     # a bunch of identical permission checks in a single user query.
            cache_size                : 300          # cache this many queries; use @_query(cache:true, ...) to cache a result
            concurrent_warn           : 500
            concurrent_heavily_loaded : 70           # when concurrent hits this, consider the load "heavy"; this changes how
                                                     # some queries behave, making them faster but providing less info
            ensure_exists             : true         # ensure the database exists on startup (runs psql in a shell)
            timeout_ms                : DEFAULT_TIMEOUT_MS        # **IMPORTANT: if *any* query takes this long, the entire connection is terminated and recreated!**
            timeout_delay_ms          : DEFAULT_TIMEOUT_DELAY_MS  # Only reconnect on timeout this many ms after connect.
                                                     # Motivation: on initial startup queries may take much longer, due to competition with other clients.
        @setMaxListeners(0)  # because of a potentially large number of changefeeds
        @_state = 'init'
        @_debug = opts.debug
        @_timeout_ms = opts.timeout_ms
        @_timeout_delay_ms = opts.timeout_delay_ms
        @_ensure_exists = opts.ensure_exists
        @_init_test_query()
        dbg = @_dbg("constructor")  # must be after setting @_debug above
        dbg(opts)
        i = opts.host.indexOf(':')
        if i != -1
            @_host = opts.host.slice(0, i)
            @_port = parseInt(opts.host.slice(i+1))
        else
            @_host = opts.host
            @_port = 5432
        @_concurrent_warn = opts.concurrent_warn
        @_concurrent_heavily_loaded = opts.concurrent_heavily_loaded
        @_user = opts.user
        @_database = opts.database
        @_ssl = opts.ssl
        @_password = opts.password ? dbPassword()
        @_init_metrics()

        if opts.cache_expiry and opts.cache_size
            @_query_cache = new LRU({max:opts.cache_size, ttl:opts.cache_expiry})
        if opts.connect
            @connect()  # start trying to connect
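
    ###
    Illustrative usage sketch (not executed; the variable `db` and the require
    path are assumptions for this example).  With the standard pg* config
    values picked up from the environment, a caller typically just does:

        {PostgreSQL} = require('./postgres-base')
        db = new PostgreSQL
            host    : 'localhost:5432'   # or 'db1,db2' to spread load over several hosts
            debug   : false
            connect : true               # start connecting immediately
        db.once 'connect', ->
            console.log("connected; engine =", db.engine())
    ###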

    clear_cache: =>
        @_query_cache?.clear()

    close: =>
        if @_state == 'closed'
            return  # nothing to do
        @_close_test_query()
        @_state = 'closed'
        @emit('close')
        @removeAllListeners()
        if @_clients?
            for client in @_clients
                client.removeAllListeners()
                client.end()
            delete @_clients

    ###
    If @_timeout_ms is set, then we periodically do a simple test query,
    to ensure that the database connection is working and responding to queries.
    If the query below times out, then the connection will get recreated.
    ###
    _do_test_query: =>
        dbg = @_dbg('test_query')
        dbg('starting')
        @_query
            query : 'SELECT NOW()'
            cb    : (err, result) =>
                dbg("finished", err, result)

    _init_test_query: =>
        if not @_timeout_ms
            return
        @_test_query = setInterval(@_do_test_query, @_timeout_ms)

    _close_test_query: =>
        if @_test_query?
            clearInterval(@_test_query)
            delete @_test_query

    engine: -> 'postgresql'

    connect: (opts) =>
        opts = defaults opts,
            max_time : undefined   # set to something shorter to not try forever
                                   # Only the first max_time is used.
            cb       : undefined
        if @_state == 'closed'
            opts.cb?("closed")
            return
        dbg = @_dbg("connect")
        if @_clients?
            dbg("already connected")
            opts.cb?()
            return
        if @_connecting?
            dbg('already trying to connect')
            @_connecting.push(opts.cb)
            # keep several times the db-concurrent-warn limit of callbacks
            max_connecting = 5 * @_concurrent_warn
            while @_connecting.length > max_connecting
                @_connecting.shift()
                dbg("WARNING: still no DB available, dropping old callbacks (limit: #{max_connecting})")
            return
        dbg('will try to connect')
        @_state = 'init'
        if opts.max_time
            dbg("for up to #{opts.max_time}ms")
        else
            dbg("until successful")
        @_connecting = [opts.cb]
        misc.retry_until_success
            f           : @_connect
            max_delay   : 10000
            max_time    : opts.max_time
            start_delay : 500 + 500*Math.random()
            log         : dbg
            cb          : (err) =>
                v = @_connecting
                delete @_connecting
                for cb in v
                    cb?(err)
                if not err
                    @_state = 'connected'
                    @emit('connect')
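
    ###
    Illustrative lifecycle sketch (not executed; `db` is the hypothetical
    instance from the example above).  Callbacks passed to connect() while an
    attempt is already in flight are queued and all called once it resolves:

        db.connect
            max_time : 60*1000   # give up after a minute instead of retrying forever
            cb       : (err) ->
                console.log("connected?", not err, "is_connected:", db.is_connected())
    ###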

    disconnect: () =>
        if @_clients?
            for client in @_clients
                client.end()
                client.removeAllListeners()
            delete @_clients

    is_connected: () =>
        return @_clients? and @_clients.length > 0

    _connect: (cb) =>
        dbg = @_dbg("_connect")
        dbg("connect to #{@_host}")
        @_clear_listening_state()   # definitely not listening
        if @_clients?
            @disconnect()
        locals =
            clients : []
            hosts   : []
        @_connect_time = 0
        @_concurrent_queries = 0  # can't be any going on now.
        async.series([
            (cb) =>
                if @_ensure_exists
                    dbg("first make sure db exists")
                    @_ensure_database_exists(cb)
                else
                    dbg("assuming database exists")
                    cb()
            (cb) =>
                if not @_host   # undefined if @_host == ''
                    locals.hosts = [undefined]
                    cb()
                    return
                if @_host.indexOf('/') != -1
                    dbg("using a local socket file (not a hostname)")
                    locals.hosts = [@_host]
                    cb()
                    return
                f = (host, cb) =>
                    hostname = host.split(':')[0]
                    winston.debug("Looking up ip addresses of #{hostname}")
                    require('dns').lookup hostname, {all:true}, (err, ips) =>
                        if err
                            winston.debug("Got #{hostname} --> err=#{err}")
                            # NON-FATAL -- we just don't include these and hope to
                            # have at least one total working host...
                            cb()
                        else
                            winston.debug("Got #{hostname} --> #{JSON.stringify(ips)}")
                            # In kubernetes the stateful set service just has
                            # lots of ip addresses.  We connect to *all* of them,
                            # and spread queries across them equally.
                            for x in ips
                                locals.hosts.push(x.address)
                            cb()
                async.map(@_host.split(','), f, (err) => cb(err))
            (cb) =>
                dbg("connecting to #{JSON.stringify(locals.hosts)}...")
                if locals.hosts.length == 0
                    dbg("locals.hosts has length 0 -- no available db")
                    cb("no databases available")
                    return

                dbg("create client and start connecting...")
                locals.clients = []

                # Use a function to initialize the client, to avoid any issues with the scope of "client" below.
                # Ref: https://node-postgres.com/apis/client
                init_client = (host) =>
                    client = new pg.Client
                        user              : @_user
                        host              : host
                        port              : @_port
                        password          : @_password
                        database          : @_database
                        ssl               : @_ssl
                        statement_timeout : DEFAULT_STATEMENT_TIMEOUT_MS  # we set a statement_timeout, to avoid queries locking up PG
                    if @_notification?
                        client.on('notification', @_notification)
                    onError = (err) =>
                        # only listen once for error; after that we've
                        # killed the connection and don't care.
                        client.removeListener('error', onError)
                        if @_state == 'init'
                            # already started connecting
                            return
                        @emit('disconnect')
                        dbg("error -- #{err}")
                        @disconnect()
                        @connect()  # start trying to reconnect
                    client.on('error', onError)
                    client.setMaxListeners(0)  # there is one emitter for each concurrent query... (see query_cb)
                    locals.clients.push(client)

                for host in locals.hosts
                    init_client(host)

                # Connect the clients.  If at least one succeeds, we use this.
                # If none succeed, we declare failure.
                # Obviously, this is NOT optimal -- it's just hopefully sufficiently robust/works.
                # I'm going to redo this with experience.
                locals.clients_that_worked = []
                locals.errors = []
                f = (client, c) =>
                    try
                        await client.connect()
                        locals.clients_that_worked.push(client)
                    catch err
                        locals.errors.push(err)
                    c()
                async.map locals.clients, f, () =>
                    if locals.clients_that_worked.length == 0
                        console.warn("ALL clients failed", locals.errors)
                        dbg("ALL clients failed", locals.errors)
                        cb("ALL clients failed to connect")
                    else
                        # take what we got
                        if locals.clients.length == locals.clients_that_worked.length
                            dbg("ALL clients worked")
                        else
                            dbg("ONLY #{locals.clients_that_worked.length} clients worked")
                        locals.clients = locals.clients_that_worked
                        dbg("cb = ", cb)
                        cb()

            (cb) =>
                @_connect_time = new Date()
                locals.i = 0

                # Weird and unfortunate fact -- this query can and does **HANG**, never returning,
                # in some edge cases.  That's why we have to be paranoid about this entire _connect
                # function...
                f = (client, cb) =>
                    it_hung = =>
                        cb?("hung")
                        cb = undefined
                    timeout = setTimeout(it_hung, 15000)
                    dbg("now connected; checking if we can actually query the DB via client #{locals.i}")
                    locals.i += 1
                    client.query "SELECT NOW()", (err) =>
                        clearTimeout(timeout)
                        cb?(err)
                async.map(locals.clients, f, cb)
            (cb) =>
                dbg("checking if ANY db server is in recovery, i.e., we are doing standby queries only")
                @is_standby = false
                f = (client, cb) =>
                    # Is this a read/write or a read-only connection?
                    client.query "SELECT pg_is_in_recovery()", (err, resp) =>
                        if err
                            cb(err)
                        else
                            # True if ANY db connection is read only.
                            if resp.rows[0].pg_is_in_recovery
                                @is_standby = true
                            cb()
                async.map(locals.clients, f, cb)
        ], (err) =>
            if err
                mesg = "Failed to connect to database -- #{err}"
                dbg(mesg)
                console.warn(mesg)  # make it clear for interactive users with debugging off -- a common mistake is the env not being set up right.
                cb?(err)
            else
                @_clients = locals.clients
                @_concurrent_queries = 0
                dbg("connected!")
                cb?(undefined, @)
        )

    # Return a native pg client connection.  This will round-robin through
    # all connections.  It returns undefined if there are no connections.
    _client: =>
        if not @_clients?
            return
        if @_clients.length <= 1
            return @_clients[0]
        @_client_index ?= -1
        @_client_index = @_client_index + 1
        if @_client_index >= @_clients.length
            @_client_index = 0
        return @_clients[@_client_index]

    # Return the query function of a database connection.
    get_db_query: =>
        db = @_client()
        return db?.query.bind(db)

    _dbg: (f) =>
        if @_debug
            return (m) => winston.debug("PostgreSQL.#{f}: #{misc.trunc_middle(JSON.stringify(m), 250)}")
        else
            return ->

    _init_metrics: =>
        # initialize metrics
        try
            @query_time_histogram = metrics.newHistogram('db', 'query_ms_histogram', 'db queries',
                buckets : [1, 5, 10, 20, 50, 100, 200, 500, 1000, 5000, 10000]
                labels  : ['table']
            )
            @concurrent_counter = metrics.newCounter('db', 'concurrent_total',
                'Concurrent queries (started and finished)',
                ['state']
            )
        catch err
            @_dbg("_init_metrics")("WARNING -- #{err}")

    async_query: (opts) =>
        return await callback2(@_query.bind(@), opts)
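
    ###
    Hedged sketch of the two query interfaces (assumes a connected `db`):

        # callback style, via _query (see the options documented below)
        db._query
            query : 'SELECT NOW()'
            cb    : (err, result) -> console.log(err, result?.rows)

        # promise style, via async_query (same opts, minus cb; inside an async function)
        now = await db.async_query(query: 'SELECT NOW()')
    ###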

    _query: (opts) =>
        opts = defaults opts,
            query        : undefined  # can give select and table instead
            select       : undefined  # if given, should be a string or an array of column names  -| can give these
            table        : undefined  # if given, the name of the table                           -| two instead of query
            params       : []
            cache        : false      # Will cache results for a few seconds or use the cache.  Use this
                                      # when speed is very important, and results that are a few seconds
                                      # out of date are fine.
            where        : undefined  # Used for SELECT: if given, can be
                                      #  - a map whose keys are clauses with $::TYPE (not $1::TYPE!) and whose values are
                                      #    the corresponding params.  Also, WHERE must not be in the query already.
                                      #    If where[cond] is undefined, then cond is completely **ignored**.
                                      #  - a string, which is inserted as is as a normal WHERE condition.
                                      #  - an array of maps or strings.
            set          : undefined  # Appends a SET clause to the query; same format as values.
            values       : undefined  # Used for INSERT: if given, then params and where must not be given.  values is a map
                                      #    {'field1::type1':value, 'field2::type2':value2, ...} which gets converted to
                                      #    ' (field1, field2, ...) VALUES ($1::type1, $2::type2, ...) '
                                      #    with the corresponding params set.  Undefined-valued fields are ignored and types may
                                      #    be omitted.  Javascript null is not ignored and converts to PostgreSQL NULL.
            conflict     : undefined  # If given, then values must also be given; appends this to the query:
                                      #     ON CONFLICT (name) DO UPDATE SET value=EXCLUDED.value
                                      # Or, if conflict starts with "ON CONFLICT", then it is just included as is, e.g.,
                                      #     "ON CONFLICT DO NOTHING"
            jsonb_set    : undefined  # Used for setting a field that contains a JSONB javascript map.
                                      # NOTE: This does some merging!  If you just want to replace the whole thing, use the normal set above.
                                      # Give as input an object
                                      #
                                      #     { field1:{key1:val1, key2:val2, ...}, field2:{key3:val3,...}, ...}
                                      #
                                      # In each field, every key has the corresponding value set, unless val is undefined/null, in which
                                      # case that key is deleted from the JSONB object fieldi.  Simple as that!  This is much, much
                                      # cleaner to use than SQL.  Also, if the value of the field itself is NULL, it gets
                                      # created automatically.
            jsonb_merge  : undefined  # Exactly like jsonb_set, but when val1 (say) is an object, it merges that object in,
                                      # *instead of* setting field1[key1]=val1.  So after this, field1[key1] has what was in it
                                      # and also what is in val1.  Obviously field1[key1] had better have been an object (map) or NULL.
            order_by     : undefined
            limit        : undefined
            offset       : undefined
            safety_check : true
            retry_until_success : undefined  # if given, should be options to misc.retry_until_success
            pg_params    : undefined  # key/value map of postgres parameters, which will be set for the query in a single transaction
            timeout_s    : undefined  # by default, there is a "statement_timeout" set; set to 0 to disable, or to a number in seconds
            cb           : undefined

        # quick check for a write query against a read-only connection
        if @is_standby and (opts.set? or opts.jsonb_set? or opts.jsonb_merge?)
            opts.cb?("set queries against standby not allowed")
            return

        if opts.retry_until_success
            @_query_retry_until_success(opts)
            return

        if not @is_connected()
            dbg = @_dbg("_query")
            dbg("connecting first...")
            # 2022-06: below there was {max_time: 45000} set with the note
            # "don't try forever; queries could pile up."
            # but I think this is rather harmful, since the hub could stop
            # trying to connect to the database altogether.
            # Rather, hub/health-checks::checkDBConnectivity will
            # mark the hub as being bad if it can't connect to the database.
            @connect
                cb : (err) =>
                    if err
                        dbg("FAILED to connect -- #{err}")
                        opts.cb?("database is down (please try later)")
                    else
                        dbg("connected, now doing query")
                        @__do_query(opts)
        else
            @__do_query(opts)
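
    ###
    Sketch of how the query-builder options fit together (illustrative only;
    the table and column names are examples, not part of this module's API):

        # SELECT ... WHERE, built from table/select/where; a condition without
        # a '$' defaults to equality, and undefined-valued conditions are ignored.
        db._query
            table  : 'accounts'
            select : ['account_id', 'email_address']
            where  : {'email_address = $::text' : 'user@example.com'}
            limit  : 1
            cb     : one_result('account_id', (err, id) -> console.log(err, id))

        # INSERT ... ON CONFLICT (an upsert), built from values/conflict:
        db._query
            query    : 'INSERT INTO remember_me'
            values   : {'hash::text':'abc123', 'value::jsonb':{foo:'bar'}, 'expire::timestamp':expire_time(3600)}
            conflict : 'hash'   # becomes: ON CONFLICT (hash) DO UPDATE SET value=EXCLUDED.value, ...
            cb       : (err) -> console.log(err)
    ###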

    _query_retry_until_success: (opts) =>
        retry_opts = opts.retry_until_success
        orig_cb = opts.cb
        delete opts.retry_until_success

        # f just calls @_do_query, but with a different cb (same opts)
        args = undefined
        f = (cb) =>
            opts.cb = (args0...) =>
                args = args0
                cb(args[0])
            @_query(opts)

        retry_opts.f = f
        # When misc.retry_until_success finishes, it calls this, which just
        # calls the original cb.
        retry_opts.cb = (err) =>
            if err
                orig_cb?(err)
            else
                orig_cb?(args...)

        # OK, now start attempting.
        misc.retry_until_success(retry_opts)

    __do_query: (opts) =>
        dbg = @_dbg("__do_query('#{misc.trunc(opts.query?.replace(/\n/g, " "), 250)}', id='#{misc.uuid().slice(0,6)}')")
        if not @is_connected()
            # TODO: should also check that the client is connected.
            opts.cb?("client not yet initialized")
            return
        if opts.params? and not misc.is_array(opts.params)
            opts.cb?("params must be an array")
            return
        if not opts.query?
            if not opts.table?
                opts.cb?("if query is not given, then table must be given")
                return
            if not opts.select?
                opts.select = '*'
            if misc.is_array(opts.select)
                opts.select = (quote_field(field) for field in opts.select).join(',')
            opts.query = "SELECT #{opts.select} FROM \"#{opts.table}\""
            delete opts.select

        push_param = (param, type) ->
            if type?.toUpperCase() == 'JSONB'
                param = misc.to_json(param)  # I don't understand why this is needed by the driver....
            opts.params.push(param)
            return opts.params.length

        if opts.jsonb_merge?
            if opts.jsonb_set?
                opts.cb?("if jsonb_merge is set then jsonb_set must not be set")
                return
            opts.jsonb_set = opts.jsonb_merge

        SET = []
        if opts.jsonb_set?
            # This little piece of very-hard-to-write (and clever?) code
            # makes it so we can set or **merge in at any nested level** (!)
            # arbitrary JSON objects.  We can also delete any key at any
            # level by making the value null or undefined!  This is amazingly
            # easy to use in queries -- basically making JSONB with postgres
            # as expressive as RethinkDB ReQL (even better in some ways).
            set = (field, data, path) =>
                obj = "COALESCE(#{field}#>'{#{path.join(',')}}', '{}'::JSONB)"
                for key, val of data
                    if not val?
                        # remove key from object
                        obj = "(#{obj} - '#{key}')"
                    else
                        if opts.jsonb_merge? and (typeof(val) == 'object' and not misc.is_date(val))
                            subobj = set(field, val, path.concat([key]))
                            obj = "JSONB_SET(#{obj}, '{#{key}}', #{subobj})"
                        else
                            # completely replace field[key] with val.
                            obj = "JSONB_SET(#{obj}, '{#{key}}', $#{push_param(val, 'JSONB')}::JSONB)"
                return obj
            v = ("#{field}=#{set(field, data, [])}" for field, data of opts.jsonb_set)
            SET.push(v...)

        if opts.values?
            #dbg("values = #{misc.to_json(opts.values)}")
            if opts.where?
                opts.cb?("where must not be defined if opts.values is defined")
                return

            if misc.is_array(opts.values)
                # An array of numerous separate objects that we will insert all at once.
                # Determine the fields, which are the union of the keys of all values.
                fields = {}
                for x in opts.values
                    if not misc.is_object(x)
                        opts.cb?("if values is an array, every entry must be an object")
                        return
                    for k, p of x
                        fields[k] = true
                # convert to array
                fields = misc.keys(fields)
                fields_to_index = {}
                n = 0
                for field in fields
                    fields_to_index[field] = n
                    n += 1
                values = []
                for x in opts.values
                    value = []
                    for field, param of x
                        # fields_to_index is keyed by the full key, which may include a
                        # '::type' suffix, so look the index up *before* stripping the type.
                        index = fields_to_index[field]
                        if field.indexOf('::') != -1
                            [field, type] = field.split('::')
                            type = type.trim()
                            y = "$#{push_param(param, type)}::#{type}"
                        else
                            y = "$#{push_param(param)}"
                        value[index] = y
                    values.push(value)
            else
                # A single entry that we'll insert.

                fields = []
                values = []
                for field, param of opts.values
                    if param == undefined
                        # ignore undefined fields -- makes code cleaner (and makes sense)
                        continue
                    if field.indexOf('::') != -1
                        [field, type] = field.split('::')
                        fields.push(quote_field(field.trim()))
                        type = type.trim()
                        values.push("$#{push_param(param, type)}::#{type}")
                        continue
                    else
                        fields.push(quote_field(field))
                        values.push("$#{push_param(param)}")
                values = [values]  # just one

            if values.length > 0
                # strip any '::type' suffix from the keys when emitting the column list
                opts.query += " (#{(quote_field(field.split('::')[0].trim()) for field in fields).join(',')}) VALUES " + (" (#{value.join(',')}) " for value in values).join(',')

        if opts.set?
            v = []
            for field, param of opts.set
                if field.indexOf('::') != -1
                    [field, type] = field.split('::')
                    type = type.trim()
                    v.push("#{quote_field(field.trim())}=$#{push_param(param, type)}::#{type}")
                    continue
                else
                    v.push("#{quote_field(field.trim())}=$#{push_param(param)}")
            if v.length > 0
                SET.push(v...)

        if opts.conflict?
            if misc.is_string(opts.conflict) and misc.startswith(opts.conflict.toLowerCase().trim(), 'on conflict')
            	# Straight string inclusion
                opts.query += ' ' + opts.conflict + ' '
            else
                if not opts.values?
                    opts.cb?("if conflict is specified then values must also be specified")
                    return
                if not misc.is_array(opts.conflict)
                    if typeof(opts.conflict) != 'string'
                        opts.cb?("conflict (='#{misc.to_json(opts.conflict)}') must be a string (the field name), for now")
                        return
                    else
                        conflict = [opts.conflict]
                else
                    conflict = opts.conflict
                v = ("#{quote_field(field)}=EXCLUDED.#{field}" for field in fields when field not in conflict)
                SET.push(v...)
                if SET.length == 0
                    opts.query += " ON CONFLICT (#{conflict.join(',')}) DO NOTHING "
                else
                    opts.query += " ON CONFLICT (#{conflict.join(',')}) DO UPDATE "

        if SET.length > 0
            opts.query += " SET " + SET.join(' , ')

        WHERE = []
        push_where = (x) =>
            if typeof(x) == 'string'
                WHERE.push(x)
            else if misc.is_array(x)
                for v in x
                    push_where(v)
            else if misc.is_object(x)
                for cond, param of x
                    if typeof(cond) != 'string'
                        opts.cb?("each condition must be a string but '#{cond}' isn't")
                        return
                    if not param?  # *IGNORE* where conditions whose value is explicitly undefined
                        continue
                    if cond.indexOf('$') == -1
                        # where condition is missing its $ parameter -- default to equality
                        cond += " = $"
                    WHERE.push(cond.replace('$', "$#{push_param(param)}"))

        if opts.where?
            push_where(opts.where)

        if WHERE.length > 0
            if opts.values?
                opts.cb?("values must not be given if a where clause is given")
                return
            opts.query += " WHERE #{WHERE.join(' AND ')}"

        if opts.order_by?
            if opts.order_by.indexOf("'") >= 0
                err = "ERROR -- detected ' apostrophe in order_by='#{opts.order_by}'"
                dbg(err)
                opts.cb?(err)
                return
            opts.query += " ORDER BY #{opts.order_by}"

        if opts.limit?
            if not validator.isInt('' + opts.limit, min:0)
                err = "ERROR -- opts.limit = '#{opts.limit}' is not an integer"
                dbg(err)
                opts.cb?(err)
                return
            opts.query += " LIMIT #{opts.limit} "

        if opts.offset?
            if not validator.isInt('' + opts.offset, min:0)
                err = "ERROR -- opts.offset = '#{opts.offset}' is not an integer"
                dbg(err)
                opts.cb?(err)
                return
            opts.query += " OFFSET #{opts.offset} "

        if opts.safety_check
            safety_check = opts.query.toLowerCase().trim()
            if (safety_check.startsWith('update') or safety_check.startsWith('delete')) and (safety_check.indexOf('where') == -1 and safety_check.indexOf('trigger') == -1 and safety_check.indexOf('insert') == -1 and safety_check.indexOf('create') == -1)
                # This is always a bug.
                err = "ERROR -- Dangerous UPDATE or DELETE without a WHERE, TRIGGER, or INSERT: query='#{opts.query}'"
                dbg(err)
                opts.cb?(err)
                return

        if opts.cache and @_query_cache?
            # check for a cached result
            full_query_string = JSON.stringify([opts.query, opts.params])
            if (x = @_query_cache.get(full_query_string))?
                dbg("using cache for '#{opts.query}'")
                opts.cb?(x...)
                return

        # params can easily be huge, e.g., a blob.  But this may be
        # needed at some point for debugging.
        #dbg("query='#{opts.query}', params=#{misc.to_json(opts.params)}")
        client = @_client()
        if not client?
            opts.cb?("not connected")
            return
        @_concurrent_queries ?= 0
        @_concurrent_queries += 1
        dbg("query='#{opts.query}' (concurrent=#{@_concurrent_queries})")

        @concurrent_counter?.labels('started').inc(1)
        try
            start = new Date()
            if @_timeout_ms and @_timeout_delay_ms
                # Create a timer, so that if the query doesn't return within
                # timeout_ms time, then the entire connection is destroyed.
                # It then gets recreated automatically.  I tested this,
                # and all outstanding queries also get an error when it happens.
                timeout_error = =>
                    # Only disconnect with a timeout error if it has been sufficiently long
                    # since connecting.  This way when an error is triggered, all the
                    # outstanding timers at the moment of the error will just get ignored
                    # when they fire (since @_connect_time is 0 or too recent).
                    if @_connect_time and new Date() - @_connect_time > @_timeout_delay_ms
                        client.emit('error', 'timeout')
                timer = setTimeout(timeout_error, @_timeout_ms)

            # PAINFUL FACT: In client.query below, if the client is closed/killed/errored
            # (especially via client.emit above), then none of the callbacks from
            # client.query are called!
            finished = false
            error_listener = ->
                dbg("error_listener fired")
                query_cb('error')
            client.once('error', error_listener)
            query_cb = (err, result) =>
                if finished  # ensure that, no matter what, query_cb is called at most once.
                    dbg("called when finished (ignoring)")
                    return
                finished = true
                client.removeListener('error', error_listener)

                if @_timeout_ms
                    clearTimeout(timer)
                query_time_ms = new Date() - start
                @_concurrent_queries -= 1
                @query_time_histogram?.observe({table:opts.table ? ''}, query_time_ms)
                @concurrent_counter?.labels('ended').inc(1)
                if err
                    dbg("done (concurrent=#{@_concurrent_queries}) (query_time_ms=#{query_time_ms}) -- error: #{err}")
                    ## DANGER
                    # Only uncomment this for low level debugging!
                    #### dbg("params = #{JSON.stringify(opts.params)}")
                    ##
                    err = 'postgresql ' + err
                else
                    dbg("done (concurrent=#{@_concurrent_queries}) (query_time_ms=#{query_time_ms}) -- success")
                if opts.cache and @_query_cache?
                    @_query_cache.set(full_query_string, [err, result])
                opts.cb?(err, result)
                if query_time_ms >= QUERY_ALERT_THRESH_MS
                    dbg("QUERY_ALERT_THRESH: query_time_ms=#{query_time_ms}\nQUERY_ALERT_THRESH: query='#{opts.query}'\nQUERY_ALERT_THRESH: params='#{misc.to_json(opts.params)}'")

            # set a timeout for this specific query (there is a default set when creating the pg.Client; see @_connect)
            if opts.timeout_s? and typeof opts.timeout_s == 'number' and opts.timeout_s >= 0
                dbg("set query timeout to #{opts.timeout_s} secs")
                opts.pg_params ?= {}
                # the actual param is in milliseconds
                # https://postgresqlco.nf/en/doc/param/statement_timeout/
                opts.pg_params.statement_timeout = 1000 * opts.timeout_s

            if opts.pg_params?
                dbg("run query with specific postgres parameters in a transaction")
                do_query_with_pg_params(client: client, query: opts.query, params: opts.params, pg_params: opts.pg_params, cb: query_cb)
            else
                client.query(opts.query, opts.params, query_cb)

        catch e
            # this should never ever happen
            dbg("EXCEPTION in client.query: #{e}")
            opts.cb?(e)
            @_concurrent_queries -= 1
            @concurrent_counter?.labels('ended').inc(1)
        return
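
    ###
    Worked jsonb_set/jsonb_merge example (hedged: 'projects' and its JSONB
    'users' column are from the CoCalc schema; the angle-bracket placeholders
    stand for uuids).  The call

        db._query
            query     : 'UPDATE projects'
            jsonb_set : {users : {'<account_id>' : {hide: true}}}
            where     : {'project_id = $::UUID' : '<project_id>'}
            cb        : (err) -> ...

    generates SQL shaped roughly like

        UPDATE projects SET users=JSONB_SET(COALESCE(users#>'{}', '{}'::JSONB),
            '{<account_id>}', $1::JSONB) WHERE project_id = $2::UUID

    i.e., only the given key is replaced.  With jsonb_merge, nested objects are
    merged recursively instead, and null/undefined values delete their keys.
    ###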

    # Special case of query for counting entries in a table.
    _count: (opts) =>
        opts = defaults opts,
            table : required
            where : undefined  # as in _query
            cb    : required
        @_query
            query : "SELECT COUNT(*) FROM #{opts.table}"
            where : opts.where
            cb    : count_result(opts.cb)

    _validate_opts: (opts) =>
        for k, v of opts
            if k.slice(k.length-2) == 'id'
                if v? and not misc.is_valid_uuid_string(v)
                    opts.cb?("invalid #{k} -- #{v}")
                    return false
            if k.slice(k.length-3) == 'ids'
                for w in v
                    if not misc.is_valid_uuid_string(w)
                        opts.cb?("invalid uuid #{w} in #{k} -- #{misc.to_json(v)}")
                        return false
            if k == 'group' and v not in misc.PROJECT_GROUPS
                opts.cb?("unknown project group '#{v}'"); return false
            if k == 'groups'
                for w in v
                    if w not in misc.PROJECT_GROUPS
                        opts.cb?("unknown project group '#{w}' in groups"); return false
        return true

    _ensure_database_exists: (cb) =>
        dbg = @_dbg("_ensure_database_exists")
        dbg("ensure database '#{@_database}' exists")
        args = ['--user', @_user, '--host', @_host.split(',')[0], '--port', @_port, '--list', '--tuples-only']
        sslEnv = sslConfigToPsqlEnv(@_ssl)
        dbg("psql #{args.join(' ')}")
        misc_node.execute_code
            command : 'psql'
            args    : args
            env     : Object.assign sslEnv,
                PGPASSWORD : @_password
            cb      : (err, output) =>
                if err
                    cb(err)
                    return
                databases = (x.split('|')[0].trim() for x in output.stdout.split('\n') when x)
                if @_database in databases
                    dbg("database '#{@_database}' already exists")
                    cb()
                    return
                dbg("creating database '#{@_database}'")
                misc_node.execute_code
                    command : 'createdb'
                    args    : ['--host', @_host, '--port', @_port, @_database]
                    env     :
                        PGPASSWORD : @_password
                    cb      : cb

    _confirm_delete: (opts) =>
        opts = defaults opts,
            confirm : 'no'
            cb      : required
        dbg = @_dbg("confirm")
        if opts.confirm != 'yes'
            err = "Really delete all data? -- you must explicitly pass in confirm='yes' (but confirm:'#{opts.confirm}')"
            dbg(err)
            opts.cb(err)
            return false
        else
            return true

    set_random_password: (opts) =>
        throw Error("NotImplementedError")

    # This will fail if any other clients have the db open.
    # This function is very important for automated testing.
    delete_entire_database: (opts) =>
        dbg = @_dbg("delete_entire_database")
        dbg("deleting database '#{@_database}'")
        if not @_confirm_delete(opts)
            dbg("failed confirmation")
            return
        async.series([
            (cb) =>
                dbg("disconnect from db")
                @disconnect()
                cb()
            (cb) =>
                misc_node.execute_code
                    command : 'dropdb'
                    args    : ['--host', @_host, '--port', @_port, @_database]
                    cb      : cb
        ], opts.cb)

    # Deletes all the contents of the tables in the database.  It doesn't
    # delete anything about the schema itself: indexes or tables.
    delete_all: (opts) =>
        dbg = @_dbg("delete_all")
        dbg("deleting all contents of tables in '#{@_database}'")
        if not @_confirm_delete(opts)
            return

        # If the cache is enabled, be sure to also clear it.
        @clear_cache()

        tables = undefined

        # Delete anything cached in the db object.  Obviously, not putting something here
        # is a natural place in which to cause bugs... but they will probably all be bugs
        # of the form "the test suite fails", so we'll find them.
        delete @_stats_cached

        # Actually delete tables
        async.series([
            (cb) =>
                @_get_tables (err, t) =>
                    tables = t; cb(err)
            (cb) =>
                f = (table, cb) =>
                    @_query
                        query        : "DELETE FROM #{table}"
                        safety_check : false
                        cb           : cb
                async.map(tables, f, cb)
        ], opts.cb)

    # Return a list of tables in the database.
    _get_tables: (cb) =>
        @_query
            query : "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'"
            cb    : (err, result) =>
                if err
                    cb(err)
                else
                    cb(undefined, (row.table_name for row in result.rows))

    # Return a list of columns in a given table.
    _get_columns: (table, cb) =>
        @_query
            query : "SELECT column_name FROM information_schema.columns"
            where :
                "table_name = $::text" : table
            cb    : (err, result) =>
                if err
                    cb(err)
                else
                    cb(undefined, (row.column_name for row in result.rows))

    _primary_keys: (table) =>
        return primaryKeys(table)

    # Return *the* primary key, assuming it is unique; otherwise, raise an exception.
    _primary_key: (table) =>
        return primaryKey(table)

    _throttle: (name, time_s, key...) =>
        key = misc.to_json(key)
        x = "_throttle_#{name}"
        @[x] ?= {}
        if @[x][key]
            return true
        @[x][key] = true
        setTimeout((() => delete @[x]?[key]), time_s*1000)
        return false
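
    ###
    _throttle usage sketch (illustrative; the event name and ids are
    hypothetical).  It returns true if the same (name, key...) combination was
    already seen within the last time_s seconds, so callers can guard repeated
    side effects like:

        return if @_throttle('touch', 60, account_id, project_id)   # at most once per minute per pair
    ###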

    # Ensure that the actual schema in the database matches the one defined in SCHEMA.
    # This creates the initial schema, adds new columns, and in a VERY LIMITED
    # range of cases *might* be able to change the data type of a column.
    update_schema: (opts) =>
        try
            await syncSchema(SCHEMA)
            opts.cb?()
        catch err
            opts.cb?(err)

    # Return the number of outstanding concurrent queries.
    concurrent: =>
        return @_concurrent_queries ? 0

    is_heavily_loaded: =>
        return @_concurrent_queries >= @_concurrent_heavily_loaded

    # Compute the sha1 hash (in hex) of the input arguments, which are
    # converted to strings (via json) if they are not strings, then concatenated.
    # This is used for computing compound primary keys in a way that is relatively
    # safe, and in situations where, if there were a highly unlikely collision, it
    # wouldn't be the end of the world.  There is a similar client-only slower version
    # of this function (in schema.coffee), so don't change it willy-nilly.
    sha1: (args...) ->
        v = ((if typeof(x) == 'string' then x else JSON.stringify(x)) for x in args).join('')
        return misc_node.sha1(v)

    # Go through every table in the schema with a column called "expire", and
    # delete every entry where expire is <= right now.
    # Note: this ignores rows where expire is NULL, because comparisons with NULL are NULL.
    delete_expired: (opts) =>
        opts = defaults opts,
            count_only : false      # if true, only count the number of rows that would be deleted
            table      : undefined  # only delete from this table
            cb         : required
        dbg = @_dbg("delete_expired(...)")
        dbg()
        f = (table, cb) =>
            dbg("table='#{table}'")
            if opts.count_only
                @_query
                    query : "SELECT COUNT(*) FROM #{table} WHERE expire <= NOW()"
                    cb    : (err, result) =>
                        if not err
                            dbg("COUNT for table #{table} is #{result.rows[0].count}")
                        cb(err)
            else
                dbg("deleting expired entries from '#{table}'")
                @_query
                    query : "DELETE FROM #{table} WHERE expire <= NOW()"
                    cb    : (err) =>
                        dbg("finished deleting expired entries from '#{table}' -- #{err}")
                        cb(err)
        if opts.table
            tables = [opts.table]
        else
            tables = (k for k, v of SCHEMA when v.fields?.expire? and not v.virtual)
        async.map(tables, f, opts.cb)

    # Count the number of entries in a table.
    count: (opts) =>
        opts = defaults opts,
            table : required
            cb    : required
        @_query
            query : "SELECT COUNT(*) FROM #{opts.table}"
            cb    : count_result(opts.cb)

    # sanitize strings before inserting them into a query string
    sanitize: (s) =>
        escapeString(s)

###
Other misc functions
###

exports.pg_type = pg_type = (info) ->
    return pgType(info)

exports.quote_field = quote_field = (field) ->
    return quoteField(field)

# Timestamp the given number of seconds **in the future**.
exports.expire_time = expire_time = (ttl) ->
    if ttl then new Date((new Date() - 0) + ttl*1000)
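
###
Hedged sketch tying expire_time to delete_expired (the table and columns here
are illustrative, not prescribed by this module): insert a row whose "expire"
column is one hour from now, and let a periodic maintenance task purge it.

    db._query
        query  : 'INSERT INTO blobs'
        values : {'id::uuid':id, 'blob::bytea':data, 'expire::timestamp':expire_time(3600)}
        cb     : (err) -> ...

    db.delete_expired
        count_only : false
        cb         : (err) -> console.log('done deleting expired rows', err)
###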

# Returns a function that takes as input the output of doing a SQL query.
# If there are no results, it returns undefined.
# If there is exactly one result, what is returned depends on pattern:
#     'a_field' --> returns the value of this field in the result
# If there is more than one result, an error.
exports.one_result = one_result = (pattern, cb) ->
    if not cb? and typeof(pattern) == 'function'
        cb = pattern
        pattern = undefined
    if not cb?
        return ->  # do nothing -- return a function that ignores the result
    return (err, result) ->
        if err
            cb(err)
            return
        if not result?.rows?
            cb()
            return
        switch result.rows.length
            when 0
                cb()
            when 1
                obj = misc.map_without_undefined_and_null(result.rows[0])
                if not pattern?
                    cb(undefined, obj)
                    return
                switch typeof(pattern)
                    when 'string'
                        x = obj[pattern]
                        if not x?  # null or undefined -- SQL returns null, but we want undefined
                            cb()
                        else
                            if obj.expire? and new Date() >= obj.expire
                                cb()
                            else
                                cb(undefined, x)
                    when 'object'
                        x = {}
                        for p in pattern
                            if obj[p]?
                                x[p] = obj[p]
                        cb(undefined, x)
                    else
                        cb("BUG: unknown pattern -- #{pattern}")
            else
                cb("more than one result")

exports.all_results = all_results = (pattern, cb) ->
    if not cb? and typeof(pattern) == 'function'
        cb = pattern
        pattern = undefined
    if not cb?
        return ->  # do nothing -- return a function that ignores the result
    return (err, result) ->
        if err
            cb(err)
        else
            rows = result.rows
            if not pattern?
                # TODO: we use stupid (?) misc.copy to unwrap from the pg driver type -- investigate better!
                # Maybe this is fine.  I don't know.
                cb(undefined, (misc.copy(x) for x in rows))
            else if typeof(pattern) == 'string'
                cb(undefined, ((x[pattern] ? undefined) for x in rows))
            else
                cb("unsupported pattern type '#{typeof(pattern)}'")

exports.count_result = count_result = (cb) ->
    if not cb?
        return ->  # do nothing -- return a function that ignores the result
    return (err, result) ->
        if err
            cb(err)
        else
            cb(undefined, parseInt(result?.rows?[0]?.count))
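
###
Result-handler sketch (illustrative; assumes a connected `db`, and that
misc.hours_ago exists as in @cocalc/util/misc).  These wrappers adapt the raw
pg driver result to plain values for callbacks:

    db._query
        query : 'SELECT COUNT(*) FROM projects'
        cb    : count_result (err, n) -> console.log("#{n} projects")

    db._query
        table  : 'projects'
        select : ['project_id']
        where  : {'last_edited >= $::timestamp' : misc.hours_ago(1)}
        cb     : all_results 'project_id', (err, ids) -> console.log(err, ids)
###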