Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
signalapp
GitHub Repository: signalapp/Signal-iOS
Path: blob/main/SignalServiceKit/AttachmentBackfill/AttachmentBackfillManager.swift
1 views
//
// Copyright 2026 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//

import LibSignalClient

public class AttachmentBackfillManager {

    /// Wrapper around `MessageSenderJobQueue`, for tests.
    protocol AttachmentBackfillSyncMessageSender {
        func add(
            attachmentBackfillRequestSyncMessage: AttachmentBackfillRequestSyncMessage,
            tx: DBWriteTransaction,
        )

        func add(
            attachmentBackfillResponseSyncMessage: AttachmentBackfillResponseSyncMessage,
            tx: DBWriteTransaction,
        )
    }

    // MARK: -

    private let attachmentStore: AttachmentStore
    private let attachmentUploadManager: AttachmentUploadManager
    private let db: DB
    private let interactionStore: InteractionStore
    private let logger: PrefixedLogger
    private let notificationPresenter: NotificationPresenter
    private let recipientDatabaseTable: RecipientDatabaseTable
    private let syncMessageSender: AttachmentBackfillSyncMessageSender
    private let taskQueue: SerialTaskQueue
    private let threadStore: ThreadStore

    init(
        attachmentStore: AttachmentStore,
        attachmentUploadManager: AttachmentUploadManager,
        db: DB,
        interactionStore: InteractionStore,
        notificationPresenter: NotificationPresenter,
        recipientDatabaseTable: RecipientDatabaseTable,
        syncMessageSender: AttachmentBackfillSyncMessageSender,
        threadStore: ThreadStore,
    ) {
        self.attachmentStore = attachmentStore
        self.attachmentUploadManager = attachmentUploadManager
        self.db = db
        self.interactionStore = interactionStore
        self.logger = PrefixedLogger(prefix: "[Backfill]")
        self.notificationPresenter = notificationPresenter
        self.recipientDatabaseTable = recipientDatabaseTable
        self.syncMessageSender = syncMessageSender
        self.taskQueue = SerialTaskQueue()
        self.threadStore = threadStore
    }

    // MARK: - Outbound Requests

    public func sendOutboundRequest(
        message: TSMessage,
        localIdentifiers: LocalIdentifiers,
        tx: DBWriteTransaction,
    ) {
        guard
            let backfillTarget = assembleBackfillTarget(
                message: message,
                localIdentifiers: localIdentifiers,
                tx: tx,
            )
        else {
            logger.warn("Failed to assemble backfill target for outbound request.")
            return
        }

        guard let localThread = threadStore.getOrCreateLocalThread(tx: tx) else {
            owsFailDebug("Failed to get local thread.", logger: logger)
            return
        }

        let requestProtoBuilder = SSKProtoSyncMessageAttachmentBackfillRequest.builder()
        requestProtoBuilder.setTargetMessage(backfillTarget.addressableMessage.asProto)
        requestProtoBuilder.setTargetConversation(backfillTarget.conversationIdentifier.asProto)

        do {
            let syncMessage = try AttachmentBackfillRequestSyncMessage(
                requestProto: requestProtoBuilder.buildInfallibly(),
                localThread: localThread,
                tx: tx,
            )

            syncMessageSender.add(
                attachmentBackfillRequestSyncMessage: syncMessage,
                tx: tx,
            )
        } catch {
            owsFailDebug("Failed to build backfill request sync message! \(error)")
            return
        }
    }

    // MARK: - Inbound Requests

    /// Returns whether we have an enqueued `AttachmentBackfillInboundRequestRecord`
    /// for the given interaction ID.
    public func hasEnqueuedInboundRequest(interactionRowId: Int64, tx: DBReadTransaction) -> Bool {
        return AttachmentBackfillInboundRequestRecord.fetchRecord(
            interactionId: interactionRowId,
            tx: tx,
        ) != nil
    }

