Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-mem-engine/src/executors/group_by_streaming.rs
8430 views
1
use std::borrow::Cow;
2
use std::sync::Arc;
3
4
use polars_core::frame::DataFrame;
5
#[cfg(feature = "dtype-categorical")]
6
use polars_core::prelude::DataType;
7
use polars_core::prelude::{Column, GroupsType};
8
use polars_core::schema::{Schema, SchemaRef};
9
use polars_core::series::IsSorted;
10
use polars_error::PolarsResult;
11
use polars_expr::prelude::PhysicalExpr;
12
use polars_expr::state::ExecutionState;
13
use polars_plan::plans::{AExpr, IR, IRPlan};
14
use polars_utils::arena::{Arena, Node};
15
16
use super::{Executor, check_expand_literals, group_by_helper};
17
use crate::StreamingExecutorBuilder;
18
19
/// Executor for a group-by that attempts the streaming engine for the
/// partitioned path, with an in-memory hash-aggregation fallback.
///
/// Holds a pruned streaming sub-plan (`plan`) rooted at the group-by, plus
/// the physical key/aggregation expressions used by the fallback path.
pub struct GroupByStreamingExec {
    // Executor that produces the input DataFrame for the group-by.
    input_exec: Box<dyn Executor>,
    // Node of the injected `DataFrameScan` inside `plan`; the input result
    // is written into it right before streaming execution.
    input_scan_node: Node,
    // Pruned IR sub-plan with its own arenas (built in `new`).
    plan: IRPlan,
    // Builds a streaming executor from `plan` at execution time.
    builder: StreamingExecutorBuilder,

    // Physical expressions evaluating the group-by keys.
    phys_keys: Vec<Arc<dyn PhysicalExpr>>,
    // Physical expressions evaluating the aggregations (fallback path).
    phys_aggs: Vec<Arc<dyn PhysicalExpr>>,
    maintain_order: bool,
    output_schema: SchemaRef,
    // Optional (offset, len) slice forwarded to the fallback `group_by_helper`.
    slice: Option<(i64, usize)>,
    // Whether the input comes from a partitioned data source; switches the
    // cardinality heuristic in `can_run_partitioned`.
    from_partitioned_ds: bool,
}
32
33
impl GroupByStreamingExec {
    /// Build the executor and carve the group-by sub-plan out of `lp_arena`.
    ///
    /// `root` must point at an `IR::GroupBy` node (panics via `unreachable!`
    /// otherwise). The group-by's input edge is rewired to a freshly added,
    /// empty `DataFrameScan` placeholder, and the resulting sub-plan is pruned
    /// into private arenas so it can be executed independently later. The
    /// placeholder is filled with the real input in `Executor::execute`.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        input: Box<dyn Executor>,
        builder: StreamingExecutorBuilder,
        root: Node,
        lp_arena: &mut Arena<IR>,
        expr_arena: &Arena<AExpr>,

