Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
signalapp
GitHub Repository: signalapp/Signal-iOS
Path: blob/main/SignalServiceKit/Jobs/MessageSenderJobQueue.swift
1 views
//
// Copyright 2018 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//

import Foundation
import LibSignalClient

/// Durably enqueues outgoing messages.
///
/// Calls `MessageSender` to send messages.
///
/// # Retries
///
/// Both `MessageSenderJobQueue` and `MessageSender` implement retry
/// handling.
///
/// The latter (`MessageSender`) retries only specific errors that the
/// server indicates are immediately retryable (e.g., "you are missing a
/// device for the destination; add it and try again"). These retries aren't
/// "configurable", nor do they have any backoff. They are expected when the
/// system is operating normally, and they are part of the expected flow for
/// sending a message.
///
/// The former (`MessageSenderJobQueue`) retries generic/unknown failures
/// (e.g., "the server gave us a 5xx error; try after a few seconds", "there
/// isn't any Internet; try when we reconnect"). These retries are
/// "configurable", meaning we can decide how many occur and how often they
/// occur. These only happen when something is operating abnormally (e.g.,
/// "the server is down", "the user isn't connected to the network").
///
/// Both respect `IsRetryableProvider` and only retry retryable errors.
public class MessageSenderJobQueue {
    /// Serializer that `queueJob` uses (via `addOrderedSyncCompletion(tx:)`)
    /// to register the completion that appends a job to its queue.
    private var jobSerializer = CompletionSerializer()

    /// Creates the queue and hands `setUp()` to
    /// `appReadiness.runNowOrWhenAppDidBecomeReadyAsync`.
    ///
    /// - Parameter appReadiness: Decides when `setUp()` is invoked.
    public init(appReadiness: AppReadiness) {
        appReadiness.runNowOrWhenAppDidBecomeReadyAsync { [self] in
            setUp()
        }
    }

    /// Enqueues `message` for sending without reporting the outcome to the caller.
    ///
    /// - Parameters:
    ///   - message: The prepared message to send.
    ///   - limitToCurrentProcessLifetime: When `true`, the job is tracked in
    ///     memory only and its record isn't inserted into the database.
    ///   - isHighPriority: Passed along when the job record is created.
    ///   - transaction: The write transaction in which the job is enqueued.
    public func add(
        message: PreparedOutgoingMessage,
        limitToCurrentProcessLifetime: Bool = false,
        isHighPriority: Bool = false,
        transaction: DBWriteTransaction,
    ) {
        // No future: nobody is waiting for the result of this send.
        add(
            message: message,
            exclusiveToCurrentProcessIdentifier: limitToCurrentProcessLifetime,
            isHighPriority: isHighPriority,
            future: nil,
            transaction: transaction,
        )
    }

    /// Enqueues `message` for sending and returns a promise for the outcome.
    ///
    /// - Parameters:
    ///   - namespace: Not read by this method; it only distinguishes this
    ///     overload from the one that returns nothing.
    ///   - message: The prepared message to send.
    ///   - limitToCurrentProcessLifetime: When `true`, the job is tracked in
    ///     memory only and its record isn't inserted into the database.
    ///   - isHighPriority: Passed along when the job record is created.
    ///   - transaction: The write transaction in which the job is enqueued.
    /// - Returns: A promise backed by the future handed to the job; it is
    ///   resolved or rejected when the job reaches a terminal result.
    public func add(
        _ namespace: PromiseNamespace,
        message: PreparedOutgoingMessage,
        limitToCurrentProcessLifetime: Bool = false,
        isHighPriority: Bool = false,
        transaction: DBWriteTransaction,
    ) -> Promise<Void> {
        let sendPromise = Promise<Void> { sendFuture in
            self.add(
                message: message,
                exclusiveToCurrentProcessIdentifier: limitToCurrentProcessLifetime,
                isHighPriority: isHighPriority,
                future: sendFuture,
                transaction: transaction,
            )
        }
        return sendPromise
    }

