Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/conversion/dsl_to_ir/mod.rs
8503 views
1
use arrow::datatypes::ArrowSchemaRef;
2
use either::Either;
3
use expr_expansion::rewrite_projections;
4
use futures::stream::FuturesUnordered;
5
use hive::hive_partitions_from_paths;
6
use polars_core::chunked_array::cast::CastOptions;
7
use polars_core::config::verbose;
8
use polars_io::ExternalCompression;
9
use polars_io::pl_async::get_runtime;
10
use polars_utils::format_pl_smallstr;
11
use polars_utils::itertools::Itertools;
12
use polars_utils::pl_path::PlRefPath;
13
use polars_utils::unique_id::UniqueId;
14
15
use super::convert_utils::SplitPredicates;
16
use super::stack_opt::ConversionOptimizer;
17
use super::*;
18
use crate::constants::get_pl_element_name;
19
use crate::dsl::PartitionedSinkOptions;
20
use crate::dsl::file_provider::{FileProviderType, HivePathProvider};
21
use crate::dsl::functions::{all_horizontal, col};
22
use crate::plans::conversion::dsl_to_ir::scans::SourcesToFileInfo;
23
24
mod concat;
25
mod datatype_fn_to_ir;
26
mod expr_expansion;
27
mod expr_to_ir;
28
mod functions;
29
mod join;
30
mod scans;
31
mod utils;
32
pub use expr_expansion::{expand_expression, is_regex_projection, prepare_projection};
33
pub use expr_to_ir::{ExprToIRContext, to_expr_ir};
34
use expr_to_ir::{to_expr_ir_materialized_lit, to_expr_irs};
35
use utils::DslConversionContext;
36
37
/// Builds a short, quoted context label (e.g. `'vertical concat'`) from raw
/// tokens, used via `PolarsError::context` to mark which DSL -> IR conversion
/// step failed. The `.into()` converts the `String` into the error-context type
/// expected at the call site.
macro_rules! failed_here {
    ($($t:tt)*) => {
        format!("'{}'", stringify!($($t)*)).into()
    }
}
pub(super) use failed_here;
43
44
/// Converts a [`DslPlan`] into IR, adding the resulting nodes/expressions to
/// the given arenas and returning the root [`Node`].
///
/// On failure (unless `OptFlags::EAGER` is set) the error message is enriched
/// with a rendering of the partially-resolved plan so the user can see where
/// resolution stopped.
///
/// # Errors
/// Propagates any error from the recursive conversion (`to_alp_impl`),
/// possibly wrapped with extra plan-location context.
pub fn to_alp(
    lp: DslPlan,
    expr_arena: &mut Arena<AExpr>,
    lp_arena: &mut Arena<IR>,
    // Only `SIMPLIFY_EXPR`, `TYPE_COERCION`, `TYPE_CHECK` are respected.
    opt_flags: &mut OptFlags,
) -> PolarsResult<Node> {
    // The optimizer run during conversion only honors the three flags below.
    let conversion_optimizer = ConversionOptimizer::new(
        opt_flags.contains(OptFlags::SIMPLIFY_EXPR),
        opt_flags.contains(OptFlags::TYPE_COERCION),
        opt_flags.contains(OptFlags::TYPE_CHECK),
    );

    let mut ctxt = DslConversionContext {
        expr_arena,
        lp_arena,
        conversion_optimizer,
        opt_flags,
        nodes_scratch: &mut unitvec![],
        cache_file_info: Default::default(),
        pushdown_maintain_errors: optimizer::pushdown_maintain_errors(),
        verbose: verbose(),
        seen_caches: Default::default(),
    };

    match to_alp_impl(lp, &mut ctxt) {
        Ok(out) => Ok(out),
        Err(err) => {
            if opt_flags.contains(OptFlags::EAGER) {
                // If we dispatched to the lazy engine from the eager API, we don't want to resolve
                // where in the query plan it went wrong. It is clear from the backtrace anyway.
                return Err(err.remove_context());
            };
            // No IR was produced at all: nothing to render, return the raw error.
            let Some(ir_until_then) = lp_arena.last_node() else {
                return Err(err);
            };
            // Use the innermost context message (set via `failed_here!`) as the
            // name of the failing node, if one was attached.
            let node_name = if let PolarsError::Context { msg, .. } = &err {
                msg
            } else {
                "THIS_NODE"
            };
            // Take ownership of the arenas to render the partial plan. This
            // empties the caller's arenas, which is acceptable because we are
            // on the error path and conversion is being abandoned.
            let plan = IRPlan::new(
                ir_until_then,
                std::mem::take(lp_arena),
                std::mem::take(expr_arena),
            );
            let location = format!("{}", plan.display());
            Err(err.wrap_msg(|msg| {
                format!("{msg}\n\nResolved plan until failure:\n\n\t---> FAILED HERE RESOLVING {node_name} <---\n{location}")
            }))
        },
    }
}
97
98
fn run_conversion(lp: IR, ctxt: &mut DslConversionContext, name: &str) -> PolarsResult<Node> {
99
let lp_node = ctxt.lp_arena.add(lp);
100
ctxt.conversion_optimizer
101
.optimize_exprs(ctxt.expr_arena, ctxt.lp_arena, lp_node, false)
102
.map_err(|e| e.context(format!("'{name}' failed").into()))?;
103
104
Ok(lp_node)
105
}
106
107
async fn fetch_metadata(
108
lp: &DslPlan,
109
cache_file_info: SourcesToFileInfo,
110
verbose: bool,
111
) -> PolarsResult<()> {
112
use futures::stream::StreamExt;
113
let mut futures = lp
114
.into_iter()
115
.filter_map(|dsl| {
116
let DslPlan::Scan {
117
sources,
118
unified_scan_args,
119
scan_type,
120
cached_ir,
121
} = dsl
122
else {
123
return None;
124
};
125
Some(scans::dsl_to_ir(
126
sources.clone(),
127
unified_scan_args.clone(),
128
scan_type.clone(),
129
cached_ir.clone(),
130
cache_file_info.clone(),
131
verbose,
132
))
133
})
134
.collect::<FuturesUnordered<_>>();
135
136
while let Some(result) = futures.next().await {
137
result?
138
}
139
Ok::<(), PolarsError>(())
140
}
141
142
/// converts LogicalPlan to IR
143
/// it adds expressions & lps to the respective arenas as it traverses the plan
144
/// finally it returns the top node of the logical plan
145
#[recursive]
146
pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult<Node> {
147
let owned = Arc::unwrap_or_clone;
148
149
// First do a pass to collect all scans and fetch all metadata concurrently.
150
{
151
let verbose = ctxt.verbose;
152
let cache_file_info = ctxt.cache_file_info.clone();
153
use tokio::runtime::Handle;
154
155
let fut = fetch_metadata(&lp, cache_file_info, verbose);
156
if let Ok(_handle) = Handle::try_current() {
157
get_runtime().block_in_place_on(fut)?;
158
} else {
159
get_runtime().block_on(fut)?;
160
}
161
}
162
163
let v = match lp {
164
DslPlan::Scan {
165
sources: _,
166
unified_scan_args: _,
167
scan_type: _,
168
cached_ir,
169
} => cached_ir.lock().unwrap().clone().unwrap(),
170
#[cfg(feature = "python")]
171
DslPlan::PythonScan { options } => {
172
use crate::dsl::python_dsl::PythonOptionsDsl;
173
174
let schema = options.get_schema()?;
175
176
let PythonOptionsDsl {
177
scan_fn,
178
schema_fn: _,
179
python_source,
180
validate_schema,
181
is_pure,
182
} = options;
183
184
IR::PythonScan {
185
options: PythonOptions {
186
scan_fn,
187
schema,
188
python_source,
189
validate_schema,
190
output_schema: Default::default(),
191
with_columns: Default::default(),
192
n_rows: Default::default(),
193
predicate: Default::default(),
194
is_pure,
195
},
196
}
197
},
198
DslPlan::Union { inputs, args } => {
199
let mut inputs = inputs
200
.into_iter()
201
.map(|lp| to_alp_impl(lp, ctxt))
202
.collect::<PolarsResult<Vec<_>>>()
203
.map_err(|e| e.context(failed_here!(vertical concat)))?;
204
205
if args.diagonal {
206
inputs = concat::convert_diagonal_concat(inputs, ctxt.lp_arena, ctxt.expr_arena)?;
207
}
208
209
if args.to_supertypes {
210
concat::convert_st_union(
211
&mut inputs,
212
ctxt.lp_arena,
213
ctxt.expr_arena,
214
ctxt.opt_flags,
215
)
216
.map_err(|e| e.context(failed_here!(vertical concat)))?;
217
}
218
219
let first_n = *inputs.first().ok_or_else(
220
|| polars_err!(InvalidOperation: "expected at least one input in 'union'/'concat'"),
221
)?;
222
let schema = ctxt.lp_arena.get(first_n).schema(ctxt.lp_arena);
223
for n in &inputs[1..] {
224
let schema_i = ctxt.lp_arena.get(*n).schema(ctxt.lp_arena);
225
// The first argument
226
schema_i.matches_schema(schema.as_ref()).map_err(|_| polars_err!(InvalidOperation: "'union'/'concat' inputs should all have the same schema,\
227
got\n{:?} and \n{:?}", schema, schema_i)
228
)?;
229
}
230
231
let options = args.into();
232
IR::Union { inputs, options }
233
},
234
DslPlan::HConcat { inputs, options } => {
235
let inputs = inputs
236
.into_iter()
237
.map(|lp| to_alp_impl(lp, ctxt))
238
.collect::<PolarsResult<Vec<_>>>()
239
.map_err(|e| e.context(failed_here!(horizontal concat)))?;
240
241
let schema = concat::h_concat_schema(&inputs, ctxt.lp_arena)?;
242
243
IR::HConcat {
244
inputs,
245
schema,
246
options,
247
}
248
},
249
DslPlan::Filter { input, predicate } => {
250
let mut input =
251
to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(filter)))?;
252
let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
253
254
let mut out = Vec::with_capacity(1);
255
expr_expansion::expand_expression(
256
&predicate,
257
&PlHashSet::default(),
258
input_schema.as_ref().as_ref(),
259
&mut out,
260
ctxt.opt_flags,
261
)?;
262
263
let predicate = match out.len() {
264
1 => {
265
// all good
266
out.pop().unwrap()
267
},
268
0 => {
269
polars_bail!(
270
ComputeError:
271
"The predicate expanded to zero expressions. \
272
This may for example be caused by a regex not matching column names or \
273
a column dtype match not hitting any dtypes in the DataFrame"
274
);
275
},
276
_ => {
277
let mut expanded = String::new();
278
for e in out.iter().take(5) {
279
expanded.push_str(&format!("\t{e:?},\n"))
280
}
281
// pop latest comma
282
expanded.pop();
283
if out.len() > 5 {
284
expanded.push_str("\t...\n")
285
}
286
287
if cfg!(feature = "python") {
288
polars_bail!(
289
ComputeError:
290
"The predicate passed to 'LazyFrame.filter' expanded to multiple expressions: \n\n{expanded}\n\
291
This is ambiguous. Try to combine the predicates with the 'all' or `any' expression."
292
)
293
} else {
294
polars_bail!(
295
ComputeError:
296
"The predicate passed to 'LazyFrame.filter' expanded to multiple expressions: \n\n{expanded}\n\
297
This is ambiguous. Try to combine the predicates with the 'all_horizontal' or `any_horizontal' expression."
298
)
299
};
300
},
301
};
302
let predicate_ae = to_expr_ir(
303
predicate,
304
&mut ExprToIRContext::new_with_opt_eager(
305
ctxt.expr_arena,
306
&input_schema,
307
ctxt.opt_flags,
308
),
309
)?;
310
311
if ctxt.opt_flags.predicate_pushdown() {
312
ctxt.nodes_scratch.clear();
313
314
if let Some(SplitPredicates { pushable, fallible }) = SplitPredicates::new(
315
predicate_ae.node(),
316
ctxt.expr_arena,
317
Some(ctxt.nodes_scratch),
318
ctxt.pushdown_maintain_errors,
319
) {
320
let mut update_input = |predicate: Node| -> PolarsResult<()> {
321
let predicate = ExprIR::from_node(predicate, ctxt.expr_arena);
322
ctxt.conversion_optimizer
323
.push_scratch(predicate.node(), ctxt.expr_arena);
324
let lp = IR::Filter { input, predicate };
325
input = run_conversion(lp, ctxt, "filter")?;
326
327
Ok(())
328
};
329
330
// Pushables first, then fallible.
331
332
for predicate in pushable {
333
update_input(predicate)?;
334
}
335
336
if let Some(node) = fallible {
337
update_input(node)?;
338
}
339
340
return Ok(input);
341
};
342
};
343
344
ctxt.conversion_optimizer
345
.push_scratch(predicate_ae.node(), ctxt.expr_arena);
346
let lp = IR::Filter {
347
input,
348
predicate: predicate_ae,
349
};
350
return run_conversion(lp, ctxt, "filter");
351
},
352
DslPlan::Slice { input, offset, len } => {
353
let input =
354
to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(slice)))?;
355
356
if len == 0 {
357
let input_schema = ctxt
358
.lp_arena
359
.get(input)
360
.schema(ctxt.lp_arena)
361
.as_ref()
362
.clone();
363
364
IR::DataFrameScan {
365
df: Arc::new(DataFrame::empty_with_schema(&input_schema)),
366
schema: input_schema.clone(),
367
output_schema: None,
368
}
369
} else {
370
IR::Slice { input, offset, len }
371
}
372
},
373
DslPlan::DataFrameScan { df, schema } => IR::DataFrameScan {
374
df,
375
schema,
376
output_schema: None,
377
},
378
DslPlan::Select {
379
expr,
380
input,
381
options,
382
} => {
383
let input =
384
to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(select)))?;
385
let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
386
let (exprs, schema) = prepare_projection(expr, &input_schema, ctxt.opt_flags)
387
.map_err(|e| e.context(failed_here!(select)))?;
388
389
if exprs.is_empty() {
390
ctxt.lp_arena.replace(input, utils::empty_df());
391
return Ok(input);
392
}
393
394
let eirs = to_expr_irs(
395
exprs,
396
&mut ExprToIRContext::new_with_opt_eager(
397
ctxt.expr_arena,
398
&input_schema,
399
ctxt.opt_flags,
400
),
401
)?;
402
ctxt.conversion_optimizer
403
.fill_scratch(&eirs, ctxt.expr_arena);
404
405
let schema = Arc::new(schema);
406
let lp = IR::Select {
407
expr: eirs,
408
input,
409
schema,
410
options,
411
};
412
413
return run_conversion(lp, ctxt, "select").map_err(|e| e.context(failed_here!(select)));
414
},
415
DslPlan::Sort {
416
input,
417
by_column,
418
slice,
419
mut sort_options,
420
} => {
421
let input =
422
to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(select)))?;
423
let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
424
425
// note: if given an Expr::Columns, count the individual cols
426
let n_by_exprs = if by_column.len() == 1 {
427
match &by_column[0] {
428
Expr::Selector(s) => s.into_columns(&input_schema, &Default::default())?.len(),
429
_ => 1,
430
}
431
} else {
432
by_column.len()
433
};
434
let n_desc = sort_options.descending.len();
435
polars_ensure!(
436
n_desc == n_by_exprs || n_desc == 1,
437
ComputeError: "the length of `descending` ({}) does not match the length of `by` ({})", n_desc, by_column.len()
438
);
439
let n_nulls_last = sort_options.nulls_last.len();
440
polars_ensure!(
441
n_nulls_last == n_by_exprs || n_nulls_last == 1,
442
ComputeError: "the length of `nulls_last` ({}) does not match the length of `by` ({})", n_nulls_last, by_column.len()
443
);
444
445
let mut expanded_cols = Vec::new();
446
let mut nulls_last = Vec::new();
447
let mut descending = Vec::new();
448
449
// note: nulls_last/descending need to be matched to expanded multi-output expressions.
450
// when one of nulls_last/descending has not been updated from the default (single
451
// value true/false), 'cycle' ensures that "by_column" iter is not truncated.
452
for (c, (&n, &d)) in by_column.into_iter().zip(
453
sort_options
454
.nulls_last
455
.iter()
456
.cycle()
457
.zip(sort_options.descending.iter().cycle()),
458
) {
459
let exprs = utils::expand_expressions(
460
input,
461
vec![c],
462
ctxt.lp_arena,
463
ctxt.expr_arena,
464
ctxt.opt_flags,
465
)
466
.map_err(|e| e.context(failed_here!(sort)))?;
467
468
nulls_last.extend(std::iter::repeat_n(n, exprs.len()));
469
descending.extend(std::iter::repeat_n(d, exprs.len()));
470
expanded_cols.extend(exprs);
471
}
472
sort_options.nulls_last = nulls_last;
473
sort_options.descending = descending;
474
475
ctxt.conversion_optimizer
476
.fill_scratch(&expanded_cols, ctxt.expr_arena);
477
let mut by_column = expanded_cols;
478
479
// Remove null columns in multi-columns sort
480
if by_column.len() > 1 {
481
let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
482
483
let mut null_columns = vec![];
484
485
for (i, c) in by_column.iter().enumerate() {
486
if let DataType::Null = c.dtype(&input_schema, ctxt.expr_arena)? {
487
null_columns.push(i);
488
}
489
}
490
// All null columns, only take one.
491
if null_columns.len() == by_column.len() {
492
by_column.truncate(1);
493
sort_options.nulls_last.truncate(1);
494
sort_options.descending.truncate(1);
495
}
496
// Remove the null columns
497
else if !null_columns.is_empty() {
498
for i in null_columns.into_iter().rev() {
499
by_column.remove(i);
500
sort_options.nulls_last.remove(i);
501
sort_options.descending.remove(i);
502
}
503
}
504
}
505
if by_column.is_empty() {
506
return Ok(input);
507
};
508
509
let lp = IR::Sort {
510
input,
511
by_column,
512
slice: slice.map(|t| (t.0, t.1, None)),
513
sort_options,
514
};
515
516
return run_conversion(lp, ctxt, "sort").map_err(|e| e.context(failed_here!(sort)));
517
},
518
DslPlan::Cache { input, id } => {
519
let input = match ctxt.seen_caches.get(&id) {
520
Some(input) => *input,
521
None => {
522
let input = to_alp_impl(owned(input), ctxt)
523
.map_err(|e| e.context(failed_here!(cache)))?;
524
let seen_before = ctxt.seen_caches.insert(id, input);
525
assert!(
526
seen_before.is_none(),
527
"Cache could not have been created in the mean time. That would make the DAG cyclic."
528
);
529
input
530
},
531
};
532
533
IR::Cache { input, id }
534
},
535
DslPlan::GroupBy {
536
input,
537
keys,
538
predicates,
539
mut aggs,
540
apply,
541
maintain_order,
542
options,
543
} => {
544
// If the group by contains any predicates, we update the plan by turning the
545
// predicates into aggregations and filtering on them. Then, we recursively call
546
// this function.
547
if !predicates.is_empty() {
548
let predicate_names = (0..predicates.len())
549
.map(|i| format_pl_smallstr!("__POLARS_HAVING_{i}"))
550
.collect::<Arc<[_]>>();
551
let predicates = predicates
552
.into_iter()
553
.zip(predicate_names.iter())
554
.map(|(p, name)| p.alias(name.clone()))
555
.collect_vec();
556
aggs.extend(predicates);
557
558
let lp = DslPlan::GroupBy {
559
input,
560
keys,
561
predicates: vec![],
562
aggs,
563
apply,
564
maintain_order,
565
options,
566
};
567
let lp = DslBuilder::from(lp)
568
.filter(
569
all_horizontal(
570
predicate_names.iter().map(|n| col(n.clone())).collect_vec(),
571
)
572
.unwrap(),
573
)
574
.drop(Selector::ByName {
575
names: predicate_names,
576
strict: true,
577
})
578
.build();
579
return to_alp_impl(lp, ctxt);
580
}
581
582
// NOTE: As we went into this branch, we know that no predicates are provided.
583
let input =
584
to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(group_by)))?;
585
586
// Rolling + group-by sorts the whole table, so remove unneeded columns
587
if ctxt.opt_flags.eager() && options.is_rolling() && !keys.is_empty() {
588
ctxt.opt_flags.insert(OptFlags::PROJECTION_PUSHDOWN)
589
}
590
591
let (keys, aggs, schema) = resolve_group_by(
592
input,
593
keys,
594
aggs,
595
&options,
596
ctxt.lp_arena,
597
ctxt.expr_arena,
598
ctxt.opt_flags,
599
)
600
.map_err(|e| e.context(failed_here!(group_by)))?;
601
602
let (apply, schema) = if let Some((apply, schema)) = apply {
603
(Some(apply), schema)
604
} else {
605
(None, schema)
606
};
607
608
ctxt.conversion_optimizer
609
.fill_scratch(&keys, ctxt.expr_arena);
610
ctxt.conversion_optimizer
611
.fill_scratch(&aggs, ctxt.expr_arena);
612
613
let lp = IR::GroupBy {
614
input,
615
keys,
616
aggs,
617
schema,
618
apply,
619
maintain_order,
620
options,
621
};
622
return run_conversion(lp, ctxt, "group_by")
623
.map_err(|e| e.context(failed_here!(group_by)));
624
},
625
DslPlan::Join {
626
input_left,
627
input_right,
628
left_on,
629
right_on,
630
predicates,
631
options,
632
} => {
633
return join::resolve_join(
634
Either::Left(input_left),
635
Either::Left(input_right),
636
left_on,
637
right_on,
638
predicates,
639
JoinOptionsIR::from(Arc::unwrap_or_clone(options)),
640
ctxt,
641
)
642
.map_err(|e| e.context(failed_here!(join)))
643
.map(|t| t.0);
644
},
645
DslPlan::HStack {
646
input,
647
exprs,
648
options,
649
} => {
650
let input = to_alp_impl(owned(input), ctxt)
651
.map_err(|e| e.context(failed_here!(with_columns)))?;
652
let (exprs, schema) =
653
resolve_with_columns(exprs, input, ctxt.lp_arena, ctxt.expr_arena, ctxt.opt_flags)
654
.map_err(|e| e.context(failed_here!(with_columns)))?;
655
656
ctxt.conversion_optimizer
657
.fill_scratch(&exprs, ctxt.expr_arena);
658
let lp = IR::HStack {
659
input,
660
exprs,
661
schema,
662
options,
663
};
664
return run_conversion(lp, ctxt, "with_columns");
665
},
666
DslPlan::MatchToSchema {
667
input,
668
match_schema,
669
per_column,
670
extra_columns,
671
} => {
672
let input =
673
to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(unique)))?;
674
let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
675
676
assert_eq!(per_column.len(), match_schema.len());
677
678
if input_schema.as_ref() == &match_schema {
679
return Ok(input);
680
}
681
682
let mut exprs = Vec::with_capacity(match_schema.len());
683
let mut found_missing_columns = Vec::new();
684
let mut used_input_columns = 0;
685
686
for ((column, dtype), per_column) in match_schema.iter().zip(per_column.iter()) {
687
match input_schema.get(column) {
688
None => match &per_column.missing_columns {
689
MissingColumnsPolicyOrExpr::Raise => found_missing_columns.push(column),
690
MissingColumnsPolicyOrExpr::Insert => exprs.push(Expr::Alias(
691
Arc::new(Expr::Literal(LiteralValue::Scalar(Scalar::null(
692
dtype.clone(),
693
)))),
694
column.clone(),
695
)),
696
MissingColumnsPolicyOrExpr::InsertWith(expr) => {
697
exprs.push(Expr::Alias(Arc::new(expr.clone()), column.clone()))
698
},
699
},
700
Some(input_dtype) if dtype == input_dtype => {
701
used_input_columns += 1;
702
exprs.push(Expr::Column(column.clone()))
703
},
704
Some(input_dtype) => {
705
let from_dtype = input_dtype;
706
let to_dtype = dtype;
707
708
let policy = CastColumnsPolicy {
709
integer_upcast: per_column.integer_cast == UpcastOrForbid::Upcast,
710
float_upcast: per_column.float_cast == UpcastOrForbid::Upcast,
711
missing_struct_fields: per_column.missing_struct_fields,
712
extra_struct_fields: per_column.extra_struct_fields,
713
714
..Default::default()
715
};
716
717
let should_cast =
718
policy.should_cast_column(column, to_dtype, from_dtype)?;
719
720
let mut expr = Expr::Column(PlSmallStr::from_str(column));
721
if should_cast {
722
expr = expr.cast_with_options(to_dtype.clone(), CastOptions::NonStrict);
723
}
724
725
used_input_columns += 1;
726
exprs.push(expr);
727
},
728
}
729
}
730
731
// Report the error for missing columns
732
if let Some(lst) = found_missing_columns.first() {
733
use std::fmt::Write;
734
let mut formatted = String::new();
735
write!(&mut formatted, "\"{}\"", found_missing_columns[0]).unwrap();
736
for c in &found_missing_columns[1..] {
737
write!(&mut formatted, ", \"{c}\"").unwrap();
738
}
739
740
write!(&mut formatted, "\"{lst}\"").unwrap();
741
polars_bail!(SchemaMismatch: "missing columns in `match_to_schema`: {formatted}");
742
}
743
744
// Report the error for extra columns
745
if used_input_columns != input_schema.len()
746
&& extra_columns == ExtraColumnsPolicy::Raise
747
{
748
let found_extra_columns = input_schema
749
.iter_names()
750
.filter(|n| !match_schema.contains(n))
751
.collect::<Vec<_>>();
752
753
use std::fmt::Write;
754
let mut formatted = String::new();
755
write!(&mut formatted, "\"{}\"", found_extra_columns[0]).unwrap();
756
for c in &found_extra_columns[1..] {
757
write!(&mut formatted, ", \"{c}\"").unwrap();
758
}
759
760
polars_bail!(SchemaMismatch: "extra columns in `match_to_schema`: {formatted}");
761
}
762
763
let exprs = to_expr_irs(
764
exprs,
765
&mut ExprToIRContext::new_with_opt_eager(
766
ctxt.expr_arena,
767
&input_schema,
768
ctxt.opt_flags,
769
),
770
)?;
771
772
ctxt.conversion_optimizer
773
.fill_scratch(&exprs, ctxt.expr_arena);
774
let lp = IR::Select {
775
input,
776
expr: exprs,
777
schema: match_schema.clone(),
778
options: ProjectionOptions {
779
run_parallel: true,
780
duplicate_check: false,
781
should_broadcast: true,
782
},
783
};
784
return run_conversion(lp, ctxt, "match_to_schema");
785
},
786
DslPlan::PipeWithSchema { input, callback } => {
787
// Derive the schema from the input
788
let mut inputs = Vec::with_capacity(input.len());
789
let mut input_schemas = Vec::with_capacity(input.len());
790
791
for plan in input.as_ref() {
792
let ir = to_alp_impl(plan.clone(), ctxt)?;
793
let schema = ctxt.lp_arena.get(ir).schema(ctxt.lp_arena).into_owned();
794
795
let dsl = DslPlan::IR {
796
dsl: Arc::new(plan.clone()),
797
version: ctxt.lp_arena.version(),
798
node: Some(ir),
799
};
800
inputs.push(dsl);
801
input_schemas.push(schema);
802
}
803
804
// Adjust the input and start conversion again
805
let input_adjusted = callback.call((inputs, input_schemas))?;
806
return to_alp_impl(input_adjusted, ctxt);
807
},
808
#[cfg(feature = "pivot")]
809
DslPlan::Pivot {
810
input,
811
on,
812
on_columns,
813
index,
814
values,
815
agg,
816
maintain_order,
817
separator,
818
} => {
819
let input =
820
to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(unique)))?;
821
let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
822
823
let on = on.into_columns(input_schema.as_ref(), &Default::default())?;
824
let index = index.into_columns(input_schema.as_ref(), &Default::default())?;
825
let values = values.into_columns(input_schema.as_ref(), &Default::default())?;
826
827
polars_ensure!(!on.is_empty(), InvalidOperation: "`pivot` called without `on` columns.");
828
polars_ensure!(on.len() == on_columns.width(), InvalidOperation: "`pivot` expected `on` and `on_columns` to have the same amount of columns.");
829
if on.len() > 1 {
830
polars_ensure!(
831
on_columns.columns().iter().zip(on.iter()).all(|(c, o)| o == c.name()),
832
InvalidOperation: "`pivot` has mismatching column names between `on` and `on_columns`."
833
);
834
}
835
polars_ensure!(!values.is_empty(), InvalidOperation: "`pivot` called without `values` columns.");
836
837
let on_titles = if on_columns.width() == 1 {
838
on_columns.columns()[0].cast(&DataType::String)?
839
} else {
840
on_columns
841
.as_ref()
842
.clone()
843
.into_struct(PlSmallStr::EMPTY)
844
.cast(&DataType::String)?
845
.into_column()
846
};
847
let on_titles = on_titles.str()?;
848
849
let mut expr_schema = input_schema.as_ref().as_ref().clone();
850
let mut out = Vec::with_capacity(1);
851
let mut aggs = Vec::<ExprIR>::with_capacity(values.len() * on_columns.height());
852
for value in values.iter() {
853
out.clear();
854
let value_dtype = input_schema.try_get(value)?;
855
expr_schema.insert(get_pl_element_name(), value_dtype.clone());
856
expand_expression(
857
&agg,
858
&Default::default(),
859
&expr_schema,
860
&mut out,
861
ctxt.opt_flags,
862
)?;
863
polars_ensure!(
864
out.len() == 1,
865
InvalidOperation: "Pivot expression are not allowed to expand to more than 1 expression"
866
);
867
let agg = out.pop().unwrap();
868
let agg_ae = to_expr_ir(
869
agg,
870
&mut ExprToIRContext::new_with_opt_eager(
871
ctxt.expr_arena,
872
&expr_schema,
873
ctxt.opt_flags,
874
),
875
)?
876
.node();
877
878
polars_ensure!(
879
aexpr_to_leaf_names_iter(agg_ae, ctxt.expr_arena).count() == 0,
880
InvalidOperation: "explicit column references are not allowed in the `aggregate_function` of `pivot`"
881
);
882
883
for i in 0..on_columns.height() {
884
let mut name = String::new();
885
if values.len() > 1 {
886
name.push_str(value.as_str());
887
name.push_str(separator.as_str());
888
}
889
890
name.push_str(on_titles.get(i).unwrap_or("null"));
891
892
fn on_predicate(
893
on: &PlSmallStr,
894
on_column: &Column,
895
i: usize,
896
expr_arena: &mut Arena<AExpr>,
897
) -> AExprBuilder {
898
let e = AExprBuilder::col(on.clone(), expr_arena);
899
e.eq(
900
AExprBuilder::lit_scalar(
901
Scalar::new(
902
on_column.dtype().clone(),
903
on_column.get(i).unwrap().into_static(),
904
),
905
expr_arena,
906
),
907
expr_arena,
908
)
909
}
910
911
let predicate = if on.len() == 1 {
912
on_predicate(&on[0], &on_columns.columns()[0], i, ctxt.expr_arena)
913
} else {
914
AExprBuilder::function(
915
on.iter()
916
.enumerate()
917
.map(|(j, on_col)| {
918
on_predicate(
919
on_col,
920
&on_columns.columns()[j],
921
i,
922
ctxt.expr_arena,
923
)
924
.expr_ir(on_col.clone())
925
})
926
.collect::<Vec<_>>(),
927
IRFunctionExpr::Boolean(IRBooleanFunction::AllHorizontal),
928
ctxt.expr_arena,
929
)
930
};
931
932
let replacement_element = AExprBuilder::col(value.clone(), ctxt.expr_arena)
933
.filter(predicate, ctxt.expr_arena)
934
.node();
935
936
#[recursive::recursive]
937
fn deep_clone_element_replace(
938
ae: Node,
939
arena: &mut Arena<AExpr>,
940
replacement: Node,
941
) -> Node {
942
let slf = arena.get(ae).clone();
943
if matches!(slf, AExpr::Element) {
944
return deep_clone_ae(replacement, arena);
945
} else if matches!(slf, AExpr::Len) {
946
// For backwards-compatibility, we support providing `pl.len()` to mean
947
// the length of the group here.
948
let element = deep_clone_ae(replacement, arena);
949
return AExprBuilder::new_from_node(element).len(arena).node();
950
}
951
952
let mut children = vec![];
953
slf.children_rev(&mut children);
954
for child in &mut children {
955
*child = deep_clone_element_replace(*child, arena, replacement);
956
}
957
children.reverse();
958
959
arena.add(slf.replace_children(&children))
960
}
961
aggs.push(ExprIR::new(
962
deep_clone_element_replace(agg_ae, ctxt.expr_arena, replacement_element),
963
OutputName::Alias(name.into()),
964
));
965
}
966
}
967
968
let keys: Vec<_> = index
969
.into_iter()
970
.map(|i| AExprBuilder::col(i.clone(), ctxt.expr_arena).expr_ir(i))
971
.collect();
972
973
let mut uniq_names = PlHashSet::new();
974
for expr in keys.iter().chain(aggs.iter()) {
975
let name = expr.output_name();
976
let is_uniq = uniq_names.insert(name.clone());
977
polars_ensure!(is_uniq, duplicate = name);
978
}
979
980
IRBuilder::new(input, ctxt.expr_arena, ctxt.lp_arena)
981
.group_by(keys, aggs, None, maintain_order, Default::default())
982
.build()
983
},
984
DslPlan::Distinct { input, options } => {
985
let input =
986
to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(unique)))?;
987
let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena).into_owned();
988
989
// "subset" param supports cols and/or arbitrary expressions
990
let (input, subset, temp_cols) = if let Some(exprs) = options.subset {
991
let exprs = rewrite_projections(
992
exprs,
993
&PlHashSet::default(),
994
&input_schema,
995
ctxt.opt_flags,
996
)?;
997
998
// identify cols and exprs in "subset" param
999
let mut subset_colnames = vec![];
1000
let mut subset_exprs = vec![];
1001
for expr in &exprs {
1002
match expr {
1003
Expr::Column(name) => {
1004
polars_ensure!(
1005
input_schema.contains(name),
1006
ColumnNotFound: "{name:?} not found"
1007
);
1008
subset_colnames.push(name.clone());
1009
},
1010
_ => subset_exprs.push(expr.clone()),
1011
}
1012
}
1013
1014
if subset_exprs.is_empty() {
1015
// "subset" is a collection of basic cols (or empty)
1016
(input, Some(subset_colnames.into_iter().collect()), vec![])
1017
} else {
1018
// "subset" contains exprs; add them as temporary cols
1019
let (aliased_exprs, temp_names): (Vec<_>, Vec<_>) = subset_exprs
1020
.into_iter()
1021
.enumerate()
1022
.map(|(idx, expr)| {
1023
let temp_name = format_pl_smallstr!("__POLARS_UNIQUE_SUBSET_{}", idx);
1024
(expr.alias(temp_name.clone()), temp_name)
1025
})
1026
.unzip();
1027
1028
subset_colnames.extend_from_slice(&temp_names);
1029
1030
// integrate the temporary cols with the existing "input" node
1031
let (temp_expr_irs, schema) = resolve_with_columns(
1032
aliased_exprs,
1033
input,
1034
ctxt.lp_arena,
1035
ctxt.expr_arena,
1036
ctxt.opt_flags,
1037
)?;
1038
ctxt.conversion_optimizer
1039
.fill_scratch(&temp_expr_irs, ctxt.expr_arena);
1040
1041
let input_with_exprs = ctxt.lp_arena.add(IR::HStack {
1042
input,
1043
exprs: temp_expr_irs,
1044
schema,
1045
options: ProjectionOptions {
1046
run_parallel: false,
1047
duplicate_check: false,
1048
should_broadcast: true,
1049
},
1050
});
1051
(
1052
input_with_exprs,
1053
Some(subset_colnames.into_iter().collect()),
1054
temp_names,
1055
)
1056
}
1057
} else {
1058
(input, None, vec![])
1059
};
1060
1061
// `distinct` definition (will contain temporary cols if we have "subset" exprs)
1062
let distinct_node = ctxt.lp_arena.add(IR::Distinct {
1063
input,
1064
options: DistinctOptionsIR {
1065
subset,
1066
maintain_order: options.maintain_order,
1067
keep_strategy: options.keep_strategy,
1068
slice: None,
1069
},
1070
});
1071
1072
// if no temporary cols (eg: we had no "subset" exprs), we're done...
1073
if temp_cols.is_empty() {
1074
return Ok(distinct_node);
1075
}
1076
1077
// ...otherwise, drop them by projecting the original schema
1078
return Ok(ctxt.lp_arena.add(IR::SimpleProjection {
1079
input: distinct_node,
1080
columns: input_schema,
1081
}));
1082
},
1083
DslPlan::MapFunction { input, function } => {
1084
let input = to_alp_impl(owned(input), ctxt)
1085
.map_err(|e| e.context(failed_here!(format!("{}", function).to_lowercase())))?;
1086
let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
1087
1088
match function {
1089
DslFunction::Explode {
1090
columns,
1091
options,
1092
allow_empty,
1093
} => {
1094
let columns = columns.into_columns(&input_schema, &Default::default())?;
1095
polars_ensure!(!columns.is_empty() || allow_empty, InvalidOperation: "no columns provided in explode");
1096
if columns.is_empty() {
1097
return Ok(input);
1098
}
1099
let function = FunctionIR::Explode {
1100
columns: columns.into_iter().collect(),
1101
options,
1102
schema: Default::default(),
1103
};
1104
let ir = IR::MapFunction { input, function };
1105
return Ok(ctxt.lp_arena.add(ir));
1106
},
1107
DslFunction::FillNan(fill_value) => {
1108
let exprs = input_schema
1109
.iter()
1110
.filter_map(|(name, dtype)| match dtype {
1111
DataType::Float16 | DataType::Float32 | DataType::Float64 => Some(
1112
col(name.clone())
1113
.fill_nan(fill_value.clone())
1114
.alias(name.clone()),
1115
),
1116
_ => None,
1117
})
1118
.collect::<Vec<_>>();
1119
1120
let (exprs, schema) = resolve_with_columns(
1121
exprs,
1122
input,
1123
ctxt.lp_arena,
1124
ctxt.expr_arena,
1125
ctxt.opt_flags,
1126
)
1127
.map_err(|e| e.context(failed_here!(fill_nan)))?;
1128
1129
ctxt.conversion_optimizer
1130
.fill_scratch(&exprs, ctxt.expr_arena);
1131
1132
let lp = IR::HStack {
1133
input,
1134
exprs,
1135
schema,
1136
options: ProjectionOptions {
1137
duplicate_check: false,
1138
..Default::default()
1139
},
1140
};
1141
return run_conversion(lp, ctxt, "fill_nan");
1142
},
1143
DslFunction::Stats(sf) => {
1144
let exprs = match sf {
1145
StatsFunction::Var { ddof } => stats_helper(
1146
|dt| dt.is_primitive_numeric() || dt.is_bool() || dt.is_decimal(),
1147
|name| col(name.clone()).var(ddof),
1148
&input_schema,
1149
),
1150
StatsFunction::Std { ddof } => stats_helper(
1151
|dt| dt.is_primitive_numeric() || dt.is_bool() || dt.is_decimal(),
1152
|name| col(name.clone()).std(ddof),
1153
&input_schema,
1154
),
1155
StatsFunction::Quantile { quantile, method } => stats_helper(
1156
|dt| dt.is_primitive_numeric() || dt.is_decimal() || dt.is_temporal(),
1157
|name| col(name.clone()).quantile(quantile.clone(), method),
1158
&input_schema,
1159
),
1160
StatsFunction::Mean => stats_helper(
1161
|dt| {
1162
dt.is_primitive_numeric()
1163
|| dt.is_temporal()
1164
|| dt.is_bool()
1165
|| dt.is_decimal()
1166
},
1167
|name| col(name.clone()).mean(),
1168
&input_schema,
1169
),
1170
StatsFunction::Sum => stats_helper(
1171
|dt| {
1172
dt.is_primitive_numeric()
1173
|| dt.is_decimal()
1174
|| matches!(dt, DataType::Boolean | DataType::Duration(_))
1175
},
1176
|name| col(name.clone()).sum(),
1177
&input_schema,
1178
),
1179
StatsFunction::Min => stats_helper(
1180
|dt| dt.is_ord(),
1181
|name| col(name.clone()).min(),
1182
&input_schema,
1183
),
1184
StatsFunction::Max => stats_helper(
1185
|dt| dt.is_ord(),
1186
|name| col(name.clone()).max(),
1187
&input_schema,
1188
),
1189
StatsFunction::Median => stats_helper(
1190
|dt| {
1191
dt.is_primitive_numeric()
1192
|| dt.is_temporal()
1193
|| dt == &DataType::Boolean
1194
},
1195
|name| col(name.clone()).median(),
1196
&input_schema,
1197
),
1198
};
1199
let schema = Arc::new(expressions_to_schema(
1200
&exprs,
1201
&input_schema,
1202
|duplicate_name: &str| duplicate_name.to_string(),
1203
)?);
1204
let eirs = to_expr_irs(
1205
exprs,
1206
&mut ExprToIRContext::new_with_opt_eager(
1207
ctxt.expr_arena,
1208
&input_schema,
1209
ctxt.opt_flags,
1210
),
1211
)?;
1212
1213
ctxt.conversion_optimizer
1214
.fill_scratch(&eirs, ctxt.expr_arena);
1215
1216
let lp = IR::Select {
1217
input,
1218
expr: eirs,
1219
schema,
1220
options: ProjectionOptions {
1221
duplicate_check: false,
1222
..Default::default()
1223
},
1224
};
1225
return run_conversion(lp, ctxt, "stats");
1226
},
1227
DslFunction::Rename {
1228
existing,
1229
new,
1230
strict,
1231
} => {
1232
assert_eq!(existing.len(), new.len());
1233
if existing.is_empty() {
1234
return Ok(input);
1235
}
1236
1237
let existing_lut =
1238
PlIndexSet::from_iter(existing.iter().map(PlSmallStr::as_str));
1239
1240
let mut schema = Schema::with_capacity(input_schema.len());
1241
let mut num_replaced = 0;
1242
1243
// Turn the rename into a select.
1244
let expr = input_schema
1245
.iter()
1246
.map(|(n, dtype)| {
1247
Ok(match existing_lut.get_index_of(n.as_str()) {
1248
None => {
1249
schema.try_insert(n.clone(), dtype.clone())?;
1250
Expr::Column(n.clone())
1251
},
1252
Some(i) => {
1253
num_replaced += 1;
1254
schema.try_insert(new[i].clone(), dtype.clone())?;
1255
Expr::Column(n.clone()).alias(new[i].clone())
1256
},
1257
})
1258
})
1259
.collect::<PolarsResult<Vec<_>>>()?;
1260
1261
if strict && num_replaced != existing.len() {
1262
let col = existing.iter().find(|c| !input_schema.contains(c)).unwrap();
1263
polars_bail!(col_not_found = col);
1264
}
1265
1266
// Nothing changed, make into a no-op.
1267
if num_replaced == 0 {
1268
return Ok(input);
1269
}
1270
1271
let expr = to_expr_irs(
1272
expr,
1273
&mut ExprToIRContext::new_with_opt_eager(
1274
ctxt.expr_arena,
1275
&input_schema,
1276
ctxt.opt_flags,
1277
),
1278
)?;
1279
ctxt.conversion_optimizer
1280
.fill_scratch(&expr, ctxt.expr_arena);
1281
1282
IR::Select {
1283
input,
1284
expr,
1285
schema: Arc::new(schema),
1286
options: ProjectionOptions {
1287
run_parallel: false,
1288
duplicate_check: false,
1289
should_broadcast: false,
1290
},
1291
}
1292
},
1293
_ => {
1294
let function = function.into_function_ir(&input_schema)?;
1295
IR::MapFunction { input, function }
1296
},
1297
}
1298
},
1299
DslPlan::ExtContext { input, contexts } => {
1300
let input = to_alp_impl(owned(input), ctxt)
1301
.map_err(|e| e.context(failed_here!(with_context)))?;
1302
let contexts = contexts
1303
.into_iter()
1304
.map(|lp| to_alp_impl(lp, ctxt))
1305
.collect::<PolarsResult<Vec<_>>>()
1306
.map_err(|e| e.context(failed_here!(with_context)))?;
1307
1308
let mut schema = (**ctxt.lp_arena.get(input).schema(ctxt.lp_arena)).clone();
1309
for input in &contexts {
1310
let other_schema = ctxt.lp_arena.get(*input).schema(ctxt.lp_arena);
1311
for fld in other_schema.iter_fields() {
1312
if schema.get(fld.name()).is_none() {
1313
schema.with_column(fld.name, fld.dtype);
1314
}
1315
}
1316
}
1317
1318
IR::ExtContext {
1319
input,
1320
contexts,
1321
schema: Arc::new(schema),
1322
}
1323
},
1324
DslPlan::Sink { input, payload } => {
1325
let input =
1326
to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(sink)))?;
1327
let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
1328
let payload = match payload {
1329
SinkType::Memory => SinkTypeIR::Memory,
1330
SinkType::Callback(f) => SinkTypeIR::Callback(f),
1331
SinkType::File(mut options) => {
1332
let mut compression_opt = None::<ExternalCompression>;
1333
1334
#[cfg(feature = "parquet")]
1335
if let FileWriteFormat::Parquet(options) = &mut options.file_format
1336
&& let Some(arrow_schema) = &mut Arc::make_mut(options).arrow_schema
1337
{
1338
let new_schema =
1339
validate_arrow_schema_conversion(input_schema.as_ref(), arrow_schema)?;
1340
1341
*Arc::make_mut(arrow_schema) = new_schema;
1342
}
1343
1344
#[cfg(feature = "csv")]
1345
if let FileWriteFormat::Csv(csv_options) = &options.file_format
1346
&& csv_options.check_extension
1347
{
1348
compression_opt = Some(csv_options.compression);
1349
}
1350
1351
#[cfg(feature = "json")]
1352
if let FileWriteFormat::NDJson(ndjson_options) = &options.file_format
1353
&& ndjson_options.check_extension
1354
{
1355
compression_opt = Some(ndjson_options.compression);
1356
}
1357
1358
if let Some(compression) = compression_opt {
1359
if let SinkTarget::Path(path) = &options.target {
1360
let extension = path.extension();
1361
1362
if let Some(suffix) = compression.file_suffix() {
1363
polars_ensure!(
1364
extension.is_none_or(|extension| extension == suffix.strip_prefix(".").unwrap_or(suffix)),
1365
InvalidOperation: "the path ({}) does not conform to standard naming, expected suffix: ({}), set `check_extension` to `False` if you don't want this behavior", path, suffix
1366
);
1367
} else if ["gz", "zst", "zstd"].iter().any(|compression_extension| {
1368
extension == Some(compression_extension)
1369
}) {
1370
polars_bail!(
1371
InvalidOperation: "use the compression parameter to control compression, or set `check_extension` to `False` if you want to suffix an uncompressed filename with an ending intended for compression"
1372
);
1373
}
1374
}
1375
}
1376
1377
SinkTypeIR::File(options)
1378
},
1379
SinkType::Partitioned(PartitionedSinkOptions {
1380
base_path,
1381
file_path_provider,
1382
partition_strategy,
1383
file_format,
1384
unified_sink_args,
1385
max_rows_per_file,
1386
approximate_bytes_per_file,
1387
}) => {
1388
let expr_to_ir_cx = &mut ExprToIRContext::new_with_opt_eager(
1389
ctxt.expr_arena,
1390
&input_schema,
1391
ctxt.opt_flags,
1392
);
1393
1394
let partition_strategy = match partition_strategy {
1395
PartitionStrategy::Keyed {
1396
keys,
1397
include_keys,
1398
keys_pre_grouped,
1399
} => {
1400
let keys = to_expr_irs(keys, expr_to_ir_cx)?;
1401
1402
polars_ensure!(
1403
keys.iter().all(|e| is_elementwise_rec(e.node(), ctxt.expr_arena)),
1404
InvalidOperation:
1405
"cannot use non-elementwise expressions for PartitionBy keys"
1406
);
1407
1408
PartitionStrategyIR::Keyed {
1409
keys,
1410
include_keys,
1411
keys_pre_grouped,
1412
}
1413
},
1414
PartitionStrategy::FileSize => PartitionStrategyIR::FileSize,
1415
};
1416
1417
let mut options = PartitionedSinkOptionsIR {
1418
base_path,
1419
file_path_provider: file_path_provider.unwrap_or_else(|| {
1420
FileProviderType::Hive(HivePathProvider {
1421
extension: PlSmallStr::from_static(file_format.extension()),
1422
})
1423
}),
1424
partition_strategy,
1425
file_format,
1426
unified_sink_args,
1427
max_rows_per_file,
1428
approximate_bytes_per_file,
1429
};
1430
1431
#[cfg(feature = "parquet")]
1432
{
1433
let input_schema = input_schema.into_owned();
1434
let file_schema =
1435
options.file_output_schema(&input_schema, ctxt.expr_arena)?;
1436
1437
if let FileWriteFormat::Parquet(parquet_options) = &mut options.file_format
1438
&& let Some(arrow_schema) =
1439
&mut Arc::make_mut(parquet_options).arrow_schema
1440
{
1441
let new_schema = validate_arrow_schema_conversion(
1442
file_schema.as_ref(),
1443
arrow_schema,
1444
)?;
1445
1446
*Arc::make_mut(arrow_schema) = new_schema;
1447
}
1448
}
1449
1450
ctxt.conversion_optimizer
1451
.fill_scratch(options.expr_irs_iter(), ctxt.expr_arena);
1452
1453
SinkTypeIR::Partitioned(options)
1454
},
1455
};
1456
1457
let lp = IR::Sink { input, payload };
1458
return run_conversion(lp, ctxt, "sink");
1459
},
1460
DslPlan::SinkMultiple { inputs } => {
1461
let inputs = inputs
1462
.into_iter()
1463
.map(|lp| to_alp_impl(lp, ctxt))
1464
.collect::<PolarsResult<Vec<_>>>()
1465
.map_err(|e| e.context(failed_here!(vertical concat)))?;
1466
IR::SinkMultiple { inputs }
1467
},
1468
#[cfg(feature = "merge_sorted")]
1469
DslPlan::MergeSorted {
1470
input_left,
1471
input_right,
1472
key,
1473
} => {
1474
let input_left = to_alp_impl(owned(input_left), ctxt)
1475
.map_err(|e| e.context(failed_here!(merge_sorted)))?;
1476
let input_right = to_alp_impl(owned(input_right), ctxt)
1477
.map_err(|e| e.context(failed_here!(merge_sorted)))?;
1478
1479
let left_schema = ctxt.lp_arena.get(input_left).schema(ctxt.lp_arena);
1480
let right_schema = ctxt.lp_arena.get(input_right).schema(ctxt.lp_arena);
1481
1482
left_schema
1483
.ensure_is_exact_match(&right_schema)
1484
.map_err(|err| err.context("merge_sorted".into()))?;
1485
1486
left_schema
1487
.try_get(key.as_str())
1488
.map_err(|err| err.context("merge_sorted".into()))?;
1489
1490
IR::MergeSorted {
1491
input_left,
1492
input_right,
1493
key,
1494
}
1495
},
1496
DslPlan::IR { node, dsl, version } => {
1497
return match node {
1498
Some(node)
1499
if version == ctxt.lp_arena.version()
1500
&& ctxt.conversion_optimizer.used_arenas.insert(version) =>
1501
{
1502
Ok(node)
1503
},
1504
_ => to_alp_impl(owned(dsl), ctxt),
1505
};
1506
},
1507
};
1508
Ok(ctxt.lp_arena.add(v))
1509
}
1510
1511
fn resolve_with_columns(
1512
exprs: Vec<Expr>,
1513
input: Node,
1514
lp_arena: &Arena<IR>,
1515
expr_arena: &mut Arena<AExpr>,
1516
opt_flags: &mut OptFlags,
1517
) -> PolarsResult<(Vec<ExprIR>, SchemaRef)> {
1518
let input_schema = lp_arena.get(input).schema(lp_arena);
1519
let mut output_schema = (**input_schema).clone();
1520
let exprs = rewrite_projections(exprs, &PlHashSet::new(), &input_schema, opt_flags)?;
1521
let mut output_names = PlHashSet::with_capacity(exprs.len());
1522
1523
let eirs = to_expr_irs(
1524
exprs,
1525
&mut ExprToIRContext::new_with_opt_eager(expr_arena, &input_schema, opt_flags),
1526
)?;
1527
for eir in eirs.iter() {
1528
let field = eir.field(&input_schema, expr_arena)?;
1529
1530
if !output_names.insert(field.name().clone()) {
1531
polars_bail!(
1532
ComputeError:
1533
"the name '{}' passed to `LazyFrame.with_columns` is duplicate\n\n\
1534
It's possible that multiple expressions are returning the same default column name. \
1535
If this is the case, try renaming the columns with `.alias(\"new_name\")` to avoid \
1536
duplicate column names.",
1537
field.name()
1538
)
1539
}
1540
output_schema.with_column(field.name, field.dtype.materialize_unknown(true)?);
1541
}
1542
1543
Ok((eirs, Arc::new(output_schema)))
1544
}
1545
1546
/// Resolve the keys and aggregation expressions of a `group_by` against the
/// schema of `input`.
///
/// Returns `(keys, aggs, output_schema)` where `output_schema` contains the
/// key columns (plus any dynamic/rolling index columns) followed by the
/// aggregation columns, with non-scalar aggregation dtypes imploded to
/// `List`.
fn resolve_group_by(
    input: Node,
    keys: Vec<Expr>,
    aggs: Vec<Expr>,
    _options: &GroupbyOptions,
    lp_arena: &Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    opt_flags: &mut OptFlags,
) -> PolarsResult<(Vec<ExprIR>, Vec<ExprIR>, SchemaRef)> {
    let input_schema = lp_arena.get(input).schema(lp_arena);
    let input_schema = input_schema.as_ref();
    let mut keys = rewrite_projections(keys, &PlHashSet::default(), input_schema, opt_flags)?;

    // Initialize schema from keys
    let mut output_schema = expressions_to_schema(&keys, input_schema, |duplicate_name: &str| {
        format!("group_by keys contained duplicate output name '{duplicate_name}'")
    })?;
    // Key output names are excluded from aggregation expression expansion below.
    let mut key_names: PlHashSet<PlSmallStr> = output_schema.iter_names().cloned().collect();

    // Only mutated when the `dynamic_group_by` feature is compiled in.
    #[allow(unused_mut)]
    let mut pop_keys = false;
    // Add dynamic groupby index column(s)
    // Also add index columns to keys for expression expansion.
    #[cfg(feature = "dynamic_group_by")]
    {
        if let Some(options) = _options.rolling.as_ref() {
            let name = options.index_column.clone();
            let dtype = input_schema.try_get(name.as_str())?;
            keys.push(col(name.clone()));
            key_names.insert(name.clone());
            // The index column is pushed only for expansion; it is popped
            // again before lowering (see `pop_keys` below).
            pop_keys = true;
            output_schema.with_column(name.clone(), dtype.clone());
        } else if let Some(options) = _options.dynamic.as_ref() {
            let name = options.index_column.clone();
            keys.push(col(name.clone()));
            key_names.insert(name.clone());
            pop_keys = true;
            let dtype = input_schema.try_get(name.as_str())?;
            if options.include_boundaries {
                // Boundary columns share the index column's dtype.
                output_schema.with_column("_lower_boundary".into(), dtype.clone());
                output_schema.with_column("_upper_boundary".into(), dtype.clone());
            }
            output_schema.with_column(name.clone(), dtype.clone());
        }
    }
    // Number of key + index columns; used for the overlap check further down.
    let keys_index_len = output_schema.len();
    if pop_keys {
        // Remove the temporarily-pushed index column; it is produced by the
        // group-by operation itself, not by a key expression.
        let _ = keys.pop();
    }
    let keys = to_expr_irs(
        keys,
        &mut ExprToIRContext::new_with_opt_eager(expr_arena, input_schema, opt_flags),
    )?;

    // Add aggregation column(s)
    let aggs = rewrite_projections(aggs, &key_names, input_schema, opt_flags)?;
    let aggs = to_expr_irs(
        aggs,
        &mut ExprToIRContext::new_with_opt_eager(expr_arena, input_schema, opt_flags),
    )?;
    utils::validate_expressions(&keys, expr_arena, input_schema, "group by")?;
    utils::validate_expressions(&aggs, expr_arena, input_schema, "group by")?;

    let mut aggs_schema = expr_irs_to_schema(&aggs, input_schema, expr_arena)?;

    // Make sure aggregation columns do not contain duplicates
    // (a shorter schema than expression count implies a name collision;
    // re-insert names one by one to report the offender).
    if aggs_schema.len() < aggs.len() {
        let mut names = PlHashSet::with_capacity(aggs.len());
        for agg in aggs.iter() {
            let name = agg.output_name();
            polars_ensure!(names.insert(name.clone()), duplicate = name)
        }
    }

    // Coerce aggregation column(s) into List unless not needed (auto-implode)
    debug_assert!(aggs_schema.len() == aggs.len());
    for ((_name, dtype), expr) in aggs_schema.iter_mut().zip(&aggs) {
        if !expr.is_scalar(expr_arena) {
            *dtype = dtype.clone().implode();
        }
    }

    // Final output_schema
    output_schema.merge(aggs_schema);

    // Make sure aggregation columns do not contain keys or index columns
    // (merge silently deduplicates, so a shorter-than-expected schema means
    // an aggregation name collided with a key/index name).
    if output_schema.len() < (keys_index_len + aggs.len()) {
        let mut names = PlHashSet::with_capacity(output_schema.len());
        for agg in aggs.iter().chain(keys.iter()) {
            let name = agg.output_name();
            polars_ensure!(names.insert(name.clone()), duplicate = name)
        }
    }

    Ok((keys, aggs, Arc::new(output_schema)))
}
1642
1643
fn stats_helper<F, E>(condition: F, expr: E, schema: &Schema) -> Vec<Expr>
1644
where
1645
F: Fn(&DataType) -> bool,
1646
E: Fn(&PlSmallStr) -> Expr,
1647
{
1648
schema
1649
.iter()
1650
.map(|(name, dt)| {
1651
if condition(dt) {
1652
expr(name)
1653
} else {
1654
lit(NULL).cast(dt.clone()).alias(name.clone())
1655
}
1656
})
1657
.collect()
1658
}
1659
1660
pub(crate) fn maybe_init_projection_excluding_hive(
1661
reader_schema: &Either<ArrowSchemaRef, SchemaRef>,
1662
hive_parts: Option<&SchemaRef>,
1663
) -> Option<Arc<[PlSmallStr]>> {
1664
// Update `with_columns` with a projection so that hive columns aren't loaded from the
1665
// file
1666
let hive_schema = hive_parts?;
1667
1668
match &reader_schema {
1669
Either::Left(reader_schema) => hive_schema
1670
.iter_names()
1671
.any(|x| reader_schema.contains(x))
1672
.then(|| {
1673
reader_schema
1674
.iter_names_cloned()
1675
.filter(|x| !hive_schema.contains(x))
1676
.collect::<Arc<[_]>>()
1677
}),
1678
Either::Right(reader_schema) => hive_schema
1679
.iter_names()
1680
.any(|x| reader_schema.contains(x))
1681
.then(|| {
1682
reader_schema
1683
.iter_names_cloned()
1684
.filter(|x| !hive_schema.contains(x))
1685
.collect::<Arc<[_]>>()
1686
}),
1687
}
1688
}
1689
1690