Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
signalapp
GitHub Repository: signalapp/Signal-iOS
Path: blob/main/SignalServiceKit/Groups/GroupV2UpdatesImpl.swift
1 views
//
// Copyright 2020 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//

import Foundation
public import LibSignalClient

/// Keeps locally stored v2 group state in sync with the service.
///
/// Refreshes whose source is `.other` are funneled through `operationQueue`
/// (concurrency limit 1). When such a refresh succeeds, the time is recorded
/// both in memory (used for throttling) and in `groupRefreshStore` (used to
/// choose which group to auto-refresh).
public class GroupV2UpdatesImpl: GroupV2Updates {

    // This tracks the last time that groups were updated to the current
    // revision.
    private static let groupRefreshStore = KeyValueStore(collection: "groupRefreshStore")

    /// In-memory record of the last successful refresh per group, consulted
    /// when a refresh is requested with the `.throttle` option.
    private var lastSuccessfulRefreshMap = LRUCache<GroupIdentifier, Date>(maxSize: 256)

    /// Queue shared by every refresh whose source is `.other`.
    private let operationQueue = ConcurrentTaskQueue(concurrentLimit: 1)

    init(appReadiness: AppReadiness) {
        SwiftSingletons.register(self)
    }

    /// Builds the key under which a group's last refresh date is persisted in
    /// `groupRefreshStore`.
    ///
    /// Both the reader (`findGroupToAutoRefresh`) and the writer
    /// (`didUpdateGroupToLatestRevision`) must go through this helper so the
    /// two sides can never disagree about the key format.
    private static func refreshStoreKey(for groupId: GroupIdentifier) -> String {
        groupId.serialize().toHex()
    }

    // MARK: -

    /// Refreshes the v2 group that has gone the longest without a refresh
    /// (or one that has never been refreshed), if any qualifies.
    ///
    /// Does nothing when the local user isn't registered or no group is
    /// eligible. Failures of the refresh itself are logged/asserted here and
    /// are not propagated to the caller.
    ///
    /// - Throws: `CancellationError` if waiting for message fetching and
    ///   processing is cancelled.
    public func autoRefreshGroup() async throws(CancellationError) {
        let tsAccountManager = DependenciesBridge.shared.tsAccountManager
        guard tsAccountManager.registrationStateWithMaybeSneakyTransaction.isRegistered else {
            return
        }

        try await SSKEnvironment.shared.messageProcessorRef.waitForFetchingAndProcessing()

        guard let groupInfoToRefresh = Self.findGroupToAutoRefresh() else {
            // We didn't find a group to refresh; abort.
            return
        }

        let groupId = groupInfoToRefresh.groupId
        let groupSecretParams = groupInfoToRefresh.groupSecretParams
        if let lastRefreshDate = groupInfoToRefresh.lastRefreshDate {
            let formattedDays = String(format: "%.1f", -lastRefreshDate.timeIntervalSinceNow / TimeInterval.day)
            Logger.info("auto-refreshing group: \(groupId) which hasn't been refreshed in \(formattedDays) days")
        } else {
            Logger.info("auto-refreshing group: \(groupId) which has never been refreshed")
        }

        do {
            try await self.refreshGroup(secretParams: groupSecretParams)
        } catch GroupsV2Error.localUserNotInGroup {
            Logger.warn("can't auto-refresh group unless we're a member")
        } catch {
            owsFailDebugUnlessNetworkFailure(error)
        }
    }

    /// A candidate for auto-refresh, as selected by `findGroupToAutoRefresh`.
    private struct GroupInfo {
        let groupId: GroupIdentifier
        let groupSecretParams: GroupSecretParams
        /// `nil` when there is no persisted record of the group being refreshed.
        let lastRefreshDate: Date?
    }

    /// Scans all threads for the v2 group most in need of a refresh.
    ///
    /// - Returns: The first eligible group with no recorded refresh, otherwise
    ///   the eligible group with the oldest recorded refresh that is more than
    ///   a week old, otherwise `nil`.
    private static func findGroupToAutoRefresh() -> GroupInfo? {
        // Enumerate all the v2 groups, trying to find the "best" one to refresh.
        // The "best" is the group that hasn't been refreshed in the longest time.
        SSKEnvironment.shared.databaseStorageRef.read { transaction in
            var groupInfoToRefresh: GroupInfo?
            TSThread.anyEnumerate(transaction: transaction, batchingPreference: .batched(Batching.kDefaultBatchSize)) { thread, stop in
                guard
                    let groupThread = thread as? TSGroupThread,
                    let groupModel = groupThread.groupModel as? TSGroupModelV2,
                    groupModel.groupMembership.isLocalUserFullOrInvitedMember,
                    !groupModel.isTerminated,
                    let groupSecretParams = try? groupModel.secretParams(),
                    let groupId = try? groupSecretParams.getPublicParams().getGroupIdentifier(),
                    !SSKEnvironment.shared.blockingManagerRef.isGroupIdBlocked(groupId, transaction: transaction)
                else {
                    // Refreshing a group we're not a member of will throw errors
                    return
                }

                let storeKey = Self.refreshStoreKey(for: groupId)
                guard
                    let lastRefreshDate: Date = Self.groupRefreshStore.getDate(
                        storeKey,
                        transaction: transaction,
                    )
                else {
                    // If we find a group that we have no record of refreshing,
                    // pick that one immediately.
                    groupInfoToRefresh = GroupInfo(
                        groupId: groupId,
                        groupSecretParams: groupSecretParams,
                        lastRefreshDate: nil,
                    )
                    stop = true
                    return
                }

                // Don't auto-refresh groups more than once a week.
                let maxRefreshFrequencyInternal: TimeInterval = .week
                guard abs(lastRefreshDate.timeIntervalSinceNow) > maxRefreshFrequencyInternal else {
                    return
                }

                if
                    let otherGroupInfo = groupInfoToRefresh,
                    let otherLastRefreshDate = otherGroupInfo.lastRefreshDate,
                    otherLastRefreshDate < lastRefreshDate
                {
                    // We already found another group with an older refresh
                    // date, so prefer that one.
                    return
                }

                groupInfoToRefresh = GroupInfo(
                    groupId: groupId,
                    groupSecretParams: groupSecretParams,
                    lastRefreshDate: lastRefreshDate,
                )
            }
            return groupInfoToRefresh
        }
    }

