Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
signalapp
GitHub Repository: signalapp/Signal-iOS
Path: blob/main/SignalServiceKit/Groups/GroupsV2ProfileKeyUpdater.swift
1 views
//
// Copyright 2020 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//

import Foundation

// Whenever we rotate our profile key, we need to update all
// v2 groups of which we are a non-pending member.

// This is laborious, but important. It is too expensive to
// do unless necessary (e.g. we don't want to check every
// group on launch), but important enough to do durably.
//
// This class has responsibility for tracking which groups
// need to be updated and for updating them.
class GroupsV2ProfileKeyUpdater {

    private let appReadiness: AppReadiness

    init(appReadiness: AppReadiness) {
        self.appReadiness = appReadiness
        NotificationCenter.default.addObserver(
            self,
            selector: #selector(reachabilityChanged),
            name: SSKReachability.owsReachabilityDidChange,
            object: nil,
        )
        NotificationCenter.default.addObserver(
            self,
            selector: #selector(didBecomeActive),
            name: .OWSApplicationDidBecomeActive,
            object: nil,
        )
    }

    // MARK: -

    @objc
    private func didBecomeActive() {
        AssertIsOnMainThread()

        appReadiness.runNowOrWhenAppDidBecomeReadyAsync {
            self.setNeedsUpdate()
        }
    }

    @objc
    private func reachabilityChanged() {
        AssertIsOnMainThread()

        appReadiness.runNowOrWhenAppDidBecomeReadyAsync {
            self.setNeedsUpdate()
        }
    }

    // MARK: -

    // Stores the list of v2 groups that we need to update with our latest profile key.
    private let keyValueStore = KeyValueStore(collection: "GroupsV2ProfileKeyUpdater")

    private func key(for groupId: Data) -> String {
        return groupId.hexadecimalString
    }

    func updateLocalProfileKeyInGroup(groupId: Data, transaction: DBWriteTransaction) {
        guard let groupThread = TSGroupThread.fetch(groupId: groupId, transaction: transaction) else {
            owsFailDebug("Missing groupThread.")
            return
        }
        self.tryToScheduleGroupForProfileKeyUpdate(groupThread: groupThread, transaction: transaction)

        transaction.addSyncCompletion {
            self.setNeedsUpdate()
        }
    }

    func scheduleAllGroupsV2ForProfileKeyUpdate(transaction: DBWriteTransaction) {
        TSThread.anyEnumerate(transaction: transaction) { thread, _ in
            guard let groupThread = thread as? TSGroupThread else {
                return
            }
            self.tryToScheduleGroupForProfileKeyUpdate(groupThread: groupThread, transaction: transaction)
        }

        // Note that we don't kick off updates yet (don't schedule tryToUpdateNext
        // for the end of the transaction) because we want to make sure that any
        // profile key update is committed to the server first. This isn't a
        // guarantee because there could *already* be a series of updates going,
        // but it helps in the common case.
    }

    private func tryToScheduleGroupForProfileKeyUpdate(groupThread: TSGroupThread, transaction: DBWriteTransaction) {
        let tsAccountManager = DependenciesBridge.shared.tsAccountManager
        guard tsAccountManager.registrationState(tx: transaction).isRegisteredPrimaryDevice else {
            return
        }
        guard let localAddress = tsAccountManager.localIdentifiers(tx: transaction)?.aciAddress else {
            owsFailDebug("missing local address")
            return
        }

        let groupMembership = groupThread.groupModel.groupMembership
        // We only need to update v2 groups of which we are a full member.
        guard groupThread.isGroupV2Thread, groupMembership.isFullMember(localAddress), !groupThread.isTerminatedGroup else {
            return
        }
        let groupId = groupThread.groupModel.groupId
        let key = self.key(for: groupId)
        self.keyValueStore.setData(groupId, key: key, transaction: transaction)
    }

    func processProfileKeyUpdates() {
        setNeedsUpdate()
    }

    private struct State {
        var isUpdating = false
        var needsUpdate = false
    }

    private let state = AtomicValue<State>(State(), lock: .init())

    private func setNeedsUpdate() {
        self.state.update { $0.needsUpdate = true }
        startUpdatingIfNeeded()
    }

    private func startUpdatingIfNeeded() {
        Task { await self._startUpdatingIfNeeded() }
    }

