Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
signalapp
GitHub Repository: signalapp/Signal-iOS
Path: blob/main/SignalServiceKit/Backups/Attachments/BackupAttachmentUploadQueueRunner.swift
1 views
//
// Copyright 2025 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//

import Foundation
import LibSignalClient

public protocol BackupAttachmentUploadQueueRunner {

    /// Backs up all pending attachments in the BackupAttachmentUploadQueue.
    ///
    /// Will keep backing up attachments until there are none left, then returns.
    /// Is cooperatively cancellable; will check and early terminate if the task is cancelled
    /// in between individual attachments.
    ///
    /// Returns immediately if there's no attachments left to back up.
    ///
    /// Each individual attachments is either freshly uploaded or copied from the transit
    /// tier to the media tier as needed. Thumbnail versions are also created, uploaded, and
    /// backed up as needed.
    ///
    /// Throws an error IFF something would prevent all attachments from backing up (e.g. network issue).
    func backUpAllAttachments(mode: BackupAttachmentUploadQueueMode) async throws
}

class BackupAttachmentUploadQueueRunnerImpl: BackupAttachmentUploadQueueRunner {

    private let accountKeyStore: AccountKeyStore
    private let attachmentStore: AttachmentStore
    private let backupAttachmentUploadScheduler: BackupAttachmentUploadScheduler
    private let backupAttachmentUploadStore: BackupAttachmentUploadStore
    private let backupMediaErrorNotificationPresenter: BackupMediaErrorNotificationPresenter
    private let backupRequestManager: BackupRequestManager
    private let backupSettingsStore: BackupSettingsStore
    private let db: any DB
    private let logger: PrefixedLogger
    private let progress: BackupAttachmentUploadProgress
    private let statusManager: BackupAttachmentUploadQueueStatusManager
    private let tsAccountManager: TSAccountManager

    /// We keep these two separate because we allow more thumbnails in parallel
    /// than fullsize, so we just run them as separate queues configured at init time
    /// but sharing the same runner class.
    private let fullsizeTaskQueue: TaskQueueLoader<TaskRunner>
    private let thumbnailTaskQueue: TaskQueueLoader<TaskRunner>

    init(
        accountKeyStore: AccountKeyStore,
        attachmentStore: AttachmentStore,
        attachmentUploadManager: AttachmentUploadManager,
        backupAttachmentUploadScheduler: BackupAttachmentUploadScheduler,
        backupAttachmentUploadStore: BackupAttachmentUploadStore,
        backupAttachmentUploadEraStore: BackupAttachmentUploadEraStore,
        backupListMediaManager: BackupListMediaManager,
        backupMediaErrorNotificationPresenter: BackupMediaErrorNotificationPresenter,
        backupRequestManager: BackupRequestManager,
        backupSettingsStore: BackupSettingsStore,
        dateProvider: @escaping DateProvider,
        db: any DB,
        notificationPresenter: NotificationPresenter,
        orphanedBackupAttachmentStore: OrphanedBackupAttachmentStore,
        progress: BackupAttachmentUploadProgress,
        statusManager: BackupAttachmentUploadQueueStatusManager,
        tsAccountManager: TSAccountManager,
    ) {
        self.accountKeyStore = accountKeyStore
        self.attachmentStore = attachmentStore
        self.backupAttachmentUploadScheduler = backupAttachmentUploadScheduler
        self.backupAttachmentUploadStore = backupAttachmentUploadStore
        self.backupMediaErrorNotificationPresenter = backupMediaErrorNotificationPresenter
        self.backupRequestManager = backupRequestManager
        self.backupSettingsStore = backupSettingsStore
        self.db = db
        let logger = PrefixedLogger(prefix: "[Backups]")
        self.logger = logger
        self.progress = progress
        self.statusManager = statusManager
        self.tsAccountManager = tsAccountManager

        func makeTaskQueue(mode: BackupAttachmentUploadQueueMode) -> TaskQueueLoader<TaskRunner> {
            let taskRunner = TaskRunner(
                mode: mode,
                accountKeyStore: accountKeyStore,
                attachmentStore: attachmentStore,
                attachmentUploadManager: attachmentUploadManager,
                backupAttachmentUploadScheduler: backupAttachmentUploadScheduler,
                backupAttachmentUploadStore: backupAttachmentUploadStore,
                backupAttachmentUploadEraStore: backupAttachmentUploadEraStore,
                backupMediaErrorNotificationPresenter: backupMediaErrorNotificationPresenter,
                backupRequestManager: backupRequestManager,
                backupSettingsStore: backupSettingsStore,
                dateProvider: dateProvider,
                db: db,
                listMediaManager: backupListMediaManager,
                logger: logger,
                notificationPresenter: notificationPresenter,
                orphanedBackupAttachmentStore: orphanedBackupAttachmentStore,
                progress: progress,
                statusManager: statusManager,
                tsAccountManager: tsAccountManager,
            )
            return TaskQueueLoader(
                maxConcurrentTasks: {
                    switch mode {
                    case .fullsize: Constants.numParallelUploadsFullsize
                    case .thumbnail: Constants.numParallelUploadsThumbnail
                    }
                }(),
                dateProvider: dateProvider,
                db: db,
                runner: taskRunner,
            )
        }

        self.fullsizeTaskQueue = makeTaskQueue(mode: .fullsize)
        self.thumbnailTaskQueue = makeTaskQueue(mode: .thumbnail)
    }

