Path: blob/main/crates/polars-stream/src/physical_plan/lower_expr.rs
8429 views
use std::sync::Arc;12use polars_core::chunked_array::cast::CastOptions;3use polars_core::frame::DataFrame;4use polars_core::prelude::{5DataType, Field, IDX_DTYPE, InitHashMaps, PlHashMap, PlHashSet, PlIndexMap, PlIndexSet,6};7use polars_core::scalar::Scalar;8use polars_core::schema::{Schema, SchemaExt};9use polars_error::PolarsResult;10use polars_expr::state::ExecutionState;11use polars_expr::{ExpressionConversionState, create_physical_expr};12use polars_ops::frame::{JoinArgs, JoinType};13use polars_ops::series::{RLE_LENGTH_COLUMN_NAME, RLE_VALUE_COLUMN_NAME};14use polars_plan::plans::AExpr;15use polars_plan::plans::expr_ir::{ExprIR, OutputName};16use polars_plan::prelude::*;17use polars_utils::arena::{Arena, Node};18use polars_utils::itertools::Itertools;19use polars_utils::pl_str::PlSmallStr;20use polars_utils::{unique_column_name, unitvec};21use slotmap::SlotMap;2223use super::fmt::fmt_exprs;24use super::{PhysNode, PhysNodeKey, PhysNodeKind, PhysStream, StreamingLowerIRContext};25use crate::physical_plan::ZipBehavior;26use crate::physical_plan::lower_group_by::build_group_by_stream;27use crate::physical_plan::lower_ir::{build_filter_stream, build_row_idx_stream};2829type ExprNodeKey = Node;3031pub(crate) struct ExprCache {32is_elementwise: PlHashMap<Node, bool>,33is_input_independent: PlHashMap<Node, bool>,34is_length_preserving: PlHashMap<Node, bool>,35}3637impl ExprCache {38pub fn with_capacity(capacity: usize) -> Self {39Self {40is_elementwise: PlHashMap::with_capacity(capacity),41is_input_independent: PlHashMap::with_capacity(capacity),42is_length_preserving: PlHashMap::with_capacity(capacity),43}44}45}4647struct LowerExprContext<'a> {48prepare_visualization: bool,49expr_arena: &'a mut Arena<AExpr>,50phys_sm: &'a mut SlotMap<PhysNodeKey, PhysNode>,51cache: &'a mut ExprCache,52}5354impl<'a> From<LowerExprContext<'a>> for StreamingLowerIRContext {55fn from(value: LowerExprContext<'a>) -> Self {56Self {57prepare_visualization: 
value.prepare_visualization,58}59}60}61impl<'a> From<&LowerExprContext<'a>> for StreamingLowerIRContext {62fn from(value: &LowerExprContext<'a>) -> Self {63Self {64prepare_visualization: value.prepare_visualization,65}66}67}6869pub(crate) fn is_fake_elementwise_function(expr: &AExpr) -> bool {70// The in-memory engine treats ApplyList as elementwise but this is not actually71// the case. It doesn't cause any problems for the in-memory engine because of72// how it does the execution but it causes errors for new-streaming.7374// Some other functions are also marked as elementwise for filter pushdown75// but aren't actually elementwise (e.g. arguments aren't same length).76match expr {77AExpr::Function { function, .. } => {78use IRFunctionExpr as F;79match function {80#[cfg(feature = "is_in")]81F::Boolean(IRBooleanFunction::IsIn { .. }) => true,82#[cfg(feature = "replace")]83F::Replace | F::ReplaceStrict { .. } => true,84_ => false,85}86},87_ => false,88}89}9091pub(crate) fn is_elementwise_rec_cached(92expr_key: ExprNodeKey,93arena: &Arena<AExpr>,94cache: &mut ExprCache,95) -> bool {96if !cache.is_elementwise.contains_key(&expr_key) {97cache.is_elementwise.insert(98expr_key,99(|| {100let mut expr_key = expr_key;101let mut stack = unitvec![];102103loop {104let ae = arena.get(expr_key);105106if is_fake_elementwise_function(ae) {107return false;108}109110if !polars_plan::plans::is_elementwise(&mut stack, ae, arena) {111return false;112}113114let Some(next_key) = stack.pop() else {115break;116};117118expr_key = next_key;119}120121true122})(),123);124}125126*cache.is_elementwise.get(&expr_key).unwrap()127}128129#[recursive::recursive]130pub fn is_input_independent_rec(131expr_key: ExprNodeKey,132arena: &Arena<AExpr>,133cache: &mut PlHashMap<ExprNodeKey, bool>,134) -> bool {135if let Some(ret) = cache.get(&expr_key) {136return *ret;137}138139let ret = match arena.get(expr_key) {140// Handled separately in `Eval`.141AExpr::Element => unreachable!(),142AExpr::StructField(_) 
=> false,143AExpr::Explode { expr: inner, .. }144| AExpr::Cast {145expr: inner,146dtype: _,147options: _,148}149| AExpr::Sort {150expr: inner,151options: _,152} => is_input_independent_rec(*inner, arena, cache),153AExpr::Column(_) => false,154155AExpr::Literal(_) => true,156AExpr::BinaryExpr { left, op: _, right } => {157is_input_independent_rec(*left, arena, cache)158&& is_input_independent_rec(*right, arena, cache)159},160AExpr::Gather {161expr,162idx,163returns_scalar: _,164null_on_oob: _,165} => {166is_input_independent_rec(*expr, arena, cache)167&& is_input_independent_rec(*idx, arena, cache)168},169AExpr::SortBy {170expr,171by,172sort_options: _,173} => {174is_input_independent_rec(*expr, arena, cache)175&& by176.iter()177.all(|expr| is_input_independent_rec(*expr, arena, cache))178},179AExpr::Filter { input, by } => {180is_input_independent_rec(*input, arena, cache)181&& is_input_independent_rec(*by, arena, cache)182},183AExpr::Agg(agg_expr) => match agg_expr.get_input() {184polars_plan::plans::NodeInputs::Leaf => true,185polars_plan::plans::NodeInputs::Single(expr) => {186is_input_independent_rec(expr, arena, cache)187},188polars_plan::plans::NodeInputs::Many(exprs) => exprs189.iter()190.all(|expr| is_input_independent_rec(*expr, arena, cache)),191},192AExpr::Ternary {193predicate,194truthy,195falsy,196} => {197is_input_independent_rec(*predicate, arena, cache)198&& is_input_independent_rec(*truthy, arena, cache)199&& is_input_independent_rec(*falsy, arena, cache)200},201AExpr::AnonymousFunction {202input,203function: _,204options: _,205fmt_str: _,206}207| AExpr::AnonymousAgg {208input,209function: _,210fmt_str: _,211}212| AExpr::Function {213input,214function: _,215options: _,216} => input217.iter()218.all(|expr| is_input_independent_rec(expr.node(), arena, cache)),219AExpr::Eval {220expr,221evaluation: _,222variant: _,223} => is_input_independent_rec(*expr, arena, cache),224AExpr::StructEval { expr, evaluation } => {225is_input_independent_rec(*expr, 
arena, cache)226&& evaluation227.iter()228.all(|expr| is_input_independent_rec(expr.node(), arena, cache))229},230#[cfg(feature = "dynamic_group_by")]231AExpr::Rolling {232function,233index_column,234period: _,235offset: _,236closed_window: _,237} => {238is_input_independent_rec(*function, arena, cache)239&& is_input_independent_rec(*index_column, arena, cache)240},241AExpr::Over {242function,243partition_by,244order_by,245mapping: _,246} => {247is_input_independent_rec(*function, arena, cache)248&& partition_by249.iter()250.all(|expr| is_input_independent_rec(*expr, arena, cache))251&& order_by252.iter()253.all(|(expr, _options)| is_input_independent_rec(*expr, arena, cache))254},255AExpr::Slice {256input,257offset,258length,259} => {260is_input_independent_rec(*input, arena, cache)261&& is_input_independent_rec(*offset, arena, cache)262&& is_input_independent_rec(*length, arena, cache)263},264AExpr::Len => false,265};266267cache.insert(expr_key, ret);268ret269}270271pub fn is_input_independent(272expr_key: ExprNodeKey,273expr_arena: &Arena<AExpr>,274cache: &mut ExprCache,275) -> bool {276is_input_independent_rec(expr_key, expr_arena, &mut cache.is_input_independent)277}278279fn is_input_independent_ctx(expr_key: ExprNodeKey, ctx: &mut LowerExprContext) -> bool {280is_input_independent_rec(281expr_key,282ctx.expr_arena,283&mut ctx.cache.is_input_independent,284)285}286287fn build_input_independent_node_with_ctx(288exprs: &[ExprIR],289ctx: &mut LowerExprContext,290) -> PolarsResult<PhysNodeKey> {291let output_schema = compute_output_schema(&Schema::default(), exprs, ctx.expr_arena)?;292Ok(ctx.phys_sm.insert(PhysNode::new(293output_schema,294PhysNodeKind::InputIndependentSelect {295selectors: exprs.to_vec(),296},297)))298}299300#[recursive::recursive]301pub fn is_length_preserving_rec(302expr_key: ExprNodeKey,303arena: &Arena<AExpr>,304cache: &mut PlHashMap<ExprNodeKey, bool>,305) -> bool {306if let Some(ret) = cache.get(&expr_key) {307return *ret;308}309310let ret 
= match arena.get(expr_key) {311// Handled separately in `Eval`.312AExpr::Element => unreachable!(),313// Mapped to `Column` in `StructEval`.314AExpr::StructField(_) => unreachable!(),315316AExpr::Gather { .. }317| AExpr::Explode { .. }318| AExpr::Filter { .. }319| AExpr::Agg(_)320| AExpr::Slice { .. }321| AExpr::Len322| AExpr::Literal(_) => false,323324AExpr::Column(_) => true,325326AExpr::Cast {327expr: inner,328dtype: _,329options: _,330}331| AExpr::Sort {332expr: inner,333options: _,334}335| AExpr::SortBy {336expr: inner,337by: _,338sort_options: _,339} => is_length_preserving_rec(*inner, arena, cache),340341AExpr::BinaryExpr { left, op: _, right } => {342// As long as at least one input is length-preserving the other side343// should either broadcast or have the same length.344is_length_preserving_rec(*left, arena, cache)345|| is_length_preserving_rec(*right, arena, cache)346},347AExpr::Ternary {348predicate,349truthy,350falsy,351} => {352is_length_preserving_rec(*predicate, arena, cache)353|| is_length_preserving_rec(*truthy, arena, cache)354|| is_length_preserving_rec(*falsy, arena, cache)355},356AExpr::AnonymousAgg { .. } => false,357AExpr::AnonymousFunction {358input,359function: _,360options,361fmt_str: _,362}363| AExpr::Function {364input,365function: _,366options,367} => {368// TODO: actually inspect the functions? 
This is overly conservative.369options.is_length_preserving()370&& input371.iter()372.all(|expr| is_length_preserving_rec(expr.node(), arena, cache))373},374AExpr::Eval {375expr,376evaluation: _,377variant: _,378} => is_length_preserving_rec(*expr, arena, cache),379#[cfg(feature = "dynamic_group_by")]380AExpr::Rolling {381function: _,382index_column: _,383period: _,384offset: _,385closed_window: _,386} => true,387AExpr::StructEval {388expr,389evaluation: _,390} => is_length_preserving_rec(*expr, arena, cache),391AExpr::Over {392function: _, // Actually shouldn't matter for window functions.393partition_by: _,394order_by: _,395mapping,396} => !matches!(mapping, WindowMapping::Explode),397};398399cache.insert(expr_key, ret);400ret401}402403#[expect(dead_code)]404pub fn is_length_preserving(405expr_key: ExprNodeKey,406expr_arena: &Arena<AExpr>,407cache: &mut ExprCache,408) -> bool {409is_length_preserving_rec(expr_key, expr_arena, &mut cache.is_length_preserving)410}411412fn is_length_preserving_ctx(expr_key: ExprNodeKey, ctx: &mut LowerExprContext) -> bool {413is_length_preserving_rec(414expr_key,415ctx.expr_arena,416&mut ctx.cache.is_length_preserving,417)418}419420fn build_fallback_node_with_ctx(421input: PhysStream,422exprs: &[ExprIR],423ctx: &mut LowerExprContext,424) -> PolarsResult<PhysNodeKey> {425// Pre-select only the columns that are needed for this fallback expression.426let input_schema = &ctx.phys_sm[input.node].output_schema;427let mut select_names: PlHashSet<_> = exprs428.iter()429.flat_map(|expr| {430polars_plan::utils::aexpr_to_leaf_names_iter(expr.node(), ctx.expr_arena).cloned()431})432.collect();433// To keep the length correct we have to ensure we select at least one434// column.435if select_names.is_empty() {436if let Some(name) = input_schema.iter_names().next() {437select_names.insert(name.clone());438}439}440let input_stream = if input_schema441.iter_names()442.any(|name| !select_names.contains(name))443{444let select_exprs = 
select_names445.into_iter()446.map(|name| {447ExprIR::new(448ctx.expr_arena.add(AExpr::Column(name.clone())),449OutputName::ColumnLhs(name),450)451})452.collect_vec();453build_select_stream_with_ctx(input, &select_exprs, ctx)?454} else {455input456};457458let output_schema = schema_for_select(input_stream, exprs, ctx)?;459let mut conv_state = ExpressionConversionState::new(false);460let phys_exprs = exprs461.iter()462.map(|expr| {463create_physical_expr(464expr,465ctx.expr_arena,466&ctx.phys_sm[input_stream.node].output_schema,467&mut conv_state,468)469})470.try_collect_vec()?;471let map = move |df| {472let exec_state = ExecutionState::new();473let columns = phys_exprs474.iter()475.map(|phys_expr| phys_expr.evaluate(&df, &exec_state))476.try_collect()?;477478DataFrame::new_infer_broadcast(columns)479};480481let format_str = ctx.prepare_visualization.then(|| {482let mut buffer = String::new();483buffer.push_str("SELECT [\n");484fmt_exprs(485&mut buffer,486exprs,487ctx.expr_arena,488super::fmt::FormatExprStyle::Select,489);490buffer.push(']');491buffer492});493let kind = PhysNodeKind::InMemoryMap {494input: input_stream,495map: Arc::new(map),496format_str,497};498Ok(ctx.phys_sm.insert(PhysNode::new(output_schema, kind)))499}500501fn simplify_input_streams(502orig_input: PhysStream,503mut input_streams: PlIndexSet<PhysStream>,504ctx: &mut LowerExprContext,505) -> PolarsResult<PlIndexSet<PhysStream>> {506// Flatten nested zips (ensures the original input columns only occur once).507if input_streams.len() > 1 {508let mut flattened_input_streams = PlIndexSet::with_capacity(input_streams.len());509for input_stream in input_streams {510if let PhysNodeKind::Zip {511inputs,512zip_behavior: ZipBehavior::Broadcast,513} = &ctx.phys_sm[input_stream.node].kind514{515flattened_input_streams.extend(inputs);516} else {517flattened_input_streams.insert(input_stream);518}519}520input_streams = flattened_input_streams;521}522523// Merge reduce nodes that directly operate on the 
original input.524let mut combined_exprs = vec![];525input_streams = input_streams526.into_iter()527.filter(|input_stream| {528if let PhysNodeKind::Reduce {529input: inner,530exprs,531} = &ctx.phys_sm[input_stream.node].kind532{533if *inner == orig_input {534combined_exprs.extend(exprs.iter().cloned());535ctx.phys_sm.remove(input_stream.node);536return false;537}538}539true540})541.collect();542if !combined_exprs.is_empty() {543let output_schema = schema_for_select(orig_input, &combined_exprs, ctx)?;544let kind = PhysNodeKind::Reduce {545input: orig_input,546exprs: combined_exprs,547};548let reduce_node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));549input_streams.insert(PhysStream::first(reduce_node_key));550}551552Ok(input_streams)553}554555// Assuming that agg_node is a reduction, lowers its input recursively and556// returns a Reduce node as well a node corresponding to the column to select557// from the Reduce node for the aggregate.558fn lower_reduce_node(559input: PhysStream,560agg_node: Node,561ctx: &mut LowerExprContext,562) -> PolarsResult<(PhysStream, Node)> {563let agg_aexpr = ctx.expr_arena.get(agg_node).clone();564let mut agg_input = Vec::with_capacity(1);565agg_aexpr.inputs_rev(&mut agg_input);566agg_input.reverse();567568let (trans_input, trans_exprs) = lower_exprs_with_ctx(input, &agg_input, ctx)?;569let trans_agg_node = ctx.expr_arena.add(agg_aexpr.replace_inputs(&trans_exprs));570571let out_name = unique_column_name();572let expr_ir = ExprIR::new(trans_agg_node, OutputName::Alias(out_name.clone()));573let output_schema = schema_for_select(trans_input, std::slice::from_ref(&expr_ir), ctx)?;574let kind = PhysNodeKind::Reduce {575input: trans_input,576exprs: vec![expr_ir],577};578579let reduce_node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));580let reduce_stream = PhysStream::first(reduce_node_key);581let out_node = ctx.expr_arena.add(AExpr::Column(out_name));582Ok((reduce_stream, out_node))583}584585// In the 
recursive lowering we don't bother with named expressions at all, so586// we work directly with Nodes.587#[recursive::recursive]588fn lower_exprs_with_ctx(589input: PhysStream,590exprs: &[Node],591ctx: &mut LowerExprContext,592) -> PolarsResult<(PhysStream, Vec<Node>)> {593// We have to catch this case separately, in case all the input independent expressions are elementwise.594// TODO: we shouldn't always do this when recursing, e.g. pl.col.a.sum() + 1 will still hit this in the recursion.595if exprs.iter().all(|e| is_input_independent_ctx(*e, ctx)) {596let expr_irs = exprs597.iter()598.map(|e| ExprIR::new(*e, OutputName::Alias(unique_column_name())))599.collect_vec();600let node = build_input_independent_node_with_ctx(&expr_irs, ctx)?;601let out_exprs = expr_irs602.iter()603.map(|e| ctx.expr_arena.add(AExpr::Column(e.output_name().clone())))604.collect();605return Ok((PhysStream::first(node), out_exprs));606}607608// Fallback expressions that can directly be applied to the original input.609let mut fallback_subset = Vec::new();610611// Streams containing the columns used for executing transformed expressions.612let mut input_streams = PlIndexSet::new();613614// The final transformed expressions that will be selected from the zipped615// together transformed nodes.616let mut transformed_exprs = Vec::with_capacity(exprs.len());617618for expr in exprs.iter().copied() {619if is_elementwise_rec_cached(expr, ctx.expr_arena, ctx.cache) {620if !is_input_independent_ctx(expr, ctx) {621input_streams.insert(input);622}623transformed_exprs.push(expr);624continue;625}626627match ctx.expr_arena.get(expr).clone() {628// Handled separately in `Eval` expressions.629AExpr::Element => unreachable!(),630// Mapped to `Column` in `StructEval`.631AExpr::StructField(_) => unreachable!(),632633AExpr::Explode {634expr: inner,635options,636} => {637// While explode is streamable, it is not elementwise, so we638// have to transform it to a select node.639let (trans_input, trans_exprs) = 
lower_exprs_with_ctx(input, &[inner], ctx)?;640let exploded_name = unique_column_name();641let trans_inner = ctx.expr_arena.add(AExpr::Explode {642expr: trans_exprs[0],643options,644});645let explode_expr =646ExprIR::new(trans_inner, OutputName::Alias(exploded_name.clone()));647let output_schema =648schema_for_select(trans_input, std::slice::from_ref(&explode_expr), ctx)?;649let node_kind = PhysNodeKind::Select {650input: trans_input,651selectors: vec![explode_expr.clone()],652extend_original: false,653};654let node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, node_kind));655input_streams.insert(PhysStream::first(node_key));656transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(exploded_name)));657},658AExpr::Column(_) => unreachable!("column should always be streamable"),659AExpr::Literal(_) => {660let out_name = unique_column_name();661let inner_expr = ExprIR::new(expr, OutputName::Alias(out_name.clone()));662let node_key = build_input_independent_node_with_ctx(&[inner_expr], ctx)?;663input_streams.insert(PhysStream::first(node_key));664transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));665},666667AExpr::Function {668input: ref inner_exprs,669function: IRFunctionExpr::Repeat,670options: _,671} => {672assert!(inner_exprs.len() == 2);673let out_name = unique_column_name();674let value_expr_ir = inner_exprs[0].with_alias(out_name.clone());675let repeats_expr_ir = inner_exprs[1].clone();676let value_stream = build_select_stream_with_ctx(input, &[value_expr_ir], ctx)?;677let repeats_stream = build_select_stream_with_ctx(input, &[repeats_expr_ir], ctx)?;678679let output_schema = ctx.phys_sm[value_stream.node].output_schema.clone();680let kind = PhysNodeKind::Repeat {681value: value_stream,682repeats: repeats_stream,683};684let repeat_node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, 
kind));685input_streams.insert(PhysStream::first(repeat_node_key));686transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));687},688689AExpr::Function {690input: ref inner_exprs,691function: IRFunctionExpr::ExtendConstant,692options: _,693} => {694assert!(inner_exprs.len() == 3);695let input_schema = &ctx.phys_sm[input.node].output_schema;696let out_name = unique_column_name();697let first_ir = inner_exprs[0].with_alias(out_name.clone());698let out_dtype = first_ir.dtype(input_schema, ctx.expr_arena)?;699let mut value_expr_ir = inner_exprs[1].with_alias(out_name.clone());700let repeats_expr_ir = inner_exprs[2].clone();701702// Cast the value if necessary.703if value_expr_ir.dtype(input_schema, ctx.expr_arena)? != out_dtype {704let cast_expr = AExpr::Cast {705expr: value_expr_ir.node(),706dtype: out_dtype.clone(),707options: CastOptions::NonStrict,708};709value_expr_ir = ExprIR::new(710ctx.expr_arena.add(cast_expr),711OutputName::Alias(out_name.clone()),712);713}714715let first_stream = build_select_stream_with_ctx(input, &[first_ir], ctx)?;716let value_stream = build_select_stream_with_ctx(input, &[value_expr_ir], ctx)?;717let repeats_stream = build_select_stream_with_ctx(input, &[repeats_expr_ir], ctx)?;718719let output_schema = ctx.phys_sm[first_stream.node].output_schema.clone();720let repeat_kind = PhysNodeKind::Repeat {721value: value_stream,722repeats: repeats_stream,723};724let repeat_node_key = ctx725.phys_sm726.insert(PhysNode::new(output_schema.clone(), repeat_kind));727728let concat_kind = PhysNodeKind::OrderedUnion {729inputs: vec![first_stream, PhysStream::first(repeat_node_key)],730};731let concat_node_key = ctx732.phys_sm733.insert(PhysNode::new(output_schema, concat_kind));734input_streams.insert(PhysStream::first(concat_node_key));735transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));736},737738AExpr::Function {739input: ref inner_exprs,740function: IRFunctionExpr::ConcatExpr(_rechunk),741options: _,742} => {743// We 
have to lower each expression separately as they might have different lengths.744let mut concat_streams = Vec::new();745let out_name = unique_column_name();746for inner_expr in inner_exprs {747let (trans_input, trans_expr) =748lower_exprs_with_ctx(input, &[inner_expr.node()], ctx)?;749let select_expr =750ExprIR::new(trans_expr[0], OutputName::Alias(out_name.clone()));751concat_streams.push(build_select_stream_with_ctx(752trans_input,753&[select_expr],754ctx,755)?);756}757758let output_schema = ctx.phys_sm[concat_streams[0].node].output_schema.clone();759let node_kind = PhysNodeKind::OrderedUnion {760inputs: concat_streams,761};762let node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, node_kind));763input_streams.insert(PhysStream::first(node_key));764transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));765},766767AExpr::Function {768input: ref inner_exprs,769function: IRFunctionExpr::Unique(maintain_order),770options: _,771} => {772assert!(inner_exprs.len() == 1);773// Lower to no-aggregate group-by with unique name.774let tmp_name = unique_column_name();775let (trans_input, trans_inner_exprs) =776lower_exprs_with_ctx(input, &[inner_exprs[0].node()], ctx)?;777let group_by_key_expr =778ExprIR::new(trans_inner_exprs[0], OutputName::Alias(tmp_name.clone()));779let group_by_output_schema =780schema_for_select(trans_input, std::slice::from_ref(&group_by_key_expr), ctx)?;781let group_by_stream = build_group_by_stream(782trans_input,783&[group_by_key_expr],784&[],785group_by_output_schema,786maintain_order,787Arc::new(GroupbyOptions::default()),788None,789ctx.expr_arena,790ctx.phys_sm,791ctx.cache,792StreamingLowerIRContext::from(&*ctx),793false,794)?;795input_streams.insert(group_by_stream);796transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(tmp_name)));797},798799AExpr::Function {800input: ref inner_exprs,801function: IRFunctionExpr::UniqueCounts,802options: _,803} => {804// Transform:805// expr.unique_counts().alias(name)806// ->807// 
.select(expr.alias(name))808// .group_by(_ = name, maintain_order=True)809// .agg(name = pl.len())810// .select(name)811812assert_eq!(inner_exprs.len(), 1);813814let input_schema = &ctx.phys_sm[input.node].output_schema;815816let key_name = unique_column_name();817let tmp_count_name = unique_column_name();818819let input_expr = &inner_exprs[0];820let output_dtype = input_expr.dtype(input_schema, ctx.expr_arena)?.clone();821let group_by_output_schema = Arc::new(Schema::from_iter([822(key_name.clone(), output_dtype),823(tmp_count_name.clone(), IDX_DTYPE),824]));825826let keys = [input_expr.with_alias(key_name)];827let aggs = [ExprIR::new(828ctx.expr_arena.add(AExpr::Len),829OutputName::Alias(tmp_count_name.clone()),830)];831832let stream = build_group_by_stream(833input,834&keys,835&aggs,836group_by_output_schema,837true,838Default::default(),839None,840ctx.expr_arena,841ctx.phys_sm,842ctx.cache,843StreamingLowerIRContext {844prepare_visualization: ctx.prepare_visualization,845},846false,847)?;848transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(tmp_count_name)));849input_streams.insert(stream);850},851AExpr::Function {852input: ref inner_exprs,853function:854IRFunctionExpr::ValueCounts {855sort: false,856parallel: _,857name: count_name,858normalize: false,859},860options: _,861} => {862// Transform:863// expr.value_counts(864// sort=False,865// parallel=_,866// name=count_name,867// normalize=False868// ).alias(name)869// ->870// .select(expr.alias(name))871// .group_by(name)872// .agg(count_name = pl.len())873// .select(pl.struct([name, count_name]))874875assert_eq!(inner_exprs.len(), 1);876877let input_schema = &ctx.phys_sm[input.node].output_schema;878879let tmp_value_name = unique_column_name();880let tmp_count_name = unique_column_name();881882let input_expr = &inner_exprs[0];883let output_field = input_expr.field(input_schema, ctx.expr_arena)?;884let group_by_output_schema = 
Arc::new(Schema::from_iter([885output_field.clone().with_name(tmp_value_name.clone()),886Field::new(tmp_count_name.clone(), IDX_DTYPE),887]));888889let keys = [input_expr.with_alias(tmp_value_name.clone())];890let aggs = [ExprIR::new(891ctx.expr_arena.add(AExpr::Len),892OutputName::Alias(tmp_count_name.clone()),893)];894895let stream = build_group_by_stream(896input,897&keys,898&aggs,899group_by_output_schema,900false,901Default::default(),902None,903ctx.expr_arena,904ctx.phys_sm,905ctx.cache,906StreamingLowerIRContext {907prepare_visualization: ctx.prepare_visualization,908},909false,910)?;911912let value = ExprIR::new(913ctx.expr_arena.add(AExpr::Column(tmp_value_name)),914OutputName::Alias(output_field.name),915);916let count = ExprIR::new(917ctx.expr_arena.add(AExpr::Column(tmp_count_name)),918OutputName::Alias(count_name.clone()),919);920921transformed_exprs.push(922AExprBuilder::function(923vec![value, count],924IRFunctionExpr::AsStruct,925ctx.expr_arena,926)927.node(),928);929input_streams.insert(stream);930},931932#[cfg(feature = "mode")]933AExpr::Function {934input: ref inner_exprs,935function: IRFunctionExpr::Mode { maintain_order },936options: _,937} => {938// Transform:939// expr.mode()940// ->941// .select(_t = expr)942// .group_by(_t)943// .agg(count_name = pl.len())944// .select(_t.filter(count_name == count_name.max())945946assert_eq!(inner_exprs.len(), 1);947948let tmp_value_name = unique_column_name();949let tmp_count_name = unique_column_name();950951let stream = build_select_stream_with_ctx(952input,953&[inner_exprs[0].with_alias(tmp_value_name.clone())],954ctx,955)?;956957let mut group_by_output_schema =958ctx.phys_sm[stream.node].output_schema.as_ref().clone();959group_by_output_schema.insert(tmp_count_name.clone(), IDX_DTYPE);960961let keys = [AExprBuilder::col(tmp_value_name.clone(), ctx.expr_arena)962.expr_ir(tmp_value_name.clone())];963let aggs = 
[ExprIR::new(964ctx.expr_arena.add(AExpr::Len),965OutputName::Alias(tmp_count_name.clone()),966)];967968let stream = build_group_by_stream(969stream,970&keys,971&aggs,972Arc::new(group_by_output_schema),973maintain_order,974Default::default(),975None,976ctx.expr_arena,977ctx.phys_sm,978ctx.cache,979StreamingLowerIRContext {980prepare_visualization: ctx.prepare_visualization,981},982false,983)?;984985let stream = build_select_stream_with_ctx(986stream,987&[AExprBuilder::col(tmp_value_name.clone(), ctx.expr_arena)988.filter(989AExprBuilder::col(tmp_count_name.clone(), ctx.expr_arena).eq(990AExprBuilder::col(tmp_count_name.clone(), ctx.expr_arena)991.max(ctx.expr_arena),992ctx.expr_arena,993),994ctx.expr_arena,995)996.expr_ir(tmp_value_name.clone())],997ctx,998)?;9991000transformed_exprs1001.push(AExprBuilder::col(tmp_value_name.clone(), ctx.expr_arena).node());1002input_streams.insert(stream);1003},10041005AExpr::Function {1006input: ref inner_exprs,1007function: IRFunctionExpr::ArgUnique,1008options: _,1009} => {1010// Transform:1011// expr.arg_unique()1012// ->1013// .with_row_index(IDX)1014// .group_by(expr)1015// .agg(IDX = IDX.first())1016// .select(IDX.sort())10171018assert_eq!(inner_exprs.len(), 1);10191020let expr_name = unique_column_name();1021let idx_name = unique_column_name();10221023let stream = build_select_stream_with_ctx(1024input,1025&[inner_exprs[0].with_alias(expr_name.clone())],1026ctx,1027)?;10281029let mut group_by_output_schema =1030ctx.phys_sm[stream.node].output_schema.as_ref().clone();1031group_by_output_schema.insert(idx_name.clone(), IDX_DTYPE);10321033let stream = build_row_idx_stream(stream, idx_name.clone(), None, ctx.phys_sm);10341035let keys =1036[AExprBuilder::col(expr_name.clone(), ctx.expr_arena).expr_ir(expr_name)];1037let aggs = [AExprBuilder::col(idx_name.clone(), ctx.expr_arena)1038.first(ctx.expr_arena)1039.expr_ir(idx_name.clone())];10401041let stream = 
build_group_by_stream(1042stream,1043&keys,1044&aggs,1045Arc::new(group_by_output_schema),1046false,1047Default::default(),1048None,1049ctx.expr_arena,1050ctx.phys_sm,1051ctx.cache,1052StreamingLowerIRContext {1053prepare_visualization: ctx.prepare_visualization,1054},1055false,1056)?;10571058let expr = AExprBuilder::col(idx_name.clone(), ctx.expr_arena)1059.sort(Default::default(), ctx.expr_arena)1060.expr_ir(idx_name.clone());1061let stream = build_select_stream_with_ctx(stream, &[expr], ctx)?;10621063transformed_exprs.push(AExprBuilder::col(idx_name.clone(), ctx.expr_arena).node());1064input_streams.insert(stream);1065},10661067#[cfg(feature = "is_in")]1068AExpr::Function {1069input: ref inner_exprs,1070function: IRFunctionExpr::Boolean(IRBooleanFunction::IsIn { nulls_equal }),1071options: _,1072} if is_scalar_ae(inner_exprs[1].node(), ctx.expr_arena) => {1073// Translate left and right side separately (they could have different lengths).10741075use polars_core::prelude::ExplodeOptions;1076let left_on_name = unique_column_name();1077let right_on_name = unique_column_name();1078let (trans_input_left, trans_expr_left) =1079lower_exprs_with_ctx(input, &[inner_exprs[0].node()], ctx)?;1080let right_expr_exploded_node = match ctx.expr_arena.get(inner_exprs[1].node()) {1081// expr.implode().explode() ~= expr (and avoids rechunking)1082AExpr::Agg(IRAggExpr::Implode(n)) => *n,1083_ => AExprBuilder::new_from_node(inner_exprs[1].node())1084.explode(1085ctx.expr_arena,1086ExplodeOptions {1087empty_as_null: false,1088keep_nulls: true,1089},1090)1091.node(),1092};1093let (trans_input_right, trans_expr_right) =1094lower_exprs_with_ctx(input, &[right_expr_exploded_node], ctx)?;10951096// We have to ensure the left input has the right name for the semi-anti-join to1097// generate the correct output name.1098let left_col_expr = ctx.expr_arena.add(AExpr::Column(left_on_name.clone()));1099let left_select_stream = 
build_select_stream_with_ctx(1100trans_input_left,1101&[ExprIR::new(1102trans_expr_left[0],1103OutputName::Alias(left_on_name.clone()),1104)],1105ctx,1106)?;11071108let node_kind = PhysNodeKind::SemiAntiJoin {1109input_left: left_select_stream,1110input_right: trans_input_right,1111left_on: vec![ExprIR::new(1112left_col_expr,1113OutputName::Alias(left_on_name.clone()),1114)],1115right_on: vec![ExprIR::new(1116trans_expr_right[0],1117OutputName::Alias(right_on_name),1118)],1119args: JoinArgs {1120how: JoinType::Semi,1121validation: Default::default(),1122suffix: None,1123slice: None,1124nulls_equal,1125coalesce: Default::default(),1126maintain_order: Default::default(),1127build_side: None,1128},1129output_bool: true,1130};11311132// SemiAntiJoin with output_bool returns a column with the same name as the first1133// input column.1134let output_schema = Schema::from_iter([(left_on_name.clone(), DataType::Boolean)]);1135let node_key = ctx1136.phys_sm1137.insert(PhysNode::new(Arc::new(output_schema), node_kind));1138input_streams.insert(PhysStream::first(node_key));1139transformed_exprs.push(left_col_expr);1140},11411142#[cfg(feature = "cum_agg")]1143ref agg_expr @ AExpr::Function {1144input: ref inner_exprs,1145function:1146ref function @ (IRFunctionExpr::CumMin { reverse }1147| IRFunctionExpr::CumMax { reverse }1148| IRFunctionExpr::CumSum { reverse }1149| IRFunctionExpr::CumCount { reverse }1150| IRFunctionExpr::CumProd { reverse }),1151options: _,1152} if !reverse => {1153use crate::nodes::cum_agg::CumAggKind;11541155assert_eq!(inner_exprs.len(), 1);11561157let input_schema = &ctx.phys_sm[input.node].output_schema;11581159let value_key = unique_column_name();1160let value_dtype =1161agg_expr.to_dtype(&ToFieldContext::new(ctx.expr_arena, input_schema))?;11621163let input = build_select_stream_with_ctx(1164input,1165&[inner_exprs[0].with_alias(value_key.clone())],1166ctx,1167)?;1168let kind = match function {1169IRFunctionExpr::CumMin { .. 
} => CumAggKind::Min,1170IRFunctionExpr::CumMax { .. } => CumAggKind::Max,1171IRFunctionExpr::CumSum { .. } => CumAggKind::Sum,1172IRFunctionExpr::CumCount { .. } => CumAggKind::Count,1173IRFunctionExpr::CumProd { .. } => CumAggKind::Prod,1174_ => unreachable!(),1175};1176let node_kind = PhysNodeKind::CumAgg { input, kind };11771178let output_schema = Schema::from_iter([(value_key.clone(), value_dtype.clone())]);1179let node_key = ctx1180.phys_sm1181.insert(PhysNode::new(Arc::new(output_schema), node_kind));1182input_streams.insert(PhysStream::first(node_key));1183transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(value_key)));1184},11851186#[cfg(feature = "diff")]1187AExpr::Function {1188input: ref inner_exprs,1189function: IRFunctionExpr::Diff(null_behavior),1190options: _,1191} => {1192use polars_core::scalar::Scalar;1193use polars_core::series::ops::NullBehavior;11941195assert_eq!(inner_exprs.len(), 2);11961197// Transform:1198// expr.diff(offset, "ignore")1199// ->1200// expr - expr.shift(offset)12011202let base_expr_ir = &inner_exprs[0];1203let base_dtype =1204base_expr_ir.dtype(&ctx.phys_sm[input.node].output_schema, ctx.expr_arena)?;1205let offset_expr_ir = &inner_exprs[1];1206let offset_dtype =1207offset_expr_ir.dtype(&ctx.phys_sm[input.node].output_schema, ctx.expr_arena)?;12081209let mut base = AExprBuilder::new_from_node(base_expr_ir.node());1210let cast_dtype = match base_dtype {1211DataType::UInt8 => Some(DataType::Int16),1212DataType::UInt16 => Some(DataType::Int32),1213DataType::UInt32 | DataType::UInt64 => Some(DataType::Int64),1214_ => None,1215};1216if let Some(dtype) = cast_dtype {1217base = base.cast(dtype, ctx.expr_arena);1218}12191220let mut offset = AExprBuilder::new_from_node(offset_expr_ir.node());1221if offset_dtype != &DataType::Int64 {1222offset = offset.cast(DataType::Int64, ctx.expr_arena);1223}12241225let shifted = base.shift(offset.node(), ctx.expr_arena);1226let mut output = base.minus(shifted.node(), 
ctx.expr_arena);12271228if null_behavior == NullBehavior::Drop {1229// Without the column size, slice can only remove leading nulls.1230// So if the offset was negative, the nulls appeared at the end of the column.1231// In that case, shift the column forward to move the nulls back to the front.1232let zero_literal =1233AExprBuilder::lit(LiteralValue::new_idxsize(0), ctx.expr_arena);1234let offset_neg = offset.negate(ctx.expr_arena);1235let offset_if_negative = AExprBuilder::function(1236vec![offset_neg.expr_ir_unnamed(), zero_literal.expr_ir_unnamed()],1237IRFunctionExpr::MaxHorizontal,1238ctx.expr_arena,1239);1240output = output.shift(offset_if_negative, ctx.expr_arena);12411242// Remove the nulls that were introduced by the shift1243let offset_abs = offset.abs(ctx.expr_arena);1244let null_literal = AExprBuilder::lit(1245LiteralValue::Scalar(Scalar::null(DataType::Int64)),1246ctx.expr_arena,1247);1248output = output.slice(offset_abs, null_literal, ctx.expr_arena);1249}12501251let (stream, nodes) = lower_exprs_with_ctx(input, &[output.node()], ctx)?;1252input_streams.insert(stream);1253transformed_exprs.extend(nodes);1254},12551256AExpr::Function {1257input: ref inner_exprs,1258function: IRFunctionExpr::RLE,1259options: _,1260} => {1261assert_eq!(inner_exprs.len(), 1);12621263let input_schema = &ctx.phys_sm[input.node].output_schema;12641265let value_key = unique_column_name();1266let value_dtype = inner_exprs[0].dtype(input_schema, ctx.expr_arena)?;12671268let input = build_select_stream_with_ctx(1269input,1270&[inner_exprs[0].with_alias(value_key.clone())],1271ctx,1272)?;1273let node_kind = PhysNodeKind::Rle(input);12741275let output_schema = Schema::from_iter([(1276value_key.clone(),1277DataType::Struct(vec![1278Field::new(1279PlSmallStr::from_static(RLE_VALUE_COLUMN_NAME),1280value_dtype.clone(),1281),1282Field::new(PlSmallStr::from_static(RLE_LENGTH_COLUMN_NAME), IDX_DTYPE),1283]),1284)]);1285let node_key = 
ctx1286.phys_sm1287.insert(PhysNode::new(Arc::new(output_schema), node_kind));1288input_streams.insert(PhysStream::first(node_key));1289transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(value_key)));1290},12911292AExpr::Function {1293input: ref inner_exprs,1294function: IRFunctionExpr::RLEID,1295options: _,1296} => {1297assert_eq!(inner_exprs.len(), 1);12981299let value_key = unique_column_name();13001301let input = build_select_stream_with_ctx(1302input,1303&[inner_exprs[0].with_alias(value_key.clone())],1304ctx,1305)?;1306let node_kind = PhysNodeKind::RleId(input);13071308let output_schema = Schema::from_iter([(value_key.clone(), IDX_DTYPE)]);1309let node_key = ctx1310.phys_sm1311.insert(PhysNode::new(Arc::new(output_schema), node_kind));1312input_streams.insert(PhysStream::first(node_key));1313transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(value_key.clone())));1314},13151316AExpr::Function {1317input: ref inner_exprs,1318function: IRFunctionExpr::GatherEvery { n, offset },1319options: _,1320} => {1321assert_eq!(inner_exprs.len(), 1);13221323let value_key = unique_column_name();13241325let input = build_select_stream_with_ctx(1326input,1327&[inner_exprs[0].with_alias(value_key.clone())],1328ctx,1329)?;1330let node_kind = PhysNodeKind::GatherEvery { input, n, offset };13311332let output_schema = ctx.phys_sm[input.node].output_schema.clone();1333let node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, node_kind));1334input_streams.insert(PhysStream::first(node_key));1335transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(value_key.clone())));1336},13371338AExpr::Function {1339input: ref inner_exprs,1340function: ref function @ (IRFunctionExpr::PeakMin | IRFunctionExpr::PeakMax),1341options: _,1342} => {1343assert_eq!(inner_exprs.len(), 1);13441345let value_key = unique_column_name();13461347let input = build_select_stream_with_ctx(1348input,1349&[inner_exprs[0].with_alias(value_key.clone())],1350ctx,1351)?;1352let is_peak_max = 
matches!(function, IRFunctionExpr::PeakMax);1353let node_kind = PhysNodeKind::PeakMinMax { input, is_peak_max };13541355let output_schema = Schema::from_iter([(value_key.clone(), DataType::Boolean)]);1356let node_key = ctx1357.phys_sm1358.insert(PhysNode::new(Arc::new(output_schema), node_kind));1359input_streams.insert(PhysStream::first(node_key));1360transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(value_key.clone())));1361},13621363// pl.row_index() maps to this.1364#[cfg(feature = "range")]1365AExpr::Function {1366input: ref inner_exprs,1367function: IRFunctionExpr::Range(IRRangeFunction::IntRange { step: 1, dtype }),1368options: _,1369} if {1370let start_is_zero = match ctx.expr_arena.get(inner_exprs[0].node()) {1371AExpr::Literal(lit) => lit.extract_usize().ok() == Some(0),1372_ => false,1373};1374let stop_is_len = matches!(ctx.expr_arena.get(inner_exprs[1].node()), AExpr::Len);13751376dtype == DataType::IDX_DTYPE && start_is_zero && stop_is_len1377} =>1378{1379let out_name = unique_column_name();1380let row_idx_col_aexpr = ctx.expr_arena.add(AExpr::Column(out_name.clone()));1381let row_idx_col_expr_ir =1382ExprIR::new(row_idx_col_aexpr, OutputName::ColumnLhs(out_name.clone()));1383let row_idx_stream = build_select_stream_with_ctx(1384build_row_idx_stream(input, out_name, None, ctx.phys_sm),1385&[row_idx_col_expr_ir],1386ctx,1387)?;1388input_streams.insert(row_idx_stream);1389transformed_exprs.push(row_idx_col_aexpr);1390},13911392#[cfg(feature = "range")]1393AExpr::Function {1394input: ref inner_exprs,1395function: IRFunctionExpr::Range(IRRangeFunction::IntRange { step: 1, dtype }),1396options: _,1397} if {1398let start_is_zero = match ctx.expr_arena.get(inner_exprs[0].node()) {1399AExpr::Literal(lit) => lit.extract_usize().ok() == Some(0),1400_ => false,1401};1402let stop_is_count = matches!(1403ctx.expr_arena.get(inner_exprs[1].node()),1404AExpr::Agg(IRAggExpr::Count { .. 
})1405);14061407start_is_zero && stop_is_count1408} =>1409{1410let AExpr::Agg(IRAggExpr::Count {1411input: input_expr,1412include_nulls,1413}) = ctx.expr_arena.get(inner_exprs[1].node())1414else {1415unreachable!();1416};1417let (input_expr, include_nulls) = (*input_expr, *include_nulls);14181419let out_name = unique_column_name();1420let mut row_idx_col_aexpr = ctx.expr_arena.add(AExpr::Column(out_name.clone()));1421if dtype != IDX_DTYPE {1422row_idx_col_aexpr = AExprBuilder::new_from_node(row_idx_col_aexpr)1423.cast(dtype, ctx.expr_arena)1424.node();1425}1426let row_idx_col_expr_ir =1427ExprIR::new(row_idx_col_aexpr, OutputName::ColumnLhs(out_name.clone()));14281429let mut input_expr = AExprBuilder::new_from_node(input_expr);1430if !include_nulls {1431input_expr = input_expr.drop_nulls(ctx.expr_arena);1432}1433let input_expr = input_expr.expr_ir_retain_name(ctx.expr_arena);14341435let row_idx_stream = build_select_stream_with_ctx(1436build_row_idx_stream(1437build_select_stream_with_ctx(input, &[input_expr], ctx)?,1438out_name,1439None,1440ctx.phys_sm,1441),1442&[row_idx_col_expr_ir],1443ctx,1444)?;1445input_streams.insert(row_idx_stream);1446transformed_exprs.push(row_idx_col_aexpr);1447},14481449// Lower arbitrary elementwise functions.1450ref node @ AExpr::Function {1451input: ref inner_exprs,1452options,1453..1454}1455| ref node @ AExpr::AnonymousFunction {1456input: ref inner_exprs,1457options,1458..1459} if options.is_elementwise() && !is_fake_elementwise_function(node) => {1460let inner_nodes = inner_exprs.iter().map(|expr| expr.node()).collect_vec();1461let (trans_input, trans_exprs) = lower_exprs_with_ctx(input, &inner_nodes, ctx)?;14621463// The function may be sensitive to names (e.g. 
pl.struct), so we restore them.1464let new_input = trans_exprs1465.into_iter()1466.zip(inner_exprs)1467.map(|(trans, orig)| {1468ExprIR::new(trans, OutputName::Alias(orig.output_name().clone()))1469})1470.collect_vec();1471let mut new_node = node.clone();1472match &mut new_node {1473AExpr::Function { input, .. } | AExpr::AnonymousFunction { input, .. } => {1474*input = new_input;1475},1476_ => unreachable!(),1477}1478input_streams.insert(trans_input);1479transformed_exprs.push(ctx.expr_arena.add(new_node));1480},14811482// Lower arbitrary row-separable functions.1483ref node @ AExpr::Function {1484input: ref inner_exprs,1485ref function,1486options,1487} if options.is_row_separable() && !is_fake_elementwise_function(node) => {1488// While these functions are streamable, they are not elementwise, so we1489// have to transform them to a select node.1490let inner_nodes = inner_exprs.iter().map(|x| x.node()).collect_vec();1491let (trans_input, trans_exprs) = lower_exprs_with_ctx(input, &inner_nodes, ctx)?;1492let out_name = unique_column_name();1493let trans_inner = ctx.expr_arena.add(AExpr::Function {1494input: trans_exprs1495.iter()1496.map(|node| ExprIR::from_node(*node, ctx.expr_arena))1497.collect(),1498function: function.clone(),1499options,1500});1501let func_expr = ExprIR::new(trans_inner, OutputName::Alias(out_name.clone()));1502let output_schema =1503schema_for_select(trans_input, std::slice::from_ref(&func_expr), ctx)?;1504let node_kind = PhysNodeKind::Select {1505input: trans_input,1506selectors: vec![func_expr.clone()],1507extend_original: false,1508};1509let node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, node_kind));1510input_streams.insert(PhysStream::first(node_key));1511transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));1512},15131514AExpr::BinaryExpr { left, op, right } => {1515let (trans_input, trans_exprs) = lower_exprs_with_ctx(input, &[left, right], ctx)?;1516let bin_expr = AExpr::BinaryExpr {1517left: 
trans_exprs[0],1518op,1519right: trans_exprs[1],1520};1521input_streams.insert(trans_input);1522transformed_exprs.push(ctx.expr_arena.add(bin_expr));1523},1524AExpr::Eval {1525expr: inner,1526evaluation,1527variant,1528} => match variant {1529EvalVariant::List1530| EvalVariant::ListAgg1531| EvalVariant::Array { as_list: _ }1532| EvalVariant::ArrayAgg => {1533let (trans_input, trans_expr) = lower_exprs_with_ctx(input, &[inner], ctx)?;1534let eval_expr = AExpr::Eval {1535expr: trans_expr[0],1536evaluation,1537variant,1538};1539input_streams.insert(trans_input);1540transformed_exprs.push(ctx.expr_arena.add(eval_expr));1541},1542EvalVariant::Cumulative { .. } => {1543// Cumulative is not elementwise, this would need a special node.1544let out_name = unique_column_name();1545fallback_subset.push(ExprIR::new(expr, OutputName::Alias(out_name.clone())));1546transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));1547},1548},1549AExpr::StructEval {1550expr: inner,1551mut evaluation,1552} => {1553// Transform (simplified):1554// expr.struct.with_fields(evaluation).alias(name)1555// ->1556// .select(expr)1557// .with_columns(validity = expr.is_not_null())1558// .map(df.struct.unnest()))1559// .with_columns([evaluation])1560// .select(pl.when(validity).then(as_struct()).alias(name)1561//1562// Any reference to `StructField(x)` gets remapped to `Column(PREFIX_x)` prior to1563// calling `unnest()`, with PREFIX being unique for each StructEval expression.15641565// Evaluate input `expr` and capture `col` references from `evaluation`1566let out_name = unique_column_name();1567let inner_expr_ir = ExprIR::new(inner, OutputName::Alias(out_name.clone()));1568let mut expr_irs = Vec::new();1569expr_irs.push(inner_expr_ir);15701571// Any column expression inside evaluation must be added explicitly.1572let eval_col_names: PlHashSet<_> = evaluation1573.iter()1574.flat_map(|expr| {1575polars_plan::utils::aexpr_to_leaf_names_iter(expr.node(), 
ctx.expr_arena)1576})1577.cloned()1578.collect();1579for name in eval_col_names {1580expr_irs.push(ExprIR::new(1581ctx.expr_arena.add(AExpr::Column(name.clone())),1582OutputName::ColumnLhs(name),1583));1584}1585let stream = build_select_stream_with_ctx(input, &expr_irs, ctx)?;15861587// Capture validity as an extra column.1588let validity_name = polars_utils::format_pl_smallstr!(1589"{}{}",1590out_name,1591PlSmallStr::from_static("_VLD")1592);1593let validity_input_node = ctx.expr_arena.add(AExpr::Column(out_name.clone()));1594let validity_expr_ir = ExprIR::new(1595validity_input_node,1596OutputName::Alias(validity_name.clone()),1597);1598let validity_expr = AExprBuilder::function(1599vec![validity_expr_ir],1600IRFunctionExpr::Boolean(IRBooleanFunction::IsNotNull),1601ctx.expr_arena,1602);1603let validity_node = validity_expr.node();1604let validity_expr_ir =1605ExprIR::new(validity_node, OutputName::Alias(validity_name.clone()));1606let stream = build_hstack_stream(1607stream,1608&[validity_expr_ir],1609ctx.expr_arena,1610ctx.phys_sm,1611ctx.cache,1612StreamingLowerIRContext {1613prepare_visualization: ctx.prepare_visualization,1614},1615)?;16161617// Rewrite any `StructField(x)`` expression into a `Col(prefix_x)`` expression.1618let separator = PlSmallStr::from_static("_FLD_");1619let field_prefix = polars_utils::format_pl_smallstr!("{}{}", out_name, separator);1620evaluation.iter_mut().for_each(|e| {1621e.set_node(structfield_to_column(1622e.node(),1623ctx.expr_arena,1624&field_prefix,1625))1626});16271628// Unnest.1629let unnest_fn = FunctionIR::Unnest {1630columns: Arc::new([out_name.clone()]),1631separator: Some(separator.clone()),1632};1633let input_schema = ctx.phys_sm[stream.node].output_schema.clone();1634let output_schema = unnest_fn.schema(&input_schema)?.into_owned();1635let format_str = ctx.prepare_visualization.then(|| {1636format!(1637"UNNEST columns: [{}], separator: \"{}\"",1638out_name.as_str(),1639separator.as_str()1640)1641});1642let map = 
Arc::new(move |df| unnest_fn.evaluate(df));1643let node_kind = PhysNodeKind::Map {1644input: stream,1645map,1646format_str,1647};1648let node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, node_kind));1649let stream = PhysStream::first(node_key);16501651// Evaluate `evaluation`, using `with_columns`.1652// This requires output names to be prefixed, as they refer to the local StructField namespace.1653// Note, native columns are still included in the stream but could be dropped (nice-to-have).1654evaluation.iter_mut().for_each(|e| {1655*e = e.with_alias(polars_utils::format_pl_smallstr!(1656"{}{}",1657&field_prefix,1658e.output_name()1659));1660});1661let stream = build_hstack_stream(1662stream,1663&evaluation,1664ctx.expr_arena,1665ctx.phys_sm,1666ctx.cache,1667StreamingLowerIRContext {1668prepare_visualization: ctx.prepare_visualization,1669},1670)?;16711672// Nest any column that belongs to the StructField namespace back into a Struct.1673let mut fields_expr_irs = Vec::new();1674let eval_schema = ctx.phys_sm[stream.node].output_schema.clone();1675for (name, _) in eval_schema.iter() {1676if let Some(stripped_name) = name.strip_prefix(field_prefix.as_str()) {1677let node = ctx.expr_arena.add(AExpr::Column(name.clone()));1678fields_expr_irs.push(1679ExprIR::from_node(node, ctx.expr_arena)1680.with_alias(PlSmallStr::from_str(stripped_name)),1681);1682}1683}1684let as_struct_expr = AExprBuilder::function(1685fields_expr_irs,1686IRFunctionExpr::AsStruct,1687ctx.expr_arena,1688);1689let as_struct_node = as_struct_expr.node();16901691// Apply validity.1692let with_validity = AExprBuilder::when_then_otherwise(1693AExprBuilder::col(validity_name.clone(), ctx.expr_arena),1694AExprBuilder::new_from_node(as_struct_node),1695AExprBuilder::lit(1696LiteralValue::Scalar(Scalar::null(DataType::Null)),1697ctx.expr_arena,1698),1699ctx.expr_arena,1700);1701let with_validity_node = with_validity.node();1702let validity_expr_ir =1703ExprIR::new(with_validity_node, 
OutputName::Alias(out_name.clone()));1704let stream = build_select_stream_with_ctx(stream, &[validity_expr_ir], ctx)?;1705let exit_node = ctx.expr_arena.add(AExpr::Column(out_name.clone()));17061707// Finalize.1708input_streams.insert(stream);1709transformed_exprs.push(exit_node);1710},1711AExpr::Ternary {1712predicate,1713truthy,1714falsy,1715} => {1716let (trans_input, trans_exprs) =1717lower_exprs_with_ctx(input, &[predicate, truthy, falsy], ctx)?;1718let tern_expr = AExpr::Ternary {1719predicate: trans_exprs[0],1720truthy: trans_exprs[1],1721falsy: trans_exprs[2],1722};1723input_streams.insert(trans_input);1724transformed_exprs.push(ctx.expr_arena.add(tern_expr));1725},1726AExpr::Cast {1727expr: inner,1728dtype,1729options,1730} => {1731let (trans_input, trans_exprs) = lower_exprs_with_ctx(input, &[inner], ctx)?;1732input_streams.insert(trans_input);1733transformed_exprs.push(ctx.expr_arena.add(AExpr::Cast {1734expr: trans_exprs[0],1735dtype,1736options,1737}));1738},1739AExpr::Sort {1740expr: inner,1741options,1742} => {1743// As we'll refer to the sorted column twice, ensure the inner1744// expr is available as a column by selecting first.1745let sorted_name = unique_column_name();1746let inner_expr_ir = ExprIR::new(inner, OutputName::Alias(sorted_name.clone()));1747let select_stream =1748build_select_stream_with_ctx(input, std::slice::from_ref(&inner_expr_ir), ctx)?;1749let col_expr = ctx.expr_arena.add(AExpr::Column(sorted_name.clone()));1750let kind = PhysNodeKind::Sort {1751input: select_stream,1752by_column: vec![ExprIR::new(col_expr, OutputName::Alias(sorted_name))],1753slice: None,1754sort_options: (&options).into(),1755};1756let output_schema = ctx.phys_sm[select_stream.node].output_schema.clone();1757let node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));1758input_streams.insert(PhysStream::first(node_key));1759transformed_exprs.push(col_expr);1760},17611762AExpr::SortBy {1763expr: inner,1764by,1765sort_options,1766} => {1767// Select 
our inputs (if we don't do this we'll waste time sorting irrelevant columns).1768let sorted_name = unique_column_name();1769let by_names = by.iter().map(|_| unique_column_name()).collect_vec();1770let all_inner_expr_irs = [(&sorted_name, inner)]1771.into_iter()1772.chain(by_names.iter().zip(by.iter().copied()))1773.map(|(name, inner)| ExprIR::new(inner, OutputName::Alias(name.clone())))1774.collect_vec();1775let select_stream = build_select_stream_with_ctx(input, &all_inner_expr_irs, ctx)?;17761777// Sort the inputs.1778let kind = PhysNodeKind::Sort {1779input: select_stream,1780by_column: by_names1781.into_iter()1782.map(|name| {1783ExprIR::new(1784ctx.expr_arena.add(AExpr::Column(name.clone())),1785OutputName::Alias(name),1786)1787})1788.collect(),1789slice: None,1790sort_options,1791};1792let output_schema = ctx.phys_sm[select_stream.node].output_schema.clone();1793let sort_node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));17941795let sorted_col_expr = ctx.expr_arena.add(AExpr::Column(sorted_name.clone()));1796input_streams.insert(PhysStream::first(sort_node_key));1797transformed_exprs.push(sorted_col_expr);1798},17991800#[cfg(feature = "top_k")]1801AExpr::Function {1802input: inner_exprs,1803function: function @ (IRFunctionExpr::TopK { .. } | IRFunctionExpr::TopKBy { .. 
}),1804options: _,1805} => {1806// Select our inputs.1807let by = &inner_exprs[2..];1808let out_name = unique_column_name();1809let by_names = by.iter().map(|_| unique_column_name()).collect_vec();1810let data_irs = [(&out_name, &inner_exprs[0])]1811.into_iter()1812.chain(by_names.iter().zip(by.iter()))1813.map(|(name, inner)| ExprIR::new(inner.node(), OutputName::Alias(name.clone())))1814.collect_vec();1815let data_stream = build_select_stream_with_ctx(input, &data_irs, ctx)?;1816let k_stream = build_select_stream_with_ctx(input, &inner_exprs[1..2], ctx)?;18171818// Create 'by' column expressions.1819let out_col_node = ctx.expr_arena.add(AExpr::Column(out_name.clone()));1820let out_col_expr = ExprIR::new(out_col_node, OutputName::Alias(out_name));1821let (by_column, reverse) = match function {1822IRFunctionExpr::TopK { descending } => {1823(vec![out_col_expr.clone()], vec![descending])1824},1825IRFunctionExpr::TopKBy {1826descending: reverse,1827} => {1828let by_column = by_names1829.into_iter()1830.map(|name| {1831ExprIR::new(1832ctx.expr_arena.add(AExpr::Column(name.clone())),1833OutputName::Alias(name),1834)1835})1836.collect();1837(by_column, reverse.clone())1838},1839_ => unreachable!(),1840};18411842let kind = PhysNodeKind::TopK {1843input: data_stream,1844k: k_stream,1845nulls_last: vec![true; by_column.len()],1846reverse,1847by_column,1848};1849let output_schema = ctx.phys_sm[data_stream.node].output_schema.clone();1850let node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));1851input_streams.insert(PhysStream::first(node_key));1852transformed_exprs.push(out_col_node);1853},18541855AExpr::Filter { input: inner, by } => {1856// Select our inputs (if we don't do this we'll waste time filtering irrelevant columns).1857let out_name = unique_column_name();1858let by_name = unique_column_name();1859let inner_expr_ir = ExprIR::new(inner, OutputName::Alias(out_name.clone()));1860let by_expr_ir = ExprIR::new(by, 
OutputName::Alias(by_name.clone()));1861let select_stream =1862build_select_stream_with_ctx(input, &[inner_expr_ir, by_expr_ir], ctx)?;18631864// Add a filter node.1865let predicate = ExprIR::new(1866ctx.expr_arena.add(AExpr::Column(by_name.clone())),1867OutputName::Alias(by_name),1868);1869let kind = PhysNodeKind::Filter {1870input: select_stream,1871predicate,1872};1873let output_schema = ctx.phys_sm[select_stream.node].output_schema.clone();1874let filter_node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));1875input_streams.insert(PhysStream::first(filter_node_key));1876transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));1877},18781879AExpr::AnonymousAgg {1880input: _,1881fmt_str: _,1882function: _,1883} => {1884let (trans_stream, trans_expr) = lower_reduce_node(input, expr, ctx)?;1885input_streams.insert(trans_stream);1886transformed_exprs.push(trans_expr);1887},1888// Aggregates.1889AExpr::Agg(agg) => match agg {1890// Change agg mutably so we can share the codepath for all of these.1891IRAggExpr::Min { .. }1892| IRAggExpr::Max { .. }1893| IRAggExpr::First(_)1894| IRAggExpr::FirstNonNull(_)1895| IRAggExpr::Last(_)1896| IRAggExpr::LastNonNull(_)1897| IRAggExpr::Item { .. }1898| IRAggExpr::Sum(_)1899| IRAggExpr::Mean(_)1900| IRAggExpr::Var { .. }1901| IRAggExpr::Std { .. }1902| IRAggExpr::Count { .. 
} => {1903let (trans_stream, trans_expr) = lower_reduce_node(input, expr, ctx)?;1904input_streams.insert(trans_stream);1905transformed_exprs.push(trans_expr);1906},1907IRAggExpr::NUnique(inner) => {1908// Lower to no-aggregate group-by with unique name feeding into len aggregate.1909let tmp_name = unique_column_name();1910let (trans_input, trans_inner_exprs) =1911lower_exprs_with_ctx(input, &[inner], ctx)?;1912let group_by_key_expr =1913ExprIR::new(trans_inner_exprs[0], OutputName::Alias(tmp_name.clone()));1914let group_by_output_schema = schema_for_select(1915trans_input,1916std::slice::from_ref(&group_by_key_expr),1917ctx,1918)?;1919let group_by_stream = build_group_by_stream(1920trans_input,1921&[group_by_key_expr],1922&[],1923group_by_output_schema,1924false,1925Arc::new(GroupbyOptions::default()),1926None,1927ctx.expr_arena,1928ctx.phys_sm,1929ctx.cache,1930StreamingLowerIRContext::from(&*ctx),1931false,1932)?;19331934let len_node = ctx.expr_arena.add(AExpr::Len);1935let len_expr_ir = ExprIR::new(len_node, OutputName::Alias(tmp_name.clone()));1936let output_schema = schema_for_select(1937group_by_stream,1938std::slice::from_ref(&len_expr_ir),1939ctx,1940)?;1941let kind = PhysNodeKind::Reduce {1942input: group_by_stream,1943exprs: vec![len_expr_ir],1944};19451946let reduce_node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));1947input_streams.insert(PhysStream::first(reduce_node_key));1948transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(tmp_name)));1949},1950IRAggExpr::Median(_)1951| IRAggExpr::Implode(_)1952| IRAggExpr::Quantile { .. 
}1953| IRAggExpr::AggGroups(_) => {1954let out_name = unique_column_name();1955fallback_subset.push(ExprIR::new(expr, OutputName::Alias(out_name.clone())));1956transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));1957},1958},19591960#[cfg(feature = "bitwise")]1961AExpr::Function {1962function:1963IRFunctionExpr::Bitwise(1964IRBitwiseFunction::And | IRBitwiseFunction::Or | IRBitwiseFunction::Xor,1965),1966..1967} => {1968let (trans_stream, trans_expr) = lower_reduce_node(input, expr, ctx)?;1969input_streams.insert(trans_stream);1970transformed_exprs.push(trans_expr);1971},19721973#[cfg(feature = "approx_unique")]1974AExpr::Function {1975function: IRFunctionExpr::ApproxNUnique,1976..1977} => {1978let (trans_stream, trans_expr) = lower_reduce_node(input, expr, ctx)?;1979input_streams.insert(trans_stream);1980transformed_exprs.push(trans_expr);1981},19821983AExpr::Function {1984function:1985IRFunctionExpr::Boolean(1986IRBooleanFunction::Any { .. } | IRBooleanFunction::All { .. 
},1987)1988| IRFunctionExpr::MinBy1989| IRFunctionExpr::MaxBy1990| IRFunctionExpr::NullCount,1991..1992} => {1993let (trans_stream, trans_expr) = lower_reduce_node(input, expr, ctx)?;1994input_streams.insert(trans_stream);1995transformed_exprs.push(trans_expr);1996},19971998// Length-based expressions.1999AExpr::Len => {2000let out_name = unique_column_name();2001let expr_ir = ExprIR::new(expr, OutputName::Alias(out_name.clone()));2002let output_schema = schema_for_select(input, std::slice::from_ref(&expr_ir), ctx)?;2003let kind = PhysNodeKind::Reduce {2004input,2005exprs: vec![expr_ir],2006};2007let reduce_node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));2008input_streams.insert(PhysStream::first(reduce_node_key));2009transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));2010},20112012AExpr::Function {2013input: ref inner_exprs,2014function: IRFunctionExpr::ArgWhere,2015options: _,2016} => {2017// pl.arg_where(expr)2018//2019// ->2020// .select(predicate_name = expr)2021// .with_row_index(out_name)2022// .filter(predicate_name)2023// .select(out_name)2024let out_name = unique_column_name();2025let predicate_name = unique_column_name();2026let predicate = build_select_stream_with_ctx(2027input,2028&[inner_exprs[0].with_alias(predicate_name.clone())],2029ctx,2030)?;2031let row_index =2032build_row_idx_stream(predicate, out_name.clone(), None, ctx.phys_sm);20332034let filter_stream = build_filter_stream(2035row_index,2036AExprBuilder::col(predicate_name.clone(), ctx.expr_arena)2037.expr_ir(predicate_name),2038ctx.expr_arena,2039ctx.phys_sm,2040ctx.cache,2041StreamingLowerIRContext {2042prepare_visualization: ctx.prepare_visualization,2043},2044)?;2045input_streams.insert(filter_stream);2046transformed_exprs.push(AExprBuilder::col(out_name.clone(), ctx.expr_arena).node());2047},20482049AExpr::Slice {2050input: inner,2051offset,2052length,2053} => {2054let out_name = unique_column_name();2055let inner_expr_ir = ExprIR::new(inner, 
OutputName::Alias(out_name.clone()));2056let offset_expr_ir = ExprIR::from_node(offset, ctx.expr_arena);2057let length_expr_ir = ExprIR::from_node(length, ctx.expr_arena);2058let input_stream = build_select_stream_with_ctx(input, &[inner_expr_ir], ctx)?;2059let offset_stream = build_select_stream_with_ctx(input, &[offset_expr_ir], ctx)?;2060let length_stream = build_select_stream_with_ctx(input, &[length_expr_ir], ctx)?;20612062let output_schema = ctx.phys_sm[input_stream.node].output_schema.clone();2063let kind = PhysNodeKind::DynamicSlice {2064input: input_stream,2065offset: offset_stream,2066length: length_stream,2067};2068let slice_node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));2069input_streams.insert(PhysStream::first(slice_node_key));2070transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));2071},20722073AExpr::Function {2074input: ref inner_exprs,2075function: func @ (IRFunctionExpr::Shift | IRFunctionExpr::ShiftAndFill),2076options: _,2077} => {2078let out_name = unique_column_name();2079let data_col_expr = inner_exprs[0].with_alias(out_name.clone());2080let trans_data_column = build_select_stream_with_ctx(input, &[data_col_expr], ctx)?;2081let trans_offset =2082build_select_stream_with_ctx(input, &[inner_exprs[1].clone()], ctx)?;20832084let trans_fill = if func == IRFunctionExpr::ShiftAndFill {2085let fill_expr = inner_exprs[2].with_alias(out_name.clone());2086Some(build_select_stream_with_ctx(input, &[fill_expr], ctx)?)2087} else {2088None2089};20902091let output_schema = ctx.phys_sm[trans_data_column.node].output_schema.clone();2092let node_key = ctx.phys_sm.insert(PhysNode::new(2093output_schema,2094PhysNodeKind::Shift {2095input: trans_data_column,2096offset: trans_offset,2097fill: trans_fill,2098},2099));21002101input_streams.insert(PhysStream::first(node_key));2102transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));2103},21042105#[cfg(feature = "ewma")]2106AExpr::Function {2107input: 
input_exprs,2108function:2109ewm_variant @ IRFunctionExpr::EwmMean { options }2110| ewm_variant @ IRFunctionExpr::EwmVar { options }2111| ewm_variant @ IRFunctionExpr::EwmStd { options },2112options: _,2113} => {2114let out_name = unique_column_name();21152116let input = match input_exprs.as_slice() {2117[input_expr] => build_select_stream_with_ctx(2118input,2119&[input_expr.with_alias(out_name.clone())],2120ctx,2121)?,2122_ => panic!("{:?}", input_exprs),2123};21242125let input_schema = ctx.phys_sm[input.node].output_schema.clone();2126assert_eq!(input_schema.len(), 1);2127let output_schema = input_schema;21282129let kind = match ewm_variant {2130IRFunctionExpr::EwmMean { .. } => PhysNodeKind::EwmMean { input, options },2131IRFunctionExpr::EwmVar { .. } => PhysNodeKind::EwmVar { input, options },2132IRFunctionExpr::EwmStd { .. } => PhysNodeKind::EwmStd { input, options },2133_ => unreachable!(),2134};2135let node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));2136input_streams.insert(PhysStream::first(node_key));2137transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));2138},21392140#[cfg(feature = "dynamic_group_by")]2141rolling_function @ AExpr::Rolling {2142function,2143index_column,2144period,2145offset,2146closed_window,2147} => {2148// function.rolling(index_column=index_column)2149//2150// ->2151//2152// .select(*LIVE_COLUMNS(function), _tmp0 = index_column)2153// .rolling(_tmp0)2154// .agg(_tmp1 = function)2155// .select(_tmp1)21562157let out_name = unique_column_name();2158let index_column_name = unique_column_name();21592160let index_column_expr_ir =2161AExprBuilder::new_from_node(index_column).expr_ir(index_column_name.clone());21622163let input_schema = &ctx.phys_sm[input.node].output_schema;2164let output_dtype = rolling_function2165.to_dtype(&ToFieldContext::new(ctx.expr_arena, input_schema))?;2166let output_schema = Schema::from_iter([2167index_column_expr_ir.field(input_schema, 
ctx.expr_arena)?,2168Field::new(out_name.clone(), output_dtype),2169]);21702171let input_columns = aexpr_to_leaf_names(function, ctx.expr_arena)2172.into_iter()2173.map(|n| AExprBuilder::col(n.clone(), ctx.expr_arena).expr_ir(n))2174.chain(std::iter::once(index_column_expr_ir.clone()))2175.collect::<Vec<_>>();2176let input = build_select_stream_with_ctx(input, &input_columns, ctx)?;21772178let kind = PhysNodeKind::RollingGroupBy {2179input,2180index_column: index_column_name,2181period,2182offset,2183closed: closed_window,2184slice: None,2185aggs: vec![AExprBuilder::new_from_node(function).expr_ir(out_name.clone())],2186};2187let node_key = ctx2188.phys_sm2189.insert(PhysNode::new(Arc::new(output_schema), kind));2190let input = PhysStream::first(node_key);21912192let input = build_select_stream_with_ctx(2193input,2194&[AExprBuilder::col(out_name.clone(), ctx.expr_arena)2195.expr_ir(out_name.clone())],2196ctx,2197)?;2198input_streams.insert(input);2199transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));2200},22012202AExpr::AnonymousFunction { .. }2203| AExpr::Function { .. }2204| AExpr::Over { .. }2205| AExpr::Gather { .. 
} => {
                let out_name = unique_column_name();
                fallback_subset.push(ExprIR::new(expr, OutputName::Alias(out_name.clone())));
                transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
            },
        }
    }

    if !fallback_subset.is_empty() {
        let fallback_node = build_fallback_node_with_ctx(input, &fallback_subset, ctx)?;
        input_streams.insert(PhysStream::first(fallback_node));
    }

    // Simplify the input nodes (also ensures the original input only occurs
    // once in the zip).
    input_streams = simplify_input_streams(input, input_streams, ctx)?;

    if input_streams.len() == 1 {
        // No need for any multiplexing/zipping, can directly execute.
        return Ok((input_streams.into_iter().next().unwrap(), transformed_exprs));
    }

    let zip_inputs = input_streams.into_iter().collect_vec();
    let output_schema = zip_inputs
        .iter()
        .flat_map(|stream| ctx.phys_sm[stream.node].output_schema.iter_fields())
        .collect();
    let zip_kind = PhysNodeKind::Zip {
        inputs: zip_inputs,
        zip_behavior: ZipBehavior::Broadcast,
    };
    let zip_node = ctx
        .phys_sm
        .insert(PhysNode::new(Arc::new(output_schema), zip_kind));

    Ok((PhysStream::first(zip_node), transformed_exprs))
}

