Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
signalapp
GitHub Repository: signalapp/Signal-iOS
Path: blob/main/SignalServiceKit/Messages/OWSReceiptManager.swift
1 views
//
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//

import Foundation
import GRDB
import LibSignalClient

/// Receives read/viewed receipts that are recorded as "pending" instead of
/// being enqueued for sending.
///
/// `OWSReceiptManager` calls this when a message is read or viewed while its
/// thread has a pending message request and read receipts are enabled. What a
/// conforming type does with the recorded receipt is not visible in this file.
public protocol PendingReceiptRecorder {
    /// Records a pending read receipt for `message`, which belongs to `thread`.
    func recordPendingReadReceipt(for message: TSIncomingMessage, thread: TSThread, transaction: DBWriteTransaction)
    /// Records a pending viewed receipt for `message`, which belongs to `thread`.
    func recordPendingViewedReceipt(for message: TSIncomingMessage, thread: TSThread, transaction: DBWriteTransaction)
}

/// A read or viewed receipt queued for delivery to linked devices.
///
/// Instances are stored through `Codable`, so the stored property names double
/// as the coding keys and must not be renamed.
struct ReceiptForLinkedDevice: Codable {
    let senderAddress: SignalServiceAddress
    let messageUniqueId: String? // Only nil when decoding old values
    let messageIdTimestamp: UInt64
    let timestamp: UInt64

    /// Creates a new receipt. New values always carry a `messageUniqueId`.
    init(senderAddress: SignalServiceAddress, messageUniqueId: String, messageIdTimestamp: UInt64, timestamp: UInt64) {
        self.senderAddress = senderAddress
        self.messageUniqueId = messageUniqueId
        self.messageIdTimestamp = messageIdTimestamp
        self.timestamp = timestamp
    }

    /// This receipt as a read receipt, or `nil` if the sender has no ACI.
    var asLinkedDeviceReadReceipt: LinkedDeviceReadReceipt? {
        senderAddress.aci.map { aci in
            LinkedDeviceReadReceipt(
                senderAci: aci,
                messageUniqueId: messageUniqueId,
                messageIdTimestamp: messageIdTimestamp,
                readTimestamp: timestamp,
            )
        }
    }

    /// This receipt as a viewed receipt, or `nil` if the sender has no ACI.
    var asLinkedDeviceViewedReceipt: LinkedDeviceViewedReceipt? {
        senderAddress.aci.map { aci in
            LinkedDeviceViewedReceipt(
                senderAci: aci,
                messageUniqueId: messageUniqueId,
                messageIdTimestamp: messageIdTimestamp,
                viewedTimestamp: timestamp,
            )
        }
    }
}

/// There are four kinds of read receipts:
///
/// * Read receipts that this client sends to linked
///   devices to inform them that a message has been read.
/// * Read receipts that this client receives from linked
///   devices that inform this client that a message has been read.
///    * These read receipts are saved so that they can be applied
///      if they arrive before the corresponding message.
/// * Read receipts that this client sends to other users
///   to inform them that a message has been read.
/// * Read receipts that this client receives from other users
///   that inform this client that a message has been read.
///    * These read receipts are saved so that they can be applied
///      if they arrive before the corresponding message.
///
/// This manager is responsible for handling and emitting all four kinds.
@objc
public class OWSReceiptManager: NSObject {

    private let appReadiness: any AppReadiness
    private let messageSenderJobQueue: MessageSenderJobQueue
    private var pendingReceiptRecorder: any PendingReceiptRecorder {
        SSKEnvironment.shared.pendingReceiptRecorderRef
    }

    private var receiptSender: ReceiptSender {
        SSKEnvironment.shared.receiptSenderRef
    }

    /// `true` while a processing pass started by `scheduleProcessing()` is running.
    private var isProcessing = AtomicValue(false, lock: .init())

    static let keyValueStore = KeyValueStore(collection: "OWSReadReceiptManagerCollection")
    /// Read receipts queued for linked devices. Message receipts are keyed by
    /// thread id, story receipts by story message id.
    private static let toLinkedDevicesReadReceiptMapStore = KeyValueStore(collection: "OWSReceiptManager.toLinkedDevicesReadReceiptMapStore")
    /// Viewed receipts queued for linked devices, keyed by message id.
    private static let toLinkedDevicesViewedReceiptMapStore = KeyValueStore(collection: "OWSReceiptManager.toLinkedDevicesViewedReceiptMapStore")

    private static let kOwsReceiptManagerAreReadReceiptsEnabled = "areReadReceiptsEnabled"

    /// Creates the manager and schedules an initial processing pass for when
    /// the app becomes ready.
    ///
    /// `databaseStorage` and `notificationPresenter` are accepted but not used
    /// by this initializer.
    init(
        appReadiness: any AppReadiness,
        databaseStorage: SDSDatabaseStorage,
        messageSenderJobQueue: MessageSenderJobQueue,
        notificationPresenter: NotificationPresenter,
    ) {
        self.appReadiness = appReadiness
        self.messageSenderJobQueue = messageSenderJobQueue

        super.init()

        SwiftSingletons.register(self)

        self.appReadiness.runNowOrWhenAppDidBecomeReadyAsync { [self] in
            scheduleProcessing()
        }
    }

    /// Schedules a processing pass, unless one is already scheduled.
    private func scheduleProcessing() {
        owsAssertDebug(appReadiness.isAppReady)

        Task(priority: .medium) {
            do {
                try isProcessing.transition(from: false, to: true)
            } catch {
                // Another pass is already in flight; it will pick up the work.
                return
            }
            await processReceiptsForLinkedDevices()
            do {
                try isProcessing.transition(from: true, to: false)
            } catch {
                owsFailDebug("someone else overwrote isProcessing while we were processing")
            }
        }
    }

    // MARK: - Locally Read

