Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-mem-engine/src/executors/group_by.rs
8421 views
1
use rayon::prelude::*;
2
3
use super::*;
4
5
pub(super) fn evaluate_aggs(
6
df: &DataFrame,
7
aggs: &[Arc<dyn PhysicalExpr>],
8
groups: &GroupPositions,
9
state: &ExecutionState,
10
) -> PolarsResult<Vec<Column>> {
11
POOL.install(|| {
12
aggs.par_iter()
13
.map(|expr| {
14
let agg = expr.evaluate_on_groups(df, groups, state)?.finalize();
15
polars_ensure!(agg.len() == groups.len(), agg_len = agg.len(), groups.len());
16
Ok(agg)
17
})
18
.collect::<PolarsResult<Vec<_>>>()
19
})
20
}
21
22
/// Take an input Executor and a multiple expressions
23
pub struct GroupByExec {
24
input: Box<dyn Executor>,
25
keys: Vec<Arc<dyn PhysicalExpr>>,
26
aggs: Vec<Arc<dyn PhysicalExpr>>,
27
apply: Option<PlanCallback<DataFrame, DataFrame>>,
28
maintain_order: bool,
29
input_schema: SchemaRef,
30
output_schema: SchemaRef,
31
slice: Option<(i64, usize)>,
32
}
33
34
impl GroupByExec {
35
#[allow(clippy::too_many_arguments)]
36
pub(crate) fn new(
37
input: Box<dyn Executor>,
38
keys: Vec<Arc<dyn PhysicalExpr>>,
39
aggs: Vec<Arc<dyn PhysicalExpr>>,
40
apply: Option<PlanCallback<DataFrame, DataFrame>>,
41
maintain_order: bool,
42
input_schema: SchemaRef,
43
output_schema: SchemaRef,
44
slice: Option<(i64, usize)>,
45
) -> Self {
46
Self {
47
input,
48
keys,
49
aggs,
50
apply,
51
maintain_order,
52
input_schema,
53
output_schema,
54
slice,
55
}
56
}
57
}
58
59
#[allow(clippy::too_many_arguments)]
60
pub(super) fn group_by_helper(
61
mut df: DataFrame,
62
keys: Vec<Column>,
63
aggs: &[Arc<dyn PhysicalExpr>],
64
apply: Option<PlanCallback<DataFrame, DataFrame>>,
65
state: &ExecutionState,
66
maintain_order: bool,
67
output_schema: &SchemaRef,
68
slice: Option<(i64, usize)>,
69
) -> PolarsResult<DataFrame> {
70
df.rechunk_mut_par();
71
let gb = df.group_by_with_series(keys, true, maintain_order)?;
72
73
if let Some(f) = apply {
74
return gb.apply_sliced(slice, move |df| f.call(df), Some(output_schema));
75
}
76
77
let mut groups = gb.get_groups();
78
79
#[allow(unused_assignments)]
80
// it is unused because we only use it to keep the lifetime of sliced_group valid
81
let mut sliced_groups = None;
82
83
if let Some((offset, len)) = slice {
84
sliced_groups = Some(groups.slice(offset, len));
85
groups = sliced_groups.as_ref().unwrap();
86
}
87
88
let (mut columns, agg_columns) = POOL.install(|| {
89
let get_columns = || gb.keys_sliced(slice);
90
91
let get_agg = || evaluate_aggs(&df, aggs, groups, state);
92
93
rayon::join(get_columns, get_agg)
94
});
95
96
columns.extend(agg_columns?);
97
DataFrame::new_infer_height(columns)
98
}
99
100
impl GroupByExec {
101
fn execute_impl(&mut self, state: &ExecutionState, df: DataFrame) -> PolarsResult<DataFrame> {
102
let keys = self
103
.keys
104
.iter()
105
.map(|e| e.evaluate(&df, state))
106
.collect::<PolarsResult<_>>()?;
107
group_by_helper(
108
df,
109
keys,
110
&self.aggs,
111
self.apply.take(),
112
state,
113
self.maintain_order,
114
&self.output_schema,
115
self.slice,
116
)
117
}
118
}
119
120
impl Executor for GroupByExec {
121
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
122
state.should_stop()?;
123
#[cfg(debug_assertions)]
124
{
125
if state.verbose() {
126
eprintln!("run GroupbyExec")
127
}
128
}
129
if state.verbose() {
130
eprintln!("keys/aggregates are not partitionable: running default HASH AGGREGATION")
131
}
132
let df = self.input.execute(state)?;
133
134
let profile_name = if state.has_node_timer() {
135
let by = self
136
.keys
137
.iter()
138
.map(|s| Ok(s.to_field(&self.input_schema)?.name))
139
.collect::<PolarsResult<Vec<_>>>()?;
140
let name = comma_delimited("group_by".to_string(), &by);
141
Cow::Owned(name)
142
} else {
143
Cow::Borrowed("")
144
};
145
146
if state.has_node_timer() {
147
let new_state = state.clone();
148
new_state.record(|| self.execute_impl(state, df), profile_name)
149
} else {
150
self.execute_impl(state, df)
151
}
152
}
153
}
154
155