/// Computes the schema that selecting the given expressions on the input schema
/// would result in.
///
/// Each output field is named after the expression's output name and typed
/// with the expression's dtype on `input_schema`, after materializing unknown
/// dtypes.
///
/// # Errors
///
/// Returns an error if the dtype of an expression cannot be resolved against
/// `input_schema`, or if materializing an unknown dtype fails.
pub fn compute_output_schema(
    input_schema: &Schema,
    exprs: &[ExprIR],
    expr_arena: &Arena<AExpr>,
) -> PolarsResult<Arc<Schema>> {
    let output_schema: Schema = exprs
        .iter()
        .map(|e| {
            let name = e.output_name().clone();
            // Propagate a materialization failure instead of panicking; this
            // matches how `build_hstack_stream` handles the same call.
            let dtype = e
                .dtype(input_schema, expr_arena)?
                .clone()
                .materialize_unknown(true)?;
            PolarsResult::Ok(Field::new(name, dtype))
        })
        .try_collect()?;
    Ok(Arc::new(output_schema))
}

/// Computes the schema that selecting the given expressions on the input node
/// would result in.
///
/// Looks up the output schema of `input`'s node and defers to
/// [`compute_output_schema`].
fn schema_for_select(
    input: PhysStream,
    exprs: &[ExprIR],
    ctx: &mut LowerExprContext,
) -> PolarsResult<Arc<Schema>> {
    let input_schema = &ctx.phys_sm[input.node].output_schema;
    compute_output_schema(input_schema, exprs, ctx.expr_arena)
}