    /// Applies a single set of change actions to an existing group thread
    /// within the caller's write transaction.
    ///
    /// Besides updating the thread and inserting an info message, this stores
    /// the profile keys carried by the change (treating the change author's
    /// key as authoritative) and, if provided, hands the endorsements response
    /// to `groupsV2Ref`.
    ///
    /// - Returns: The updated group thread.
    /// - Throws: `OWSAssertionError` if the thread doesn't exist or the local
    ///   user isn't registered; rethrows errors from applying the changes or
    ///   from deriving the secret params.
    public func updateGroupWithChangeActions(
        groupId: GroupIdentifier,
        spamReportingMetadata: GroupUpdateSpamReportingMetadata,
        changeActionsProto: GroupsProtoGroupChangeActions,
        groupSendEndorsementsResponse: GroupSendEndorsementsResponse?,
        downloadedAvatars: GroupAvatarStateMap,
        transaction: DBWriteTransaction,
    ) throws -> TSGroupThread {

        guard let groupThread = TSGroupThread.fetch(forGroupId: groupId, tx: transaction) else {
            throw OWSAssertionError("Missing groupThread.")
        }
        guard let localIdentifiers = DependenciesBridge.shared.tsAccountManager.localIdentifiers(tx: transaction) else {
            throw OWSAssertionError("Not registered.")
        }
        let changedGroupModel = try GroupsV2IncomingChanges.applyChangesToGroupModel(
            groupThread: groupThread,
            localIdentifiers: localIdentifiers,
            changeActionsProto: changeActionsProto,
            downloadedAvatars: downloadedAvatars,
            options: [],
        )
        // The prior method throws if the revisions don't match.
        owsAssertDebug(changedGroupModel.newGroupModel.revision == changedGroupModel.oldGroupModel.revision + 1)

        GroupManager.updateExistingGroupThreadInDatabaseAndCreateInfoMessage(
            groupThread: groupThread,
            newGroupModel: changedGroupModel.newGroupModel,
            newDisappearingMessageToken: changedGroupModel.newDisappearingMessageToken,
            newlyLearnedPniToAciAssociations: changedGroupModel.newlyLearnedPniToAciAssociations,
            groupUpdateSource: changedGroupModel.updateSource,
            localIdentifiers: localIdentifiers,
            spamReportingMetadata: spamReportingMetadata,
            transaction: transaction,
        )
        // The prior method always updates the revision because we've confirmed it's newer.
        owsAssertDebug((groupThread.groupModel as? TSGroupModelV2)?.revision == changedGroupModel.newGroupModel.revision)

        // Only the key belonging to the author of the change is authoritative.
        let authoritativeProfileKeys = changedGroupModel.profileKeys.filter {
            $0.key == changedGroupModel.updateSource.serviceIdUnsafeForLocalUserComparison()
        }
        GroupManager.storeProfileKeysFromGroupProtos(
            allProfileKeysByAci: changedGroupModel.profileKeys,
            authoritativeProfileKeysByAci: authoritativeProfileKeys,
            localIdentifiers: localIdentifiers,
            tx: transaction,
        )

        if let groupSendEndorsementsResponse {
            SSKEnvironment.shared.groupsV2Ref.handleGroupSendEndorsementsResponse(
                groupSendEndorsementsResponse,
                // NOTE(review): force-unwrap assumes a fetched thread always
                // has a row id — confirm.
                groupThreadId: groupThread.sqliteRowId!,
                secretParams: try changedGroupModel.newGroupModel.secretParams(),
                membership: groupThread.groupMembership,
                localAci: localIdentifiers.aci,
                tx: transaction,
            )
        }

        return groupThread
    }

    /// Refreshes a group from the service, honoring blocking and throttling.
    ///
    /// - Parameters:
    ///   - secretParams: Secret params identifying the group.
    ///   - spamReportingMetadata: Forwarded to the update operation.
    ///   - source: `.groupMessage` refreshes run on a private, throwaway queue;
    ///     `.other` refreshes run on the shared `operationQueue`.
    ///   - options: If `.throttle` is present, the refresh is skipped when the
    ///     group was successfully refreshed within the last five minutes.
    /// - Throws: `GroupsV2Error.groupBlocked` if the group is blocked (checked
    ///   before throttling is applied); rethrows failures of the refresh.
    public func refreshGroupImpl(
        secretParams: GroupSecretParams,
        spamReportingMetadata: GroupUpdateSpamReportingMetadata,
        source: GroupChangeActionFetchSource,
        options: TSGroupModelOptions,
    ) async throws {
        let groupId = try secretParams.getPublicParams().getGroupIdentifier()

        let taskQueue: ConcurrentTaskQueue
        switch source {
        case .groupMessage:
            // The upstream caller handles the concurrency for these requests, so
            // create a dummy queue that just runs it immediately. This avoids deadlock
            // that may happen if group message processing gets stuck behind an
            // unrelated group refresh that's waiting for group message processing.
            taskQueue = ConcurrentTaskQueue(concurrentLimit: 1)
        case .other:
            taskQueue = self.operationQueue
        }

        try await taskQueue.run {
            let isThrottled = { () -> Bool in
                guard options.contains(.throttle) else {
                    return false
                }
                guard let lastSuccessfulRefreshDate = self.lastSuccessfulRefreshDate(forGroupId: groupId) else {
                    return false
                }
                // Don't auto-refresh more often than once every N minutes.
                let refreshFrequency: TimeInterval = .minute * 5
                return abs(lastSuccessfulRefreshDate.timeIntervalSinceNow) < refreshFrequency
            }()

            let databaseStorage = SSKEnvironment.shared.databaseStorageRef
            try databaseStorage.read { tx in
                // - If we're blocked, it's an immediate error
                if SSKEnvironment.shared.blockingManagerRef.isGroupIdBlocked(groupId, transaction: tx) {
                    throw GroupsV2Error.groupBlocked
                }
            }

            // A throttled refresh is a silent no-op, but only after the
            // blocked check above has had a chance to throw.
            if isThrottled {
                return
            }

            try await self.runUpdateOperation(
                secretParams: secretParams,
                spamReportingMetadata: spamReportingMetadata,
                source: source,
                options: options,
            )

            switch source {
            case .groupMessage:
                // We may or may not have updated to the very latest state, so we still
                // want to be able to refresh again when you open the conversation.
                break
            case .other:
                await self.didUpdateGroupToLatestRevision(groupId: groupId)
            }
        }
    }

