Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/physical_plan/to_graph.rs
6939 views
1
use std::sync::Arc;
2
3
use parking_lot::Mutex;
4
use polars_core::prelude::PlRandomState;
5
use polars_core::schema::Schema;
6
use polars_core::{POOL, config};
7
use polars_error::{PolarsResult, polars_bail, polars_ensure, polars_err};
8
use polars_expr::groups::new_hash_grouper;
9
use polars_expr::planner::{ExpressionConversionState, create_physical_expr};
10
use polars_expr::reduce::into_reduction;
11
use polars_expr::state::ExecutionState;
12
use polars_mem_engine::{create_physical_plan, create_scan_predicate};
13
use polars_plan::dsl::{JoinOptionsIR, PartitionVariantIR, ScanSources};
14
use polars_plan::plans::expr_ir::ExprIR;
15
use polars_plan::plans::{AExpr, ArenaExprIter, Context, IR, IRAggExpr};
16
use polars_plan::prelude::{FileType, FunctionFlags};
17
use polars_utils::arena::{Arena, Node};
18
use polars_utils::format_pl_smallstr;
19
use polars_utils::itertools::Itertools;
20
use polars_utils::pl_str::PlSmallStr;
21
use polars_utils::plpath::PlPath;
22
use polars_utils::relaxed_cell::RelaxedCell;
23
use recursive::recursive;
24
use slotmap::{SecondaryMap, SlotMap};
25
26
use super::{PhysNode, PhysNodeKey, PhysNodeKind};
27
use crate::execute::StreamingExecutionState;
28
use crate::expression::StreamExpr;
29
use crate::graph::{Graph, GraphNodeKey};
30
use crate::morsel::{MorselSeq, get_ideal_morsel_size};
31
use crate::nodes;
32
use crate::nodes::io_sinks::SinkComputeNode;
33
use crate::nodes::io_sinks::partition::PerPartitionSortBy;
34
use crate::nodes::io_sources::multi_scan::config::MultiScanConfig;
35
use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;
36
use crate::nodes::io_sources::multi_scan::reader_interface::capabilities::ReaderCapabilities;
37
use crate::physical_plan::lower_expr::compute_output_schema;
38
use crate::utils::late_materialized_df::LateMaterializedDataFrame;
39
40
fn has_potential_recurring_entrance(node: Node, arena: &Arena<AExpr>) -> bool {
41
arena.iter(node).any(|(_n, ae)| match ae {
42
AExpr::Function { options, .. } | AExpr::AnonymousFunction { options, .. } => {
43
options.flags.contains(FunctionFlags::OPTIONAL_RE_ENTRANT)
44
},
45
_ => false,
46
})
47
}
48
49
fn create_stream_expr(
50
expr_ir: &ExprIR,
51
ctx: &mut GraphConversionContext<'_>,
52
schema: &Arc<Schema>,
53
) -> PolarsResult<StreamExpr> {
54
let reentrant = has_potential_recurring_entrance(expr_ir.node(), ctx.expr_arena);
55
let phys = create_physical_expr(
56
expr_ir,
57
Context::Default,
58
ctx.expr_arena,
59
schema,
60
&mut ctx.expr_conversion_state,
61
)?;
62
Ok(StreamExpr::new(phys, reentrant))
63
}
64
65
struct GraphConversionContext<'a> {
66
phys_sm: &'a SlotMap<PhysNodeKey, PhysNode>,
67
expr_arena: &'a mut Arena<AExpr>,
68
graph: Graph,
69
phys_to_graph: SecondaryMap<PhysNodeKey, GraphNodeKey>,
70
expr_conversion_state: ExpressionConversionState,
71
num_pipelines: usize,
72
}
73
74
pub fn physical_plan_to_graph(
75
root: PhysNodeKey,
76
phys_sm: &SlotMap<PhysNodeKey, PhysNode>,
77
expr_arena: &mut Arena<AExpr>,
78
) -> PolarsResult<(Graph, SecondaryMap<PhysNodeKey, GraphNodeKey>)> {
79
// Get the number of threads from the rayon thread-pool as that respects our config.
80
let num_pipelines = POOL.current_num_threads();
81
let mut ctx = GraphConversionContext {
82
phys_sm,
83
expr_arena,
84
graph: Graph::with_capacity(phys_sm.len()),
85
phys_to_graph: SecondaryMap::with_capacity(phys_sm.len()),
86
expr_conversion_state: ExpressionConversionState::new(false),
87
num_pipelines,
88
};
89
90
to_graph_rec(root, &mut ctx)?;
91
92
Ok((ctx.graph, ctx.phys_to_graph))
93
}
94
95
#[recursive]
96
fn to_graph_rec<'a>(
97
phys_node_key: PhysNodeKey,
98
ctx: &mut GraphConversionContext<'a>,
99
) -> PolarsResult<GraphNodeKey> {
100
// This will ensure we create a proper acyclic directed graph instead of a tree.
101
if let Some(graph_key) = ctx.phys_to_graph.get(phys_node_key) {
102
return Ok(*graph_key);
103
}
104
105
use PhysNodeKind::*;
106
let node = &ctx.phys_sm[phys_node_key];
107
let graph_key = match &node.kind {
108
InMemorySource { df } => ctx.graph.add_node(
109
nodes::in_memory_source::InMemorySourceNode::new(df.clone(), MorselSeq::default()),
110
[],
111
),
112
SinkMultiple { sinks } => {
113
// @NOTE: This is always the root node and gets ignored by the physical_plan anyway so
114
// we give one of the inputs back.
115
let node = to_graph_rec(sinks[0], ctx)?;
116
for sink in &sinks[1..] {
117
to_graph_rec(*sink, ctx)?;
118
}
119
return Ok(node);
120
},
121
122
StreamingSlice {
123
input,
124
offset,
125
length,
126
} => {
127
let input_key = to_graph_rec(input.node, ctx)?;
128
ctx.graph.add_node(
129
nodes::streaming_slice::StreamingSliceNode::new(*offset, *length),
130
[(input_key, input.port)],
131
)
132
},
133
134
NegativeSlice {
135
input,
136
offset,
137
length,
138
} => {
139
let input_key = to_graph_rec(input.node, ctx)?;
140
ctx.graph.add_node(
141
nodes::negative_slice::NegativeSliceNode::new(*offset, *length),
142
[(input_key, input.port)],
143
)
144
},
145
146
DynamicSlice {
147
input,
148
offset,
149
length,
150
} => {
151
let input_key = to_graph_rec(input.node, ctx)?;
152
let offset_key = to_graph_rec(offset.node, ctx)?;
153
let length_key = to_graph_rec(length.node, ctx)?;
154
let offset_schema = ctx.phys_sm[offset.node].output_schema.clone();
155
let length_schema = ctx.phys_sm[length.node].output_schema.clone();
156
ctx.graph.add_node(
157
nodes::dynamic_slice::DynamicSliceNode::new(offset_schema, length_schema),
158
[
159
(input_key, input.port),
160
(offset_key, offset.port),
161
(length_key, length.port),
162
],
163
)
164
},
165
166
Shift {
167
input,
168
offset,
169
fill,
170
} => {
171
let input_schema = ctx.phys_sm[input.node].output_schema.clone();
172
let offset_schema = ctx.phys_sm[offset.node].output_schema.clone();
173
let input_key = to_graph_rec(input.node, ctx)?;
174
let offset_key = to_graph_rec(offset.node, ctx)?;
175
if let Some(fill) = fill {
176
let fill_key = to_graph_rec(fill.node, ctx)?;
177
ctx.graph.add_node(
178
nodes::shift::ShiftNode::new(input_schema, offset_schema, true),
179
[
180
(input_key, input.port),
181
(offset_key, offset.port),
182
(fill_key, fill.port),
183
],
184
)
185
} else {
186
ctx.graph.add_node(
187
nodes::shift::ShiftNode::new(input_schema, offset_schema, false),
188
[(input_key, input.port), (offset_key, offset.port)],
189
)
190
}
191
},
192
193
Filter { predicate, input } => {
194
let input_schema = &ctx.phys_sm[input.node].output_schema;
195
let phys_predicate_expr = create_stream_expr(predicate, ctx, input_schema)?;
196
let input_key = to_graph_rec(input.node, ctx)?;
197
ctx.graph.add_node(
198
nodes::filter::FilterNode::new(phys_predicate_expr),
199
[(input_key, input.port)],
200
)
201
},
202
203
Select {
204
selectors,
205
input,
206
extend_original,
207
} => {
208
let input_schema = &ctx.phys_sm[input.node].output_schema;
209
let phys_selectors = selectors
210
.iter()
211
.map(|selector| create_stream_expr(selector, ctx, input_schema))
212
.collect::<PolarsResult<_>>()?;
213
let input_key = to_graph_rec(input.node, ctx)?;
214
ctx.graph.add_node(
215
nodes::select::SelectNode::new(
216
phys_selectors,
217
node.output_schema.clone(),
218
*extend_original,
219
),
220
[(input_key, input.port)],
221
)
222
},
223
224
WithRowIndex {
225
input,
226
name,
227
offset,
228
} => {
229
let input_key = to_graph_rec(input.node, ctx)?;
230
ctx.graph.add_node(
231
nodes::with_row_index::WithRowIndexNode::new(name.clone(), *offset),
232
[(input_key, input.port)],
233
)
234
},
235
236
InputIndependentSelect { selectors } => {
237
let empty_schema = Default::default();
238
let phys_selectors = selectors
239
.iter()
240
.map(|selector| create_stream_expr(selector, ctx, &empty_schema))
241
.collect::<PolarsResult<_>>()?;
242
ctx.graph.add_node(
243
nodes::input_independent_select::InputIndependentSelectNode::new(phys_selectors),
244
[],
245
)
246
},
247
248
Reduce { input, exprs } => {
249
let input_key = to_graph_rec(input.node, ctx)?;
250
let input_schema = &ctx.phys_sm[input.node].output_schema;
251
252
let mut reductions = Vec::with_capacity(exprs.len());
253
let mut inputs = Vec::with_capacity(reductions.len());
254
255
for e in exprs {
256
let (red, input_node) = into_reduction(e.node(), ctx.expr_arena, input_schema)?;
257
reductions.push(red);
258
259
let input_phys = create_stream_expr(
260
&ExprIR::from_node(input_node, ctx.expr_arena),
261
ctx,
262
input_schema,
263
)?;
264
265
inputs.push(input_phys)
266
}
267
268
ctx.graph.add_node(
269
nodes::reduce::ReduceNode::new(inputs, reductions, node.output_schema.clone()),
270
[(input_key, input.port)],
271
)
272
},
273
SimpleProjection { input, columns } => {
274
let input_schema = ctx.phys_sm[input.node].output_schema.clone();
275
let input_key = to_graph_rec(input.node, ctx)?;
276
ctx.graph.add_node(
277
nodes::simple_projection::SimpleProjectionNode::new(columns.clone(), input_schema),
278
[(input_key, input.port)],
279
)
280
},
281
282
InMemorySink { input } => {
283
let input_schema = ctx.phys_sm[input.node].output_schema.clone();
284
let input_key = to_graph_rec(input.node, ctx)?;
285
ctx.graph.add_node(
286
nodes::in_memory_sink::InMemorySinkNode::new(input_schema),
287
[(input_key, input.port)],
288
)
289
},
290
291
FileSink {
292
target,
293
sink_options,
294
file_type,
295
input,
296
cloud_options,
297
} => {
298
let sink_options = sink_options.clone();
299
let input_schema = ctx.phys_sm[input.node].output_schema.clone();
300
let input_key = to_graph_rec(input.node, ctx)?;
301
302
match file_type {
303
#[cfg(feature = "ipc")]
304
FileType::Ipc(ipc_writer_options) => ctx.graph.add_node(
305
SinkComputeNode::from(nodes::io_sinks::ipc::IpcSinkNode::new(
306
input_schema,
307
target.clone(),
308
sink_options,
309
*ipc_writer_options,
310
cloud_options.clone(),
311
)),
312
[(input_key, input.port)],
313
),
314
#[cfg(feature = "json")]
315
FileType::Json(_) => ctx.graph.add_node(
316
SinkComputeNode::from(nodes::io_sinks::json::NDJsonSinkNode::new(
317
target.clone(),
318
sink_options,
319
cloud_options.clone(),
320
)),
321
[(input_key, input.port)],
322
),
323
#[cfg(feature = "parquet")]
324
FileType::Parquet(parquet_writer_options) => ctx.graph.add_node(
325
SinkComputeNode::from(nodes::io_sinks::parquet::ParquetSinkNode::new(
326
input_schema,
327
target.clone(),
328
sink_options,
329
parquet_writer_options,
330
cloud_options.clone(),
331
false,
332
)?),
333
[(input_key, input.port)],
334
),
335
#[cfg(feature = "csv")]
336
FileType::Csv(csv_writer_options) => ctx.graph.add_node(
337
SinkComputeNode::from(nodes::io_sinks::csv::CsvSinkNode::new(
338
target.clone(),
339
input_schema,
340
sink_options,
341
csv_writer_options.clone(),
342
cloud_options.clone(),
343
)),
344
[(input_key, input.port)],
345
),
346
#[cfg(not(any(
347
feature = "csv",
348
feature = "parquet",
349
feature = "json",
350
feature = "ipc"
351
)))]
352
_ => {
353
panic!("activate source feature")
354
},
355
}
356
},
357
358
PartitionSink {
359
input,
360
base_path,
361
file_path_cb,
362
sink_options,
363
variant,
364
file_type,
365
cloud_options,
366
per_partition_sort_by,
367
finish_callback,
368
} => {
369
let input_schema = ctx.phys_sm[input.node].output_schema.clone();
370
let input_key = to_graph_rec(input.node, ctx)?;
371
372
let base_path = base_path.clone();
373
let file_path_cb = file_path_cb.clone();
374
let ext = PlSmallStr::from_static(file_type.extension());
375
let create_new = nodes::io_sinks::partition::get_create_new_fn(
376
file_type.clone(),
377
sink_options.clone(),
378
cloud_options.clone(),
379
finish_callback.is_some(),
380
);
381
382
let per_partition_sort_by = match per_partition_sort_by.as_ref() {
383
None => None,
384
Some(c) => {
385
let (selectors, descending, nulls_last) = c
386
.iter()
387
.map(|c| {
388
Ok((
389
create_stream_expr(&c.expr, ctx, &input_schema)?,
390
c.descending,
391
c.nulls_last,
392
))
393
})
394
.collect::<PolarsResult<(Vec<_>, Vec<_>, Vec<_>)>>()?;
395
396
Some(PerPartitionSortBy {
397
selectors,
398
descending,
399
nulls_last,
400
maintain_order: true,
401
})
402
},
403
};
404
405
let sink_compute_node = match variant {
406
PartitionVariantIR::MaxSize(max_size) => SinkComputeNode::from(
407
nodes::io_sinks::partition::max_size::MaxSizePartitionSinkNode::new(
408
input_schema,
409
*max_size,
410
base_path,
411
file_path_cb,
412
create_new,
413
ext,
414
sink_options.clone(),
415
per_partition_sort_by,
416
finish_callback.clone(),
417
),
418
),
419
PartitionVariantIR::Parted {
420
key_exprs,
421
include_key,
422
} => SinkComputeNode::from(
423
nodes::io_sinks::partition::parted::PartedPartitionSinkNode::new(
424
input_schema,
425
key_exprs.iter().map(|e| e.output_name().clone()).collect(),
426
base_path,
427
file_path_cb,
428
create_new,
429
ext,
430
sink_options.clone(),
431
*include_key,
432
per_partition_sort_by,
433
finish_callback.clone(),
434
),
435
),
436
PartitionVariantIR::ByKey {
437
key_exprs,
438
include_key,
439
} => SinkComputeNode::from(
440
nodes::io_sinks::partition::by_key::PartitionByKeySinkNode::new(
441
input_schema,
442
key_exprs.iter().map(|e| e.output_name().clone()).collect(),
443
base_path,
444
file_path_cb,
445
create_new,
446
ext,
447
sink_options.clone(),
448
*include_key,
449
per_partition_sort_by,
450
finish_callback.clone(),
451
),
452
),
453
};
454
455
ctx.graph
456
.add_node(sink_compute_node, [(input_key, input.port)])
457
},
458
459
InMemoryMap {
460
input,
461
map,
462
format_str: _,
463
} => {
464
let input_schema = ctx.phys_sm[input.node].output_schema.clone();
465
let input_key = to_graph_rec(input.node, ctx)?;
466
ctx.graph.add_node(
467
nodes::in_memory_map::InMemoryMapNode::new(input_schema, map.clone()),
468
[(input_key, input.port)],
469
)
470
},
471
472
Map { input, map } => {
473
let input_key = to_graph_rec(input.node, ctx)?;
474
ctx.graph.add_node(
475
nodes::map::MapNode::new(map.clone()),
476
[(input_key, input.port)],
477
)
478
},
479
480
Sort {
481
input,
482
by_column,
483
slice,
484
sort_options,
485
} => {
486
let input_schema = ctx.phys_sm[input.node].output_schema.clone();
487
let lmdf = Arc::new(LateMaterializedDataFrame::default());
488
let mut lp_arena = Arena::default();
489
let df_node = lp_arena.add(lmdf.clone().as_ir_node(input_schema.clone()));
490
let sort_node = lp_arena.add(IR::Sort {
491
input: df_node,
492
by_column: by_column.clone(),
493
slice: *slice,
494
sort_options: sort_options.clone(),
495
});
496
let executor = Mutex::new(create_physical_plan(
497
sort_node,
498
&mut lp_arena,
499
ctx.expr_arena,
500
None,
501
)?);
502
503
let input_key = to_graph_rec(input.node, ctx)?;
504
ctx.graph.add_node(
505
nodes::in_memory_map::InMemoryMapNode::new(
506
input_schema,
507
Arc::new(move |df| {
508
lmdf.set_materialized_dataframe(df);
509
let mut state = ExecutionState::new();
510
executor.lock().execute(&mut state)
511
}),
512
),
513
[(input_key, input.port)],
514
)
515
},
516
517
TopK {
518
input,
519
k,
520
by_column,
521
reverse,
522
nulls_last,
523
} => {
524
let input_key = to_graph_rec(input.node, ctx)?;
525
let k_key = to_graph_rec(k.node, ctx)?;
526
527
let k_schema = ctx.phys_sm[k.node].output_schema.clone();
528
let input_schema = &ctx.phys_sm[input.node].output_schema;
529
let key_schema = compute_output_schema(input_schema, by_column, ctx.expr_arena)?;
530
531
let key_selectors = by_column
532
.iter()
533
.map(|e| create_stream_expr(e, ctx, input_schema))
534
.try_collect_vec()?;
535
536
ctx.graph.add_node(
537
nodes::top_k::TopKNode::new(
538
k_schema,
539
reverse.clone(),
540
nulls_last.clone(),
541
key_schema,
542
key_selectors,
543
),
544
[(input_key, input.port), (k_key, k.port)],
545
)
546
},
547
548
Repeat { value, repeats } => {
549
let value_key = to_graph_rec(value.node, ctx)?;
550
let repeats_key = to_graph_rec(repeats.node, ctx)?;
551
let value_schema = ctx.phys_sm[value.node].output_schema.clone();
552
let repeats_schema = ctx.phys_sm[repeats.node].output_schema.clone();
553
ctx.graph.add_node(
554
nodes::repeat::RepeatNode::new(value_schema, repeats_schema),
555
[(value_key, value.port), (repeats_key, repeats.port)],
556
)
557
},
558
559
#[cfg(feature = "cum_agg")]
560
CumAgg { input, kind } => {
561
let input_key = to_graph_rec(input.node, ctx)?;
562
ctx.graph.add_node(
563
nodes::cum_agg::CumAggNode::new(*kind),
564
[(input_key, input.port)],
565
)
566
},
567
568
Rle(input) => {
569
let input_key = to_graph_rec(input.node, ctx)?;
570
let input_schema = &ctx.phys_sm[input.node].output_schema;
571
assert_eq!(input_schema.len(), 1);
572
let (name, dtype) = input_schema.get_at_index(0).unwrap();
573
ctx.graph.add_node(
574
nodes::rle::RleNode::new(name.clone(), dtype.clone()),
575
[(input_key, input.port)],
576
)
577
},
578
579
RleId(input) => {
580
let input_key = to_graph_rec(input.node, ctx)?;
581
let input_schema = &ctx.phys_sm[input.node].output_schema;
582
assert_eq!(input_schema.len(), 1);
583
let (_, dtype) = input_schema.get_at_index(0).unwrap();
584
ctx.graph.add_node(
585
nodes::rle_id::RleIdNode::new(dtype.clone()),
586
[(input_key, input.port)],
587
)
588
},
589
590
PeakMinMax { input, is_peak_max } => {
591
let input_key = to_graph_rec(input.node, ctx)?;
592
ctx.graph.add_node(
593
nodes::peak_minmax::PeakMinMaxNode::new(*is_peak_max),
594
[(input_key, input.port)],
595
)
596
},
597
598
OrderedUnion { inputs } => {
599
let input_keys = inputs
600
.iter()
601
.map(|i| PolarsResult::Ok((to_graph_rec(i.node, ctx)?, i.port)))
602
.try_collect_vec()?;
603
ctx.graph
604
.add_node(nodes::ordered_union::OrderedUnionNode::new(), input_keys)
605
},
606
607
Zip {
608
inputs,
609
null_extend,
610
} => {
611
let input_schemas = inputs
612
.iter()
613
.map(|i| ctx.phys_sm[i.node].output_schema.clone())
614
.collect_vec();
615
let input_keys = inputs
616
.iter()
617
.map(|i| PolarsResult::Ok((to_graph_rec(i.node, ctx)?, i.port)))
618
.try_collect_vec()?;
619
ctx.graph.add_node(
620
nodes::zip::ZipNode::new(*null_extend, input_schemas),
621
input_keys,
622
)
623
},
624
625
Multiplexer { input } => {
626
let input_key = to_graph_rec(input.node, ctx)?;
627
ctx.graph.add_node(
628
nodes::multiplexer::MultiplexerNode::new(),
629
[(input_key, input.port)],
630
)
631
},
632
633
MultiScan {
634
scan_sources,
635
file_reader_builder,
636
cloud_options,
637
file_projection_builder,
638
output_schema,
639
row_index,
640
pre_slice,
641
predicate,
642
hive_parts,
643
missing_columns_policy,
644
cast_columns_policy,
645
include_file_paths,
646
forbid_extra_columns,
647
deletion_files,
648
file_schema,
649
} => {
650
let hive_parts = hive_parts.clone();
651
652
let predicate = predicate
653
.as_ref()
654
.map(|pred| {
655
create_scan_predicate(
656
pred,
657
ctx.expr_arena,
658
output_schema,
659
hive_parts.as_ref().map(|hp| hp.df().schema().as_ref()),
660
&mut ctx.expr_conversion_state,
661
true, // create_skip_batch_predicate
662
file_reader_builder
663
.reader_capabilities()
664
.contains(ReaderCapabilities::PARTIAL_FILTER), // create_column_predicates
665
)
666
})
667
.transpose()?
668
.map(|p| p.to_io(None, file_schema.clone()));
669
670
let sources = scan_sources.clone();
671
let file_reader_builder = file_reader_builder.clone();
672
let cloud_options = cloud_options.clone();
673
674
let final_output_schema = output_schema.clone();
675
let file_projection_builder = file_projection_builder.clone();
676
677
let row_index = row_index.clone();
678
let pre_slice = pre_slice.clone();
679
let hive_parts = hive_parts.map(Arc::new);
680
let include_file_paths = include_file_paths.clone();
681
let missing_columns_policy = *missing_columns_policy;
682
let forbid_extra_columns = forbid_extra_columns.clone();
683
let cast_columns_policy = cast_columns_policy.clone();
684
let deletion_files = deletion_files.clone();
685
686
let verbose = config::verbose();
687
688
ctx.graph.add_node(
689
nodes::io_sources::multi_scan::MultiScan::new(Arc::new(MultiScanConfig {
690
sources,
691
file_reader_builder,
692
cloud_options,
693
final_output_schema,
694
file_projection_builder,
695
row_index,
696
pre_slice,
697
predicate,
698
hive_parts,
699
include_file_paths,
700
missing_columns_policy,
701
forbid_extra_columns,
702
cast_columns_policy,
703
deletion_files,
704
// Initialized later
705
num_pipelines: RelaxedCell::new_usize(0),
706
n_readers_pre_init: RelaxedCell::new_usize(0),
707
max_concurrent_scans: RelaxedCell::new_usize(0),
708
verbose,
709
})),
710
[],
711
)
712
},
713
714
GroupBy { input, key, aggs } => {
715
let input_key = to_graph_rec(input.node, ctx)?;
716
717
let input_schema = &ctx.phys_sm[input.node].output_schema;
718
let key_schema = compute_output_schema(input_schema, key, ctx.expr_arena)?;
719
let grouper = new_hash_grouper(key_schema.clone());
720
721
let key_selectors = key
722
.iter()
723
.map(|e| create_stream_expr(e, ctx, input_schema))
724
.try_collect_vec()?;
725
726
let mut grouped_reductions = Vec::new();
727
let mut grouped_reduction_cols = Vec::new();
728
let mut has_order_sensitive_agg = false;
729
for agg in aggs {
730
has_order_sensitive_agg |= matches!(
731
ctx.expr_arena.get(agg.node()),
732
AExpr::Agg(IRAggExpr::First(..) | IRAggExpr::Last(..))
733
);
734
let (reduction, input_node) =
735
into_reduction(agg.node(), ctx.expr_arena, input_schema)?;
736
let AExpr::Column(col) = ctx.expr_arena.get(input_node) else {
737
unreachable!()
738
};
739
grouped_reductions.push(reduction);
740
grouped_reduction_cols.push(col.clone());
741
}
742
743
ctx.graph.add_node(
744
nodes::group_by::GroupByNode::new(
745
key_schema,
746
key_selectors,
747
grouper,
748
grouped_reduction_cols,
749
grouped_reductions,
750
node.output_schema.clone(),
751
PlRandomState::default(),
752
ctx.num_pipelines,
753
has_order_sensitive_agg,
754
),
755
[(input_key, input.port)],
756
)
757
},
758
759
InMemoryJoin {
760
input_left,
761
input_right,
762
left_on,
763
right_on,
764
args,
765
options,
766
} => {
767
let left_input_key = to_graph_rec(input_left.node, ctx)?;
768
let right_input_key = to_graph_rec(input_right.node, ctx)?;
769
let left_input_schema = ctx.phys_sm[input_left.node].output_schema.clone();
770
let right_input_schema = ctx.phys_sm[input_right.node].output_schema.clone();
771
772
let mut lp_arena = Arena::default();
773
let left_lmdf = Arc::new(LateMaterializedDataFrame::default());
774
let right_lmdf = Arc::new(LateMaterializedDataFrame::default());
775
776
let left_node = lp_arena.add(left_lmdf.clone().as_ir_node(left_input_schema.clone()));
777
let right_node =
778
lp_arena.add(right_lmdf.clone().as_ir_node(right_input_schema.clone()));
779
let join_node = lp_arena.add(IR::Join {
780
input_left: left_node,
781
input_right: right_node,
782
schema: node.output_schema.clone(),
783
left_on: left_on.clone(),
784
right_on: right_on.clone(),
785
options: Arc::new(JoinOptionsIR {
786
allow_parallel: true,
787
force_parallel: false,
788
args: args.clone(),
789
options: options.clone(),
790
rows_left: (None, 0),
791
rows_right: (None, 0),
792
}),
793
});
794
795
let executor = Mutex::new(create_physical_plan(
796
join_node,
797
&mut lp_arena,
798
ctx.expr_arena,
799
None,
800
)?);
801
802
ctx.graph.add_node(
803
nodes::joins::in_memory::InMemoryJoinNode::new(
804
left_input_schema,
805
right_input_schema,
806
Arc::new(move |left, right| {
807
left_lmdf.set_materialized_dataframe(left);
808
right_lmdf.set_materialized_dataframe(right);
809
let mut state = ExecutionState::new();
810
executor.lock().execute(&mut state)
811
}),
812
),
813
[
814
(left_input_key, input_left.port),
815
(right_input_key, input_right.port),
816
],
817
)
818
},
819
820
EquiJoin {
821
input_left,
822
input_right,
823
left_on,
824
right_on,
825
args,
826
}
827
| SemiAntiJoin {
828
input_left,
829
input_right,
830
left_on,
831
right_on,
832
args,
833
output_bool: _,
834
} => {
835
let args = args.clone();
836
let left_input_key = to_graph_rec(input_left.node, ctx)?;
837
let right_input_key = to_graph_rec(input_right.node, ctx)?;
838
let left_input_schema = ctx.phys_sm[input_left.node].output_schema.clone();
839
let right_input_schema = ctx.phys_sm[input_right.node].output_schema.clone();
840
841
let left_key_schema =
842
compute_output_schema(&left_input_schema, left_on, ctx.expr_arena)?;
843
let right_key_schema =
844
compute_output_schema(&right_input_schema, right_on, ctx.expr_arena)?;
845
846
// We want to make sure here that the key types match otherwise we get out garbage out
847
// since the hashes will be calculated differently.
848
polars_ensure!(
849
left_on.len() == right_on.len() &&
850
left_on.iter().zip(right_on.iter()).all(|(l, r)| {
851
let l_dtype = left_key_schema.get(l.output_name()).unwrap();
852
let r_dtype = right_key_schema.get(r.output_name()).unwrap();
853
l_dtype == r_dtype
854
}),
855
SchemaMismatch: "join received different key types on left and right side"
856
);
857
858
// We use key columns entirely by position, and allow duplicate names in key selectors,
859
// so just assign arbitrary unique names for the selectors.
860
let unique_left_on = left_on
861
.iter()
862
.enumerate()
863
.map(|(i, expr)| expr.with_alias(format_pl_smallstr!("__POLARS_KEYCOL_{i}")))
864
.collect_vec();
865
let unique_right_on = right_on
866
.iter()
867
.enumerate()
868
.map(|(i, expr)| expr.with_alias(format_pl_smallstr!("__POLARS_KEYCOL_{i}")))
869
.collect_vec();
870
871
let left_key_selectors = unique_left_on
872
.iter()
873
.map(|e| create_stream_expr(e, ctx, &left_input_schema))
874
.try_collect_vec()?;
875
let right_key_selectors = unique_right_on
876
.iter()
877
.map(|e| create_stream_expr(e, ctx, &right_input_schema))
878
.try_collect_vec()?;
879
880
let unique_key_schema =
881
compute_output_schema(&right_input_schema, &unique_left_on, ctx.expr_arena)?;
882
883
match node.kind {
884
#[cfg(feature = "semi_anti_join")]
885
SemiAntiJoin { output_bool, .. } => ctx.graph.add_node(
886
nodes::joins::semi_anti_join::SemiAntiJoinNode::new(
887
unique_key_schema,
888
left_key_selectors,
889
right_key_selectors,
890
args,
891
output_bool,
892
ctx.num_pipelines,
893
)?,
894
[
895
(left_input_key, input_left.port),
896
(right_input_key, input_right.port),
897
],
898
),
899
_ => ctx.graph.add_node(
900
nodes::joins::equi_join::EquiJoinNode::new(
901
left_input_schema,
902
right_input_schema,
903
left_key_schema,
904
right_key_schema,
905
unique_key_schema,
906
left_key_selectors,
907
right_key_selectors,
908
args,
909
ctx.num_pipelines,
910
)?,
911
[
912
(left_input_key, input_left.port),
913
(right_input_key, input_right.port),
914
],
915
),
916
}
917
},
918
919
CrossJoin {
920
input_left,
921
input_right,
922
args,
923
} => {
924
let args = args.clone();
925
let left_input_key = to_graph_rec(input_left.node, ctx)?;
926
let right_input_key = to_graph_rec(input_right.node, ctx)?;
927
let left_input_schema = ctx.phys_sm[input_left.node].output_schema.clone();
928
let right_input_schema = ctx.phys_sm[input_right.node].output_schema.clone();
929
930
ctx.graph.add_node(
931
nodes::joins::cross_join::CrossJoinNode::new(
932
left_input_schema,
933
right_input_schema,
934
&args,
935
),
936
[
937
(left_input_key, input_left.port),
938
(right_input_key, input_right.port),
939
],
940
)
941
},
942
943
#[cfg(feature = "merge_sorted")]
944
MergeSorted {
945
input_left,
946
input_right,
947
} => {
948
let left_input_key = to_graph_rec(input_left.node, ctx)?;
949
let right_input_key = to_graph_rec(input_right.node, ctx)?;
950
ctx.graph.add_node(
951
nodes::merge_sorted::MergeSortedNode::new(),
952
[
953
(left_input_key, input_left.port),
954
(right_input_key, input_right.port),
955
],
956
)
957
},
958
959
#[cfg(feature = "python")]
960
PythonScan { options } => {
961
use polars_plan::dsl::python_dsl::PythonScanSource as S;
962
use polars_plan::plans::PythonPredicate;
963
use polars_utils::relaxed_cell::RelaxedCell;
964
use pyo3::exceptions::PyStopIteration;
965
use pyo3::prelude::*;
966
use pyo3::types::{PyBytes, PyNone};
967
use pyo3::{IntoPyObjectExt, PyTypeInfo, intern};
968
969
let mut options = options.clone();
970
let with_columns = options.with_columns.take();
971
let n_rows = options.n_rows.take();
972
973
let python_scan_function = options.scan_fn.take().unwrap().0;
974
975
let with_columns = with_columns.map(|cols| cols.iter().cloned().collect::<Vec<_>>());
976
977
let (pl_predicate, predicate_serialized) = polars_mem_engine::python_scan_predicate(
978
&mut options,
979
ctx.expr_arena,
980
&mut ctx.expr_conversion_state,
981
)?;
982
983
let output_schema = options.output_schema.unwrap_or(options.schema);
984
let validate_schema = options.validate_schema;
985
986
let simple_projection = with_columns.as_ref().and_then(|with_columns| {
987
(with_columns
988
.iter()
989
.zip(output_schema.iter_names())
990
.any(|(a, b)| a != b))
991
.then(|| output_schema.clone())
992
});
993
994
let (name, get_batch_fn) = match options.python_source {
995
S::Pyarrow => todo!(),
996
S::Cuda => todo!(),
997
S::IOPlugin => {
998
let batch_size = Some(get_ideal_morsel_size());
999
let output_schema = output_schema.clone();
1000
1001
let with_columns = with_columns.map(|x| {
1002
x.into_iter()
1003
.map(|x| x.to_string())
1004
.collect::<Vec<String>>()
1005
});
1006
1007
// Setup the IO plugin generator.
1008
let (generator, can_parse_predicate) = {
1009
Python::with_gil(|py| {
1010
let pl = PyModule::import(py, intern!(py, "polars")).unwrap();
1011
let utils = pl.getattr(intern!(py, "_utils")).unwrap();
1012
let callable =
1013
utils.getattr(intern!(py, "_execute_from_rust")).unwrap();
1014
1015
let mut could_serialize_predicate = true;
1016
let predicate = match &options.predicate {
1017
PythonPredicate::PyArrow(s) => s.into_bound_py_any(py).unwrap(),
1018
PythonPredicate::None => None::<()>.into_bound_py_any(py).unwrap(),
1019
PythonPredicate::Polars(_) => {
1020
assert!(pl_predicate.is_some(), "should be set");
1021
match &predicate_serialized {
1022
None => {
1023
could_serialize_predicate = false;
1024
PyNone::get(py).to_owned().into_any()
1025
},
1026
Some(buf) => PyBytes::new(py, buf).into_any(),
1027
}
1028
},
1029
};
1030
1031
let args = (
1032
python_scan_function,
1033
with_columns,
1034
predicate,
1035
n_rows,
1036
batch_size,
1037
);
1038
1039
let generator_init = callable.call1(args)?;
1040
let generator = generator_init.get_item(0).map_err(
1041
|_| polars_err!(ComputeError: "expected tuple got {generator_init}"),
1042
)?;
1043
let can_parse_predicate = generator_init.get_item(1).map_err(
1044
|_| polars_err!(ComputeError: "expected tuple got {generator}"),
1045
)?;
1046
let can_parse_predicate = can_parse_predicate.extract::<bool>().map_err(
1047
|_| polars_err!(ComputeError: "expected bool got {can_parse_predicate}"),
1048
)? && could_serialize_predicate;
1049
1050
let generator = generator.into_py_any(py).map_err(
1051
|_| polars_err!(ComputeError: "unable to grab reference to IO plugin generator"),
1052
)?;
1053
1054
PolarsResult::Ok((generator, can_parse_predicate))
1055
})
1056
}?;
1057
1058
let get_batch_fn = Box::new(move |state: &StreamingExecutionState| {
1059
let df = Python::with_gil(|py| {
1060
match generator.bind(py).call_method0(intern!(py, "__next__")) {
1061
Ok(out) => polars_plan::plans::python_df_to_rust(py, out).map(Some),
1062
Err(err)
1063
if err.matches(py, PyStopIteration::type_object(py))? =>
1064
{
1065
Ok(None)
1066
},
1067
Err(err) => polars_bail!(
1068
ComputeError: "caught exception during execution of a Python source, exception: {err}"
1069
),
1070
}
1071
})?;
1072
1073
let Some(mut df) = df else { return Ok(None) };
1074
1075
if let Some(simple_projection) = &simple_projection {
1076
df = df.project(simple_projection.clone())?;
1077
}
1078
1079
if validate_schema {
1080
polars_ensure!(
1081
df.schema() == &output_schema,
1082
SchemaMismatch: "user provided schema: {:?} doesn't match the DataFrame schema: {:?}",
1083
output_schema, df.schema()
1084
);
1085
}
1086
1087
// TODO: Move this to a FilterNode so that it happens in parallel. We may need
1088
// to move all of the enclosing code to `lower_ir` for this.
1089
if let (Some(pred), false) = (&pl_predicate, can_parse_predicate) {
1090
let mask = pred.evaluate(&df, &state.in_memory_exec_state)?;
1091
df = df.filter(mask.bool()?)?;
1092
}
1093
1094
Ok(Some(df))
1095
}) as Box<_>;
1096
1097
(PlSmallStr::from_static("io_plugin"), get_batch_fn)
1098
},
1099
};
1100
1101
use polars_plan::dsl::{CastColumnsPolicy, MissingColumnsPolicy};
1102
1103
use crate::nodes::io_sources::batch::builder::BatchFnReaderBuilder;
1104
use crate::nodes::io_sources::batch::{BatchFnReader, GetBatchState};
1105
use crate::nodes::io_sources::multi_scan::components::projection::builder::ProjectionBuilder;
1106
1107
let reader = BatchFnReader {
1108
name: name.clone(),
1109
// If validate_schema is false, the schema of the morsels may not match the
1110
// configured schema. In this case we set this to `None` and the reader will
1111
// retrieve the schema from the first morsel.
1112
output_schema: validate_schema.then(|| output_schema.clone()),
1113
get_batch_state: Some(GetBatchState::from(get_batch_fn)),
1114
execution_state: None,
1115
verbose: config::verbose(),
1116
};
1117
1118
let file_reader_builder = Arc::new(BatchFnReaderBuilder {
1119
name,
1120
reader: std::sync::Mutex::new(Some(reader)),
1121
execution_state: Default::default(),
1122
}) as Arc<dyn FileReaderBuilder>;
1123
1124
// Give multiscan a single scan source. (It doesn't actually read from this).
1125
let sources = ScanSources::Paths(Arc::from([PlPath::from_str("python-scan-0")]));
1126
let cloud_options = None;
1127
let final_output_schema = output_schema.clone();
1128
let file_projection_builder = ProjectionBuilder::new(output_schema, None, None);
1129
let row_index = None;
1130
let pre_slice = None;
1131
let predicate = None;
1132
let hive_parts = None;
1133
let include_file_paths = None;
1134
let missing_columns_policy = MissingColumnsPolicy::Raise;
1135
let forbid_extra_columns = None;
1136
let cast_columns_policy = CastColumnsPolicy::ERROR_ON_MISMATCH;
1137
let deletion_files = None;
1138
let verbose = config::verbose();
1139
1140
ctx.graph.add_node(
1141
nodes::io_sources::multi_scan::MultiScan::new(Arc::new(MultiScanConfig {
1142
sources,
1143
file_reader_builder,
1144
cloud_options,
1145
final_output_schema,
1146
file_projection_builder,
1147
row_index,
1148
pre_slice,
1149
predicate,
1150
hive_parts,
1151
include_file_paths,
1152
missing_columns_policy,
1153
forbid_extra_columns,
1154
cast_columns_policy,
1155
deletion_files,
1156
// Initialized later
1157
num_pipelines: RelaxedCell::new_usize(0),
1158
n_readers_pre_init: RelaxedCell::new_usize(0),
1159
max_concurrent_scans: RelaxedCell::new_usize(0),
1160
verbose,
1161
})),
1162
[],
1163
)
1164
},
1165
};
1166
1167
ctx.phys_to_graph.insert(phys_node_key, graph_key);
1168
Ok(graph_key)
1169
}
1170
1171