/// Builds a stream that selects `exprs` on `input`.
///
/// Three cases are tried in order:
/// 1. Every expression is input-independent: an input-independent node is
///    built and `input` is not used.
/// 2. Every expression is a plain column whose output name equals the column
///    name: `input` is returned as-is when its schema already consists of
///    exactly these columns in this order, otherwise a `SimpleProjection` node
///    is inserted.
/// 3. Otherwise the expressions are lowered and a `Select` node is inserted on
///    top of the transformed input.
fn build_select_stream_with_ctx(
    input: PhysStream,
    exprs: &[ExprIR],
    ctx: &mut LowerExprContext,
) -> PolarsResult<PhysStream> {
    if exprs
        .iter()
        .all(|e| is_input_independent_ctx(e.node(), ctx))
    {
        return Ok(PhysStream::first(build_input_independent_node_with_ctx(
            exprs, ctx,
        )?));
    }

    // Are we only selecting simple columns, with the same name?
    let all_simple_columns: Option<Vec<PlSmallStr>> = exprs
        .iter()
        .map(|e| match ctx.expr_arena.get(e.node()) {
            AExpr::Column(name) if name == e.output_name() => Some(name.clone()),
            _ => None,
        })
        .collect();

    if let Some(columns) = all_simple_columns {
        let input_schema = ctx.phys_sm[input.node].output_schema.clone();
        if input_schema.len() == columns.len()
            && input_schema.iter_names().zip(&columns).all(|(l, r)| l == r)
        {
            // Input node already has the correct schema, just pass through.
            return Ok(input);
        }

        let output_schema = Arc::new(input_schema.try_project(&columns)?);
        let node_kind = PhysNodeKind::SimpleProjection { input, columns };
        let node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, node_kind));
        return Ok(PhysStream::first(node_key));
    }

    // Actual lowering is needed.
    let node_exprs = exprs.iter().map(|e| e.node()).collect_vec();
    let (transformed_input, transformed_exprs) = lower_exprs_with_ctx(input, &node_exprs, ctx)?;
    // Re-attach the requested output names to the lowered expression nodes.
    let trans_expr_irs = exprs
        .iter()
        .zip(transformed_exprs)
        .map(|(e, te)| ExprIR::new(te, OutputName::Alias(e.output_name().clone())))
        .collect_vec();
    let output_schema = schema_for_select(transformed_input, &trans_expr_irs, ctx)?;
    let node_kind = PhysNodeKind::Select {
        input: transformed_input,
        selectors: trans_expr_irs,
        extend_original: false,
    };
    let node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, node_kind));
    Ok(PhysStream::first(node_key))
}

