Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
signalapp
GitHub Repository: signalapp/Signal-iOS
Path: blob/main/SignalServiceKit/Messages/GroupMessageProcessor.swift
1 views
//
// Copyright 2020 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//

import Foundation
public import LibSignalClient

private struct IncomingGroupsV2MessageJobInfo {
    let job: GroupMessageProcessorJob
    let envelope: SSKProtoEnvelope
    let plaintextData: Data
    let groupContext: SSKProtoGroupContextV2
    let groupContextInfo: GroupV2ContextInfo
}

// MARK: -

/// Processes group messages for a single group.
///
/// * It retries with exponential backoff.
/// * It retries immediately if reachability, etc., change.
///
/// It returns when all jobs are processed.
class SpecificGroupMessageProcessor {
    fileprivate let groupId: Data
    private let finder = GroupMessageProcessorJobStore()

    fileprivate init(groupId: Data) {
        self.groupId = groupId

        observeNotifications()
    }

    private func observeNotifications() {
        let nc = NotificationCenter.default
        nc.addObserver(
            self,
            selector: #selector(chatConnectionStateDidChange),
            name: OWSChatConnection.chatConnectionStateDidChange,
            object: nil,
        )
        nc.addObserver(
            self,
            selector: #selector(reachabilityChanged),
            name: SSKReachability.owsReachabilityDidChange,
            object: nil,
        )
    }

    // MARK: - Notifications

    @objc
    @MainActor
    private func chatConnectionStateDidChange() {
        AssertIsOnMainThread()
        setMightBeAbleToMakeProgress()
    }

    @objc
    @MainActor
    private func reachabilityChanged() {
        AssertIsOnMainThread()
        setMightBeAbleToMakeProgress()
    }

    // MARK: -

    /// Trigger an immediate retry of queued jobs.
    ///
    /// Call this when an external trigger (e.g., "network became reachable")
    /// indicates that previously-failed jobs may succeed upon another attempt.
    private func setMightBeAbleToMakeProgress() {
        state.update {
            $0.mightBeAbleToMakeProgress = true
            // If we're currently waiting, try again immediately.
            $0.retryIntervalTask?.cancel()
        }
    }

    private struct State {
        var mightBeAbleToMakeProgress = true
        var retryIntervalTask: Task<Void, any Error>?
    }

    private let state = AtomicValue(State(), lock: .init())

    func processBatches(willFetchNextJobs: () -> Void) async {
        var backoffCount = 0
        // For as long as there are jobs to process...
        while true {
            let backgroundTask = OWSBackgroundTask(label: "\(#function)")
            do throws(RetryableError) {
                defer { backgroundTask.end() }

                var newestGuaranteedFailureJobId: Int64?

                // ...process them, until we hit an error that requires backoff.
                while true {
                    do throws(CancellationError) {
                        try await Preconditions([
                            ProcessingPermittedPrecondition(messagePipelineSupervisor: SSKEnvironment.shared.messagePipelineSupervisorRef),
                            NotificationPrecondition(notificationName: .registrationStateDidChange, isSatisfied: {
                                return DependenciesBridge.shared.tsAccountManager.registrationStateWithMaybeSneakyTransaction.isRegistered
                            }),
                        ]).waitUntilSatisfied()
                    } catch {
                        owsFail("Cancellation isn't supported.")
                    }

                    // We want a value that is just high enough to yield perf benefits.
                    let kIncomingMessageBatchSize: Int = 16
                    // If the app is in the background, use batch size of 1. This (only
                    // slightly) makes it less likely that we'll hit a 0xdead10cc crash and
                    // need to re-do work we've already done.
                    // TODO: Stop processing batches when suspending.
                    let batchSize: Int = CurrentAppContext().isInBackground() ? 1 : kIncomingMessageBatchSize

                    willFetchNextJobs()

                    // Keep track of external triggers while we're processing a batch. These
                    // may indicate that immediate retries are worthwhile.
                    state.update {
                        $0.mightBeAbleToMakeProgress = false
                    }

                    let hasMore = try await self.processBatch(
                        batchLimit: batchSize,
                        newestGuaranteedFailureJobId: &newestGuaranteedFailureJobId,
                    )
                    if !hasMore {
                        return
                    }

                    // If we successfully process a batch, reset the backoff counter. If we had
                    // to back off while processing a specific update, the next update
                    // shouldn't reuse that backoff because things seem to be back to normal.
                    backoffCount = 0
                }
            } catch let error {
                Logger.warn("\(error)")

                let retryIntervalTask = state.update { mutableState -> Task<Void, any Error>? in
                    if mutableState.mightBeAbleToMakeProgress {
                        Logger.info("Retrying immediately because of an external trigger.")
                        return nil
                    } else {
                        let retryIntervalNs = OWSOperation.retryIntervalForExponentialBackoff(failureCount: backoffCount).clampedNanoseconds
                        Logger.warn("Waiting for \(OWSOperation.formattedNs(retryIntervalNs))s before retrying.")
                        let retryIntervalTask = Task {
                            try await Task.sleep(nanoseconds: retryIntervalNs)
                        }
                        mutableState.retryIntervalTask = retryIntervalTask
                        return retryIntervalTask
                    }
                }

                do {
                    if let retryIntervalTask {
                        try await retryIntervalTask.value
                        // Don't increment backoffCount until after we wait for the entire backoff
                        // to elapse. This ensures that repeated failures (i.e., hitting the server
                        // and getting errors) will exponentially back off, but if an external
                        // trigger flip-flops repeatedly (i.e., turning Airplane Mode on and off),
                        // we increment the exponential backoff to an absurd level.
                        backoffCount += 1
                    }
                } catch {
                    Logger.info("Waking up & retrying immediately because of an external trigger.")
                }
            }
        }
    }