    /// Serially processes already-enqueued `AttachmentBackfillInboundRequestRecord`s,
    /// for example from interrupted previous launches.
    /// - SeeAlso `processInboundRequest(requestRecord:localIdentifiers:)`
    public func processEnqueuedInboundRequests(registeredState: RegisteredState) {
        guard registeredState.isPrimary else {
            return
        }

        let enqueuedRequestRecords: [AttachmentBackfillInboundRequestRecord] = db.read { tx in
            AttachmentBackfillInboundRequestRecord.fetchAllAscending(tx: tx)
        }

        for enqueuedRequestRecord in enqueuedRequestRecords {
            _ = processInboundRequest(
                requestRecordId: enqueuedRequestRecord.id,
                localIdentifiers: registeredState.localIdentifiers,
            )
        }
    }

    /// Await the processing of any currently-enqueued inbound requests,
    /// cooperatively cancelling and waiting for teardown of said processing if
    /// cancelled.
    func awaitProcessingEnqueuedInboundRequests() async throws(CancellationError) {
        let flushTask = taskQueue.enqueue {
            // No-op: wait for the queue to flush.
        }
        let cancelledQueueFlushTask = AtomicValue<Task<Void, Error>?>(nil, lock: .init())

        await withTaskCancellationHandler(
            operation: {
                try? await flushTask.value
            },
            onCancel: {
                cancelledQueueFlushTask.set(taskQueue.enqueueCancellingPrevious {
                    // No-op: wait for the queue to flush.
                })
            },
        )

        // If we were cancelled while waiting, as a best effort wait for the
        // cancelled tasks in the queue to complete.
        if let task = cancelledQueueFlushTask.get() {
            try? await task.value
            throw CancellationError()
        }
    }

    /// Enqueues and kicks off an `AttachmentBackfillInboundRequestRecord` for
    /// the given inbound backfill request sync message.
    func enqueueInboundRequest(
        attachmentBackfillRequestProto: SSKProtoSyncMessageAttachmentBackfillRequest,
        registeredState: RegisteredState,
        tx: DBWriteTransaction,
    ) {
        guard BuildFlags.AttachmentBackfill.handleRequests else {
            logger.warn("Dropping backfill request: not yet supported.")
            return
        }

        guard registeredState.isPrimary else {
            logger.warn("Dropping backfill request: not registered primary.")
            return
        }

        guard
            let backfillTarget: AttachmentBackfillTarget = parseBackfillTarget(
                attachmentBackfillRequestProto: attachmentBackfillRequestProto,
                tx: tx,
            )
        else {
            logger.warn("Missing or invalid backfill target!")
            return
        }

        guard
            let backfillTargetMessage: TSMessage = locateBackfillTargetMessage(
                backfillTarget,
                tx: tx,
            )
        else {
            logger.info("Missing message for backfill target.")
            sendTargetNotFoundResponse(
                backfillTarget: backfillTarget,
                localIdentifiers: registeredState.localIdentifiers,
                tx: tx,
            )
            return
        }

        let backfillRecord = AttachmentBackfillInboundRequestRecord.fetchOrInsertRecord(
            interactionId: backfillTargetMessage.sqliteRowId!,
            tx: tx,
        )

        let logger = logger.suffixed(inboundRequestRecordId: backfillRecord.id)
        logger.info("Enqueued inbound request.")

        // Touch the target message so we reload it in the ConversationView, to
        // display that it's being backfilled.
        db.touch(interaction: backfillTargetMessage, shouldReindex: false, tx: tx)

        tx.addSyncCompletion { [self] in
            _ = processInboundRequest(
                requestRecordId: backfillRecord.id,
                localIdentifiers: registeredState.localIdentifiers,
            )
        }
    }

