Path: blob/main/SignalServiceKit/Messages/MessageProcessor.swift
1 views
//
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//
import Foundation
import GRDB
import LibSignalClient
public class MessageProcessor {
private enum Constants {
// We want a value that is just high enough to yield perf benefits.
static let incomingMessageBatchLimit = 16
static let incomingReceiptBatchLimit = 32
}
public static let messageProcessorDidDrainQueue = Notification.Name("messageProcessorDidDrainQueue")
private var hasPendingEnvelopes: Bool {
!pendingEnvelopes.isEmpty
}
public struct Stages: OptionSet {
public var rawValue: UInt8
public init(rawValue: UInt8) {
self.rawValue = rawValue
}
public static let messageFetcher = Stages(rawValue: 1 << 0)
public static let messageProcessor = Stages(rawValue: 1 << 1)
public static let groupMessageProcessor = Stages(rawValue: 1 << 2)
}
public func waitForFetchingAndProcessing(stages: Stages = [.messageFetcher, .messageProcessor, .groupMessageProcessor]) async throws(CancellationError) {
guard CurrentAppContext().shouldProcessIncomingMessages else {
return
}
var preconditions = [any Precondition]()
if stages.contains(.messageFetcher) {
preconditions.append(SSKEnvironment.shared.messageFetcherJobRef.preconditionForFetchingComplete())
}
if stages.contains(.messageProcessor) {
preconditions.append(NotificationPrecondition(
notificationName: Self.messageProcessorDidDrainQueue,
isSatisfied: { !self.hasPendingEnvelopes },
))
}
if stages.contains(.groupMessageProcessor) {
preconditions.append(NotificationPrecondition(
notificationName: GroupMessageProcessorManager.didFlushGroupsV2MessageQueue,
isSatisfied: { !SSKEnvironment.shared.groupMessageProcessorManagerRef.isProcessing() },
))
}
try await Preconditions(preconditions).waitUntilSatisfied()
}
private let appReadiness: AppReadiness
public init(appReadiness: AppReadiness) {
self.appReadiness = appReadiness
SwiftSingletons.register(self)
appReadiness.runNowOrWhenAppDidBecomeReadySync {
SSKEnvironment.shared.messagePipelineSupervisorRef.register(pipelineStage: self)
NotificationCenter.default.addObserver(
self,
selector: #selector(self.registrationStateDidChange),
name: .registrationStateDidChange,
object: nil,
)
}
}
public func enqueueReceivedEnvelopeData(
_ envelopeData: Data,
serverDeliveryTimestamp: UInt64,
envelopeSource: EnvelopeSource,
completion: @escaping () -> Void,
) {
self.queueForEnqueueing.async {
self._enqueueReceivedEnvelopeData(
envelopeData,
serverDeliveryTimestamp: serverDeliveryTimestamp,
envelopeSource: envelopeSource,
completion: completion,
)
}
}
public func flushEnqueuingQueue(completion: @escaping () -> Void) {
self.queueForEnqueueing.async(completion)
}
private func _enqueueReceivedEnvelopeData(
_ envelopeData: Data,
serverDeliveryTimestamp: UInt64,
envelopeSource: EnvelopeSource,
completion: @escaping () -> Void,
) {
assertOnQueue(self.queueForEnqueueing)
guard !envelopeData.isEmpty else {
owsFailDebug("Empty envelope, envelopeSource: \(envelopeSource).")
completion()
return
}
let protoEnvelope: SSKProtoEnvelope
do {
protoEnvelope = try SSKProtoEnvelope(serializedData: envelopeData)
} catch {
owsFailDebug("Failed to parse encrypted envelope \(error), envelopeSource: \(envelopeSource)")
completion()
return
}
// Drop any too-large messages on the floor. Well behaving clients should never send them.
guard (protoEnvelope.content ?? Data()).count <= Self.maxEnvelopeByteCount else {
owsFailDebug("Oversize envelope, envelopeSource: \(envelopeSource).")
completion()
return
}
enqueueReceivedEnvelope(
ReceivedEnvelope(
envelope: protoEnvelope,
serverDeliveryTimestamp: serverDeliveryTimestamp,
completion: completion,
),
envelopeSource: envelopeSource,
)
}
private func enqueueReceivedEnvelope(_ receivedEnvelope: ReceivedEnvelope, envelopeSource: EnvelopeSource) {
pendingEnvelopes.enqueue(receivedEnvelope)
drainPendingEnvelopes()
}
private static let maxEnvelopeByteCount = 256 * 1024
private let queueForEnqueueing = DispatchQueue(label: "org.signal.message-processor-enqueue")
private let queueForProcessing = DispatchQueue(label: "org.signal.message-processor-process", autoreleaseFrequency: .workItem)
#if TESTABLE_BUILD
var serialQueueForTests: DispatchQueue { queueForProcessing }
#endif
private var pendingEnvelopes = PendingEnvelopes()
private let isDrainingPendingEnvelopes = AtomicBool(false, lock: .init())
public func dropEnqueuedEnvelopes(completion: @escaping () -> Void) {
// Run this on queueForProcessing to ensure we're not going to drop
// envelopes that are currently being processed.
self.queueForProcessing.async {
self.pendingEnvelopes.removeAll()
completion()
}
}
private func drainPendingEnvelopes() {
let tsAccountManager = DependenciesBridge.shared.tsAccountManager
guard CurrentAppContext().shouldProcessIncomingMessages else { return }
guard tsAccountManager.registrationStateWithMaybeSneakyTransaction.isRegistered else { return }
guard tsAccountManager.storedDeviceIdWithMaybeTransaction.ifValid != nil else { return }
guard SSKEnvironment.shared.messagePipelineSupervisorRef.isMessageProcessingPermitted else { return }
queueForProcessing.async {
self.isDrainingPendingEnvelopes.set(true)
while autoreleasepool(invoking: { self.drainNextBatch() }) {}
self.isDrainingPendingEnvelopes.set(false)
if self.pendingEnvelopes.isEmpty {
NotificationCenter.default.postOnMainThread(name: Self.messageProcessorDidDrainQueue, object: nil)
}
}
}
private var recentlyProcessedGuids = SetDeque<UUID>()
/// Should ideally match `MESSAGE_SENDER_MAX_CONCURRENCY`.
private var recentlyProcessedGuidLimit = 256
/// Returns whether or not to continue draining the queue.
private func drainNextBatch() -> Bool {
assertOnQueue(queueForProcessing)
guard SSKEnvironment.shared.messagePipelineSupervisorRef.isMessageProcessingPermitted else {
return false
}
// If the app is in the background, use batch size of 1.
// This reduces the risk of us never being able to drain any
// messages from the queue. We should fine tune this number
// to yield the best perf we can get.
let batchLimitUpperBound = CurrentAppContext().isInBackground() ? 1 : max(
Constants.incomingMessageBatchLimit,
Constants.incomingReceiptBatchLimit,
)
let batch = pendingEnvelopes.nextBatch(batchSize: batchLimitUpperBound)
let batchEnvelopes = batch.batchEnvelopes
let pendingEnvelopesCount = batch.pendingEnvelopesCount
if pendingEnvelopesCount > self.recentlyProcessedGuidLimit {
self.recentlyProcessedGuidLimit = pendingEnvelopesCount
}
guard !batchEnvelopes.isEmpty else {
return false
}
var startTime: CFTimeInterval = 0
var processedEnvelopesCount = 0
var messageCount = 0
var receiptCount = 0
SSKEnvironment.shared.databaseStorageRef.write { tx in
// Start the timer once we acquire a write transaction.
startTime = CACurrentMediaTime()
// This is only called via `drainPendingEnvelopes`, and that confirms that
// we're registered. Consequently, this generally shouldn't fail.
let tsAccountManager = DependenciesBridge.shared.tsAccountManager
guard let registeredState = try? tsAccountManager.registeredState(tx: tx) else {
return
}
guard let localDeviceId = tsAccountManager.storedDeviceId(tx: tx).ifValid else {
return
}
var remainingEnvelopes = batchEnvelopes[...]
while
!remainingEnvelopes.isEmpty,
messageCount < Constants.incomingMessageBatchLimit,
receiptCount < Constants.incomingReceiptBatchLimit
{
guard SSKEnvironment.shared.messagePipelineSupervisorRef.isMessageProcessingPermitted else {
break
}
autoreleasepool {
// If we build a request, we must handle it to ensure it's not lost if we
// stop processing envelopes.
let relatedRequests = buildNextCombinedRequest(
envelopes: &remainingEnvelopes,
localIdentifiers: registeredState.localIdentifiers,
localDeviceId: localDeviceId,
tx: tx,
)
if relatedRequests.first?.deliveryReceiptMessageTimestamps != nil {
receiptCount += relatedRequests.count
} else {
messageCount += relatedRequests.count
}
handle(
relatedRequests: relatedRequests,
registeredState: registeredState,
transaction: tx,
)
}
}
processedEnvelopesCount += batchEnvelopes.count - remainingEnvelopes.count
}
for processedEnvelope in batchEnvelopes.prefix(processedEnvelopesCount) {
guard let serverGuid = ValidatedIncomingEnvelope.parseServerGuid(fromEnvelope: processedEnvelope.envelope) else {
continue
}
recentlyProcessedGuids.pushBack(serverGuid)
}
while recentlyProcessedGuids.count > recentlyProcessedGuidLimit {
_ = recentlyProcessedGuids.popFront()
}
// The groups processing logic relies on `removeProcessedEnvelopes` being
// called after the `write`'s `addSyncCompletion` blocks.
pendingEnvelopes.removeProcessedEnvelopes(processedEnvelopesCount)
let endTime = CACurrentMediaTime()
let formattedDuration = String(format: "%.1f", (endTime - startTime) * 1000)
Logger.info("Processed \(processedEnvelopesCount) envelopes (of \(pendingEnvelopesCount) total) in \(formattedDuration)ms")
return true
}
// If envelopes is not empty, this will emit a single request for a non-delivery receipt or one or more requests
// all for delivery receipts.
private func buildNextCombinedRequest(
envelopes: inout ArraySlice<ReceivedEnvelope>,
localIdentifiers: LocalIdentifiers,
localDeviceId: DeviceId,
tx: DBWriteTransaction,
) -> [ProcessingRequest] {
var results = [ProcessingRequest]()
while let envelope = envelopes.first {
envelopes.removeFirst()
let request = processingRequest(
for: envelope,
localIdentifiers: localIdentifiers,
localDeviceId: localDeviceId,
tx: tx,
)
results.append(request)
if request.deliveryReceiptMessageTimestamps == nil {
// If we hit a non-delivery receipt envelope, handle it immediately to avoid
// keeping potentially large decrypted envelopes in memory.
break
}
}
return results
}
private func handle(relatedRequests: [ProcessingRequest], registeredState: RegisteredState, transaction: DBWriteTransaction) {
// Efficiently handle delivery receipts for the same message by fetching the sent message only
// once and only using one updateWith... to update the message with new recipient state.
BatchingDeliveryReceiptContext.withDeferredUpdates(transaction: transaction) { context in
for request in relatedRequests {
handleProcessingRequest(request, context: context, registeredState: registeredState, tx: transaction)
}
}
}
private func reallyHandleProcessingRequest(
_ request: ProcessingRequest,
context: DeliveryReceiptContext,
registeredState: RegisteredState,
transaction: DBWriteTransaction,
) {
switch request.state {
case .completed(error: let error):
Logger.info("Envelope completed early with error \(String(describing: error))")
case .enqueueForGroup(let decryptedEnvelope, let envelopeData):
SSKEnvironment.shared.groupMessageProcessorManagerRef.enqueue(
envelope: decryptedEnvelope,
envelopeData: envelopeData,
serverDeliveryTimestamp: request.receivedEnvelope.serverDeliveryTimestamp,
tx: transaction,
)
SSKEnvironment.shared.messageReceiverRef.finishProcessingEnvelope(decryptedEnvelope, tx: transaction)
case .messageReceiverRequest(let messageReceiverRequest):
SSKEnvironment.shared.messageReceiverRef.handleRequest(messageReceiverRequest, context: context, registeredState: registeredState, tx: transaction)
SSKEnvironment.shared.messageReceiverRef.finishProcessingEnvelope(messageReceiverRequest.decryptedEnvelope, tx: transaction)
case .clearPlaceholdersOnly(let decryptedEnvelope):
SSKEnvironment.shared.messageReceiverRef.finishProcessingEnvelope(decryptedEnvelope, tx: transaction)
case .serverReceipt(let serverReceiptEnvelope):
SSKEnvironment.shared.messageReceiverRef.handleDeliveryReceipt(envelope: serverReceiptEnvelope, context: context, tx: transaction)
}
}
private func handleProcessingRequest(
_ request: ProcessingRequest,
context: DeliveryReceiptContext,
registeredState: RegisteredState,
tx: DBWriteTransaction,
) {
reallyHandleProcessingRequest(request, context: context, registeredState: registeredState, transaction: tx)
tx.addSyncCompletion { request.receivedEnvelope.completion() }
}
@objc
private func registrationStateDidChange() {
self.drainPendingEnvelopes()
}
}
private struct ProcessingRequest {
enum State {
case completed(error: Error?)
case enqueueForGroup(decryptedEnvelope: DecryptedIncomingEnvelope, envelopeData: Data)
case messageReceiverRequest(MessageReceiverRequest)
case serverReceipt(ServerReceiptEnvelope)
// Message decrypted but had an invalid protobuf.
case clearPlaceholdersOnly(DecryptedIncomingEnvelope)
}
let receivedEnvelope: ReceivedEnvelope
let state: State
// If this request is for a delivery receipt, return the timestamps for the sent-messages it
// corresponds to.
var deliveryReceiptMessageTimestamps: [UInt64]? {
switch state {
case .completed, .enqueueForGroup, .clearPlaceholdersOnly:
return nil
case .serverReceipt(let envelope):
return [envelope.validatedEnvelope.timestamp]
case .messageReceiverRequest(let request):
guard
case .receiptMessage = request.messageType,
let receiptMessage = request.protoContent.receiptMessage,
receiptMessage.type == .delivery
else {
return nil
}
return receiptMessage.timestamp
}
}
init(_ receivedEnvelope: ReceivedEnvelope, state: State) {
self.receivedEnvelope = receivedEnvelope
self.state = state
}
}
private struct ProcessingRequestBuilder {
let receivedEnvelope: ReceivedEnvelope
let blockingManager: BlockingManager
let localDeviceId: DeviceId
let localIdentifiers: LocalIdentifiers
let messageDecrypter: OWSMessageDecrypter
let messageReceiver: MessageReceiver
init(
_ receivedEnvelope: ReceivedEnvelope,
blockingManager: BlockingManager,
localDeviceId: DeviceId,
localIdentifiers: LocalIdentifiers,
messageDecrypter: OWSMessageDecrypter,
messageReceiver: MessageReceiver,
) {
self.receivedEnvelope = receivedEnvelope
self.blockingManager = blockingManager
self.localDeviceId = localDeviceId
self.localIdentifiers = localIdentifiers
self.messageDecrypter = messageDecrypter
self.messageReceiver = messageReceiver
}
func build(tx: DBWriteTransaction) -> ProcessingRequest.State {
do {
let decryptionResult = try receivedEnvelope.decryptIfNeeded(
messageDecrypter: messageDecrypter,
localIdentifiers: localIdentifiers,
localDeviceId: localDeviceId,
tx: tx,
)
switch decryptionResult {
case .serverReceipt(let receiptEnvelope):
return .serverReceipt(receiptEnvelope)
case .decryptedMessage(let decryptedEnvelope):
return processingRequest(for: decryptedEnvelope, tx: tx)
}
} catch {
return .completed(error: error)
}
}
private enum ProcessingStep {
case discard
case enqueueForGroupProcessing
case processNow(shouldDiscardVisibleMessages: Bool)
}
private func processingStep(
for decryptedEnvelope: DecryptedIncomingEnvelope,
tx: DBWriteTransaction,
) -> ProcessingStep {
guard
let contentProto = decryptedEnvelope.content,
let groupContextV2 = GroupMessageProcessorManager.groupContextV2(from: contentProto)
else {
// Non-v2-group messages can be processed immediately.
return .processNow(shouldDiscardVisibleMessages: false)
}
guard
GroupMessageProcessorManager.canContextBeProcessedImmediately(
groupContext: groupContextV2,
tx: tx,
)
else {
// Some v2 group messages required group state to be
// updated before they can be processed.
return .enqueueForGroupProcessing
}
let discardMode = SpecificGroupMessageProcessor.discardMode(
forMessageFrom: decryptedEnvelope.sourceAci,
groupContext: groupContextV2,
tx: tx,
)
switch discardMode {
case .discard:
// Some v2 group messages should be discarded and not processed.
return .discard
case .doNotDiscard:
return .processNow(shouldDiscardVisibleMessages: false)
case .discardVisibleMessages:
// Some v2 group messages should be processed, but discarding any "visible"
// messages, e.g. text messages or calls.
return .processNow(shouldDiscardVisibleMessages: true)
}
}
private func processingRequest(
for decryptedEnvelope: DecryptedIncomingEnvelope,
tx: DBWriteTransaction,
) -> ProcessingRequest.State {
owsPrecondition(CurrentAppContext().shouldProcessIncomingMessages)
// Pre-processing has to happen during the same transaction that performed
// decryption.
messageReceiver.preprocessEnvelope(decryptedEnvelope, tx: tx)
// If the sender is in the block list, we can skip scheduling any additional processing.
let sourceAddress = SignalServiceAddress(decryptedEnvelope.sourceAci)
if blockingManager.isAddressBlocked(sourceAddress, transaction: tx) {
Logger.info("Skipping processing for blocked envelope from \(decryptedEnvelope.sourceAci)")
return .completed(error: MessageProcessingError.blockedSender)
}
if decryptedEnvelope.localIdentity == .pni {
let identityManager = DependenciesBridge.shared.identityManager
identityManager.setShouldSharePhoneNumber(with: decryptedEnvelope.sourceAci, tx: tx)
}
switch processingStep(for: decryptedEnvelope, tx: tx) {
case .discard:
// Do nothing.
return .completed(error: nil)
case .enqueueForGroupProcessing:
// If we can't process the message immediately, we enqueue it for
// for processing in the same transaction within which it was decrypted
// to prevent data loss.
let envelopeData: Data
do {
envelopeData = try decryptedEnvelope.envelope.serializedData()
} catch {
owsFailDebug("failed to reserialize envelope: \(error)")
return .completed(error: error)
}
return .enqueueForGroup(decryptedEnvelope: decryptedEnvelope, envelopeData: envelopeData)
case .processNow(let shouldDiscardVisibleMessages):
// Envelopes can be processed immediately if they're:
// 1. Not a GV2 message.
// 2. A GV2 message that doesn't require updating the group.
//
// The advantage to processing the message immediately is that we can full
// process the message in the same transaction that we used to decrypt it.
// This results in a significant perf benefit verse queueing the message
// and waiting for that queue to open new transactions and process
// messages. The downside is that if we *fail* to process this message
// (e.g. the app crashed or was killed), we'll have to re-decrypt again
// before we process. This is safe since the decrypt operation would also
// be rolled back (since the transaction didn't commit) and should be rare.
messageReceiver.checkForUnknownLinkedDevice(in: decryptedEnvelope, tx: tx)
let buildResult = MessageReceiverRequest.buildRequest(
for: decryptedEnvelope,
serverDeliveryTimestamp: receivedEnvelope.serverDeliveryTimestamp,
shouldDiscardVisibleMessages: shouldDiscardVisibleMessages,
tx: tx,
)
switch buildResult {
case .discard:
return .completed(error: nil)
case .noContent:
return .clearPlaceholdersOnly(decryptedEnvelope)
case .request(let messageReceiverRequest):
return .messageReceiverRequest(messageReceiverRequest)
}
}
}
}
private extension MessageProcessor {
func processingRequest(
for envelope: ReceivedEnvelope,
localIdentifiers: LocalIdentifiers,
localDeviceId: DeviceId,
tx: DBWriteTransaction,
) -> ProcessingRequest {
assertOnQueue(queueForProcessing)
if let serverGuid = ValidatedIncomingEnvelope.parseServerGuid(fromEnvelope: envelope.envelope), recentlyProcessedGuids.contains(serverGuid) {
return ProcessingRequest(envelope, state: .completed(error: OWSGenericError("Skipping because it was recently processed.")))
}
let builder = ProcessingRequestBuilder(
envelope,
blockingManager: SSKEnvironment.shared.blockingManagerRef,
localDeviceId: localDeviceId,
localIdentifiers: localIdentifiers,
messageDecrypter: SSKEnvironment.shared.messageDecrypterRef,
messageReceiver: SSKEnvironment.shared.messageReceiverRef,
)
return ProcessingRequest(envelope, state: builder.build(tx: tx))
}
}
// MARK: -
extension MessageProcessor: MessageProcessingPipelineStage {
public func supervisorDidResumeMessageProcessing(_ supervisor: MessagePipelineSupervisor) {
drainPendingEnvelopes()
}
}
// MARK: -
private struct ReceivedEnvelope {
let envelope: SSKProtoEnvelope
let serverDeliveryTimestamp: UInt64
let completion: () -> Void
enum DecryptionResult {
case serverReceipt(ServerReceiptEnvelope)
case decryptedMessage(DecryptedIncomingEnvelope)
}
func decryptIfNeeded(
messageDecrypter: OWSMessageDecrypter,
localIdentifiers: LocalIdentifiers,
localDeviceId: DeviceId,
tx: DBWriteTransaction,
) throws -> DecryptionResult {
// Figure out what type of envelope we're dealing with.
let validatedEnvelope = try ValidatedIncomingEnvelope(envelope, localIdentifiers: localIdentifiers)
switch validatedEnvelope.kind {
case .serverReceipt:
return .serverReceipt(try ServerReceiptEnvelope(validatedEnvelope))
case .identifiedSender(let cipherType):
return .decryptedMessage(
try messageDecrypter.decryptIdentifiedEnvelope(
validatedEnvelope,
cipherType: cipherType,
localIdentifiers: localIdentifiers,
localDeviceId: localDeviceId,
tx: tx,
),
)
case .unidentifiedSender:
return .decryptedMessage(
try messageDecrypter.decryptUnidentifiedSenderEnvelope(
validatedEnvelope,
localIdentifiers: localIdentifiers,
localDeviceId: localDeviceId,
tx: tx,
),
)
}
}
}
// MARK: -
public enum EnvelopeSource {
case unknown
case websocketIdentified
case websocketUnidentified
case debugUI
case tests
}
// MARK: -
private class PendingEnvelopes {
private let unfairLock = UnfairLock()
private var pendingEnvelopes = [ReceivedEnvelope]()
var isEmpty: Bool {
unfairLock.withLock { pendingEnvelopes.isEmpty }
}
var count: Int {
unfairLock.withLock { pendingEnvelopes.count }
}
struct Batch {
let batchEnvelopes: [ReceivedEnvelope]
let pendingEnvelopesCount: Int
}
func nextBatch(batchSize: Int) -> Batch {
unfairLock.withLock {
Batch(
batchEnvelopes: Array(pendingEnvelopes.prefix(batchSize)),
pendingEnvelopesCount: pendingEnvelopes.count,
)
}
}
func removeProcessedEnvelopes(_ processedEnvelopesCount: Int) {
unfairLock.withLock {
pendingEnvelopes.removeFirst(processedEnvelopesCount)
}
}
func removeAll() {
unfairLock.withLock {
pendingEnvelopes.removeAll()
}
}
func enqueue(_ receivedEnvelope: ReceivedEnvelope) {
unfairLock.withLock {
pendingEnvelopes.append(receivedEnvelope)
}
}
}
// MARK: -
public enum MessageProcessingError: Error {
case wrongDestinationUuid
case invalidMessageTypeForDestinationUuid
case blockedSender
}