    private static func jobInfo(forJob job: GroupMessageProcessorJob) -> IncomingGroupsV2MessageJobInfo? {
        guard let envelope = try? job.parseEnvelope() else {
            owsFailDebug("Missing envelope.")
            return nil
        }
        guard
            let plaintextData = job.plaintextData,
            let groupContext = GroupMessageProcessorManager.groupContextV2(fromPlaintextData: plaintextData)
        else {
            owsFailDebug("Missing group context.")
            return nil
        }
        guard groupContext.hasRevision else {
            owsFailDebug("Missing revision.")
            return nil
        }
        let groupContextInfo: GroupV2ContextInfo
        do {
            groupContextInfo = try GroupV2ContextInfo.deriveFrom(masterKeyData: groupContext.masterKey ?? Data())
        } catch {
            owsFailDebug("Invalid group context: \(error).")
            return nil
        }
        return IncomingGroupsV2MessageJobInfo(
            job: job,
            envelope: envelope,
            plaintextData: plaintextData,
            groupContext: groupContext,
            groupContextInfo: groupContextInfo,
        )
    }

    static func discardMode(
        forMessageFrom sourceAci: Aci,
        groupContext: SSKProtoGroupContextV2,
        tx: DBReadTransaction,
    ) -> GroupMessageProcessorManager.DiscardMode {
        guard groupContext.hasRevision else {
            Logger.info("Missing revision in group context")
            return .discard
        }

        let groupContextInfo: GroupV2ContextInfo
        do {
            groupContextInfo = try GroupV2ContextInfo.deriveFrom(masterKeyData: groupContext.masterKey ?? Data())
        } catch {
            owsFailDebug("Invalid group context: \(error).")
            return .discard
        }

        return GroupMessageProcessorManager.discardMode(
            forMessageFrom: sourceAci,
            groupId: groupContextInfo.groupId,
            tx: tx,
        )
    }

    private static func discardMode(
        forJobInfo jobInfo: IncomingGroupsV2MessageJobInfo,
        hasGroupBeenUpdated: Bool,
        tx: DBReadTransaction,
    ) -> GroupMessageProcessorManager.DiscardMode {
        guard
            let sourceAci = Aci.parseFrom(
                serviceIdBinary: jobInfo.envelope.sourceServiceIDBinary,
                serviceIdString: jobInfo.envelope.sourceServiceID,
            )
        else {
            owsFailDebug("Invalid source address.")
            return .discard
        }
        return GroupMessageProcessorManager.discardMode(
            forMessageFrom: sourceAci,
            groupId: jobInfo.groupContextInfo.groupId,
            shouldCheckGroupModel: hasGroupBeenUpdated,
            tx: tx,
        )
    }

