CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/hub/local_hub_connection.coffee
Views: 687
1
#########################################################################
2
# This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
# License: MS-RSL – see LICENSE.md for details
4
#########################################################################
5
6
###
7
NOTE/ATTENTION!
8
9
A "local hub" is exactly the same thing as a "project". I just used to call
10
them "local hubs" a very long time ago.
11
12
###
13
14
15
{PROJECT_HUB_HEARTBEAT_INTERVAL_S} = require('@cocalc/util/heartbeat')
16
17
# Connection to a Project (="local hub", for historical reasons only.)
18
19
async = require('async')
20
{callback2} = require('@cocalc/util/async-utils')
21
22
uuid = require('uuid')
23
winston = require('./logger').getLogger('local-hub-connection')
24
underscore = require('underscore')
25
26
message = require('@cocalc/util/message')
27
misc_node = require('@cocalc/backend/misc_node')
28
{connectToLockedSocket} = require("@cocalc/backend/tcp/locked-socket")
29
misc = require('@cocalc/util/misc')
30
{defaults, required} = misc
31
32
blobs = require('./blobs')
33
clients = require('./clients')
34
35
# Blobs (e.g., files dynamically appearing as output in worksheets) are kept for this
36
# many seconds before being discarded. If the worksheet is saved (e.g., by a user's autosave),
37
# then the BLOB is saved indefinitely.
38
BLOB_TTL_S = 60*60*24 # 1 day
39
40
if not process.env.SMC_TEST
41
DEBUG = true
42
43
connect_to_a_local_hub = (opts) -> # opts.cb(err, socket)
44
opts = defaults opts,
45
port : required
46
host : required
47
secret_token : required
48
timeout : 10
49
cb : required
50
51
try
52
socket = await connectToLockedSocket({port:opts.port, host:opts.host, token:opts.secret_token, timeout:opts.timeout})
53
misc_node.enable_mesg(socket, 'connection_to_a_local_hub')
54
opts.cb(undefined, socket)
55
catch err
56
opts.cb(err)
57
58
_local_hub_cache = {}
59
exports.new_local_hub = (project_id, database, projectControl) ->
60
if not project_id?
61
throw "project_id must be specified (it is undefined)"
62
H = _local_hub_cache[project_id]
63
if H?
64
winston.debug("new_local_hub('#{project_id}') -- using cached version")
65
else
66
winston.debug("new_local_hub('#{project_id}') -- creating new one")
67
H = new LocalHub(project_id, database, projectControl)
68
_local_hub_cache[project_id] = H
69
return H
70
71
exports.connect_to_project = (project_id, database, projectControl, cb) ->
72
hub = exports.new_local_hub(project_id, database, projectControl)
73
hub.local_hub_socket (err) ->
74
if err
75
winston.debug("connect_to_project: error ensuring connection to #{project_id} -- #{err}")
76
else
77
winston.debug("connect_to_project: successfully ensured connection to #{project_id}")
78
cb?(err)
79
80
exports.disconnect_from_project = (project_id) ->
81
H = _local_hub_cache[project_id]
82
delete _local_hub_cache[project_id]
83
H?.free_resources()
84
return
85
86
exports.all_local_hubs = () ->
87
v = []
88
for k, h of _local_hub_cache
89
if h?
90
v.push(h)
91
return v
92
93
server_settings = undefined
94
init_server_settings = () ->
95
server_settings = await require('./servers/server-settings').default()
96
update = () ->
97
winston.debug("local_hub_connection (version might have changed) -- checking on clients")
98
for x in exports.all_local_hubs()
99
x.restart_if_version_too_old()
100
update()
101
server_settings.table.on('change', update)
102
103
class LocalHub # use the function "new_local_hub" above; do not construct this directly!
104
constructor: (@project_id, @database, @projectControl) ->
105
if not server_settings? # module being used -- make sure server_settings is initialized
106
init_server_settings()
107
@_local_hub_socket_connecting = false
108
@_sockets = {} # key = session_uuid:client_id
109
@_sockets_by_client_id = {} #key = client_id, value = list of sockets for that client
110
@call_callbacks = {}
111
@path = '.' # should deprecate - *is* used by some random code elsewhere in this file
112
@dbg("getting deployed running project")
113
114
init_heartbeat: =>
115
@dbg("init_heartbeat")
116
if @_heartbeat_interval? # already running
117
@dbg("init_heartbeat -- already running")
118
return
119
send_heartbeat = =>
120
@dbg("init_heartbeat -- send")
121
@_socket?.write_mesg('json', message.heartbeat())
122
@_heartbeat_interval = setInterval(send_heartbeat, PROJECT_HUB_HEARTBEAT_INTERVAL_S*1000)
123
124
delete_heartbeat: =>
125
if @_heartbeat_interval?
126
@dbg("delete_heartbeat")
127
clearInterval(@_heartbeat_interval)
128
delete @_heartbeat_interval
129
130
project: (cb) =>
131
try
132
cb(undefined, await @projectControl(@project_id))
133
catch err
134
cb(err)
135
136
dbg: (m) =>
137
## only enable when debugging
138
if DEBUG
139
winston.debug("local_hub('#{@project_id}'): #{misc.to_json(m)}")
140
141
restart: (cb) =>
142
@dbg("restart")
143
@free_resources()
144
try
145
await (await @projectControl(@project_id)).restart()
146
cb()
147
catch err
148
cb(err)
149
150
status: (cb) =>
151
@dbg("status: get status of a project")
152
try
153
cb(undefined, await (await @projectControl(@project_id)).status())
154
catch err
155
cb(err)
156
157
state: (cb) =>
158
@dbg("state: get state of a project")
159
try
160
cb(undefined, await (await @projectControl(@project_id)).state())
161
catch err
162
cb(err)
163
164
free_resources: () =>
165
@dbg("free_resources")
166
@query_cancel_all_changefeeds()
167
@delete_heartbeat()
168
delete @_ephemeral
169
if @_ephemeral_timeout
170
clearTimeout(@_ephemeral_timeout)
171
delete @_ephemeral_timeout
172
delete @address # so we don't continue trying to use old address
173
delete @_status
174
delete @smc_version # so when client next connects we ignore version checks until they tell us their version
175
try
176
@_socket?.end()
177
winston.debug("free_resources: closed main local_hub socket")
178
catch e
179
winston.debug("free_resources: exception closing main _socket: #{e}")
180
delete @_socket
181
for k, s of @_sockets
182
try
183
s.end()
184
winston.debug("free_resources: closed #{k}")
185
catch e
186
winston.debug("free_resources: exception closing a socket: #{e}")
187
@_sockets = {}
188
@_sockets_by_client_id = {}
189
190
free_resources_for_client_id: (client_id) =>
191
v = @_sockets_by_client_id[client_id]
192
if v?
193
@dbg("free_resources_for_client_id(#{client_id}) -- #{v.length} sockets")
194
for socket in v
195
try
196
socket.end()
197
socket.destroy()
198
catch e
199
# do nothing
200
delete @_sockets_by_client_id[client_id]
201
202
# async
203
init_ephemeral: () =>
204
settings = await callback2(@database.get_project_settings, {project_id:@project_id})
205
@_ephemeral = misc.copy_with(settings, ['ephemeral_disk', 'ephemeral_state'])
206
@dbg("init_ephemeral -- #{JSON.stringify(@_ephemeral)}")
207
# cache for 60s
208
@_ephemeral_timeout = setTimeout((() => delete @_ephemeral), 60000)
209
210
ephemeral_disk: () =>
211
if not @_ephemeral?
212
await @init_ephemeral()
213
return @_ephemeral.ephemeral_disk
214
215
ephemeral_state: () =>
216
if not @_ephemeral?
217
await @init_ephemeral()
218
return @_ephemeral.ephemeral_state
219
220
#
221
# Project query support code
222
#
223
mesg_query: (mesg, write_mesg) =>
224
dbg = (m) => winston.debug("mesg_query(project_id='#{@project_id}'): #{misc.trunc(m,200)}")
225
dbg(misc.to_json(mesg))
226
query = mesg.query
227
if not query?
228
write_mesg(message.error(error:"query must be defined"))
229
return
230
if await @ephemeral_state()
231
@dbg("project has ephemeral state")
232
write_mesg(message.error(error:"FATAL -- project has ephemeral state so no database queries are allowed"))
233
return
234
@dbg("project does NOT have ephemeral state")
235
first = true
236
if mesg.changes
237
@_query_changefeeds ?= {}
238
@_query_changefeeds[mesg.id] = true
239
mesg_id = mesg.id
240
@database.user_query
241
project_id : @project_id
242
query : query
243
options : mesg.options
244
changes : if mesg.changes then mesg_id
245
cb : (err, result) =>
246
if result?.action == 'close'
247
err = 'close'
248
if err
249
dbg("project_query error: #{misc.to_json(err)}")
250
if @_query_changefeeds?[mesg_id]
251
delete @_query_changefeeds[mesg_id]
252
write_mesg(message.error(error:err))
253
if mesg.changes and not first
254
# also, assume changefeed got messed up, so cancel it.
255
@database.user_query_cancel_changefeed(id : mesg_id)
256
else
257
#if Math.random() <= .3 # for testing -- force forgetting about changefeed with probability 10%.
258
# delete @_query_changefeeds[mesg_id]
259
if mesg.changes and not first
260
resp = result
261
resp.id = mesg_id
262
resp.multi_response = true
263
else
264
first = false
265
resp = mesg
266
resp.query = result
267
write_mesg(resp)
268
269
mesg_query_cancel: (mesg, write_mesg) =>
270
if not @_query_changefeeds?
271
# no changefeeds
272
write_mesg(mesg)
273
else
274
@database.user_query_cancel_changefeed
275
id : mesg.id
276
cb : (err, resp) =>
277
if err
278
write_mesg(message.error(error:err))
279
else
280
mesg.resp = resp
281
write_mesg(mesg)
282
delete @_query_changefeeds?[mesg.id]
283
284
query_cancel_all_changefeeds: (cb) =>
285
if not @_query_changefeeds? or @_query_changefeeds.length == 0
286
cb?(); return
287
dbg = (m) => winston.debug("query_cancel_all_changefeeds(project_id='#{@project_id}'): #{m}")
288
v = @_query_changefeeds
289
dbg("canceling #{v.length} changefeeds")
290
delete @_query_changefeeds
291
f = (id, cb) =>
292
dbg("canceling id=#{id}")
293
@database.user_query_cancel_changefeed
294
id : id
295
cb : (err) =>
296
if err
297
dbg("FEED: warning #{id} -- error canceling a changefeed #{misc.to_json(err)}")
298
else
299
dbg("FEED: canceled changefeed -- #{id}")
300
cb()
301
async.map(misc.keys(v), f, (err) => cb?(err))
302
303
# async -- throws error if project doesn't have access to string with this id.
304
check_syncdoc_access: (string_id) =>
305
if not typeof string_id == 'string' and string_id.length == 40
306
throw Error('string_id must be specified and valid')
307
return
308
opts =
309
query : "SELECT project_id FROM syncstrings"
310
where : {"string_id = $::CHAR(40)" : string_id}
311
results = await callback2(@database._query, opts)
312
if results.rows.length != 1
313
throw Error("no such syncdoc")
314
if results.rows[0].project_id != @project_id
315
throw Error("project does NOT have access to this syncdoc")
316
return # everything is fine.
317
318
mesg_get_syncdoc_history: (mesg, write_mesg) =>
319
try
320
# this raises an error if user does not have access
321
await @check_syncdoc_access(mesg.string_id)
322
# get the history
323
history = await @database.syncdoc_history_async(mesg.string_id, mesg.patches)
324
write_mesg(message.syncdoc_history(id:mesg.id, history:history))
325
catch err
326
write_mesg(message.error(id:mesg.id, error:"unable to get syncdoc history for string_id #{mesg.string_id} -- #{err}"))
327
328
#
329
# end project query support code
330
#
331
332
# local hub just told us its version. Record it. Restart project if hub version too old.
333
local_hub_version: (version) =>
334
winston.debug("local_hub_version: version=#{version}")
335
@smc_version = version
336
@restart_if_version_too_old()
337
338
# If our known version of the project is too old compared to the
339
# current version_min_project in smcu-util/smc-version, then
340
# we restart the project, which updates the code to the latest
341
# version. Only restarts the project if we have an open control
342
# socket to it.
343
# Please make damn sure to update the project code on the compute
344
# server before updating the version, or the project will be
345
# forced to restart and it won't help!
346
restart_if_version_too_old: () =>
347
if not @_socket?
348
# not connected at all -- just return
349
return
350
if not @smc_version?
351
# client hasn't told us their version yet
352
return
353
if server_settings.version.version_min_project <= @smc_version
354
# the project is up to date
355
return
356
if @_restart_goal_version == server_settings.version.version_min_project
357
# We already restarted the project in an attempt to update it to this version
358
# and it didn't get updated. Don't try again until @_restart_version is cleared, since
359
# we don't want to lock a user out of their project due to somebody forgetting
360
# to update code on the compute server! It could also be that the project just
361
# didn't finish restarting.
362
return
363
364
winston.debug("restart_if_version_too_old(#{@project_id}): #{@smc_version}, #{server_settings.version.version_min_project}")
365
# record some stuff so that we don't keep trying to restart the project constantly
366
ver = @_restart_goal_version = server_settings.version.version_min_project # version which we tried to get to
367
f = () =>
368
if @_restart_goal_version == ver
369
delete @_restart_goal_version
370
setTimeout(f, 15*60*1000) # don't try again for at least 15 minutes.
371
372
@dbg("restart_if_version_too_old -- restarting since #{server_settings.version.version_min_project} > #{@smc_version}")
373
@restart (err) =>
374
@dbg("restart_if_version_too_old -- done #{err}")
375
376
# handle incoming JSON messages from the local_hub
377
handle_mesg: (mesg, socket) =>
378
@dbg("local_hub --> hub: received mesg: #{misc.trunc(misc.to_json(mesg), 250)}")
379
if mesg.client_id?
380
# Should we worry about ensuring that message from this local hub are allowed to
381
# send messages to this client? NO. For them to send a message, they would have to
382
# know the client's id, which is a random uuid, assigned each time the user connects.
383
# It obviously is known to the local hub -- but if the user has connected to the local
384
# hub then they should be allowed to receive messages.
385
# NOTE: this should be possible to deprecate, because the clients all connect via
386
# a websocket directly to the project.
387
clients.pushToClient(mesg)
388
return
389
if mesg.event == 'version'
390
@local_hub_version(mesg.version)
391
return
392
if mesg.id?
393
f = @call_callbacks[mesg.id]
394
if f?
395
f(mesg)
396
else
397
winston.debug("handling call from local_hub")
398
write_mesg = (resp) =>
399
resp.id = mesg.id
400
@local_hub_socket (err, sock) =>
401
if not err
402
sock.write_mesg('json', resp)
403
switch mesg.event
404
when 'ping'
405
write_mesg(message.pong())
406
when 'query'
407
@mesg_query(mesg, write_mesg)
408
when 'query_cancel'
409
@mesg_query_cancel(mesg, write_mesg)
410
when 'get_syncdoc_history'
411
@mesg_get_syncdoc_history(mesg, write_mesg)
412
when 'file_written_to_project'
413
# ignore -- don't care; this is going away
414
return
415
when 'file_read_from_project'
416
# handle elsewhere by the code that requests the file
417
return
418
when 'error'
419
# ignore -- don't care since handler already gone.
420
return
421
else
422
write_mesg(message.error(error:"unknown event '#{mesg.event}'"))
423
return
424
425
handle_blob: (opts) =>
426
opts = defaults opts,
427
uuid : required
428
blob : required
429
430
@dbg("local_hub --> global_hub: received a blob with uuid #{opts.uuid}")
431
# Store blob in DB.
432
blobs.save_blob
433
uuid : opts.uuid
434
blob : opts.blob
435
project_id : @project_id
436
ttl : BLOB_TTL_S
437
check : true # if malicious user tries to overwrite a blob with given sha1 hash, they get an error.
438
database : @database
439
cb : (err, ttl) =>
440
if err
441
resp = message.save_blob(sha1:opts.uuid, error:err)
442
@dbg("handle_blob: error! -- #{err}")
443
else
444
resp = message.save_blob(sha1:opts.uuid, ttl:ttl)
445
446
@local_hub_socket (err, socket) =>
447
if not err
448
socket.write_mesg('json', resp)
449
450
# Connection to the remote local_hub daemon that we use for control.
451
local_hub_socket: (cb) =>
452
if @_socket?
453
#@dbg("local_hub_socket: re-using existing socket")
454
cb(undefined, @_socket)
455
return
456
457
if @_local_hub_socket_connecting
458
@_local_hub_socket_queue.push(cb)
459
@dbg("local_hub_socket: added socket request to existing queue, which now has length #{@_local_hub_socket_queue.length}")
460
return
461
@_local_hub_socket_connecting = true
462
@_local_hub_socket_queue = [cb]
463
connecting_timer = undefined
464
465
cancel_connecting = () =>
466
@_local_hub_socket_connecting = false
467
if @_local_hub_socket_queue?
468
@dbg("local_hub_socket: canceled due to timeout")
469
for c in @_local_hub_socket_queue
470
c?('timeout')
471
delete @_local_hub_socket_queue
472
clearTimeout(connecting_timer)
473
474
# If below fails for 20s for some reason, cancel everything to allow for future attempt.
475
connecting_timer = setTimeout(cancel_connecting, 20000)
476
477
@dbg("local_hub_socket: getting new socket")
478
@new_socket (err, socket) =>
479
if not @_local_hub_socket_queue?
480
# already gave up.
481
return
482
@_local_hub_socket_connecting = false
483
@dbg("local_hub_socket: new_socket returned #{err}")
484
if err
485
for c in @_local_hub_socket_queue
486
c?(err)
487
delete @_local_hub_socket_queue
488
else
489
socket.on 'mesg', (type, mesg) =>
490
switch type
491
when 'blob'
492
@handle_blob(mesg)
493
when 'json'
494
@handle_mesg(mesg, socket)
495
496
socket.on('end', @free_resources)
497
socket.on('close', @free_resources)
498
socket.on('error', @free_resources)
499
500
# Send a hello message to the local hub, so it knows this is the control connection,
501
# and not something else (e.g., a console).
502
socket.write_mesg('json', {event:'hello'})
503
504
for c in @_local_hub_socket_queue
505
c?(undefined, socket)
506
delete @_local_hub_socket_queue
507
508
@_socket = socket
509
@init_heartbeat() # start sending heartbeat over this socket
510
511
# Finally, we wait a bit to see if the version gets sent from
512
# the client. If not, we set it to 0, which will cause a restart,
513
# which will upgrade to a new version that sends versions.
514
# TODO: This code can be deleted after all projects get restarted.
515
check_version_received = () =>
516
if @_socket? and not @smc_version?
517
@smc_version = 0
518
@restart_if_version_too_old()
519
setTimeout(check_version_received, 60*1000)
520
521
cancel_connecting()
522
523
# Get a new connection to the local_hub,
524
# authenticated via the secret_token, and enhanced
525
# to be able to send/receive json and blob messages.
526
new_socket: (cb) => # cb(err, socket)
527
@dbg("new_socket")
528
f = (cb) =>
529
if not @address?
530
cb("no address")
531
return
532
if not @address.port?
533
cb("no port")
534
return
535
if not @address.host?
536
cb("no host")
537
return
538
if not @address.secret_token?
539
cb("no secret_token")
540
return
541
connect_to_a_local_hub
542
port : @address.port
543
host : @address.ip ? @address.host # prefer @address.ip if it exists (e.g., for cocalc-kubernetes); otherwise use host (which is where compute server is).
544
secret_token : @address.secret_token
545
cb : cb
546
socket = undefined
547
async.series([
548
(cb) =>
549
if not @address?
550
@dbg("get address of a working local hub")
551
try
552
@address = await (await @projectControl(@project_id)).address()
553
cb()
554
catch err
555
cb(err)
556
else
557
cb()
558
(cb) =>
559
@dbg("try to connect to local hub socket using last known address")
560
f (err, _socket) =>
561
if not err
562
socket = _socket
563
cb()
564
else
565
@dbg("failed to get address of a working local hub -- #{err}")
566
try
567
@address = await (await @projectControl(@project_id)).address()
568
cb()
569
catch err
570
cb(err)
571
(cb) =>
572
if not socket?
573
@dbg("still don't have our connection -- try again")
574
f (err, _socket) =>
575
socket = _socket; cb(err)
576
else
577
cb()
578
], (err) =>
579
cb(err, socket)
580
)
581
582
remove_multi_response_listener: (id) =>
583
delete @call_callbacks[id]
584
585
call: (opts) =>
586
opts = defaults opts,
587
mesg : required
588
timeout : undefined # NOTE: a nonzero timeout MUST be specified, or we will not even listen for a response from the local hub! (Ensures leaking listeners won't happen.)
589
multi_response : false # if true, timeout ignored; call @remove_multi_response_listener(mesg.id) to remove
590
cb : undefined
591
@dbg("call")
592
if not opts.mesg.id?
593
if opts.timeout or opts.multi_response # opts.timeout being undefined or 0 both mean "don't do it"
594
opts.mesg.id = uuid.v4()
595
596
@local_hub_socket (err, socket) =>
597
if err
598
@dbg("call: failed to get socket -- #{err}")
599
opts.cb?(err)
600
return
601
@dbg("call: get socket -- now writing message to the socket -- #{misc.trunc(misc.to_json(opts.mesg),200)}")
602
socket.write_mesg 'json', opts.mesg, (err) =>
603
if err
604
@free_resources() # at least next time it will get a new socket
605
opts.cb?(err)
606
return
607
if opts.multi_response
608
@call_callbacks[opts.mesg.id] = opts.cb
609
else if opts.timeout
610
# Listen to exactly one response, them remove the listener:
611
@call_callbacks[opts.mesg.id] = (resp) =>
612
delete @call_callbacks[opts.mesg.id]
613
if resp.event == 'error'
614
opts.cb(resp.error)
615
else
616
opts.cb(undefined, resp)
617
# As mentioned above -- there's no else -- if not timeout then
618
# we do not listen for a response.
619
620
# Read a file from a project into memory on the hub.
621
# I think this is used only by the API, but not by browser clients anymore.
622
read_file: (opts) => # cb(err, content_of_file)
623
{path, project_id, archive, cb} = defaults opts,
624
path : required
625
project_id : required
626
archive : 'tar.bz2' # for directories; if directory, then the output object "data" has data.archive=actual extension used.
627
cb : required
628
@dbg("read_file '#{path}'")
629
socket = undefined
630
id = uuid.v4()
631
data = undefined
632
data_uuid = undefined
633
result_archive = undefined
634
635
async.series([
636
# Get a socket connection to the local_hub.
637
(cb) =>
638
@local_hub_socket (err, _socket) =>
639
if err
640
cb(err)
641
else
642
socket = _socket
643
cb()
644
(cb) =>
645
socket.write_mesg('json', message.read_file_from_project(id:id, project_id:project_id, path:path, archive:archive))
646
socket.recv_mesg
647
type : 'json'
648
id : id
649
timeout : 60
650
cb : (mesg) =>
651
switch mesg.event
652
when 'error'
653
cb(mesg.error)
654
when 'file_read_from_project'
655
data_uuid = mesg.data_uuid
656
result_archive = mesg.archive
657
cb()
658
else
659
cb("Unknown mesg event '#{mesg.event}'")
660
(cb) =>
661
socket.recv_mesg
662
type : 'blob'
663
id : data_uuid
664
timeout : 60
665
cb : (_data) =>
666
# recv_mesg returns either a Buffer blob
667
# *or* a {event:'error', error:'the error'} object.
668
# Fortunately `new Buffer().event` is valid (and undefined).
669
if _data.event == 'error'
670
cb(_data.error)
671
else
672
data = _data
673
data.archive = result_archive
674
cb()
675
], (err) =>
676
if err
677
cb(err)
678
else
679
cb(undefined, data)
680
)
681
682
# Write a file to a project
683
# I think this is used only by the API, but not by browser clients anymore.
684
write_file: (opts) => # cb(err)
685
{path, project_id, cb, data} = defaults opts,
686
path : required
687
project_id : required
688
data : required # what to write
689
cb : required
690
@dbg("write_file '#{path}'")
691
id = uuid.v4()
692
data_uuid = uuid.v4()
693
694
@local_hub_socket (err, socket) =>
695
if err
696
opts.cb(err)
697
return
698
mesg = message.write_file_to_project
699
id : id
700
project_id : project_id
701
path : path
702
data_uuid : data_uuid
703
socket.write_mesg('json', mesg)
704
socket.write_mesg('blob', {uuid:data_uuid, blob:data})
705
socket.recv_mesg
706
type : 'json'
707
id : id
708
timeout : 10
709
cb : (mesg) =>
710
switch mesg.event
711
when 'file_written_to_project'
712
opts.cb()
713
when 'error'
714
opts.cb(mesg.error)
715
else
716
opts.cb("unexpected message type '#{mesg.event}'")
717
718