Path: blob/main/cros_async/src/blocking/sys/linux/block_on.rs
5394 views
// Copyright 2020 The ChromiumOS Authors1// Use of this source code is governed by a BSD-style license that can be2// found in the LICENSE file.34use std::future::Future;5use std::ptr;6use std::sync::atomic::AtomicI32;7use std::sync::atomic::Ordering;8use std::sync::Arc;9use std::task::Context;10use std::task::Poll;1112use futures::pin_mut;13use futures::task::waker_ref;14use futures::task::ArcWake;1516// Randomly generated values to indicate the state of the current thread.17const WAITING: i32 = 0x25de_74d1;18const WOKEN: i32 = 0x72d3_2c9f;1920const FUTEX_WAIT_PRIVATE: libc::c_int = libc::FUTEX_WAIT | libc::FUTEX_PRIVATE_FLAG;21const FUTEX_WAKE_PRIVATE: libc::c_int = libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG;2223thread_local!(static PER_THREAD_WAKER: Arc<Waker> = Arc::new(Waker(AtomicI32::new(WAITING))));2425#[repr(transparent)]26struct Waker(AtomicI32);2728impl ArcWake for Waker {29fn wake_by_ref(arc_self: &Arc<Self>) {30let state = arc_self.0.swap(WOKEN, Ordering::Release);31if state == WAITING {32// SAFETY:33// The thread hasn't already been woken up so wake it up now. Safe because this doesn't34// modify any memory and we check the return value.35let res = unsafe {36libc::syscall(37libc::SYS_futex,38&arc_self.0,39FUTEX_WAKE_PRIVATE,40libc::INT_MAX, // val41ptr::null::<*const libc::timespec>(), // timeout42ptr::null::<*const libc::c_int>(), // uaddr2430_i32, // val344)45};46if res < 0 {47panic!(48"unexpected error from FUTEX_WAKE_PRIVATE: {}",49std::io::Error::last_os_error()50);51}52}53}54}5556/// Run a future to completion on the current thread.57///58/// This method will block the current thread until `f` completes. Useful when you need to call an59/// async fn from a non-async context.60pub fn block_on<F: Future>(f: F) -> F::Output {61pin_mut!(f);6263PER_THREAD_WAKER.with(|thread_waker| {64let waker = waker_ref(thread_waker);65let mut cx = Context::from_waker(&waker);6667loop {68if let Poll::Ready(t) = f.as_mut().poll(&mut cx) {69return t;70}7172let state = thread_waker.0.swap(WAITING, Ordering::Acquire);73if state == WAITING {74// SAFETY:75// If we weren't already woken up then wait until we are. Safe because this doesn't76// modify any memory and we check the return value.77let res = unsafe {78libc::syscall(79libc::SYS_futex,80&thread_waker.0,81FUTEX_WAIT_PRIVATE,82state,83ptr::null::<*const libc::timespec>(), // timeout84ptr::null::<*const libc::c_int>(), // uaddr2850_i32, // val386)87};8889if res < 0 {90let e = std::io::Error::last_os_error();91match e.raw_os_error() {92Some(libc::EAGAIN) | Some(libc::EINTR) => {}93_ => panic!("unexpected error from FUTEX_WAIT_PRIVATE: {e}"),94}95}9697// Clear the state to prevent unnecessary extra loop iterations and also to allow98// nested usage of `block_on`.99thread_waker.0.store(WAITING, Ordering::Release);100}101}102})103}104105#[cfg(test)]106mod test {107use std::future::Future;108use std::pin::Pin;109use std::sync::mpsc::channel;110use std::sync::mpsc::Sender;111use std::sync::Arc;112use std::task::Context;113use std::task::Poll;114use std::task::Waker;115use std::thread;116use std::time::Duration;117118use super::*;119use crate::sync::SpinLock;120121struct TimerState {122fired: bool,123waker: Option<Waker>,124}125struct Timer {126state: Arc<SpinLock<TimerState>>,127}128129impl Future for Timer {130type Output = ();131132fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {133let mut state = self.state.lock();134if state.fired {135return Poll::Ready(());136}137138state.waker = Some(cx.waker().clone());139Poll::Pending140}141}142143fn start_timer(dur: Duration, notify: Option<Sender<()>>) -> Timer {144let state = Arc::new(SpinLock::new(TimerState {145fired: false,146waker: None,147}));148149let thread_state = Arc::clone(&state);150thread::spawn(move || {151thread::sleep(dur);152let mut ts = thread_state.lock();153ts.fired = true;154if let Some(waker) = ts.waker.take() {155waker.wake();156}157drop(ts);158159if let Some(tx) = notify {160tx.send(()).expect("Failed to send completion notification");161}162});163164Timer { state }165}166167#[test]168fn it_works() {169block_on(start_timer(Duration::from_millis(100), None));170}171172#[test]173fn nested() {174async fn inner() {175block_on(start_timer(Duration::from_millis(100), None));176}177178block_on(inner());179}180181#[test]182fn ready_before_poll() {183let (tx, rx) = channel();184185let timer = start_timer(Duration::from_millis(50), Some(tx));186187rx.recv()188.expect("Failed to receive completion notification");189190// We know the timer has already fired so the poll should complete immediately.191block_on(timer);192}193}194195196