Path: blob/main/SignalServiceKit/Upload/UploadEndpointCDN2.swift
1 views
//
// Copyright 2023 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//
import Foundation
struct UploadEndpointCDN2: UploadEndpoint {
private enum Constants {
static let maxUploadLocationRetries = 2
}
private let uploadForm: Upload.Form
private let signalService: OWSSignalServiceProtocol
private let fileSystem: Upload.Shims.FileSystem
private let logger: PrefixedLogger
init(
form: Upload.Form,
signalService: OWSSignalServiceProtocol,
fileSystem: Upload.Shims.FileSystem,
logger: PrefixedLogger,
) {
self.uploadForm = form
self.signalService = signalService
self.fileSystem = fileSystem
self.logger = logger
}
// This fetches a temporary URL that can be used for resumable uploads.
//
// See: https://cloud.google.com/storage/docs/performing-resumable-uploads#xml-api
// NOTE: follow the "XML API" instructions.
func fetchResumableUploadLocation() async throws -> URL {
return try await _fetchResumableUploadLocation(attemptCount: 0)
}
private func _fetchResumableUploadLocation(attemptCount: Int = 0) async throws -> URL {
if attemptCount > 0 {
logger.info("attemptCount: \(attemptCount)")
}
let urlSession = await signalService.sharedUrlSessionForCdn(cdnNumber: uploadForm.cdnNumber, maxResponseSize: nil)
let urlString = uploadForm.signedUploadLocation
guard urlString.lowercased().hasPrefix("http") else {
throw OWSAssertionError("Invalid signedUploadLocation.")
}
var headers = uploadForm.headers
// Remove host header.
headers["Host"] = nil
headers["Content-Length"] = "0"
headers["Content-Type"] = MimeType.applicationOctetStream.rawValue
do {
let response = try await urlSession.performRequest(
urlString,
method: .post,
headers: headers,
body: nil,
)
guard response.responseStatusCode == 201 else {
throw response.asError()
}
guard
let locationHeader = response.headers["location"],
locationHeader.lowercased().hasPrefix("http"),
let locationUrl = URL(string: locationHeader)
else {
throw OWSAssertionError("Invalid location header.")
}
return locationUrl
} catch {
guard error.isNetworkFailureOrTimeout else {
throw error
}
guard attemptCount <= Constants.maxUploadLocationRetries else {
logger.warn("No more retries: \(attemptCount). ")
throw error
}
return try await self._fetchResumableUploadLocation(attemptCount: attemptCount + 1)
}
}
// Determine how much has already been uploaded.
func getResumableUploadProgress<Metadata: UploadMetadata>(
attempt: Upload.Attempt<Metadata>,
) async throws -> Upload.ResumeProgress {
var headers = HttpHeaders()
headers["Content-Length"] = "0"
headers["Content-Range"] = "bytes */\(attempt.encryptedDataLength)"
let urlSession = await signalService.sharedUrlSessionForCdn(cdnNumber: uploadForm.cdnNumber, maxResponseSize: nil)
let response = try await urlSession.performRequest(
attempt.uploadLocation.absoluteString,
method: .put,
headers: headers,
body: nil,
)
let statusCode = response.responseStatusCode
switch statusCode {
case 200, 201:
// completed, so return 'OK' TODO:
return .complete
case 308:
break
default:
owsFailDebug("Invalid status code: \(statusCode).")
// Any other error should result in restarting the upload.
return .restart
}
// If you receive a '308 Resume Incomplete' response, it may or may
// not have a valid Range header. If the header is missing, resume
// the upload from the beginning.
//
// See: https://cloud.google.com/storage/docs/performing-resumable-uploads#status-check
let expectedPrefix = "bytes=0-"
guard
let rangeHeader = response.headers["range"],
rangeHeader.hasPrefix(expectedPrefix)
else {
// Return zero to restart the upload.
return .uploaded(0)
}
let rangeEndString = rangeHeader.suffix(rangeHeader.count - expectedPrefix.count)
guard
!rangeEndString.isEmpty,
let rangeEnd = Int(rangeEndString)
else {
logger.warn("Invalid Range header: \(rangeHeader) (\(rangeEndString)).")
// There was a range header present, but it was
// invalid - restart the upload from scratch
return .restart
}
// rangeEnd is the _index_ of the last uploaded bytes, e.g.:
// * 0 if 1 byte has been uploaded,
// * N if N+1 bytes have been uploaded.
let bytesAlreadyUploaded = rangeEnd + 1
logger.verbose("rangeEnd: \(rangeEnd), bytesAlreadyUploaded: \(bytesAlreadyUploaded).")
return .uploaded(bytesAlreadyUploaded)
}
func performUpload<Metadata: UploadMetadata>(
startPoint: Int,
attempt: Upload.Attempt<Metadata>,
progress: OWSProgressSource?,
) async throws(Upload.Error) {
let totalDataLength = attempt.encryptedDataLength
var headers = HttpHeaders()
let (uploadData, truncated) = try readUploadFileChunk(
fileSystem: fileSystem,
url: attempt.fileUrl,
startIndex: startPoint,
)
guard uploadData.count > 0 else {
attempt.logger.error("No data to upload")
return
}
headers["Content-Length"] = "\(uploadData.count)"
if truncated || startPoint > 0 {
// Example: Resuming after uploading 2359296 of 7351375 bytes.
// Content-Range: bytes 2359296-7351374/7351375
// Content-Length: 4992079
// Since this is an index into the range, subtract one from the byte count uploaded
// `truncated` (chunked) uploads should always add this header, even on the first request
headers["Content-Range"] = "bytes \(startPoint)-\(startPoint + uploadData.count - 1)/\(totalDataLength)"
}
do {
let urlSession = await signalService.sharedUrlSessionForCdn(cdnNumber: uploadForm.cdnNumber, maxResponseSize: nil)
let response = try await urlSession.performUpload(
attempt.uploadLocation.absoluteString,
method: .put,
headers: headers,
requestData: uploadData,
progressBlock: progress?.asProgressBlock() ?? { _, _ in },
)
switch response.responseStatusCode {
case 200, 201:
return
case 308 where truncated:
// The upload succeeded in uploading a chunk of data. Throw this error
// to the caller, which should trigger an immediate resume with the next chunk
throw Upload.Error.partialUpload(bytesUploaded: UInt32(clamping: uploadData.count))
default:
throw Upload.Error.unknown
}
} catch {
let retryMode: Upload.FailureMode.RetryMode = {
if
// Allow the server to override the default backoff with a specified value
let retryHeader = error.httpResponseHeaders?.value(forHeader: "retry-after"),
let delay = TimeInterval(retryHeader)
{
return .afterServerRequestedDelay(delay)
} else {
return .afterBackoff
}
}()
switch error {
case let error as Upload.Error:
throw error
case let error as OWSHTTPError where (500...599).contains(error.responseStatusCode):
// On 5XX errors, clients should try to resume the upload
attempt.logger.warn("Temporary upload failure [\(error.responseStatusCode)], retry.")
// Check for any progress here
throw Upload.Error.uploadFailure(recovery: .resume(retryMode))
case OWSHTTPError.networkFailure(let wrappedError):
let debugMessage = DebugFlags.internalLogging ? " Error: \(wrappedError.debugDescription)" : ""
if wrappedError.isTimeoutImpl {
attempt.logger.warn("Network timeout during upload.\(debugMessage)")
throw Upload.Error.networkTimeout
} else {
attempt.logger.warn("Network failure during upload.\(debugMessage)")
throw Upload.Error.networkError
}
default:
attempt.logger.warn("Unknown upload failure.")
throw Upload.Error.unknown
}
}
}
}