Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
signalapp
GitHub Repository: signalapp/Signal-iOS
Path: blob/main/SignalServiceKit/Concurrency/KeyedConcurrentTaskQueue.swift
1 views
//
// Copyright 2025 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//

import Foundation

/// A collection of per-key `ConcurrentTaskQueue`s that drops a key's queue as
/// soon as nothing is using it anymore.
///
/// Handy when concurrency should be capped separately for each key -- for
/// example, limiting the number of simultaneous operations per ServiceId.
///
/// (A possible future extension: "at most 2 concurrent tasks per key and at
/// most 8 concurrent tasks overall".)
public actor KeyedConcurrentTaskQueue<KeyType: Hashable> {
    /// The concurrent limit handed to every per-key queue this actor creates.
    private let concurrentLimitPerKey: Int

    /// Queues that currently have at least one in-flight `withTaskQueue` call,
    /// each paired with the number of such calls.
    private var countedQueuesByKey: [KeyType: ReferenceCounted<ConcurrentTaskQueue>] = [:]

    public init(concurrentLimitPerKey: Int) {
        self.concurrentLimitPerKey = concurrentLimitPerKey
    }

    /// Creates a fresh, not-yet-referenced queue configured with the per-key limit.
    private func makeCountedQueue() -> ReferenceCounted<ConcurrentTaskQueue> {
        let queue = ConcurrentTaskQueue(concurrentLimit: concurrentLimitPerKey)
        return ReferenceCounted(wrappedValue: queue)
    }

    /// Forwards `block` to `runWithoutTaskCancellationHandler` on the queue
    /// for `key`. See the corresponding ConcurrentTaskQueue method.
    public func runWithoutTaskCancellationHandler<T, E>(
        forKey key: KeyType,
        _ block: () async throws(E) -> T
    ) async throws(E) -> T {
        try await withTaskQueue(forKey: key) { queue async throws(E) in
            try await queue.runWithoutTaskCancellationHandler(block)
        }
    }

    /// Forwards a throwing `block` to `run` on the queue for `key`. See the
    /// corresponding ConcurrentTaskQueue method.
    public func run<T>(
        forKey key: KeyType,
        _ block: () async throws -> T
    ) async throws -> T {
        try await withTaskQueue(forKey: key) { queue async throws in
            try await queue.run(block)
        }
    }

    /// Forwards a non-throwing `block` to `run` on the queue for `key`. See
    /// the corresponding ConcurrentTaskQueue method.
    public func run<T>(
        forKey key: KeyType,
        _ block: () async -> T
    ) async throws(CancellationError) -> T {
        try await withTaskQueue(forKey: key) { queue async throws(CancellationError) in
            try await queue.run(block)
        }
    }

    /// Looks up (or creates) the queue for `key`, keeps it registered in the
    /// dictionary while `block` runs, and removes it again once the last
    /// in-flight caller for that key has finished.
    private func withTaskQueue<T, E>(
        forKey key: KeyType,
        run block: (_ taskQueue: ConcurrentTaskQueue) async throws(E) -> T
    ) async throws(E) -> T {
        // Taking a reference and releasing it both happen on this actor, so
        // the two can never interleave. The dictionary entry is only dropped
        // after every caller for the key is done.
        var entry = countedQueuesByKey[key] ?? makeCountedQueue()
        let queue = entry.increment()
        countedQueuesByKey[key] = entry
        defer {
            // The entry is guaranteed to still be present: this call holds one
            // of its references and has not released it until this line.
            let wasLastReference = countedQueuesByKey[key]!.decrement()
            if wasLastReference {
                countedQueuesByKey[key] = nil
            }
        }
        return try await block(queue)
    }
}

/// Pairs a value with a manually maintained count of how many users currently
/// hold it. The count is purely bookkeeping; callers decide what to do when it
/// drops back to zero.
private struct ReferenceCounted<T> {
    /// Number of `increment()` calls not yet balanced by a `decrement()`.
    var referenceCount = 0
    /// The value being shared.
    var wrappedValue: T

    /// Records one more user and hands back the wrapped value.
    mutating func increment() -> T {
        let value = wrappedValue
        referenceCount += 1
        return value
    }

    /// Records that one user is done.
    ///
    /// - Returns: `true` when the count is exactly zero after this call.
    mutating func decrement() -> Bool {
        referenceCount -= 1
        let hasNoRemainingUsers = referenceCount == 0
        return hasNoRemainingUsers
    }
}