Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-mem-engine/src/executors/group_by.rs
6940 views
1
use rayon::prelude::*;
2
3
use super::*;
4
5
pub(super) fn evaluate_aggs(
6
df: &DataFrame,
7
aggs: &[Arc<dyn PhysicalExpr>],
8
groups: &GroupPositions,
9
state: &ExecutionState,
10
) -> PolarsResult<Vec<Column>> {
11
POOL.install(|| {
12
aggs.par_iter()
13
.map(|expr| {
14
let agg = expr.evaluate_on_groups(df, groups, state)?.finalize();
15
polars_ensure!(agg.len() == groups.len(), agg_len = agg.len(), groups.len());
16
Ok(agg)
17
})
18
.collect::<PolarsResult<Vec<_>>>()
19
})
20
}
21
22
/// Take an input Executor and a multiple expressions
23
pub struct GroupByExec {
24
input: Box<dyn Executor>,
25
keys: Vec<Arc<dyn PhysicalExpr>>,
26
aggs: Vec<Arc<dyn PhysicalExpr>>,
27
apply: Option<PlanCallback<DataFrame, DataFrame>>,
28
maintain_order: bool,
29
input_schema: SchemaRef,
30
slice: Option<(i64, usize)>,
31
}
32
33
impl GroupByExec {
34
#[allow(clippy::too_many_arguments)]
35
pub(crate) fn new(
36
input: Box<dyn Executor>,
37
keys: Vec<Arc<dyn PhysicalExpr>>,
38
aggs: Vec<Arc<dyn PhysicalExpr>>,
39
apply: Option<PlanCallback<DataFrame, DataFrame>>,
40
maintain_order: bool,
41
input_schema: SchemaRef,
42
slice: Option<(i64, usize)>,
43
) -> Self {
44
Self {
45
input,
46
keys,
47
aggs,
48
apply,
49
maintain_order,
50
input_schema,
51
slice,
52
}
53
}
54
}
55
56
#[allow(clippy::too_many_arguments)]
57
pub(super) fn group_by_helper(
58
mut df: DataFrame,
59
keys: Vec<Column>,
60
aggs: &[Arc<dyn PhysicalExpr>],
61
apply: Option<PlanCallback<DataFrame, DataFrame>>,
62
state: &ExecutionState,
63
maintain_order: bool,
64
slice: Option<(i64, usize)>,
65
) -> PolarsResult<DataFrame> {
66
df.as_single_chunk_par();
67
let gb = df.group_by_with_series(keys, true, maintain_order)?;
68
69
if let Some(f) = apply {
70
return gb.sliced(slice).apply(move |df| f.call(df));
71
}
72
73
let mut groups = gb.get_groups();
74
75
#[allow(unused_assignments)]
76
// it is unused because we only use it to keep the lifetime of sliced_group valid
77
let mut sliced_groups = None;
78
79
if let Some((offset, len)) = slice {
80
sliced_groups = Some(groups.slice(offset, len));
81
groups = sliced_groups.as_ref().unwrap();
82
}
83
84
let (mut columns, agg_columns) = POOL.install(|| {
85
let get_columns = || gb.keys_sliced(slice);
86
87
let get_agg = || evaluate_aggs(&df, aggs, groups, state);
88
89
rayon::join(get_columns, get_agg)
90
});
91
92
columns.extend(agg_columns?);
93
DataFrame::new(columns)
94
}
95
96
impl GroupByExec {
97
fn execute_impl(&mut self, state: &ExecutionState, df: DataFrame) -> PolarsResult<DataFrame> {
98
let keys = self
99
.keys
100
.iter()
101
.map(|e| e.evaluate(&df, state))
102
.collect::<PolarsResult<_>>()?;
103
group_by_helper(
104
df,
105
keys,
106
&self.aggs,
107
self.apply.take(),
108
state,
109
self.maintain_order,
110
self.slice,
111
)
112
}
113
}
114
115
impl Executor for GroupByExec {
116
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
117
state.should_stop()?;
118
#[cfg(debug_assertions)]
119
{
120
if state.verbose() {
121
eprintln!("run GroupbyExec")
122
}
123
}
124
if state.verbose() {
125
eprintln!("keys/aggregates are not partitionable: running default HASH AGGREGATION")
126
}
127
let df = self.input.execute(state)?;
128
129
let profile_name = if state.has_node_timer() {
130
let by = self
131
.keys
132
.iter()
133
.map(|s| Ok(s.to_field(&self.input_schema)?.name))
134
.collect::<PolarsResult<Vec<_>>>()?;
135
let name = comma_delimited("group_by".to_string(), &by);
136
Cow::Owned(name)
137
} else {
138
Cow::Borrowed("")
139
};
140
141
if state.has_node_timer() {
142
let new_state = state.clone();
143
new_state.record(|| self.execute_impl(state, df), profile_name)
144
} else {
145
self.execute_impl(state, df)
146
}
147
}
148
}
149
150