    /// Shared implementation behind both public `add` overloads.
    ///
    /// Creates the job record, optionally persists it, records it as pending,
    /// and schedules `startPendingJobRecordsIfPossible()` as a sync completion
    /// of `transaction`.
    ///
    /// - Parameters:
    ///   - message: The prepared message to send.
    ///   - exclusiveToCurrentProcessIdentifier: When `true`, the record is not
    ///     inserted into the database and the job is marked in-memory-only.
    ///   - isHighPriority: Passed along when the job record is created.
    ///   - future: Rejected immediately if the job record can't be created;
    ///     otherwise stored until the job is queued.
    ///   - transaction: The write transaction in which the job is enqueued.
    private func add(
        message: PreparedOutgoingMessage,
        exclusiveToCurrentProcessIdentifier: Bool,
        isHighPriority: Bool,
        future: Future<Void>?,
        transaction: DBWriteTransaction,
    ) {
        // Flip recipients to "sending" right away so the UI updates immediately.
        message.updateAllUnsentRecipientsAsSending(tx: transaction)

        let jobRecord: MessageSenderJobRecord
        do {
            jobRecord = try message.asMessageSenderJobRecord(isHighPriority: isHighPriority, tx: transaction)
        } catch {
            // Without a job record there's nothing to run; fail the send now.
            message.updateWithAllSendingRecipientsMarkedAsFailed(error: error, tx: transaction)
            future?.reject(error)
            return
        }
        owsAssertDebug(jobRecord.status == .ready)

        // In-memory-only jobs are deliberately never written to the database.
        let isInMemoryOnly = exclusiveToCurrentProcessIdentifier
        if !isInMemoryOnly {
            jobRecord.anyInsert(transaction: transaction)
        }

        let pendingJob = Job(record: jobRecord, isInMemoryOnly: isInMemoryOnly)
        state.update {
            $0.pendingJobs.append(pendingJob)
            if let future {
                $0.jobFutures[jobRecord.uniqueId] = future
            }
        }

        transaction.addSyncCompletion {
            self.startPendingJobRecordsIfPossible()
        }
    }

    // MARK: JobQueue

    /// A job that needs to be executed.
    private struct Job {
        /// The record describing the message send.
        let record: MessageSenderJobRecord
        /// Whether the job is tracked in memory only. When `true`, the queue
        /// skips inserting, updating, and removing `record` in the database.
        let isInMemoryOnly: Bool
    }

    /// A job that's been queued but hasn't started yet.
    private struct QueuedOperationState {
        /// The job to run.
        let job: Job
        /// The message restored from the job's record in `queueJob`.
        let message: PreparedOutgoingMessage
        /// Resolved or rejected once the job reaches a terminal result; `nil`
        /// when the caller that enqueued the job didn't ask for a promise.
        let future: Future<Void>?
    }

    /// A job that's actively executing; it may be suspended due to errors.
    private struct ActiveOperationState {
        let job: Job
        let message: PreparedOutgoingMessage
        let future: Future<Void>?
        let externalRetryTriggerState = AtomicValue(ExternalRetryTriggerState(), lock: .init())

        /// Promotes a queued operation to an active one, carrying over its job,
        /// message, and future.
        init(queuedOperation: QueuedOperationState) {
            job = queuedOperation.job
            message = queuedOperation.message
            future = queuedOperation.future
        }

        /// Discards ("consumes") every trigger that has been reported so far.
        ///
        /// Call this before starting any retryable action whose failure might
        /// be resolved by one of the triggers (e.g., before attempting a send
        /// that might fail because there's no Internet).
        ///
        /// Clearing *before* the attempt means a trigger that fires while the
        /// attempt is in flight is still recorded when the attempt fails, so
        /// the retry can happen immediately instead of waiting for something
        /// *else* to fire the trigger again (e.g., losing & regaining Internet).
        func clearExternalRetryTriggers() {
            externalRetryTriggerState.update { triggerState in
                triggerState.reportedExternalRetryTriggers = []
            }
        }

        /// Records that `externalRetryTrigger` fired and resumes the waiter if
        /// it is waiting for that trigger.
        ///
        /// The trigger is remembered even when nobody is waiting yet, so an
        /// operation that fails shortly afterwards still observes it.
        func reportExternalRetryTrigger(_ externalRetryTrigger: ExternalRetryTriggers) {
            externalRetryTriggerState.update { triggerState in
                triggerState.reportedExternalRetryTriggers.formUnion(externalRetryTrigger)
                notifyIfPossible(mutableState: &triggerState)
            }
        }

