// Path: blob/main/crates/polars-plan/src/plans/conversion/dsl_to_ir/mod.rs
// 7889 views
use arrow::datatypes::ArrowSchemaRef;1use either::Either;2use expr_expansion::rewrite_projections;3use hive::hive_partitions_from_paths;4use polars_core::chunked_array::cast::CastOptions;5use polars_core::config::verbose;6use polars_utils::format_pl_smallstr;7use polars_utils::itertools::Itertools;8use polars_utils::plpath::PlPath;9use polars_utils::unique_id::UniqueId;1011use super::convert_utils::SplitPredicates;12use super::stack_opt::ConversionOptimizer;13use super::*;14use crate::constants::get_pl_element_name;15use crate::dsl::PartitionedSinkOptions;16use crate::dsl::sink2::FileProviderType;1718mod concat;19mod datatype_fn_to_ir;20mod expr_expansion;21mod expr_to_ir;22mod functions;23mod join;24mod scans;25mod utils;26pub use expr_expansion::{expand_expression, is_regex_projection, prepare_projection};27pub use expr_to_ir::{ExprToIRContext, to_expr_ir};28use expr_to_ir::{to_expr_ir_materialized_lit, to_expr_irs};29use utils::DslConversionContext;3031macro_rules! failed_here {32($($t:tt)*) => {33format!("'{}'", stringify!($($t)*)).into()34}35}36pub(super) use failed_here;3738pub fn to_alp(39lp: DslPlan,40expr_arena: &mut Arena<AExpr>,41lp_arena: &mut Arena<IR>,42// Only `SIMPLIFY_EXPR`, `TYPE_COERCION`, `TYPE_CHECK` are respected.43opt_flags: &mut OptFlags,44) -> PolarsResult<Node> {45let conversion_optimizer = ConversionOptimizer::new(46opt_flags.contains(OptFlags::SIMPLIFY_EXPR),47opt_flags.contains(OptFlags::TYPE_COERCION),48opt_flags.contains(OptFlags::TYPE_CHECK),49);5051let mut ctxt = DslConversionContext {52expr_arena,53lp_arena,54conversion_optimizer,55opt_flags,56nodes_scratch: &mut unitvec![],57cache_file_info: Default::default(),58pushdown_maintain_errors: optimizer::pushdown_maintain_errors(),59verbose: verbose(),60seen_caches: Default::default(),61};6263match to_alp_impl(lp, &mut ctxt) {64Ok(out) => Ok(out),65Err(err) => {66if opt_flags.contains(OptFlags::EAGER) {67// If we dispatched to the lazy engine from the eager API, we don't want to 
resolve68// where in the query plan it went wrong. It is clear from the backtrace anyway.69return Err(err.remove_context());70};71let Some(ir_until_then) = lp_arena.last_node() else {72return Err(err);73};74let node_name = if let PolarsError::Context { msg, .. } = &err {75msg76} else {77"THIS_NODE"78};79let plan = IRPlan::new(80ir_until_then,81std::mem::take(lp_arena),82std::mem::take(expr_arena),83);84let location = format!("{}", plan.display());85Err(err.wrap_msg(|msg| {86format!("{msg}\n\nResolved plan until failure:\n\n\t---> FAILED HERE RESOLVING {node_name} <---\n{location}")87}))88},89}90}9192fn run_conversion(lp: IR, ctxt: &mut DslConversionContext, name: &str) -> PolarsResult<Node> {93let lp_node = ctxt.lp_arena.add(lp);94ctxt.conversion_optimizer95.optimize_exprs(ctxt.expr_arena, ctxt.lp_arena, lp_node, false)96.map_err(|e| e.context(format!("'{name}' failed").into()))?;9798Ok(lp_node)99}100101/// converts LogicalPlan to IR102/// it adds expressions & lps to the respective arenas as it traverses the plan103/// finally it returns the top node of the logical plan104#[recursive]105pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult<Node> {106let owned = Arc::unwrap_or_clone;107108let v = match lp {109DslPlan::Scan {110sources,111unified_scan_args,112scan_type,113cached_ir,114} => scans::dsl_to_ir(sources, unified_scan_args, scan_type, cached_ir, ctxt)?,115#[cfg(feature = "python")]116DslPlan::PythonScan { options } => {117use crate::dsl::python_dsl::PythonOptionsDsl;118119let schema = options.get_schema()?;120121let PythonOptionsDsl {122scan_fn,123schema_fn: _,124python_source,125validate_schema,126is_pure,127} = options;128129IR::PythonScan {130options: PythonOptions {131scan_fn,132schema,133python_source,134validate_schema,135output_schema: Default::default(),136with_columns: Default::default(),137n_rows: Default::default(),138predicate: Default::default(),139is_pure,140},141}142},143DslPlan::Union { inputs, args } => {144let 
mut inputs = inputs145.into_iter()146.map(|lp| to_alp_impl(lp, ctxt))147.collect::<PolarsResult<Vec<_>>>()148.map_err(|e| e.context(failed_here!(vertical concat)))?;149150if args.diagonal {151inputs = concat::convert_diagonal_concat(inputs, ctxt.lp_arena, ctxt.expr_arena)?;152}153154if args.to_supertypes {155concat::convert_st_union(156&mut inputs,157ctxt.lp_arena,158ctxt.expr_arena,159ctxt.opt_flags,160)161.map_err(|e| e.context(failed_here!(vertical concat)))?;162}163164let first = *inputs.first().ok_or_else(165|| polars_err!(InvalidOperation: "expected at least one input in 'union'/'concat'"),166)?;167let schema = ctxt.lp_arena.get(first).schema(ctxt.lp_arena);168for n in &inputs[1..] {169let schema_i = ctxt.lp_arena.get(*n).schema(ctxt.lp_arena);170// The first argument171schema_i.matches_schema(schema.as_ref()).map_err(|_| polars_err!(InvalidOperation: "'union'/'concat' inputs should all have the same schema,\172got\n{:?} and \n{:?}", schema, schema_i)173)?;174}175176let options = args.into();177IR::Union { inputs, options }178},179DslPlan::HConcat { inputs, options } => {180let inputs = inputs181.into_iter()182.map(|lp| to_alp_impl(lp, ctxt))183.collect::<PolarsResult<Vec<_>>>()184.map_err(|e| e.context(failed_here!(horizontal concat)))?;185186let schema = concat::h_concat_schema(&inputs, ctxt.lp_arena)?;187188IR::HConcat {189inputs,190schema,191options,192}193},194DslPlan::Filter { input, predicate } => {195let mut input =196to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(filter)))?;197let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);198199let mut out = Vec::with_capacity(1);200expr_expansion::expand_expression(201&predicate,202&PlHashSet::default(),203input_schema.as_ref().as_ref(),204&mut out,205ctxt.opt_flags,206)?;207208let predicate = match out.len() {2091 => {210// all good211out.pop().unwrap()212},2130 => {214let msg = "The predicate expanded to zero expressions. 
\215This may for example be caused by a regex not matching column names or \216a column dtype match not hitting any dtypes in the DataFrame";217polars_bail!(ComputeError: msg);218},219_ => {220let mut expanded = String::new();221for e in out.iter().take(5) {222expanded.push_str(&format!("\t{e:?},\n"))223}224// pop latest comma225expanded.pop();226if out.len() > 5 {227expanded.push_str("\t...\n")228}229230let msg = if cfg!(feature = "python") {231format!(232"The predicate passed to 'LazyFrame.filter' expanded to multiple expressions: \n\n{expanded}\n\233This is ambiguous. Try to combine the predicates with the 'all' or `any' expression."234)235} else {236format!(237"The predicate passed to 'LazyFrame.filter' expanded to multiple expressions: \n\n{expanded}\n\238This is ambiguous. Try to combine the predicates with the 'all_horizontal' or `any_horizontal' expression."239)240};241polars_bail!(ComputeError: msg)242},243};244let predicate_ae = to_expr_ir(245predicate,246&mut ExprToIRContext::new_with_opt_eager(247ctxt.expr_arena,248&input_schema,249ctxt.opt_flags,250),251)?;252253if ctxt.opt_flags.predicate_pushdown() {254ctxt.nodes_scratch.clear();255256if let Some(SplitPredicates { pushable, fallible }) = SplitPredicates::new(257predicate_ae.node(),258ctxt.expr_arena,259Some(ctxt.nodes_scratch),260ctxt.pushdown_maintain_errors,261) {262let mut update_input = |predicate: Node| -> PolarsResult<()> {263let predicate = ExprIR::from_node(predicate, ctxt.expr_arena);264ctxt.conversion_optimizer265.push_scratch(predicate.node(), ctxt.expr_arena);266let lp = IR::Filter { input, predicate };267input = run_conversion(lp, ctxt, "filter")?;268269Ok(())270};271272// Pushables first, then fallible.273274for predicate in pushable {275update_input(predicate)?;276}277278if let Some(node) = fallible {279update_input(node)?;280}281282return Ok(input);283};284};285286ctxt.conversion_optimizer287.push_scratch(predicate_ae.node(), ctxt.expr_arena);288let lp = IR::Filter 
{289input,290predicate: predicate_ae,291};292return run_conversion(lp, ctxt, "filter");293},294DslPlan::Slice { input, offset, len } => {295let input =296to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(slice)))?;297IR::Slice { input, offset, len }298},299DslPlan::DataFrameScan { df, schema } => IR::DataFrameScan {300df,301schema,302output_schema: None,303},304DslPlan::Select {305expr,306input,307options,308} => {309let input =310to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(select)))?;311let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);312let (exprs, schema) = prepare_projection(expr, &input_schema, ctxt.opt_flags)313.map_err(|e| e.context(failed_here!(select)))?;314315if exprs.is_empty() {316ctxt.lp_arena.replace(input, utils::empty_df());317return Ok(input);318}319320let eirs = to_expr_irs(321exprs,322&mut ExprToIRContext::new_with_opt_eager(323ctxt.expr_arena,324&input_schema,325ctxt.opt_flags,326),327)?;328ctxt.conversion_optimizer329.fill_scratch(&eirs, ctxt.expr_arena);330331let schema = Arc::new(schema);332let lp = IR::Select {333expr: eirs,334input,335schema,336options,337};338339return run_conversion(lp, ctxt, "select").map_err(|e| e.context(failed_here!(select)));340},341DslPlan::Sort {342input,343by_column,344slice,345mut sort_options,346} => {347let input =348to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(select)))?;349let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);350351// note: if given an Expr::Columns, count the individual cols352let n_by_exprs = if by_column.len() == 1 {353match &by_column[0] {354Expr::Selector(s) => s.into_columns(&input_schema, &Default::default())?.len(),355_ => 1,356}357} else {358by_column.len()359};360let n_desc = sort_options.descending.len();361polars_ensure!(362n_desc == n_by_exprs || n_desc == 1,363ComputeError: "the length of `descending` ({}) does not match the length of `by` ({})", n_desc, by_column.len()364);365let 
n_nulls_last = sort_options.nulls_last.len();366polars_ensure!(367n_nulls_last == n_by_exprs || n_nulls_last == 1,368ComputeError: "the length of `nulls_last` ({}) does not match the length of `by` ({})", n_nulls_last, by_column.len()369);370371let mut expanded_cols = Vec::new();372let mut nulls_last = Vec::new();373let mut descending = Vec::new();374375// note: nulls_last/descending need to be matched to expanded multi-output expressions.376// when one of nulls_last/descending has not been updated from the default (single377// value true/false), 'cycle' ensures that "by_column" iter is not truncated.378for (c, (&n, &d)) in by_column.into_iter().zip(379sort_options380.nulls_last381.iter()382.cycle()383.zip(sort_options.descending.iter().cycle()),384) {385let exprs = utils::expand_expressions(386input,387vec![c],388ctxt.lp_arena,389ctxt.expr_arena,390ctxt.opt_flags,391)392.map_err(|e| e.context(failed_here!(sort)))?;393394nulls_last.extend(std::iter::repeat_n(n, exprs.len()));395descending.extend(std::iter::repeat_n(d, exprs.len()));396expanded_cols.extend(exprs);397}398sort_options.nulls_last = nulls_last;399sort_options.descending = descending;400401ctxt.conversion_optimizer402.fill_scratch(&expanded_cols, ctxt.expr_arena);403let mut by_column = expanded_cols;404405// Remove null columns in multi-columns sort406if by_column.len() > 1 {407let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);408409let mut null_columns = vec![];410411for (i, c) in by_column.iter().enumerate() {412if let DataType::Null = c.dtype(&input_schema, ctxt.expr_arena)? 
{413null_columns.push(i);414}415}416// All null columns, only take one.417if null_columns.len() == by_column.len() {418by_column.truncate(1);419sort_options.nulls_last.truncate(1);420sort_options.descending.truncate(1);421}422// Remove the null columns423else if !null_columns.is_empty() {424for i in null_columns.into_iter().rev() {425by_column.remove(i);426sort_options.nulls_last.remove(i);427sort_options.descending.remove(i);428}429}430}431if by_column.is_empty() {432return Ok(input);433};434435let lp = IR::Sort {436input,437by_column,438slice,439sort_options,440};441442return run_conversion(lp, ctxt, "sort").map_err(|e| e.context(failed_here!(sort)));443},444DslPlan::Cache { input, id } => {445let input = match ctxt.seen_caches.get(&id) {446Some(input) => *input,447None => {448let input = to_alp_impl(owned(input), ctxt)449.map_err(|e| e.context(failed_here!(cache)))?;450let seen_before = ctxt.seen_caches.insert(id, input);451assert!(452seen_before.is_none(),453"Cache could not have been created in the mean time. That would make the DAG cyclic."454);455input456},457};458459IR::Cache { input, id }460},461DslPlan::GroupBy {462input,463keys,464predicates,465mut aggs,466apply,467maintain_order,468options,469} => {470// If the group by contains any predicates, we update the plan by turning the471// predicates into aggregations and filtering on them. 
Then, we recursively call472// this function.473if !predicates.is_empty() {474let predicate_names = (0..predicates.len())475.map(|i| format_pl_smallstr!("__POLARS_HAVING_{i}"))476.collect::<Arc<[_]>>();477let predicates = predicates478.into_iter()479.zip(predicate_names.iter())480.map(|(p, name)| p.alias(name.clone()))481.collect_vec();482aggs.extend(predicates);483484let lp = DslPlan::GroupBy {485input,486keys,487predicates: vec![],488aggs,489apply,490maintain_order,491options,492};493let lp = DslBuilder::from(lp)494.filter(495all_horizontal(496predicate_names.iter().map(|n| col(n.clone())).collect_vec(),497)498.unwrap(),499)500.drop(Selector::ByName {501names: predicate_names,502strict: true,503})504.build();505return to_alp_impl(lp, ctxt);506}507508// NOTE: As we went into this branch, we know that no predicates are provided.509let input =510to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(group_by)))?;511512// Rolling + group-by sorts the whole table, so remove unneeded columns513if ctxt.opt_flags.eager() && options.is_rolling() && !keys.is_empty() {514ctxt.opt_flags.insert(OptFlags::PROJECTION_PUSHDOWN)515}516517let (keys, aggs, schema) = resolve_group_by(518input,519keys,520aggs,521&options,522ctxt.lp_arena,523ctxt.expr_arena,524ctxt.opt_flags,525)526.map_err(|e| e.context(failed_here!(group_by)))?;527528let (apply, schema) = if let Some((apply, schema)) = apply {529(Some(apply), schema)530} else {531(None, schema)532};533534ctxt.conversion_optimizer535.fill_scratch(&keys, ctxt.expr_arena);536ctxt.conversion_optimizer537.fill_scratch(&aggs, ctxt.expr_arena);538539let lp = IR::GroupBy {540input,541keys,542aggs,543schema,544apply,545maintain_order,546options,547};548return run_conversion(lp, ctxt, "group_by")549.map_err(|e| e.context(failed_here!(group_by)));550},551DslPlan::Join {552input_left,553input_right,554left_on,555right_on,556predicates,557options,558} => {559return 
join::resolve_join(560Either::Left(input_left),561Either::Left(input_right),562left_on,563right_on,564predicates,565JoinOptionsIR::from(Arc::unwrap_or_clone(options)),566ctxt,567)568.map_err(|e| e.context(failed_here!(join)))569.map(|t| t.0);570},571DslPlan::HStack {572input,573exprs,574options,575} => {576let input = to_alp_impl(owned(input), ctxt)577.map_err(|e| e.context(failed_here!(with_columns)))?;578let (exprs, schema) =579resolve_with_columns(exprs, input, ctxt.lp_arena, ctxt.expr_arena, ctxt.opt_flags)580.map_err(|e| e.context(failed_here!(with_columns)))?;581582ctxt.conversion_optimizer583.fill_scratch(&exprs, ctxt.expr_arena);584let lp = IR::HStack {585input,586exprs,587schema,588options,589};590return run_conversion(lp, ctxt, "with_columns");591},592DslPlan::MatchToSchema {593input,594match_schema,595per_column,596extra_columns,597} => {598let input =599to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(unique)))?;600let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);601602assert_eq!(per_column.len(), match_schema.len());603604if input_schema.as_ref() == &match_schema {605return Ok(input);606}607608let mut exprs = Vec::with_capacity(match_schema.len());609let mut found_missing_columns = Vec::new();610let mut used_input_columns = 0;611612for ((column, dtype), per_column) in match_schema.iter().zip(per_column.iter()) {613match input_schema.get(column) {614None => match &per_column.missing_columns {615MissingColumnsPolicyOrExpr::Raise => found_missing_columns.push(column),616MissingColumnsPolicyOrExpr::Insert => exprs.push(Expr::Alias(617Arc::new(Expr::Literal(LiteralValue::Scalar(Scalar::null(618dtype.clone(),619)))),620column.clone(),621)),622MissingColumnsPolicyOrExpr::InsertWith(expr) => {623exprs.push(Expr::Alias(Arc::new(expr.clone()), column.clone()))624},625},626Some(input_dtype) if dtype == input_dtype => {627used_input_columns += 1;628exprs.push(Expr::Column(column.clone()))629},630Some(input_dtype) => {631let 
from_dtype = input_dtype;632let to_dtype = dtype;633634let policy = CastColumnsPolicy {635integer_upcast: per_column.integer_cast == UpcastOrForbid::Upcast,636float_upcast: per_column.float_cast == UpcastOrForbid::Upcast,637missing_struct_fields: per_column.missing_struct_fields,638extra_struct_fields: per_column.extra_struct_fields,639640..Default::default()641};642643let should_cast =644policy.should_cast_column(column, to_dtype, from_dtype)?;645646let mut expr = Expr::Column(PlSmallStr::from_str(column));647if should_cast {648expr = expr.cast_with_options(to_dtype.clone(), CastOptions::NonStrict);649}650651used_input_columns += 1;652exprs.push(expr);653},654}655}656657// Report the error for missing columns658if let Some(lst) = found_missing_columns.first() {659use std::fmt::Write;660let mut formatted = String::new();661write!(&mut formatted, "\"{}\"", found_missing_columns[0]).unwrap();662for c in &found_missing_columns[1..] {663write!(&mut formatted, ", \"{c}\"").unwrap();664}665666write!(&mut formatted, "\"{lst}\"").unwrap();667polars_bail!(SchemaMismatch: "missing columns in `match_to_schema`: {formatted}");668}669670// Report the error for extra columns671if used_input_columns != input_schema.len()672&& extra_columns == ExtraColumnsPolicy::Raise673{674let found_extra_columns = input_schema675.iter_names()676.filter(|n| !match_schema.contains(n))677.collect::<Vec<_>>();678679use std::fmt::Write;680let mut formatted = String::new();681write!(&mut formatted, "\"{}\"", found_extra_columns[0]).unwrap();682for c in &found_extra_columns[1..] 
{683write!(&mut formatted, ", \"{c}\"").unwrap();684}685686polars_bail!(SchemaMismatch: "extra columns in `match_to_schema`: {formatted}");687}688689let exprs = to_expr_irs(690exprs,691&mut ExprToIRContext::new_with_opt_eager(692ctxt.expr_arena,693&input_schema,694ctxt.opt_flags,695),696)?;697698ctxt.conversion_optimizer699.fill_scratch(&exprs, ctxt.expr_arena);700let lp = IR::Select {701input,702expr: exprs,703schema: match_schema.clone(),704options: ProjectionOptions {705run_parallel: true,706duplicate_check: false,707should_broadcast: true,708},709};710return run_conversion(lp, ctxt, "match_to_schema");711},712DslPlan::PipeWithSchema { input, callback } => {713// Derive the schema from the input714let mut inputs = Vec::with_capacity(input.len());715let mut input_schemas = Vec::with_capacity(input.len());716717for plan in input.as_ref() {718let ir = to_alp_impl(plan.clone(), ctxt)?;719let schema = ctxt.lp_arena.get(ir).schema(ctxt.lp_arena).into_owned();720721let dsl = DslPlan::IR {722dsl: Arc::new(plan.clone()),723version: ctxt.lp_arena.version(),724node: Some(ir),725};726inputs.push(dsl);727input_schemas.push(schema);728}729730// Adjust the input and start conversion again731let input_adjusted = callback.call((inputs, input_schemas))?;732return to_alp_impl(input_adjusted, ctxt);733},734#[cfg(feature = "pivot")]735DslPlan::Pivot {736input,737on,738on_columns,739index,740values,741agg,742maintain_order,743separator,744} => {745let input =746to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(unique)))?;747let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);748749let on = on.into_columns(input_schema.as_ref(), &Default::default())?;750let index = index.into_columns(input_schema.as_ref(), &Default::default())?;751let values = values.into_columns(input_schema.as_ref(), &Default::default())?;752753polars_ensure!(!on.is_empty(), InvalidOperation: "`pivot` called without `on` columns.");754polars_ensure!(on.len() == on_columns.width(), 
InvalidOperation: "`pivot` expected `on` and `on_columns` to have the same amount of columns.");755if on.len() > 1 {756polars_ensure!(757on_columns.get_columns().iter().zip(on.iter()).all(|(c, o)| o == c.name()),758InvalidOperation: "`pivot` has mismatching column names between `on` and `on_columns`."759);760}761polars_ensure!(!values.is_empty(), InvalidOperation: "`pivot` called without `values` columns.");762763let on_titles = if on_columns.width() == 1 {764on_columns.get_columns()[0].cast(&DataType::String)?765} else {766on_columns767.as_ref()768.clone()769.into_struct(PlSmallStr::EMPTY)770.cast(&DataType::String)?771.into_column()772};773let on_titles = on_titles.str()?;774775let mut expr_schema = input_schema.as_ref().as_ref().clone();776let mut out = Vec::with_capacity(1);777let mut aggs = Vec::<ExprIR>::with_capacity(values.len() * on_columns.height());778for value in values.iter() {779out.clear();780let value_dtype = input_schema.try_get(value)?;781expr_schema.insert(get_pl_element_name(), value_dtype.clone());782expand_expression(783&agg,784&Default::default(),785&expr_schema,786&mut out,787ctxt.opt_flags,788)?;789polars_ensure!(790out.len() == 1,791InvalidOperation: "Pivot expression are not allowed to expand to more than 1 expression"792);793let agg = out.pop().unwrap();794let agg_ae = to_expr_ir(795agg,796&mut ExprToIRContext::new_with_opt_eager(797ctxt.expr_arena,798&expr_schema,799ctxt.opt_flags,800),801)?802.node();803804polars_ensure!(805aexpr_to_leaf_names_iter(agg_ae, ctxt.expr_arena).count() == 0,806InvalidOperation: "explicit column references are not allowed in the `aggregate_function` of `pivot`"807);808809for i in 0..on_columns.height() {810let mut name = String::new();811if values.len() > 1 {812name.push_str(value.as_str());813name.push_str(separator.as_str());814}815816name.push_str(on_titles.get(i).unwrap_or("null"));817818fn on_predicate(819on: &PlSmallStr,820on_column: &Column,821i: usize,822expr_arena: &mut Arena<AExpr>,823) -> 
AExprBuilder {824let e = AExprBuilder::col(on.clone(), expr_arena);825e.eq(826AExprBuilder::lit_scalar(827Scalar::new(828on_column.dtype().clone(),829on_column.get(i).unwrap().into_static(),830),831expr_arena,832),833expr_arena,834)835}836837let predicate = if on.len() == 1 {838on_predicate(&on[0], &on_columns.get_columns()[0], i, ctxt.expr_arena)839} else {840AExprBuilder::function(841on.iter()842.enumerate()843.map(|(j, on_col)| {844on_predicate(845on_col,846&on_columns.get_columns()[j],847i,848ctxt.expr_arena,849)850.expr_ir(on_col.clone())851})852.collect::<Vec<_>>(),853IRFunctionExpr::Boolean(IRBooleanFunction::AllHorizontal),854ctxt.expr_arena,855)856};857858let replacement_element = AExprBuilder::col(value.clone(), ctxt.expr_arena)859.filter(predicate, ctxt.expr_arena)860.node();861862#[recursive::recursive]863fn deep_clone_element_replace(864ae: Node,865arena: &mut Arena<AExpr>,866replacement: Node,867) -> Node {868let slf = arena.get(ae).clone();869if matches!(slf, AExpr::Element) {870return deep_clone_ae(replacement, arena);871} else if matches!(slf, AExpr::Len) {872// For backwards-compatibility, we support providing `pl.len()` to mean873// the length of the group here.874let element = deep_clone_ae(replacement, arena);875return AExprBuilder::new_from_node(element).len(arena).node();876}877878let mut children = vec![];879slf.children_rev(&mut children);880for child in &mut children {881*child = deep_clone_element_replace(*child, arena, replacement);882}883children.reverse();884885arena.add(slf.replace_children(&children))886}887aggs.push(ExprIR::new(888deep_clone_element_replace(agg_ae, ctxt.expr_arena, replacement_element),889OutputName::Alias(name.into()),890));891}892}893894let keys = index895.into_iter()896.map(|i| AExprBuilder::col(i.clone(), ctxt.expr_arena).expr_ir(i))897.collect();898IRBuilder::new(input, ctxt.expr_arena, ctxt.lp_arena)899.group_by(keys, aggs, None, maintain_order, Default::default())900.build()901},902DslPlan::Distinct { input, 
options } => {903let input =904to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(unique)))?;905let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena).into_owned();906907// "subset" param supports cols and/or arbitrary expressions908let (input, subset, temp_cols) = if let Some(exprs) = options.subset {909let exprs = rewrite_projections(910exprs,911&PlHashSet::default(),912&input_schema,913ctxt.opt_flags,914)?;915916// identify cols and exprs in "subset" param917let mut subset_colnames = vec![];918let mut subset_exprs = vec![];919for expr in &exprs {920match expr {921Expr::Column(name) => {922polars_ensure!(923input_schema.contains(name),924ColumnNotFound: "{name:?} not found"925);926subset_colnames.push(name.clone());927},928_ => subset_exprs.push(expr.clone()),929}930}931932if subset_exprs.is_empty() {933// "subset" is a collection of basic cols (or empty)934(input, Some(subset_colnames.into_iter().collect()), vec![])935} else {936// "subset" contains exprs; add them as temporary cols937let (aliased_exprs, temp_names): (Vec<_>, Vec<_>) = subset_exprs938.into_iter()939.enumerate()940.map(|(idx, expr)| {941let temp_name = format_pl_smallstr!("__POLARS_UNIQUE_SUBSET_{}", idx);942(expr.alias(temp_name.clone()), temp_name)943})944.unzip();945946subset_colnames.extend_from_slice(&temp_names);947948// integrate the temporary cols with the existing "input" node949let (temp_expr_irs, schema) = resolve_with_columns(950aliased_exprs,951input,952ctxt.lp_arena,953ctxt.expr_arena,954ctxt.opt_flags,955)?;956ctxt.conversion_optimizer957.fill_scratch(&temp_expr_irs, ctxt.expr_arena);958959let input_with_exprs = ctxt.lp_arena.add(IR::HStack {960input,961exprs: temp_expr_irs,962schema,963options: ProjectionOptions {964run_parallel: false,965duplicate_check: false,966should_broadcast: true,967},968});969(970input_with_exprs,971Some(subset_colnames.into_iter().collect()),972temp_names,973)974}975} else {976(input, None, vec![])977};978979// `distinct` 
definition (will contain temporary cols if we have "subset" exprs)980let distinct_node = ctxt.lp_arena.add(IR::Distinct {981input,982options: DistinctOptionsIR {983subset,984maintain_order: options.maintain_order,985keep_strategy: options.keep_strategy,986slice: None,987},988});989990// if no temporary cols (eg: we had no "subset" exprs), we're done...991if temp_cols.is_empty() {992return Ok(distinct_node);993}994995// ...otherwise, drop them by projecting the original schema996return Ok(ctxt.lp_arena.add(IR::SimpleProjection {997input: distinct_node,998columns: input_schema,999}));1000},1001DslPlan::MapFunction { input, function } => {1002let input = to_alp_impl(owned(input), ctxt)1003.map_err(|e| e.context(failed_here!(format!("{}", function).to_lowercase())))?;1004let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);10051006match function {1007DslFunction::Explode {1008columns,1009options,1010allow_empty,1011} => {1012let columns = columns.into_columns(&input_schema, &Default::default())?;1013polars_ensure!(!columns.is_empty() || allow_empty, InvalidOperation: "no columns provided in explode");1014if columns.is_empty() {1015return Ok(input);1016}1017let function = FunctionIR::Explode {1018columns: columns.into_iter().collect(),1019options,1020schema: Default::default(),1021};1022let ir = IR::MapFunction { input, function };1023return Ok(ctxt.lp_arena.add(ir));1024},1025DslFunction::FillNan(fill_value) => {1026let exprs = input_schema1027.iter()1028.filter_map(|(name, dtype)| match dtype {1029DataType::Float16 | DataType::Float32 | DataType::Float64 => Some(1030col(name.clone())1031.fill_nan(fill_value.clone())1032.alias(name.clone()),1033),1034_ => None,1035})1036.collect::<Vec<_>>();10371038let (exprs, schema) = resolve_with_columns(1039exprs,1040input,1041ctxt.lp_arena,1042ctxt.expr_arena,1043ctxt.opt_flags,1044)1045.map_err(|e| e.context(failed_here!(fill_nan)))?;10461047ctxt.conversion_optimizer1048.fill_scratch(&exprs, 
ctxt.expr_arena);10491050let lp = IR::HStack {1051input,1052exprs,1053schema,1054options: ProjectionOptions {1055duplicate_check: false,1056..Default::default()1057},1058};1059return run_conversion(lp, ctxt, "fill_nan");1060},1061DslFunction::Stats(sf) => {1062let exprs = match sf {1063StatsFunction::Var { ddof } => stats_helper(1064|dt| dt.is_primitive_numeric() || dt.is_bool() || dt.is_decimal(),1065|name| col(name.clone()).var(ddof),1066&input_schema,1067),1068StatsFunction::Std { ddof } => stats_helper(1069|dt| dt.is_primitive_numeric() || dt.is_bool() || dt.is_decimal(),1070|name| col(name.clone()).std(ddof),1071&input_schema,1072),1073StatsFunction::Quantile { quantile, method } => stats_helper(1074|dt| dt.is_primitive_numeric() || dt.is_decimal() || dt.is_temporal(),1075|name| col(name.clone()).quantile(quantile.clone(), method),1076&input_schema,1077),1078StatsFunction::Mean => stats_helper(1079|dt| {1080dt.is_primitive_numeric()1081|| dt.is_temporal()1082|| dt.is_bool()1083|| dt.is_decimal()1084},1085|name| col(name.clone()).mean(),1086&input_schema,1087),1088StatsFunction::Sum => stats_helper(1089|dt| {1090dt.is_primitive_numeric()1091|| dt.is_decimal()1092|| matches!(dt, DataType::Boolean | DataType::Duration(_))1093},1094|name| col(name.clone()).sum(),1095&input_schema,1096),1097StatsFunction::Min => stats_helper(1098|dt| dt.is_ord(),1099|name| col(name.clone()).min(),1100&input_schema,1101),1102StatsFunction::Max => stats_helper(1103|dt| dt.is_ord(),1104|name| col(name.clone()).max(),1105&input_schema,1106),1107StatsFunction::Median => stats_helper(1108|dt| {1109dt.is_primitive_numeric()1110|| dt.is_temporal()1111|| dt == &DataType::Boolean1112},1113|name| col(name.clone()).median(),1114&input_schema,1115),1116};1117let schema = Arc::new(expressions_to_schema(1118&exprs,1119&input_schema,1120|duplicate_name: &str| duplicate_name.to_string(),1121)?);1122let eirs = to_expr_irs(1123exprs,1124&mut 
ExprToIRContext::new_with_opt_eager(1125ctxt.expr_arena,1126&input_schema,1127ctxt.opt_flags,1128),1129)?;11301131ctxt.conversion_optimizer1132.fill_scratch(&eirs, ctxt.expr_arena);11331134let lp = IR::Select {1135input,1136expr: eirs,1137schema,1138options: ProjectionOptions {1139duplicate_check: false,1140..Default::default()1141},1142};1143return run_conversion(lp, ctxt, "stats");1144},1145DslFunction::Rename {1146existing,1147new,1148strict,1149} => {1150assert_eq!(existing.len(), new.len());1151if existing.is_empty() {1152return Ok(input);1153}11541155let existing_lut =1156PlIndexSet::from_iter(existing.iter().map(PlSmallStr::as_str));11571158let mut schema = Schema::with_capacity(input_schema.len());1159let mut num_replaced = 0;11601161// Turn the rename into a select.1162let expr = input_schema1163.iter()1164.map(|(n, dtype)| {1165Ok(match existing_lut.get_index_of(n.as_str()) {1166None => {1167schema.try_insert(n.clone(), dtype.clone())?;1168Expr::Column(n.clone())1169},1170Some(i) => {1171num_replaced += 1;1172schema.try_insert(new[i].clone(), dtype.clone())?;1173Expr::Column(n.clone()).alias(new[i].clone())1174},1175})1176})1177.collect::<PolarsResult<Vec<_>>>()?;11781179if strict && num_replaced != existing.len() {1180let col = existing.iter().find(|c| !input_schema.contains(c)).unwrap();1181polars_bail!(col_not_found = col);1182}11831184// Nothing changed, make into a no-op.1185if num_replaced == 0 {1186return Ok(input);1187}11881189let expr = to_expr_irs(1190expr,1191&mut ExprToIRContext::new_with_opt_eager(1192ctxt.expr_arena,1193&input_schema,1194ctxt.opt_flags,1195),1196)?;1197ctxt.conversion_optimizer1198.fill_scratch(&expr, ctxt.expr_arena);11991200IR::Select {1201input,1202expr,1203schema: Arc::new(schema),1204options: ProjectionOptions {1205run_parallel: false,1206duplicate_check: false,1207should_broadcast: false,1208},1209}1210},1211_ => {1212let function = function.into_function_ir(&input_schema)?;1213IR::MapFunction { input, function 
}
            },
        }
    },
    // `with_context`: make the schemas of extra plans visible to expression
    // resolution. Columns of each context plan are appended to the input schema
    // only when the name is not already present (first occurrence wins).
    DslPlan::ExtContext { input, contexts } => {
        let input = to_alp_impl(owned(input), ctxt)
            .map_err(|e| e.context(failed_here!(with_context)))?;
        let contexts = contexts
            .into_iter()
            .map(|lp| to_alp_impl(lp, ctxt))
            .collect::<PolarsResult<Vec<_>>>()
            .map_err(|e| e.context(failed_here!(with_context)))?;

        let mut schema = (**ctxt.lp_arena.get(input).schema(ctxt.lp_arena)).clone();
        for input in &contexts {
            let other_schema = ctxt.lp_arena.get(*input).schema(ctxt.lp_arena);
            for fld in other_schema.iter_fields() {
                if schema.get(fld.name()).is_none() {
                    schema.with_column(fld.name, fld.dtype);
                }
            }
        }

        IR::ExtContext {
            input,
            contexts,
            schema: Arc::new(schema),
        }
    },
    // Sinks: Memory/Callback/File payloads map 1:1 to their IR counterparts;
    // partitioned sinks additionally need their key / sort expressions
    // converted to ExprIR against the input schema.
    DslPlan::Sink { input, payload } => {
        let input =
            to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(sink)))?;
        let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
        let payload = match payload {
            SinkType::Memory => SinkTypeIR::Memory,
            SinkType::Callback(f) => SinkTypeIR::Callback(f),
            SinkType::File(options) => SinkTypeIR::File(options),
            SinkType::Partitioned(PartitionedSinkOptions {
                base_path,
                file_path_provider,
                partition_strategy,
                finish_callback,
                file_format,
                unified_sink_args,
                max_rows_per_file,
                approximate_bytes_per_file,
            }) => {
                // One shared conversion context for keys and sort columns.
                let expr_to_ir_cx = &mut ExprToIRContext::new_with_opt_eager(
                    ctxt.expr_arena,
                    &input_schema,
                    ctxt.opt_flags,
                );

                let partition_strategy = match partition_strategy {
                    PartitionStrategy::Keyed {
                        keys,
                        include_keys,
                        keys_pre_grouped,
                        per_partition_sort_by,
                    } => {
                        let keys = to_expr_irs(keys, expr_to_ir_cx)?;
                        let per_partition_sort_by: Vec<SortColumnIR> = per_partition_sort_by
                            .into_iter()
                            .map(|s| {
                                let SortColumn {
                                    expr,
                                    descending,
                                    nulls_last,
                                } = s;
                                Ok(SortColumnIR {
                                    expr: to_expr_ir(expr, expr_to_ir_cx)?,
                                    descending,
                                    nulls_last,
                                })
                            })
                            .collect::<PolarsResult<_>>()?;

                        PartitionStrategyIR::Keyed {
                            keys,
                            include_keys,
                            keys_pre_grouped,
                            per_partition_sort_by,
                        }
                    },
                    PartitionStrategy::FileSize => PartitionStrategyIR::FileSize,
                };

                let options = PartitionedSinkOptionsIR {
                    base_path,
                    // Default to hive-style file naming with the output format's
                    // extension when the caller gave no explicit provider.
                    file_path_provider: file_path_provider.unwrap_or_else(|| {
                        FileProviderType::Hive {
                            extension: PlSmallStr::from_static(file_format.extension()),
                        }
                    }),
                    partition_strategy,
                    finish_callback,
                    file_format,
                    unified_sink_args,
                    max_rows_per_file,
                    approximate_bytes_per_file,
                };

                // Seed the stack optimizer with all expressions embedded in the
                // sink options.
                ctxt.conversion_optimizer
                    .fill_scratch(options.expr_irs_iter(), ctxt.expr_arena);

                SinkTypeIR::Partitioned(options)
            },
        };

        let lp = IR::Sink { input, payload };
        return run_conversion(lp, ctxt, "sink");
    },
    DslPlan::SinkMultiple { inputs } => {
        let inputs = inputs
            .into_iter()
            .map(|lp| to_alp_impl(lp, ctxt))
            .collect::<PolarsResult<Vec<_>>>()
            // NOTE(review): error context says `vertical concat` — looks copied
            // from the concat arm; `sink_multiple` would be clearer.
            .map_err(|e| e.context(failed_here!(vertical concat)))?;
        IR::SinkMultiple { inputs }
    },
    // Merge two plans that are pre-sorted on `key`. Both sides must have
    // exactly matching schemas and the key must exist.
    #[cfg(feature = "merge_sorted")]
    DslPlan::MergeSorted {
        input_left,
        input_right,
        key,
    } => {
        let input_left = to_alp_impl(owned(input_left), ctxt)
            .map_err(|e| e.context(failed_here!(merge_sorted)))?;
        let input_right = to_alp_impl(owned(input_right), ctxt)
            .map_err(|e| e.context(failed_here!(merge_sorted)))?;

        let left_schema = ctxt.lp_arena.get(input_left).schema(ctxt.lp_arena);
        let right_schema = ctxt.lp_arena.get(input_right).schema(ctxt.lp_arena);

        left_schema
            .ensure_is_exact_match(&right_schema)
            .map_err(|err| err.context("merge_sorted".into()))?;

        left_schema
            .try_get(key.as_str())
            .map_err(|err| err.context("merge_sorted".into()))?;

        IR::MergeSorted {
            input_left,
            input_right,
            key,
        }
    },
    // A DSL node that may carry an already-converted IR node (see the cache
    // check in the following arm body).
    DslPlan::IR { node, dsl, version } => {
        return match node
{
            // Reuse the cached IR node only when it was built against this very
            // arena generation AND this optimizer has not consumed that arena
            // before (`insert` is true on first use). Otherwise fall back to
            // reconverting the stored DSL.
            Some(node)
                if version == ctxt.lp_arena.version()
                    && ctxt.conversion_optimizer.used_arenas.insert(version) =>
            {
                Ok(node)
            },
            _ => to_alp_impl(owned(dsl), ctxt),
        };
    },
    };
    // `v` is the IR node produced by the big `match lp` above; register it in
    // the arena and hand back its handle.
    Ok(ctxt.lp_arena.add(v))
}

