Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/src/packages/hub/local_hub_connection.coffee
Views: 687
#########################################################################
# This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
# License: MS-RSL – see LICENSE.md for details
#########################################################################

###
NOTE/ATTENTION!

A "local hub" is exactly the same thing as a "project".  I just used to call
them "local hubs" a very long time ago.

###


{PROJECT_HUB_HEARTBEAT_INTERVAL_S} = require('@cocalc/util/heartbeat')

# Connection to a Project (="local hub", for historical reasons only.)

async   = require('async')
{callback2} = require('@cocalc/util/async-utils')

uuid    = require('uuid')
winston = require('./logger').getLogger('local-hub-connection')
underscore = require('underscore')

message = require('@cocalc/util/message')
misc_node = require('@cocalc/backend/misc_node')
{connectToLockedSocket} = require("@cocalc/backend/tcp/locked-socket")
misc    = require('@cocalc/util/misc')
{defaults, required} = misc

blobs = require('./blobs')
clients = require('./clients')

# Blobs (e.g., files dynamically appearing as output in worksheets) are kept for this
# many seconds before being discarded.  If the worksheet is saved (e.g., by a user's autosave),
# then the BLOB is saved indefinitely.
BLOB_TTL_S = 60*60*24     # 1 day

if not process.env.SMC_TEST
    DEBUG = true

# Open an authenticated control connection to a project's local hub.
# Calls opts.cb(err) on failure, or opts.cb(undefined, socket) with a socket that
# misc_node.enable_mesg has enhanced for sending/receiving messages.
# The success callback is made outside the try block, so an exception thrown by
# opts.cb itself is not mistaken for a connection error (which used to call
# opts.cb a second time).
connect_to_a_local_hub = (opts) ->    # opts.cb(err, socket)
    opts = defaults opts,
        port         : required
        host         : required
        secret_token : required
        timeout      : 10
        cb           : required

    try
        socket = await connectToLockedSocket({port:opts.port, host:opts.host, token:opts.secret_token, timeout:opts.timeout})
        misc_node.enable_mesg(socket, 'connection_to_a_local_hub')
    catch err
        opts.cb(err)
        return
    opts.cb(undefined, socket)

_local_hub_cache = {}   # project_id --> LocalHub

# Return the LocalHub object for project_id, creating and caching it on first use.
# Throws an Error if project_id is not given.
exports.new_local_hub = (project_id, database, projectControl) ->
    if not project_id?
        throw Error("project_id must be specified (it is undefined)")
    H = _local_hub_cache[project_id]
    if H?
        winston.debug("new_local_hub('#{project_id}') -- using cached version")
    else
        winston.debug("new_local_hub('#{project_id}') -- creating new one")
        H = new LocalHub(project_id, database, projectControl)
        _local_hub_cache[project_id] = H
    return H

# Make sure there is a control connection to the given project; cb?(err) when done.
exports.connect_to_project = (project_id, database, projectControl, cb) ->
    hub = exports.new_local_hub(project_id, database, projectControl)
    hub.local_hub_socket (err) ->
        if err
            winston.debug("connect_to_project: error ensuring connection to #{project_id} -- #{err}")
        else
            winston.debug("connect_to_project: successfully ensured connection to #{project_id}")
        cb?(err)

# Forget the cached LocalHub for project_id and release its sockets/timers.
exports.disconnect_from_project = (project_id) ->
    H = _local_hub_cache[project_id]
    delete _local_hub_cache[project_id]
    H?.free_resources()
    return

# Array of all cached LocalHub objects.
exports.all_local_hubs = () ->
    v = []
    for k, h of _local_hub_cache
        if h?
            v.push(h)
    return v

server_settings = undefined
server_settings_loading = false   # true while init_server_settings is in flight

# Load the server settings. Then check every connected project against
# version_min_project: once right away, and again whenever the settings table changes.
# Only one load runs at a time. Several LocalHubs may be constructed before the
# first load finishes; without this guard each of them would start another load
# and register another 'change' listener. A failed load is logged rather than
# left as an unhandled rejection, and the next LocalHub constructed retries it.
init_server_settings = () ->
    if server_settings_loading
        return
    server_settings_loading = true
    try
        settings = await require('./servers/server-settings').default()
    catch err
        winston.debug("init_server_settings: failed to load server settings -- #{err}")
        return
    finally
        server_settings_loading = false
    server_settings = settings
    update = () ->
        winston.debug("local_hub_connection (version might have changed) -- checking on clients")
        for x in exports.all_local_hubs()
            x.restart_if_version_too_old()
    update()
    server_settings.table.on('change', update)
    return