        /// Waits until any of `externalRetryTriggers` has been reported.
        ///
        /// Returns right away if a matching trigger has already been reported
        /// since the last call to `clearExternalRetryTriggers()`.
        func waitForAnyExternalRetryTrigger(fromExternalRetryTriggers externalRetryTriggers: ExternalRetryTriggers) async throws {
            let continuation = CancellableContinuation<Void>()
            externalRetryTriggerState.update { triggerState in
                triggerState.waitingState = (continuation, externalRetryTriggers)
                notifyIfPossible(mutableState: &triggerState)
            }
            try await continuation.wait()
        }

        /// Resumes the registered waiter when at least one of the triggers it
        /// is waiting for has been reported. Does nothing if there's no waiter.
        private func notifyIfPossible(mutableState: inout ExternalRetryTriggerState) {
            guard let waiter = mutableState.waitingState else {
                return
            }
            let hasMatchingTrigger = !mutableState.reportedExternalRetryTriggers.isDisjoint(with: waiter.externalRetryTriggers)
            if hasMatchingTrigger {
                waiter.continuation.resume(with: .success(()))
            }
        }
    }

    /// Tracks information about failures with external retry triggers.
    private struct ExternalRetryTriggerState {
        /// Triggers that have been reported since they were last cleared.
        var reportedExternalRetryTriggers: ExternalRetryTriggers = []
        /// The most recently registered waiter (if any): its continuation and
        /// the set of triggers that should resume it.
        var waitingState: (continuation: CancellableContinuation<Void>, externalRetryTriggers: ExternalRetryTriggers)?
    }

    /// The set of external events that can cut a retry delay short.
    ///
    /// A "network failure" error, for instance, can be retried ahead of its
    /// timer-based retry interval if Internet suddenly becomes available.
    /// 5xx errors, by contrast, are transient but can only be retried when
    /// their timer fires, so they have no entry here.
    private struct ExternalRetryTriggers: OptionSet {
        let rawValue: Int

        /// Bit 0.
        static let networkBecameReachable = Self(rawValue: 0b01)
        /// Bit 1.
        static let chatConnectionOpened = Self(rawValue: 0b10)
    }

    /// Priority class of a send job; part of the key that selects its queue.
    private enum JobPriority: Hashable {
        case high, renderableContent, low
    }

    /// All mutable bookkeeping for the job queue.
    private struct State {
        /// Set once `setUp()` has finished its loading attempt.
        var isLoaded = false
        /// Jobs that have been added but not yet handed to `queueJob`.
        var pendingJobs: [Job] = []
        /// Toggled by `startPendingJobRecordsIfPossible()`; while `true`,
        /// `isDone` is `false`.
        var isTransferringPendingJobs = false
        /// Queued and active operations, grouped by queue.
        var queueStates: [QueueKey: QueueState] = [:]
        /// Futures for pending jobs, keyed by the job record's `uniqueId`.
        var jobFutures: [String: Future<Void>] = [:]

        /// Resumed when `isDone` is true.
        var onDone: [NSObject: Monitor.Continuation] = [:]

        /// Whether the queue has loaded and holds no pending, in-transfer,
        /// queued, or active work.
        var isDone: Bool {
            guard isLoaded, !isTransferringPendingJobs else {
                return false
            }
            return pendingJobs.isEmpty && queueStates.isEmpty
        }
    }

    /// Identifies one queue of operations: a thread paired with a priority class.
    private struct QueueKey: Hashable {
        /// The `threadId` of the job's record; may be `nil`.
        let threadId: String?
        /// The priority class computed for the job in `queueJob`.
        let priority: JobPriority
    }

    /// The operations belonging to a single `QueueKey`.
    private struct QueueState {
        /// Operations that have been started and haven't finished yet.
        var activeOperations: [ActiveOperationState] = []
        /// Operations waiting for their turn, in the order they were queued.
        var queuedOperations: [QueuedOperationState] = []

