Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Avatar for KuCalc : devops.
Download
50640 views
1
###
2
Compute client for use in Kubernetes cluster by the hub.
3
4
The hub uses this module to get information about a project. This is meant
5
to be used as part of kucalc, and replaces the other variants
6
of compute-client.coffee.
7
8
The name 'compute-client' probably isn't the best. Really this is a module
9
that gets information about and controls projects.
10
11
What this module should accomplish:
12
13
- Modify database in response to requests to start/stop/etc project.
14
- Provide the project secret token to the hub
15
16
###
17
18
# Fixed ports merged into every project's status (see Project.status below).
# LOCAL_HUB_PORT is also the port returned by Project.address.
LOCAL_HUB_PORT      = 6000    # reported as "local_hub.port"
RAW_PORT            = 6001    # reported as "raw.port"
SAGE_SERVER_PORT    = 6002    # reported as "sage_server.port"
CONSOLE_SERVER_PORT = 6003    # reported as "console_server.port"
22
23
{EventEmitter} = require('events')
24
25
request = require('request')
26
async = require('async')
27
underscore = require('underscore')
28
29
misc = require('smc-util/misc')
30
{defaults, required} = misc
31
32
# HTTP GET the given url and parse the response body as JSON.
#
#   url -- address to fetch
#   cb  -- cb(err, parsed_object); err is the error from the request library,
#          or a string starting with "ERROR:" if the status code is not 200
#          or the body is not valid JSON.
#
# cb is called exactly once.  The JSON parsing is done inside the try block,
# but cb is invoked *outside* of it, so an exception thrown by cb itself is
# not misreported as "invalid JSON" (which would also call cb a second time).
exports.get_json = get_json = (url, cb) ->
    request.get url, (err, response, body) ->
        if err
            cb(err)
            return
        if response.statusCode != 200
            cb("ERROR: statusCode #{response.statusCode}")
            return
        try
            parsed = JSON.parse(body)
        catch e
            cb("ERROR: invalid JSON -- #{e} -- '#{body}'")
            return
        cb(undefined, parsed)
    return
44
45
# HTTP GET the given url and hand back the raw response body.
#
#   url -- address to fetch
#   cb  -- cb(err, body); the request is made with {encoding: null}.
#          err is the error from the request library, or an "ERROR: ..."
#          string when the status code is not 200.
exports.get_file = get_file = (url, cb) ->
    request_options = {encoding: null}
    request.get url, request_options, (err, res, data) ->
        if err
            cb(err)
            return
        if res.statusCode != 200
            cb("ERROR: statusCode #{res.statusCode}")
            return
        cb(undefined, data)
    return
54
55
# Factory: build a Client around the given database and (optional) logger.
exports.compute_client = (db, logger) ->
    client = new Client(db, logger)
    return client
57
58
# Empty EventEmitter subclass; not referenced elsewhere in this file.
class Dbg extends EventEmitter

# Cache of Project objects, keyed by project_id.  Entries are added by
# Client.project and removed again by Project.free.
project_cache = {}

