Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
signalapp
GitHub Repository: signalapp/Signal-iOS
Path: blob/main/SignalServiceKit/Backups/Attachments/BackupAttachmentCoordinator.swift
1 views
//
// Copyright 2025 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//

extension Notification.Name {

    /// Post this notification to request an asynchronous upload of every
    /// attachment currently sitting in the backup attachment upload queue.
    public static let startBackupAttachmentUploadQueue = Notification.Name(
        rawValue: "Signal.startBackupAttachmentUploadQueue"
    )
}

/// Coordinates backup attachment operations (upload, download, deletions, reconciliation)
/// which must all be locked relative to each other to prevent races with their async state updates.
///
/// Callers that wish to start or await uploads/downloads should go through this protocol
/// rather than invoking the individual queue runners directly.
public protocol BackupAttachmentCoordinator {

    // MARK: - Downloads

    /// Restores all pending attachments in the BackupAttachmentDownloadQueue.
    ///
    /// Will keep restoring attachments until there are none left, then returns.
    /// Is cooperatively cancellable; will check and terminate early if the task is cancelled
    /// in between individual attachments.
    ///
    /// Returns immediately if there are no attachments left to restore.
    ///
    /// Each individual attachment has its thumbnail and fullsize data downloaded as appropriate.
    ///
    /// - Throws: An error IFF something would prevent all attachments from restoring
    ///   (e.g. network issue).
    func restoreAttachmentsIfNeeded() async throws

    // MARK: - Uploads

    /// Backs up all pending attachments in the BackupAttachmentUploadQueue.
    ///
    /// Will keep backing up attachments until there are none left, then returns.
    /// Is cooperatively cancellable; will check and terminate early if the task is cancelled
    /// in between individual attachments.
    ///
    /// Returns immediately if there are no attachments left to back up.
    ///
    /// Each individual attachment is either freshly uploaded or copied from the transit
    /// tier to the media tier as needed. Thumbnail versions are also created, uploaded, and
    /// backed up as needed.
    ///
    /// - Parameter waitOnThumbnails: When `true`, the call also waits for thumbnail uploads
    ///   to finish; when `false`, it only waits for fullsize uploads.
    /// - Throws: An error IFF something would prevent all attachments from backing up
    ///   (e.g. network issue).
    func backUpAllAttachments(waitOnThumbnails: Bool) async throws

    // MARK: - List Media

    /// Runs the "list media" operation if it is needed, returning once it has finished.
    ///
    /// NOTE(review): the concrete coordinator in this file returns immediately when no
    /// list media run is in progress and the database reports that none is needed;
    /// confirm other conformers are expected to behave the same way.
    func queryListMediaIfNeeded() async throws

    // MARK: - Orphaning

    /// Run all remote deletions, returning when finished. Supports cooperative cancellation.
    /// Should only be run after backup uploads have finished to avoid races.
    func deleteOrphansIfNeeded() async throws

    // MARK: - Offloading

    /// Walk over all attachments and delete local files for any that are eligible to be
    /// offloaded.
    /// This can be a very expensive operation (e.g. if "optimize local storage" was
    /// just enabled and there's a lot to clean up) so it is best to call this in a
    /// non-user-blocking context, e.g. during an overnight backup BGProcessingTask.
    ///
    /// Supports cooperative cancellation; makes incremental progress if cancelled.
    func offloadAttachmentsIfNeeded() async throws
}