        /// Whether there's neither active nor queued work.
        var isEmpty: Bool {
            activeOperations.isEmpty && queuedOperations.isEmpty
        }

        /// Whether exactly one operation is active and that operation's record
        /// has `useMediaQueue` set.
        var hasExactlyOneActiveOperationThatUsesTheMediaQueue: Bool {
            guard activeOperations.count == 1, let onlyOperation = activeOperations.first else {
                return false
            }
            return onlyOperation.job.record.useMediaQueue
        }
    }

    /// The queue's mutable state, wrapped in an `AtomicValue` and accessed
    /// through `update` / `updateStateAndNotify`.
    private let state = AtomicValue<State>(State(), lock: .init())

    /// Marks the unsent recipients of the message behind `oldJobRecord` as
    /// "sending".
    ///
    /// Passed to `loadRunnableJobs` by `setUp()` as its
    /// `updateRunnableJobRecord` callback. Records whose message type is
    /// `.transient` or `.none`, and records without a message identifier, are
    /// ignored.
    private func didMarkAsReady(oldJobRecord: MessageSenderJobRecord, transaction: DBWriteTransaction) {
        // TODO: Remove this method and status swapping logic entirely.
        let messageUniqueId: String?
        switch oldJobRecord.messageType {
        case .persisted(let messageId, _):
            messageUniqueId = messageId
        case .editMessage(let editedMessageId, _, _):
            messageUniqueId = editedMessageId
        case .transient, .none:
            return
        }
        guard let messageUniqueId else {
            return
        }

        // Only outgoing messages have recipients to update; anything else
        // (or a missing message) is silently skipped.
        let outgoingMessage = TSOutgoingMessage
            .fetchViaCache(uniqueId: messageUniqueId, transaction: transaction)
            .flatMap { $0 as? TSOutgoingMessage }
        outgoingMessage?.updateAllUnsentRecipientsAsSending(transaction: transaction)
    }

    /// Dispatch queue on which `startPendingJobRecordsIfPossible()` drains
    /// `pendingJobs`, so that jobs reach `queueJob` in order.
    private let pendingJobQueue = DispatchQueue(label: "MessageSenderJobQueue.pendingJobRecords")

    /// Hands every job in `pendingJobs` to `queueJob`, provided the queue has
    /// finished loading.
    ///
    /// If `isLoaded` is still `false`, no jobs are moved; `setUp()` calls this
    /// method again after marking the queue as loaded.
    ///
    /// While jobs are in flight between `pendingJobs` and `queueStates`,
    /// `isTransferringPendingJobs` is `true`. Without it, `isDone` (and
    /// therefore `waitUntilDone()`) could observe a moment in which the jobs
    /// appear in neither collection and wrongly conclude that the queue is idle.
    private func startPendingJobRecordsIfPossible() {
        // Use a queue to ensure "pendingJobs" get passed to queueJob in the correct order.
        pendingJobQueue.async {
            let pendingJobs: [Job] = self.state.update {
                guard $0.isLoaded else {
                    // Nothing can be transferred yet; leave the jobs where they are.
                    return []
                }
                // Raise the flag in the same critical section that empties
                // `pendingJobs` so there's no gap in which `isDone` is true.
                $0.isTransferringPendingJobs = true
                let result = $0.pendingJobs
                $0.pendingJobs = []
                return result
            }
            defer {
                // The jobs (if any) are now tracked by `queueStates`; lower the
                // flag and wake anybody waiting for the queue to be done.
                self.updateStateAndNotify {
                    $0.isTransferringPendingJobs = false
                }
            }
            if !pendingJobs.isEmpty {
                SSKEnvironment.shared.databaseStorageRef.write { tx in
                    for pendingJob in pendingJobs {
                        self.queueJob(pendingJob, tx: tx)
                    }
                }
            }
        }
    }

