Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
google
GitHub Repository: google/crosvm
Path: blob/main/cros_async/src/sync/mu.rs
5394 views
1
// Copyright 2020 The ChromiumOS Authors
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
use std::cell::UnsafeCell;
6
use std::hint;
7
use std::mem;
8
use std::ops::Deref;
9
use std::ops::DerefMut;
10
use std::sync::atomic::AtomicUsize;
11
use std::sync::atomic::Ordering;
12
use std::sync::Arc;
13
use std::thread::yield_now;
14
15
use super::super::sync::waiter::Kind as WaiterKind;
16
use super::super::sync::waiter::Waiter;
17
use super::super::sync::waiter::WaiterAdapter;
18
use super::super::sync::waiter::WaiterList;
19
use super::super::sync::waiter::WaitingFor;
20
21
// Set when the rwlock is exclusively locked.
22
const LOCKED: usize = 1 << 0;
23
// Set when there are one or more threads waiting to acquire the lock.
24
const HAS_WAITERS: usize = 1 << 1;
25
// Set when a thread has been woken up from the wait queue. Cleared when that thread either acquires
26
// the lock or adds itself back into the wait queue. Used to prevent unnecessary wake ups when a
27
// thread has been removed from the wait queue but has not gotten CPU time yet.
28
const DESIGNATED_WAKER: usize = 1 << 2;
29
// Used to provide exclusive access to the `waiters` field in `RwLock`. Should only be held while
30
// modifying the waiter list.
31
const SPINLOCK: usize = 1 << 3;
32
// Set when a thread that wants an exclusive lock adds itself to the wait queue. New threads
33
// attempting to acquire a shared lock will be preventing from getting it when this bit is set.
34
// However, this bit is ignored once a thread has gone through the wait queue at least once.
35
const WRITER_WAITING: usize = 1 << 4;
36
// Set when a thread has gone through the wait queue many times but has failed to acquire the lock
37
// every time it is woken up. When this bit is set, all other threads are prevented from acquiring
38
// the lock until the thread that set the `LONG_WAIT` bit has acquired the lock.
39
const LONG_WAIT: usize = 1 << 5;
40
// The bit that is added to the rwlock state in order to acquire a shared lock. Since more than one
41
// thread can acquire a shared lock, we cannot use a single bit. Instead we use all the remaining
42
// bits in the state to track the number of threads that have acquired a shared lock.
43
const READ_LOCK: usize = 1 << 8;
44
// Mask used for checking if any threads currently hold a shared lock.
45
const READ_MASK: usize = !0xff;
46
47
// The number of times the thread should just spin and attempt to re-acquire the lock.
48
const SPIN_THRESHOLD: usize = 7;
49
50
// The number of times the thread needs to go through the wait queue before it sets the `LONG_WAIT`
51
// bit and forces all other threads to wait for it to acquire the lock. This value is set relatively
52
// high so that we don't lose the benefit of having running threads unless it is absolutely
53
// necessary.
54
const LONG_WAIT_THRESHOLD: usize = 19;
55
56
// Common methods between shared and exclusive locks.
57
trait Kind {
58
// The bits that must be zero for the thread to acquire this kind of lock. If any of these bits
59
// are not zero then the thread will first spin and retry a few times before adding itself to
60
// the wait queue.
61
fn zero_to_acquire() -> usize;
62
63
// The bit that must be added in order to acquire this kind of lock. This should either be
64
// `LOCKED` or `READ_LOCK`.
65
fn add_to_acquire() -> usize;
66
67
// The bits that should be set when a thread adds itself to the wait queue while waiting to
68
// acquire this kind of lock.
69
fn set_when_waiting() -> usize;
70
71
// The bits that should be cleared when a thread acquires this kind of lock.
72
fn clear_on_acquire() -> usize;
73
74
// The waiter that a thread should use when waiting to acquire this kind of lock.
75
fn new_waiter(raw: &RawRwLock) -> Arc<Waiter>;
76
}
77
78
// A lock type for shared read-only access to the data. More than one thread may hold this kind of
79
// lock simultaneously.
80
struct Shared;
81
82
impl Kind for Shared {
83
fn zero_to_acquire() -> usize {
84
LOCKED | WRITER_WAITING | LONG_WAIT
85
}
86
87
fn add_to_acquire() -> usize {
88
READ_LOCK
89
}
90
91
fn set_when_waiting() -> usize {
92
0
93
}
94
95
fn clear_on_acquire() -> usize {
96
0
97
}
98
99
fn new_waiter(raw: &RawRwLock) -> Arc<Waiter> {
100
Arc::new(Waiter::new(
101
WaiterKind::Shared,
102
cancel_waiter,
103
raw as *const RawRwLock as usize,
104
WaitingFor::Mutex,
105
))
106
}
107
}
108
109
// A lock type for mutually exclusive read-write access to the data. Only one thread can hold this
110
// kind of lock at a time.
111
struct Exclusive;
112
113
impl Kind for Exclusive {
114
fn zero_to_acquire() -> usize {
115
LOCKED | READ_MASK | LONG_WAIT
116
}
117
118
fn add_to_acquire() -> usize {
119
LOCKED
120
}
121
122
fn set_when_waiting() -> usize {
123
WRITER_WAITING
124
}
125
126
fn clear_on_acquire() -> usize {
127
WRITER_WAITING
128
}
129
130
fn new_waiter(raw: &RawRwLock) -> Arc<Waiter> {
131
Arc::new(Waiter::new(
132
WaiterKind::Exclusive,
133
cancel_waiter,
134
raw as *const RawRwLock as usize,
135
WaitingFor::Mutex,
136
))
137
}
138
}
139
140
// Scan `waiters` and return the ones that should be woken up. Also returns any bits that should be
141
// set in the rwlock state when the current thread releases the spin lock protecting the waiter
142
// list.
143
//
144
// If the first waiter is trying to acquire a shared lock, then all waiters in the list that are
145
// waiting for a shared lock are also woken up. If any waiters waiting for an exclusive lock are
146
// found when iterating through the list, then the returned `usize` contains the `WRITER_WAITING`
147
// bit, which should be set when the thread releases the spin lock.
148
//
149
// If the first waiter is trying to acquire an exclusive lock, then only that waiter is returned and
150
// no bits are set in the returned `usize`.
151
fn get_wake_list(waiters: &mut WaiterList) -> (WaiterList, usize) {
152
let mut to_wake = WaiterList::new(WaiterAdapter::new());
153
let mut set_on_release = 0;
154
let mut cursor = waiters.front_mut();
155
156
let mut waking_readers = false;
157
while let Some(w) = cursor.get() {
158
match w.kind() {
159
WaiterKind::Exclusive if !waking_readers => {
160
// This is the first waiter and it's a writer. No need to check the other waiters.
161
let waiter = cursor.remove().unwrap();
162
waiter.set_waiting_for(WaitingFor::None);
163
to_wake.push_back(waiter);
164
break;
165
}
166
167
WaiterKind::Shared => {
168
// This is a reader and the first waiter in the list was not a writer so wake up all
169
// the readers in the wait list.
170
let waiter = cursor.remove().unwrap();
171
waiter.set_waiting_for(WaitingFor::None);
172
to_wake.push_back(waiter);
173
waking_readers = true;
174
}
175
176
WaiterKind::Exclusive => {
177
// We found a writer while looking for more readers to wake up. Set the
178
// `WRITER_WAITING` bit to prevent any new readers from acquiring the lock. All
179
// readers currently in the wait list will ignore this bit since they already waited
180
// once.
181
set_on_release |= WRITER_WAITING;
182
cursor.move_next();
183
}
184
}
185
}
186
187
(to_wake, set_on_release)
188
}
189
190
#[inline]
191
fn cpu_relax(iterations: usize) {
192
for _ in 0..iterations {
193
hint::spin_loop();
194
}
195
}
196
197
pub(crate) struct RawRwLock {
198
state: AtomicUsize,
199
waiters: UnsafeCell<WaiterList>,
200
}
201
202
impl RawRwLock {
203
pub fn new() -> RawRwLock {
204
RawRwLock {
205
state: AtomicUsize::new(0),
206
waiters: UnsafeCell::new(WaiterList::new(WaiterAdapter::new())),
207
}
208
}
209
210
#[inline]
211
pub async fn lock(&self) {
212
match self
213
.state
214
.compare_exchange_weak(0, LOCKED, Ordering::Acquire, Ordering::Relaxed)
215
{
216
Ok(_) => {}
217
Err(oldstate) => {
218
// If any bits that should be zero are not zero or if we fail to acquire the lock
219
// with a single compare_exchange then go through the slow path.
220
if (oldstate & Exclusive::zero_to_acquire()) != 0
221
|| self
222
.state
223
.compare_exchange_weak(
224
oldstate,
225
(oldstate + Exclusive::add_to_acquire())
226
& !Exclusive::clear_on_acquire(),
227
Ordering::Acquire,
228
Ordering::Relaxed,
229
)
230
.is_err()
231
{
232
self.lock_slow::<Exclusive>(0, 0).await;
233
}
234
}
235
}
236
}
237
238
#[inline]
239
pub async fn read_lock(&self) {
240
match self
241
.state
242
.compare_exchange_weak(0, READ_LOCK, Ordering::Acquire, Ordering::Relaxed)
243
{
244
Ok(_) => {}
245
Err(oldstate) => {
246
if (oldstate & Shared::zero_to_acquire()) != 0
247
|| self
248
.state
249
.compare_exchange_weak(
250
oldstate,
251
(oldstate + Shared::add_to_acquire()) & !Shared::clear_on_acquire(),
252
Ordering::Acquire,
253
Ordering::Relaxed,
254
)
255
.is_err()
256
{
257
self.lock_slow::<Shared>(0, 0).await;
258
}
259
}
260
}
261
}
262
263
// Slow path for acquiring the lock. `clear` should contain any bits that need to be cleared
264
// when the lock is acquired. Any bits set in `zero_mask` are cleared from the bits returned by
265
// `K::zero_to_acquire()`.
266
#[cold]
267
async fn lock_slow<K: Kind>(&self, mut clear: usize, zero_mask: usize) {
268
let mut zero_to_acquire = K::zero_to_acquire() & !zero_mask;
269
270
let mut spin_count = 0;
271
let mut wait_count = 0;
272
let mut waiter = None;
273
loop {
274
let oldstate = self.state.load(Ordering::Relaxed);
275
// If all the bits in `zero_to_acquire` are actually zero then try to acquire the lock
276
// directly.
277
if (oldstate & zero_to_acquire) == 0 {
278
if self
279
.state
280
.compare_exchange_weak(
281
oldstate,
282
(oldstate + K::add_to_acquire()) & !(clear | K::clear_on_acquire()),
283
Ordering::Acquire,
284
Ordering::Relaxed,
285
)
286
.is_ok()
287
{
288
return;
289
}
290
} else if (oldstate & SPINLOCK) == 0 {
291
// The rwlock is locked and the spin lock is available. Try to add this thread to
292
// the waiter queue.
293
let w = waiter.get_or_insert_with(|| K::new_waiter(self));
294
w.reset(WaitingFor::Mutex);
295
296
if self
297
.state
298
.compare_exchange_weak(
299
oldstate,
300
(oldstate | SPINLOCK | HAS_WAITERS | K::set_when_waiting()) & !clear,
301
Ordering::Acquire,
302
Ordering::Relaxed,
303
)
304
.is_ok()
305
{
306
let mut set_on_release = 0;
307
308
if wait_count < LONG_WAIT_THRESHOLD {
309
// Add the waiter to the back of the queue.
310
// SAFETY:
311
// Safe because we have acquired the spin lock and it provides exclusive
312
// access to the waiter queue.
313
unsafe { (*self.waiters.get()).push_back(w.clone()) };
314
} else {
315
// This waiter has gone through the queue too many times. Put it in the
316
// front of the queue and block all other threads from acquiring the lock
317
// until this one has acquired it at least once.
318
// SAFETY:
319
// Safe because we have acquired the spin lock and it provides exclusive
320
// access to the waiter queue.
321
unsafe { (*self.waiters.get()).push_front(w.clone()) };
322
323
// Set the LONG_WAIT bit to prevent all other threads from acquiring the
324
// lock.
325
set_on_release |= LONG_WAIT;
326
327
// Make sure we clear the LONG_WAIT bit when we do finally get the lock.
328
clear |= LONG_WAIT;
329
330
// Since we set the LONG_WAIT bit we shouldn't allow that bit to prevent us
331
// from acquiring the lock.
332
zero_to_acquire &= !LONG_WAIT;
333
}
334
335
// Release the spin lock.
336
let mut state = oldstate;
337
loop {
338
match self.state.compare_exchange_weak(
339
state,
340
(state | set_on_release) & !SPINLOCK,
341
Ordering::Release,
342
Ordering::Relaxed,
343
) {
344
Ok(_) => break,
345
Err(w) => state = w,
346
}
347
}
348
349
// Now wait until we are woken.
350
w.wait().await;
351
352
// The `DESIGNATED_WAKER` bit gets set when this thread is woken up by the
353
// thread that originally held the lock. While this bit is set, no other waiters
354
// will be woken up so it's important to clear it the next time we try to
355
// acquire the main lock or the spin lock.
356
clear |= DESIGNATED_WAKER;
357
358
// Now that the thread has waited once, we no longer care if there is a writer
359
// waiting. Only the limits of mutual exclusion can prevent us from acquiring
360
// the lock.
361
zero_to_acquire &= !WRITER_WAITING;
362
363
// Reset the spin count since we just went through the wait queue.
364
spin_count = 0;
365
366
// Increment the wait count since we went through the wait queue.
367
wait_count += 1;
368
369
// Skip the `cpu_relax` below.
370
continue;
371
}
372
}
373
374
// Both the lock and the spin lock are held by one or more other threads. First, we'll
375
// spin a few times in case we can acquire the lock or the spin lock. If that fails then
376
// we yield because we might be preventing the threads that do hold the 2 locks from
377
// getting cpu time.
378
if spin_count < SPIN_THRESHOLD {
379
cpu_relax(1 << spin_count);
380
spin_count += 1;
381
} else {
382
yield_now();
383
}
384
}
385
}
386
387
#[inline]
388
pub fn unlock(&self) {
389
// Fast path, if possible. We can directly clear the locked bit since we have exclusive
390
// access to the rwlock.
391
let oldstate = self.state.fetch_sub(LOCKED, Ordering::Release);
392
393
// Panic if we just tried to unlock a rwlock that wasn't held by this thread. This shouldn't
394
// really be possible since `unlock` is not a public method.
395
debug_assert_eq!(
396
oldstate & READ_MASK,
397
0,
398
"`unlock` called on rwlock held in read-mode"
399
);
400
debug_assert_ne!(
401
oldstate & LOCKED,
402
0,
403
"`unlock` called on rwlock not held in write-mode"
404
);
405
406
if (oldstate & HAS_WAITERS) != 0 && (oldstate & DESIGNATED_WAKER) == 0 {
407
// The oldstate has waiters but no designated waker has been chosen yet.
408
self.unlock_slow();
409
}
410
}
411
412
#[inline]
413
pub fn read_unlock(&self) {
414
// Fast path, if possible. We can directly subtract the READ_LOCK bit since we had
415
// previously added it.
416
let oldstate = self.state.fetch_sub(READ_LOCK, Ordering::Release);
417
418
debug_assert_eq!(
419
oldstate & LOCKED,
420
0,
421
"`read_unlock` called on rwlock held in write-mode"
422
);
423
debug_assert_ne!(
424
oldstate & READ_MASK,
425
0,
426
"`read_unlock` called on rwlock not held in read-mode"
427
);
428
429
if (oldstate & HAS_WAITERS) != 0
430
&& (oldstate & DESIGNATED_WAKER) == 0
431
&& (oldstate & READ_MASK) == READ_LOCK
432
{
433
// There are waiters, no designated waker has been chosen yet, and the last reader is
434
// unlocking so we have to take the slow path.
435
self.unlock_slow();
436
}
437
}
438
439
#[cold]
440
fn unlock_slow(&self) {
441
let mut spin_count = 0;
442
443
loop {
444
let oldstate = self.state.load(Ordering::Relaxed);
445
if (oldstate & HAS_WAITERS) == 0 || (oldstate & DESIGNATED_WAKER) != 0 {
446
// No more waiters or a designated waker has been chosen. Nothing left for us to do.
447
return;
448
} else if (oldstate & SPINLOCK) == 0 {
449
// The spin lock is not held by another thread. Try to acquire it. Also set the
450
// `DESIGNATED_WAKER` bit since we are likely going to wake up one or more threads.
451
if self
452
.state
453
.compare_exchange_weak(
454
oldstate,
455
oldstate | SPINLOCK | DESIGNATED_WAKER,
456
Ordering::Acquire,
457
Ordering::Relaxed,
458
)
459
.is_ok()
460
{
461
// Acquired the spinlock. Try to wake a waiter. We may also end up wanting to
462
// clear the HAS_WAITER and DESIGNATED_WAKER bits so start collecting the bits
463
// to be cleared.
464
let mut clear = SPINLOCK;
465
466
// SAFETY:
467
// Safe because the spinlock guarantees exclusive access to the waiter list and
468
// the reference does not escape this function.
469
let waiters = unsafe { &mut *self.waiters.get() };
470
let (wake_list, set_on_release) = get_wake_list(waiters);
471
472
// If the waiter list is now empty, clear the HAS_WAITERS bit.
473
if waiters.is_empty() {
474
clear |= HAS_WAITERS;
475
}
476
477
if wake_list.is_empty() {
478
// Since we are not going to wake any waiters clear the DESIGNATED_WAKER bit
479
// that we set when we acquired the spin lock.
480
clear |= DESIGNATED_WAKER;
481
}
482
483
// Release the spin lock and clear any other bits as necessary. Also, set any
484
// bits returned by `get_wake_list`. For now, this is just the `WRITER_WAITING`
485
// bit, which needs to be set when we are waking up a bunch of readers and there
486
// are still writers in the wait queue. This will prevent any readers that
487
// aren't in `wake_list` from acquiring the read lock.
488
let mut state = oldstate;
489
loop {
490
match self.state.compare_exchange_weak(
491
state,
492
(state | set_on_release) & !clear,
493
Ordering::Release,
494
Ordering::Relaxed,
495
) {
496
Ok(_) => break,
497
Err(w) => state = w,
498
}
499
}
500
501
// Now wake the waiters, if any.
502
for w in wake_list {
503
w.wake();
504
}
505
506
// We're done.
507
return;
508
}
509
}
510
511
// Spin and try again. It's ok to block here as we have already released the lock.
512
if spin_count < SPIN_THRESHOLD {
513
cpu_relax(1 << spin_count);
514
spin_count += 1;
515
} else {
516
yield_now();
517
}
518
}
519
}
520
521
fn cancel_waiter(&self, waiter: &Waiter, wake_next: bool) {
522
let mut oldstate = self.state.load(Ordering::Relaxed);
523
while oldstate & SPINLOCK != 0
524
|| self
525
.state
526
.compare_exchange_weak(
527
oldstate,
528
oldstate | SPINLOCK,
529
Ordering::Acquire,
530
Ordering::Relaxed,
531
)
532
.is_err()
533
{
534
hint::spin_loop();
535
oldstate = self.state.load(Ordering::Relaxed);
536
}
537
538
// SAFETY:
539
// Safe because the spin lock provides exclusive access and the reference does not escape
540
// this function.
541
let waiters = unsafe { &mut *self.waiters.get() };
542
543
let mut clear = SPINLOCK;
544
545
// If we are about to remove the first waiter in the wait list, then clear the LONG_WAIT
546
// bit. Also clear the bit if we are going to be waking some other waiters. In this case the
547
// waiter that set the bit may have already been removed from the waiter list (and could be
548
// the one that is currently being dropped). If it is still in the waiter list then clearing
549
// this bit may starve it for one more iteration through the lock_slow() loop, whereas not
550
// clearing this bit could cause a deadlock if the waiter that set it is the one that is
551
// being dropped.
552
if wake_next
553
|| waiters
554
.front()
555
.get()
556
.map(|front| std::ptr::eq(front, waiter))
557
.unwrap_or(false)
558
{
559
clear |= LONG_WAIT;
560
}
561
562
let waiting_for = waiter.is_waiting_for();
563
564
// Don't drop the old waiter while holding the spin lock.
565
let old_waiter = if waiter.is_linked() && waiting_for == WaitingFor::Mutex {
566
// SAFETY:
567
// We know that the waiter is still linked and is waiting for the rwlock, which
568
// guarantees that it is still linked into `self.waiters`.
569
let mut cursor = unsafe { waiters.cursor_mut_from_ptr(waiter as *const Waiter) };
570
cursor.remove()
571
} else {
572
None
573
};
574
575
let (wake_list, set_on_release) = if wake_next || waiting_for == WaitingFor::None {
576
// Either the waiter was already woken or it's been removed from the rwlock's waiter
577
// list and is going to be woken. Either way, we need to wake up another thread.
578
get_wake_list(waiters)
579
} else {
580
(WaiterList::new(WaiterAdapter::new()), 0)
581
};
582
583
if waiters.is_empty() {
584
clear |= HAS_WAITERS;
585
}
586
587
if wake_list.is_empty() {
588
// We're not waking any other threads so clear the DESIGNATED_WAKER bit. In the worst
589
// case this leads to an additional thread being woken up but we risk a deadlock if we
590
// don't clear it.
591
clear |= DESIGNATED_WAKER;
592
}
593
594
if let WaiterKind::Exclusive = waiter.kind() {
595
// The waiter being dropped is a writer so clear the writer waiting bit for now. If we
596
// found more writers in the list while fetching waiters to wake up then this bit will
597
// be set again via `set_on_release`.
598
clear |= WRITER_WAITING;
599
}
600
601
while self
602
.state
603
.compare_exchange_weak(
604
oldstate,
605
(oldstate & !clear) | set_on_release,
606
Ordering::Release,
607
Ordering::Relaxed,
608
)
609
.is_err()
610
{
611
hint::spin_loop();
612
oldstate = self.state.load(Ordering::Relaxed);
613
}
614
615
for w in wake_list {
616
w.wake();
617
}
618
619
mem::drop(old_waiter);
620
}
621
}
622
623
// TODO(b/315998194): Add safety comment
624
#[allow(clippy::undocumented_unsafe_blocks)]
625
unsafe impl Send for RawRwLock {}
626
// TODO(b/315998194): Add safety comment
627
#[allow(clippy::undocumented_unsafe_blocks)]
628
unsafe impl Sync for RawRwLock {}
629
630
fn cancel_waiter(raw: usize, waiter: &Waiter, wake_next: bool) {
631
let raw_rwlock = raw as *const RawRwLock;
632
633
// SAFETY:
634
// Safe because the thread that owns the waiter that is being canceled must also own a reference
635
// to the rwlock, which ensures that this pointer is valid.
636
unsafe { (*raw_rwlock).cancel_waiter(waiter, wake_next) }
637
}
638
639
/// A high-level primitive that provides safe, mutable access to a shared resource.
640
///
641
/// `RwLock` safely provides both shared, immutable access (via `read_lock()`) as well as exclusive,
642
/// mutable access (via `lock()`) to an underlying resource asynchronously while ensuring fairness
643
/// with no loss of performance. If you don't need `read_lock()` nor fairness, try upstream
644
/// `futures::lock::Mutex` instead.
645
///
646
/// # Poisoning
647
///
648
/// `RwLock` does not support lock poisoning so if a thread panics while holding the lock, the
649
/// poisoned data will be accessible by other threads in your program. If you need to guarantee that
650
/// other threads cannot access poisoned data then you may wish to wrap this `RwLock` inside another
651
/// type that provides the poisoning feature. See the implementation of `std::sync::Mutex` for an
652
/// example of this. Note `futures::lock::Mutex` does not support poisoning either.
653
///
654
///
655
/// # Fairness
656
///
657
/// This `RwLock` implementation does not guarantee that threads will acquire the lock in the same
658
/// order that they call `lock()` or `read_lock()`. However it will attempt to prevent long-term
659
/// starvation: if a thread repeatedly fails to acquire the lock beyond a threshold then all other
660
/// threads will fail to acquire the lock until the starved thread has acquired it. Note, on the
661
/// other hand, `futures::lock::Mutex` does not guarantee fairness.
662
///
663
/// Similarly, this `RwLock` will attempt to balance reader and writer threads: once there is a
664
/// writer thread waiting to acquire the lock no new reader threads will be allowed to acquire it.
665
/// However, any reader threads that were already waiting will still be allowed to acquire it.
666
///
667
/// # Examples
668
///
669
/// ```edition2018
670
/// use std::sync::Arc;
671
/// use std::thread;
672
/// use std::sync::mpsc::channel;
673
///
674
/// use cros_async::{block_on, sync::RwLock};
675
///
676
/// const N: usize = 10;
677
///
678
/// // Spawn a few threads to increment a shared variable (non-atomically), and
679
/// // let the main thread know once all increments are done.
680
/// //
681
/// // Here we're using an Arc to share memory among threads, and the data inside
682
/// // the Arc is protected with a rwlock.
683
/// let data = Arc::new(RwLock::new(0));
684
///
685
/// let (tx, rx) = channel();
686
/// for _ in 0..N {
687
/// let (data, tx) = (Arc::clone(&data), tx.clone());
688
/// thread::spawn(move || {
689
/// // The shared state can only be accessed once the lock is held.
690
/// // Our non-atomic increment is safe because we're the only thread
691
/// // which can access the shared state when the lock is held.
692
/// let mut data = block_on(data.lock());
693
/// *data += 1;
694
/// if *data == N {
695
/// tx.send(()).unwrap();
696
/// }
697
/// // the lock is unlocked here when `data` goes out of scope.
698
/// });
699
/// }
700
///
701
/// rx.recv().unwrap();
702
/// ```
703
#[repr(align(128))]
704
pub struct RwLock<T: ?Sized> {
705
raw: RawRwLock,
706
value: UnsafeCell<T>,
707
}
708
709
impl<T> RwLock<T> {
710
/// Create a new, unlocked `RwLock` ready for use.
711
pub fn new(v: T) -> RwLock<T> {
712
RwLock {
713
raw: RawRwLock::new(),
714
value: UnsafeCell::new(v),
715
}
716
}
717
718
/// Consume the `RwLock` and return the contained value. This method does not perform any
719
/// locking as the compiler will guarantee that there are no other references to `self` and the
720
/// caller owns the `RwLock`.
721
pub fn into_inner(self) -> T {
722
// Don't need to acquire the lock because the compiler guarantees that there are
723
// no references to `self`.
724
self.value.into_inner()
725
}
726
}
727
728
impl<T: ?Sized> RwLock<T> {
729
/// Acquires exclusive, mutable access to the resource protected by the `RwLock`, blocking the
730
/// current thread until it is able to do so. Upon returning, the current thread will be the
731
/// only thread with access to the resource. The `RwLock` will be released when the returned
732
/// `RwLockWriteGuard` is dropped.
733
///
734
/// Calling `lock()` while holding a `RwLockWriteGuard` or a `RwLockReadGuard` will cause a
735
/// deadlock.
736
///
737
/// Callers that are not in an async context may wish to use the `block_on` method to block the
738
/// thread until the `RwLock` is acquired.
739
#[inline]
740
pub async fn lock(&self) -> RwLockWriteGuard<'_, T> {
741
self.raw.lock().await;
742
743
RwLockWriteGuard {
744
mu: self,
745
// SAFETY:
746
// Safe because we have exclusive access to `self.value`.
747
value: unsafe { &mut *self.value.get() },
748
}
749
}
750
751
/// Acquires shared, immutable access to the resource protected by the `RwLock`, blocking the
752
/// current thread until it is able to do so. Upon returning there may be other threads that
753
/// also have immutable access to the resource but there will not be any threads that have
754
/// mutable access to the resource. When the returned `RwLockReadGuard` is dropped the thread
755
/// releases its access to the resource.
756
///
757
/// Calling `read_lock()` while holding a `RwLockReadGuard` may deadlock. Calling `read_lock()`
758
/// while holding a `RwLockWriteGuard` will deadlock.
759
///
760
/// Callers that are not in an async context may wish to use the `block_on` method to block the
761
/// thread until the `RwLock` is acquired.
762
#[inline]
763
pub async fn read_lock(&self) -> RwLockReadGuard<'_, T> {
764
self.raw.read_lock().await;
765
766
RwLockReadGuard {
767
mu: self,
768
// SAFETY:
769
// Safe because we have shared read-only access to `self.value`.
770
value: unsafe { &*self.value.get() },
771
}
772
}
773
774
// Called from `Condvar::wait` when the thread wants to reacquire the lock.
775
#[inline]
776
pub(crate) async fn lock_from_cv(&self) -> RwLockWriteGuard<'_, T> {
777
self.raw.lock_slow::<Exclusive>(DESIGNATED_WAKER, 0).await;
778
779
RwLockWriteGuard {
780
mu: self,
781
// SAFETY:
782
// Safe because we have exclusive access to `self.value`.
783
value: unsafe { &mut *self.value.get() },
784
}
785
}
786
787
// Like `lock_from_cv` but for acquiring a shared lock.
788
#[inline]
789
pub(crate) async fn read_lock_from_cv(&self) -> RwLockReadGuard<'_, T> {
790
// Threads that have waited in the Condvar's waiter list don't have to care if there is a
791
// writer waiting since they have already waited once.
792
self.raw
793
.lock_slow::<Shared>(DESIGNATED_WAKER, WRITER_WAITING)
794
.await;
795
796
RwLockReadGuard {
797
mu: self,
798
// SAFETY:
799
// Safe because we have exclusive access to `self.value`.
800
value: unsafe { &*self.value.get() },
801
}
802
}
803
804
#[inline]
805
fn unlock(&self) {
806
self.raw.unlock();
807
}
808
809
#[inline]
810
fn read_unlock(&self) {
811
self.raw.read_unlock();
812
}
813
814
pub fn get_mut(&mut self) -> &mut T {
815
// SAFETY:
816
// Safe because the compiler statically guarantees that are no other references to `self`.
817
// This is also why we don't need to acquire the lock first.
818
unsafe { &mut *self.value.get() }
819
}
820
}
821
822
// TODO(b/315998194): Add safety comment
823
#[allow(clippy::undocumented_unsafe_blocks)]
824
unsafe impl<T: ?Sized + Send> Send for RwLock<T> {}
825
// TODO(b/315998194): Add safety comment
826
#[allow(clippy::undocumented_unsafe_blocks)]
827
unsafe impl<T: ?Sized + Send> Sync for RwLock<T> {}
828
829
impl<T: Default> Default for RwLock<T> {
830
fn default() -> Self {
831
Self::new(Default::default())
832
}
833
}
834
835
impl<T> From<T> for RwLock<T> {
836
fn from(source: T) -> Self {
837
Self::new(source)
838
}
839
}
840
841
/// An RAII implementation of a "scoped exclusive lock" for a `RwLock`. When this structure is
842
/// dropped, the lock will be released. The resource protected by the `RwLock` can be accessed via
843
/// the `Deref` and `DerefMut` implementations of this structure.
844
pub struct RwLockWriteGuard<'a, T: ?Sized + 'a> {
845
mu: &'a RwLock<T>,
846
value: &'a mut T,
847
}
848
849
impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> {
850
pub(crate) fn into_inner(self) -> &'a RwLock<T> {
851
self.mu
852
}
853
854
pub(crate) fn as_raw_rwlock(&self) -> &RawRwLock {
855
&self.mu.raw
856
}
857
}
858
859
impl<T: ?Sized> Deref for RwLockWriteGuard<'_, T> {
860
type Target = T;
861
862
fn deref(&self) -> &Self::Target {
863
self.value
864
}
865
}
866
867
impl<T: ?Sized> DerefMut for RwLockWriteGuard<'_, T> {
868
fn deref_mut(&mut self) -> &mut Self::Target {
869
self.value
870
}
871
}
872
873
impl<T: ?Sized> Drop for RwLockWriteGuard<'_, T> {
874
fn drop(&mut self) {
875
self.mu.unlock()
876
}
877
}
878
879
/// An RAII implementation of a "scoped shared lock" for a `RwLock`. When this structure is dropped,
880
/// the lock will be released. The resource protected by the `RwLock` can be accessed via the
881
/// `Deref` implementation of this structure.
882
pub struct RwLockReadGuard<'a, T: ?Sized + 'a> {
883
mu: &'a RwLock<T>,
884
value: &'a T,
885
}
886
887
impl<'a, T: ?Sized> RwLockReadGuard<'a, T> {
888
pub(crate) fn into_inner(self) -> &'a RwLock<T> {
889
self.mu
890
}
891
892
pub(crate) fn as_raw_rwlock(&self) -> &RawRwLock {
893
&self.mu.raw
894
}
895
}
896
897
impl<T: ?Sized> Deref for RwLockReadGuard<'_, T> {
898
type Target = T;
899
900
fn deref(&self) -> &Self::Target {
901
self.value
902
}
903
}
904
905
impl<T: ?Sized> Drop for RwLockReadGuard<'_, T> {
906
fn drop(&mut self) {
907
self.mu.read_unlock()
908
}
909
}
910
911
// TODO(b/194338842): Fix tests for windows
912
#[cfg(any(target_os = "android", target_os = "linux"))]
913
#[cfg(test)]
914
mod test {
915
use std::future::Future;
916
use std::mem;
917
use std::pin::Pin;
918
use std::rc::Rc;
919
use std::sync::atomic::AtomicUsize;
920
use std::sync::atomic::Ordering;
921
use std::sync::mpsc::channel;
922
use std::sync::mpsc::Sender;
923
use std::sync::Arc;
924
use std::task::Context;
925
use std::task::Poll;
926
use std::task::Waker;
927
use std::thread;
928
use std::time::Duration;
929
930
use futures::channel::oneshot;
931
use futures::pending;
932
use futures::select;
933
use futures::task::waker_ref;
934
use futures::task::ArcWake;
935
use futures::FutureExt;
936
use futures_executor::LocalPool;
937
use futures_executor::ThreadPool;
938
use futures_util::task::LocalSpawnExt;
939
940
use super::super::super::block_on;
941
use super::super::super::sync::Condvar;
942
use super::super::super::sync::SpinLock;
943
use super::*;
944
945
#[derive(Debug, Eq, PartialEq)]
946
struct NonCopy(u32);
947
948
// Dummy waker used when we want to manually drive futures.
949
struct TestWaker;
950
impl ArcWake for TestWaker {
951
fn wake_by_ref(_arc_self: &Arc<Self>) {}
952
}
953
954
#[test]
955
fn it_works() {
956
let mu = RwLock::new(NonCopy(13));
957
958
assert_eq!(*block_on(mu.lock()), NonCopy(13));
959
}
960
961
#[test]
962
fn smoke() {
963
let mu = RwLock::new(NonCopy(7));
964
965
mem::drop(block_on(mu.lock()));
966
mem::drop(block_on(mu.lock()));
967
}
968
969
#[test]
970
fn rw_smoke() {
971
let mu = RwLock::new(NonCopy(7));
972
973
mem::drop(block_on(mu.lock()));
974
mem::drop(block_on(mu.read_lock()));
975
mem::drop((block_on(mu.read_lock()), block_on(mu.read_lock())));
976
mem::drop(block_on(mu.lock()));
977
}
978
979
#[test]
980
fn async_smoke() {
981
async fn lock(mu: Rc<RwLock<NonCopy>>) {
982
mu.lock().await;
983
}
984
985
async fn read_lock(mu: Rc<RwLock<NonCopy>>) {
986
mu.read_lock().await;
987
}
988
989
async fn double_read_lock(mu: Rc<RwLock<NonCopy>>) {
990
let first = mu.read_lock().await;
991
mu.read_lock().await;
992
993
// Make sure first lives past the second read lock.
994
first.as_raw_rwlock();
995
}
996
997
let mu = Rc::new(RwLock::new(NonCopy(7)));
998
999
let mut ex = LocalPool::new();
1000
let spawner = ex.spawner();
1001
1002
spawner
1003
.spawn_local(lock(Rc::clone(&mu)))
1004
.expect("Failed to spawn future");
1005
spawner
1006
.spawn_local(read_lock(Rc::clone(&mu)))
1007
.expect("Failed to spawn future");
1008
spawner
1009
.spawn_local(double_read_lock(Rc::clone(&mu)))
1010
.expect("Failed to spawn future");
1011
spawner
1012
.spawn_local(lock(Rc::clone(&mu)))
1013
.expect("Failed to spawn future");
1014
1015
ex.run();
1016
}
1017
1018
#[test]
1019
fn send() {
1020
let mu = RwLock::new(NonCopy(19));
1021
1022
thread::spawn(move || {
1023
let value = block_on(mu.lock());
1024
assert_eq!(*value, NonCopy(19));
1025
})
1026
.join()
1027
.unwrap();
1028
}
1029
1030
#[test]
1031
fn arc_nested() {
1032
// Tests nested rwlocks and access to underlying data.
1033
let mu = RwLock::new(1);
1034
let arc = Arc::new(RwLock::new(mu));
1035
thread::spawn(move || {
1036
let nested = block_on(arc.lock());
1037
let lock2 = block_on(nested.lock());
1038
assert_eq!(*lock2, 1);
1039
})
1040
.join()
1041
.unwrap();
1042
}
1043
1044
#[test]
1045
fn arc_access_in_unwind() {
1046
let arc = Arc::new(RwLock::new(1));
1047
let arc2 = arc.clone();
1048
thread::spawn(move || {
1049
struct Unwinder {
1050
i: Arc<RwLock<i32>>,
1051
}
1052
impl Drop for Unwinder {
1053
fn drop(&mut self) {
1054
*block_on(self.i.lock()) += 1;
1055
}
1056
}
1057
let _u = Unwinder { i: arc2 };
1058
panic!();
1059
})
1060
.join()
1061
.expect_err("thread did not panic");
1062
let lock = block_on(arc.lock());
1063
assert_eq!(*lock, 2);
1064
}
1065
1066
#[test]
1067
fn unsized_value() {
1068
let rwlock: &RwLock<[i32]> = &RwLock::new([1, 2, 3]);
1069
{
1070
let b = &mut *block_on(rwlock.lock());
1071
b[0] = 4;
1072
b[2] = 5;
1073
}
1074
let expected: &[i32] = &[4, 2, 5];
1075
assert_eq!(&*block_on(rwlock.lock()), expected);
1076
}
1077
#[test]
1078
fn high_contention() {
1079
const THREADS: usize = 17;
1080
const ITERATIONS: usize = 103;
1081
1082
let mut threads = Vec::with_capacity(THREADS);
1083
1084
let mu = Arc::new(RwLock::new(0usize));
1085
for _ in 0..THREADS {
1086
let mu2 = mu.clone();
1087
threads.push(thread::spawn(move || {
1088
for _ in 0..ITERATIONS {
1089
*block_on(mu2.lock()) += 1;
1090
}
1091
}));
1092
}
1093
1094
for t in threads.into_iter() {
1095
t.join().unwrap();
1096
}
1097
1098
assert_eq!(*block_on(mu.read_lock()), THREADS * ITERATIONS);
1099
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1100
}
1101
1102
#[test]
1103
fn high_contention_with_cancel() {
1104
const TASKS: usize = 17;
1105
const ITERATIONS: usize = 103;
1106
1107
async fn increment(mu: Arc<RwLock<usize>>, alt_mu: Arc<RwLock<usize>>, tx: Sender<()>) {
1108
for _ in 0..ITERATIONS {
1109
select! {
1110
mut count = mu.lock().fuse() => *count += 1,
1111
mut count = alt_mu.lock().fuse() => *count += 1,
1112
}
1113
}
1114
tx.send(()).expect("Failed to send completion signal");
1115
}
1116
1117
let ex = ThreadPool::new().expect("Failed to create ThreadPool");
1118
1119
let mu = Arc::new(RwLock::new(0usize));
1120
let alt_mu = Arc::new(RwLock::new(0usize));
1121
1122
let (tx, rx) = channel();
1123
for _ in 0..TASKS {
1124
ex.spawn_ok(increment(Arc::clone(&mu), Arc::clone(&alt_mu), tx.clone()));
1125
}
1126
1127
for _ in 0..TASKS {
1128
if let Err(e) = rx.recv_timeout(Duration::from_secs(10)) {
1129
panic!("Error while waiting for threads to complete: {e}");
1130
}
1131
}
1132
1133
assert_eq!(
1134
*block_on(mu.read_lock()) + *block_on(alt_mu.read_lock()),
1135
TASKS * ITERATIONS
1136
);
1137
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1138
assert_eq!(alt_mu.raw.state.load(Ordering::Relaxed), 0);
1139
}
1140
1141
#[test]
1142
fn single_thread_async() {
1143
const TASKS: usize = 17;
1144
const ITERATIONS: usize = 103;
1145
1146
// Async closures are unstable.
1147
async fn increment(mu: Rc<RwLock<usize>>) {
1148
for _ in 0..ITERATIONS {
1149
*mu.lock().await += 1;
1150
}
1151
}
1152
1153
let mut ex = LocalPool::new();
1154
let spawner = ex.spawner();
1155
1156
let mu = Rc::new(RwLock::new(0usize));
1157
for _ in 0..TASKS {
1158
spawner
1159
.spawn_local(increment(Rc::clone(&mu)))
1160
.expect("Failed to spawn task");
1161
}
1162
1163
ex.run();
1164
1165
assert_eq!(*block_on(mu.read_lock()), TASKS * ITERATIONS);
1166
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1167
}
1168
1169
#[test]
1170
fn multi_thread_async() {
1171
const TASKS: usize = 17;
1172
const ITERATIONS: usize = 103;
1173
1174
// Async closures are unstable.
1175
async fn increment(mu: Arc<RwLock<usize>>, tx: Sender<()>) {
1176
for _ in 0..ITERATIONS {
1177
*mu.lock().await += 1;
1178
}
1179
tx.send(()).expect("Failed to send completion signal");
1180
}
1181
1182
let ex = ThreadPool::new().expect("Failed to create ThreadPool");
1183
1184
let mu = Arc::new(RwLock::new(0usize));
1185
let (tx, rx) = channel();
1186
for _ in 0..TASKS {
1187
ex.spawn_ok(increment(Arc::clone(&mu), tx.clone()));
1188
}
1189
1190
for _ in 0..TASKS {
1191
rx.recv_timeout(Duration::from_secs(5))
1192
.expect("Failed to receive completion signal");
1193
}
1194
assert_eq!(*block_on(mu.read_lock()), TASKS * ITERATIONS);
1195
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1196
}
1197
1198
#[test]
1199
fn get_mut() {
1200
let mut mu = RwLock::new(NonCopy(13));
1201
*mu.get_mut() = NonCopy(17);
1202
1203
assert_eq!(mu.into_inner(), NonCopy(17));
1204
}
1205
1206
#[test]
1207
fn into_inner() {
1208
let mu = RwLock::new(NonCopy(29));
1209
assert_eq!(mu.into_inner(), NonCopy(29));
1210
}
1211
1212
#[test]
1213
fn into_inner_drop() {
1214
struct NeedsDrop(Arc<AtomicUsize>);
1215
impl Drop for NeedsDrop {
1216
fn drop(&mut self) {
1217
self.0.fetch_add(1, Ordering::AcqRel);
1218
}
1219
}
1220
1221
let value = Arc::new(AtomicUsize::new(0));
1222
let needs_drop = RwLock::new(NeedsDrop(value.clone()));
1223
assert_eq!(value.load(Ordering::Acquire), 0);
1224
1225
{
1226
let inner = needs_drop.into_inner();
1227
assert_eq!(inner.0.load(Ordering::Acquire), 0);
1228
}
1229
1230
assert_eq!(value.load(Ordering::Acquire), 1);
1231
}
1232
1233
#[test]
1234
fn rw_arc() {
1235
const THREADS: isize = 7;
1236
const ITERATIONS: isize = 13;
1237
1238
let mu = Arc::new(RwLock::new(0isize));
1239
let mu2 = mu.clone();
1240
1241
let (tx, rx) = channel();
1242
thread::spawn(move || {
1243
let mut guard = block_on(mu2.lock());
1244
for _ in 0..ITERATIONS {
1245
let tmp = *guard;
1246
*guard = -1;
1247
thread::yield_now();
1248
*guard = tmp + 1;
1249
}
1250
tx.send(()).unwrap();
1251
});
1252
1253
let mut readers = Vec::with_capacity(10);
1254
for _ in 0..THREADS {
1255
let mu3 = mu.clone();
1256
let handle = thread::spawn(move || {
1257
let guard = block_on(mu3.read_lock());
1258
assert!(*guard >= 0);
1259
});
1260
1261
readers.push(handle);
1262
}
1263
1264
// Wait for the readers to finish their checks.
1265
for r in readers {
1266
r.join().expect("One or more readers saw a negative value");
1267
}
1268
1269
// Wait for the writer to finish.
1270
rx.recv_timeout(Duration::from_secs(5)).unwrap();
1271
assert_eq!(*block_on(mu.read_lock()), ITERATIONS);
1272
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1273
}
1274
1275
#[test]
1276
fn rw_single_thread_async() {
1277
// A Future that returns `Poll::pending` the first time it is polled and `Poll::Ready` every
1278
// time after that.
1279
struct TestFuture {
1280
polled: bool,
1281
waker: Arc<SpinLock<Option<Waker>>>,
1282
}
1283
1284
impl Future for TestFuture {
1285
type Output = ();
1286
1287
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
1288
if self.polled {
1289
Poll::Ready(())
1290
} else {
1291
self.polled = true;
1292
*self.waker.lock() = Some(cx.waker().clone());
1293
Poll::Pending
1294
}
1295
}
1296
}
1297
1298
fn wake_future(waker: Arc<SpinLock<Option<Waker>>>) {
1299
loop {
1300
if let Some(w) = waker.lock().take() {
1301
w.wake();
1302
return;
1303
}
1304
1305
// This sleep cannot be moved into an else branch because we would end up holding
1306
// the lock while sleeping due to rust's drop ordering rules.
1307
thread::sleep(Duration::from_millis(10));
1308
}
1309
}
1310
1311
async fn writer(mu: Rc<RwLock<isize>>) {
1312
let mut guard = mu.lock().await;
1313
for _ in 0..ITERATIONS {
1314
let tmp = *guard;
1315
*guard = -1;
1316
let waker = Arc::new(SpinLock::new(None));
1317
let waker2 = Arc::clone(&waker);
1318
thread::spawn(move || wake_future(waker2));
1319
let fut = TestFuture {
1320
polled: false,
1321
waker,
1322
};
1323
fut.await;
1324
*guard = tmp + 1;
1325
}
1326
}
1327
1328
async fn reader(mu: Rc<RwLock<isize>>) {
1329
let guard = mu.read_lock().await;
1330
assert!(*guard >= 0);
1331
}
1332
1333
const TASKS: isize = 7;
1334
const ITERATIONS: isize = 13;
1335
1336
let mu = Rc::new(RwLock::new(0isize));
1337
let mut ex = LocalPool::new();
1338
let spawner = ex.spawner();
1339
1340
spawner
1341
.spawn_local(writer(Rc::clone(&mu)))
1342
.expect("Failed to spawn writer");
1343
1344
for _ in 0..TASKS {
1345
spawner
1346
.spawn_local(reader(Rc::clone(&mu)))
1347
.expect("Failed to spawn reader");
1348
}
1349
1350
ex.run();
1351
1352
assert_eq!(*block_on(mu.read_lock()), ITERATIONS);
1353
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1354
}
1355
1356
#[test]
1357
fn rw_multi_thread_async() {
1358
async fn writer(mu: Arc<RwLock<isize>>, tx: Sender<()>) {
1359
let mut guard = mu.lock().await;
1360
for _ in 0..ITERATIONS {
1361
let tmp = *guard;
1362
*guard = -1;
1363
thread::yield_now();
1364
*guard = tmp + 1;
1365
}
1366
1367
mem::drop(guard);
1368
tx.send(()).unwrap();
1369
}
1370
1371
async fn reader(mu: Arc<RwLock<isize>>, tx: Sender<()>) {
1372
let guard = mu.read_lock().await;
1373
assert!(*guard >= 0);
1374
1375
mem::drop(guard);
1376
tx.send(()).expect("Failed to send completion message");
1377
}
1378
1379
const TASKS: isize = 7;
1380
const ITERATIONS: isize = 13;
1381
1382
let mu = Arc::new(RwLock::new(0isize));
1383
let ex = ThreadPool::new().expect("Failed to create ThreadPool");
1384
1385
let (txw, rxw) = channel();
1386
ex.spawn_ok(writer(Arc::clone(&mu), txw));
1387
1388
let (txr, rxr) = channel();
1389
for _ in 0..TASKS {
1390
ex.spawn_ok(reader(Arc::clone(&mu), txr.clone()));
1391
}
1392
1393
// Wait for the readers to finish their checks.
1394
for _ in 0..TASKS {
1395
rxr.recv_timeout(Duration::from_secs(5))
1396
.expect("Failed to receive completion message from reader");
1397
}
1398
1399
// Wait for the writer to finish.
1400
rxw.recv_timeout(Duration::from_secs(5))
1401
.expect("Failed to receive completion message from writer");
1402
1403
assert_eq!(*block_on(mu.read_lock()), ITERATIONS);
1404
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1405
}
1406
1407
#[test]
1408
fn wake_all_readers() {
1409
async fn read(mu: Arc<RwLock<()>>) {
1410
let g = mu.read_lock().await;
1411
pending!();
1412
mem::drop(g);
1413
}
1414
1415
async fn write(mu: Arc<RwLock<()>>) {
1416
mu.lock().await;
1417
}
1418
1419
let mu = Arc::new(RwLock::new(()));
1420
let mut futures: [Pin<Box<dyn Future<Output = ()>>>; 5] = [
1421
Box::pin(read(mu.clone())),
1422
Box::pin(read(mu.clone())),
1423
Box::pin(read(mu.clone())),
1424
Box::pin(write(mu.clone())),
1425
Box::pin(read(mu.clone())),
1426
];
1427
const NUM_READERS: usize = 4;
1428
1429
let arc_waker = Arc::new(TestWaker);
1430
let waker = waker_ref(&arc_waker);
1431
let mut cx = Context::from_waker(&waker);
1432
1433
// Acquire the lock so that the futures cannot get it.
1434
let g = block_on(mu.lock());
1435
1436
for r in &mut futures {
1437
if let Poll::Ready(()) = r.as_mut().poll(&mut cx) {
1438
panic!("future unexpectedly ready");
1439
}
1440
}
1441
1442
assert_eq!(
1443
mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
1444
HAS_WAITERS
1445
);
1446
1447
assert_eq!(
1448
mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
1449
WRITER_WAITING
1450
);
1451
1452
// Drop the lock. This should allow all readers to make progress. Since they already waited
1453
// once they should ignore the WRITER_WAITING bit that is currently set.
1454
mem::drop(g);
1455
for r in &mut futures {
1456
if let Poll::Ready(()) = r.as_mut().poll(&mut cx) {
1457
panic!("future unexpectedly ready");
1458
}
1459
}
1460
1461
// Check that all readers were able to acquire the lock.
1462
assert_eq!(
1463
mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
1464
READ_LOCK * NUM_READERS
1465
);
1466
assert_eq!(
1467
mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
1468
WRITER_WAITING
1469
);
1470
1471
let mut needs_poll = None;
1472
1473
// All the readers can now finish but the writer needs to be polled again.
1474
for (i, r) in futures.iter_mut().enumerate() {
1475
match r.as_mut().poll(&mut cx) {
1476
Poll::Ready(()) => {}
1477
Poll::Pending => {
1478
if needs_poll.is_some() {
1479
panic!("More than one future unable to complete");
1480
}
1481
needs_poll = Some(i);
1482
}
1483
}
1484
}
1485
1486
if futures[needs_poll.expect("Writer unexpectedly able to complete")]
1487
.as_mut()
1488
.poll(&mut cx)
1489
.is_pending()
1490
{
1491
panic!("Writer unable to complete");
1492
}
1493
1494
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1495
}
1496
1497
#[test]
1498
fn long_wait() {
1499
async fn tight_loop(mu: Arc<RwLock<bool>>) {
1500
loop {
1501
let ready = mu.lock().await;
1502
if *ready {
1503
break;
1504
}
1505
pending!();
1506
}
1507
}
1508
1509
async fn mark_ready(mu: Arc<RwLock<bool>>) {
1510
*mu.lock().await = true;
1511
}
1512
1513
let mu = Arc::new(RwLock::new(false));
1514
let mut tl = Box::pin(tight_loop(mu.clone()));
1515
let mut mark = Box::pin(mark_ready(mu.clone()));
1516
1517
let arc_waker = Arc::new(TestWaker);
1518
let waker = waker_ref(&arc_waker);
1519
let mut cx = Context::from_waker(&waker);
1520
1521
for _ in 0..=LONG_WAIT_THRESHOLD {
1522
if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
1523
panic!("tight_loop unexpectedly ready");
1524
}
1525
1526
if let Poll::Ready(()) = mark.as_mut().poll(&mut cx) {
1527
panic!("mark_ready unexpectedly ready");
1528
}
1529
}
1530
1531
assert_eq!(
1532
mu.raw.state.load(Ordering::Relaxed),
1533
LOCKED | HAS_WAITERS | WRITER_WAITING | LONG_WAIT
1534
);
1535
1536
// This time the tight loop will fail to acquire the lock.
1537
if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
1538
panic!("tight_loop unexpectedly ready");
1539
}
1540
1541
// Which will finally allow the mark_ready function to make progress.
1542
if mark.as_mut().poll(&mut cx).is_pending() {
1543
panic!("mark_ready not able to make progress");
1544
}
1545
1546
// Now the tight loop will finish.
1547
if tl.as_mut().poll(&mut cx).is_pending() {
1548
panic!("tight_loop not able to finish");
1549
}
1550
1551
assert!(*block_on(mu.lock()));
1552
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1553
}
1554
1555
#[test]
1556
fn cancel_long_wait_before_wake() {
1557
async fn tight_loop(mu: Arc<RwLock<bool>>) {
1558
loop {
1559
let ready = mu.lock().await;
1560
if *ready {
1561
break;
1562
}
1563
pending!();
1564
}
1565
}
1566
1567
async fn mark_ready(mu: Arc<RwLock<bool>>) {
1568
*mu.lock().await = true;
1569
}
1570
1571
let mu = Arc::new(RwLock::new(false));
1572
let mut tl = Box::pin(tight_loop(mu.clone()));
1573
let mut mark = Box::pin(mark_ready(mu.clone()));
1574
1575
let arc_waker = Arc::new(TestWaker);
1576
let waker = waker_ref(&arc_waker);
1577
let mut cx = Context::from_waker(&waker);
1578
1579
for _ in 0..=LONG_WAIT_THRESHOLD {
1580
if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
1581
panic!("tight_loop unexpectedly ready");
1582
}
1583
1584
if let Poll::Ready(()) = mark.as_mut().poll(&mut cx) {
1585
panic!("mark_ready unexpectedly ready");
1586
}
1587
}
1588
1589
assert_eq!(
1590
mu.raw.state.load(Ordering::Relaxed),
1591
LOCKED | HAS_WAITERS | WRITER_WAITING | LONG_WAIT
1592
);
1593
1594
// Now drop the mark_ready future, which should clear the LONG_WAIT bit.
1595
mem::drop(mark);
1596
assert_eq!(mu.raw.state.load(Ordering::Relaxed), LOCKED);
1597
1598
mem::drop(tl);
1599
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1600
}
1601
1602
#[test]
1603
fn cancel_long_wait_after_wake() {
1604
async fn tight_loop(mu: Arc<RwLock<bool>>) {
1605
loop {
1606
let ready = mu.lock().await;
1607
if *ready {
1608
break;
1609
}
1610
pending!();
1611
}
1612
}
1613
1614
async fn mark_ready(mu: Arc<RwLock<bool>>) {
1615
*mu.lock().await = true;
1616
}
1617
1618
let mu = Arc::new(RwLock::new(false));
1619
let mut tl = Box::pin(tight_loop(mu.clone()));
1620
let mut mark = Box::pin(mark_ready(mu.clone()));
1621
1622
let arc_waker = Arc::new(TestWaker);
1623
let waker = waker_ref(&arc_waker);
1624
let mut cx = Context::from_waker(&waker);
1625
1626
for _ in 0..=LONG_WAIT_THRESHOLD {
1627
if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
1628
panic!("tight_loop unexpectedly ready");
1629
}
1630
1631
if let Poll::Ready(()) = mark.as_mut().poll(&mut cx) {
1632
panic!("mark_ready unexpectedly ready");
1633
}
1634
}
1635
1636
assert_eq!(
1637
mu.raw.state.load(Ordering::Relaxed),
1638
LOCKED | HAS_WAITERS | WRITER_WAITING | LONG_WAIT
1639
);
1640
1641
// This time the tight loop will fail to acquire the lock.
1642
if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
1643
panic!("tight_loop unexpectedly ready");
1644
}
1645
1646
// Now drop the mark_ready future, which should clear the LONG_WAIT bit.
1647
mem::drop(mark);
1648
assert_eq!(mu.raw.state.load(Ordering::Relaxed) & LONG_WAIT, 0);
1649
1650
// Since the lock is not held, we should be able to spawn a future to set the ready flag.
1651
block_on(mark_ready(mu.clone()));
1652
1653
// Now the tight loop will finish.
1654
if tl.as_mut().poll(&mut cx).is_pending() {
1655
panic!("tight_loop not able to finish");
1656
}
1657
1658
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1659
}
1660
1661
#[test]
1662
fn designated_waker() {
1663
async fn inc(mu: Arc<RwLock<usize>>) {
1664
*mu.lock().await += 1;
1665
}
1666
1667
let mu = Arc::new(RwLock::new(0));
1668
1669
let mut futures = [
1670
Box::pin(inc(mu.clone())),
1671
Box::pin(inc(mu.clone())),
1672
Box::pin(inc(mu.clone())),
1673
];
1674
1675
let arc_waker = Arc::new(TestWaker);
1676
let waker = waker_ref(&arc_waker);
1677
let mut cx = Context::from_waker(&waker);
1678
1679
let count = block_on(mu.lock());
1680
1681
// Poll 2 futures. Since neither will be able to acquire the lock, they should get added to
1682
// the waiter list.
1683
if let Poll::Ready(()) = futures[0].as_mut().poll(&mut cx) {
1684
panic!("future unexpectedly ready");
1685
}
1686
if let Poll::Ready(()) = futures[1].as_mut().poll(&mut cx) {
1687
panic!("future unexpectedly ready");
1688
}
1689
1690
assert_eq!(
1691
mu.raw.state.load(Ordering::Relaxed),
1692
LOCKED | HAS_WAITERS | WRITER_WAITING,
1693
);
1694
1695
// Now drop the lock. This should set the DESIGNATED_WAKER bit and wake up the first future
1696
// in the wait list.
1697
mem::drop(count);
1698
1699
assert_eq!(
1700
mu.raw.state.load(Ordering::Relaxed),
1701
DESIGNATED_WAKER | HAS_WAITERS | WRITER_WAITING,
1702
);
1703
1704
// Now poll the third future. It should be able to acquire the lock immediately.
1705
if futures[2].as_mut().poll(&mut cx).is_pending() {
1706
panic!("future unable to complete");
1707
}
1708
assert_eq!(*block_on(mu.lock()), 1);
1709
1710
// There should still be a waiter in the wait list and the DESIGNATED_WAKER bit should still
1711
// be set.
1712
assert_eq!(
1713
mu.raw.state.load(Ordering::Relaxed) & DESIGNATED_WAKER,
1714
DESIGNATED_WAKER
1715
);
1716
assert_eq!(
1717
mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
1718
HAS_WAITERS
1719
);
1720
1721
// Now let the future that was woken up run.
1722
if futures[0].as_mut().poll(&mut cx).is_pending() {
1723
panic!("future unable to complete");
1724
}
1725
assert_eq!(*block_on(mu.lock()), 2);
1726
1727
if futures[1].as_mut().poll(&mut cx).is_pending() {
1728
panic!("future unable to complete");
1729
}
1730
assert_eq!(*block_on(mu.lock()), 3);
1731
1732
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1733
}
1734
1735
#[test]
1736
fn cancel_designated_waker() {
1737
async fn inc(mu: Arc<RwLock<usize>>) {
1738
*mu.lock().await += 1;
1739
}
1740
1741
let mu = Arc::new(RwLock::new(0));
1742
1743
let mut fut = Box::pin(inc(mu.clone()));
1744
1745
let arc_waker = Arc::new(TestWaker);
1746
let waker = waker_ref(&arc_waker);
1747
let mut cx = Context::from_waker(&waker);
1748
1749
let count = block_on(mu.lock());
1750
1751
if let Poll::Ready(()) = fut.as_mut().poll(&mut cx) {
1752
panic!("Future unexpectedly ready when lock is held");
1753
}
1754
1755
// Drop the lock. This will wake up the future.
1756
mem::drop(count);
1757
1758
// Now drop the future without polling. This should clear all the state in the rwlock.
1759
mem::drop(fut);
1760
1761
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1762
}
1763
1764
#[test]
1765
fn cancel_before_wake() {
1766
async fn inc(mu: Arc<RwLock<usize>>) {
1767
*mu.lock().await += 1;
1768
}
1769
1770
let mu = Arc::new(RwLock::new(0));
1771
1772
let mut fut1 = Box::pin(inc(mu.clone()));
1773
1774
let mut fut2 = Box::pin(inc(mu.clone()));
1775
1776
let arc_waker = Arc::new(TestWaker);
1777
let waker = waker_ref(&arc_waker);
1778
let mut cx = Context::from_waker(&waker);
1779
1780
// First acquire the lock.
1781
let count = block_on(mu.lock());
1782
1783
// Now poll the futures. Since the lock is acquired they will both get queued in the waiter
1784
// list.
1785
match fut1.as_mut().poll(&mut cx) {
1786
Poll::Pending => {}
1787
Poll::Ready(()) => panic!("Future is unexpectedly ready"),
1788
}
1789
1790
match fut2.as_mut().poll(&mut cx) {
1791
Poll::Pending => {}
1792
Poll::Ready(()) => panic!("Future is unexpectedly ready"),
1793
}
1794
1795
assert_eq!(
1796
mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
1797
WRITER_WAITING
1798
);
1799
1800
// Drop fut1. This should remove it from the waiter list but shouldn't wake fut2.
1801
mem::drop(fut1);
1802
1803
// There should be no designated waker.
1804
assert_eq!(mu.raw.state.load(Ordering::Relaxed) & DESIGNATED_WAKER, 0);
1805
1806
// Since the waiter was a writer, we should clear the WRITER_WAITING bit.
1807
assert_eq!(mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING, 0);
1808
1809
match fut2.as_mut().poll(&mut cx) {
1810
Poll::Pending => {}
1811
Poll::Ready(()) => panic!("Future is unexpectedly ready"),
1812
}
1813
1814
// Now drop the lock. This should mark fut2 as ready to make progress.
1815
mem::drop(count);
1816
1817
match fut2.as_mut().poll(&mut cx) {
1818
Poll::Pending => panic!("Future is not ready to make progress"),
1819
Poll::Ready(()) => {}
1820
}
1821
1822
// Verify that we only incremented the count once.
1823
assert_eq!(*block_on(mu.lock()), 1);
1824
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1825
}
1826
1827
#[test]
1828
fn cancel_after_wake() {
1829
async fn inc(mu: Arc<RwLock<usize>>) {
1830
*mu.lock().await += 1;
1831
}
1832
1833
let mu = Arc::new(RwLock::new(0));
1834
1835
let mut fut1 = Box::pin(inc(mu.clone()));
1836
1837
let mut fut2 = Box::pin(inc(mu.clone()));
1838
1839
let arc_waker = Arc::new(TestWaker);
1840
let waker = waker_ref(&arc_waker);
1841
let mut cx = Context::from_waker(&waker);
1842
1843
// First acquire the lock.
1844
let count = block_on(mu.lock());
1845
1846
// Now poll the futures. Since the lock is acquired they will both get queued in the waiter
1847
// list.
1848
match fut1.as_mut().poll(&mut cx) {
1849
Poll::Pending => {}
1850
Poll::Ready(()) => panic!("Future is unexpectedly ready"),
1851
}
1852
1853
match fut2.as_mut().poll(&mut cx) {
1854
Poll::Pending => {}
1855
Poll::Ready(()) => panic!("Future is unexpectedly ready"),
1856
}
1857
1858
assert_eq!(
1859
mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
1860
WRITER_WAITING
1861
);
1862
1863
// Drop the lock. This should mark fut1 as ready to make progress.
1864
mem::drop(count);
1865
1866
// Now drop fut1. This should make fut2 ready to make progress.
1867
mem::drop(fut1);
1868
1869
// Since there was still another waiter in the list we shouldn't have cleared the
1870
// DESIGNATED_WAKER bit.
1871
assert_eq!(
1872
mu.raw.state.load(Ordering::Relaxed) & DESIGNATED_WAKER,
1873
DESIGNATED_WAKER
1874
);
1875
1876
// Since the waiter was a writer, we should clear the WRITER_WAITING bit.
1877
assert_eq!(mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING, 0);
1878
1879
match fut2.as_mut().poll(&mut cx) {
1880
Poll::Pending => panic!("Future is not ready to make progress"),
1881
Poll::Ready(()) => {}
1882
}
1883
1884
// Verify that we only incremented the count once.
1885
assert_eq!(*block_on(mu.lock()), 1);
1886
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1887
}
1888
1889
#[test]
1890
fn timeout() {
1891
async fn timed_lock(timer: oneshot::Receiver<()>, mu: Arc<RwLock<()>>) {
1892
select! {
1893
res = timer.fuse() => {
1894
match res {
1895
Ok(()) => {},
1896
Err(e) => panic!("Timer unexpectedly canceled: {e}"),
1897
}
1898
}
1899
_ = mu.lock().fuse() => panic!("Successfuly acquired lock"),
1900
}
1901
}
1902
1903
let mu = Arc::new(RwLock::new(()));
1904
let (tx, rx) = oneshot::channel();
1905
1906
let mut timeout = Box::pin(timed_lock(rx, mu.clone()));
1907
1908
let arc_waker = Arc::new(TestWaker);
1909
let waker = waker_ref(&arc_waker);
1910
let mut cx = Context::from_waker(&waker);
1911
1912
// Acquire the lock.
1913
let g = block_on(mu.lock());
1914
1915
// Poll the future.
1916
if let Poll::Ready(()) = timeout.as_mut().poll(&mut cx) {
1917
panic!("timed_lock unexpectedly ready");
1918
}
1919
1920
assert_eq!(
1921
mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
1922
HAS_WAITERS
1923
);
1924
1925
// Signal the channel, which should cancel the lock.
1926
tx.send(()).expect("Failed to send wakeup");
1927
1928
// Now the future should have completed without acquiring the lock.
1929
if timeout.as_mut().poll(&mut cx).is_pending() {
1930
panic!("timed_lock not ready after timeout");
1931
}
1932
1933
// The rwlock state should not show any waiters.
1934
assert_eq!(mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, 0);
1935
1936
mem::drop(g);
1937
1938
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1939
}
1940
1941
#[test]
1942
fn writer_waiting() {
1943
async fn read_zero(mu: Arc<RwLock<usize>>) {
1944
let val = mu.read_lock().await;
1945
pending!();
1946
1947
assert_eq!(*val, 0);
1948
}
1949
1950
async fn inc(mu: Arc<RwLock<usize>>) {
1951
*mu.lock().await += 1;
1952
}
1953
1954
async fn read_one(mu: Arc<RwLock<usize>>) {
1955
let val = mu.read_lock().await;
1956
1957
assert_eq!(*val, 1);
1958
}
1959
1960
let mu = Arc::new(RwLock::new(0));
1961
1962
let mut r1 = Box::pin(read_zero(mu.clone()));
1963
let mut r2 = Box::pin(read_zero(mu.clone()));
1964
1965
let mut w = Box::pin(inc(mu.clone()));
1966
let mut r3 = Box::pin(read_one(mu.clone()));
1967
1968
let arc_waker = Arc::new(TestWaker);
1969
let waker = waker_ref(&arc_waker);
1970
let mut cx = Context::from_waker(&waker);
1971
1972
if let Poll::Ready(()) = r1.as_mut().poll(&mut cx) {
1973
panic!("read_zero unexpectedly ready");
1974
}
1975
if let Poll::Ready(()) = r2.as_mut().poll(&mut cx) {
1976
panic!("read_zero unexpectedly ready");
1977
}
1978
assert_eq!(
1979
mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
1980
2 * READ_LOCK
1981
);
1982
1983
if let Poll::Ready(()) = w.as_mut().poll(&mut cx) {
1984
panic!("inc unexpectedly ready");
1985
}
1986
assert_eq!(
1987
mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
1988
WRITER_WAITING
1989
);
1990
1991
// The WRITER_WAITING bit should prevent the next reader from acquiring the lock.
1992
if let Poll::Ready(()) = r3.as_mut().poll(&mut cx) {
1993
panic!("read_one unexpectedly ready");
1994
}
1995
assert_eq!(
1996
mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
1997
2 * READ_LOCK
1998
);
1999
2000
if r1.as_mut().poll(&mut cx).is_pending() {
2001
panic!("read_zero unable to complete");
2002
}
2003
if r2.as_mut().poll(&mut cx).is_pending() {
2004
panic!("read_zero unable to complete");
2005
}
2006
if w.as_mut().poll(&mut cx).is_pending() {
2007
panic!("inc unable to complete");
2008
}
2009
if r3.as_mut().poll(&mut cx).is_pending() {
2010
panic!("read_one unable to complete");
2011
}
2012
2013
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
2014
}
2015
2016
#[test]
2017
fn notify_one() {
2018
async fn read(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
2019
let mut count = mu.read_lock().await;
2020
while *count == 0 {
2021
count = cv.wait_read(count).await;
2022
}
2023
}
2024
2025
async fn write(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
2026
let mut count = mu.lock().await;
2027
while *count == 0 {
2028
count = cv.wait(count).await;
2029
}
2030
2031
*count -= 1;
2032
}
2033
2034
let mu = Arc::new(RwLock::new(0));
2035
let cv = Arc::new(Condvar::new());
2036
2037
let arc_waker = Arc::new(TestWaker);
2038
let waker = waker_ref(&arc_waker);
2039
let mut cx = Context::from_waker(&waker);
2040
2041
let mut readers = [
2042
Box::pin(read(mu.clone(), cv.clone())),
2043
Box::pin(read(mu.clone(), cv.clone())),
2044
Box::pin(read(mu.clone(), cv.clone())),
2045
Box::pin(read(mu.clone(), cv.clone())),
2046
];
2047
let mut writer = Box::pin(write(mu.clone(), cv.clone()));
2048
2049
for r in &mut readers {
2050
if let Poll::Ready(()) = r.as_mut().poll(&mut cx) {
2051
panic!("reader unexpectedly ready");
2052
}
2053
}
2054
if let Poll::Ready(()) = writer.as_mut().poll(&mut cx) {
2055
panic!("writer unexpectedly ready");
2056
}
2057
2058
let mut count = block_on(mu.lock());
2059
*count = 1;
2060
2061
// This should wake all readers + one writer.
2062
cv.notify_one();
2063
2064
// Poll the readers and the writer so they add themselves to the rwlock's waiter list.
2065
for r in &mut readers {
2066
if r.as_mut().poll(&mut cx).is_ready() {
2067
panic!("reader unexpectedly ready");
2068
}
2069
}
2070
2071
if writer.as_mut().poll(&mut cx).is_ready() {
2072
panic!("writer unexpectedly ready");
2073
}
2074
2075
assert_eq!(
2076
mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
2077
HAS_WAITERS
2078
);
2079
assert_eq!(
2080
mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
2081
WRITER_WAITING
2082
);
2083
2084
mem::drop(count);
2085
2086
assert_eq!(
2087
mu.raw.state.load(Ordering::Relaxed) & (HAS_WAITERS | WRITER_WAITING),
2088
HAS_WAITERS | WRITER_WAITING
2089
);
2090
2091
for r in &mut readers {
2092
if r.as_mut().poll(&mut cx).is_pending() {
2093
panic!("reader unable to complete");
2094
}
2095
}
2096
2097
if writer.as_mut().poll(&mut cx).is_pending() {
2098
panic!("writer unable to complete");
2099
}
2100
2101
assert_eq!(*block_on(mu.read_lock()), 0);
2102
}
2103
2104
#[test]
2105
fn notify_when_unlocked() {
2106
async fn dec(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
2107
let mut count = mu.lock().await;
2108
2109
while *count == 0 {
2110
count = cv.wait(count).await;
2111
}
2112
2113
*count -= 1;
2114
}
2115
2116
let mu = Arc::new(RwLock::new(0));
2117
let cv = Arc::new(Condvar::new());
2118
2119
let arc_waker = Arc::new(TestWaker);
2120
let waker = waker_ref(&arc_waker);
2121
let mut cx = Context::from_waker(&waker);
2122
2123
let mut futures = [
2124
Box::pin(dec(mu.clone(), cv.clone())),
2125
Box::pin(dec(mu.clone(), cv.clone())),
2126
Box::pin(dec(mu.clone(), cv.clone())),
2127
Box::pin(dec(mu.clone(), cv.clone())),
2128
];
2129
2130
for f in &mut futures {
2131
if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
2132
panic!("future unexpectedly ready");
2133
}
2134
}
2135
2136
*block_on(mu.lock()) = futures.len();
2137
cv.notify_all();
2138
2139
// Since we haven't polled `futures` yet, the rwlock should not have any waiters.
2140
assert_eq!(mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, 0);
2141
2142
for f in &mut futures {
2143
if f.as_mut().poll(&mut cx).is_pending() {
2144
panic!("future unexpectedly ready");
2145
}
2146
}
2147
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
2148
}
2149
2150
#[test]
2151
fn notify_reader_writer() {
2152
async fn read(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
2153
let mut count = mu.read_lock().await;
2154
while *count == 0 {
2155
count = cv.wait_read(count).await;
2156
}
2157
2158
// Yield once while holding the read lock, which should prevent the writer from waking
2159
// up.
2160
pending!();
2161
}
2162
2163
async fn write(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
2164
let mut count = mu.lock().await;
2165
while *count == 0 {
2166
count = cv.wait(count).await;
2167
}
2168
2169
*count -= 1;
2170
}
2171
2172
async fn lock(mu: Arc<RwLock<usize>>) {
2173
mem::drop(mu.lock().await);
2174
}
2175
2176
let mu = Arc::new(RwLock::new(0));
2177
let cv = Arc::new(Condvar::new());
2178
2179
let arc_waker = Arc::new(TestWaker);
2180
let waker = waker_ref(&arc_waker);
2181
let mut cx = Context::from_waker(&waker);
2182
2183
let mut futures: [Pin<Box<dyn Future<Output = ()>>>; 5] = [
2184
Box::pin(read(mu.clone(), cv.clone())),
2185
Box::pin(read(mu.clone(), cv.clone())),
2186
Box::pin(read(mu.clone(), cv.clone())),
2187
Box::pin(write(mu.clone(), cv.clone())),
2188
Box::pin(read(mu.clone(), cv.clone())),
2189
];
2190
const NUM_READERS: usize = 4;
2191
2192
let mut l = Box::pin(lock(mu.clone()));
2193
2194
for f in &mut futures {
2195
if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
2196
panic!("future unexpectedly ready");
2197
}
2198
}
2199
2200
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
2201
2202
let mut count = block_on(mu.lock());
2203
*count = 1;
2204
2205
// Now poll the lock function. Since the lock is held by us, it will get queued on the
2206
// waiter list.
2207
if let Poll::Ready(()) = l.as_mut().poll(&mut cx) {
2208
panic!("lock() unexpectedly ready");
2209
}
2210
2211
assert_eq!(
2212
mu.raw.state.load(Ordering::Relaxed) & (HAS_WAITERS | WRITER_WAITING),
2213
HAS_WAITERS | WRITER_WAITING
2214
);
2215
2216
// Wake up waiters while holding the lock.
2217
cv.notify_all();
2218
2219
// Drop the lock. This should wake up the lock function.
2220
mem::drop(count);
2221
2222
if l.as_mut().poll(&mut cx).is_pending() {
2223
panic!("lock() unable to complete");
2224
}
2225
2226
// Since we haven't polled `futures` yet, the rwlock state should now be empty.
2227
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
2228
2229
// Poll everything again. The readers should be able to make progress (but not complete) but
2230
// the writer should be blocked.
2231
for f in &mut futures {
2232
if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
2233
panic!("future unexpectedly ready");
2234
}
2235
}
2236
2237
assert_eq!(
2238
mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
2239
READ_LOCK * NUM_READERS
2240
);
2241
2242
// All the readers can now finish but the writer needs to be polled again.
2243
let mut needs_poll = None;
2244
for (i, r) in futures.iter_mut().enumerate() {
2245
match r.as_mut().poll(&mut cx) {
2246
Poll::Ready(()) => {}
2247
Poll::Pending => {
2248
if needs_poll.is_some() {
2249
panic!("More than one future unable to complete");
2250
}
2251
needs_poll = Some(i);
2252
}
2253
}
2254
}
2255
2256
if futures[needs_poll.expect("Writer unexpectedly able to complete")]
2257
.as_mut()
2258
.poll(&mut cx)
2259
.is_pending()
2260
{
2261
panic!("Writer unable to complete");
2262
}
2263
2264
assert_eq!(*block_on(mu.lock()), 0);
2265
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
2266
}
2267
2268
#[test]
2269
fn notify_readers_with_read_lock() {
2270
async fn read(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
2271
let mut count = mu.read_lock().await;
2272
while *count == 0 {
2273
count = cv.wait_read(count).await;
2274
}
2275
2276
// Yield once while holding the read lock.
2277
pending!();
2278
}
2279
2280
let mu = Arc::new(RwLock::new(0));
2281
let cv = Arc::new(Condvar::new());
2282
2283
let arc_waker = Arc::new(TestWaker);
2284
let waker = waker_ref(&arc_waker);
2285
let mut cx = Context::from_waker(&waker);
2286
2287
let mut futures = [
2288
Box::pin(read(mu.clone(), cv.clone())),
2289
Box::pin(read(mu.clone(), cv.clone())),
2290
Box::pin(read(mu.clone(), cv.clone())),
2291
Box::pin(read(mu.clone(), cv.clone())),
2292
];
2293
2294
for f in &mut futures {
2295
if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
2296
panic!("future unexpectedly ready");
2297
}
2298
}
2299
2300
// Increment the count and then grab a read lock.
2301
*block_on(mu.lock()) = 1;
2302
2303
let g = block_on(mu.read_lock());
2304
2305
// Notify the condvar while holding the read lock. This should wake up all the waiters.
2306
cv.notify_all();
2307
2308
// Since the lock is held in shared mode, all the readers should immediately be able to
2309
// acquire the read lock.
2310
for f in &mut futures {
2311
if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
2312
panic!("future unexpectedly ready");
2313
}
2314
}
2315
assert_eq!(mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, 0);
2316
assert_eq!(
2317
mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
2318
READ_LOCK * (futures.len() + 1)
2319
);
2320
2321
mem::drop(g);
2322
2323
for f in &mut futures {
2324
if f.as_mut().poll(&mut cx).is_pending() {
2325
panic!("future unable to complete");
2326
}
2327
}
2328
2329
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
2330
}
2331
}
2332
2333