Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
signalapp
GitHub Repository: signalapp/Signal-iOS
Path: blob/main/SignalServiceKit/Backups/Attachments/OrphanedBackupAttachmentQueueRunner.swift
1 views
//
// Copyright 2024 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//

import GRDB

/// In charge of deleting attachments off the backup cdn after they've been deleted locally (or otherwise orphaned).
public protocol OrphanedBackupAttachmentQueueRunner {

    /// Run all remote deletions, returning when finished. Supports cooperative cancellation.
    /// Should only be run after backup uploads have finished to avoid races.
    func runIfNeeded() async throws
}

public class OrphanedBackupAttachmentQueueRunnerImpl: OrphanedBackupAttachmentQueueRunner {

    private let appReadiness: AppReadiness
    private let taskQueue: TaskQueueLoader<TaskRunner>
    private let tsAccountManager: TSAccountManager

    public init(
        accountKeyStore: AccountKeyStore,
        appReadiness: AppReadiness,
        attachmentStore: AttachmentStore,
        backupRequestManager: BackupRequestManager,
        backupSettingsStore: BackupSettingsStore,
        dateProvider: @escaping DateProvider,
        db: any DB,
        listMediaManager: BackupListMediaManager,
        orphanedBackupAttachmentStore: OrphanedBackupAttachmentStore,
        tsAccountManager: TSAccountManager,
    ) {
        self.appReadiness = appReadiness
        self.tsAccountManager = tsAccountManager
        let taskRunner = TaskRunner(
            accountKeyStore: accountKeyStore,
            attachmentStore: attachmentStore,
            backupRequestManager: backupRequestManager,
            backupSettingsStore: backupSettingsStore,
            db: db,
            listMediaManager: listMediaManager,
            orphanedBackupAttachmentStore: orphanedBackupAttachmentStore,
            tsAccountManager: tsAccountManager,
        )
        self.taskQueue = TaskQueueLoader(
            maxConcurrentTasks: 8,
            dateProvider: dateProvider,
            db: db,
            runner: taskRunner,
        )
    }

    public func runIfNeeded() async throws {
        try await appReadiness.waitForAppReady()
        try Task.checkCancellation()
        guard tsAccountManager.localIdentifiersWithMaybeSneakyTransaction != nil else {
            return
        }
        try await taskQueue.loadAndRunTasks()
    }

    // MARK: - TaskRecordRunner

    private final class TaskRunner: TaskRecordRunner {

        private let accountKeyStore: AccountKeyStore
        private let attachmentStore: AttachmentStore
        private let backupRequestManager: BackupRequestManager
        private let backupSettingsStore: BackupSettingsStore
        private let db: any DB
        private let listMediaManager: BackupListMediaManager
        private let orphanedBackupAttachmentStore: OrphanedBackupAttachmentStore
        private let tsAccountManager: TSAccountManager

        let store: TaskStore

        init(
            accountKeyStore: AccountKeyStore,
            attachmentStore: AttachmentStore,
            backupRequestManager: BackupRequestManager,
            backupSettingsStore: BackupSettingsStore,
            db: any DB,
            listMediaManager: BackupListMediaManager,
            orphanedBackupAttachmentStore: OrphanedBackupAttachmentStore,
            tsAccountManager: TSAccountManager,
        ) {
            self.accountKeyStore = accountKeyStore
            self.attachmentStore = attachmentStore
            self.backupRequestManager = backupRequestManager
            self.backupSettingsStore = backupSettingsStore
            self.db = db
            self.listMediaManager = listMediaManager
            self.orphanedBackupAttachmentStore = orphanedBackupAttachmentStore
            self.tsAccountManager = tsAccountManager

            self.store = TaskStore(orphanedBackupAttachmentStore: orphanedBackupAttachmentStore)
        }

        private actor ErrorCounts {
            var counts = [TaskRecord.IDType: Int]()

            func updateCount(_ id: TaskRecord.IDType) -> Int {
                let count = (counts[id] ?? 0) + 1
                counts[id] = count
                return count
            }
        }

        private let errorCounts = ErrorCounts()

