Path: blob/main/SignalServiceKit/Network/ConnectionLock.swift
1 views
//
// Copyright 2025 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//
import Foundation
struct ConnectionLock {
    /// The lock file shared by all cooperating processes.
    ///
    /// Byte offsets in this file are locked as follows:
    /// - Byte 0: The connection lock itself.
    /// - Byte k, for 1 <= k < priorityCount: held by the process with
    ///   priority k from BEFORE it posts its interruption notifications until
    ///   AFTER its attempt to acquire the connection lock finishes. Less
    ///   important processes briefly lock & unlock bytes 1..<(their priority)
    ///   so they can't race with it.
    /// - The byte for the least important process (priority == priorityCount)
    ///   is never locked: nothing is less important, so it's not contested.
    ///
    /// Opening the file can fail; that failure is stored here and surfaces
    /// when `lock(onInterrupt:)` is called.
    private let fileLock: Result<FileLock, POSIXError>
    /// This process's priority; 1 is the most important.
    private let priority: Int
    /// The number of priorities, i.e. the priority of the least important process.
    private let priorityCount: Int

    /// - Parameters:
    ///   - filePath: Path of the shared lock file (created if missing).
    ///   - priority: This process's priority, in `1...priorityCount`; lower is
    ///     more important.
    ///   - priorityCount: The number of distinct priorities.
    init(filePath: String, priority: Int, of priorityCount: Int) {
        // An out-of-range priority silently breaks the protocol. For example,
        // priority 0 would make the deferred "priority byte" unlock in
        // `lock(onInterrupt:)` release byte 0 — the connection lock itself.
        precondition(
            priority >= 1 && priority <= priorityCount,
            "ConnectionLock priority must be in 1...priorityCount."
        )
        self.fileLock = Result(catching: { () throws(POSIXError) in
            return try FileLock(filePath: filePath)
        })
        self.priority = priority
        self.priorityCount = priorityCount
    }

    /// Closes the lock file, ignoring any error from opening it.
    ///
    /// NOTE: Per POSIX `fcntl` semantics, closing the descriptor also drops
    /// every record lock this process holds on the file.
    func close() {
        try? self.fileLock.get().close()
    }

    /// Returned by `lock(onInterrupt:)`; pass it to `unlock(_:)` to release.
    struct HeldLock {
        /// The interruption observer to remove on unlock. This is nil for the
        /// most important process (priority 1), which never registers one.
        fileprivate var observerToken: DarwinNotificationCenter.ObserverToken?
    }

    /// Acquires a cross-process lock for this process.
    ///
    /// When a more important (i.e., lower priority number) process requests a
    /// lock, less important processes are notified (via `onInterrupt`) and are
    /// expected to quickly release the lock.
    ///
    /// - Parameter onInterrupt: `callback` is invoked on `queue` when a more
    ///   important process requests the lock. It is only registered when
    ///   `priority > 1` and stays registered until `unlock(_:)`. Because it is
    ///   registered before acquisition starts, it may fire while this method
    ///   is still waiting.
    /// - Returns: A `HeldLock` that must be passed to `unlock(_:)`.
    /// - Throws: A POSIXError or a CancellationError
    func lock(onInterrupt: (queue: DispatchQueue, callback: () -> Void)) async throws -> HeldLock {
        let fileLock = try self.fileLock.get()

        // Removes the observer on every early exit (error or cancellation). On
        // success, ownership moves into the returned `HeldLock` instead.
        var observerToken: DarwinNotificationCenter.ObserverToken?
        defer {
            if let observerToken {
                DarwinNotificationCenter.removeObserver(observerToken)
            }
        }

        // If we're not the most important, listen for interruptions & make sure
        // we're not racing with a more important process.
        if self.priority > 1 {
            observerToken = DarwinNotificationCenter.addObserver(
                name: .connectionLock(for: self.priority),
                queue: onInterrupt.queue,
                block: { _ in onInterrupt.callback() },
            )
            // More important processes hold this lock from BEFORE they post a
            // notification until AFTER they've acquired the connection lock. By
            // immediately locking & unlocking, we either run before they acquire this
            // lock (and will observe the notification they send) or strictly after
            // they acquire this lock (and will fail to acquire the connection lock).
            try await fileLock.lockWithCancellationHandler(range: 1..<self.priority)
            do throws(POSIXError) {
                try fileLock.unlock(range: 1..<self.priority)
            } catch {
                owsFail("Must be able to unlock held lock.")
            }
        }

        // If we're more important than some other process, make sure that process
        // doesn't miss our notification to disconnect. (See above comment.)
        if self.priority < self.priorityCount {
            // Taken BEFORE posting notifications; released by the `defer` below
            // only after the connection-lock attempt finishes (or fails).
            try await fileLock.lock(at: self.priority)
            for lessImportantPriority in (self.priority + 1)...self.priorityCount {
                DarwinNotificationCenter.postNotification(name: .connectionLock(for: lessImportantPriority))
            }
        }
        defer {
            if self.priority < self.priorityCount {
                do throws(POSIXError) {
                    try fileLock.unlock(at: self.priority)
                } catch {
                    owsFail("Must be able to unlock held lock.")
                }
            }
        }

        try await fileLock.lockWithCancellationHandler(at: 0)
        let result = HeldLock(observerToken: observerToken)
        // Hand the observer to the caller so the first `defer` doesn't remove it.
        observerToken = nil
        return result
    }

