Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-mem-engine/src/executors/group_by_partitioned.rs
6940 views
1
use polars_core::series::IsSorted;
2
use polars_core::utils::{accumulate_dataframes_vertical, split_df};
3
use rayon::prelude::*;
4
5
use super::*;
6
7
/// Takes an input [`Executor`] and multiple expressions and runs a
/// partitioned group-by: the input is split per thread, pre-aggregated in
/// parallel, and the partial results are merged and finalized afterwards.
pub struct PartitionGroupByExec {
    // Produces the DataFrame this group-by runs on.
    input: Box<dyn Executor>,
    // Physical expressions evaluating to the group-by key columns.
    phys_keys: Vec<Arc<dyn PhysicalExpr>>,
    // Physical aggregation expressions; these must support partitioned
    // evaluation (`as_partitioned_aggregator` is unwrapped at run time).
    phys_aggs: Vec<Arc<dyn PhysicalExpr>>,
    // Passed through to the group-by implementations.
    maintain_order: bool,
    // Optional (offset, len) slice applied to the final groups.
    slice: Option<(i64, usize)>,
    // Schema of the input frame; used to resolve key names for profiling.
    input_schema: SchemaRef,
    // Schema installed on the state after the partitioned phase added columns.
    output_schema: SchemaRef,
    // True when the input comes from an already-partitioned data source;
    // switches `can_run_partitioned` to a cardinality-ratio heuristic.
    from_partitioned_ds: bool,
    // Logical counterparts of `phys_keys`/`phys_aggs`; currently unused.
    #[allow(dead_code)]
    keys: Vec<Expr>,
    #[allow(dead_code)]
    aggs: Vec<Expr>,
}
22
23
impl PartitionGroupByExec {
    /// Create a new partitioned group-by executor.
    ///
    /// All arguments are stored as-is; `phys_keys`/`phys_aggs` are the
    /// physical counterparts of the logical `keys`/`aggs` expressions.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        input: Box<dyn Executor>,
        phys_keys: Vec<Arc<dyn PhysicalExpr>>,
        phys_aggs: Vec<Arc<dyn PhysicalExpr>>,
        maintain_order: bool,
        slice: Option<(i64, usize)>,
        input_schema: SchemaRef,
        output_schema: SchemaRef,
        from_partitioned_ds: bool,
        keys: Vec<Expr>,
        aggs: Vec<Expr>,
    ) -> Self {
        Self {
            input,
            phys_keys,
            phys_aggs,
            maintain_order,
            slice,
            input_schema,
            output_schema,
            from_partitioned_ds,
            keys,
            aggs,
        }
    }

    /// Evaluate the physical key expressions against `df`.
    fn keys(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Vec<Column>> {
        compute_keys(&self.phys_keys, df, state)
    }
}
55
56
fn compute_keys(
57
keys: &[Arc<dyn PhysicalExpr>],
58
df: &DataFrame,
59
state: &ExecutionState,
60
) -> PolarsResult<Vec<Column>> {
61
let evaluated = keys
62
.iter()
63
.map(|s| s.evaluate(df, state))
64
.collect::<PolarsResult<_>>()?;
65
let df = check_expand_literals(df, keys, evaluated, false, Default::default())?;
66
Ok(df.take_columns())
67
}
68
69
/// Run the first (partitioned) phase of the group-by.
///
/// Splits `df` into `n_threads` chunks and, for every chunk in parallel,
/// groups on the pre-computed keys and evaluates each aggregation in its
/// partitioned form. Returns the pre-aggregated chunk frames together with
/// each chunk's group key columns.
fn run_partitions(
    df: &mut DataFrame,
    exec: &PartitionGroupByExec,
    state: &ExecutionState,
    n_threads: usize,
    maintain_order: bool,
) -> PolarsResult<(Vec<DataFrame>, Vec<Vec<Column>>)> {
    // We do a partitioned group_by.
    // Meaning that we first do the group_by operation arbitrarily
    // split on several threads. Then on the final result we apply the same group_by again.
    let dfs = split_df(df, n_threads, true);

    let phys_aggs = &exec.phys_aggs;
    let keys = &exec.phys_keys;

    // Evaluate the keys once on the full frame, then split them identically
    // to the data so chunk i of the data lines up with chunk i of the keys.
    let mut keys = DataFrame::from_iter(compute_keys(keys, df, state)?);
    let splitted_keys = split_df(&mut keys, n_threads, true);

    POOL.install(|| {
        dfs.into_par_iter()
            .zip(splitted_keys)
            .map(|(df, keys)| {
                let gb = df.group_by_with_series(keys.into(), false, maintain_order)?;
                let groups = gb.get_groups();

                let mut columns = gb.keys();
                // don't naively call par_iter here, it will segfault in rayon
                // if you do, throw it on the POOL threadpool.
                let agg_columns = phys_aggs
                    .iter()
                    .map(|expr| {
                        let agg_expr = expr.as_partitioned_aggregator().unwrap();
                        let agg = agg_expr.evaluate_partitioned(&df, groups, state)?;
                        // A partitioned aggregation must yield one value per
                        // group; a length-1 result is treated as a scalar and
                        // broadcast to all groups (or cleared if there are none).
                        Ok(if agg.len() != groups.len() {
                            polars_ensure!(agg.len() == 1, agg_len = agg.len(), groups.len());
                            match groups.len() {
                                0 => agg.clear(),
                                len => agg.new_from_index(0, len),
                            }
                        } else {
                            agg
                        }
                        .into_column())
                    })
                    .collect::<PolarsResult<Vec<_>>>()?;

                columns.extend_from_slice(&agg_columns);

                let df = DataFrame::new(columns)?;
                Ok((df, gb.keys()))
            })
            .collect()
    })
}
123
124
fn estimate_unique_count(keys: &[Column], mut sample_size: usize) -> PolarsResult<usize> {
125
// https://stats.stackexchange.com/a/19090/147321
126
// estimated unique size
127
// u + ui / m (s - m)
128
// s: set_size
129
// m: sample_size
130
// u: total unique groups counted in sample
131
// ui: groups with single unique value counted in sample
132
let set_size = keys[0].len();
133
if set_size < sample_size {
134
sample_size = set_size;
135
}
136
137
let finish = |groups: &GroupsType| {
138
let u = groups.len() as f64;
139
let ui = if groups.len() == sample_size {
140
u
141
} else {
142
groups.iter().filter(|g| g.len() == 1).count() as f64
143
};
144
145
(u + (ui / sample_size as f64) * (set_size - sample_size) as f64) as usize
146
};
147
148
if keys.len() == 1 {
149
// we sample as that will work also with sorted data.
150
// not that sampling without replacement is *very* expensive. don't do that.
151
let s = keys[0].sample_n(sample_size, true, false, None).unwrap();
152
// fast multi-threaded way to get unique.
153
let groups = s.as_materialized_series().group_tuples(true, false)?;
154
Ok(finish(&groups))
155
} else {
156
let offset = (keys[0].len() / 2) as i64;
157
let df = unsafe { DataFrame::new_no_checks_height_from_first(keys.to_vec()) };
158
let df = df.slice(offset, sample_size);
159
let names = df.get_column_names().into_iter().cloned();
160
let gb = df.group_by(names).unwrap();
161
Ok(finish(gb.get_groups()))
162
}
163
}
164
165
// Lower this at debug builds so that we hit this in the test suite.
#[cfg(debug_assertions)]
const PARTITION_LIMIT: usize = 15;
// Below this many rows the partitioned strategy is assumed not to pay off
// (checked in `can_run_partitioned`).
#[cfg(not(debug_assertions))]
const PARTITION_LIMIT: usize = 1000;
170
171
// Checks if we should run normal or default aggregation
// by sampling data.
//
// Decision order: sorted first key -> default; POLARS_NO_PARTITION ->
// default; POLARS_FORCE_PARTITION -> partitioned; frame below
// PARTITION_LIMIT -> default; otherwise decide on a (sampled or known)
// unique-group estimate.
fn can_run_partitioned(
    keys: &[Column],
    original_df: &DataFrame,
    state: &ExecutionState,
    from_partitioned_ds: bool,
) -> PolarsResult<bool> {
    // Only the *first* key's sorted flag is inspected here (note the
    // `take(1)`); a sorted first key disables the partitioned path.
    if !keys
        .iter()
        .take(1)
        .all(|s| matches!(s.is_sorted_flag(), IsSorted::Not))
    {
        if state.verbose() {
            eprintln!("FOUND SORTED KEY: running default HASH AGGREGATION")
        }
        Ok(false)
    } else if std::env::var("POLARS_NO_PARTITION").is_ok() {
        // Environment override: never partition.
        if state.verbose() {
            eprintln!("POLARS_NO_PARTITION set: running default HASH AGGREGATION")
        }
        Ok(false)
    } else if std::env::var("POLARS_FORCE_PARTITION").is_ok() {
        // Environment override: always partition.
        if state.verbose() {
            eprintln!("POLARS_FORCE_PARTITION set: running partitioned HASH AGGREGATION")
        }
        Ok(true)
    } else if original_df.height() < PARTITION_LIMIT && !cfg!(test) {
        // Small frames: not worth the partitioning overhead.
        if state.verbose() {
            eprintln!("DATAFRAME < {PARTITION_LIMIT} rows: running default HASH AGGREGATION")
        }
        Ok(false)
    } else {
        // below this boundary we assume the partitioned group_by will be faster
        let unique_count_boundary = std::env::var("POLARS_PARTITION_UNIQUE_COUNT")
            .map(|s| s.parse::<usize>().unwrap())
            .unwrap_or(1000);

        let (unique_estimate, sampled_method) = match (keys.len(), keys[0].dtype()) {
            #[cfg(feature = "dtype-categorical")]
            (1, DataType::Categorical(_, mapping) | DataType::Enum(_, mapping)) => {
                // A single categorical/enum key: the number of categories
                // bounds the group count, so no sampling is needed.
                (mapping.num_cats_upper_bound(), "known")
            },
            _ => {
                // sqrt(N) is a good sample size as it remains low on large numbers
                // it is better than taking a fraction as it saturates
                let sample_size = (original_df.height() as f64).powf(0.5) as usize;

                // we never sample less than 100 data points.
                let sample_size = std::cmp::max(100, sample_size);
                (estimate_unique_count(keys, sample_size)?, "estimated")
            },
        };
        if state.verbose() {
            eprintln!("{sampled_method} unique values: {unique_estimate}");
        }

        if from_partitioned_ds {
            // For partitioned data sources decide on the estimated
            // cardinality ratio (unique groups / rows) rather than on an
            // absolute unique-count boundary.
            let estimated_cardinality = unique_estimate as f32 / original_df.height() as f32;
            if estimated_cardinality < 0.4 {
                if state.verbose() {
                    eprintln!("PARTITIONED DS");
                }
                Ok(true)
            } else {
                if state.verbose() {
                    eprintln!(
                        "PARTITIONED DS: estimated cardinality: {estimated_cardinality} exceeded the boundary: 0.4, running default HASH AGGREGATION"
                    );
                }
                Ok(false)
            }
        } else if unique_estimate > unique_count_boundary {
            // Too many groups: pre-aggregation would barely shrink the data.
            if state.verbose() {
                eprintln!(
                    "estimated unique count: {unique_estimate} exceeded the boundary: {unique_count_boundary}, running default HASH AGGREGATION"
                )
            }
            Ok(false)
        } else {
            Ok(true)
        }
    }
}
255
256
impl PartitionGroupByExec {
    /// Execute the group-by on `original_df`.
    ///
    /// First decides whether partitioning pays off; if not, delegates to
    /// `group_by_helper` (default hash aggregation). Otherwise runs the two
    /// phases: per-thread pre-aggregation via `run_partitions`, then a merge
    /// phase that groups the concatenated partials once more and finalizes
    /// every aggregation.
    fn execute_impl(
        &mut self,
        state: &mut ExecutionState,
        mut original_df: DataFrame,
    ) -> PolarsResult<DataFrame> {
        let (splitted_dfs, splitted_keys) = {
            // already get the keys. This is the very last minute decision which group_by method we choose.
            // If the column is a categorical, we know the number of groups we have and can decide to continue
            // partitioned or go for the standard group_by. The partitioned is likely to be faster on a small number
            // of groups.
            let keys = self.keys(&original_df, state)?;

            if !can_run_partitioned(&keys, &original_df, state, self.from_partitioned_ds)? {
                return group_by_helper(
                    original_df,
                    keys,
                    &self.phys_aggs,
                    None,
                    state,
                    self.maintain_order,
                    self.slice,
                );
            }

            if state.verbose() {
                eprintln!("run PARTITIONED HASH AGGREGATION")
            }

            // Run the partitioned aggregations
            let n_threads = POOL.current_num_threads();

            run_partitions(
                &mut original_df,
                self,
                state,
                n_threads,
                self.maintain_order,
            )?
        };

        // MERGE phase

        // Stack the pre-aggregated chunks vertically...
        let df = accumulate_dataframes_vertical(splitted_dfs)?;
        // ...and append the per-chunk key columns into a single set of key
        // columns aligned with `df`.
        // NOTE(review): `append` errors are discarded here — presumably the
        // chunk lengths always match; TODO confirm.
        let keys = splitted_keys
            .into_iter()
            .reduce(|mut acc, e| {
                acc.iter_mut().zip(e).for_each(|(acc, e)| {
                    let _ = acc.append(&e);
                });
                acc
            })
            .unwrap();

        // the partitioned group_by has added columns so we must update the schema.
        state.set_schema(self.output_schema.clone());

        // merge and hash aggregate again

        // first get mutable access and optionally sort
        let gb = df.group_by_with_series(keys, true, self.maintain_order)?;
        let mut groups = gb.get_groups();

        #[allow(unused_assignments)]
        // it is unused because we only use it to keep the lifetime of sliced_group valid
        let mut sliced_groups = None;

        if let Some((offset, len)) = self.slice {
            sliced_groups = Some(groups.slice(offset, len));
            groups = sliced_groups.as_ref().unwrap();
        }

        let get_columns = || gb.keys_sliced(self.slice);
        let get_agg = || {
            let out: PolarsResult<Vec<_>> = self
                .phys_aggs
                .par_iter()
                // we slice the keys off and finalize every aggregation
                .zip(&df.get_columns()[self.phys_keys.len()..])
                .map(|(expr, partitioned_s)| {
                    let agg_expr = expr.as_partitioned_aggregator().unwrap();
                    agg_expr.finalize(partitioned_s.clone(), groups, state)
                })
                .collect();

            out
        };
        // Compute sliced key columns and finalized aggregations in parallel.
        let (mut columns, agg_columns): (Vec<_>, _) = POOL.join(get_columns, get_agg);

        columns.extend(agg_columns?);
        state.clear_schema_cache();

        Ok(DataFrame::new(columns).unwrap())
    }
}
351
352
impl Executor for PartitionGroupByExec {
353
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
354
state.should_stop()?;
355
#[cfg(debug_assertions)]
356
{
357
if state.verbose() {
358
eprintln!("run PartitionGroupbyExec")
359
}
360
}
361
let original_df = self.input.execute(state)?;
362
363
let profile_name = if state.has_node_timer() {
364
let by = self
365
.phys_keys
366
.iter()
367
.map(|s| Ok(s.to_field(&self.input_schema)?.name))
368
.collect::<PolarsResult<Vec<_>>>()?;
369
let name = comma_delimited("group_by_partitioned".to_string(), &by);
370
Cow::Owned(name)
371
} else {
372
Cow::Borrowed("")
373
};
374
if state.has_node_timer() {
375
let new_state = state.clone();
376
new_state.record(|| self.execute_impl(state, original_df), profile_name)
377
} else {
378
self.execute_impl(state, original_df)
379
}
380
}
381
}
382
383