# Hub-side handle on one project: it owns the control socket to the project's
# local hub, the heartbeat, and the database changefeeds the project opened.
class LocalHub # use the function "new_local_hub" above; do not construct this directly!
    constructor: (@project_id, @database, @projectControl) ->
        if not server_settings?  # module being used -- make sure server_settings is initialized
            init_server_settings()
        @_local_hub_socket_connecting = false
        @_sockets = {}  # key = session_uuid:client_id
        @_sockets_by_client_id = {}   # key = client_id, value = list of sockets for that client
        @call_callbacks = {}          # message id --> function handling the local hub's response
        @path = '.'    # should deprecate - *is* used by some random code elsewhere in this file
        @dbg("getting deployed running project")

    # Start periodically writing a heartbeat message to the control socket (if not already running).
    init_heartbeat: =>
        @dbg("init_heartbeat")
        if @_heartbeat_interval? # already running
            @dbg("init_heartbeat -- already running")
            return
        send_heartbeat = =>
            @dbg("init_heartbeat -- send")
            @_socket?.write_mesg('json', message.heartbeat())
        @_heartbeat_interval = setInterval(send_heartbeat, PROJECT_HUB_HEARTBEAT_INTERVAL_S*1000)

    # Stop the heartbeat started by init_heartbeat.
    delete_heartbeat: =>
        if @_heartbeat_interval?
            @dbg("delete_heartbeat")
            clearInterval(@_heartbeat_interval)
            delete @_heartbeat_interval

    # cb(err, project) with the project-control object for this project.
    project: (cb) =>
        try
            project = await @projectControl(@project_id)
        catch err
            cb(err)
            return
        cb(undefined, project)

    dbg: (m) =>
        ## only enable when debugging
        if DEBUG
            winston.debug("local_hub('#{@project_id}'): #{misc.to_json(m)}")

    # Drop all connection state, then restart the project; cb(err) when done.
    restart: (cb) =>
        @dbg("restart")
        @free_resources()
        try
            await (await @projectControl(@project_id)).restart()
        catch err
            cb(err)
            return
        cb()

    # cb(err, status) with the project's status as reported by project control.
    status: (cb) =>
        @dbg("status: get status of a project")
        try
            status = await (await @projectControl(@project_id)).status()
        catch err
            cb(err)
            return
        cb(undefined, status)

    # cb(err, state) with the project's state as reported by project control.
    state: (cb) =>
        @dbg("state: get state of a project")
        try
            state = await (await @projectControl(@project_id)).state()
        catch err
            cb(err)
            return
        cb(undefined, state)

    # Release everything tied to the current connection:
    #   - changefeeds and the heartbeat;
    #   - cached settings, address, status and version;
    #   - the control socket and all per-client sockets.
    # The next local_hub_socket call then reconnects from scratch.
    free_resources: () =>
        @dbg("free_resources")
        @query_cancel_all_changefeeds()
        @delete_heartbeat()
        delete @_ephemeral
        if @_ephemeral_timeout
            clearTimeout(@_ephemeral_timeout)
            delete @_ephemeral_timeout
        delete @address  # so we don't continue trying to use old address
        delete @_status
        delete @smc_version  # so when client next connects we ignore version checks until they tell us their version
        try
            @_socket?.end()
            winston.debug("free_resources: closed main local_hub socket")
        catch e
            winston.debug("free_resources: exception closing main _socket: #{e}")
        delete @_socket
        for k, s of @_sockets
            try
                s.end()
                winston.debug("free_resources: closed #{k}")
            catch e
                winston.debug("free_resources: exception closing a socket: #{e}")
        @_sockets = {}
        @_sockets_by_client_id = {}

    # Close and forget every socket opened on behalf of the given client.
    free_resources_for_client_id: (client_id) =>
        v = @_sockets_by_client_id[client_id]
        if v?
            @dbg("free_resources_for_client_id(#{client_id}) -- #{v.length} sockets")
            for socket in v
                try
                    socket.end()
                    socket.destroy()
                catch e
                    # best effort -- ignore errors while closing
            delete @_sockets_by_client_id[client_id]

    # async -- load ephemeral_disk/ephemeral_state from the project settings in
    # the database and cache them in @_ephemeral for 60s.
    init_ephemeral: () =>
        settings = await callback2(@database.get_project_settings, {project_id:@project_id})
        @_ephemeral = misc.copy_with(settings, ['ephemeral_disk', 'ephemeral_state'])
        @dbg("init_ephemeral -- #{JSON.stringify(@_ephemeral)}")
        # cache for 60s -- replace any earlier expiry timer; otherwise that timer
        # could discard this freshly loaded value early.
        if @_ephemeral_timeout?
            clearTimeout(@_ephemeral_timeout)
        @_ephemeral_timeout = setTimeout((() => delete @_ephemeral), 60000)

    # async -- the project's ephemeral_disk setting (loaded on demand).
    ephemeral_disk: () =>
        if not @_ephemeral?
            await @init_ephemeral()
        return @_ephemeral.ephemeral_disk

    # async -- the project's ephemeral_state setting (loaded on demand).
    ephemeral_state: () =>
        if not @_ephemeral?
            await @init_ephemeral()
        return @_ephemeral.ephemeral_state

    #
    # Project query support code
    #

    # Run a database user_query on behalf of the project and send the result(s)
    # back with write_mesg.
    # With mesg.changes set, the query is a changefeed: the first reply echoes
    # mesg with .query set, and every later update is sent as a multi_response
    # tagged with mesg.id.
    mesg_query: (mesg, write_mesg) =>
        dbg = (m) => winston.debug("mesg_query(project_id='#{@project_id}'): #{misc.trunc(m,200)}")
        dbg(misc.to_json(mesg))
        query = mesg.query
        if not query?
            write_mesg(message.error(error:"query must be defined"))
            return
        # ephemeral_state() may hit the database. Report a failure to the project
        # instead of leaving an unhandled rejection (handle_mesg does not await us).
        try
            ephemeral = await @ephemeral_state()
        catch err
            dbg("unable to determine whether project has ephemeral state -- #{err}")
            write_mesg(message.error(error:"unable to determine whether project has ephemeral state -- #{err}"))
            return
        if ephemeral
            @dbg("project has ephemeral state")
            write_mesg(message.error(error:"FATAL -- project has ephemeral state so no database queries are allowed"))
            return
        @dbg("project does NOT have ephemeral state")
        first = true
        if mesg.changes
            @_query_changefeeds ?= {}
            @_query_changefeeds[mesg.id] = true
        mesg_id = mesg.id
        @database.user_query
            project_id : @project_id
            query      : query
            options    : mesg.options
            changes    : if mesg.changes then mesg_id
            cb         : (err, result) =>
                if result?.action == 'close'
                    err = 'close'
                if err
                    dbg("project_query error: #{misc.to_json(err)}")
                    if @_query_changefeeds?[mesg_id]
                        delete @_query_changefeeds[mesg_id]
                    write_mesg(message.error(error:err))
                    if mesg.changes and not first
                        # also, assume changefeed got messed up, so cancel it.
                        @database.user_query_cancel_changefeed(id : mesg_id)
                else
                    if mesg.changes and not first
                        resp = result
                        resp.id = mesg_id
                        resp.multi_response = true
                    else
                        first = false
                        resp = mesg
                        resp.query = result
                    write_mesg(resp)

    # Cancel the changefeed with id mesg.id and echo mesg back (with .resp set
    # when the cancel returned a response).
    mesg_query_cancel: (mesg, write_mesg) =>
        if not @_query_changefeeds?
            # no changefeeds
            write_mesg(mesg)
        else
            @database.user_query_cancel_changefeed
                id : mesg.id
                cb : (err, resp) =>
                    if err
                        write_mesg(message.error(error:err))
                    else
                        mesg.resp = resp
                        write_mesg(mesg)
                        delete @_query_changefeeds?[mesg.id]

    # Cancel every changefeed this project opened; cb?(err) when all are done.
    query_cancel_all_changefeeds: (cb) =>
        # @_query_changefeeds is an object keyed by changefeed id (values are just
        # `true`), so count its keys -- objects have no .length.
        ids = misc.keys(@_query_changefeeds ? {})
        delete @_query_changefeeds
        if ids.length == 0
            cb?(); return
        dbg = (m) => winston.debug("query_cancel_all_changefeeds(project_id='#{@project_id}'): #{m}")
        dbg("canceling #{ids.length} changefeeds")
        f = (id, cb) =>
            dbg("canceling id=#{id}")
            @database.user_query_cancel_changefeed
                id : id
                cb : (err) =>
                    if err
                        dbg("FEED: warning #{id} -- error canceling a changefeed #{misc.to_json(err)}")
                    else
                        dbg("FEED: canceled changefeed -- #{id}")
                    cb()
        async.map(ids, f, (err) => cb?(err))

    # async -- throws error if project doesn't have access to string with this id.
    # string_id must be a 40-character string (the syncstrings.string_id column is CHAR(40)).
    check_syncdoc_access: (string_id) =>
        # The original test `not typeof string_id == 'string' and ...` parsed as
        # `(!typeof string_id) === 'string'`, which is always false.
        if typeof string_id != 'string' or string_id.length != 40
            throw Error('string_id must be specified and valid')
        opts =
            query : "SELECT project_id FROM syncstrings"
            where : {"string_id = $::CHAR(40)" : string_id}
        results = await callback2(@database._query, opts)
        if results.rows.length != 1
            throw Error("no such syncdoc")
        if results.rows[0].project_id != @project_id
            throw Error("project does NOT have access to this syncdoc")
        return # everything is fine.

    # Send the history of syncdoc mesg.string_id back to the project, or an error
    # if the project may not access it.
    mesg_get_syncdoc_history: (mesg, write_mesg) =>
        try
            # this raises an error if user does not have access
            await @check_syncdoc_access(mesg.string_id)
            # get the history
            history = await @database.syncdoc_history_async(mesg.string_id, mesg.patches)
        catch err
            write_mesg(message.error(id:mesg.id, error:"unable to get syncdoc history for string_id #{mesg.string_id} -- #{err}"))
            return
        write_mesg(message.syncdoc_history(id:mesg.id, history:history))

    #
    # end project query support code
    #

    # local hub just told us its version.  Record it.  Restart project if hub version too old.
    local_hub_version: (version) =>
        winston.debug("local_hub_version: version=#{version}")
        @smc_version = version
        @restart_if_version_too_old()

    # If our known version of the project is too old compared to the
    # current version_min_project in the server settings, then
    # we restart the project, which updates the code to the latest
    # version.  Only restarts the project if we have an open control
    # socket to it.
    # Please make damn sure to update the project code on the compute
    # server before updating the version, or the project will be
    # forced to restart and it won't help!
    restart_if_version_too_old: () =>
        if not @_socket?
            # not connected at all -- just return
            return
        if not @smc_version?
            # client hasn't told us their version yet
            return
        if not server_settings?
            # Settings not loaded yet. init_server_settings calls us again for every
            # hub once they are loaded.
            return
        if server_settings.version.version_min_project <= @smc_version
            # the project is up to date
            return
        if @_restart_goal_version == server_settings.version.version_min_project
            # We already restarted the project in an attempt to update it to this version
            # and it didn't get updated.  Don't try again until @_restart_goal_version is cleared, since
            # we don't want to lock a user out of their project due to somebody forgetting
            # to update code on the compute server!  It could also be that the project just
            # didn't finish restarting.
            return

        winston.debug("restart_if_version_too_old(#{@project_id}): #{@smc_version}, #{server_settings.version.version_min_project}")
        # record some stuff so that we don't keep trying to restart the project constantly
        ver = @_restart_goal_version = server_settings.version.version_min_project # version which we tried to get to
        f = () =>
            if @_restart_goal_version == ver
                delete @_restart_goal_version
        setTimeout(f, 15*60*1000)  # don't try again for at least 15 minutes.

        @dbg("restart_if_version_too_old -- restarting since #{server_settings.version.version_min_project} > #{@smc_version}")
        @restart (err) =>
            @dbg("restart_if_version_too_old -- done #{err}")

    # handle incoming JSON messages from the local_hub
    handle_mesg: (mesg, socket) =>
        @dbg("local_hub --> hub: received mesg: #{misc.trunc(misc.to_json(mesg), 250)}")
        if mesg.client_id?
            # Should we worry about ensuring that message from this local hub are allowed to
            # send messages to this client?  NO.  For them to send a message, they would have to
            # know the client's id, which is a random uuid, assigned each time the user connects.
            # It obviously is known to the local hub -- but if the user has connected to the local
            # hub then they should be allowed to receive messages.
            # NOTE: this should be possible to deprecate, because the clients all connect via
            # a websocket directly to the project.
            clients.pushToClient(mesg)
            return
        if mesg.event == 'version'
            @local_hub_version(mesg.version)
            return
        if mesg.id?
            f = @call_callbacks[mesg.id]
            if f?
                # response to a message we sent via @call
                f(mesg)
            else
                # a request initiated by the local hub; reply with the same id
                winston.debug("handling call from local_hub")
                write_mesg = (resp) =>
                    resp.id = mesg.id
                    @local_hub_socket (err, sock) =>
                        if not err
                            sock.write_mesg('json', resp)
                switch mesg.event
                    when 'ping'
                        write_mesg(message.pong())
                    when 'query'
                        @mesg_query(mesg, write_mesg)
                    when 'query_cancel'
                        @mesg_query_cancel(mesg, write_mesg)
                    when 'get_syncdoc_history'
                        @mesg_get_syncdoc_history(mesg, write_mesg)
                    when 'file_written_to_project'
                        # ignore -- don't care; this is going away
                        return
                    when 'file_read_from_project'
                        # handle elsewhere by the code that requests the file
                        return
                    when 'error'
                        # ignore -- don't care since handler already gone.
                        return
                    else
                        write_mesg(message.error(error:"unknown event '#{mesg.event}'"))
            return

    # Store a blob sent by the local hub in the database (with BLOB_TTL_S ttl),
    # then acknowledge it to the local hub with a save_blob message.
    handle_blob: (opts) =>
        opts = defaults opts,
            uuid : required
            blob : required

        @dbg("local_hub --> global_hub: received a blob with uuid #{opts.uuid}")
        # Store blob in DB.
        blobs.save_blob
            uuid       : opts.uuid
            blob       : opts.blob
            project_id : @project_id
            ttl        : BLOB_TTL_S
            check      : true         # if malicious user tries to overwrite a blob with given sha1 hash, they get an error.
            database   : @database
            cb         : (err, ttl) =>
                if err
                    resp = message.save_blob(sha1:opts.uuid, error:err)
                    @dbg("handle_blob: error! -- #{err}")
                else
                    resp = message.save_blob(sha1:opts.uuid, ttl:ttl)

                @local_hub_socket (err, socket) =>
                    if not err
                        socket.write_mesg('json', resp)

    # Connection to the remote local_hub daemon that we use for control; cb(err, socket).
    # Requests that arrive while a connection attempt is in progress are queued,
    # and all of them are answered when that attempt finishes.
    # An attempt still unfinished after 20s is abandoned: its queue gets 'timeout'.
    local_hub_socket: (cb) =>
        if @_socket?
            cb(undefined, @_socket)
            return

        if @_local_hub_socket_connecting
            @_local_hub_socket_queue.push(cb)
            @dbg("local_hub_socket: added socket request to existing queue, which now has length #{@_local_hub_socket_queue.length}")
            return
        @_local_hub_socket_connecting = true
        # Keep this attempt's own queue. If @_local_hub_socket_queue is no longer
        # this array, the attempt has been abandoned.
        queue = @_local_hub_socket_queue = [cb]

        # If below fails for 20s for some reason, cancel everything to allow for future attempt.
        on_timeout = () =>
            if @_local_hub_socket_queue isnt queue
                return
            @dbg("local_hub_socket: canceled due to timeout")
            @_local_hub_socket_connecting = false
            delete @_local_hub_socket_queue
            for c in queue
                c?('timeout')
        connecting_timer = setTimeout(on_timeout, 20000)

        @dbg("local_hub_socket: getting new socket")
        @new_socket (err, socket) =>
            clearTimeout(connecting_timer)
            if @_local_hub_socket_queue isnt queue
                # already gave up on this attempt -- close the late socket rather than leak it.
                try
                    socket?.end()
                catch e
                    winston.debug("local_hub_socket: exception closing abandoned socket: #{e}")
                return
            @_local_hub_socket_connecting = false
            delete @_local_hub_socket_queue
            @dbg("local_hub_socket: new_socket returned #{err}")
            if err
                for c in queue
                    c?(err)
                return

            socket.on 'mesg', (type, mesg) =>
                switch type
                    when 'blob'
                        @handle_blob(mesg)
                    when 'json'
                        @handle_mesg(mesg, socket)

            # Only tear down state if this is still the current control socket. A new
            # socket is only created after free_resources has dropped the old one, and
            # a late 'end'/'close' from that old socket must not kill its replacement.
            on_socket_gone = =>
                if @_socket is socket
                    @free_resources()
            socket.on('end', on_socket_gone)
            socket.on('close', on_socket_gone)
            socket.on('error', on_socket_gone)

            # Send a hello message to the local hub, so it knows this is the control connection,
            # and not something else (e.g., a console).
            socket.write_mesg('json', {event:'hello'})

            # Record the socket before running queued callbacks, so any of them that
            # calls local_hub_socket again reuses it instead of opening a second connection.
            @_socket = socket
            @init_heartbeat() # start sending heartbeat over this socket

            for c in queue
                c?(undefined, socket)

            # Finally, we wait a bit to see if the version gets sent from
            # the client.  If not, we set it to 0, which will cause a restart,
            # which will upgrade to a new version that sends versions.
            # TODO: This code can be deleted after all projects get restarted.
            check_version_received = () =>
                if @_socket? and not @smc_version?
                    @smc_version = 0
                    @restart_if_version_too_old()
            setTimeout(check_version_received, 60*1000)

    # Get a new connection to the local_hub,
    # authenticated via the secret_token, and enhanced
    # to be able to send/receive json and blob messages.
    # It connects using the cached @address first.
    # If that fails, it refreshes @address from project control and tries once more.
    new_socket: (cb) =>     # cb(err, socket)
        @dbg("new_socket")
        # connect using the currently known @address
        f = (cb) =>
            if not @address?
                cb("no address")
                return
            if not @address.port?
                cb("no port")
                return
            if not @address.host?
                cb("no host")
                return
            if not @address.secret_token?
                cb("no secret_token")
                return
            connect_to_a_local_hub
                port         : @address.port
                host         : @address.ip ? @address.host   # prefer @address.ip if it exists (e.g., for cocalc-kubernetes); otherwise use host (which is where compute server is).
                secret_token : @address.secret_token
                cb           : cb

        # ask project control for the project's current address; store it in @address.
        # cb is called outside the try so an exception thrown by cb is not reported as a
        # lookup failure (which would call cb twice).
        update_address = (cb) =>
            try
                address = await (await @projectControl(@project_id)).address()
            catch err
                cb(err)
                return
            @address = address
            cb()

        socket = undefined
        async.series([
            (cb) =>
                if not @address?
                    @dbg("get address of a working local hub")
                    update_address(cb)
                else
                    cb()
            (cb) =>
                @dbg("try to connect to local hub socket using last known address")
                f (err, _socket) =>
                    if not err
                        socket = _socket
                        cb()
                    else
                        @dbg("failed to get address of a working local hub -- #{err}")
                        update_address(cb)
            (cb) =>
                if not socket?
                    @dbg("still don't have our connection -- try again")
                    f (err, _socket) =>
                        socket = _socket; cb(err)
                else
                    cb()
        ], (err) =>
            cb(err, socket)
        )

    # Stop listening for responses to a multi_response @call with the given id.
    remove_multi_response_listener: (id) =>
        delete @call_callbacks[id]

    # Send opts.mesg to the local hub over the control socket.
    #  - multi_response: every response with mesg.id goes to opts.cb until
    #    remove_multi_response_listener(mesg.id) is called.
    #  - timeout (nonzero): listen for exactly one response. An 'error' event
    #    becomes cb(error); any other response becomes cb(undefined, resp).
    #  - neither: no response is listened for; cb is only called on send errors.
    call: (opts) =>
        opts = defaults opts,
            mesg           : required
            timeout        : undefined  # NOTE: a nonzero timeout MUST be specified, or we will not even listen for a response from the local hub!  (Ensures leaking listeners won't happen.)
            multi_response : false   # if true, timeout ignored; call @remove_multi_response_listener(mesg.id) to remove
            cb             : undefined
        @dbg("call")
        if not opts.mesg.id?
            if opts.timeout or opts.multi_response   # opts.timeout being undefined or 0 both mean "don't do it"
                opts.mesg.id = uuid.v4()

        @local_hub_socket (err, socket) =>
            if err
                @dbg("call: failed to get socket -- #{err}")
                opts.cb?(err)
                return
            @dbg("call: get socket -- now writing message to the socket -- #{misc.trunc(misc.to_json(opts.mesg),200)}")
            socket.write_mesg 'json', opts.mesg, (err) =>
                if err
                    @free_resources()   # at least next time it will get a new socket
                    opts.cb?(err)
                    return
                if opts.multi_response
                    @call_callbacks[opts.mesg.id] = opts.cb
                else if opts.timeout
                    # Listen to exactly one response, then remove the listener.
                    # NOTE(review): opts.timeout only decides *whether* we listen -- no timer
                    # is started here, so a response that never arrives leaves this listener
                    # in @call_callbacks.
                    @call_callbacks[opts.mesg.id] = (resp) =>
                        delete @call_callbacks[opts.mesg.id]
                        if resp.event == 'error'
                            opts.cb?(resp.error)
                        else
                            opts.cb?(undefined, resp)
                # As mentioned above -- there's no else -- if not timeout then
                # we do not listen for a response.

    # Read a file from a project into memory on the hub.
    # I think this is used only by the API, but not by browser clients anymore.
    # cb(err, data): data is the Buffer of file contents, with data.archive set to
    # the archive extension the project reported (meaningful for directories).
    read_file: (opts) => # cb(err, content_of_file)
        {path, project_id, archive, cb} = defaults opts,
            path       : required
            project_id : required
            archive    : 'tar.bz2'   # for directories; if directory, then the output object "data" has data.archive=actual extension used.
            cb         : required
        @dbg("read_file '#{path}'")
        socket    = undefined
        id        = uuid.v4()
        data      = undefined
        data_uuid = undefined
        result_archive = undefined

        async.series([
            # Get a socket connection to the local_hub.
            (cb) =>
                @local_hub_socket (err, _socket) =>
                    if err
                        cb(err)
                    else
                        socket = _socket
                        cb()
            # Ask for the file; the reply names the blob that will carry its contents.
            (cb) =>
                socket.write_mesg('json', message.read_file_from_project(id:id, project_id:project_id, path:path, archive:archive))
                socket.recv_mesg
                    type    : 'json'
                    id      : id
                    timeout : 60
                    cb      : (mesg) =>
                        switch mesg.event
                            when 'error'
                                cb(mesg.error)
                            when 'file_read_from_project'
                                data_uuid = mesg.data_uuid
                                result_archive = mesg.archive
                                cb()
                            else
                                cb("Unknown mesg event '#{mesg.event}'")
            # Receive the blob with the file contents.
            (cb) =>
                socket.recv_mesg
                    type    : 'blob'
                    id      : data_uuid
                    timeout : 60
                    cb      : (_data) =>
                        # recv_mesg returns either a Buffer blob
                        # *or* a {event:'error', error:'the error'} object.
                        # Fortunately `new Buffer().event` is valid (and undefined).
                        if _data.event == 'error'
                            cb(_data.error)
                        else
                            data = _data
                            data.archive = result_archive
                            cb()
        ], (err) =>
            if err
                cb(err)
            else
                cb(undefined, data)
        )

    # Write a file to a project
    # I think this is used only by the API, but not by browser clients anymore.
    # Sends a write_file_to_project message followed by the data as a blob, then
    # waits up to 10s for the project's confirmation; cb(err).
    write_file: (opts) => # cb(err)
        {path, project_id, cb, data} = defaults opts,
            path       : required
            project_id : required
            data       : required   # what to write
            cb         : required
        @dbg("write_file '#{path}'")
        id        = uuid.v4()
        data_uuid = uuid.v4()

        @local_hub_socket (err, socket) =>
            if err
                opts.cb(err)
                return
            mesg = message.write_file_to_project
                id         : id
                project_id : project_id
                path       : path
                data_uuid  : data_uuid
            socket.write_mesg('json', mesg)
            socket.write_mesg('blob', {uuid:data_uuid, blob:data})
            socket.recv_mesg
                type    : 'json'
                id      : id
                timeout : 10
                cb      : (mesg) =>
                    switch mesg.event
                        when 'file_written_to_project'
                            opts.cb()
                        when 'error'
                            opts.cb(mesg.error)
                        else
                            opts.cb("unexpected message type '#{mesg.event}'")