    /// Handles the receipt side effects of `message` having been marked read.
    ///
    /// - `.onLinkedDevice`: nothing to do.
    /// - `.onLinkedDeviceWhilePendingMessageRequest`: records a pending read
    ///   receipt if read receipts are enabled.
    /// - `.onThisDevice`: queues a linked-device receipt, schedules processing
    ///   and, if read receipts are enabled, enqueues a receipt to the author.
    /// - `.onThisDeviceWhilePendingMessageRequest`: queues a linked-device
    ///   receipt, schedules processing and, if read receipts are enabled,
    ///   records a pending read receipt.
    @objc
    public func messageWasRead(_ message: TSIncomingMessage, thread: TSThread, circumstance: OWSReceiptCircumstance, transaction: DBWriteTransaction) {
        switch circumstance {
        case .onLinkedDevice:
            break
        case .onLinkedDeviceWhilePendingMessageRequest:
            if Self.areReadReceiptsEnabled(transaction: transaction) {
                pendingReceiptRecorder.recordPendingReadReceipt(for: message, thread: thread, transaction: transaction)
            }
        case .onThisDevice:
            enqueueLinkedDeviceReadReceipt(forMessage: message, transaction: transaction)
            transaction.addSyncCompletion { self.scheduleProcessing() }
            if message.authorAddress.isLocalAddress {
                owsFailDebug("We don't support incoming messages from self.")
                return
            }
            guard let authorAci = self.authorAci(forMessage: message, tx: transaction) else {
                Logger.warn("Dropping receipt for message without an Aci.")
                return
            }
            if Self.areReadReceiptsEnabled(transaction: transaction) {
                receiptSender.enqueueReadReceipt(for: authorAci, timestamp: message.timestamp, messageUniqueId: message.uniqueId, tx: transaction)
            }
        case .onThisDeviceWhilePendingMessageRequest:
            enqueueLinkedDeviceReadReceipt(forMessage: message, transaction: transaction)
            // A linked-device receipt was queued exactly as in `.onThisDevice`,
            // so kick off a processing pass as well; otherwise it stays in the
            // store until some unrelated event schedules one.
            transaction.addSyncCompletion { self.scheduleProcessing() }
            if Self.areReadReceiptsEnabled(transaction: transaction) {
                pendingReceiptRecorder.recordPendingReadReceipt(for: message, thread: thread, transaction: transaction)
            }
        }
    }

    /// Handles the receipt side effects of `message` having been marked viewed.
    ///
    /// Mirrors `messageWasRead(_:thread:circumstance:transaction:)`, using
    /// viewed receipts instead of read receipts.
    @objc
    public func messageWasViewed(_ message: TSIncomingMessage, thread: TSThread, circumstance: OWSReceiptCircumstance, transaction: DBWriteTransaction) {
        switch circumstance {
        case .onLinkedDevice:
            break
        case .onLinkedDeviceWhilePendingMessageRequest:
            if Self.areReadReceiptsEnabled(transaction: transaction) {
                pendingReceiptRecorder.recordPendingViewedReceipt(for: message, thread: thread, transaction: transaction)
            }
        case .onThisDevice:
            enqueueLinkedDeviceViewedReceipt(forIncomingMessage: message, transaction: transaction)
            transaction.addSyncCompletion { self.scheduleProcessing() }
            if message.authorAddress.isLocalAddress {
                owsFailDebug("We don't support incoming messages from self.")
                return
            }
            guard let authorAci = self.authorAci(forMessage: message, tx: transaction) else {
                Logger.warn("Dropping receipt for message without an Aci.")
                return
            }
            if Self.areReadReceiptsEnabled(transaction: transaction) {
                receiptSender.enqueueViewedReceipt(for: authorAci, timestamp: message.timestamp, messageUniqueId: message.uniqueId, tx: transaction)
            }
        case .onThisDeviceWhilePendingMessageRequest:
            enqueueLinkedDeviceViewedReceipt(forIncomingMessage: message, transaction: transaction)
            // See the matching case in `messageWasRead`: the queued
            // linked-device receipt needs a processing pass to be sent.
            transaction.addSyncCompletion { self.scheduleProcessing() }
            if Self.areReadReceiptsEnabled(transaction: transaction) {
                pendingReceiptRecorder.recordPendingViewedReceipt(for: message, thread: thread, transaction: transaction)
            }
        }
    }

    /// Resolves the author's ACI, falling back to a recipient lookup by phone
    /// number when the message's address carries no ACI.
    private func authorAci(forMessage message: TSIncomingMessage, tx: DBReadTransaction) -> Aci? {
        if let authorAddressAci = message.authorAddress.aci {
            // By far the most common case.
            return authorAddressAci
        }
        if let authorAddressPhoneNumber = message.authorAddress.phoneNumber {
            let recipientDatabaseTable = DependenciesBridge.shared.recipientDatabaseTable
            return recipientDatabaseTable.fetchRecipient(phoneNumber: authorAddressPhoneNumber, transaction: tx)?.aci
        }
        return nil
    }

    /// Handles a story having been marked read. Only `.onThisDevice` does
    /// work; the pending-message-request circumstances are unexpected for
    /// stories and trip a debug failure.
    public func storyWasRead(_ storyMessage: StoryMessage, circumstance: OWSReceiptCircumstance, transaction: DBWriteTransaction) {
        switch circumstance {
        case .onLinkedDevice:
            break
        case .onLinkedDeviceWhilePendingMessageRequest:
            owsFailDebug("Unexpectedly had story receipt blocked by message request.")
        case .onThisDeviceWhilePendingMessageRequest:
            owsFailDebug("Unexpectedly had story receipt blocked by message request.")
        case .onThisDevice:
            // We only send read receipts to linked devices, not to the author.
            enqueueLinkedDeviceReadReceipt(forStoryMessage: storyMessage, transaction: transaction)
            transaction.addSyncCompletion { self.scheduleProcessing() }
        }
    }

    /// Handles a story having been marked viewed. On this device, queues a
    /// linked-device viewed receipt and, if story view receipts are enabled,
    /// a viewed receipt to the story's author.
    public func storyWasViewed(_ storyMessage: StoryMessage, circumstance: OWSReceiptCircumstance, transaction: DBWriteTransaction) {
        switch circumstance {
        case .onLinkedDevice:
            break
        case .onLinkedDeviceWhilePendingMessageRequest:
            owsFailDebug("Unexpectedly had story receipt blocked by message request.")
        case .onThisDeviceWhilePendingMessageRequest:
            owsFailDebug("Unexpectedly had story receipt blocked by message request.")
        case .onThisDevice:
            enqueueLinkedDeviceViewedReceipt(forStoryMessage: storyMessage, transaction: transaction)
            transaction.addSyncCompletion { self.scheduleProcessing() }

            if StoryManager.areViewReceiptsEnabled {
                enqueueSenderViewedReceipt(forStoryMessage: storyMessage, transaction: transaction)
            }
        }
    }