    // MARK: -

    func backUpAllAttachments(mode: BackupAttachmentUploadQueueMode) async throws {
        let taskQueue: TaskQueueLoader<TaskRunner>
        let logString: String
        switch mode {
        case .fullsize:
            taskQueue = fullsizeTaskQueue
            logString = "fullsize"
        case .thumbnail:
            taskQueue = thumbnailTaskQueue
            logString = "thumbnail"
        }

        let (isRegisteredPrimary, localAci, backupPlan, backupKey) = db.read { tx in
            (
                self.tsAccountManager.registrationState(tx: tx).isRegisteredPrimaryDevice,
                self.tsAccountManager.localIdentifiers(tx: tx)?.aci,
                backupSettingsStore.backupPlan(tx: tx),
                accountKeyStore.getMediaRootBackupKey(tx: tx),
            )
        }

        guard isRegisteredPrimary, let localAci else {
            return
        }

        guard let backupKey else {
            logger.info("Skipping \(logString) attachment backups while media backup key is missing")
            return
        }

        switch backupPlan {
        case .disabled, .disabling, .free:
            return
        case .paid, .paidExpiringSoon, .paidAsTester:
            break
        }

        let backupAuth: BackupServiceAuth
        do {
            // If we have no credentials, or a free credential, we want to fetch new
            // credentials, as we think we're paid tier according to local state.
            // We'll need the paid credential to upload in each task; we load and cache
            // it now so its available (and so we can bail early if its somehow free tier).
            backupAuth = try await backupRequestManager.fetchBackupServiceAuth(
                for: backupKey,
                localAci: localAci,
                auth: .implicit(),
                forceRefreshUnlessCachedPaidCredential: true,
                logger: logger,
            )
        } catch let error as BackupAuthCredentialFetchError {
            switch error {
            case .noExistingBackupId:
                // If we have no backup, we sure won't be uploading.
                logger.info("Bailing on \(logString) attachment backups when backup id not registered")
                return
            }
        } catch let error {
            throw error
        }

        switch backupAuth.backupLevel {
        case .free:
            Logger.warn("Local backupPlan is paid but credential is free")
            await backupMediaErrorNotificationPresenter.notifyIfNecessary()
            try? await taskQueue.stop()
            return
        case .paid:
            break
        }

        switch await statusManager.beginObservingIfNecessary(for: mode) {
        case .running:
            logger.info("Running \(logString) Backup uploads.")
            let backgroundTask = OWSBackgroundTask(
                label: #function + logString,
            ) { [weak taskQueue] status in
                switch status {
                case .expired:
                    Task {
                        try await taskQueue?.stop()
                    }
                case .couldNotStart, .success:
                    break
                }
            }
            defer { backgroundTask.end() }
            try await taskQueue.loadAndRunTasks()
            logger.info("Finished \(logString) Backup uploads.")
        case .suspended:
            logger.info("Skipping \(logString) Backup uploads: suspende by user.")
            try await taskQueue.stop()
        case .empty:
            logger.info("Skipping \(logString) Backup uploads: queue is empty.")
            return
        case .notRegisteredAndReady:
            logger.warn("Skipping \(logString) Backup uploads: not registered and ready.")
            try await taskQueue.stop()
        case .noWifiReachability:
            logger.warn("Skipping \(logString) Backup uploads: need wifi.")
            try await taskQueue.stop()
        case .noReachability:
            logger.warn("Skipping \(logString) Backup uploads: need internet.")
            try await taskQueue.stop()
        case .lowBattery:
            logger.warn("Skipping \(logString) Backup uploads: low battery.")
            try await taskQueue.stop()
        case .lowPowerMode:
            logger.warn("Skipping \(logString) Backup uploads: low power mode.")
            try await taskQueue.stop()
        case .appBackgrounded:
            logger.warn("Skipping \(logString) Backup uploads: app backgrounded")
            try await taskQueue.stop()
        case .hasConsumedMediaTierCapacity:
            logger.warn("Skipping \(logString) Backup uploads: out of capacity")
            try await taskQueue.stop()
        }
    }

