Path: blob/main/crates/polars-stream/src/async_primitives/opt_spawned_future.rs
6939 views
use pin_project_lite::pin_project;1use polars_utils::enum_unit_vec::EnumUnitVec;23use crate::async_executor::{AbortOnDropHandle, TaskPriority, spawn};45pin_project! {6/// Represents a future that may either be local or spawned.7#[project = LocalOrSpawnedFutureProj]8pub enum LocalOrSpawnedFuture<F, O> {9Local { #[pin] fut: F },10Spawned { #[pin] handle: AbortOnDropHandle<O> }11}12}1314impl<F, O> LocalOrSpawnedFuture<F, O>15where16F: Future<Output = O>,17{18/// Wraps the future in a `Local` variant.19pub fn new_local(fut: F) -> Self {20LocalOrSpawnedFuture::Local { fut }21}22}2324impl<F, O> LocalOrSpawnedFuture<F, O>25where26F: Future<Output = O> + Send + 'static,27O: Send + 'static,28{29/// Spawns the future onto the async executor.30pub fn spawn(fut: F) -> Self {31LocalOrSpawnedFuture::Spawned {32handle: AbortOnDropHandle::new(spawn(TaskPriority::Low, fut)),33}34}35}3637impl<F, O> Future for LocalOrSpawnedFuture<F, O>38where39F: Future<Output = O>,40{41type Output = O;4243fn poll(44self: std::pin::Pin<&mut Self>,45cx: &mut std::task::Context<'_>,46) -> std::task::Poll<Self::Output> {47match self.project() {48LocalOrSpawnedFutureProj::Local { fut } => fut.poll(cx),49LocalOrSpawnedFutureProj::Spawned { handle } => handle.poll(cx),50}51}52}5354/// Parallelizes futures across the computational async runtime.55///56/// As an optimization for cache access, the first future is kept on the current thread. If there57/// is only 1 future, then all data is kept on the current thread and spawn is not called at all.58///59/// Note this means the first future in the returned iterator does not run until polled.60///61/// Note that dropping the iterator will call abort on all spawned futures, as this is intended to be62/// used for compute.63pub fn parallelize_first_to_local<I, F, O>(64futures_iter: I,65) -> impl ExactSizeIterator<Item = impl Future<Output = O> + Send + 'static>66where67I: Iterator<Item = F>,68F: Future<Output = O> + Send + 'static,69O: Send + 'static,70{71parallelize_first_to_local_impl(futures_iter).into_iter()72}7374fn parallelize_first_to_local_impl<I, F, O>(75mut futures_iter: I,76) -> EnumUnitVec<LocalOrSpawnedFuture<F, O>>77where78I: Iterator<Item = F>,79F: Future<Output = O> + Send + 'static,80O: Send + 'static,81{82let Some(first_fut) = futures_iter.next() else {83return EnumUnitVec::new();84};8586let first_fut = LocalOrSpawnedFuture::new_local(first_fut);8788let Some(second_fut) = futures_iter.next() else {89return EnumUnitVec::new_single(first_fut);90};9192let mut futures = Vec::with_capacity(2 + futures_iter.size_hint().0);9394// Note:95// * The local future must come first to ensure we don't block polling it.96// * Remaining futures must all be spawned upfront into the Vec for them to run parallel.97futures.extend([first_fut, LocalOrSpawnedFuture::spawn(second_fut)]);98futures.extend(futures_iter.map(LocalOrSpawnedFuture::spawn));99100EnumUnitVec::from(futures)101}102103104