    /// Releases a lock returned by `lock(onInterrupt:)` and stops listening
    /// for interruption requests.
    func unlock(_ heldLock: HeldLock) {
        if let observerToken = heldLock.observerToken {
            DarwinNotificationCenter.removeObserver(observerToken)
        }
        do throws(POSIXError) {
            try fileLock.get().unlock(at: 0)
        } catch {
            owsFail("Must be able to unlock held lock.")
        }
    }
}
/// A file descriptor on which exclusive POSIX (`fcntl`) record locks are
/// taken over byte ranges. Note that such locks are owned per process, not
/// per descriptor or thread.
private struct FileLock {
    private let fd: Int32

    /// Opens `filePath` read/write, creating it with mode 0644 and truncating it.
    ///
    /// - Throws: The `POSIXError` reported by `open(2)`.
    init(filePath: String) throws(POSIXError) {
        let result = open(filePath, O_RDWR | O_CREAT | O_TRUNC, 0o644)
        // `open` reports failure with -1. 0 is a valid descriptor; treating
        // it as an error would leak it and read a meaningless `errno`.
        guard result >= 0 else {
            throw errorForErrno()
        }
        self.fd = result
    }

    /// Closes the descriptor, ignoring any error.
    func close() {
        _ = Darwin.close(self.fd)
    }

    /// Sets (`shouldLock == true`) or clears an exclusive lock on `range`.
    /// See `man fcntl`.
    ///
    /// With `shouldBlock`, waits for conflicting locks held by other processes
    /// (`F_SETLKW`); otherwise fails immediately (`F_SETLK`). Blocking is only
    /// permitted when locking.
    private func fcntl(range: Range<Int>, shouldLock: Bool, shouldBlock: Bool) -> Result<Void, POSIXError> {
        owsPrecondition(shouldLock || !shouldBlock)
        // `l_len == 0` means "through end of file and beyond", so an empty
        // range would lock (or unlock) far more than the caller asked for.
        owsPrecondition(!range.isEmpty)
        var req = flock()
        req.l_start = off_t(range.lowerBound)
        req.l_len = off_t(range.upperBound - range.lowerBound)
        req.l_whence = Int16(SEEK_SET)
        req.l_type = Int16(shouldLock ? F_WRLCK : F_UNLCK)
        var result: Int32
        repeat {
            result = Darwin.fcntl(self.fd, shouldBlock ? F_SETLKW : F_SETLK, &req)
            // A blocking wait can be interrupted by a signal; just retry.
        } while result == -1 && errno == EINTR
        if result == -1 {
            return .failure(errorForErrno())
        }
        return .success(())
    }

    /// Blocks (without polling) until the single byte at `offset` is locked.
    func lock(at offset: Int) async throws(POSIXError) {
        try await self.lock(range: offset..<(offset + 1))
    }

    /// Blocks until `range` is locked.
    ///
    /// The wait happens on a global dispatch queue thread rather than the
    /// cooperative pool. It does not observe task cancellation.
    func lock(range: Range<Int>) async throws(POSIXError) {
        try await withCheckedContinuation { (continuation: CheckedContinuation<Result<Void, POSIXError>, Never>) in
            // We're going to block, so don't block the cooperative thread pool.
            DispatchQueue.global().async {
                continuation.resume(returning: self.fcntl(range: range, shouldLock: true, shouldBlock: true))
            }
        }.get()
    }

    /// Single-byte convenience for `lockWithCancellationHandler(range:maxAveragePollingInterval:)`.
    func lockWithCancellationHandler(at offset: Int, maxAveragePollingInterval: TimeInterval = 3) async throws {
        try await self.lockWithCancellationHandler(range: offset..<(offset + 1), maxAveragePollingInterval: maxAveragePollingInterval)
    }

    /// Locks `range` by polling `tryLock(range:)` with backoff (average delay
    /// between 0.1s and `maxAveragePollingInterval`), with no attempt limit.
    ///
    /// Only `EAGAIN` (a conflicting lock is held elsewhere) is retried; any
    /// other `POSIXError` is thrown. Presumably `Retry.performWithBackoff`
    /// stops and throws when the task is cancelled — TODO confirm.
    func lockWithCancellationHandler(range: Range<Int>, maxAveragePollingInterval: TimeInterval = 3) async throws {
        // In most cases, we don't expect contention. But it is possible, and we
        // want to remain cancellable during times of contention, so we poll.
        try await Retry.performWithBackoff(
            maxAttempts: .max,
            minAverageBackoff: 0.1,
            maxAverageBackoff: maxAveragePollingInterval,
            isRetryable: { $0.code == .EAGAIN },
            block: { () throws(POSIXError) in
                try self.tryLock(range: range)
            },
        )
    }

    /// Attempts to lock the single byte at `offset` without waiting.
    func tryLock(at offset: Int) throws(POSIXError) {
        try self.tryLock(range: offset..<(offset + 1))
    }

    /// Attempts to lock `range` without waiting; throws on contention.
    func tryLock(range: Range<Int>) throws(POSIXError) {
        try self.fcntl(range: range, shouldLock: true, shouldBlock: false).get()
    }

    /// Releases this process's lock on the single byte at `offset`.
    func unlock(at offset: Int) throws(POSIXError) {
        try self.unlock(range: offset..<(offset + 1))
    }

    /// Releases this process's lock on `range`.
    func unlock(range: Range<Int>) throws(POSIXError) {
        try self.fcntl(range: range, shouldLock: false, shouldBlock: false).get()
    }
}
/// Wraps the calling thread's current `errno` in a `POSIXError`.
///
/// Call this right after a libc call reports failure, before anything else
/// can overwrite `errno`. The code is force-unwrapped, so `errno` must hold a
/// value `POSIXErrorCode` recognizes (in particular, not 0).
private func errorForErrno() -> POSIXError {
    let capturedErrno = errno
    let code = POSIXErrorCode(rawValue: capturedErrno)!
    return POSIXError(code)
}