    // MARK: - TaskRecordRunner

    private final class TaskRunner: TaskRecordRunner {

        private let accountKeyStore: AccountKeyStore
        private let attachmentStore: AttachmentStore
        private let attachmentUploadManager: AttachmentUploadManager
        private let backupAttachmentUploadScheduler: BackupAttachmentUploadScheduler
        private let backupAttachmentUploadStore: BackupAttachmentUploadStore
        private let backupAttachmentUploadEraStore: BackupAttachmentUploadEraStore
        private let backupRequestManager: BackupRequestManager
        private let backupSettingsStore: BackupSettingsStore
        private let dateProvider: DateProvider
        private let db: any DB
        private let backupMediaErrorNotificationPresenter: BackupMediaErrorNotificationPresenter
        private let listMediaManager: BackupListMediaManager
        private let logger: PrefixedLogger
        private let notificationPresenter: NotificationPresenter
        private let orphanedBackupAttachmentStore: OrphanedBackupAttachmentStore
        private let progress: BackupAttachmentUploadProgress
        private let statusManager: BackupAttachmentUploadQueueStatusManager
        private let tsAccountManager: TSAccountManager

        let mode: BackupAttachmentUploadQueueMode
        let store: TaskStore

        init(
            mode: BackupAttachmentUploadQueueMode,
            accountKeyStore: AccountKeyStore,
            attachmentStore: AttachmentStore,
            attachmentUploadManager: AttachmentUploadManager,
            backupAttachmentUploadScheduler: BackupAttachmentUploadScheduler,
            backupAttachmentUploadStore: BackupAttachmentUploadStore,
            backupAttachmentUploadEraStore: BackupAttachmentUploadEraStore,
            backupMediaErrorNotificationPresenter: BackupMediaErrorNotificationPresenter,
            backupRequestManager: BackupRequestManager,
            backupSettingsStore: BackupSettingsStore,
            dateProvider: @escaping DateProvider,
            db: any DB,
            listMediaManager: BackupListMediaManager,
            logger: PrefixedLogger,
            notificationPresenter: NotificationPresenter,
            orphanedBackupAttachmentStore: OrphanedBackupAttachmentStore,
            progress: BackupAttachmentUploadProgress,
            statusManager: BackupAttachmentUploadQueueStatusManager,
            tsAccountManager: TSAccountManager,
        ) {
            self.accountKeyStore = accountKeyStore
            self.attachmentStore = attachmentStore
            self.attachmentUploadManager = attachmentUploadManager
            self.backupAttachmentUploadScheduler = backupAttachmentUploadScheduler
            self.backupAttachmentUploadStore = backupAttachmentUploadStore
            self.backupAttachmentUploadEraStore = backupAttachmentUploadEraStore
            self.backupMediaErrorNotificationPresenter = backupMediaErrorNotificationPresenter
            self.backupRequestManager = backupRequestManager
            self.backupSettingsStore = backupSettingsStore
            self.dateProvider = dateProvider
            self.db = db
            self.listMediaManager = listMediaManager
            self.logger = logger
            self.notificationPresenter = notificationPresenter
            self.orphanedBackupAttachmentStore = orphanedBackupAttachmentStore
            self.progress = progress
            self.statusManager = statusManager
            self.tsAccountManager = tsAccountManager

            self.mode = mode
            self.store = TaskStore(
                mode: mode,
                backupAttachmentUploadStore: backupAttachmentUploadStore,
            )
        }

