Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
signalapp
GitHub Repository: signalapp/Signal-iOS
Path: blob/main/SignalServiceKit/Backups/Archiving/BackupArchiveFullTextSearchIndexer.swift
1 views
//
// Copyright 2024 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//

import GRDB

/// Builds full-text search index entries for threads and messages on behalf
/// of the backup archiving code.
///
/// Thread indexing happens inline in the caller's transaction; message
/// indexing is only *scheduled* here and performed later.
public protocol BackupArchiveFullTextSearchIndexer {

    /// Index all searchable threads.
    /// Does not cover message contents (or mentions in messages).
    ///
    /// Done synchronously with the actual backup (in the same transaction) because
    /// it's cheap compared to messages (p99 thread count is relatively small).
    ///
    /// - Parameter tx: The write transaction in which the index entries are written.
    func indexThreads(tx: DBWriteTransaction)

    /// Schedule work to index message contents for all messages that have been inserted
    /// until this point. Future messages can index themselves upon insertion while this
    /// job runs.
    ///
    /// - Parameter tx: The write transaction in which the scheduling state is recorded.
    /// - Throws: An error if the job could not be scheduled.
    func scheduleMessagesJob(tx: DBWriteTransaction) throws
}

/// Default `BackupArchiveFullTextSearchIndexer` implementation.
///
/// Threads are indexed inline through `SearchableNameIndexer`. Message
/// contents (and mentions) are indexed by a resumable job whose progress is
/// persisted in a `KeyValueStore` as a pair of interaction row ids:
/// - an exclusive lower bound: the last row id already indexed, and
/// - an inclusive upper bound: the highest row id that existed when the job
///   was scheduled.
///
/// A job is pending for exactly as long as the upper bound is stored.
public class BackupArchiveFullTextSearchIndexerImpl: BackupArchiveFullTextSearchIndexer {

    private let appReadiness: AppReadiness
    private let dateProvider: DateProviderMonotonic
    private let db: any DB
    private let interactionStore: InteractionStore
    private let kvStore: KeyValueStore
    private let logger: PrefixedLogger
    private let searchableNameIndexer: SearchableNameIndexer
    private let taskQueue: SerialTaskQueue

    /// Creates the indexer and arranges for any previously scheduled (and
    /// still unfinished) messages job to resume once the app is ready.
    public init(
        appReadiness: AppReadiness,
        dateProvider: @escaping DateProviderMonotonic,
        db: any DB,
        interactionStore: InteractionStore,
        searchableNameIndexer: SearchableNameIndexer,
    ) {
        self.appReadiness = appReadiness
        self.dateProvider = dateProvider
        self.db = db
        self.interactionStore = interactionStore
        self.kvStore = KeyValueStore(collection: "BackupFullTextSearchIndexerImpl")
        self.logger = PrefixedLogger(prefix: "[Backups]")
        self.searchableNameIndexer = searchableNameIndexer
        self.taskQueue = SerialTaskQueue()

        // Resume a job left over from a previous launch, if there is one.
        appReadiness.runNowOrWhenAppDidBecomeReadyAsync { [self] in
            taskQueue.enqueue { [self] in
                try await runMessagesJobIfNeeded()
            }
        }
    }

    /// Indexes all searchable threads within `tx` by delegating to the
    /// injected `SearchableNameIndexer`.
    public func indexThreads(tx: DBWriteTransaction) {
        searchableNameIndexer.indexThreads(tx: tx)
    }

    /// Records the range of interactions to index and, if there is anything
    /// to index, enqueues the job to run after `tx` completes.
    ///
    /// - Parameter tx: The write transaction in which the job bounds are stored.
    /// - Throws: Any error raised while querying the highest interaction row id.
    public func scheduleMessagesJob(tx: DBWriteTransaction) throws {
        // Start over: discard progress left behind by an earlier job.
        setMinInteractionRowIdExclusive(nil, tx: tx)

        // `max()` over an empty table yields NULL, which surfaces here as nil.
        let maxInteractionRowId = try Int64.fetchOne(
            tx.database,
            sql: """
            SELECT max(\(TSInteractionSerializer.idColumn.columnName))
            FROM \(TSInteraction.table.tableName);
            """,
        )

        // Always write the upper bound. When there are no interactions this
        // removes a stale bound from an earlier schedule rather than leaving
        // it paired with the lower bound we just reset.
        setMaxInteractionRowIdInclusive(maxInteractionRowId, tx: tx)

        guard maxInteractionRowId != nil else {
            // Nothing to index.
            return
        }

        tx.addSyncCompletion {
            self.taskQueue.enqueue { [weak self] in
                try await self?.runMessagesJobIfNeeded()
            }
        }
    }

