Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/async_primitives/morsel_linearizer.rs
6939 views
1
//! Saves us from typing `Priority<Reverse<...` everywhere.
2
use std::cmp::Reverse;
3
4
use polars_utils::priority::Priority;
5
6
use super::linearizer::{Inserter, Linearizer};
7
use crate::morsel::{Morsel, MorselSeq};
8
9
/// Newtype around a [`Linearizer`] whose items are [`Morsel`]s paired with a
/// `Reverse<MorselSeq>` priority key.
///
/// NOTE(review): presumably this makes morsels come out in ascending sequence
/// order — confirm against the ordering semantics of `Linearizer`/`Priority`.
pub struct MorselLinearizer(Linearizer<Priority<Reverse<MorselSeq>, Morsel>>);
10
/// Newtype around an [`Inserter`] that feeds [`Morsel`]s, keyed by
/// `Reverse<MorselSeq>`, into the linearizer it was created with.
pub struct MorselInserter(Inserter<Priority<Reverse<MorselSeq>, Morsel>>);
11
12
impl MorselLinearizer {
13
pub fn new(num_inserters: usize, buffer_size: usize) -> (Self, Vec<MorselInserter>) {
14
let (lin, inserters) = Linearizer::new(num_inserters, buffer_size);
15
16
(
17
MorselLinearizer(lin),
18
inserters.into_iter().map(MorselInserter).collect(),
19
)
20
}
21
22
pub async fn get(&mut self) -> Option<Morsel> {
23
self.0.get().await.map(|x| x.1)
24
}
25
}
26
27
impl MorselInserter {
28
pub async fn insert(&mut self, morsel: Morsel) -> Result<(), Morsel> {
29
self.0
30
.insert(Priority(Reverse(morsel.seq()), morsel))
31
.await
32
.map_err(|Priority(_, v)| v)
33
}
34
}
35
36