Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
signalapp
GitHub Repository: signalapp/Signal-iOS
Path: blob/main/SignalServiceKit/Profiles/ProfileFetcher.swift
1 views
//
// Copyright 2020 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//

import Foundation
public import LibSignalClient

public struct ProfileFetchContext {
    /// If set, GSEs will be used as a fallback auth mechanism.
    var groupId: GroupIdentifier?

    /// If true, the fetch may be arbitrarily dropped if deemed non-critical.
    public var isOpportunistic: Bool

    /// If true, the fetch must try to fetch a new credential.
    public var mustFetchNewCredential: Bool

    /// The nature of the user profile fetch.  This determines side-effect behavior
    /// such as if storage service is updated after the write.
    public var userProfileWriter: UserProfileWriter

    public init(
        groupId: GroupIdentifier? = nil,
        isOpportunistic: Bool = false,
        mustFetchNewCredential: Bool = false,
        userProfileWriter: UserProfileWriter = .profileFetch,
    ) {
        self.groupId = groupId
        self.isOpportunistic = isOpportunistic
        self.mustFetchNewCredential = mustFetchNewCredential
        self.userProfileWriter = userProfileWriter
    }
}

public protocol ProfileFetcher {
    func fetchProfileImpl(for serviceId: ServiceId, context: ProfileFetchContext, authedAccount: AuthedAccount) async throws -> FetchedProfile
    func fetchProfileSyncImpl(for serviceId: ServiceId, context: ProfileFetchContext, authedAccount: AuthedAccount) -> Task<FetchedProfile, Error>
    func waitForPendingFetches(for serviceId: ServiceId) async throws
}

extension ProfileFetcher {
    public func fetchProfile(
        for serviceId: ServiceId,
        context: ProfileFetchContext = ProfileFetchContext(),
        authedAccount: AuthedAccount = .implicit(),
    ) async throws -> FetchedProfile {
        return try await fetchProfileImpl(for: serviceId, context: context, authedAccount: authedAccount)
    }

    func fetchProfileSync(
        for serviceId: ServiceId,
        context: ProfileFetchContext = ProfileFetchContext(),
        authedAccount: AuthedAccount = .implicit(),
    ) -> Task<FetchedProfile, Error> {
        return fetchProfileSyncImpl(for: serviceId, context: context, authedAccount: authedAccount)
    }
}

public enum ProfileFetcherError: Error {
    case skippingOpportunisticFetch
    case couldNotFetchCredential
}