    /// Returns the in-memory time of the last successful refresh, if any.
    private func lastSuccessfulRefreshDate(forGroupId groupId: GroupIdentifier) -> Date? {
        lastSuccessfulRefreshMap[groupId]
    }

    /// Records "now" as the group's last successful refresh, both in memory
    /// and in `groupRefreshStore`.
    private func didUpdateGroupToLatestRevision(groupId: GroupIdentifier) async {
        // Use one timestamp so the in-memory and persisted records agree.
        let refreshDate = Date()
        lastSuccessfulRefreshMap[groupId] = refreshDate
        await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
            Self.groupRefreshStore.setDate(refreshDate, key: Self.refreshStoreKey(for: groupId), transaction: tx)
        }
    }

    /// Performs one refresh from the service and classifies any failure.
    ///
    /// Every failure is logged and rethrown; failures other than network
    /// failures/timeouts, `localUserNotInGroup`, `timeout` and
    /// `URLError.cancelled` additionally trigger `owsFailDebug`.
    private func runUpdateOperation(
        secretParams: GroupSecretParams,
        spamReportingMetadata: GroupUpdateSpamReportingMetadata,
        source: GroupChangeActionFetchSource,
        options: TSGroupModelOptions,
    ) async throws {
        switch source {
        case .groupMessage:
            // If we're processing a message, we can't wait to finish processing
            // messages or we'll deadlock.
            break
        case .other:
            try await SSKEnvironment.shared.messageProcessorRef.waitForFetchingAndProcessing()
        }

        do {
            try await refreshGroupFromService(
                secretParams: secretParams,
                spamReportingMetadata: spamReportingMetadata,
                source: source,
                options: options,
            )
        } catch {
            Logger.warn("Group update failed: \(error)")
            switch error {
            case _ where error.isNetworkFailureOrTimeout:
                break
            case GroupsV2Error.localUserNotInGroup, GroupsV2Error.timeout:
                break
            case URLError.cancelled:
                break
            default:
                owsFailDebug("Group update failed: \(error)")
            }
            throw error
        }
    }
}

// MARK: - Refresh group from service

public extension GroupV2UpdatesImpl {

    // Fetch group state from service and apply.
    //
    // * Try to fetch and apply incremental "changes" -
    //   if the group already exists in the database.
    // * Failover to fetching and applying latest snapshot.
    // * We need to distinguish between retryable (network) errors
    //   and non-retryable errors.
    // * In the case of networking errors, we should do exponential
    //   backoff.
    // * If reachability changes, we should retry network errors
    //   immediately.
    func refreshGroupFromService(
        secretParams: GroupSecretParams,
        spamReportingMetadata: GroupUpdateSpamReportingMetadata,
        source: GroupChangeActionFetchSource,
        options: TSGroupModelOptions,
    ) async throws {
        // First choice: bring the group up to date via individual changes.
        let changeActionsError: Error
        do {
            try await fetchAndApplyChangeActionsFromService(
                secretParams: secretParams,
                spamReportingMetadata: spamReportingMetadata,
                source: source,
                options: options,
            )
            return
        } catch {
            changeActionsError = error
        }

        // Decide whether the failure is one a full snapshot can recover from.
        let canRecoverWithSnapshot: Bool
        if changeActionsError.isNetworkFailureOrTimeout {
            // Networking problems must never trigger the snapshot fallback.
            Logger.warn("Error: \(changeActionsError)")
            canRecoverWithSnapshot = false
        } else {
            switch changeActionsError {
            case GroupsV2Error.localUserNotInGroup:
                // Some auth edge cases are recoverable with a snapshot, e.g.
                // when joining via an invite link we can't fetch change
                // actions.
                canRecoverWithSnapshot = true
            case GroupsV2Error.groupChangeProtoForIncompatibleRevision:
                // Change protos for an incompatible revision can be papered
                // over by applying a snapshot instead.
                canRecoverWithSnapshot = true
            case URLError.cancelled:
                canRecoverWithSnapshot = false
            default:
                owsFailDebugUnlessNetworkFailure(changeActionsError)
                canRecoverWithSnapshot = false
            }
        }

        guard canRecoverWithSnapshot else {
            throw changeActionsError
        }

        // Second choice: replace local state with the latest snapshot.
        try await fetchAndApplyCurrentGroupV2SnapshotFromService(
            secretParams: secretParams,
            spamReportingMetadata: spamReportingMetadata,
            options: options,
            skipTerminatedGroup: false,
        )
    }

    /// Fetches change actions from the service page by page, applying each
    /// page before requesting the next, until the service reports there is
    /// nothing more to fetch.
    private func fetchAndApplyChangeActionsFromService(
        secretParams: GroupSecretParams,
        spamReportingMetadata: GroupUpdateSpamReportingMetadata,
        source: GroupChangeActionFetchSource,
        options: TSGroupModelOptions,
    ) async throws {
        var hasMorePages = true
        while hasMorePages {
            let page = try await SSKEnvironment.shared.groupsV2Ref.fetchSomeGroupChangeActions(
                secretParams: secretParams,
                source: source,
            )

            var changesToApply = page.groupChanges
            var endorsementsToApply = page.groupSendEndorsementsResponse

            // When triggered by a group message, never apply anything newer
            // than the revision that message refers to.
            if case .groupMessage(let upThroughRevision) = source {
                let isBeyondLimit: (GroupV2Change) -> Bool = { $0.revision > upThroughRevision }
                if changesToApply.contains(where: isBeyondLimit) {
                    owsFailDebug("Ignoring revisions beyond \(upThroughRevision).")
                    changesToApply.removeAll(where: isBeyondLimit)
                    // The endorsements are valid for the final revision, which
                    // we just dropped, so they must be dropped as well.
                    endorsementsToApply = nil
                }
            }

            try await tryToApplyGroupChangesFromService(
                secretParams: secretParams,
                spamReportingMetadata: spamReportingMetadata,
                groupChanges: changesToApply,
                groupSendEndorsementsResponse: endorsementsToApply,
                options: options,
            )

            hasMorePages = page.shouldFetchMore
        }

        // Retry avatar downloads that were skipped because of the thread not
        // yet existing at the time of the snapshot being applied.
        try await SSKEnvironment.shared.groupsV2Ref.downloadAndApplyGroupAvatarIfSkipped(secretParams)
    }

