Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
signalapp
GitHub Repository: signalapp/Signal-iOS
Path: blob/main/SignalServiceKit/Jobs/BulkDeleteInteractionJobQueue.swift
1 views
//
// Copyright 2024 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//

public final class BulkDeleteInteractionJobQueue {
    private let jobRunnerFactory: BulkDeleteInteractionJobRunnerFactory
    private let jobQueueRunner: JobQueueRunner<
        JobRecordFinderImpl<BulkDeleteInteractionJobRecord>,
        BulkDeleteInteractionJobRunnerFactory,
    >
    private var jobSerializer = CompletionSerializer()

    init(
        addressableMessageFinder: DeleteForMeAddressableMessageFinder,
        db: any DB,
        interactionDeleteManager: InteractionDeleteManager,
        threadSoftDeleteManager: ThreadSoftDeleteManager,
        threadStore: ThreadStore,
    ) {
        self.jobRunnerFactory = BulkDeleteInteractionJobRunnerFactory(
            addressableMessageFinder: addressableMessageFinder,
            db: db,
            interactionDeleteManager: interactionDeleteManager,
            threadSoftDeleteManager: threadSoftDeleteManager,
            threadStore: threadStore,
        )
        self.jobQueueRunner = JobQueueRunner(
            canExecuteJobsConcurrently: false,
            db: db,
            jobFinder: JobRecordFinderImpl(db: db),
            jobRunnerFactory: self.jobRunnerFactory,
        )
    }

    func start(appContext: AppContext) {
        jobQueueRunner.start(shouldRestartExistingJobs: appContext.isMainApp)
    }

    func addJob(
        anchorMessageRowId: Int64,
        isFullThreadDelete: Bool,
        threadUniqueId: String,
        tx: DBWriteTransaction,
    ) {
        let jobRecord = BulkDeleteInteractionJobRecord(
            anchorMessageRowId: anchorMessageRowId,
            fullThreadDeletionAnchorMessageRowId: { () -> Int64? in
                if isFullThreadDelete {
                    return InteractionFinder(threadUniqueId: threadUniqueId)
                        .mostRecentRowId(tx: tx)
                }

                return nil
            }(),
            threadUniqueId: threadUniqueId,
        )

        jobRecord.anyInsert(transaction: tx)

        jobSerializer.addOrderedSyncCompletion(tx: tx) {
            self.jobQueueRunner.addPersistedJob(jobRecord)
        }
    }
}

// MARK: -

private class BulkDeleteInteractionJobRunner: JobRunner {
    typealias JobRecordType = BulkDeleteInteractionJobRecord

    private enum Constants {
        static let maxRetries: UInt = 110
        static let deletionBatchSize: Int = 500
    }

    private let addressableMessageFinder: DeleteForMeAddressableMessageFinder
    private let db: any DB
    private let interactionDeleteManager: InteractionDeleteManager
    private let threadSoftDeleteManager: ThreadSoftDeleteManager
    private let threadStore: ThreadStore

    private let logger = PrefixedLogger(prefix: "[DeleteForMe]")

    init(
        addressableMessageFinder: DeleteForMeAddressableMessageFinder,
        db: any DB,
        interactionDeleteManager: InteractionDeleteManager,
        threadSoftDeleteManager: ThreadSoftDeleteManager,
        threadStore: ThreadStore,
    ) {
        self.addressableMessageFinder = addressableMessageFinder
        self.db = db
        self.interactionDeleteManager = interactionDeleteManager
        self.threadSoftDeleteManager = threadSoftDeleteManager
        self.threadStore = threadStore
    }

    func runJobAttempt(
        _ jobRecord: BulkDeleteInteractionJobRecord,
    ) async -> JobAttemptResult<Void> {
        return await JobAttemptResult.executeBlockWithDefaultErrorHandler(
            jobRecord: jobRecord,
            retryLimit: Constants.maxRetries,
            db: db,
            block: {
                try await _runJobAttempt(jobRecord)
            },
        )
    }

    func didFinishJob(_ jobRecordId: JobRecord.RowId, result: JobResult<Void>) async {
        switch result.ranSuccessfullyOrError {
        case .success:
            break
        case .failure(let failure):
            logger.error("Failed to perform delete-for-me bulk action! \(failure)")
        }
    }