    /// Queues a linked-device viewed receipt for a redeemed incoming gift.
    public func incomingGiftWasRedeemed(_ incomingMessage: TSIncomingMessage, transaction: DBWriteTransaction) {
        enqueueLinkedDeviceViewedReceipt(forIncomingMessage: incomingMessage, transaction: transaction)
        transaction.addSyncCompletion { self.scheduleProcessing() }
    }

    /// Queues a linked-device viewed receipt for an opened outgoing gift.
    public func outgoingGiftWasOpened(_ outgoingMessage: TSOutgoingMessage, transaction: DBWriteTransaction) {
        enqueueLinkedDeviceViewedReceipt(forOutgoingMessage: outgoingMessage, transaction: transaction)
        transaction.addSyncCompletion { self.scheduleProcessing() }
    }

    // MARK: - Settings

    /// Whether read receipts are enabled. Defaults to `false` when unset.
    public static func areReadReceiptsEnabled(transaction: DBReadTransaction) -> Bool {
        keyValueStore.getBool(kOwsReceiptManagerAreReadReceiptsEnabled, defaultValue: false, transaction: transaction)
    }

    /// Persists the setting in its own write transaction, then sends a
    /// configuration sync message and records a pending local account update.
    public func setAreReadReceiptsEnabledWithSneakyTransactionAndSyncConfiguration(_ value: Bool) {
        Logger.info("setAreReadReceiptsEnabledWithSneakyTransactionAndSyncConfiguration: \(value)")
        SSKEnvironment.shared.databaseStorageRef.write { self.setAreReadReceiptsEnabled(value, transaction: $0) }
        SSKEnvironment.shared.syncManagerRef.sendConfigurationSyncMessage()
        SSKEnvironment.shared.storageServiceManagerRef.recordPendingLocalAccountUpdates()
    }

    /// Persists the setting without triggering any sync.
    public func setAreReadReceiptsEnabled(_ value: Bool, transaction: DBWriteTransaction) {
        Self.keyValueStore.setBool(value, key: Self.kOwsReceiptManagerAreReadReceiptsEnabled, transaction: transaction)
    }
}

// MARK: -

extension OWSReceiptManager {

    /// Drains both linked-device receipt stores, enqueueing one sync message
    /// per non-empty kind.
    ///
    /// - Returns: `true` if at least one stored receipt was found and the
    ///   stores were drained; `false` if there was nothing to do or a load or
    ///   thread lookup failed.
    private func processReceiptsForLinkedDevices(transaction: DBWriteTransaction) -> Bool {
        let queuedRead: [ReceiptForLinkedDevice]
        let queuedViewed: [ReceiptForLinkedDevice]
        do {
            queuedRead = try Self.toLinkedDevicesReadReceiptMapStore.allCodableValues(transaction: transaction)
            queuedViewed = try Self.toLinkedDevicesViewedReceiptMapStore.allCodableValues(transaction: transaction)
        } catch {
            owsFailDebug("Error: \(error).")
            return false
        }

        if queuedRead.isEmpty, queuedViewed.isEmpty {
            return false
        }

        guard let localThread = TSContactThread.getOrCreateLocalThread(transaction: transaction) else {
            owsFailDebug("Missing thread.")
            return false
        }

        if !queuedRead.isEmpty {
            // Entries whose sender has no ACI cannot be converted; they are
            // dropped when the store is cleared below.
            let sendable = queuedRead.compactMap(\.asLinkedDeviceReadReceipt)
            if !sendable.isEmpty {
                let syncMessage = OutgoingReadReceiptsSyncMessage(
                    localThread: localThread,
                    readReceipts: sendable,
                    tx: transaction,
                )
                messageSenderJobQueue.add(
                    message: PreparedOutgoingMessage.preprepared(transientMessageWithoutAttachments: syncMessage),
                    transaction: transaction,
                )
            }
            Self.toLinkedDevicesReadReceiptMapStore.removeAll(transaction: transaction)
        }

        if !queuedViewed.isEmpty {
            let sendable = queuedViewed.compactMap(\.asLinkedDeviceViewedReceipt)
            if !sendable.isEmpty {
                let syncMessage = OutgoingViewedReceiptsSyncMessage(
                    localThread: localThread,
                    viewedReceipts: sendable,
                    tx: transaction,
                )
                messageSenderJobQueue.add(
                    message: PreparedOutgoingMessage.preprepared(transientMessageWithoutAttachments: syncMessage),
                    transaction: transaction,
                )
            }
            Self.toLinkedDevicesViewedReceiptMapStore.removeAll(transaction: transaction)
        }

        return true
    }

