Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
signalapp
GitHub Repository: signalapp/Signal-iOS
Path: blob/main/SignalServiceKit/Messages/MessageSender.swift
1 views
//
// Copyright 2020 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//

import Foundation
import LibSignalClient

// MARK: - MessageSender

public class MessageSender {

    private var preKeyManager: PreKeyManager { DependenciesBridge.shared.preKeyManager }

    let accountChecker: AccountChecker
    private let groupSendEndorsementStore: any GroupSendEndorsementStore

    init(
        accountChecker: AccountChecker,
        groupSendEndorsementStore: any GroupSendEndorsementStore,
    ) {
        self.accountChecker = accountChecker
        self.groupSendEndorsementStore = groupSendEndorsementStore

        SwiftSingletons.register(self)
    }

    // MARK: - Creating Signal Protocol Sessions

    private func validSession(for serviceId: ServiceId, deviceId: DeviceId, tx: DBReadTransaction) throws -> LibSignalClient.SessionRecord? {
        let sessionStore = DependenciesBridge.shared.signalProtocolStoreManager.signalProtocolStore(for: .aci).sessionStore
        do {
            guard let session = try sessionStore.loadSession(forServiceId: serviceId, deviceId: deviceId, tx: tx) else {
                return nil
            }
            guard session.hasCurrentState else {
                return nil
            }
            return session
        } catch {
            switch error {
            case RecipientIdError.mustNotUsePniBecauseAciExists:
                throw error
            default:
                return nil
            }
        }
    }

