//
// Copyright 2018 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//
import Foundation
import LibSignalClient
/// Durably enqueues outgoing messages.
///
/// Calls `MessageSender` to send messages.
///
/// # Retries
///
/// Both `MessageSenderJobQueue` and `MessageSender` implement retry
/// handling.
///
/// The latter (`MessageSender`) retries only specific errors that the
/// server indicates are immediately retryable (e.g., "you are missing a
/// device for the destination; add it and try again"). These retries aren't
/// "configurable", nor do they have any backoff. They are expected when the
/// system is operating normally, and they are part of the expected flow for
/// sending a message.
///
/// The former (`MessageSenderJobQueue`) retries generic/unknown failures
/// (e.g., "the server gave us a 5xx error; try after a few seconds", "there
/// isn't any Internet; try when we reconnect"). These retries are
/// "configurable", meaning we can decide how many occur and how often they
/// occur. These only happen when something is operating abnormally (e.g.,
/// "the server is down", "the user isn't connected to the network").
///
/// Both respect `IsRetryableProvider` and only retry retryable errors.
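///
/// # Usage
///
/// A minimal sketch of the two enqueue styles defined below, assuming a
/// `messageSenderJobQueue` instance, a caller-provided `preparedMessage` and
/// write transaction `tx`, and that `PromiseNamespace` exposes a `.promise`
/// case:
///
/// ```swift
/// // Fire-and-forget: durably enqueued and retried across app launches.
/// messageSenderJobQueue.add(message: preparedMessage, transaction: tx)
///
/// // Promise-returning: resolves or rejects once the send reaches a
/// // terminal result.
/// let sendPromise: Promise<Void> = messageSenderJobQueue.add(
///     .promise,
///     message: preparedMessage,
///     transaction: tx,
/// )
/// ```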
public class MessageSenderJobQueue {
private var jobSerializer = CompletionSerializer()
public init(appReadiness: AppReadiness) {
appReadiness.runNowOrWhenAppDidBecomeReadyAsync {
self.setUp()
}
}
public func add(
message: PreparedOutgoingMessage,
limitToCurrentProcessLifetime: Bool = false,
isHighPriority: Bool = false,
transaction: DBWriteTransaction,
) {
self.add(
message: message,
exclusiveToCurrentProcessIdentifier: limitToCurrentProcessLifetime,
isHighPriority: isHighPriority,
future: nil,
transaction: transaction,
)
}
public func add(
_ namespace: PromiseNamespace,
message: PreparedOutgoingMessage,
limitToCurrentProcessLifetime: Bool = false,
isHighPriority: Bool = false,
transaction: DBWriteTransaction,
) -> Promise<Void> {
return Promise { future in
self.add(
message: message,
exclusiveToCurrentProcessIdentifier: limitToCurrentProcessLifetime,
isHighPriority: isHighPriority,
future: future,
transaction: transaction,
)
}
}
private func add(
message: PreparedOutgoingMessage,
exclusiveToCurrentProcessIdentifier: Bool,
isHighPriority: Bool,
future: Future<Void>?,
transaction: DBWriteTransaction,
) {
// Mark as sending now so the UI updates immediately.
message.updateAllUnsentRecipientsAsSending(tx: transaction)
let jobRecord: MessageSenderJobRecord
do {
jobRecord = try message.asMessageSenderJobRecord(isHighPriority: isHighPriority, tx: transaction)
} catch {
message.updateWithAllSendingRecipientsMarkedAsFailed(error: error, tx: transaction)
future?.reject(error)
return
}
owsAssertDebug(jobRecord.status == .ready)
if exclusiveToCurrentProcessIdentifier {
// Nothing to do. Just don't insert it into the database.
} else {
jobRecord.anyInsert(transaction: transaction)
}
self.state.update {
$0.pendingJobs.append(Job(record: jobRecord, isInMemoryOnly: exclusiveToCurrentProcessIdentifier))
if let future {
$0.jobFutures[jobRecord.uniqueId] = future
}
}
transaction.addSyncCompletion {
self.startPendingJobRecordsIfPossible()
}
}
// MARK: JobQueue
/// A job that needs to be executed.
private struct Job {
let record: MessageSenderJobRecord
let isInMemoryOnly: Bool
}
/// A job that's been queued but hasn't started yet.
private struct QueuedOperationState {
let job: Job
let message: PreparedOutgoingMessage
let future: Future<Void>?
}
/// A job that's actively executing; it may be suspended due to errors.
private struct ActiveOperationState {
let job: Job
let message: PreparedOutgoingMessage
let future: Future<Void>?
let externalRetryTriggerState = AtomicValue(ExternalRetryTriggerState(), lock: .init())
init(queuedOperation: QueuedOperationState) {
self.job = queuedOperation.job
self.message = queuedOperation.message
self.future = queuedOperation.future
}
/// "Consume" any triggers that have already fired.
///
/// Callers should do this before performing any retryable action that might
/// fail due to one of the triggers. For example, if a message might fail to
/// send because there's no Internet, this should be called before
/// attempting to send the message.
///
/// This pattern ensures no triggers are missed due to concurrently
/// executing operations & triggers. For example, if Internet isn't
/// available, you start sending a message, Internet becomes available, and
/// then the message fails to send with a "network failure" error, we want
/// to immediately retry. If we don't retry, we'd be stuck until something
/// *else* triggers a retry (e.g., losing & gaining Internet again).
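///
/// A sketch of the intended call order, mirroring `_runOperation` below:
///
/// ```swift
/// operation.clearExternalRetryTriggers()   // consume stale triggers
/// let result = await sendMessage(...)      // a trigger may fire mid-send
/// // On a retryable failure, wait for the backoff timer *or* any trigger
/// // reported since the clear -- including one that fired during the send:
/// try await operation.waitForAnyExternalRetryTrigger(
///     fromExternalRetryTriggers: triggers,
/// )
/// ```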
func clearExternalRetryTriggers() {
self.externalRetryTriggerState.update {
$0.reportedExternalRetryTriggers = []
}
}
/// Trigger retries for any jobs that failed because of `externalRetryTrigger`.
///
/// This also triggers retries for in-progress jobs that subsequently fail
/// because of `externalRetryTrigger`. This avoids race conditions (see above).
func reportExternalRetryTrigger(_ externalRetryTrigger: ExternalRetryTriggers) {
self.externalRetryTriggerState.update {
$0.reportedExternalRetryTriggers.formUnion(externalRetryTrigger)
notifyIfPossible(mutableState: &$0)
}
}
/// Waits until any trigger in `externalRetryTriggers` has fired.
func waitForAnyExternalRetryTrigger(fromExternalRetryTriggers externalRetryTriggers: ExternalRetryTriggers) async throws {
let waitingContinuation = CancellableContinuation<Void>()
self.externalRetryTriggerState.update {
$0.waitingState = (waitingContinuation, externalRetryTriggers)
notifyIfPossible(mutableState: &$0)
}
return try await waitingContinuation.wait()
}
private func notifyIfPossible(mutableState: inout ExternalRetryTriggerState) {
guard let waitingState = mutableState.waitingState else {
return
}
if mutableState.reportedExternalRetryTriggers.isDisjoint(with: waitingState.externalRetryTriggers) {
return
}
waitingState.continuation.resume(with: .success(()))
}
}
/// Tracks information about failures with external retry triggers.
private struct ExternalRetryTriggerState {
var reportedExternalRetryTriggers: ExternalRetryTriggers = []
var waitingState: (continuation: CancellableContinuation<Void>, externalRetryTriggers: ExternalRetryTriggers)?
}
/// Tracks failure types with external retry triggers.
///
/// For example, a "network failure" error can be triggered before its
/// timer-based retry interval if Internet suddenly becomes available.
/// Conversely, 5xx errors are transient but can only be retried when their
/// timer-based retry fires, so they're not included here.
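///
/// Usage sketch: a failed attempt can await several triggers at once, e.g.
/// `[.networkBecameReachable, .chatConnectionOpened]`; whichever fires first
/// resumes the waiter.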
private struct ExternalRetryTriggers: OptionSet {
let rawValue: Int
static let networkBecameReachable = ExternalRetryTriggers(rawValue: 1 << 0)
static let chatConnectionOpened = ExternalRetryTriggers(rawValue: 1 << 1)
}
private enum JobPriority: Hashable {
case high
case renderableContent
case low
}
private struct State {
var isLoaded = false
var pendingJobs = [Job]()
var isTransferringPendingJobs = false
var queueStates = [QueueKey: QueueState]()
var jobFutures = [String: Future<Void>]()
/// Resumed when `isDone` is true.
var onDone = [NSObject: Monitor.Continuation]()
var isDone: Bool {
return isLoaded && pendingJobs.isEmpty && !isTransferringPendingJobs && queueStates.isEmpty
}
}
private struct QueueKey: Hashable {
let threadId: String?
let priority: JobPriority
}
private struct QueueState {
var activeOperations = [ActiveOperationState]()
var queuedOperations = [QueuedOperationState]()
var isEmpty: Bool {
return activeOperations.isEmpty && queuedOperations.isEmpty
}
var hasExactlyOneActiveOperationThatUsesTheMediaQueue: Bool {
return activeOperations.count == 1 && activeOperations[0].job.record.useMediaQueue
}
}
private let state = AtomicValue<State>(State(), lock: .init())
private func didMarkAsReady(oldJobRecord: MessageSenderJobRecord, transaction: DBWriteTransaction) {
// TODO: Remove this method and status swapping logic entirely.
let uniqueId: String?
switch oldJobRecord.messageType {
case .persisted(let messageId, _):
uniqueId = messageId
case .editMessage(let editedMessageId, _, _):
uniqueId = editedMessageId
case .transient, .none:
return
}
guard let uniqueId else {
return
}
TSOutgoingMessage
.fetchViaCache(
uniqueId: uniqueId,
transaction: transaction,
)
.flatMap { $0 as? TSOutgoingMessage }?
.updateAllUnsentRecipientsAsSending(transaction: transaction)
}
private let pendingJobQueue = DispatchQueue(label: "MessageSenderJobQueue.pendingJobRecords")
private func startPendingJobRecordsIfPossible() {
// Use a queue to ensure "pendingJobs" get passed to queueJob in the correct order.
pendingJobQueue.async {
let pendingJobs = self.state.update {
if $0.isLoaded {
let result = $0.pendingJobs
$0.pendingJobs = []
return result
}
$0.isTransferringPendingJobs = true
return []
}
defer {
self.updateStateAndNotify {
$0.isTransferringPendingJobs = false
}
}
if !pendingJobs.isEmpty {
SSKEnvironment.shared.databaseStorageRef.write { tx in
for pendingJob in pendingJobs {
self.queueJob(pendingJob, tx: tx)
}
}
}
}
}
private func queueJob(_ job: Job, tx transaction: DBWriteTransaction) {
let future = self.state.update { $0.jobFutures.removeValue(forKey: job.record.uniqueId) }
guard let message = PreparedOutgoingMessage.restore(from: job.record, tx: transaction) else {
if !job.isInMemoryOnly {
job.record.anyRemove(transaction: transaction)
}
future?.reject(OWSAssertionError("Can't start job that can't be prepared."))
return
}
let sendPriority: JobPriority
if job.record.isHighPriority {
sendPriority = .high
} else if message.hasRenderableContent(tx: transaction) {
sendPriority = .renderableContent
} else {
sendPriority = .low
}
let operation = QueuedOperationState(
job: job,
message: message,
future: future,
)
let queueKey = QueueKey(threadId: job.record.threadId, priority: sendPriority)
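// Hand enqueueing to the CompletionSerializer so that, once the write
// commits, operations are appended to queuedOperations in the order the
// jobs were added.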
self.jobSerializer.addOrderedSyncCompletion(tx: transaction) {
self.state.update {
$0.queueStates[queueKey, default: QueueState()].queuedOperations.append(operation)
}
self.startNextJobIfNeeded(queueKey: queueKey)
}
}
public func setUp() {
let jobRecordFinder = JobRecordFinderImpl<MessageSenderJobRecord>(db: DependenciesBridge.shared.db)
Task {
if CurrentAppContext().isMainApp {
do {
let jobRecords = try await jobRecordFinder.loadRunnableJobs(updateRunnableJobRecord: { jobRecord, tx in
self.didMarkAsReady(oldJobRecord: jobRecord, transaction: tx)
})
let jobRecordUniqueIds = Set(jobRecords.lazy.map(\.uniqueId))
self.state.update {
var newlyPendingJobs = $0.pendingJobs
newlyPendingJobs.removeAll(where: { jobRecordUniqueIds.contains($0.record.uniqueId) })
$0.pendingJobs = jobRecords.map { Job(record: $0, isInMemoryOnly: false) }
$0.pendingJobs.append(contentsOf: newlyPendingJobs)
}
} catch {
owsFailDebug("Couldn't load existing message send jobs: \(error)")
}
}
// FIXME: The returned observer token is never unregistered.
// In practice all our JobQueues live forever, so this isn't a problem.
// We use "unowned" so that we don't silently fail (or leak) when this changes.
let becameReachableBlock = { [unowned self] in
self.becameReachable()
}
NotificationCenter.default.addObserver(
forName: SSKReachability.owsReachabilityDidChange,
object: nil,
queue: nil,
) { _ in
if SSKEnvironment.shared.reachabilityManagerRef.isReachable {
becameReachableBlock()
}
}
let chatConnectionOpenedBlock = { [unowned self] in
self.reportExternalRetryTrigger(.chatConnectionOpened)
}
NotificationCenter.default.addObserver(
forName: OWSChatConnection.chatConnectionStateDidChange,
object: nil,
queue: nil,
using: { note in
let connectionState = note.userInfo![OWSChatConnection.chatConnectionStateKey]! as! OWSChatConnectionState
if connectionState == .open {
chatConnectionOpenedBlock()
}
},
)
// No matter what, mark it as loaded. This keeps things semi-functional.
self.updateStateAndNotify { $0.isLoaded = true }
startPendingJobRecordsIfPossible()
}
}
func becameReachable() {
self.reportExternalRetryTrigger(.networkBecameReachable)
}
private func reportExternalRetryTrigger(_ externalRetryTrigger: ExternalRetryTriggers) {
self.state.update {
for (_, queueState) in $0.queueStates {
for activeOperation in queueState.activeOperations {
activeOperation.reportExternalRetryTrigger(externalRetryTrigger)
}
}
}
}
private func startNextJobIfNeeded(queueKey: QueueKey) {
self.updateStateAndNotify {
var queueState = $0.queueStates[queueKey, default: QueueState()]
// If nothing is running, start *any* operation that needs to be started.
if queueState.activeOperations.isEmpty {
if let nextIndex = queueState.queuedOperations.indices.first {
startNextJob(atQueuedIndex: nextIndex, forQueueKey: queueKey, in: &queueState)
}
}
// Non-media messages get an extra slot to run so that they don't get stuck
// behind media messages. If the first slot got filled by a media message,
// this one can be filled by a non-media message. If the first slot is
// filled by a non-media message, we can't schedule anything else.
// For example, if you send A, B, C, and D, where C is media and everything
// else is a text message, then only orderings ABCD and ABDC are allowed.
// This block exists to start sending "D" concurrently with "C".
if queueState.hasExactlyOneActiveOperationThatUsesTheMediaQueue {
if let nextIndex = queueState.queuedOperations.firstIndex(where: { !$0.job.record.useMediaQueue }) {
startNextJob(atQueuedIndex: nextIndex, forQueueKey: queueKey, in: &queueState)
}
}
$0.queueStates[queueKey] = queueState.isEmpty ? nil : queueState
}
}
private func startNextJob(atQueuedIndex index: Int, forQueueKey queueKey: QueueKey, in queueState: inout QueueState) {
let queuedOperation = queueState.queuedOperations.remove(at: index)
let activeOperation = ActiveOperationState(queuedOperation: queuedOperation)
queueState.activeOperations.append(activeOperation)
Task(priority: Self.taskPriority(forJobPriority: queueKey.priority)) {
await self.runOperation(activeOperation)
self.state.update {
$0.queueStates[queueKey]!.activeOperations.removeAll(where: { $0.job.record.uniqueId == activeOperation.job.record.uniqueId })
}
startNextJobIfNeeded(queueKey: queueKey)
}
}
private static func taskPriority(forJobPriority jobPriority: JobPriority) -> TaskPriority {
switch jobPriority {
case .high, .renderableContent:
return .userInitiated
case .low:
return .medium
}
}
/// Runs a job to send a particular message.
///
/// This method returns after the operation reaches a terminal result and
/// the job record has been deleted.
private func runOperation(_ operation: ActiveOperationState) async {
let result = await Result { try await self._runOperation(operation) }
await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
if !operation.job.isInMemoryOnly {
operation.job.record.anyRemove(transaction: tx)
}
switch result {
case .success(()):
operation.message.updateWithSendSuccess(tx: tx)
case .failure(let error):
operation.message.updateWithAllSendingRecipientsMarkedAsFailed(error: error, tx: tx)
}
}
switch result {
case .success(()):
operation.future?.resolve()
case .failure(let error):
operation.future?.reject(error)
}
}
/// Use max-retries as a stand-in for a timeout for messages we want sent or cancelled in less than 24 hours.
/// Eventually this will be replaced with support for actual timeouts.
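///
/// Rough arithmetic: backoff saturates at the ~14.1-minute average cap used
/// in `_runOperation`, so 110 retries span roughly a day (110 x ~14.1 min,
/// less the shorter early intervals), which is how 110 approximates the
/// 24-hour bound.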
private func getMaxRetriesForMessageType(message: PreparedOutgoingMessage) -> Int {
if message.isPinChange {
return 2
}
return 110
}
/// Runs a job to send a particular message.
///
/// This method returns after the operation has reached a terminal result
/// but before that result has been processed.
private func _runOperation(_ operation: ActiveOperationState) async throws {
var attemptCount = Int(operation.job.record.failureCount)
let maxRetries = getMaxRetriesForMessageType(message: operation.message)
while true {
assert(!Task.isCancelled, "Cancellation isn't supported.")
operation.clearExternalRetryTriggers()
let result = await SSKEnvironment.shared.messageSenderRef.sendMessage(operation.message)
let errors: [any Error]
let arbitraryError: any Error
switch result {
case .success:
return
case .overallFailure(let error):
errors = [error]
arbitraryError = error
case .recipientsFailure(let failure):
errors = failure.recipientErrors.map(\.error)
arbitraryError = failure.arbitraryError
}
var retryableError: (any Error)?
var externalRetryTriggers: ExternalRetryTriggers = []
var suggestedRetryDelay: TimeInterval = 0
var accountCheckerRetryDelay: TimeInterval = 0
for error in errors {
// Some errors should never be retried. Because group send is
// all-or-nothing, we must fail the entire operation even when retries
// might work for other recipients.
if error.isFatalError {
throw error
}
// Keep track of the first retryable error we encounter -- we'd prefer to
// throw a retryable error rather than one that's not retryable.
if MessageSender.isRetryableError(error) {
retryableError = retryableError ?? error
}
// If there's a network failure, this is an external error, so we want to
// retry as soon as we reconnect.
if error.isNetworkFailure {
externalRetryTriggers.insert(.chatConnectionOpened)
}
// If there's a timeout, we interrupted the request ourselves, and sending
// the same request again on a new connection will typically result in the
// same outcome, so we want to perform exponential backoff before retrying.
// However, if Reachability indicates that something has changed, we might
// be on a better network, and it may be worth retrying immediately.
if error.isTimeout {
externalRetryTriggers.insert(.networkBecameReachable)
}
// If there's a Retry-After header, pick the largest one. That's when we
// expect we'll be able to complete the entire send successfully.
if let retryAfterDelay = error.httpResponseHeaders?.retryAfterTimeInterval {
suggestedRetryDelay = max(suggestedRetryDelay, retryAfterDelay)
}
if case SignalError.rateLimitedError(retryAfter: let retryAfter, message: _) = error {
suggestedRetryDelay = max(suggestedRetryDelay, retryAfter)
}
// If there's a Retry-After from the AccountChecker, we want to wait for
// the sum of the Retry-Afters. (This avoids pathological O(n^2) behavior.)
if let rateLimitError = error as? AccountChecker.RateLimitError {
accountCheckerRetryDelay += rateLimitError.retryAfter
}
}
guard let retryableError else {
throw arbitraryError
}
guard attemptCount < maxRetries else {
throw retryableError
}
attemptCount += 1
if !operation.job.isInMemoryOnly {
await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
operation.job.record.addFailure(tx: tx)
}
}
// Determine the minimum amount of backoff.
let maxAverageBackoff: TimeInterval = 14.1 * .minute
let exponentialRetryDelay: TimeInterval = OWSOperation.retryIntervalForExponentialBackoff(
failureCount: attemptCount,
maxAverageBackoff: maxAverageBackoff,
)
// We pick the largest of the values -- we don't want Retry-After headers
// to be able to trigger tight retry loops on the client, so we maintain a
// minimum of exponential backoff.
let retryDelay = max(
exponentialRetryDelay,
min(maxAverageBackoff, suggestedRetryDelay),
min(maxAverageBackoff, accountCheckerRetryDelay),
)
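// Worked example with illustrative numbers: if this attempt's exponential
// backoff is 30s and the server sent Retry-After: 3600, we wait
// max(30, min(846, 3600), min(846, 0)) = 846s (14.1 minutes). Retry-After
// is honored but clamped to the cap, and a short Retry-After can never
// shrink the delay below the exponential floor.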
var httpBlurb = ""
if suggestedRetryDelay > 0 {
httpBlurb += " (retry-after: \(String(format: "%.1f", suggestedRetryDelay))s)"
}
if accountCheckerRetryDelay > 0 {
httpBlurb += " (account-checker-retry-after: \(String(format: "%.1f", accountCheckerRetryDelay))s)"
}
Logger.warn("Resending \(operation.message.description) after \(String(format: "%.1f", retryDelay))s\(httpBlurb)")
try? await withCooperativeTimeout(
seconds: retryDelay,
operation: { try await operation.waitForAnyExternalRetryTrigger(fromExternalRetryTriggers: externalRetryTriggers) },
)
}
}
// MARK: - Notifications
private let doneCondition = Monitor.Condition<State>(
isSatisfied: \.isDone,
waiters: \.onDone,
)
private func updateStateAndNotify<T>(_ block: (inout State) -> T) -> T {
return Monitor.updateAndNotify(
in: state,
block: block,
conditions: doneCondition,
)
}
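/// Waits until the queue is fully drained: jobs are loaded, nothing is
/// pending or mid-transfer, and no per-queue operations remain.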
public func waitUntilDone() async throws(CancellationError) {
return try await Monitor.waitForCondition(doneCondition, in: state)
}
}