Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
signalapp
GitHub Repository: signalapp/Signal-iOS
Path: blob/main/SignalServiceKit/Backups/Attachments/BackupAttachmentDownloadQueueRunner.swift
1 views
//
// Copyright 2024 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//

import Foundation
import GRDB
import LibSignalClient

public protocol BackupAttachmentDownloadQueueRunner {

    /// Restores all pending attachments in the BackupAttachmentDownloadQueue.
    ///
    /// Will keep restoring attachments until there are none left, then returns.
    /// Is cooperatively cancellable; will check and early terminate if the task is cancelled
    /// in between individual attachments.
    ///
    /// Returns immediately if there's no attachments left to restore.
    ///
    /// Throws an error IFF something would prevent all attachments from restoring (e.g. network issue).
    func restoreAttachmentsIfNeeded(mode: BackupAttachmentDownloadQueueMode) async throws
}

class BackupAttachmentDownloadQueueRunnerImpl: BackupAttachmentDownloadQueueRunner {

    private let appContext: AppContext
    private let attachmentStore: AttachmentStore
    private let backupAttachmentDownloadStore: BackupAttachmentDownloadStore
    private let backupMediaErrorNotificationPresenter: BackupMediaErrorNotificationPresenter
    private let backupSettingsStore: BackupSettingsStore
    private let dateProvider: DateProvider
    private let db: any DB
    private let logger: PrefixedLogger
    private let mediaBandwidthPreferenceStore: MediaBandwidthPreferenceStore
    private let progress: BackupAttachmentDownloadProgress
    private let remoteConfigProvider: RemoteConfigProvider
    private let statusManager: BackupAttachmentDownloadQueueStatusManager
    private let tsAccountManager: TSAccountManager

    private let fullsizeTaskQueue: TaskQueueLoader<TaskRunner>
    private let thumbnailTaskQueue: TaskQueueLoader<TaskRunner>

    init(
        appContext: AppContext,
        attachmentStore: AttachmentStore,
        attachmentDownloadManager: AttachmentDownloadManager,
        attachmentUploadStore: AttachmentUploadStore,
        backupAttachmentDownloadStore: BackupAttachmentDownloadStore,
        backupAttachmentUploadScheduler: BackupAttachmentUploadScheduler,
        backupMediaErrorNotificationPresenter: BackupMediaErrorNotificationPresenter,
        backupListMediaManager: BackupListMediaManager,
        backupSettingsStore: BackupSettingsStore,
        dateProvider: @escaping DateProvider,
        db: any DB,
        mediaBandwidthPreferenceStore: MediaBandwidthPreferenceStore,
        progress: BackupAttachmentDownloadProgress,
        remoteConfigProvider: RemoteConfigProvider,
        statusManager: BackupAttachmentDownloadQueueStatusManager,
        tsAccountManager: TSAccountManager,
    ) {
        let logger = PrefixedLogger(prefix: "[Backups]")

        self.appContext = appContext
        self.attachmentStore = attachmentStore
        self.backupAttachmentDownloadStore = backupAttachmentDownloadStore
        self.backupMediaErrorNotificationPresenter = backupMediaErrorNotificationPresenter
        self.logger = logger
        self.backupSettingsStore = backupSettingsStore
        self.dateProvider = dateProvider
        self.db = db
        self.mediaBandwidthPreferenceStore = mediaBandwidthPreferenceStore
        self.progress = progress
        self.remoteConfigProvider = remoteConfigProvider
        self.statusManager = statusManager
        self.tsAccountManager = tsAccountManager

        func taskQueue(mode: BackupAttachmentDownloadQueueMode) -> TaskQueueLoader<TaskRunner> {
            let taskRunner = TaskRunner(
                mode: mode,
                attachmentStore: attachmentStore,
                attachmentDownloadManager: attachmentDownloadManager,
                attachmentUploadStore: attachmentUploadStore,
                backupAttachmentDownloadStore: backupAttachmentDownloadStore,
                backupAttachmentUploadScheduler: backupAttachmentUploadScheduler,
                backupMediaErrorNotificationPresenter: backupMediaErrorNotificationPresenter,
                backupSettingsStore: backupSettingsStore,
                dateProvider: dateProvider,
                db: db,
                listMediaManager: backupListMediaManager,
                logger: logger,
                mediaBandwidthPreferenceStore: mediaBandwidthPreferenceStore,
                progress: progress,
                remoteConfigProvider: remoteConfigProvider,
                statusManager: statusManager,
                tsAccountManager: tsAccountManager,
            )
            return TaskQueueLoader(
                maxConcurrentTasks: {
                    switch mode {
                    case .thumbnail: Constants.numParallelDownloadsThumbnail
                    case .fullsize: Constants.numParallelDownloadsFullsize
                    }
                }(),
                dateProvider: dateProvider,
                db: db,
                runner: taskRunner,
            )
        }

        self.fullsizeTaskQueue = taskQueue(mode: .fullsize)
        self.thumbnailTaskQueue = taskQueue(mode: .thumbnail)
    }