/// Actor-backed `BackupAttachmentCoordinator`.
///
/// Every unit of work is modeled as an `Operation`. Callers express interest in an
/// operation by adding an `Observer` to `pendingObservers`; `kickOffNextOperation()`
/// then decides, based on what is already running, which pending operations may
/// start. When an operation finishes, its observers are resumed with the result and
/// the scheduler is consulted again.
///
/// All bookkeeping state (`runningTasks`, `runningTaskObservers`, `pendingObservers`)
/// is actor-isolated; the `nonisolated` notification handlers hop onto the actor via
/// `Task` before touching it.
public actor BackupAttachmentCoordinatorImpl: BackupAttachmentCoordinator {

    private let appContext: AppContext
    private let appReadiness: AppReadiness
    private let backupSettingsStore: BackupSettingsStore
    private let db: any DB
    private let downloadRunner: BackupAttachmentDownloadQueueRunner
    private let listMediaManager: BackupListMediaManager
    private let offloadingManager: AttachmentOffloadingManager
    private let orphanRunner: OrphanedBackupAttachmentQueueRunner
    private let orphanStore: OrphanedBackupAttachmentStore
    private nonisolated let tsAccountManager: TSAccountManager
    private nonisolated let uploadRunner: BackupAttachmentUploadQueueRunner

    /// Stores the dependencies and arranges for startup work once the main app is ready.
    ///
    /// When `appReadiness` reports the main app as ready (immediately if it already is),
    /// the coordinator begins observing the relevant notifications and schedules list
    /// media plus every upload/download operation.
    public init(
        appContext: AppContext,
        appReadiness: AppReadiness,
        backupSettingsStore: BackupSettingsStore,
        db: any DB,
        downloadRunner: BackupAttachmentDownloadQueueRunner,
        listMediaManager: BackupListMediaManager,
        offloadingManager: AttachmentOffloadingManager,
        orphanRunner: OrphanedBackupAttachmentQueueRunner,
        orphanStore: OrphanedBackupAttachmentStore,
        tsAccountManager: TSAccountManager,
        uploadRunner: BackupAttachmentUploadQueueRunner,
    ) {
        self.appContext = appContext
        self.appReadiness = appReadiness
        self.backupSettingsStore = backupSettingsStore
        self.db = db
        self.downloadRunner = downloadRunner
        self.listMediaManager = listMediaManager
        self.offloadingManager = offloadingManager
        self.orphanRunner = orphanRunner
        self.orphanStore = orphanStore
        self.tsAccountManager = tsAccountManager
        self.uploadRunner = uploadRunner

        // Hold self weakly so the readiness callback does not keep the coordinator alive.
        let weakSelf = Weak(value: self)
        appReadiness.runNowOrWhenMainAppDidBecomeReadyAsync {
            weakSelf.value?.startObservingExternalEvents()
            Task {
                await weakSelf.value?.scheduleOperations([
                    .listMedia,
                    .downloadFullsize,
                    .downloadThumbnail,
                    .uploadFullsize,
                    .uploadThumbnail,
                ])
            }
        }
    }

    // MARK: - Downloads

    /// Waits for both the fullsize and the thumbnail download operations to finish.
    public func restoreAttachmentsIfNeeded() async throws {
        try await awaitOperations([.downloadFullsize, .downloadThumbnail])
    }

    // MARK: - Uploads

    /// Waits for the fullsize upload operation and, optionally, the thumbnail one.
    ///
    /// - Parameter waitOnThumbnails: When `true`, thumbnail uploads are awaited as well.
    public func backUpAllAttachments(waitOnThumbnails: Bool) async throws {
        var operations: [Operation] = [.uploadFullsize]
        if waitOnThumbnails {
            operations.append(.uploadThumbnail)
        }
        try await awaitOperations(operations)
    }

    // MARK: - List Media

    /// Waits for list media, unless none is running and the database says none is needed.
    public func queryListMediaIfNeeded() async throws {
        guard isRunning(.listMedia) || db.read(block: listMediaManager.getNeedsQueryListMedia(tx:)) else {
            // Early exit if we don't need to run list media at all and aren't currently running.
            return
        }
        try await self.awaitOperation(.listMedia)
    }

    // MARK: - Orphaning

    /// Waits for the orphan-deletion operation to finish.
    public func deleteOrphansIfNeeded() async throws {
        try await self.awaitOperation(.deleteOrphans)
    }

    // MARK: - Offloading

    /// Waits for the offloading operation to finish.
    public func offloadAttachmentsIfNeeded() async throws {
        try await self.awaitOperation(.offloading)
    }

    // MARK: - Status Observation

    /// Registers selector-based observers for every notification that should cause
    /// operations to be (re)scheduled: the explicit "start upload queue" request,
    /// download/upload queue status changes, backup plan changes and registration
    /// state changes.
    private nonisolated func startObservingExternalEvents() {
        NotificationCenter.default.addObserver(
            self,
            selector: #selector(backUpAllAttachmentsFromNotification),
            name: .startBackupAttachmentUploadQueue,
            object: nil,
        )
        NotificationCenter.default.addObserver(
            self,
            selector: #selector(fullsizeDownloadQueueStatusDidChange),
            name: .backupAttachmentDownloadQueueStatusDidChange(mode: .fullsize),
            object: nil,
        )
        NotificationCenter.default.addObserver(
            self,
            selector: #selector(thumbnailDownloadQueueStatusDidChange),
            name: .backupAttachmentDownloadQueueStatusDidChange(mode: .thumbnail),
            object: nil,
        )
        NotificationCenter.default.addObserver(
            self,
            selector: #selector(fullsizeUploadQueueStatusDidChange),
            name: .backupAttachmentUploadQueueStatusDidChange(for: .fullsize),
            object: nil,
        )
        NotificationCenter.default.addObserver(
            self,
            selector: #selector(thumbnailUploadQueueStatusDidChange),
            name: .backupAttachmentUploadQueueStatusDidChange(for: .thumbnail),
            object: nil,
        )
        NotificationCenter.default.addObserver(
            self,
            selector: #selector(backupPlanDidChange),
            name: .backupPlanChanged,
            object: nil,
        )
        NotificationCenter.default.addObserver(
            self,
            selector: #selector(registrationStateDidChange),
            name: .registrationStateDidChange,
            object: nil,
        )
    }

    /// Fullsize download queue status changed: schedule fullsize downloads.
    @objc
    private nonisolated func fullsizeDownloadQueueStatusDidChange() {
        Task { [weak self] in
            await self?.scheduleOperation(.downloadFullsize)
        }
    }

    /// Thumbnail download queue status changed: schedule thumbnail downloads.
    @objc
    private nonisolated func thumbnailDownloadQueueStatusDidChange() {
        Task { [weak self] in
            await self?.scheduleOperation(.downloadThumbnail)
        }
    }

    /// Fullsize upload queue status changed: schedule fullsize uploads.
    @objc
    private nonisolated func fullsizeUploadQueueStatusDidChange() {
        Task { [weak self] in
            await self?.scheduleOperation(.uploadFullsize)
        }
    }

    /// Thumbnail upload queue status changed: schedule thumbnail uploads.
    @objc
    private nonisolated func thumbnailUploadQueueStatusDidChange() {
        Task { [weak self] in
            await self?.scheduleOperation(.uploadThumbnail)
        }
    }

    /// Backup plan changed: schedule list media and every upload/download operation.
    @objc
    private nonisolated func backupPlanDidChange() {
        Task { [weak self] in
            await self?.scheduleOperations([
                .listMedia,
                .downloadFullsize,
                .downloadThumbnail,
                .uploadFullsize,
                .uploadThumbnail,
            ])
        }
    }

    /// Registration state changed: schedule list media and every upload/download operation.
    @objc
    private nonisolated func registrationStateDidChange() {
        Task { [weak self] in
            await self?.scheduleOperations([
                .listMedia,
                .downloadFullsize,
                .downloadThumbnail,
                .uploadFullsize,
                .uploadThumbnail,
            ])
        }
    }

    /// Handles `.startBackupAttachmentUploadQueue`: schedule both upload operations.
    @objc
    private nonisolated func backUpAllAttachmentsFromNotification() {
        Task { [weak self] in
            await self?.scheduleOperations([.uploadFullsize, .uploadThumbnail])
        }
    }

    // MARK: - State Management

    /// A single party interested in the completion of an operation.
    private struct Observer {
        /// Identity used to locate this observer again when its waiter is cancelled.
        let id = UUID()
        /// Resumed with the operation's result (or cancelled) to wake the waiter, if any.
        let continuation = CancellableContinuation<Void>()
    }

    /// The kinds of work this coordinator schedules.
    private enum Operation: Hashable, CaseIterable {
        case listMedia
        case downloadFullsize
        case downloadThumbnail
        case uploadFullsize
        case uploadThumbnail
        case deleteOrphans
        case offloading
    }

    /// The in-flight task for each operation that is currently running.
    private var runningTasks = [Operation: Task<Void, Error>]()
    /// When an operation starts to run, we snapshot the observers at the time.
    /// If an observer is added while running, it goes into pendingObservers instead
    /// so that it can trigger a second run of the operation after the current one finishes.
    private var runningTaskObservers = [Operation: [Observer]]()
    /// Observers waiting for a (future) run of each operation. An operation "needs to
    /// run" exactly when it has at least one pending observer.
    private var pendingObservers = [Operation: [Observer]]()

    /// Whether `operation` has at least one pending observer.
    /// Both a missing entry and an empty array count as "no".
    private func needsToRun(_ operation: Operation) -> Bool {
        return !(pendingObservers[operation]?.isEmpty ?? true)
    }

    /// Whether a task for `operation` is currently in flight.
    private func isRunning(_ operation: Operation) -> Bool {
        return runningTasks[operation] != nil
    }

    /// Fire-and-forget scheduling of a single operation; see `scheduleOperations(_:)`.
    private func scheduleOperation(_ operation: Operation) {
        self.scheduleOperations([operation])
    }

    /// Queues an observer nobody waits on for each operation, then consults the scheduler.
    ///
    /// Does nothing unless running in the main app with a registered account.
    private func scheduleOperations(_ operations: [Operation]) {
        guard appContext.isMainApp, tsAccountManager.registrationStateWithMaybeSneakyTransaction.isRegistered else {
            return
        }

        for operation in operations {
            self.pendingObservers[operation, default: []] += [Observer()]
        }
        kickOffNextOperation()
    }

    /// Waits for a single operation; see `awaitOperations(_:)`.
    private func awaitOperation(_ operation: Operation) async throws {
        try await awaitOperations([operation])
    }

    /// Queues an observer for each operation and suspends until all of them are resumed.
    ///
    /// Returns immediately outside the main app. If the calling task is cancelled while
    /// waiting, each observer is withdrawn through `cancelOperation(_:observerId:)`.
    ///
    /// - Throws: `CancellationError` if the task is already cancelled on entry, or
    ///   whatever error an observer's continuation is resumed with.
    private func awaitOperations(_ operations: [Operation]) async throws {
        guard appContext.isMainApp else {
            return
        }

        try Task.checkCancellation()

        let observers = operations.map { operation in
            let observer = Observer()
            self.pendingObservers[operation, default: []] += [observer]
            return (operation, observer)
        }

        kickOffNextOperation()

        let weakSelf = Weak(value: self)
        try await withThrowingTaskGroup { taskGroup in
            for (operation, observer) in observers {
                let observerId = observer.id
                let continuation = observer.continuation
                taskGroup.addTask {
                    try await withTaskCancellationHandler(
                        operation: {
                            try await continuation.wait()
                        },
                        onCancel: {
                            // onCancel is synchronous and not actor-isolated, so hop
                            // onto the actor to update the bookkeeping.
                            Task {
                                await weakSelf.value?.cancelOperation(
                                    operation,
                                    observerId: observerId,
                                )
                            }
                        },
                    )
                }
            }
            try await taskGroup.waitForAll()
        }
    }

    /// Withdraws one observer from `operation`.
    ///
    /// A still-pending observer is removed and its continuation cancelled. An observer
    /// attached to the running task is removed, and if it was the last one the running
    /// task itself is cancelled.
    private func cancelOperation(
        _ operation: Operation,
        observerId: UUID,
    ) {
        var pendingObservers = self.pendingObservers[operation] ?? []
        if let observer = pendingObservers.removeFirst(where: { $0.id == observerId }) {
            // Note: this may leave an empty (non-nil) array behind; use `needsToRun`
            // rather than a nil check to test for pending work.
            self.pendingObservers[operation] = pendingObservers
            observer.continuation.cancel()
            return
        }
        var runningTaskObservers = self.runningTaskObservers[operation] ?? []
        if nil != runningTaskObservers.removeFirst(where: { $0.id == observerId }) {
            self.runningTaskObservers[operation] = runningTaskObservers
            if runningTaskObservers.isEmpty {
                // Cancel the actual operation if this is the only observer.
                self.runningTasks[operation]?.cancel()
            }
        }
    }

    /// The scheduler: starts whichever pending operations are allowed to run right now.
    ///
    /// Called whenever observers are added and whenever an operation finishes (except
    /// after cancellation). Does nothing until the app is ready, and only in the main
    /// app with a registered account.
    private func kickOffNextOperation() {
        guard appReadiness.isAppReady && appContext.isMainApp && tsAccountManager.registrationStateWithMaybeSneakyTransaction.isRegistered else {
            // We will kick off again when the app is ready; whatever
            // got scheduled will run then.
            return
        }

        // Always check if we need to run list media and do so aggressively
        // even if there are no observers. The other operations rely on this.
        //
        // Use `needsToRun` rather than comparing the entry to nil: cancelling the
        // last pending observer leaves an empty array behind, which must still
        // count as "no list media queued".
        if
            !needsToRun(.listMedia),
            self.runningTasks.isEmpty,
            db.read(block: { listMediaManager.getNeedsQueryListMedia(tx: $0) })
        {
            pendingObservers[.listMedia, default: []] += [Observer()]
        }

        if needsToRun(.listMedia) {
            // List media cannot run in parallel with anything;
            // only run if nothing is running.
            // Other operations stop themselves if list media is
            // needed; when they do we will loop back here and
            // run list media then.
            if self.runningTasks.isEmpty {
                self.runOperation(.listMedia)
            }
            // If we need to list media, we never run anything else.
            // Return now whether we started list media this run
            // loop or not.
            return
        }

        /// Orphan deletion may start alongside thumbnail work and fullsize downloads,
        /// but not alongside anything else (including itself).
        func canRunDeleteOrphans() -> Bool {
            let isRunningExclusiveOperation = runningTasks.contains(where: {
                switch $0.key {
                case .downloadThumbnail, .uploadThumbnail:
                    // These can run in parallel with the rest
                    return false
                case .downloadFullsize:
                    // Downloads and orphaning can run in parallel
                    return false
                case
                    .listMedia, .uploadFullsize,
                    .deleteOrphans, .offloading:
                    return true
                }
            })
            return !isRunningExclusiveOperation
        }

        let hasConsumedMediaTierCapacity = db.read { tx in
            backupSettingsStore.hasConsumedMediaTierCapacity(tx: tx)
        }
        if hasConsumedMediaTierCapacity {
            // If we are out of storage space...
            // * issue deletes if we can, to free up space
            if canRunDeleteOrphans(), needsToRun(.deleteOrphans) {

                let orphanCount = db.read { tx in
                    orphanStore.peek(count: 1, tx: tx).count
                }
                if orphanCount > 0 {
                    self.runOperation(.deleteOrphans)
                    // * do NOT run anything else until we can run deletes
                    // Other operations stop themselves if out of space;
                    // when they do we will loop back here and run deletes.
                    return
                }
            }
        }

        // Thumbnail upload and download can run in parallel with
        // any of the other operations:
        // * no overlap/races with fullsize upload download
        // * orphaning does have a race: delete an attachment, issue
        //   delete request for media tier thumbnail, wait on response,
        //   recreate same media bytes, upload thumbnail, delete finishes.
        //   That deletes the upload, but note this requires an enormous
        //   gap between delete request/response large enough to do a whole
        //   upload so in practice not a concern. (And even if it happens
        //   we just lose a thumbnail which is recoverable from fullsize.)
        // * offloading may generate thumbnails from already-downloaded
        //   fullsize media, but we don't download thumbnails if we
        //   already have fullsize, anyway
        //
        // NOTE(review): this point is reachable while list media is *running*
        // (the early return above only covers list media being *pending*), so
        // thumbnail work can start alongside an in-flight list media run.
        // Confirm that is intended.
        if needsToRun(.downloadThumbnail) && !isRunning(.downloadThumbnail) {
            runOperation(.downloadThumbnail)
        }
        if needsToRun(.uploadThumbnail) && !isRunning(.uploadThumbnail) {
            runOperation(.uploadThumbnail)
        }

        if needsToRun(.uploadFullsize) || needsToRun(.downloadFullsize) {
            // Upload and download can run in parallel, but cannot
            // run if anything else is running.
            let isRunningNonUploadDownload: Bool = runningTasks.contains(where: {
                switch $0.key {
                case
                    .downloadFullsize, .downloadThumbnail,
                    .uploadFullsize, .uploadThumbnail:
                    return false
                case .listMedia, .deleteOrphans, .offloading:
                    return true
                }
            })
            if !isRunningNonUploadDownload {
                if needsToRun(.downloadFullsize), !isRunning(.downloadFullsize) {
                    self.runOperation(.downloadFullsize)
                }
                if needsToRun(.uploadFullsize), !isRunning(.uploadFullsize) {
                    self.runOperation(.uploadFullsize)
                }
            }
        }

        /// Offloading may start alongside thumbnail work only.
        func canRunOffloading() -> Bool {
            let isRunningExclusiveOperation = runningTasks.contains(where: {
                switch $0.key {
                case .downloadThumbnail, .uploadThumbnail:
                    // These can run in parallel with the rest
                    return false
                case
                    .listMedia, .downloadFullsize, .uploadFullsize,
                    .deleteOrphans, .offloading:
                    return true
                }
            })
            return !isRunningExclusiveOperation
        }

        // Orphan deletion takes priority over offloading; at most one of the two
        // is started per pass.
        if canRunDeleteOrphans(), needsToRun(.deleteOrphans) {
            self.runOperation(.deleteOrphans)
        } else if canRunOffloading(), needsToRun(.offloading) {
            self.runOperation(.offloading)
        }
    }

    /// Starts the task for `operation` and promotes its pending observers to running.
    ///
    /// Every task first waits for app readiness and then calls into the matching
    /// runner/manager. A second, unstructured task awaits the result and reports it
    /// back to the actor through `didFinishOperation(_:_:)`.
    private func runOperation(_ operation: Operation) {
        let task: Task<Void, Error> = switch operation {
        case .listMedia:
            Task { [appReadiness, listMediaManager] in
                try await appReadiness.waitForAppReady()
                try await listMediaManager.queryListMediaIfNeeded()
            }
        case .downloadFullsize:
            Task { [appReadiness, downloadRunner] in
                try await appReadiness.waitForAppReady()
                try await downloadRunner.restoreAttachmentsIfNeeded(mode: .fullsize)
            }
        case .downloadThumbnail:
            Task { [appReadiness, downloadRunner] in
                try await appReadiness.waitForAppReady()
                try await downloadRunner.restoreAttachmentsIfNeeded(mode: .thumbnail)
            }
        case .uploadFullsize:
            Task { [appReadiness, uploadRunner] in
                try await appReadiness.waitForAppReady()
                try await uploadRunner.backUpAllAttachments(mode: .fullsize)
            }
        case .uploadThumbnail:
            Task { [appReadiness, uploadRunner] in
                try await appReadiness.waitForAppReady()
                try await uploadRunner.backUpAllAttachments(mode: .thumbnail)
            }
        case .deleteOrphans:
            Task { [appReadiness, orphanRunner] in
                try await appReadiness.waitForAppReady()
                try await orphanRunner.runIfNeeded()
            }
        case .offloading:
            Task { [appReadiness, offloadingManager] in
                try await appReadiness.waitForAppReady()
                try await offloadingManager.offloadAttachmentsIfNeeded()
            }
        }
        self.runningTasks[operation] = task
        // Snapshot the current observers as belonging to this run; anything added
        // from now on is pending again and will trigger another run.
        self.runningTaskObservers[operation] = self.pendingObservers[operation]
        self.pendingObservers[operation] = nil
        Task { [weak self] in
            let result = await Result(catching: {
                try await task.value
            })
            await self?.didFinishOperation(operation, result)
        }
    }

    /// Handles completion of `operation`'s task.
    ///
    /// - `NeedsListMediaError`: queue list media, re-queue this run's observers so the
    ///   operation runs again afterwards, and reschedule.
    /// - `CancellationError`: resume this run's observers and all pending observers with
    ///   the failure; do not reschedule.
    /// - Anything else (success or another failure): resume this run's observers with
    ///   the result and reschedule.
    private func didFinishOperation(
        _ operation: Operation,
        _ result: Result<Void, Error>,
    ) {
        self.runningTasks[operation] = nil
        switch result {
        case .failure(let error) where error is NeedsListMediaError:
            // If we stopped because we need to list media,
            // do that by inserting a list media operation observer.
            // (Even if there was already an observer, insert a new one.)
            pendingObservers[.listMedia, default: []] += [Observer()]

            // Do not mark the current operation finished, it will
            // run again after list media is done. Mark all observers
            // pending instead.
            pendingObservers[operation, default: []]
                .append(contentsOf: self.runningTaskObservers[operation] ?? [])
            // The observers now live in the pending list only; drop the stale copy.
            self.runningTaskObservers[operation] = nil

            self.kickOffNextOperation()
        case .failure(let error) where error is CancellationError:
            // Observers still attached to the cancelled run must hear about it too.
            // When the run was cancelled because its last observer went away this
            // list is empty, but the task can also end with a CancellationError
            // while observers remain; without this they would never be resumed.
            self.runningTaskObservers[operation]?.forEach { observer in
                observer.continuation.resume(with: result)
            }
            self.runningTaskObservers[operation] = nil

            // Any remaining operations won't run due to cancellation, so notify any observers.
            // Related to this, skip calling `kickOffNextOperation` on cancellation to avoid
            // beginning any further operations (e.g. - implicit listMedia operations).
            //
            // NOTE(review): the resumed observers are left in `pendingObservers`, so
            // their operations still count as needing to run on the next scheduling
            // pass. Confirm whether they should be removed here.
            for operationObservers in pendingObservers.values {
                operationObservers.forEach { observer in
                    observer.continuation.resume(with: result)
                }
            }
        case .success, .failure:
            self.runningTaskObservers[operation]?.forEach { observer in
                observer.continuation.resume(with: result)
            }
            self.runningTaskObservers[operation] = nil
            self.kickOffNextOperation()
        }
    }
}

#if TESTABLE_BUILD

/// Test double for `BackupAttachmentCoordinator` whose every requirement is a no-op.
/// Declared `open` so individual tests can subclass it and override only the
/// methods they care about.
open class MockBackupAttachmentCoordinator: BackupAttachmentCoordinator {

    /// No-op; returns immediately without restoring anything.
    open func restoreAttachmentsIfNeeded() async throws {}

    /// No-op; `waitOnThumbnails` is ignored.
    open func backUpAllAttachments(waitOnThumbnails: Bool) async throws {}

    /// No-op; never queries list media.
    open func queryListMediaIfNeeded() async throws {}

    /// No-op; never issues deletions.
    open func deleteOrphansIfNeeded() async throws {}

    /// No-op; never offloads anything.
    open func offloadAttachmentsIfNeeded() async throws {}
}

#endif