/// Convert `with_columns` expressions to `ExprIR`s and compute the resulting
/// schema: the input schema extended (or overwritten) with each expression's
/// output field.
///
/// Errors if two expressions produce the same output name.
fn resolve_with_columns(
    exprs: Vec<Expr>,
    input: Node,
    lp_arena: &Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    opt_flags: &mut OptFlags,
) -> PolarsResult<(Vec<ExprIR>, SchemaRef)> {
    let input_schema = lp_arena.get(input).schema(lp_arena);
    let mut output_schema = (**input_schema).clone();
    // Expand wildcards / regex / multi-column selectors first.
    let exprs = rewrite_projections(exprs, &PlHashSet::new(), &input_schema, opt_flags)?;
    let mut output_names = PlHashSet::with_capacity(exprs.len());

    let eirs = to_expr_irs(
        exprs,
        &mut ExprToIRContext::new_with_opt_eager(expr_arena, &input_schema, opt_flags),
    )?;
    for eir in eirs.iter() {
        let field = eir.field(&input_schema, expr_arena)?;

        // Two expressions writing the same output column is ambiguous — error.
        if !output_names.insert(field.name().clone()) {
            let msg = format!(
                "the name '{}' passed to `LazyFrame.with_columns` is duplicate\n\n\
                It's possible that multiple expressions are returning the same default column name. \
                If this is the case, try renaming the columns with `.alias(\"new_name\")` to avoid \
                duplicate column names.",
                field.name()
            );
            polars_bail!(ComputeError: msg)
        }
        output_schema.with_column(field.name, field.dtype.materialize_unknown(true)?);
    }

    Ok((eirs, Arc::new(output_schema)))
}