# Used by Project.set_all_quotas to compute the current quota of a project.
quota_compute = require('./quota')
63
64
# Client -- entry point used to obtain Project objects and the shared
# 'copy_paths' synctable.
#
#   database -- database object; must provide synctable(...) (required)
#   logger   -- optional; if given, its debug(...) method is used for logging
class Client
    constructor: (@database, @logger) ->
        @dbg("constructor")()
        if not @database?
            throw Error("database must be defined")

    # Get the single shared synctable on the copy_paths table, creating it on
    # first use.  Concurrent callers that arrive while it is being created
    # are queued and all get called when creation finishes.
    #
    #   cb -- cb(err, synctable)
    copy_paths_synctable: (cb) =>
        if @_synctable
            cb(undefined, @_synctable)
            return
        if @_synctable_cbs?
            # creation already in progress -- just wait for it.
            @_synctable_cbs.push(cb)
            return
        @_synctable_cbs = [cb]
        @database.synctable
            table          : 'copy_paths'
            columns        : ['id', 'started', 'error', 'finished']
            where          :
                "time > $::TIMESTAMP": new Date()
            where_function : ->
                # Whenever anything *changes* in this table, we are interested in it, so no need
                # to do a query to decide.
                return true
            cb             : (err, synctable) =>
                # Snapshot the waiters and reset our state *before* calling
                # anybody.  A callback that calls copy_paths_synctable again
                # then sees consistent state, instead of being pushed onto a
                # list that is being iterated (and then deleted) and so never
                # getting called.
                waiting = @_synctable_cbs ? []
                delete @_synctable_cbs
                @_synctable = synctable
                # Separate loop variable: CoffeeScript has no block scoping,
                # so reusing the name `cb` would clobber the outer argument.
                for waiter in waiting
                    if err
                        waiter(err)
                    else
                        waiter(undefined, synctable)
                return

    # Return a logging function for the method named f.  The returned function
    # does nothing when there is no logger.
    dbg: (f) =>
        if not @logger?
            return ->
        else
            # still need @logger? since it can get cleaned
            # up when Project is being freed.
            return (args...) => @logger?.debug("kucalc.Client.#{f}", args...)

    # Get the Project object for the given project_id, creating and caching
    # it if it is not already in project_cache.
    #
    #   project_id -- required
    #   cb         -- required; cb(err, project), called once the project has
    #                 emitted 'ready'.  err is set if the project failed to
    #                 initialize.
    project: (opts) =>
        opts = defaults opts,
            project_id : required
            cb         : required
        dbg = @dbg("project('#{opts.project_id}')")
        P = project_cache[opts.project_id]
        if P?
            dbg('in cache')
            if P.is_ready
                P.active()
                opts.cb(undefined, P)
            else
                P.once 'ready', (err) ->
                    opts.cb(err, P)
            return
        dbg("not in cache, so creating")
        P = project_cache[opts.project_id] = new Project(@, opts.project_id, @logger, @database)
        # Pass along the error (if any) that 'ready' is emitted with, exactly
        # as in the cached case above; otherwise a project whose synctable
        # could not be created would be reported as successfully created.
        P.once 'ready', (err) ->
            opts.cb(err, P)