        func runTask(record: Store.Record, loader: TaskQueueLoader<TaskRunner>) async -> TaskRecordResult {
            struct ExplicitlySuspendedError: Error {}
            struct NeedsBatteryError: Error {}
            struct NeedsInternetError: Error {}
            struct NeedsToBeRegisteredError: Error {}
            struct AppBackgroundedError: Error {}

            switch await statusManager.currentStatus(for: mode) {
            case .running:
                break
            case .empty:
                // The queue will stop on its own, finish this task.
                break
            case .suspended:
                try? await loader.stop()
                return .retryableError(ExplicitlySuspendedError())
            case .lowBattery, .lowPowerMode:
                try? await loader.stop()
                return .retryableError(NeedsBatteryError())
            case .noWifiReachability, .noReachability:
                try? await loader.stop()
                return .retryableError(NeedsInternetError())
            case .notRegisteredAndReady:
                try? await loader.stop()
                return .retryableError(NeedsToBeRegisteredError())
            case .appBackgrounded:
                try? await loader.stop()
                return .retryableError(AppBackgroundedError())
            case .hasConsumedMediaTierCapacity:
                try? await loader.stop()
                return .retryableError(OutOfCapacityError())
            }

            let (
                attachment,
                backupPlan,
                currentUploadEra,
                backupKey,
                needsListMedia,
            ) = db.read { tx in
                return (
                    self.attachmentStore.fetch(id: record.record.attachmentRowId, tx: tx),
                    self.backupSettingsStore.backupPlan(tx: tx),
                    self.backupAttachmentUploadEraStore.currentUploadEra(tx: tx),
                    self.accountKeyStore.getMediaRootBackupKey(tx: tx),
                    self.listMediaManager.getNeedsQueryListMedia(tx: tx),
                )
            }

            if needsListMedia {
                // If we need to list media, quit out early so we can do that.
                try? await loader.stop(reason: NeedsListMediaError())
                return .retryableError(NeedsListMediaError())
            }

            guard let attachment else {
                // Attachment got deleted; early exit.
                return .obsolete
            }

            guard attachment.asStream() != nil, let mediaName = attachment.mediaName else {
                // We only back up attachments we've downloaded (streams)
                return .obsolete
            }

            guard let backupKey else {
                owsFailDebug("Missing media backup key.  Unable to upload attachments.", logger: logger)
                return .obsolete
            }

            // We're about to upload; ensure we aren't also enqueuing a media tier delete.
            // This is only defensive as we should be cancelling any deletes any time we
            // create an attachmenr stream and enqueue an upload to begin with.
            do {
                try await db.awaitableWrite { tx in
                    if record.record.isFullsize {
                        let mediaId = try backupKey.mediaEncryptionMetadata(
                            mediaName: mediaName,
                            // Doesn't matter what we use, we just want the mediaId
                            type: .outerLayerFullsizeOrThumbnail,
                        ).mediaId
                        orphanedBackupAttachmentStore.removeFullsize(
                            mediaName: mediaName,
                            fullsizeMediaId: mediaId,
                            tx: tx,
                        )
                    } else {
                        let mediaId = try backupKey.mediaEncryptionMetadata(
                            mediaName: AttachmentBackupThumbnail.thumbnailMediaName(fullsizeMediaName: mediaName),
                            // Doesn't matter what we use, we just want the mediaId
                            type: .outerLayerFullsizeOrThumbnail,
                        ).mediaId
                        orphanedBackupAttachmentStore.removeThumbnail(
                            fullsizeMediaName: mediaName,
                            thumbnailMediaId: mediaId,
                            tx: tx,
                        )
                    }
                }
            } catch {
                owsFailDebug("Unable to delete orphan row. Proceeding anyway.")
            }

            struct IsFreeTierError: Error {}
            switch backupPlan {
            case .disabled, .disabling, .free:
                try? await loader.stop()
                return .retryableError(IsFreeTierError())
            case .paid, .paidExpiringSoon, .paidAsTester:
                break
            }

            let localAci: Aci
            let isPrimary: Bool
            do throws(NotRegisteredError) {
                let registeredState = try self.tsAccountManager.registeredStateWithMaybeSneakyTransaction()
                localAci = registeredState.localIdentifiers.aci
                isPrimary = registeredState.isPrimary
            } catch {
                try? await loader.stop(reason: error)
                return .retryableError(error)
            }
            guard isPrimary else {
                let error = OWSAssertionError("not primary")
                try? await loader.stop(reason: error)
                return .retryableError(error)
            }

            let backupAuth: BackupServiceAuth
            do {
                backupAuth = try await backupRequestManager.fetchBackupServiceAuth(
                    for: backupKey,
                    localAci: localAci,
                    auth: .implicit(),
                    // No need to force it here; when we start up the queue we force
                    // a fetch, and should've bailed and never gotten this far if it
                    // was a free tier credential. (Though the paid state and credential
                    // can change _after_ this queue has already started running, so we
                    // do need to handle that case).
                    forceRefreshUnlessCachedPaidCredential: false,
                    logger: logger,
                )
            } catch let error {
                try? await loader.stop(reason: error)
                return .retryableError(error)
            }

            switch backupAuth.backupLevel {
            case .paid:
                break
            case .free:
                // If we find ourselves with a free tier credential,
                // all uploads will fail. Just quit.
                try? await loader.stop()
                await backupMediaErrorNotificationPresenter.notifyIfNecessary()
                return .retryableError(IsFreeTierError())
            }

            let progressSink: OWSProgressSink?
            if record.record.isFullsize {
                progressSink = await progress.willBeginUploadingFullsizeAttachment(
                    uploadRecord: record.record,
                )
            } else {
                progressSink = nil
            }

            guard
                db.read(block: { tx in
                    backupAttachmentUploadScheduler.isEligibleToUpload(
                        attachment,
                        mode: record.record.isFullsize ? .fullsize : .thumbnail,
                        currentUploadEra: currentUploadEra,
                        tx: tx,
                    )
                })
            else {
                if record.record.isFullsize {
                    await progress.didFinishUploadOfFullsizeAttachment(
                        uploadRecord: record.record,
                    )
                }
                // Not eligible anymore, count as success.
                return .success
            }

            do {
                if record.record.isFullsize {
                    try await attachmentUploadManager.uploadMediaTierAttachment(
                        attachmentId: attachment.id,
                        uploadEra: currentUploadEra,
                        localAci: localAci,
                        backupKey: backupKey,
                        auth: backupAuth,
                        progress: progressSink,
                    )
                } else {
                    try await attachmentUploadManager.uploadMediaTierThumbnailAttachment(
                        attachmentId: attachment.id,
                        uploadEra: currentUploadEra,
                        localAci: localAci,
                        backupKey: backupKey,
                        auth: backupAuth,
                        progress: nil,
                    )
                }
            } catch let error {
                if Task.isCancelled {
                    logger.info("Cancelled; stopping the queue")
                    try? await loader.stop(reason: CancellationError())
                    return .retryableError(CancellationError())
                }

                switch error as? BackupArchive.Response.CopyToMediaTierError {
                case .sourceObjectNotFound, .badArgument:
                    // Any time we find this error, retry. It means the upload
                    // expired, and so the copy failed. This is always transient
                    // and can always be fixed by reuploading, don't even increment
                    // the retry count.
                    return .retryableError(error)
                case .forbidden:
                    // This only happens if we've lost write access to the media tier.
                    // As a final check, force refresh our crednetial and check if its
                    // paid. If its not (we just got a 403 so that's what we expect),
                    // all uploads will fail so dequeue them and quit.
                    let credential = try? await backupRequestManager.fetchBackupServiceAuth(
                        for: backupKey,
                        localAci: localAci,
                        auth: .implicit(),
                        forceRefreshUnlessCachedPaidCredential: true,
                        logger: logger,
                    )
                    switch credential?.backupLevel {
                    case .free, nil:
                        await backupMediaErrorNotificationPresenter.notifyIfNecessary()
                        try? await loader.stop()
                        return .retryableError(IsFreeTierError())
                    case .paid:
                        return .retryableError(OWSGenericError("Refreshed credential is paid: should retry upload."))
                    }
                case .outOfCapacity:
                    let didSetConsumeMediaTierCapacity = await db.awaitableWrite { tx in
                        if !backupSettingsStore.hasConsumedMediaTierCapacity(tx: tx) {
                            backupSettingsStore.setHasConsumedMediaTierCapacity(true, tx: tx)
                            return true
                        } else {
                            return false
                        }
                    }
                    if didSetConsumeMediaTierCapacity {
                        await MainActor.run { [notificationPresenter] in
                            notificationPresenter.notifyUserOfMediaTierQuotaConsumed()
                        }
                    }
                    let error = OutOfCapacityError()
                    try? await loader.stop(reason: error)
                    return .retryableError(error)
                default:
                    // All other errors should be treated as per normal.
                    if error.httpStatusCode == 429 {
                        if let retryAfter = error.httpResponseHeaders?.retryAfterTimeInterval {
                            return .retryableError(RateLimitedRetryError(retryAfter: retryAfter))
                        }

                        // If for whatever reason we don't have a retry-after,
                        // treat this like a network error that retries with
                        // backoff.
                        return .retryableError(NetworkRetryError())
                    } else if
                        error.isNetworkFailureOrTimeout
                        // Retry 500s per-item with the same backoff as network errors
                        || error.is5xxServiceResponse
                        || (error as? Upload.Error) == .networkTimeout
                        || (error as? Upload.Error) == .networkError
                    {
                        switch await statusManager.currentStatus(for: mode) {
                        case .running:
                            // If we _think_ we are connected and should be running,
                            // use a more crude retry time mechanism to retry later.
                            // Note that we update the individual row but really this
                            // will end up holding up the entire queue because we don't
                            // reorder when popping off the queue based on retry time,
                            // so this row will remain first in line (unless something else
                            // changes, in which case we retry and either succeed or fall back
                            // into here) and block the rest of the queue from trying,
                            // which is what we want because the error is a general network
                            // issue.
                            return .retryableError(NetworkRetryError())
                        case .noWifiReachability, .notRegisteredAndReady, .hasConsumedMediaTierCapacity,
                             .lowBattery, .lowPowerMode, .appBackgrounded, .empty, .suspended:
                            // These other states may be overriding reachability;
                            // just allow the queue itself to retry and once the
                            // other states are resolved reachability will kick in,
                            // or won't.
                            fallthrough
                        case .noReachability:
                            // If reachability thinks we are not connected, queue status
                            // will cover us. Don't touch the record itself; the queue will stop
                            // running and start again once reconnected, and we want to try
                            // the record again immediately then.
                            return .retryableError(error)
                        }
                    } else if let uploadError = error as? Upload.Error {
                        switch uploadError {
                        case .missingFile:
                            // The file is missing! We can never retry this upload;
                            // call it a "success" so we don't mess with progress
                            // state and so we wipe the upload task row, and move on.
                            logger.error("Missing attachment file; skipping and proceeding")
                            if record.record.isFullsize {
                                await progress.didFinishUploadOfFullsizeAttachment(
                                    uploadRecord: record.record,
                                )
                            }
                            return .success
                        case .uploadFailure(let recovery):
                            switch recovery {
                            case .resume(let retryMode), .restart(let retryMode):
                                switch retryMode {
                                case .afterBackoff:
                                    return .retryableError(RateLimitedRetryError(retryAfter: nil))
                                case .afterServerRequestedDelay(let retryAfter):
                                    return .retryableError(RateLimitedRetryError(retryAfter: retryAfter))
                                case .immediately:
                                    return .retryableError(RateLimitedRetryError(retryAfter: 0))
                                }
                            case .noMoreRetries:
                                logger.error("No more upload retries; stopping the queue")
                                await backupMediaErrorNotificationPresenter.notifyIfNecessary()
                                try? await loader.stop()
                                return .retryableError(error)
                            }
                        default:
                            // For other errors stop the queue to prevent thundering herd;
                            // when it starts up again (e.g. on app launch) we will retry.
                            logger.error("Unknown error occurred; stopping the queue. \(error)")
                            await backupMediaErrorNotificationPresenter.notifyIfNecessary()
                            try? await loader.stop()
                            return .retryableError(error)
                        }
                    } else if record.record.isFullsize {
                        // For other errors stop the queue to prevent thundering herd;
                        // when it starts up again (e.g. on app launch) we will retry.
                        logger.error("Unknown error occurred; stopping the queue")
                        await backupMediaErrorNotificationPresenter.notifyIfNecessary()
                        try? await loader.stop()
                        return .retryableError(error)
                    } else {
                        // Ignore the error if we e.g. fail to generate a thumbnail;
                        // just upload the fullsize.
                        logger.error("Failed to upload thumbnail; proceeding")
                        if record.record.isFullsize {
                            await progress.didFinishUploadOfFullsizeAttachment(
                                uploadRecord: record.record,
                            )
                        }
                        return .success
                    }
                }
            }

            if record.record.isFullsize {
                await progress.didFinishUploadOfFullsizeAttachment(
                    uploadRecord: record.record,
                )
            }

            return .success
        }