    /// Process the given `AttachmentBackfillInboundRequestRecord`, by uploading
    /// its body attachments and enqueuing a response sync message.
    func processInboundRequest(
        requestRecordId: AttachmentBackfillInboundRequestRecord.IDType,
        localIdentifiers: LocalIdentifiers,
    ) -> Task<Void, Error> {
        return taskQueue.enqueue { [self] in
            let logger = logger.suffixed(inboundRequestRecordId: requestRecordId)

            let requestRecord: AttachmentBackfillInboundRequestRecord?
            let backfillTarget: AttachmentBackfillTarget?
            let threadUniqueId: String?
            let messageUniqueId: String?
            (
                requestRecord,
                backfillTarget,
                threadUniqueId,
                messageUniqueId,
            ) = await db.awaitableWrite { tx in
                guard
                    let record = failIfThrows(block: {
                        try AttachmentBackfillInboundRequestRecord.fetchOne(
                            tx.database,
                            key: requestRecordId,
                        )
                    })
                else {
                    logger.warn("Missing request record: no response will be sent.")
                    return (nil, nil, nil, nil)
                }

                guard
                    let message = interactionStore.fetchInteraction(
                        rowId: record.interactionId,
                        tx: tx,
                    ) as? TSMessage,
                    let backfillTarget = assembleBackfillTarget(
                        message: message,
                        localIdentifiers: localIdentifiers,
                        tx: tx,
                    )
                else {
                    logger.warn("Missing backfill target: no response will be sent.")
                    failIfThrows {
                        try record.delete(tx.database)
                    }
                    return (nil, nil, nil, nil)
                }

                return (record, backfillTarget, message.uniqueThreadId, message.uniqueId)
            }

            guard
                let requestRecord,
                let backfillTarget,
                let threadUniqueId,
                let messageUniqueId
            else {
                return
            }

            notificationPresenter.notifyUserOfAttachmentBackfill(
                threadUniqueId: threadUniqueId,
                messageUniqueId: messageUniqueId,
                body: OWSLocalizedString(
                    "ATTACHMENT_BACKFILL_SYNCING_NOTIFICATION",
                    comment: "Notification body shown while syncing media to a linked device.",
                ),
            )

            let backfillAttemptResults = await attemptBackfill(
                interactionId: requestRecord.interactionId,
            )

            if Task.isCancelled {
                logger.warn("Backfill attempt cancelled.")

                notificationPresenter.notifyUserOfAttachmentBackfill(
                    threadUniqueId: threadUniqueId,
                    messageUniqueId: messageUniqueId,
                    body: OWSLocalizedString(
                        "ATTACHMENT_BACKFILL_INTERRUPTED_NOTIFICATION",
                        comment: "Notification body shown when media sync to a linked device was interrupted.",
                    ),
                )
                throw CancellationError()
            }

            await db.awaitableWrite { tx in
                let backfillAttemptDescription = backfillAttemptResults
                    .map { result in
                        switch result {
                        case .success: "success"
                        case .failure(let error) where error.isRetryable: "retry"
                        case .failure: "failure"
                        }
                    }
                    .joined(separator: ";")
                logger.info("Sending backfill response: \(backfillAttemptDescription)")

                self.sendBackfillAttemptResponse(
                    backfillTarget: backfillTarget,
                    backfillAttemptResults: backfillAttemptResults,
                    localIdentifiers: localIdentifiers,
                    tx: tx,
                )

                failIfThrows {
                    try requestRecord.delete(tx.database)
                }

                // Touch the target message so we reload it in the ConversationView,
                // to display that we're done with the backfill.
                if let backfillTargetMessage = locateBackfillTargetMessage(backfillTarget, tx: tx) {
                    db.touch(interaction: backfillTargetMessage, shouldReindex: false, tx: tx)
                }
            }

            notificationPresenter.notifyUserOfAttachmentBackfill(
                threadUniqueId: threadUniqueId,
                messageUniqueId: messageUniqueId,
                body: OWSLocalizedString(
                    "ATTACHMENT_BACKFILL_FINISHED_NOTIFICATION",
                    comment: "Notification body shown when media sync to a linked device is complete.",
                ),
            )
        }
    }