    /// Applies one fetched batch of group changes inside a single write
    /// transaction.
    ///
    /// Steps, in order: fetch the group thread (inserting it from the first
    /// change's snapshot if missing), apply each change, store the profile
    /// keys gathered along the way, then either leave the group (if the local
    /// user was added by a blocked user) or schedule a profile key update (if
    /// the group holds a stale key for the local user), and finally hand off
    /// the endorsements response if there is one.
    ///
    /// - Throws: `OWSAssertionError` if local identifiers are missing;
    ///   rethrows errors from building params, inserting the thread, or
    ///   applying an individual change.
    private func tryToApplyGroupChangesFromService(
        secretParams: GroupSecretParams,
        spamReportingMetadata: GroupUpdateSpamReportingMetadata,
        groupChanges: [GroupV2Change],
        groupSendEndorsementsResponse: GroupSendEndorsementsResponse?,
        options: TSGroupModelOptions,
    ) async throws {
        try await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { transaction in
            let tsAccountManager = DependenciesBridge.shared.tsAccountManager
            guard let localIdentifiers = tsAccountManager.localIdentifiers(tx: transaction) else {
                throw OWSAssertionError("Missing localIdentifiers.")
            }

            let groupV2Params = try GroupV2Params(groupSecretParams: secretParams)
            let groupId = try groupV2Params.groupPublicParams.getGroupIdentifier()

            let groupThread: TSGroupThread
            // Who added the local user, if any change in this batch did so.
            var localUserWasAddedBy: GroupUpdateSource?

            if let existingThread = TSGroupThread.fetch(forGroupId: groupId, tx: transaction) {
                groupThread = existingThread
                localUserWasAddedBy = nil
            } else {
                // No local thread yet: create it from the first change's
                // snapshot before applying the changes below.
                (groupThread, localUserWasAddedBy) = try self.insertThreadForGroupChanges(
                    groupId: groupId,
                    spamReportingMetadata: spamReportingMetadata,
                    groupV2Params: groupV2Params,
                    groupChanges: groupChanges,
                    groupModelOptions: options,
                    localIdentifiers: localIdentifiers,
                    transaction: transaction,
                )
            }

            // Accumulated across every change in the batch and stored once
            // after the loop.
            var profileKeysByAci = [Aci: Data]()
            var authoritativeProfileKeysByAci = [Aci: Data]()
            for groupChange in groupChanges {
                let applyResult = try autoreleasepool {
                    try self.tryToApplySingleChangeFromService(
                        groupThread: groupThread,
                        groupV2Params: groupV2Params,
                        options: options,
                        groupChange: groupChange,
                        profileKeysByAci: &profileKeysByAci,
                        authoritativeProfileKeysByAci: &authoritativeProfileKeysByAci,
                        localIdentifiers: localIdentifiers,
                        spamReportingMetadata: spamReportingMetadata,
                        transaction: transaction,
                    )
                }

                // A later change that adds the local user overrides an
                // earlier one, so the most recent adder wins.
                if
                    let applyResult,
                    applyResult.wasLocalUserAddedByChange
                {
                    localUserWasAddedBy = applyResult.changeAuthor
                }
            }

            GroupManager.storeProfileKeysFromGroupProtos(
                allProfileKeysByAci: profileKeysByAci,
                authoritativeProfileKeysByAci: authoritativeProfileKeysByAci,
                localIdentifiers: localIdentifiers,
                tx: transaction,
            )

            // Work out whether whoever added the local user is blocked.
            let localUserWasAddedByBlockedUser: Bool
            switch localUserWasAddedBy {
            case nil, .unknown, .localUser:
                localUserWasAddedByBlockedUser = false
            case .legacyE164(let e164):
                localUserWasAddedByBlockedUser = SSKEnvironment.shared.blockingManagerRef.isAddressBlocked(
                    .legacyAddress(serviceId: nil, phoneNumber: e164.stringValue),
                    transaction: transaction,
                )
            case .aci(let aci):
                localUserWasAddedByBlockedUser = SSKEnvironment.shared.blockingManagerRef.isAddressBlocked(
                    .init(aci),
                    transaction: transaction,
                )
            case .rejectedInviteToPni:
                owsFailDebug("Local user added, but group update source was a PNI invite decline?")
                localUserWasAddedByBlockedUser = false
            }

            if localUserWasAddedByBlockedUser {
                // If we have been added to the group by a blocked user, we
                // should automatically leave the group. To that end, enqueue
                // a leave action after we've finished processing messages.
                _ = GroupManager.localLeaveGroupOrDeclineInvite(
                    groupThread: groupThread,
                    waitForMessageProcessing: true,
                    tx: transaction,
                )
            } else if
                let groupProfileKey = profileKeysByAci[localIdentifiers.aci],
                let localProfileKey = SSKEnvironment.shared.profileManagerRef.localUserProfile(tx: transaction)?.profileKey,
                groupProfileKey != localProfileKey.keyData
            {
                // If the final group state includes a stale profile key for the
                // local user, schedule an update to fix that. Note that we skip
                // this step if we are planning to leave the group via the block
                // above, as it's redundant.
                SSKEnvironment.shared.groupsV2Ref.updateLocalProfileKeyInGroup(
                    groupId: groupId.serialize(),
                    transaction: transaction,
                )
            }

            if let groupSendEndorsementsResponse {
                SSKEnvironment.shared.groupsV2Ref.handleGroupSendEndorsementsResponse(
                    groupSendEndorsementsResponse,
                    // NOTE(review): force-unwrap assumes the fetched/inserted
                    // thread always has a row id — confirm.
                    groupThreadId: groupThread.sqliteRowId!,
                    secretParams: secretParams,
                    membership: groupThread.groupMembership,
                    localAci: localIdentifiers.aci,
                    tx: transaction,
                )
            }
        }
    }

