Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-mem-engine/src/executors/group_by_dynamic.rs
6940 views
1
use super::*;
2
3
#[cfg_attr(not(feature = "dynamic_group_by"), allow(dead_code))]
4
pub(crate) struct GroupByDynamicExec {
5
pub(crate) input: Box<dyn Executor>,
6
// we will use this later
7
#[allow(dead_code)]
8
pub(crate) keys: Vec<Arc<dyn PhysicalExpr>>,
9
pub(crate) aggs: Vec<Arc<dyn PhysicalExpr>>,
10
#[cfg(feature = "dynamic_group_by")]
11
pub(crate) options: DynamicGroupOptions,
12
pub(crate) input_schema: SchemaRef,
13
pub(crate) slice: Option<(i64, usize)>,
14
pub(crate) apply: Option<PlanCallback<DataFrame, DataFrame>>,
15
}
16
17
impl GroupByDynamicExec {
18
#[cfg(feature = "dynamic_group_by")]
19
fn execute_impl(
20
&mut self,
21
state: &ExecutionState,
22
mut df: DataFrame,
23
) -> PolarsResult<DataFrame> {
24
use crate::executors::group_by_rolling::sort_and_groups;
25
26
df.as_single_chunk_par();
27
28
let mut keys = self
29
.keys
30
.iter()
31
.map(|e| e.evaluate(&df, state))
32
.collect::<PolarsResult<Vec<_>>>()?;
33
34
let group_by = if !self.keys.is_empty() {
35
Some(sort_and_groups(&mut df, &mut keys)?)
36
} else {
37
None
38
};
39
40
let (mut time_key, bounds, groups) = df.group_by_dynamic(group_by, &self.options)?;
41
POOL.install(|| {
42
keys.iter_mut().for_each(|key| {
43
unsafe { *key = key.agg_first(&groups) };
44
})
45
});
46
keys.extend(bounds);
47
48
if let Some(f) = &self.apply {
49
let gb = GroupBy::new(&df, vec![], groups, None);
50
let out = gb.apply(move |df| f.call(df))?;
51
return Ok(if let Some((offset, len)) = self.slice {
52
out.slice(offset, len)
53
} else {
54
out
55
});
56
}
57
58
let mut groups = &groups;
59
#[allow(unused_assignments)]
60
// it is unused because we only use it to keep the lifetime of sliced_group valid
61
let mut sliced_groups = None;
62
63
if let Some((offset, len)) = self.slice {
64
sliced_groups = Some(groups.slice(offset, len));
65
groups = sliced_groups.as_ref().unwrap();
66
67
time_key = time_key.slice(offset, len);
68
69
// todo! optimize this, we can prevent an agg_first aggregation upstream
70
// the ordering has changed due to the group_by
71
for key in keys.iter_mut() {
72
*key = key.slice(offset, len)
73
}
74
}
75
76
let agg_columns = evaluate_aggs(&df, &self.aggs, groups, state)?;
77
78
let mut columns = Vec::with_capacity(agg_columns.len() + 1 + keys.len());
79
columns.extend_from_slice(&keys);
80
columns.push(time_key);
81
columns.extend(agg_columns);
82
83
DataFrame::new(columns)
84
}
85
}
86
87
impl Executor for GroupByDynamicExec {
88
#[cfg(not(feature = "dynamic_group_by"))]
89
fn execute(&mut self, _state: &mut ExecutionState) -> PolarsResult<DataFrame> {
90
panic!("activate feature dynamic_group_by")
91
}
92
93
#[cfg(feature = "dynamic_group_by")]
94
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
95
state.should_stop()?;
96
#[cfg(debug_assertions)]
97
{
98
if state.verbose() {
99
eprintln!("run GroupbyDynamicExec")
100
}
101
}
102
let df = self.input.execute(state)?;
103
104
let profile_name = if state.has_node_timer() {
105
let by = self
106
.keys
107
.iter()
108
.map(|s| Ok(s.to_field(&self.input_schema)?.name))
109
.collect::<PolarsResult<Vec<_>>>()?;
110
let name = comma_delimited("group_by_dynamic".to_string(), &by);
111
Cow::Owned(name)
112
} else {
113
Cow::Borrowed("")
114
};
115
116
if state.has_node_timer() {
117
let new_state = state.clone();
118
new_state.record(|| self.execute_impl(state, df), profile_name)
119
} else {
120
self.execute_impl(state, df)
121
}
122
}
123
}
124
125