        phys_keys: Vec<Arc<dyn PhysicalExpr>>,
        phys_aggs: Vec<Arc<dyn PhysicalExpr>>,
        maintain_order: bool,
        output_schema: SchemaRef,
        slice: Option<(i64, usize)>,
        from_partitioned_ds: bool,
    ) -> Self {
        // Create an (initially empty) DataFrame scan for injecting the input
        // result at execution time.
        let scan = lp_arena.add(IR::DataFrameScan {
            df: Arc::new(DataFrame::empty()),
            schema: Arc::new(Schema::default()),
            output_schema: None,
        });

        let IR::GroupBy {
            input: gb_input, ..
        } = lp_arena.get_mut(root)
        else {
            unreachable!();
        };

        // Set the scan as the group by input.
        *gb_input = scan;

        // Prune the subplan (group-by root + injected scan) into separate
        // arenas owned by this executor.
        let mut new_ir_arena = Arena::new();
        let mut new_expr_arena = Arena::new();
        let [new_root, new_scan] = polars_plan::plans::prune::prune(
            &[root, scan],
            lp_arena,
            expr_arena,
            &mut new_ir_arena,
            &mut new_expr_arena,
        )
        .try_into()
        .unwrap();

        let plan = IRPlan {
            lp_top: new_root,
            lp_arena: new_ir_arena,
            expr_arena: new_expr_arena,
        };

        Self {
            input_exec: input,
            input_scan_node: new_scan,
            plan,
            builder,
            phys_keys,
            phys_aggs,
            maintain_order,
            output_schema,
            slice,
            from_partitioned_ds,
        }
    }

    /// Evaluate the physical key expressions against `df`.
    fn keys(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Vec<Column>> {
        compute_keys(&self.phys_keys, df, state)
    }
}
103
104
fn compute_keys(
105
keys: &[Arc<dyn PhysicalExpr>],
106
df: &DataFrame,
107
state: &ExecutionState,
108
) -> PolarsResult<Vec<Column>> {
109
let evaluated = keys
110
.iter()
111
.map(|s| s.evaluate(df, state))
112
.collect::<PolarsResult<_>>()?;
113
let df = check_expand_literals(df, keys, evaluated, false, Default::default())?;
114
Ok(df.into_columns())
115
}
116
117
fn estimate_unique_count(keys: &[Column], mut sample_size: usize) -> PolarsResult<usize> {
118
// https://stats.stackexchange.com/a/19090/147321
119
// estimated unique size
120
// u + ui / m (s - m)
121
// s: set_size
122
// m: sample_size
123
// u: total unique groups counted in sample
124
// ui: groups with single unique value counted in sample
125
let set_size = keys[0].len();
126
if set_size < sample_size {
127
sample_size = set_size;
128
}
129
130
let finish = |groups: &GroupsType| {
131
let u = groups.len() as f64;
132
let ui = if groups.len() == sample_size {
133
u
134
} else {
135
groups.iter().filter(|g| g.len() == 1).count() as f64
136
};
137
138
(u + (ui / sample_size as f64) * (set_size - sample_size) as f64) as usize
139
};
140
141
if keys.len() == 1 {
142
// we sample as that will work also with sorted data.
143
// not that sampling without replacement is *very* expensive. don't do that.
144
let s = keys[0].sample_n(sample_size, true, false, None).unwrap();
145
// fast multi-threaded way to get unique.
146
let groups = s.as_materialized_series().group_tuples(true, false)?;
147
Ok(finish(&groups))
148
} else {
149
let offset = (keys[0].len() / 2) as i64;
150
let df = unsafe { DataFrame::new_unchecked_infer_height(keys.to_vec()) };
151
let df = df.slice(offset, sample_size);
152
let names = df.get_column_names().into_iter().cloned();
153
let gb = df.group_by(names).unwrap();
154
Ok(finish(gb.get_groups()))
155
}
156
}
157
158
// Minimum input height before the partitioned path is even considered.
// Lower this at debug builds so that we hit this in the test suite.
#[cfg(debug_assertions)]
const PARTITION_LIMIT: usize = 15;
#[cfg(not(debug_assertions))]
const PARTITION_LIMIT: usize = 1000;
163
164
// Checks if we should run normal or default aggregation
// by sampling data.
//
// Decision order: first key sorted? -> env-var overrides -> input too small?
// -> estimated key cardinality vs. boundary. Each branch logs its reason
// when verbose mode is on.
fn can_run_partitioned(
    keys: &[Column],
    original_df: &DataFrame,
    state: &ExecutionState,
    from_partitioned_ds: bool,
) -> PolarsResult<bool> {
    // Only the first key's sorted flag is inspected (`take(1)`); a sorted
    // key favors the default hash aggregation.
    if !keys
        .iter()
        .take(1)
        .all(|s| matches!(s.is_sorted_flag(), IsSorted::Not))
    {
        if state.verbose() {
            eprintln!("FOUND SORTED KEY: running default HASH AGGREGATION")
        }
        Ok(false)
    } else if std::env::var("POLARS_NO_PARTITION").is_ok() {
        if state.verbose() {
            eprintln!("POLARS_NO_PARTITION set: running default HASH AGGREGATION")
        }
        Ok(false)
    } else if std::env::var("POLARS_FORCE_PARTITION").is_ok() {
        if state.verbose() {
            eprintln!("POLARS_FORCE_PARTITION set: running partitioned HASH AGGREGATION")
        }
        Ok(true)
    } else if original_df.height() < PARTITION_LIMIT && !cfg!(test) {
        // Small inputs are not worth partitioning (the limit is lowered in
        // debug builds; see PARTITION_LIMIT).
        if state.verbose() {
            eprintln!("DATAFRAME < {PARTITION_LIMIT} rows: running default HASH AGGREGATION")
        }
        Ok(false)
    } else {
        // below this boundary we assume the partitioned group_by will be faster
        // NOTE(review): `.unwrap()` panics on a malformed
        // POLARS_PARTITION_UNIQUE_COUNT value.
        let unique_count_boundary = std::env::var("POLARS_PARTITION_UNIQUE_COUNT")
            .map(|s| s.parse::<usize>().unwrap())
            .unwrap_or(1000);

        let (unique_estimate, sampled_method) = match (keys.len(), keys[0].dtype()) {
            // A single categorical/enum key has a known upper bound on its
            // number of groups: the number of categories.
            #[cfg(feature = "dtype-categorical")]
            (1, DataType::Categorical(_, mapping) | DataType::Enum(_, mapping)) => {
                (mapping.num_cats_upper_bound(), "known")
            },
            _ => {
                // sqrt(N) is a good sample size as it remains low on large numbers
                // it is better than taking a fraction as it saturates
                let sample_size = (original_df.height() as f64).powf(0.5) as usize;

                // we never sample less than 100 data points.
                let sample_size = std::cmp::max(100, sample_size);
                (estimate_unique_count(keys, sample_size)?, "estimated")
            },
        };
        if state.verbose() {
            eprintln!("{sampled_method} unique values: {unique_estimate}");
        }

        if from_partitioned_ds {
            // For partitioned sources use a relative cardinality threshold
            // instead of the absolute unique-count boundary.
            let estimated_cardinality = unique_estimate as f32 / original_df.height() as f32;
            if estimated_cardinality < 0.4 {
                if state.verbose() {
                    eprintln!("PARTITIONED DS");
                }
                Ok(true)
            } else {
                if state.verbose() {
                    eprintln!(
                        "PARTITIONED DS: estimated cardinality: {estimated_cardinality} exceeded the boundary: 0.4, running default HASH AGGREGATION"
                    );
                }
                Ok(false)
            }
        } else if unique_estimate > unique_count_boundary {
            if state.verbose() {
                eprintln!(
                    "estimated unique count: {unique_estimate} exceeded the boundary: {unique_count_boundary}, running default HASH AGGREGATION"
                )
            }
            Ok(false)
        } else {
            Ok(true)
        }
    }
}
248
249
impl Executor for GroupByStreamingExec {
    /// Run the input executor, then either fall back to the in-memory hash
    /// aggregation (`group_by_helper`) or inject the input into the pruned
    /// sub-plan and run it with the streaming executor.
    fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
        let name = "streaming_group_by";
        state.should_stop()?;
        #[cfg(debug_assertions)]
        {
            if state.verbose() {
                eprintln!("run {name}")
            }
        }
        // Materialize the group-by's input first.
        let input_df = self.input_exec.execute(state)?;

