Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
signalapp
GitHub Repository: signalapp/Signal-iOS
Path: blob/main/SignalServiceKit/Upload/AttachmentUpload.swift
1 views
//
// Copyright 2023 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//

import Foundation
import QuartzCore

extension Upload.Constants {
    fileprivate static let uploadMaxRetries = 8
    fileprivate static let maxUploadProgressRetries = 2
}

public enum AttachmentUpload {

    // MARK: - Upload Entrypoint

    /// The main entry point into the CDN2/CDN3 upload flow.
    /// This method is responsible for prepping the source data and its metadata.
    /// From this point forward,  the upload doesn't have any knowledge of the source (attachment, backup, image, etc)
    public static func start<Metadata: UploadMetadata>(
        attempt: Upload.Attempt<Metadata>,
        dateProvider: @escaping DateProvider,
        sleepTimer: Upload.Shims.SleepTimer,
        progress: OWSProgressSink?,
    ) async throws -> Upload.Result<Metadata> {
        try Task.checkCancellation()

        let progressSource = await progress?.addSource(
            withLabel: "upload",
            unitCount: UInt64(attempt.encryptedDataLength),
        )

        return try await attemptUpload(
            attempt: attempt,
            dateProvider: dateProvider,
            sleepTimer: sleepTimer,
            progress: progressSource,
        )
    }

    /// The retriable parts of the upload.
    /// 1. Create upload endpoint
    /// 2. Get the target URL from the endpoint
    /// 3. Initate the upload via the endpoint
    ///
    /// - Parameters:
    ///   - localMetadata: The metadata and URL path for the local upload data
    ///   will create a new request and fetch a new form.
    ///   - progressBlock: Callback notified up upload progress.
    /// - returns: `Upload.Result` reflecting the metadata of the final upload result.
    ///
    private static func attemptUpload<Metadata: UploadMetadata>(
        attempt: Upload.Attempt<Metadata>,
        dateProvider: @escaping DateProvider,
        sleepTimer: Upload.Shims.SleepTimer,
        progress: OWSProgressSource?,
    ) async throws -> Upload.Result<Metadata> {
        attempt.logger.info("Begin upload. (CDN\(attempt.cdnNumber)) [\(attempt.encryptedDataLength) bytes]")
        try await performResumableUpload(
            attempt: attempt,
            sleepTimer: sleepTimer,
            progress: progress,
        )
        return Upload.Result(
            cdnKey: attempt.cdnKey,
            cdnNumber: attempt.cdnNumber,
            localUploadMetadata: attempt.localMetadata,
            beginTimestamp: attempt.beginTimestamp,
            finishTimestamp: dateProvider().ows_millisecondsSince1970,
        )
    }

    /// Consult the UploadEndpoint to determine how much has already been uploaded.
    private static func getResumableUploadProgress<Metadata: UploadMetadata>(
        attempt: Upload.Attempt<Metadata>,
        count: UInt = 0,
    ) async throws -> Upload.ResumeProgress {
        do {
            return try await attempt.endpoint.getResumableUploadProgress(attempt: attempt)
        } catch {
            guard
                count < Upload.Constants.maxUploadProgressRetries,
                error.isNetworkFailureOrTimeout
            else {
                throw error
            }
            if
                case Upload.Error.uploadFailure(let recoveryMode) = error,
                case .restart = recoveryMode
            {
                throw error
            }
            attempt.logger.info("Retry fetching upload progress.")
            return try await getResumableUploadProgress(attempt: attempt, count: count + 1)
        }
    }

