# Fixed ports of the services of a project.  Project::status reports them
# under the "<service>.port" keys and Project::address hands out LOCAL_HUB_PORT.
LOCAL_HUB_PORT = 6000
RAW_PORT = 6001
SAGE_SERVER_PORT = 6002
CONSOLE_SERVER_PORT = 6003
{EventEmitter} = require('events')
request = require('request')
async = require('async')
underscore = require('underscore')
misc = require('smc-util/misc')
{defaults, required} = misc
# Fetch `url` with a GET request and parse the response body as JSON.
#
#   url -- address to fetch
#   cb  -- called once per response: cb(err) on a request error, on a non-200
#          status code or on a body that is not valid JSON; cb(undefined, value)
#          with the parsed value otherwise.
exports.get_json = get_json = (url, cb) ->
    request.get url, (err, response, body) ->
        if err
            cb(err)
            return
        if response.statusCode != 200
            cb("ERROR: statusCode #{response.statusCode}")
            return
        # Only the parse itself is guarded.  Calling cb inside the try would
        # route an exception thrown *by cb* into the catch and invoke cb a
        # second time with a misleading "invalid JSON" error.
        try
            value = JSON.parse(body)
        catch e
            cb("ERROR: invalid JSON -- #{e} -- '#{body}'")
            return
        cb(undefined, value)
    return
# Fetch `url` with a GET request and hand back the raw body.
# `encoding: null` is passed through to request.get unchanged.
#
#   cb -- cb(err) on a request error or non-200 status, cb(undefined, body) otherwise.
exports.get_file = get_file = (url, cb) ->
    on_response = (err, response, body) ->
        return cb(err) if err
        if response.statusCode == 200
            cb(undefined, body)
        else
            cb("ERROR: statusCode #{response.statusCode}")
    request.get(url, {encoding: null}, on_response)
    return
# Factory for a Client built on the given database handle and logger.
exports.compute_client = (db, logger) ->
    client = new Client(db, logger)
    return client
# Empty EventEmitter subclass; nothing in the visible part of this file uses it.
class Dbg extends EventEmitter
# Project objects indexed by project_id; filled by Client::project and
# emptied again by Project::free.
project_cache = {}
# Provides quota(settings, users), which set_all_quotas compares against run_quota.
quota_compute = require('./quota')
# Hands out cached Project objects and the shared copy_paths synctable.
class Client
    # database -- required; an Error is thrown when it is missing.
    # logger   -- optional; without it dbg() produces no-op loggers.
    constructor: (@database, @logger) ->
        @dbg("constructor")()
        throw Error("database must be defined") unless @database?
# Get the synctable on the copy_paths table, creating it on first use.
#
#   cb -- called as cb(err) or cb(undefined, synctable).
#
# Calls that arrive while the table is still being created are queued and are
# all answered by the single pending database.synctable request.
copy_paths_synctable: (cb) =>
    if @_synctable
        cb(undefined, @_synctable)
        return
    if @_synctable_cbs?
        @_synctable_cbs.push(cb)
        return
    @_synctable_cbs = [cb]
    @database.synctable
        table          : 'copy_paths'
        columns        : ['id', 'started', 'error', 'finished']
        where          :
            "time > $::TIMESTAMP": new Date()
        where_function : ->
            return true
        cb             : (err, synctable) =>
            # Settle our own state *before* running any callback: a callback
            # that re-enters copy_paths_synctable then sees the final state
            # instead of joining a queue that is about to be dropped, and a
            # throwing callback cannot leave the queue in place forever.
            waiting = @_synctable_cbs ? []
            delete @_synctable_cbs
            @_synctable = synctable
            # The loop variable is deliberately not called `cb`: CoffeeScript
            # has no shadowing, so that would overwrite the method's parameter.
            for waiter in waiting
                if err
                    waiter(err)
                else
                    waiter(undefined, synctable)
            return
# Build a logging function for the method named `f`.  Messages go to
# @logger.debug prefixed with "kucalc.Client.<f>"; without a logger the
# returned function does nothing.
dbg: (f) =>
    return (->) unless @logger?
    prefix = "kucalc.Client.#{f}"
    return (args...) => @logger?.debug(prefix, args...)