    /// As in other message receiving flows, we want to do batch processing
    /// wherever possible for perf reasons (to reduce view updates). We should
    /// be able to mostly do that. However, in some cases we need to update the
    /// group before we can process the message. These messages may require
    /// network requests, so they aren't batched with other messages.
    private func canJobBeProcessedWithoutUpdate(
        jobInfo: IncomingGroupsV2MessageJobInfo,
        tx: DBReadTransaction,
    ) -> Bool {
        if .discard == Self.discardMode(forJobInfo: jobInfo, hasGroupBeenUpdated: false, tx: tx) {
            return true
        }
        return GroupMessageProcessorManager.canContextBeProcessedWithoutUpdate(
            groupContext: jobInfo.groupContext,
            groupContextInfo: jobInfo.groupContextInfo,
            tx: tx,
        )
    }

    /// This method may process only a subset of the jobs.
    ///
    /// - Returns: True if there are more jobs to process.
    private func processBatch(batchLimit: Int, newestGuaranteedFailureJobId: inout Int64?) async throws(RetryableError) -> Bool {
        let databaseStorage = SSKEnvironment.shared.databaseStorageRef

        let hasMore: Bool
        let asyncJob: IncomingGroupsV2MessageJobInfo?

        (hasMore, asyncJob) = await databaseStorage.awaitableWrite { tx -> (Bool, IncomingGroupsV2MessageJobInfo?) in
            for _ in 1...batchLimit {
                guard let job = self.nextJob(tx: tx) else {
                    // Either there's no more jobs or we couldn't fetch jobs. Stop trying.
                    return (false, nil)
                }
                guard let jobInfo = Self.jobInfo(forJob: job) else {
                    self.didCompleteJob(job, tx: tx)
                    continue
                }
                if self.canJobBeProcessedWithoutUpdate(jobInfo: jobInfo, tx: tx) {
                    self.performLocalProcessingSync(jobInfo: jobInfo, tx: tx)
                    self.didCompleteJob(job, tx: tx)
                    continue
                }
                return (true, jobInfo)
            }
            return (true, nil)
        }

        if let asyncJob {
            let didUpdateGroup = try await updateGroup(
                jobInfo: asyncJob,
                newestGuaranteedFailureJobId: &newestGuaranteedFailureJobId,
            )
            await databaseStorage.awaitableWrite { tx in
                if didUpdateGroup {
                    self.performLocalProcessingSync(jobInfo: asyncJob, tx: tx)
                } else {
                    // This only happens for terminal errors when updating the group via the
                    // service. That should only happen if we can no longer access the group
                    // state, e.g. a) we were kicked out of the group, b) our invite was
                    // revoked, or c) our request to join via group invite link was denied.
                    Logger.warn("Discarding unprocess-able message \(asyncJob.envelope.timestamp)")
                }
                self.didCompleteJob(asyncJob.job, tx: tx)
            }
        }

        return hasMore
    }

    private func nextJob(tx: DBReadTransaction) -> GroupMessageProcessorJob? {
        return finder.nextJob(forGroupId: self.groupId, tx: tx)
    }

    private func newestJobId() -> Int64? {
        let db = DependenciesBridge.shared.db
        return db.read { tx in
            finder.newestJobId(tx: tx)
        }
    }

    private func didCompleteJob(_ job: GroupMessageProcessorJob, tx: DBWriteTransaction) {
        finder.removeJob(withRowId: job.id, tx: tx)
    }

    private func performLocalProcessingSync(
        jobInfo: IncomingGroupsV2MessageJobInfo,
        tx: DBWriteTransaction,
    ) {
        let discardMode = Self.discardMode(forJobInfo: jobInfo, hasGroupBeenUpdated: true, tx: tx)
        switch discardMode {
        case .discard:
            // Do nothing.
            break
        case .doNotDiscard, .discardVisibleMessages:
            let tsAccountManager = DependenciesBridge.shared.tsAccountManager
            guard let registeredState = try? tsAccountManager.registeredState(tx: tx) else {
                Logger.warn("Missing registeredState!")
                return
            }

            SSKEnvironment.shared.messageReceiverRef.processEnvelope(
                jobInfo.envelope,
                plaintextData: jobInfo.plaintextData,
                wasReceivedByUD: jobInfo.job.wasReceivedByUD,
                serverDeliveryTimestamp: jobInfo.job.serverDeliveryTimestamp,
                shouldDiscardVisibleMessages: discardMode == .discardVisibleMessages,
                registeredState: registeredState,
                tx: tx,
            )
        }
    }