    /// Upload the file using the endpoint and report progress
    private static func performResumableUpload<Metadata: UploadMetadata>(
        attempt: Upload.Attempt<Metadata>,
        sleepTimer: Upload.Shims.SleepTimer,
        count: UInt = 0,
        priorUploadProgress: Upload.ResumeProgress? = nil,
        progress: OWSProgressSource?,
    ) async throws {
        guard count < Upload.Constants.uploadMaxRetries else {
            throw Upload.Error.uploadFailure(recovery: .noMoreRetries)
        }
        let startTime = CACurrentMediaTime()

        let totalDataLength = attempt.encryptedDataLength
        let bytesAlreadyUploaded: Int

        // Only check remote upload progress if we think progress was made locally
        if attempt.isResumedUpload || count > 0 {
            let uploadProgress: Upload.ResumeProgress
            if let priorUploadProgress {
                uploadProgress = priorUploadProgress
            } else {
                uploadProgress = try await getResumableUploadProgress(attempt: attempt)
            }
            switch uploadProgress {
            case .complete:
                attempt.logger.info("Complete upload reported by endpoint.")

                progress?.incrementCompletedUnitCount(by: UInt64(totalDataLength))

                return
            case .uploaded(let updatedBytesAlreadUploaded):
                attempt.logger.info("Endpoint reported \(updatedBytesAlreadUploaded)/\(attempt.encryptedDataLength) uploaded.")
                bytesAlreadyUploaded = updatedBytesAlreadUploaded
                if bytesAlreadyUploaded == totalDataLength {
                    attempt.logger.info("Complete upload reported by endpoint.")
                    progress?.incrementCompletedUnitCount(by: UInt64(totalDataLength))
                    return
                } else if bytesAlreadyUploaded > totalDataLength {
                    attempt.logger.warn("Endpoint reported upload size larger than local size. Marking as failed")
                    throw Upload.Error.uploadFailure(recovery: .restart(.afterBackoff))
                }
            case .restart:
                attempt.logger.warn("Error with fetching progress. Restart upload.")
                throw Upload.Error.uploadFailure(recovery: .restart(.afterBackoff))
            }
        } else {
            bytesAlreadyUploaded = 0
        }

        let internalProgress: OWSProgressSource
        if let progress {
            internalProgress = progress
        } else {
            let internalProgressSink = OWSProgress.createSink { _ in }
            internalProgress = await internalProgressSink.addSource(withLabel: "upload", unitCount: UInt64(totalDataLength))
        }

        // Total progress is (progress from previous attempts/slices +
        // progress from this attempt/slice).
        if internalProgress.completedUnitCount < bytesAlreadyUploaded {
            internalProgress.incrementCompletedUnitCount(by: UInt64(bytesAlreadyUploaded) - internalProgress.completedUnitCount)
        }

        func downloadTimeLogString(_ bytesUploaded: UInt64) -> String {
            let totalTime = CACurrentMediaTime() - startTime
            guard totalTime > 0 else { return "" }

            let bytesDownloaded = bytesUploaded - UInt64(bytesAlreadyUploaded)
            let rate = Double(bytesDownloaded / 1024) / totalTime
            let timeMessage = String(format: "%lld bytes in %.2fs", bytesDownloaded, totalTime)

            if bytesDownloaded > 0 {
                return timeMessage + String(format: " (%.2f KiB/s)", rate)
            } else {
                return timeMessage
            }
        }

        do {
            try await attempt.endpoint.performUpload(
                startPoint: bytesAlreadyUploaded,
                attempt: attempt,
                progress: internalProgress,
            )
            attempt.logger.info("Attachment uploaded successfully. \(bytesAlreadyUploaded) -> \(internalProgress.completedUnitCount) (\(downloadTimeLogString(internalProgress.completedUnitCount))")
        } catch {
            if let statusCode = error.httpStatusCode {
                attempt.logger.warn("Encountered error during upload. (code=\(statusCode)")
            } else {
                attempt.logger.warn("Encountered error during upload. ")
            }

            var failureMode: Upload.FailureMode = .noMoreRetries
            var latestUploadProgress: Upload.ResumeProgress?
            var latestUploadProgressBytes: UInt32 = UInt32(truncatingIfNeeded: internalProgress.completedUnitCount)
            var uploadReportedRemoteProgress = false
            switch error {
            case .partialUpload(let bytesUploaded):
                attempt.logger.info("Endpoint successfully uploaded chunk of \(bytesUploaded) bytes.")
                uploadReportedRemoteProgress = true
                latestUploadProgressBytes += bytesUploaded
                failureMode = .resume(.immediately)
            case .uploadFailure(let retryMode):
                // if a failure mode was passed back
                failureMode = retryMode
            case .networkTimeout:
                // if this isn't an understood error, map into a failure mode
                // fetch the progress to determine if we've made progress.
                latestUploadProgress = try? await getResumableUploadProgress(attempt: attempt)
                switch latestUploadProgress {
                case .complete:
                    latestUploadProgressBytes = totalDataLength
                    failureMode = .resume(.immediately)
                case .restart:
                    latestUploadProgressBytes = 0
                    failureMode = .restart(.afterBackoff)
                case .none:
                    failureMode = .resume(.afterBackoff)
                case .uploaded(let remoteBytesCount):
                    attempt.logger.info("Endpoint reported \(remoteBytesCount)/\(attempt.encryptedDataLength) uploaded.")
                    latestUploadProgressBytes = UInt32(truncatingIfNeeded: remoteBytesCount)
                    if latestUploadProgressBytes > bytesAlreadyUploaded {
                        uploadReportedRemoteProgress = true
                        // The remote endpoint reports progress was made, so retry immediately.
                        failureMode = .resume(.immediately)
                    } else {
                        failureMode = .resume(.afterBackoff)
                    }
                }
            case .networkError:
                failureMode = .resume(.afterBackoff)
            case .missingFile:
                attempt.logger.error("Missing attachment file!")
                failureMode = .noMoreRetries
            case .invalidUploadURL, .unsupportedEndpoint, .unexpectedResponseStatusCode, .unknown:
                // These errors are unrecoverable, so restart the upload in hopes of correcting the issue.
                failureMode = .restart(.afterBackoff)
            }

            if uploadReportedRemoteProgress {
                attempt.logger.info("Upload reported making progress: \(bytesAlreadyUploaded) -> \(latestUploadProgressBytes) (\(downloadTimeLogString(UInt64(latestUploadProgressBytes))))")
            }

            switch failureMode {
            case .noMoreRetries:
                attempt.logger.warn("No more retries.")
                throw error
            case .resume(let recoveryMode):
                switch recoveryMode {
                case .immediately:
                    attempt.logger.warn("Retry upload immediately.")
                case .afterBackoff:
                    let backoff = OWSOperation.retryIntervalForExponentialBackoff(failureCount: count, maxAverageBackoff: 14.1 * .minute)
                    attempt.logger.warn(String(format: "Retry upload after %.3f seconds.", backoff))
                    try await sleepTimer.sleep(for: backoff)
                case .afterServerRequestedDelay(let delay):
                    attempt.logger.warn(String(format: "Retry upload after %.3f seconds.", delay))
                    try await sleepTimer.sleep(for: delay)
                }
            case .restart:
                // Restart is handled at a higher level since the whole
                // upload form needs to be rebuilt.
                throw Upload.Error.uploadFailure(recovery: failureMode)
            }

            attempt.logger.info("Resuming upload.")
            // Reset the attempt count to 1 as long as remote progress was made. Make it 1, since 0
            // will behave like a fresh upload and skip fetching the remote upload progress.
            let nextAttemptCount = uploadReportedRemoteProgress ? 1 : count + 1
            try await performResumableUpload(
                attempt: attempt,
                sleepTimer: sleepTimer,
                count: nextAttemptCount,
                priorUploadProgress: latestUploadProgress,
                progress: progress,
            )
        }
    }