# Get the Project object for a project, creating and caching it if needed.
#
#   opts.project_id -- id of the project (required)
#   opts.cb         -- cb(err, project); called once the project's 'ready'
#                      event has fired, or immediately if it is already ready.
project: (opts) =>
    opts = defaults opts,
        project_id : required
        cb         : required
    dbg = @dbg("project('#{opts.project_id}')")
    P = project_cache[opts.project_id]
    if P?
        dbg('in cache')
        if P.is_ready
            # refresh the idle timer of the cached object
            P.active()
            opts.cb(undefined, P)
        else
            P.once 'ready', (err) ->
                opts.cb(err, P)
        return
    dbg("not in cache, so creating")
    P = project_cache[opts.project_id] = new Project(@, opts.project_id, @logger, @database)
    # 'ready' carries the error when creating the synctable failed.  Forward
    # it, exactly as the cached branch above does, rather than reporting a
    # broken project as a success.
    P.once 'ready', (err) ->
        opts.cb(err, P)
# One project, backed by a synctable on its row of the projects table.
# Emits 'ready' (with an error argument on failure) once the synctable has
# been requested, and 'change' whenever the synctable changes.
class Project extends EventEmitter
    # client     -- the Client that created this object
    # project_id -- id of the project
    # logger     -- optional logger used by dbg
    # database   -- database handle providing synctable/_query/get_project
    constructor: (@client, @project_id, @logger, @database) ->
        @host = "project-#{@project_id}"
        dbg = @dbg('constructor')
        dbg("initializing")
        # @free runs once 10 minutes pass without a call to @active().
        @active = underscore.debounce(@free, 10*60*1000)
        @active()
        @database.synctable
            table          : 'projects'
            columns        : ['state', 'status', 'action_request']
            where          : {"project_id = $::UUID" : @project_id}
            where_function : (project_id) =>
                return project_id == @project_id
            cb             : (err, synctable) =>
                @active()
                if err
                    dbg("error creating synctable ", err)
                    @emit("ready", err)
                    # Release this object and drop it from project_cache.
                    # The cleanup method of this class is `free`; no `close`
                    # method is defined on it, so calling @close() would throw.
                    @free()
                else
                    dbg("successfully created synctable; now ready")
                    @is_ready = true
                    @synctable = synctable
                    @synctable.on 'change', => @emit('change')
                    @emit("ready")
# Look up this project's entry in the synctable.
# With `field` given, return just that field of the entry (undefined when
# there is no entry); without it, return the whole entry.
get: (field) =>
    record = @synctable.get(@project_id)
    return record unless field?
    return record?.get(field)
# Nested lookup: getIn(v) on this project's synctable entry, or undefined
# when there is no entry.
getIn: (v) =>
    record = @get()
    return record?.getIn(v)
# Return the action_request field as a plain JS object, with `started` and
# `finished` (when present) converted to Date objects.
# Returns undefined when there is no action_request, instead of throwing
# while reading x.started.
_action_request: =>
    x = @get('action_request')?.toJS()
    return x unless x?
    if x.started?
        x.started = new Date(x.started)
    if x.finished?
        x.finished = new Date(x.finished)
    return x
# Build a logging function for the method named `f`.  Messages go to
# @logger.debug prefixed with "kucalc.Project('<project_id>').<f>"; without a
# logger the returned function does nothing.
dbg: (f) =>
    return (->) unless @logger?
    return (args...) =>
        # the prefix is built per call, so it reflects the current @project_id
        @logger?.debug("kucalc.Project('#{@project_id}').#{f}", args...)
# Release this object: remove it from project_cache, close its synctable,
# drop its fields and remove every listener.
free: () =>
    @dbg('free')()
    delete @idle
    if @free_check?
        clearInterval(@free_check)
        delete @free_check
    # must happen while @project_id is still set
    delete project_cache[@project_id]
    @synctable?.close()
    for field in ['synctable', 'logger', 'project_id', 'compute_server', 'host', 'is_ready']
        delete @[field]
    @removeAllListeners()
# Report the `state` field of this project as a plain JS object.
#
#   opts.force, opts.update -- accepted, but not used by this implementation
#   opts.cb                 -- cb(undefined, state); state is undefined when the field is absent
state: (opts) =>
    opts = defaults opts,
        force  : false
        update : false
        cb     : required
    @dbg("state")()
    current = @get('state')
    opts.cb(undefined, current?.toJS())
