Path: blob/main/crates/polars-stream/src/async_primitives/opt_spawned_future.rs
8395 views
use pin_project_lite::pin_project;1use polars_utils::{UnitVec, unitvec};23use crate::async_executor::{AbortOnDropHandle, TaskPriority, spawn};45pin_project! {6/// Represents a future that may either be local or spawned.7#[project = LocalOrSpawnedFutureProj]8pub enum LocalOrSpawnedFuture<F, O> {9Local { #[pin] fut: F },10Spawned { #[pin] handle: AbortOnDropHandle<O> }11}12}1314impl<F, O> LocalOrSpawnedFuture<F, O>15where16F: Future<Output = O>,17{18/// Wraps the future in a `Local` variant.19pub fn new_local(fut: F) -> Self {20LocalOrSpawnedFuture::Local { fut }21}22}2324impl<F, O> LocalOrSpawnedFuture<F, O>25where26F: Future<Output = O> + Send + 'static,27O: Send + 'static,28{29/// Spawns the future onto the async executor.30pub fn spawn(task_priority: TaskPriority, fut: F) -> Self {31LocalOrSpawnedFuture::Spawned {32handle: AbortOnDropHandle::new(spawn(task_priority, fut)),33}34}35}3637impl<F, O> Future for LocalOrSpawnedFuture<F, O>38where39F: Future<Output = O>,40{41type Output = O;4243fn poll(44self: std::pin::Pin<&mut Self>,45cx: &mut std::task::Context<'_>,46) -> std::task::Poll<Self::Output> {47match self.project() {48LocalOrSpawnedFutureProj::Local { fut } => fut.poll(cx),49LocalOrSpawnedFutureProj::Spawned { handle } => handle.poll(cx),50}51}52}5354/// Parallelizes futures across the computational async runtime.55///56/// As an optimization for cache access, the first future is kept on the current thread. If there57/// is only 1 future, then all data is kept on the current thread and spawn is not called at all.58///59/// Note this means the first future in the returned iterator does not run until polled.60///61/// Note that dropping the iterator will call abort on all spawned futures, as this is intended to be62/// used for compute.63pub fn parallelize_first_to_local<'i, 'o, I, F, O>(64task_priority: TaskPriority,65futures_iter: I,66) -> impl ExactSizeIterator<Item = impl Future<Output = O> + Send + 'static> + 'o67where68I: Iterator<Item = F> + 'i,69F: Future<Output = O> + Send + 'static,70O: Send + 'static,71{72parallelize_first_to_local_impl(task_priority, futures_iter).into_iter()73}7475fn parallelize_first_to_local_impl<I, F, O>(76task_priority: TaskPriority,77mut futures_iter: I,78) -> UnitVec<LocalOrSpawnedFuture<F, O>>79where80I: Iterator<Item = F>,81F: Future<Output = O> + Send + 'static,82O: Send + 'static,83{84let Some(first_fut) = futures_iter.next() else {85return UnitVec::new();86};8788let first_fut = LocalOrSpawnedFuture::new_local(first_fut);8990let Some(second_fut) = futures_iter.next() else {91return unitvec![first_fut];92};9394let mut futures = UnitVec::with_capacity(2 + futures_iter.size_hint().0);9596// Note:97// * The local future must come first to ensure we don't block polling it.98// * Remaining futures must all be spawned upfront into the Vec for them to run parallel.99futures.extend([100first_fut,101LocalOrSpawnedFuture::spawn(task_priority, second_fut),102]);103futures.extend(futures_iter.map(|x| LocalOrSpawnedFuture::spawn(task_priority, x)));104105futures106}107108109