    private func updateGroup(
        jobInfo: IncomingGroupsV2MessageJobInfo,
        newestGuaranteedFailureJobId: inout Int64?,
    ) async throws(RetryableError) -> Bool {
        // First, we try to update the group locally using changes embedded in
        // the group context (if any).
        if try await updateUsingEmbeddedGroupUpdate(jobInfo: jobInfo) {
            return true
        }

        // Next, we check if we've already failed to fetch state from the server.
        // If we've hit a terminal error, that error applies to ALL of the
        // already-enqueued messages. (It doesn't apply to newly-enqueued messages,
        // though, which is why we compare the job's ID.)
        if let newestGuaranteedFailureJobId, jobInfo.job.id <= newestGuaranteedFailureJobId {
            return false
        }
        // If we're going to check with the server, capture the newest job ID
        // BEFORE issuing the request. This ensures that jobs enqueued after we
        // start this fetch will issue their own fetch.
        let newestJobId: Int64? = self.newestJobId()

        // If that fails, fall back to a fetch via the service.
        if try await tryToUpdateUsingService(jobInfo: jobInfo) {
            return true
        }

        // If we can't fetch via the service, store that result for reuse in future
        // invocations of this method.
        newestGuaranteedFailureJobId = newestJobId
        return false
    }

    /// Try to apply a single embedded (peer-to-peer) update.
    ///
    /// This method should:
    ///
    /// * Return true if message processing should proceed. This means that the
    /// group state has been updated to (at least) the message's revision.
    ///
    /// * Return false if the group could not be updated to the target revision
    /// and we should fail over to fetching group changes and/or latest group
    /// state from the service.
    private func updateUsingEmbeddedGroupUpdate(
        jobInfo: IncomingGroupsV2MessageJobInfo,
    ) async throws(RetryableError) -> Bool {
        let groupId = jobInfo.groupContextInfo.groupId
        let secretParams = jobInfo.groupContextInfo.groupSecretParams

        // TODO: Move this to the other method to avoid duplicate fetches.
        let groupThread = SSKEnvironment.shared.databaseStorageRef.read { tx in
            return TSGroupThread.fetch(forGroupId: groupId, tx: tx)
        }
        guard
            let groupThread,
            let oldGroupModel = groupThread.groupModel as? TSGroupModelV2,
            jobInfo.groupContext.revision == oldGroupModel.revision + 1
        else {
            // We might be learning of a group for the first time, or we might be
            // getting re-added to a group we were previously a member of, or we might
            // have been offline for a while and lost messages in our queue. In all of
            // these cases, we need to fall back to the service.
            return false
        }

        guard let changeProtoData = jobInfo.groupContext.groupChange else {
            // No embedded group change.
            return false
        }

        let changeActionsProto: GroupsProtoGroupChangeActions
        do {
            let changeProto = try GroupsProtoGroupChange(serializedData: changeProtoData)
            guard changeProto.changeEpoch <= GroupManager.changeProtoEpoch else {
                throw OWSGenericError("Not-yet-supported embedded change proto epoch: \(changeProto.changeEpoch).")
            }

            // We need to verify the signatures because these protos came from another
            // client, not the service.
            changeActionsProto = try GroupsV2Protos.parseGroupChangeProto(changeProto, verificationOperation: .verifySignature(groupId: groupId.serialize()))
        } catch {
            Logger.warn("Couldn't verify change actions: \(error)")
            return false
        }

        guard changeActionsProto.revision == jobInfo.groupContext.revision else {
            owsFailDebug("Embedded change proto revision doesn't match context revision.")
            return false
        }

        do {
            let spamReportingMetadata: GroupUpdateSpamReportingMetadata = {
                guard let serverGuid = ValidatedIncomingEnvelope.parseServerGuid(fromEnvelope: jobInfo.envelope) else {
                    return .unreportable
                }
                return .reportable(serverGuid: serverGuid.uuidString.lowercased())
            }()
            try await SSKEnvironment.shared.groupsV2Ref.updateGroupWithChangeActions(
                spamReportingMetadata: spamReportingMetadata,
                changeActionsProto: changeActionsProto,
                groupSecretParams: secretParams,
            )
        } catch {
            if let retryableError = RetryableError(error) {
                throw retryableError
            }
            if case GroupsV2Error.cantApplyChangesToPlaceholder = error {
                Logger.warn("Error: \(error)")
            } else {
                owsFailDebug("Error: \(error)")
            }
            return false
        }

        return true
    }