    /// Repeatedly drains the linked-device receipt stores until a pass finds
    /// nothing to send, pausing between passes.
    func processReceiptsForLinkedDevices() async {
        while true {
            let didWork = await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
                self.processReceiptsForLinkedDevices(transaction: tx)
            }
            guard didWork else {
                return
            }

            // Wait N seconds before processing read receipts again.
            // This allows time for a batch to accumulate.
            //
            // We want a value high enough to allow us to effectively de-duplicate
            // read receipts without being so high that we risk not sending read
            // receipts due to app exit.
            do {
                try await Task.sleep(nanoseconds: 3 * NSEC_PER_SEC)
            } catch {
                // The sleep was interrupted; stop without another pass.
                return
            }
        }
    }

    /// Queues a linked-device read receipt for `message`, keyed by its thread.
    ///
    /// Only one read receipt is kept per thread: the new one replaces the
    /// stored one unless the stored one refers to a newer message.
    func enqueueLinkedDeviceReadReceipt(
        forMessage message: TSIncomingMessage,
        transaction: DBWriteTransaction,
    ) {
        let authorAddress = message.authorAddress
        assert(authorAddress.isValid)

        let storeKey = message.uniqueThreadId
        let candidate = ReceiptForLinkedDevice(
            senderAddress: authorAddress,
            messageUniqueId: message.uniqueId,
            messageIdTimestamp: message.timestamp,
            timestamp: Date.ows_millisecondTimestamp(),
        )

        do {
            if
                let existing: ReceiptForLinkedDevice = try Self.toLinkedDevicesReadReceiptMapStore.getCodableValue(forKey: storeKey, transaction: transaction),
                existing.messageIdTimestamp > candidate.messageIdTimestamp
            {
                // The stored "linked device" read receipt for this thread is
                // for a newer message, so discard the new one.
                return
            }
            try Self.toLinkedDevicesReadReceiptMapStore.setCodable(candidate, key: storeKey, transaction: transaction)
        } catch {
            owsFailDebug("Error: \(error).")
        }
    }

    /// Queues a linked-device viewed receipt for an incoming message, using
    /// the message's author as the sender.
    func enqueueLinkedDeviceViewedReceipt(
        forIncomingMessage message: TSIncomingMessage,
        transaction: DBWriteTransaction,
    ) {
        let author = message.authorAddress
        enqueueLinkedDeviceViewedReceipt(
            messageAuthorAddress: author,
            messageUniqueId: message.uniqueId,
            messageIdTimestamp: message.timestamp,
            transaction: transaction,
        )
    }

    /// Queues a linked-device viewed receipt for an outgoing message, using
    /// the local ACI address as the sender. Does nothing beyond a debug
    /// failure when local identifiers are unavailable.
    func enqueueLinkedDeviceViewedReceipt(
        forOutgoingMessage message: TSOutgoingMessage,
        transaction: DBWriteTransaction,
    ) {
        let tsAccountManager = DependenciesBridge.shared.tsAccountManager
        guard let ownAddress = tsAccountManager.localIdentifiers(tx: transaction)?.aciAddress else {
            owsFailDebug("no local address")
            return
        }

        enqueueLinkedDeviceViewedReceipt(
            messageAuthorAddress: ownAddress,
            messageUniqueId: message.uniqueId,
            messageIdTimestamp: message.timestamp,
            transaction: transaction,
        )
    }

    /// Queues a linked-device read receipt for a story message, keyed by the
    /// story's unique id. System stories are skipped.
    func enqueueLinkedDeviceReadReceipt(
        forStoryMessage message: StoryMessage,
        transaction: DBWriteTransaction,
    ) {
        if message.authorAddress.isSystemStoryAddress {
            Logger.info("Not sending linked device read receipt for system story")
            return
        }

        assert(message.authorAddress.isValid)

        let storyReceipt = ReceiptForLinkedDevice(
            senderAddress: message.authorAddress,
            messageUniqueId: message.uniqueId,
            messageIdTimestamp: message.timestamp,
            timestamp: Date.ows_millisecondTimestamp(),
        )

        // Unlike message read receipts, every requested story read receipt is
        // sent, which is why the key is the story id rather than a thread id.
        // Callers may choose to request one only for the latest known message
        // per story context. The receiving end tracks the latest read
        // timestamp per context, so sending all of them or just the latest
        // differs only in bandwidth/perf.
        do {
            try Self.toLinkedDevicesReadReceiptMapStore.setCodable(
                storyReceipt,
                key: message.uniqueId,
                transaction: transaction,
            )
        } catch {
            owsFailDebug("Error: \(error)")
        }
    }

    /// Queues a linked-device viewed receipt for a story message. System
    /// stories are skipped.
    func enqueueLinkedDeviceViewedReceipt(
        forStoryMessage message: StoryMessage,
        transaction: DBWriteTransaction,
    ) {
        let storyAuthor = message.authorAddress
        if storyAuthor.isSystemStoryAddress {
            Logger.info("Not sending linked device viewed receipt for system story")
            return
        }

        enqueueLinkedDeviceViewedReceipt(
            messageAuthorAddress: storyAuthor,
            messageUniqueId: message.uniqueId,
            messageIdTimestamp: message.timestamp,
            transaction: transaction,
        )
    }

    /// Enqueues a viewed receipt addressed to the story's author. Skipped for
    /// system stories and for stories authored by the local user.
    func enqueueSenderViewedReceipt(
        forStoryMessage message: StoryMessage,
        transaction: DBWriteTransaction,
    ) {
        let storyAuthor = message.authorAddress
        if storyAuthor.isSystemStoryAddress {
            Logger.info("Not sending sender viewed receipt for system story")
            return
        }
        if storyAuthor.isLocalAddress {
            Logger.info("We don't support incoming messages from self.")
            return
        }

        receiptSender.enqueueViewedReceipt(
            for: message.authorAci,
            timestamp: message.timestamp,
            messageUniqueId: message.uniqueId,
            tx: transaction,
        )
    }

    /// Stores a linked-device viewed receipt under the message's unique id.
    private func enqueueLinkedDeviceViewedReceipt(
        messageAuthorAddress: SignalServiceAddress,
        messageUniqueId: String,
        messageIdTimestamp: UInt64,
        transaction: DBWriteTransaction,
    ) {
        assert(messageAuthorAddress.isValid)

        let viewedAt = Date.ows_millisecondTimestamp()
        let receipt = ReceiptForLinkedDevice(
            senderAddress: messageAuthorAddress,
            messageUniqueId: messageUniqueId,
            messageIdTimestamp: messageIdTimestamp,
            timestamp: viewedAt,
        )

        // Unlike read receipts, *every* viewed receipt must be sent, so the
        // key is the message's unique id rather than the thread's. Reading
        // message N implies messages [0, N-1] were read too (the UI shows this
        // via the unread marker), but viewing message N (view once, voice
        // note, etc.) says nothing about whether other messages in the chat
        // were viewed.
        do {
            try Self.toLinkedDevicesViewedReceiptMapStore.setCodable(
                receipt,
                key: messageUniqueId,
                transaction: transaction,
            )
        } catch {
            owsFailDebug("Error: \(error)")
        }
    }

    /// Applies read/viewed receipts received from a linked device.
    ///
    /// For each receipt, looks up interactions with the receipt's timestamp
    /// whose author matches the receipt's sender and passes them to
    /// `markMessage`. If none match, looks for a story with that timestamp and
    /// author and passes it to `markStoryMessage`. Receipts with a missing ACI
    /// or an invalid timestamp are skipped with a debug failure.
    ///
    /// - Parameters:
    ///   - receiptProtos: The receipts to apply.
    ///   - senderAci: Key path to the sender's ACI in string form.
    ///   - senderAciBinary: Key path to the sender's ACI in binary form.
    ///   - messageTimestamp: Key path to the timestamp of the target message.
    ///   - tx: The write transaction used for all lookups.
    ///   - markMessage: Invoked for every matching message.
    ///   - markStoryMessage: Invoked for a matching story message.
    /// - Returns: The receipts for which neither a message nor a story was
    ///   found ("early" receipts).
    private func processReceiptsFromLinkedDevice<T>(
        _ receiptProtos: [T],
        senderAci: KeyPath<T, String?>,
        senderAciBinary: KeyPath<T, Data?>,
        messageTimestamp: KeyPath<T, UInt64>,
        tx: DBWriteTransaction,
        markMessage: (TSMessage) -> Void,
        markStoryMessage: (StoryMessage) -> Void,
    ) -> [T] {
        var earlyReceiptProtos = [T]()
        let messageTimestamps = receiptProtos.map { $0[keyPath: messageTimestamp] }
        Logger.info("Handling \(receiptProtos.count) \(T.self)(s) w/timestamps: \(messageTimestamps)")

        // The local ACI is the same for every receipt and every candidate
        // message, so look it up once instead of once per outgoing message.
        let localAci = DependenciesBridge.shared.tsAccountManager.localIdentifiers(tx: tx)?.aci

        for receiptProto in receiptProtos {
            guard
                let receiptSenderAci = Aci.parseFrom(
                    serviceIdBinary: receiptProto[keyPath: senderAciBinary],
                    serviceIdString: receiptProto[keyPath: senderAci],
                )
            else {
                owsFailDebug("Missing ACI.")
                continue
            }
            let timestamp = receiptProto[keyPath: messageTimestamp]
            guard timestamp > 0, SDS.fitsInInt64(timestamp) else {
                owsFailDebug("Invalid timestamp.")
                continue
            }

            let interactions: [TSInteraction]
            do {
                interactions = try InteractionFinder.fetchInteractions(
                    timestamp: timestamp,
                    transaction: tx,
                )
            } catch {
                owsFailDebug("Error loading interactions: \(error)")
                // Treat a failed load like "no interactions" so the story
                // lookup and early-receipt handling below still run.
                interactions = []
            }

            // Keep only messages authored by the receipt's sender: outgoing
            // messages count as authored by the local user.
            let messages = interactions.compactMap({ $0 as? TSMessage }).filter {
                switch $0 {
                case is TSOutgoingMessage:
                    return receiptSenderAci == localAci
                case let incomingMessage as TSIncomingMessage:
                    return receiptSenderAci == incomingMessage.authorAddress.serviceId
                default:
                    return false
                }
            }

            if !messages.isEmpty {
                messages.forEach { markMessage($0) }
                continue
            }

            let storyMessage = StoryFinder.story(timestamp: timestamp, author: receiptSenderAci, transaction: tx)
            if let storyMessage {
                markStoryMessage(storyMessage)
                continue
            }

            // Nothing to apply the receipt to yet; hand it back to the caller.
            earlyReceiptProtos.append(receiptProto)
        }
        return earlyReceiptProtos
    }

    /// Applies read receipts received from a linked device, marking matching
    /// messages and stories as read at `readTimestamp`.
    ///
    /// - Returns: The receipts that matched no message or story.
    func processReadReceiptsFromLinkedDevice(
        _ readReceiptProtos: [SSKProtoSyncMessageRead],
        readTimestamp: UInt64,
        tx: DBWriteTransaction,
    ) -> [SSKProtoSyncMessageRead] {
        let unmatchedReceipts = processReceiptsFromLinkedDevice(
            readReceiptProtos,
            senderAci: \.senderAci,
            senderAciBinary: \.senderAciBinary,
            messageTimestamp: \.timestamp,
            tx: tx,
            markMessage: { message in
                markMessageAsReadOnLinkedDevice(message, readTimestamp: readTimestamp, tx: tx)
            },
            markStoryMessage: { story in
                story.markAsRead(at: readTimestamp, circumstance: .onLinkedDevice, transaction: tx)
            },
        )
        return unmatchedReceipts
    }

    /// Applies viewed receipts received from a linked device, marking matching
    /// messages and stories as viewed at `viewedTimestamp`.
    ///
    /// - Returns: The receipts that matched no message or story.
    func processViewedReceiptsFromLinkedDevice(
        _ viewedReceiptProtos: [SSKProtoSyncMessageViewed],
        viewedTimestamp: UInt64,
        tx: DBWriteTransaction,
    ) -> [SSKProtoSyncMessageViewed] {
        let unmatchedReceipts = processReceiptsFromLinkedDevice(
            viewedReceiptProtos,
            senderAci: \.senderAci,
            senderAciBinary: \.senderAciBinary,
            messageTimestamp: \.timestamp,
            tx: tx,
            markMessage: { message in
                markMessageAsViewedOnLinkedDevice(message, viewedTimestamp: viewedTimestamp, tx: tx)
            },
            markStoryMessage: { story in
                story.markAsViewed(at: viewedTimestamp, circumstance: .onLinkedDevice, transaction: tx)
            },
        )
        return unmatchedReceipts
    }

    // MARK: - Mark as read

    /// Marks everything in `thread` before `sortId` as read on this device.
    ///
    /// The work runs asynchronously on a global dispatch queue in two phases,
    /// each processed in batches of 500 with one write transaction per batch:
    /// first unread messages, then messages with unread reactions. For the
    /// latter, a read-receipt sync message is enqueued per batch when the
    /// local ACI is available.
    ///
    /// - Parameters:
    ///   - sortId: Only items before this sort id are considered.
    ///   - thread: The thread whose items are marked.
    ///   - hasPendingMessageRequest: Selects the receipt circumstance passed
    ///     to each marked message.
    ///   - completion: Always invoked on the main queue once the work is done,
    ///     including when there was nothing to mark.
    public func markAsReadLocally(
        beforeSortId sortId: UInt64,
        thread: TSThread,
        hasPendingMessageRequest: Bool,
        completion: @escaping () -> Void,
    ) {
        DispatchQueue.global().async {
            let interactionFinder = InteractionFinder(threadUniqueId: thread.uniqueId)

            // Cheap read-only check first so no write transaction is opened
            // when there is nothing to do.
            let hasMessagesToMarkRead = SSKEnvironment.shared.databaseStorageRef.read { transaction in
                return interactionFinder.hasMessagesToMarkRead(
                    beforeSortId: sortId,
                    transaction: transaction,
                )
            }
            guard hasMessagesToMarkRead else {
                // Avoid unnecessary writes.
                DispatchQueue.main.async(execute: completion)
                return
            }

            let localAci = DependenciesBridge.shared.tsAccountManager.localIdentifiersWithMaybeSneakyTransaction?.aci
            // One timestamp is shared by every item marked in this call.
            let readTimestamp = Date.ows_millisecondTimestamp()
            let maxBatchSize = 500

            let circumstance: OWSReceiptCircumstance
            let logSuffix: String
            if hasPendingMessageRequest {
                circumstance = .onThisDeviceWhilePendingMessageRequest
                logSuffix = " while pending message request"
            } else {
                circumstance = .onThisDevice
                logSuffix = ""
            }
            Logger.info("Marking received messages and sent messages with reactions as read locally\(logSuffix) (in batches of \(maxBatchSize))")

            // Phase 1: unread messages. The quota is reset per batch and
            // decremented per item; a batch that ends with quota left means
            // the cursor ran dry (or failed), which ends the loop.
            var batchQuotaRemaining: Int
            repeat {
                batchQuotaRemaining = maxBatchSize
                SSKEnvironment.shared.databaseStorageRef.write { transaction in
                    var cursor = interactionFinder.fetchUnreadMessages(
                        beforeSortId: sortId,
                        transaction: transaction,
                    )
                    do {
                        while batchQuotaRemaining > 0, let readItem = try cursor.next() {
                            readItem.markAsRead(
                                atTimestamp: readTimestamp,
                                thread: thread,
                                circumstance: circumstance,
                                shouldClearNotifications: true,
                                transaction: transaction,
                            )
                            batchQuotaRemaining -= 1
                        }
                    } catch {
                        owsFailDebug("unexpected failure fetching unread messages: \(error)")
                        // Bail out of the outer loop by leaving the quota > 0;
                        // we're likely to hit the error multiple times.
                    }
                }
                // Continue until we process a batch and have some quota left.
            } while
                batchQuotaRemaining == 0

            // Mark outgoing messages with unread reactions as well.
            repeat {
                batchQuotaRemaining = maxBatchSize
                SSKEnvironment.shared.databaseStorageRef.write { transaction in
                    // Receipts collected for this batch only; sent together
                    // in a single sync message at the end of the transaction.
                    var receiptsForMessage: [LinkedDeviceReadReceipt] = []
                    var cursor = interactionFinder.fetchMessagesWithUnreadReactions(
                        beforeSortId: sortId,
                        transaction: transaction,
                    )

                    do {
                        while batchQuotaRemaining > 0, let message = try cursor.next() {
                            message.markUnreadReactionsAsRead(transaction: transaction)

                            // Without a local ACI the reactions are still
                            // marked read, but no receipt is collected.
                            if let localAci {
                                let receipt = LinkedDeviceReadReceipt(
                                    senderAci: localAci,
                                    messageUniqueId: message.uniqueId,
                                    messageIdTimestamp: message.timestamp,
                                    readTimestamp: readTimestamp,
                                )
                                receiptsForMessage.append(receipt)
                            }

                            batchQuotaRemaining -= 1
                        }
                    } catch {
                        owsFailDebug("unexpected failure fetching messages with unread reactions: \(error)")
                        // Bail out of the outer loop by leaving the quota > 0;
                        // we're likely to hit the error multiple times.
                    }

                    if !receiptsForMessage.isEmpty {
                        guard let localThread = TSContactThread.getOrCreateLocalThread(transaction: transaction) else {
                            owsFailDebug("Couldn't create localThread.")
                            return
                        }
                        let message = OutgoingReadReceiptsSyncMessage(
                            localThread: localThread,
                            readReceipts: receiptsForMessage,
                            tx: transaction,
                        )
                        let preparedMessage = PreparedOutgoingMessage.preprepared(
                            transientMessageWithoutAttachments: message,
                        )
                        self.messageSenderJobQueue.add(message: preparedMessage, transaction: transaction)
                    }
                }
                // Continue until we process a batch and have some quota left.
            } while batchQuotaRemaining == 0

            DispatchQueue.main.async(execute: completion)
        }
    }

    /// Marks every unread message in `thread` before `sortId` as read within
    /// `transaction`.
    ///
    /// - Parameters:
    ///   - sortId: Only items before this sort id are considered; expected to
    ///     be greater than zero.
    ///   - thread: The thread whose unread messages are marked.
    ///   - readTimestamp: The read time applied to each item.
    ///   - circumstance: Passed through to each item's `markAsRead`.
    ///   - shouldClearNotifications: Passed through to each item's `markAsRead`.
    ///   - transaction: The write transaction to operate in.
    /// - Returns: The unique ids of the items that were marked as read. If
    ///   fetching fails part-way, the ids marked up to that point are still
    ///   returned, because those marks were already written in `transaction`.
    func markAsRead(
        beforeSortId sortId: UInt64,
        thread: TSThread,
        readTimestamp: UInt64,
        circumstance: OWSReceiptCircumstance,
        shouldClearNotifications: Bool,
        transaction: DBWriteTransaction,
    ) -> [String] {
        owsAssertDebug(sortId > 0)

        var readUniqueIds = [String]()
        let interactionFinder = InteractionFinder(threadUniqueId: thread.uniqueId)
        var cursor = interactionFinder.fetchUnreadMessages(
            beforeSortId: sortId,
            transaction: transaction,
        )
        do {
            while let readItem = try cursor.next() {
                readItem.markAsRead(
                    atTimestamp: readTimestamp,
                    thread: thread,
                    circumstance: circumstance,
                    shouldClearNotifications: shouldClearNotifications,
                    transaction: transaction,
                )
                readUniqueIds.append(readItem.uniqueId)
            }
        } catch {
            owsFailDebug("unexpected failure fetching unread messages: \(error)")
            // Fall through and report what was marked before the failure, so
            // callers can still act on (e.g. clear notifications for) those
            // items instead of losing track of them.
        }

        return readUniqueIds
    }

    /// Applies a read receipt for `message` that was reported by a linked device.
    ///
    /// - Incoming messages are marked as read at `readTimestamp`, together with
    ///   any unread messages found before the message's sort ID in the same
    ///   thread. Notifications for all of them are then cancelled in one call.
    /// - Outgoing messages only have their unread reactions marked as read.
    /// - Any other message type triggers `owsFailDebug`.
    ///
    /// If an incoming message has no thread, nothing is done.
    func markMessageAsReadOnLinkedDevice(
        _ message: TSMessage,
        readTimestamp: UInt64,
        tx: DBWriteTransaction,
    ) {
        if let incomingMessage = message as? TSIncomingMessage {
            guard let thread = message.thread(tx: tx) else {
                return
            }
            let circumstance = linkedDeviceReceiptCircumstance(for: thread, tx: tx)

            // Re-mark the message unconditionally so that an earlier read time
            // is applied to disappearing messages. Notifications are cleared in
            // a single batch further down, not here.
            incomingMessage.markAsRead(
                atTimestamp: readTimestamp,
                thread: thread,
                circumstance: circumstance,
                shouldClearNotifications: false,
                transaction: tx,
            )

            // Unread messages that appear earlier in the thread are marked as
            // read too; again, notification clearing is deferred.
            let earlierReadIds = self.markAsRead(
                beforeSortId: incomingMessage.sortId,
                thread: thread,
                readTimestamp: readTimestamp,
                circumstance: circumstance,
                shouldClearNotifications: false,
                transaction: tx,
            )

            // One batched cancellation covering every message marked above.
            let idsToCancel = [incomingMessage.uniqueId] + earlierReadIds
            SSKEnvironment.shared.notificationPresenterRef.cancelNotifications(messageIds: idsToCancel)
        } else if let outgoingMessage = message as? TSOutgoingMessage {
            // Outgoing messages are always "read"; a receipt from our linked
            // device for one of them means the reactions we received on it
            // should be marked as read as well.
            outgoingMessage.markUnreadReactionsAsRead(transaction: tx)
        } else {
            owsFailDebug("Message was neither incoming nor outgoing!")
        }
    }

    /// Applies a viewed receipt for `message` that was reported by a linked device.
    ///
    /// Gift-badge messages are handled separately: the badge's redemption state
    /// is set to `.redeemed` for incoming messages and `.opened` for outgoing
    /// ones, and no further processing happens. For all other messages, only
    /// incoming messages with a thread are marked as viewed at `viewedTimestamp`.
    func markMessageAsViewedOnLinkedDevice(_ message: TSMessage, viewedTimestamp: UInt64, tx: DBWriteTransaction) {
        guard message.giftBadge == nil else {
            message.anyUpdateMessage(transaction: tx) { updated in
                if let incoming = updated as? TSIncomingMessage {
                    incoming.giftBadge?.redemptionState = .redeemed
                } else if let outgoing = updated as? TSOutgoingMessage {
                    outgoing.giftBadge?.redemptionState = .opened
                } else {
                    owsFailDebug("Unexpected giftBadge message")
                }
            }
            return
        }

        // Only incoming messages that still have a thread can be marked viewed.
        guard
            let incomingMessage = message as? TSIncomingMessage,
            let thread = message.thread(tx: tx)
        else {
            return
        }

        let circumstance = linkedDeviceReceiptCircumstance(for: thread, tx: tx)
        incomingMessage.markAsViewed(
            atTimestamp: viewedTimestamp,
            thread: thread,
            circumstance: circumstance,
            transaction: tx,
        )
    }

    /// Chooses the receipt circumstance for a receipt that came from a linked
    /// device, depending on whether `thread` has a pending message request.
    private func linkedDeviceReceiptCircumstance(for thread: TSThread, tx: DBReadTransaction) -> OWSReceiptCircumstance {
        let hasPendingRequest = thread.hasPendingMessageRequest(transaction: tx)
        return hasPendingRequest ? .onLinkedDeviceWhilePendingMessageRequest : .onLinkedDevice
    }

    /// Flags unread call interactions in `thread` as read with a single SQL
    /// `UPDATE` on the interaction table.
    ///
    /// - Parameters:
    ///   - sqlId: When non-nil, only rows whose id is less than or equal to this
    ///     value are updated. When nil, every matching row in the thread is
    ///     updated.
    ///   - thread: Thread whose call interactions are updated.
    ///   - tx: Write transaction whose database executes the statement.
    static func markAllCallInteractionsAsReadLocally(
        beforeSQLId sqlId: NSNumber?,
        thread: TSThread,
        transaction tx: DBWriteTransaction,
    ) {
        var statement = """
        UPDATE \(InteractionRecord.databaseTableName)
        \(DEBUG_INDEXED_BY("index_model_TSInteraction_UnreadMessages"))
        SET read = 1
        WHERE \(interactionColumn: .read) = 0
        AND \(interactionColumn: .threadUniqueId) = ?
        AND \(interactionColumn: .recordType) = ?
        """
        var bindings: StatementArguments = [thread.uniqueId, SDSRecordType.call.rawValue]

        // Optionally restrict the update to rows at or before the given id.
        if let upperBoundId = sqlId {
            statement += " AND \(interactionColumn: .id) <= ?"
            bindings += [upperBoundId]
        }

        failIfThrows {
            try tx.database.execute(sql: statement, arguments: bindings)
        }
    }
}

