Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/morsel.rs
6939 views
1
use std::future::Future;
2
use std::sync::{Arc, OnceLock};
3
4
use polars_core::frame::DataFrame;
5
use polars_utils::relaxed_cell::RelaxedCell;
6
7
use crate::async_primitives::wait_group::WaitToken;
8
9
/// Process-wide cache for the ideal morsel size, filled on first use.
static IDEAL_MORSEL_SIZE: OnceLock<usize> = OnceLock::new();

/// Returns the ideal morsel size.
///
/// The value is read from the `POLARS_IDEAL_MORSEL_SIZE` environment variable
/// the first time this function is called and is cached afterwards, so later
/// changes to the variable are not observed. If the variable cannot be read
/// (unset or not valid Unicode) the default of `100_000` is used.
///
/// # Panics
///
/// Panics if `POLARS_IDEAL_MORSEL_SIZE` is set but does not parse as a
/// `usize`. The panic message names the variable and the offending value so
/// the misconfiguration is easy to track down.
pub fn get_ideal_morsel_size() -> usize {
    *IDEAL_MORSEL_SIZE.get_or_init(|| match std::env::var("POLARS_IDEAL_MORSEL_SIZE") {
        Ok(raw) => raw.parse::<usize>().unwrap_or_else(|err| {
            panic!(
                "POLARS_IDEAL_MORSEL_SIZE must be a valid usize, got {:?}: {}",
                raw, err
            )
        }),
        // Variable is unset or unreadable: fall back to the default.
        Err(_) => 100_000,
    })
}
18
19
/// A token indicating the order of morsels in a stream.
///
/// The sequence tokens going through a pipe are monotonically non-decreasing and are allowed to
/// be discontinuous. Consequently, `1 -> 1 -> 2` and `1 -> 3 -> 5` are valid streams of sequence
/// tokens.
///
/// Internally the sequence number is stored multiplied by two; the least
/// significant bit is reserved for future use (see the TODO on [`MorselSeq::new`]).
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Default)]
pub struct MorselSeq(u64);

impl MorselSeq {
    // TODO: use least significant bit to indicate 'last morsel with this
    // sequence number'.
    /// Creates the sequence token for sequence number `seq`.
    ///
    /// # Panics
    ///
    /// Panics if `2 * seq` does not fit in a `u64`.
    pub fn new(seq: u64) -> Self {
        Self(seq.checked_mul(2).expect("MorselSeq overflow in new"))
    }

    /// The morsel sequence id which comes after this morsel.
    ///
    /// # Panics
    ///
    /// Panics if the successor does not fit in a `u64`.
    pub fn successor(self) -> Self {
        // We increment by two because in the future we want to use the least
        // significant bit to indicate the final morsel with that sequence id.
        Self(self.0.checked_add(2).expect("MorselSeq overflow in successor"))
    }

    /// Ensures this morsel sequence comes after the offset by adding the
    /// offset's internal value to this one.
    ///
    /// # Panics
    ///
    /// Panics on overflow instead of silently wrapping, which would break the
    /// non-decreasing ordering of sequence tokens.
    pub fn offset_by(self, offset: Self) -> Self {
        Self(
            self.0
                .checked_add(offset.0)
                .expect("MorselSeq overflow in offset_by"),
        )
    }

    /// Shifts this sequence forward by `offset` sequence numbers.
    ///
    /// The offset is doubled before it is added so that it matches the
    /// internal encoding used by [`MorselSeq::new`].
    ///
    /// # Panics
    ///
    /// Panics on overflow instead of silently wrapping.
    pub fn offset_by_u64(self, offset: u64) -> Self {
        let shifted = offset
            .checked_mul(2)
            .and_then(|delta| self.0.checked_add(delta))
            .expect("MorselSeq overflow in offset_by_u64");
        Self(shifted)
    }

    /// Returns the raw internal value, i.e. the doubled sequence number.
    pub fn to_u64(self) -> u64 {
        self.0
    }
}
54
55
/// A token indicating which source this morsel originated from, and a way to
56
/// pass information/signals to it. Currently it's only used to request a source
57
/// to stop with passing new morsels this execution phase.
58
#[derive(Clone, Debug)]
59
pub struct SourceToken {
60
stop: Arc<RelaxedCell<bool>>,
61
}
62
63
impl SourceToken {
64
pub fn new() -> Self {
65
Self {
66
stop: Arc::default(),
67
}
68
}
69
70
pub fn stop(&self) {
71
self.stop.store(true);
72
}
73
74
pub fn stop_requested(&self) -> bool {
75
self.stop.load()
76
}
77
}
78
79
#[derive(Debug, Clone)]
80
pub struct Morsel {
81
/// The data contained in this morsel.
82
df: DataFrame,
83
84
/// The sequence number of this morsel. May only stay equal or increase
85
/// within a pipeline.
86
seq: MorselSeq,
87
88
/// A token that indicates which source this morsel originates from.
89
source_token: SourceToken,
90
91
/// Used to notify someone when this morsel is consumed, to provide backpressure.
92
consume_token: Option<WaitToken>,
93
}
94
95
impl Morsel {
96
pub fn new(df: DataFrame, seq: MorselSeq, source_token: SourceToken) -> Self {
97
Self {
98
df,
99
seq,
100
source_token,
101
consume_token: None,
102
}
103
}
104
105
#[allow(unused)]
106
pub fn into_inner(self) -> (DataFrame, MorselSeq, SourceToken, Option<WaitToken>) {
107
(self.df, self.seq, self.source_token, self.consume_token)
108
}
109
110
pub fn into_df(self) -> DataFrame {
111
self.df
112
}
113
114
pub fn df(&self) -> &DataFrame {
115
&self.df
116
}
117
118
pub fn df_mut(&mut self) -> &mut DataFrame {
119
&mut self.df
120
}
121
122
pub fn seq(&self) -> MorselSeq {
123
self.seq
124
}
125
126
pub fn set_seq(&mut self, seq: MorselSeq) {
127
self.seq = seq;
128
}
129
130
#[allow(unused)]
131
pub fn map<F: FnOnce(DataFrame) -> DataFrame>(mut self, f: F) -> Self {
132
self.df = f(self.df);
133
self
134
}
135
136
pub fn try_map<E, F: FnOnce(DataFrame) -> Result<DataFrame, E>>(
137
mut self,
138
f: F,
139
) -> Result<Self, E> {
140
self.df = f(self.df)?;
141
Ok(self)
142
}
143
144
pub async fn async_try_map<E, M, F>(mut self, f: M) -> Result<Self, E>
145
where
146
M: FnOnce(DataFrame) -> F,
147
F: Future<Output = Result<DataFrame, E>>,
148
{
149
self.df = f(self.df).await?;
150
Ok(self)
151
}
152
153
pub fn set_consume_token(&mut self, token: WaitToken) {
154
self.consume_token = Some(token);
155
}
156
157
pub fn take_consume_token(&mut self) -> Option<WaitToken> {
158
self.consume_token.take()
159
}
160
161
pub fn source_token(&self) -> &SourceToken {
162
&self.source_token
163
}
164
165
pub fn replace_source_token(&mut self, new_token: SourceToken) -> SourceToken {
166
core::mem::replace(&mut self.source_token, new_token)
167
}
168
}
169
170