    func restoreAttachmentsIfNeeded(mode: BackupAttachmentDownloadQueueMode) async throws {
        guard appContext.isMainApp else { return }

        let taskQueue: TaskQueueLoader<TaskRunner>
        let logString: String
        switch mode {
        case .fullsize:
            taskQueue = fullsizeTaskQueue
            logString = "fullsize"
        case .thumbnail:
            taskQueue = thumbnailTaskQueue
            logString = "thumbnail"
        }

        switch await statusManager.beginObservingIfNecessary(for: mode) {
        case .running:
            logger.info("Starting \(logString) backup attachment downloads")
        case .suspended:
            // The queue will stop on its own if suspended
            logger.info("Skipping \(logString) backup attachment downloads while suspended")
            return
        case .empty:
            // The queue will stop on its own if empty.
            logger.info("\(logString) backup attachment download queue empty!")
            return
        case .notRegisteredAndReady:
            try await taskQueue.stop()
            return
        case .noWifiReachability:
            logger.info("Skipping \(logString) backup attachment downloads while not reachable by wifi")
            try await taskQueue.stop()
            return
        case .noReachability:
            logger.info("Skipping \(logString) backup attachment downloads while not reachable at all")
            try await taskQueue.stop()
            return
        case .lowBattery:
            logger.info("Skipping \(logString) backup attachment downloads while low battery")
            try await taskQueue.stop()
            return
        case .lowPowerMode:
            logger.info("Skipping \(logString) backup attachment downloads while low power mode")
            try await taskQueue.stop()
            return
        case .lowDiskSpace:
            logger.info("Skipping \(logString) backup attachment downloads while low on disk space")
            try await taskQueue.stop()
            return
        case .appBackgrounded:
            logger.info("Skipping \(logString) backup attachment downloads while backgrounded")
            try await taskQueue.stop()
            return
        }

        do {
            try await progress.beginObserving()
        } catch {
            owsFailDebug("Unable to observe \(logString) download progres \(error.grdbErrorForLogging)")
        }

        let backgroundTask = OWSBackgroundTask(
            label: #function + logString,
        ) { [weak taskQueue] status in
            switch status {
            case .expired:
                Task {
                    try? await taskQueue?.stop()
                }
            case .couldNotStart, .success:
                break
            }
        }
        defer { backgroundTask.end() }

        try await taskQueue.loadAndRunTasks()

        logger.info("Finished \(logString) backup attachment downloads")
    }

    // MARK: - TaskRecordRunner

    private final class TaskRunner: TaskRecordRunner {

        private let attachmentStore: AttachmentStore
        private let attachmentDownloadManager: AttachmentDownloadManager
        private let attachmentUploadStore: AttachmentUploadStore
        private let backupAttachmentDownloadStore: BackupAttachmentDownloadStore
        private let backupAttachmentUploadScheduler: BackupAttachmentUploadScheduler
        private let backupMediaErrorNotificationPresenter: BackupMediaErrorNotificationPresenter
        private let backupSettingsStore: BackupSettingsStore
        private let dateProvider: DateProvider
        private let db: any DB
        private let listMediaManager: BackupListMediaManager
        private let logger: PrefixedLogger
        private let mediaBandwidthPreferenceStore: MediaBandwidthPreferenceStore
        private let progress: BackupAttachmentDownloadProgress
        private let remoteConfigProvider: RemoteConfigProvider
        private let statusManager: BackupAttachmentDownloadQueueStatusManager
        private let tsAccountManager: TSAccountManager

        private lazy var availableDiskSpaceCheck = DebouncedEvents.build(
            mode: .lastOnly,
            maxFrequencySeconds: 1.0,
            onQueue: .sharedUserInitiated,
            notifyBlock: { [weak self] in
                Task {
                    await self?.statusManager.checkAvailableDiskSpace(clearPreviousOutOfSpaceErrors: false)
                }
            },
        )

