Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-expr/src/expressions/rolling.rs
6940 views
1
use polars_time::{PolarsTemporalGroupby, RollingGroupOptions};
2
3
use super::*;
4
5
pub(crate) struct RollingExpr {
6
/// the root column that the Function will be applied on.
7
/// This will be used to create a smaller DataFrame to prevent taking unneeded columns by index
8
/// TODO! support keys?
9
/// The challenge is that the group_by will reorder the results and the
10
/// keys, and time index would need to be updated, or the result should be joined back
11
/// For now, don't support it.
12
///
13
/// A function Expr. i.e. Mean, Median, Max, etc.
14
pub(crate) function: Expr,
15
pub(crate) phys_function: Arc<dyn PhysicalExpr>,
16
pub(crate) options: RollingGroupOptions,
17
pub(crate) expr: Expr,
18
}
19
20
impl PhysicalExpr for RollingExpr {
21
fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
22
let groups_key = format!("{:?}", &self.options);
23
24
let groups = {
25
// Groups must be set by expression runner.
26
state.window_cache.get_groups(&groups_key)
27
};
28
29
// There can be multiple rolling expressions in a single expr.
30
// E.g. `min().rolling() + max().rolling()`
31
// So if we hit that we will compute them here.
32
let groups = match groups {
33
Some(groups) => groups,
34
None => {
35
let (_time_key, groups) = df.rolling(None, &self.options)?;
36
state.window_cache.insert_groups(groups_key, groups.clone());
37
groups
38
},
39
};
40
41
let out = self
42
.phys_function
43
.evaluate_on_groups(df, &groups, state)?
44
.finalize();
45
polars_ensure!(out.len() == groups.len(), agg_len = out.len(), groups.len());
46
Ok(out.into_column())
47
}
48
49
fn evaluate_on_groups<'a>(
50
&self,
51
_df: &DataFrame,
52
_groups: &'a GroupPositions,
53
_state: &ExecutionState,
54
) -> PolarsResult<AggregationContext<'a>> {
55
polars_bail!(InvalidOperation: "rolling expression not allowed in aggregation");
56
}
57
58
fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
59
self.function.to_field(input_schema)
60
}
61
62
fn as_expression(&self) -> Option<&Expr> {
63
Some(&self.expr)
64
}
65
66
fn is_scalar(&self) -> bool {
67
false
68
}
69
}
70
71