Path: src/packages/database/postgres-user-query-queue.coffee

#########################################################################
# This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
# License: MS-RSL – see LICENSE.md for details
#########################################################################

###
User query queue.

The point of this is to make it so:
(1) there is a limit on the number of simultaneous queries that a single connected client
    can make to the database, and
(2) when the client disconnects, any outstanding (not started) queries are canceled, and
(3) queries that don't even start until a certain amount of time after they were made are
    automatically considered to have failed (so the client retries).
###

{defaults} = misc = require('@cocalc/util/misc')
required = defaults.required

# We do at most this many user queries **at once** to the database on behalf
# of each connected client.  This only applies when the global limit has
# been exceeded.
USER_QUERY_LIMIT = 10

# If we don't even start a query by this long after we receive it, then we consider it failed.
USER_QUERY_TIMEOUT_MS = 15000

# How many recent query times to save for each client.
# This is currently not used for anything except logging.
TIME_HISTORY_LENGTH = 100

# Do not throttle queries at all unless there are at least this
# many global outstanding concurrent **user queries**.  The point is that
# if there's very little load, we should get queries done as fast
# as possible for users.
GLOBAL_LIMIT = 250

# Maximum queue size -- if a user tries to do more queries than this
# at once, then all old ones return an error.  They could then retry.
MAX_QUEUE_SIZE = 150  # client isn't supposed to send more than around 25-50 at once

# setup metrics
metrics = require('@cocalc/backend/metrics')

getMetrics = () =>
    try
        query_queue_exec = metrics.newCounter('db', 'query_queue_executed_total',
            'Executed queries and their status', ['status'])
        query_queue_duration = metrics.newCounter('db', 'query_queue_duration_seconds_total',
            'Total time it took to evaluate queries')
        query_queue_done = metrics.newCounter('db', 'query_queue_done_total',
            'Total number of evaluated queries')
        return {query_queue_exec, query_queue_duration, query_queue_done}
    catch err
        console.log("WARNING: ", err)
        return {}

global_count = 0

class exports.UserQueryQueue
    constructor: (opts) ->
        opts = defaults opts,
            do_query     : required
            dbg          : required
            limit        : USER_QUERY_LIMIT
            timeout_ms   : USER_QUERY_TIMEOUT_MS
            global_limit : GLOBAL_LIMIT
            concurrent   : required
        @_do_query     = opts.do_query
        @_limit        = opts.limit
        @_dbg          = opts.dbg
        @_timeout_ms   = opts.timeout_ms
        @_global_limit = opts.global_limit
        @_state        = {}
        @_concurrent   = opts.concurrent

    destroy: =>
        delete @_do_query
        delete @_limit
        delete @_timeout_ms
        delete @_dbg
        delete @_state
        delete @_global_limit

    cancel_user_queries: (opts) =>
        opts = defaults opts,
            client_id : required
        state = @_state[opts.client_id]
        @_dbg("cancel_user_queries(client_id='#{opts.client_id}') -- discarding #{state?.queue?.length}")
        if state?
            delete state.queue              # so we will stop trying to do queries for this client
            delete @_state[opts.client_id]  # and won't waste memory on them

    user_query: (opts) =>
        opts = defaults opts,
            client_id  : required
            priority   : undefined  # (NOT IMPLEMENTED) priority for this query
                                    # (an integer [-10,...,19] like in UNIX)
            account_id : undefined
            project_id : undefined
            query      : required
            options    : []
            changes    : undefined
            cb         : undefined
        client_id = opts.client_id
        @_dbg("user_query(client_id='#{client_id}')")
        state = @_state[client_id]
        if not state?
            state = @_state[client_id] =
                client_id : client_id
                queue     : []   # queries in the queue
                count     : 0    # number of queries currently outstanding (waiting for these to finish)
                sent      : 0    # total number of queries sent to the database
                time_ms   : []   # how long recent queries took in ms; time_ms[time_ms.length-1] is the most recent
        opts.time = new Date()
        state.queue.push(opts)
        state.sent += 1
        @_update(state)
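
    # The methods below drain the queue that user_query fills: _update decides
    # how many queued queries may start now (subject to the per-client and
    # global limits above), and _do_one_query runs a single query, answering it
    # with a "timeout" error instead if it sat unstarted longer than timeout_ms.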

    _do_one_query: (opts, cb) =>
        if new Date() - opts.time >= @_timeout_ms
            @_dbg("_do_one_query -- timed out")
            # It took too long before we even **started** the query.  There is no
            # point in even trying; the client likely already gave up.
            opts.cb?("timeout")
            cb()
            {query_queue_exec} = getMetrics()
            query_queue_exec?.labels('timeout').inc()
            return

        id = misc.uuid().slice(0,6)
        tm = new Date()
        client_id = opts.client_id
        @_dbg("_do_one_query(client_id='#{client_id}', query_id='#{id}') -- doing the query")
        # Actually do the query.
        orig_cb = opts.cb
        # Remove the properties from opts that @_do_query doesn't take
        # as inputs and that we no longer need.
        delete opts.time       # no longer matters
        delete opts.client_id
        delete opts.priority

        # Set a cb that calls our cb exactly once, but sends anything
        # it receives to the orig_cb, if there is one.
        opts.cb = (err, result) =>
            if cb?
                @_dbg("_do_one_query(client_id='#{client_id}', query_id='#{id}') -- done; time=#{new Date() - tm}ms")
                cb()
                cb = undefined
            if result?.action == 'close' or err
                # I think this is necessary for this closure to ever
                # get garbage collected.  **Not tested, and this could be bad.**
                delete opts.cb
            orig_cb?(err, result)

        # Increment counter
        {query_queue_exec} = getMetrics()
        query_queue_exec?.labels('sent').inc()
        # Finally, do the query.
        @_do_query(opts)

    _update: (state) =>
        if not state.queue? or state.queue.length == 0
            return
        # Discard all additional messages beyond what is outstanding and in the queue.
        # The client is assumed to be smart enough to try again.
        while state.queue.length + state.count > MAX_QUEUE_SIZE
            @_discard_next_call(state)
        # Now handle the remaining messages up to the limit.
        while state.queue.length > 0 and (@_concurrent() < @_global_limit or state.count < @_limit)
            @_process_next_call(state)
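
    # With the default limits: while fewer than GLOBAL_LIMIT (250) user queries are
    # running across all clients, every queued query starts immediately; once the
    # global count reaches 250, a client's queued queries only start while that
    # client has fewer than USER_QUERY_LIMIT (10) of its own queries running.
    # Whenever a client's queued plus outstanding queries exceed MAX_QUEUE_SIZE (150),
    # the oldest queued ones are answered with a "discarded" error, via
    # _discard_next_call below.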

    _discard_next_call: (state) =>
        if not state.queue? or state.queue.length == 0
            return
        @_dbg("_discard_next_call -- discarding (queue size=#{state.queue.length})")
        opts = state.queue.shift()
        opts.cb("discarded")
        @_info(state)

    _process_next_call: (state) =>
        if not state.queue? or state.queue.length == 0
            return
        state.count += 1
        global_count += 1
        opts = state.queue.shift()
        @_info(state)
        tm = new Date()
        @_do_one_query opts, =>
            state.count -= 1
            global_count -= 1
            duration_ms = new Date() - tm
            state.time_ms.push(duration_ms)
            {query_queue_duration, query_queue_done} = getMetrics()
            query_queue_duration?.inc(duration_ms / 1000)
            query_queue_done?.inc(1)
            if state.time_ms.length > TIME_HISTORY_LENGTH
                state.time_ms.shift()
            @_info(state)
            @_update(state)

    _avg: (state) =>
        # recent average time
        v = state.time_ms.slice(state.time_ms.length - 10)
        if v.length == 0
            return 0
        s = 0
        for a in v
            s += a
        return s / v.length

    _info: (state) =>
        avg = @_avg(state)
        #query_queue_info.labels(state.client_id, 'count').set(state.count)
        #query_queue_info.labels(state.client_id, 'avg').set(avg)
        #query_queue_info.labels(state.client_id, 'length').set(state.queue?.length ? 0)
        #query_queue_info.labels(state.client_id, 'sent').set(state.sent)
        @_dbg("client_id='#{state.client_id}': avg=#{avg}ms, count(local=#{state.count},global=#{global_count}), queued.length=#{state.queue?.length}, sent=#{state.sent}")
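
Below is a minimal, hypothetical usage sketch of the class above. It is not part of the file: the do_query, dbg, and concurrent implementations and the query payload are illustrative placeholders standing in for the real database query function, logger, and concurrency counter that the actual caller supplies.

{UserQueryQueue} = require('./postgres-user-query-queue')

# Construct a queue.  do_query actually runs a query and must call opts.cb when done;
# concurrent reports how many user queries are currently running globally.
queue = new UserQueryQueue
    do_query   : (opts) -> opts.cb?(undefined, {placeholder: true})   # stand-in: "run" the query, call back immediately
    dbg        : (m) -> console.log("query queue: #{m}")
    concurrent : -> 0                                                 # stand-in: pretend nothing else is running

# Enqueue a query on behalf of a connected client.  Under light load it starts
# immediately; under heavy load it waits behind that client's earlier queries.
queue.user_query
    client_id : 'some-client-uuid'
    query     : {accounts: {first_name: null}}    # made-up payload; whatever do_query expects
    cb        : (err, result) -> console.log(err ? result)

# When the client disconnects, drop anything it queued that never started.
queue.cancel_user_queries(client_id: 'some-client-uuid')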