    /// Restores the message for `job` and appends the job to the queue that
    /// matches its thread and priority.
    ///
    /// If the message can't be restored, the job is dropped: its record is
    /// removed (unless the job is in-memory-only) and its future is rejected.
    ///
    /// - Parameters:
    ///   - job: The pending job to queue.
    ///   - transaction: The write transaction used to restore the message and
    ///     to register the completion that performs the actual queueing.
    private func queueJob(_ job: Job, tx transaction: DBWriteTransaction) {
        let record = job.record

        // Claim the future (if any) that was stored when the job was added.
        let future = state.update { $0.jobFutures.removeValue(forKey: record.uniqueId) }

        guard let message = PreparedOutgoingMessage.restore(from: record, tx: transaction) else {
            if !job.isInMemoryOnly {
                record.anyRemove(transaction: transaction)
            }
            future?.reject(OWSAssertionError("Can't start job that can't be prepared."))
            return
        }

        // An explicit high-priority flag wins; otherwise renderable content
        // outranks everything else.
        let sendPriority: JobPriority = if record.isHighPriority {
            .high
        } else if message.hasRenderableContent(tx: transaction) {
            .renderableContent
        } else {
            .low
        }
        let queueKey = QueueKey(threadId: record.threadId, priority: sendPriority)
        let operation = QueuedOperationState(job: job, message: message, future: future)

        jobSerializer.addOrderedSyncCompletion(tx: transaction) {
            self.state.update {
                $0.queueStates[queueKey, default: QueueState()].queuedOperations.append(operation)
            }
            self.startNextJobIfNeeded(queueKey: queueKey)
        }
    }

    /// Loads persisted jobs, registers the observers that report external
    /// retry triggers, and marks the queue as loaded.
    ///
    /// The work happens in an unstructured `Task`:
    ///
    /// 1. In the main app only, runnable job records are loaded and placed
    ///    ahead of any jobs that were added in the meantime (dropping
    ///    duplicates of the loaded records from the latter).
    /// 2. Observers are registered for reachability and chat connection state
    ///    changes.
    /// 3. The queue is marked as loaded (even if loading failed) and pending
    ///    jobs are started.
    public func setUp() {
        let jobRecordFinder = JobRecordFinderImpl<MessageSenderJobRecord>(db: DependenciesBridge.shared.db)
        Task {
            if CurrentAppContext().isMainApp {
                do {
                    let jobRecords = try await jobRecordFinder.loadRunnableJobs(updateRunnableJobRecord: { jobRecord, tx in
                        self.didMarkAsReady(oldJobRecord: jobRecord, transaction: tx)
                    })
                    let jobRecordUniqueIds = Set(jobRecords.lazy.map(\.uniqueId))
                    self.state.update {
                        // Jobs added while loading may also be among the loaded
                        // records; keep only the ones that aren't, and run them
                        // after the loaded jobs.
                        var newlyPendingJobs = $0.pendingJobs
                        newlyPendingJobs.removeAll(where: { jobRecordUniqueIds.contains($0.record.uniqueId) })
                        $0.pendingJobs = jobRecords.map { Job(record: $0, isInMemoryOnly: false) }
                        $0.pendingJobs.append(contentsOf: newlyPendingJobs)
                    }
                } catch {
                    owsFailDebug("Couldn't load existing message send jobs: \(error)")
                }
            }

            // FIXME: The returned observer token is never unregistered.
            // In practice all our JobQueues live forever, so this isn't a problem.

            // We use "unowned" so that we don't silently fail (or leak) when this changes.
            let becameReachableBlock = { [unowned self] in
                self.becameReachable()
            }
            NotificationCenter.default.addObserver(
                forName: SSKReachability.owsReachabilityDidChange,
                object: nil,
                queue: nil,
            ) { _ in
                if SSKEnvironment.shared.reachabilityManagerRef.isReachable {
                    becameReachableBlock()
                }
            }
            let chatConnectionOpenedBlock = { [unowned self] in
                self.reportExternalRetryTrigger(.chatConnectionOpened)
            }
            NotificationCenter.default.addObserver(
                forName: OWSChatConnection.chatConnectionStateDidChange,
                object: nil,
                queue: nil,
                using: { note in
                    // A notification without a well-formed state is a programmer
                    // error, but it isn't worth crashing the app over: flag it
                    // and ignore the notification.
                    guard
                        let connectionState = note.userInfo?[OWSChatConnection.chatConnectionStateKey] as? OWSChatConnectionState
                    else {
                        owsFailDebug("Missing or malformed chat connection state in notification.")
                        return
                    }
                    if connectionState == .open {
                        chatConnectionOpenedBlock()
                    }
                },
            )

            // No matter what, mark it as loaded. This keeps things semi-functional.
            self.updateStateAndNotify { $0.isLoaded = true }
            startPendingJobRecordsIfPossible()
        }
    }