    // MARK: - Helper Methods

    public static func buildAttempt(
        for localMetadata: Upload.LocalUploadMetadata,
        form: Upload.Form,
        existingSessionUrl: URL? = nil,
        signalService: OWSSignalServiceProtocol,
        fileSystem: Upload.Shims.FileSystem,
        dateProvider: @escaping DateProvider,
        logger: PrefixedLogger,
    ) async throws -> Upload.Attempt<Upload.LocalUploadMetadata> {
        return try await buildAttempt(
            for: localMetadata,
            fileUrl: localMetadata.fileUrl,
            encryptedDataLength: localMetadata.encryptedDataLength,
            form: form,
            existingSessionUrl: existingSessionUrl,
            signalService: signalService,
            fileSystem: fileSystem,
            dateProvider: dateProvider,
            logger: logger,
        )
    }

    public static func buildAttempt(
        for metadata: Upload.LinkNSyncUploadMetadata,
        form: Upload.Form,
        existingSessionUrl: URL? = nil,
        signalService: OWSSignalServiceProtocol,
        fileSystem: Upload.Shims.FileSystem,
        dateProvider: @escaping DateProvider,
        logger: PrefixedLogger,
    ) async throws -> Upload.Attempt<Upload.LinkNSyncUploadMetadata> {
        return try await buildAttempt(
            for: metadata,
            fileUrl: metadata.fileUrl,
            encryptedDataLength: metadata.encryptedDataLength,
            form: form,
            existingSessionUrl: existingSessionUrl,
            signalService: signalService,
            fileSystem: fileSystem,
            dateProvider: dateProvider,
            logger: logger,
        )
    }

