Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-mem-engine/src/executors/group_by_dynamic.rs
8422 views
1
use super::*;
2
3
#[cfg_attr(not(feature = "dynamic_group_by"), allow(dead_code))]
4
pub(crate) struct GroupByDynamicExec {
5
pub(crate) input: Box<dyn Executor>,
6
// we will use this later
7
#[allow(dead_code)]
8
pub(crate) keys: Vec<Arc<dyn PhysicalExpr>>,
9
pub(crate) aggs: Vec<Arc<dyn PhysicalExpr>>,
10
#[cfg(feature = "dynamic_group_by")]
11
pub(crate) options: DynamicGroupOptions,
12
pub(crate) input_schema: SchemaRef,
13
pub(crate) output_schema: SchemaRef,
14
pub(crate) slice: Option<(i64, usize)>,
15
pub(crate) apply: Option<PlanCallback<DataFrame, DataFrame>>,
16
}
17
18
impl GroupByDynamicExec {
19
#[cfg(feature = "dynamic_group_by")]
20
fn execute_impl(
21
&mut self,
22
state: &ExecutionState,
23
mut df: DataFrame,
24
) -> PolarsResult<DataFrame> {
25
use crate::executors::group_by_rolling::sort_and_groups;
26
27
df.rechunk_mut_par();
28
29
let mut keys = self
30
.keys
31
.iter()
32
.map(|e| e.evaluate(&df, state))
33
.collect::<PolarsResult<Vec<_>>>()?;
34
35
let group_by = if !self.keys.is_empty() {
36
Some(sort_and_groups(&mut df, &mut keys)?)
37
} else {
38
None
39
};
40
41
let (mut time_key, bounds, groups) = df.group_by_dynamic(group_by, &self.options)?;
42
POOL.install(|| {
43
keys.iter_mut().for_each(|key| {
44
unsafe { *key = key.agg_first(&groups) };
45
})
46
});
47
keys.extend(bounds);
48
49
if let Some(f) = &self.apply {
50
let gb = GroupBy::new(&df, vec![], groups, None);
51
return gb.apply_sliced(self.slice, move |df| f.call(df), Some(&self.output_schema));
52
}
53
54
let mut groups = &groups;
55
#[allow(unused_assignments)]
56
// it is unused because we only use it to keep the lifetime of sliced_group valid
57
let mut sliced_groups = None;
58
59
if let Some((offset, len)) = self.slice {
60
sliced_groups = Some(groups.slice(offset, len));
61
groups = sliced_groups.as_ref().unwrap();
62
63
time_key = time_key.slice(offset, len);
64
65
// todo! optimize this, we can prevent an agg_first aggregation upstream
66
// the ordering has changed due to the group_by
67
for key in keys.iter_mut() {
68
*key = key.slice(offset, len)
69
}
70
}
71
72
let agg_columns = evaluate_aggs(&df, &self.aggs, groups, state)?;
73
74
let mut columns = Vec::with_capacity(agg_columns.len() + 1 + keys.len());
75
columns.extend_from_slice(&keys);
76
columns.push(time_key);
77
columns.extend(agg_columns);
78
79
DataFrame::new_infer_height(columns)
80
}
81
}
82
83
impl Executor for GroupByDynamicExec {
84
#[cfg(not(feature = "dynamic_group_by"))]
85
fn execute(&mut self, _state: &mut ExecutionState) -> PolarsResult<DataFrame> {
86
panic!("activate feature dynamic_group_by")
87
}
88
89
#[cfg(feature = "dynamic_group_by")]
90
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
91
state.should_stop()?;
92
#[cfg(debug_assertions)]
93
{
94
if state.verbose() {
95
eprintln!("run GroupbyDynamicExec")
96
}
97
}
98
let df = self.input.execute(state)?;
99
100
let profile_name = if state.has_node_timer() {
101
let by = self
102
.keys
103
.iter()
104
.map(|s| Ok(s.to_field(&self.input_schema)?.name))
105
.collect::<PolarsResult<Vec<_>>>()?;
106
let name = comma_delimited("group_by_dynamic".to_string(), &by);
107
Cow::Owned(name)
108
} else {
109
Cow::Borrowed("")
110
};
111
112
if state.has_node_timer() {
113
let new_state = state.clone();
114
new_state.record(|| self.execute_impl(state, df), profile_name)
115
} else {
116
self.execute_impl(state, df)
117
}
118
}
119
}
120
121