        let store: TaskStore

        private let mode: BackupAttachmentDownloadQueueMode

        init(
            mode: BackupAttachmentDownloadQueueMode,
            attachmentStore: AttachmentStore,
            attachmentDownloadManager: AttachmentDownloadManager,
            attachmentUploadStore: AttachmentUploadStore,
            backupAttachmentDownloadStore: BackupAttachmentDownloadStore,
            backupAttachmentUploadScheduler: BackupAttachmentUploadScheduler,
            backupMediaErrorNotificationPresenter: BackupMediaErrorNotificationPresenter,
            backupSettingsStore: BackupSettingsStore,
            dateProvider: @escaping DateProvider,
            db: any DB,
            listMediaManager: BackupListMediaManager,
            logger: PrefixedLogger,
            mediaBandwidthPreferenceStore: MediaBandwidthPreferenceStore,
            progress: BackupAttachmentDownloadProgress,
            remoteConfigProvider: RemoteConfigProvider,
            statusManager: BackupAttachmentDownloadQueueStatusManager,
            tsAccountManager: TSAccountManager,
        ) {
            self.mode = mode
            self.attachmentStore = attachmentStore
            self.attachmentDownloadManager = attachmentDownloadManager
            self.attachmentUploadStore = attachmentUploadStore
            self.backupAttachmentDownloadStore = backupAttachmentDownloadStore
            self.backupAttachmentUploadScheduler = backupAttachmentUploadScheduler
            self.backupMediaErrorNotificationPresenter = backupMediaErrorNotificationPresenter
            self.backupSettingsStore = backupSettingsStore
            self.dateProvider = dateProvider
            self.db = db
            self.listMediaManager = listMediaManager
            self.logger = logger
            self.mediaBandwidthPreferenceStore = mediaBandwidthPreferenceStore
            self.progress = progress
            self.remoteConfigProvider = remoteConfigProvider
            self.statusManager = statusManager
            self.tsAccountManager = tsAccountManager

            self.store = TaskStore(
                mode: mode,
                backupAttachmentDownloadStore: backupAttachmentDownloadStore,
            )
        }