    public static func buildAttempt(
        for localMetadata: Upload.EncryptedBackupUploadMetadata,
        form: Upload.Form,
        existingSessionUrl: URL? = nil,
        signalService: OWSSignalServiceProtocol,
        fileSystem: Upload.Shims.FileSystem,
        dateProvider: @escaping DateProvider,
        logger: PrefixedLogger,
    ) async throws -> Upload.Attempt<Upload.EncryptedBackupUploadMetadata> {
        return try await buildAttempt(
            for: localMetadata,
            fileUrl: localMetadata.fileUrl,
            encryptedDataLength: localMetadata.encryptedDataLength,
            form: form,
            existingSessionUrl: existingSessionUrl,
            signalService: signalService,
            fileSystem: fileSystem,
            dateProvider: dateProvider,
            logger: logger,
        )
    }

    public static func buildAttempt<Metadata: UploadMetadata>(
        for localMetadata: Metadata,
        fileUrl: URL,
        encryptedDataLength: UInt32,
        form: Upload.Form,
        existingSessionUrl: URL? = nil,
        signalService: OWSSignalServiceProtocol,
        fileSystem: Upload.Shims.FileSystem,
        dateProvider: @escaping DateProvider,
        logger: PrefixedLogger,
    ) async throws -> Upload.Attempt<Metadata> {
        let endpoint: UploadEndpoint = try {
            switch form.cdnNumber {
            case 2:
                return UploadEndpointCDN2(
                    form: form,
                    signalService: signalService,
                    fileSystem: fileSystem,
                    logger: logger,
                )
            case 3:
                return UploadEndpointCDN3(
                    form: form,
                    signalService: signalService,
                    fileSystem: fileSystem,
                    logger: logger,
                )
            default:
                throw OWSAssertionError("Unsupported Endpoint: \(form.cdnNumber)")
            }
        }()
        let uploadLocation = try await {
            if let existingSessionUrl {
                return existingSessionUrl
            }
            return try await endpoint.fetchResumableUploadLocation()
        }()
        return Upload.Attempt(
            cdnKey: form.cdnKey,
            cdnNumber: form.cdnNumber,
            fileUrl: fileUrl,
            encryptedDataLength: encryptedDataLength,
            localMetadata: localMetadata,
            beginTimestamp: dateProvider().ows_millisecondsSince1970,
            endpoint: endpoint,
            uploadLocation: uploadLocation,
            isResumedUpload: existingSessionUrl != nil,
            logger: logger,
        )
    }
}