Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/async_primitives/opt_spawned_future.rs
8395 views
1
use pin_project_lite::pin_project;
2
use polars_utils::{UnitVec, unitvec};
3
4
use crate::async_executor::{AbortOnDropHandle, TaskPriority, spawn};
5
6
pin_project! {
    /// A future that is either driven in place by whoever polls it (`Local`)
    /// or has been handed to the async executor and is observed through its
    /// handle (`Spawned`).
    ///
    /// Both payloads are structurally pinned; the generated projection type is
    /// `LocalOrSpawnedFutureProj`.
    #[project = LocalOrSpawnedFutureProj]
    pub enum LocalOrSpawnedFuture<F, O> {
        Local {
            #[pin]
            fut: F
        },
        Spawned {
            #[pin]
            handle: AbortOnDropHandle<O>
        }
    }
}
14
15
impl<F, O> LocalOrSpawnedFuture<F, O>
16
where
17
F: Future<Output = O>,
18
{
19
/// Wraps the future in a `Local` variant.
20
pub fn new_local(fut: F) -> Self {
21
LocalOrSpawnedFuture::Local { fut }
22
}
23
}
24
25
impl<F, O> LocalOrSpawnedFuture<F, O>
26
where
27
F: Future<Output = O> + Send + 'static,
28
O: Send + 'static,
29
{
30
/// Spawns the future onto the async executor.
31
pub fn spawn(task_priority: TaskPriority, fut: F) -> Self {
32
LocalOrSpawnedFuture::Spawned {
33
handle: AbortOnDropHandle::new(spawn(task_priority, fut)),
34
}
35
}
36
}
37
38
impl<F, O> Future for LocalOrSpawnedFuture<F, O>
39
where
40
F: Future<Output = O>,
41
{
42
type Output = O;
43
44
fn poll(
45
self: std::pin::Pin<&mut Self>,
46
cx: &mut std::task::Context<'_>,
47
) -> std::task::Poll<Self::Output> {
48
match self.project() {
49
LocalOrSpawnedFutureProj::Local { fut } => fut.poll(cx),
50
LocalOrSpawnedFutureProj::Spawned { handle } => handle.poll(cx),
51
}
52
}
53
}
54
55
/// Parallelizes futures across the computational async runtime.
56
///
57
/// As an optimization for cache access, the first future is kept on the current thread. If there
58
/// is only 1 future, then all data is kept on the current thread and spawn is not called at all.
59
///
60
/// Note this means the first future in the returned iterator does not run until polled.
61
///
62
/// Note that dropping the iterator will call abort on all spawned futures, as this is intended to be
63
/// used for compute.
64
pub fn parallelize_first_to_local<'i, 'o, I, F, O>(
65
task_priority: TaskPriority,
66
futures_iter: I,
67
) -> impl ExactSizeIterator<Item = impl Future<Output = O> + Send + 'static> + 'o
68
where
69
I: Iterator<Item = F> + 'i,
70
F: Future<Output = O> + Send + 'static,
71
O: Send + 'static,
72
{
73
parallelize_first_to_local_impl(task_priority, futures_iter).into_iter()
74
}
75
76
fn parallelize_first_to_local_impl<I, F, O>(
77
task_priority: TaskPriority,
78
mut futures_iter: I,
79
) -> UnitVec<LocalOrSpawnedFuture<F, O>>
80
where
81
I: Iterator<Item = F>,
82
F: Future<Output = O> + Send + 'static,
83
O: Send + 'static,
84
{
85
let Some(first_fut) = futures_iter.next() else {
86
return UnitVec::new();
87
};
88
89
let first_fut = LocalOrSpawnedFuture::new_local(first_fut);
90
91
let Some(second_fut) = futures_iter.next() else {
92
return unitvec![first_fut];
93
};
94
95
let mut futures = UnitVec::with_capacity(2 + futures_iter.size_hint().0);
96
97
// Note:
98
// * The local future must come first to ensure we don't block polling it.
99
// * Remaining futures must all be spawned upfront into the Vec for them to run parallel.
100
futures.extend([
101
first_fut,
102
LocalOrSpawnedFuture::spawn(task_priority, second_fut),
103
]);
104
futures.extend(futures_iter.map(|x| LocalOrSpawnedFuture::spawn(task_priority, x)));
105
106
futures
107
}
108
109