GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/database/postgres-user-query-queue.coffee

#########################################################################
# This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
# License: MS-RSL – see LICENSE.md for details
#########################################################################

###
User query queue.

The point of this is to make it so:
(1) there is a limit on the number of simultaneous queries that a single connected client
    can make to the database, and
(2) when the client disconnects, any outstanding (not started) queries are canceled, and
(3) queries that don't even start until a certain amount of time after they were made are
    automatically considered to have failed (so the client retries).
###
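
###
Usage sketch (illustration only, not executed as part of this module, and assuming
UserQueryQueue has been required from this module).  The `db` and `client` objects
below are hypothetical stand-ins for whatever the caller wires in; only the option
names correspond to the constructor and methods defined in this file.

    queue = new UserQueryQueue
        do_query   : (opts) -> db.user_query(opts)     # actually runs one query
        dbg        : (m) -> console.log("query queue: #{m}")
        concurrent : -> db.concurrent()                # global outstanding query count

    # Enqueue a query on behalf of a connected client; it runs immediately
    # unless the global or per-client limits apply.
    queue.user_query
        client_id : client.id
        query     : {accounts: {account_id: null, first_name: null}}
        cb        : (err, result) -> console.log(err ? result)

    # When the client disconnects, drop any of its queries that have not started.
    queue.cancel_user_queries(client_id: client.id)
###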

{defaults} = misc = require('@cocalc/util/misc')
required = defaults.required

# We do at most this many user queries **at once** to the database on behalf
# of each connected client.  This only applies when the global limit has
# been exceeded.
USER_QUERY_LIMIT = 10

# If we haven't even started the query within this long after receiving it,
# we consider it failed (so the client retries).
USER_QUERY_TIMEOUT_MS = 15000

# How many recent query times to save for each client.
# This is currently not used for anything except logging.
TIME_HISTORY_LENGTH = 100

# Do not throttle queries at all unless there are at least this
# many global outstanding concurrent **user queries**.  The point is that
# if there's very little load, we should get queries done as fast
# as possible for users.
GLOBAL_LIMIT = 250

# Maximum queue size -- if a user tries to do more queries than this
# at once, then the oldest queued ones return an error; the client can retry them.
MAX_QUEUE_SIZE = 150  # client isn't supposed to send more than around 25-50 at once
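
# Taken together (a descriptive note; the behavior is implemented in _update below):
# while fewer than GLOBAL_LIMIT user queries are outstanding across all clients,
# everything a client queues is dispatched immediately; once the global count
# reaches GLOBAL_LIMIT, each client is limited to USER_QUERY_LIMIT concurrent
# queries; and whenever a client has more than MAX_QUEUE_SIZE queries queued
# plus running, the oldest queued ones are discarded with an error.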

# setup metrics
metrics = require('@cocalc/backend/metrics')

getMetrics = () =>
    try
        query_queue_exec = metrics.newCounter('db', 'query_queue_executed_total',
            'Executed queries and their status', ['status'])
        query_queue_duration = metrics.newCounter('db', 'query_queue_duration_seconds_total',
            'Total time it took to evaluate queries')
        query_queue_done = metrics.newCounter('db', 'query_queue_done_total',
            'Total number of evaluated queries')
        return {query_queue_exec, query_queue_duration, query_queue_done}
    catch err
        console.log("WARNING: ", err)
        return {}

global_count = 0

class exports.UserQueryQueue
    constructor: (opts) ->
        opts = defaults opts,
            do_query     : required
            dbg          : required
            limit        : USER_QUERY_LIMIT
            timeout_ms   : USER_QUERY_TIMEOUT_MS
            global_limit : GLOBAL_LIMIT
            concurrent   : required
        @_do_query     = opts.do_query
        @_limit        = opts.limit
        @_dbg          = opts.dbg
        @_timeout_ms   = opts.timeout_ms
        @_global_limit = opts.global_limit
        @_state        = {}
        @_concurrent   = opts.concurrent

    destroy: =>
        delete @_do_query
        delete @_limit
        delete @_timeout_ms
        delete @_dbg
        delete @_state
        delete @_global_limit

    cancel_user_queries: (opts) =>
        opts = defaults opts,
            client_id : required
        state = @_state[opts.client_id]
        @_dbg("cancel_user_queries(client_id='#{opts.client_id}') -- discarding #{state?.queue?.length}")
        if state?
            delete state.queue              # so we will stop trying to do queries for this client
            delete @_state[opts.client_id]  # and won't waste memory on them

    user_query: (opts) =>
        opts = defaults opts,
            client_id  : required
            priority   : undefined   # (NOT IMPLEMENTED) priority for this query
                                     # (an integer [-10,...,19] like in UNIX)
            account_id : undefined
            project_id : undefined
            query      : required
            options    : []
            changes    : undefined
            cb         : undefined
        client_id = opts.client_id
        @_dbg("user_query(client_id='#{client_id}')")
        state = @_state[client_id]
        if not state?
            state = @_state[client_id] =
                client_id : client_id
                queue     : []   # queries in the queue
                count     : 0    # number of queries currently outstanding (waiting for these to finish)
                sent      : 0    # total number of queries sent to the database
                time_ms   : []   # how long recent queries took in ms; time_ms[time_ms.length-1] is most recent
        opts.time = new Date()
        state.queue.push(opts)
        state.sent += 1
        @_update(state)

    _do_one_query: (opts, cb) =>
        if new Date() - opts.time >= @_timeout_ms
            @_dbg("_do_one_query -- timed out")
            # It took too long before we even **started** the query.  There is no
            # point in even trying; the client likely already gave up.
            opts.cb?("timeout")
            cb()
            {query_queue_exec} = getMetrics()
            query_queue_exec?.labels('timeout').inc()
            return

        id = misc.uuid().slice(0,6)
        tm = new Date()
        client_id = opts.client_id
        @_dbg("_do_one_query(client_id='#{client_id}', query_id='#{id}') -- doing the query")
        # Actually do the query
        orig_cb = opts.cb
        # Remove the properties from opts that @_do_query doesn't take
        # as inputs and that we no longer need.
        delete opts.time       # no longer matters
        delete opts.client_id
        delete opts.priority

        # Set a cb that calls our cb exactly once, but sends anything
        # it receives to the orig_cb, if there is one.
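        # (Note: the underlying query may invoke this callback more than once,
        # e.g. for changefeeds when opts.changes is set, but the queue slot must
        # be released exactly once, which is why cb is cleared after its first use.)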
        opts.cb = (err, result) =>
            if cb?
                @_dbg("_do_one_query(client_id='#{client_id}', query_id='#{id}') -- done; time=#{new Date() - tm}ms")
                cb()
                cb = undefined
            if result?.action == 'close' or err
                # I think this is necessary for this closure to ever
                # get garbage collected.  **Not tested, and this could be bad.**
                delete opts.cb
            orig_cb?(err, result)

        # Increment counter
        {query_queue_exec} = getMetrics()
        query_queue_exec?.labels('sent').inc()
        # Finally, do the query.
        @_do_query(opts)

    _update: (state) =>
        if not state.queue? or state.queue.length == 0
            return
        # Discard queued messages until the number outstanding plus queued is at
        # most MAX_QUEUE_SIZE.  The client is assumed to be smart enough to try again.
        while state.queue.length + state.count > MAX_QUEUE_SIZE
            @_discard_next_call(state)
        # Now handle the remaining messages, up to the limit.
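        # Per the constants above: if global concurrency is below the global limit,
        # everything queued gets dispatched; otherwise we keep dispatching only
        # while this client has fewer than @_limit queries outstanding.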
        while state.queue.length > 0 and (@_concurrent() < @_global_limit or state.count < @_limit)
            @_process_next_call(state)

    _discard_next_call: (state) =>
        if not state.queue? or state.queue.length == 0
            return
        @_dbg("_discard_next_call -- discarding (queue size=#{state.queue.length})")
        opts = state.queue.shift()
        opts.cb?("discarded")   # cb is optional, so don't assume it is set
        @_info(state)

    _process_next_call: (state) =>
        if not state.queue? or state.queue.length == 0
            return
        state.count += 1
        global_count += 1
        opts = state.queue.shift()
        @_info(state)
        tm = new Date()
        @_do_one_query opts, =>
            state.count -= 1
            global_count -= 1
            duration_ms = new Date() - tm
            state.time_ms.push(duration_ms)
            {query_queue_duration, query_queue_done} = getMetrics()
            query_queue_duration?.inc(duration_ms / 1000)
            query_queue_done?.inc(1)
            if state.time_ms.length > TIME_HISTORY_LENGTH
                state.time_ms.shift()
            @_info(state)
            @_update(state)

    _avg: (state) =>
        # Average of the (up to) 10 most recent query times.
        v = state.time_ms.slice(state.time_ms.length - 10)
        if v.length == 0
            return 0
        s = 0
        for a in v
            s += a
        return s / v.length

    _info: (state) =>
        avg = @_avg(state)
        #query_queue_info.labels(state.client_id, 'count').set(state.count)
        #query_queue_info.labels(state.client_id, 'avg').set(avg)
        #query_queue_info.labels(state.client_id, 'length').set(state.queue?.length ? 0)
        #query_queue_info.labels(state.client_id, 'sent').set(state.sent)
        @_dbg("client_id='#{state.client_id}': avg=#{avg}ms, count(local=#{state.count},global=#{global_count}), queued.length=#{state.queue?.length}, sent=#{state.sent}")