Path: blob/main/SignalServiceKit/Jobs/JobRecordFinder.swift
1 views
//
// Copyright 2024 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//
import Foundation
import GRDB
public protocol JobRecordFinder<JobRecordType> {
associatedtype JobRecordType: JobRecord
/// Fetches a single JobRecord from the database.
///
/// Returns `nil` a JobRecord doesn't exist for `rowId`.
func fetchJob(rowId: JobRecord.RowId, tx: DBReadTransaction) throws -> JobRecordType?
/// Removes a single JobRecord from the database.
func removeJob(_ jobRecord: JobRecordType, tx: DBWriteTransaction)
/// Fetches all runnable jobs.
///
/// This method may use multiple transactions, may use write transactions,
/// may delete jobs that can't ever be run, etc.
///
/// It returns all jobs that can be run (and invokes the block for each job).
///
/// Conforming types should avoid long-running write transactions.
func loadRunnableJobs(updateRunnableJobRecord: @escaping (JobRecordType, DBWriteTransaction) -> Void) async throws -> [JobRecordType]
}
private enum Constants {
/// The number of JobRecords to fetch in a batch.
///
/// Most job queues won't ever have more than a few records at the same
/// time. Other times, a job queue may build up a huge backlog, and this
/// value can help prune it efficiently.
static let batchSize = 400
}
public class JobRecordFinderImpl<JobRecordType>: JobRecordFinder where JobRecordType: JobRecord {
private let db: any DB
public init(db: any DB) {
self.db = db
}
public func fetchJob(rowId: JobRecord.RowId, tx: DBReadTransaction) throws -> JobRecordType? {
do {
let db = tx.database
return try JobRecordType.fetchOne(db, key: rowId)
} catch {
throw error.grdbErrorForLogging
}
}
public func removeJob(_ jobRecord: JobRecordType, tx: DBWriteTransaction) {
jobRecord.anyRemove(transaction: tx)
}
public func loadRunnableJobs(updateRunnableJobRecord: @escaping (JobRecordType, DBWriteTransaction) -> Void) async throws -> [JobRecordType] {
var allRunnableJobs = [JobRecordType]()
var afterRowId: JobRecord.RowId?
while true {
let (runnableJobs, hasMoreAfterRowId) = try await db.awaitableWrite { tx in
try self.fetchAndPruneSomePersistedJobs(afterRowId: afterRowId, updateRunnableJobRecord: updateRunnableJobRecord, tx: tx)
}
allRunnableJobs.append(contentsOf: runnableJobs)
guard let hasMoreAfterRowId else {
break
}
afterRowId = hasMoreAfterRowId
}
return allRunnableJobs
}
private func fetchAndPruneSomePersistedJobs(
afterRowId: JobRecord.RowId?,
updateRunnableJobRecord: (JobRecordType, DBWriteTransaction) -> Void,
tx: DBWriteTransaction,
) throws -> ([JobRecordType], hasMoreAfterRowId: JobRecord.RowId?) {
let (jobs, hasMore) = try fetchSomeJobs(afterRowId: afterRowId, tx: tx)
var runnableJobs = [JobRecordType]()
for job in jobs {
let canRunJob: Bool = {
// TODO: Schedule a DB migration to fully obsolete these properties.
// This property is deprecated. If it's set, it means the job was created
// for a prior version of the application, and that version definitely
// can't be the current process.
if job.exclusiveProcessIdentifier != nil {
return false
}
// This property is deprecated. If it's set, it means that the job is for a
// deprecated type of message that doesn't need to be sent.
if (job as? MessageSenderJobRecord)?.removeMessageAfterSending == true {
return false
}
// If a job has failed or is obsolete, we can remove it. We previously
// distinguished `.ready` from `.running`, but now they're treated exactly
// the same when we restart existing jobs.
switch job.status {
case .unknown, .permanentlyFailed, .obsolete:
return false
case .ready, .running:
break
}
return true
}()
if canRunJob {
updateRunnableJobRecord(job, tx)
runnableJobs.append(job)
} else {
removeJob(job, tx: tx)
}
}
return (runnableJobs, hasMore ? jobs.last!.id! : nil)
}
private func fetchSomeJobs(
afterRowId: JobRecord.RowId?,
tx: DBReadTransaction,
) throws -> ([JobRecordType], hasMore: Bool) {
var sql = """
SELECT * FROM \(JobRecordType.databaseTableName)
WHERE "\(JobRecordType.columnName(.label))" = ?
"""
var arguments: StatementArguments = [JobRecordType.jobRecordType.jobRecordLabel]
if let afterRowId {
sql += """
AND \(JobRecordType.columnName(.id)) > ?
"""
arguments += [afterRowId]
}
sql += """
ORDER BY "\(JobRecordType.columnName(.id))"
LIMIT \(Constants.batchSize)
"""
do {
let db = tx.database
let results = try JobRecordType.fetchAll(db, sql: sql, arguments: arguments)
return (results, results.count == Constants.batchSize)
} catch {
throw error.grdbErrorForLogging
}
}
}