/// Lowers an input node plus a set of expressions on that input node to an
/// equivalent (input node, set of expressions) pair, ensuring that the new set
/// of expressions can run on the streaming engine.
///
/// Ensures that if the input node is transformed it has unique column names.
///
/// The returned expressions are in the same order as `exprs` and carry the
/// same output names.
pub fn lower_exprs(
    input: PhysStream,
    exprs: &[ExprIR],
    expr_arena: &mut Arena<AExpr>,
    phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
    expr_cache: &mut ExprCache,
    ctx: StreamingLowerIRContext,
) -> PolarsResult<(PhysStream, Vec<ExprIR>)> {
    let mut ctx = LowerExprContext {
        expr_arena,
        phys_sm,
        cache: expr_cache,
        prepare_visualization: ctx.prepare_visualization,
    };
    let node_exprs = exprs.iter().map(|e| e.node()).collect_vec();
    let (transformed_input, transformed_exprs) =
        lower_exprs_with_ctx(input, &node_exprs, &mut ctx)?;
    // Re-attach the original output names to the lowered expression nodes.
    let trans_expr_irs = exprs
        .iter()
        .zip(transformed_exprs)
        .map(|(e, te)| ExprIR::new(te, OutputName::Alias(e.output_name().clone())))
        .collect_vec();
    Ok((transformed_input, trans_expr_irs))
}

