Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
signalapp
GitHub Repository: signalapp/Signal-iOS
Path: blob/main/SignalServiceKit/Messages/Attachments/V2/OrphanedAttachments/OrphanedAttachmentCleaner.swift
1 view
//
// Copyright 2024 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//

import Foundation
import GRDB

/// Observes insertions to the ``OrphanedAttachmentRecord`` table and deletes the files
/// referenced by its rows, removing each row once its files are gone.
///
/// Also lets callers reserve ("commit") pending attachment file locations so those files
/// are cleaned up if the attachment is never created, and release that reservation once
/// the attachment row exists.
public protocol OrphanedAttachmentCleaner {

    /// Begin observing changes to the ``OrphanedAttachmentRecord`` table.
    /// Should be called on every app launch.
    ///
    /// Whenever a new row is inserted into the table, starts a job that deletes any files
    /// associated with rows in the table (cleaning up a deleted attachment's files) and
    /// removes each row once file deletion is confirmed.
    ///
    /// Also fires immediately to clean up existing rows in the table, if any remain from prior app launches.
    func beginObserving()

    /// Runs the cleanup job until there are no more rows it can process.
    ///
    /// - Throws: `CancellationError` if cancelled.
    func runUntilFinished() async throws(CancellationError)

    /// Marks pending attachment files for deletion.
    /// Call `releasePendingAttachment` to un-mark the files for deletion
    /// once the attachment has been created.
    ///
    /// This method opens a write transaction and commits the changes; this is required
    /// so that after this method returns attachment files can be safely created/moved at
    /// the target file paths.
    ///
    /// Returns the id which can be used to release the pending attachment.
    func commitPendingAttachment(
        _ insertableRecord: OrphanedAttachmentRecord.InsertableRecord,
    ) async -> OrphanedAttachmentRecord.RowId

    /// Does the same thing as ``commitPendingAttachment(_:)`` for multiple orphan
    /// records at once. Records are keyed as chosen by the caller. The result maps
    /// each key to the row id of its record.
    func commitPendingAttachments<Key: Hashable>(
        _ insertableRecords: [Key: OrphanedAttachmentRecord.InsertableRecord],
    ) async -> [Key: OrphanedAttachmentRecord.RowId]

    /// Un-marks a pending attachment for deletion IFF currently marked for deletion.
    ///
    /// If the id is not found, this is reported as a failure; the files may already
    /// have been deleted. Why is that a failure? Here is the expected sequence:
    /// 1. Reserve attachment file locations (assign random file UUIDs)
    /// 2. `commitPendingAttachment`
    /// 3. Copy/write files into the reserved file locations
    /// 4. Open write transaction
    /// 5. Create Attachment table row
    /// 6. Call `releasePendingAttachment`
    /// 7. Close write transaction
    ///
    /// If the attachment file(s) get deleted between steps 2 and 4, the record is gone
    /// by step 6. This method then reports a failure instead of silently succeeding.
    /// The intent is that the write transaction from steps 4/5 is then rolled back.
    ///
    /// The intended outcome when we reach step 7 is either:
    /// A. Step 6 succeeded: the attachment is created and not marked for deletion.
    /// B. Step 6 failed: everything is rolled back and we start from step 1 again.
    /// There should never be a case where step 5 succeeds but we have deleted files,
    /// or step 5 fails but we didn't delete the files.
    ///
    /// - Note: This method does not throw. ``OrphanedAttachmentCleanerImpl`` reports a
    ///   missing id via `owsFailDebug` and returns.
    ///   NOTE(review): confirm how release builds are expected to roll back steps 4/5
    ///   in that case.
    func releasePendingAttachment(
        withId: OrphanedAttachmentRecord.RowId,
        tx: DBWriteTransaction,
    )
}

/// Default ``OrphanedAttachmentCleaner``.
///
/// Watches for inserts into the ``OrphanedAttachmentRecord`` table and runs a cleanup
/// job that deletes each row's files and then the row itself. Some rows are skipped
/// while this process is running:
/// - rows that fail to clean up;
/// - pending attachments committed by this process.
public class OrphanedAttachmentCleanerImpl: OrphanedAttachmentCleaner {

    private let db: DB
    private let taskScheduler: Shims.TaskScheduler

    // Implicitly unwrapped because the observer's job runner needs a reference to
    // `self`, which is only usable once the other stored properties are set.
    // Always assigned by the end of `init`.
    private var observer: OrphanTableObserver!

    public convenience init(
        db: DB,
    ) {
        self.init(
            db: db,
            fileSystem: Wrappers.OWSFileSystem(),
            taskScheduler: Wrappers.TaskScheduler(),
        )
    }