    private func attemptBackfill(
        interactionId: Int64,
    ) async -> [Result<SSKProtoAttachmentPointer, Error>] {
        let backfillableAttachmentReferences: [AttachmentReference] = db.read { tx in
            let stickerReferences = attachmentStore.fetchReferences(
                owner: .messageSticker(messageRowId: interactionId),
                tx: tx,
            )

            if !stickerReferences.isEmpty {
                return stickerReferences
            }

            return attachmentStore.fetchReferences(
                owner: .messageBodyAttachment(messageRowId: interactionId),
                tx: tx,
            )
        }

        if backfillableAttachmentReferences.isEmpty {
            logger.warn("No attachments for backfill target.")
            return []
        }

        return await withTaskGroup(
            of: (Int, Result<SSKProtoAttachmentPointer, Error>).self,
            returning: [Result<SSKProtoAttachmentPointer, Error>].self,
        ) { taskGroup in
            for (index, attachmentReference) in backfillableAttachmentReferences.enumerated() {
                taskGroup.addTask { [self] in
                    let result = await uploadAttachmentForBackfill(attachmentReference: attachmentReference)
                    return (index, result)
                }
            }

            var indexedResults = [(Int, Result<SSKProtoAttachmentPointer, Error>)]()
            for await indexedResult in taskGroup {
                indexedResults.append(indexedResult)
            }

            return indexedResults
                .sorted(by: { $0.0 < $1.0 })
                .map(\.1)
        }
    }

    private func uploadAttachmentForBackfill(
        attachmentReference: AttachmentReference,
    ) async -> Result<SSKProtoAttachmentPointer, Error> {
        let logger = logger.suffixed(with: "[\(attachmentReference.attachmentRowId)]")

        do {
            try await Retry.performWithBackoff(maxAttempts: 4) { [attachmentUploadManager] in
                // This method will short circuit and return without doing an
                // actual upload if the attachment has recent, reusable transit-
                // tier info.
                try await attachmentUploadManager.uploadTransitTierAttachment(
                    attachmentId: attachmentReference.attachmentRowId,
                    progress: nil,
                )
            }

            guard
                let attachment = db.read(block: { tx in
                    return attachmentStore.fetch(
                        id: attachmentReference.attachmentRowId,
                        tx: tx,
                    )
                }),
                let attachmentProto = ReferencedAttachment(
                    reference: attachmentReference,
                    attachment: attachment,
                ).asProtoForSending()
            else {
                return .failure(OWSAssertionError(
                    "Failed to get proto for sending after successful upload!",
                    logger: logger,
                ))
            }

            return .success(attachmentProto)
        } catch {
            if error.isRetryable {
                logger.warn("Ran out of retries uploading for backfill.")
            } else {
                logger.error("Failed to upload for backfill! \(error)")
            }

            return .failure(error)
        }
    }

    // MARK: - Outbound Responses

    private func sendTargetNotFoundResponse(
        backfillTarget: AttachmentBackfillTarget,
        localIdentifiers: LocalIdentifiers,
        tx: DBWriteTransaction,
    ) {
        sendBackfillResponse(
            backfillTarget: backfillTarget,
            localIdentifiers: localIdentifiers,
            customizeResponseBlock: { responseBuilder in
                responseBuilder.setError(.messageNotFound)
            },
            tx: tx,
        )
    }