        func runTask(record: Store.Record, loader: TaskQueueLoader<TaskRunner>) async -> TaskRecordResult {
            struct SuspendedError: Error {}
            struct NeedsDiskSpaceError: Error {}
            struct NeedsBatteryError: Error {}
            struct AppBackgroundedError: Error {}
            struct NeedsInternetError: Error {}
            struct NeedsToBeRegisteredError: Error {}

            availableDiskSpaceCheck.requestNotify()

            let (status, statusToken) = await statusManager.currentStatusAndToken(for: mode)
            switch status {
            case .running:
                break
            case .empty:
                // The queue will stop on its own, finish this task.
                break
            case .suspended:
                try? await loader.stop()
                return .retryableError(SuspendedError())
            case .lowDiskSpace:
                try? await loader.stop()
                return .retryableError(NeedsDiskSpaceError())
            case .lowBattery, .lowPowerMode:
                try? await loader.stop()
                return .retryableError(NeedsBatteryError())
            case .appBackgrounded:
                try? await loader.stop()
                return .retryableError(AppBackgroundedError())
            case .noWifiReachability, .noReachability:
                try? await loader.stop()
                return .retryableError(NeedsInternetError())
            case .notRegisteredAndReady:
                try? await loader.stop()
                return .retryableError(NeedsToBeRegisteredError())
            }

            let (
                attachment,
                backupPlan,
                registrationState,
                needsListMedia,
            ) = db.read { tx -> (Attachment?, BackupPlan, TSRegistrationState, Bool) in
                return (
                    attachmentStore.fetch(id: record.record.attachmentRowId, tx: tx),
                    backupSettingsStore.backupPlan(tx: tx),
                    tsAccountManager.registrationState(tx: tx),
                    self.listMediaManager.getNeedsQueryListMedia(tx: tx),
                )
            }

            if needsListMedia {
                // If we need to list media, quit out early so we can do that.
                try? await loader.stop(reason: NeedsListMediaError())
                return .retryableError(NeedsListMediaError())
            }

            guard let attachment else {
                return .obsolete
            }

            let progressSink: OWSProgressSink?
            if record.record.isThumbnail {
                progressSink = nil
            } else {
                progressSink = await progress.willBeginDownloadingFullsizeAttachment(
                    withId: record.record.attachmentRowId,
                )
            }

            let nowMs = dateProvider().ows_millisecondsSince1970
            let remoteConfig = remoteConfigProvider.currentConfig()
            let eligibility = BackupAttachmentDownloadEligibility.forAttachment(
                attachment,
                downloadRecord: record.record,
                currentTimestamp: nowMs,
                backupPlan: backupPlan,
                remoteConfig: remoteConfig,
                isPrimaryDevice: registrationState.isRegisteredPrimaryDevice,
            )

            struct NoLongerEligibleError: Error {}
            let relevantEligibilityState: QueuedBackupAttachmentDownload.State? = {
                if record.record.isThumbnail {
                    return eligibility.thumbnailMediaTierState
                } else {
                    return eligibility.fullsizeState
                }
            }()
            switch relevantEligibilityState {
            case .ready:
                break
            case nil:
                // No longer at all eligible to download from this source.
                // count this as having completed the download for progress tracking purposes.
                if !record.record.isThumbnail {
                    await progress.didFinishDownloadOfFullsizeAttachment(
                        withId: record.record.attachmentRowId,
                        byteCount: UInt64(record.record.estimatedByteCount),
                    )
                }
                return .obsolete
            case .ineligible:
                // Current state prevents running this row; unclear how we
                // got here but mark it ineligible in the queue now and return
                // a a "retryable" error so we don't wipe this row from the queue.
                // Since its now ineligible it will be skipped going forward.
                Logger.info("Marking \(attachment.id) ineligible and skipping download")
                await db.awaitableWrite { tx in
                    backupAttachmentDownloadStore.markIneligible(
                        attachmentId: attachment.id,
                        thumbnail: record.record.isThumbnail,
                        tx: tx,
                    )
                }
                return .retryableError(NoLongerEligibleError())
            case .done:
                // All done! Mark done and treat it as a "retryable" error
                // so we don't wipe this row from the queue. Since its now
                // done it will be skipped going forward.
                await db.awaitableWrite { tx in
                    backupAttachmentDownloadStore.markDone(
                        attachmentId: attachment.id,
                        thumbnail: record.record.isThumbnail,
                        tx: tx,
                    )
                }
                // count this as having completed the download.
                if !record.record.isThumbnail {
                    await progress.didFinishDownloadOfFullsizeAttachment(
                        withId: record.record.attachmentRowId,
                        byteCount: UInt64(record.record.estimatedByteCount),
                    )
                }
                return .retryableError(NoLongerEligibleError())
            }

            let source: DownloadSource = {
                if record.record.isThumbnail {
                    return .mediaTierThumbnail
                }
                if
                    eligibility.fullsizeMediaTierState == .ready,
                    // Try media tier once first if available
                    record.record.numRetries == 0
                {
                    return .mediaTierFullsize
                } else if
                    let transitTierInfo = attachment.latestTransitTierInfo,
                    eligibility.fullsizeMediaTierState != .ready
                    || record.record.numRetries == 1,
                    eligibility.fullsizeTransitTierState == .ready
                {
                    // Otherwise try transit tier if media tier has failed once before.
                    return .transitTier(transitTierInfo)
                } else {
                    // And then fall back to media tier.
                    return .mediaTierFullsize
                }
            }()

            do {
                try await self.attachmentDownloadManager.downloadAttachment(
                    id: record.record.attachmentRowId,
                    priority: .backupRestore,
                    source: source.asSourceType,
                    progress: progressSink,
                )
            } catch let error {
                if Task.isCancelled {
                    logger.info("Cancelled; stopping the queue")
                    try? await loader.stop(reason: CancellationError())
                    return .retryableError(CancellationError())
                }

                switch await statusManager.jobDidExperienceError(error, token: statusToken, mode: mode) {
                case nil:
                    // No state change, keep going.
                    break
                case .running:
                    break
                case .empty:
                    // The queue will stop on its own, finish this task.
                    break
                case .suspended, .lowDiskSpace, .lowBattery, .lowPowerMode, .noWifiReachability, .noReachability, .appBackgrounded, .notRegisteredAndReady:
                    // Stop the queue now proactively.
                    try? await loader.stop()
                }

                switch error as? AttachmentDownloads.Error {
                case nil, .expiredCredentials:
                    break
                case .blockedByAutoDownloadSettings:
                    owsFailDebug("Backup downloads should never be blocked by auto download settings!")
                    // This should be impossible. Stop the queue, it can start up again later
                    // on whatever the next trigger is.
                    await backupMediaErrorNotificationPresenter.notifyIfNecessary()
                    try? await loader.stop()
                    return .retryableError(error)
                case .blockedByActiveCall:
                    // TODO: [Backups] suspend downloads during calls and resume after
                    owsFailDebug("Backup downloads should never be blocked by active calls!")
                    await backupMediaErrorNotificationPresenter.notifyIfNecessary()
                    try? await loader.stop()
                    return .retryableError(error)
                case .blockedByPendingMessageRequest:
                    switch source {
                    case .transitTier:
                        // Don't do transit tier downloads of message request state chats.
                        // When we restore transit tier download info from a backup, we don't
                        // know if the attachment had been previously downloaded (we'd back up
                        // the transit tier info even if it hadn't been) and, if it hadn't, we
                        // should not auto-download stuff in message request state.
                        return .unretryableError(error)
                    case .mediaTierFullsize, .mediaTierThumbnail:
                        owsFailDebug("Media tier downloads should never be blocked by message request state!")
                        await backupMediaErrorNotificationPresenter.notifyIfNecessary()
                        // This should be impossible. Stop the queue, it can start up again later
                        // on whatever the next trigger is.
                        try? await loader.stop()
                        return .retryableError(error)
                    }
                case .blockedByNetworkState:
                    // The attachment download is the thing that discovered incompatible reachability state
                    // (e.g. we need wifi and aren't connected). Stop the queue; the status should
                    // catch up momentarily and notify when reachability state changes.
                    Logger.warn("Download failed due to reachability; proactively stopping the queue")
                    try? await loader.stop()
                    return .retryableError(error)
                }

                // We only retry fullsize media tier 404s.
                // Retries work one of two ways: we first fall back to transit tier
                // if possible with no backoff, then only if we're on a linked device
                // we retry media tier with some delay. The latter retry is because the
                // primary might still be working on uploading the attachment to media tier.
                func canRetryMediaTier404() -> Bool {
                    db.read { tx in
                        guard tsAccountManager.registrationState(tx: tx).isPrimaryDevice == false else {
                            return false
                        }
                        switch backupSettingsStore.backupPlan(tx: tx) {
                        case .disabling, .disabled, .free:
                            // The primary would only be uploading if were paid tier.
                            // (this is inexact but the user can always tap to download)
                            return false
                        case .paid, .paidExpiringSoon, .paidAsTester:
                            break
                        }
                        guard let attachment = attachmentStore.fetch(id: record.record.attachmentRowId, tx: tx) else {
                            return false
                        }
                        return attachment.mediaTierInfo != nil
                            // If we had a cdn number, that came from the primary, and the
                            // primary therefore _thinks_ its uploaded, and won't upload again.
                            // That or we discovered this via list media and its gone now.
                            && attachment.mediaTierInfo?.cdnNumber == nil
                    }
                }

                if
                    !record.record.isThumbnail,
                    record.record.numRetries == 0,
                    eligibility.fullsizeTransitTierState == .ready,
                    source == .mediaTierFullsize
                {
                    // Retry as transit tier. If we wouldn't have retried as media tier anyway,
                    // wipe the media tier info so that we reupload in the future.
                    return .retryableError(RetryAsTransitTierError(
                        shouldWipeMediaTierInfo: error.httpStatusCode == 404 && !canRetryMediaTier404(),
                    ))
                } else if
                    error.httpStatusCode == 404,
                    !record.record.isThumbnail,
                    record.record.canDownloadFromMediaTier,
                    canRetryMediaTier404(),
                    let nextRetryTimestamp = { () -> UInt64? in
                        guard record.record.numRetries < 32 else {
                            owsFailDebug("Too many retries!")
                            return nil
                        }
                        // Exponential backoff, starting at 1 day.
                        let delay = OWSOperation.retryIntervalForExponentialBackoff(
                            failureCount: record.record.numRetries,
                            minAverageBackoff: .day,
                            maxAverageBackoff: .day * 30,
                        )
                        return dateProvider().addingTimeInterval(delay).ows_millisecondsSince1970
                    }()
                {
                    return .retryableError(RetryMediaTierError(nextRetryTimestamp: nextRetryTimestamp))
                } else if error.httpStatusCode == 404 {
                    return .unretryableError(Unretryable404Error(source: source))
                } else if error.is5xxServiceResponse || error.isNetworkFailureOrTimeout {
                    // These suspend the queue status so just treat the row as retryable
                    return .retryableError(error)
                } else {
                    return .unretryableError(error)
                }
            }

            await statusManager.jobDidSucceed(token: statusToken, mode: mode)

            if !record.record.isThumbnail {
                await progress.didFinishDownloadOfFullsizeAttachment(
                    withId: record.record.attachmentRowId,
                    byteCount: UInt64(record.record.estimatedByteCount),
                )
            }

            return .success
        }

