Path: blob/main/crates/polars-plan/src/plans/conversion/dsl_to_ir/mod.rs
8503 views
use arrow::datatypes::ArrowSchemaRef;
use either::Either;
use expr_expansion::rewrite_projections;
use futures::stream::FuturesUnordered;
use hive::hive_partitions_from_paths;
use polars_core::chunked_array::cast::CastOptions;
use polars_core::config::verbose;
use polars_io::ExternalCompression;
use polars_io::pl_async::get_runtime;
use polars_utils::format_pl_smallstr;
use polars_utils::itertools::Itertools;
use polars_utils::pl_path::PlRefPath;
use polars_utils::unique_id::UniqueId;

use super::convert_utils::SplitPredicates;
use super::stack_opt::ConversionOptimizer;
use super::*;
use crate::constants::get_pl_element_name;
use crate::dsl::PartitionedSinkOptions;
use crate::dsl::file_provider::{FileProviderType, HivePathProvider};
use crate::dsl::functions::{all_horizontal, col};
use crate::plans::conversion::dsl_to_ir::scans::SourcesToFileInfo;

mod concat;
mod datatype_fn_to_ir;
mod expr_expansion;
mod expr_to_ir;
mod functions;
mod join;
mod scans;
mod utils;
pub use expr_expansion::{expand_expression, is_regex_projection, prepare_projection};
pub use expr_to_ir::{ExprToIRContext, to_expr_ir};
use expr_to_ir::{to_expr_ir_materialized_lit, to_expr_irs};
use utils::DslConversionContext;

/// Stringifies the given tokens into a quoted label, e.g.
/// `failed_here!(vertical concat)` -> `"'vertical concat'"` (then `.into()`'d
/// to whatever string type the call site expects). Used throughout this module
/// to tag `PolarsError` contexts with the DSL node that failed to convert.
macro_rules! failed_here {
    ($($t:tt)*) => {
        format!("'{}'", stringify!($($t)*)).into()
    }
}
pub(super) use failed_here;

/// Converts a `DslPlan` into IR, adding the resulting nodes/expressions to the
/// given arenas and returning the top node of the resolved plan.
///
/// On failure (and when not in eager mode), the error is wrapped with a render
/// of the plan resolved so far, pointing at the node where conversion failed.
pub fn to_alp(
    lp: DslPlan,
    expr_arena: &mut Arena<AExpr>,
    lp_arena: &mut Arena<IR>,
    // Only `SIMPLIFY_EXPR`, `TYPE_COERCION`, `TYPE_CHECK` are respected.
    opt_flags: &mut OptFlags,
) -> PolarsResult<Node> {
    let conversion_optimizer = ConversionOptimizer::new(
        opt_flags.contains(OptFlags::SIMPLIFY_EXPR),
        opt_flags.contains(OptFlags::TYPE_COERCION),
        opt_flags.contains(OptFlags::TYPE_CHECK),
    );

    let mut ctxt = DslConversionContext {
        expr_arena,
        lp_arena,
        conversion_optimizer,
        opt_flags,
        nodes_scratch: &mut unitvec![],
        cache_file_info: Default::default(),
        pushdown_maintain_errors: optimizer::pushdown_maintain_errors(),
        verbose: verbose(),
        seen_caches: Default::default(),
    };

    match to_alp_impl(lp, &mut ctxt) {
        Ok(out) => Ok(out),
        Err(err) => {
            if opt_flags.contains(OptFlags::EAGER) {
                // If we dispatched to the lazy engine from the eager API, we don't want to resolve
                // where in the query plan it went wrong. It is clear from the backtrace anyway.
                return Err(err.remove_context());
            };
            // If nothing was converted yet there is no partial plan to show.
            let Some(ir_until_then) = lp_arena.last_node() else {
                return Err(err);
            };
            // The failing node's name was attached as error context via `failed_here!`;
            // fall back to a placeholder if no context is present.
            let node_name = if let PolarsError::Context { msg, .. } = &err {
                msg
            } else {
                "THIS_NODE"
            };
            // `mem::take` is fine here: the arenas are abandoned on the error path.
            let plan = IRPlan::new(
                ir_until_then,
                std::mem::take(lp_arena),
                std::mem::take(expr_arena),
            );
            let location = format!("{}", plan.display());
            Err(err.wrap_msg(|msg| {
                format!("{msg}\n\nResolved plan until failure:\n\n\t---> FAILED HERE RESOLVING {node_name} <---\n{location}")
            }))
        },
    }
}

/// Adds `lp` to the IR arena and runs the expression-level conversion optimizer
/// (simplification / type coercion / type checking, as configured) on the new
/// node. `name` is used to label the error context on failure.
fn run_conversion(lp: IR, ctxt: &mut DslConversionContext, name: &str) -> PolarsResult<Node> {
    let lp_node = ctxt.lp_arena.add(lp);
    ctxt.conversion_optimizer
        .optimize_exprs(ctxt.expr_arena, ctxt.lp_arena, lp_node, false)
        .map_err(|e| e.context(format!("'{name}' failed").into()))?;

    Ok(lp_node)
}

/// Walks the whole DSL plan, collects every `DslPlan::Scan` node and resolves
/// their file metadata concurrently (via `FuturesUnordered`), so that the later
/// sequential DSL->IR pass finds the scan info already cached in `cached_ir`.
/// Errors from any single scan abort the whole fetch.
async fn fetch_metadata(
    lp: &DslPlan,
    cache_file_info: SourcesToFileInfo,
    verbose: bool,
) -> PolarsResult<()> {
    use futures::stream::StreamExt;
    let mut futures = lp
        .into_iter()
        .filter_map(|dsl| {
            // Only scan nodes need metadata; everything else is skipped.
            let DslPlan::Scan {
                sources,
                unified_scan_args,
                scan_type,
                cached_ir,
            } = dsl
            else {
                return None;
            };
            Some(scans::dsl_to_ir(
                sources.clone(),
                unified_scan_args.clone(),
                scan_type.clone(),
                cached_ir.clone(),
                cache_file_info.clone(),
                verbose,
            ))
        })
        .collect::<FuturesUnordered<_>>();

    // Drive all metadata fetches to completion, bailing on the first error.
    while let Some(result) = futures.next().await {
        result?
    }
    Ok::<(), PolarsError>(())
}

