Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-mem-engine/src/executors/group_by_streaming.rs
7884 views
1
use std::borrow::Cow;
2
use std::sync::Arc;
3
4
use polars_core::frame::DataFrame;
5
#[cfg(feature = "dtype-categorical")]
6
use polars_core::prelude::DataType;
7
use polars_core::prelude::{Column, GroupsType};
8
use polars_core::schema::Schema;
9
use polars_core::series::IsSorted;
10
use polars_error::PolarsResult;
11
use polars_expr::prelude::PhysicalExpr;
12
use polars_expr::state::ExecutionState;
13
use polars_plan::plans::{AExpr, IR, IRPlan};
14
use polars_utils::arena::{Arena, Node};
15
16
use super::{Executor, check_expand_literals, group_by_helper};
17
use crate::StreamingExecutorBuilder;
18
19
/// Executor that runs a group-by through the streaming engine when sampling
/// suggests a partitioned aggregation will pay off, falling back to the
/// default in-memory hash aggregation otherwise (see `Executor::execute`).
pub struct GroupByStreamingExec {
    /// Produces the input `DataFrame` consumed by the group-by.
    input_exec: Box<dyn Executor>,
    /// Node of the placeholder `DataFrameScan` inside `plan`; the input result
    /// is injected here at execute time.
    input_scan_node: Node,
    /// Pruned sub-plan (scan -> group-by) handed to the streaming engine.
    plan: IRPlan,
    /// Builds a streaming executor from `plan`.
    builder: StreamingExecutorBuilder,

    // Physical expressions for the group-by keys and aggregations.
    phys_keys: Vec<Arc<dyn PhysicalExpr>>,
    phys_aggs: Vec<Arc<dyn PhysicalExpr>>,
    maintain_order: bool,
    /// Optional (offset, len) slice applied to the aggregation result.
    slice: Option<(i64, usize)>,
    /// Whether the input comes from a partitioned data source; this switches
    /// the heuristic in `can_run_partitioned` to a cardinality-ratio check.
    from_partitioned_ds: bool,
}
31
32
impl GroupByStreamingExec {
    /// Builds the executor by splicing a placeholder `DataFrameScan` into the
    /// plan as the group-by input and pruning the (scan -> group-by) sub-plan
    /// into fresh arenas owned by `self.plan`.
    ///
    /// # Panics
    /// `root` must point to an `IR::GroupBy` node in `lp_arena`; anything else
    /// hits `unreachable!()`.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        input: Box<dyn Executor>,
        builder: StreamingExecutorBuilder,
        root: Node,
        lp_arena: &mut Arena<IR>,
        expr_arena: &Arena<AExpr>,

