// Source: GitHub repository pola-rs/polars
// Path: blob/main/crates/polars-plan/src/plans/iterator.rs
1
use std::sync::Arc;
2
3
use polars_core::error::PolarsResult;
4
use polars_utils::idx_vec::UnitVec;
5
use polars_utils::unitvec;
6
use visitor::{RewritingVisitor, TreeWalker};
7
8
use crate::prelude::*;
9
10
// Pushes the direct children of the expression `$current_expr` onto the stack `$c`.
//
// - `$push` is invoked as `$push($c, child)` for children bound as single fields.
// - `$push_owned` is invoked as `$push_owned($c, child)` for elements of child collections.
// - `$iter` names the method used to iterate child collections.
//
// In most arms the children are pushed in reverse, so that the first child
// (e.g. the left-hand side or the main input) is popped from the stack first.
macro_rules! push_expr {
    ($current_expr:expr, $c:ident, $push:ident, $push_owned:ident, $iter:ident) => {{
        use Expr::*;
        match $current_expr {
            // Expressions for which nothing is pushed.
            DataTypeFunction(_) | Column(_) | Literal(_) | Len | Element => {},
            #[cfg(feature = "dtype-struct")]
            Field(_) => {},
            SubPlan { .. } | Selector(_) => {},

            // Expressions for which exactly one child is pushed.
            Alias(e, _) | KeepName(e) => $push($c, e),
            Cast { expr, .. } | Sort { expr, .. } | RenameAlias { expr, .. } => $push($c, expr),
            Explode { input, .. } => $push($c, input),
            #[cfg(feature = "dynamic_group_by")]
            Rolling { function, .. } => $push($c, function),

            BinaryExpr { left, op: _, right } => {
                // reverse order so that left is popped first
                $push($c, right);
                $push($c, left);
            },
            Gather { expr, idx, .. } => {
                $push($c, idx);
                $push($c, expr);
            },
            Filter { input, by } => {
                $push($c, by);
                // latest, so that it is popped first
                $push($c, input);
            },
            SortBy { expr, by, .. } => {
                // The `by` expressions are pushed in their given order (not reversed).
                for e in by {
                    $push_owned($c, e)
                }
                // latest, so that it is popped first
                $push($c, expr);
            },
            Agg(agg_e) => {
                use AggExpr::*;
                match agg_e {
                    Max { input, .. }
                    | Min { input, .. }
                    | Item { input, .. }
                    | Count { input, .. } => $push($c, input),
                    Mean(e)
                    | Median(e)
                    | NUnique(e)
                    | First(e)
                    | FirstNonNull(e)
                    | Last(e)
                    | LastNonNull(e)
                    | Implode(e)
                    | Sum(e)
                    | AggGroups(e) => $push($c, e),
                    Std(e, _) | Var(e, _) => $push($c, e),
                    // TODO: shouldn't quantile push the quantile expr as well?
                    Quantile { expr, .. } => $push($c, expr),
                }
            },
            Ternary {
                truthy,
                falsy,
                predicate,
            } => {
                $push($c, predicate);
                $push($c, falsy);
                // latest, so that it is popped first
                $push($c, truthy);
            },
            // we iterate in reverse order, so that the lhs is popped first and will be found
            // as the root columns/ input columns by `_suffix` and `_keep_name` etc.
            Display { inputs, .. } => inputs.$iter().rev().for_each(|e| $push_owned($c, e)),
            AnonymousFunction { input, .. } => {
                input.$iter().rev().for_each(|e| $push_owned($c, e))
            },
            Function { input, .. } => input.$iter().rev().for_each(|e| $push_owned($c, e)),
            Eval {
                expr, evaluation, ..
            } => {
                $push($c, evaluation);
                $push($c, expr);
            },
            #[cfg(feature = "dtype-struct")]
            StructEval { expr, evaluation } => {
                evaluation.$iter().rev().for_each(|e| $push_owned($c, e));
                $push($c, expr);
            },
            Over {
                function,
                partition_by,
                order_by,
                ..
            } => {
                // Pushed first, so it is popped after the partition keys and the function.
                if let Some((order_by, _)) = order_by {
                    $push($c, order_by);
                }
                for e in partition_by.into_iter().rev() {
                    $push_owned($c, e)
                }
                // latest so that it is popped first
                $push($c, function);
            },
            Slice {
                input,
                offset,
                length,
            } => {
                $push($c, length);
                $push($c, offset);
                // latest, so that it is popped first
                $push($c, input);
            },
        }
    }};
}
126
127
pub struct ExprIter<'a> {
128
stack: UnitVec<&'a Expr>,
129
}
130
131
impl<'a> Iterator for ExprIter<'a> {
132
type Item = &'a Expr;
133
134
fn next(&mut self) -> Option<Self::Item> {
135
self.stack
136
.pop()
137
.inspect(|current_expr| current_expr.nodes(&mut self.stack))
138
}
139
}
140
141
/// Wraps a function `F` so that it can be used as a [`RewritingVisitor`] over [`Expr`].
pub struct ExprMapper<F> {
    /// The function applied to every node handed to `mutate`.
    f: F,
}
144
145
impl<F: FnMut(Expr) -> PolarsResult<Expr>> RewritingVisitor for ExprMapper<F> {
    type Node = Expr;
    type Arena = ();