    private func sendBackfillAttemptResponse(
        backfillTarget: AttachmentBackfillTarget,
        backfillAttemptResults: [Result<SSKProtoAttachmentPointer, Error>],
        localIdentifiers: LocalIdentifiers,
        tx: DBWriteTransaction,
    ) {
        sendBackfillResponse(
            backfillTarget: backfillTarget,
            localIdentifiers: localIdentifiers,
            customizeResponseBlock: { responseBuilder in
                let attachmentDatas = backfillAttemptResults.map { attemptResult -> SSKProtoSyncMessageAttachmentBackfillResponseAttachmentData in
                    let attachmentDataBuilder = SSKProtoSyncMessageAttachmentBackfillResponseAttachmentData.builder()
                    switch attemptResult {
                    case .success(let attachmentProto):
                        attachmentDataBuilder.setAttachment(attachmentProto)
                    case .failure(let error) where error.isRetryable:
                        attachmentDataBuilder.setStatus(.pending)
                    case .failure:
                        attachmentDataBuilder.setStatus(.terminalError)
                    }
                    return attachmentDataBuilder.buildInfallibly()
                }

                let attachmentDataListBuilder = SSKProtoSyncMessageAttachmentBackfillResponseAttachmentDataList.builder()
                attachmentDataListBuilder.setAttachments(attachmentDatas)
                responseBuilder.setAttachments(attachmentDataListBuilder.buildInfallibly())
            },
            tx: tx,
        )
    }

    private func sendBackfillResponse(
        backfillTarget: AttachmentBackfillTarget,
        localIdentifiers: LocalIdentifiers,
        customizeResponseBlock: (SSKProtoSyncMessageAttachmentBackfillResponseBuilder) -> Void,
        tx: DBWriteTransaction,
    ) {
        guard let localThread = threadStore.getOrCreateLocalThread(tx: tx) else {
            owsFailDebug("Failed to get local thread.", logger: logger)
            return
        }

        let responseProtoBuilder = SSKProtoSyncMessageAttachmentBackfillResponse.builder()
        responseProtoBuilder.setTargetMessage(backfillTarget.addressableMessage.asProto)
        responseProtoBuilder.setTargetConversation(backfillTarget.conversationIdentifier.asProto)
        customizeResponseBlock(responseProtoBuilder)

        do {
            let syncMessage = try AttachmentBackfillResponseSyncMessage(
                responseProto: responseProtoBuilder.buildInfallibly(),
                localThread: localThread,
                tx: tx,
            )

            syncMessageSender.add(
                attachmentBackfillResponseSyncMessage: syncMessage,
                tx: tx,
            )
        } catch {
            owsFailDebug("Failed to build backfill response sync message! \(error)")
            return
        }
    }

    // MARK: - Inbound Responses

    func enqueueInboundResponse(
        attachmentBackfillResponseProto: SSKProtoSyncMessageAttachmentBackfillResponse,
        registeredState: RegisteredState,
        tx: DBWriteTransaction,
    ) {
        // TODO: Enqueue downloads using the attachment pointers in the response.
        logger.warn("AttachmentBackfillResponse not yet supported.")
    }

    // MARK: - Backfill Targets

    private struct AttachmentBackfillTarget {
        let addressableMessage: AddressableMessage
        let conversationIdentifier: ConversationIdentifier
    }

    private func parseBackfillTarget(
        attachmentBackfillRequestProto: SSKProtoSyncMessageAttachmentBackfillRequest,
        tx: DBReadTransaction,
    ) -> AttachmentBackfillTarget? {
        guard
            let addressableMessageProto = attachmentBackfillRequestProto.targetMessage,
            let addressableMessage = AddressableMessage(proto: addressableMessageProto)
        else {
            return nil
        }

        guard
            let conversationIdentifierProto = attachmentBackfillRequestProto.targetConversation,
            let conversationIdentifier = ConversationIdentifier(proto: conversationIdentifierProto)
        else {
            return nil
        }

        return AttachmentBackfillTarget(
            addressableMessage: addressableMessage,
            conversationIdentifier: conversationIdentifier,
        )
    }