    private func tryToUpdateUsingService(jobInfo: IncomingGroupsV2MessageJobInfo) async throws(RetryableError) -> Bool {
        let spamReportingMetadata: GroupUpdateSpamReportingMetadata = {
            guard let serverGuid = ValidatedIncomingEnvelope.parseServerGuid(fromEnvelope: jobInfo.envelope) else {
                return .unreportable
            }
            return .reportable(serverGuid: serverGuid.uuidString.lowercased())
        }()
        do {
            try await SSKEnvironment.shared.groupV2UpdatesRef.refreshGroup(
                secretParams: jobInfo.groupContextInfo.groupSecretParams,
                spamReportingMetadata: spamReportingMetadata,
                source: .groupMessage(revision: jobInfo.groupContext.revision),
            )
            return true
        } catch {
            if let retryableError = RetryableError(error) {
                throw retryableError
            }
            if case GroupsV2Error.localUserNotInGroup = error {
                Logger.warn("Error: \(error)")
            } else {
                owsFailDebugUnlessNetworkFailure(error)
            }
            return false
        }
    }

    private struct RetryableError: Error {
        let rawValue: any Error

        init?(_ error: any Error) {
            guard Self.isRetryableError(error) else {
                return nil
            }
            self.rawValue = error
        }

        private static func isRetryableError(_ error: any Error) -> Bool {
            if error.isNetworkFailureOrTimeout {
                return true
            }
            if let statusCode = error.httpStatusCode {
                if statusCode == 401 || (500 <= statusCode && statusCode <= 599) {
                    return true
                }
            }
            switch error {
            case GroupsV2Error.timeout:
                return true
            default:
                return false
            }
        }
    }
}

// MARK: -

public class GroupMessageProcessorManager {

    public static let didFlushGroupsV2MessageQueue = Notification.Name("didFlushGroupsV2MessageQueue")

    private let finder = GroupMessageProcessorJobStore()

    public init() {
        SwiftSingletons.register(self)
    }

    // MARK: -

    private struct State {
        var activeGroupIds = Set<Data>()
        var pendingGroupIds = Set<Data>()
    }

    private let state = AtomicValue(State(), lock: .init())

    /// Starts a processor for every groupId with pending work.
    public func startAllProcessors() async {
        guard CurrentAppContext().shouldProcessIncomingMessages else {
            return
        }

        // Obtain the list of groups that currently need processing.
        let db = DependenciesBridge.shared.db
        let groupIds = db.read { tx -> Set<Data> in
            return Set(finder.allEnqueuedGroupIds(tx: tx))
        }

        if !groupIds.isEmpty {
            Logger.info("(Re-)starting \(groupIds.count) group message processor(s) with pending messages.")
        }

        for groupId in groupIds {
            startProcessorIfNeeded(groupId: groupId)
        }
    }

    private func startProcessorIfNeeded(groupId: Data) {
        let canStart = state.update {
            // Indicate that there's more work (in case it's already running).
            $0.pendingGroupIds.insert(groupId)
            // Start it now if it's not currently running.
            return $0.activeGroupIds.insert(groupId).inserted
        }
        guard canStart else {
            // It's already running, so there's nothing to do.
            return
        }
        // This Task must not be canceled.
        Task {
            var mightHaveMoreWork = true
            while mightHaveMoreWork {
                var didCallWillFetchNextJobs = false
                await SpecificGroupMessageProcessor(groupId: groupId).processBatches(willFetchNextJobs: {
                    state.update(block: { $0.pendingGroupIds.remove(groupId) })
                    didCallWillFetchNextJobs = true
                })
                owsPrecondition(didCallWillFetchNextJobs)
                // There is a race condition where `processBatches()` sees that there's no
                // work left to do and stops executing; however, before this code can
                // remove `groupId` from `activeGroupIds`, another component enqueues new
                // work, calls `startProcessorIfNeeded()`, and returns early because a
                // processor is already running. In this case, we'd "never" process the job
                // (the current processor is stopping; the new one doesn't start). By
                // checking `pendingGroupIds` before clearing `activeGroupIds`, we ensure
                // that (a) this processor will perform the work; or (b) this processor
                // will stop and the next invocation of this method will start a new one.
                //
                // See also: GroupsV2ProfileKeyUpdater's isUpdating/needsUpdate flags.
                mightHaveMoreWork = state.update(block: {
                    if $0.pendingGroupIds.contains(groupId) {
                        return true
                    }
                    $0.activeGroupIds.remove(groupId)
                    if $0.activeGroupIds.isEmpty {
                        NotificationCenter.default.postOnMainThread(name: GroupMessageProcessorManager.didFlushGroupsV2MessageQueue, object: nil)
                    }
                    return false
                })
            }
        }
    }

