Path: blob/main/SignalServiceKit/Upload/AttachmentUploadManager.swift
1 views
//
// Copyright 2023 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//
import Foundation
public import LibSignalClient
public protocol AttachmentUploadManager {
/// Upload a transient backup file that isn't an attachment (not saved to the database or sent).
func uploadBackup(
localUploadMetadata: Upload.EncryptedBackupUploadMetadata,
form: Upload.Form,
progress: OWSProgressSink?,
) async throws -> Upload.Result<Upload.EncryptedBackupUploadMetadata>
/// Upload a transient attachment that isn't saved to the database for sending.
func uploadTransientAttachment(
dataSource: DataSourcePath,
progress: OWSProgressSink?,
) async throws -> Upload.Result<Upload.LocalUploadMetadata>
/// Upload a transient link'n'sync attachment that isn't saved to the database for sending.
func uploadLinkNSyncAttachment(
dataSource: DataSourcePath,
progress: OWSProgressSink?,
) async throws -> Upload.Result<Upload.LinkNSyncUploadMetadata>
/// Upload an Attachment to the given endpoint.
/// Will fail if the attachment doesn't exist or isn't available locally.
func uploadTransitTierAttachment(
attachmentId: Attachment.IDType,
progress: OWSProgressSink?,
) async throws
/// Upload an attachment to the media tier (uploading to the transit tier if needed and copying to the media tier).
/// Will fail if the attachment doesn't exist or isn't available locally.
func uploadMediaTierAttachment(
attachmentId: Attachment.IDType,
uploadEra: String,
localAci: Aci,
backupKey: MediaRootBackupKey,
auth: BackupServiceAuth,
progress: OWSProgressSink?,
) async throws
/// Upload an attachment's thumbnail to the media tier (uploading to the transit tier and copying to the media tier).
/// Will fail if the attachment doesn't exist or isn't available locally.
func uploadMediaTierThumbnailAttachment(
attachmentId: Attachment.IDType,
uploadEra: String,
localAci: Aci,
backupKey: MediaRootBackupKey,
auth: BackupServiceAuth,
progress: OWSProgressSink?,
) async throws
}
public actor AttachmentUploadManagerImpl: AttachmentUploadManager {
private let accountKeyStore: AccountKeyStore
private let attachmentEncrypter: Upload.Shims.AttachmentEncrypter
private let attachmentStore: AttachmentStore
private let attachmentUploadStore: AttachmentUploadStore
private let attachmentThumbnailService: AttachmentThumbnailService
private let backupRequestManager: BackupRequestManager
private let chatConnectionManager: ChatConnectionManager
private let dateProvider: DateProvider
private let db: any DB
private let fileSystem: Upload.Shims.FileSystem
private let interactionStore: InteractionStore
private let remoteConfigProvider: any RemoteConfigProvider
private let signalService: OWSSignalServiceProtocol
private let sleepTimer: Upload.Shims.SleepTimer
private let storyStore: StoryStore
private struct ActiveUploadKey: Hashable {
let attachmentId: Attachment.IDType
let isThumbnailUpload: Bool
init(attachmentId: Attachment.IDType, uploadType: UploadType) {
self.attachmentId = attachmentId
switch uploadType {
case .transitTier:
self.isThumbnailUpload = false
case .mediaTier(_, let isThumbnail):
self.isThumbnailUpload = isThumbnail
}
}
}
// Map of active upload tasks.
private var activeUploads = [ActiveUploadKey: Task<(AttachmentUploadRecord, Upload.AttachmentResult), Error>]()
private enum UploadType {
case transitTier
case mediaTier(auth: BackupServiceAuth, isThumbnail: Bool)
var sourceType: AttachmentUploadRecord.SourceType {
switch self {
case .transitTier:
return .transit
case .mediaTier(_, let isThumbnail):
return isThumbnail ? .thumbnail : .media
}
}
}
public init(
accountKeyStore: AccountKeyStore,
attachmentEncrypter: Upload.Shims.AttachmentEncrypter,
attachmentStore: AttachmentStore,
attachmentUploadStore: AttachmentUploadStore,
attachmentThumbnailService: AttachmentThumbnailService,
backupRequestManager: BackupRequestManager,
chatConnectionManager: ChatConnectionManager,
dateProvider: @escaping DateProvider,
db: any DB,
fileSystem: Upload.Shims.FileSystem,
interactionStore: InteractionStore,
remoteConfigProvider: any RemoteConfigProvider,
signalService: OWSSignalServiceProtocol,
sleepTimer: Upload.Shims.SleepTimer,
storyStore: StoryStore,
) {
self.accountKeyStore = accountKeyStore
self.attachmentEncrypter = attachmentEncrypter
self.attachmentStore = attachmentStore
self.attachmentUploadStore = attachmentUploadStore
self.attachmentThumbnailService = attachmentThumbnailService
self.backupRequestManager = backupRequestManager
self.chatConnectionManager = chatConnectionManager
self.dateProvider = dateProvider
self.db = db
self.fileSystem = fileSystem
self.interactionStore = interactionStore
self.remoteConfigProvider = remoteConfigProvider
self.signalService = signalService
self.sleepTimer = sleepTimer
self.storyStore = storyStore
}
public func uploadBackup(
localUploadMetadata: Upload.EncryptedBackupUploadMetadata,
form: Upload.Form,
progress: OWSProgressSink?,
) async throws -> Upload.Result<Upload.EncryptedBackupUploadMetadata> {
let logger = PrefixedLogger(prefix: "[Upload]", suffix: "[backup]")
do {
let attempt = try await AttachmentUpload.buildAttempt(
for: localUploadMetadata,
form: form,
signalService: signalService,
fileSystem: fileSystem,
dateProvider: dateProvider,
logger: logger,
)
return try await AttachmentUpload.start(
attempt: attempt,
dateProvider: dateProvider,
sleepTimer: sleepTimer,
progress: progress,
)
} catch {
if error.isNetworkFailureOrTimeout {
logger.warn("Upload failed due to network error")
} else if error is CancellationError {
logger.warn("Upload cancelled")
} else {
if let statusCode = error.httpStatusCode {
logger.warn("Unexpected upload error [status: \(statusCode)]")
} else {
logger.warn("Unexpected upload error")
}
}
throw error
}
}
public func uploadTransientAttachment(
dataSource: DataSourcePath,
progress: OWSProgressSink?,
) async throws -> Upload.Result<Upload.LocalUploadMetadata> {
let logger = PrefixedLogger(prefix: "[Upload]", suffix: "[transient]")
let temporaryFile = fileSystem.temporaryFileUrl()
let sourceURL = dataSource.fileUrl
let metadata = try attachmentEncrypter.encryptAttachment(at: sourceURL, output: temporaryFile)
let localMetadata = try Upload.LocalUploadMetadata.validateAndBuild(fileUrl: temporaryFile, metadata: metadata)
let form = try await chatConnectionManager.withAuthService(.attachments) {
try await $0.getUploadForm(uploadSize: UInt64(localMetadata.encryptedDataLength)).asUploadForm()
}
do {
// We don't show progress for transient uploads
let attempt = try await AttachmentUpload.buildAttempt(
for: localMetadata,
form: form,
signalService: signalService,
fileSystem: fileSystem,
dateProvider: dateProvider,
logger: logger,
)
return try await AttachmentUpload.start(
attempt: attempt,
dateProvider: dateProvider,
sleepTimer: sleepTimer,
progress: nil,
)
} catch {
if error.isNetworkFailureOrTimeout {
logger.warn("Upload failed due to network error")
} else if error is CancellationError {
logger.warn("Upload cancelled")
} else {
if let statusCode = error.httpStatusCode {
logger.warn("Unexpected upload error [status: \(statusCode)]")
} else {
logger.warn("Unexpected upload error")
}
}
throw error
}
}
public func uploadLinkNSyncAttachment(
dataSource: DataSourcePath,
progress: OWSProgressSink?,
) async throws -> Upload.Result<Upload.LinkNSyncUploadMetadata> {
let logger = PrefixedLogger(prefix: "[Upload]", suffix: "[link'n'sync]")
let sourceURL = dataSource.fileUrl
guard let fileSize = try? dataSource.readLength(), fileSize > 0, let fileSize = UInt32(exactly: fileSize) else {
throw OWSAssertionError("invalid link n sync attachment size")
}
let metadata = Upload.LinkNSyncUploadMetadata(fileUrl: sourceURL, encryptedDataLength: fileSize)
let form = try await chatConnectionManager.withAuthService(.attachments) {
try await $0.getUploadForm(uploadSize: UInt64(metadata.encryptedDataLength)).asUploadForm()
}
do {
// We don't show progress for transient uploads
let attempt = try await AttachmentUpload.buildAttempt(
for: metadata,
form: form,
signalService: signalService,
fileSystem: fileSystem,
dateProvider: dateProvider,
logger: logger,
)
return try await AttachmentUpload.start(
attempt: attempt,
dateProvider: dateProvider,
sleepTimer: sleepTimer,
progress: progress,
)
} catch {
if error.isNetworkFailureOrTimeout {
logger.warn("Upload failed due to network error")
} else if error is CancellationError {
logger.warn("Upload cancelled")
} else {
if let statusCode = error.httpStatusCode {
logger.warn("Unexpected upload error [status: \(statusCode)]")
} else {
logger.warn("Unexpected upload error")
}
}
throw error
}
}
public func uploadTransitTierAttachment(
attachmentId: Attachment.IDType,
progress: OWSProgressSink?,
) async throws {
let logger = PrefixedLogger(prefix: "[Upload]", suffix: "[\(attachmentId)]")
let encryptedByteCount = db.read { tx in
return attachmentStore.fetch(id: attachmentId, tx: tx)?.streamInfo?.encryptedByteCount
} ?? 0
let progressSource = await progress?.addSource(
withLabel: "upload",
unitCount: UInt64(encryptedByteCount),
)
let wrappedProgress: OWSProgressSink = OWSProgress.createSink { [weak self] progressValue in
self?.updateProgress(id: attachmentId, progress: progressValue.percentComplete)
if let progressSource, progressSource.completedUnitCount < progressValue.completedUnitCount {
progressSource.incrementCompletedUnitCount(
by: progressValue.completedUnitCount - progressSource.completedUnitCount,
)
}
}
let (record, result) = try await uploadAttachment(
attachmentId: attachmentId,
type: .transitTier,
logger: logger,
progress: wrappedProgress,
)
// Update the attachment and associated messages with the success
// and clean up and left over upload state
await db.awaitableWrite { tx in
// Read the attachment fresh from the DB
guard
let attachmentStream = try? self.fetchAttachment(
attachmentId: attachmentId,
logger: logger,
tx: tx,
).asStream()
else {
logger.warn("Attachment deleted while uploading")
return
}
self.updateTransitTier(
attachmentStream: attachmentStream,
with: result,
logger: logger,
tx: tx,
)
self.cleanup(record: record, logger: logger, tx: tx)
}
}
public func uploadMediaTierAttachment(
attachmentId: Attachment.IDType,
uploadEra: String,
localAci: Aci,
backupKey: MediaRootBackupKey,
auth: BackupServiceAuth,
progress: OWSProgressSink?,
) async throws {
let logger = PrefixedLogger(prefix: "[MediaTierUpload]", suffix: "[\(attachmentId)]")
let (record, uploadResult) = try await uploadAttachment(
attachmentId: attachmentId,
type: .mediaTier(auth: auth, isThumbnail: false),
logger: logger,
progress: progress,
)
// Read the attachment fresh from the DB
guard
let attachmentStream = try? db.read(block: { try self.fetchAttachment(
attachmentId: attachmentId,
logger: logger,
tx: $0,
) }).asStream(),
let mediaName = attachmentStream.attachment.mediaName
else {
logger.warn("Attachment deleted while uploading")
return
}
let cdnNumber: UInt32
do {
cdnNumber = try await self.copyToMediaTier(
backupKey: backupKey,
auth: auth,
mediaName: mediaName,
uploadEra: uploadEra,
result: uploadResult,
logger: logger,
)
} catch let error as BackupArchive.Response.CopyToMediaTierError {
switch error {
case .sourceObjectNotFound, .badArgument:
let attachmentFileUrl = AttachmentStream.absoluteAttachmentFileURL(
relativeFilePath: attachmentStream.localRelativeFilePath,
)
let fileMissingOrEmpty: Bool
do {
fileMissingOrEmpty = try OWSFileSystem.fileSize(of: attachmentFileUrl) == 0
} catch {
fileMissingOrEmpty = true
}
await db.awaitableWrite { tx in
// Clean up the upload record; if we failed to copy
// we want to start an upload fresh next time.
self.cleanup(record: record, logger: logger, tx: tx)
guard let attachment = attachmentStore.fetch(id: attachmentStream.id, tx: tx) else {
return
}
if fileMissingOrEmpty {
logger.error("Missing attachment file!")
attachmentStore.markOffloaded(
attachment: attachment,
localRelativeFilePathThumbnail: nil,
tx: tx,
)
}
if
uploadResult.localUploadMetadata.isReusedTransitTierUpload,
let transitTierInfo = attachment.latestTransitTierInfo
{
// We reused a transit tier upload but the source couldn't be found.
// That transit tier upload is now invalid.
attachmentStore.removeTransitTierInfo(
transitTierInfo,
attachment: attachment,
tx: tx,
)
}
}
throw error
default:
throw error
}
} catch {
throw error
}
await db.awaitableWrite { tx in
// Refetch the attachment to ensure other fields are up-to-date.
guard let attachmentStream = attachmentStore.fetch(id: attachmentStream.id, tx: tx)?.asStream() else {
return
}
let mediaTierInfo = Attachment.MediaTierInfo(
cdnNumber: cdnNumber,
unencryptedByteCount: uploadResult.localUploadMetadata.plaintextDataLength,
sha256ContentHash: attachmentStream.sha256ContentHash,
// TODO: [Attachment Streaming] support incremental mac
incrementalMacInfo: nil,
uploadEra: uploadEra,
lastDownloadAttemptTimestamp: nil,
)
attachmentStore.saveMediaTierInfo(
attachment: attachmentStream.attachment,
mediaTierInfo: mediaTierInfo,
mediaName: attachmentStream.info.mediaName,
tx: tx,
)
// To upload to media tier, we also upload to transit tier and then perform a copy.
// We can save the transit tier info from that upload to the attachment, in some cases.
let shouldUpdateTransitTierInfo: Bool = {
guard let oldTransitTierInfo = attachmentStream.attachment.latestTransitTierInfo else {
// First case: we had no transit tier info; something is better than nothing.
return true
}
let nowMs = dateProvider().ows_millisecondsSince1970
if
nowMs > oldTransitTierInfo.uploadTimestamp,
nowMs - oldTransitTierInfo.uploadTimestamp > remoteConfigProvider.currentConfig().messageQueueTimeMs
{
// Second case: the transit tier info is old and expired.
return true
}
if
nowMs > oldTransitTierInfo.uploadTimestamp,
nowMs - oldTransitTierInfo.uploadTimestamp > UInt64(Upload.Constants.uploadReuseWindow * Double(MSEC_PER_SEC)),
oldTransitTierInfo.encryptionKey != attachmentStream.attachment.encryptionKey
{
// Third case: the transit tier info isn't expired, but exceeded its reuse window
// anyway so it wasn't going to be used for forwarding. May as well replace it
// with an upload that can be used for _something_.
return true
}
// Note: if the old transit tier info has been within the reuse window
// _and_ used the same encryption key, we wouldn't have reuploaded for
// this media tier copy so "oldTransitTierInfo" would be the very one
// we reused to create "uploadResult" without uploading to begin with.
return false
}()
if shouldUpdateTransitTierInfo {
let transitTierInfo = Attachment.TransitTierInfo(
cdnNumber: uploadResult.cdnNumber,
cdnKey: uploadResult.cdnKey,
uploadTimestamp: uploadResult.beginTimestamp,
encryptionKey: uploadResult.localUploadMetadata.key,
unencryptedByteCount: uploadResult.localUploadMetadata.plaintextDataLength,
// ALWAYS use digest for integrity check for uploaded attachments;
// we only allow sending using a digest integrity check not a plaintext hash
// so prefer digest if we have both. If we don't, this attachment we just
// uploaded will fail to send.
integrityCheck: .digestSHA256Ciphertext(uploadResult.localUploadMetadata.digest),
// TODO: [Attachment Streaming] support incremental mac
incrementalMacInfo: nil,
lastDownloadAttemptTimestamp: nil,
)
attachmentStore.saveLatestTransitTierInfo(
attachmentStream: attachmentStream,
transitTierInfo: transitTierInfo,
tx: tx,
)
}
self.cleanup(record: record, logger: logger, tx: tx)
}
}
public func uploadMediaTierThumbnailAttachment(
attachmentId: Attachment.IDType,
uploadEra: String,
localAci: Aci,
backupKey: MediaRootBackupKey,
auth: BackupServiceAuth,
progress: OWSProgressSink?,
) async throws {
let logger = PrefixedLogger(prefix: "[MediaTierThumbnailUpload]", suffix: "[\(attachmentId)]")
let (record, result) = try await uploadAttachment(
attachmentId: attachmentId,
type: .mediaTier(auth: auth, isThumbnail: true),
logger: logger,
progress: progress,
)
// Read the attachment fresh from the DB
guard
let attachmentStream = try? db.read(block: { try self.fetchAttachment(
attachmentId: attachmentId,
logger: logger,
tx: $0,
) }).asStream(),
let mediaName = attachmentStream.attachment.mediaName
else {
logger.warn("Attachment deleted while uploading")
return
}
let cdnNumber: UInt32
do {
cdnNumber = try await self.copyToMediaTier(
backupKey: backupKey,
auth: auth,
mediaName: AttachmentBackupThumbnail.thumbnailMediaName(fullsizeMediaName: mediaName),
uploadEra: uploadEra,
result: result,
logger: logger,
)
} catch let error as BackupArchive.Response.CopyToMediaTierError {
switch error {
case .sourceObjectNotFound:
await db.awaitableWrite { tx in
// Clean up the upload record; if we failed to copy
// we want to start an upload fresh.
self.cleanup(record: record, logger: logger, tx: tx)
}
throw error
default:
throw error
}
} catch {
throw error
}
await db.awaitableWrite { tx in
// Refetch the attachment to ensure other fields are up-to-date.
guard let attachmentStream = attachmentStore.fetch(id: attachmentStream.id, tx: tx)?.asStream() else {
return
}
let thumbnailInfo = Attachment.ThumbnailMediaTierInfo(
cdnNumber: cdnNumber,
uploadEra: uploadEra,
lastDownloadAttemptTimestamp: nil,
)
attachmentStore.saveMediaTierThumbnailInfo(
attachment: attachmentStream.attachment,
thumbnailMediaTierInfo: thumbnailInfo,
mediaName: attachmentStream.info.mediaName,
tx: tx,
)
self.cleanup(record: record, logger: logger, tx: tx)
}
}
/// Entry point for uploading an `AttachmentStream`
/// Fetches the `AttachmentStream`, fetches an upload form, builds the AttachmentUpload, begins the
/// upload, and updates the `AttachmentStream` upon success.
///
/// It is assumed any errors that could be retried or otherwise handled will have happend at a lower level,
/// so any error encountered here is considered unrecoverable and thrown to the caller.
///
/// Resumption of an active upload can be handled at a lower level, but if the endpoint returns an
/// error that requires a full restart, this is the method that will be called to fetch a new upload form and
/// rebuild the endpoint and upload state before trying again.
private func uploadAttachment(
attachmentId: Attachment.IDType,
type: UploadType,
logger: PrefixedLogger,
progress: OWSProgressSink?,
) async throws -> (record: AttachmentUploadRecord, result: Upload.AttachmentResult) {
let activeUploadKey = ActiveUploadKey(attachmentId: attachmentId, uploadType: type)
if let activeUpload = activeUploads[activeUploadKey] {
// If this fails, it means the internal retry logic has given up, so don't
// attempt any retries here
do {
return try await activeUpload.value
} catch {
return try await uploadAttachment(
attachmentId: attachmentId,
type: type,
logger: logger,
progress: progress,
)
}
}
let attachment = try db.read(block: { tx in
try fetchAttachment(attachmentId: attachmentId, logger: logger, tx: tx)
})
let uploadTask = Task {
defer {
// Clear out the active upload task once it finishes running.
activeUploads[activeUploadKey] = nil
}
// This task will only fail if a non-recoverable error is encountered, or the
// max number of retries is exhausted.
return try await self.upload(
attachment: attachment,
type: type,
logger: logger,
progress: progress,
)
}
// Add the active task to allow any additional scheduled uploads to reuse the same upload
activeUploads[activeUploadKey] = uploadTask
return try await uploadTask.value
}
private func upload(
attachment: Attachment,
type: UploadType,
logger: PrefixedLogger,
progress: OWSProgressSink?,
) async throws -> (AttachmentUploadRecord, Upload.AttachmentResult) {
let attachmentId = attachment.id
var updateRecord = false
var cleanupMetadata = false
// Fetch the record if it exists, or create a new one.
// Note this record isn't persisted in this method, so it will need to be saved later.
var attachmentUploadRecord = fetchOrCreateAttachmentRecord(
for: attachmentId,
sourceType: type.sourceType,
db: db,
)
// Fetch or build the LocalUploadMetadata.
// See `Attachment.transitUploadStrategy(dateProvider:)` for details on when metadata
// is reused vs. constructed new.
let localMetadata: Upload.LocalUploadMetadata
switch try await getOrFetchUploadMetadata(
attachment: attachment,
type: type,
record: attachmentUploadRecord,
logger: logger,
) {
case .existing(let metadata), .reuse(let metadata):
// Cached metadata is still good to use
localMetadata = metadata
if metadata.key != attachmentUploadRecord.localMetadata?.key {
// If we're using a different key, reset the metadata
// and start with a fresh upload form.
updateRecord = true
attachmentUploadRecord.localMetadata = metadata
attachmentUploadRecord.uploadForm = nil
attachmentUploadRecord.uploadFormTimestamp = nil
attachmentUploadRecord.uploadSessionUrl = nil
}
case .new(let metadata):
localMetadata = metadata
updateRecord = true
cleanupMetadata = true
// New metadata was constructed, so clear out the stale upload form.
attachmentUploadRecord.localMetadata = metadata
attachmentUploadRecord.uploadForm = nil
attachmentUploadRecord.uploadFormTimestamp = nil
attachmentUploadRecord.uploadSessionUrl = nil
case .alreadyUploaded(let metadata):
// No need to upload - Cleanup the upload record and return
return (
attachmentUploadRecord,
Upload.AttachmentResult(
cdnKey: metadata.cdnKey,
cdnNumber: metadata.cdnNumber,
localUploadMetadata: metadata,
beginTimestamp: dateProvider().ows_millisecondsSince1970,
finishTimestamp: dateProvider().ows_millisecondsSince1970,
),
)
}
/// Check for a cached upload form
/// This can be up to ~7 days old from the point of upload starting. Just to avoid running into any fuzzieness around the 7 day expiration, expire the form after 6 days
/// If the upload hasn't started, the form shouldn't be cached
let uploadForm: Upload.Form
if
let form = attachmentUploadRecord.uploadForm,
let formTimestamp = attachmentUploadRecord.uploadFormTimestamp,
// And we are still in the window to reuse it
dateProvider().timeIntervalSince(
Date(millisecondsSince1970: formTimestamp),
) <= Upload.Constants.uploadFormReuseWindow
{
uploadForm = form
} else {
updateRecord = true
switch type {
case .transitTier:
uploadForm = try await chatConnectionManager.withAuthService(.attachments) {
try await $0.getUploadForm(uploadSize: UInt64(localMetadata.encryptedDataLength)).asUploadForm()
}
case .mediaTier(let auth, _):
uploadForm = try await self.backupRequestManager
.fetchBackupMediaAttachmentUploadForm(
encryptedByteLength: localMetadata.encryptedDataLength,
auth: auth,
logger: logger,
)
}
attachmentUploadRecord.uploadForm = uploadForm
attachmentUploadRecord.uploadFormTimestamp = Date().ows_millisecondsSince1970
attachmentUploadRecord.uploadSessionUrl = nil
}
do {
let attempt = try await AttachmentUpload.buildAttempt(
for: localMetadata,
form: uploadForm,
existingSessionUrl: attachmentUploadRecord.uploadSessionUrl,
signalService: self.signalService,
fileSystem: self.fileSystem,
dateProvider: self.dateProvider,
logger: logger,
)
// The upload record has modified the metadata, upload form,
// or upload session URL, so persist it before beginning the upload.
if updateRecord || attachmentUploadRecord.uploadSessionUrl == nil {
await db.awaitableWrite { tx in
attachmentUploadRecord.uploadSessionUrl = attempt.uploadLocation
attachmentUploadStore.upsert(record: attachmentUploadRecord, tx: tx)
}
}
let result = try await AttachmentUpload.start(
attempt: attempt,
dateProvider: self.dateProvider,
sleepTimer: sleepTimer,
progress: progress,
)
// On success, cleanup the temp file. Temp files are only created for
// new local metadata, otherwise the existing attachment file location is used.
// TODO: Tie this in with OrphanedAttachmentRecord to track this
if cleanupMetadata {
if
localMetadata.fileUrl == (attachment.streamInfo?.localRelativeFilePath).map({
AttachmentStream.absoluteAttachmentFileURL(relativeFilePath: $0)
})
{
owsFailDebug("Attempting to delete attachment file!")
} else {
do {
try fileSystem.deleteFile(url: localMetadata.fileUrl)
} catch {
owsFailDebug("Error: \(error)")
}
}
}
return (attachmentUploadRecord, result.asAttachmentResult)
} catch {
// If the max number of upload failures was hit, give up and throw an error
if attachmentUploadRecord.attempt >= Upload.Constants.maxUploadAttempts {
await db.awaitableWrite { tx in
self.cleanup(record: attachmentUploadRecord, logger: logger, tx: tx)
}
throw error
}
// If an uploadFailure has percolated up to this layer, it means AttachmentUpload
// has failed in it's retries. Usually this means something with the form or
// metadata is in error or expired, so clear everything out and try again.
if case Upload.Error.uploadFailure = error {
// Only bump the attempt count if the upload failed. Don't bump for things
// like network issues
attachmentUploadRecord.attempt += 1
// If the error has made it here, something was encountered during upload that requires
// a full restart of the upload.
// This means at least throwing away the upload form, and just to be sure,
// throw away the local metadata as well.
attachmentUploadRecord.localMetadata = nil
attachmentUploadRecord.uploadForm = nil
attachmentUploadRecord.uploadSessionUrl = nil
await db.awaitableWrite { tx in
attachmentUploadStore.upsert(record: attachmentUploadRecord, tx: tx)
}
return try await upload(
attachment: attachment,
type: type,
logger: logger,
progress: progress,
)
} else if case Upload.Error.missingFile = error {
await db.awaitableWrite { tx in
if
let attachmentRelFilePath = attachment.streamInfo?.localRelativeFilePath,
// If the missing file matches the url of the primary attachment file,
// mark the whole attachment as not having a file anymore.
localMetadata.fileUrl == AttachmentStream.absoluteAttachmentFileURL(relativeFilePath: attachmentRelFilePath),
let attachment = attachmentStore.fetch(id: attachmentId, tx: tx)
{
logger.error("Primary attachment file missing!")
attachmentStore.markOffloaded(
attachment: attachment,
localRelativeFilePathThumbnail: nil,
tx: tx,
)
}
// Delete the upload record; whatever the state was we need to start over next time.
attachmentUploadStore.removeRecord(
for: attachmentUploadRecord.attachmentId,
sourceType: attachmentUploadRecord.sourceType,
tx: tx,
)
}
throw error
} else {
// Some other non-upload error was encountered - exit from the upload for now.
// Network failures or task cancellation shouldn't bump the attempt count, but
// any other error type should
if error.isNetworkFailureOrTimeout {
logger.warn("Upload failed due to network error")
} else if error is CancellationError {
logger.warn("Upload cancelled")
} else {
attachmentUploadRecord.attempt += 1
await db.awaitableWrite { tx in
attachmentUploadStore.upsert(record: attachmentUploadRecord, tx: tx)
}
if let statusCode = error.httpStatusCode {
logger.warn("Unexpected upload error [status: \(statusCode)]")
} else {
logger.warn("Unexpected upload error")
}
}
throw error
}
}
}
// MARK: - Helpers
private func fetchOrCreateAttachmentRecord(
for attachmentId: Attachment.IDType,
sourceType: AttachmentUploadRecord.SourceType,
db: any DB,
) -> AttachmentUploadRecord {
var attachmentUploadRecord: AttachmentUploadRecord
if
let record = db.read(block: { tx in
attachmentUploadStore.fetchAttachmentUploadRecord(for: attachmentId, sourceType: sourceType, tx: tx)
})
{
attachmentUploadRecord = record
} else {
attachmentUploadRecord = AttachmentUploadRecord(sourceType: sourceType, attachmentId: attachmentId)
}
return attachmentUploadRecord
}
private enum MetadataResult {
case new(Upload.LocalUploadMetadata)
case existing(Upload.LocalUploadMetadata)
case reuse(Upload.LocalUploadMetadata)
case alreadyUploaded(Upload.ReusedUploadMetadata)
}
private func getOrFetchUploadMetadata(
attachment: Attachment,
type: UploadType,
record: AttachmentUploadRecord,
logger: PrefixedLogger,
) async throws -> MetadataResult {
switch type {
case .mediaTier(_, isThumbnail: false):
// We never allow uploads of data we don't have locally.
guard let stream = attachment.asStream() else {
throw OWSGenericError("Attachment is not uploadable.")
}
let now: Date = dateProvider()
let messageQueueTime: TimeInterval = remoteConfigProvider.currentConfig().messageQueueTime
if
!Upload.disableTransitTierUploadReuse,
// We have an existing upload
let transitTierInfo = attachment.latestTransitTierInfo,
// It uses the same primary key (it isn't a reupload with a rotated key)
transitTierInfo.encryptionKey == attachment.encryptionKey,
// We expect it isn't expired
now.timeIntervalSince(Date(millisecondsSince1970: transitTierInfo.uploadTimestamp)) < messageQueueTime
{
// Reuse the existing transit tier upload without reuploading.
return .alreadyUploaded(.init(
cdnKey: transitTierInfo.cdnKey,
cdnNumber: transitTierInfo.cdnNumber,
key: attachment.encryptionKey,
digest: stream.encryptedFileSha256Digest,
plaintextDataLength: stream.unencryptedByteCount,
// This is the length from the stream, not the transit tier,
// but the length is the same regardless of the key used.
// Note we use this when copying to media tier (the server requires
// the blob length on CDN) which is somewhat brittle if our local
// padding length doesn't match the padding length from the sender
// on CDN. This only becomes a problem if we ever change padding
// values or they drift between clients.
encryptedDataLength: stream.encryptedByteCount,
))
} else {
let metadata = Upload.LocalUploadMetadata(
fileUrl: stream.fileURL,
key: attachment.encryptionKey,
digest: stream.info.digestSHA256Ciphertext,
encryptedDataLength: stream.info.encryptedByteCount,
plaintextDataLength: stream.info.unencryptedByteCount,
)
return .reuse(metadata)
}
case .mediaTier(_, isThumbnail: true):
// We never allow uploads of data we don't have locally.
guard
let stream = attachment.asStream(),
let mediaName = attachment.mediaName
else {
throw OWSGenericError("Attachment is not uploadable.")
}
let fileUrl = fileSystem.temporaryFileUrl()
guard let mrbk = db.read(block: { accountKeyStore.getMediaRootBackupKey(tx: $0) }) else {
throw OWSGenericError("Media tier upload missing root key.")
}
let encryptionKey = try mrbk.mediaEncryptionMetadata(
mediaName: AttachmentBackupThumbnail.thumbnailMediaName(fullsizeMediaName: mediaName),
type: .transitTierThumbnail,
)
guard
let thumbnailImage = await attachmentThumbnailService.thumbnailImage(
for: stream,
quality: .backupThumbnail,
)
else {
throw OWSGenericError("Unable to generate thumbnail; may not be visual media?")
}
let thumbnailData = try attachmentThumbnailService.backupThumbnailData(image: thumbnailImage)
let attachmentKey = try encryptionKey.attachmentKey()
let (encryptedThumbnailData, encryptedThumbnailMetadata) = try Cryptography.encrypt(
thumbnailData,
attachmentKey: attachmentKey,
applyExtraPadding: true,
)
// Write the thumbnail to the file.
try encryptedThumbnailData.write(to: fileUrl)
return .reuse(Upload.LocalUploadMetadata(
fileUrl: fileUrl,
key: attachmentKey.combinedKey,
digest: encryptedThumbnailMetadata.digest,
encryptedDataLength: UInt32(encryptedThumbnailData.count),
plaintextDataLength: UInt32(thumbnailData.count),
))
case .transitTier:
switch attachment.transitUploadStrategy(dateProvider: dateProvider) {
case .cannotUpload:
// Can't upload non-stream attachments; terminal failure.
throw OWSGenericError("Attachment is not uploadable.")
case .reuseExistingUpload(let metadata):
logger.debug("Attachment previously uploaded.")
return .alreadyUploaded(metadata)
case .reuseStreamEncryption(let metadata):
return .reuse(metadata)
case .freshUpload(let stream):
// Attempting to upload an existing attachment that's older than 3 days requires
// the attachment to be re-encrypted before upload.
// If this exists in the upload record from a prior attempt, use
// that file if it still exists.
if
let metadata = record.localMetadata,
// TODO:
// Currently, the file url is in a temp directory and doesn't
// persist across launches, so this will never be hit. The fix for this
// is to store the temporary file in a more persistent location, and
// register the file with the OrphanedAttachmentCleaner
OWSFileSystem.fileOrFolderExists(url: metadata.fileUrl)
{
return .existing(metadata)
} else {
let metadata = try buildMetadata(forUploading: stream)
return .new(metadata)
}
}
}
}
private func fetchAttachment(
attachmentId: Attachment.IDType,
logger: PrefixedLogger,
tx: DBReadTransaction,
) throws -> Attachment {
guard let attachment = attachmentStore.fetch(id: attachmentId, tx: tx) else {
// Not finding a local attachment is a terminal failure.
throw OWSGenericError("Missing attachment.")
}
return attachment
}
private func cleanup(record: AttachmentUploadRecord, logger: PrefixedLogger, tx: DBWriteTransaction) {
attachmentUploadStore.removeRecord(for: record.attachmentId, sourceType: record.sourceType, tx: tx)
}
// Update all the necessary places once the upload succeeds
private func updateTransitTier(
attachmentStream: AttachmentStream,
with result: Upload.AttachmentResult,
logger: PrefixedLogger,
tx: DBWriteTransaction,
) {
let transitTierInfo = Attachment.TransitTierInfo(
cdnNumber: result.cdnNumber,
cdnKey: result.cdnKey,
uploadTimestamp: result.beginTimestamp,
encryptionKey: result.localUploadMetadata.key,
unencryptedByteCount: result.localUploadMetadata.plaintextDataLength,
// ALWAYS use digest for integrity check for uploaded attachments;
// we only allow sending using a digest integrity check not a plaintext hash
// so prefer digest if we have both. If we don't, this attachment we just
// uploaded to be able to send will fail to send.
integrityCheck: .digestSHA256Ciphertext(result.localUploadMetadata.digest),
// TODO: [Attachment Streaming] support incremental mac
incrementalMacInfo: nil,
lastDownloadAttemptTimestamp: nil,
)
attachmentStore.saveLatestTransitTierInfo(
attachmentStream: attachmentStream,
transitTierInfo: transitTierInfo,
tx: tx,
)
attachmentStore.enumerateAllReferences(
toAttachmentId: attachmentStream.attachment.id,
tx: tx,
) { attachmentReference, _ in
switch attachmentReference.owner {
case .message(let messageSource):
guard
let interaction = self.interactionStore.fetchInteraction(
rowId: messageSource.messageRowId,
tx: tx,
)
else {
logger.warn("Missing interaction.")
return
}
self.db.touch(interaction: interaction, shouldReindex: false, tx: tx)
case .storyMessage(let storyMessageSource):
guard
let storyMessage = self.storyStore.fetchStoryMessage(
rowId: storyMessageSource.storyMessageRowId,
tx: tx,
)
else {
logger.warn("Missing story message.")
return
}
self.db.touch(storyMessage: storyMessage, tx: tx)
case .thread:
break
}
}
}
public func copyToMediaTier(
backupKey: MediaRootBackupKey,
auth: BackupServiceAuth,
mediaName: String,
uploadEra: String,
result: Upload.AttachmentResult,
logger: PrefixedLogger,
) async throws -> UInt32 {
let mediaEncryptionMetadata = try backupKey.mediaEncryptionMetadata(
mediaName: mediaName,
type: .outerLayerFullsizeOrThumbnail,
)
return try await backupRequestManager.copyToMediaTier(
item: .init(
sourceAttachment: .init(
cdn: result.cdnNumber,
key: result.cdnKey,
),
objectLength: result.localUploadMetadata.encryptedDataLength,
mediaId: mediaEncryptionMetadata.mediaId,
hmacKey: mediaEncryptionMetadata.hmacKey,
aesKey: mediaEncryptionMetadata.aesKey,
),
auth: auth,
logger: logger,
)
}
func buildMetadata(forUploading attachmentStream: AttachmentStream) throws -> Upload.LocalUploadMetadata {
// First we need to decrypt, so we can re-encrypt for upload.
let tmpDecryptedFile = fileSystem.temporaryFileUrl()
let decryptionMedatata = DecryptionMetadata(
key: try AttachmentKey(combinedKey: attachmentStream.attachment.encryptionKey),
// No need to validate for an already-validated stream
integrityCheck: .sha256ContentHash(attachmentStream.sha256ContentHash),
plaintextLength: UInt64(safeCast: attachmentStream.info.unencryptedByteCount),
)
try attachmentEncrypter.decryptAttachment(at: attachmentStream.fileURL, metadata: decryptionMedatata, output: tmpDecryptedFile)
// Now re-encrypt with a fresh set of keys.
// We use a tmp file on purpose; we already have the source file for the attachment
// and don't need to keep around this copy encrypted with different keys; its useful
// for upload only and can cleaned up by the OS after.
let tmpReencryptedFile = fileSystem.temporaryFileUrl()
let reencryptedMetadata = try attachmentEncrypter.encryptAttachment(at: tmpDecryptedFile, output: tmpReencryptedFile)
// we upload the re-encrypted file.
return try .validateAndBuild(fileUrl: tmpReencryptedFile, metadata: reencryptedMetadata)
}
private nonisolated func updateProgress(id: Attachment.IDType, progress: Float) {
NotificationCenter.default.postOnMainThread(
name: Upload.Constants.attachmentUploadProgressNotification,
object: nil,
userInfo: [
Upload.Constants.uploadProgressKey: progress,
Upload.Constants.uploadAttachmentIDKey: id,
],
)
}
}
extension Upload.Result where Metadata: AttachmentUploadMetadata {
var asAttachmentResult: Upload.AttachmentResult {
return Upload.AttachmentResult(
cdnKey: cdnKey,
cdnNumber: cdnNumber,
localUploadMetadata: localUploadMetadata,
beginTimestamp: beginTimestamp,
finishTimestamp: finishTimestamp,
)
}
}
extension Upload {
public static var disableTransitTierUploadReuse: Bool {
get { DebugFlags.internalSettings && UserDefaults.standard.bool(forKey: "disableTransitTierUploadReuse") }
set {
guard DebugFlags.internalSettings else { return }
UserDefaults.standard.set(newValue, forKey: "disableTransitTierUploadReuse")
}
}
}
extension Upload.Form {
public init(uploadForm: UploadForm) {
self.headers = HttpHeaders(httpHeaders: uploadForm.headers, overwriteOnConflict: true)
self.signedUploadLocation = uploadForm.signedUploadUrl.absoluteString
self.cdnKey = uploadForm.key
self.cdnNumber = uploadForm.cdn
}
}
extension UploadForm {
func asUploadForm() -> Upload.Form {
Upload.Form(uploadForm: self)
}
}