Path: blob/main/SignalServiceKit/Messages/GroupMessageProcessor.swift
1 views
//
// Copyright 2020 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//
import Foundation
public import LibSignalClient
private struct IncomingGroupsV2MessageJobInfo {
let job: GroupMessageProcessorJob
let envelope: SSKProtoEnvelope
let plaintextData: Data
let groupContext: SSKProtoGroupContextV2
let groupContextInfo: GroupV2ContextInfo
}
// MARK: -
/// Processes group messages for a single group.
///
/// * It retries with exponential backoff.
/// * It retries immediately if reachability, etc., change.
///
/// It returns when all jobs are processed.
class SpecificGroupMessageProcessor {
fileprivate let groupId: Data
private let finder = GroupMessageProcessorJobStore()
fileprivate init(groupId: Data) {
self.groupId = groupId
observeNotifications()
}
private func observeNotifications() {
let nc = NotificationCenter.default
nc.addObserver(
self,
selector: #selector(chatConnectionStateDidChange),
name: OWSChatConnection.chatConnectionStateDidChange,
object: nil,
)
nc.addObserver(
self,
selector: #selector(reachabilityChanged),
name: SSKReachability.owsReachabilityDidChange,
object: nil,
)
}
// MARK: - Notifications
@objc
@MainActor
private func chatConnectionStateDidChange() {
AssertIsOnMainThread()
setMightBeAbleToMakeProgress()
}
@objc
@MainActor
private func reachabilityChanged() {
AssertIsOnMainThread()
setMightBeAbleToMakeProgress()
}
// MARK: -
/// Trigger an immediate retry of queued jobs.
///
/// Call this when an external trigger (e.g., "network became reachable")
/// indicates that previously-failed jobs may succeed upon another attempt.
private func setMightBeAbleToMakeProgress() {
state.update {
$0.mightBeAbleToMakeProgress = true
// If we're currently waiting, try again immediately.
$0.retryIntervalTask?.cancel()
}
}
private struct State {
var mightBeAbleToMakeProgress = true
var retryIntervalTask: Task<Void, any Error>?
}
private let state = AtomicValue(State(), lock: .init())
func processBatches(willFetchNextJobs: () -> Void) async {
var backoffCount = 0
// For as long as there are jobs to process...
while true {
let backgroundTask = OWSBackgroundTask(label: "\(#function)")
do throws(RetryableError) {
defer { backgroundTask.end() }
var newestGuaranteedFailureJobId: Int64?
// ...process them, until we hit an error that requires backoff.
while true {
do throws(CancellationError) {
try await Preconditions([
ProcessingPermittedPrecondition(messagePipelineSupervisor: SSKEnvironment.shared.messagePipelineSupervisorRef),
NotificationPrecondition(notificationName: .registrationStateDidChange, isSatisfied: {
return DependenciesBridge.shared.tsAccountManager.registrationStateWithMaybeSneakyTransaction.isRegistered
}),
]).waitUntilSatisfied()
} catch {
owsFail("Cancellation isn't supported.")
}
// We want a value that is just high enough to yield perf benefits.
let kIncomingMessageBatchSize: Int = 16
// If the app is in the background, use batch size of 1. This (only
// slightly) makes it less likely that we'll hit a 0xdead10cc crash and
// need to re-do work we've already done.
// TODO: Stop processing batches when suspending.
let batchSize: Int = CurrentAppContext().isInBackground() ? 1 : kIncomingMessageBatchSize
willFetchNextJobs()
// Keep track of external triggers while we're processing a batch. These
// may indicate that immediate retries are worthwhile.
state.update {
$0.mightBeAbleToMakeProgress = false
}
let hasMore = try await self.processBatch(
batchLimit: batchSize,
newestGuaranteedFailureJobId: &newestGuaranteedFailureJobId,
)
if !hasMore {
return
}
// If we successfully process a batch, reset the backoff counter. If we had
// to back off while processing a specific update, the next update
// shouldn't reuse that backoff because things seem to be back to normal.
backoffCount = 0
}
} catch let error {
Logger.warn("\(error)")
let retryIntervalTask = state.update { mutableState -> Task<Void, any Error>? in
if mutableState.mightBeAbleToMakeProgress {
Logger.info("Retrying immediately because of an external trigger.")
return nil
} else {
let retryIntervalNs = OWSOperation.retryIntervalForExponentialBackoff(failureCount: backoffCount).clampedNanoseconds
Logger.warn("Waiting for \(OWSOperation.formattedNs(retryIntervalNs))s before retrying.")
let retryIntervalTask = Task {
try await Task.sleep(nanoseconds: retryIntervalNs)
}
mutableState.retryIntervalTask = retryIntervalTask
return retryIntervalTask
}
}
do {
if let retryIntervalTask {
try await retryIntervalTask.value
// Don't increment backoffCount until after we wait for the entire backoff
// to elapse. This ensures that repeated failures (i.e., hitting the server
// and getting errors) will exponentially back off, but if an external
// trigger flip-flops repeatedly (i.e., turning Airplane Mode on and off),
// we increment the exponential backoff to an absurd level.
backoffCount += 1
}
} catch {
Logger.info("Waking up & retrying immediately because of an external trigger.")
}
}
}
}
private static func jobInfo(forJob job: GroupMessageProcessorJob) -> IncomingGroupsV2MessageJobInfo? {
guard let envelope = try? job.parseEnvelope() else {
owsFailDebug("Missing envelope.")
return nil
}
guard
let plaintextData = job.plaintextData,
let groupContext = GroupMessageProcessorManager.groupContextV2(fromPlaintextData: plaintextData)
else {
owsFailDebug("Missing group context.")
return nil
}
guard groupContext.hasRevision else {
owsFailDebug("Missing revision.")
return nil
}
let groupContextInfo: GroupV2ContextInfo
do {
groupContextInfo = try GroupV2ContextInfo.deriveFrom(masterKeyData: groupContext.masterKey ?? Data())
} catch {
owsFailDebug("Invalid group context: \(error).")
return nil
}
return IncomingGroupsV2MessageJobInfo(
job: job,
envelope: envelope,
plaintextData: plaintextData,
groupContext: groupContext,
groupContextInfo: groupContextInfo,
)
}
static func discardMode(
forMessageFrom sourceAci: Aci,
groupContext: SSKProtoGroupContextV2,
tx: DBReadTransaction,
) -> GroupMessageProcessorManager.DiscardMode {
guard groupContext.hasRevision else {
Logger.info("Missing revision in group context")
return .discard
}
let groupContextInfo: GroupV2ContextInfo
do {
groupContextInfo = try GroupV2ContextInfo.deriveFrom(masterKeyData: groupContext.masterKey ?? Data())
} catch {
owsFailDebug("Invalid group context: \(error).")
return .discard
}
return GroupMessageProcessorManager.discardMode(
forMessageFrom: sourceAci,
groupId: groupContextInfo.groupId,
tx: tx,
)
}
private static func discardMode(
forJobInfo jobInfo: IncomingGroupsV2MessageJobInfo,
hasGroupBeenUpdated: Bool,
tx: DBReadTransaction,
) -> GroupMessageProcessorManager.DiscardMode {
guard
let sourceAci = Aci.parseFrom(
serviceIdBinary: jobInfo.envelope.sourceServiceIDBinary,
serviceIdString: jobInfo.envelope.sourceServiceID,
)
else {
owsFailDebug("Invalid source address.")
return .discard
}
return GroupMessageProcessorManager.discardMode(
forMessageFrom: sourceAci,
groupId: jobInfo.groupContextInfo.groupId,
shouldCheckGroupModel: hasGroupBeenUpdated,
tx: tx,
)
}
/// As in other message receiving flows, we want to do batch processing
/// wherever possible for perf reasons (to reduce view updates). We should
/// be able to mostly do that. However, in some cases we need to update the
/// group before we can process the message. These messages may require
/// network requests, so they aren't batched with other messages.
private func canJobBeProcessedWithoutUpdate(
jobInfo: IncomingGroupsV2MessageJobInfo,
tx: DBReadTransaction,
) -> Bool {
if .discard == Self.discardMode(forJobInfo: jobInfo, hasGroupBeenUpdated: false, tx: tx) {
return true
}
return GroupMessageProcessorManager.canContextBeProcessedWithoutUpdate(
groupContext: jobInfo.groupContext,
groupContextInfo: jobInfo.groupContextInfo,
tx: tx,
)
}
/// This method may process only a subset of the jobs.
///
/// - Returns: True if there are more jobs to process.
private func processBatch(batchLimit: Int, newestGuaranteedFailureJobId: inout Int64?) async throws(RetryableError) -> Bool {
let databaseStorage = SSKEnvironment.shared.databaseStorageRef
let hasMore: Bool
let asyncJob: IncomingGroupsV2MessageJobInfo?
(hasMore, asyncJob) = await databaseStorage.awaitableWrite { tx -> (Bool, IncomingGroupsV2MessageJobInfo?) in
for _ in 1...batchLimit {
guard let job = self.nextJob(tx: tx) else {
// Either there's no more jobs or we couldn't fetch jobs. Stop trying.
return (false, nil)
}
guard let jobInfo = Self.jobInfo(forJob: job) else {
self.didCompleteJob(job, tx: tx)
continue
}
if self.canJobBeProcessedWithoutUpdate(jobInfo: jobInfo, tx: tx) {
self.performLocalProcessingSync(jobInfo: jobInfo, tx: tx)
self.didCompleteJob(job, tx: tx)
continue
}
return (true, jobInfo)
}
return (true, nil)
}
if let asyncJob {
let didUpdateGroup = try await updateGroup(
jobInfo: asyncJob,
newestGuaranteedFailureJobId: &newestGuaranteedFailureJobId,
)
await databaseStorage.awaitableWrite { tx in
if didUpdateGroup {
self.performLocalProcessingSync(jobInfo: asyncJob, tx: tx)
} else {
// This only happens for terminal errors when updating the group via the
// service. That should only happen if we can no longer access the group
// state, e.g. a) we were kicked out of the group, b) our invite was
// revoked, or c) our request to join via group invite link was denied.
Logger.warn("Discarding unprocess-able message \(asyncJob.envelope.timestamp)")
}
self.didCompleteJob(asyncJob.job, tx: tx)
}
}
return hasMore
}
private func nextJob(tx: DBReadTransaction) -> GroupMessageProcessorJob? {
return finder.nextJob(forGroupId: self.groupId, tx: tx)
}
private func newestJobId() -> Int64? {
let db = DependenciesBridge.shared.db
return db.read { tx in
finder.newestJobId(tx: tx)
}
}
private func didCompleteJob(_ job: GroupMessageProcessorJob, tx: DBWriteTransaction) {
finder.removeJob(withRowId: job.id, tx: tx)
}
private func performLocalProcessingSync(
jobInfo: IncomingGroupsV2MessageJobInfo,
tx: DBWriteTransaction,
) {
let discardMode = Self.discardMode(forJobInfo: jobInfo, hasGroupBeenUpdated: true, tx: tx)
switch discardMode {
case .discard:
// Do nothing.
break
case .doNotDiscard, .discardVisibleMessages:
let tsAccountManager = DependenciesBridge.shared.tsAccountManager
guard let registeredState = try? tsAccountManager.registeredState(tx: tx) else {
Logger.warn("Missing registeredState!")
return
}
SSKEnvironment.shared.messageReceiverRef.processEnvelope(
jobInfo.envelope,
plaintextData: jobInfo.plaintextData,
wasReceivedByUD: jobInfo.job.wasReceivedByUD,
serverDeliveryTimestamp: jobInfo.job.serverDeliveryTimestamp,
shouldDiscardVisibleMessages: discardMode == .discardVisibleMessages,
registeredState: registeredState,
tx: tx,
)
}
}
private func updateGroup(
jobInfo: IncomingGroupsV2MessageJobInfo,
newestGuaranteedFailureJobId: inout Int64?,
) async throws(RetryableError) -> Bool {
// First, we try to update the group locally using changes embedded in
// the group context (if any).
if try await updateUsingEmbeddedGroupUpdate(jobInfo: jobInfo) {
return true
}
// Next, we check if we've already failed to fetch state from the server.
// If we've hit a terminal error, that error applies to ALL of the
// already-enqueued messages. (It doesn't apply to newly-enqueued messages,
// though, which is why we compare the job's ID.)
if let newestGuaranteedFailureJobId, jobInfo.job.id <= newestGuaranteedFailureJobId {
return false
}
// If we're going to check with the server, capture the newest job ID
// BEFORE issuing the request. This ensures that jobs enqueued after we
// start this fetch will issue their own fetch.
let newestJobId: Int64? = self.newestJobId()
// If that fails, fall back to a fetch via the service.
if try await tryToUpdateUsingService(jobInfo: jobInfo) {
return true
}
// If we can't fetch via the service, store that result for reuse in future
// invocations of this method.
newestGuaranteedFailureJobId = newestJobId
return false
}
/// Try to apply a single embedded (peer-to-peer) update.
///
/// This method should:
///
/// * Return true if message processing should proceed. This means that the
/// group state has been updated to (at least) the message's revision.
///
/// * Return false if the group could not be updated to the target revision
/// and we should fail over to fetching group changes and/or latest group
/// state from the service.
private func updateUsingEmbeddedGroupUpdate(
jobInfo: IncomingGroupsV2MessageJobInfo,
) async throws(RetryableError) -> Bool {
let groupId = jobInfo.groupContextInfo.groupId
let secretParams = jobInfo.groupContextInfo.groupSecretParams
// TODO: Move this to the other method to avoid duplicate fetches.
let groupThread = SSKEnvironment.shared.databaseStorageRef.read { tx in
return TSGroupThread.fetch(forGroupId: groupId, tx: tx)
}
guard
let groupThread,
let oldGroupModel = groupThread.groupModel as? TSGroupModelV2,
jobInfo.groupContext.revision == oldGroupModel.revision + 1
else {
// We might be learning of a group for the first time, or we might be
// getting re-added to a group we were previously a member of, or we might
// have been offline for a while and lost messages in our queue. In all of
// these cases, we need to fall back to the service.
return false
}
guard let changeProtoData = jobInfo.groupContext.groupChange else {
// No embedded group change.
return false
}
let changeActionsProto: GroupsProtoGroupChangeActions
do {
let changeProto = try GroupsProtoGroupChange(serializedData: changeProtoData)
guard changeProto.changeEpoch <= GroupManager.changeProtoEpoch else {
throw OWSGenericError("Not-yet-supported embedded change proto epoch: \(changeProto.changeEpoch).")
}
// We need to verify the signatures because these protos came from another
// client, not the service.
changeActionsProto = try GroupsV2Protos.parseGroupChangeProto(changeProto, verificationOperation: .verifySignature(groupId: groupId.serialize()))
} catch {
Logger.warn("Couldn't verify change actions: \(error)")
return false
}
guard changeActionsProto.revision == jobInfo.groupContext.revision else {
owsFailDebug("Embedded change proto revision doesn't match context revision.")
return false
}
do {
let spamReportingMetadata: GroupUpdateSpamReportingMetadata = {
guard let serverGuid = ValidatedIncomingEnvelope.parseServerGuid(fromEnvelope: jobInfo.envelope) else {
return .unreportable
}
return .reportable(serverGuid: serverGuid.uuidString.lowercased())
}()
try await SSKEnvironment.shared.groupsV2Ref.updateGroupWithChangeActions(
spamReportingMetadata: spamReportingMetadata,
changeActionsProto: changeActionsProto,
groupSecretParams: secretParams,
)
} catch {
if let retryableError = RetryableError(error) {
throw retryableError
}
if case GroupsV2Error.cantApplyChangesToPlaceholder = error {
Logger.warn("Error: \(error)")
} else {
owsFailDebug("Error: \(error)")
}
return false
}
return true
}
private func tryToUpdateUsingService(jobInfo: IncomingGroupsV2MessageJobInfo) async throws(RetryableError) -> Bool {
let spamReportingMetadata: GroupUpdateSpamReportingMetadata = {
guard let serverGuid = ValidatedIncomingEnvelope.parseServerGuid(fromEnvelope: jobInfo.envelope) else {
return .unreportable
}
return .reportable(serverGuid: serverGuid.uuidString.lowercased())
}()
do {
try await SSKEnvironment.shared.groupV2UpdatesRef.refreshGroup(
secretParams: jobInfo.groupContextInfo.groupSecretParams,
spamReportingMetadata: spamReportingMetadata,
source: .groupMessage(revision: jobInfo.groupContext.revision),
)
return true
} catch {
if let retryableError = RetryableError(error) {
throw retryableError
}
if case GroupsV2Error.localUserNotInGroup = error {
Logger.warn("Error: \(error)")
} else {
owsFailDebugUnlessNetworkFailure(error)
}
return false
}
}
private struct RetryableError: Error {
let rawValue: any Error
init?(_ error: any Error) {
guard Self.isRetryableError(error) else {
return nil
}
self.rawValue = error
}
private static func isRetryableError(_ error: any Error) -> Bool {
if error.isNetworkFailureOrTimeout {
return true
}
if let statusCode = error.httpStatusCode {
if statusCode == 401 || (500 <= statusCode && statusCode <= 599) {
return true
}
}
switch error {
case GroupsV2Error.timeout:
return true
default:
return false
}
}
}
}
// MARK: -
public class GroupMessageProcessorManager {
public static let didFlushGroupsV2MessageQueue = Notification.Name("didFlushGroupsV2MessageQueue")
private let finder = GroupMessageProcessorJobStore()
public init() {
SwiftSingletons.register(self)
}
// MARK: -
private struct State {
var activeGroupIds = Set<Data>()
var pendingGroupIds = Set<Data>()
}
private let state = AtomicValue(State(), lock: .init())
/// Starts a processor for every groupId with pending work.
public func startAllProcessors() async {
guard CurrentAppContext().shouldProcessIncomingMessages else {
return
}
// Obtain the list of groups that currently need processing.
let db = DependenciesBridge.shared.db
let groupIds = db.read { tx -> Set<Data> in
return Set(finder.allEnqueuedGroupIds(tx: tx))
}
if !groupIds.isEmpty {
Logger.info("(Re-)starting \(groupIds.count) group message processor(s) with pending messages.")
}
for groupId in groupIds {
startProcessorIfNeeded(groupId: groupId)
}
}
private func startProcessorIfNeeded(groupId: Data) {
let canStart = state.update {
// Indicate that there's more work (in case it's already running).
$0.pendingGroupIds.insert(groupId)
// Start it now if it's not currently running.
return $0.activeGroupIds.insert(groupId).inserted
}
guard canStart else {
// It's already running, so there's nothing to do.
return
}
// This Task must not be canceled.
Task {
var mightHaveMoreWork = true
while mightHaveMoreWork {
var didCallWillFetchNextJobs = false
await SpecificGroupMessageProcessor(groupId: groupId).processBatches(willFetchNextJobs: {
state.update(block: { $0.pendingGroupIds.remove(groupId) })
didCallWillFetchNextJobs = true
})
owsPrecondition(didCallWillFetchNextJobs)
// There is a race condition where `processBatches()` sees that there's no
// work left to do and stops executing; however, before this code can
// remove `groupId` from `activeGroupIds`, another component enqueues new
// work, calls `startProcessorIfNeeded()`, and returns early because a
// processor is already running. In this case, we'd "never" process the job
// (the current processor is stopping; the new one doesn't start). By
// checking `pendingGroupIds` before clearing `activeGroupIds`, we ensure
// that (a) this processor will perform the work; or (b) this processor
// will stop and the next invocation of this method will start a new one.
//
// See also: GroupsV2ProfileKeyUpdater's isUpdating/needsUpdate flags.
mightHaveMoreWork = state.update(block: {
if $0.pendingGroupIds.contains(groupId) {
return true
}
$0.activeGroupIds.remove(groupId)
if $0.activeGroupIds.isEmpty {
NotificationCenter.default.postOnMainThread(name: GroupMessageProcessorManager.didFlushGroupsV2MessageQueue, object: nil)
}
return false
})
}
}
}
// MARK: -
func enqueue(
envelope: DecryptedIncomingEnvelope,
envelopeData: Data,
serverDeliveryTimestamp: UInt64,
tx: DBWriteTransaction,
) {
guard !envelopeData.isEmpty else {
owsFailDebug("Empty envelope.")
return
}
guard let groupId = Self.groupId(from: envelope.content) else {
owsFailDebug("Missing or invalid group id")
return
}
// We need to persist the decrypted envelope data in this transaction to
// prevent data loss.
failIfThrows {
_ = try GroupMessageProcessorJob.insertRecord(
envelopeData: envelopeData,
plaintextData: envelope.plaintextData,
groupId: groupId,
wasReceivedByUD: envelope.wasReceivedByUD,
serverDeliveryTimestamp: serverDeliveryTimestamp,
tx: tx,
)
}
// The new envelope won't be visible to the processor until this
// transaction commits, so start it in the transaction completion.
tx.addSyncCompletion {
self.startProcessorIfNeeded(groupId: groupId)
}
}
private static func groupId(from contentProto: SSKProtoContent?) -> Data? {
guard let contentProto, let groupContext = groupContextV2(from: contentProto) else {
owsFailDebug("Invalid content.")
return nil
}
do {
let groupContextInfo = try GroupV2ContextInfo.deriveFrom(masterKeyData: groupContext.masterKey ?? Data())
return groupContextInfo.groupId.serialize()
} catch {
owsFailDebug("Invalid group context: \(error).")
return nil
}
}
public class func canContextBeProcessedImmediately(
groupContext: SSKProtoGroupContextV2,
tx: DBReadTransaction,
) -> Bool {
let groupContextInfo: GroupV2ContextInfo
do {
groupContextInfo = try GroupV2ContextInfo.deriveFrom(masterKeyData: groupContext.masterKey ?? Data())
} catch {
owsFailDebug("Invalid group context: \(error).")
return false
}
// We can only process GV2 messages immediately if:
// 1. We don't have any other messages queued for this thread
// 2. The message can be processed without updates
let existsJob: Bool = GroupMessageProcessorJobStore().existsJob(
forGroupId: groupContextInfo.groupId.serialize(),
tx: tx,
)
if existsJob {
Logger.info("Cannot immediately process GV2 message because there are messages queued.")
return false
}
return canContextBeProcessedWithoutUpdate(
groupContext: groupContext,
groupContextInfo: groupContextInfo,
tx: tx,
)
}
fileprivate class func canContextBeProcessedWithoutUpdate(
groupContext: SSKProtoGroupContextV2,
groupContextInfo: GroupV2ContextInfo,
tx: DBReadTransaction,
) -> Bool {
guard let groupThread = TSGroupThread.fetch(forGroupId: groupContextInfo.groupId, tx: tx) else {
return false
}
guard let groupModel = groupThread.groupModel as? TSGroupModelV2 else {
owsFailDebug("Invalid group model.")
return true
}
let messageRevision = groupContext.revision
let modelRevision = groupModel.revision
if messageRevision <= modelRevision {
return true
}
// The incoming message indicates that there is a new group revision. We'll
// update our group model using either the change proto embedded in the
// group context or by fetching latest state from the service.
return false
}
fileprivate static func groupContextV2(fromPlaintextData plaintextData: Data) -> SSKProtoGroupContextV2? {
guard !plaintextData.isEmpty else {
return nil
}
let contentProto: SSKProtoContent
do {
contentProto = try SSKProtoContent(serializedData: plaintextData)
} catch {
owsFailDebug("could not parse proto: \(error)")
return nil
}
return groupContextV2(from: contentProto)
}
public class func groupContextV2(from contentProto: SSKProtoContent) -> SSKProtoGroupContextV2? {
if let groupV2 = contentProto.dataMessage?.groupV2 {
return groupV2
}
if let groupV2 = contentProto.editMessage?.dataMessage?.groupV2 {
return groupV2
}
if let groupV2 = contentProto.syncMessage?.sent?.message?.groupV2 {
return groupV2
}
if let groupV2 = contentProto.syncMessage?.sent?.editMessage?.dataMessage?.groupV2 {
return groupV2
}
return nil
}
public func isProcessing() -> Bool {
return self.state.update(block: { !$0.activeGroupIds.isEmpty })
}
public enum DiscardMode {
/// Do not process this envelope.
case discard
/// Process this envelope.
case doNotDiscard
/// Process this envelope, but discard any "renderable" content,
/// e.g. calls or messages in the chat history.
case discardVisibleMessages
}
/// Returns whether a group message from the given user should be discarded.
///
/// If `shouldCheckGroupModel` is false, only checks whether the sender or
/// group is blocked.
public static func discardMode(
forMessageFrom sourceAci: Aci,
groupId: GroupIdentifier,
shouldCheckGroupModel: Bool = true,
tx: DBReadTransaction,
) -> DiscardMode {
let blockingManager = SSKEnvironment.shared.blockingManagerRef
let isBlocked: Bool = (
blockingManager.isAddressBlocked(SignalServiceAddress(sourceAci), transaction: tx)
|| blockingManager.isGroupIdBlocked(groupId, transaction: tx),
)
if isBlocked {
Logger.info("Discarding blocked envelope.")
return .discard
}
// We need to pre-process all incoming envelopes; they might change our
// group state.
//
// But we should only pass envelopes to the MessageManager for processing
// if they correspond to v2 groups of which we are a non-pending member.
if shouldCheckGroupModel {
guard let localAddress = DependenciesBridge.shared.tsAccountManager.localIdentifiers(tx: tx)?.aciAddress else {
owsFailDebug("Missing localAddress.")
return .discard
}
guard let groupThread = TSGroupThread.fetch(forGroupId: groupId, tx: tx) else {
// The user might have just deleted the thread
// but this race should be extremely rare.
// Usually this should indicate a bug.
owsFailDebug("Missing thread.")
return .discard
}
guard let groupModel = groupThread.groupModel as? TSGroupModelV2 else {
owsFailDebug("Invalid group model.")
return .discard
}
guard !groupModel.isTerminated else {
Logger.warn("Discarding envelope; group is ended.")
return .discard
}
guard groupModel.groupMembership.isFullMember(localAddress) else {
// * Local user might have just left the group.
// * Local user may have just learned that we were removed from the group.
// * Local user might be a pending member with an invite.
Logger.warn("Discarding envelope; local user is not an active group member.")
return .discard
}
guard groupModel.groupMembership.isFullMember(SignalServiceAddress(sourceAci)) else {
// * The sender might have just left the group.
Logger.warn("Discarding envelope; sender is not an active group member.")
return .discard
}
if groupModel.isAnnouncementsOnly {
guard groupModel.groupMembership.isFullMemberAndAdministrator(SignalServiceAddress(sourceAci)) else {
// * Only administrators can send "renderable" messages to a "announcement-only" group.
Logger.warn("Discarding renderable content in envelope; sender is not an active group member.")
return .discardVisibleMessages
}
}
}
return .doNotDiscard
}
}