    // MARK: -

    func enqueue(
        envelope: DecryptedIncomingEnvelope,
        envelopeData: Data,
        serverDeliveryTimestamp: UInt64,
        tx: DBWriteTransaction,
    ) {
        guard !envelopeData.isEmpty else {
            owsFailDebug("Empty envelope.")
            return
        }

        guard let groupId = Self.groupId(from: envelope.content) else {
            owsFailDebug("Missing or invalid group id")
            return
        }

        // We need to persist the decrypted envelope data in this transaction to
        // prevent data loss.
        failIfThrows {
            _ = try GroupMessageProcessorJob.insertRecord(
                envelopeData: envelopeData,
                plaintextData: envelope.plaintextData,
                groupId: groupId,
                wasReceivedByUD: envelope.wasReceivedByUD,
                serverDeliveryTimestamp: serverDeliveryTimestamp,
                tx: tx,
            )
        }

        // The new envelope won't be visible to the processor until this
        // transaction commits, so start it in the transaction completion.
        tx.addSyncCompletion {
            self.startProcessorIfNeeded(groupId: groupId)
        }
    }

    private static func groupId(from contentProto: SSKProtoContent?) -> Data? {
        guard let contentProto, let groupContext = groupContextV2(from: contentProto) else {
            owsFailDebug("Invalid content.")
            return nil
        }
        do {
            let groupContextInfo = try GroupV2ContextInfo.deriveFrom(masterKeyData: groupContext.masterKey ?? Data())
            return groupContextInfo.groupId.serialize()
        } catch {
            owsFailDebug("Invalid group context: \(error).")
            return nil
        }
    }

    public class func canContextBeProcessedImmediately(
        groupContext: SSKProtoGroupContextV2,
        tx: DBReadTransaction,
    ) -> Bool {
        let groupContextInfo: GroupV2ContextInfo
        do {
            groupContextInfo = try GroupV2ContextInfo.deriveFrom(masterKeyData: groupContext.masterKey ?? Data())
        } catch {
            owsFailDebug("Invalid group context: \(error).")
            return false
        }

        // We can only process GV2 messages immediately if:
        // 1. We don't have any other messages queued for this thread
        // 2. The message can be processed without updates

        let existsJob: Bool = GroupMessageProcessorJobStore().existsJob(
            forGroupId: groupContextInfo.groupId.serialize(),
            tx: tx,
        )
        if existsJob {
            Logger.info("Cannot immediately process GV2 message because there are messages queued.")
            return false
        }

        return canContextBeProcessedWithoutUpdate(
            groupContext: groupContext,
            groupContextInfo: groupContextInfo,
            tx: tx,
        )
    }

    fileprivate class func canContextBeProcessedWithoutUpdate(
        groupContext: SSKProtoGroupContextV2,
        groupContextInfo: GroupV2ContextInfo,
        tx: DBReadTransaction,
    ) -> Bool {
        guard let groupThread = TSGroupThread.fetch(forGroupId: groupContextInfo.groupId, tx: tx) else {
            return false
        }
        guard let groupModel = groupThread.groupModel as? TSGroupModelV2 else {
            owsFailDebug("Invalid group model.")
            return true
        }
        let messageRevision = groupContext.revision
        let modelRevision = groupModel.revision
        if messageRevision <= modelRevision {
            return true
        }
        // The incoming message indicates that there is a new group revision. We'll
        // update our group model using either the change proto embedded in the
        // group context or by fetching latest state from the service.
        return false
    }

