use std::cell::UnsafeCell;
use std::hint;
use std::mem;
use std::ops::Deref;
use std::ops::DerefMut;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread::yield_now;
use super::super::sync::waiter::Kind as WaiterKind;
use super::super::sync::waiter::Waiter;
use super::super::sync::waiter::WaiterAdapter;
use super::super::sync::waiter::WaiterList;
use super::super::sync::waiter::WaitingFor;
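// Layout of the `RawRwLock::state` word. The low byte holds flags; all higher
// bits (`READ_MASK`) count the readers currently holding the lock in units of
// `READ_LOCK`. For example, three readers plus a queued writer give
// `3 * READ_LOCK | HAS_WAITERS | WRITER_WAITING` = 0x312.
//
// LOCKED           - held in write mode.
// HAS_WAITERS      - the wait list is non-empty.
// DESIGNATED_WAKER - a woken waiter is already on its way to acquire the
//                    lock, so unlockers may skip the wake-up slow path.
// SPINLOCK         - guards the `waiters` list for short critical sections.
// WRITER_WAITING   - a writer is queued; new readers must wait behind it.
// LONG_WAIT        - a waiter that waited too long was moved to the front of
//                    the queue; barging is disabled until it is served.
//
// SPIN_THRESHOLD bounds the exponential-backoff spinning before the thread is
// yielded; LONG_WAIT_THRESHOLD is how many wake-ups a waiter tolerates before
// it is re-queued at the front with LONG_WAIT set.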
const LOCKED: usize = 1 << 0;
const HAS_WAITERS: usize = 1 << 1;
const DESIGNATED_WAKER: usize = 1 << 2;
const SPINLOCK: usize = 1 << 3;
const WRITER_WAITING: usize = 1 << 4;
const LONG_WAIT: usize = 1 << 5;
const READ_LOCK: usize = 1 << 8;
const READ_MASK: usize = !0xff;
const SPIN_THRESHOLD: usize = 7;
const LONG_WAIT_THRESHOLD: usize = 19;
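// Strategy for one acquisition mode. Implementations report which state bits
// must be zero before acquiring, what to add to `state` on success, and which
// bits to set while waiting and to clear on acquisition.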
trait Kind {
fn zero_to_acquire() -> usize;
fn add_to_acquire() -> usize;
fn set_when_waiting() -> usize;
fn clear_on_acquire() -> usize;
fn new_waiter(raw: &RawRwLock) -> Arc<Waiter>;
}
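// Read mode: readers may share the lock but queue behind an already-waiting
// writer (or a long waiter) so writers are not starved.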
struct Shared;
impl Kind for Shared {
fn zero_to_acquire() -> usize {
LOCKED | WRITER_WAITING | LONG_WAIT
}
fn add_to_acquire() -> usize {
READ_LOCK
}
fn set_when_waiting() -> usize {
0
}
fn clear_on_acquire() -> usize {
0
}
fn new_waiter(raw: &RawRwLock) -> Arc<Waiter> {
Arc::new(Waiter::new(
WaiterKind::Shared,
cancel_waiter,
raw as *const RawRwLock as usize,
WaitingFor::Mutex,
))
}
}
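// Write mode: a writer needs both the write bit and the entire reader count
// to be zero, advertises itself via WRITER_WAITING while queued, and clears
// that bit once it acquires the lock.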
struct Exclusive;
impl Kind for Exclusive {
fn zero_to_acquire() -> usize {
LOCKED | READ_MASK | LONG_WAIT
}
fn add_to_acquire() -> usize {
LOCKED
}
fn set_when_waiting() -> usize {
WRITER_WAITING
}
fn clear_on_acquire() -> usize {
WRITER_WAITING
}
fn new_waiter(raw: &RawRwLock) -> Arc<Waiter> {
Arc::new(Waiter::new(
WaiterKind::Exclusive,
cancel_waiter,
raw as *const RawRwLock as usize,
WaitingFor::Mutex,
))
}
}
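// Scans `waiters` front to back and removes the ones to wake. A writer at the
// head is woken alone; otherwise every queued reader is woken (including
// readers queued behind a waiting writer), and any skipped writer is recorded
// via `WRITER_WAITING` in the returned bits, which the caller must set when
// it releases the spin lock. For example, a queue of [R1, R2, W, R3] yields a
// wake list of [R1, R2, R3] with `WRITER_WAITING` to set on release.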
fn get_wake_list(waiters: &mut WaiterList) -> (WaiterList, usize) {
let mut to_wake = WaiterList::new(WaiterAdapter::new());
let mut set_on_release = 0;
let mut cursor = waiters.front_mut();
let mut waking_readers = false;
while let Some(w) = cursor.get() {
match w.kind() {
WaiterKind::Exclusive if !waking_readers => {
let waiter = cursor.remove().unwrap();
waiter.set_waiting_for(WaitingFor::None);
to_wake.push_back(waiter);
break;
}
WaiterKind::Shared => {
let waiter = cursor.remove().unwrap();
waiter.set_waiting_for(WaitingFor::None);
to_wake.push_back(waiter);
waking_readers = true;
}
WaiterKind::Exclusive => {
set_on_release |= WRITER_WAITING;
cursor.move_next();
}
}
}
(to_wake, set_on_release)
}
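// Busy-waits for roughly `iterations` spin-loop hints; used for exponential
// backoff while the state word is contended.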
#[inline]
fn cpu_relax(iterations: usize) {
for _ in 0..iterations {
hint::spin_loop();
}
}
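// The core of the lock: an atomic state word plus an intrusive list of
// waiting tasks. `waiters` is only accessed while the SPINLOCK bit of `state`
// is held, which is what makes the interior `UnsafeCell` sound to share.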
pub(crate) struct RawRwLock {
state: AtomicUsize,
waiters: UnsafeCell<WaiterList>,
}
impl RawRwLock {
pub fn new() -> RawRwLock {
RawRwLock {
state: AtomicUsize::new(0),
waiters: UnsafeCell::new(WaiterList::new(WaiterAdapter::new())),
}
}
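// Acquires the lock in write mode. The fast path is a single
// compare-exchange from the unlocked state; any contention falls through to
// `lock_slow`.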
#[inline]
pub async fn lock(&self) {
match self
.state
.compare_exchange_weak(0, LOCKED, Ordering::Acquire, Ordering::Relaxed)
{
Ok(_) => {}
Err(oldstate) => {
if (oldstate & Exclusive::zero_to_acquire()) != 0
|| self
.state
.compare_exchange_weak(
oldstate,
(oldstate + Exclusive::add_to_acquire())
& !Exclusive::clear_on_acquire(),
Ordering::Acquire,
Ordering::Relaxed,
)
.is_err()
{
self.lock_slow::<Exclusive>(0, 0).await;
}
}
}
}
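// Acquires the lock in read mode. The fast path bumps the reader count as
// long as no writer holds or is waiting for the lock.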
#[inline]
pub async fn read_lock(&self) {
match self
.state
.compare_exchange_weak(0, READ_LOCK, Ordering::Acquire, Ordering::Relaxed)
{
Ok(_) => {}
Err(oldstate) => {
if (oldstate & Shared::zero_to_acquire()) != 0
|| self
.state
.compare_exchange_weak(
oldstate,
(oldstate + Shared::add_to_acquire()) & !Shared::clear_on_acquire(),
Ordering::Acquire,
Ordering::Relaxed,
)
.is_err()
{
self.lock_slow::<Shared>(0, 0).await;
}
}
}
}
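// The contended slow path, shared by readers and writers via `K`.
//
// `clear` names bits stripped from `state` on every transition (the condvar
// re-acquire paths pass DESIGNATED_WAKER here), and `zero_mask` removes bits
// from `K::zero_to_acquire()` (a reader woken by a condvar passes
// WRITER_WAITING so it is not stalled behind a queued writer). Each iteration
// either acquires the lock, or takes the SPINLOCK bit and enqueues a waiter
// (at the front, with LONG_WAIT set, once it has already been woken
// LONG_WAIT_THRESHOLD times) before suspending; otherwise it spins with
// exponential backoff and eventually yields the OS thread.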
#[cold]
async fn lock_slow<K: Kind>(&self, mut clear: usize, zero_mask: usize) {
let mut zero_to_acquire = K::zero_to_acquire() & !zero_mask;
let mut spin_count = 0;
let mut wait_count = 0;
let mut waiter = None;
loop {
let oldstate = self.state.load(Ordering::Relaxed);
if (oldstate & zero_to_acquire) == 0 {
if self
.state
.compare_exchange_weak(
oldstate,
(oldstate + K::add_to_acquire()) & !(clear | K::clear_on_acquire()),
Ordering::Acquire,
Ordering::Relaxed,
)
.is_ok()
{
return;
}
} else if (oldstate & SPINLOCK) == 0 {
let w = waiter.get_or_insert_with(|| K::new_waiter(self));
w.reset(WaitingFor::Mutex);
if self
.state
.compare_exchange_weak(
oldstate,
(oldstate | SPINLOCK | HAS_WAITERS | K::set_when_waiting()) & !clear,
Ordering::Acquire,
Ordering::Relaxed,
)
.is_ok()
{
let mut set_on_release = 0;
if wait_count < LONG_WAIT_THRESHOLD {
unsafe { (*self.waiters.get()).push_back(w.clone()) };
} else {
unsafe { (*self.waiters.get()).push_front(w.clone()) };
set_on_release |= LONG_WAIT;
clear |= LONG_WAIT;
zero_to_acquire &= !LONG_WAIT;
}
let mut state = oldstate;
loop {
match self.state.compare_exchange_weak(
state,
(state | set_on_release) & !SPINLOCK,
Ordering::Release,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(w) => state = w,
}
}
w.wait().await;
clear |= DESIGNATED_WAKER;
zero_to_acquire &= !WRITER_WAITING;
spin_count = 0;
wait_count += 1;
continue;
}
}
if spin_count < SPIN_THRESHOLD {
cpu_relax(1 << spin_count);
spin_count += 1;
} else {
yield_now();
}
}
}
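// Releases a write lock. The wake-up slow path runs only when waiters exist
// and no designated waker is already on its way to claim the lock.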
#[inline]
pub fn unlock(&self) {
let oldstate = self.state.fetch_sub(LOCKED, Ordering::Release);
debug_assert_eq!(
oldstate & READ_MASK,
0,
"`unlock` called on rwlock held in read-mode"
);
debug_assert_ne!(
oldstate & LOCKED,
0,
"`unlock` called on rwlock not held in write-mode"
);
if (oldstate & HAS_WAITERS) != 0 && (oldstate & DESIGNATED_WAKER) == 0 {
self.unlock_slow();
}
}
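// Releases a read lock. Waiters are only woken by the last reader, i.e. when
// the reader count before the decrement was exactly one.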
#[inline]
pub fn read_unlock(&self) {
let oldstate = self.state.fetch_sub(READ_LOCK, Ordering::Release);
debug_assert_eq!(
oldstate & LOCKED,
0,
"`read_unlock` called on rwlock held in write-mode"
);
debug_assert_ne!(
oldstate & READ_MASK,
0,
"`read_unlock` called on rwlock not held in read-mode"
);
if (oldstate & HAS_WAITERS) != 0
&& (oldstate & DESIGNATED_WAKER) == 0
&& (oldstate & READ_MASK) == READ_LOCK
{
self.unlock_slow();
}
}
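// Wakes the next batch of waiters: claims the SPINLOCK bit together with the
// DESIGNATED_WAKER role, pops the wake list, publishes the updated state, and
// only then wakes the removed waiters, outside the critical section.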
#[cold]
fn unlock_slow(&self) {
let mut spin_count = 0;
loop {
let oldstate = self.state.load(Ordering::Relaxed);
if (oldstate & HAS_WAITERS) == 0 || (oldstate & DESIGNATED_WAKER) != 0 {
return;
} else if (oldstate & SPINLOCK) == 0 {
if self
.state
.compare_exchange_weak(
oldstate,
oldstate | SPINLOCK | DESIGNATED_WAKER,
Ordering::Acquire,
Ordering::Relaxed,
)
.is_ok()
{
let mut clear = SPINLOCK;
let waiters = unsafe { &mut *self.waiters.get() };
let (wake_list, set_on_release) = get_wake_list(waiters);
if waiters.is_empty() {
clear |= HAS_WAITERS;
}
if wake_list.is_empty() {
clear |= DESIGNATED_WAKER;
}
let mut state = oldstate;
loop {
match self.state.compare_exchange_weak(
state,
(state | set_on_release) & !clear,
Ordering::Release,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(w) => state = w,
}
}
for w in wake_list {
w.wake();
}
return;
}
}
if spin_count < SPIN_THRESHOLD {
cpu_relax(1 << spin_count);
spin_count += 1;
} else {
yield_now();
}
}
}
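// Called when a pending lock future is dropped: takes the spin lock, unlinks
// `waiter`, and, if the canceled waiter had already been woken (or
// `wake_next` is set), passes the wake-up on to the next eligible waiters so
// the notification is not lost.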
fn cancel_waiter(&self, waiter: &Waiter, wake_next: bool) {
let mut oldstate = self.state.load(Ordering::Relaxed);
while oldstate & SPINLOCK != 0
|| self
.state
.compare_exchange_weak(
oldstate,
oldstate | SPINLOCK,
Ordering::Acquire,
Ordering::Relaxed,
)
.is_err()
{
hint::spin_loop();
oldstate = self.state.load(Ordering::Relaxed);
}
let waiters = unsafe { &mut *self.waiters.get() };
let mut clear = SPINLOCK;
if wake_next
|| waiters
.front()
.get()
.map(|front| std::ptr::eq(front, waiter))
.unwrap_or(false)
{
clear |= LONG_WAIT;
}
let waiting_for = waiter.is_waiting_for();
let old_waiter = if waiter.is_linked() && waiting_for == WaitingFor::Mutex {
let mut cursor = unsafe { waiters.cursor_mut_from_ptr(waiter as *const Waiter) };
cursor.remove()
} else {
None
};
let (wake_list, set_on_release) = if wake_next || waiting_for == WaitingFor::None {
get_wake_list(waiters)
} else {
(WaiterList::new(WaiterAdapter::new()), 0)
};
if waiters.is_empty() {
clear |= HAS_WAITERS;
}
if wake_list.is_empty() {
clear |= DESIGNATED_WAKER;
}
if let WaiterKind::Exclusive = waiter.kind() {
clear |= WRITER_WAITING;
}
while self
.state
.compare_exchange_weak(
oldstate,
(oldstate & !clear) | set_on_release,
Ordering::Release,
Ordering::Relaxed,
)
.is_err()
{
hint::spin_loop();
oldstate = self.state.load(Ordering::Relaxed);
}
for w in wake_list {
w.wake();
}
mem::drop(old_waiter);
}
}
// SAFETY: `RawRwLock` coordinates all access to `waiters` through the
// SPINLOCK bit of the atomic `state` word, so it is safe to send and share
// across threads.
unsafe impl Send for RawRwLock {}
// SAFETY: see the `Send` impl above.
unsafe impl Sync for RawRwLock {}
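// Trampoline installed in each `Waiter`: recovers the owning `RawRwLock` from
// the address stored alongside the waiter and forwards the cancellation. The
// unsafe dereference relies on a queued waiter never outliving its lock.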
fn cancel_waiter(raw: usize, waiter: &Waiter, wake_next: bool) {
let raw_rwlock = raw as *const RawRwLock;
unsafe { (*raw_rwlock).cancel_waiter(waiter, wake_next) }
}
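/// An async reader-writer lock. `lock` grants exclusive (write) access and
/// `read_lock` grants shared access; waiting tasks suspend instead of
/// blocking the thread. The 128-byte alignment is presumably there to keep
/// the lock on its own cache line(s) and avoid false sharing.
///
/// A minimal usage sketch, not taken from this crate's tests; it assumes an
/// executor such as `futures::executor::block_on`:
///
/// ```ignore
/// let lock = RwLock::new(0u32);
/// futures::executor::block_on(async {
///     *lock.lock().await += 1;                // exclusive access
///     assert_eq!(*lock.read_lock().await, 1); // shared access
/// });
/// ```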
#[repr(align(128))]
pub struct RwLock<T: ?Sized> {
raw: RawRwLock,
value: UnsafeCell<T>,
}
impl<T> RwLock<T> {
pub fn new(v: T) -> RwLock<T> {
RwLock {
raw: RawRwLock::new(),
value: UnsafeCell::new(v),
}
}
pub fn into_inner(self) -> T {
self.value.into_inner()
}
}
impl<T: ?Sized> RwLock<T> {
#[inline]
pub async fn lock(&self) -> RwLockWriteGuard<'_, T> {
self.raw.lock().await;
RwLockWriteGuard {
mu: self,
value: unsafe { &mut *self.value.get() },
}
}
#[inline]
pub async fn read_lock(&self) -> RwLockReadGuard<'_, T> {
self.raw.read_lock().await;
RwLockReadGuard {
mu: self,
value: unsafe { &*self.value.get() },
}
}
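// Re-acquires the lock in write mode for a task whose waker was transferred
// from a `Condvar`. The transferred task counts as the designated waker, so
// DESIGNATED_WAKER is cleared as part of every state transition.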
#[inline]
pub(crate) async fn lock_from_cv(&self) -> RwLockWriteGuard<'_, T> {
self.raw.lock_slow::<Exclusive>(DESIGNATED_WAKER, 0).await;
RwLockWriteGuard {
mu: self,
value: unsafe { &mut *self.value.get() },
}
}
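// Like `lock_from_cv` but for read mode; WRITER_WAITING is additionally
// masked out so the woken reader is not stalled behind a queued writer.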
#[inline]
pub(crate) async fn read_lock_from_cv(&self) -> RwLockReadGuard<'_, T> {
self.raw
.lock_slow::<Shared>(DESIGNATED_WAKER, WRITER_WAITING)
.await;
RwLockReadGuard {
mu: self,
value: unsafe { &*self.value.get() },
}
}
#[inline]
fn unlock(&self) {
self.raw.unlock();
}
#[inline]
fn read_unlock(&self) {
self.raw.read_unlock();
}
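// `&mut self` guarantees no guards are alive, so the value can be accessed
// without taking the lock.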
pub fn get_mut(&mut self) -> &mut T {
unsafe { &mut *self.value.get() }
}
}
// SAFETY: any thread can obtain `&mut T` through a write guard, so sending
// the lock requires `T: Send`.
unsafe impl<T: ?Sized + Send> Send for RwLock<T> {}
// SAFETY: concurrent readers on different threads can hold `&T`
// simultaneously, so sharing the lock also requires `T: Sync`, matching the
// bounds on `std::sync::RwLock`.
unsafe impl<T: ?Sized + Send + Sync> Sync for RwLock<T> {}
impl<T: Default> Default for RwLock<T> {
fn default() -> Self {
Self::new(Default::default())
}
}
impl<T> From<T> for RwLock<T> {
fn from(source: T) -> Self {
Self::new(source)
}
}
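/// Guard mediating exclusive write access; the lock is released on drop.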
pub struct RwLockWriteGuard<'a, T: ?Sized + 'a> {
mu: &'a RwLock<T>,
value: &'a mut T,
}
impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> {
pub(crate) fn into_inner(self) -> &'a RwLock<T> {
self.mu
}
pub(crate) fn as_raw_rwlock(&self) -> &RawRwLock {
&self.mu.raw
}
}
impl<T: ?Sized> Deref for RwLockWriteGuard<'_, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.value
}
}
impl<T: ?Sized> DerefMut for RwLockWriteGuard<'_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.value
}
}
impl<T: ?Sized> Drop for RwLockWriteGuard<'_, T> {
fn drop(&mut self) {
self.mu.unlock()
}
}
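/// Guard mediating shared read access; the read lock is released on drop.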
pub struct RwLockReadGuard<'a, T: ?Sized + 'a> {
mu: &'a RwLock<T>,
value: &'a T,
}
impl<'a, T: ?Sized> RwLockReadGuard<'a, T> {
pub(crate) fn into_inner(self) -> &'a RwLock<T> {
self.mu
}
pub(crate) fn as_raw_rwlock(&self) -> &RawRwLock {
&self.mu.raw
}
}
impl<T: ?Sized> Deref for RwLockReadGuard<'_, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.value
}
}
impl<T: ?Sized> Drop for RwLockReadGuard<'_, T> {
fn drop(&mut self) {
self.mu.read_unlock()
}
}
#[cfg(any(target_os = "android", target_os = "linux"))]
#[cfg(test)]
mod test {
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::mpsc::channel;
use std::sync::mpsc::Sender;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::task::Waker;
use std::thread;
use std::time::Duration;
use futures::channel::oneshot;
use futures::pending;
use futures::select;
use futures::task::waker_ref;
use futures::task::ArcWake;
use futures::FutureExt;
use futures_executor::LocalPool;
use futures_executor::ThreadPool;
use futures_util::task::LocalSpawnExt;
use super::super::super::block_on;
use super::super::super::sync::Condvar;
use super::super::super::sync::SpinLock;
use super::*;
#[derive(Debug, Eq, PartialEq)]
struct NonCopy(u32);
struct TestWaker;
impl ArcWake for TestWaker {
fn wake_by_ref(_arc_self: &Arc<Self>) {}
}
#[test]
fn it_works() {
let mu = RwLock::new(NonCopy(13));
assert_eq!(*block_on(mu.lock()), NonCopy(13));
}
#[test]
fn smoke() {
let mu = RwLock::new(NonCopy(7));
mem::drop(block_on(mu.lock()));
mem::drop(block_on(mu.lock()));
}
#[test]
fn rw_smoke() {
let mu = RwLock::new(NonCopy(7));
mem::drop(block_on(mu.lock()));
mem::drop(block_on(mu.read_lock()));
mem::drop((block_on(mu.read_lock()), block_on(mu.read_lock())));
mem::drop(block_on(mu.lock()));
}
#[test]
fn async_smoke() {
async fn lock(mu: Rc<RwLock<NonCopy>>) {
mu.lock().await;
}
async fn read_lock(mu: Rc<RwLock<NonCopy>>) {
mu.read_lock().await;
}
async fn double_read_lock(mu: Rc<RwLock<NonCopy>>) {
let first = mu.read_lock().await;
mu.read_lock().await;
first.as_raw_rwlock();
}
let mu = Rc::new(RwLock::new(NonCopy(7)));
let mut ex = LocalPool::new();
let spawner = ex.spawner();
spawner
.spawn_local(lock(Rc::clone(&mu)))
.expect("Failed to spawn future");
spawner
.spawn_local(read_lock(Rc::clone(&mu)))
.expect("Failed to spawn future");
spawner
.spawn_local(double_read_lock(Rc::clone(&mu)))
.expect("Failed to spawn future");
spawner
.spawn_local(lock(Rc::clone(&mu)))
.expect("Failed to spawn future");
ex.run();
}
#[test]
fn send() {
let mu = RwLock::new(NonCopy(19));
thread::spawn(move || {
let value = block_on(mu.lock());
assert_eq!(*value, NonCopy(19));
})
.join()
.unwrap();
}
#[test]
fn arc_nested() {
let mu = RwLock::new(1);
let arc = Arc::new(RwLock::new(mu));
thread::spawn(move || {
let nested = block_on(arc.lock());
let lock2 = block_on(nested.lock());
assert_eq!(*lock2, 1);
})
.join()
.unwrap();
}
#[test]
fn arc_access_in_unwind() {
let arc = Arc::new(RwLock::new(1));
let arc2 = arc.clone();
thread::spawn(move || {
struct Unwinder {
i: Arc<RwLock<i32>>,
}
impl Drop for Unwinder {
fn drop(&mut self) {
*block_on(self.i.lock()) += 1;
}
}
let _u = Unwinder { i: arc2 };
panic!();
})
.join()
.expect_err("thread did not panic");
let lock = block_on(arc.lock());
assert_eq!(*lock, 2);
}
#[test]
fn unsized_value() {
let rwlock: &RwLock<[i32]> = &RwLock::new([1, 2, 3]);
{
let b = &mut *block_on(rwlock.lock());
b[0] = 4;
b[2] = 5;
}
let expected: &[i32] = &[4, 2, 5];
assert_eq!(&*block_on(rwlock.lock()), expected);
}
#[test]
fn high_contention() {
const THREADS: usize = 17;
const ITERATIONS: usize = 103;
let mut threads = Vec::with_capacity(THREADS);
let mu = Arc::new(RwLock::new(0usize));
for _ in 0..THREADS {
let mu2 = mu.clone();
threads.push(thread::spawn(move || {
for _ in 0..ITERATIONS {
*block_on(mu2.lock()) += 1;
}
}));
}
for t in threads.into_iter() {
t.join().unwrap();
}
assert_eq!(*block_on(mu.read_lock()), THREADS * ITERATIONS);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn high_contention_with_cancel() {
const TASKS: usize = 17;
const ITERATIONS: usize = 103;
async fn increment(mu: Arc<RwLock<usize>>, alt_mu: Arc<RwLock<usize>>, tx: Sender<()>) {
for _ in 0..ITERATIONS {
select! {
mut count = mu.lock().fuse() => *count += 1,
mut count = alt_mu.lock().fuse() => *count += 1,
}
}
tx.send(()).expect("Failed to send completion signal");
}
let ex = ThreadPool::new().expect("Failed to create ThreadPool");
let mu = Arc::new(RwLock::new(0usize));
let alt_mu = Arc::new(RwLock::new(0usize));
let (tx, rx) = channel();
for _ in 0..TASKS {
ex.spawn_ok(increment(Arc::clone(&mu), Arc::clone(&alt_mu), tx.clone()));
}
for _ in 0..TASKS {
if let Err(e) = rx.recv_timeout(Duration::from_secs(10)) {
panic!("Error while waiting for threads to complete: {e}");
}
}
assert_eq!(
*block_on(mu.read_lock()) + *block_on(alt_mu.read_lock()),
TASKS * ITERATIONS
);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
assert_eq!(alt_mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn single_thread_async() {
const TASKS: usize = 17;
const ITERATIONS: usize = 103;
async fn increment(mu: Rc<RwLock<usize>>) {
for _ in 0..ITERATIONS {
*mu.lock().await += 1;
}
}
let mut ex = LocalPool::new();
let spawner = ex.spawner();
let mu = Rc::new(RwLock::new(0usize));
for _ in 0..TASKS {
spawner
.spawn_local(increment(Rc::clone(&mu)))
.expect("Failed to spawn task");
}
ex.run();
assert_eq!(*block_on(mu.read_lock()), TASKS * ITERATIONS);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn multi_thread_async() {
const TASKS: usize = 17;
const ITERATIONS: usize = 103;
async fn increment(mu: Arc<RwLock<usize>>, tx: Sender<()>) {
for _ in 0..ITERATIONS {
*mu.lock().await += 1;
}
tx.send(()).expect("Failed to send completion signal");
}
let ex = ThreadPool::new().expect("Failed to create ThreadPool");
let mu = Arc::new(RwLock::new(0usize));
let (tx, rx) = channel();
for _ in 0..TASKS {
ex.spawn_ok(increment(Arc::clone(&mu), tx.clone()));
}
for _ in 0..TASKS {
rx.recv_timeout(Duration::from_secs(5))
.expect("Failed to receive completion signal");
}
assert_eq!(*block_on(mu.read_lock()), TASKS * ITERATIONS);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn get_mut() {
let mut mu = RwLock::new(NonCopy(13));
*mu.get_mut() = NonCopy(17);
assert_eq!(mu.into_inner(), NonCopy(17));
}
#[test]
fn into_inner() {
let mu = RwLock::new(NonCopy(29));
assert_eq!(mu.into_inner(), NonCopy(29));
}
#[test]
fn into_inner_drop() {
struct NeedsDrop(Arc<AtomicUsize>);
impl Drop for NeedsDrop {
fn drop(&mut self) {
self.0.fetch_add(1, Ordering::AcqRel);
}
}
let value = Arc::new(AtomicUsize::new(0));
let needs_drop = RwLock::new(NeedsDrop(value.clone()));
assert_eq!(value.load(Ordering::Acquire), 0);
{
let inner = needs_drop.into_inner();
assert_eq!(inner.0.load(Ordering::Acquire), 0);
}
assert_eq!(value.load(Ordering::Acquire), 1);
}
#[test]
fn rw_arc() {
const THREADS: isize = 7;
const ITERATIONS: isize = 13;
let mu = Arc::new(RwLock::new(0isize));
let mu2 = mu.clone();
let (tx, rx) = channel();
thread::spawn(move || {
let mut guard = block_on(mu2.lock());
for _ in 0..ITERATIONS {
let tmp = *guard;
*guard = -1;
thread::yield_now();
*guard = tmp + 1;
}
tx.send(()).unwrap();
});
let mut readers = Vec::with_capacity(10);
for _ in 0..THREADS {
let mu3 = mu.clone();
let handle = thread::spawn(move || {
let guard = block_on(mu3.read_lock());
assert!(*guard >= 0);
});
readers.push(handle);
}
for r in readers {
r.join().expect("One or more readers saw a negative value");
}
rx.recv_timeout(Duration::from_secs(5)).unwrap();
assert_eq!(*block_on(mu.read_lock()), ITERATIONS);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn rw_single_thread_async() {
struct TestFuture {
polled: bool,
waker: Arc<SpinLock<Option<Waker>>>,
}
impl Future for TestFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
if self.polled {
Poll::Ready(())
} else {
self.polled = true;
*self.waker.lock() = Some(cx.waker().clone());
Poll::Pending
}
}
}
fn wake_future(waker: Arc<SpinLock<Option<Waker>>>) {
loop {
if let Some(w) = waker.lock().take() {
w.wake();
return;
}
thread::sleep(Duration::from_millis(10));
}
}
async fn writer(mu: Rc<RwLock<isize>>) {
let mut guard = mu.lock().await;
for _ in 0..ITERATIONS {
let tmp = *guard;
*guard = -1;
let waker = Arc::new(SpinLock::new(None));
let waker2 = Arc::clone(&waker);
thread::spawn(move || wake_future(waker2));
let fut = TestFuture {
polled: false,
waker,
};
fut.await;
*guard = tmp + 1;
}
}
async fn reader(mu: Rc<RwLock<isize>>) {
let guard = mu.read_lock().await;
assert!(*guard >= 0);
}
const TASKS: isize = 7;
const ITERATIONS: isize = 13;
let mu = Rc::new(RwLock::new(0isize));
let mut ex = LocalPool::new();
let spawner = ex.spawner();
spawner
.spawn_local(writer(Rc::clone(&mu)))
.expect("Failed to spawn writer");
for _ in 0..TASKS {
spawner
.spawn_local(reader(Rc::clone(&mu)))
.expect("Failed to spawn reader");
}
ex.run();
assert_eq!(*block_on(mu.read_lock()), ITERATIONS);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn rw_multi_thread_async() {
async fn writer(mu: Arc<RwLock<isize>>, tx: Sender<()>) {
let mut guard = mu.lock().await;
for _ in 0..ITERATIONS {
let tmp = *guard;
*guard = -1;
thread::yield_now();
*guard = tmp + 1;
}
mem::drop(guard);
tx.send(()).unwrap();
}
async fn reader(mu: Arc<RwLock<isize>>, tx: Sender<()>) {
let guard = mu.read_lock().await;
assert!(*guard >= 0);
mem::drop(guard);
tx.send(()).expect("Failed to send completion message");
}
const TASKS: isize = 7;
const ITERATIONS: isize = 13;
let mu = Arc::new(RwLock::new(0isize));
let ex = ThreadPool::new().expect("Failed to create ThreadPool");
let (txw, rxw) = channel();
ex.spawn_ok(writer(Arc::clone(&mu), txw));
let (txr, rxr) = channel();
for _ in 0..TASKS {
ex.spawn_ok(reader(Arc::clone(&mu), txr.clone()));
}
for _ in 0..TASKS {
rxr.recv_timeout(Duration::from_secs(5))
.expect("Failed to receive completion message from reader");
}
rxw.recv_timeout(Duration::from_secs(5))
.expect("Failed to receive completion message from writer");
assert_eq!(*block_on(mu.read_lock()), ITERATIONS);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn wake_all_readers() {
async fn read(mu: Arc<RwLock<()>>) {
let g = mu.read_lock().await;
pending!();
mem::drop(g);
}
async fn write(mu: Arc<RwLock<()>>) {
mu.lock().await;
}
let mu = Arc::new(RwLock::new(()));
let mut futures: [Pin<Box<dyn Future<Output = ()>>>; 5] = [
Box::pin(read(mu.clone())),
Box::pin(read(mu.clone())),
Box::pin(read(mu.clone())),
Box::pin(write(mu.clone())),
Box::pin(read(mu.clone())),
];
const NUM_READERS: usize = 4;
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
let g = block_on(mu.lock());
for r in &mut futures {
if let Poll::Ready(()) = r.as_mut().poll(&mut cx) {
panic!("future unexpectedly ready");
}
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
HAS_WAITERS
);
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
WRITER_WAITING
);
mem::drop(g);
for r in &mut futures {
if let Poll::Ready(()) = r.as_mut().poll(&mut cx) {
panic!("future unexpectedly ready");
}
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
READ_LOCK * NUM_READERS
);
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
WRITER_WAITING
);
let mut needs_poll = None;
for (i, r) in futures.iter_mut().enumerate() {
match r.as_mut().poll(&mut cx) {
Poll::Ready(()) => {}
Poll::Pending => {
if needs_poll.is_some() {
panic!("More than one future unable to complete");
}
needs_poll = Some(i);
}
}
}
if futures[needs_poll.expect("Writer unexpectedly able to complete")]
.as_mut()
.poll(&mut cx)
.is_pending()
{
panic!("Writer unable to complete");
}
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn long_wait() {
async fn tight_loop(mu: Arc<RwLock<bool>>) {
loop {
let ready = mu.lock().await;
if *ready {
break;
}
pending!();
}
}
async fn mark_ready(mu: Arc<RwLock<bool>>) {
*mu.lock().await = true;
}
let mu = Arc::new(RwLock::new(false));
let mut tl = Box::pin(tight_loop(mu.clone()));
let mut mark = Box::pin(mark_ready(mu.clone()));
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
for _ in 0..=LONG_WAIT_THRESHOLD {
if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
panic!("tight_loop unexpectedly ready");
}
if let Poll::Ready(()) = mark.as_mut().poll(&mut cx) {
panic!("mark_ready unexpectedly ready");
}
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed),
LOCKED | HAS_WAITERS | WRITER_WAITING | LONG_WAIT
);
if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
panic!("tight_loop unexpectedly ready");
}
if mark.as_mut().poll(&mut cx).is_pending() {
panic!("mark_ready not able to make progress");
}
if tl.as_mut().poll(&mut cx).is_pending() {
panic!("tight_loop not able to finish");
}
assert!(*block_on(mu.lock()));
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn cancel_long_wait_before_wake() {
async fn tight_loop(mu: Arc<RwLock<bool>>) {
loop {
let ready = mu.lock().await;
if *ready {
break;
}
pending!();
}
}
async fn mark_ready(mu: Arc<RwLock<bool>>) {
*mu.lock().await = true;
}
let mu = Arc::new(RwLock::new(false));
let mut tl = Box::pin(tight_loop(mu.clone()));
let mut mark = Box::pin(mark_ready(mu.clone()));
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
for _ in 0..=LONG_WAIT_THRESHOLD {
if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
panic!("tight_loop unexpectedly ready");
}
if let Poll::Ready(()) = mark.as_mut().poll(&mut cx) {
panic!("mark_ready unexpectedly ready");
}
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed),
LOCKED | HAS_WAITERS | WRITER_WAITING | LONG_WAIT
);
mem::drop(mark);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), LOCKED);
mem::drop(tl);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn cancel_long_wait_after_wake() {
async fn tight_loop(mu: Arc<RwLock<bool>>) {
loop {
let ready = mu.lock().await;
if *ready {
break;
}
pending!();
}
}
async fn mark_ready(mu: Arc<RwLock<bool>>) {
*mu.lock().await = true;
}
let mu = Arc::new(RwLock::new(false));
let mut tl = Box::pin(tight_loop(mu.clone()));
let mut mark = Box::pin(mark_ready(mu.clone()));
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
for _ in 0..=LONG_WAIT_THRESHOLD {
if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
panic!("tight_loop unexpectedly ready");
}
if let Poll::Ready(()) = mark.as_mut().poll(&mut cx) {
panic!("mark_ready unexpectedly ready");
}
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed),
LOCKED | HAS_WAITERS | WRITER_WAITING | LONG_WAIT
);
if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
panic!("tight_loop unexpectedly ready");
}
mem::drop(mark);
assert_eq!(mu.raw.state.load(Ordering::Relaxed) & LONG_WAIT, 0);
block_on(mark_ready(mu.clone()));
if tl.as_mut().poll(&mut cx).is_pending() {
panic!("tight_loop not able to finish");
}
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn designated_waker() {
async fn inc(mu: Arc<RwLock<usize>>) {
*mu.lock().await += 1;
}
let mu = Arc::new(RwLock::new(0));
let mut futures = [
Box::pin(inc(mu.clone())),
Box::pin(inc(mu.clone())),
Box::pin(inc(mu.clone())),
];
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
let count = block_on(mu.lock());
if let Poll::Ready(()) = futures[0].as_mut().poll(&mut cx) {
panic!("future unexpectedly ready");
}
if let Poll::Ready(()) = futures[1].as_mut().poll(&mut cx) {
panic!("future unexpectedly ready");
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed),
LOCKED | HAS_WAITERS | WRITER_WAITING,
);
mem::drop(count);
assert_eq!(
mu.raw.state.load(Ordering::Relaxed),
DESIGNATED_WAKER | HAS_WAITERS | WRITER_WAITING,
);
if futures[2].as_mut().poll(&mut cx).is_pending() {
panic!("future unable to complete");
}
assert_eq!(*block_on(mu.lock()), 1);
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & DESIGNATED_WAKER,
DESIGNATED_WAKER
);
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
HAS_WAITERS
);
if futures[0].as_mut().poll(&mut cx).is_pending() {
panic!("future unable to complete");
}
assert_eq!(*block_on(mu.lock()), 2);
if futures[1].as_mut().poll(&mut cx).is_pending() {
panic!("future unable to complete");
}
assert_eq!(*block_on(mu.lock()), 3);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn cancel_designated_waker() {
async fn inc(mu: Arc<RwLock<usize>>) {
*mu.lock().await += 1;
}
let mu = Arc::new(RwLock::new(0));
let mut fut = Box::pin(inc(mu.clone()));
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
let count = block_on(mu.lock());
if let Poll::Ready(()) = fut.as_mut().poll(&mut cx) {
panic!("Future unexpectedly ready when lock is held");
}
mem::drop(count);
mem::drop(fut);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn cancel_before_wake() {
async fn inc(mu: Arc<RwLock<usize>>) {
*mu.lock().await += 1;
}
let mu = Arc::new(RwLock::new(0));
let mut fut1 = Box::pin(inc(mu.clone()));
let mut fut2 = Box::pin(inc(mu.clone()));
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
let count = block_on(mu.lock());
match fut1.as_mut().poll(&mut cx) {
Poll::Pending => {}
Poll::Ready(()) => panic!("Future is unexpectedly ready"),
}
match fut2.as_mut().poll(&mut cx) {
Poll::Pending => {}
Poll::Ready(()) => panic!("Future is unexpectedly ready"),
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
WRITER_WAITING
);
mem::drop(fut1);
assert_eq!(mu.raw.state.load(Ordering::Relaxed) & DESIGNATED_WAKER, 0);
assert_eq!(mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING, 0);
match fut2.as_mut().poll(&mut cx) {
Poll::Pending => {}
Poll::Ready(()) => panic!("Future is unexpectedly ready"),
}
mem::drop(count);
match fut2.as_mut().poll(&mut cx) {
Poll::Pending => panic!("Future is not ready to make progress"),
Poll::Ready(()) => {}
}
assert_eq!(*block_on(mu.lock()), 1);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn cancel_after_wake() {
async fn inc(mu: Arc<RwLock<usize>>) {
*mu.lock().await += 1;
}
let mu = Arc::new(RwLock::new(0));
let mut fut1 = Box::pin(inc(mu.clone()));
let mut fut2 = Box::pin(inc(mu.clone()));
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
let count = block_on(mu.lock());
match fut1.as_mut().poll(&mut cx) {
Poll::Pending => {}
Poll::Ready(()) => panic!("Future is unexpectedly ready"),
}
match fut2.as_mut().poll(&mut cx) {
Poll::Pending => {}
Poll::Ready(()) => panic!("Future is unexpectedly ready"),
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
WRITER_WAITING
);
mem::drop(count);
mem::drop(fut1);
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & DESIGNATED_WAKER,
DESIGNATED_WAKER
);
assert_eq!(mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING, 0);
match fut2.as_mut().poll(&mut cx) {
Poll::Pending => panic!("Future is not ready to make progress"),
Poll::Ready(()) => {}
}
assert_eq!(*block_on(mu.lock()), 1);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn timeout() {
async fn timed_lock(timer: oneshot::Receiver<()>, mu: Arc<RwLock<()>>) {
select! {
res = timer.fuse() => {
match res {
Ok(()) => {},
Err(e) => panic!("Timer unexpectedly canceled: {e}"),
}
}
_ = mu.lock().fuse() => panic!("Unexpectedly acquired lock"),
}
}
let mu = Arc::new(RwLock::new(()));
let (tx, rx) = oneshot::channel();
let mut timeout = Box::pin(timed_lock(rx, mu.clone()));
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
let g = block_on(mu.lock());
if let Poll::Ready(()) = timeout.as_mut().poll(&mut cx) {
panic!("timed_lock unexpectedly ready");
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
HAS_WAITERS
);
tx.send(()).expect("Failed to send wakeup");
if timeout.as_mut().poll(&mut cx).is_pending() {
panic!("timed_lock not ready after timeout");
}
assert_eq!(mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, 0);
mem::drop(g);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn writer_waiting() {
async fn read_zero(mu: Arc<RwLock<usize>>) {
let val = mu.read_lock().await;
pending!();
assert_eq!(*val, 0);
}
async fn inc(mu: Arc<RwLock<usize>>) {
*mu.lock().await += 1;
}
async fn read_one(mu: Arc<RwLock<usize>>) {
let val = mu.read_lock().await;
assert_eq!(*val, 1);
}
let mu = Arc::new(RwLock::new(0));
let mut r1 = Box::pin(read_zero(mu.clone()));
let mut r2 = Box::pin(read_zero(mu.clone()));
let mut w = Box::pin(inc(mu.clone()));
let mut r3 = Box::pin(read_one(mu.clone()));
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
if let Poll::Ready(()) = r1.as_mut().poll(&mut cx) {
panic!("read_zero unexpectedly ready");
}
if let Poll::Ready(()) = r2.as_mut().poll(&mut cx) {
panic!("read_zero unexpectedly ready");
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
2 * READ_LOCK
);
if let Poll::Ready(()) = w.as_mut().poll(&mut cx) {
panic!("inc unexpectedly ready");
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
WRITER_WAITING
);
if let Poll::Ready(()) = r3.as_mut().poll(&mut cx) {
panic!("read_one unexpectedly ready");
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
2 * READ_LOCK
);
if r1.as_mut().poll(&mut cx).is_pending() {
panic!("read_zero unable to complete");
}
if r2.as_mut().poll(&mut cx).is_pending() {
panic!("read_zero unable to complete");
}
if w.as_mut().poll(&mut cx).is_pending() {
panic!("inc unable to complete");
}
if r3.as_mut().poll(&mut cx).is_pending() {
panic!("read_one unable to complete");
}
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn notify_one() {
async fn read(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
let mut count = mu.read_lock().await;
while *count == 0 {
count = cv.wait_read(count).await;
}
}
async fn write(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
let mut count = mu.lock().await;
while *count == 0 {
count = cv.wait(count).await;
}
*count -= 1;
}
let mu = Arc::new(RwLock::new(0));
let cv = Arc::new(Condvar::new());
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
let mut readers = [
Box::pin(read(mu.clone(), cv.clone())),
Box::pin(read(mu.clone(), cv.clone())),
Box::pin(read(mu.clone(), cv.clone())),
Box::pin(read(mu.clone(), cv.clone())),
];
let mut writer = Box::pin(write(mu.clone(), cv.clone()));
for r in &mut readers {
if let Poll::Ready(()) = r.as_mut().poll(&mut cx) {
panic!("reader unexpectedly ready");
}
}
if let Poll::Ready(()) = writer.as_mut().poll(&mut cx) {
panic!("writer unexpectedly ready");
}
let mut count = block_on(mu.lock());
*count = 1;
cv.notify_one();
for r in &mut readers {
if r.as_mut().poll(&mut cx).is_ready() {
panic!("reader unexpectedly ready");
}
}
if writer.as_mut().poll(&mut cx).is_ready() {
panic!("writer unexpectedly ready");
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
HAS_WAITERS
);
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
WRITER_WAITING
);
mem::drop(count);
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & (HAS_WAITERS | WRITER_WAITING),
HAS_WAITERS | WRITER_WAITING
);
for r in &mut readers {
if r.as_mut().poll(&mut cx).is_pending() {
panic!("reader unable to complete");
}
}
if writer.as_mut().poll(&mut cx).is_pending() {
panic!("writer unable to complete");
}
assert_eq!(*block_on(mu.read_lock()), 0);
}
#[test]
fn notify_when_unlocked() {
async fn dec(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
let mut count = mu.lock().await;
while *count == 0 {
count = cv.wait(count).await;
}
*count -= 1;
}
let mu = Arc::new(RwLock::new(0));
let cv = Arc::new(Condvar::new());
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
let mut futures = [
Box::pin(dec(mu.clone(), cv.clone())),
Box::pin(dec(mu.clone(), cv.clone())),
Box::pin(dec(mu.clone(), cv.clone())),
Box::pin(dec(mu.clone(), cv.clone())),
];
for f in &mut futures {
if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
panic!("future unexpectedly ready");
}
}
*block_on(mu.lock()) = futures.len();
cv.notify_all();
assert_eq!(mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, 0);
for f in &mut futures {
if f.as_mut().poll(&mut cx).is_pending() {
panic!("future unexpectedly ready");
}
}
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn notify_reader_writer() {
async fn read(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
let mut count = mu.read_lock().await;
while *count == 0 {
count = cv.wait_read(count).await;
}
pending!();
}
async fn write(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
let mut count = mu.lock().await;
while *count == 0 {
count = cv.wait(count).await;
}
*count -= 1;
}
async fn lock(mu: Arc<RwLock<usize>>) {
mem::drop(mu.lock().await);
}
let mu = Arc::new(RwLock::new(0));
let cv = Arc::new(Condvar::new());
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
let mut futures: [Pin<Box<dyn Future<Output = ()>>>; 5] = [
Box::pin(read(mu.clone(), cv.clone())),
Box::pin(read(mu.clone(), cv.clone())),
Box::pin(read(mu.clone(), cv.clone())),
Box::pin(write(mu.clone(), cv.clone())),
Box::pin(read(mu.clone(), cv.clone())),
];
const NUM_READERS: usize = 4;
let mut l = Box::pin(lock(mu.clone()));
for f in &mut futures {
if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
panic!("future unexpectedly ready");
}
}
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
let mut count = block_on(mu.lock());
*count = 1;
if let Poll::Ready(()) = l.as_mut().poll(&mut cx) {
panic!("lock() unexpectedly ready");
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & (HAS_WAITERS | WRITER_WAITING),
HAS_WAITERS | WRITER_WAITING
);
cv.notify_all();
mem::drop(count);
if l.as_mut().poll(&mut cx).is_pending() {
panic!("lock() unable to complete");
}
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
for f in &mut futures {
if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
panic!("future unexpectedly ready");
}
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
READ_LOCK * NUM_READERS
);
let mut needs_poll = None;
for (i, r) in futures.iter_mut().enumerate() {
match r.as_mut().poll(&mut cx) {
Poll::Ready(()) => {}
Poll::Pending => {
if needs_poll.is_some() {
panic!("More than one future unable to complete");
}
needs_poll = Some(i);
}
}
}
if futures[needs_poll.expect("Writer unexpectedly able to complete")]
.as_mut()
.poll(&mut cx)
.is_pending()
{
panic!("Writer unable to complete");
}
assert_eq!(*block_on(mu.lock()), 0);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn notify_readers_with_read_lock() {
async fn read(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
let mut count = mu.read_lock().await;
while *count == 0 {
count = cv.wait_read(count).await;
}
pending!();
}
let mu = Arc::new(RwLock::new(0));
let cv = Arc::new(Condvar::new());
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
let mut futures = [
Box::pin(read(mu.clone(), cv.clone())),
Box::pin(read(mu.clone(), cv.clone())),
Box::pin(read(mu.clone(), cv.clone())),
Box::pin(read(mu.clone(), cv.clone())),
];
for f in &mut futures {
if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
panic!("future unexpectedly ready");
}
}
*block_on(mu.lock()) = 1;
let g = block_on(mu.read_lock());
cv.notify_all();
for f in &mut futures {
if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
panic!("future unexpectedly ready");
}
}
assert_eq!(mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, 0);
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
READ_LOCK * (futures.len() + 1)
);
mem::drop(g);
for f in &mut futures {
if f.as_mut().poll(&mut cx).is_pending() {
panic!("future unable to complete");
}
}
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
}