        func didSucceed(record: Store.Record, tx: DBWriteTransaction) throws {
            logger.info("Finished backing up attachment \(record.record.attachmentRowId), upload \(record.id), fullsize? \(record.record.isFullsize)")
        }

        private struct RateLimitedRetryError: Error {
            let retryAfter: TimeInterval?
        }

        private struct NetworkRetryError: Error {}
        private struct OutOfCapacityError: Error {}

        func didFail(record: Store.Record, error: any Error, isRetryable: Bool, tx: DBWriteTransaction) throws {
            logger.warn("Failed backing up attachment \(record.record.attachmentRowId), upload \(record.id), fullsize? \(record.record.isFullsize), isRetryable: \(isRetryable), error: \(error)")

            guard isRetryable else {
                return
            }

            var record = record.record
            let retryDelay: TimeInterval

            if error is NetworkRetryError {
                record.numRetries += 1
                retryDelay = OWSOperation.retryIntervalForExponentialBackoff(failureCount: record.numRetries)
            } else if let rateLimitedError = error as? RateLimitedRetryError {
                if let retryAfter = rateLimitedError.retryAfter {
                    retryDelay = retryAfter
                } else {
                    // If no delay provided, use standard backoff and increment retry count.
                    record.numRetries += 1
                    retryDelay = OWSOperation.retryIntervalForExponentialBackoff(failureCount: record.numRetries)
                }
            } else {
                return
            }

            record.minRetryTimestamp = dateProvider().addingTimeInterval(retryDelay).ows_millisecondsSince1970
            try record.update(tx.database)
        }