    // When learning about a v2 group for the first time, we need a snapshot of
    // the group's current state to get us started. From then on we prefer to
    // update the group using change actions, since those have more information.
    // Specifically, change actions record who performed the action, e.g. who
    // created the group or added us.
    //
    // We use this method to insert a thread if need be, so we can use change
    // actions going forward to keep the group up-to-date.
    /// - Returns: The newly created thread, together with the author of the
    ///   first change if that change added the local user (`nil` otherwise).
    /// - Throws: `OWSAssertionError` if the thread already exists or the first
    ///   change has no snapshot; rethrows failures to resolve the author or
    ///   build the group model.
    private func insertThreadForGroupChanges(
        groupId: GroupIdentifier,
        spamReportingMetadata: GroupUpdateSpamReportingMetadata,
        groupV2Params: GroupV2Params,
        groupChanges: [GroupV2Change],
        groupModelOptions: TSGroupModelOptions,
        localIdentifiers: LocalIdentifiers,
        transaction: DBWriteTransaction,
    ) throws -> (TSGroupThread, addedToNewThreadBy: GroupUpdateSource?) {
        guard TSGroupThread.fetch(forGroupId: groupId, tx: transaction) == nil else {
            throw OWSAssertionError("Can't insert group thread that already exists.")
        }

        // The thread is seeded from the snapshot attached to the first change.
        guard let initialChange = groupChanges.first, let initialSnapshot = initialChange.snapshot else {
            throw OWSAssertionError("Missing first group change with snapshot")
        }

        let changeAuthor = try initialChange.author(
            groupV2Params: groupV2Params,
            localIdentifiers: localIdentifiers,
        )

        var modelBuilder = try TSGroupModelBuilder.builderForSnapshot(
            groupV2Snapshot: initialSnapshot,
            transaction: transaction,
        )
        modelBuilder.apply(options: groupModelOptions)
        let initialGroupModel = try modelBuilder.buildAsV2()

        let localUserWasAdded = didAddLocalUserToV2Group(
            inGroupChange: initialChange,
            groupV2Params: groupV2Params,
            localIdentifiers: localIdentifiers,
        )

        let insertedThread = GroupManager.tryToUpsertExistingGroupThreadInDatabaseAndCreateInfoMessage(
            newGroupModel: initialGroupModel,
            newDisappearingMessageToken: initialSnapshot.disappearingMessageToken,
            newlyLearnedPniToAciAssociations: [:],
            groupUpdateSource: changeAuthor,
            didAddLocalUserToV2Group: localUserWasAdded,
            infoMessagePolicy: .insert,
            localIdentifiers: localIdentifiers,
            spamReportingMetadata: spamReportingMetadata,
            transaction: transaction,
        )

        // NOTE: Profile keys are deliberately not stored here. The caller,
        // tryToApplyGroupChangesFromService(), stores them after applying the
        // changes.

        let addedBy: GroupUpdateSource? = localUserWasAdded ? changeAuthor : nil
        return (insertedThread, addedToNewThreadBy: addedBy)
    }

    /// Outcome of applying one service-provided group change to a thread, as
    /// returned by `tryToApplySingleChangeFromService`.
    private struct ApplySingleChangeFromServiceResult {
        /// Source of the applied change; `.unknown` when a snapshot was
        /// applied instead of change actions.
        let changeAuthor: GroupUpdateSource
        /// Whether `didAddLocalUserToV2Group` reported that this change added
        /// the local user to the group.
        let wasLocalUserAddedByChange: Bool
    }

