#########################################################################
# This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
# License: MS-RSL – see LICENSE.md for details
#########################################################################

###
PostgreSQL -- implementation of queries needed for storage and managing blobs,
including backups, integration with google cloud storage, etc.

COPYRIGHT : (c) 2017 SageMath, Inc.
LICENSE   : MS-RSL
###

# Bucket used for cheaper longterm storage of blobs (outside of PostgreSQL).
# NOTE: We should add this to site configuration, and have it get read once when first
# needed and cached.  Also it would be editable in admin account settings.
# If this env variable begins with a / it is assumed to be a path in the file system,
# e.g., a remote mount (in practice, we are using gcsfuse to mount gcloud buckets).
# If it is gs:// then it is a google cloud storage bucket.
# 2025-01-10: noticed rarely this variable is not set, at least not initially after startup.
# Hardcoding the path, which has never changed anyways.
# Maybe https://github.com/nodejs/help/issues/3618
COCALC_BLOB_STORE_FALLBACK = "/blobs"
COCALC_BLOB_STORE = String(process.env.COCALC_BLOB_STORE ? COCALC_BLOB_STORE_FALLBACK)
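# Illustrative example values (hypothetical; semantics as described in the comment above):
#     COCALC_BLOB_STORE=/blobs                -- a path in the file system (e.g., a gcsfuse mount)
#     COCALC_BLOB_STORE=gs://example-bucket   -- a google cloud storage bucket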

async = require('async')
zlib  = require('zlib')
fs    = require('fs')

misc_node = require('@cocalc/backend/misc_node')

{defaults} = misc = require('@cocalc/util/misc')
required = defaults.required

{expire_time, one_result, all_results} = require('./postgres-base')
{delete_patches} = require('./postgres/delete-patches')
blobs = require('./postgres/blobs')

{filesystem_bucket} = require('./filesystem-bucket')

# some queries do searches, which could take a bit. we give them 5 minutes …
TIMEOUT_LONG_S = 300

exports.extend_PostgreSQL = (ext) -> class PostgreSQL extends ext
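    # Illustrative usage sketch (values are hypothetical; see the option comments below):
    #     db.save_blob
    #         blob : Buffer.from('some data')
    #         ttl  : 3600    # keep for at least an hour; ttl=0 means forever
    #         cb   : (err, ttl) -> console.log(err, ttl)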
    save_blob: (opts) =>
        opts = defaults opts,
            uuid       : undefined  # uuid=sha1-based id coming from blob
            blob       : required   # unless check=true, we assume misc_node.uuidsha1(opts.blob) == opts.uuid;
                                    # blob must be a string or Buffer
            ttl        : 0          # object in blobstore will have *at least* this ttl in seconds;
                                    # if there is already something in blobstore with longer ttl, we leave it;
                                    # infinite ttl = 0.
            project_id : undefined  # the id of the project that is saving the blob
            account_id : undefined  # the id of the user that is saving the blob
            check      : false      # if true, will give error if misc_node.uuidsha1(opts.blob) != opts.uuid
            compress   : undefined  # optional compression to use: 'gzip', 'zlib'; only used if blob not already in db.
            level      : -1         # compression level (if compressed) -- see https://github.com/expressjs/compression#level
            cb         : required   # cb(err, ttl actually used in seconds); ttl=0 for infinite ttl
        if not Buffer.isBuffer(opts.blob)
            # CRITICAL: We assume everywhere below that opts.blob is a
            # buffer, e.g., in the .toString('hex') method!
            opts.blob = Buffer.from(opts.blob)
        if not opts.uuid?
            opts.uuid = misc_node.uuidsha1(opts.blob)
        else if opts.check
            uuid = misc_node.uuidsha1(opts.blob)
            if uuid != opts.uuid
                opts.cb("the sha1 uuid (='#{uuid}') of the blob must equal the given uuid (='#{opts.uuid}')")
                return
        if not misc.is_valid_uuid_string(opts.uuid)
            opts.cb("uuid is invalid")
            return
        dbg = @_dbg("save_blob(uuid='#{opts.uuid}')")
        dbg()
        rows = ttl = undefined
        async.series([
            (cb) =>
                @_query
                    query : 'SELECT expire FROM blobs'
                    where : "id = $::UUID" : opts.uuid
                    cb    : (err, x) =>
                        rows = x?.rows; cb(err)
            (cb) =>
                if rows.length == 0 and opts.compress
                    dbg("compression requested and blob not already saved, so we compress blob")
                    switch opts.compress
                        when 'gzip'
                            zlib.gzip opts.blob, {level:opts.level}, (err, blob) =>
                                opts.blob = blob; cb(err)
                        when 'zlib'
                            zlib.deflate opts.blob, {level:opts.level}, (err, blob) =>
                                opts.blob = blob; cb(err)
                        else
                            cb("compression format '#{opts.compress}' not implemented")
                else
                    cb()
            (cb) =>
                if rows.length == 0
                    dbg("nothing in DB, so we insert the blob.")
                    ttl = opts.ttl
                    @_query
                        query  : "INSERT INTO blobs"
                        values :
                            id         : opts.uuid
                            blob       : '\\x'+opts.blob.toString('hex')
                            project_id : opts.project_id
                            account_id : opts.account_id
                            count      : 0
                            size       : opts.blob.length
                            created    : new Date()
                            compress   : opts.compress
                            expire     : if ttl then expire_time(ttl)
                        cb     : cb
                else
                    dbg("blob already in the DB, so see if we need to change the expire time")
                    @_extend_blob_ttl
                        expire : rows[0].expire
                        ttl    : opts.ttl
                        uuid   : opts.uuid
                        cb     : (err, _ttl) =>
                            ttl = _ttl; cb(err)
            (cb) =>
                # double check that the blob definitely exists and has correct expire
                # See discussion at https://github.com/sagemathinc/cocalc/issues/7715
                # The problem is that maybe with VERY low probability somehow we extend
                # the blob ttl at the same time that we're deleting blobs and the extend
                # is too late and does an empty update.
                @_query
                    query : 'SELECT expire FROM blobs'
                    where : "id = $::UUID" : opts.uuid
                    cb    : (err, x) =>
                        if err
                            cb(err)
                            return
                        # some consistency checks
                        rows = x?.rows
                        if rows.length == 0
                            cb("blob got removed while saving it")
                            return
                        if !opts.ttl and rows[0].expire
                            cb("blob should have infinite ttl but it has expire set")
                            return
                        cb()

        ], (err) => opts.cb(err, ttl))

    # Used internally by save_blob to possibly extend the expire time of a blob.
    _extend_blob_ttl : (opts) =>
        opts = defaults opts,
            expire : undefined  # what expire is currently set to in the database
            ttl    : required   # requested ttl -- extend expire to at least this
            uuid   : required
            cb     : required   # (err, effective ttl (with 0=oo))
        if not misc.is_valid_uuid_string(opts.uuid)
            opts.cb("uuid is invalid")
            return
        if not opts.expire
            # ttl already infinite -- nothing to do
            opts.cb(undefined, 0)
            return
        new_expire = ttl = undefined
        if opts.ttl
            # saved ttl is finite as is requested one; change in DB if requested is longer
            z = expire_time(opts.ttl)
            if z > opts.expire
                new_expire = z
                ttl = opts.ttl
            else
                ttl = (opts.expire - new Date())/1000.0
        else
            # saved ttl is finite but requested one is infinite
            ttl = new_expire = 0
        if new_expire?
            # change the expire time for the blob already in the DB
            @_query
                query : 'UPDATE blobs'
                where : "id = $::UUID" : opts.uuid
                set   : "expire :: TIMESTAMP " : if new_expire == 0 then undefined else new_expire
                cb    : (err) => opts.cb(err, ttl)
        else
            opts.cb(undefined, ttl)

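    # Illustrative usage sketch (the uuid is a placeholder):
    #     db.get_blob
    #         uuid : 'sha1-based uuid of the blob'
    #         cb   : (err, blob) ->   # blob is undefined if there is no such blob (e.g., it expired)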
    get_blob: (opts) =>
        opts = defaults opts,
            uuid       : required
            save_in_db : false  # if true and blob isn't in DB and is only in gcloud, copies to local DB
                                # (for faster access e.g., 20ms versus 5ms -- i.e., not much faster; gcloud is FAST too.)
            touch      : true
            cb         : required  # cb(err) or cb(undefined, blob_value) or cb(undefined, undefined) in case no such blob
        if not misc.is_valid_uuid_string(opts.uuid)
            opts.cb("uuid is invalid")
            return
        x    = undefined
        blob = undefined
        async.series([
            (cb) =>
                @_query
                    query : "SELECT expire, blob, gcloud, compress FROM blobs"
                    where : "id = $::UUID" : opts.uuid
                    cb    : one_result (err, _x) =>
                        x = _x; cb(err)
            (cb) =>
                if not x?
                    # nothing to do -- blob not in db (probably expired)
                    cb()
                else if x.expire and x.expire <= new Date()
                    # the blob already expired -- background delete it
                    @_query   # delete it (but don't wait for this to finish)
                        query : "DELETE FROM blobs"
                        where : "id = $::UUID" : opts.uuid
                    cb()
                else if x.blob?
                    # blob not expired and is in database
                    blob = x.blob
                    cb()
                else if x.gcloud
                    if not COCALC_BLOB_STORE?
                        # see comment https://github.com/sagemathinc/cocalc/pull/8110
                        COCALC_BLOB_STORE = COCALC_BLOB_STORE_FALLBACK
                    # blob not available locally, but should be in a Google cloud storage bucket -- try to get it
                    # NOTE: we now ignore the actual content of x.gcloud -- we don't support spreading blobs
                    # across multiple buckets... as it isn't needed because buckets are infinite, and it
                    # is potentially confusing to manage.
                    @blob_store().read
                        name : opts.uuid
                        cb   : (err, _blob) =>
                            if err
                                cb(err)
                            else
                                blob = _blob
                                cb()
                                if opts.save_in_db
                                    # also save in database so will be faster next time (again, don't wait on this)
                                    @_query   # update it (but don't wait for this to finish)
                                        query : "UPDATE blobs"
                                        set   : {blob : blob}
                                        where : "id = $::UUID" : opts.uuid
                else
                    # blob not local and not in gcloud -- this shouldn't happen
                    # (just view this as "expired" by not setting blob)
                    cb()
            (cb) =>
                if not blob? or not x?.compress?
                    cb(); return
                # blob is compressed -- decompress it
                switch x.compress
                    when 'gzip'
                        zlib.gunzip blob, (err, _blob) =>
                            blob = _blob; cb(err)
                    when 'zlib'
                        zlib.inflate blob, (err, _blob) =>
                            blob = _blob; cb(err)
                    else
                        cb("compression format '#{x.compress}' not implemented")
        ], (err) =>
            opts.cb(err, blob)
            if blob? and opts.touch
                # blob was pulled from db or gcloud, so note that it was accessed (updates a counter)
                @touch_blob(uuid : opts.uuid)
        )

    touch_blob: (opts) =>
        opts = defaults opts,
            uuid : required
            cb   : undefined
        if not misc.is_valid_uuid_string(opts.uuid)
            opts.cb?("uuid is invalid")
            return
        @_query
            query : "UPDATE blobs SET count = count + 1, last_active = NOW()"
            where : "id = $::UUID" : opts.uuid
            cb    : opts.cb

    blob_store: (bucket) =>
        if not bucket
            bucket = COCALC_BLOB_STORE
        # File system -- could be a big NFS volume, remotely mounted gcsfuse, or just
        # a single big local file system -- etc. -- we don't care.
        return filesystem_bucket(name: bucket)

    # Uploads the blob with given sha1 uuid to gcloud storage, if it hasn't already
    # been uploaded there.  Actually we copy to a directory, which uses gcsfuse to
    # implicitly upload to gcloud...
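    # Illustrative usage sketch (the uuid is a placeholder):
    #     db.copy_blob_to_gcloud(uuid:'...', remove:true, cb:(err) -> console.log(err))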
    copy_blob_to_gcloud: (opts) =>
        opts = defaults opts,
            uuid   : required            # uuid=sha1-based uuid coming from blob
            bucket : COCALC_BLOB_STORE   # name of bucket
            force  : false               # if true, upload even if already uploaded
            remove : false               # if true, deletes blob from database after successful upload to gcloud (to free space)
            cb     : undefined           # cb(err)
        dbg = @_dbg("copy_blob_to_gcloud(uuid='#{opts.uuid}')")
        dbg()
        if not misc.is_valid_uuid_string(opts.uuid)
            dbg("invalid uuid")
            opts.cb?("uuid is invalid")
            return
        if not opts.bucket
            opts.bucket = COCALC_BLOB_STORE_FALLBACK
        locals =
            x: undefined
        async.series([
            (cb) =>
                dbg("get blob info from database")
                @_query
                    query : "SELECT blob, gcloud FROM blobs"
                    where : "id = $::UUID" : opts.uuid
                    cb    : one_result (err, x) =>
                        locals.x = x
                        if err
                            cb(err)
                        else if not x?
                            cb('no such blob')
                        else if not x.blob and not x.gcloud
                            cb('blob not available -- this should not be possible')
                        else if not x.blob and opts.force
                            cb("blob can't be re-uploaded since it was already deleted")
                        else
                            cb()
            (cb) =>
                if (locals.x.gcloud? and not opts.force) or not locals.x.blob?
                    dbg("already uploaded -- don't need to do anything; or already deleted locally")
                    cb(); return
                # upload to Google cloud storage
                locals.bucket = @blob_store(opts.bucket)
                locals.bucket.write
                    name    : opts.uuid
                    content : locals.x.blob
                    cb      : cb
            (cb) =>
                if (locals.x.gcloud? and not opts.force) or not locals.x.blob?
                    # already uploaded -- don't need to do anything; or already deleted locally
                    cb(); return
                dbg("read blob back and compare")  # -- we do *NOT* trust GCS with such important data
                locals.bucket.read
                    name : opts.uuid
                    cb   : (err, data) =>
                        if err
                            cb(err)
                        else if not locals.x.blob.equals(data)
                            dbg("FAILED!")
                            cb("BLOB write to GCS failed check!")
                        else
                            dbg("check succeeded")
                            cb()
            (cb) =>
                if not locals.x.blob?
                    # no blob in db; nothing further to do.
                    cb()
                else
                    # We successfully uploaded to gcloud -- record this by setting the gcloud field in the database.
                    set = {gcloud: opts.bucket}
                    if opts.remove
                        set.blob = null   # remove blob content from database to save space
                    @_query
                        query : "UPDATE blobs"
                        where : "id = $::UUID" : opts.uuid
                        set   : set
                        cb    : cb
        ], (err) => opts.cb?(err))

    ###
    Back up (at most) limit blobs that haven't already been backed up, and put them in
    a tarball in the given path.  The tarball's name is the time when the backup starts.
    The tarball is compressed using gzip compression.

       db._error_thresh=1e6; db.backup_blobs_to_tarball(limit:10000,path:'/backup/tmp-blobs',repeat_until_done:60, cb:done())

    I have not written code to restore from these tarballs.  Assuming the database has been restored,
    so there is an entry in the blobs table for each blob, it would suffice to upload the tarballs,
    then copy their contents straight into the COCALC_BLOB_STORE, and that's it.
    If we don't have the blobs table in the DB, make dummy entries from the blob names in the tarballs.
    ###
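    # Hypothetical restore sketch (no restore code exists; untested shell outline,
    # assuming COCALC_BLOB_STORE is a filesystem path or gcsfuse mount as described above):
    #     for t in /backup/tmp-blobs/*.tar.gz; do
    #         tar xzf "$t" --strip-components=1 -C "$COCALC_BLOB_STORE"
    #     done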
    backup_blobs_to_tarball: (opts) =>
        opts = defaults opts,
            limit             : 10000     # number of blobs to backup
            path              : required  # path where [timestamp].tar file is placed
            throttle          : 0         # wait this many seconds between pulling blobs from database
            repeat_until_done : 0         # if positive, keeps re-call'ing this function until no more
                                          # results to backup (pauses this many seconds between)
            map_limit         : 5
            cb                : undefined # cb(err, '[timestamp].tar')
        dbg       = @_dbg("backup_blobs_to_tarball(limit=#{opts.limit},path='#{opts.path}')")
        join      = require('path').join
        dir       = misc.date_to_snapshot_format(new Date())
        target    = join(opts.path, dir)
        tarball   = target + '.tar.gz'
        v         = undefined
        to_remove = []
        async.series([
            (cb) =>
                dbg("make target='#{target}'")
                fs.mkdir(target, cb)
            (cb) =>
                dbg("get blobs that we need to back up")
                @_query
                    query     : "SELECT id FROM blobs"
                    where     : "expire IS NULL and backup IS NOT true"
                    limit     : opts.limit
                    timeout_s : TIMEOUT_LONG_S
                    cb        : all_results 'id', (err, x) =>
                        v = x; cb(err)
            (cb) =>
                dbg("backing up #{v.length} blobs")
                f = (id, cb) =>
                    @get_blob
                        uuid  : id
                        touch : false
                        cb    : (err, blob) =>
                            if err
                                dbg("ERROR! blob #{id} -- #{err}")
                                cb(err)
                            else if blob?
                                dbg("got blob #{id} from db -- now write to disk")
                                to_remove.push(id)
                                fs.writeFile join(target, id), blob, (err) =>
                                    if opts.throttle
                                        setTimeout(cb, opts.throttle*1000)
                                    else
                                        cb()
                            else
                                dbg("blob #{id} is expired, so nothing to be done, ever.")
                                cb()
                async.mapLimit(v, opts.map_limit, f, cb)
            (cb) =>
                dbg("successfully wrote all blobs to files; now make tarball")
                misc_node.execute_code
                    command : 'tar'
                    args    : ['zcvf', tarball, dir]
                    path    : opts.path
                    timeout : 3600
                    cb      : cb
            (cb) =>
                dbg("remove temporary blobs")
                f = (x, cb) =>
                    fs.unlink(join(target, x), cb)
                async.mapLimit(to_remove, 10, f, cb)
            (cb) =>
                dbg("remove temporary directory")
                fs.rmdir(target, cb)
            (cb) =>
                dbg("backup succeeded completely -- mark all blobs as backed up")
                @_query
                    query : "UPDATE blobs"
                    set   : {backup: true}
                    where : "id = ANY($)" : v
                    cb    : cb
        ], (err) =>
            if err
                dbg("ERROR: #{err}")
                opts.cb?(err)
            else
                dbg("done")
                if opts.repeat_until_done and to_remove.length == opts.limit
                    f = () =>
                        @backup_blobs_to_tarball(opts)
                    setTimeout(f, opts.repeat_until_done*1000)
                else
                    opts.cb?(undefined, tarball)
        )

    ###
    Copy all blobs that will never expire to a google cloud storage bucket.

        errors={}; db.copy_all_blobs_to_gcloud(limit:500, cb:done(), remove:true, repeat_until_done_s:10, errors:errors)
    ###
    copy_all_blobs_to_gcloud: (opts) =>
        opts = defaults opts,
            bucket              : COCALC_BLOB_STORE
            limit               : 1000       # copy this many in each batch
            map_limit           : 1          # copy this many at once.
            throttle            : 0          # wait this many seconds between uploads
            repeat_until_done_s : 0          # if nonzero, waits this many seconds, then calls this function again until nothing gets uploaded.
            errors              : undefined  # object: used to accumulate errors -- if not given, then everything will terminate on first error
            remove              : false
            cutoff              : '1 month'  # postgresql interval - only copy blobs to gcloud that haven't been accessed at least this long.
            cb                  : required
        dbg = @_dbg("copy_all_blobs_to_gcloud")
        dbg()
        # This query selects the blobs that will never expire, but have not yet
        # been copied to Google cloud storage.
        dbg("getting blob id's...")
        @_query
            query     : 'SELECT id, size FROM blobs'
            where     : "expire IS NULL AND gcloud IS NULL and (last_active <= NOW() - INTERVAL '#{opts.cutoff}' OR last_active IS NULL)"
            limit     : opts.limit
            timeout_s : TIMEOUT_LONG_S
            ## order_by : 'id'  # this is not important and was causing VERY excessive load in production (due to bad query planning?!)
            cb        : all_results (err, v) =>
                if err
                    dbg("fail: #{err}")
                    opts.cb(err)
                else
                    n = v.length; m = 0
                    dbg("got #{n} blob id's")
                    f = (x, cb) =>
                        m += 1
                        k = m; start = new Date()
                        dbg("**** #{k}/#{n}: uploading #{x.id} of size #{x.size/1000}KB")
                        @copy_blob_to_gcloud
                            uuid   : x.id
                            bucket : opts.bucket
                            remove : opts.remove
                            cb     : (err) =>
                                dbg("**** #{k}/#{n}: finished -- #{err}; size #{x.size/1000}KB; time=#{new Date() - start}ms")
                                if err
                                    if opts.errors?
                                        opts.errors[x.id] = err
                                    else
                                        cb(err)
                                        return
                                if opts.throttle
                                    setTimeout(cb, 1000*opts.throttle)
                                else
                                    cb()
                    async.mapLimit v, opts.map_limit, f, (err) =>
                        dbg("finished this round -- #{err}")
                        if err and not opts.errors?
                            opts.cb(err)
                            return
                        if opts.repeat_until_done_s and v.length > 0
                            dbg("repeat_until_done triggering another round")
                            setTimeout((=> @copy_all_blobs_to_gcloud(opts)), opts.repeat_until_done_s*1000)
                        else
                            dbg("done : #{misc.to_json(opts.errors)}")
                            opts.cb(if misc.len(opts.errors) > 0 then opts.errors)

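    # Illustrative invocation of the full maintenance loop (sketch, following the
    # example style used in the docstrings above):
    #     db.blob_maintenance(cb:done())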
    blob_maintenance: (opts) =>
        opts = defaults opts,
            path              : '/backup/blobs'
            map_limit         : 1
            blobs_per_tarball : 10000
            throttle          : 0
            cb                : undefined
        dbg = @_dbg("blob_maintenance()")
        dbg()
        async.series([
            (cb) =>
                dbg("maintain the patches and syncstrings")
                @syncstring_maintenance
                    repeat_until_done : true
                    limit             : 500
                    map_limit         : opts.map_limit
                    delay             : 1000    # 1s, since syncstring_maintenance heavily loads db
                    cb                : cb
            (cb) =>
                dbg("backup_blobs_to_tarball")
                @backup_blobs_to_tarball
                    throttle          : opts.throttle
                    limit             : opts.blobs_per_tarball
                    path              : opts.path
                    map_limit         : opts.map_limit
                    repeat_until_done : 5
                    cb                : cb
            (cb) =>
                dbg("copy_all_blobs_to_gcloud")
                errors = {}
                @copy_all_blobs_to_gcloud
                    limit               : 1000
                    repeat_until_done_s : 5
                    errors              : errors
                    remove              : true
                    map_limit           : opts.map_limit
                    throttle            : opts.throttle
                    cb                  : (err) =>
                        if misc.len(errors) > 0
                            dbg("errors! #{misc.to_json(errors)}")
                        cb(err)
        ], (err) =>
            opts.cb?(err)
        )

    remove_blob_ttls: (opts) =>
        opts = defaults opts,
            uuids : required   # uuid=sha1-based from blob
            cb    : required   # cb(err)
        @_query
            query : "UPDATE blobs"
            set   : {expire: null}
            where : "id::UUID = ANY($)" : (x for x in opts.uuids when misc.is_valid_uuid_string(x))
            cb    : opts.cb

    # If blob has been copied to gcloud, remove the BLOB part of the data
    # from the database (to save space).  If not copied, copy it to gcloud,
    # then remove from database.
    close_blob: (opts) =>
        opts = defaults opts,
            uuid   : required   # uuid=sha1-based from blob
            bucket : COCALC_BLOB_STORE
            cb     : undefined  # cb(err)
        if not misc.is_valid_uuid_string(opts.uuid)
            opts.cb?("uuid is invalid")
            return
        async.series([
            (cb) =>
                # ensure blob is in gcloud
                @_query
                    query : 'SELECT gcloud FROM blobs'
                    where : 'id = $::UUID' : opts.uuid
                    cb    : one_result 'gcloud', (err, gcloud) =>
                        if err
                            cb(err)
                        else if not gcloud
                            # not yet copied to gcloud storage
                            @copy_blob_to_gcloud
                                uuid   : opts.uuid
                                bucket : opts.bucket
                                cb     : cb
                        else
                            # copied already
                            cb()
            (cb) =>
                # now blob is in gcloud -- delete blob data in database
                @_query
                    query : 'UPDATE blobs'
                    where : 'id = $::UUID' : opts.uuid
                    set   : {blob: null}
                    cb    : cb
        ], (err) => opts.cb?(err))


    ###
    # Syncstring maintenance
    ###
    syncstring_maintenance: (opts) =>
        opts = defaults opts,
            age_days          : 30     # archive patches of syncstrings that are inactive for at least this long
            map_limit         : 1      # how much parallelism to use
            limit             : 1000   # do only this many
            repeat_until_done : true
            delay             : 0
            cb                : undefined
        dbg = @_dbg("syncstring_maintenance")
        dbg(opts)
        syncstrings = undefined
        async.series([
            (cb) =>
                dbg("determine inactive syncstring ids")
                @_query
                    query     : 'SELECT string_id FROM syncstrings'
                    where     : [{'last_active <= $::TIMESTAMP' : misc.days_ago(opts.age_days)}, 'archived IS NULL', 'huge IS NOT TRUE']
                    limit     : opts.limit
                    timeout_s : TIMEOUT_LONG_S
                    cb        : all_results 'string_id', (err, v) =>
                        syncstrings = v
                        cb(err)
            (cb) =>
                dbg("archive patches for inactive syncstrings")
                i = 0
                f = (string_id, cb) =>
                    i += 1
                    console.log("*** #{i}/#{syncstrings.length}: archiving string #{string_id} ***")
                    @archivePatches
                        string_id : string_id
                        cb        : (err) ->
                            if err or not opts.delay
                                cb(err)
                            else
                                setTimeout(cb, opts.delay)
                async.mapLimit(syncstrings, opts.map_limit, f, cb)
        ], (err) =>
            if err
                opts.cb?(err)
            else if opts.repeat_until_done and syncstrings.length == opts.limit
                dbg("doing it again")
                @syncstring_maintenance(opts)
            else
                opts.cb?()
        )

    archivePatches: (opts) =>
        try
            await blobs.archivePatches({db:@, ...opts})
            opts.cb?()
        catch err
            opts.cb?(err)

    unarchivePatches: (string_id) =>
        await blobs.unarchivePatches({db:@, string_id:string_id})

    ###
    Export/import of syncstring history and info.
    ###
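    # Illustrative round trip (string ids are placeholders):
    #     db.export_patches string_id:'...', cb:(err, patches) ->
    #         db.import_patches(patches:patches, string_id:'...', cb:done())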
    export_patches: (opts) =>
        try
            patches = await blobs.exportPatches(opts.string_id)
            opts.cb(undefined, patches)
            return patches
        catch err
            opts.cb(err)

    import_patches: (opts) =>
        opts = defaults opts,
            patches   : required   # array as exported by export_patches
            string_id : undefined  # if given, change the string_id when importing the patches to this
            cb        : undefined
        patches = opts.patches
        if patches.length == 0  # easy
            opts.cb?()
            return
        if patches[0].id?
            # convert from OLD RethinkDB format!
            v = []
            for x in patches
                patch =
                    string_id : x.id[0]
                    time      : new Date(x.id[1])
                    user_id   : x.user
                    patch     : x.patch
                    snapshot  : x.snapshot
                    sent      : x.sent
                    prev      : x.prev
                v.push(patch)
            patches = v
        # change string_id, if requested.
        if opts.string_id?
            for x in patches
                x.string_id = opts.string_id
        # We break into blocks since there is a limit (about 65K) on
        # the number of params that can be inserted in a single query.
        insert_block_size = 1000
        f = (i, cb) =>
            @_query
                query    : 'INSERT INTO patches'
                values   : patches.slice(insert_block_size*i, insert_block_size*(i+1))
                conflict : 'ON CONFLICT DO NOTHING'  # in case multiple servers (or this server) are doing this import at once -- this can and does happen sometimes.
                cb       : cb
        async.mapSeries([0...patches.length/insert_block_size], f, (err) => opts.cb?(err))

    delete_blob: (opts) =>
        opts = defaults opts,
            uuid : required
            cb   : undefined
        if not misc.is_valid_uuid_string(opts.uuid)
            opts.cb?("uuid is invalid")
            return
        gcloud = undefined
        dbg = @_dbg("delete_blob(uuid='#{opts.uuid}')")
        async.series([
            (cb) =>
                dbg("check if blob in gcloud")
                @_query
                    query : "SELECT gcloud FROM blobs"
                    where : "id = $::UUID" : opts.uuid
                    cb    : one_result 'gcloud', (err, x) =>
                        gcloud = x
                        cb(err)
            (cb) =>
                if not gcloud or not COCALC_BLOB_STORE
                    cb()
                    return
                dbg("delete from gcloud")
                @blob_store(gcloud).delete
                    name : opts.uuid
                    cb   : cb
            (cb) =>
                dbg("delete from local database")
                @_query
                    query : "DELETE FROM blobs"
                    where : "id = $::UUID" : opts.uuid
                    cb    : cb
        ], (err) => opts.cb?(err))