Path: blob/main/Signal/Backups/BackupAttachmentUploadTracker.swift
1 views
//
// Copyright 2025 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//
import SignalServiceKit
import SwiftUI
/// Manages async streams of `UploadUpdate`s, which represent the state and
/// progress of Backup Attachment uploads.
///
/// - SeeAlso `BackupAttachmentUploadQueueStatusManager`
/// - SeeAlso `BackupAttachmentUploadProgress`
///
/// - SeeAlso ``BackupAttachmentDownloadTracker``
final class BackupAttachmentUploadTracker {
struct UploadUpdate: Equatable {
enum State {
case uploading
/// - Note
/// We only include "attachments present locally when Backups was
/// enabled" in upload progress. So, we may have a non-empty "upload
/// queue" but have no uploads we need to track progress for; hence
/// the cagey name.
/// - SeeAlso `BackupAttachmentUploadProgress`
case noUploadsToReport
case suspended
case notRegisteredAndReady
case pausedLowBattery
case pausedLowPowerMode
case pausedNeedsWifi
case pausedNeedsInternet
case hasConsumedMediaTierCapacity
}
let state: State
var bytesUploaded: UInt64 { progress.completedUnitCount }
var totalBytesToUpload: UInt64 { progress.totalUnitCount }
var percentageUploaded: Float { progress.percentComplete }
private let progress: OWSProgress
init(state: State, bytesUploaded: UInt64, totalBytesToUpload: UInt64) {
self.init(state: state, progress: OWSProgress(
completedUnitCount: bytesUploaded,
totalUnitCount: totalBytesToUpload,
))
}
fileprivate init(state: State, progress: OWSProgress) {
self.state = state
self.progress = progress
}
static func ==(lhs: UploadUpdate, rhs: UploadUpdate) -> Bool {
return lhs.state == rhs.state
&& lhs.bytesUploaded == rhs.bytesUploaded
&& lhs.totalBytesToUpload == rhs.totalBytesToUpload
}
}
private let backupAttachmentUploadQueueStatusManager: BackupAttachmentUploadQueueStatusManager
private let backupAttachmentUploadProgress: BackupAttachmentUploadProgress
init(
backupAttachmentUploadQueueStatusManager: BackupAttachmentUploadQueueStatusManager,
backupAttachmentUploadProgress: BackupAttachmentUploadProgress,
) {
self.backupAttachmentUploadQueueStatusManager = backupAttachmentUploadQueueStatusManager
self.backupAttachmentUploadProgress = backupAttachmentUploadProgress
}
func updates() -> AsyncStream<UploadUpdate> {
return AsyncStream { continuation in
let tracker = Tracker(
backupAttachmentUploadQueueStatusManager: backupAttachmentUploadQueueStatusManager,
backupAttachmentUploadProgress: backupAttachmentUploadProgress,
continuation: continuation,
)
tracker.start()
continuation.onTermination = { reason in
switch reason {
case .cancelled:
tracker.stop()
case .finished:
owsFailDebug("How did we finish? We should've canceled first.")
@unknown default:
owsFailDebug("Unexpected continuation termination reason: \(reason)")
tracker.stop()
}
}
}
}
}
// MARK: -
private class Tracker {
typealias UploadUpdate = BackupAttachmentUploadTracker.UploadUpdate
private struct State {
var lastReportedUploadProgress: OWSProgress?
var lastReportedUploadQueueStatus: BackupAttachmentUploadQueueStatus?
var uploadQueueStatusObserver: NotificationCenter.Observer?
var uploadProgressObserver: BackupAttachmentUploadProgress.Observer?
let streamContinuation: AsyncStream<UploadUpdate>.Continuation
}
private let backupAttachmentUploadQueueStatusManager: BackupAttachmentUploadQueueStatusManager
private let backupAttachmentUploadProgress: BackupAttachmentUploadProgress
private let state: SeriallyAccessedState<State>
init(
backupAttachmentUploadQueueStatusManager: BackupAttachmentUploadQueueStatusManager,
backupAttachmentUploadProgress: BackupAttachmentUploadProgress,
continuation: AsyncStream<UploadUpdate>.Continuation,
) {
self.backupAttachmentUploadQueueStatusManager = backupAttachmentUploadQueueStatusManager
self.backupAttachmentUploadProgress = backupAttachmentUploadProgress
self.state = SeriallyAccessedState(State(
streamContinuation: continuation,
))
}
func start() {
state.enqueueUpdate { @MainActor [self] _state in
_state.uploadQueueStatusObserver = observeUploadQueueStatus()
}
}
func stop() {
state.enqueueUpdate { [self] _state in
if let uploadQueueStatusObserver = _state.uploadQueueStatusObserver {
NotificationCenter.default.removeObserver(uploadQueueStatusObserver)
}
if let uploadProgressObserver = _state.uploadProgressObserver {
await backupAttachmentUploadProgress.removeObserver(uploadProgressObserver)
}
_state.streamContinuation.finish()
}
}
// MARK: -
@MainActor
private func observeUploadQueueStatus() -> NotificationCenter.Observer {
// We only care about fullsize uploads, ignore thumbnails
let uploadQueueStatusObserver = NotificationCenter.default.addObserver(
name: .backupAttachmentUploadQueueStatusDidChange(for: .fullsize),
) { [weak self] notification in
guard let self else { return }
handleQueueStatusUpdate(
backupAttachmentUploadQueueStatusManager.currentStatus(for: .fullsize),
)
}
// Now that we're observing updates, handle the initial value as if we'd
// just gotten it in an update.
handleQueueStatusUpdate(
backupAttachmentUploadQueueStatusManager.beginObservingIfNecessary(for: .fullsize),
)
return uploadQueueStatusObserver
}
private func handleQueueStatusUpdate(
_ queueStatus: BackupAttachmentUploadQueueStatus,
) {
state.enqueueUpdate { [self] _state in
_state.lastReportedUploadQueueStatus = queueStatus
switch queueStatus {
case .empty:
yieldCurrentUploadUpdate(state: _state)
case
.running,
.noWifiReachability, .lowBattery, .lowPowerMode, .noReachability,
.notRegisteredAndReady, .appBackgrounded, .suspended, .hasConsumedMediaTierCapacity:
// The queue isn't empty, so attach a new progress observer.
//
// Progress observers snapshot and filter the queue's state, so
// any time the queue is non-empty we want to make sure we have
// an observer with a filtered-snapshot of the latest state.
//
// For example, when we first enable paid-tier Backups the queue
// starts empty and is populated when we run list-media for the
// first time.
//
// The observer we attach will yield an update, so we don't need
// to here.
if let existingObserver = _state.uploadProgressObserver {
await backupAttachmentUploadProgress.removeObserver(existingObserver)
}
_state.uploadProgressObserver = try? await backupAttachmentUploadProgress
.addObserver { [weak self] progressUpdate in
guard let self else { return }
handleUploadProgressUpdate(progressUpdate)
}
}
}
}
private func handleUploadProgressUpdate(_ uploadProgress: OWSProgress) {
state.enqueueUpdate { [self] _state in
_state.lastReportedUploadProgress = uploadProgress
yieldCurrentUploadUpdate(state: _state)
}
}
// MARK: -
private func yieldCurrentUploadUpdate(state: State) {
let streamContinuation = state.streamContinuation
guard
let lastReportedUploadProgress = state.lastReportedUploadProgress,
let lastReportedUploadQueueStatus = state.lastReportedUploadQueueStatus
else {
return
}
guard lastReportedUploadProgress.totalUnitCount > 0 else {
// If our "total bytes" to upload is zero, then regardless of the
// queue status we have nothing to report.
streamContinuation.yield(UploadUpdate(
state: .noUploadsToReport,
progress: lastReportedUploadProgress,
))
return
}
let uploadUpdateState: UploadUpdate.State
switch lastReportedUploadQueueStatus {
case .appBackgrounded:
// Don't emit an update when the app is backgrounded, so callers are
// left with the last update before backgrounding.
return
case .running:
uploadUpdateState = .uploading
case .suspended:
uploadUpdateState = .suspended
case .empty:
uploadUpdateState = .noUploadsToReport
case .notRegisteredAndReady:
uploadUpdateState = .notRegisteredAndReady
case .noReachability:
uploadUpdateState = .pausedNeedsInternet
case .noWifiReachability:
uploadUpdateState = .pausedNeedsWifi
case .lowBattery:
uploadUpdateState = .pausedLowBattery
case .lowPowerMode:
uploadUpdateState = .pausedLowPowerMode
case .hasConsumedMediaTierCapacity:
uploadUpdateState = .hasConsumedMediaTierCapacity
}
streamContinuation.yield(UploadUpdate(
state: uploadUpdateState,
progress: lastReportedUploadProgress,
))
}
}