Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bevyengine
GitHub Repository: bevyengine/bevy
Path: blob/main/crates/bevy_tasks/src/thread_executor.rs
6604 views
1
use core::marker::PhantomData;
2
use std::thread::{self, ThreadId};
3
4
use crate::executor::Executor;
5
use async_task::Task;
6
use futures_lite::Future;
7
8
/// An executor that can only be ticked on the thread it was instantiated on. But
9
/// can spawn `Send` tasks from other threads.
10
///
11
/// # Example
12
/// ```
13
/// # use std::sync::{Arc, atomic::{AtomicI32, Ordering}};
14
/// use bevy_tasks::ThreadExecutor;
15
///
16
/// let thread_executor = ThreadExecutor::new();
17
/// let count = Arc::new(AtomicI32::new(0));
18
///
19
/// // create some owned values that can be moved into another thread
20
/// let count_clone = count.clone();
21
///
22
/// std::thread::scope(|scope| {
23
/// scope.spawn(|| {
24
/// // we cannot get the ticker from another thread
25
/// let not_thread_ticker = thread_executor.ticker();
26
/// assert!(not_thread_ticker.is_none());
27
///
28
/// // but we can spawn tasks from another thread
29
/// thread_executor.spawn(async move {
30
/// count_clone.fetch_add(1, Ordering::Relaxed);
31
/// }).detach();
32
/// });
33
/// });
34
///
35
/// // the tasks do not make progress unless the executor is manually ticked
36
/// assert_eq!(count.load(Ordering::Relaxed), 0);
37
///
38
/// // tick the ticker until task finishes
39
/// let thread_ticker = thread_executor.ticker().unwrap();
40
/// thread_ticker.try_tick();
41
/// assert_eq!(count.load(Ordering::Relaxed), 1);
42
/// ```
43
#[derive(Debug)]
44
pub struct ThreadExecutor<'task> {
45
executor: Executor<'task>,
46
thread_id: ThreadId,
47
}
48
49
impl<'task> Default for ThreadExecutor<'task> {
50
fn default() -> Self {
51
Self {
52
executor: Executor::new(),
53
thread_id: thread::current().id(),
54
}
55
}
56
}
57
58
impl<'task> ThreadExecutor<'task> {
59
/// create a new [`ThreadExecutor`]
60
pub fn new() -> Self {
61
Self::default()
62
}
63
64
/// Spawn a task on the thread executor
65
pub fn spawn<T: Send + 'task>(
66
&self,
67
future: impl Future<Output = T> + Send + 'task,
68
) -> Task<T> {
69
self.executor.spawn(future)
70
}
71
72
/// Gets the [`ThreadExecutorTicker`] for this executor.
73
/// Use this to tick the executor.
74
/// It only returns the ticker if it's on the thread the executor was created on
75
/// and returns `None` otherwise.
76
pub fn ticker<'ticker>(&'ticker self) -> Option<ThreadExecutorTicker<'task, 'ticker>> {
77
if thread::current().id() == self.thread_id {
78
return Some(ThreadExecutorTicker {
79
executor: self,
80
_marker: PhantomData,
81
});
82
}
83
None
84
}
85
86
/// Returns true if `self` and `other`'s executor is same
87
pub fn is_same(&self, other: &Self) -> bool {
88
core::ptr::eq(self, other)
89
}
90
}
91
92
/// Used to tick the [`ThreadExecutor`]. The executor does not
93
/// make progress unless it is manually ticked on the thread it was
94
/// created on.
95
#[derive(Debug)]
96
pub struct ThreadExecutorTicker<'task, 'ticker> {
97
executor: &'ticker ThreadExecutor<'task>,
98
// make type not send or sync
99
_marker: PhantomData<*const ()>,
100
}
101
102
impl<'task, 'ticker> ThreadExecutorTicker<'task, 'ticker> {
103
/// Tick the thread executor.
104
pub async fn tick(&self) {
105
self.executor.executor.tick().await;
106
}
107
108
/// Synchronously try to tick a task on the executor.
109
/// Returns false if does not find a task to tick.
110
pub fn try_tick(&self) -> bool {
111
self.executor.executor.try_tick()
112
}
113
}
114
115
#[cfg(test)]
116
mod tests {
117
use super::*;
118
use alloc::sync::Arc;
119
120
#[test]
121
fn test_ticker() {
122
let executor = Arc::new(ThreadExecutor::new());
123
let ticker = executor.ticker();
124
assert!(ticker.is_some());
125
126
thread::scope(|s| {
127
s.spawn(|| {
128
let ticker = executor.ticker();
129
assert!(ticker.is_none());
130
});
131
});
132
}
133
}
134
135