    init(
        db: DB,
        fileSystem: Shims.OWSFileSystem,
        taskScheduler: Shims.TaskScheduler,
    ) {
        self.db = db
        self.taskScheduler = taskScheduler
        self.observer = OrphanTableObserver(
            jobRunner: JobRunner(
                db: db,
                fileSystem: fileSystem,
                cleaner: self,
            ),
            taskScheduler: taskScheduler,
        )
    }

    public func beginObserving() {
        // Kick off a run immediately for any rows already in the database.
        // Capture the job runner (an actor) directly so the @Sendable task
        // neither captures the observer nor force-unwraps it.
        let jobRunner = observer.jobRunner
        taskScheduler.task {
            try? await jobRunner.runNextCleanupJob()
        }
        // Begin observing the database for changes.
        db.add(transactionObserver: observer, extent: .observerLifetime)
    }

    /// Runs the cleanup job until no processable rows remain.
    ///
    /// - Throws: `CancellationError` if cancelled.
    public func runUntilFinished() async throws(CancellationError) {
        try await observer.jobRunner.runNextCleanupJob()
    }

    public func commitPendingAttachment(
        _ insertableRecord: OrphanedAttachmentRecord.InsertableRecord,
    ) async -> OrphanedAttachmentRecord.RowId {
        let id = UUID()
        return await commitPendingAttachments([id: insertableRecord])[id]!
    }

    /// Inserts all records in a single write transaction.
    ///
    /// Each new row id is also added to `skippedRowIds`, so this process's cleanup
    /// job never deletes files that the caller is about to use.
    public func commitPendingAttachments<Key: Hashable>(
        _ insertableRecords: [Key: OrphanedAttachmentRecord.InsertableRecord],
    ) async -> [Key: OrphanedAttachmentRecord.RowId] {
        return await db.awaitableWrite { tx in
            var results = [Key: OrphanedAttachmentRecord.RowId]()
            for (key, insertableRecord) in insertableRecords {
                // Ensure we mark this attachment as pending.
                owsPrecondition(insertableRecord.isPendingAttachment, "must be pending")
                let record = OrphanedAttachmentRecord.insertRecord(insertableRecord, tx: tx)
                let id = record.id
                skippedRowIds.update(block: { $0.insert(id) })
                results[key] = id
            }
            return results
        }
    }

    /// Deletes the orphan row with `id`.
    ///
    /// If no such row exists, reports a failure via `owsFailDebug` and returns.
    public func releasePendingAttachment(withId id: OrphanedAttachmentRecord.RowId, tx: DBWriteTransaction) {
        let database = tx.database
        let foundRecord = failIfThrows { try OrphanedAttachmentRecord.fetchOne(database, key: id) }
        guard let foundRecord else {
            owsFailDebug("Pending attachment not marked for deletion")
            return
        }
        failIfThrows { try foundRecord.delete(database) }

        // Remove from skipped row ids.
        // This isn't critical; now that the row is gone skipping the id does nothing.
        skippedRowIds.update(block: { $0.remove(id) })
    }

    // Tracks the row ids that should be skipped for the current in-memory process.
    // This can be because they failed to delete, or because they are pending attachments.
    // We track these so we can skip them without blocking subsequent rows from deletion.
    // We keep this in memory only; those rows are retried on the next app launch.
    //
    // Mutated from within write transactions and read from within both read and
    // write transactions; `AtomicValue`'s lock guards concurrent access.
    fileprivate var skippedRowIds = AtomicValue<Set<OrphanedAttachmentRecord.RowId>>(Set(), lock: .init())

