Path: blob/main/crates/polars-plan/src/plans/conversion/dsl_to_ir/concat.rs
7889 views
use super::*;12fn nodes_to_schemas(inputs: &[Node], lp_arena: &mut Arena<IR>) -> Vec<SchemaRef> {3inputs4.iter()5.map(|n| lp_arena.get(*n).schema(lp_arena).into_owned())6.collect()7}89pub(super) fn convert_diagonal_concat(10mut inputs: Vec<Node>,11lp_arena: &mut Arena<IR>,12expr_arena: &mut Arena<AExpr>,13) -> PolarsResult<Vec<Node>> {14let schemas = nodes_to_schemas(&inputs, lp_arena);1516let upper_bound_width = schemas.iter().map(|sch| sch.len()).sum();1718let mut total_schema = Schema::with_capacity(upper_bound_width);1920for sch in schemas.iter() {21sch.iter().for_each(|(name, dtype)| {22if !total_schema.contains(name) {23total_schema.with_column(name.as_str().into(), dtype.clone());24}25});26}27if total_schema.is_empty() {28return Ok(inputs);29}3031let mut has_empty = false;3233for (node, lf_schema) in inputs.iter_mut().zip(schemas.iter()) {34// Discard, this works physically35if lf_schema.is_empty() {36has_empty = true;37}3839let mut columns_to_add = vec![];40for (name, dtype) in total_schema.iter() {41// If a name from Total Schema is not present - append42if lf_schema.get_field(name).is_none() {43columns_to_add.push(44AExprBuilder::lit_scalar(Scalar::null(dtype.clone()), expr_arena)45.expr_ir(name.clone()),46)47}48}49*node = IRBuilder::new(*node, expr_arena, lp_arena)50// Add the missing columns51.with_columns(columns_to_add, Default::default())52// Now, reorder to match schema.53.project_simple(total_schema.iter_names().map(|v| v.as_str()))54.unwrap()55.node();56}5758if has_empty {59Ok(inputs60.into_iter()61.zip(schemas)62.filter_map(|(input, schema)| if schema.is_empty() { None } else { Some(input) })63.collect())64} else {65Ok(inputs)66}67}6869pub(super) fn convert_st_union(70inputs: &mut [Node],71lp_arena: &mut Arena<IR>,72expr_arena: &mut Arena<AExpr>,73opt_flags: &OptFlags,74) -> PolarsResult<()> {75let mut schema = (**lp_arena.get(inputs[0]).schema(lp_arena)).clone();7677let mut changed = false;78for input in inputs[1..].iter() {79let schema_other = lp_arena.get(*input).schema(lp_arena);80changed |= schema.to_supertype(schema_other.as_ref())?;81}8283if changed {84for input in inputs {85let mut exprs = vec![];86let input_schema = lp_arena.get(*input).schema(lp_arena);8788let to_cast = input_schema.iter().zip(schema.iter_values()).flat_map(89|((left_name, left_type), st)| {90if left_type != st {91Some(col(left_name.clone()).cast(st.clone()))92} else {93None94}95},96);97exprs.extend(to_cast);9899if !exprs.is_empty() {100let expr = to_expr_irs(101exprs,102&mut ExprToIRContext::new_with_opt_eager(expr_arena, &input_schema, opt_flags),103)?;104let lp = IRBuilder::new(*input, expr_arena, lp_arena)105.with_columns(expr, Default::default())106.build();107108let node = lp_arena.add(lp);109*input = node110}111}112}113Ok(())114}115116pub(super) fn h_concat_schema(117inputs: &[Node],118lp_arena: &mut Arena<IR>,119) -> PolarsResult<SchemaRef> {120let schemas = nodes_to_schemas(inputs, lp_arena);121let combined_schema = merge_schemas(&schemas)?;122Ok(Arc::new(combined_schema))123}124125126