        func didSucceed(record: Store.Record, tx: DBWriteTransaction) {
            logger.info("Finished restoring attachment \(record.record.attachmentRowId), download \(record.id), isThumbnail: \(record.record.isThumbnail)")
            // Mark the record done when we succeed; this will filter it out
            // from future queue pop/peek operations.
            backupAttachmentDownloadStore.markDone(
                attachmentId: record.record.attachmentRowId,
                thumbnail: record.record.isThumbnail,
                tx: tx,
            )
        }

        private struct RetryMediaTierError: Error {
            let nextRetryTimestamp: UInt64
        }

        private struct Retry5xxError: Error {
            let nextRetryTimestamp: UInt64
        }

        private struct RetryAsTransitTierError: Error {
            let shouldWipeMediaTierInfo: Bool
        }

        private enum DownloadSource: Equatable {
            case transitTier(Attachment.TransitTierInfo)
            case mediaTierFullsize
            case mediaTierThumbnail

            var asSourceType: QueuedAttachmentDownloadRecord.SourceType {
                return switch self {
                case .transitTier: .transitTier
                case .mediaTierFullsize: .mediaTierFullsize
                case .mediaTierThumbnail: .mediaTierThumbnail
                }
            }
        }

        private struct Unretryable404Error: Error {
            let source: DownloadSource
        }

