Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
signalapp
GitHub Repository: signalapp/Signal-iOS
Path: blob/main/SignalServiceKit/Messages/BackgroundMessageFetcher.swift
1 views
//
// Copyright 2025 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//

import Foundation

public struct BackgroundMessageFetcherFactory {
    private let attachmentBackfillManager: AttachmentBackfillManager
    private let chatConnectionManager: any ChatConnectionManager
    private let groupMessageProcessorManager: GroupMessageProcessorManager
    private let messageProcessor: MessageProcessor
    private let messageSenderJobQueue: MessageSenderJobQueue
    private let receiptSender: ReceiptSender
    private let storageServiceManager: any StorageServiceManager

    public init(
        attachmentBackfillManager: AttachmentBackfillManager,
        chatConnectionManager: any ChatConnectionManager,
        groupMessageProcessorManager: GroupMessageProcessorManager,
        messageProcessor: MessageProcessor,
        messageSenderJobQueue: MessageSenderJobQueue,
        receiptSender: ReceiptSender,
        storageServiceManager: any StorageServiceManager,
    ) {
        self.attachmentBackfillManager = attachmentBackfillManager
        self.chatConnectionManager = chatConnectionManager
        self.groupMessageProcessorManager = groupMessageProcessorManager
        self.messageProcessor = messageProcessor
        self.messageSenderJobQueue = messageSenderJobQueue
        self.receiptSender = receiptSender
        self.storageServiceManager = storageServiceManager
    }

    public func buildFetcher() -> BackgroundMessageFetcher {
        return BackgroundMessageFetcher(
            attachmentBackfillManager: self.attachmentBackfillManager,
            chatConnectionManager: self.chatConnectionManager,
            groupMessageProcessorManager: self.groupMessageProcessorManager,
            messageProcessor: self.messageProcessor,
            messageSenderJobQueue: self.messageSenderJobQueue,
            receiptSender: self.receiptSender,
            storageServiceManager: self.storageServiceManager,
        )
    }
}

public actor BackgroundMessageFetcher {
    private let attachmentBackfillManager: AttachmentBackfillManager
    private let chatConnectionManager: any ChatConnectionManager
    private let groupMessageProcessorManager: GroupMessageProcessorManager
    private let messageProcessor: MessageProcessor
    private let messageSenderJobQueue: MessageSenderJobQueue
    private let receiptSender: ReceiptSender
    private let storageServiceManager: any StorageServiceManager

    fileprivate init(
        attachmentBackfillManager: AttachmentBackfillManager,
        chatConnectionManager: any ChatConnectionManager,
        groupMessageProcessorManager: GroupMessageProcessorManager,
        messageProcessor: MessageProcessor,
        messageSenderJobQueue: MessageSenderJobQueue,
        receiptSender: ReceiptSender,
        storageServiceManager: any StorageServiceManager,
    ) {
        self.attachmentBackfillManager = attachmentBackfillManager
        self.chatConnectionManager = chatConnectionManager
        self.groupMessageProcessorManager = groupMessageProcessorManager
        self.messageProcessor = messageProcessor
        self.messageSenderJobQueue = messageSenderJobQueue
        self.receiptSender = receiptSender
        self.storageServiceManager = storageServiceManager
    }

    private var connectionTokens = [OWSChatConnection.ConnectionToken]()

    public func start() async {
        self.connectionTokens = chatConnectionManager.requestConnections()
        await self.groupMessageProcessorManager.startAllProcessors()
    }

    public func reset() {
        self.connectionTokens.forEach { $0.releaseConnection() }
        self.connectionTokens = []
    }

    /// Waits until message processing has reached an "idle" state.
    ///
    /// - Throws: An error when canceled or if the connection closes and won't
    /// immediately try to re-open.
    public func waitForFetchingProcessingAndSideEffects() async throws {
        try await withCooperativeRace(
            { try await self._waitForFetchingProcessingAndSideEffects() },
            { try await self.waitUntilSocketShouldBeClosedIfCanUseSockets() },
        )
    }

    /// Waits until `deadline`.
    ///
    /// - Throws: An error when canceled or if the connection closes and won't
    /// immediately try to re-open.
    public func waitUntil(deadline: MonotonicDate) async throws {
        let now = MonotonicDate()
        if now < deadline {
            try await withCooperativeRace(
                { try await Task.sleep(nanoseconds: (deadline - now).nanoseconds) },
                { try await self.waitUntilSocketShouldBeClosedIfCanUseSockets() },
            )
        }
    }

    private func waitUntilSocketShouldBeClosedIfCanUseSockets() async throws {
        try await self.chatConnectionManager.waitUntilIdentifiedConnectionShouldBeClosed()
        // We wanted to wait for things to happen, but we can't wait, so throw.
        throw OWSGenericError("Should be closed.")
    }

    private func _waitForFetchingProcessingAndSideEffects() async throws {
        try await messageProcessor.waitForFetchingAndProcessing()

        // Wait for these in parallel.
        do {
            // Wait until all outgoing receipt sends are complete.
            async let pendingReceipts: Void = self.receiptSender.waitForPendingReceipts()
            // Wait until all outgoing messages are sent.
            async let pendingMessages: Void = self.messageSenderJobQueue.waitUntilDone()
            // Wait until all sync requests are fulfilled.
            async let pendingOps: Void = MessageReceiver.waitForPendingTasks()
            // Wait until Storage Service has settled.
            async let pendingStorageService: Void = self.storageServiceManager.waitForSteadyState()
            // Wait until attachment backfills are done.
            async let pendingAttachmentBackfills: Void = self.attachmentBackfillManager.awaitProcessingEnqueuedInboundRequests()

            try await pendingReceipts
            try await pendingMessages
            try await pendingOps
            try await pendingStorageService
            try await pendingAttachmentBackfills
        }

        // Finally, wait for any notifications to finish posting
        try await NotificationPresenterImpl.waitForPendingNotifications()
    }

    public func stopAndWaitBeforeSuspending() async {
        // Wrap the cleanup of message processing in a new Task, so if we're
        // canceled, that method doesn't inherit our cancellation.
        return await Task { await self._stopAndWaitBeforeSuspending() }.value
    }

    private func _stopAndWaitBeforeSuspending() async {
        // Release the connections and wait for them to close.
        self.reset()
        await chatConnectionManager.waitForDisconnectIfClosed()

        // Wait for notifications that are already scheduled to be posted.
        do {
            try await NotificationPresenterImpl.waitForPendingNotifications()
        } catch {
            owsFailDebug("\(error)")
        }
    }
}