    /// Deletes orphaned attachment files and their table rows, one row at a time.
    private actor JobRunner {

        private let db: DB
        private nonisolated let fileSystem: Shims.OWSFileSystem
        private weak var cleaner: OrphanedAttachmentCleanerImpl?

        // Concurrency limit of 1 so cleanup passes don't overlap.
        private let taskQueue = ConcurrentTaskQueue(concurrentLimit: 1)

        init(
            db: DB,
            fileSystem: Shims.OWSFileSystem,
            cleaner: OrphanedAttachmentCleanerImpl,
        ) {
            self.db = db
            self.fileSystem = fileSystem
            self.cleaner = cleaner
        }

        func runNextCleanupJob() async throws(CancellationError) {
            try await taskQueue.run {
                await self._runNextCleanupJob()
            }
        }

        /// Cleans up orphan rows one at a time until none remain, or the task is cancelled.
        /// Skipped rows don't count as remaining.
        private func _runNextCleanupJob() async {
            guard CurrentAppContext().isMainApp else {
                // Don't run the cleaner outside the main app.
                return
            }

            // Iterate rather than recurse, so a large backlog of rows doesn't build up
            // an ever-deeper chain of async calls.
            while let nextRecord = fetchNextOrphanRecord() {
                if Task.isCancelled {
                    return
                }
                await Task.yield()

                if nextRecord.isPendingAttachment {
                    // This deletion job is potentially racing with the share
                    // extension's attachment sending flow. This job wants to
                    // delete the files of the pending attachment being sent.
                    //
                    // If this job wins, the attachment send will fail
                    // (`releasePendingAttachment` will report a failure).
                    // That's recoverable, but it's better if the send flow wins.
                    // Add a delay to increase the chances of the send flow winning.
                    try? await Task.sleep(nanoseconds: 5 * NSEC_PER_SEC)
                    // `Task.sleep` returns early when cancelled. Don't let cancellation
                    // skip the grace period and delete the files immediately.
                    if Task.isCancelled {
                        return
                    }
                }

                // Move on to the next row whether this one succeeded or not.
                // A row that fails is added to the skipped set, so it isn't fetched again.
                await cleanUp(record: nextRecord)
            }
        }

        /// Deletes `record`'s files and then its row, within one write transaction.
        ///
        /// On any failure, the row id is added to the cleaner's skipped set so it
        /// can't block subsequent rows. This includes a failure in the existence
        /// check itself.
        private func cleanUp(record: OrphanedAttachmentRecord) async {
            let cleaner = self.cleaner

            await db.awaitableWrite { tx in
                do {
                    // Ensure the record is still around; if it was a pending attachment
                    // and the send flow finished while this job slept, just skip & exit.
                    // This happens within the database write lock to ensure correctness.
                    guard try record.exists(tx.database) else {
                        Logger.info("Skipping since-deleted orphan row")
                        return
                    }
                    if
                        let skippedRowIds = cleaner?.skippedRowIds,
                        skippedRowIds.get().contains(record.id)
                    {
                        Logger.info("Skipping a marked-as-skipped row id")
                        return
                    }
                    // Delete within the database write lock to ensure we don't
                    // conflict with the pending attachment send flow.
                    try self.deleteFiles(record: record)
                    _ = try record.delete(tx.database)
                    Logger.info("Cleaned up orphaned attachment files")
                } catch {
                    Logger.error("Failed to clean up orphan table row: \(error)")
                    let skipId = record.id
                    cleaner?.skippedRowIds.update(block: { $0.insert(skipId) })
                }
            }
        }

        /// Returns the first orphan record whose id isn't in the cleaner's skipped set.
        /// Returns nil if there is no such record or the read fails.
        private func fetchNextOrphanRecord() -> OrphanedAttachmentRecord? {
            return db.read { tx -> OrphanedAttachmentRecord? in
                guard let skippedRowIds = cleaner?.skippedRowIds.get(), !skippedRowIds.isEmpty else {
                    return try? OrphanedAttachmentRecord.fetchOne(tx.database)
                }
                let rowIdColumn = Column(OrphanedAttachmentRecord.CodingKeys.id)

                // At most this many skipped ids are excluded in SQL.
                // Any remaining skipped ids are filtered out in memory.
                let maxSkippedRowIdsInQuery = 50
                if skippedRowIds.count > maxSkippedRowIdsInQuery {
                    Logger.warn("Too many skipped row ids!")
                }
                let skippedRowIdsForQuery = skippedRowIds.prefix(maxSkippedRowIdsInQuery)
                let skippedRowIdsForInMemoryFilter = Set(skippedRowIds.dropFirst(maxSkippedRowIdsInQuery))

                var query = OrphanedAttachmentRecord.all()
                for skippedRowId in skippedRowIdsForQuery {
                    query = query.filter(rowIdColumn != skippedRowId)
                }

                if skippedRowIdsForInMemoryFilter.isEmpty {
                    return try? query.fetchOne(tx.database)
                }
                guard let cursor = try? query.fetchCursor(tx.database) else {
                    return nil
                }
                while let next = try? cursor.next() {
                    if !skippedRowIdsForInMemoryFilter.contains(next.id) {
                        return next
                    }
                }
                return nil
            }
        }

        /// Deletes every file path stored on `record`, plus any cached thumbnails
        /// derived from its main file. Throws the first deletion error encountered.
        private nonisolated func deleteFiles(record: OrphanedAttachmentRecord) throws {
            let relativeFilePaths: [String] = [
                record.localRelativeFilePath,
                record.localRelativeFilePathThumbnail,
                record.localRelativeFilePathAudioWaveform,
                record.localRelativeFilePathVideoStillFrame,
            ].compacted()

            try relativeFilePaths.forEach { relativeFilePath in
                let fileURL = AttachmentStream.absoluteAttachmentFileURL(relativeFilePath: relativeFilePath)
                try fileSystem.deleteFileIfExists(url: fileURL)
            }

            if let localRelativeFilePath = record.localRelativeFilePath {
                // Delete any cached thumbnails as well.
                for quality in AttachmentThumbnailQuality.allCases {
                    let cacheFileUrl = AttachmentThumbnailQuality.thumbnailCacheFileUrl(
                        attachmentLocalRelativeFilePath: localRelativeFilePath,
                        at: quality,
                    )
                    try fileSystem.deleteFileIfExists(url: cacheFileUrl)
                }
            }
        }
    }