        func didObsolete(record: Store.Record, tx: DBWriteTransaction) throws {
            logger.warn("Obsoleted backing up attachment \(record.record.attachmentRowId), upload \(record.id), fullsize? \(record.record.isFullsize)")
        }

        func didDrainQueue() async {
            switch mode {
            case .fullsize:
                logger.info("Did drain fullsize upload queue")
                await progress.didEmptyFullsizeUploadQueue()
            case .thumbnail:
                logger.info("Did drain thumbnail upload queue")
            }
            await statusManager.didEmptyQueue(for: mode)
        }
    }

    // MARK: - TaskRecordStore

    struct TaskRecord: SignalServiceKit.TaskRecord {
        let id: Int64
        let record: QueuedBackupAttachmentUpload

        var nextRetryTimestamp: UInt64? {
            return record.minRetryTimestamp
        }
    }

    class TaskStore: TaskRecordStore {

        private let mode: BackupAttachmentUploadQueueMode
        private let backupAttachmentUploadStore: BackupAttachmentUploadStore

        init(
            mode: BackupAttachmentUploadQueueMode,
            backupAttachmentUploadStore: BackupAttachmentUploadStore,
        ) {
            self.mode = mode
            self.backupAttachmentUploadStore = backupAttachmentUploadStore
        }

        func peek(count: UInt, tx: DBReadTransaction) -> [TaskRecord] {
            let forFullsizeUploads = switch mode {
            case .fullsize:
                true
            case .thumbnail:
                false
            }
            return backupAttachmentUploadStore.fetchNextUploads(
                count: count,
                isFullsize: forFullsizeUploads,
                tx: tx,
            ).map {
                return TaskRecord(id: $0.id!, record: $0)
            }
        }

        func removeRecord(_ record: TaskRecord, tx: DBWriteTransaction) {
            // We don't actually delete records when finishing; we just mark
            // them done so we can still keep track of their byte count.
            backupAttachmentUploadStore.markUploadDone(
                for: record.record.attachmentRowId,
                fullsize: record.record.isFullsize,
                tx: tx,
            )
        }
    }

    // MARK: -

    private enum Constants {
        static let numParallelUploadsFullsize: UInt = 12
        static let numParallelUploadsThumbnail: UInt = 8
    }
}

#if TESTABLE_BUILD

open class BackupAttachmentUploadQueueRunnerMock: BackupAttachmentUploadQueueRunner {

    public init() {}

    public func backUpAllAttachments(mode: BackupAttachmentUploadQueueMode) async throws {
        // Do nothing
    }
}

#endif