GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/database/postgres-blobs.coffee

#########################################################################
# This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
# License: MS-RSL – see LICENSE.md for details
#########################################################################

###
PostgreSQL -- implementation of queries needed for storage and managing blobs,
including backups, integration with google cloud storage, etc.

COPYRIGHT : (c) 2017 SageMath, Inc.
LICENSE   : MS-RSL
###

# Bucket used for cheaper longterm storage of blobs (outside of PostgreSQL).
# NOTE: We should add this to site configuration, and have it get read once when first
# needed and cached.  Also it would be editable in admin account settings.
# If this env variable begins with a / it is assumed to be a path in the file system,
# e.g., a remote mount (in practice, we are using gcsfuse to mount gcloud buckets).
# If it is gs:// then it is a google cloud storage bucket.
COCALC_BLOB_STORE = process.env.COCALC_BLOB_STORE
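
# Illustrative (hypothetical) values for the two forms described above -- examples
# only, not configuration shipped with this repo:
#
#     COCALC_BLOB_STORE=/data/blobs              # filesystem path, e.g., a gcsfuse mount
#     COCALC_BLOB_STORE=gs://example-blob-store  # google cloud storage bucket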

async   = require('async')
zlib    = require('zlib')
fs      = require('fs')

misc_node = require('@cocalc/backend/misc_node')

{defaults} = misc = require('@cocalc/util/misc')
required = defaults.required

{expire_time, one_result, all_results} = require('./postgres-base')
{delete_patches} = require('./postgres/delete-patches')

{filesystem_bucket} = require('./filesystem-bucket')

# some queries do searches, which could take a bit. we give them 5 minutes …
TIMEOUT_LONG_S = 300

exports.extend_PostgreSQL = (ext) -> class PostgreSQL extends ext
    save_blob: (opts) =>
        opts = defaults opts,
            uuid       : undefined  # uuid=sha1-based id coming from blob
            blob       : required   # unless check=true, we assume misc_node.uuidsha1(opts.blob) == opts.uuid;
                                    # blob must be a string or Buffer
            ttl        : 0          # object in blobstore will have *at least* this ttl in seconds;
                                    # if there is already something in blobstore with longer ttl, we leave it;
                                    # infinite ttl = 0.
            project_id : required   # the id of the project that is saving the blob
            check      : false      # if true, will give error if misc_node.uuidsha1(opts.blob) != opts.uuid
            compress   : undefined  # optional compression to use: 'gzip', 'zlib'; only used if blob not already in db.
            level      : -1         # compression level (if compressed) -- see https://github.com/expressjs/compression#level
            cb         : required   # cb(err, ttl actually used in seconds); ttl=0 for infinite ttl
        if not Buffer.isBuffer(opts.blob)
            # CRITICAL: We assume everywhere below that opts.blob is a
            # buffer, e.g., in the .toString('hex') method!
            opts.blob = Buffer.from(opts.blob)
        if not opts.uuid?
            opts.uuid = misc_node.uuidsha1(opts.blob)
        else if opts.check
            uuid = misc_node.uuidsha1(opts.blob)
            if uuid != opts.uuid
                opts.cb("the sha1 uuid (='#{uuid}') of the blob must equal the given uuid (='#{opts.uuid}')")
                return
        if not misc.is_valid_uuid_string(opts.uuid)
            opts.cb("uuid is invalid")
            return
        dbg = @_dbg("save_blob(uuid='#{opts.uuid}')")
        dbg()
        rows = ttl = undefined
        async.series([
            (cb) =>
                @_query
                    query : 'SELECT expire FROM blobs'
                    where : "id = $::UUID" : opts.uuid
                    cb    : (err, x) =>
                        rows = x?.rows; cb(err)
            (cb) =>
                if rows.length == 0 and opts.compress
                    dbg("compression requested and blob not already saved, so we compress blob")
                    switch opts.compress
                        when 'gzip'
                            zlib.gzip opts.blob, {level:opts.level}, (err, blob) =>
                                opts.blob = blob; cb(err)
                        when 'zlib'
                            zlib.deflate opts.blob, {level:opts.level}, (err, blob) =>
                                opts.blob = blob; cb(err)
                        else
                            cb("compression format '#{opts.compress}' not implemented")
                else
                    cb()
            (cb) =>
                if rows.length == 0
                    dbg("nothing in DB, so we insert the blob.")
                    ttl = opts.ttl
                    @_query
                        query  : "INSERT INTO blobs"
                        values :
                            id         : opts.uuid
                            blob       : '\\x'+opts.blob.toString('hex')
                            project_id : opts.project_id
                            count      : 0
                            size       : opts.blob.length
                            created    : new Date()
                            compress   : opts.compress
                            expire     : if ttl then expire_time(ttl)
                        cb : cb
                else
                    dbg("blob already in the DB, so see if we need to change the expire time")
                    @_extend_blob_ttl
                        expire : rows[0].expire
                        ttl    : opts.ttl
                        uuid   : opts.uuid
                        cb     : (err, _ttl) =>
                            ttl = _ttl; cb(err)
            (cb) =>
                # double check that the blob definitely exists and has correct expire
                # See discussion at https://github.com/sagemathinc/cocalc/issues/7715
                # The problem is that maybe with VERY low probability somehow we extend
                # the blob ttl at the same time that we're deleting blobs and the extend
                # is too late and does an empty update.
                @_query
                    query : 'SELECT expire FROM blobs'
                    where : "id = $::UUID" : opts.uuid
                    cb    : (err, x) =>
                        if err
                            cb(err)
                            return
                        # some consistency checks
                        rows = x?.rows
                        if rows.length == 0
                            cb("blob got removed while saving it")
                            return
                        if !opts.ttl and rows[0].expire
                            cb("blob should have infinite ttl but it has expire set")
                            return
                        cb()

        ], (err) => opts.cb(err, ttl))
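
    # Hedged usage sketch (illustration only; the `db` handle and the callback body are
    # hypothetical, not part of this module).  Saving a compressed blob with a 1-hour ttl:
    #
    #     db.save_blob
    #         blob       : Buffer.from('hello world')
    #         project_id : some_project_id     # a valid project uuid (hypothetical)
    #         ttl        : 3600                # at least one hour; 0 would mean never expire
    #         compress   : 'zlib'
    #         cb         : (err, ttl) ->
    #             console.log("saved", err ? "with effective ttl=#{ttl}s")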

    # Used internally by save_blob to possibly extend the expire time of a blob.
    _extend_blob_ttl : (opts) =>
        opts = defaults opts,
            expire : undefined  # what expire is currently set to in the database
            ttl    : required   # requested ttl -- extend expire to at least this
            uuid   : required
            cb     : required   # (err, effective ttl (with 0=oo))
        if not misc.is_valid_uuid_string(opts.uuid)
            opts.cb("uuid is invalid")
            return
        if not opts.expire
            # ttl already infinite -- nothing to do
            opts.cb(undefined, 0)
            return
        new_expire = ttl = undefined
        if opts.ttl
            # saved ttl is finite as is requested one; change in DB if requested is longer
            z = expire_time(opts.ttl)
            if z > opts.expire
                new_expire = z
                ttl = opts.ttl
            else
                ttl = (opts.expire - new Date())/1000.0
        else
            # saved ttl is finite but requested one is infinite
            ttl = new_expire = 0
        if new_expire?
            # change the expire time for the blob already in the DB
            @_query
                query : 'UPDATE blobs'
                where : "id = $::UUID" : opts.uuid
                set   : "expire :: TIMESTAMP " : if new_expire == 0 then undefined else new_expire
                cb    : (err) => opts.cb(err, ttl)
        else
            opts.cb(undefined, ttl)

    get_blob: (opts) =>
        opts = defaults opts,
            uuid       : required
            save_in_db : false  # if true and blob isn't in DB and is only in gcloud, copies to local DB
                                # (for faster access e.g., 20ms versus 5ms -- i.e., not much faster; gcloud is FAST too.)
            touch      : true
            cb         : required  # cb(err) or cb(undefined, blob_value) or cb(undefined, undefined) in case no such blob
        if not misc.is_valid_uuid_string(opts.uuid)
            opts.cb("uuid is invalid")
            return
        x    = undefined
        blob = undefined
        async.series([
            (cb) =>
                @_query
                    query : "SELECT expire, blob, gcloud, compress FROM blobs"
                    where : "id = $::UUID" : opts.uuid
                    cb    : one_result (err, _x) =>
                        x = _x; cb(err)
            (cb) =>
                if not x?
                    # nothing to do -- blob not in db (probably expired)
                    cb()
                else if x.expire and x.expire <= new Date()
                    # the blob already expired -- background delete it
                    @_query   # delete it (but don't wait for this to finish)
                        query : "DELETE FROM blobs"
                        where : "id = $::UUID" : opts.uuid
                    cb()
                else if x.blob?
                    # blob not expired and is in database
                    blob = x.blob
                    cb()
                else if x.gcloud
                    if not COCALC_BLOB_STORE?
                        cb("no blob store configured -- set the COCALC_BLOB_STORE env variable")
                        return
                    # blob not available locally, but should be in a Google cloud storage bucket -- try to get it
                    # NOTE: we now ignore the actual content of x.gcloud -- we don't support spreading blobs
                    # across multiple buckets... as it isn't needed because buckets are infinite, and it
                    # is potentially confusing to manage.
                    @blob_store().read
                        name : opts.uuid
                        cb   : (err, _blob) =>
                            if err
                                cb(err)
                            else
                                blob = _blob
                                cb()
                                if opts.save_in_db
                                    # also save in database so will be faster next time (again, don't wait on this)
                                    @_query   # update it (but don't wait for this to finish)
                                        query : "UPDATE blobs"
                                        set   : {blob : blob}
                                        where : "id = $::UUID" : opts.uuid
                else
                    # blob not local and not in gcloud -- this shouldn't happen
                    # (just view this as "expired" by not setting blob)
                    cb()
            (cb) =>
                if not blob? or not x?.compress?
                    cb(); return
                # blob is compressed -- decompress it
                switch x.compress
                    when 'gzip'
                        zlib.gunzip blob, (err, _blob) =>
                            blob = _blob; cb(err)
                    when 'zlib'
                        zlib.inflate blob, (err, _blob) =>
                            blob = _blob; cb(err)
                    else
                        cb("compression format '#{x.compress}' not implemented")
        ], (err) =>
            opts.cb(err, blob)
            if blob? and opts.touch
                # blob was pulled from db or gcloud, so note that it was accessed (updates a counter)
                @touch_blob(uuid : opts.uuid)
        )
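
    # Hedged usage sketch (illustration only; `db` and the id below are hypothetical):
    # fetching a blob by its sha1-based uuid; decompression happens automatically.
    #
    #     db.get_blob
    #         uuid : some_blob_uuid    # sha1-based uuid of a previously saved blob (hypothetical)
    #         cb   : (err, blob) ->
    #             if err then console.warn(err)
    #             else if not blob? then console.log('no such blob (or it expired)')
    #             else console.log("got #{blob.length} bytes")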

    touch_blob: (opts) =>
        opts = defaults opts,
            uuid : required
            cb   : undefined
        if not misc.is_valid_uuid_string(opts.uuid)
            opts.cb?("uuid is invalid")
            return
        @_query
            query : "UPDATE blobs SET count = count + 1, last_active = NOW()"
            where : "id = $::UUID" : opts.uuid
            cb    : opts.cb

    blob_store: (bucket) =>
        if not bucket
            bucket = COCALC_BLOB_STORE
        # File system -- could be a big NFS volume, remotely mounted gcsfuse, or just
        # a single big local file system -- etc. -- we don't care.
        return filesystem_bucket(name: bucket)

    # Uploads the blob with given sha1 uuid to gcloud storage, if it hasn't already
    # been uploaded there.  Actually we copy to a directory, which uses gcsfuse to
    # implicitly upload to gcloud...
    copy_blob_to_gcloud: (opts) =>
        opts = defaults opts,
            uuid   : required           # uuid=sha1-based uuid coming from blob
            bucket : COCALC_BLOB_STORE  # name of bucket
            force  : false              # if true, upload even if already uploaded
            remove : false              # if true, deletes blob from database after successful upload to gcloud (to free space)
            cb     : undefined          # cb(err)
        dbg = @_dbg("copy_blob_to_gcloud(uuid='#{opts.uuid}')")
        dbg()
        if not misc.is_valid_uuid_string(opts.uuid)
            dbg("invalid uuid")
            opts.cb?("uuid is invalid")
            return
        if not opts.bucket
            dbg("invalid bucket")
            opts.cb?("no blob store configured -- set the COCALC_BLOB_STORE env variable")
            return
        locals =
            x: undefined
        async.series([
            (cb) =>
                dbg("get blob info from database")
                @_query
                    query : "SELECT blob, gcloud FROM blobs"
                    where : "id = $::UUID" : opts.uuid
                    cb    : one_result (err, x) =>
                        locals.x = x
                        if err
                            cb(err)
                        else if not x?
                            cb('no such blob')
                        else if not x.blob and not x.gcloud
                            cb('blob not available -- this should not be possible')
                        else if not x.blob and opts.force
                            cb("blob can't be re-uploaded since it was already deleted")
                        else
                            cb()
            (cb) =>
                if (locals.x.gcloud? and not opts.force) or not locals.x.blob?
                    dbg("already uploaded -- don't need to do anything; or already deleted locally")
                    cb(); return
                # upload to Google cloud storage
                locals.bucket = @blob_store(opts.bucket)
                locals.bucket.write
                    name    : opts.uuid
                    content : locals.x.blob
                    cb      : cb
            (cb) =>
                if (locals.x.gcloud? and not opts.force) or not locals.x.blob?
                    # already uploaded -- don't need to do anything; or already deleted locally
                    cb(); return
                dbg("read blob back and compare")  # -- we do *NOT* trust GCS with such important data
                locals.bucket.read
                    name : opts.uuid
                    cb   : (err, data) =>
                        if err
                            cb(err)
                        else if not locals.x.blob.equals(data)
                            dbg("FAILED!")
                            cb("BLOB write to GCS failed check!")
                        else
                            dbg("check succeeded")
                            cb()
            (cb) =>
                if not locals.x.blob?
                    # no blob in db; nothing further to do.
                    cb()
                else
                    # We successfully uploaded to gcloud -- set locals.x.gcloud
                    set = {gcloud: opts.bucket}
                    if opts.remove
                        set.blob = null   # remove blob content from database to save space
                    @_query
                        query : "UPDATE blobs"
                        where : "id = $::UUID" : opts.uuid
                        set   : set
                        cb    : cb
        ], (err) => opts.cb?(err))
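
    # Hedged usage sketch (illustration only; `db` and the id are hypothetical):
    # upload one blob and drop its bytes from PostgreSQL once the read-back check passes.
    #
    #     db.copy_blob_to_gcloud
    #         uuid   : some_blob_uuid   # sha1-based uuid of an existing blob (hypothetical)
    #         remove : true             # free the space in the blobs table after upload
    #         cb     : (err) -> console.log('copy done', err)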

    ###
    Back up (up to) limit blobs that haven't previously been backed up, and put them in
    a tarball in the given path.  The tarball's name is the time when the backup starts.
    The tarball is compressed using gzip compression.

        db._error_thresh=1e6; db.backup_blobs_to_tarball(limit:10000,path:'/backup/tmp-blobs',repeat_until_done:60, cb:done())

    I have not written code to restore from these tarballs.  Assuming the database has been restored,
    so there is an entry in the blobs table for each blob, it would suffice to upload the tarballs,
    then copy their contents straight into the COCALC_BLOB_STORE, and that's it.
    If we don't have the blobs table in the DB, make dummy entries from the blob names in the tarballs.
    ###
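    # Hedged sketch of the restore procedure described above (no such code exists in this
    # module; the paths and timestamp are hypothetical).  With COCALC_BLOB_STORE pointing
    # at a filesystem path, restoring a tarball amounts to unpacking it into that directory:
    #
    #     tar -xzf /backup/tmp-blobs/2020-01-01-000000.tar.gz -C /tmp/restore
    #     cp /tmp/restore/2020-01-01-000000/* $COCALC_BLOB_STORE/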
    backup_blobs_to_tarball: (opts) =>
        opts = defaults opts,
            limit             : 10000     # number of blobs to backup
            path              : required  # path where the [timestamp].tar.gz file is placed
            throttle          : 0         # wait this many seconds between pulling blobs from database
            repeat_until_done : 0         # if positive, keeps re-call'ing this function until no more
                                          # results to backup (pauses this many seconds between)
            map_limit         : 5
            cb                : undefined # cb(err, '[timestamp].tar.gz')
        dbg       = @_dbg("backup_blobs_to_tarball(limit=#{opts.limit},path='#{opts.path}')")
        join      = require('path').join
        dir       = misc.date_to_snapshot_format(new Date())
        target    = join(opts.path, dir)
        tarball   = target + '.tar.gz'
        v         = undefined
        to_remove = []
        async.series([
            (cb) =>
                dbg("make target='#{target}'")
                fs.mkdir(target, cb)
            (cb) =>
                dbg("get blobs that we need to back up")
                @_query
                    query     : "SELECT id FROM blobs"
                    where     : "expire IS NULL and backup IS NOT true"
                    limit     : opts.limit
                    timeout_s : TIMEOUT_LONG_S
                    cb        : all_results 'id', (err, x) =>
                        v = x; cb(err)
            (cb) =>
                dbg("backing up #{v.length} blobs")
                f = (id, cb) =>
                    @get_blob
                        uuid  : id
                        touch : false
                        cb    : (err, blob) =>
                            if err
                                dbg("ERROR! blob #{id} -- #{err}")
                                cb(err)
                            else if blob?
                                dbg("got blob #{id} from db -- now write to disk")
                                to_remove.push(id)
                                fs.writeFile join(target, id), blob, (err) =>
                                    if opts.throttle
                                        setTimeout(cb, opts.throttle*1000)
                                    else
                                        cb()
                            else
                                dbg("blob #{id} is expired, so nothing to be done, ever.")
                                cb()
                async.mapLimit(v, opts.map_limit, f, cb)
            (cb) =>
                dbg("successfully wrote all blobs to files; now make tarball")
                misc_node.execute_code
                    command : 'tar'
                    args    : ['zcvf', tarball, dir]
                    path    : opts.path
                    timeout : 3600
                    cb      : cb
            (cb) =>
                dbg("remove temporary blobs")
                f = (x, cb) =>
                    fs.unlink(join(target, x), cb)
                async.mapLimit(to_remove, 10, f, cb)
            (cb) =>
                dbg("remove temporary directory")
                fs.rmdir(target, cb)
            (cb) =>
                dbg("backup succeeded completely -- mark all blobs as backed up")
                @_query
                    query : "UPDATE blobs"
                    set   : {backup: true}
                    where : "id = ANY($)" : v
                    cb    : cb
        ], (err) =>
            if err
                dbg("ERROR: #{err}")
                opts.cb?(err)
            else
                dbg("done")
                if opts.repeat_until_done and to_remove.length == opts.limit
                    f = () =>
                        @backup_blobs_to_tarball(opts)
                    setTimeout(f, opts.repeat_until_done*1000)
                else
                    opts.cb?(undefined, tarball)
        )

    ###
    Copy all blobs that will never expire to a google cloud storage bucket.

        errors={}; db.copy_all_blobs_to_gcloud(limit:500, cb:done(), remove:true, repeat_until_done_s:10, errors:errors)
    ###
    copy_all_blobs_to_gcloud: (opts) =>
        opts = defaults opts,
            bucket              : COCALC_BLOB_STORE
            limit               : 1000       # copy this many in each batch
            map_limit           : 1          # copy this many at once.
            throttle            : 0          # wait this many seconds between uploads
            repeat_until_done_s : 0          # if nonzero, waits this many seconds, then calls this function again until nothing gets uploaded.
            errors              : undefined  # object: used to accumulate errors -- if not given, then everything will terminate on first error
            remove              : false
            cutoff              : '1 month'  # postgresql interval - only copy blobs to gcloud that haven't been accessed at least this long.
            cb                  : required
        dbg = @_dbg("copy_all_blobs_to_gcloud")
        dbg()
        # This query selects the blobs that will never expire, but have not yet
        # been copied to Google cloud storage.
        dbg("getting blob id's...")
        @_query
            query     : 'SELECT id, size FROM blobs'
            where     : "expire IS NULL AND gcloud IS NULL and (last_active <= NOW() - INTERVAL '#{opts.cutoff}' OR last_active IS NULL)"
            limit     : opts.limit
            timeout_s : TIMEOUT_LONG_S
            ## order_by : 'id' # this is not important and was causing VERY excessive load in production (due to bad query planning?!)
            cb        : all_results (err, v) =>
                if err
                    dbg("fail: #{err}")
                    opts.cb(err)
                else
                    n = v.length; m = 0
                    dbg("got #{n} blob id's")
                    f = (x, cb) =>
                        m += 1
                        k = m; start = new Date()
                        dbg("**** #{k}/#{n}: uploading #{x.id} of size #{x.size/1000}KB")
                        @copy_blob_to_gcloud
                            uuid   : x.id
                            bucket : opts.bucket
                            remove : opts.remove
                            cb     : (err) =>
                                dbg("**** #{k}/#{n}: finished -- #{err}; size #{x.size/1000}KB; time=#{new Date() - start}ms")
                                if err
                                    if opts.errors?
                                        opts.errors[x.id] = err
                                    else
                                        cb(err)
                                if opts.throttle
                                    setTimeout(cb, 1000*opts.throttle)
                                else
                                    cb()
                    async.mapLimit v, opts.map_limit, f, (err) =>
                        dbg("finished this round -- #{err}")
                        if err and not opts.errors?
                            opts.cb(err)
                            return
                        if opts.repeat_until_done_s and v.length > 0
                            dbg("repeat_until_done triggering another round")
                            setTimeout((=> @copy_all_blobs_to_gcloud(opts)), opts.repeat_until_done_s*1000)
                        else
                            dbg("done : #{misc.to_json(opts.errors)}")
                            opts.cb(if misc.len(opts.errors) > 0 then opts.errors)

    blob_maintenance: (opts) =>
        opts = defaults opts,
            path              : '/backup/blobs'
            map_limit         : 1
            blobs_per_tarball : 10000
            throttle          : 0
            cb                : undefined
        dbg = @_dbg("blob_maintenance()")
        dbg()
        async.series([
            (cb) =>
                dbg("maintain the patches and syncstrings")
                @syncstring_maintenance
                    repeat_until_done : true
                    limit             : 500
                    map_limit         : opts.map_limit
                    delay             : 1000   # 1s, since syncstring_maintenance heavily loads db
                    cb                : cb
            (cb) =>
                dbg("backup_blobs_to_tarball")
                @backup_blobs_to_tarball
                    throttle          : opts.throttle
                    limit             : opts.blobs_per_tarball
                    path              : opts.path
                    map_limit         : opts.map_limit
                    repeat_until_done : 5
                    cb                : cb
            (cb) =>
                dbg("copy_all_blobs_to_gcloud")
                errors = {}
                @copy_all_blobs_to_gcloud
                    limit               : 1000
                    repeat_until_done_s : 5
                    errors              : errors
                    remove              : true
                    map_limit           : opts.map_limit
                    throttle            : opts.throttle
                    cb                  : (err) =>
                        if misc.len(errors) > 0
                            dbg("errors! #{misc.to_json(errors)}")
                        cb(err)
        ], (err) =>
            opts.cb?(err)
        )
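
    # Hedged usage sketch (illustration only): running the full maintenance pass --
    # archive old syncstring patches, back up blobs to tarballs, then offload
    # never-expiring blobs to the blob store.
    #
    #     db.blob_maintenance
    #         path : '/backup/blobs'    # default shown; any writable path works
    #         cb   : (err) -> console.log('maintenance finished', err)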

    remove_blob_ttls: (opts) =>
        opts = defaults opts,
            uuids : required   # uuid=sha1-based from blob
            cb    : required   # cb(err)
        @_query
            query : "UPDATE blobs"
            set   : {expire: null}
            where : "id::UUID = ANY($)" : (x for x in opts.uuids when misc.is_valid_uuid_string(x))
            cb    : opts.cb

    # If blob has been copied to gcloud, remove the BLOB part of the data
    # from the database (to save space).  If not copied, copy it to gcloud,
    # then remove from database.
    close_blob: (opts) =>
        opts = defaults opts,
            uuid   : required   # uuid=sha1-based from blob
            bucket : COCALC_BLOB_STORE
            cb     : undefined  # cb(err)
        if not misc.is_valid_uuid_string(opts.uuid)
            opts.cb?("uuid is invalid")
            return
        async.series([
            (cb) =>
                # ensure blob is in gcloud
                @_query
                    query : 'SELECT gcloud FROM blobs'
                    where : 'id = $::UUID' : opts.uuid
                    cb    : one_result 'gcloud', (err, gcloud) =>
                        if err
                            cb(err)
                        else if not gcloud
                            # not yet copied to gcloud storage
                            @copy_blob_to_gcloud
                                uuid   : opts.uuid
                                bucket : opts.bucket
                                cb     : cb
                        else
                            # copied already
                            cb()
            (cb) =>
                # now blob is in gcloud -- delete blob data in database
                @_query
                    query : 'UPDATE blobs'
                    where : 'id = $::UUID' : opts.uuid
                    set   : {blob: null}
                    cb    : cb
        ], (err) => opts.cb?(err))


    ###
    # Syncstring maintenance
    ###
    syncstring_maintenance: (opts) =>
        opts = defaults opts,
            age_days          : 30    # archive patches of syncstrings that are inactive for at least this long
            map_limit         : 1     # how much parallelism to use
            limit             : 1000  # do only this many
            repeat_until_done : true
            delay             : 0
            cb                : undefined
        dbg = @_dbg("syncstring_maintenance")
        dbg(opts)
        syncstrings = undefined
        async.series([
            (cb) =>
                dbg("determine inactive syncstring ids")
                @_query
                    query     : 'SELECT string_id FROM syncstrings'
                    where     : [{'last_active <= $::TIMESTAMP' : misc.days_ago(opts.age_days)}, 'archived IS NULL', 'huge IS NOT TRUE']
                    limit     : opts.limit
                    timeout_s : TIMEOUT_LONG_S
                    cb        : all_results 'string_id', (err, v) =>
                        syncstrings = v
                        cb(err)
            (cb) =>
                dbg("archive patches for inactive syncstrings")
                i = 0
                f = (string_id, cb) =>
                    i += 1
                    console.log("*** #{i}/#{syncstrings.length}: archiving string #{string_id} ***")
                    @archive_patches
                        string_id : string_id
                        cb        : (err) ->
                            if err or not opts.delay
                                cb(err)
                            else
                                setTimeout(cb, opts.delay)
                async.mapLimit(syncstrings, opts.map_limit, f, cb)
        ], (err) =>
            if err
                opts.cb?(err)
            else if opts.repeat_until_done and syncstrings.length == opts.limit
                dbg("doing it again")
                @syncstring_maintenance(opts)
            else
                opts.cb?()
        )

    # Archives (offlines) the patches of a syncstring, unless the string was active
    # very recently, in which case this is a no-op.
    #
    # TODO: this ignores all syncstrings marked as "huge:true", because the patches are too large.
    # Come up with a better strategy (incremental?) to generate the blobs to avoid the problem.
    archive_patches: (opts) =>
        opts = defaults opts,
            string_id : required
            compress  : 'zlib'
            level     : -1   # the default
            cutoff    : misc.minutes_ago(30)   # never touch anything this new
            cb        : undefined
        dbg = @_dbg("archive_patches(string_id='#{opts.string_id}')")
        syncstring = patches = blob_uuid = project_id = last_active = huge = undefined
        where = {"string_id = $::CHAR(40)" : opts.string_id}
        async.series([
            (cb) =>
                dbg("get syncstring info")
                @_query
                    query : "SELECT project_id, archived, last_active, huge FROM syncstrings"
                    where : where
                    cb    : one_result (err, x) =>
                        if err
                            cb(err)
                        else if not x?
                            cb("no such syncstring with id '#{opts.string_id}'")
                        else if x.archived
                            cb("string_id='#{opts.string_id}' already archived as blob id '#{x.archived}'")
                        else
                            project_id  = x.project_id
                            last_active = x.last_active
                            huge        = !!x.huge
                            dbg("got last_active=#{last_active} project_id=#{project_id} huge=#{huge}")
                            cb()
            (cb) =>
                if last_active? and last_active >= opts.cutoff
                    dbg("excluding due to cutoff")
                    cb(); return
                if huge
                    dbg("excluding due to being huge")
                    cb(); return
                dbg("get patches")
                @export_patches
                    string_id : opts.string_id
                    cb        : (err, x) =>
                        patches = x
                        cb(err)
            (cb) =>
                if last_active? and last_active >= opts.cutoff
                    cb(); return
                if huge
                    cb(); return
                dbg("create blob from patches")
                try
                    blob = Buffer.from(JSON.stringify(patches))
                catch err
                    # TODO: This *will* happen if the total length of all patches is too big.
                    # need to break patches up...
                    # This is not exactly the end of the world as the entire point of all this is to
                    # just save some space in the database...
                    dbg('error creating blob, marking syncstring as being "huge": ' + err)
                    huge = true
                    @_query
                        query : "UPDATE syncstrings"
                        set   : {huge : true}
                        where : where
                        cb    : (err) =>
                            cb(err)
                    return
                if not huge
                    dbg('save blob')
                    blob_uuid = misc_node.uuidsha1(blob)
                    @save_blob
                        uuid       : blob_uuid
                        blob       : blob
                        project_id : project_id
                        compress   : opts.compress
                        level      : opts.level
                        cb         : cb
            (cb) =>
                if last_active? and last_active >= opts.cutoff
                    cb(); return
                if huge
                    cb(); return
                dbg("update syncstring to indicate patches have been archived in a blob")
                @_query
                    query : "UPDATE syncstrings"
                    set   : {archived : blob_uuid}
                    where : where
                    cb    : cb
            (cb) =>
                if last_active? and last_active >= opts.cutoff
                    cb(); return
                if huge
                    cb(); return
                dbg("actually deleting patches")
                delete_patches(db:@, string_id: opts.string_id, cb:cb)
        ], (err) => opts.cb?(err))

    unarchive_patches: (opts) =>
        opts = defaults opts,
            string_id : required
            cb        : undefined
        dbg = @_dbg("unarchive_patches(string_id='#{opts.string_id}')")
        where = {"string_id = $::CHAR(40)" : opts.string_id}
        @_query
            query : "SELECT archived FROM syncstrings"
            where : where
            cb    : one_result 'archived', (err, blob_uuid) =>
                if err or not blob_uuid?
                    opts.cb?(err)
                    return
                blob = undefined
                async.series([
                    #(cb) =>
                    #    # For testing only!
                    #    setTimeout(cb, 7000)
                    (cb) =>
                        dbg("download blob")
                        @get_blob
                            uuid : blob_uuid
                            cb   : (err, x) =>
                                if err
                                    cb(err)
                                else if not x?
                                    cb("blob is gone")
                                else
                                    blob = x
                                    cb(err)
                    (cb) =>
                        dbg("extract blob")
                        try
                            patches = JSON.parse(blob)
                        catch e
                            cb("corrupt patches blob -- #{e}")
                            return
                        @import_patches
                            patches : patches
                            cb      : cb
                    (cb) =>
                        async.parallel([
                            (cb) =>
                                dbg("update syncstring to indicate that patches are now available")
                                @_query
                                    query : "UPDATE syncstrings SET archived=NULL"
                                    where : where
                                    cb    : cb
                            (cb) =>
                                dbg('delete blob, which is no longer needed')
                                @delete_blob
                                    uuid : blob_uuid
                                    cb   : cb
                        ], cb)
                ], (err) => opts.cb?(err))
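
    # Hedged usage sketch (illustration only; `db` and the string_id are hypothetical):
    # archive the patches of an inactive syncstring into a blob, then restore them later.
    #
    #     db.archive_patches
    #         string_id : some_string_id   # 40-character syncstring id (hypothetical)
    #         cb        : (err) ->
    #             console.log('archived', err)
    #             # ...later, to bring the patches back into the patches table:
    #             db.unarchive_patches(string_id: some_string_id, cb: (err) -> console.log('restored', err))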

    ###
    Export/import of syncstring history and info.  Right now used mainly for debugging
    purposes, but will obviously be useful for a user-facing feature involving import
    and export (and copying) of complete edit history.
    ###
    export_patches: (opts) =>
        opts = defaults opts,
            string_id : required
            cb        : required   # cb(err, array)
        @_query
            query : "SELECT * FROM patches"
            where : {"string_id = $::CHAR(40)" : opts.string_id}
            cb    : all_results (err, patches) =>
                if err
                    opts.cb(err)
                else
                    opts.cb(undefined, patches)

    import_patches: (opts) =>
        opts = defaults opts,
            patches   : required    # array as exported by export_patches
            string_id : undefined   # if given, change the string_id when importing the patches to this
            cb        : undefined
        patches = opts.patches
        if patches.length == 0  # easy
            opts.cb?()
            return
        if patches[0].id?
            # convert from OLD RethinkDB format!
            v = []
            for x in patches
                patch =
                    string_id : x.id[0]
                    time      : new Date(x.id[1])
                    user_id   : x.user
                    patch     : x.patch
                    snapshot  : x.snapshot
                    sent      : x.sent
                    prev      : x.prev
                v.push(patch)
            patches = v
        # change string_id, if requested.
        if opts.string_id?
            for x in patches
                x.string_id = opts.string_id
        # We break into blocks since there is a limit (about 65K) on the
        # number of params that can be inserted in a single query.
        insert_block_size = 1000
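        # (Rough arithmetic, as an illustration: each patch row above has on the order of
        # 7-8 columns, so a block of 1000 rows uses roughly 7000-8000 bound parameters,
        # comfortably under PostgreSQL's ~65535-parameter limit.)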
        f = (i, cb) =>
            @_query
                query    : 'INSERT INTO patches'
                values   : patches.slice(insert_block_size*i, insert_block_size*(i+1))
                conflict : 'ON CONFLICT DO NOTHING'  # in case multiple servers (or this server) are doing this import at once -- this can and does happen sometimes.
                cb       : cb
        async.mapSeries([0...patches.length/insert_block_size], f, (err) => opts.cb?(err))

    delete_blob: (opts) =>
        opts = defaults opts,
            uuid : required
            cb   : undefined
        if not misc.is_valid_uuid_string(opts.uuid)
            opts.cb?("uuid is invalid")
            return
        gcloud = undefined
        dbg = @_dbg("delete_blob(uuid='#{opts.uuid}')")
        async.series([
            (cb) =>
                dbg("check if blob in gcloud")
                @_query
                    query : "SELECT gcloud FROM blobs"
                    where : "id = $::UUID" : opts.uuid
                    cb    : one_result 'gcloud', (err, x) =>
                        gcloud = x
                        cb(err)
            (cb) =>
                if not gcloud or not COCALC_BLOB_STORE
                    cb()
                    return
                dbg("delete from gcloud")
                @blob_store(gcloud).delete
                    name : opts.uuid
                    cb   : cb
            (cb) =>
                dbg("delete from local database")
                @_query
                    query : "DELETE FROM blobs"
                    where : "id = $::UUID" : opts.uuid
                    cb    : cb
        ], (err) => opts.cb?(err))