Path: blob/main/crates/polars-plan/src/plans/iterator.rs
6940 views
use std::sync::Arc;12use polars_core::error::PolarsResult;3use polars_utils::idx_vec::UnitVec;4use polars_utils::unitvec;5use visitor::{RewritingVisitor, TreeWalker};67use crate::prelude::*;89macro_rules! push_expr {10($current_expr:expr, $c:ident, $push:ident, $push_owned:ident, $iter:ident) => {{11use Expr::*;12match $current_expr {13DataTypeFunction(_) | Column(_) | Literal(_) | Len => {},14#[cfg(feature = "dtype-struct")]15Field(_) => {},16Alias(e, _) => $push($c, e),17BinaryExpr { left, op: _, right } => {18// reverse order so that left is popped first19$push($c, right);20$push($c, left);21},22Cast { expr, .. } => $push($c, expr),23Sort { expr, .. } => $push($c, expr),24Gather { expr, idx, .. } => {25$push($c, idx);26$push($c, expr);27},28Filter { input, by } => {29$push($c, by);30// latest, so that it is popped first31$push($c, input);32},33SortBy { expr, by, .. } => {34for e in by {35$push_owned($c, e)36}37// latest, so that it is popped first38$push($c, expr);39},40Agg(agg_e) => {41use AggExpr::*;42match agg_e {43Max { input, .. } => $push($c, input),44Min { input, .. } => $push($c, input),45Mean(e) => $push($c, e),46Median(e) => $push($c, e),47NUnique(e) => $push($c, e),48First(e) => $push($c, e),49Last(e) => $push($c, e),50Implode(e) => $push($c, e),51Count { input, .. } => $push($c, input),52Quantile { expr, .. } => $push($c, expr),53Sum(e) => $push($c, e),54AggGroups(e) => $push($c, e),55Std(e, _) => $push($c, e),56Var(e, _) => $push($c, e),57}58},59Ternary {60truthy,61falsy,62predicate,63} => {64$push($c, predicate);65$push($c, falsy);66// latest, so that it is popped first67$push($c, truthy);68},69// we iterate in reverse order, so that the lhs is popped first and will be found70// as the root columns/ input columns by `_suffix` and `_keep_name` etc.71AnonymousFunction { input, .. } => input.$iter().rev().for_each(|e| $push_owned($c, e)),72Eval {73expr, evaluation, ..74} => {75$push($c, evaluation);76$push($c, expr);77},78Function { input, .. } => input.$iter().rev().for_each(|e| $push_owned($c, e)),79Explode { input, .. } => $push($c, input),80Window {81function,82partition_by,83..84} => {85for e in partition_by.into_iter().rev() {86$push_owned($c, e)87}88// latest so that it is popped first89$push($c, function);90},91Slice {92input,93offset,94length,95} => {96$push($c, length);97$push($c, offset);98// latest, so that it is popped first99$push($c, input);100},101KeepName(e) => $push($c, e),102RenameAlias { expr, .. } => $push($c, expr),103SubPlan { .. } => {},104// pass105Selector(_) => {},106}107}};108}109110pub struct ExprIter<'a> {111stack: UnitVec<&'a Expr>,112}113114impl<'a> Iterator for ExprIter<'a> {115type Item = &'a Expr;116117fn next(&mut self) -> Option<Self::Item> {118self.stack119.pop()120.inspect(|current_expr| current_expr.nodes(&mut self.stack))121}122}123124pub struct ExprMapper<F> {125f: F,126}127128impl<F: FnMut(Expr) -> PolarsResult<Expr>> RewritingVisitor for ExprMapper<F> {129type Node = Expr;130type Arena = ();131132fn mutate(&mut self, node: Self::Node, _arena: &mut Self::Arena) -> PolarsResult<Self::Node> {133(self.f)(node)134}135}136137impl Expr {138pub fn nodes<'a>(&'a self, container: &mut UnitVec<&'a Expr>) {139let push = |c: &mut UnitVec<&'a Expr>, e: &'a Expr| c.push(e);140push_expr!(self, container, push, push, iter);141}142143pub fn nodes_owned(self, container: &mut Vec<Expr>) {144let push_arc = |c: &mut Vec<Expr>, e: Arc<Expr>| c.push(Arc::unwrap_or_clone(e));145let push_owned = |c: &mut Vec<Expr>, e: Expr| c.push(e);146push_expr!(self, container, push_arc, push_owned, into_iter);147}148149pub fn map_expr<F: FnMut(Self) -> Self>(self, mut f: F) -> Self {150self.rewrite(&mut ExprMapper { f: |e| Ok(f(e)) }, &mut ())151.unwrap()152}153154pub fn try_map_expr<F: FnMut(Self) -> PolarsResult<Self>>(self, f: F) -> PolarsResult<Self> {155self.rewrite(&mut ExprMapper { f }, &mut ())156}157}158159impl<'a> IntoIterator for &'a Expr {160type Item = &'a Expr;161type IntoIter = ExprIter<'a>;162163fn into_iter(self) -> Self::IntoIter {164let stack = unitvec!(self);165ExprIter { stack }166}167}168169pub struct AExprIter<'a> {170stack: UnitVec<Node>,171arena: Option<&'a Arena<AExpr>>,172}173174impl<'a> Iterator for AExprIter<'a> {175type Item = (Node, &'a AExpr);176177fn next(&mut self) -> Option<Self::Item> {178self.stack.pop().map(|node| {179// take the arena because the bchk doesn't allow a mutable borrow to the field.180let arena = self.arena.unwrap();181let current_expr = arena.get(node);182current_expr.inputs_rev(&mut self.stack);183184self.arena = Some(arena);185(node, current_expr)186})187}188}189190pub trait ArenaExprIter<'a> {191fn iter(&'a self, root: Node) -> AExprIter<'a>;192}193194impl<'a> ArenaExprIter<'a> for Arena<AExpr> {195fn iter(&'a self, root: Node) -> AExprIter<'a> {196let stack = unitvec![root];197AExprIter {198stack,199arena: Some(self),200}201}202}203204pub struct AlpIter<'a> {205stack: UnitVec<Node>,206arena: &'a Arena<IR>,207}208209pub trait ArenaLpIter<'a> {210fn iter(&'a self, root: Node) -> AlpIter<'a>;211}212213impl<'a> ArenaLpIter<'a> for Arena<IR> {214fn iter(&'a self, root: Node) -> AlpIter<'a> {215let stack = unitvec![root];216AlpIter { stack, arena: self }217}218}219220impl<'a> Iterator for AlpIter<'a> {221type Item = (Node, &'a IR);222223fn next(&mut self) -> Option<Self::Item> {224self.stack.pop().map(|node| {225let lp = self.arena.get(node);226lp.copy_inputs(&mut self.stack);227(node, lp)228})229}230}231232233