        func runTask(record: Store.Record, loader: TaskQueueLoader<TaskRunner>) async -> TaskRecordResult {
            let (
                localAci,
                registrationState,
                mediaRootBackupKey,
                needsListMedia,
            ) = db.read { tx in
                return (
                    tsAccountManager.localIdentifiers(tx: tx)?.aci,
                    tsAccountManager.registrationState(tx: tx),
                    accountKeyStore.getMediaRootBackupKey(tx: tx),
                    self.listMediaManager.getNeedsQueryListMedia(tx: tx),
                )
            }

            switch registrationState {
            case
                .unregistered,
                .reregistering,
                .deregistered,
                .transferringIncoming,
                .transferringPrimaryOutgoing,
                .transferred:
                // These states are potentially temporary. Return a retryable error
                // but cancel the task.
                Logger.info("Stopping when unregistered")
                let error = OWSRetryableError()
                try? await loader.stop(reason: error)
                return .retryableError(error)
            case
                .relinking,
                .delinked,
                .transferringLinkedOutgoing,
                .provisioned:
                // Linked devices never issue these delete requests.
                // Cancel the task so we never run it again.
                return .obsolete
            case .registered:
                break
            }

            guard let localAci else {
                let error = OWSAssertionError("Deleting without being registered")
                try? await loader.stop(reason: error)
                return .retryableError(error)
            }

            guard let mediaRootBackupKey else {
                let error = OWSAssertionError("Deleting without being registered")
                try? await loader.stop(reason: error)
                return .retryableError(error)
            }

            if needsListMedia {
                // If we need to list media, quit out early so we can do that.
                try? await loader.stop(reason: NeedsListMediaError())
                return .retryableError(NeedsListMediaError())
            }

            let mediaId: Data

            if let recordMediaId = record.record.mediaId {
                mediaId = recordMediaId
            } else if let type = record.record.type, let mediaName = record.record.mediaName {
                let mediaNameToUse: String
                switch type {
                case .fullsize:
                    mediaNameToUse = mediaName
                case .thumbnail:
                    mediaNameToUse = AttachmentBackupThumbnail.thumbnailMediaName(fullsizeMediaName: mediaName)
                }

                do {
                    mediaId = try mediaRootBackupKey.mediaEncryptionMetadata(
                        mediaName: mediaNameToUse,
                        // Doesn't matter what we use, we just want the mediaId.
                        type: .outerLayerFullsizeOrThumbnail,
                    ).mediaId
                } catch let error {
                    Logger.error("Failed to generate media IDs")
                    return .unretryableError(error)
                }
            } else {
                return .unretryableError(OWSAssertionError("Invalid record"))
            }

            let backupAuth: BackupServiceAuth
            do {
                backupAuth = try await backupRequestManager.fetchBackupServiceAuth(
                    for: mediaRootBackupKey,
                    localAci: localAci,
                    auth: .implicit(),
                    logger: .empty(), // TODO: [Logging]
                )
            } catch let error {
                try? await loader.stop(reason: error)
                return .unretryableError(error)
            }

            do {
                try await backupRequestManager.deleteMediaObjects(
                    objects: [BackupArchive.Request.DeleteMediaTarget(
                        cdn: record.record.cdnNumber,
                        mediaId: mediaId,
                    )],
                    auth: backupAuth,
                    logger: .empty(), // TODO: [Logging]
                )
            } catch let error {
                if error.isNetworkFailureOrTimeout {
                    let errorCount = await errorCounts.updateCount(record.id)
                    if error.isRetryable, errorCount < Constants.maxRetryableErrorCount {
                        return .retryableError(error)
                    } else {
                        return .unretryableError(error)
                    }
                } else {
                    return .unretryableError(error)
                }
            }

            return .success
        }

        func didSucceed(record: Store.Record, tx: DBWriteTransaction) throws {
            Logger.info("Finished deleting backup attachment \(record.id)")

            // Any time we successfully delete anything on remote cdn, optimistically wipe
            // the local state saying we've consumed all media tier quota; we will set it
            // again if the server tells us we're still out of space on next upload attempt.
            backupSettingsStore.setHasConsumedMediaTierCapacity(false, tx: tx)
        }

        func didFail(record: Store.Record, error: any Error, isRetryable: Bool, tx: DBWriteTransaction) throws {
            Logger.warn("Failed deleting backup attachment \(record.id), isRetryable: \(isRetryable), error: \(error)")
        }

        func didObsolete(record: Store.Record, tx: DBWriteTransaction) throws {
            Logger.info("Obsoleted deleting backup attachment \(record.id)")
        }
    }

    // MARK: - TaskRecordStore

    struct TaskRecord: SignalServiceKit.TaskRecord {
        let id: OrphanedBackupAttachment.IDType
        let record: OrphanedBackupAttachment
    }

    class TaskStore: TaskRecordStore {

        private let orphanedBackupAttachmentStore: OrphanedBackupAttachmentStore

        init(orphanedBackupAttachmentStore: OrphanedBackupAttachmentStore) {
            self.orphanedBackupAttachmentStore = orphanedBackupAttachmentStore
        }

        func peek(count: UInt, tx: DBReadTransaction) -> [TaskRecord] {
            return orphanedBackupAttachmentStore.peek(count: count, tx: tx).map { record in
                return TaskRecord(id: record.id!, record: record)
            }
        }

        func removeRecord(_ record: TaskRecord, tx: DBWriteTransaction) {
            orphanedBackupAttachmentStore.remove(record.record, tx: tx)
        }
    }

    private enum Constants {
        static let maxRetryableErrorCount = 2
    }
}

#if TESTABLE_BUILD

open class OrphanedBackupAttachmentQueueRunnerMock: OrphanedBackupAttachmentQueueRunner {

    public init() {}

    open func runIfNeeded() async throws {
        // Do nothing
    }
}

#endif