# Report the `status` field of this project as a plain JS object (an empty
# object when the field is absent), with the fixed service ports merged in.
#
#   opts.cb -- cb(undefined, status)
status: (opts) =>
    opts = defaults opts,
        cb : required
    @dbg("status")()
    ports =
        "local_hub.port"      : LOCAL_HUB_PORT
        "raw.port"            : RAW_PORT
        "sage_server.port"    : SAGE_SERVER_PORT
        "console_server.port" : CONSOLE_SERVER_PORT
    status = @get('status')?.toJS() ? {}
    misc.merge(status, ports)
    opts.cb(undefined, status)
# Write an action_request for this project to the database and wait until
# `goal` holds.
#
#   opts.action    -- name of the action to request
#   opts.goal      -- function of the project's synctable entry; true once done
#   opts.timeout_s -- passed to synctable.wait as its timeout
#   opts.cb        -- optional; called with the wait result or the query error
_action: (opts) =>
    opts = defaults opts,
        action    : required
        goal      : required
        timeout_s : 300
        cb        : undefined
    log = @dbg("_action('#{opts.action}')")
    # Both the wait and the query may want to report; dropping opts.cb after
    # the first report keeps the caller from being called twice.
    finish = (err) ->
        opts.cb?(err)
        delete opts.cb
    if opts.goal(@get())
        log("condition already holds; nothing to do.")
        opts.cb?()
        return
    if opts.goal?
        log("start waiting for goal to be satisfied")
        @active()
        # the wait is set up before the request is written
        @synctable.wait
            until   : () =>
                @active()
                return opts.goal(@get())
            timeout : opts.timeout_s
            cb      : (err) =>
                @active()
                log("done waiting for goal #{err}")
                finish(err)
    log("request action to happen")
    @active()
    action_request =
        action   : opts.action
        time     : new Date()
        started  : undefined
        finished : undefined
    @_query
        jsonb_set :
            action_request : action_request
        cb        : (err) =>
            @active()
            if err
                log('action request failed')
                finish(err)
            else
                log("action requested")
# Run an UPDATE on this project's row of the projects table.
# Note that `query` and `where` are set on the caller's opts object itself.
_query: (opts) =>
    opts.query = 'UPDATE projects'
    opts.where =
        'project_id = $::UUID' : @project_id
    return @client.database._query(opts)
# Request the 'open' action; done as soon as the project's state is anything
# other than 'closed' (a missing state counts as 'closed').
#
#   opts.cb -- optional completion callback, passed on to _action
open: (opts) =>
    opts = defaults opts,
        cb : undefined
    @dbg("open")()
    is_open = (project) =>
        state = project?.getIn(['state', 'state']) ? 'closed'
        return state != 'closed'
    @_action
        action : 'open'
        goal   : is_open
        cb     : opts.cb
# Request the 'start' action; done once the project's state is 'running'.
#
#   opts.set_quotas -- accepted, but not used by this implementation
#   opts.cb         -- optional completion callback, passed on to _action
start: (opts) =>
    opts = defaults opts,
        set_quotas : true
        cb         : undefined
    @dbg("start")()
    is_running = (project) ->
        return project?.getIn(['state', 'state']) == 'running'
    @_action
        action : 'start'
        goal   : is_running
        cb     : opts.cb
# Request the 'stop' action; done once the project's state is 'opened' or 'closed'.
#
#   opts.cb -- optional completion callback, passed on to _action
stop: (opts) =>
    opts = defaults opts,
        cb : undefined
    @dbg("stop")()
    is_stopped = (project) ->
        return project?.getIn(['state', 'state']) in ['opened', 'closed']
    @_action
        action : 'stop'
        goal   : is_stopped
        cb     : opts.cb
# Stop the project, then start it again.
#
#   opts.set_quotas -- accepted, but not used by this implementation
#   opts.cb         -- optional; receives the first error of stop/start, if any
restart: (opts) =>
    opts = defaults opts,
        set_quotas : true
        cb         : undefined
    @dbg("restart")()
    stop_it  = (cb) => @stop(cb:cb)
    start_it = (cb) => @start(cb:cb)
    async.series [stop_it, start_it], (err) ->
        opts.cb?(err)
# Alias for start: opts are handed to start unchanged.
ensure_running: (opts) =>
    return @start(opts)
# Request the 'close' action; done once the project's state is 'closed'.
#
#   opts.cb -- optional completion callback, passed on to _action
ensure_closed: (opts) =>
    opts = defaults opts,
        cb : undefined
    @dbg("ensure_closed")()
    is_closed = (project) ->
        return project?.getIn(['state', 'state']) == 'closed'
    @_action
        action : 'close'
        goal   : is_closed
        cb     : opts.cb
