Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-expr/src/reduce/convert.rs
8415 views
1
// use polars_core::error::feature_gated;
2
use polars_plan::prelude::*;
3
use polars_utils::arena::{Arena, Node};
4
5
use super::*;
6
use crate::reduce::any_all::{new_all_reduction, new_any_reduction};
7
#[cfg(feature = "approx_unique")]
8
use crate::reduce::approx_n_unique::new_approx_n_unique_reduction;
9
#[cfg(feature = "bitwise")]
10
use crate::reduce::bitwise::{
11
new_bitwise_and_reduction, new_bitwise_or_reduction, new_bitwise_xor_reduction,
12
};
13
use crate::reduce::count::{CountReduce, NullCountReduce};
14
use crate::reduce::first_last::{new_first_reduction, new_item_reduction, new_last_reduction};
15
use crate::reduce::first_last_nonnull::{new_first_nonnull_reduction, new_last_nonnull_reduction};
16
use crate::reduce::len::LenReduce;
17
use crate::reduce::mean::new_mean_reduction;
18
use crate::reduce::min_max::{new_max_reduction, new_min_reduction};
19
use crate::reduce::min_max_by::{new_max_by_reduction, new_min_by_reduction};
20
use crate::reduce::sum::new_sum_reduction;
21
use crate::reduce::var_std::new_var_std_reduction;
22
23
/// Converts a node into a reduction + its associated selector expression.
24
pub fn into_reduction(
25
node: Node,
26
expr_arena: &mut Arena<AExpr>,
27
schema: &Schema,
28
is_aggregation_context: bool,
29
) -> PolarsResult<(Box<dyn GroupedReduction>, Vec<Node>)> {
30
let get_dt = |node| {
31
expr_arena
32
.get(node)
33
.to_dtype(&ToFieldContext::new(expr_arena, schema))?
34
.materialize_unknown(false)
35
};
36
let (gr, in_node) = match expr_arena.get(node) {
37
AExpr::Agg(agg) => match agg {
38
IRAggExpr::Sum(input) => (new_sum_reduction(get_dt(*input)?)?, *input),
39
IRAggExpr::Mean(input) => (new_mean_reduction(get_dt(*input)?)?, *input),
40
IRAggExpr::Min {
41
propagate_nans,
42
input,
43
} => (new_min_reduction(get_dt(*input)?, *propagate_nans)?, *input),
44
IRAggExpr::Max {
45
propagate_nans,
46
input,
47
} => (new_max_reduction(get_dt(*input)?, *propagate_nans)?, *input),
48
IRAggExpr::Var(input, ddof) => (
49
new_var_std_reduction(get_dt(*input)?, false, *ddof)?,
50
*input,
51
),
52
IRAggExpr::Std(input, ddof) => {
53
(new_var_std_reduction(get_dt(*input)?, true, *ddof)?, *input)
54
},
55
IRAggExpr::First(input) => (new_first_reduction(get_dt(*input)?), *input),
56
IRAggExpr::FirstNonNull(input) => {
57
(new_first_nonnull_reduction(get_dt(*input)?), *input)
58
},
59
IRAggExpr::Last(input) => (new_last_reduction(get_dt(*input)?), *input),
60
IRAggExpr::LastNonNull(input) => (new_last_nonnull_reduction(get_dt(*input)?), *input),
61
IRAggExpr::Item { input, allow_empty } => {
62
(new_item_reduction(get_dt(*input)?, *allow_empty), *input)
63
},
64
IRAggExpr::Count {
65
input,
66
include_nulls,
67
} => {
68
let count = Box::new(CountReduce::new(*include_nulls)) as Box<_>;
69
(count, *input)
70
},
71
IRAggExpr::Quantile { .. } => todo!(),
72
IRAggExpr::Median(_) => todo!(),
73
IRAggExpr::NUnique(_) => todo!(),
74
IRAggExpr::Implode(_) => todo!(),
75
IRAggExpr::AggGroups(_) => todo!(),
76
},
77
AExpr::Len => {
78
if let Some(first_column) = schema.iter_names().next() {
79
let out: Box<dyn GroupedReduction> = Box::new(LenReduce::default());
80
let expr = expr_arena.add(AExpr::Column(first_column.as_str().into()));
81
82
(out, expr)
83
} else {
84
// Support len aggregation on 0-width morsels.
85
// Notes:
86
// * We do this instead of projecting a scalar, because scalar literals don't
87
// project to the height of the DataFrame (in the PhysicalExpr impl).
88
// * This approach is not sound for `update_groups()`, but currently that case is
89
// not hit (it would need group-by -> len on empty morsels).
90
polars_ensure!(
91
!is_aggregation_context,
92
ComputeError:
93
"not implemented: len() of groups with no columns"
94
);
95
96
let out: Box<dyn GroupedReduction> = new_sum_reduction(DataType::IDX_DTYPE)?;
97
let expr = expr_arena.add(AExpr::Len);
98
99
(out, expr)
100
}
101
},
102
103
AExpr::Function {
104
input: inner_exprs,
105
function: IRFunctionExpr::NullCount,
106
options: _,
107
} => {
108
assert!(inner_exprs.len() == 1);
109
let input = inner_exprs[0].node();
110
let count = Box::new(NullCountReduce::new()) as Box<_>;
111
(count, input)
112
},
113
114
#[cfg(feature = "approx_unique")]
115
AExpr::Function {
116
input: inner_exprs,
117
function: IRFunctionExpr::ApproxNUnique,
118
options: _,
119
} => {
120
assert!(inner_exprs.len() == 1);
121
let input = inner_exprs[0].node();
122
let out = new_approx_n_unique_reduction(get_dt(input)?)?;
123
(out, input)
124
},
125
126
#[cfg(feature = "bitwise")]
127
AExpr::Function {
128
input: inner_exprs,
129
function: IRFunctionExpr::Bitwise(inner_fn),
130
options: _,
131
} => {
132
assert!(inner_exprs.len() == 1);
133
let input = inner_exprs[0].node();
134
match inner_fn {
135
IRBitwiseFunction::And => (new_bitwise_and_reduction(get_dt(input)?), input),
136
IRBitwiseFunction::Or => (new_bitwise_or_reduction(get_dt(input)?), input),
137
IRBitwiseFunction::Xor => (new_bitwise_xor_reduction(get_dt(input)?), input),
138
_ => unreachable!(),
139
}
140
},
141
142
AExpr::Function {
143
input: inner_exprs,
144
function: IRFunctionExpr::Boolean(inner_fn),
145
options: _,
146
} => {
147
assert!(inner_exprs.len() == 1);
148
let input = inner_exprs[0].node();
149
match inner_fn {
150
IRBooleanFunction::Any { ignore_nulls } => {
151
(new_any_reduction(*ignore_nulls), input)
152
},
153
IRBooleanFunction::All { ignore_nulls } => {
154
(new_all_reduction(*ignore_nulls), input)
155
},
156
_ => unreachable!(),
157
}
158
},
159
160
AExpr::Function {
161
input: inner_exprs,
162
function: IRFunctionExpr::MinBy,
163
options: _,
164
} => {
165
assert!(inner_exprs.len() == 2);
166
let input = inner_exprs[0].node();
167
let by = inner_exprs[1].node();
168
let gr = new_min_by_reduction(get_dt(input)?, get_dt(by)?)?;
169
return Ok((gr, vec![input, by]));
170
},
171
172
AExpr::Function {
173
input: inner_exprs,
174
function: IRFunctionExpr::MaxBy,
175
options: _,
176
} => {
177
assert!(inner_exprs.len() == 2);
178
let input = inner_exprs[0].node();
179
let by = inner_exprs[1].node();
180
let gr = new_max_by_reduction(get_dt(input)?, get_dt(by)?)?;
181
return Ok((gr, vec![input, by]));
182
},
183
184
AExpr::AnonymousAgg {
185
input: inner_exprs,
186
fmt_str: _,
187
function,
188
} => {
189
let ann_agg = function.materialize()?;
190
assert!(inner_exprs.len() == 1);
191
let input = inner_exprs[0].node();
192
let reduction = ann_agg.as_any();
193
let reduction = reduction
194
.downcast_ref::<Box<dyn GroupedReduction>>()
195
.unwrap();
196
(reduction.new_empty(), input)
197
},
198
_ => unreachable!(),
199
};
200
Ok((gr, vec![in_node]))
201
}
202
203