        phys_keys: Vec<Arc<dyn PhysicalExpr>>,
        phys_aggs: Vec<Arc<dyn PhysicalExpr>>,
        maintain_order: bool,
        slice: Option<(i64, usize)>,
        from_partitioned_ds: bool,
    ) -> Self {
        // Create a DataFrame scan for injecting the input result. The empty
        // frame and schema are placeholders until execute time.
        let scan = lp_arena.add(IR::DataFrameScan {
            df: Arc::new(DataFrame::empty()),
            schema: Arc::new(Schema::default()),
            output_schema: None,
        });

        let IR::GroupBy {
            input: gb_input, ..
        } = lp_arena.get_mut(root)
        else {
            unreachable!();
        };

        // Set the scan as the group by input
        *gb_input = scan;

        // Prune the subplan into separate arenas so the streaming engine can
        // own and mutate them independently of the caller's arenas.
        let mut new_ir_arena = Arena::new();
        let mut new_expr_arena = Arena::new();
        // `prune` returns exactly one node per input root, so the 2-element
        // try_into cannot fail here.
        let [new_root, new_scan] = polars_plan::plans::prune::prune(
            &[root, scan],
            lp_arena,
            expr_arena,
            &mut new_ir_arena,
            &mut new_expr_arena,
        )
        .try_into()
        .unwrap();

        let plan = IRPlan {
            lp_top: new_root,
            lp_arena: new_ir_arena,
            expr_arena: new_expr_arena,
        };

        Self {
            input_exec: input,
            input_scan_node: new_scan,
            plan,
            builder,
            phys_keys,
            phys_aggs,
            maintain_order,
            slice,
            from_partitioned_ds,
        }
    }

    /// Evaluates the key expressions against `df`, expanding literal keys so
    /// all key columns share the frame's height.
    fn keys(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Vec<Column>> {
        compute_keys(&self.phys_keys, df, state)
    }
}
100
101
fn compute_keys(
102
keys: &[Arc<dyn PhysicalExpr>],
103
df: &DataFrame,
104
state: &ExecutionState,
105
) -> PolarsResult<Vec<Column>> {
106
let evaluated = keys
107
.iter()
108
.map(|s| s.evaluate(df, state))
109
.collect::<PolarsResult<_>>()?;
110
let df = check_expand_literals(df, keys, evaluated, false, Default::default())?;
111
Ok(df.take_columns())
112
}
113
114
/// Estimates the number of unique key groups from a sample of `sample_size`
/// rows (capped at the column length).
fn estimate_unique_count(keys: &[Column], mut sample_size: usize) -> PolarsResult<usize> {
    // https://stats.stackexchange.com/a/19090/147321
    // estimated unique size
    // u + ui / m (s - m)
    // s: set_size
    // m: sample_size
    // u: total unique groups counted in sample
    // ui: groups with single unique value counted in sample
    let set_size = keys[0].len();
    // Never sample more rows than exist; this also guarantees that
    // `set_size - sample_size` below cannot underflow.
    if set_size < sample_size {
        sample_size = set_size;
    }

    // Extrapolate from the sampled groups to the full set size.
    let finish = |groups: &GroupsType| {
        let u = groups.len() as f64;
        let ui = if groups.len() == sample_size {
            // Every sampled row was unique: all groups are singletons.
            u
        } else {
            groups.iter().filter(|g| g.len() == 1).count() as f64
        };

        (u + (ui / sample_size as f64) * (set_size - sample_size) as f64) as usize
    };

    if keys.len() == 1 {
        // we sample as that will work also with sorted data.
        // note that sampling without replacement is *very* expensive. don't do that.
        let s = keys[0].sample_n(sample_size, true, false, None).unwrap();
        // fast multi-threaded way to get unique.
        let groups = s.as_materialized_series().group_tuples(true, false)?;
        Ok(finish(&groups))
    } else {
        // Multiple keys: take a contiguous slice from the middle of the frame
        // instead of random sampling.
        // NOTE(review): if offset + sample_size exceeds the height, `slice`
        // yields fewer than `sample_size` rows while `finish` still divides by
        // `sample_size`, biasing the estimate low — confirm this is intended.
        let offset = (keys[0].len() / 2) as i64;
        // Skipping the height check is sound here: the keys were expanded to a
        // common height by `check_expand_literals` in `compute_keys`.
        let df = unsafe { DataFrame::new_no_checks_height_from_first(keys.to_vec()) };
        let df = df.slice(offset, sample_size);
        let names = df.get_column_names().into_iter().cloned();
        let gb = df.group_by(names).unwrap();
        Ok(finish(gb.get_groups()))
    }
}
154
155
// Minimum frame height before the partitioned group-by is even considered;
// below this the default hash aggregation is assumed to be faster.
// Lower this at debug builds so that we hit this in the test suite.
#[cfg(debug_assertions)]
const PARTITION_LIMIT: usize = 15;
#[cfg(not(debug_assertions))]
const PARTITION_LIMIT: usize = 1000;
160
161
// Checks if we should run normal or default aggregation
162
// by sampling data.
163
fn can_run_partitioned(
164
keys: &[Column],
165
original_df: &DataFrame,
166
state: &ExecutionState,
167
from_partitioned_ds: bool,
168
) -> PolarsResult<bool> {
169
if !keys
170
.iter()
171
.take(1)
172
.all(|s| matches!(s.is_sorted_flag(), IsSorted::Not))
173
{
174
if state.verbose() {
175
eprintln!("FOUND SORTED KEY: running default HASH AGGREGATION")
176
}
177
Ok(false)
178
} else if std::env::var("POLARS_NO_PARTITION").is_ok() {
179
if state.verbose() {
180
eprintln!("POLARS_NO_PARTITION set: running default HASH AGGREGATION")
181
}
182
Ok(false)
183
} else if std::env::var("POLARS_FORCE_PARTITION").is_ok() {
184
if state.verbose() {
185
eprintln!("POLARS_FORCE_PARTITION set: running partitioned HASH AGGREGATION")
186
}
187
Ok(true)
188
} else if original_df.height() < PARTITION_LIMIT && !cfg!(test) {
189
if state.verbose() {
190
eprintln!("DATAFRAME < {PARTITION_LIMIT} rows: running default HASH AGGREGATION")
191
}
192
Ok(false)
193
} else {
194
// below this boundary we assume the partitioned group_by will be faster
195
let unique_count_boundary = std::env::var("POLARS_PARTITION_UNIQUE_COUNT")
196
.map(|s| s.parse::<usize>().unwrap())
197
.unwrap_or(1000);
198
199
let (unique_estimate, sampled_method) = match (keys.len(), keys[0].dtype()) {
200
#[cfg(feature = "dtype-categorical")]
201
(1, DataType::Categorical(_, mapping) | DataType::Enum(_, mapping)) => {
202
(mapping.num_cats_upper_bound(), "known")
203
},
204
_ => {
205
// sqrt(N) is a good sample size as it remains low on large numbers
206
// it is better than taking a fraction as it saturates
207
let sample_size = (original_df.height() as f64).powf(0.5) as usize;
208
209
// we never sample less than 100 data points.
210
let sample_size = std::cmp::max(100, sample_size);
211
(estimate_unique_count(keys, sample_size)?, "estimated")
212
},
213
};
214
if state.verbose() {
215
eprintln!("{sampled_method} unique values: {unique_estimate}");
216
}
217
218
if from_partitioned_ds {
219
let estimated_cardinality = unique_estimate as f32 / original_df.height() as f32;
220
if estimated_cardinality < 0.4 {
221
if state.verbose() {
222
eprintln!("PARTITIONED DS");
223
}
224
Ok(true)
225
} else {
226
if state.verbose() {
227
eprintln!(
228
"PARTITIONED DS: estimated cardinality: {estimated_cardinality} exceeded the boundary: 0.4, running default HASH AGGREGATION"
229
);
230
}
231
Ok(false)
232
}
233
} else if unique_estimate > unique_count_boundary {
234
if state.verbose() {
235
eprintln!(
236
"estimated unique count: {unique_estimate} exceeded the boundary: {unique_count_boundary}, running default HASH AGGREGATION"
237
)
238
}
239
Ok(false)
240
} else {
241
Ok(true)
242
}
243
}
244
}
245
246
impl Executor for GroupByStreamingExec {
247
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
248
let name = "streaming_group_by";
249
state.should_stop()?;
250
#[cfg(debug_assertions)]
251
{
252
if state.verbose() {
253
eprintln!("run {name}")
254
}
255
}
256
let input_df = self.input_exec.execute(state)?;
257
258
let profile_name = if state.has_node_timer() {
259
Cow::Owned(format!(".{name}()"))
260
} else {
261
Cow::Borrowed("")
262
};
263
264
let keys = self.keys(&input_df, state)?;
265
266
if !can_run_partitioned(&keys, &input_df, state, self.from_partitioned_ds)? {
267
return group_by_helper(
268
input_df,
269
keys,
270
&self.phys_aggs,
271
None,
272
state,
273
self.maintain_order,
274
self.slice,
275
);
276
}
277
278
// Insert the input DataFrame into our DataFrame scan node
279
if let IR::DataFrameScan { df, schema, .. } =
280
self.plan.lp_arena.get_mut(self.input_scan_node)
281
{
282
*schema = input_df.schema().clone();
283
*df = Arc::new(input_df);
284
} else {
285
unreachable!();
286
}
287
288
let mut streaming_exec = (self.builder)(
289
self.plan.lp_top,
290
&mut self.plan.lp_arena,
291
&mut self.plan.expr_arena,
292
)?;
293
294
state
295
.clone()
296
.record(|| streaming_exec.execute(state), profile_name)
297
}
298
}
299
300