/// converts LogicalPlan to IR
/// it adds expressions & lps to the respective arenas as it traverses the plan
/// finally it returns the top node of the logical plan
#[recursive]
pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult<Node> {
    let owned = Arc::unwrap_or_clone;

    // First do a pass to collect all scans and fetch all metadata concurrently.
    {
        let verbose = ctxt.verbose;
        let cache_file_info = ctxt.cache_file_info.clone();
        use tokio::runtime::Handle;

        let fut = fetch_metadata(&lp, cache_file_info, verbose);
        // If we are already inside a tokio runtime, `block_in_place_on` avoids
        // stalling the runtime's worker thread; otherwise plain `block_on`.
        if let Ok(_handle) = Handle::try_current() {
            get_runtime().block_in_place_on(fut)?;
        } else
{158get_runtime().block_on(fut)?;159}160}161162let v = match lp {163DslPlan::Scan {164sources: _,165unified_scan_args: _,166scan_type: _,167cached_ir,168} => cached_ir.lock().unwrap().clone().unwrap(),169#[cfg(feature = "python")]170DslPlan::PythonScan { options } => {171use crate::dsl::python_dsl::PythonOptionsDsl;172173let schema = options.get_schema()?;174175let PythonOptionsDsl {176scan_fn,177schema_fn: _,178python_source,179validate_schema,180is_pure,181} = options;182183IR::PythonScan {184options: PythonOptions {185scan_fn,186schema,187python_source,188validate_schema,189output_schema: Default::default(),190with_columns: Default::default(),191n_rows: Default::default(),192predicate: Default::default(),193is_pure,194},195}196},197DslPlan::Union { inputs, args } => {198let mut inputs = inputs199.into_iter()200.map(|lp| to_alp_impl(lp, ctxt))201.collect::<PolarsResult<Vec<_>>>()202.map_err(|e| e.context(failed_here!(vertical concat)))?;203204if args.diagonal {205inputs = concat::convert_diagonal_concat(inputs, ctxt.lp_arena, ctxt.expr_arena)?;206}207208if args.to_supertypes {209concat::convert_st_union(210&mut inputs,211ctxt.lp_arena,212ctxt.expr_arena,213ctxt.opt_flags,214)215.map_err(|e| e.context(failed_here!(vertical concat)))?;216}217218let first_n = *inputs.first().ok_or_else(219|| polars_err!(InvalidOperation: "expected at least one input in 'union'/'concat'"),220)?;221let schema = ctxt.lp_arena.get(first_n).schema(ctxt.lp_arena);222for n in &inputs[1..] 
{223let schema_i = ctxt.lp_arena.get(*n).schema(ctxt.lp_arena);224// The first argument225schema_i.matches_schema(schema.as_ref()).map_err(|_| polars_err!(InvalidOperation: "'union'/'concat' inputs should all have the same schema,\226got\n{:?} and \n{:?}", schema, schema_i)227)?;228}229230let options = args.into();231IR::Union { inputs, options }232},233DslPlan::HConcat { inputs, options } => {234let inputs = inputs235.into_iter()236.map(|lp| to_alp_impl(lp, ctxt))237.collect::<PolarsResult<Vec<_>>>()238.map_err(|e| e.context(failed_here!(horizontal concat)))?;239240let schema = concat::h_concat_schema(&inputs, ctxt.lp_arena)?;241242IR::HConcat {243inputs,244schema,245options,246}247},248DslPlan::Filter { input, predicate } => {249let mut input =250to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(filter)))?;251let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);252253let mut out = Vec::with_capacity(1);254expr_expansion::expand_expression(255&predicate,256&PlHashSet::default(),257input_schema.as_ref().as_ref(),258&mut out,259ctxt.opt_flags,260)?;261262let predicate = match out.len() {2631 => {264// all good265out.pop().unwrap()266},2670 => {268polars_bail!(269ComputeError:270"The predicate expanded to zero expressions. \271This may for example be caused by a regex not matching column names or \272a column dtype match not hitting any dtypes in the DataFrame"273);274},275_ => {276let mut expanded = String::new();277for e in out.iter().take(5) {278expanded.push_str(&format!("\t{e:?},\n"))279}280// pop latest comma281expanded.pop();282if out.len() > 5 {283expanded.push_str("\t...\n")284}285286if cfg!(feature = "python") {287polars_bail!(288ComputeError:289"The predicate passed to 'LazyFrame.filter' expanded to multiple expressions: \n\n{expanded}\n\290This is ambiguous. 
Try to combine the predicates with the 'all' or `any' expression."291)292} else {293polars_bail!(294ComputeError:295"The predicate passed to 'LazyFrame.filter' expanded to multiple expressions: \n\n{expanded}\n\296This is ambiguous. Try to combine the predicates with the 'all_horizontal' or `any_horizontal' expression."297)298};299},300};301let predicate_ae = to_expr_ir(302predicate,303&mut ExprToIRContext::new_with_opt_eager(304ctxt.expr_arena,305&input_schema,306ctxt.opt_flags,307),308)?;309310if ctxt.opt_flags.predicate_pushdown() {311ctxt.nodes_scratch.clear();312313if let Some(SplitPredicates { pushable, fallible }) = SplitPredicates::new(314predicate_ae.node(),315ctxt.expr_arena,316Some(ctxt.nodes_scratch),317ctxt.pushdown_maintain_errors,318) {319let mut update_input = |predicate: Node| -> PolarsResult<()> {320let predicate = ExprIR::from_node(predicate, ctxt.expr_arena);321ctxt.conversion_optimizer322.push_scratch(predicate.node(), ctxt.expr_arena);323let lp = IR::Filter { input, predicate };324input = run_conversion(lp, ctxt, "filter")?;325326Ok(())327};328329// Pushables first, then fallible.330331for predicate in pushable {332update_input(predicate)?;333}334335if let Some(node) = fallible {336update_input(node)?;337}338339return Ok(input);340};341};342343ctxt.conversion_optimizer344.push_scratch(predicate_ae.node(), ctxt.expr_arena);345let lp = IR::Filter {346input,347predicate: predicate_ae,348};349return run_conversion(lp, ctxt, "filter");350},351DslPlan::Slice { input, offset, len } => {352let input =353to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(slice)))?;354355if len == 0 {356let input_schema = ctxt357.lp_arena358.get(input)359.schema(ctxt.lp_arena)360.as_ref()361.clone();362363IR::DataFrameScan {364df: Arc::new(DataFrame::empty_with_schema(&input_schema)),365schema: input_schema.clone(),366output_schema: None,367}368} else {369IR::Slice { input, offset, len }370}371},372DslPlan::DataFrameScan { df, schema } => 
IR::DataFrameScan {373df,374schema,375output_schema: None,376},377DslPlan::Select {378expr,379input,380options,381} => {382let input =383to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(select)))?;384let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);385let (exprs, schema) = prepare_projection(expr, &input_schema, ctxt.opt_flags)386.map_err(|e| e.context(failed_here!(select)))?;387388if exprs.is_empty() {389ctxt.lp_arena.replace(input, utils::empty_df());390return Ok(input);391}392393let eirs = to_expr_irs(394exprs,395&mut ExprToIRContext::new_with_opt_eager(396ctxt.expr_arena,397&input_schema,398ctxt.opt_flags,399),400)?;401ctxt.conversion_optimizer402.fill_scratch(&eirs, ctxt.expr_arena);403404let schema = Arc::new(schema);405let lp = IR::Select {406expr: eirs,407input,408schema,409options,410};411412return run_conversion(lp, ctxt, "select").map_err(|e| e.context(failed_here!(select)));413},414DslPlan::Sort {415input,416by_column,417slice,418mut sort_options,419} => {420let input =421to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(select)))?;422let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);423424// note: if given an Expr::Columns, count the individual cols425let n_by_exprs = if by_column.len() == 1 {426match &by_column[0] {427Expr::Selector(s) => s.into_columns(&input_schema, &Default::default())?.len(),428_ => 1,429}430} else {431by_column.len()432};433let n_desc = sort_options.descending.len();434polars_ensure!(435n_desc == n_by_exprs || n_desc == 1,436ComputeError: "the length of `descending` ({}) does not match the length of `by` ({})", n_desc, by_column.len()437);438let n_nulls_last = sort_options.nulls_last.len();439polars_ensure!(440n_nulls_last == n_by_exprs || n_nulls_last == 1,441ComputeError: "the length of `nulls_last` ({}) does not match the length of `by` ({})", n_nulls_last, by_column.len()442);443444let mut expanded_cols = Vec::new();445let mut nulls_last = Vec::new();446let 
mut descending = Vec::new();447448// note: nulls_last/descending need to be matched to expanded multi-output expressions.449// when one of nulls_last/descending has not been updated from the default (single450// value true/false), 'cycle' ensures that "by_column" iter is not truncated.451for (c, (&n, &d)) in by_column.into_iter().zip(452sort_options453.nulls_last454.iter()455.cycle()456.zip(sort_options.descending.iter().cycle()),457) {458let exprs = utils::expand_expressions(459input,460vec![c],461ctxt.lp_arena,462ctxt.expr_arena,463ctxt.opt_flags,464)465.map_err(|e| e.context(failed_here!(sort)))?;466467nulls_last.extend(std::iter::repeat_n(n, exprs.len()));468descending.extend(std::iter::repeat_n(d, exprs.len()));469expanded_cols.extend(exprs);470}471sort_options.nulls_last = nulls_last;472sort_options.descending = descending;473474ctxt.conversion_optimizer475.fill_scratch(&expanded_cols, ctxt.expr_arena);476let mut by_column = expanded_cols;477478// Remove null columns in multi-columns sort479if by_column.len() > 1 {480let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);481482let mut null_columns = vec![];483484for (i, c) in by_column.iter().enumerate() {485if let DataType::Null = c.dtype(&input_schema, ctxt.expr_arena)? 
{486null_columns.push(i);487}488}489// All null columns, only take one.490if null_columns.len() == by_column.len() {491by_column.truncate(1);492sort_options.nulls_last.truncate(1);493sort_options.descending.truncate(1);494}495// Remove the null columns496else if !null_columns.is_empty() {497for i in null_columns.into_iter().rev() {498by_column.remove(i);499sort_options.nulls_last.remove(i);500sort_options.descending.remove(i);501}502}503}504if by_column.is_empty() {505return Ok(input);506};507508let lp = IR::Sort {509input,510by_column,511slice: slice.map(|t| (t.0, t.1, None)),512sort_options,513};514515return run_conversion(lp, ctxt, "sort").map_err(|e| e.context(failed_here!(sort)));516},517DslPlan::Cache { input, id } => {518let input = match ctxt.seen_caches.get(&id) {519Some(input) => *input,520None => {521let input = to_alp_impl(owned(input), ctxt)522.map_err(|e| e.context(failed_here!(cache)))?;523let seen_before = ctxt.seen_caches.insert(id, input);524assert!(525seen_before.is_none(),526"Cache could not have been created in the mean time. That would make the DAG cyclic."527);528input529},530};531532IR::Cache { input, id }533},534DslPlan::GroupBy {535input,536keys,537predicates,538mut aggs,539apply,540maintain_order,541options,542} => {543// If the group by contains any predicates, we update the plan by turning the544// predicates into aggregations and filtering on them. 
Then, we recursively call545// this function.546if !predicates.is_empty() {547let predicate_names = (0..predicates.len())548.map(|i| format_pl_smallstr!("__POLARS_HAVING_{i}"))549.collect::<Arc<[_]>>();550let predicates = predicates551.into_iter()552.zip(predicate_names.iter())553.map(|(p, name)| p.alias(name.clone()))554.collect_vec();555aggs.extend(predicates);556557let lp = DslPlan::GroupBy {558input,559keys,560predicates: vec![],561aggs,562apply,563maintain_order,564options,565};566let lp = DslBuilder::from(lp)567.filter(568all_horizontal(569predicate_names.iter().map(|n| col(n.clone())).collect_vec(),570)571.unwrap(),572)573.drop(Selector::ByName {574names: predicate_names,575strict: true,576})577.build();578return to_alp_impl(lp, ctxt);579}580581// NOTE: As we went into this branch, we know that no predicates are provided.582let input =583to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(group_by)))?;584585// Rolling + group-by sorts the whole table, so remove unneeded columns586if ctxt.opt_flags.eager() && options.is_rolling() && !keys.is_empty() {587ctxt.opt_flags.insert(OptFlags::PROJECTION_PUSHDOWN)588}589590let (keys, aggs, schema) = resolve_group_by(591input,592keys,593aggs,594&options,595ctxt.lp_arena,596ctxt.expr_arena,597ctxt.opt_flags,598)599.map_err(|e| e.context(failed_here!(group_by)))?;600601let (apply, schema) = if let Some((apply, schema)) = apply {602(Some(apply), schema)603} else {604(None, schema)605};606607ctxt.conversion_optimizer608.fill_scratch(&keys, ctxt.expr_arena);609ctxt.conversion_optimizer610.fill_scratch(&aggs, ctxt.expr_arena);611612let lp = IR::GroupBy {613input,614keys,615aggs,616schema,617apply,618maintain_order,619options,620};621return run_conversion(lp, ctxt, "group_by")622.map_err(|e| e.context(failed_here!(group_by)));623},624DslPlan::Join {625input_left,626input_right,627left_on,628right_on,629predicates,630options,631} => {632return 
join::resolve_join(633Either::Left(input_left),634Either::Left(input_right),635left_on,636right_on,637predicates,638JoinOptionsIR::from(Arc::unwrap_or_clone(options)),639ctxt,640)641.map_err(|e| e.context(failed_here!(join)))642.map(|t| t.0);643},644DslPlan::HStack {645input,646exprs,647options,648} => {649let input = to_alp_impl(owned(input), ctxt)650.map_err(|e| e.context(failed_here!(with_columns)))?;651let (exprs, schema) =652resolve_with_columns(exprs, input, ctxt.lp_arena, ctxt.expr_arena, ctxt.opt_flags)653.map_err(|e| e.context(failed_here!(with_columns)))?;654655ctxt.conversion_optimizer656.fill_scratch(&exprs, ctxt.expr_arena);657let lp = IR::HStack {658input,659exprs,660schema,661options,662};663return run_conversion(lp, ctxt, "with_columns");664},665DslPlan::MatchToSchema {666input,667match_schema,668per_column,669extra_columns,670} => {671let input =672to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(unique)))?;673let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);674675assert_eq!(per_column.len(), match_schema.len());676677if input_schema.as_ref() == &match_schema {678return Ok(input);679}680681let mut exprs = Vec::with_capacity(match_schema.len());682let mut found_missing_columns = Vec::new();683let mut used_input_columns = 0;684685for ((column, dtype), per_column) in match_schema.iter().zip(per_column.iter()) {686match input_schema.get(column) {687None => match &per_column.missing_columns {688MissingColumnsPolicyOrExpr::Raise => found_missing_columns.push(column),689MissingColumnsPolicyOrExpr::Insert => exprs.push(Expr::Alias(690Arc::new(Expr::Literal(LiteralValue::Scalar(Scalar::null(691dtype.clone(),692)))),693column.clone(),694)),695MissingColumnsPolicyOrExpr::InsertWith(expr) => {696exprs.push(Expr::Alias(Arc::new(expr.clone()), column.clone()))697},698},699Some(input_dtype) if dtype == input_dtype => {700used_input_columns += 1;701exprs.push(Expr::Column(column.clone()))702},703Some(input_dtype) => {704let 
from_dtype = input_dtype;705let to_dtype = dtype;706707let policy = CastColumnsPolicy {708integer_upcast: per_column.integer_cast == UpcastOrForbid::Upcast,709float_upcast: per_column.float_cast == UpcastOrForbid::Upcast,710missing_struct_fields: per_column.missing_struct_fields,711extra_struct_fields: per_column.extra_struct_fields,712713..Default::default()714};715716let should_cast =717policy.should_cast_column(column, to_dtype, from_dtype)?;718719let mut expr = Expr::Column(PlSmallStr::from_str(column));720if should_cast {721expr = expr.cast_with_options(to_dtype.clone(), CastOptions::NonStrict);722}723724used_input_columns += 1;725exprs.push(expr);726},727}728}729730// Report the error for missing columns731if let Some(lst) = found_missing_columns.first() {732use std::fmt::Write;733let mut formatted = String::new();734write!(&mut formatted, "\"{}\"", found_missing_columns[0]).unwrap();735for c in &found_missing_columns[1..] {736write!(&mut formatted, ", \"{c}\"").unwrap();737}738739write!(&mut formatted, "\"{lst}\"").unwrap();740polars_bail!(SchemaMismatch: "missing columns in `match_to_schema`: {formatted}");741}742743// Report the error for extra columns744if used_input_columns != input_schema.len()745&& extra_columns == ExtraColumnsPolicy::Raise746{747let found_extra_columns = input_schema748.iter_names()749.filter(|n| !match_schema.contains(n))750.collect::<Vec<_>>();751752use std::fmt::Write;753let mut formatted = String::new();754write!(&mut formatted, "\"{}\"", found_extra_columns[0]).unwrap();755for c in &found_extra_columns[1..] 
{756write!(&mut formatted, ", \"{c}\"").unwrap();757}758759polars_bail!(SchemaMismatch: "extra columns in `match_to_schema`: {formatted}");760}761762let exprs = to_expr_irs(763exprs,764&mut ExprToIRContext::new_with_opt_eager(765ctxt.expr_arena,766&input_schema,767ctxt.opt_flags,768),769)?;770771ctxt.conversion_optimizer772.fill_scratch(&exprs, ctxt.expr_arena);773let lp = IR::Select {774input,775expr: exprs,776schema: match_schema.clone(),777options: ProjectionOptions {778run_parallel: true,779duplicate_check: false,780should_broadcast: true,781},782};783return run_conversion(lp, ctxt, "match_to_schema");784},785DslPlan::PipeWithSchema { input, callback } => {786// Derive the schema from the input787let mut inputs = Vec::with_capacity(input.len());788let mut input_schemas = Vec::with_capacity(input.len());789790for plan in input.as_ref() {791let ir = to_alp_impl(plan.clone(), ctxt)?;792let schema = ctxt.lp_arena.get(ir).schema(ctxt.lp_arena).into_owned();793794let dsl = DslPlan::IR {795dsl: Arc::new(plan.clone()),796version: ctxt.lp_arena.version(),797node: Some(ir),798};799inputs.push(dsl);800input_schemas.push(schema);801}802803// Adjust the input and start conversion again804let input_adjusted = callback.call((inputs, input_schemas))?;805return to_alp_impl(input_adjusted, ctxt);806},807#[cfg(feature = "pivot")]808DslPlan::Pivot {809input,810on,811on_columns,812index,813values,814agg,815maintain_order,816separator,817} => {818let input =819to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(unique)))?;820let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);821822let on = on.into_columns(input_schema.as_ref(), &Default::default())?;823let index = index.into_columns(input_schema.as_ref(), &Default::default())?;824let values = values.into_columns(input_schema.as_ref(), &Default::default())?;825826polars_ensure!(!on.is_empty(), InvalidOperation: "`pivot` called without `on` columns.");827polars_ensure!(on.len() == on_columns.width(), 
InvalidOperation: "`pivot` expected `on` and `on_columns` to have the same amount of columns.");828if on.len() > 1 {829polars_ensure!(830on_columns.columns().iter().zip(on.iter()).all(|(c, o)| o == c.name()),831InvalidOperation: "`pivot` has mismatching column names between `on` and `on_columns`."832);833}834polars_ensure!(!values.is_empty(), InvalidOperation: "`pivot` called without `values` columns.");835836let on_titles = if on_columns.width() == 1 {837on_columns.columns()[0].cast(&DataType::String)?838} else {839on_columns840.as_ref()841.clone()842.into_struct(PlSmallStr::EMPTY)843.cast(&DataType::String)?844.into_column()845};846let on_titles = on_titles.str()?;847848let mut expr_schema = input_schema.as_ref().as_ref().clone();849let mut out = Vec::with_capacity(1);850let mut aggs = Vec::<ExprIR>::with_capacity(values.len() * on_columns.height());851for value in values.iter() {852out.clear();853let value_dtype = input_schema.try_get(value)?;854expr_schema.insert(get_pl_element_name(), value_dtype.clone());855expand_expression(856&agg,857&Default::default(),858&expr_schema,859&mut out,860ctxt.opt_flags,861)?;862polars_ensure!(863out.len() == 1,864InvalidOperation: "Pivot expression are not allowed to expand to more than 1 expression"865);866let agg = out.pop().unwrap();867let agg_ae = to_expr_ir(868agg,869&mut ExprToIRContext::new_with_opt_eager(870ctxt.expr_arena,871&expr_schema,872ctxt.opt_flags,873),874)?875.node();876877polars_ensure!(878aexpr_to_leaf_names_iter(agg_ae, ctxt.expr_arena).count() == 0,879InvalidOperation: "explicit column references are not allowed in the `aggregate_function` of `pivot`"880);881882for i in 0..on_columns.height() {883let mut name = String::new();884if values.len() > 1 {885name.push_str(value.as_str());886name.push_str(separator.as_str());887}888889name.push_str(on_titles.get(i).unwrap_or("null"));890891fn on_predicate(892on: &PlSmallStr,893on_column: &Column,894i: usize,895expr_arena: &mut Arena<AExpr>,896) -> AExprBuilder 
{897let e = AExprBuilder::col(on.clone(), expr_arena);898e.eq(899AExprBuilder::lit_scalar(900Scalar::new(901on_column.dtype().clone(),902on_column.get(i).unwrap().into_static(),903),904expr_arena,905),906expr_arena,907)908}909910let predicate = if on.len() == 1 {911on_predicate(&on[0], &on_columns.columns()[0], i, ctxt.expr_arena)912} else {913AExprBuilder::function(914on.iter()915.enumerate()916.map(|(j, on_col)| {917on_predicate(918on_col,919&on_columns.columns()[j],920i,921ctxt.expr_arena,922)923.expr_ir(on_col.clone())924})925.collect::<Vec<_>>(),926IRFunctionExpr::Boolean(IRBooleanFunction::AllHorizontal),927ctxt.expr_arena,928)929};930931let replacement_element = AExprBuilder::col(value.clone(), ctxt.expr_arena)932.filter(predicate, ctxt.expr_arena)933.node();934935#[recursive::recursive]936fn deep_clone_element_replace(937ae: Node,938arena: &mut Arena<AExpr>,939replacement: Node,940) -> Node {941let slf = arena.get(ae).clone();942if matches!(slf, AExpr::Element) {943return deep_clone_ae(replacement, arena);944} else if matches!(slf, AExpr::Len) {945// For backwards-compatibility, we support providing `pl.len()` to mean946// the length of the group here.947let element = deep_clone_ae(replacement, arena);948return AExprBuilder::new_from_node(element).len(arena).node();949}950951let mut children = vec![];952slf.children_rev(&mut children);953for child in &mut children {954*child = deep_clone_element_replace(*child, arena, replacement);955}956children.reverse();957958arena.add(slf.replace_children(&children))959}960aggs.push(ExprIR::new(961deep_clone_element_replace(agg_ae, ctxt.expr_arena, replacement_element),962OutputName::Alias(name.into()),963));964}965}966967let keys: Vec<_> = index968.into_iter()969.map(|i| AExprBuilder::col(i.clone(), ctxt.expr_arena).expr_ir(i))970.collect();971972let mut uniq_names = PlHashSet::new();973for expr in keys.iter().chain(aggs.iter()) {974let name = expr.output_name();975let is_uniq = 
uniq_names.insert(name.clone());976polars_ensure!(is_uniq, duplicate = name);977}978979IRBuilder::new(input, ctxt.expr_arena, ctxt.lp_arena)980.group_by(keys, aggs, None, maintain_order, Default::default())981.build()982},983DslPlan::Distinct { input, options } => {984let input =985to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(unique)))?;986let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena).into_owned();987988// "subset" param supports cols and/or arbitrary expressions989let (input, subset, temp_cols) = if let Some(exprs) = options.subset {990let exprs = rewrite_projections(991exprs,992&PlHashSet::default(),993&input_schema,994ctxt.opt_flags,995)?;996997// identify cols and exprs in "subset" param998let mut subset_colnames = vec![];999let mut subset_exprs = vec![];1000for expr in &exprs {1001match expr {1002Expr::Column(name) => {1003polars_ensure!(1004input_schema.contains(name),1005ColumnNotFound: "{name:?} not found"1006);1007subset_colnames.push(name.clone());1008},1009_ => subset_exprs.push(expr.clone()),1010}1011}10121013if subset_exprs.is_empty() {1014// "subset" is a collection of basic cols (or empty)1015(input, Some(subset_colnames.into_iter().collect()), vec![])1016} else {1017// "subset" contains exprs; add them as temporary cols1018let (aliased_exprs, temp_names): (Vec<_>, Vec<_>) = subset_exprs1019.into_iter()1020.enumerate()1021.map(|(idx, expr)| {1022let temp_name = format_pl_smallstr!("__POLARS_UNIQUE_SUBSET_{}", idx);1023(expr.alias(temp_name.clone()), temp_name)1024})1025.unzip();10261027subset_colnames.extend_from_slice(&temp_names);10281029// integrate the temporary cols with the existing "input" node1030let (temp_expr_irs, schema) = resolve_with_columns(1031aliased_exprs,1032input,1033ctxt.lp_arena,1034ctxt.expr_arena,1035ctxt.opt_flags,1036)?;1037ctxt.conversion_optimizer1038.fill_scratch(&temp_expr_irs, ctxt.expr_arena);10391040let input_with_exprs = ctxt.lp_arena.add(IR::HStack {1041input,1042exprs: 
temp_expr_irs,1043schema,1044options: ProjectionOptions {1045run_parallel: false,1046duplicate_check: false,1047should_broadcast: true,1048},1049});1050(1051input_with_exprs,1052Some(subset_colnames.into_iter().collect()),1053temp_names,1054)1055}1056} else {1057(input, None, vec![])1058};10591060// `distinct` definition (will contain temporary cols if we have "subset" exprs)1061let distinct_node = ctxt.lp_arena.add(IR::Distinct {1062input,1063options: DistinctOptionsIR {1064subset,1065maintain_order: options.maintain_order,1066keep_strategy: options.keep_strategy,1067slice: None,1068},1069});10701071// if no temporary cols (eg: we had no "subset" exprs), we're done...1072if temp_cols.is_empty() {1073return Ok(distinct_node);1074}10751076// ...otherwise, drop them by projecting the original schema1077return Ok(ctxt.lp_arena.add(IR::SimpleProjection {1078input: distinct_node,1079columns: input_schema,1080}));1081},1082DslPlan::MapFunction { input, function } => {1083let input = to_alp_impl(owned(input), ctxt)1084.map_err(|e| e.context(failed_here!(format!("{}", function).to_lowercase())))?;1085let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);10861087match function {1088DslFunction::Explode {1089columns,1090options,1091allow_empty,1092} => {1093let columns = columns.into_columns(&input_schema, &Default::default())?;1094polars_ensure!(!columns.is_empty() || allow_empty, InvalidOperation: "no columns provided in explode");1095if columns.is_empty() {1096return Ok(input);1097}1098let function = FunctionIR::Explode {1099columns: columns.into_iter().collect(),1100options,1101schema: Default::default(),1102};1103let ir = IR::MapFunction { input, function };1104return Ok(ctxt.lp_arena.add(ir));1105},1106DslFunction::FillNan(fill_value) => {1107let exprs = input_schema1108.iter()1109.filter_map(|(name, dtype)| match dtype {1110DataType::Float16 | DataType::Float32 | DataType::Float64 => 
Some(1111col(name.clone())1112.fill_nan(fill_value.clone())1113.alias(name.clone()),1114),1115_ => None,1116})1117.collect::<Vec<_>>();11181119let (exprs, schema) = resolve_with_columns(1120exprs,1121input,1122ctxt.lp_arena,1123ctxt.expr_arena,1124ctxt.opt_flags,1125)1126.map_err(|e| e.context(failed_here!(fill_nan)))?;11271128ctxt.conversion_optimizer1129.fill_scratch(&exprs, ctxt.expr_arena);11301131let lp = IR::HStack {1132input,1133exprs,1134schema,1135options: ProjectionOptions {1136duplicate_check: false,1137..Default::default()1138},1139};1140return run_conversion(lp, ctxt, "fill_nan");1141},1142DslFunction::Stats(sf) => {1143let exprs = match sf {1144StatsFunction::Var { ddof } => stats_helper(1145|dt| dt.is_primitive_numeric() || dt.is_bool() || dt.is_decimal(),1146|name| col(name.clone()).var(ddof),1147&input_schema,1148),1149StatsFunction::Std { ddof } => stats_helper(1150|dt| dt.is_primitive_numeric() || dt.is_bool() || dt.is_decimal(),1151|name| col(name.clone()).std(ddof),1152&input_schema,1153),1154StatsFunction::Quantile { quantile, method } => stats_helper(1155|dt| dt.is_primitive_numeric() || dt.is_decimal() || dt.is_temporal(),1156|name| col(name.clone()).quantile(quantile.clone(), method),1157&input_schema,1158),1159StatsFunction::Mean => stats_helper(1160|dt| {1161dt.is_primitive_numeric()1162|| dt.is_temporal()1163|| dt.is_bool()1164|| dt.is_decimal()1165},1166|name| col(name.clone()).mean(),1167&input_schema,1168),1169StatsFunction::Sum => stats_helper(1170|dt| {1171dt.is_primitive_numeric()1172|| dt.is_decimal()1173|| matches!(dt, DataType::Boolean | DataType::Duration(_))1174},1175|name| col(name.clone()).sum(),1176&input_schema,1177),1178StatsFunction::Min => stats_helper(1179|dt| dt.is_ord(),1180|name| col(name.clone()).min(),1181&input_schema,1182),1183StatsFunction::Max => stats_helper(1184|dt| dt.is_ord(),1185|name| col(name.clone()).max(),1186&input_schema,1187),1188StatsFunction::Median => stats_helper(1189|dt| 
{1190dt.is_primitive_numeric()1191|| dt.is_temporal()1192|| dt == &DataType::Boolean1193},1194|name| col(name.clone()).median(),1195&input_schema,1196),1197};1198let schema = Arc::new(expressions_to_schema(1199&exprs,1200&input_schema,1201|duplicate_name: &str| duplicate_name.to_string(),1202)?);1203let eirs = to_expr_irs(1204exprs,1205&mut ExprToIRContext::new_with_opt_eager(1206ctxt.expr_arena,1207&input_schema,1208ctxt.opt_flags,1209),1210)?;12111212ctxt.conversion_optimizer1213.fill_scratch(&eirs, ctxt.expr_arena);12141215let lp = IR::Select {1216input,1217expr: eirs,1218schema,1219options: ProjectionOptions {1220duplicate_check: false,1221..Default::default()1222},1223};1224return run_conversion(lp, ctxt, "stats");1225},1226DslFunction::Rename {1227existing,1228new,1229strict,1230} => {1231assert_eq!(existing.len(), new.len());1232if existing.is_empty() {1233return Ok(input);1234}12351236let existing_lut =1237PlIndexSet::from_iter(existing.iter().map(PlSmallStr::as_str));12381239let mut schema = Schema::with_capacity(input_schema.len());1240let mut num_replaced = 0;12411242// Turn the rename into a select.1243let expr = input_schema1244.iter()1245.map(|(n, dtype)| {1246Ok(match existing_lut.get_index_of(n.as_str()) {1247None => {1248schema.try_insert(n.clone(), dtype.clone())?;1249Expr::Column(n.clone())1250},1251Some(i) => {1252num_replaced += 1;1253schema.try_insert(new[i].clone(), dtype.clone())?;1254Expr::Column(n.clone()).alias(new[i].clone())1255},1256})1257})1258.collect::<PolarsResult<Vec<_>>>()?;12591260if strict && num_replaced != existing.len() {1261let col = existing.iter().find(|c| !input_schema.contains(c)).unwrap();1262polars_bail!(col_not_found = col);1263}12641265// Nothing changed, make into a no-op.1266if num_replaced == 0 {1267return Ok(input);1268}12691270let expr = to_expr_irs(1271expr,1272&mut 
ExprToIRContext::new_with_opt_eager(1273ctxt.expr_arena,1274&input_schema,1275ctxt.opt_flags,1276),1277)?;1278ctxt.conversion_optimizer1279.fill_scratch(&expr, ctxt.expr_arena);12801281IR::Select {1282input,1283expr,1284schema: Arc::new(schema),1285options: ProjectionOptions {1286run_parallel: false,1287duplicate_check: false,1288should_broadcast: false,1289},1290}1291},1292_ => {1293let function = function.into_function_ir(&input_schema)?;1294IR::MapFunction { input, function }1295},1296}1297},1298DslPlan::ExtContext { input, contexts } => {1299let input = to_alp_impl(owned(input), ctxt)1300.map_err(|e| e.context(failed_here!(with_context)))?;1301let contexts = contexts1302.into_iter()1303.map(|lp| to_alp_impl(lp, ctxt))1304.collect::<PolarsResult<Vec<_>>>()1305.map_err(|e| e.context(failed_here!(with_context)))?;13061307let mut schema = (**ctxt.lp_arena.get(input).schema(ctxt.lp_arena)).clone();1308for input in &contexts {1309let other_schema = ctxt.lp_arena.get(*input).schema(ctxt.lp_arena);1310for fld in other_schema.iter_fields() {1311if schema.get(fld.name()).is_none() {1312schema.with_column(fld.name, fld.dtype);1313}1314}1315}13161317IR::ExtContext {1318input,1319contexts,1320schema: Arc::new(schema),1321}1322},1323DslPlan::Sink { input, payload } => {1324let input =1325to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(sink)))?;1326let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);1327let payload = match payload {1328SinkType::Memory => SinkTypeIR::Memory,1329SinkType::Callback(f) => SinkTypeIR::Callback(f),1330SinkType::File(mut options) => {1331let mut compression_opt = None::<ExternalCompression>;13321333#[cfg(feature = "parquet")]1334if let FileWriteFormat::Parquet(options) = &mut options.file_format1335&& let Some(arrow_schema) = &mut Arc::make_mut(options).arrow_schema1336{1337let new_schema =1338validate_arrow_schema_conversion(input_schema.as_ref(), arrow_schema)?;13391340*Arc::make_mut(arrow_schema) = 
new_schema;1341}13421343#[cfg(feature = "csv")]1344if let FileWriteFormat::Csv(csv_options) = &options.file_format1345&& csv_options.check_extension1346{1347compression_opt = Some(csv_options.compression);1348}13491350#[cfg(feature = "json")]1351if let FileWriteFormat::NDJson(ndjson_options) = &options.file_format1352&& ndjson_options.check_extension1353{1354compression_opt = Some(ndjson_options.compression);1355}13561357if let Some(compression) = compression_opt {1358if let SinkTarget::Path(path) = &options.target {1359let extension = path.extension();13601361if let Some(suffix) = compression.file_suffix() {1362polars_ensure!(1363extension.is_none_or(|extension| extension == suffix.strip_prefix(".").unwrap_or(suffix)),1364InvalidOperation: "the path ({}) does not conform to standard naming, expected suffix: ({}), set `check_extension` to `False` if you don't want this behavior", path, suffix1365);1366} else if ["gz", "zst", "zstd"].iter().any(|compression_extension| {1367extension == Some(compression_extension)1368}) {1369polars_bail!(1370InvalidOperation: "use the compression parameter to control compression, or set `check_extension` to `False` if you want to suffix an uncompressed filename with an ending intended for compression"1371);1372}1373}1374}13751376SinkTypeIR::File(options)1377},1378SinkType::Partitioned(PartitionedSinkOptions {1379base_path,1380file_path_provider,1381partition_strategy,1382file_format,1383unified_sink_args,1384max_rows_per_file,1385approximate_bytes_per_file,1386}) => {1387let expr_to_ir_cx = &mut ExprToIRContext::new_with_opt_eager(1388ctxt.expr_arena,1389&input_schema,1390ctxt.opt_flags,1391);13921393let partition_strategy = match partition_strategy {1394PartitionStrategy::Keyed {1395keys,1396include_keys,1397keys_pre_grouped,1398} => {1399let keys = to_expr_irs(keys, expr_to_ir_cx)?;14001401polars_ensure!(1402keys.iter().all(|e| is_elementwise_rec(e.node(), ctxt.expr_arena)),1403InvalidOperation:1404"cannot use non-elementwise 
expressions for PartitionBy keys"1405);14061407PartitionStrategyIR::Keyed {1408keys,1409include_keys,1410keys_pre_grouped,1411}1412},1413PartitionStrategy::FileSize => PartitionStrategyIR::FileSize,1414};14151416let mut options = PartitionedSinkOptionsIR {1417base_path,1418file_path_provider: file_path_provider.unwrap_or_else(|| {1419FileProviderType::Hive(HivePathProvider {1420extension: PlSmallStr::from_static(file_format.extension()),1421})1422}),1423partition_strategy,1424file_format,1425unified_sink_args,1426max_rows_per_file,1427approximate_bytes_per_file,1428};14291430#[cfg(feature = "parquet")]1431{1432let input_schema = input_schema.into_owned();1433let file_schema =1434options.file_output_schema(&input_schema, ctxt.expr_arena)?;14351436if let FileWriteFormat::Parquet(parquet_options) = &mut options.file_format1437&& let Some(arrow_schema) =1438&mut Arc::make_mut(parquet_options).arrow_schema1439{1440let new_schema = validate_arrow_schema_conversion(1441file_schema.as_ref(),1442arrow_schema,1443)?;14441445*Arc::make_mut(arrow_schema) = new_schema;1446}1447}14481449ctxt.conversion_optimizer1450.fill_scratch(options.expr_irs_iter(), ctxt.expr_arena);14511452SinkTypeIR::Partitioned(options)1453},1454};14551456let lp = IR::Sink { input, payload };1457return run_conversion(lp, ctxt, "sink");1458},1459DslPlan::SinkMultiple { inputs } => {1460let inputs = inputs1461.into_iter()1462.map(|lp| to_alp_impl(lp, ctxt))1463.collect::<PolarsResult<Vec<_>>>()1464.map_err(|e| e.context(failed_here!(vertical concat)))?;1465IR::SinkMultiple { inputs }1466},1467#[cfg(feature = "merge_sorted")]1468DslPlan::MergeSorted {1469input_left,1470input_right,1471key,1472} => {1473let input_left = to_alp_impl(owned(input_left), ctxt)1474.map_err(|e| e.context(failed_here!(merge_sorted)))?;1475let input_right = to_alp_impl(owned(input_right), ctxt)1476.map_err(|e| e.context(failed_here!(merge_sorted)))?;14771478let left_schema = ctxt.lp_arena.get(input_left).schema(ctxt.lp_arena);1479let 
right_schema = ctxt.lp_arena.get(input_right).schema(ctxt.lp_arena);14801481left_schema1482.ensure_is_exact_match(&right_schema)1483.map_err(|err| err.context("merge_sorted".into()))?;14841485left_schema1486.try_get(key.as_str())1487.map_err(|err| err.context("merge_sorted".into()))?;14881489IR::MergeSorted {1490input_left,1491input_right,1492key,1493}1494},1495DslPlan::IR { node, dsl, version } => {1496return match node {1497Some(node)1498if version == ctxt.lp_arena.version()1499&& ctxt.conversion_optimizer.used_arenas.insert(version) =>1500{1501Ok(node)1502},1503_ => to_alp_impl(owned(dsl), ctxt),1504};1505},1506};1507Ok(ctxt.lp_arena.add(v))1508}15091510fn resolve_with_columns(1511exprs: Vec<Expr>,1512input: Node,1513lp_arena: &Arena<IR>,1514expr_arena: &mut Arena<AExpr>,1515opt_flags: &mut OptFlags,1516) -> PolarsResult<(Vec<ExprIR>, SchemaRef)> {1517let input_schema = lp_arena.get(input).schema(lp_arena);1518let mut output_schema = (**input_schema).clone();1519let exprs = rewrite_projections(exprs, &PlHashSet::new(), &input_schema, opt_flags)?;1520let mut output_names = PlHashSet::with_capacity(exprs.len());15211522let eirs = to_expr_irs(1523exprs,1524&mut ExprToIRContext::new_with_opt_eager(expr_arena, &input_schema, opt_flags),1525)?;1526for eir in eirs.iter() {1527let field = eir.field(&input_schema, expr_arena)?;15281529if !output_names.insert(field.name().clone()) {1530polars_bail!(1531ComputeError:1532"the name '{}' passed to `LazyFrame.with_columns` is duplicate\n\n\1533It's possible that multiple expressions are returning the same default column name. 
\1534If this is the case, try renaming the columns with `.alias(\"new_name\")` to avoid \1535duplicate column names.",1536field.name()1537)1538}1539output_schema.with_column(field.name, field.dtype.materialize_unknown(true)?);1540}15411542Ok((eirs, Arc::new(output_schema)))1543}15441545fn resolve_group_by(1546input: Node,1547keys: Vec<Expr>,1548aggs: Vec<Expr>,1549_options: &GroupbyOptions,1550lp_arena: &Arena<IR>,1551expr_arena: &mut Arena<AExpr>,1552opt_flags: &mut OptFlags,1553) -> PolarsResult<(Vec<ExprIR>, Vec<ExprIR>, SchemaRef)> {1554let input_schema = lp_arena.get(input).schema(lp_arena);1555let input_schema = input_schema.as_ref();1556let mut keys = rewrite_projections(keys, &PlHashSet::default(), input_schema, opt_flags)?;15571558// Initialize schema from keys1559let mut output_schema = expressions_to_schema(&keys, input_schema, |duplicate_name: &str| {1560format!("group_by keys contained duplicate output name '{duplicate_name}'")1561})?;1562let mut key_names: PlHashSet<PlSmallStr> = output_schema.iter_names().cloned().collect();15631564#[allow(unused_mut)]1565let mut pop_keys = false;1566// Add dynamic groupby index column(s)1567// Also add index columns to keys for expression expansion.1568#[cfg(feature = "dynamic_group_by")]1569{1570if let Some(options) = _options.rolling.as_ref() {1571let name = options.index_column.clone();1572let dtype = input_schema.try_get(name.as_str())?;1573keys.push(col(name.clone()));1574key_names.insert(name.clone());1575pop_keys = true;1576output_schema.with_column(name.clone(), dtype.clone());1577} else if let Some(options) = _options.dynamic.as_ref() {1578let name = options.index_column.clone();1579keys.push(col(name.clone()));1580key_names.insert(name.clone());1581pop_keys = true;1582let dtype = input_schema.try_get(name.as_str())?;1583if options.include_boundaries {1584output_schema.with_column("_lower_boundary".into(), dtype.clone());1585output_schema.with_column("_upper_boundary".into(), 
dtype.clone());1586}1587output_schema.with_column(name.clone(), dtype.clone());1588}1589}1590let keys_index_len = output_schema.len();1591if pop_keys {1592let _ = keys.pop();1593}1594let keys = to_expr_irs(1595keys,1596&mut ExprToIRContext::new_with_opt_eager(expr_arena, input_schema, opt_flags),1597)?;15981599// Add aggregation column(s)1600let aggs = rewrite_projections(aggs, &key_names, input_schema, opt_flags)?;1601let aggs = to_expr_irs(1602aggs,1603&mut ExprToIRContext::new_with_opt_eager(expr_arena, input_schema, opt_flags),1604)?;1605utils::validate_expressions(&keys, expr_arena, input_schema, "group by")?;1606utils::validate_expressions(&aggs, expr_arena, input_schema, "group by")?;16071608let mut aggs_schema = expr_irs_to_schema(&aggs, input_schema, expr_arena)?;16091610// Make sure aggregation columns do not contain duplicates1611if aggs_schema.len() < aggs.len() {1612let mut names = PlHashSet::with_capacity(aggs.len());1613for agg in aggs.iter() {1614let name = agg.output_name();1615polars_ensure!(names.insert(name.clone()), duplicate = name)1616}1617}16181619// Coerce aggregation column(s) into List unless not needed (auto-implode)1620debug_assert!(aggs_schema.len() == aggs.len());1621for ((_name, dtype), expr) in aggs_schema.iter_mut().zip(&aggs) {1622if !expr.is_scalar(expr_arena) {1623*dtype = dtype.clone().implode();1624}1625}16261627// Final output_schema1628output_schema.merge(aggs_schema);16291630// Make sure aggregation columns do not contain keys or index columns1631if output_schema.len() < (keys_index_len + aggs.len()) {1632let mut names = PlHashSet::with_capacity(output_schema.len());1633for agg in aggs.iter().chain(keys.iter()) {1634let name = agg.output_name();1635polars_ensure!(names.insert(name.clone()), duplicate = name)1636}1637}16381639Ok((keys, aggs, Arc::new(output_schema)))1640}16411642fn stats_helper<F, E>(condition: F, expr: E, schema: &Schema) -> Vec<Expr>1643where1644F: Fn(&DataType) -> bool,1645E: Fn(&PlSmallStr) -> 
Expr,1646{1647schema1648.iter()1649.map(|(name, dt)| {1650if condition(dt) {1651expr(name)1652} else {1653lit(NULL).cast(dt.clone()).alias(name.clone())1654}1655})1656.collect()1657}16581659pub(crate) fn maybe_init_projection_excluding_hive(1660reader_schema: &Either<ArrowSchemaRef, SchemaRef>,1661hive_parts: Option<&SchemaRef>,1662) -> Option<Arc<[PlSmallStr]>> {1663// Update `with_columns` with a projection so that hive columns aren't loaded from the1664// file1665let hive_schema = hive_parts?;16661667match &reader_schema {1668Either::Left(reader_schema) => hive_schema1669.iter_names()1670.any(|x| reader_schema.contains(x))1671.then(|| {1672reader_schema1673.iter_names_cloned()1674.filter(|x| !hive_schema.contains(x))1675.collect::<Arc<[_]>>()1676}),1677Either::Right(reader_schema) => hive_schema1678.iter_names()1679.any(|x| reader_schema.contains(x))1680.then(|| {1681reader_schema1682.iter_names_cloned()1683.filter(|x| !hive_schema.contains(x))1684.collect::<Arc<[_]>>()1685}),1686}1687}168816891690