// MARK: -

extension OWSReceiptManager {
    /// Looks up the outgoing messages with the given sent timestamp so that
    /// incoming receipts can be applied to them.
    ///
    /// A fetch failure is reported with `owsFailDebug` and treated as "no
    /// matches". More than one match is logged as an error, but all matches are
    /// still returned.
    private func outgoingMessages(sentAt timestamp: UInt64, tx: DBReadTransaction) -> [TSOutgoingMessage] {
        var matches = [TSOutgoingMessage]()
        do {
            let fetched: [TSInteraction] = try InteractionFinder.fetchInteractions(timestamp: timestamp, transaction: tx)
            matches = fetched.compactMap { $0 as? TSOutgoingMessage }
        } catch {
            owsFailDebug("Error loading interactions: \(error)")
        }

        if matches.count > 1 {
            Logger.error("More than one matching message with timestamp: \(timestamp)")
        }

        return matches
    }

    /// Runs `handleTimestampMessages` for each timestamp in a receipt from
    /// another user, passing the outgoing messages found for that timestamp.
    ///
    /// - Parameters:
    ///   - sentTimestamps: Sent timestamps named by the receipt.
    ///   - tx: Transaction used to look up the outgoing messages.
    ///   - handleTimestampMessages: Called once per timestamp, in order, with the
    ///     timestamp and its (possibly empty) list of outgoing messages. Returns
    ///     `true` when the receipt was handled for that timestamp.
    /// - Returns: The timestamps for which the handler returned `false`, in
    ///   their original order. These should be persisted by the caller since the
    ///   messages might arrive after the receipts.
    private func processReceiptsForMessages(
        sentAt sentTimestamps: [UInt64],
        tx: DBReadTransaction,
        handleTimestampMessages: (UInt64, [TSOutgoingMessage]) -> Bool,
    ) -> [UInt64] {
        var unhandledTimestamps = [UInt64]()
        for sentTimestamp in sentTimestamps {
            let candidates = outgoingMessages(sentAt: sentTimestamp, tx: tx)
            let wasHandled = handleTimestampMessages(sentTimestamp, candidates)
            if !wasHandled {
                unhandledTimestamps.append(sentTimestamp)
            }
        }
        return unhandledTimestamps
    }