123
124
# NOTE: I think (and am assuming) that EventEmitter aspect of Project is NOT used in KuCalc by any
125
# client code.
126
# Project -- information about and control of one project, backed by a
# synctable on that project's row of the 'projects' table.
#
# Emits:
#   'ready'  -- once, when the synctable has been created (or with an error
#               argument if that failed)
#   'change' -- whenever the synctable reports a change
class Project extends EventEmitter
    constructor: (@client, @project_id, @logger, @database) ->
        @host = "project-#{@project_id}"
        dbg = @dbg('constructor')
        dbg("initializing")

        # We debounce the free function (which cleans everything up).
        # Every time we're doing something, we call @active();
        # once we DON'T call it for a few minutes, the project
        # is **then** freed, because that's how debounce works.
        @active = underscore.debounce(@free, 10*60*1000)
        @active()
        @database.synctable
            table          : 'projects'
            columns        : ['state', 'status', 'action_request']
            where          : {"project_id = $::UUID" : @project_id}
            where_function : (project_id) =>
                return project_id == @project_id  # fast easy test for matching
            cb             : (err, synctable) =>
                @active()
                if err
                    dbg("error creating synctable ", err)
                    @emit("ready", err)
                    # Clean up and drop this object from the cache.  (There is
                    # no close method on this class; free is what does the
                    # cleanup.)
                    @free()
                else
                    dbg("successfully created synctable; now ready")
                    @is_ready = true
                    @synctable = synctable
                    @synctable.on 'change', => @emit('change')
                    @emit("ready")

    # Get the current data about the project from the database.
    # With no argument, returns the whole entry from the synctable; with a
    # field name, returns just that field.  Returns undefined if the
    # synctable does not exist (not yet ready, or already freed) or has no
    # entry for this project.
    get: (field) =>
        t = @synctable?.get(@project_id)
        if field?
            return t?.get(field)
        else
            return t

    # Like get, but looks up the nested path v (an array of keys).
    getIn: (v) =>
        return @get()?.getIn(v)

    # The current action_request as a plain JS object, with the started and
    # finished fields (if set) converted to Date objects.  Returns undefined
    # if there is no action_request.
    _action_request: =>
        x = @get('action_request')?.toJS()
        if not x?
            return x
        if x.started?
            x.started = new Date(x.started)
        if x.finished?
            x.finished = new Date(x.finished)
        return x

    # Return a logging function for the method named f.  The returned function
    # does nothing when there is no logger.
    dbg: (f) =>
        if not @logger?
            return ->
        else
            # still need @logger? since it can get cleaned
            # up when Project is being freed.
            return (args...) => @logger?.debug("kucalc.Project('#{@project_id}').#{f}", args...)

    # free -- stop listening for status updates from the database and broadcasting
    # updates about this project.
    free: () =>
        @dbg('free')()
        delete @idle
        if @free_check?
            clearInterval(@free_check)
            delete @free_check
        # Ensure that next time this project gets requested, a fresh one is created, rather than
        # this cached one, which has been free'd up, and will no longer work.
        delete project_cache[@project_id]
        # Close the changefeed, so get no further data from database.
        @synctable?.close()
        delete @synctable
        delete @logger
        delete @project_id
        delete @compute_server
        delete @host
        delete @is_ready
        # Make sure nothing else reacts to changes on this ProjectClient, since they won't happen.
        @removeAllListeners()

    # Report the 'state' field of the project as a plain JS object.
    state: (opts) =>
        opts = defaults opts,
            force  : false    # ignored
            update : false    # ignored
            cb     : required # cb(err, {state:?, time:?, error:?})
        dbg = @dbg("state")
        dbg()
        opts.cb(undefined, @get('state')?.toJS())

    # Report the 'status' field of the project as a plain JS object ({} if
    # there is none), with the fixed port numbers merged in.
    status: (opts) =>
        opts = defaults opts,
            cb : required
        dbg = @dbg("status")
        dbg()
        status = @get('status')?.toJS() ? {}
        misc.merge status,  # merge in canonical information
            "local_hub.port"      : LOCAL_HUB_PORT
            "raw.port"            : RAW_PORT
            "sage_server.port"    : SAGE_SERVER_PORT
            "console_server.port" : CONSOLE_SERVER_PORT
        opts.cb(undefined, status)

    # Request that an action be performed on the project, by writing an
    # action_request to the database, and wait until goal holds.
    # If goal already holds, nothing is written and cb is called immediately.
    # cb is called at most once: when the wait finishes (successfully or
    # not), or when writing the request fails -- whichever happens first.
    _action: (opts) =>
        opts = defaults opts,
            action    : required    # action to do
            goal      : required    # wait until goal(project) is true, where project is immutable js obj
            timeout_s : 300         # timeout in seconds (only used for wait)
            cb        : undefined
        dbg = @dbg("_action('#{opts.action}')")
        if opts.goal(@get())
            dbg("condition already holds; nothing to do.")
            opts.cb?()
            return

        if not @synctable?
            # Not ready yet or already freed: there is nothing to wait on, so
            # report an error rather than throwing on @synctable.wait below.
            dbg("no synctable, so cannot wait for goal")
            opts.cb?("project is not ready (no synctable)")
            return

        # Report the outcome to the caller, at most once.
        done = (err) =>
            cb = opts.cb
            delete opts.cb
            cb?(err)

        dbg("start waiting for goal to be satisfied")
        @active()
        @synctable.wait
            until   : () =>
                @active()
                return opts.goal(@get())
            timeout : opts.timeout_s
            cb      : (err) =>
                @active()
                dbg("done waiting for goal #{err}")
                done(err)

        dbg("request action to happen")
        @active()
        @_query
            jsonb_set :
                action_request :
                    action   : opts.action
                    time     : new Date()
                    started  : undefined
                    finished : undefined
            cb        : (err) =>
                @active()
                if err
                    dbg('action request failed')
                    done(err)
                else
                    dbg("action requested")

    # Run an UPDATE query restricted to this project's row of the projects
    # table.  NOTE: sets the query and where fields of the opts object
    # passed in.
    _query: (opts) =>
        opts.query = 'UPDATE projects'
        opts.where = {'project_id = $::UUID' : @project_id}
        @client.database._query(opts)

    # Request action 'open'; done when the project's state is anything
    # other than 'closed' (a missing state counts as 'closed').
    open: (opts) =>
        opts = defaults opts,
            cb : undefined
        dbg = @dbg("open")
        dbg()
        @_action
            action : 'open'
            goal   : (project) => (project?.getIn(['state', 'state']) ? 'closed') != 'closed'
            cb     : opts.cb

    # Request action 'start'; done when the project's state is 'running'.
    start: (opts) =>
        opts = defaults opts,
            set_quotas : true    # ignored
            cb         : undefined
        dbg = @dbg("start")
        dbg()
        @_action
            action : 'start'
            goal   : (project) -> project?.getIn(['state', 'state']) == 'running'
            cb     : opts.cb

    # Request action 'stop'; done when the project's state is 'opened' or
    # 'closed'.
    stop: (opts) =>
        opts = defaults opts,
            cb : undefined
        dbg = @dbg("stop")
        dbg()
        @_action
            action : 'stop'
            goal   : (project) -> project?.getIn(['state', 'state']) in ['opened', 'closed']
            cb     : opts.cb

    # Stop and then start the project; the start is skipped if the stop fails.
    restart: (opts) =>
        opts = defaults opts,
            set_quotas : true    # ignored
            cb         : undefined
        dbg = @dbg("restart")
        dbg()
        async.series([
            (cb) =>
                @stop(cb:cb)
            (cb) =>
                @start(cb:cb)
        ], (err) => opts.cb?(err))

    ensure_running: (opts) =>
        @start(opts)  # it's just the same

    # Request action 'close'; done when the project's state is 'closed'.
    ensure_closed: (opts) =>
        opts = defaults opts,
            cb : undefined
        dbg = @dbg("ensure_closed")
        dbg()
        @_action
            action : 'close'
            goal   : (project) -> project?.getIn(['state', 'state']) == 'closed'
            cb     : opts.cb

    # Not supported: always calls cb with an error.
    move: (opts) =>
        opts = defaults opts,
            target : undefined  # ignored
            force  : false      # ignored for now
            cb     : required
        opts.cb("move makes no sense for Kubernetes")

    # Make sure the project is running, then report how to connect to it:
    # cb(err, {host, port, secret_token}).  It is an error if the project is
    # running but its status contains no secret_token.
    address: (opts) =>
        opts = defaults opts,
            cb : required
        dbg = @dbg("address")
        dbg('first ensure is running')
        @ensure_running
            cb : (err) =>
                if err
                    dbg('error starting it up')
                    opts.cb(err)
                    return
                dbg('it is running')
                address =
                    host         : @host
                    port         : LOCAL_HUB_PORT
                    secret_token : @getIn(['status', 'secret_token'])
                if not address.secret_token
                    err = 'BUG -- running, but no secret_token!'
                    dbg(err)
                    opts.cb(err)
                else
                    opts.cb(undefined, address)

    # this is a no-op for Kubernetes; this was only used for serving
    # some static websites, e.g., wstein.org, so may evolve into that...
    save: (opts) =>
        opts = defaults opts,
            min_interval : undefined # ignored
            cb           : undefined # called immediately with no arguments
        dbg = @dbg("save(min_interval:#{opts.min_interval})")
        dbg()
        opts.cb?()

    # Copy a path from this project to a path in a (possibly different)
    # project.  This inserts a row into the copy_paths table, then watches
    # the shared copy_paths synctable until that row is marked finished; cb
    # is called with the row's error field (if any).
    # NOTE(review): there is no timeout on that wait in this module -- if
    # the row is never marked finished, cb is never called.  Also,
    # exclude_history is accepted but not written to the database.
    copy_path: (opts) =>
        opts = defaults opts,
            path              : ""
            target_project_id : ""
            target_path       : ""         # path into project; if "", defaults to path above.
            overwrite_newer   : undefined  # if true, newer files in target are copied over (otherwise, uses rsync's --update)
            delete_missing    : undefined  # if true, delete files in dest path not in source, **including** newer files
            backup            : undefined  # make backup files
            exclude_history   : undefined
            timeout           : 5*60
            bwlimit           : '5MB'
            cb                : undefined
        if not opts.target_project_id
            opts.target_project_id = @project_id
        if not opts.target_path
            opts.target_path = opts.path
        synctable = undefined
        copy_id = misc.uuid()
        dbg = @dbg("copy_path('#{opts.path}', id='#{copy_id}')")
        dbg("copy a path using rsync from one project to another")
        @active()
        async.series([
            (cb) =>
                dbg("get synctable")
                @client.copy_paths_synctable (err, s) =>
                    synctable = s; cb(err)
            (cb) =>
                @active()
                dbg('write query requesting the copy to the database')
                @database._query
                    query  : "INSERT INTO copy_paths"
                    values :
                        "id                ::UUID"      : copy_id
                        "time              ::TIMESTAMP" : new Date()
                        "source_project_id ::UUID"      : @project_id
                        "source_path       ::TEXT"      : opts.path
                        "target_project_id ::UUID"      : opts.target_project_id
                        "target_path       ::TEXT"      : opts.target_path
                        "overwrite_newer   ::BOOLEAN"   : opts.overwrite_newer
                        "delete_missing    ::BOOLEAN"   : opts.delete_missing
                        "backup            ::BOOLEAN"   : opts.backup
                        "bwlimit           ::TEXT"      : opts.bwlimit
                        "timeout           ::NUMERIC"   : opts.timeout
                    cb: cb
            (cb) =>
                @active()
                if synctable.getIn([copy_id, 'finished'])
                    dbg("copy instantly finished")
                    # no way this ever happens - the server can't be that fast.
                    # but just in case, logically we have to check this case.
                    cb()
                    return
                dbg('waiting for copy to finish...')
                handle_change = =>
                    obj = synctable.get(copy_id)
                    if obj?.get('started')
                        dbg("copy started...")
                    if obj?.get('finished')
                        dbg("copy finished!")
                        synctable.removeListener('change', handle_change)
                        cb(obj.get('error'))
                synctable.on('change', handle_change)
        ], (err) =>
            @active()
            dbg('done', err)
            opts.cb?(err)
        )

    # Start the project if necessary, then fetch the listing of the given
    # directory as JSON from the project over HTTP, retrying for up to
    # max_time.  cb(err, listing).
    directory_listing: (opts) =>
        opts = defaults opts,
            path   : ''
            hidden : false      # used
            time   : undefined  # ignored/deprecated
            start  : undefined  # ignored/deprecated
            limit  : undefined  # ignored/deprecated
            cb     : required
        dbg = @dbg("directory_listing")
        dbg()
        listing = undefined
        async.series([
            (cb) =>
                dbg("starting project if necessary...")
                @start(cb:cb)
            (cb) =>
                # TODO: This URL is obviously very specific to KuCalc -- hardcoded port and base url.
                url = "http://project-#{@project_id}:6001/#{@project_id}/raw/.smc/directory_listing/#{opts.path}"
                if opts.hidden
                    url += '?hidden=true'
                # log only after the query string is in place, so the log
                # shows the url that is actually fetched.
                dbg("fetching listing from '#{url}'")
                misc.retry_until_success
                    f           : (cb) =>
                        @active()
                        get_json url, (err, x) =>
                            listing = x
                            cb(err)
                    max_time    : 30000
                    start_delay : 2000
                    max_delay   : 7000
                    cb          : cb
        ], (err) =>
            @active()
            opts.cb(err, listing)
        )

    # Read a file from the project over HTTP.  First gets the listing of the
    # containing directory (which also starts the project) to check that the
    # file is there and is at most maxsize bytes, then fetches the file
    # itself, retrying for up to max_time.
    read_file: (opts) =>
        opts = defaults opts,
            path    : required
            maxsize : 5000000    # maximum file size in bytes to read
            cb      : required   # cb(err, Buffer)
        dbg = @dbg("read_file(path:'#{opts.path}')")
        dbg("read a file from disk")
        content = undefined
        @active()
        async.series([
            (cb) =>
                # (this also starts the project)
                # TODO - obviously we should just stream... so there is much less of a limit... though
                # limits are good, as this frickin' costs!
                {dir, base} = require('path').parse(opts.path)
                if not base
                    cb("not a file -- '#{base}'")
                    return
                @directory_listing
                    path   : dir
                    hidden : true
                    cb     : (err, listing) =>
                        if err
                            cb(err)
                        else
                            for x in listing?.files ? []
                                if x.name == base
                                    if x.size <= opts.maxsize
                                        cb()
                                        return
                            cb('file too big or not found in listing')
            (cb) =>
                url = "http://project-#{@project_id}:6001/#{@project_id}/raw/#{opts.path}"
                dbg("fetching file from '#{url}'")
                misc.retry_until_success
                    f           : (cb) =>
                        @active()
                        get_file url, (err, x) =>
                            content = x
                            cb(err)
                    max_time    : 30000
                    start_delay : 2000
                    max_delay   : 7000
                    cb          : cb
        ], (err) =>
            @active()
            opts.cb(err, content)
        )

    ###
    set_all_quotas ensures that if the project is running and the quotas
    (except idle_timeout) have changed, then the project is restarted.
    ###
    set_all_quotas: (opts) =>
        opts = defaults opts,
            cb : required
        dbg = @dbg("set_all_quotas")
        dbg()
        # 1. Get data about project from the database, namely:
        #     - is project currently running (if not, nothing to do)
        #     - if running, what quotas it was started with and what its quotas are now
        # 2. If quotas differ, restarts project.
        @active()
        @database.get_project
            project_id : @project_id
            columns    : ['state', 'users', 'settings', 'run_quota']
            cb         : (err, x) =>
                @active()
                if err
                    dbg("error -- #{err}")
                    opts.cb(err)
                    return
                # x? guards against the database returning nothing for this
                # project; that is treated the same as "not active".
                if x?.state?.state not in ['running', 'starting', 'pending']
                    dbg("project not active")
                    opts.cb()
                    return
                cur = quota_compute.quota(x.settings, x.users)
                if underscore.isEqual(x.run_quota, cur)
                    dbg("running, but no quotas changed")
                    opts.cb()
                else
                    dbg('running and a quota changed; restart')
                    @restart(cb:opts.cb)
562
563
564
565
566
567