        func didFail(record: Store.Record, error: any Error, isRetryable: Bool, tx: DBWriteTransaction) {
            logger.warn("Failed restoring attachment \(record.record.attachmentRowId), download \(record.id), isRetryable: \(isRetryable), isThumbnail: \(record.record.isThumbnail), error: \(error)")

            if
                isRetryable,
                let nextRetryTimestamp =
                (error as? RetryMediaTierError)?.nextRetryTimestamp
                    ?? (error as? Retry5xxError)?.nextRetryTimestamp
            {
                var downloadRecord = record.record
                downloadRecord.minRetryTimestamp = nextRetryTimestamp
                downloadRecord.numRetries += 1
                failIfThrows {
                    try downloadRecord.update(tx.database)
                }
            } else if
                isRetryable,
                let error = error as? RetryAsTransitTierError
            {
                // Just increment the retry count by 1 but don't update
                // the retry timestamp so we retry immediately as transit tier.
                var downloadRecord = record.record
                downloadRecord.numRetries += 1
                failIfThrows {
                    try downloadRecord.update(tx.database)
                }

                if
                    error.shouldWipeMediaTierInfo,
                    let attachment = attachmentStore.fetch(id: record.record.attachmentRowId, tx: tx)
                {
                    attachmentStore.removeMediaTierInfo(
                        attachment: attachment,
                        tx: tx,
                    )
                    backupAttachmentUploadScheduler.enqueueUsingHighestPriorityOwnerIfNeeded(
                        attachment,
                        mode: .fullsize,
                        tx: tx,
                    )
                }
            } else if !isRetryable {
                backupAttachmentDownloadStore.remove(
                    attachmentId: record.record.attachmentRowId,
                    thumbnail: record.record.isThumbnail,
                    tx: tx,
                )
                // For non-retryable 404 errors, go ahead and wipe the relevant cdn
                // info from the attachment, as download failed.
                if let error = error as? Unretryable404Error {
                    guard
                        let attachment = attachmentStore.fetch(
                            id: record.record.attachmentRowId,
                            tx: tx,
                        )
                    else {
                        owsFailDebug("Missing attachment!")
                        return
                    }

                    switch error.source {
                    case .mediaTierThumbnail:
                        attachmentStore.removeThumbnailMediaTierInfo(
                            attachment: attachment,
                            tx: tx,
                        )
                        backupAttachmentUploadScheduler.enqueueUsingHighestPriorityOwnerIfNeeded(
                            attachment,
                            mode: .thumbnail,
                            tx: tx,
                        )
                    case .mediaTierFullsize:
                        attachmentStore.removeMediaTierInfo(
                            attachment: attachment,
                            tx: tx,
                        )
                        backupAttachmentUploadScheduler.enqueueUsingHighestPriorityOwnerIfNeeded(
                            attachment,
                            mode: .fullsize,
                            tx: tx,
                        )
                    case .transitTier(let transitTierInfo):
                        attachmentStore.removeTransitTierInfo(
                            transitTierInfo,
                            attachment: attachment,
                            tx: tx,
                        )
                    }
                }
            } else {
                // Do nothing for other retryable errors.
            }
        }