    fileprivate static func groupContextV2(fromPlaintextData plaintextData: Data) -> SSKProtoGroupContextV2? {
        guard !plaintextData.isEmpty else {
            return nil
        }

        let contentProto: SSKProtoContent
        do {
            contentProto = try SSKProtoContent(serializedData: plaintextData)
        } catch {
            owsFailDebug("could not parse proto: \(error)")
            return nil
        }

        return groupContextV2(from: contentProto)
    }

    public class func groupContextV2(from contentProto: SSKProtoContent) -> SSKProtoGroupContextV2? {
        if let groupV2 = contentProto.dataMessage?.groupV2 {
            return groupV2
        }
        if let groupV2 = contentProto.editMessage?.dataMessage?.groupV2 {
            return groupV2
        }
        if let groupV2 = contentProto.syncMessage?.sent?.message?.groupV2 {
            return groupV2
        }
        if let groupV2 = contentProto.syncMessage?.sent?.editMessage?.dataMessage?.groupV2 {
            return groupV2
        }
        return nil
    }

    public func isProcessing() -> Bool {
        return self.state.update(block: { !$0.activeGroupIds.isEmpty })
    }

    public enum DiscardMode {
        /// Do not process this envelope.
        case discard
        /// Process this envelope.
        case doNotDiscard
        /// Process this envelope, but discard any "renderable" content,
        /// e.g. calls or messages in the chat history.
        case discardVisibleMessages
    }

    /// Returns whether a group message from the given user should be discarded.
    ///
    /// If `shouldCheckGroupModel` is false, only checks whether the sender or
    /// group is blocked.
    public static func discardMode(
        forMessageFrom sourceAci: Aci,
        groupId: GroupIdentifier,
        shouldCheckGroupModel: Bool = true,
        tx: DBReadTransaction,
    ) -> DiscardMode {
        let blockingManager = SSKEnvironment.shared.blockingManagerRef
        let isBlocked: Bool = (
            blockingManager.isAddressBlocked(SignalServiceAddress(sourceAci), transaction: tx)
                || blockingManager.isGroupIdBlocked(groupId, transaction: tx),
        )
        if isBlocked {
            Logger.info("Discarding blocked envelope.")
            return .discard
        }

        // We need to pre-process all incoming envelopes; they might change our
        // group state.
        //
        // But we should only pass envelopes to the MessageManager for processing
        // if they correspond to v2 groups of which we are a non-pending member.
        if shouldCheckGroupModel {
            guard let localAddress = DependenciesBridge.shared.tsAccountManager.localIdentifiers(tx: tx)?.aciAddress else {
                owsFailDebug("Missing localAddress.")
                return .discard
            }
            guard let groupThread = TSGroupThread.fetch(forGroupId: groupId, tx: tx) else {
                // The user might have just deleted the thread
                // but this race should be extremely rare.
                // Usually this should indicate a bug.
                owsFailDebug("Missing thread.")
                return .discard
            }
            guard let groupModel = groupThread.groupModel as? TSGroupModelV2 else {
                owsFailDebug("Invalid group model.")
                return .discard
            }
            guard !groupModel.isTerminated else {
                Logger.warn("Discarding envelope; group is ended.")
                return .discard
            }
            guard groupModel.groupMembership.isFullMember(localAddress) else {
                // * Local user might have just left the group.
                // * Local user may have just learned that we were removed from the group.
                // * Local user might be a pending member with an invite.
                Logger.warn("Discarding envelope; local user is not an active group member.")
                return .discard
            }
            guard groupModel.groupMembership.isFullMember(SignalServiceAddress(sourceAci)) else {
                // * The sender might have just left the group.
                Logger.warn("Discarding envelope; sender is not an active group member.")
                return .discard
            }
            if groupModel.isAnnouncementsOnly {
                guard groupModel.groupMembership.isFullMemberAndAdministrator(SignalServiceAddress(sourceAci)) else {
                    // * Only administrators can send "renderable" messages to a "announcement-only" group.
                    Logger.warn("Discarding renderable content in envelope; sender is not an active group member.")
                    return .discardVisibleMessages
                }
            }
        }

        return .doNotDiscard
    }
}