    /// Applies the wrapped function to `node` and returns its result unchanged.
    ///
    /// The arena is the unit type here and is not used.
    fn mutate(&mut self, node: Self::Node, _arena: &mut Self::Arena) -> PolarsResult<Self::Node> {
        let apply = &mut self.f;
        apply(node)
    }
}
153
154
impl Expr {
155
pub fn nodes<'a>(&'a self, container: &mut UnitVec<&'a Expr>) {
156
let push = |c: &mut UnitVec<&'a Expr>, e: &'a Expr| c.push(e);
157
push_expr!(self, container, push, push, iter);
158
}
159
160
pub fn nodes_owned(self, container: &mut Vec<Expr>) {
161
let push_arc = |c: &mut Vec<Expr>, e: Arc<Expr>| c.push(Arc::unwrap_or_clone(e));
162
let push_owned = |c: &mut Vec<Expr>, e: Expr| c.push(e);
163
push_expr!(self, container, push_arc, push_owned, into_iter);
164
}
165
166
pub fn map_expr<F: FnMut(Self) -> Self>(self, mut f: F) -> Self {
167
self.rewrite(&mut ExprMapper { f: |e| Ok(f(e)) }, &mut ())
168
.unwrap()
169
}
170
171
pub fn try_map_expr<F: FnMut(Self) -> PolarsResult<Self>>(self, f: F) -> PolarsResult<Self> {
172
self.rewrite(&mut ExprMapper { f }, &mut ())
173
}
174
}
175
176
impl<'a> IntoIterator for &'a Expr {
177
type Item = &'a Expr;
178
type IntoIter = ExprIter<'a>;
179
180
fn into_iter(self) -> Self::IntoIter {
181
let stack = unitvec!(self);
182
ExprIter { stack }
183
}
184
}
185
186
pub struct AExprIter<'a> {
187
stack: UnitVec<Node>,
188
arena: Option<&'a Arena<AExpr>>,
189
}
190
191
impl<'a> Iterator for AExprIter<'a> {
192
type Item = (Node, &'a AExpr);
193
194
fn next(&mut self) -> Option<Self::Item> {
195
self.stack.pop().map(|node| {
196
// take the arena because the bchk doesn't allow a mutable borrow to the field.
197
let arena = self.arena.unwrap();
198
let current_expr = arena.get(node);
199
// Expressions such as StructEval may reference columns that are not input.
200
current_expr.children_rev(&mut self.stack);
201
202
self.arena = Some(arena);
203
(node, current_expr)
204
})
205
}
206
}
207
208
pub trait ArenaExprIter<'a> {
209
fn iter(&'a self, root: Node) -> AExprIter<'a>;
210
}
211
212
impl<'a> ArenaExprIter<'a> for Arena<AExpr> {
213
fn iter(&'a self, root: Node) -> AExprIter<'a> {
214
let stack = unitvec![root];
215
AExprIter {
216
stack,
217
arena: Some(self),
218
}
219
}
220
}
221
222
pub struct AlpIter<'a> {
223
stack: UnitVec<Node>,
224
arena: &'a Arena<IR>,
225
}
226
227
pub trait ArenaLpIter<'a> {
228
fn iter(&'a self, root: Node) -> AlpIter<'a>;
229
}
230
231
impl<'a> ArenaLpIter<'a> for Arena<IR> {
232
fn iter(&'a self, root: Node) -> AlpIter<'a> {
233
let stack = unitvec![root];
234
AlpIter { stack, arena: self }
235
}
236
}
237
238
impl<'a> Iterator for AlpIter<'a> {
239
type Item = (Node, &'a IR);
240
241
fn next(&mut self) -> Option<Self::Item> {
242
self.stack.pop().map(|node| {
243
let lp = self.arena.get(node);
244
lp.copy_inputs(&mut self.stack);
245
(node, lp)
246
})
247
}
248
}
249
250