public actor ProfileFetcherImpl: ProfileFetcher {
    private let jobCreator: (
        ServiceId,
        GroupIdentifier?,
        _ mustFetchNewCredential: Bool,
        AuthedAccount,
        UserProfileWriter,
    ) -> ProfileFetcherJob
    private let reachabilityManager: any SSKReachabilityManager
    private let tsAccountManager: any TSAccountManager

    private let recentFetchResults = LRUCache<ServiceId, FetchResult>(maxSize: 16000, nseMaxSize: 4000)

    private struct FetchResult {
        let outcome: Outcome
        enum Outcome {
            case success
            case networkFailure
            case requestFailure(ProfileRequestError)
            case otherFailure
        }

        let completionDate: MonotonicDate

        init(outcome: Outcome, completionDate: MonotonicDate) {
            self.outcome = outcome
            self.completionDate = completionDate
        }
    }

    private nonisolated let inProgressFetches = AtomicValue<[ServiceId: [FetchState]]>([:], lock: .init())

    private class FetchState {
        var waiterContinuations = [CancellableContinuation<Void>]()
    }

    private var rateLimitExpirationDate: MonotonicDate?
    private var scheduledOpportunisticDate: MonotonicDate?

    init(
        accountChecker: AccountChecker,
        db: any DB,
        disappearingMessagesConfigurationStore: any DisappearingMessagesConfigurationStore,
        identityManager: any OWSIdentityManager,
        paymentsHelper: any PaymentsHelper,
        profileManager: any ProfileManager,
        reachabilityManager: any SSKReachabilityManager,
        recipientDatabaseTable: RecipientDatabaseTable,
        syncManager: any SyncManagerProtocol,
        tsAccountManager: any TSAccountManager,
        udManager: any OWSUDManager,
        versionedProfiles: any VersionedProfiles,
    ) {
        self.reachabilityManager = reachabilityManager
        self.tsAccountManager = tsAccountManager
        self.jobCreator = { serviceId, groupIdContext, mustFetchNewCredential, authedAccount, userProfileWriter in
            return ProfileFetcherJob(
                serviceId: serviceId,
                groupIdContext: groupIdContext,
                mustFetchNewCredential: mustFetchNewCredential,
                authedAccount: authedAccount,
                accountChecker: accountChecker,
                db: db,
                disappearingMessagesConfigurationStore: disappearingMessagesConfigurationStore,
                identityManager: identityManager,
                paymentsHelper: paymentsHelper,
                profileManager: profileManager,
                recipientDatabaseTable: recipientDatabaseTable,
                syncManager: syncManager,
                tsAccountManager: tsAccountManager,
                udManager: udManager,
                versionedProfiles: versionedProfiles,
                userProfileWriter: userProfileWriter,
            )
        }
        SwiftSingletons.register(self)
    }

    private nonisolated func insertFetchState(serviceId: ServiceId) -> FetchState {
        let fetchState = FetchState()
        self.inProgressFetches.update {
            $0[serviceId, default: []].append(fetchState)
        }
        return fetchState
    }

    private nonisolated func finalizeFetchState(
        serviceId: ServiceId,
        fetchState: FetchState,
    ) {
        self.inProgressFetches.update {
            $0[serviceId, default: []].removeAll(where: { $0 === fetchState })
        }
        for waiter in fetchState.waiterContinuations {
            waiter.resume(with: .success(()))
        }
    }

    public nonisolated func fetchProfileSyncImpl(
        for serviceId: ServiceId,
        context: ProfileFetchContext,
        authedAccount: AuthedAccount,
    ) -> Task<FetchedProfile, Error> {
        // Insert this before starting the Task to ensure we've denoted the pending
        // fetch before returning control to the caller.
        let fetchState = insertFetchState(serviceId: serviceId)
        return Task {
            return try await self.fetchProfileWithOptionsAndFinalize(
                serviceId: serviceId,
                fetchState: fetchState,
                context: context,
                authedAccount: authedAccount,
            )
        }
    }

    public func fetchProfileImpl(
        for serviceId: ServiceId,
        context: ProfileFetchContext,
        authedAccount: AuthedAccount,
    ) async throws -> FetchedProfile {
        // We're already running concurrently with other code, so no new race
        // conditions are introduced by calling `insertFetchState` inline.
        return try await fetchProfileWithOptionsAndFinalize(
            serviceId: serviceId,
            fetchState: insertFetchState(serviceId: serviceId),
            context: context,
            authedAccount: authedAccount,
        )
    }

    private func fetchProfileWithOptionsAndFinalize(
        serviceId: ServiceId,
        fetchState: FetchState,
        context: ProfileFetchContext,
        authedAccount: AuthedAccount,
    ) async throws -> FetchedProfile {
        let result = await Result {
            try await fetchProfileWithOptions(
                serviceId: serviceId,
                context: context,
                authedAccount: authedAccount,
            )
        }
        finalizeFetchState(serviceId: serviceId, fetchState: fetchState)
        return try result.get()
    }

    private func fetchProfileWithOptions(
        serviceId: ServiceId,
        context: ProfileFetchContext,
        authedAccount: AuthedAccount,
    ) async throws -> FetchedProfile {
        if context.isOpportunistic {
            if !CurrentAppContext().isMainApp {
                throw ProfileFetcherError.skippingOpportunisticFetch
            }
            return try await fetchProfileOpportunistically(serviceId: serviceId, context: context, authedAccount: authedAccount)
        }
        return try await fetchProfileUrgently(serviceId: serviceId, context: context, authedAccount: authedAccount)
    }

    private func fetchProfileOpportunistically(
        serviceId: ServiceId,
        context: ProfileFetchContext,
        authedAccount: AuthedAccount,
    ) async throws -> FetchedProfile {
        if CurrentAppContext().isRunningTests {
            throw ProfileFetcherError.skippingOpportunisticFetch
        }
        guard shouldOpportunisticallyFetch(serviceId: serviceId) else {
            throw ProfileFetcherError.skippingOpportunisticFetch
        }
        guard isRegisteredOrExplicitlyAuthenticated(authedAccount: authedAccount) else {
            throw ProfileFetcherError.skippingOpportunisticFetch
        }
        // We don't need opportunistic fetches for ourself.
        let localIdentifiers = try tsAccountManager.localIdentifiersWithMaybeSneakyTransaction(authedAccount: authedAccount)
        guard !localIdentifiers.contains(serviceId: serviceId) else {
            throw ProfileFetcherError.skippingOpportunisticFetch
        }
        try await waitIfNecessary()
        // Check again since we might have fetched while waiting.
        guard shouldOpportunisticallyFetch(serviceId: serviceId) else {
            throw ProfileFetcherError.skippingOpportunisticFetch
        }
        return try await fetchProfileUrgently(serviceId: serviceId, context: context, authedAccount: authedAccount)
    }

    private func isRegisteredOrExplicitlyAuthenticated(authedAccount: AuthedAccount) -> Bool {
        switch authedAccount.info {
        case .implicit:
            return tsAccountManager.registrationStateWithMaybeSneakyTransaction.isRegistered
        case .explicit:
            return true
        }
    }

    private func fetchProfileUrgently(
        serviceId: ServiceId,
        context: ProfileFetchContext,
        authedAccount: AuthedAccount,
    ) async throws -> FetchedProfile {
        let result = await Result { try await jobCreator(
            serviceId,
            context.groupId,
            context.mustFetchNewCredential,
            authedAccount,
            context.userProfileWriter,
        ).run() }
        let outcome: FetchResult.Outcome
        do {
            _ = try result.get()
            outcome = .success
        } catch let error as ProfileRequestError {
            outcome = .requestFailure(error)
        } catch where error.isNetworkFailureOrTimeout {
            outcome = .networkFailure
        } catch {
            outcome = .otherFailure
        }
        let now = MonotonicDate()
        if case .failure(ProfileRequestError.rateLimit) = result {
            self.rateLimitExpirationDate = now.adding(5 * .minute)
        }
        self.recentFetchResults[serviceId] = FetchResult(outcome: outcome, completionDate: now)
        return try result.get()
    }

    private func waitIfNecessary() async throws {
        let now = MonotonicDate()

        // We need to throttle these jobs.
        //
        // The profile fetch rate limit is a bucket size of 4320, which refills at
        // a rate of 3 per minute.
        //
        // This class handles the "bulk" profile fetches which are common but not
        // urgent. The app also does other "blocking" profile fetches which are
        // urgent but not common. To help ensure that "blocking" profile fetches
        // succeed, the "bulk" profile fetches are cautious. This takes two forms:
        //
        // * Rate-limiting bulk profiles faster than the service's rate limit.
        // * Backing off aggressively if we hit the rate limit.

        let minimumDelay: TimeInterval
        if let rateLimitExpirationDate, now < rateLimitExpirationDate {
            minimumDelay = 20
        } else {
            minimumDelay = 0.1
        }

        let minimumDate = self.scheduledOpportunisticDate?.adding(minimumDelay)
        self.scheduledOpportunisticDate = [now, minimumDate].compacted().max()!

        if let minimumDate, now < minimumDate {
            try await Task.sleep(nanoseconds: (minimumDate - now).nanoseconds)
        }
    }

    private func shouldOpportunisticallyFetch(serviceId: ServiceId) -> Bool {
        guard let fetchResult = self.recentFetchResults[serviceId] else {
            return true
        }

        let retryDelay: TimeInterval
        switch fetchResult.outcome {
        case .success:
            retryDelay = 5 * .minute
        case .networkFailure:
            retryDelay = 1 * .minute
        case .requestFailure(.notAuthorized):
            retryDelay = 30 * .minute
        case .requestFailure(.notFound):
            retryDelay = 6 * .hour
        case .requestFailure(.rateLimit):
            retryDelay = 5 * .minute
        case .otherFailure:
            retryDelay = 30 * .minute
        }

        return MonotonicDate() > fetchResult.completionDate.adding(retryDelay)
    }

    // MARK: - Waiting

    public func waitForPendingFetches(for serviceId: ServiceId) async throws {
        try await withThrowingTaskGroup(of: Void.self) { taskGroup in
            let cancellableContinuations = self.inProgressFetches.update {
                // There might be multiple profile fetches queued up for a single
                // serviceId. We add a CancellableContinuation for *each* of those because
                // we want to wait for whichever takes the longest.
                return $0[serviceId, default: []].map { fetchState in
                    let result = CancellableContinuation<Void>()
                    fetchState.waiterContinuations.append(result)
                    return result
                }
            }
            for cancellableContinuation in cancellableContinuations {
                taskGroup.addTask { try await cancellableContinuation.wait() }
            }
            try await taskGroup.waitForAll()
        }
    }
}