    /// Applies one group change to `groupThread` within the given write
    /// transaction.
    ///
    /// Change actions are used when the change is exactly one revision ahead
    /// of the local model and the local model isn't a join-request
    /// placeholder; otherwise the change's snapshot is used if it has one.
    ///
    /// - Parameters:
    ///   - profileKeysByAci: Receives every profile key carried by the change;
    ///     existing entries are overwritten.
    ///   - authoritativeProfileKeysByAci: Receives the change author's own
    ///     profile key when the author is identified by ACI and the change
    ///     carries a key for them.
    /// - Returns: `nil` if the change is older than the local revision and was
    ///   skipped; otherwise the change's author and whether it added the local
    ///   user.
    /// - Throws: `OWSAssertionError` if the thread's model isn't a v2 model;
    ///   `GroupsV2Error.groupChangeProtoForIncompatibleRevision` if neither
    ///   change actions nor a snapshot can be applied; rethrows model-building
    ///   errors.
    private func tryToApplySingleChangeFromService(
        groupThread: TSGroupThread,
        groupV2Params: GroupV2Params,
        options: TSGroupModelOptions,
        groupChange: GroupV2Change,
        profileKeysByAci: inout [Aci: Data],
        authoritativeProfileKeysByAci: inout [Aci: Data],
        localIdentifiers: LocalIdentifiers,
        spamReportingMetadata: GroupUpdateSpamReportingMetadata,
        transaction: DBWriteTransaction,
    ) throws -> ApplySingleChangeFromServiceResult? {
        guard let oldGroupModel = groupThread.groupModel as? TSGroupModelV2 else {
            throw OWSAssertionError("Invalid group model.")
        }

        // If this change is older than the group, there's nothing to do. If it's
        // the same, it probably means it's a snapshot we're supposed to re-apply.
        if groupChange.revision < oldGroupModel.revision {
            return nil
        }

        let logger = PrefixedLogger(
            prefix: "ApplySingleChange",
            suffix: "\(oldGroupModel.revision) -> \(groupChange.revision)",
        )

        // Each of these is assigned exactly once by whichever branch below
        // ends up handling the change.
        let newGroupModel: TSGroupModel
        let newDisappearingMessageToken: DisappearingMessageToken?
        let newProfileKeys: [Aci: Data]
        let newlyLearnedPniToAciAssociations: [Pni: Aci]
        let groupUpdateSource: GroupUpdateSource

        // We should prefer to update models using the change action if we can,
        // since it contains information about the change author.
        if
            let changeActionsProto = groupChange.changeActionsProto,
            groupChange.revision == oldGroupModel.revision + 1,
            !oldGroupModel.isJoinRequestPlaceholder
        {
            logger.info("Applying changeActions.")

            let changedGroupModel = try GroupsV2IncomingChanges.applyChangesToGroupModel(
                groupThread: groupThread,
                localIdentifiers: localIdentifiers,
                changeActionsProto: changeActionsProto,
                downloadedAvatars: groupChange.downloadedAvatars,
                options: options,
            )
            newGroupModel = changedGroupModel.newGroupModel
            newDisappearingMessageToken = changedGroupModel.newDisappearingMessageToken
            newProfileKeys = changedGroupModel.profileKeys
            newlyLearnedPniToAciAssociations = changedGroupModel.newlyLearnedPniToAciAssociations
            groupUpdateSource = changedGroupModel.updateSource
        } else if let snapshot = groupChange.snapshot {
            logger.info("Applying snapshot.")

            var builder = try TSGroupModelBuilder.builderForSnapshot(groupV2Snapshot: snapshot, transaction: transaction)
            builder.apply(options: options)
            newGroupModel = try builder.build()
            newDisappearingMessageToken = snapshot.disappearingMessageToken
            newProfileKeys = snapshot.profileKeys
            newlyLearnedPniToAciAssociations = [:]
            // Snapshots don't have a single author, so we don't know the source.
            groupUpdateSource = .unknown
        } else {
            // We had a group change proto with no snapshot, but the change was
            // not a single revision update.
            throw GroupsV2Error.groupChangeProtoForIncompatibleRevision
        }

        GroupManager.updateExistingGroupThreadInDatabaseAndCreateInfoMessage(
            groupThread: groupThread,
            newGroupModel: newGroupModel,
            newDisappearingMessageToken: newDisappearingMessageToken,
            newlyLearnedPniToAciAssociations: newlyLearnedPniToAciAssociations,
            groupUpdateSource: groupUpdateSource,
            localIdentifiers: localIdentifiers,
            spamReportingMetadata: spamReportingMetadata,
            transaction: transaction,
        )

        // Only a change authored by a known ACI yields an authoritative
        // profile key, and only for that author.
        switch groupUpdateSource {
        case .unknown, .legacyE164, .rejectedInviteToPni, .localUser:
            break
        case .aci(let groupUpdateSourceAci):
            if let groupUpdateProfileKey = newProfileKeys[groupUpdateSourceAci] {
                authoritativeProfileKeysByAci[groupUpdateSourceAci] = groupUpdateProfileKey
            }
        }

        // Merge known profile keys, always taking latest.
        profileKeysByAci.merge(newProfileKeys) { _, latest in latest }

        return ApplySingleChangeFromServiceResult(
            changeAuthor: groupUpdateSource,
            wasLocalUserAddedByChange: didAddLocalUserToV2Group(
                inGroupChange: groupChange,
                groupV2Params: groupV2Params,
                localIdentifiers: localIdentifiers,
            ),
        )
    }

    // MARK: - Current Snapshot

    /// Fetches the latest snapshot of a group from the service and applies it
    /// to the local database.
    ///
    /// The snapshot is fetched first; everything that touches local state
    /// (building the new model, upserting the thread, storing profile keys,
    /// handling endorsements) then happens inside a single write transaction.
    /// After the transaction, any avatar download that was skipped is retried.
    ///
    /// - Parameters:
    ///   - secretParams: Secret params of the group; used to fetch the
    ///     snapshot and to derive the group identifier.
    ///   - spamReportingMetadata: Forwarded unchanged to the group thread
    ///     upsert.
    ///   - options: Applied to the model builder before the new group model
    ///     is built.
    ///   - skipTerminatedGroup: When `true` and the fetched snapshot reports
    ///     the group as terminated, nothing is written and
    ///     `GroupsV2Error.skipRestoringTerminatedGroup` is thrown.
    /// - Throws: `GroupsV2Error.skipRestoringTerminatedGroup` (see above), an
    ///   `OWSAssertionError` if local identifiers are unavailable, and any
    ///   error thrown by the fetch, the model builder, the write transaction,
    ///   or the avatar retry.
    func fetchAndApplyCurrentGroupV2SnapshotFromService(
        secretParams: GroupSecretParams,
        spamReportingMetadata: GroupUpdateSpamReportingMetadata,
        options: TSGroupModelOptions,
        skipTerminatedGroup: Bool,
    ) async throws {
        let groupsV2 = SSKEnvironment.shared.groupsV2Ref
        // The fetch happens before (and outside of) the write transaction.
        let snapshotResponse = try await groupsV2.fetchLatestSnapshot(
            secretParams: secretParams,
            justUploadedAvatars: nil,
        )

        let groupV2Snapshot = snapshotResponse.groupSnapshot

        // Bail out before writing anything if the caller asked us not to
        // restore terminated groups.
        if skipTerminatedGroup, groupV2Snapshot.isTerminated {
            throw GroupsV2Error.skipRestoringTerminatedGroup
        }

        try await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { transaction in
            guard let localIdentifiers = DependenciesBridge.shared.tsAccountManager.localIdentifiers(tx: transaction) else {
                throw OWSAssertionError("Missing localIdentifiers.")
            }
            let localAci = localIdentifiers.aci

            // Read the local profile key up front; it is compared against the
            // snapshot's copy further down.
            let profileManager = SSKEnvironment.shared.profileManagerRef
            let localProfileKey = profileManager.localUserProfile(tx: transaction)?.profileKey

            var builder = try TSGroupModelBuilder.builderForSnapshot(groupV2Snapshot: groupV2Snapshot, transaction: transaction)
            builder.apply(options: options)

            let groupId = try secretParams.getPublicParams().getGroupIdentifier()
            if
                let groupThread = TSGroupThread.fetch(forGroupId: groupId, tx: transaction),
                let oldGroupModel = groupThread.groupModel as? TSGroupModelV2,
                oldGroupModel.revision == builder.groupV2Revision
            {
                // Preserve certain transient properties if overwriting a model
                // at the same revision.
                if oldGroupModel.didJustAddSelfViaGroupLink {
                    builder.didJustAddSelfViaGroupLink = true
                }
            }

            let newGroupModel = try builder.buildAsV2()
            let newDisappearingMessageToken = groupV2Snapshot.disappearingMessageToken
            // groupUpdateSource is unknown because we don't know the
            // author(s) of changes reflected in the snapshot.
            let groupUpdateSource: GroupUpdateSource = .unknown
            let groupThread = GroupManager.tryToUpsertExistingGroupThreadInDatabaseAndCreateInfoMessage(
                newGroupModel: newGroupModel,
                newDisappearingMessageToken: newDisappearingMessageToken,
                newlyLearnedPniToAciAssociations: [:], // Not available from snapshots
                groupUpdateSource: groupUpdateSource,
                didAddLocalUserToV2Group: false,
                infoMessagePolicy: .insert,
                localIdentifiers: localIdentifiers,
                spamReportingMetadata: spamReportingMetadata,
                transaction: transaction,
            )

            // Persist every profile key carried by the snapshot.
            GroupManager.storeProfileKeysFromGroupProtos(
                allProfileKeysByAci: groupV2Snapshot.profileKeys,
                localIdentifiers: localIdentifiers,
                tx: transaction,
            )

            // If the group state includes a stale profile key for the
            // local user, schedule an update to fix that.
            if let localProfileKey, let profileKey = groupV2Snapshot.profileKeys[localAci], profileKey != localProfileKey.keyData {
                groupsV2.updateLocalProfileKeyInGroup(groupId: newGroupModel.groupId, transaction: transaction)
            }

            // Endorsements are optional in the response; only handle them
            // when the service included them.
            if let groupSendEndorsementsResponse = snapshotResponse.groupSendEndorsementsResponse {
                groupsV2.handleGroupSendEndorsementsResponse(
                    groupSendEndorsementsResponse,
                    // NOTE(review): the force-unwrap presumably relies on the
                    // upsert above returning a thread that has already been
                    // saved (and so has a row ID) — confirm.
                    groupThreadId: groupThread.sqliteRowId!,
                    secretParams: secretParams,
                    membership: groupV2Snapshot.groupMembership,
                    localAci: localAci,
                    tx: transaction,
                )
            }
        }

        // Retry avatar downloads that were skipped because of the thread not
        // yet existing at the time of the snapshot being applied.
        try await groupsV2.downloadAndApplyGroupAvatarIfSkipped(secretParams)
    }