    private func _runJobAttempt(
        _ jobRecord: BulkDeleteInteractionJobRecord,
    ) async throws {
        let anchorMessageRowId = jobRecord.anchorMessageRowId
        let fullThreadDeletionAnchorMessageRowId = jobRecord.fullThreadDeletionAnchorMessageRowId
        let threadUniqueId = jobRecord.threadUniqueId

        logger.info("Attempting to bulk-delete interactions for thread \(threadUniqueId), isFullThreadDelete \(fullThreadDeletionAnchorMessageRowId != nil).")

        var deletedCount = 0
        await TimeGatedBatch.processAll(db: db) { tx -> TimeGatedBatch.ProcessBatchResult in
            let batchDeletedCount = self.deleteSomeInteractions(
                threadUniqueId: threadUniqueId,
                anchorMessageRowId: anchorMessageRowId,
                tx: tx,
            )
            deletedCount += batchDeletedCount
            return batchDeletedCount == 0 ? .done(()) : .more
        }

        logger.info("Deleted \(deletedCount) messages for thread \(threadUniqueId), isFullThreadDelete \(fullThreadDeletionAnchorMessageRowId != nil).")

        await db.awaitableWrite { tx in
            jobRecord.anyRemove(transaction: tx)

            guard
                let fullThreadDeletionAnchorMessageRowId,
                let thread = self.threadStore.fetchThread(uniqueId: threadUniqueId, tx: tx)
            else { return }

            /// At this point we've deleted all the messages at or before our
            /// view of the most-recent addressable message. Since we also know
            /// that the user's intent was a "full thread delete", we'll try and
            /// go further and additionally soft-delete the thread.
            ///
            /// This will have a couple desirable side-effects: at the time of
            /// writing, these include deleting associated story messages and
            /// hiding the thread from the chat list. Additionally, if there
            /// were any non-addressable messages that were newer than our
            /// bulk-delete anchor, those will also be deleted.
            ///
            /// Caveats:
            ///
            /// 1. We'll abort the soft-delete if there are any addressable
            ///    messages remaining. This would indicate that the user sent or
            ///    received messages newer than our bulk-delete anchor.
            ///
            /// 2. We'll abort the soft-delete if the most-recent message in the
            ///    thread now is newer than the most-recent message when we
            ///    created the bulk-delete job. This would indicate that while
            ///    all the remaining messages are non-addressable, one of them
            ///    was inserted while the bulk-delete was running.
            if
                self.addressableMessageFinder.threadContainsAnyAddressableMessages(
                    threadUniqueId: threadUniqueId,
                    tx: tx,
                )
            {
                self.logger.warn("Not doing thread soft-delete – thread contains addressable messages after delete.")
            } else if
                InteractionFinder(threadUniqueId: threadUniqueId)
                    .mostRecentRowId(tx: tx) > fullThreadDeletionAnchorMessageRowId
            {
                self.logger.warn("Not doing thread soft-delete – most recent row ID was newer than when we started delete.")
            } else {
                self.threadSoftDeleteManager.softDelete(
                    threads: [thread],
                    sendDeleteForMeSyncMessage: false,
                    tx: tx,
                )
            }
        }
    }

    /// Delete a batch of interactions.
    /// - Returns
    /// The number of interactions deleted. A return value of 0 indicates that
    /// there are no more interactions to delete.
    private func deleteSomeInteractions(
        threadUniqueId: String,
        anchorMessageRowId: Int64,
        tx: DBWriteTransaction,
    ) -> Int {
        let interactionsToDelete: [TSInteraction]
        do {
            interactionsToDelete = try InteractionFinder(
                threadUniqueId: threadUniqueId,
            ).fetchAllInteractions(
                rowIdFilter: .atOrBefore(anchorMessageRowId),
                limit: Constants.deletionBatchSize,
                tx: tx,
            )
        } catch {
            owsFailDebug("Failed to get interactions to delete!")
            return 0
        }

        if interactionsToDelete.isEmpty { return 0 }

        for interaction in interactionsToDelete {
            interactionDeleteManager.delete(
                interaction,
                sideEffects: .custom(
                    associatedCallDelete: .localDeleteOnly,
                    updateThreadOnInteractionDelete: .doNotUpdate,
                ),
                tx: tx,
            )
        }

        /// Above, we're skipping a per-interaction thread update that would
        /// otherwise set various "last visible" properties on the thread. To
        /// compensate, we'll do a single thread update at the end of each
        /// transaction (note that because we're in a `TimeGatedBatch`, we don't
        /// know how many interactions will be deleted in a single transaction).
        /// This will ensure that anyone who opens a transaction between our
        /// time-gated batches sees a thread with appropriately-updated values.
        tx.addFinalizationBlock(key: "BulkDeleteInteractionJobQueue") { tx in
            if
                let thread = self.threadStore.fetchThread(
                    uniqueId: threadUniqueId,
                    tx: tx,
                )
            {
                thread.updateOnInteractionsRemoved(
                    needsToUpdateLastInteractionRowId: true,
                    needsToUpdateLastVisibleSortId: true,
                    tx: tx,
                )
            }
        }

        return interactionsToDelete.count
    }
}

// MARK: -

private class BulkDeleteInteractionJobRunnerFactory: JobRunnerFactory {
    typealias JobRunnerType = BulkDeleteInteractionJobRunner

    private let addressableMessageFinder: DeleteForMeAddressableMessageFinder
    private let db: any DB
    private let interactionDeleteManager: InteractionDeleteManager
    private let threadSoftDeleteManager: ThreadSoftDeleteManager
    private let threadStore: ThreadStore

    init(
        addressableMessageFinder: DeleteForMeAddressableMessageFinder,
        db: any DB,
        interactionDeleteManager: InteractionDeleteManager,
        threadSoftDeleteManager: ThreadSoftDeleteManager,
        threadStore: ThreadStore,
    ) {
        self.addressableMessageFinder = addressableMessageFinder
        self.db = db
        self.interactionDeleteManager = interactionDeleteManager
        self.threadSoftDeleteManager = threadSoftDeleteManager
        self.threadStore = threadStore
    }

    func buildRunner() -> BulkDeleteInteractionJobRunner {
        return BulkDeleteInteractionJobRunner(
            addressableMessageFinder: addressableMessageFinder,
            db: db,
            interactionDeleteManager: interactionDeleteManager,
            threadSoftDeleteManager: threadSoftDeleteManager,
            threadStore: threadStore,
        )
    }
}