        let profile_name = if state.has_node_timer() {
            Cow::Owned(format!(".{name}()"))
        } else {
            Cow::Borrowed("")
        };

        let keys = self.keys(&input_df, state)?;

        // Heuristic gate: when partitioning doesn't look profitable, run
        // the default in-memory hash aggregation instead.
        if !can_run_partitioned(&keys, &input_df, state, self.from_partitioned_ds)? {
            return group_by_helper(
                input_df,
                keys,
                &self.phys_aggs,
                None,
                state,
                self.maintain_order,
                &self.output_schema,
                self.slice,
            );
        }

        // Insert the input DataFrame into our DataFrame scan node (the
        // placeholder created in `new`).
        if let IR::DataFrameScan { df, schema, .. } =
            self.plan.lp_arena.get_mut(self.input_scan_node)
        {
            *schema = input_df.schema().clone();
            *df = Arc::new(input_df);
        } else {
            unreachable!();
        }

        // Build a fresh streaming executor for the (now populated) sub-plan.
        let mut streaming_exec = (self.builder)(
            self.plan.lp_top,
            &mut self.plan.lp_arena,
            &mut self.plan.expr_arena,
        )?;

        // NOTE(review): the clone appears needed so the closure can still
        // borrow `state` while `record` consumes a state — confirm against
        // `ExecutionState::record`.
        state
            .clone()
            .record(|| streaming_exec.execute(state), profile_name)
    }
}
303
304