    /// Reports the `.networkBecameReachable` retry trigger to every active
    /// operation.
    func becameReachable() {
        reportExternalRetryTrigger(.networkBecameReachable)
    }

    /// Forwards `externalRetryTrigger` to every active operation in every
    /// queue. Queued (not yet started) operations aren't notified.
    private func reportExternalRetryTrigger(_ externalRetryTrigger: ExternalRetryTriggers) {
        state.update { currentState in
            for queueState in currentState.queueStates.values {
                for activeOperation in queueState.activeOperations {
                    activeOperation.reportExternalRetryTrigger(externalRetryTrigger)
                }
            }
        }
    }

    /// Starts as many queued operations for `queueKey` as the scheduling rules
    /// allow, then stores the updated queue state (or removes it if it has
    /// become empty).
    private func startNextJobIfNeeded(queueKey: QueueKey) {
        updateStateAndNotify { mutableState in
            var queueState = mutableState.queueStates[queueKey] ?? QueueState()

            // First slot: when nothing is running, the oldest queued operation
            // starts, whatever kind it is.
            if queueState.activeOperations.isEmpty, let firstIndex = queueState.queuedOperations.indices.first {
                startNextJob(atQueuedIndex: firstIndex, forQueueKey: queueKey, in: &queueState)
            }

            // Second slot: reserved for non-media messages so that they don't
            // get stuck behind a media message. It's only available while the
            // first slot is occupied by a media message; if the first slot holds
            // a non-media message, nothing else may be scheduled.
            //
            // Example: sending A, B, C, and D, where only C is media, permits
            // the orderings ABCD and ABDC. This is what lets "D" start sending
            // concurrently with "C".
            if
                queueState.hasExactlyOneActiveOperationThatUsesTheMediaQueue,
                let nonMediaIndex = queueState.queuedOperations.firstIndex(where: { !$0.job.record.useMediaQueue })
            {
                startNextJob(atQueuedIndex: nonMediaIndex, forQueueKey: queueKey, in: &queueState)
            }

            // Empty queues are dropped so that `isDone` can become true.
            if queueState.isEmpty {
                mutableState.queueStates[queueKey] = nil
            } else {
                mutableState.queueStates[queueKey] = queueState
            }
        }
    }

    /// Moves the queued operation at `index` to the active list and runs it in
    /// a new `Task`.
    ///
    /// When the operation finishes, it's removed from the active list and
    /// `startNextJobIfNeeded(queueKey:)` is called to fill the freed slot.
    ///
    /// - Parameters:
    ///   - index: Index into `queueState.queuedOperations` of the operation to start.
    ///   - queueKey: The queue that `queueState` belongs to.
    ///   - queueState: The state to mutate; the caller stores it afterwards.
    private func startNextJob(atQueuedIndex index: Int, forQueueKey queueKey: QueueKey, in queueState: inout QueueState) {
        let activeOperation = ActiveOperationState(queuedOperation: queueState.queuedOperations.remove(at: index))
        queueState.activeOperations.append(activeOperation)

        let priority = Self.taskPriority(forJobPriority: queueKey.priority)
        Task(priority: priority) {
            await self.runOperation(activeOperation)
            self.state.update { mutableState in
                let finishedId = activeOperation.job.record.uniqueId
                // The queue state must still exist: it can't have become empty
                // while this operation was listed as active.
                mutableState.queueStates[queueKey]!.activeOperations.removeAll { $0.job.record.uniqueId == finishedId }
            }
            self.startNextJobIfNeeded(queueKey: queueKey)
        }
    }

    /// Maps a job's priority class to the priority of the `Task` that runs it:
    /// `.low` runs at `.medium`, everything else at `.userInitiated`.
    private static func taskPriority(forJobPriority jobPriority: JobPriority) -> TaskPriority {
        switch jobPriority {
        case .low: .medium
        case .high, .renderableContent: .userInitiated
        }
    }