        func didObsolete(record: Store.Record, tx: DBWriteTransaction) {
            logger.warn("Obsoleted restoring attachment \(record.record.attachmentRowId), download \(record.id), isThumbnail: \(record.record.isThumbnail)")
            backupAttachmentDownloadStore.remove(
                attachmentId: record.record.attachmentRowId,
                thumbnail: record.record.isThumbnail,
                tx: tx,
            )
        }

        func didDrainQueue() async {
            switch mode {
            case .thumbnail:
                Logger.info("Did drain thumbnail queue")
            case .fullsize:
                Logger.info("Did drain fullsize queue")
                await progress.didEmptyFullsizeDownloadQueue()
            }
            await statusManager.didEmptyQueue(for: mode)
            switch mode {
            case .thumbnail:
                break
            case .fullsize:
                await db.awaitableWrite { tx in
                    // Go ahead and delete all done rows to reset the byte count.
                    // This isn't load-bearing, but its nice to do just in case
                    // some new download gets added it can just count up to its own
                    // total.
                    backupAttachmentDownloadStore.deleteAllDone(tx: tx)
                }
            }
        }
    }

    // MARK: - TaskRecordStore

    struct TaskRecord: SignalServiceKit.TaskRecord {
        let id: QueuedBackupAttachmentDownload.IDType
        let record: QueuedBackupAttachmentDownload
        let nextRetryTimestamp: UInt64?
    }

    class TaskStore: TaskRecordStore {

        private let backupAttachmentDownloadStore: BackupAttachmentDownloadStore

        private let mode: BackupAttachmentDownloadQueueMode

        init(
            mode: BackupAttachmentDownloadQueueMode,
            backupAttachmentDownloadStore: BackupAttachmentDownloadStore,
        ) {
            self.backupAttachmentDownloadStore = backupAttachmentDownloadStore
            self.mode = mode
        }

        func peek(count: UInt, tx: DBReadTransaction) -> [TaskRecord] {
            let forThumbnailDownloads = switch mode {
            case .thumbnail: true
            case .fullsize: false
            }
            return backupAttachmentDownloadStore.peek(
                count: count,
                isThumbnail: forThumbnailDownloads,
                tx: tx,
            ).map { record in
                return TaskRecord(
                    id: record.id!,
                    record: record,
                    nextRetryTimestamp: record.minRetryTimestamp,
                )
            }
        }

        func removeRecord(_ record: TaskRecord, tx: DBWriteTransaction) throws {
            // Rather than remove when we finish running a record, we mark it done
            // instead in the success callback, and delete it in failure callbacks.
            // So we do nothing here on purpose.
        }
    }

    // MARK: -

    private enum Constants {
        static let numParallelDownloadsFullsize: UInt = 12
        static let numParallelDownloadsThumbnail: UInt = 8
    }
}

#if TESTABLE_BUILD

open class BackupAttachmentDownloadQueueRunnerMock: BackupAttachmentDownloadQueueRunner {

    public init() {}

    public func restoreAttachmentsIfNeeded(mode: BackupAttachmentDownloadQueueMode) async throws {
        // Do nothing
    }

    public func backupPlanDidChange(
        from oldPlan: BackupPlan,
        to newPlan: BackupPlan,
        tx: DBWriteTransaction,
    ) throws {
        // Do nothing
    }

    public func prepareToDisableBackups(currentBackupPlan: BackupPlan, tx: DBWriteTransaction) throws {
        // Do nothing
    }
}

#endif