/// Builds a new selection node given an input stream and the expressions to
/// select for, if needed.
///
/// Public wrapper that bundles the arenas and cache into a lowering context
/// and defers to `build_select_stream_with_ctx`.
pub fn build_select_stream(
    input: PhysStream,
    exprs: &[ExprIR],
    expr_arena: &mut Arena<AExpr>,
    phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
    expr_cache: &mut ExprCache,
    ctx: StreamingLowerIRContext,
) -> PolarsResult<PhysStream> {
    let mut ctx = LowerExprContext {
        expr_arena,
        phys_sm,
        cache: expr_cache,
        prepare_visualization: ctx.prepare_visualization,
    };
    build_select_stream_with_ctx(input, exprs, &mut ctx)
}

/// Builds a hstack node given an input stream and the expressions to add.
///
/// When every expression is elementwise, a single `Select` node with
/// `extend_original: true` is inserted directly on `input`. Otherwise a
/// length-preserving select is built over all original columns plus `exprs`;
/// an expression whose output name equals an existing column name overwrites
/// the selector for that column.
pub fn build_hstack_stream(
    input: PhysStream,
    exprs: &[ExprIR],
    expr_arena: &mut Arena<AExpr>,
    phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
    expr_cache: &mut ExprCache,
    ctx: StreamingLowerIRContext,
) -> PolarsResult<PhysStream> {
    let input_schema = &phys_sm[input.node].output_schema;
    if exprs
        .iter()
        .all(|e| is_elementwise_rec_cached(e.node(), expr_arena, expr_cache))
    {
        // Output schema is the input schema with each expression's field
        // inserted under its output name.
        let mut output_schema = input_schema.as_ref().clone();
        for expr in exprs {
            output_schema.insert(
                expr.output_name().clone(),
                expr.dtype(input_schema, expr_arena)?
                    .clone()
                    .materialize_unknown(true)?,
            );
        }
        let output_schema = Arc::new(output_schema);

        let selectors = exprs.to_vec();
        let kind = PhysNodeKind::Select {
            input,
            selectors,
            extend_original: true,
        };
        let node_key = phys_sm.insert(PhysNode {
            output_schema,
            kind,
        });

        Ok(PhysStream::first(node_key))
    } else {
        // We already handled the all-streamable case above, so things get more complicated.
        // For simplicity we just do a normal select with all the original columns prepended.
        let mut selectors = PlIndexMap::with_capacity(input_schema.len() + exprs.len());
        for name in input_schema.iter_names() {
            let col_name = name.clone();
            let col_expr = expr_arena.add(AExpr::Column(col_name.clone()));
            selectors.insert(
                name.clone(),
                ExprIR::new(col_expr, OutputName::ColumnLhs(col_name)),
            );
        }
        for expr in exprs {
            selectors.insert(expr.output_name().clone(), expr.clone());
        }
        let selectors = selectors.into_values().collect_vec();
        build_length_preserving_select_stream(
            input, &selectors, expr_arena, phys_sm, expr_cache, ctx,
        )
    }
}

