Path: blob/main/crates/polars-plan/src/plans/iterator.rs
8420 views
use std::sync::Arc;12use polars_core::error::PolarsResult;3use polars_utils::idx_vec::UnitVec;4use polars_utils::unitvec;5use visitor::{RewritingVisitor, TreeWalker};67use crate::prelude::*;89macro_rules! push_expr {10($current_expr:expr, $c:ident, $push:ident, $push_owned:ident, $iter:ident) => {{11use Expr::*;12match $current_expr {13DataTypeFunction(_) | Column(_) | Literal(_) | Len | Element => {},14#[cfg(feature = "dtype-struct")]15Field(_) => {},16Alias(e, _) => $push($c, e),17BinaryExpr { left, op: _, right } => {18// reverse order so that left is popped first19$push($c, right);20$push($c, left);21},22Cast { expr, .. } => $push($c, expr),23Sort { expr, .. } => $push($c, expr),24Gather { expr, idx, .. } => {25$push($c, idx);26$push($c, expr);27},28Filter { input, by } => {29$push($c, by);30// latest, so that it is popped first31$push($c, input);32},33SortBy { expr, by, .. } => {34for e in by {35$push_owned($c, e)36}37// latest, so that it is popped first38$push($c, expr);39},40Agg(agg_e) => {41use AggExpr::*;42match agg_e {43Max { input, .. } => $push($c, input),44Min { input, .. } => $push($c, input),45Mean(e) => $push($c, e),46Median(e) => $push($c, e),47NUnique(e) => $push($c, e),48First(e) => $push($c, e),49FirstNonNull(e) => $push($c, e),50Last(e) => $push($c, e),51LastNonNull(e) => $push($c, e),52Item { input, .. } => $push($c, input),53Implode(e) => $push($c, e),54Count { input, .. } => $push($c, input),55// TODO: shouldn't quantile push the quantile expr as well?56Quantile { expr, .. } => $push($c, expr),57Sum(e) => $push($c, e),58AggGroups(e) => $push($c, e),59Std(e, _) => $push($c, e),60Var(e, _) => $push($c, e),61}62},63Ternary {64truthy,65falsy,66predicate,67} => {68$push($c, predicate);69$push($c, falsy);70// latest, so that it is popped first71$push($c, truthy);72},73// we iterate in reverse order, so that the lhs is popped first and will be found74// as the root columns/ input columns by `_suffix` and `_keep_name` etc.75Display { inputs, .. } => inputs.$iter().rev().for_each(|e| $push_owned($c, e)),76AnonymousFunction { input, .. } => input.$iter().rev().for_each(|e| $push_owned($c, e)),77Eval {78expr, evaluation, ..79} => {80$push($c, evaluation);81$push($c, expr);82},83#[cfg(feature = "dtype-struct")]84StructEval { expr, evaluation } => {85evaluation.$iter().rev().for_each(|e| $push_owned($c, e));86$push($c, expr);87},88Function { input, .. } => input.$iter().rev().for_each(|e| $push_owned($c, e)),89Explode { input, .. } => $push($c, input),90#[cfg(feature = "dynamic_group_by")]91Rolling { function, .. } => $push($c, function),92Over {93function,94partition_by,95order_by,96..97} => {98if let Some((order_by, _)) = order_by {99$push($c, order_by);100}101for e in partition_by.into_iter().rev() {102$push_owned($c, e)103}104// latest so that it is popped first105$push($c, function);106},107Slice {108input,109offset,110length,111} => {112$push($c, length);113$push($c, offset);114// latest, so that it is popped first115$push($c, input);116},117KeepName(e) => $push($c, e),118RenameAlias { expr, .. } => $push($c, expr),119SubPlan { .. } => {},120// pass121Selector(_) => {},122}123}};124}125126pub struct ExprIter<'a> {127stack: UnitVec<&'a Expr>,128}129130impl<'a> Iterator for ExprIter<'a> {131type Item = &'a Expr;132133fn next(&mut self) -> Option<Self::Item> {134self.stack135.pop()136.inspect(|current_expr| current_expr.nodes(&mut self.stack))137}138}139140pub struct ExprMapper<F> {141f: F,142}143144impl<F: FnMut(Expr) -> PolarsResult<Expr>> RewritingVisitor for ExprMapper<F> {145type Node = Expr;146type Arena = ();147148fn mutate(&mut self, node: Self::Node, _arena: &mut Self::Arena) -> PolarsResult<Self::Node> {149(self.f)(node)150}151}152153impl Expr {154pub fn nodes<'a>(&'a self, container: &mut UnitVec<&'a Expr>) {155let push = |c: &mut UnitVec<&'a Expr>, e: &'a Expr| c.push(e);156push_expr!(self, container, push, push, iter);157}158159pub fn nodes_owned(self, container: &mut Vec<Expr>) {160let push_arc = |c: &mut Vec<Expr>, e: Arc<Expr>| c.push(Arc::unwrap_or_clone(e));161let push_owned = |c: &mut Vec<Expr>, e: Expr| c.push(e);162push_expr!(self, container, push_arc, push_owned, into_iter);163}164165pub fn map_expr<F: FnMut(Self) -> Self>(self, mut f: F) -> Self {166self.rewrite(&mut ExprMapper { f: |e| Ok(f(e)) }, &mut ())167.unwrap()168}169170pub fn try_map_expr<F: FnMut(Self) -> PolarsResult<Self>>(self, f: F) -> PolarsResult<Self> {171self.rewrite(&mut ExprMapper { f }, &mut ())172}173}174175impl<'a> IntoIterator for &'a Expr {176type Item = &'a Expr;177type IntoIter = ExprIter<'a>;178179fn into_iter(self) -> Self::IntoIter {180let stack = unitvec!(self);181ExprIter { stack }182}183}184185pub struct AExprIter<'a> {186stack: UnitVec<Node>,187arena: Option<&'a Arena<AExpr>>,188}189190impl<'a> Iterator for AExprIter<'a> {191type Item = (Node, &'a AExpr);192193fn next(&mut self) -> Option<Self::Item> {194self.stack.pop().map(|node| {195// take the arena because the bchk doesn't allow a mutable borrow to the field.196let arena = self.arena.unwrap();197let current_expr = arena.get(node);198// Expressions such as StructEval may reference columns that are not input.199current_expr.children_rev(&mut self.stack);200201self.arena = Some(arena);202(node, current_expr)203})204}205}206207pub trait ArenaExprIter<'a> {208fn iter(&'a self, root: Node) -> AExprIter<'a>;209}210211impl<'a> ArenaExprIter<'a> for Arena<AExpr> {212fn iter(&'a self, root: Node) -> AExprIter<'a> {213let stack = unitvec![root];214AExprIter {215stack,216arena: Some(self),217}218}219}220221pub struct AlpIter<'a> {222stack: UnitVec<Node>,223arena: &'a Arena<IR>,224}225226pub trait ArenaLpIter<'a> {227fn iter(&'a self, root: Node) -> AlpIter<'a>;228}229230impl<'a> ArenaLpIter<'a> for Arena<IR> {231fn iter(&'a self, root: Node) -> AlpIter<'a> {232let stack = unitvec![root];233AlpIter { stack, arena: self }234}235}236237impl<'a> Iterator for AlpIter<'a> {238type Item = (Node, &'a IR);239240fn next(&mut self) -> Option<Self::Item> {241self.stack.pop().map(|node| {242let lp = self.arena.get(node);243lp.copy_inputs(&mut self.stack);244(node, lp)245})246}247}248249250