Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
signalapp
GitHub Repository: signalapp/Signal-iOS
Path: blob/main/SignalServiceKit/Jobs/LocalUserLeaveGroupJob.swift
1 views
//
// Copyright 2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//

import Foundation
public import LibSignalClient

private class LocalUserLeaveGroupJobRunnerFactory: JobRunnerFactory {
    func buildRunner() -> LocalUserLeaveGroupJobRunner {
        return buildRunner(isDeletingAccount: false, future: nil)
    }

    func buildRunner(isDeletingAccount: Bool, future: Future<[Promise<Void>]>?) -> LocalUserLeaveGroupJobRunner {
        return LocalUserLeaveGroupJobRunner(isDeletingAccount: isDeletingAccount, future: future)
    }
}

private class LocalUserLeaveGroupJobRunner: JobRunner {
    private enum Constants {
        static let maxRetries: UInt = 110
    }

    private let isDeletingAccount: Bool
    private let future: Future<[Promise<Void>]>?

    init(isDeletingAccount: Bool, future: Future<[Promise<Void>]>?) {
        self.isDeletingAccount = isDeletingAccount
        self.future = future
    }

    func runJobAttempt(_ jobRecord: LocalUserLeaveGroupJobRecord) async -> JobAttemptResult<[Promise<Void>]> {
        return await JobAttemptResult.executeBlockWithDefaultErrorHandler(
            jobRecord: jobRecord,
            retryLimit: Constants.maxRetries,
            db: DependenciesBridge.shared.db,
            block: { try await _runJobAttempt(jobRecord) },
        )
    }

    func didFinishJob(_ jobRecordId: JobRecord.RowId, result: JobResult<[Promise<Void>]>) async {
        switch result.ranSuccessfullyOrError {
        case .success(let result):
            future?.resolve(result)
        case .failure(let error):
            future?.reject(error)
        }
    }

    private func _runJobAttempt(_ jobRecord: LocalUserLeaveGroupJobRecord) async throws -> [Promise<Void>] {
        if jobRecord.waitForMessageProcessing {
            try await GroupManager.waitForMessageFetchingAndProcessingWithTimeout()
        }

        let groupThread = SSKEnvironment.shared.databaseStorageRef.read { tx in
            return TSGroupThread.fetchGroupThreadViaCache(uniqueId: jobRecord.threadId, transaction: tx)
        }

        guard let groupThread, let groupModel = groupThread.groupModel as? TSGroupModelV2 else {
            throw OWSAssertionError("Missing V2 group thread for operation")
        }

        let replacementAdminAci: Aci? = try jobRecord.replacementAdminAciString.map { aciString in
            guard let aci = Aci.parseFrom(aciString: aciString) else {
                throw OWSAssertionError("Couldn't parse replacementAdminAci")
            }
            return aci
        }

        do {
            try await GroupManager.refreshGroupSendEndorsementsIfNeeded(threadId: groupThread.sqliteRowId!, groupModel: groupModel)
        } catch where !error.isNetworkFailureOrTimeout {
            Logger.warn("Tried and failed to refresh credentials; continuing anyways because credentials aren't required; error: \(error)")
        }

        let sendPromises = try await GroupManager.updateGroupV2(
            groupModel: groupModel,
            description: #fileID,
            isDeletingAccount: isDeletingAccount,
        ) { groupChangeSet in
            groupChangeSet.setShouldLeaveGroupDeclineInvite()

            // Sometimes when we leave a group we take care to assign a new admin.
            if let replacementAdminAci {
                groupChangeSet.changeRoleForMember(replacementAdminAci, role: .administrator)
            }
        }

        await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
            jobRecord.anyRemove(transaction: tx)
        }

        return sendPromises
    }
}

public class LocalUserLeaveGroupJobQueue {
    private let jobQueueRunner: JobQueueRunner<
        JobRecordFinderImpl<LocalUserLeaveGroupJobRecord>,
        LocalUserLeaveGroupJobRunnerFactory,
    >
    private var jobSerializer = CompletionSerializer()
    private let jobRunnerFactory: LocalUserLeaveGroupJobRunnerFactory

    public init(db: any DB, reachabilityManager: SSKReachabilityManager) {
        self.jobRunnerFactory = LocalUserLeaveGroupJobRunnerFactory()
        self.jobQueueRunner = JobQueueRunner(
            canExecuteJobsConcurrently: false,
            db: db,
            jobFinder: JobRecordFinderImpl(db: db),
            jobRunnerFactory: self.jobRunnerFactory,
        )
        self.jobQueueRunner.listenForReachabilityChanges(reachabilityManager: reachabilityManager)
    }

    func start(appContext: AppContext) {
        jobQueueRunner.start(shouldRestartExistingJobs: appContext.isMainApp)
    }

    // MARK: - Promises

    /// - Returns: A Promise for leaving the group whose value is a list of
    /// Promises for sending the group update message(s) about leaving the
    /// group. (See `updateGroupV2` for details.)
    public func addJob(
        groupThread: TSGroupThread,
        replacementAdminAci: Aci?,
        waitForMessageProcessing: Bool,
        isDeletingAccount: Bool,
        tx: DBWriteTransaction,
    ) -> Promise<[Promise<Void>]> {
        guard groupThread.isGroupV2Thread else {
            owsFail("[GV1] Mutations on V1 groups should be impossible!")
        }
        return Promise { future in
            addJob(
                threadId: groupThread.uniqueId,
                replacementAdminAci: replacementAdminAci,
                waitForMessageProcessing: waitForMessageProcessing,
                isDeletingAccount: isDeletingAccount,
                future: future,
                tx: tx,
            )
        }
    }

    private func addJob(
        threadId: String,
        replacementAdminAci: Aci?,
        waitForMessageProcessing: Bool,
        isDeletingAccount: Bool,
        future: Future<[Promise<Void>]>,
        tx: DBWriteTransaction,
    ) {
        let jobRecord = LocalUserLeaveGroupJobRecord(
            threadId: threadId,
            replacementAdminAci: replacementAdminAci,
            waitForMessageProcessing: waitForMessageProcessing,
        )
        jobRecord.anyInsert(transaction: tx)
        jobSerializer.addOrderedSyncCompletion(tx: tx) {
            self.jobQueueRunner.addPersistedJob(
                jobRecord,
                runner: self.jobRunnerFactory.buildRunner(isDeletingAccount: isDeletingAccount, future: future),
            )
        }
    }
}