# Not supported here: always reports an error through the callback.
#
#   opts.target, opts.force -- accepted and ignored
#   opts.cb                 -- required; receives the error string
move: (opts) =>
    {cb} = defaults opts,
        target : undefined
        force  : false
        cb     : required
    cb("move makes no sense for Kubernetes")
# Make sure the project is running, then report how to reach it.
#
#   opts.cb -- cb(err) or cb(undefined, {host, port, secret_token}), where
#              secret_token comes from the project's status field.
address: (opts) =>
    opts = defaults opts,
        cb : required
    log = @dbg("address")
    log('first ensure is running')
    on_running = (err) =>
        if err
            log('error starting it up')
            opts.cb(err)
            return
        log('it is running')
        secret_token = @getIn(['status', 'secret_token'])
        unless secret_token
            problem = 'BUG -- running, but no secret_token!'
            log(problem)
            opts.cb(problem)
            return
        opts.cb(undefined, {host: @host, port: LOCAL_HUB_PORT, secret_token: secret_token})
    @ensure_running(cb: on_running)
# Does nothing beyond logging the call and invoking the callback.
#
#   opts.min_interval -- only appears in the log message
#   opts.cb           -- optional; called with no arguments
save: (opts) =>
    opts = defaults opts,
        min_interval : undefined
        cb           : undefined
    @dbg("save(min_interval:#{opts.min_interval})")()
    opts.cb?()
# Request a copy of a path from this project to a (possibly different)
# project by inserting a row into copy_paths, then wait until that row is
# marked finished.
#
#   opts.path              -- source path
#   opts.target_project_id -- defaults to this project
#   opts.target_path       -- defaults to opts.path
#   opts.overwrite_newer, opts.delete_missing, opts.backup,
#   opts.bwlimit, opts.timeout -- written into the copy_paths row as given
#   opts.exclude_history   -- NOTE(review): accepted but never written to the row
#   opts.cb                -- optional; cb(err), where err is a database error or
#                             the `error` field of the finished copy
copy_path: (opts) =>
    opts = defaults opts,
        path              : ""
        target_project_id : ""
        target_path       : ""
        overwrite_newer   : undefined
        delete_missing    : undefined
        backup            : undefined
        exclude_history   : undefined
        timeout           : 5*60
        bwlimit           : '5MB'
        cb                : undefined
    if not opts.target_project_id
        opts.target_project_id = @project_id
    if not opts.target_path
        opts.target_path = opts.path
    synctable = undefined
    copy_id = misc.uuid()
    dbg = @dbg("copy_path('#{opts.path}', id='#{copy_id}')")
    dbg("copy a path using rsync from one project to another")
    @active()
    async.series([
        (cb) =>
            dbg("get synctable")
            @client.copy_paths_synctable (err, s) =>
                synctable = s; cb(err)
        (cb) =>
            @active()
            dbg('write query requesting the copy to the database')
            @database._query
                query  : "INSERT INTO copy_paths"
                values :
                    "id                ::UUID"      : copy_id
                    "time              ::TIMESTAMP" : new Date()
                    "source_project_id ::UUID"      : @project_id
                    "source_path       ::TEXT"      : opts.path
                    "target_project_id ::UUID"      : opts.target_project_id
                    "target_path       ::TEXT"      : opts.target_path
                    "overwrite_newer   ::BOOLEAN"   : opts.overwrite_newer
                    "delete_missing    ::BOOLEAN"   : opts.delete_missing
                    "backup            ::BOOLEAN"   : opts.backup
                    "bwlimit           ::TEXT"      : opts.bwlimit
                    "timeout           ::NUMERIC"   : opts.timeout
                cb: cb
        (cb) =>
            @active()
            if synctable.getIn([copy_id, 'finished'])
                dbg("copy instantly finished")
                # A copy that is already finished may still have failed, so
                # report its error exactly as the change handler below does.
                cb(synctable.getIn([copy_id, 'error']))
                return
            dbg('waiting for copy to finish...')
            handle_change = =>
                obj = synctable.get(copy_id)
                if obj?.get('started')
                    dbg("copy started...")
                if obj?.get('finished')
                    dbg("copy finished!")
                    # stop listening before reporting, so cb runs only once
                    synctable.removeListener('change', handle_change)
                    cb(obj.get('error'))
            synctable.on('change', handle_change)
    ], (err) =>
        @active()
        dbg('done', err)
        opts.cb?(err)
    )
