Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/async_primitives/opt_spawned_future.rs
6939 views
1
use pin_project_lite::pin_project;
2
use polars_utils::enum_unit_vec::EnumUnitVec;
3
4
use crate::async_executor::{AbortOnDropHandle, TaskPriority, spawn};
5
6
pin_project! {
    /// A future that is either held inline (`Local`) or represented by the handle of a task
    /// that was spawned onto the async executor (`Spawned`).
    #[project = LocalOrSpawnedFutureProj]
    pub enum LocalOrSpawnedFuture<F, O> {
        // The wrapped future itself; it is polled in place.
        Local {
            #[pin]
            fut: F,
        },
        // Handle of a spawned task producing `O`.
        Spawned {
            #[pin]
            handle: AbortOnDropHandle<O>,
        },
    }
}
14
15
impl<F, O> LocalOrSpawnedFuture<F, O>
16
where
17
F: Future<Output = O>,
18
{
19
/// Wraps the future in a `Local` variant.
20
pub fn new_local(fut: F) -> Self {
21
LocalOrSpawnedFuture::Local { fut }
22
}
23
}
24
25
impl<F, O> LocalOrSpawnedFuture<F, O>
26
where
27
F: Future<Output = O> + Send + 'static,
28
O: Send + 'static,
29
{
30
/// Spawns the future onto the async executor.
31
pub fn spawn(fut: F) -> Self {
32
LocalOrSpawnedFuture::Spawned {
33
handle: AbortOnDropHandle::new(spawn(TaskPriority::Low, fut)),
34
}
35
}
36
}
37
38
impl<F, O> Future for LocalOrSpawnedFuture<F, O>
39
where
40
F: Future<Output = O>,
41
{
42
type Output = O;
43
44
fn poll(
45
self: std::pin::Pin<&mut Self>,
46
cx: &mut std::task::Context<'_>,
47
) -> std::task::Poll<Self::Output> {
48
match self.project() {
49
LocalOrSpawnedFutureProj::Local { fut } => fut.poll(cx),
50
LocalOrSpawnedFutureProj::Spawned { handle } => handle.poll(cx),
51
}
52
}
53
}
54
55
/// Parallelizes futures across the computational async runtime.
56
///
57
/// As an optimization for cache access, the first future is kept on the current thread. If there
58
/// is only 1 future, then all data is kept on the current thread and spawn is not called at all.
59
///
60
/// Note this means the first future in the returned iterator does not run until polled.
61
///
62
/// Note that dropping the iterator will call abort on all spawned futures, as this is intended to be
63
/// used for compute.
64
pub fn parallelize_first_to_local<I, F, O>(
65
futures_iter: I,
66
) -> impl ExactSizeIterator<Item = impl Future<Output = O> + Send + 'static>
67
where
68
I: Iterator<Item = F>,
69
F: Future<Output = O> + Send + 'static,
70
O: Send + 'static,
71
{
72
parallelize_first_to_local_impl(futures_iter).into_iter()
73
}
74
75
fn parallelize_first_to_local_impl<I, F, O>(
76
mut futures_iter: I,
77
) -> EnumUnitVec<LocalOrSpawnedFuture<F, O>>
78
where
79
I: Iterator<Item = F>,
80
F: Future<Output = O> + Send + 'static,
81
O: Send + 'static,
82
{
83
let Some(first_fut) = futures_iter.next() else {
84
return EnumUnitVec::new();
85
};
86
87
let first_fut = LocalOrSpawnedFuture::new_local(first_fut);
88
89
let Some(second_fut) = futures_iter.next() else {
90
return EnumUnitVec::new_single(first_fut);
91
};
92
93
let mut futures = Vec::with_capacity(2 + futures_iter.size_hint().0);
94
95
// Note:
96
// * The local future must come first to ensure we don't block polling it.
97
// * Remaining futures must all be spawned upfront into the Vec for them to run parallel.
98
futures.extend([first_fut, LocalOrSpawnedFuture::spawn(second_fut)]);
99
futures.extend(futures_iter.map(LocalOrSpawnedFuture::spawn));
100
101
EnumUnitVec::from(futures)
102
}
103
104