    // MARK: - Observation

    /// Triggers a debounced cleanup run after any committed transaction that
    /// inserted into the ``OrphanedAttachmentRecord`` table.
    private class OrphanTableObserver: TransactionObserver {

        fileprivate let jobRunner: JobRunner
        private let taskScheduler: Shims.TaskScheduler

        lazy var runNextCleanupJobEvent = DebouncedEvents.build(
            mode: .lastOnly,
            maxFrequencySeconds: 1.0,
            onQueue: .sharedUserInitiated,
            notifyBlock: { [weak self] in
                self?.taskScheduler.task { [weak self] in
                    try await self?.jobRunner.runNextCleanupJob()
                }
            },
        )

        init(
            jobRunner: JobRunner,
            taskScheduler: Shims.TaskScheduler,
        ) {
            self.jobRunner = jobRunner
            self.taskScheduler = taskScheduler
        }

        func observes(eventsOfKind eventKind: DatabaseEventKind) -> Bool {
            switch eventKind {
            case .insert:
                return eventKind.tableName == OrphanedAttachmentRecord.databaseTableName
            case .delete, .update:
                return false
            }
        }

        /// `observes(eventsOfKind:)` filtering _only_ applies to `databaseDidChange`, _not_ `databaseDidCommit`.
        /// We want to filter, but only want to _do_ anything after the changes commit.
        /// This bool records that the filter passed (didChange), so didCommit knows whether to act.
        private var shouldRunOnNextCommit = false

        func databaseDidChange(with event: DatabaseEvent) {
            shouldRunOnNextCommit = true
        }

        func databaseDidCommit(_ db: GRDB.Database) {
            guard shouldRunOnNextCommit else {
                return
            }
            shouldRunOnNextCommit = false

            // When we get a matching event, run the next job _after_ committing.
            // The job should pick up whatever new row(s) got added to the table.
            runNextCleanupJobEvent.requestNotify()
        }

        func databaseDidRollback(_ db: GRDB.Database) {
            // Inserts from a rolled-back transaction never happened. Don't let them
            // trigger a run on some later, unrelated commit.
            shouldRunOnNextCommit = false
        }
    }
}

extension OrphanedAttachmentCleanerImpl {

    /// Production implementations of the dependencies described by ``Shims``;
    /// these are what the public convenience initializer wires in.
    public enum Wrappers {
        public typealias TaskScheduler = _OrphanedAttachmentCleanerImpl_TaskSchedulerWrapper
        public typealias OWSFileSystem = _OrphanedAttachmentCleanerImpl_OWSFileSystemWrapper
    }

    /// Protocol types for the cleaner's injectable dependencies, as accepted
    /// by its internal initializer.
    public enum Shims {
        public typealias TaskScheduler = _OrphanedAttachmentCleanerImpl_TaskSchedulerShim
        public typealias OWSFileSystem = _OrphanedAttachmentCleanerImpl_OWSFileSystemShim
    }
}

/// File-system operations the orphaned attachment cleaner depends on.
public protocol _OrphanedAttachmentCleanerImpl_OWSFileSystemShim {
    /// Deletes the file at `url`, if one exists.
    /// - Throws: Any error encountered while deleting.
    func deleteFileIfExists(url: URL) throws
}

/// Production file-system shim that forwards to `OWSFileSystem`.
public class _OrphanedAttachmentCleanerImpl_OWSFileSystemWrapper: _OrphanedAttachmentCleanerImpl_OWSFileSystemShim {
    public init() {}

    /// Forwards to `OWSFileSystem.deleteFileIfExists(url:)`, propagating its errors.
    public func deleteFileIfExists(url fileURL: URL) throws {
        try OWSFileSystem.deleteFileIfExists(url: fileURL)
    }
}

/// Schedules asynchronous work on behalf of the orphaned attachment cleaner.
public protocol _OrphanedAttachmentCleanerImpl_TaskSchedulerShim {
    /// Schedules `block` to run asynchronously. The method itself is synchronous,
    /// so it returns without awaiting `block`.
    func task(_ block: @Sendable @escaping () async throws -> Void)
}

/// Production task scheduler: runs each block in its own unstructured `Task`.
public class _OrphanedAttachmentCleanerImpl_TaskSchedulerWrapper: _OrphanedAttachmentCleanerImpl_TaskSchedulerShim {
    public init() {}

    public func task(_ block: @Sendable @escaping () async throws -> Void) {
        // Fire and forget: the task handle (and any error it ends with) is discarded.
        _ = Task {
            try await block()
        }
    }
}