use crate::TaskPool;1use alloc::vec::Vec;23mod adapters;4pub use adapters::*;56/// [`ParallelIterator`] closely emulates the `std::iter::Iterator`7/// interface. However, it uses `bevy_task` to compute batches in parallel.8///9/// Note that the overhead of [`ParallelIterator`] is high relative to some10/// workloads. In particular, if the batch size is too small or task being11/// run in parallel is inexpensive, *a [`ParallelIterator`] could take longer12/// than a normal [`Iterator`]*. Therefore, you should profile your code before13/// using [`ParallelIterator`].14pub trait ParallelIterator<BatchIter>15where16BatchIter: Iterator + Send,17Self: Sized + Send,18{19/// Returns the next batch of items for processing.20///21/// Each batch is an iterator with items of the same type as the22/// [`ParallelIterator`]. Returns `None` when there are no batches left.23fn next_batch(&mut self) -> Option<BatchIter>;2425/// Returns the bounds on the remaining number of items in the26/// parallel iterator.27///28/// See [`Iterator::size_hint()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.size_hint)29fn size_hint(&self) -> (usize, Option<usize>) {30(0, None)31}3233/// Consumes the parallel iterator and returns the number of items.34///35/// See [`Iterator::count()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.count)36fn count(mut self, pool: &TaskPool) -> usize {37pool.scope(|s| {38while let Some(batch) = self.next_batch() {39s.spawn(async move { batch.count() });40}41})42.iter()43.sum()44}4546/// Consumes the parallel iterator and returns the last item.47///48/// See [`Iterator::last()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.last)49fn last(mut self, _pool: &TaskPool) -> Option<BatchIter::Item> {50let mut last_item = None;51while let Some(batch) = self.next_batch() {52last_item = batch.last();53}54last_item55}5657/// Consumes the parallel iterator and returns the nth item.58///59/// See [`Iterator::nth()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.nth)60// TODO: Optimize with size_hint on each batch61fn nth(mut self, _pool: &TaskPool, n: usize) -> Option<BatchIter::Item> {62let mut i = 0;63while let Some(batch) = self.next_batch() {64for item in batch {65if i == n {66return Some(item);67}68i += 1;69}70}71None72}7374/// Takes two parallel iterators and returns a parallel iterators over75/// both in sequence.76///77/// See [`Iterator::chain()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.chain)78// TODO: Use IntoParallelIterator for U79fn chain<U>(self, other: U) -> Chain<Self, U>80where81U: ParallelIterator<BatchIter>,82{83Chain {84left: self,85right: other,86left_in_progress: true,87}88}8990/// Takes a closure and creates a parallel iterator which calls that91/// closure on each item.92///93/// See [`Iterator::map()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.map)94fn map<T, F>(self, f: F) -> Map<Self, F>95where96F: FnMut(BatchIter::Item) -> T + Send + Clone,97{98Map { iter: self, f }99}100101/// Calls a closure on each item of a parallel iterator.102///103/// See [`Iterator::for_each()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.for_each)104fn for_each<F>(mut self, pool: &TaskPool, f: F)105where106F: FnMut(BatchIter::Item) + Send + Clone + Sync,107{108pool.scope(|s| {109while let Some(batch) = self.next_batch() {110let newf = f.clone();111s.spawn(async move {112batch.for_each(newf);113});114}115});116}117118/// Creates a parallel iterator which uses a closure to determine119/// if an element should be yielded.120///121/// See [`Iterator::filter()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.filter)122fn filter<F>(self, predicate: F) -> Filter<Self, F>123where124F: FnMut(&BatchIter::Item) -> bool,125{126Filter {127iter: self,128predicate,129}130}131132/// Creates a parallel iterator that both filters and maps.133///134/// See [`Iterator::filter_map()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.filter_map)135fn filter_map<R, F>(self, f: F) -> FilterMap<Self, F>136where137F: FnMut(BatchIter::Item) -> Option<R>,138{139FilterMap { iter: self, f }140}141142/// Creates a parallel iterator that works like map, but flattens143/// nested structure.144///145/// See [`Iterator::flat_map()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.flat_map)146fn flat_map<U, F>(self, f: F) -> FlatMap<Self, F>147where148F: FnMut(BatchIter::Item) -> U,149U: IntoIterator,150{151FlatMap { iter: self, f }152}153154/// Creates a parallel iterator that flattens nested structure.155///156/// See [`Iterator::flatten()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.flatten)157fn flatten(self) -> Flatten<Self>158where159BatchIter::Item: IntoIterator,160{161Flatten { iter: self }162}163164/// Creates a parallel iterator which ends after the first None.165///166/// See [`Iterator::fuse()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fuse)167fn fuse(self) -> Fuse<Self> {168Fuse { iter: Some(self) }169}170171/// Does something with each item of a parallel iterator, passing172/// the value on.173///174/// See [`Iterator::inspect()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.inspect)175fn inspect<F>(self, f: F) -> Inspect<Self, F>176where177F: FnMut(&BatchIter::Item),178{179Inspect { iter: self, f }180}181182/// Borrows a parallel iterator, rather than consuming it.183///184/// See [`Iterator::by_ref()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.by_ref)185fn by_ref(&mut self) -> &mut Self {186self187}188189/// Transforms a parallel iterator into a collection.190///191/// See [`Iterator::collect()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.collect)192// TODO: Investigate optimizations for less copying193fn collect<C>(mut self, pool: &TaskPool) -> C194where195C: FromIterator<BatchIter::Item>,196BatchIter::Item: Send + 'static,197{198pool.scope(|s| {199while let Some(batch) = self.next_batch() {200s.spawn(async move { batch.collect::<Vec<_>>() });201}202})203.into_iter()204.flatten()205.collect()206}207208/// Consumes a parallel iterator, creating two collections from it.209///210/// See [`Iterator::partition()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.partition)211// TODO: Investigate optimizations for less copying212fn partition<C, F>(mut self, pool: &TaskPool, f: F) -> (C, C)213where214C: Default + Extend<BatchIter::Item> + Send,215F: FnMut(&BatchIter::Item) -> bool + Send + Sync + Clone,216BatchIter::Item: Send + 'static,217{218let (mut a, mut b) = <(C, C)>::default();219pool.scope(|s| {220while let Some(batch) = self.next_batch() {221let newf = f.clone();222s.spawn(async move { batch.partition::<Vec<_>, F>(newf) });223}224})225.into_iter()226.for_each(|(c, d)| {227a.extend(c);228b.extend(d);229});230(a, b)231}232233/// Repeatedly applies a function to items of each batch of a parallel234/// iterator, producing a Vec of final values.235///236/// *Note that this folds each batch independently and returns a Vec of237/// results (in batch order).*238///239/// See [`Iterator::fold()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold)240fn fold<C, F, D>(mut self, pool: &TaskPool, init: C, f: F) -> Vec<C>241where242F: FnMut(C, BatchIter::Item) -> C + Send + Sync + Clone,243C: Clone + Send + Sync + 'static,244{245pool.scope(|s| {246while let Some(batch) = self.next_batch() {247let newf = f.clone();248let newi = init.clone();249s.spawn(async move { batch.fold(newi, newf) });250}251})252}253254/// Tests if every element of the parallel iterator matches a predicate.255///256/// *Note that all is **not** short circuiting.*257///258/// See [`Iterator::all()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.all)259fn all<F>(mut self, pool: &TaskPool, f: F) -> bool260where261F: FnMut(BatchIter::Item) -> bool + Send + Sync + Clone,262{263pool.scope(|s| {264while let Some(mut batch) = self.next_batch() {265let newf = f.clone();266s.spawn(async move { batch.all(newf) });267}268})269.into_iter()270.all(core::convert::identity)271}272273/// Tests if any element of the parallel iterator matches a predicate.274///275/// *Note that any is **not** short circuiting.*276///277/// See [`Iterator::any()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.any)278fn any<F>(mut self, pool: &TaskPool, f: F) -> bool279where280F: FnMut(BatchIter::Item) -> bool + Send + Sync + Clone,281{282pool.scope(|s| {283while let Some(mut batch) = self.next_batch() {284let newf = f.clone();285s.spawn(async move { batch.any(newf) });286}287})288.into_iter()289.any(core::convert::identity)290}291292/// Searches for an element in a parallel iterator, returning its index.293///294/// *Note that position consumes the whole iterator.*295///296/// See [`Iterator::position()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.position)297// TODO: Investigate optimizations for less copying298fn position<F>(mut self, pool: &TaskPool, f: F) -> Option<usize>299where300F: FnMut(BatchIter::Item) -> bool + Send + Sync + Clone,301{302let poses = pool.scope(|s| {303while let Some(batch) = self.next_batch() {304let mut newf = f.clone();305s.spawn(async move {306let mut len = 0;307let mut pos = None;308for item in batch {309if pos.is_none() && newf(item) {310pos = Some(len);311}312len += 1;313}314(len, pos)315});316}317});318let mut start = 0;319for (len, pos) in poses {320if let Some(pos) = pos {321return Some(start + pos);322}323start += len;324}325None326}327328/// Returns the maximum item of a parallel iterator.329///330/// See [`Iterator::max()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.max)331fn max(mut self, pool: &TaskPool) -> Option<BatchIter::Item>332where333BatchIter::Item: Ord + Send + 'static,334{335pool.scope(|s| {336while let Some(batch) = self.next_batch() {337s.spawn(async move { batch.max() });338}339})340.into_iter()341.flatten()342.max()343}344345/// Returns the minimum item of a parallel iterator.346///347/// See [`Iterator::min()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.min)348fn min(mut self, pool: &TaskPool) -> Option<BatchIter::Item>349where350BatchIter::Item: Ord + Send + 'static,351{352pool.scope(|s| {353while let Some(batch) = self.next_batch() {354s.spawn(async move { batch.min() });355}356})357.into_iter()358.flatten()359.min()360}361362/// Returns the item that gives the maximum value from the specified function.363///364/// See [`Iterator::max_by_key()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.max_by_key)365fn max_by_key<R, F>(mut self, pool: &TaskPool, f: F) -> Option<BatchIter::Item>366where367R: Ord,368F: FnMut(&BatchIter::Item) -> R + Send + Sync + Clone,369BatchIter::Item: Send + 'static,370{371pool.scope(|s| {372while let Some(batch) = self.next_batch() {373let newf = f.clone();374s.spawn(async move { batch.max_by_key(newf) });375}376})377.into_iter()378.flatten()379.max_by_key(f)380}381382/// Returns the item that gives the maximum value with respect to the specified comparison383/// function.384///385/// See [`Iterator::max_by()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.max_by)386fn max_by<F>(mut self, pool: &TaskPool, f: F) -> Option<BatchIter::Item>387where388F: FnMut(&BatchIter::Item, &BatchIter::Item) -> core::cmp::Ordering + Send + Sync + Clone,389BatchIter::Item: Send + 'static,390{391pool.scope(|s| {392while let Some(batch) = self.next_batch() {393let newf = f.clone();394s.spawn(async move { batch.max_by(newf) });395}396})397.into_iter()398.flatten()399.max_by(f)400}401402/// Returns the item that gives the minimum value from the specified function.403///404/// See [`Iterator::min_by_key()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.min_by_key)405fn min_by_key<R, F>(mut self, pool: &TaskPool, f: F) -> Option<BatchIter::Item>406where407R: Ord,408F: FnMut(&BatchIter::Item) -> R + Send + Sync + Clone,409BatchIter::Item: Send + 'static,410{411pool.scope(|s| {412while let Some(batch) = self.next_batch() {413let newf = f.clone();414s.spawn(async move { batch.min_by_key(newf) });415}416})417.into_iter()418.flatten()419.min_by_key(f)420}421422/// Returns the item that gives the minimum value with respect to the specified comparison423/// function.424///425/// See [`Iterator::min_by()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.min_by)426fn min_by<F>(mut self, pool: &TaskPool, f: F) -> Option<BatchIter::Item>427where428F: FnMut(&BatchIter::Item, &BatchIter::Item) -> core::cmp::Ordering + Send + Sync + Clone,429BatchIter::Item: Send + 'static,430{431pool.scope(|s| {432while let Some(batch) = self.next_batch() {433let newf = f.clone();434s.spawn(async move { batch.min_by(newf) });435}436})437.into_iter()438.flatten()439.min_by(f)440}441442/// Creates a parallel iterator which copies all of its items.443///444/// See [`Iterator::copied()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.copied)445fn copied<'a, T>(self) -> Copied<Self>446where447Self: ParallelIterator<BatchIter>,448T: 'a + Copy,449{450Copied { iter: self }451}452453/// Creates a parallel iterator which clones all of its items.454///455/// See [`Iterator::cloned()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.cloned)456fn cloned<'a, T>(self) -> Cloned<Self>457where458Self: ParallelIterator<BatchIter>,459T: 'a + Copy,460{461Cloned { iter: self }462}463464/// Repeats a parallel iterator endlessly.465///466/// See [`Iterator::cycle()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.cycle)467fn cycle(self) -> Cycle<Self>468where469Self: Clone,470{471Cycle {472iter: self,473curr: None,474}475}476477/// Sums the items of a parallel iterator.478///479/// See [`Iterator::sum()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.sum)480fn sum<S, R>(mut self, pool: &TaskPool) -> R481where482S: core::iter::Sum<BatchIter::Item> + Send + 'static,483R: core::iter::Sum<S>,484{485pool.scope(|s| {486while let Some(batch) = self.next_batch() {487s.spawn(async move { batch.sum() });488}489})490.into_iter()491.sum()492}493494/// Multiplies all the items of a parallel iterator.495///496/// See [`Iterator::product()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.product)497fn product<S, R>(mut self, pool: &TaskPool) -> R498where499S: core::iter::Product<BatchIter::Item> + Send + 'static,500R: core::iter::Product<S>,501{502pool.scope(|s| {503while let Some(batch) = self.next_batch() {504s.spawn(async move { batch.product() });505}506})507.into_iter()508.product()509}510}511512513