Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-mem-engine/src/executors/group_by_rolling.rs
8427 views
1
use polars_utils::unique_column_name;
2
3
use super::*;
4
5
#[cfg_attr(not(feature = "dynamic_group_by"), allow(dead_code))]
6
pub(crate) struct GroupByRollingExec {
7
pub(crate) input: Box<dyn Executor>,
8
pub(crate) keys: Vec<Arc<dyn PhysicalExpr>>,
9
pub(crate) aggs: Vec<Arc<dyn PhysicalExpr>>,
10
#[cfg(feature = "dynamic_group_by")]
11
pub(crate) options: RollingGroupOptions,
12
pub(crate) input_schema: SchemaRef,
13
pub(crate) output_schema: SchemaRef,
14
pub(crate) slice: Option<(i64, usize)>,
15
pub(crate) apply: Option<PlanCallback<DataFrame, DataFrame>>,
16
}
17
18
pub(super) fn sort_and_groups(
19
df: &mut DataFrame,
20
keys: &mut Vec<Column>,
21
) -> PolarsResult<Vec<[IdxSize; 2]>> {
22
let encoded = row_encode::encode_rows_vertical_par_unordered(keys)?;
23
let encoded = encoded.rechunk().into_owned();
24
let encoded = encoded.with_name(unique_column_name());
25
let idx = encoded.arg_sort(SortOptions {
26
maintain_order: true,
27
..Default::default()
28
});
29
30
let encoded = unsafe {
31
df.push_column_unchecked(encoded.into_series().into());
32
33
// If not sorted on keys, sort.
34
let idx_s = idx.clone().into_series();
35
if !idx_s.is_sorted(Default::default()).unwrap() {
36
let (df_ordered, keys_ordered) = POOL.join(
37
|| df.take_unchecked(&idx),
38
|| {
39
keys.iter()
40
.map(|c| c.take_unchecked(&idx))
41
.collect::<Vec<_>>()
42
},
43
);
44
*df = df_ordered;
45
*keys = keys_ordered;
46
}
47
48
df.columns_mut().pop().unwrap()
49
};
50
let encoded = encoded.as_materialized_series();
51
let encoded = encoded.binary_offset().unwrap();
52
let encoded = encoded.with_sorted_flag(polars_core::series::IsSorted::Ascending);
53
let groups = encoded.group_tuples(true, false).unwrap();
54
55
let GroupsType::Slice { groups, .. } = groups else {
56
// memory would explode
57
unreachable!();
58
};
59
Ok(groups)
60
}
61
62
impl GroupByRollingExec {
63
#[cfg(feature = "dynamic_group_by")]
64
fn execute_impl(
65
&mut self,
66
state: &ExecutionState,
67
mut df: DataFrame,
68
) -> PolarsResult<DataFrame> {
69
df.rechunk_mut_par();
70
71
let mut keys = self
72
.keys
73
.iter()
74
.map(|e| e.evaluate(&df, state))
75
.collect::<PolarsResult<Vec<_>>>()?;
76
77
let group_by = if !self.keys.is_empty() {
78
Some(sort_and_groups(&mut df, &mut keys)?)
79
} else {
80
None
81
};
82
83
let (mut time_key, groups) = df.rolling(group_by, &self.options)?;
84
85
if let Some(f) = &self.apply {
86
let gb = GroupBy::new(&df, vec![], groups, None);
87
return gb.apply_sliced(self.slice, move |df| f.call(df), Some(&self.output_schema));
88
}
89
90
let mut groups = &groups;
91
#[allow(unused_assignments)]
92
// it is unused because we only use it to keep the lifetime of sliced_group valid
93
let mut sliced_groups = None;
94
95
if let Some((offset, len)) = self.slice {
96
sliced_groups = Some(groups.slice(offset, len));
97
groups = sliced_groups.as_ref().unwrap();
98
99
time_key = time_key.slice(offset, len);
100
for k in &mut keys {
101
*k = k.slice(offset, len);
102
}
103
}
104
105
let agg_columns = evaluate_aggs(&df, &self.aggs, groups, state)?;
106
107
let mut columns = Vec::with_capacity(agg_columns.len() + 1 + keys.len());
108
columns.extend_from_slice(&keys);
109
columns.push(time_key);
110
columns.extend(agg_columns);
111
112
DataFrame::new_infer_height(columns)
113
}
114
}
115
116
impl Executor for GroupByRollingExec {
117
#[cfg(not(feature = "dynamic_group_by"))]
118
fn execute(&mut self, _state: &mut ExecutionState) -> PolarsResult<DataFrame> {
119
panic!("activate feature dynamic_group_by")
120
}
121
122
#[cfg(feature = "dynamic_group_by")]
123
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
124
state.should_stop()?;
125
#[cfg(debug_assertions)]
126
{
127
if state.verbose() {
128
eprintln!("run GroupbyRollingExec")
129
}
130
}
131
let df = self.input.execute(state)?;
132
let profile_name = if state.has_node_timer() {
133
let by = self
134
.keys
135
.iter()
136
.map(|s| Ok(s.to_field(&self.input_schema)?.name))
137
.collect::<PolarsResult<Vec<_>>>()?;
138
let name = comma_delimited("group_by_rolling".to_string(), &by);
139
Cow::Owned(name)
140
} else {
141
Cow::Borrowed("")
142
};
143
144
if state.has_node_timer() {
145
let new_state = state.clone();
146
new_state.record(|| self.execute_impl(state, df), profile_name)
147
} else {
148
self.execute_impl(state, df)
149
}
150
}
151
}
152
153