    /// Applies delivery receipts from another user to the matching outgoing
    /// messages.
    ///
    /// Every outgoing message found for a sent timestamp is updated with the
    /// delivered recipient, device, delivery timestamp and context.
    ///
    /// - Returns: The subset of `sentTimestamps` without corresponding messages.
    ///   These should be persisted by the caller since the messages might arrive
    ///   after the receipts.
    func processDeliveryReceipts(
        from recipientServiceId: ServiceId,
        recipientDeviceId: DeviceId,
        sentTimestamps: [UInt64],
        deliveryTimestamp: UInt64,
        context: DeliveryReceiptContext,
        tx: DBWriteTransaction,
    ) -> [UInt64] {
        return processReceiptsForMessages(sentAt: sentTimestamps, tx: tx) { _, messages in
            guard !messages.isEmpty else {
                // Nothing sent at this timestamp (yet); report it as unmatched.
                return false
            }
            messages.forEach { outgoingMessage in
                outgoingMessage.update(
                    withDeliveredRecipient: SignalServiceAddress(recipientServiceId),
                    deviceId: recipientDeviceId,
                    deliveryTimestamp: deliveryTimestamp,
                    context: context,
                    tx: tx,
                )
            }
            return true
        }
    }

    /// Applies read receipts from another user to the matching outgoing
    /// messages.
    ///
    /// When read receipts are disabled, the receipts are dropped entirely and an
    /// empty array is returned.
    ///
    /// - Returns: The subset of `sentTimestamps` without corresponding messages.
    ///   These should be persisted by the caller since the messages might arrive
    ///   after the receipts.
    func processReadReceipts(
        from recipientAci: Aci,
        recipientDeviceId: DeviceId,
        sentTimestamps: [UInt64],
        readTimestamp: UInt64,
        tx: DBWriteTransaction,
    ) -> [UInt64] {
        if !Self.areReadReceiptsEnabled(transaction: tx) {
            return []
        }
        return processReceiptsForMessages(sentAt: sentTimestamps, tx: tx) { _, messages in
            guard !messages.isEmpty else {
                // Nothing sent at this timestamp (yet); report it as unmatched.
                return false
            }
            // TODO: We might also need to "mark as read by recipient" any older messages
            // from us in that thread. Or maybe this state should hang on the thread?
            messages.forEach { outgoingMessage in
                outgoingMessage.update(
                    withReadRecipient: SignalServiceAddress(recipientAci),
                    deviceId: recipientDeviceId,
                    readTimestamp: readTimestamp,
                    tx: tx,
                )
            }
            return true
        }
    }