/// Builds a new selection node given an input stream and the expressions to
/// select for, if needed.
Preserves the length of the input, like in with_columns.2442pub fn build_length_preserving_select_stream(2443input: PhysStream,2444exprs: &[ExprIR],2445expr_arena: &mut Arena<AExpr>,2446phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,2447expr_cache: &mut ExprCache,2448ctx: StreamingLowerIRContext,2449) -> PolarsResult<PhysStream> {2450let mut ctx = LowerExprContext {2451expr_arena,2452phys_sm,2453cache: expr_cache,2454prepare_visualization: ctx.prepare_visualization,2455};2456let already_length_preserving = exprs2457.iter()2458.any(|expr| is_length_preserving_ctx(expr.node(), &mut ctx));2459let input_schema = &ctx.phys_sm[input.node].output_schema;2460if exprs.is_empty() || input_schema.is_empty() || already_length_preserving {2461return build_select_stream_with_ctx(input, exprs, &mut ctx);2462}24632464// Hacky work-around: append an input column with a temporary name, but2465// remove it from the final selector. This should ensure scalars gets zipped2466// back to the input to broadcast them.2467let tmp_name = unique_column_name();2468let first_col = ctx.expr_arena.add(AExpr::Column(2469input_schema.iter_names_cloned().next().unwrap(),2470));2471let mut tmp_exprs = Vec::with_capacity(exprs.len() + 1);2472tmp_exprs.extend(exprs.iter().cloned());2473tmp_exprs.push(ExprIR::new(first_col, OutputName::Alias(tmp_name.clone())));24742475let out_stream = build_select_stream_with_ctx(input, &tmp_exprs, &mut ctx)?;2476let PhysNodeKind::Select { selectors, .. } = &mut ctx.phys_sm[out_stream.node].kind else {2477unreachable!()2478};2479assert!(selectors.pop().unwrap().output_name() == &tmp_name);2480let out_schema = Arc::make_mut(&mut phys_sm[out_stream.node].output_schema);2481out_schema.shift_remove(tmp_name.as_ref()).unwrap();2482Ok(out_stream)2483}248424852486