Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-mem-engine/src/executors/group_by_rolling.rs
6940 views
1
use polars_utils::unique_column_name;
2
3
use super::*;
4
5
#[cfg_attr(not(feature = "dynamic_group_by"), allow(dead_code))]
6
pub(crate) struct GroupByRollingExec {
7
pub(crate) input: Box<dyn Executor>,
8
pub(crate) keys: Vec<Arc<dyn PhysicalExpr>>,
9
pub(crate) aggs: Vec<Arc<dyn PhysicalExpr>>,
10
#[cfg(feature = "dynamic_group_by")]
11
pub(crate) options: RollingGroupOptions,
12
pub(crate) input_schema: SchemaRef,
13
pub(crate) slice: Option<(i64, usize)>,
14
pub(crate) apply: Option<PlanCallback<DataFrame, DataFrame>>,
15
}
16
17
pub(super) fn sort_and_groups(
18
df: &mut DataFrame,
19
keys: &mut Vec<Column>,
20
) -> PolarsResult<Vec<[IdxSize; 2]>> {
21
let encoded = row_encode::encode_rows_vertical_par_unordered(keys)?;
22
let encoded = encoded.rechunk().into_owned();
23
let encoded = encoded.with_name(unique_column_name());
24
let idx = encoded.arg_sort(SortOptions {
25
maintain_order: true,
26
..Default::default()
27
});
28
29
let encoded = unsafe {
30
df.with_column_unchecked(encoded.into_series().into());
31
32
// If not sorted on keys, sort.
33
let idx_s = idx.clone().into_series();
34
if !idx_s.is_sorted(Default::default()).unwrap() {
35
let (df_ordered, keys_ordered) = POOL.join(
36
|| df.take_unchecked(&idx),
37
|| {
38
keys.iter()
39
.map(|c| c.take_unchecked(&idx))
40
.collect::<Vec<_>>()
41
},
42
);
43
*df = df_ordered;
44
*keys = keys_ordered;
45
}
46
47
df.get_columns_mut().pop().unwrap()
48
};
49
let encoded = encoded.as_materialized_series();
50
let encoded = encoded.binary_offset().unwrap();
51
let encoded = encoded.with_sorted_flag(polars_core::series::IsSorted::Ascending);
52
let groups = encoded.group_tuples(true, false).unwrap();
53
54
let GroupsType::Slice { groups, .. } = groups else {
55
// memory would explode
56
unreachable!();
57
};
58
Ok(groups)
59
}
60
61
impl GroupByRollingExec {
62
#[cfg(feature = "dynamic_group_by")]
63
fn execute_impl(
64
&mut self,
65
state: &ExecutionState,
66
mut df: DataFrame,
67
) -> PolarsResult<DataFrame> {
68
df.as_single_chunk_par();
69
70
let mut keys = self
71
.keys
72
.iter()
73
.map(|e| e.evaluate(&df, state))
74
.collect::<PolarsResult<Vec<_>>>()?;
75
76
let group_by = if !self.keys.is_empty() {
77
Some(sort_and_groups(&mut df, &mut keys)?)
78
} else {
79
None
80
};
81
82
let (mut time_key, groups) = df.rolling(group_by, &self.options)?;
83
84
if let Some(f) = &self.apply {
85
let gb = GroupBy::new(&df, vec![], groups, None);
86
let out = gb.apply(move |df| f.call(df))?;
87
return Ok(if let Some((offset, len)) = self.slice {
88
out.slice(offset, len)
89
} else {
90
out
91
});
92
}
93
94
let mut groups = &groups;
95
#[allow(unused_assignments)]
96
// it is unused because we only use it to keep the lifetime of sliced_group valid
97
let mut sliced_groups = None;
98
99
if let Some((offset, len)) = self.slice {
100
sliced_groups = Some(groups.slice(offset, len));
101
groups = sliced_groups.as_ref().unwrap();
102
103
time_key = time_key.slice(offset, len);
104
for k in &mut keys {
105
*k = k.slice(offset, len);
106
}
107
}
108
109
let agg_columns = evaluate_aggs(&df, &self.aggs, groups, state)?;
110
111
let mut columns = Vec::with_capacity(agg_columns.len() + 1 + keys.len());
112
columns.extend_from_slice(&keys);
113
columns.push(time_key);
114
columns.extend(agg_columns);
115
116
DataFrame::new(columns)
117
}
118
}
119
120
impl Executor for GroupByRollingExec {
121
#[cfg(not(feature = "dynamic_group_by"))]
122
fn execute(&mut self, _state: &mut ExecutionState) -> PolarsResult<DataFrame> {
123
panic!("activate feature dynamic_group_by")
124
}
125
126
#[cfg(feature = "dynamic_group_by")]
127
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
128
state.should_stop()?;
129
#[cfg(debug_assertions)]
130
{
131
if state.verbose() {
132
eprintln!("run GroupbyRollingExec")
133
}
134
}
135
let df = self.input.execute(state)?;
136
let profile_name = if state.has_node_timer() {
137
let by = self
138
.keys
139
.iter()
140
.map(|s| Ok(s.to_field(&self.input_schema)?.name))
141
.collect::<PolarsResult<Vec<_>>>()?;
142
let name = comma_delimited("group_by_rolling".to_string(), &by);
143
Cow::Owned(name)
144
} else {
145
Cow::Borrowed("")
146
};
147
148
if state.has_node_timer() {
149
let new_state = state.clone();
150
new_state.record(|| self.execute_impl(state, df), profile_name)
151
} else {
152
self.execute_impl(state, df)
153
}
154
}
155
}
156
157