Path: blob/main/crates/polars-stream/src/async_primitives/morsel_linearizer.rs
6939 views
//! Saves us from typing `Priority<Reverse<...` everywhere.1use std::cmp::Reverse;23use polars_utils::priority::Priority;45use super::linearizer::{Inserter, Linearizer};6use crate::morsel::{Morsel, MorselSeq};78pub struct MorselLinearizer(Linearizer<Priority<Reverse<MorselSeq>, Morsel>>);9pub struct MorselInserter(Inserter<Priority<Reverse<MorselSeq>, Morsel>>);1011impl MorselLinearizer {12pub fn new(num_inserters: usize, buffer_size: usize) -> (Self, Vec<MorselInserter>) {13let (lin, inserters) = Linearizer::new(num_inserters, buffer_size);1415(16MorselLinearizer(lin),17inserters.into_iter().map(MorselInserter).collect(),18)19}2021pub async fn get(&mut self) -> Option<Morsel> {22self.0.get().await.map(|x| x.1)23}24}2526impl MorselInserter {27pub async fn insert(&mut self, morsel: Morsel) -> Result<(), Morsel> {28self.029.insert(Priority(Reverse(morsel.seq()), morsel))30.await31.map_err(|Priority(_, v)| v)32}33}343536