    /// Runs the messages job if one is pending, indexing interactions in
    /// time-gated batches and persisting progress at the end of each
    /// transaction so the job can resume after an interruption.
    ///
    /// Does nothing if the app is not ready or no upper bound is stored.
    private func runMessagesJobIfNeeded() async throws {
        guard appReadiness.isAppReady else {
            return
        }

        // This value is set once when we schedule the job, and won't change
        // across multiple runs of the job.
        guard
            let upperBoundRowId = db.read(block: { tx in
                maxInteractionRowIdInclusive(tx: tx)
            })
        else {
            // No job to run
            return
        }

        logger.info("Starting job")

        struct TxContext {
            let interactionCursor: AnyCursor<InteractionRecord>
            /// Highest row id popped from the cursor in this transaction.
            var maxInteractionRowIdSoFar: Int64?
            /// Set once nothing is left to index in the scheduled range.
            var isJobComplete: Bool
        }
        await TimeGatedBatch.processAll(
            db: db,
            yieldTxAfter: 0.1,
            delayTwixtTx: 0.2,
            buildTxContext: { tx -> TxContext in
                // Each transaction resumes from the persisted lower bound.
                let lowerBoundRowId = minInteractionRowIdExclusive(tx: tx)

                let interactionCursor = interactionStore.fetchCursor(
                    minRowIdExclusive: lowerBoundRowId,
                    maxRowIdInclusive: upperBoundRowId,
                    tx: tx,
                )

                return TxContext(
                    interactionCursor: interactionCursor,
                    maxInteractionRowIdSoFar: nil,
                    isJobComplete: false,
                )
            },
            processBatch: { tx, context -> TimeGatedBatch.ProcessBatchResult<Void> in
                let interactionRecord: InteractionRecord? = failIfThrows {
                    try context.interactionCursor.next()
                }

                guard let interactionRecord else {
                    // The cursor ran dry, so every remaining row in range has
                    // been visited. This also covers the case where the row
                    // that defined the upper bound was deleted after the job
                    // was scheduled; without this the job would never be
                    // marked finished.
                    context.isJobComplete = true
                    return .done(())
                }

                // Records fetched from the database are expected to carry
                // their row id.
                let rowId = interactionRecord.id!
                context.maxInteractionRowIdSoFar = rowId

                // The cursor is requested with the upper bound as its
                // inclusive maximum, so no row should follow it. End the job
                // here instead of asking for another batch, which could
                // otherwise run after `concludeTx` has already cleared the
                // persisted bounds.
                let batchResult: TimeGatedBatch.ProcessBatchResult<Void>
                if rowId >= upperBoundRowId {
                    context.isJobComplete = true
                    batchResult = .done(())
                } else {
                    batchResult = .more
                }

                let interaction: TSInteraction
                do {
                    interaction = try TSInteraction.fromRecord(interactionRecord)
                } catch {
                    // Skip this interaction and move on. It's already been
                    // popped from the cursor and we've recorded its row ID, so
                    // we'll skip it going forward.
                    logger.warn("Failed to create interaction from record! \(error)")
                    return batchResult
                }

                index(interaction, tx: tx)
                return batchResult
            },
            concludeTx: { tx, context in
                if context.isJobComplete {
                    // We covered the whole set of interactions that existed
                    // when the job was scheduled. We're done!
                    setMaxInteractionRowIdInclusive(nil, tx: tx)
                    setMinInteractionRowIdExclusive(nil, tx: tx)
                    logger.info("Finished!")
                } else if let maxInteractionRowIdSoFar = context.maxInteractionRowIdSoFar {
                    // The batch completed but there's more to do: update our
                    // lower bound, so the next batch starts here.
                    setMinInteractionRowIdExclusive(maxInteractionRowIdSoFar, tx: tx)
                }
                // Otherwise nothing was processed in this transaction and
                // there is no progress to record.
            },
        )
    }

    /// Indexes a single interaction: inserts it into the full-text search
    /// index and saves one `TSMention` per distinct mentioned ACI.
    /// Interactions that are not `TSMessage`s are ignored.
    private func index(_ interaction: TSInteraction, tx: DBWriteTransaction) {
        guard let message = interaction as? TSMessage else {
            return
        }

        FullTextSearchIndexer.insert(message, tx: tx)

        if let bodyRanges = message.bodyRanges {
            // De-duplicate so each mentioned ACI is saved once per message.
            let uniqueMentionedAcis = Set(bodyRanges.mentions.values)
            for mentionedAci in uniqueMentionedAcis {
                let mention = TSMention(uniqueMessageId: message.uniqueId, uniqueThreadId: message.uniqueThreadId, aci: mentionedAci)
                failIfThrows {
                    try mention.save(tx.database)
                }
            }
        }
    }

    // MARK: - State

    /// Stores the exclusive lower bound, or removes it when `newValue` is nil.
    private func setMinInteractionRowIdExclusive(_ newValue: Int64?, tx: DBWriteTransaction) {
        if let newValue {
            kvStore.setInt64(newValue, key: Constants.minInteractionRowIdKey, transaction: tx)
        } else {
            kvStore.removeValue(forKey: Constants.minInteractionRowIdKey, transaction: tx)
        }
    }

    /// Returns the stored exclusive lower bound, if any.
    private func minInteractionRowIdExclusive(tx: DBReadTransaction) -> Int64? {
        kvStore.getInt64(Constants.minInteractionRowIdKey, transaction: tx)
    }

    /// Stores the inclusive upper bound, or removes it when `newValue` is nil.
    private func setMaxInteractionRowIdInclusive(_ newValue: Int64?, tx: DBWriteTransaction) {
        if let newValue {
            kvStore.setInt64(newValue, key: Constants.maxInteractionRowIdKey, transaction: tx)
        } else {
            kvStore.removeValue(forKey: Constants.maxInteractionRowIdKey, transaction: tx)
        }
    }

    /// Returns the stored inclusive upper bound, if any.
    private func maxInteractionRowIdInclusive(tx: DBReadTransaction) -> Int64? {
        kvStore.getInt64(Constants.maxInteractionRowIdKey, transaction: tx)
    }

    private enum Constants {
        /// Exclusive; this marks the last interaction row id we already indexed.
        static let minInteractionRowIdKey = "minInteractionRowIdKey"
        /// Inclusive; this marks the highest unindexed row id.
        static let maxInteractionRowIdKey = "maxInteractionRowIdKey"
    }
}