    private func _startUpdatingIfNeeded() async {
        let shouldStart = self.state.update {
            if $0.isUpdating || !$0.needsUpdate {
                return false
            }
            $0.isUpdating = true
            $0.needsUpdate = false
            return true
        }
        guard shouldStart else {
            // Only one update should be in flight at a time.
            return
        }
        defer {
            self.state.update { $0.isUpdating = false }
            // An external trigger might have called setNeedsUpdate while we were
            // running, after we checked for runnable jobs, but before we cleared
            // isUpdating. Check again since there might now be runnable jobs.
            startUpdatingIfNeeded()
        }
        var failureCount = 0
        while true {
            // If an external trigger called setNeedsUpdate, we'll observe anything it
            // wants us to observe during this iteration because we haven't checked
            // anything yet. (If we'd already checked, eg, isReachable, then we'd risk
            // missing the latest reachability update.)
            self.state.update { $0.needsUpdate = false }

            let tsAccountManager = DependenciesBridge.shared.tsAccountManager
            guard
                await CurrentAppContext().isMainAppAndActiveIsolated,
                !CurrentAppContext().isRunningTests,
                tsAccountManager.registrationStateWithMaybeSneakyTransaction.isRegisteredPrimaryDevice,
                SSKEnvironment.shared.reachabilityManagerRef.isReachable
            else {
                return
            }

            do {
                let databaseStorage = SSKEnvironment.shared.databaseStorageRef
                let groupIdKeys = databaseStorage.read(block: { self.keyValueStore.allKeys(transaction: $0) })
                let taskQueue = ConcurrentTaskQueue(concurrentLimit: 16)
                try await withThrowingTaskGroup(of: Void.self) { taskGroup in
                    for groupIdKey in groupIdKeys {
                        _ = taskGroup.addTaskUnlessCancelled {
                            try await taskQueue.run {
                                try Task.checkCancellation()
                                try await self._tryToUpdateNext(groupIdKey: groupIdKey)
                            }
                        }
                    }
                    try await taskGroup.waitForAll()
                }
                return
            } catch {
                failureCount += 1
                try? await Task.sleep(nanoseconds: OWSOperation.retryIntervalForExponentialBackoff(failureCount: failureCount, maxAverageBackoff: 6 * .hour).clampedNanoseconds)
            }
        }
    }

    private func _tryToUpdateNext(groupIdKey: String) async throws {
        let databaseStorage = SSKEnvironment.shared.databaseStorageRef
        guard let groupId = databaseStorage.read(block: { tx in keyValueStore.getData(groupIdKey, transaction: tx) }) else {
            return
        }
        let sendPromises: [Promise<Void>]
        do {
            sendPromises = try await self.tryToUpdate(groupId: groupId)
        } catch {
            Logger.warn("\(error)")
            switch error {
            case GroupsV2Error.localUserNotInGroup:
                // If the update is no longer necessary, skip it.
                sendPromises = []
            case let httpError as OWSHTTPError where (400...499).contains(httpError.responseStatusCode):
                // If a non-recoverable error occurs (e.g. we've been kicked out of the
                // group), give up.
                sendPromises = []
            case is CancellationError:
                throw error
            case URLError.cancelled:
                throw error
            case is OWSHTTPError:
                throw error
            case is AppExpiredError:
                throw error
            case _ where error.isNetworkFailureOrTimeout:
                throw error
            case GroupsV2Error.timeout:
                throw error
            default:
                // This should never occur. If it does, we don't want to get stuck in a
                // retry loop.
                owsFailDebug("unexpected error: \(error)")
                sendPromises = []
            }
        }

        // Mark it as complete immediately; we don't need to check this group again
        // if we get interrupted before sending the group update messages.
        await markAsComplete(groupIdKey: groupIdKey)

        // Make a best-effort attempt to wait for group update messages to be sent;
        // this adds back pressure and avoids overwhelming MessageSenderJobQueue.
        for sendPromise in sendPromises {
            try? await sendPromise.awaitableWithUncooperativeCancellationHandling()
            try Task.checkCancellation()
        }
    }

    private func markAsComplete(groupIdKey: String) async {
        await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { transaction in
            self.keyValueStore.removeValue(forKey: groupIdKey, transaction: transaction)
        }
    }

    /// - Returns: A list of Promises for sending the group update message(s).
    /// Each Promise represents sending a message to one or more recipients.
    private func tryToUpdate(groupId: Data) async throws -> [Promise<Void>] {
        let tsAccountManager = DependenciesBridge.shared.tsAccountManager
        guard let localAci = tsAccountManager.localIdentifiersWithMaybeSneakyTransaction?.aci else {
            throw OWSGenericError("missing local address")
        }

        try await SSKEnvironment.shared.messageProcessorRef.waitForFetchingAndProcessing()

        let groupModel = SSKEnvironment.shared.databaseStorageRef.read { tx in
            return TSGroupThread.fetch(groupId: groupId, transaction: tx)?.groupModel as? TSGroupModelV2
        }
        guard let groupModel, let secretParams = try? groupModel.secretParams() else {
            throw OWSGenericError("missing secret params")
        }

        // Get latest group state from service and verify that this update is still necessary.
        try Task.checkCancellation()
        // Collect the avatar state to avoid an unnecessary download in the case
        // where we've already fetched the latest avatar.
        let snapshotResponse = try await SSKEnvironment.shared.groupsV2Ref.fetchLatestSnapshot(
            secretParams: secretParams,
            justUploadedAvatars: GroupAvatarStateMap.from(groupModel: groupModel),
        )
        guard snapshotResponse.groupSnapshot.groupMembership.isFullMember(localAci) else {
            // We're not a full member, no need to update profile key.
            return []
        }
        let profileManager = SSKEnvironment.shared.profileManagerRef
        let databaseStorage = SSKEnvironment.shared.databaseStorageRef
        let profileKey = databaseStorage.read(block: profileManager.localUserProfile(tx:))?.profileKey
        guard let profileKey else {
            throw OWSGenericError("missing local profile key")
        }
        guard snapshotResponse.groupSnapshot.profileKeys[localAci] != profileKey.keyData else {
            // Group state already has our current key.
            return []
        }

        Logger.info("Updating profile key for group.")
        try Task.checkCancellation()
        return try await GroupManager.updateLocalProfileKey(groupModel: groupModel)
    }
}