Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/iterator.rs
6940 views
1
use std::sync::Arc;
2
3
use polars_core::error::PolarsResult;
4
use polars_utils::idx_vec::UnitVec;
5
use polars_utils::unitvec;
6
use visitor::{RewritingVisitor, TreeWalker};
7
8
use crate::prelude::*;
9
10
// Pushes the direct child expressions of `$current_expr` into the container `$c`.
//
// Arguments:
// - `$current_expr`: the expression that is matched on.
// - `$c`: the container that is handed to the push callbacks.
// - `$push`: callback invoked for children that are bound directly by a pattern.
// - `$push_owned`: callback invoked for children obtained by iterating a collection.
// - `$iter`: name of the method used to iterate the input collection of
//   `AnonymousFunction` / `Function`.
//
// Unless noted otherwise, children are pushed in reverse order, so that a container
// that is used as a stack pops the first (left-most) child first.
macro_rules! push_expr {
    ($current_expr:expr, $c:ident, $push:ident, $push_owned:ident, $iter:ident) => {{
        use Expr::*;
        match $current_expr {
            // nothing is pushed for these variants
            DataTypeFunction(_) | Column(_) | Literal(_) | Len | SubPlan { .. } | Selector(_) => {},
            #[cfg(feature = "dtype-struct")]
            Field(_) => {},

            // variants with exactly one child
            Alias(e, _) | KeepName(e) => $push($c, e),
            Cast { expr, .. } | Sort { expr, .. } | RenameAlias { expr, .. } => $push($c, expr),
            Explode { input, .. } => $push($c, input),

            BinaryExpr { left, op: _, right } => {
                // `right` goes in first, so that `left` is popped first
                $push($c, right);
                $push($c, left);
            },
            Gather { expr, idx, .. } => {
                $push($c, idx);
                $push($c, expr);
            },
            Filter { input, by } => {
                $push($c, by);
                // pushed last, so that it is popped first
                $push($c, input);
            },
            SortBy { expr, by, .. } => {
                // note: the `by` expressions are pushed in their stored order
                for e in by {
                    $push_owned($c, e)
                }
                // pushed last, so that it is popped first
                $push($c, expr);
            },
            Agg(agg_e) => {
                use AggExpr::*;
                match agg_e {
                    Max { input, .. } | Min { input, .. } | Count { input, .. } => {
                        $push($c, input)
                    },
                    Quantile { expr, .. } => $push($c, expr),
                    Mean(e) | Median(e) | NUnique(e) | First(e) | Last(e) | Implode(e)
                    | Sum(e) | AggGroups(e) | Std(e, _) | Var(e, _) => $push($c, e),
                }
            },
            Ternary {
                truthy,
                falsy,
                predicate,
            } => {
                $push($c, predicate);
                $push($c, falsy);
                // pushed last, so that it is popped first
                $push($c, truthy);
            },
            // The inputs are walked back to front, so that the lhs is popped first and will
            // be found as the root columns/ input columns by `_suffix` and `_keep_name` etc.
            AnonymousFunction { input, .. } | Function { input, .. } => {
                for e in input.$iter().rev() {
                    $push_owned($c, e)
                }
            },
            Eval {
                expr, evaluation, ..
            } => {
                $push($c, evaluation);
                $push($c, expr);
            },
            Window {
                function,
                partition_by,
                ..
            } => {
                for e in partition_by.into_iter().rev() {
                    $push_owned($c, e)
                }
                // pushed last, so that it is popped first
                $push($c, function);
            },
            Slice {
                input,
                offset,
                length,
            } => {
                $push($c, length);
                $push($c, offset);
                // pushed last, so that it is popped first
                $push($c, input);
            },
        }
    }};
}
110
111
pub struct ExprIter<'a> {
112
stack: UnitVec<&'a Expr>,
113
}
114
115
impl<'a> Iterator for ExprIter<'a> {
116
type Item = &'a Expr;
117
118
fn next(&mut self) -> Option<Self::Item> {
119
self.stack
120
.pop()
121
.inspect(|current_expr| current_expr.nodes(&mut self.stack))
122
}
123
}
124
125
pub struct ExprMapper<F> {
126
f: F,
127
}
128
129
impl<F: FnMut(Expr) -> PolarsResult<Expr>> RewritingVisitor for ExprMapper<F> {
130
type Node = Expr;
131
type Arena = ();
132
133
fn mutate(&mut self, node: Self::Node, _arena: &mut Self::Arena) -> PolarsResult<Self::Node> {
134
(self.f)(node)
135
}
136
}
137
138
impl Expr {
139
pub fn nodes<'a>(&'a self, container: &mut UnitVec<&'a Expr>) {
140
let push = |c: &mut UnitVec<&'a Expr>, e: &'a Expr| c.push(e);
141
push_expr!(self, container, push, push, iter);
142
}
143
144
pub fn nodes_owned(self, container: &mut Vec<Expr>) {
145
let push_arc = |c: &mut Vec<Expr>, e: Arc<Expr>| c.push(Arc::unwrap_or_clone(e));
146
let push_owned = |c: &mut Vec<Expr>, e: Expr| c.push(e);
147
push_expr!(self, container, push_arc, push_owned, into_iter);
148
}
149
150
pub fn map_expr<F: FnMut(Self) -> Self>(self, mut f: F) -> Self {
151
self.rewrite(&mut ExprMapper { f: |e| Ok(f(e)) }, &mut ())
152
.unwrap()
153
}
154
155
pub fn try_map_expr<F: FnMut(Self) -> PolarsResult<Self>>(self, f: F) -> PolarsResult<Self> {
156
self.rewrite(&mut ExprMapper { f }, &mut ())
157
}
158
}
159
160
impl<'a> IntoIterator for &'a Expr {
161
type Item = &'a Expr;
162
type IntoIter = ExprIter<'a>;
163
164
fn into_iter(self) -> Self::IntoIter {
165
let stack = unitvec!(self);
166
ExprIter { stack }
167
}
168
}
169
170
pub struct AExprIter<'a> {
171
stack: UnitVec<Node>,
172
arena: Option<&'a Arena<AExpr>>,
173
}
174
175
impl<'a> Iterator for AExprIter<'a> {
176
type Item = (Node, &'a AExpr);
177
178
fn next(&mut self) -> Option<Self::Item> {
179
self.stack.pop().map(|node| {
180
// take the arena because the bchk doesn't allow a mutable borrow to the field.
181
let arena = self.arena.unwrap();
182
let current_expr = arena.get(node);
183
current_expr.inputs_rev(&mut self.stack);
184
185
self.arena = Some(arena);
186
(node, current_expr)
187
})
188
}
189
}
190
191
pub trait ArenaExprIter<'a> {
192
fn iter(&'a self, root: Node) -> AExprIter<'a>;
193
}
194
195
impl<'a> ArenaExprIter<'a> for Arena<AExpr> {
196
fn iter(&'a self, root: Node) -> AExprIter<'a> {
197
let stack = unitvec![root];
198
AExprIter {
199
stack,
200
arena: Some(self),
201
}
202
}
203
}
204
205
pub struct AlpIter<'a> {
206
stack: UnitVec<Node>,
207
arena: &'a Arena<IR>,
208
}
209
210
pub trait ArenaLpIter<'a> {
211
fn iter(&'a self, root: Node) -> AlpIter<'a>;
212
}
213
214
impl<'a> ArenaLpIter<'a> for Arena<IR> {
215
fn iter(&'a self, root: Node) -> AlpIter<'a> {
216
let stack = unitvec![root];
217
AlpIter { stack, arena: self }
218
}
219
}
220
221
impl<'a> Iterator for AlpIter<'a> {
222
type Item = (Node, &'a IR);
223
224
fn next(&mut self) -> Option<Self::Item> {
225
self.stack.pop().map(|node| {
226
let lp = self.arena.get(node);
227
lp.copy_inputs(&mut self.stack);
228
(node, lp)
229
})
230
}
231
}
232
233