    /// Processes a bundle of viewed receipts from another user.
    ///
    /// For each sent timestamp:
    /// - If outgoing messages match, they are updated with the viewed recipient
    ///   (only when read receipts are enabled) and the timestamp counts as
    ///   handled either way.
    /// - Otherwise, a story authored by the local ACI with that timestamp is
    ///   looked up. If found, it is marked as viewed (only when story view
    ///   receipts are enabled) and the timestamp counts as handled either way.
    ///
    /// - Parameters:
    ///   - recipientAci: The user who sent the viewed receipt.
    ///   - recipientDeviceId: The device that sent the viewed receipt.
    ///   - sentTimestamps: Sent timestamps named by the receipt.
    ///   - viewedTimestamp: When the recipient viewed the content.
    ///   - tx: Write transaction used for lookups and updates.
    /// - Returns: A subset of `sentTimestamps` that don't have corresponding
    ///   messages or stories. These should be persisted by the caller since the
    ///   messages might arrive after the receipts. A timestamp is also reported
    ///   here if the local identifiers are unavailable for the story lookup.
    func processViewedReceipts(
        from recipientAci: Aci,
        recipientDeviceId: DeviceId,
        sentTimestamps: [UInt64],
        viewedTimestamp: UInt64,
        tx: DBWriteTransaction,
    ) -> [UInt64] {
        return processReceiptsForMessages(sentAt: sentTimestamps, tx: tx) { sentTimestamp, messages in
            if !messages.isEmpty {
                if Self.areReadReceiptsEnabled(transaction: tx) {
                    for message in messages {
                        message.update(
                            withViewedRecipient: SignalServiceAddress(recipientAci),
                            deviceId: recipientDeviceId,
                            viewedTimestamp: viewedTimestamp,
                            tx: tx,
                        )
                    }
                } else {
                    Logger.info("Ignoring incoming receipt message as read receipts are disabled.")
                }
                return true
            }

            // No outgoing message matched, so check whether the receipt refers
            // to one of our own stories. Avoid force-unwrapping the local
            // identifiers: if they are missing, report the timestamp as
            // unmatched instead of crashing.
            guard let localAci = DependenciesBridge.shared.tsAccountManager.localIdentifiers(tx: tx)?.aci else {
                owsFailDebug("Missing local identifiers while processing viewed receipts.")
                return false
            }
            guard let storyMessage = StoryFinder.story(timestamp: sentTimestamp, author: localAci, transaction: tx) else {
                return false
            }

            if StoryManager.areViewReceiptsEnabled {
                storyMessage.markAsViewed(
                    at: viewedTimestamp,
                    by: recipientAci,
                    transaction: tx,
                )
            } else {
                Logger.info("Ignoring incoming story receipt message as view receipts are disabled.")
            }
            return true
        }
    }
}