    /// Establishes a session with the recipient if one doesn't already exist.
    private func createSession(
        serviceId: ServiceId,
        deviceId: PreKeyDevice,
        sealedSenderParameters: SealedSenderParameters?,
    ) async throws {
        var preKeyBundle = try await makePreKeyRequest(
            serviceId: serviceId,
            deviceId: deviceId,
            sealedSenderParameters: sealedSenderParameters,
        )

        try await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
            switch deviceId {
            case .all:
                self.updateDevices(
                    serviceId: serviceId,
                    deviceIds: preKeyBundle.devices.map(\.deviceId),
                    tx: tx,
                )
            case .specific(let deviceId):
                owsAssertDebug(preKeyBundle.devices.map(\.deviceId) == [deviceId], "Server returned unexpected device bundles.")
                preKeyBundle.devices.removeAll(where: { $0.deviceId != deviceId })
                guard preKeyBundle.devices.map(\.deviceId) == [deviceId] else {
                    throw OWSAssertionError("The server didn't return a bundle for the device we requested.")
                }
            }
            try self._createSessions(for: preKeyBundle, serviceId: serviceId, tx: tx)
        }
    }

    private enum PreKeyDevice {
        case all
        case specific(DeviceId)
    }

    private func makePreKeyRequest(
        serviceId: ServiceId,
        deviceId: PreKeyDevice,
        sealedSenderParameters: SealedSenderParameters?,
    ) async throws -> SignalServiceKit.PreKeyBundle {
        // As an optimization, skip the request if an error is guaranteed.
        if willDefinitelyHaveUntrustedIdentityError(for: serviceId) {
            Logger.warn("Skipping prekey request due to untrusted identity.")
            throw UntrustedIdentityError(serviceId: serviceId)
        }

        if willLikelyHaveInvalidKeySignatureError(for: serviceId) {
            Logger.warn("Skipping prekey request due to invalid prekey signature.")

            // Check if this error is happening repeatedly. If so, return an
            // InvalidKeySignatureError as a terminal failure.
            throw InvalidKeySignatureError(serviceId: serviceId, isTerminalFailure: true)
        }

        var requestOptions: RequestMaker.Options = []

        // If we're sending a story, we can use the identified connection to fetch
        // pre keys and the unidentified connection to send the message. For other
        // types of messages, we expect unidentified message sends to fail if we
        // can't fetch pre keys via the unidentified connection.
        if let sealedSenderParameters, sealedSenderParameters.message.isStorySend {
            requestOptions.insert(.allowIdentifiedFallback)
        }

        let requestMaker = RequestMaker(
            label: "Prekey Fetch",
            serviceId: serviceId,
            canUseStoryAuth: false,
            accessKey: sealedSenderParameters?.accessKey,
            endorsement: sealedSenderParameters?.endorsement,
            authedAccount: .implicit(),
            options: requestOptions,
        )

        let deviceIdParam: String
        switch deviceId {
        case .all:
            deviceIdParam = "*"
        case .specific(let deviceId):
            deviceIdParam = String(deviceId.rawValue)
        }
        let result = try await requestMaker.makeRequest {
            return OWSRequestFactory.recipientPreKeyRequest(serviceId: serviceId, deviceId: deviceIdParam, auth: $0)
        }
        guard let responseData = result.response.responseBodyData else {
            throw OWSAssertionError("Prekey fetch missing response object.")
        }
        guard let bundle = try? JSONDecoder().decode(SignalServiceKit.PreKeyBundle.self, from: responseData) else {
            throw OWSAssertionError("Prekey fetch returned an invalid bundle.")
        }
        return bundle
    }

    private func _createSessions(
        for preKeyBundle: SignalServiceKit.PreKeyBundle,
        serviceId: ServiceId,
        tx: DBWriteTransaction,
    ) throws {
        assert(!Thread.isMainThread)

        for deviceBundle in preKeyBundle.devices {
            try _createSession(for: deviceBundle, serviceId: serviceId, identityKey: preKeyBundle.identityKey, tx: tx)
        }
    }

    private func _createSession(
        for deviceBundle: SignalServiceKit.PreKeyBundle.PreKeyDeviceBundle,
        serviceId: ServiceId,
        identityKey: IdentityKey,
        tx transaction: DBWriteTransaction,
    ) throws {
        let deviceId = deviceBundle.deviceId

        if try validSession(for: serviceId, deviceId: deviceId, tx: transaction) != nil {
            Logger.warn("Session already exists for \(serviceId), deviceId: \(deviceId).")
            return
        }

        Logger.info("Creating session for \(serviceId), deviceId: \(deviceId); signed \(deviceBundle.signedPreKey.keyId), one-time \(deviceBundle.preKey?.keyId as Optional), kyber \(deviceBundle.pqPreKey.keyId as Optional)")

        let bundle: LibSignalClient.PreKeyBundle
        if let preKey = deviceBundle.preKey {
            bundle = try LibSignalClient.PreKeyBundle(
                registrationId: deviceBundle.registrationId,
                deviceId: deviceId.uint32Value,
                prekeyId: preKey.keyId,
                prekey: preKey.publicKey,
                signedPrekeyId: deviceBundle.signedPreKey.keyId,
                signedPrekey: deviceBundle.signedPreKey.publicKey,
                signedPrekeySignature: deviceBundle.signedPreKey.signature,
                identity: identityKey,
                kyberPrekeyId: deviceBundle.pqPreKey.keyId,
                kyberPrekey: deviceBundle.pqPreKey.publicKey,
                kyberPrekeySignature: deviceBundle.pqPreKey.signature,
            )
        } else {
            bundle = try LibSignalClient.PreKeyBundle(
                registrationId: deviceBundle.registrationId,
                deviceId: deviceId.uint32Value,
                signedPrekeyId: deviceBundle.signedPreKey.keyId,
                signedPrekey: deviceBundle.signedPreKey.publicKey,
                signedPrekeySignature: deviceBundle.signedPreKey.signature,
                identity: identityKey,
                kyberPrekeyId: deviceBundle.pqPreKey.keyId,
                kyberPrekey: deviceBundle.pqPreKey.publicKey,
                kyberPrekeySignature: deviceBundle.pqPreKey.signature,
            )
        }

        do {
            let identityManager = DependenciesBridge.shared.identityManager
            let protocolAddress = ProtocolAddress(serviceId, deviceId: deviceId.uint32Value)
            try processPreKeyBundle(
                bundle,
                for: protocolAddress,
                sessionStore: DependenciesBridge.shared.signalProtocolStoreManager.signalProtocolStore(for: .aci).sessionStore,
                identityStore: identityManager.libSignalStore(for: .aci, tx: transaction),
                context: transaction,
            )
        } catch SignalError.untrustedIdentity(_), IdentityManagerError.identityKeyMismatchForOutgoingMessage {
            Logger.warn("Found untrusted identity for \(serviceId)")
            handleUntrustedIdentityKeyError(
                serviceId: serviceId,
                identityKey: identityKey,
                transaction: transaction,
            )
            throw UntrustedIdentityError(serviceId: serviceId)
        } catch SignalError.invalidSignature(_) {
            Logger.error("Invalid key signature for \(serviceId)")

            // Received this error from the server, so this could either be
            // an invalid key due to a broken client, or it may be a random
            // corruption in transit.  Mark having encountered an error for
            // this recipient so later checks can determine if this has happend
            // more than once and fail early.
            // The error thrown here is considered non-terminal which allows
            // the request to be retried.
            hadInvalidKeySignatureError(for: serviceId)
            throw InvalidKeySignatureError(serviceId: serviceId, isTerminalFailure: false)
        }
        owsAssertDebug(try validSession(for: serviceId, deviceId: deviceId, tx: transaction) != nil, "Couldn't create session.")
    }

    // MARK: - Untrusted Identities

    private func handleUntrustedIdentityKeyError(
        serviceId: ServiceId,
        identityKey: IdentityKey,
        transaction tx: DBWriteTransaction,
    ) {
        let identityManager = DependenciesBridge.shared.identityManager
        identityManager.saveIdentityKey(identityKey, for: serviceId, tx: tx)
    }

    /// If true, we expect fetching a bundle will fail no matter what it contains.
    ///
    /// If we're noLongerVerified, nothing we fetch can alter the state. The
    /// user must manually accept the new identity key and then retry the
    /// message.
    ///
    /// If we're implicit & it's not trusted, it means it changed recently. It
    /// would work if we waited a few seconds, but we want to surface the error
    /// to the user.
    ///
    /// Even though it's only a few seconds, we must not talk to the server in
    /// the implicit case because there could be many messages queued up that
    /// would all try to fetch their own bundle.
    private func willDefinitelyHaveUntrustedIdentityError(for serviceId: ServiceId) -> Bool {
        assert(!Thread.isMainThread)

        // Prekey rate limits are strict. Therefore, we want to avoid requesting
        // prekey bundles that can't be processed. After a prekey request, we might
        // not be able to process it if the new identity key isn't trusted.

        let identityManager = DependenciesBridge.shared.identityManager
        return SSKEnvironment.shared.databaseStorageRef.read { tx in
            return identityManager.untrustedIdentityForSending(
                to: SignalServiceAddress(serviceId),
                untrustedThreshold: nil,
                tx: tx,
            ) != nil
        }
    }

    // MARK: - Invalid Signatures

    private typealias InvalidSignatureCache = [ServiceId: InvalidSignatureCacheItem]
    private struct InvalidSignatureCacheItem {
        let lastErrorDate: Date
        let errorCount: UInt32
    }

    private let invalidKeySignatureCache = AtomicValue(InvalidSignatureCache(), lock: .init())

    private func hadInvalidKeySignatureError(for serviceId: ServiceId) {
        invalidKeySignatureCache.update { cache in
            var errorCount: UInt32 = 1
            if let mostRecentError = cache[serviceId] {
                errorCount = mostRecentError.errorCount + 1
            }

            cache[serviceId] = InvalidSignatureCacheItem(
                lastErrorDate: Date(),
                errorCount: errorCount,
            )
        }
    }

    private func willLikelyHaveInvalidKeySignatureError(for serviceId: ServiceId) -> Bool {
        assert(!Thread.isMainThread)

        // Similar to untrusted identity errors, when an invalid signature for a prekey
        // is encountered, it will probably be encountered for a while until the
        // target client rotates prekeys and hopfully fixes the bad signature.
        // To avoid running into prekey rate limits, remember when an error is
        // encountered and slow down sending prekey requests for this recipient.
        //
        // Additionally, there is always a chance of corruption of the prekey
        // bundle during data transmission, which would result in an invalid
        // signature of an otherwise correct bundle. To handle this rare case,
        // don't begin limiting the prekey request until after encounting the
        // second bad signature for a particular recipient.

        guard let mostRecentError = invalidKeySignatureCache.get()[serviceId] else {
            return false
        }

        let staleIdentityLifetime: TimeInterval = .minute * 5
        guard abs(mostRecentError.lastErrorDate.timeIntervalSinceNow) < staleIdentityLifetime else {

            // Error has expired, remove it to reset the count
            invalidKeySignatureCache.update { cache in
                _ = cache.removeValue(forKey: serviceId)
            }

            return false
        }

        // Let the first error go, only skip starting on the second error
        guard mostRecentError.errorCount > 1 else {
            return false
        }

        return true
    }

    // MARK: - Constructing Message Sends

    enum SendResult {
        case success

        /// Something happened before[^1] we branched based on ServiceIds, so the
        /// same Error applies to the entire attempt to send the message.
        ///
        /// [^1]: If we try to send to a group and every group member is
        /// unregistered, this is treated as an overall failure. There is an
        /// argument that this shouldn't be an error at all or should be
        /// per-recipient "recipients don't exist" errors.
        case overallFailure(any Error)

        /// We reached a point where we may have a different error for every
        /// recipient. It will often be the case that many recipients encounter the
        /// "same" error. (For example, we may use the multi-recipient endpoint and
        /// then copy the same Error object for every recipient, but we also may fan
        /// out to individual recipients, and they all may encounter their own
        /// equivalent network failure error.)
        case recipientsFailure(SendMessageFailure)
    }

    func sendMessage(_ preparedOutgoingMessage: PreparedOutgoingMessage) async -> SendResult {
        let sendFailure: SendMessageFailure?
        do {
            Logger.info("Sending \(preparedOutgoingMessage)")
            sendFailure = try await _sendMessage(preparedOutgoingMessage)
        } catch {
            Logger.warn("Couldn't send \(preparedOutgoingMessage); the overall failure is: \(error)")
            return .overallFailure(error)
        }
        if let sendFailure {
            Logger.warn("Couldn't send \(preparedOutgoingMessage); up to 3 per-recipient failures: \(sendFailure.recipientErrors.prefix(3))")
            return .recipientsFailure(sendFailure)
        }
        return .success
    }

    private func _sendMessage(_ preparedOutgoingMessage: PreparedOutgoingMessage) async throws -> SendMessageFailure? {
        await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
            preparedOutgoingMessage.updateAllUnsentRecipientsAsSending(tx: tx)
        }

        try await withThrowingTaskGroup(of: Void.self) { taskGroup in
            let uploadOperations = SSKEnvironment.shared.databaseStorageRef.read { tx in
                preparedOutgoingMessage.attachmentUploadOperations(tx: tx)
            }
            for uploadOperation in uploadOperations {
                taskGroup.addTask {
                    try await Upload.uploadQueue.run(uploadOperation)
                }
            }
            try await taskGroup.waitForAll()
        }

        return try await preparedOutgoingMessage.send(self.sendPreparedMessage(_:))
    }

    private func waitForPreKeyRotationIfNeeded() async throws {
        while let taskToWaitFor = preKeyRotationTaskIfNeeded() {
            try await taskToWaitFor.value
        }
    }

    private let pendingPreKeyRotation = AtomicValue<Task<Void, Error>?>(nil, lock: .init())

    private func preKeyRotationTaskIfNeeded() -> Task<Void, Error>? {
        return pendingPreKeyRotation.map { existingTask in
            if let existingTask {
                return existingTask
            }
            let shouldRunPreKeyRotation = SSKEnvironment.shared.databaseStorageRef.read { tx in
                preKeyManager.isAppLockedDueToPreKeyUpdateFailures(tx: tx)
            }
            if shouldRunPreKeyRotation {
                Logger.info("Rotating signed pre-key before sending message.")
                // Retry prekey update every time user tries to send a message while app is
                // disabled due to prekey update failures.
                //
                // Only try to update the signed prekey; updating it is sufficient to
                // re-enable message sending.
                return Task {
                    defer {
                        // If this succeeds, or if we hit an error, allow another attempt.
                        self.pendingPreKeyRotation.set(nil)
                    }
                    try await self.preKeyManager.rotateSignedPreKeysIfNeeded()
                }
            }
            return nil
        }
    }

    // Mark skipped recipients as such. We may skip because:
    //
    // * A recipient is no longer in the group.
    // * A recipient is blocked.
    // * A recipient is unregistered.
    // * A recipient does not have the required capability.
    private func markSkippedRecipients(
        of message: any SendableMessage,
        sendingRecipients: [ServiceId],
        tx: DBWriteTransaction,
    ) {
        let skippedRecipients = Set(message.sendingRecipientAddresses())
            .subtracting(sendingRecipients.lazy.map { SignalServiceAddress($0) })
        message.updateWithSkippedRecipients(skippedRecipients, tx: tx)
    }

    private func unsentRecipients(
        of message: any SendableMessage,
        in thread: TSThread,
        localIdentifiers: LocalIdentifiers,
        tx: DBReadTransaction,
    ) throws -> [SignalServiceAddress] {
        if message is OutgoingSyncMessage {
            return [localIdentifiers.aciAddress]
        }

        if let groupThread = thread as? TSGroupThread {
            // Send to the intersection of:
            //
            // * "sending" recipients of the message.
            // * members of the group.
            //
            // I.e. try to send a message IFF:
            //
            // * The recipient was in the group when the message was first tried to be sent.
            // * The recipient is still in the group.
            // * The recipient is in the "sending" state.

            var recipientAddresses = Set<SignalServiceAddress>()

            recipientAddresses.formUnion(message.sendingRecipientAddresses())

            // Only send to members in the latest known group member list.
            // If a member has left the group since this message was enqueued,
            // they should not receive the message.
            let groupMembership = groupThread.groupModel.groupMembership
            var currentValidRecipients = groupMembership.fullMembers

            // ...or latest known list of "additional recipients".
            if GroupManager.shouldMessageHaveAdditionalRecipients(message, groupThread: groupThread) {
                currentValidRecipients.formUnion(groupMembership.invitedMembers)
            }
            currentValidRecipients.remove(localIdentifiers.aciAddress)
            if let localPni = localIdentifiers.pni {
                currentValidRecipients.remove(SignalServiceAddress(localPni))
            }
            recipientAddresses.formIntersection(currentValidRecipients)

            let blockedAddresses = SSKEnvironment.shared.blockingManagerRef.blockedAddresses(transaction: tx)
            recipientAddresses.subtract(blockedAddresses)

            return Array(recipientAddresses)
        } else if let contactAddress = (thread as? TSContactThread)?.contactAddress {
            // Treat 1:1 sends to blocked contacts as failures.
            // If we block a user, don't send 1:1 messages to them. The UI
            // should prevent this from occurring, but in some edge cases
            // you might, for example, have a pending outgoing message when
            // you block them.
            if SSKEnvironment.shared.blockingManagerRef.isAddressBlocked(contactAddress, transaction: tx) {
                Logger.info("Skipping 1:1 send to blocked contact: \(contactAddress).")
                throw MessageSenderError.blockedContactRecipient
            } else {
                return [contactAddress]
            }
        } else {
            // Send to the intersection of:
            //
            // * "sending" recipients of the message.
            // * recipients of the thread
            //
            // I.e. try to send a message IFF:
            //
            // * The recipient was part of the thread when the message was first tried to be sent.
            // * The recipient is still part of the thread.
            // * The recipient is in the "sending" state.

            var recipientAddresses = Set(message.sendingRecipientAddresses())

            // Only send to members in the latest known thread recipients list.
            let currentValidThreadRecipients = thread.recipientAddresses(with: tx)

            recipientAddresses.formIntersection(currentValidThreadRecipients)

            let blockedAddresses = SSKEnvironment.shared.blockingManagerRef.blockedAddresses(transaction: tx)
            recipientAddresses.subtract(blockedAddresses)

            recipientAddresses.remove(localIdentifiers.aciAddress)
            if let localPni = localIdentifiers.pni {
                recipientAddresses.remove(SignalServiceAddress(localPni))
            }

            return Array(recipientAddresses)
        }
    }

    private static func partitionAddresses(_ addresses: [SignalServiceAddress]) -> ([ServiceId], [E164]) {
        var serviceIds = [ServiceId]()
        var phoneNumbers = [E164]()

        for address in addresses {
            if let serviceId = address.serviceId {
                serviceIds.append(serviceId)
            } else if let phoneNumber = address.e164 {
                phoneNumbers.append(phoneNumber)
            } else {
                owsFailDebug("Recipient has neither ServiceId nor E164.")
            }
        }

        return (serviceIds, phoneNumbers)
    }

    private func lookUpPhoneNumbers(_ phoneNumbers: [E164]) async throws {
        _ = try await SSKEnvironment.shared.contactDiscoveryManagerRef.lookUp(
            phoneNumbers: Set(phoneNumbers.lazy.map { $0.stringValue }),
            mode: .outgoingMessage,
        )
    }

    private func sendPreparedMessage(_ message: any SendableMessage) async throws -> SendMessageFailure? {
        if DependenciesBridge.shared.appExpiry.isExpired(now: Date()) {
            throw AppExpiredError()
        }
        let tsAccountManager = DependenciesBridge.shared.tsAccountManager
        _ = try tsAccountManager.registeredStateWithMaybeSneakyTransaction()
        if let message = message as? TSOutgoingMessage, !(message is TransientOutgoingMessage) {
            let databaseStorage = SSKEnvironment.shared.databaseStorageRef
            let latestCopy = databaseStorage.read { tx in
                return TSInteraction.fetchViaCache(uniqueId: message.uniqueId, transaction: tx) as? TSOutgoingMessage
            }
            guard let latestCopy, !latestCopy.wasRemotelyDeleted else {
                throw MessageDeletedBeforeSentError()
            }
        }
        if DebugFlags.messageSendsFail.get() {
            throw OWSGenericError("failure toggle is enabled")
        }
        try await waitForPreKeyRotationIfNeeded()
        let udManager = SSKEnvironment.shared.udManagerRef
        let senderCertificates = try await udManager.fetchSenderCertificates()
        let registeredState = try tsAccountManager.registeredStateWithMaybeSneakyTransaction()
        guard let localDeviceId = tsAccountManager.storedDeviceIdWithMaybeTransaction.ifValid else {
            throw OWSGenericError("missing local device id")
        }
        // Send the message.
        let sendResult = await Result(catching: {
            return try await sendPreparedMessage(
                message,
                recoveryState: OuterRecoveryState(),
                senderCertificates: senderCertificates,
                localIdentifiers: registeredState.localIdentifiers,
                localDeviceId: localDeviceId,
            )
        })
        // Send the sync message if it succeeded overall or for any recipient.
        let syncResult: Result<Void, any Error>?
        if sendResult.isSuccess || message.wasSentToAnyRecipient {
            syncResult = await Result(catching: {
                try await handleMessageSentLocally(
                    message,
                    sendResult: sendResult,
                    localIdentifiers: registeredState.localIdentifiers,
                    localDeviceId: localDeviceId,
                )
            })
        } else {
            syncResult = nil
        }
        // If we encountered an error when sending, return that.
        if let sendFailure = try sendResult.get().sendMessageFailure {
            return sendFailure
        }
        // Otherwise, if only the sync message failed, return that.
        try syncResult?.get()
        return nil
    }

    private enum SendMessageNextAction {
        /// Look up missing phone numbers & then try sending again.
        case lookUpPhoneNumbersAndTryAgain([E164])

        /// There's nothing to be sent; return an early result. NOTE: We might still
        /// need to send a sync transcript.
        case skipSend(SendMessageResult)

        /// Fetch a new set of GSEs & then try sending again.
        case fetchGroupSendEndorsementsAndTryAgain(GroupSecretParams)

        /// Perform the `sendPreparedMessage` step.
        case sendPreparedMessage(PreparedState)

        struct PreparedState {
            let serializedMessage: SerializedMessage
            let thread: TSThread
            let fanoutRecipients: Set<ServiceId>
            let sendViaSenderKey: (@Sendable () async -> [(ServiceId, any Error)])?
            let senderCertificate: SenderCertificate
            let udAccess: [ServiceId: OWSUDAccess]
            let endorsements: GroupSendEndorsements?
            let localIdentifiers: LocalIdentifiers
            let localDeviceId: DeviceId
        }
    }

    /// Certain errors are "correctable" and result in immediate retries. For
    /// example, if there's a newly-added device, we should encrypt the message
    /// for that device and try to send it immediately. However, some of these
    /// errors can *theoretically* happen ad nauseam (but they shouldn't). To
    /// avoid tight retry loops, we handle them immediately just once and then
    /// use the standard retry logic if they happen repeatedly.
    private struct OuterRecoveryState {
        var canLookUpPhoneNumbers = true
        var canRefreshExpiringGroupSendEndorsements = true
        var canUseMultiRecipientSealedSender = true
        var canHandleMultiRecipientMismatchedDevices = true

        func mutated(_ block: (inout Self) -> Void) -> Self {
            var mutableSelf = self
            block(&mutableSelf)
            return mutableSelf
        }
    }

    private struct SendMessageResult {
        let isNoteToSelf: Bool
        let sendMessageFailure: SendMessageFailure?
    }

    private func sendPreparedMessage(
        _ message: any SendableMessage,
        recoveryState: OuterRecoveryState,
        senderCertificates: SenderCertificates,
        localIdentifiers: LocalIdentifiers,
        localDeviceId: DeviceId,
    ) async throws -> SendMessageResult {
        let databaseStorage = SSKEnvironment.shared.databaseStorageRef
        let nextAction = try await databaseStorage.awaitableWrite { tx -> SendMessageNextAction in
            guard let thread = message.thread(tx: tx) else {
                throw MessageSenderError.threadMissing
            }

            try checkIfCanSendMessage(message, toThread: thread)

            let proposedAddresses = try self.unsentRecipients(of: message, in: thread, localIdentifiers: localIdentifiers, tx: tx)
            let (serviceIds, phoneNumbersToFetch) = Self.partitionAddresses(proposedAddresses)

            // If we haven't yet tried to look up phone numbers, send an asynchronous
            // request to look up phone numbers, and then try to go through this logic
            // *again* in a new transaction. Things may change for that subsequent
            // attempt, and if there's still missing phone numbers at that point, we'll
            // skip them for this message.
            if recoveryState.canLookUpPhoneNumbers, !phoneNumbersToFetch.isEmpty {
                return .lookUpPhoneNumbersAndTryAgain(phoneNumbersToFetch)
            }

            self.markSkippedRecipients(of: message, sendingRecipients: serviceIds, tx: tx)

            // In the "self-send" aka "Note to Self" special case, we only need to send
            // certain kinds of messages. (In particular, regular data messages are
            // sent via their implicit sync message only.)
            // TODO: Consider combining this with SyncTranscriptableMessage.
            if
                let contactThread = thread as? TSContactThread,
                contactThread.contactAddress == localIdentifiers.aciAddress,
                !(message is OutgoingSyncMessage),
                !(message is OutgoingCallMessage),
                !(message is OutgoingResendRequest),
                !(message is OWSOutgoingResendResponse)
            {
                owsAssertDebug(serviceIds.count == 1)
                // Don't mark self-sent messages as read (or sent) until the sync transcript is sent.
                return .skipSend(SendMessageResult(isNoteToSelf: true, sendMessageFailure: nil))
            }

            if serviceIds.isEmpty {
                // All recipients are already sent or can be skipped.
                return .skipSend(SendMessageResult(isNoteToSelf: false, sendMessageFailure: nil))
            }

            let serializedMessage = try self.buildAndRecordMessage(message, in: thread, tx: tx)

            let senderCertificate: SenderCertificate = {
                switch SSKEnvironment.shared.udManagerRef.phoneNumberSharingMode(tx: tx).orDefault {
                case .everybody:
                    return senderCertificates.defaultCert
                case .nobody:
                    return senderCertificates.uuidOnlyCert
                }
            }()

            let udAccessMap = self.fetchSealedSenderAccess(
                for: serviceIds.compactMap { $0 as? Aci },
                senderCertificate: senderCertificate,
                localIdentifiers: localIdentifiers,
                tx: tx,
            )

            let endorsements: GroupSendEndorsements?
            do {
                if let secretParams = try? ((thread as? TSGroupThread)?.groupModel as? TSGroupModelV2)?.secretParams() {
                    let threadId = thread.sqliteRowId!
                    endorsements = try fetchEndorsements(forThreadId: threadId, secretParams: secretParams, tx: tx)
                    if
                        recoveryState.canRefreshExpiringGroupSendEndorsements,
                        GroupSendEndorsements.willExpireSoon(expirationDate: endorsements?.expiration)
                    {
                        Logger.warn("Refetching GSEs for \(thread.logString) that are missing or about to expire.")
                        return .fetchGroupSendEndorsementsAndTryAgain(secretParams)
                    }
                } else {
                    endorsements = nil
                }
            } catch {
                owsFailDebug("Continuing without GSEs that couldn't be fetched: \(error)")
                endorsements = nil
            }

            let senderKeyRecipients: Set<ServiceId>
            let sendViaSenderKey: (@Sendable () async -> [(ServiceId, any Error)])?
            if thread.usesSenderKey {
                do throws(OWSAssertionError) {
                    guard recoveryState.canUseMultiRecipientSealedSender else {
                        throw OWSAssertionError("Can't use Sender Key because of a prior failure.")
                    }
                    (senderKeyRecipients, sendViaSenderKey) = try self.prepareSenderKeyMessageSend(
                        for: serviceIds,
                        in: thread,
                        message: message,
                        serializedMessage: serializedMessage,
                        endorsements: endorsements,
                        udAccessMap: udAccessMap,
                        senderCertificate: senderCertificate,
                        localIdentifiers: localIdentifiers,
                        localDeviceId: localDeviceId,
                        tx: tx,
                    )
                } catch {
                    senderKeyRecipients = []
                    sendViaSenderKey = nil

                    let notificationPresenter = SSKEnvironment.shared.notificationPresenterRef
                    notificationPresenter.notifyTestPopulation(ofErrorMessage: error.description)
                }
            } else {
                senderKeyRecipients = []
                sendViaSenderKey = nil
            }

            return .sendPreparedMessage(SendMessageNextAction.PreparedState(
                serializedMessage: serializedMessage,
                thread: thread,
                fanoutRecipients: Set(serviceIds).subtracting(senderKeyRecipients),
                sendViaSenderKey: sendViaSenderKey,
                senderCertificate: senderCertificate,
                udAccess: udAccessMap,
                endorsements: endorsements,
                localIdentifiers: localIdentifiers,
                localDeviceId: localDeviceId,
            ))
        }

        let retryRecoveryState: OuterRecoveryState

        switch nextAction {
        case .skipSend(let result):
            return result
        case .lookUpPhoneNumbersAndTryAgain(let phoneNumbers):
            try await lookUpPhoneNumbers(phoneNumbers)
            retryRecoveryState = recoveryState.mutated({ $0.canLookUpPhoneNumbers = false })
        case .fetchGroupSendEndorsementsAndTryAgain(let secretParams):
            do {
                try await SSKEnvironment.shared.groupV2UpdatesRef.refreshGroup(secretParams: secretParams)
            } catch {
                let groupId = try secretParams.getPublicParams().getGroupIdentifier()
                Logger.warn("Couldn't refresh \(groupId) to fetch GSEs: \(error)")
                // If we hit a network failure, assume fanout message sends will also fail,
                // so don't bother fanning out. Just wait.
                if error.isNetworkFailureOrTimeout {
                    throw error
                }
                // Otherwise, continue anyways. We'll fall back to a fanout when retrying,
                // and that should avoid blocking sends on weird groups edge cases.
            }
            retryRecoveryState = recoveryState.mutated({ $0.canRefreshExpiringGroupSendEndorsements = false })
        case .sendPreparedMessage(let state):
            let perRecipientErrors = await sendPreparedMessage(
                message: message,
                serializedMessage: state.serializedMessage,
                in: state.thread,
                viaFanoutTo: state.fanoutRecipients,
                viaSenderKey: state.sendViaSenderKey,
                senderCertificate: state.senderCertificate,
                udAccess: state.udAccess,
                endorsements: state.endorsements,
                localIdentifiers: state.localIdentifiers,
                localDeviceId: state.localDeviceId,
            )
            let sendMessageFailure: SendMessageFailure?
            if perRecipientErrors.isEmpty {
                sendMessageFailure = nil
            } else {
                sendMessageFailure = try await handleSendFailure(
                    message: message,
                    thread: state.thread,
                    perRecipientErrors: perRecipientErrors,
                )
            }
            if let sendMessageFailure {
                if sendMessageFailure.containsAny(of: .invalidAuthHeader) {
                    retryRecoveryState = recoveryState.mutated({ $0.canUseMultiRecipientSealedSender = false })
                    break
                }
                if recoveryState.canHandleMultiRecipientMismatchedDevices, sendMessageFailure.containsAny(of: .mismatchedDevices) {
                    retryRecoveryState = recoveryState.mutated({ $0.canHandleMultiRecipientMismatchedDevices = false })
                    break
                }
            }
            return SendMessageResult(isNoteToSelf: false, sendMessageFailure: sendMessageFailure)
        }

        return try await sendPreparedMessage(
            message,
            recoveryState: retryRecoveryState,
            senderCertificates: senderCertificates,
            localIdentifiers: localIdentifiers,
            localDeviceId: localDeviceId,
        )
    }

    private func checkIfCanSendMessage(_ message: any SendableMessage, toThread thread: TSThread) throws {
        if let thread = thread as? TSGroupThread {
            // We can't send any messages to GV1 threads.
            guard let groupModel = thread.groupModel as? TSGroupModelV2 else {
                throw OWSGenericError("can't send to gv1 thread")
            }
            // Allow group update messages for leaving groups.
            if !(message is OutgoingGroupUpdateMessage) {
                // We can't send messages to GV2 threads if we're not a member.
                guard groupModel.groupMembership.isLocalUserFullMember else {
                    throw OWSGenericError("can't send because we're not a member")
                }

                guard !groupModel.isTerminated else {
                    throw OWSGenericError("can't send because group is ended")
                }

                // We can't send most messages to GV2 "announcement-only" threads if we're
                // not an admin. See also `processFlaglessDataMessage`.
                guard
                    !groupModel.isAnnouncementsOnly
                    || (message is OutgoingReactionMessage)
                    || (message is OutgoingPollVoteMessage)
                    || (message is OutgoingDeleteMessage)
                    || groupModel.groupMembership.isLocalUserFullMemberAndAdministrator
                else {
                    throw OWSGenericError("can't send because we're not an admin")
                }
            }
        }
        // If we reach this point, we can send the message.
    }

    private func sendPreparedMessage(
        message: any SendableMessage,
        serializedMessage: SerializedMessage,
        in thread: TSThread,
        viaFanoutTo fanoutRecipients: Set<ServiceId>,
        viaSenderKey sendViaSenderKey: (@Sendable () async -> [(ServiceId, any Error)])?,
        senderCertificate: SenderCertificate,
        udAccess sendingAccessMap: [ServiceId: OWSUDAccess],
        endorsements: GroupSendEndorsements?,
        localIdentifiers: LocalIdentifiers,
        localDeviceId: DeviceId,
    ) async -> [(ServiceId, any Error)] {
        // Both types are Arrays because Sender Key Tasks may return N errors when
        // sending to N participants. (Fanout Tasks always send to one recipient
        // and will therefore return either no error or exactly one error.)
        return await withTaskGroup(
            of: [(ServiceId, any Error)].self,
            returning: [(ServiceId, any Error)].self,
        ) { taskGroup in
            if let sendViaSenderKey {
                taskGroup.addTask(operation: sendViaSenderKey)
            }

            // Perform an "OWSMessageSend" for each non-senderKey recipient.
            for serviceId in fanoutRecipients {
                let messageSend = OWSMessageSend(
                    message: message,
                    plaintextContent: serializedMessage.plaintextData,
                    plaintextPayloadId: serializedMessage.payloadId,
                    thread: thread,
                    serviceId: serviceId,
                    localIdentifiers: localIdentifiers,
                    localDeviceId: localDeviceId,
                )
                var sealedSenderParameters = SealedSenderParameters(
                    message: message,
                    senderCertificate: senderCertificate,
                    accessKey: sendingAccessMap[serviceId],
                    endorsement: endorsements?.tokenBuilder(forServiceId: serviceId),
                )
                if localIdentifiers.contains(serviceId: serviceId) {
                    owsAssertDebug(sealedSenderParameters == nil, "Can't use Sealed Sender for ourselves.")
                    sealedSenderParameters = nil
                }
                taskGroup.addTask {
                    do {
                        try await self.performMessageSend(messageSend, sealedSenderParameters: sealedSenderParameters)
                        return []
                    } catch {
                        return [(messageSend.serviceId, error)]
                    }
                }
            }

            return await taskGroup.reduce(into: [], { $0.append(contentsOf: $1) })
        }
    }

    private func fetchSealedSenderAccess(
        for acis: [Aci],
        senderCertificate: SenderCertificate,
        localIdentifiers: LocalIdentifiers,
        tx: DBReadTransaction,
    ) -> [Aci: OWSUDAccess] {
        var result = [Aci: OWSUDAccess]()
        for aci in acis {
            if localIdentifiers.contains(serviceId: aci) {
                continue
            }
            result[aci] = SSKEnvironment.shared.udManagerRef.udAccess(for: aci, tx: tx)
        }
        return result
    }

    private func fetchEndorsements(forThreadId threadId: Int64, secretParams: GroupSecretParams, tx: DBReadTransaction) throws -> GroupSendEndorsements? {
        let combinedRecord = try groupSendEndorsementStore.fetchCombinedEndorsement(groupThreadId: threadId, tx: tx)
        guard let combinedRecord else {
            return nil
        }
        let combinedEndorsement = try GroupSendEndorsement(contents: combinedRecord.endorsement)

        var individualEndorsements = [ServiceId: GroupSendEndorsement]()
        for record in try groupSendEndorsementStore.fetchIndividualEndorsements(groupThreadId: threadId, tx: tx) {
            let endorsement = try GroupSendEndorsement(contents: record.endorsement)
            let recipient = DependenciesBridge.shared.recipientDatabaseTable.fetchRecipient(rowId: record.recipientId, tx: tx)
            guard let recipient else {
                throw OWSAssertionError("Missing Recipient that must exist.")
            }
            guard let serviceId = recipient.aci ?? recipient.pni else {
                throw OWSAssertionError("Missing ServiceId that must exist.")
            }
            individualEndorsements[serviceId] = endorsement
        }
        return GroupSendEndorsements(
            secretParams: secretParams,
            expiration: combinedRecord.expiration,
            combined: combinedEndorsement,
            individual: individualEndorsements,
        )
    }

    private func handleSendFailure(
        message: any SendableMessage,
        thread: TSThread,
        perRecipientErrors allErrors: [(serviceId: ServiceId, error: any Error)],
    ) async throws -> SendMessageFailure? {
        var skippedRecipients = [ServiceId]()
        var filteredErrors = [(serviceId: ServiceId, error: any Error)]()

        for (serviceId, error) in allErrors {
            // If we're sending a group message to an account that doesn't exist, we
            // mark them as "Skipped" rather than fail the entire operation.
            if !(thread is TSContactThread), error is MessageSenderNoSuchSignalRecipientError {
                skippedRecipients.append(serviceId)
                continue
            }
            // If we're deleting our account and run into a rate limit, we mark them as
            // "Skipped" because the group update is best-effort and this mimics the
            // behavior of a user-initiated manual retry for the account deletion.
            if (message as? OutgoingGroupUpdateMessage)?.isDeletingAccount == true, error is AccountChecker.RateLimitError {
                skippedRecipients.append(serviceId)
                continue
            }
            filteredErrors.append((serviceId, error))
        }

        // Record the individual error for each "failed" recipient.
        await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
            if !skippedRecipients.isEmpty {
                message.updateWithSkippedRecipients(skippedRecipients.map { SignalServiceAddress($0) }, tx: tx)
            }
            if !filteredErrors.isEmpty {
                message.updateWithFailedRecipients(filteredErrors, tx: tx)
                self.normalizeRecipientStatesIfNeeded(message: message, recipientErrors: filteredErrors, tx: tx)
            }
        }

        // If we only received errors that we should ignore, consider this send a
        // success, unless the message could not be sent to any recipient.
        guard let sendMessageFailure = SendMessageFailure(recipientErrors: filteredErrors) else {
            if message.sentRecipientAddresses().count == 0 {
                throw MessageSenderErrorNoValidRecipients()
            }
            return nil
        }

        return sendMessageFailure
    }

    static func isRetryableError(_ error: any Error) -> Bool {
        if error.isRetryable, error.httpStatusCode != 508 {
            return true
        }
        if error.httpStatusCode == 429 {
            return true
        }
        if case SignalError.rateLimitedError(retryAfter: _, message: _) = error {
            return true
        }
        if error is AccountChecker.RateLimitError {
            return true
        }
        return false
    }

    private func normalizeRecipientStatesIfNeeded(
        message: any SendableMessage,
        recipientErrors: some Sequence<(serviceId: ServiceId, error: Error)>,
        tx: DBWriteTransaction,
    ) {
        guard
            recipientErrors.contains(where: {
                switch $0.error {
                case RecipientIdError.mustNotUsePniBecauseAciExists:
                    return true
                default:
                    return false
                }
            })
        else {
            return
        }
        let recipientStateMerger = RecipientStateMerger(
            recipientDatabaseTable: DependenciesBridge.shared.recipientDatabaseTable,
            signalServiceAddressCache: SSKEnvironment.shared.signalServiceAddressCacheRef,
        )
        message.anyUpdateOutgoingMessage(transaction: tx) { message in
            recipientStateMerger.normalize(&message.recipientAddressStates, tx: tx)
        }
    }

    /// Sending a reply to a hidden recipient unhides them. But how we
    /// define "reply" is not inclusive of all outgoing messages. We unhide
    /// when the message indicates the user's intent to resume association
    /// with the hidden recipient.
    ///
    /// It is important to be conservative about which messages unhide a
    /// recipient. It is far better to not unhide when should than to
    /// unhide when we should not.
    private func shouldMessageSendUnhideRecipient(_ message: any SendableMessage, tx: DBReadTransaction) -> Bool {
        if
            message.shouldBeSaved,
            let rowId = message.sqliteRowId,
            // Its a persisted message; check if its renderable
            message.insertedMessageHasRenderableContent(rowId: rowId, tx: tx)
        {
            return true
        }
        if message is OutgoingReactionMessage {
            return true
        }
        if
            let message = message as? OutgoingCallMessage,
            /// OWSOutgoingCallMessages include not only calling
            /// someone (ie, an "offer message"), but also sending
            /// hangup messages, busy messages, and other kinds of
            /// call-related "messages" that do not indicate the
            /// sender's intent to resume association with a recipient.
            case .offerMessage = message.messageType
        {
            return true
        }
        return false
    }

    // TODO: Remove Result<...> from the sendResult parameter.
    private func handleMessageSentLocally(
        _ message: any SendableMessage,
        sendResult: Result<SendMessageResult, any Error>,
        localIdentifiers: LocalIdentifiers,
        localDeviceId: DeviceId,
    ) async throws {
        await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
            if
                let thread = message.thread(tx: tx) as? TSContactThread,
                self.shouldMessageSendUnhideRecipient(message, tx: tx),
                !localIdentifiers.aciAddress.isEqualToAddress(thread.contactAddress)
            {
                DependenciesBridge.shared.recipientHidingManager.removeHiddenRecipient(
                    thread.contactAddress,
                    wasLocallyInitiated: true,
                    tx: tx,
                )
            }
        }

        await completeViewOnceMessageIfNeeded(message)

        try await sendSyncTranscriptIfNeeded(forMessage: message, localIdentifiers: localIdentifiers, localDeviceId: localDeviceId)

        // Don't mark self-sent messages as read (or sent) until the sync
        // transcript is sent.
        //
        // NOTE: This only applies to the 'note to self' conversation.
        if let sendResult = try? sendResult.get() {
            await markNoteToSelfMessageAsReadAndViewedIfNecessary(message, sendResult: sendResult)
        }
    }

    private func completeViewOnceMessageIfNeeded(_ message: any SendableMessage) async {
        // View once messages are never transient (transient messages don't have
        // attachments AND they're deleted immediately after being sent).
        guard let message = message as? TSOutgoingMessage, !(message is TransientOutgoingMessage) else {
            return
        }
        // Don't refetch the message unless it's view once; most messages won't be
        // and are thus able to avoid an expensive read operation.
        guard message.isViewOnceMessage else {
            return
        }
        let databaseStorage = SSKEnvironment.shared.databaseStorageRef
        await databaseStorage.awaitableWrite { tx in
            let latestMessage = TSInteraction.fetchViaCache(uniqueId: message.uniqueId, transaction: tx)
            guard let latestMessage = latestMessage as? TSOutgoingMessage else {
                Logger.warn("Could not update expiration for deleted message.")
                return
            }
            ViewOnceMessages.completeIfNecessary(message: latestMessage, transaction: tx)
        }
    }

    private func sendSyncTranscriptIfNeeded(
        forMessage message: any SendableMessage,
        localIdentifiers: LocalIdentifiers,
        localDeviceId: DeviceId,
    ) async throws {
        guard message.shouldSyncTranscript() else {
            return
        }
        try await sendSyncTranscript(forMessage: message, localIdentifiers: localIdentifiers, localDeviceId: localDeviceId)
        let databaseStorage = SSKEnvironment.shared.databaseStorageRef
        await databaseStorage.awaitableWrite { tx in
            message.update(withHasSyncedTranscript: true, transaction: tx)
        }
    }

    private func sendSyncTranscript(
        forMessage message: any SendableMessage,
        localIdentifiers: LocalIdentifiers,
        localDeviceId: DeviceId,
    ) async throws {
        let databaseStorage = SSKEnvironment.shared.databaseStorageRef
        let messageSend = try await databaseStorage.awaitableWrite { tx in
            let localThread = TSContactThread.getOrCreateThread(withContactAddress: localIdentifiers.aciAddress, transaction: tx)

            let transcript = try message.buildSyncTranscriptMessage(localThread: localThread, tx: tx)

            let serializedMessage = try buildAndRecordMessage(transcript, in: localThread, tx: tx)

            return OWSMessageSend(
                message: transcript,
                plaintextContent: serializedMessage.plaintextData,
                plaintextPayloadId: serializedMessage.payloadId,
                thread: localThread,
                serviceId: localIdentifiers.aci,
                localIdentifiers: localIdentifiers,
                localDeviceId: localDeviceId,
            )
        }
        try await performMessageSend(messageSend, sealedSenderParameters: nil)
    }

    // TODO: Remove this method; it won't be necessary with modern receipts.
    private func markNoteToSelfMessageAsReadAndViewedIfNecessary(
        _ message: any SendableMessage,
        sendResult: SendMessageResult,
    ) async {
        // Non-TSOutgoingMessage/"normal" messages never have receipts and thus
        // never need to be marked as read/viewed. However, some transient messages
        // pass through receipt updates to a corresponding "normal" message and
        // thus require receipts.
        guard let message = message as? TSOutgoingMessage, !(message is OutgoingSyncMessage) else {
            return
        }
        // Non-Note to Self messages don't require Note to Self treatment.
        guard sendResult.isNoteToSelf else {
            return
        }
        let databaseStorage = SSKEnvironment.shared.databaseStorageRef
        let tsAccountManager = DependenciesBridge.shared.tsAccountManager
        owsAssertDebug(message.recipientAddresses().count == 1)
        await databaseStorage.awaitableWrite { tx in
            guard let deviceId = tsAccountManager.storedDeviceId(tx: tx).ifValid else {
                owsFailDebug("Can't send a Note to Self message with an invalid deviceId.")
                return
            }
            for sendingAddress in message.sendingRecipientAddresses() {
                message.update(
                    withReadRecipient: sendingAddress,
                    deviceId: deviceId,
                    readTimestamp: message.timestamp,
                    tx: tx,
                )
                if message.isVoiceMessage || message.isViewOnceMessage {
                    message.update(
                        withViewedRecipient: sendingAddress,
                        deviceId: deviceId,
                        viewedTimestamp: message.timestamp,
                        tx: tx,
                    )
                }
            }
        }
    }

    // MARK: - Performing Message Sends

    struct SerializedMessage {
        let plaintextData: Data
        let payloadId: Int64?
    }

    func buildAndRecordMessage(
        _ message: any SendableMessage,
        in thread: TSThread,
        tx: DBWriteTransaction,
    ) throws -> SerializedMessage {
        let plaintextData = try message.buildPlaintextData(inThread: thread, tx: tx)
        let messageSendLog = SSKEnvironment.shared.messageSendLogRef
        let payloadId = messageSendLog.recordPayload(plaintextData, for: message, tx: tx)
        return SerializedMessage(plaintextData: plaintextData, payloadId: payloadId)
    }

    private struct InnerRecoveryState {
        var canHandleMismatchedDevices = true
        var canHandleStaleDevices = true
        var canHandleCaptcha = true

        func mutated(_ block: (inout Self) -> Void) -> Self {
            var mutableSelf = self
            block(&mutableSelf)
            return mutableSelf
        }
    }

    private let sendQueues = KeyedConcurrentTaskQueue<ServiceId>(concurrentLimitPerKey: 1)

    @discardableResult
    func performMessageSend(
        _ messageSend: OWSMessageSend,
        sealedSenderParameters: SealedSenderParameters?,
    ) async throws -> [SentDeviceMessage] {
        return try await sendQueues.run(forKey: messageSend.serviceId) {
            return try await performMessageSendAttempt(
                messageSend,
                recoveryState: InnerRecoveryState(),
                sealedSenderParameters: sealedSenderParameters,
            )
        }
    }

    private func performMessageSendAttempt(
        _ messageSend: OWSMessageSend,
        recoveryState: InnerRecoveryState,
        sealedSenderParameters: SealedSenderParameters?,
    ) async throws -> [SentDeviceMessage] {
        let message = messageSend.message
        let serviceId = messageSend.serviceId

        Logger.info("Sending message: \(type(of: message)); timestamp: \(message.timestamp); serviceId: \(serviceId)")

        let retryRecoveryState: InnerRecoveryState
        do {
            if messageSend.isSelfSend {
                owsAssertDebug(!(messageSend.serviceId is Pni), "Shouldn't send \(type(of: message)) to \(messageSend.serviceId)")
            }

            var deviceMessages = try await buildDeviceMessages(
                messageSend: messageSend,
                sealedSenderParameters: sealedSenderParameters,
            )
            if deviceMessages.isEmpty {
                if messageSend.isSelfSend {
                    // This emulates the completion logic of an actual successful send (see below).
                    await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
                        message.updateWithSkippedRecipients([SignalServiceAddress(messageSend.serviceId)], tx: tx)
                    }
                    return []
                }
                if !(messageSend.thread is TSContactThread) {
                    try checkIfAccountExistsUsingCache(serviceId: messageSend.serviceId)
                }
                try await checkIfAccountExists(serviceId: messageSend.serviceId)
                deviceMessages = try await buildDeviceMessages(
                    messageSend: messageSend,
                    sealedSenderParameters: sealedSenderParameters,
                )
            }

            for deviceMessage in deviceMessages {
                let hasValidMessageType: Bool = {
                    switch deviceMessage.type {
                    case .unidentifiedSender:
                        return sealedSenderParameters != nil
                    case .ciphertext, .prekeyBundle, .plaintextContent:
                        return sealedSenderParameters == nil
                    case .unknown, .receipt:
                        return false
                    }
                }()
                guard hasValidMessageType else {
                    throw OWSAssertionError("Invalid message type: \(deviceMessage.type)")
                }
            }

            return try await sendDeviceMessages(
                deviceMessages,
                messageSend: messageSend,
                sealedSenderParameters: sealedSenderParameters,
            )
        } catch RequestMakerUDAuthError.udAuthFailure {
            owsPrecondition(sealedSenderParameters != nil)
            // This failure can happen on pre key fetches or message sends.
            return try await performMessageSendAttempt(
                messageSend,
                recoveryState: recoveryState,
                sealedSenderParameters: nil, // Retry as an unsealed send.
            )
        } catch DeviceMessagesError.mismatchedDevices where recoveryState.canHandleMismatchedDevices {
            retryRecoveryState = recoveryState.mutated({ $0.canHandleMismatchedDevices = false })
        } catch DeviceMessagesError.staleDevices where recoveryState.canHandleStaleDevices {
            retryRecoveryState = recoveryState.mutated({ $0.canHandleStaleDevices = false })
        } catch where error.httpStatusCode == 428 && recoveryState.canHandleCaptcha {
            retryRecoveryState = recoveryState.mutated({ $0.canHandleCaptcha = false })
        }
        return try await performMessageSendAttempt(
            messageSend,
            recoveryState: retryRecoveryState,
            sealedSenderParameters: sealedSenderParameters,
        )
    }

    private let nonExistentAccountCache = AtomicValue([ServiceId: MonotonicDate](), lock: .init())

    private func checkIfAccountExists(serviceId: ServiceId) async throws {
        if !(try await self.accountChecker.checkIfAccountExists(serviceId: serviceId)) {
            nonExistentAccountCache.update { $0[serviceId] = MonotonicDate() }
            throw MessageSenderNoSuchSignalRecipientError()
        }
    }

    private func checkIfAccountExistsUsingCache(serviceId: ServiceId) throws {
        let mostRecentErrorDate = nonExistentAccountCache.update { $0[serviceId] }
        guard let mostRecentErrorDate else {
            return
        }
        let timeSinceMostRecentError = MonotonicDate() - mostRecentErrorDate
        if timeSinceMostRecentError.seconds < (6 * TimeInterval.hour) {
            throw MessageSenderNoSuchSignalRecipientError()
        }
    }

    private func buildDeviceMessages(
        messageSend: OWSMessageSend,
        sealedSenderParameters: SealedSenderParameters?,
    ) async throws -> [DeviceMessage] {
        return try await buildDeviceMessages(
            serviceId: messageSend.serviceId,
            isSelfSend: messageSend.isSelfSend,
            encryptionStyle: messageSend.message.encryptionStyle,
            buildPlaintextContent: { _, _ in messageSend.plaintextContent },
            isTransient: messageSend.message.isOnline || (messageSend.message as? OutgoingSenderKeyDistributionMessage)?.isSentOnBehalfOfOnlineMessage == true,
            sealedSenderParameters: sealedSenderParameters,
            localAci: messageSend.localIdentifiers.aci,
            localDeviceId: messageSend.localDeviceId,
        )
    }

    /// Builds ``DeviceMessage``s for a recipient.
    ///
    /// This method is heavily optimized for the fast path where a session
    /// already exists for all of the recipient's devices.
    ///
    /// - Parameters:
    ///   - serviceId: The recipient's ServiceId. This may be an ACI, a PNI, or
    ///   our own ACI. (It should never be our own PNI. Callers are expected to
    ///   enforce this invariant.)
    ///
    ///   - isSelfSend: If true, `serviceId` is our own ACI. Callers are
    ///   expected to have pre-existing knowledge of `localIdentifiers` and can
    ///   thus compute this more efficiently.
    ///
    ///   - buildPlaintextContent: Constructs the plaintext content (i.e., the
    ///   content to be encrypted) for a given `DeviceId`. This block will be
    ///   invoked once for every `DeviceId` for which a `DeviceMessage` is
    ///   returned. It may also be invoked for `DeviceId`s which aren't returned
    ///   if we can't fetch pre keys for those devices.
    ///
    ///   - isTransient: If false (the standard behavior), this method will
    ///   issue network requests to fetch pre keys to establish missing Signal
    ///   Protocol sessions. (If we don't establish a Signal Protocol session
    ///   with `serviceId`, we can't send it ANY messages.) As a rate limiting
    ///   optimization, if true, this method will never make a network request;
    ///   it will either encrypt using already-available Signal Protocol
    ///   sessions or will throw an error.
    func buildDeviceMessages(
        serviceId: ServiceId,
        isSelfSend: Bool,
        encryptionStyle: EncryptionStyle,
        buildPlaintextContent: (DeviceId, DBWriteTransaction) throws -> Data,
        isTransient: Bool,
        sealedSenderParameters: SealedSenderParameters?,
        localAci: Aci,
        localDeviceId: DeviceId,
    ) async throws -> [DeviceMessage] {
        let databaseStorage = SSKEnvironment.shared.databaseStorageRef
        let recipientDatabaseTable = DependenciesBridge.shared.recipientDatabaseTable

        var deviceMessages: [DeviceMessage]
        let missingSessionPlaintextContent: [DeviceId: Data]
        (deviceMessages, missingSessionPlaintextContent) = try await databaseStorage.awaitableWrite { tx -> ([DeviceMessage], [DeviceId: Data]) in
            let recipient = recipientDatabaseTable.fetchRecipient(serviceId: serviceId, transaction: tx)

            guard let recipient, recipient.isRegistered else {
                return ([], [:])
            }

            var deviceIds = recipient.deviceIds

            if isSelfSend {
                deviceIds.removeAll(where: { localDeviceId == $0 })
            }

            var deviceMessages = [DeviceMessage]()
            var missingSessionPlaintextContent = [DeviceId: Data]()
            for deviceId in deviceIds {
                let plaintextContent = try buildPlaintextContent(deviceId, tx)
                do {
                    deviceMessages.append(try self.buildDeviceMessage(
                        serviceId: serviceId,
                        deviceId: deviceId,
                        encryptionStyle: encryptionStyle,
                        plaintextContent: plaintextContent,
                        sealedSenderParameters: sealedSenderParameters,
                        localAci: localAci,
                        localDeviceId: localDeviceId,
                        tx: tx,
                    ))
                } catch SignalError.sessionNotFound(_) {
                    missingSessionPlaintextContent[deviceId] = plaintextContent
                }
            }

            return (deviceMessages, missingSessionPlaintextContent)
        }

        if !missingSessionPlaintextContent.isEmpty {
            if isTransient {
                // When users re-register, we don't want transient messages (like typing
                // indicators) to cause users to hit the prekey fetch rate limit. So we
                // silently discard these message if there is no pre-existing session for
                // the recipient.
                throw MessageSenderNoSessionForTransientMessageError()
            }

            // If we don't have *any* sessions, we can do less work by asking the
            // server for all of them at the same time. (This also helps establish the
            // initial list of devices when contacting someone for the first time.)
            if deviceMessages.isEmpty {
                do {
                    try await createSession(
                        serviceId: serviceId,
                        deviceId: .all,
                        sealedSenderParameters: sealedSenderParameters,
                    )
                } catch where error.httpStatusCode == 404 {
                    try await handle404(serviceId: serviceId, isSelfSend: isSelfSend)
                }
            } else {
                try await withThrowingTaskGroup { taskGroup in
                    for (deviceId, _) in missingSessionPlaintextContent {
                        taskGroup.addTask {
                            do {
                                try await self.createSession(
                                    serviceId: serviceId,
                                    deviceId: .specific(deviceId),
                                    sealedSenderParameters: sealedSenderParameters,
                                )
                            } catch where error.httpStatusCode == 404 {
                                // If we have an invalid device exception, remove this device from the
                                // recipient and suppress the error.
                                await databaseStorage.awaitableWrite { tx in
                                    self.updateDevices(
                                        serviceId: serviceId,
                                        devicesToAdd: [],
                                        devicesToRemove: [deviceId],
                                        transaction: tx,
                                    )
                                }
                            }
                        }
                    }
                    try await taskGroup.waitForAll()
                }
            }

            deviceMessages += try await databaseStorage.awaitableWrite { tx -> [DeviceMessage] in
                // Re-fetch the list of deviceIds so that we can handle devices that get
                // added/removed when fetching pre keys. (We may learn about added/removed
                // devices when fetching keys for all devices, and we may learn about
                // removed devices when fetching keys for a specific device.)
                var deviceIds = recipientDatabaseTable.fetchRecipient(serviceId: serviceId, transaction: tx)?.deviceIds ?? []
                if isSelfSend {
                    deviceIds.removeAll(where: { localDeviceId == $0 })
                }

                let missingDeviceIds = Set(deviceIds).subtracting(deviceMessages.map(\.destinationDeviceId))

                return try missingDeviceIds.map {
                    do {
                        return try self.buildDeviceMessage(
                            serviceId: serviceId,
                            deviceId: $0,
                            encryptionStyle: encryptionStyle,
                            plaintextContent: missingSessionPlaintextContent[$0] ?? buildPlaintextContent($0, tx),
                            sealedSenderParameters: sealedSenderParameters,
                            localAci: localAci,
                            localDeviceId: localDeviceId,
                            tx: tx,
                        )
                    } catch SignalError.sessionNotFound(_) {
                        // It's possible that we'll archive or delete a session we just created
                        // above before we reach this point. (For example, perhaps Storage Service
                        // will tell us that the account is no longer registered.) This should be
                        // rare, and we should be able to resolve any discrepancies by trying again
                        // with exponential backoff.
                        Logger.warn("Couldn't find session for \(serviceId) that we just created. Retrying…")
                        throw OWSRetryableMessageSenderError()
                    }
                }
            }
        }

        return deviceMessages
    }

    private func buildDeviceMessage(
        serviceId: ServiceId,
        deviceId: DeviceId,
        encryptionStyle: EncryptionStyle,
        plaintextContent: Data,
        sealedSenderParameters: SealedSenderParameters?,
        localAci: Aci,
        localDeviceId: DeviceId,
        tx: DBWriteTransaction,
    ) throws -> DeviceMessage {
        do {
            switch encryptionStyle {
            case .whisper:
                return try self.encryptMessage(
                    plaintextContent: plaintextContent,
                    destinationServiceId: serviceId,
                    destinationDeviceId: deviceId,
                    localAci: localAci,
                    localDeviceId: localDeviceId,
                    sealedSenderParameters: sealedSenderParameters,
                    transaction: tx,
                )
            case .plaintext:
                return try self.wrapPlaintextMessage(
                    plaintextContent: plaintextContent,
                    serviceId: serviceId,
                    deviceId: deviceId,
                    sealedSenderParameters: sealedSenderParameters,
                    transaction: tx,
                )
            @unknown default:
                throw OWSAssertionError("Unrecognized encryption style")
            }
        } catch IdentityManagerError.identityKeyMismatchForOutgoingMessage {
            Logger.warn("Found identity key mismatch on outgoing message to \(serviceId).\(deviceId). Archiving session before retrying...")
            let signalProtocolStoreManager = DependenciesBridge.shared.signalProtocolStoreManager
            let aciSessionStore = signalProtocolStoreManager.signalProtocolStore(for: .aci).sessionStore
            aciSessionStore.archiveSession(forServiceId: serviceId, deviceId: deviceId, tx: tx)
            throw OWSRetryableMessageSenderError()
        } catch SignalError.untrustedIdentity {
            Logger.warn("Found untrusted identity on outgoing message to \(serviceId). Wrapping error and throwing...")
            throw UntrustedIdentityError(serviceId: serviceId)
        } catch {
            switch error {
            case SignalError.sessionNotFound:
                // Callers expect this error & handle it. They will report any anomalous failures.
                break
            default:
                Logger.warn("Failed to encrypt message \(error)")
            }
            throw error
        }
    }

    private enum DeviceMessagesError: Error, IsRetryableProvider {
        case mismatchedDevices
        case staleDevices

        var isRetryableProvider: Bool { true }
    }

    private func sendDeviceMessages(
        _ deviceMessages: [DeviceMessage],
        messageSend: OWSMessageSend,
        sealedSenderParameters: SealedSenderParameters?,
    ) async throws -> [SentDeviceMessage] {
        let message = messageSend.message

        let requestMaker = RequestMaker(
            label: "Message Send",
            serviceId: messageSend.serviceId,
            canUseStoryAuth: sealedSenderParameters?.message.isStorySend == true,
            accessKey: sealedSenderParameters?.accessKey,
            endorsement: sealedSenderParameters?.endorsement,
            authedAccount: .implicit(),
            options: [],
        )

        owsAssertDebug(!message.isStorySend || sealedSenderParameters != nil, "Story messages must use Sealed Sender.")

        do {
            let result = try await requestMaker.makeRequest {
                return OWSRequestFactory.submitMessageRequest(
                    serviceId: messageSend.serviceId,
                    messages: deviceMessages,
                    timestamp: message.timestamp,
                    isOnline: message.isOnline,
                    isUrgent: message.isUrgent,
                    auth: $0,
                )
            }
            return await messageSendDidSucceed(
                messageSend,
                deviceMessages: deviceMessages,
                wasSentByUD: result.wasSentByUD,
            )
        } catch {
            return try await messageSendDidFail(
                messageSend,
                responseError: error,
                sealedSenderParameters: sealedSenderParameters,
            )
        }
    }

    private func messageSendDidSucceed(
        _ messageSend: OWSMessageSend,
        deviceMessages: [DeviceMessage],
        wasSentByUD: Bool,
    ) async -> [SentDeviceMessage] {
        let message = messageSend.message

        Logger.info("Successfully sent message: \(type(of: message)), serviceId: \(messageSend.serviceId), timestamp: \(message.timestamp), wasSentByUD: \(wasSentByUD)")

        let sentDeviceMessages = deviceMessages.map {
            return SentDeviceMessage(
                destinationDeviceId: $0.destinationDeviceId,
                destinationRegistrationId: $0.destinationRegistrationId,
            )
        }

        await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { transaction in
            deviceMessages.forEach { deviceMessage in
                if let payloadId = messageSend.plaintextPayloadId, let recipientAci = messageSend.serviceId as? Aci {
                    let messageSendLog = SSKEnvironment.shared.messageSendLogRef
                    messageSendLog.recordPendingDelivery(
                        payloadId: payloadId,
                        recipientAci: recipientAci,
                        recipientDeviceId: deviceMessage.destinationDeviceId,
                        message: message,
                        tx: transaction,
                    )
                }
            }

            message.updateWithSentRecipients([messageSend.serviceId], wasSentByUD: wasSentByUD, tx: transaction)

            if let resendResponse = message as? OWSOutgoingResendResponse {
                resendResponse.didPerformMessageSend(sentDeviceMessages, to: messageSend.serviceId, tx: transaction)
            }

            SSKEnvironment.shared.profileManagerRef.didSendOrReceiveMessage(
                serviceId: messageSend.serviceId,
                localIdentifiers: messageSend.localIdentifiers,
                tx: transaction,
            )
        }

        return sentDeviceMessages
    }

    struct MismatchedDevices: Decodable {
        let extraDevices: [DeviceId]
        let missingDevices: [DeviceId]

        fileprivate static func parse(_ responseData: Data) throws -> Self {
            return try JSONDecoder().decode(Self.self, from: responseData)
        }
    }

    struct StaleDevices: Decodable {
        let staleDevices: [DeviceId]

        fileprivate static func parse(_ responseData: Data) throws -> Self {
            return try JSONDecoder().decode(Self.self, from: responseData)
        }
    }

    private func messageSendDidFail(
        _ messageSend: OWSMessageSend,
        responseError: Error,
        sealedSenderParameters: SealedSenderParameters?,
    ) async throws -> [SentDeviceMessage] {
        let message = messageSend.message

        Logger.warn("\(type(of: message)) to \(messageSend.serviceId), timestamp: \(message.timestamp), error: \(responseError)")

        switch responseError.httpStatusCode {
        case 404:
            try await handle404(serviceId: messageSend.serviceId, isSelfSend: messageSend.isSelfSend)
        case 409:
            let response = try MismatchedDevices.parse(responseError.httpResponseData ?? Data())
            await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
                handleMismatchedDevices(
                    serviceId: messageSend.serviceId,
                    missingDevices: response.missingDevices,
                    extraDevices: response.extraDevices,
                    tx: tx,
                )
            }
            throw DeviceMessagesError.mismatchedDevices
        case 410:
            let response = try StaleDevices.parse(responseError.httpResponseData ?? Data())
            await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
                handleStaleDevices(serviceId: messageSend.serviceId, staleDevices: response.staleDevices, tx: tx)
            }
            throw DeviceMessagesError.staleDevices
        case 428:
            // SPAM TODO: Only retry messages with -hasRenderableContent
            Logger.warn("Server requested user complete spam challenge.")
            try await SSKEnvironment.shared.spamChallengeResolverRef.tryToHandleSilently(
                bodyData: responseError.httpResponseData,
                retryAfter: responseError.httpRetryAfterDate,
            )
            // The resolver has 10s to asynchronously resolve a challenge If it
            // resolves, great! We'll let MessageSender auto-retry. Otherwise, it'll be
            // marked as "pending"
            throw responseError
        default:
            throw responseError
        }
    }

    private func handle404(serviceId: ServiceId, isSelfSend: Bool) async throws -> Never {
        if !isSelfSend {
            try await checkIfAccountExists(serviceId: serviceId)
        }
        Logger.warn("Server endpoints disagree about registration status for \(serviceId). Backing off and retrying…")
        throw OWSRetryableMessageSenderError()
    }

    // MARK: - Unregistered, Missing, & Stale Devices

    func handleMismatchedDevices(serviceId: ServiceId, missingDevices: [DeviceId], extraDevices: [DeviceId], tx: DBWriteTransaction) {
        Logger.warn("Mismatched devices for \(serviceId): +\(missingDevices) -\(extraDevices)")
        self.updateDevices(
            serviceId: serviceId,
            devicesToAdd: missingDevices,
            devicesToRemove: extraDevices,
            transaction: tx,
        )
    }

    func handleStaleDevices(serviceId: ServiceId, staleDevices: [DeviceId], tx: DBWriteTransaction) {
        Logger.warn("Stale devices for \(serviceId): \(staleDevices)")
        let sessionStore = DependenciesBridge.shared.signalProtocolStoreManager.signalProtocolStore(for: .aci).sessionStore
        for staleDeviceId in staleDevices {
            sessionStore.archiveSession(forServiceId: serviceId, deviceId: staleDeviceId, tx: tx)
        }
    }

    private func updateDevices(
        serviceId: ServiceId,
        deviceIds: [DeviceId],
        tx: DBWriteTransaction,
    ) {
        let recipientFetcher = DependenciesBridge.shared.recipientFetcher
        var recipient = recipientFetcher.fetchOrCreate(serviceId: serviceId, tx: tx)
        self._updateDevices(
            serviceId: serviceId,
            recipient: &recipient,
            devicesToAdd: Array(Set(deviceIds).subtracting(recipient.deviceIds)),
            devicesToRemove: Array(Set(recipient.deviceIds).subtracting(deviceIds)),
            tx: tx,
        )
    }

    func updateDevices(
        serviceId: ServiceId,
        devicesToAdd: [DeviceId],
        devicesToRemove: [DeviceId],
        transaction tx: DBWriteTransaction,
    ) {
        let recipientFetcher = DependenciesBridge.shared.recipientFetcher
        var recipient = recipientFetcher.fetchOrCreate(serviceId: serviceId, tx: tx)
        self._updateDevices(serviceId: serviceId, recipient: &recipient, devicesToAdd: devicesToAdd, devicesToRemove: devicesToRemove, tx: tx)
    }

    private func _updateDevices(
        serviceId: ServiceId,
        recipient: inout SignalRecipient,
        devicesToAdd: [DeviceId],
        devicesToRemove: [DeviceId],
        tx: DBWriteTransaction,
    ) {
        AssertNotOnMainThread()
        owsAssertDebug(Set(devicesToAdd).isDisjoint(with: devicesToRemove))

        let recipientManager = DependenciesBridge.shared.recipientManager
        recipientManager.modifyAndSave(
            &recipient,
            deviceIdsToAdd: devicesToAdd,
            deviceIdsToRemove: devicesToRemove,
            shouldUpdateStorageService: true,
            tx: tx,
        )

        if !devicesToRemove.isEmpty {
            Logger.info("Archiving sessions for extra devices: \(devicesToRemove)")
            let sessionStore = DependenciesBridge.shared.signalProtocolStoreManager.signalProtocolStore(for: .aci).sessionStore
            for deviceId in devicesToRemove {
                sessionStore.archiveSession(forServiceId: serviceId, deviceId: deviceId, tx: tx)
            }
        }
    }

    // MARK: - Encryption

    private func encryptMessage(
        plaintextContent plainText: Data,
        destinationServiceId: ServiceId,
        destinationDeviceId: DeviceId,
        localAci: Aci,
        localDeviceId: DeviceId,
        sealedSenderParameters: SealedSenderParameters?,
        transaction: DBWriteTransaction,
    ) throws -> DeviceMessage {
        owsAssertDebug(!Thread.isMainThread)

        let paddedPlaintext = plainText.paddedMessageBody

        let serializedMessage: Data
        let messageType: SSKProtoEnvelopeType

        let identityManager = DependenciesBridge.shared.identityManager
        let signalProtocolStoreManager = DependenciesBridge.shared.signalProtocolStoreManager
        let signalProtocolStore = signalProtocolStoreManager.signalProtocolStore(for: .aci)
        let preKeyStore = signalProtocolStoreManager.preKeyStore.forIdentity(.aci)
        let protocolAddress = ProtocolAddress(destinationServiceId, deviceId: destinationDeviceId)
        let localAddress = ProtocolAddress(localAci, deviceId: localDeviceId)

        if let sealedSenderParameters {
            let secretCipher = SMKSecretSessionCipher(
                sessionStore: signalProtocolStore.sessionStore,
                preKeyStore: preKeyStore,
                signedPreKeyStore: preKeyStore,
                kyberPreKeyStore: preKeyStore,
                identityStore: try identityManager.libSignalStore(for: .aci, tx: transaction),
                senderKeyStore: SSKEnvironment.shared.senderKeyStoreRef,
            )

            serializedMessage = try secretCipher.encryptMessage(
                for: protocolAddress,
                localAddress: localAddress,
                paddedPlaintext: paddedPlaintext,
                contentHint: sealedSenderParameters.contentHint.signalClientHint,
                groupId: sealedSenderParameters.envelopeGroupId(tx: transaction),
                senderCertificate: sealedSenderParameters.senderCertificate,
                protocolContext: transaction,
            )

            messageType = .unidentifiedSender

        } else {
            let result = try signalEncrypt(
                message: paddedPlaintext,
                for: protocolAddress,
                localAddress: localAddress,
                sessionStore: signalProtocolStore.sessionStore,
                identityStore: identityManager.libSignalStore(for: .aci, tx: transaction),
                context: transaction,
            )

            switch result.messageType {
            case .whisper:
                messageType = .ciphertext
            case .preKey:
                messageType = .prekeyBundle
            case .plaintext:
                messageType = .plaintextContent
            default:
                owsFailDebug("Unrecognized message type")
                messageType = .unknown
            }

            serializedMessage = result.serialize()
        }

        // We had better have a session after encrypting for this recipient!
        let session = try signalProtocolStore.sessionStore.loadSession(
            for: protocolAddress,
            context: transaction,
        )!

        return DeviceMessage(
            type: messageType,
            destinationDeviceId: destinationDeviceId,
            destinationRegistrationId: try session.remoteRegistrationId(),
            content: serializedMessage,
        )
    }

    private func wrapPlaintextMessage(
        plaintextContent rawPlaintext: Data,
        serviceId: ServiceId,
        deviceId: DeviceId,
        sealedSenderParameters: SealedSenderParameters?,
        transaction: DBWriteTransaction,
    ) throws -> DeviceMessage {
        owsAssertDebug(!Thread.isMainThread)

        let identityManager = DependenciesBridge.shared.identityManager
        let protocolAddress = ProtocolAddress(serviceId, deviceId: deviceId)

        let plaintext = try PlaintextContent(bytes: rawPlaintext)

        let serializedMessage: Data
        let messageType: SSKProtoEnvelopeType

        if let sealedSenderParameters {
            let usmc = try UnidentifiedSenderMessageContent(
                CiphertextMessage(plaintext),
                from: sealedSenderParameters.senderCertificate,
                contentHint: sealedSenderParameters.contentHint.signalClientHint,
                groupId: sealedSenderParameters.envelopeGroupId(tx: transaction) ?? Data(),
            )
            let outerBytes = try sealedSenderEncrypt(
                usmc,
                for: protocolAddress,
                identityStore: identityManager.libSignalStore(for: .aci, tx: transaction),
                context: transaction,
            )

            serializedMessage = outerBytes
            messageType = .unidentifiedSender

        } else {
            serializedMessage = plaintext.serialize()
            messageType = .plaintextContent
        }

        guard let session = try validSession(for: serviceId, deviceId: deviceId, tx: transaction) else {
            throw SignalError.sessionNotFound("")
        }
        return DeviceMessage(
            type: messageType,
            destinationDeviceId: deviceId,
            destinationRegistrationId: try session.remoteRegistrationId(),
            content: serializedMessage,
        )
    }
}