    /// Reports whether `groupChange` is the change that made the local user a
    /// member of the group.
    ///
    /// Revision 0 always counts. For later revisions the change actions are
    /// scanned, in order, for an added member, a promoted pending member, a
    /// promoted PNI pending member, or a promoted requesting member that
    /// resolves to the local ACI.
    ///
    /// - Parameters:
    ///   - groupChange: The change to inspect.
    ///   - groupV2Params: Used to resolve the encrypted user IDs found in the
    ///     change actions.
    ///   - localIdentifiers: Supplies the local ACI to compare against.
    /// - Returns: `true` if the change added the local user; `false` if it
    ///   did not or if the change carries no actions to inspect.
    private func didAddLocalUserToV2Group(
        inGroupChange groupChange: GroupV2Change,
        groupV2Params: GroupV2Params,
        localIdentifiers: LocalIdentifiers,
    ) -> Bool {
        // The initial revision has no actions describing its membership, so
        // it is unconditionally treated as having added us.
        guard groupChange.revision != 0 else {
            return true
        }
        // With no actions available we can't claim this change is how we
        // ended up in the group.
        guard let actions = groupChange.changeActionsProto else {
            return false
        }

        let ourAci = localIdentifiers.aci

        // Directly added members. Entries lacking a member or a user ID are
        // skipped silently.
        for addAction in actions.addMembers {
            guard let encryptedUserId = addAction.added?.userID else {
                continue
            }
            do {
                // This user ID is one of those the service is able to
                // validate.
                if try groupV2Params.aci(for: encryptedUserId) == ourAci {
                    return true
                }
            } catch {
                owsFailDebug("Error: \(error)")
            }
        }

        // Pending members promoted to full members. The ciphertext comes
        // either from an explicit user ID or from a credential presentation.
        for promoteAction in actions.promotePendingMembers {
            do {
                let ciphertext: UuidCiphertext
                if let encryptedUserId = promoteAction.userID {
                    ciphertext = try UuidCiphertext(contents: encryptedUserId)
                } else if let presentationData = promoteAction.presentation {
                    ciphertext = try ProfileKeyCredentialPresentation(contents: presentationData).getUuidCiphertext()
                } else {
                    throw OWSAssertionError("Missing userId.")
                }

                if try groupV2Params.serviceId(for: ciphertext) == ourAci {
                    return true
                }
            } catch {
                owsFailDebug("Error: \(error)")
            }
        }

        // PNI invitees promoted to full (ACI) members.
        for promoteAction in actions.promotePniPendingMembers {
            do {
                guard let encryptedUserId = promoteAction.userID else {
                    throw OWSAssertionError("Missing userID.")
                }
                if try groupV2Params.aci(for: encryptedUserId) == ourAci {
                    return true
                }
            } catch {
                owsFailDebug("Error: \(error)")
            }
        }

        // Requesting members whose join request was approved.
        for promoteAction in actions.promoteRequestingMembers {
            do {
                guard let encryptedUserId = promoteAction.userID else {
                    throw OWSAssertionError("Missing userID.")
                }
                // This user ID is one of those the service is able to
                // validate.
                if try groupV2Params.aci(for: encryptedUserId) == ourAci {
                    return true
                }
            } catch {
                owsFailDebug("Error: \(error)")
            }
        }

        return false
    }
}

// MARK: -

extension GroupsV2Error: IsRetryableProvider {
    /// Whether an operation that failed with this error is worth retrying.
    ///
    /// Network failures and timeouts are always retryable. Beyond that, only
    /// a conflicting change on the service and an explicit timeout qualify;
    /// every other case is listed explicitly as non-retryable so the switch
    /// stays exhaustive.
    public var isRetryableProvider: Bool {
        guard !isNetworkFailureOrTimeout else {
            return true
        }

        switch self {
        case .conflictingChangeOnService, .timeout:
            return true
        case .localUserNotInGroup,
             .cannotBuildGroupChangeProto_conflictingChange,
             .cannotBuildGroupChangeProto_tooManyMembers,
             .localUserIsNotARequestingMember,
             .cantApplyChangesToPlaceholder,
             .expiredGroupInviteLink,
             .terminatedGroupInviteLink,
             .groupBlocked,
             .localUserBlockedFromJoining,
             .groupChangeProtoForIncompatibleRevision,
             .serviceRequestHitRecoverable400,
             .skipRestoringTerminatedGroup:
            return false
        }
    }
}

