Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/src/packages/database/postgres-blobs.coffee
Views: 687
#########################################################################1# This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.2# License: MS-RSL – see LICENSE.md for details3#########################################################################45###6PostgreSQL -- implementation of queries needed for storage and managing blobs,7including backups, integration with google cloud storage, etc.89COPYRIGHT : (c) 2017 SageMath, Inc.10LICENSE : MS-RSL11###1213# Bucket used for cheaper longterm storage of blobs (outside of PostgreSQL).14# NOTE: We should add this to site configuration, and have it get read once when first15# needed and cached. Also it would be editable in admin account settings.16# If this env variable begins with a / it is assumed to be a path in the file system,17# e.g., a remote mount (in practice, we are using gcsfuse to mount gcloud buckets).18# If it is gs:// then it is a google cloud storage bucket.19COCALC_BLOB_STORE = process.env.COCALC_BLOB_STORE2021async = require('async')22zlib = require('zlib')23fs = require('fs')2425misc_node = require('@cocalc/backend/misc_node')2627{defaults} = misc = require('@cocalc/util/misc')28required = defaults.required2930{expire_time, one_result, all_results} = require('./postgres-base')31{delete_patches} = require('./postgres/delete-patches')3233{filesystem_bucket} = require('./filesystem-bucket')3435# some queries do searches, which could take a bit. we give them 5 minutes …36TIMEOUT_LONG_S = 3003738exports.extend_PostgreSQL = (ext) -> class PostgreSQL extends ext39save_blob: (opts) =>40opts = defaults opts,41uuid : undefined # uuid=sha1-based id coming from blob42blob : required # unless check=true, we assume misc_node.uuidsha1(opts.blob) == opts.uuid;43# blob must be a string or Buffer44ttl : 0 # object in blobstore will have *at least* this ttl in seconds;45# if there is already something in blobstore with longer ttl, we leave it;46# infinite ttl = 0.47project_id : required # the id of the project that is saving the blob48check : false # if true, will give error if misc_node.uuidsha1(opts.blob) != opts.uuid49compress : undefined # optional compression to use: 'gzip', 'zlib'; only used if blob not already in db.50level : -1 # compression level (if compressed) -- see https://github.com/expressjs/compression#level51cb : required # cb(err, ttl actually used in seconds); ttl=0 for infinite ttl52if not Buffer.isBuffer(opts.blob)53# CRITICAL: We assume everywhere below that opts.blob is a54# buffer, e.g., in the .toString('hex') method!55opts.blob = Buffer.from(opts.blob)56if not opts.uuid?57opts.uuid = misc_node.uuidsha1(opts.blob)58else if opts.check59uuid = misc_node.uuidsha1(opts.blob)60if uuid != opts.uuid61opts.cb("the sha1 uuid (='#{uuid}') of the blob must equal the given uuid (='#{opts.uuid}')")62return63if not misc.is_valid_uuid_string(opts.uuid)64opts.cb("uuid is invalid")65return66dbg = @_dbg("save_blob(uuid='#{opts.uuid}')")67dbg()68rows = ttl = undefined69async.series([70(cb) =>71@_query72query : 'SELECT expire FROM blobs'73where : "id = $::UUID" : opts.uuid74cb : (err, x) =>75rows = x?.rows; cb(err)76(cb) =>77if rows.length == 0 and opts.compress78dbg("compression requested and blob not already saved, so we compress blob")79switch opts.compress80when 'gzip'81zlib.gzip opts.blob, {level:opts.level}, (err, blob) =>82opts.blob = blob; cb(err)83when 'zlib'84zlib.deflate opts.blob, {level:opts.level}, (err, blob) =>85opts.blob = blob; cb(err)86else87cb("compression format '#{opts.compress}' not implemented")88else89cb()90(cb) =>91if rows.length == 092dbg("nothing in DB, so we insert the blob.")93ttl = opts.ttl94@_query95query : "INSERT INTO blobs"96values :97id : opts.uuid98blob : '\\x'+opts.blob.toString('hex')99project_id : opts.project_id100count : 0101size : opts.blob.length102created : new Date()103compress : opts.compress104expire : if ttl then expire_time(ttl)105cb : cb106else107dbg("blob already in the DB, so see if we need to change the expire time")108@_extend_blob_ttl109expire : rows[0].expire110ttl : opts.ttl111uuid : opts.uuid112cb : (err, _ttl) =>113ttl = _ttl; cb(err)114(cb) =>115# double check that the blob definitely exists and has correct expire116# See discussion at https://github.com/sagemathinc/cocalc/issues/7715117# The problem is that maybe with VERY low probability somehow we extend118# the blob ttl at the same time that we're deleting blobs and the extend119# is too late and does an empty update.120@_query121query : 'SELECT expire FROM blobs'122where : "id = $::UUID" : opts.uuid123cb : (err, x) =>124if err125cb(err)126return127# some consistency checks128rows = x?.rows129if rows.length == 0130cb("blob got removed while saving it")131return132if !opts.ttl and rows[0].expire133cb("blob should have infinite ttl but it has expire set")134return135cb()136137], (err) => opts.cb(err, ttl))138139# Used internally by save_blob to possibly extend the expire time of a blob.140_extend_blob_ttl : (opts) =>141opts = defaults opts,142expire : undefined # what expire is currently set to in the database143ttl : required # requested ttl -- extend expire to at least this144uuid : required145cb : required # (err, effective ttl (with 0=oo))146if not misc.is_valid_uuid_string(opts.uuid)147opts.cb("uuid is invalid")148return149if not opts.expire150# ttl already infinite -- nothing to do151opts.cb(undefined, 0)152return153new_expire = ttl = undefined154if opts.ttl155# saved ttl is finite as is requested one; change in DB if requested is longer156z = expire_time(opts.ttl)157if z > opts.expire158new_expire = z159ttl = opts.ttl160else161ttl = (opts.expire - new Date())/1000.0162else163# saved ttl is finite but requested one is infinite164ttl = new_expire = 0165if new_expire?166# change the expire time for the blob already in the DB167@_query168query : 'UPDATE blobs'169where : "id = $::UUID" : opts.uuid170set : "expire :: TIMESTAMP " : if new_expire == 0 then undefined else new_expire171cb : (err) => opts.cb(err, ttl)172else173opts.cb(undefined, ttl)174175get_blob: (opts) =>176opts = defaults opts,177uuid : required178save_in_db : false # if true and blob isn't in DB and is only in gcloud, copies to local DB179# (for faster access e.g., 20ms versus 5ms -- i.e., not much faster; gcloud is FAST too.)180touch : true181cb : required # cb(err) or cb(undefined, blob_value) or cb(undefined, undefined) in case no such blob182if not misc.is_valid_uuid_string(opts.uuid)183opts.cb("uuid is invalid")184return185x = undefined186blob = undefined187async.series([188(cb) =>189@_query190query : "SELECT expire, blob, gcloud, compress FROM blobs"191where : "id = $::UUID" : opts.uuid192cb : one_result (err, _x) =>193x = _x; cb(err)194(cb) =>195if not x?196# nothing to do -- blob not in db (probably expired)197cb()198else if x.expire and x.expire <= new Date()199# the blob already expired -- background delete it200@_query # delete it (but don't wait for this to finish)201query : "DELETE FROM blobs"202where : "id = $::UUID" : opts.uuid203cb()204else if x.blob?205# blob not expired and is in database206blob = x.blob207cb()208else if x.gcloud209if not COCALC_BLOB_STORE?210cb("no blob store configured -- set the COCALC_BLOB_STORE env variable")211return212# blob not available locally, but should be in a Google cloud storage bucket -- try to get it213# NOTE: we now ignore the actual content of x.gcloud -- we don't support spreading blobs214# across multiple buckets... as it isn't needed because buckets are infinite, and it215# is potentially confusing to manage.216@blob_store().read217name : opts.uuid218cb : (err, _blob) =>219if err220cb(err)221else222blob = _blob223cb()224if opts.save_in_db225# also save in database so will be faster next time (again, don't wait on this)226@_query # delete it (but don't wait for this to finish)227query : "UPDATE blobs"228set : {blob : blob}229where : "id = $::UUID" : opts.uuid230else231# blob not local and not in gcloud -- this shouldn't happen232# (just view this as "expired" by not setting blob)233cb()234(cb) =>235if not blob? or not x?.compress?236cb(); return237# blob is compressed -- decompress it238switch x.compress239when 'gzip'240zlib.gunzip blob, (err, _blob) =>241blob = _blob; cb(err)242when 'zlib'243zlib.inflate blob, (err, _blob) =>244blob = _blob; cb(err)245else246cb("compression format '#{x.compress}' not implemented")247], (err) =>248opts.cb(err, blob)249if blob? and opts.touch250# blob was pulled from db or gcloud, so note that it was accessed (updates a counter)251@touch_blob(uuid : opts.uuid)252)253254touch_blob: (opts) =>255opts = defaults opts,256uuid : required257cb : undefined258if not misc.is_valid_uuid_string(opts.uuid)259opts.cb?("uuid is invalid")260return261@_query262query : "UPDATE blobs SET count = count + 1, last_active = NOW()"263where : "id = $::UUID" : opts.uuid264cb : opts.cb265266blob_store: (bucket) =>267if not bucket268bucket = COCALC_BLOB_STORE269# File system -- could be a big NFS volume, remotely mounted gcsfuse, or just270# a single big local file system -- etc. -- we don't care.271return filesystem_bucket(name: bucket)272273# Uploads the blob with given sha1 uuid to gcloud storage, if it hasn't already274# been uploaded there. Actually we copy to a directory, which uses gcsfuse to275# implicitly upload to gcloud...276copy_blob_to_gcloud: (opts) =>277opts = defaults opts,278uuid : required # uuid=sha1-based uuid coming from blob279bucket : COCALC_BLOB_STORE # name of bucket280force : false # if true, upload even if already uploaded281remove : false # if true, deletes blob from database after successful upload to gcloud (to free space)282cb : undefined # cb(err)283dbg = @_dbg("copy_blob_to_gcloud(uuid='#{opts.uuid}')")284dbg()285if not misc.is_valid_uuid_string(opts.uuid)286dbg("invalid uuid")287opts.cb?("uuid is invalid")288return289if not opts.bucket290dbg("invalid bucket")291opts.cb?("no blob store configured -- set the COCALC_BLOB_STORE env variable")292return293locals =294x: undefined295async.series([296(cb) =>297dbg("get blob info from database")298@_query299query : "SELECT blob, gcloud FROM blobs"300where : "id = $::UUID" : opts.uuid301cb : one_result (err, x) =>302locals.x = x303if err304cb(err)305else if not x?306cb('no such blob')307else if not x.blob and not x.gcloud308cb('blob not available -- this should not be possible')309else if not x.blob and opts.force310cb("blob can't be re-uploaded since it was already deleted")311else312cb()313(cb) =>314if (locals.x.gcloud? and not opts.force) or not locals.x.blob?315dbg("already uploaded -- don't need to do anything; or already deleted locally")316cb(); return317# upload to Google cloud storage318locals.bucket = @blob_store(opts.bucket)319locals.bucket.write320name : opts.uuid321content : locals.x.blob322cb : cb323(cb) =>324if (locals.x.gcloud? and not opts.force) or not locals.x.blob?325# already uploaded -- don't need to do anything; or already deleted locally326cb(); return327dbg("read blob back and compare") # -- we do *NOT* trust GCS with such important data328locals.bucket.read329name : opts.uuid330cb : (err, data) =>331if err332cb(err)333else if not locals.x.blob.equals(data)334dbg("FAILED!")335cb("BLOB write to GCS failed check!")336else337dbg("check succeeded")338cb()339(cb) =>340if not locals.x.blob?341# no blob in db; nothing further to do.342cb()343else344# We successful upload to gcloud -- set locals.x.gcloud345set = {gcloud: opts.bucket}346if opts.remove347set.blob = null # remove blob content from database to save space348@_query349query : "UPDATE blobs"350where : "id = $::UUID" : opts.uuid351set : set352cb : cb353], (err) => opts.cb?(err))354355###356Backup limit blobs that previously haven't been dumped to blobs, and put them in357a tarball in the given path. The tarball's name is the time when the backup starts.358The tarball is compressed using gzip compression.359360db._error_thresh=1e6; db.backup_blobs_to_tarball(limit:10000,path:'/backup/tmp-blobs',repeat_until_done:60, cb:done())361362I have not written code to restore from these tarballs. Assuming the database has been restored,363so there is an entry in the blobs table for each blob, it would suffice to upload the tarballs,364then copy their contents straight into the COCALC_BLOB_STORE, and that’s it.365If we don't have the blobs table in the DB, make dummy entries from the blob names in the tarballs.366###367backup_blobs_to_tarball: (opts) =>368opts = defaults opts,369limit : 10000 # number of blobs to backup370path : required # path where [timestamp].tar file is placed371throttle : 0 # wait this many seconds between pulling blobs from database372repeat_until_done : 0 # if positive, keeps re-call'ing this function until no more373# results to backup (pauses this many seconds between)374map_limit : 5375cb : undefined# cb(err, '[timestamp].tar')376dbg = @_dbg("backup_blobs_to_tarball(limit=#{opts.limit},path='#{opts.path}')")377join = require('path').join378dir = misc.date_to_snapshot_format(new Date())379target = join(opts.path, dir)380tarball = target + '.tar.gz'381v = undefined382to_remove = []383async.series([384(cb) =>385dbg("make target='#{target}'")386fs.mkdir(target, cb)387(cb) =>388dbg("get blobs that we need to back up")389@_query390query : "SELECT id FROM blobs"391where : "expire IS NULL and backup IS NOT true"392limit : opts.limit393timeout_s : TIMEOUT_LONG_S394cb : all_results 'id', (err, x) =>395v = x; cb(err)396(cb) =>397dbg("backing up #{v.length} blobs")398f = (id, cb) =>399@get_blob400uuid : id401touch : false402cb : (err, blob) =>403if err404dbg("ERROR! blob #{id} -- #{err}")405cb(err)406else if blob?407dbg("got blob #{id} from db -- now write to disk")408to_remove.push(id)409fs.writeFile join(target, id), blob, (err) =>410if opts.throttle411setTimeout(cb, opts.throttle*1000)412else413cb()414else415dbg("blob #{id} is expired, so nothing to be done, ever.")416cb()417async.mapLimit(v, opts.map_limit, f, cb)418(cb) =>419dbg("successfully wrote all blobs to files; now make tarball")420misc_node.execute_code421command : 'tar'422args : ['zcvf', tarball, dir]423path : opts.path424timeout : 3600425cb : cb426(cb) =>427dbg("remove temporary blobs")428f = (x, cb) =>429fs.unlink(join(target, x), cb)430async.mapLimit(to_remove, 10, f, cb)431(cb) =>432dbg("remove temporary directory")433fs.rmdir(target, cb)434(cb) =>435dbg("backup succeeded completely -- mark all blobs as backed up")436@_query437query : "UPDATE blobs"438set : {backup: true}439where : "id = ANY($)" : v440cb : cb441], (err) =>442if err443dbg("ERROR: #{err}")444opts.cb?(err)445else446dbg("done")447if opts.repeat_until_done and to_remove.length == opts.limit448f = () =>449@backup_blobs_to_tarball(opts)450setTimeout(f, opts.repeat_until_done*1000)451else452opts.cb?(undefined, tarball)453)454455###456Copied all blobs that will never expire to a google cloud storage bucket.457458errors={}; db.copy_all_blobs_to_gcloud(limit:500, cb:done(), remove:true, repeat_until_done_s:10, errors:errors)459###460copy_all_blobs_to_gcloud: (opts) =>461opts = defaults opts,462bucket : COCALC_BLOB_STORE463limit : 1000 # copy this many in each batch464map_limit : 1 # copy this many at once.465throttle : 0 # wait this many seconds between uploads466repeat_until_done_s : 0 # if nonzero, waits this many seconds, then calls this function again until nothing gets uploaded.467errors : undefined # object: used to accumulate errors -- if not given, then everything will terminate on first error468remove : false469cutoff : '1 month' # postgresql interval - only copy blobs to gcloud that haven't been accessed at least this long.470cb : required471dbg = @_dbg("copy_all_blobs_to_gcloud")472dbg()473# This query selects the blobs that will never expire, but have not yet474# been copied to Google cloud storage.475dbg("getting blob id's...")476@_query477query : 'SELECT id, size FROM blobs'478where : "expire IS NULL AND gcloud IS NULL and (last_active <= NOW() - INTERVAL '#{opts.cutoff}' OR last_active IS NULL)"479limit : opts.limit480timeout_s : TIMEOUT_LONG_S481## order_by : 'id' # this is not important and was causing VERY excessive load in production (due to bad query plannnig?!)482cb : all_results (err, v) =>483if err484dbg("fail: #{err}")485opts.cb(err)486else487n = v.length; m = 0488dbg("got #{n} blob id's")489f = (x, cb) =>490m += 1491k = m; start = new Date()492dbg("**** #{k}/#{n}: uploading #{x.id} of size #{x.size/1000}KB")493@copy_blob_to_gcloud494uuid : x.id495bucket : opts.bucket496remove : opts.remove497cb : (err) =>498dbg("**** #{k}/#{n}: finished -- #{err}; size #{x.size/1000}KB; time=#{new Date() - start}ms")499if err500if opts.error?501opts.errors[x.id] = err502else503cb(err)504if opts.throttle505setTimeout(cb, 1000*opts.throttle)506else507cb()508async.mapLimit v, opts.map_limit, f, (err) =>509dbg("finished this round -- #{err}")510if err and not opts.errors?511opts.cb(err)512return513if opts.repeat_until_done_s and v.length > 0514dbg("repeat_until_done triggering another round")515setTimeout((=> @copy_all_blobs_to_gcloud(opts)), opts.repeat_until_done_s*1000)516else517dbg("done : #{misc.to_json(opts.errors)}")518opts.cb(if misc.len(opts.errors) > 0 then opts.errors)519520blob_maintenance: (opts) =>521opts = defaults opts,522path : '/backup/blobs'523map_limit : 1524blobs_per_tarball : 10000525throttle : 0526cb : undefined527dbg = @_dbg("blob_maintenance()")528dbg()529async.series([530(cb) =>531dbg("maintain the patches and syncstrings")532@syncstring_maintenance533repeat_until_done : true534limit : 500535map_limit : opts.map_limit536delay : 1000 # 1s, since syncstring_maintence heavily loads db537cb : cb538(cb) =>539dbg("backup_blobs_to_tarball")540@backup_blobs_to_tarball541throttle : opts.throttle542limit : opts.blobs_per_tarball543path : opts.path544map_limit : opts.map_limit545repeat_until_done : 5546cb : cb547(cb) =>548dbg("copy_all_blobs_to_gcloud")549errors = {}550@copy_all_blobs_to_gcloud551limit : 1000552repeat_until_done_s : 5553errors : errors554remove : true555map_limit : opts.map_limit556throttle : opts.throttle557cb : (err) =>558if misc.len(errors) > 0559dbg("errors! #{misc.to_json(errors)}")560cb(err)561], (err) =>562opts.cb?(err)563)564565remove_blob_ttls: (opts) =>566opts = defaults opts,567uuids : required # uuid=sha1-based from blob568cb : required # cb(err)569@_query570query : "UPDATE blobs"571set : {expire: null}572where : "id::UUID = ANY($)" : (x for x in opts.uuids when misc.is_valid_uuid_string(x))573cb : opts.cb574575# If blob has been copied to gcloud, remove the BLOB part of the data576# from the database (to save space). If not copied, copy it to gcloud,577# then remove from database.578close_blob: (opts) =>579opts = defaults opts,580uuid : required # uuid=sha1-based from blob581bucket : COCALC_BLOB_STORE582cb : undefined # cb(err)583if not misc.is_valid_uuid_string(opts.uuid)584opts.cb?("uuid is invalid")585return586async.series([587(cb) =>588# ensure blob is in gcloud589@_query590query : 'SELECT gcloud FROM blobs'591where : 'id = $::UUID' : opts.uuid592cb : one_result 'gcloud', (err, gcloud) =>593if err594cb(err)595else if not gcloud596# not yet copied to gcloud storage597@copy_blob_to_gcloud598uuid : opts.uuid599bucket : opts.bucket600cb : cb601else602# copied already603cb()604(cb) =>605# now blob is in gcloud -- delete blob data in database606@_query607query : 'SELECT gcloud FROM blobs'608where : 'id = $::UUID' : opts.uuid609set : {blob: null}610cb : cb611], (err) => opts.cb?(err))612613614615###616# Syncstring maintenance617###618syncstring_maintenance: (opts) =>619opts = defaults opts,620age_days : 30 # archive patches of syncstrings that are inactive for at least this long621map_limit : 1 # how much parallelism to use622limit : 1000 # do only this many623repeat_until_done : true624delay : 0625cb : undefined626dbg = @_dbg("syncstring_maintenance")627dbg(opts)628syncstrings = undefined629async.series([630(cb) =>631dbg("determine inactive syncstring ids")632@_query633query : 'SELECT string_id FROM syncstrings'634where : [{'last_active <= $::TIMESTAMP' : misc.days_ago(opts.age_days)}, 'archived IS NULL', 'huge IS NOT TRUE']635limit : opts.limit636timeout_s : TIMEOUT_LONG_S637cb : all_results 'string_id', (err, v) =>638syncstrings = v639cb(err)640(cb) =>641dbg("archive patches for inactive syncstrings")642i = 0643f = (string_id, cb) =>644i += 1645console.log("*** #{i}/#{syncstrings.length}: archiving string #{string_id} ***")646@archive_patches647string_id : string_id648cb : (err) ->649if err or not opts.delay650cb(err)651else652setTimeout(cb, opts.delay)653async.mapLimit(syncstrings, opts.map_limit, f, cb)654], (err) =>655if err656opts.cb?(err)657else if opts.repeat_until_done and syncstrings.length == opts.limit658dbg("doing it again")659@syncstring_maintenance(opts)660else661opts.cb?()662)663664# Offlines and archives the patch, unless the string is active very recently, in665# which case this is a no-op.666#667# TODO: this ignores all syncstrings marked as "huge:true", because the patches are too large.668# come up with a better strategy (incremental?) to generate the blobs to avoid the problem.669archive_patches: (opts) =>670opts = defaults opts,671string_id : required672compress : 'zlib'673level : -1 # the default674cutoff : misc.minutes_ago(30) # never touch anything this new675cb : undefined676dbg = @_dbg("archive_patches(string_id='#{opts.string_id}')")677syncstring = patches = blob_uuid = project_id = last_active = huge = undefined678where = {"string_id = $::CHAR(40)" : opts.string_id}679async.series([680(cb) =>681dbg("get syncstring info")682@_query683query : "SELECT project_id, archived, last_active, huge FROM syncstrings"684where : where685cb : one_result (err, x) =>686if err687cb(err)688else if not x?689cb("no such syncstring with id '#{opts.string_id}'")690else if x.archived691cb("string_id='#{opts.string_id}' already archived as blob id '#{x.archived}'")692else693project_id = x.project_id694last_active = x.last_active695huge = !!x.huge696dbg("got last_active=#{last_active} project_id=#{project_id} huge=#{huge}")697cb()698(cb) =>699if last_active? and last_active >= opts.cutoff700dbg("excluding due to cutoff")701cb(); return702if huge703dbg("excluding due to being huge")704cb(); return705dbg("get patches")706@export_patches707string_id : opts.string_id708cb : (err, x) =>709patches = x710cb(err)711(cb) =>712if last_active? and last_active >= opts.cutoff713cb(); return714if huge715cb(); return716dbg("create blob from patches")717try718blob = Buffer.from(JSON.stringify(patches))719catch err720# TODO: This *will* happen if the total length of all patches is too big.721# need to break patches up...722# This is not exactly the end of the world as the entire point of all this is to723# just save some space in the database...724dbg('error creating blob, marking syncstring as being "huge": ' + err)725huge = true726@_query727query : "UPDATE syncstrings"728set : {huge : true}729where : where730cb : (err) =>731cb(err)732return733if not huge734dbg('save blob')735blob_uuid = misc_node.uuidsha1(blob)736@save_blob737uuid : blob_uuid738blob : blob739project_id : project_id740compress : opts.compress741level : opts.level742cb : cb743(cb) =>744if last_active? and last_active >= opts.cutoff745cb(); return746if huge747cb(); return748dbg("update syncstring to indicate patches have been archived in a blob")749@_query750query : "UPDATE syncstrings"751set : {archived : blob_uuid}752where : where753cb : cb754(cb) =>755if last_active? and last_active >= opts.cutoff756cb(); return757if huge758cb(); return759dbg("actually deleting patches")760delete_patches(db:@, string_id: opts.string_id, cb:cb)761], (err) => opts.cb?(err))762763unarchive_patches: (opts) =>764opts = defaults opts,765string_id : required766cb : undefined767dbg = @_dbg("unarchive_patches(string_id='#{opts.string_id}')")768where = {"string_id = $::CHAR(40)" : opts.string_id}769@_query770query : "SELECT archived FROM syncstrings"771where : where772cb : one_result 'archived', (err, blob_uuid) =>773if err or not blob_uuid?774opts.cb?(err)775return776blob = undefined777async.series([778#(cb) =>779# For testing only!780# setTimeout(cb, 7000)781(cb) =>782dbg("download blob")783@get_blob784uuid : blob_uuid785cb : (err, x) =>786if err787cb(err)788else if not x?789cb("blob is gone")790else791blob = x792cb(err)793(cb) =>794dbg("extract blob")795try796patches = JSON.parse(blob)797catch e798cb("corrupt patches blob -- #{e}")799return800@import_patches801patches : patches802cb : cb803(cb) =>804async.parallel([805(cb) =>806dbg("update syncstring to indicate that patches are now available")807@_query808query : "UPDATE syncstrings SET archived=NULL"809where : where810cb : cb811(cb) =>812dbg('delete blob, which is no longer needed')813@delete_blob814uuid : blob_uuid815cb : cb816], cb)817], (err) => opts.cb?(err))818819###820Export/import of syncstring history and info. Right now used mainly for debugging821purposes, but will obviously be useful for a user-facing feature involving import822and export (and copying) of complete edit history.823###824export_patches: (opts) =>825opts = defaults opts,826string_id : required827cb : required # cb(err, array)828@_query829query : "SELECT * FROM patches"830where : {"string_id = $::CHAR(40)" : opts.string_id}831cb : all_results (err, patches) =>832if err833opts.cb(err)834else835opts.cb(undefined, patches)836837import_patches: (opts) =>838opts = defaults opts,839patches : required # array as exported by export_patches840string_id : undefined # if given, change the string_id when importing the patches to this841cb : undefined842patches = opts.patches843if patches.length == 0 # easy844opts.cb?()845return846if patches[0].id?847# convert from OLD RethinkDB format!848v = []849for x in patches850patch =851string_id : x.id[0]852time : new Date(x.id[1])853user_id : x.user854patch : x.patch855snapshot : x.snapshot856sent : x.sent857prev : x.prev858v.push(patch)859patches = v860# change string_id, if requested.861if opts.string_id?862for x in patches863x.string_id = opts.string_id864# We break into blocks since there is limit (about 65K) on865# number of params that can be inserted in a single query.866insert_block_size = 1000867f = (i, cb) =>868@_query869query : 'INSERT INTO patches'870values : patches.slice(insert_block_size*i, insert_block_size*(i+1))871conflict : 'ON CONFLICT DO NOTHING' # in case multiple servers (or this server) are doing this import at once -- this can and does happen sometimes.872cb : cb873async.mapSeries([0...patches.length/insert_block_size], f, (err) => opts.cb?(err))874875delete_blob: (opts) =>876opts = defaults opts,877uuid : required878cb : undefined879if not misc.is_valid_uuid_string(opts.uuid)880opts.cb?("uuid is invalid")881return882gcloud = undefined883dbg = @_dbg("delete_blob(uuid='#{opts.uuid}')")884async.series([885(cb) =>886dbg("check if blob in gcloud")887@_query888query : "SELECT gcloud FROM blobs"889where : "id = $::UUID" : opts.uuid890cb : one_result 'gcloud', (err, x) =>891gcloud = x892cb(err)893(cb) =>894if not gcloud or not COCALC_BLOB_STORE895cb()896return897dbg("delete from gcloud")898@blob_store(gcloud).delete899name : opts.uuid900cb : cb901(cb) =>902dbg("delete from local database")903@_query904query : "DELETE FROM blobs"905where : "id = $::UUID" : opts.uuid906cb : cb907], (err) => opts.cb?(err))908909910911912