    private func locateBackfillTargetMessage(
        _ backfillTarget: AttachmentBackfillTarget,
        tx: DBReadTransaction,
    ) -> TSMessage? {
        let thread: TSThread
        switch backfillTarget.conversationIdentifier {
        case .serviceId(let serviceId):
            guard
                let recipient = recipientDatabaseTable.fetchRecipient(serviceId: serviceId, transaction: tx),
                let contactThread = threadStore.fetchContactThread(recipient: recipient, tx: tx)
            else {
                return nil
            }

            thread = contactThread
        case .e164(let e164):
            guard
                let recipient = recipientDatabaseTable.fetchRecipient(phoneNumber: e164.stringValue, transaction: tx),
                let contactThread = threadStore.fetchContactThread(recipient: recipient, tx: tx)
            else {
                return nil
            }

            thread = contactThread
        case .groupIdentifier(let groupIdentifier):
            guard let groupThread = threadStore.fetchGroupThread(groupId: groupIdentifier, tx: tx) else {
                return nil
            }

            thread = groupThread
        }

        let authorAddress: SignalServiceAddress
        switch backfillTarget.addressableMessage.author {
        case .aci(let aci):
            authorAddress = SignalServiceAddress(aci)
        case .e164(let e164):
            authorAddress = SignalServiceAddress(e164)
        }

        return interactionStore.findMessage(
            withTimestamp: backfillTarget.addressableMessage.sentTimestamp,
            threadId: thread.uniqueId,
            author: authorAddress,
            tx: tx,
        )
    }

    private func assembleBackfillTarget(
        message: TSMessage,
        localIdentifiers: LocalIdentifiers,
        tx: DBReadTransaction,
    ) -> AttachmentBackfillTarget? {
        guard
            let addressableMessage = AddressableMessage(
                message: message,
                localIdentifiers: localIdentifiers,
            )
        else {
            owsFailDebug("Target message missing author info!", logger: logger)
            return nil
        }

        guard let thread = threadStore.fetchThread(uniqueId: message.uniqueThreadId, tx: tx) else {
            owsFailDebug("Missing thread for message!", logger: logger)
            return nil
        }

        let conversationIdentifier: ConversationIdentifier
        if let contactThread = thread as? TSContactThread {
            if let serviceId = ServiceId.parseFrom(serviceIdBinary: nil, serviceIdString: contactThread.contactUUID) {
                conversationIdentifier = .serviceId(serviceId)
            } else if let e164 = E164(contactThread.contactPhoneNumber) {
                conversationIdentifier = .e164(e164)
            } else {
                logger.warn("Contact thread missing identifiers!")
                return nil
            }
        } else if
            let groupThread = thread as? TSGroupThread,
            let groupIdentifier = try? groupThread.groupIdentifier
        {
            conversationIdentifier = .groupIdentifier(groupIdentifier)
        } else {
            owsFailDebug("Target thread is neither contact nor group!", logger: logger)
            return nil
        }

        return AttachmentBackfillTarget(
            addressableMessage: addressableMessage,
            conversationIdentifier: conversationIdentifier,
        )
    }
}

// MARK: - AttachmentBackfillManager.AttachmentBackfillSyncMessageSender

extension MessageSenderJobQueue: AttachmentBackfillManager.AttachmentBackfillSyncMessageSender {
    func add(
        attachmentBackfillRequestSyncMessage: AttachmentBackfillRequestSyncMessage,
        tx: DBWriteTransaction,
    ) {
        add(
            message: .preprepared(transientMessageWithoutAttachments: attachmentBackfillRequestSyncMessage),
            transaction: tx,
        )
    }

    func add(
        attachmentBackfillResponseSyncMessage: AttachmentBackfillResponseSyncMessage,
        tx: DBWriteTransaction,
    ) {
        add(
            message: .preprepared(transientMessageWithoutAttachments: attachmentBackfillResponseSyncMessage),
            transaction: tx,
        )
    }
}

// MARK: -

private extension PrefixedLogger {
    func suffixed(inboundRequestRecordId: AttachmentBackfillInboundRequestRecord.IDType) -> PrefixedLogger {
        return suffixed(with: "[ID:\(inboundRequestRecordId)]")
    }
}