/// Convert group-by keys and aggregations to `ExprIR`s and compute the output
/// schema (keys first, then the index columns of rolling/dynamic group-by,
/// then aggregations; non-scalar aggregation dtypes are imploded to List).
fn resolve_group_by(
    input: Node,
    keys: Vec<Expr>,
    aggs: Vec<Expr>,
    _options: &GroupbyOptions,
    lp_arena: &Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    opt_flags: &mut OptFlags,
) -> PolarsResult<(Vec<ExprIR>, Vec<ExprIR>, SchemaRef)> {
    let input_schema = lp_arena.get(input).schema(lp_arena);
    let input_schema = input_schema.as_ref();
    let mut keys = rewrite_projections(keys, &PlHashSet::default(), input_schema, opt_flags)?;

    // Initialize schema from keys
    let mut output_schema = expressions_to_schema(&keys, input_schema, |duplicate_name: &str| {
        format!("group_by keys contained duplicate output name '{duplicate_name}'")
    })?;
    // Key names are excluded from expansion of the aggregation expressions below.
    let mut key_names: PlHashSet<PlSmallStr> = output_schema.iter_names().cloned().collect();

    #[allow(unused_mut)]
    let mut pop_keys = false;
    // Add dynamic groupby index column(s)
    // Also add index columns to keys for expression expansion.
    #[cfg(feature = "dynamic_group_by")]
    {
        if let Some(options) = _options.rolling.as_ref() {
            let name = options.index_column.clone();
            let dtype = input_schema.try_get(name.as_str())?;
            keys.push(col(name.clone()));
            key_names.insert(name.clone());
            // The pushed index column is temporary — removed again before the
            // keys are converted (see `pop_keys` below).
            pop_keys = true;
            output_schema.with_column(name.clone(), dtype.clone());
        } else if let Some(options) = _options.dynamic.as_ref() {
            let name = options.index_column.clone();
            keys.push(col(name.clone()));
            key_names.insert(name.clone());
            pop_keys = true;
            let dtype = input_schema.try_get(name.as_str())?;
            if options.include_boundaries {
                // Window boundary columns share the index column's dtype.
                output_schema.with_column("_lower_boundary".into(),
dtype.clone());
                output_schema.with_column("_upper_boundary".into(), dtype.clone());
            }
            output_schema.with_column(name.clone(), dtype.clone());
        }
    }
    // Number of key + index columns; used below to detect agg/key collisions.
    let keys_index_len = output_schema.len();
    if pop_keys {
        // Drop the temporary index-column key pushed in the cfg block above.
        let _ = keys.pop();
    }
    let keys = to_expr_irs(
        keys,
        &mut ExprToIRContext::new_with_opt_eager(expr_arena, input_schema, opt_flags),
    )?;

    // Add aggregation column(s)
    let aggs = rewrite_projections(aggs, &key_names, input_schema, opt_flags)?;
    let aggs = to_expr_irs(
        aggs,
        &mut ExprToIRContext::new_with_opt_eager(expr_arena, input_schema, opt_flags),
    )?;
    utils::validate_expressions(&keys, expr_arena, input_schema, "group by")?;
    utils::validate_expressions(&aggs, expr_arena, input_schema, "group by")?;

    let mut aggs_schema = expr_irs_to_schema(&aggs, input_schema, expr_arena)?;

    // Make sure aggregation columns do not contain duplicates
    // (a shrunken schema means two aggs collapsed onto one name — find it and
    // report the duplicate).
    if aggs_schema.len() < aggs.len() {
        let mut names = PlHashSet::with_capacity(aggs.len());
        for agg in aggs.iter() {
            let name = agg.output_name();
            polars_ensure!(names.insert(name.clone()), duplicate = name)
        }
    }

    // Coerce aggregation column(s) into List unless not needed (auto-implode)
    debug_assert!(aggs_schema.len() == aggs.len());
    for ((_name, dtype), expr) in aggs_schema.iter_mut().zip(&aggs) {
        if !expr.is_scalar(expr_arena) {
            *dtype = dtype.clone().implode();
        }
    }

    // Final output_schema
    output_schema.merge(aggs_schema);

    // Make sure aggregation columns do not contain keys or index columns
    // (same shrink-detection trick as above, now across keys + aggs).
    if output_schema.len() < (keys_index_len + aggs.len()) {
        let mut names = PlHashSet::with_capacity(output_schema.len());
        for agg in aggs.iter().chain(keys.iter()) {
            let name = agg.output_name();
            polars_ensure!(names.insert(name.clone()), duplicate = name)
        }
    }

    Ok((keys, aggs, Arc::new(output_schema)))
}