private extension GroupV2Change {
    /// Resolves the author of this change.
    ///
    /// - Parameters:
    ///   - groupV2Params: Forwarded to the change actions' `updateSource`.
    ///   - localIdentifiers: Forwarded to the change actions' `updateSource`.
    /// - Returns: The source reported by the change actions, or `.unknown`
    ///   when this change has no change actions.
    /// - Throws: Whatever `updateSource` throws.
    func author(
        groupV2Params: GroupV2Params,
        localIdentifiers: LocalIdentifiers,
    ) throws -> GroupUpdateSource {
        guard let actions = changeActionsProto else {
            return .unknown
        }
        // Only the source is needed here; the attributed service ID is
        // discarded.
        let (source, _) = try actions.updateSource(
            groupV2Params: groupV2Params,
            localIdentifiers: localIdentifiers,
        )
        return source
    }
}

public extension GroupsProtoGroupChangeActions {

    /// Determines who authored these change actions.
    ///
    /// - Parameters:
    ///   - groupV2Params: Used to resolve the encrypted user IDs in the
    ///     actions into service IDs.
    ///   - localIdentifiers: Used to detect whether the author is the local
    ///     user.
    /// - Returns: The update source paired with the service ID it was
    ///   attributed to. If that service ID belongs to the local user the
    ///   source is wrapped in `.localUser(originalSource:)`. When no author
    ///   can be determined the result is `(.unknown, nil)`.
    /// - Throws: If the author's user ID cannot be resolved to a service ID.
    func updateSource(
        groupV2Params: GroupV2Params,
        localIdentifiers: LocalIdentifiers,
    ) throws -> (GroupUpdateSource, ServiceId?) {
        // Pairs a source with its service ID, re-labelling the source as the
        // local user when the service ID is one of ours.
        func compareToLocal(
            source: GroupUpdateSource,
            serviceId: ServiceId,
        ) -> (GroupUpdateSource, ServiceId) {
            let isLocal = localIdentifiers.contains(serviceId: serviceId)
            let attributed: GroupUpdateSource = isLocal ? .localUser(originalSource: source) : source
            return (attributed, serviceId)
        }

        guard let authorUserId: Data = sourceUserID else {
            owsFailDebug("Explicit changes should always have authors")
            return (.unknown, nil)
        }

        // ACI authors are resolved immediately; PNI authors need the extra
        // disambiguation below.
        let authorPni: Pni
        switch try groupV2Params.serviceId(for: authorUserId).concreteType {
        case .aci(let authorAci):
            return compareToLocal(
                source: .aci(authorAci),
                serviceId: authorAci,
            )
        case .pni(let pni):
            authorPni = pni
        }

        // At the time of writing, the only change actions expected to carry a
        // PNI author are accepting or declining a PNI invite (plus two legacy
        // shapes handled further down). Should new PNI-authored actions be
        // introduced, this is the place to tell them apart, since the raw
        // change actions are available here.

        // Declined PNI invite: exactly one deleted pending member, which must
        // be the author's own PNI.
        if
            deletePendingMembers.count == 1,
            let declinedInvite = deletePendingMembers.first,
            let declinedUserId = declinedInvite.deletedUserID,
            let declinedPni = try? groupV2Params.serviceId(for: declinedUserId) as? Pni
        {
            guard declinedPni == authorPni else {
                owsFailDebug("Canary: PNI from change author doesn't match service ID in delete pending member change action!")
                return (.unknown, nil)
            }

            return compareToLocal(
                source: .rejectedInviteToPni(authorPni),
                serviceId: authorPni,
            )
        }

        // Accepted PNI invite: exactly one PNI-pending member promoted, whose
        // PNI must be the author's.
        if
            promotePniPendingMembers.count == 1,
            let acceptedInvite = promotePniPendingMembers.first,
            let acceptedAciUserId = acceptedInvite.userID,
            let acceptedAci = try? groupV2Params.serviceId(for: acceptedAciUserId) as? Aci,
            let acceptedPniUserId = acceptedInvite.pni,
            let acceptedPni = try? groupV2Params.serviceId(for: acceptedPniUserId) as? Pni
        {
            guard acceptedPni == authorPni else {
                owsFailDebug("Canary: PNI from change author doesn't match service ID in promote PNI pending member change action!")
                return (.unknown, nil)
            }

            // The server reported a PNI as the author, but the action itself
            // tells us the matching ACI. Because the ACI is how this user
            // will be addressed from now on, attribute the change to it.
            return compareToLocal(
                source: .aci(acceptedAci),
                serviceId: acceptedAci,
            )
        }

        // Legacy: a single member who joined through an invite link.
        if
            addMembers.count == 1,
            let joinAction = addMembers.first,
            joinAction.joinFromInviteLink,
            let joinedUserId = joinAction.added?.userID,
            let joinedAci = try? groupV2Params.serviceId(for: joinedUserId) as? Aci
        {
            // As above, attribute the change to the ACI found in the action
            // rather than to the PNI the server reported. This shape is
            // legacy behavior and should eventually stop occurring.
            owsFailDebug("Canary: Legacy change action received from PNI change author!")
            return compareToLocal(
                source: .aci(joinedAci),
                serviceId: joinedAci,
            )
        }

        // Legacy: a single member requesting to join.
        if
            addRequestingMembers.count == 1,
            let requestAction = addRequestingMembers.first,
            let requesterUserId = requestAction.added?.userID,
            let requesterAci = try? groupV2Params.serviceId(for: requesterUserId) as? Aci
        {
            // As above, attribute the change to the ACI found in the action
            // rather than to the PNI the server reported. This shape is
            // legacy behavior and should eventually stop occurring.
            owsFailDebug("Canary: Legacy change action received from PNI change author!")
            return compareToLocal(
                source: .aci(requesterAci),
                serviceId: requesterAci,
            )
        }

        owsFailDebug("Canary: unknown type of PNI-authored group update!")
        return (.unknown, nil)
    }
}