Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/conversion/dsl_to_ir/concat.rs
7889 views
1
use super::*;
2
3
/// Returns an owned schema for every node in `inputs`, in the same order as `inputs`.
fn nodes_to_schemas(inputs: &[Node], lp_arena: &mut Arena<IR>) -> Vec<SchemaRef> {
    // Only read access is needed; take a shared reborrow of the arena once.
    let arena: &Arena<IR> = lp_arena;
    let mut schemas = Vec::with_capacity(inputs.len());
    for &node in inputs {
        schemas.push(arena.get(node).schema(arena).into_owned());
    }
    schemas
}
9
10
pub(super) fn convert_diagonal_concat(
11
mut inputs: Vec<Node>,
12
lp_arena: &mut Arena<IR>,
13
expr_arena: &mut Arena<AExpr>,
14
) -> PolarsResult<Vec<Node>> {
15
let schemas = nodes_to_schemas(&inputs, lp_arena);
16
17
let upper_bound_width = schemas.iter().map(|sch| sch.len()).sum();
18
19
let mut total_schema = Schema::with_capacity(upper_bound_width);
20
21
for sch in schemas.iter() {
22
sch.iter().for_each(|(name, dtype)| {
23
if !total_schema.contains(name) {
24
total_schema.with_column(name.as_str().into(), dtype.clone());
25
}
26
});
27
}
28
if total_schema.is_empty() {
29
return Ok(inputs);
30
}
31
32
let mut has_empty = false;
33
34
for (node, lf_schema) in inputs.iter_mut().zip(schemas.iter()) {
35
// Discard, this works physically
36
if lf_schema.is_empty() {
37
has_empty = true;
38
}
39
40
let mut columns_to_add = vec![];
41
for (name, dtype) in total_schema.iter() {
42
// If a name from Total Schema is not present - append
43
if lf_schema.get_field(name).is_none() {
44
columns_to_add.push(
45
AExprBuilder::lit_scalar(Scalar::null(dtype.clone()), expr_arena)
46
.expr_ir(name.clone()),
47
)
48
}
49
}
50
*node = IRBuilder::new(*node, expr_arena, lp_arena)
51
// Add the missing columns
52
.with_columns(columns_to_add, Default::default())
53
// Now, reorder to match schema.
54
.project_simple(total_schema.iter_names().map(|v| v.as_str()))
55
.unwrap()
56
.node();
57
}
58
59
if has_empty {
60
Ok(inputs
61
.into_iter()
62
.zip(schemas)
63
.filter_map(|(input, schema)| if schema.is_empty() { None } else { Some(input) })
64
.collect())
65
} else {
66
Ok(inputs)
67
}
68
}
69
70
pub(super) fn convert_st_union(
71
inputs: &mut [Node],
72
lp_arena: &mut Arena<IR>,
73
expr_arena: &mut Arena<AExpr>,
74
opt_flags: &OptFlags,
75
) -> PolarsResult<()> {
76
let mut schema = (**lp_arena.get(inputs[0]).schema(lp_arena)).clone();
77
78
let mut changed = false;
79
for input in inputs[1..].iter() {
80
let schema_other = lp_arena.get(*input).schema(lp_arena);
81
changed |= schema.to_supertype(schema_other.as_ref())?;
82
}
83
84
if changed {
85
for input in inputs {
86
let mut exprs = vec![];
87
let input_schema = lp_arena.get(*input).schema(lp_arena);
88
89
let to_cast = input_schema.iter().zip(schema.iter_values()).flat_map(
90
|((left_name, left_type), st)| {
91
if left_type != st {
92
Some(col(left_name.clone()).cast(st.clone()))
93
} else {
94
None
95
}
96
},
97
);
98
exprs.extend(to_cast);
99
100
if !exprs.is_empty() {
101
let expr = to_expr_irs(
102
exprs,
103
&mut ExprToIRContext::new_with_opt_eager(expr_arena, &input_schema, opt_flags),
104
)?;
105
let lp = IRBuilder::new(*input, expr_arena, lp_arena)
106
.with_columns(expr, Default::default())
107
.build();
108
109
let node = lp_arena.add(lp);
110
*input = node
111
}
112
}
113
}
114
Ok(())
115
}
116
117
pub(super) fn h_concat_schema(
118
inputs: &[Node],
119
lp_arena: &mut Arena<IR>,
120
) -> PolarsResult<SchemaRef> {
121
let schemas = nodes_to_schemas(inputs, lp_arena);
122
let combined_schema = merge_schemas(&schemas)?;
123
Ok(Arc::new(combined_schema))
124
}
125
126