/// Build one expression per schema column: `expr(name)` for columns whose
/// dtype satisfies `condition`, otherwise a typed null literal aliased to the
/// column name — so column count, names and order are always preserved.
fn stats_helper<F, E>(condition: F, expr: E, schema: &Schema) -> Vec<Expr>
where
    F: Fn(&DataType) -> bool,
    E: Fn(&PlSmallStr) -> Expr,
{
    schema
        .iter()
        .map(|(name, dt)| {
            if condition(dt) {
                expr(name)
            } else {
                lit(NULL).cast(dt.clone()).alias(name.clone())
            }
        })
        .collect()
}

/// Compute a projection over the reader schema that excludes hive-partition
/// columns. Returns `None` when there are no hive parts or when no hive column
/// name appears in the reader schema (no projection needed).
pub(crate) fn maybe_init_projection_excluding_hive(
    reader_schema: &Either<ArrowSchemaRef, SchemaRef>,
    hive_parts: Option<&SchemaRef>,
) -> Option<Arc<[PlSmallStr]>> {
    // Update `with_columns` with a projection so that hive columns aren't loaded from the
    // file
    let hive_schema = hive_parts?;

    // NOTE(review): both arms are structurally identical; they differ only in
    // the concrete schema type behind `Either`.
    match &reader_schema {
        Either::Left(reader_schema) => hive_schema
            .iter_names()
            .any(|x| reader_schema.contains(x))
            .then(|| {
                reader_schema
                    .iter_names_cloned()
                    .filter(|x| !hive_schema.contains(x))
                    .collect::<Arc<[_]>>()
            }),
        Either::Right(reader_schema) => hive_schema
            .iter_names()
            .any(|x| reader_schema.contains(x))
            .then(|| {
                reader_schema
                    .iter_names_cloned()
                    .filter(|x| !hive_schema.contains(x))
                    .collect::<Arc<[_]>>()
            }),
    }
}