    /// Sends the message for `operation` and processes the terminal result.
    ///
    /// Once sending has succeeded or permanently failed, a single write
    /// transaction removes the job record (unless the job is in-memory-only)
    /// and records the outcome on the message. Only afterwards is the
    /// operation's future resolved or rejected, so this method returns after
    /// the job record has been deleted.
    private func runOperation(_ operation: ActiveOperationState) async {
        let outcome = await Result { try await self._runOperation(operation) }

        await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
            let job = operation.job
            if !job.isInMemoryOnly {
                job.record.anyRemove(transaction: tx)
            }
            if case .failure(let error) = outcome {
                operation.message.updateWithAllSendingRecipientsMarkedAsFailed(error: error, tx: tx)
            } else {
                operation.message.updateWithSendSuccess(tx: tx)
            }
        }

        // Notify the caller (if any) only after the database reflects the result.
        switch outcome {
        case .failure(let error):
            operation.future?.reject(error)
        case .success(()):
            operation.future?.resolve()
        }
    }

    /// Returns how many retries `message` is allowed: 2 for PIN changes, 110
    /// for everything else.
    ///
    /// Max-retries acts as a stand-in for a timeout for messages we want sent
    /// or cancelled in less than 24 hours. Eventually this will be replaced
    /// with support for actual timeouts.
    private func getMaxRetriesForMessageType(message: PreparedOutgoingMessage) -> Int {
        return message.isPinChange ? 2 : 110
    }

    /// Runs a job to send a particular message.
    ///
    /// This method returns after the operation has reached a terminal result
    /// but before that result has been processed.
    ///
    /// The message is sent in a loop. Each failed attempt is classified: a
    /// fatal error, the absence of any retryable error, or running out of
    /// retries ends the loop by throwing. Otherwise the method waits for the
    /// computed retry delay -- or for a relevant external retry trigger,
    /// whichever comes first -- and tries again.
    ///
    /// - Throws: The fatal error, an arbitrary error from the failed send (if
    ///   nothing was retryable), or the first retryable error (if retries are
    ///   exhausted).
    private func _runOperation(_ operation: ActiveOperationState) async throws {
        // Start from the failure count stored on the job record.
        var attemptCount = Int(operation.job.record.failureCount)
        let maxRetries = getMaxRetriesForMessageType(message: operation.message)
        while true {
            assert(!Task.isCancelled, "Cancellation isn't supported.")
            // Clear triggers *before* sending so that any trigger that fires
            // during the attempt is still pending if the attempt fails.
            operation.clearExternalRetryTriggers()
            let result = await SSKEnvironment.shared.messageSenderRef.sendMessage(operation.message)
            let errors: [any Error]
            let arbitraryError: any Error
            switch result {
            case .success:
                return
            case .overallFailure(let error):
                errors = [error]
                arbitraryError = error
            case .recipientsFailure(let failure):
                errors = failure.recipientErrors.map(\.error)
                arbitraryError = failure.arbitraryError
            }
            var retryableError: (any Error)?
            var externalRetryTriggers: ExternalRetryTriggers = []
            var suggestedRetryDelay: TimeInterval = 0
            var accountCheckerRetryDelay: TimeInterval = 0
            for error in errors {
                // Some errors should never be retried. Because group send is
                // all-or-nothing, this means we need to fail the entire operation even
                // when retries may work for other recipients.
                if error.isFatalError {
                    throw error
                }
                // Keep track of the first retryable error we encounter -- we'd prefer to
                // throw a retryable error rather than one that's not retryable.
                if MessageSender.isRetryableError(error) {
                    retryableError = retryableError ?? error
                }
                // If there's a network failure, this is an external error, so we want to
                // retry as soon as we reconnect.
                if error.isNetworkFailure {
                    externalRetryTriggers.insert(.chatConnectionOpened)
                }
                // If there's a timeout, we interrupted the request ourselves, and sending
                // the same request again on a new connection will typically result in the
                // same outcome, so we want to perform exponential backoff before retrying.
                // However, if Reachability indicates that something has changed, we might
                // be on a better network, and it may be worth retrying immediately.
                if error.isTimeout {
                    externalRetryTriggers.insert(.networkBecameReachable)
                }
                // If there's a Retry-After header, pick the largest one. That's when we
                // expect we'll be able to complete the entire send successfully.
                if let retryAfterDelay = error.httpResponseHeaders?.retryAfterTimeInterval {
                    suggestedRetryDelay = max(suggestedRetryDelay, retryAfterDelay)
                }
                if case SignalError.rateLimitedError(retryAfter: let retryAfter, message: _) = error {
                    suggestedRetryDelay = max(suggestedRetryDelay, retryAfter)
                }
                // If there's a Retry-After from the AccountChecker, we want to wait for
                // the sum of the Retry-Afters. (This avoids pathological O(n^2) behavior.)
                if let rateLimitError = error as? AccountChecker.RateLimitError {
                    accountCheckerRetryDelay += rateLimitError.retryAfter
                }
            }
            // Nothing was retryable: give up with an arbitrary error from this attempt.
            guard let retryableError else {
                throw arbitraryError
            }
            // Out of retries: give up with the first retryable error.
            guard attemptCount < maxRetries else {
                throw retryableError
            }
            attemptCount += 1
            // Record the failure on the job record (in-memory-only jobs have no
            // record in the database to update).
            if !operation.job.isInMemoryOnly {
                await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
                    operation.job.record.addFailure(tx: tx)
                }
            }
            // Determine the minimum amount of backoff.
            let maxAverageBackoff: TimeInterval = 14.1 * .minute
            let exponentialRetryDelay: TimeInterval = OWSOperation.retryIntervalForExponentialBackoff(
                failureCount: attemptCount,
                maxAverageBackoff: maxAverageBackoff,
            )
            // We pick the largest of the values -- we don't want Retry-After headers
            // to be able to trigger tight retry loops on the client, so we maintain a
            // minimum of exponential backoff.
            // Server-suggested delays are each capped at `maxAverageBackoff`.
            let retryDelay = max(
                exponentialRetryDelay,
                min(maxAverageBackoff, suggestedRetryDelay),
                min(maxAverageBackoff, accountCheckerRetryDelay),
            )
            // Build the optional log suffix describing server-suggested delays.
            var httpBlurb = ""
            if suggestedRetryDelay > 0 {
                httpBlurb += " (retry-after: \(String(format: "%.1f", suggestedRetryDelay))s)"
            }
            if accountCheckerRetryDelay > 0 {
                httpBlurb += " (account-checker-retry-after: \(String(format: "%.1f", accountCheckerRetryDelay))s)"
            }
            Logger.warn("Resending \(operation.message.description) after \(String(format: "%.1f", retryDelay))s\(httpBlurb)")
            // Wait for a relevant external trigger, but for no longer than
            // `retryDelay`. Any error thrown by the wait is deliberately ignored
            // (`try?`): either way, the loop proceeds to the next attempt.
            try? await withCooperativeTimeout(
                seconds: retryDelay,
                operation: { try await operation.waitForAnyExternalRetryTrigger(fromExternalRetryTriggers: externalRetryTriggers) },
            )
        }
    }

    // MARK: - Notifications

    /// Condition that is satisfied when `State.isDone` is true; its waiters
    /// are stored in `State.onDone`.
    private let doneCondition = Monitor.Condition<State>(
        isSatisfied: \.isDone,
        waiters: \.onDone,
    )

    /// Applies `block` to the queue's state through `Monitor.updateAndNotify`,
    /// passing `doneCondition` so the monitor can notify its waiters.
    ///
    /// - Parameter block: The mutation to apply to the state.
    /// - Returns: Whatever `block` returns.
    private func updateStateAndNotify<T>(_ block: (inout State) -> T) -> T {
        Monitor.updateAndNotify(in: state, block: block, conditions: doneCondition)
    }

    /// Suspends until `doneCondition` is satisfied, i.e. until `State.isDone`
    /// is true.
    ///
    /// - Throws: `CancellationError`.
    public func waitUntilDone() async throws(CancellationError) {
        try await Monitor.waitForCondition(doneCondition, in: state)
    }
}