# Start the project if necessary, then fetch the listing of a directory from
# the project's raw server.
#
#   opts.path   -- directory to list
#   opts.hidden -- when true, '?hidden=true' is added to the request
#   opts.time, opts.start, opts.limit -- accepted, but not used by this implementation
#   opts.cb     -- cb(err, listing)
directory_listing: (opts) =>
    opts = defaults opts,
        path   : ''
        hidden : false
        time   : undefined
        start  : undefined
        limit  : undefined
        cb     : required
    dbg = @dbg("directory_listing")
    dbg()
    listing = undefined
    async.series([
        (cb) =>
            dbg("starting project if necessary...")
            @start(cb:cb)
        (cb) =>
            # Host and port come from @host / RAW_PORT, the same values that
            # address() and status() report.
            # NOTE(review): opts.path is interpolated without URL-encoding --
            # confirm how callers pass paths containing '#', '?' or '%'.
            url = "http://#{@host}:#{RAW_PORT}/#{@project_id}/raw/.smc/directory_listing/#{opts.path}"
            if opts.hidden
                url += '?hidden=true'
            # log only after the query string is final, so the log shows the
            # URL that is actually requested
            dbg("fetching listing from '#{url}'")
            misc.retry_until_success
                f           : (cb) =>
                    @active()
                    get_json url, (err, x) =>
                        listing = x
                        cb(err)
                max_time    : 30000
                start_delay : 2000
                max_delay   : 7000
                cb          : cb
    ], (err) =>
        @active()
        opts.cb(err, listing)
    )
# Read a file from the project through its raw server.
# The containing directory is listed first, and the file is fetched only if
# it appears there with a size of at most opts.maxsize.
#
#   opts.path    -- path of the file (required)
#   opts.maxsize -- largest size that will be fetched
#   opts.cb      -- cb(err, content); content is the raw body from get_file
read_file: (opts) =>
    opts = defaults opts,
        path    : required
        maxsize : 5000000
        cb      : required
    dbg = @dbg("read_file(path:'#{opts.path}')")
    dbg("read a file from disk")
    content = undefined
    @active()
    async.series([
        (cb) =>
            {dir, base} = require('path').parse(opts.path)
            if not base
                # `base` is empty on this branch, so report the path that was
                # actually asked for.
                cb("not a file -- '#{opts.path}'")
                return
            @directory_listing
                path   : dir
                hidden : true
                cb     : (err, listing) =>
                    if err
                        cb(err)
                    else
                        for x in listing?.files ? []
                            if x.name == base
                                if x.size <= opts.maxsize
                                    cb()
                                    return
                        cb('file too big or not found in listing')
        (cb) =>
            # Host and port come from @host / RAW_PORT, the same values that
            # address() and status() report.
            url = "http://#{@host}:#{RAW_PORT}/#{@project_id}/raw/#{opts.path}"
            dbg("fetching file from '#{url}'")
            misc.retry_until_success
                f           : (cb) =>
                    @active()
                    get_file url, (err, x) =>
                        content = x
                        cb(err)
                max_time    : 30000
                start_delay : 2000
                max_delay   : 7000
                cb          : cb
    ], (err) =>
        @active()
        opts.cb(err, content)
    )
# Restart the project when the quota computed from its settings and users
# differs from its stored run_quota.  Nothing happens unless the project's
# state is 'running', 'starting' or 'pending'.
#
#   opts.cb -- cb(err); called with no arguments when nothing had to be done
set_all_quotas: (opts) =>
    opts = defaults opts,
        cb : required
    log = @dbg("set_all_quotas")
    log()
    @active()
    on_project = (err, x) =>
        @active()
        if err
            log("error -- #{err}")
            opts.cb(err)
            return
        unless x.state?.state in ['running', 'starting', 'pending']
            log("project not active")
            opts.cb()
            return
        wanted = quota_compute.quota(x.settings, x.users)
        unless underscore.isEqual(x.run_quota, wanted)
            log('running and a quota changed; restart')
            @restart(cb:opts.cb)
            return
        log("running, but no quotas changed")
        opts.cb()
    @database.get_project
        project_id : @project_id
        columns    : ['state', 'users', 'settings', 'run_quota']
        cb         : on_project