Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/physical_plan/mod.rs
8433 views
1
use std::num::NonZeroUsize;
2
use std::sync::Arc;
3
4
use polars_core::frame::DataFrame;
5
use polars_core::prelude::{IdxSize, InitHashMaps, PlHashMap, SortMultipleOptions};
6
use polars_core::schema::{Schema, SchemaRef};
7
use polars_error::PolarsResult;
8
use polars_io::RowIndex;
9
use polars_io::cloud::CloudOptions;
10
use polars_ops::frame::JoinArgs;
11
use polars_plan::dsl::deletion::DeletionFilesList;
12
use polars_plan::dsl::{
13
CastColumnsPolicy, FileSinkOptions, JoinTypeOptionsIR, MissingColumnsPolicy,
14
PartitionedSinkOptionsIR, PredicateFileSkip, ScanSources, TableStatistics,
15
};
16
use polars_plan::plans::expr_ir::ExprIR;
17
use polars_plan::plans::hive::HivePartitionsDf;
18
use polars_plan::plans::{AExpr, DataFrameUdf, IR};
19
20
mod fmt;
21
mod io;
22
mod lower_expr;
23
mod lower_group_by;
24
mod lower_ir;
25
mod to_graph;
26
27
pub use fmt::{NodeStyle, visualize_plan};
28
use polars_plan::prelude::PlanCallback;
29
#[cfg(feature = "dynamic_group_by")]
30
use polars_time::DynamicGroupOptions;
31
use polars_time::{ClosedWindow, Duration};
32
use polars_utils::arena::{Arena, Node};
33
use polars_utils::pl_str::PlSmallStr;
34
use polars_utils::slice_enum::Slice;
35
use slotmap::{SecondaryMap, SlotMap};
36
pub use to_graph::physical_plan_to_graph;
37
38
pub use self::lower_ir::StreamingLowerIRContext;
39
use crate::nodes::io_sources::multi_scan::components::forbid_extra_columns::ForbidExtraColumns;
40
use crate::nodes::io_sources::multi_scan::components::projection::builder::ProjectionBuilder;
41
use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;
42
use crate::physical_plan::lower_expr::ExprCache;
43
44
slotmap::new_key_type! {
    /// Slot-map key that identifies one node of the physical plan.
    pub struct PhysNodeKey;
}
48
49
impl PhysNodeKey {
50
pub fn as_ffi(&self) -> u64 {
51
self.0.as_ffi()
52
}
53
}
54
55
/// A node in the physical plan.
56
///
57
/// A physical plan is created when the `IR` is translated to a directed
58
/// acyclic graph of operations that can run on the streaming engine.
59
#[derive(Clone, Debug)]
60
pub struct PhysNode {
61
output_schema: Arc<Schema>,
62
kind: PhysNodeKind,
63
}
64
65
impl PhysNode {
66
pub fn new(output_schema: Arc<Schema>, kind: PhysNodeKind) -> Self {
67
Self {
68
output_schema,
69
kind,
70
}
71
}
72
73
pub fn kind(&self) -> &PhysNodeKind {
74
&self.kind
75
}
76
}
77
78
/// A handle representing a physical stream of data with a fixed schema in the
79
/// physical plan. It consists of a reference to a physical node as well as the
80
/// output port on that node to connect to receive the stream.
81
#[derive(Clone, Debug, Copy, PartialEq, Eq, Hash)]
82
pub struct PhysStream {
83
pub node: PhysNodeKey,
84
pub port: usize,
85
}
86
87
impl PhysStream {
88
#[allow(unused)]
89
pub fn new(node: PhysNodeKey, port: usize) -> Self {
90
Self { node, port }
91
}
92
93
// Convenience method to refer to the first output port of a physical node.
94
pub fn first(node: PhysNodeKey) -> Self {
95
Self { node, port: 0 }
96
}
97
}
98
99
/// Strategy for combining several DataFrames whose heights may differ.
#[derive(Clone, Debug, Copy)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(
    feature = "physical_plan_visualization_schema",
    derive(schemars::JsonSchema)
)]
pub enum ZipBehavior {
    /// Pad every shorter DataFrame with nulls up to the height of the tallest one.
    NullExtend,
    /// Heights must agree, except that inputs of length 1 are broadcast to the common height.
    Broadcast,
    /// Any height mismatch is an error.
    Strict,
}
115
116
#[derive(Clone, Debug)]
117
pub enum PhysNodeKind {
118
InMemorySource {
119
df: Arc<DataFrame>,
120
disable_morsel_split: bool,
121
},
122
123
Select {
124
input: PhysStream,
125
selectors: Vec<ExprIR>,
126
extend_original: bool,
127
},
128
129
InputIndependentSelect {
130
selectors: Vec<ExprIR>,
131
},
132
133
WithRowIndex {
134
input: PhysStream,
135
name: PlSmallStr,
136
offset: Option<IdxSize>,
137
},
138
139
Reduce {
140
input: PhysStream,
141
exprs: Vec<ExprIR>,
142
},
143
144
StreamingSlice {
145
input: PhysStream,
146
offset: usize,
147
length: usize,
148
},
149
150
NegativeSlice {
151
input: PhysStream,
152
offset: i64,
153
length: usize,
154
},
155
156
DynamicSlice {
157
input: PhysStream,
158
offset: PhysStream,
159
length: PhysStream,
160
},
161
162
Shift {
163
input: PhysStream,
164
offset: PhysStream,
165
fill: Option<PhysStream>,
166
},
167
168
Filter {
169
input: PhysStream,
170
predicate: ExprIR,
171
},
172
173
SimpleProjection {
174
input: PhysStream,
175
columns: Vec<PlSmallStr>,
176
},
177
178
InMemorySink {
179
input: PhysStream,
180
},
181
182
CallbackSink {
183
input: PhysStream,
184
function: PlanCallback<DataFrame, bool>,
185
maintain_order: bool,
186
chunk_size: Option<NonZeroUsize>,
187
},
188
189
FileSink {
190
input: PhysStream,
191
options: FileSinkOptions,
192
},
193
194
PartitionedSink {
195
input: PhysStream,
196
options: PartitionedSinkOptionsIR,
197
},
198
199
SinkMultiple {
200
sinks: Vec<PhysNodeKey>,
201
},
202
203
/// Generic fallback for (as-of-yet) unsupported streaming mappings.
204
/// Fully sinks all data to an in-memory data frame and uses the in-memory
205
/// engine to perform the map.
206
InMemoryMap {
207
input: PhysStream,
208
map: Arc<dyn DataFrameUdf>,
209
210
/// A formatted explain of what the in-memory map. This usually calls format on the IR.
211
format_str: Option<String>,
212
},
213
214
Map {
215
input: PhysStream,
216
map: Arc<dyn DataFrameUdf>,
217
218
/// A formatted explain of what the in-memory map. This usually calls format on the IR.
219
format_str: Option<String>,
220
},
221
222
SortedGroupBy {
223
input: PhysStream,
224
key: PlSmallStr,
225
aggs: Vec<ExprIR>,
226
slice: Option<(IdxSize, IdxSize)>,
227
},
228
229
Sort {
230
input: PhysStream,
231
by_column: Vec<ExprIR>,
232
slice: Option<(i64, usize)>,
233
sort_options: SortMultipleOptions,
234
},
235
236
TopK {
237
input: PhysStream,
238
k: PhysStream,
239
by_column: Vec<ExprIR>,
240
reverse: Vec<bool>,
241
nulls_last: Vec<bool>,
242
},
243
244
Repeat {
245
value: PhysStream,
246
repeats: PhysStream,
247
},
248
249
#[cfg(feature = "cum_agg")]
250
CumAgg {
251
input: PhysStream,
252
kind: crate::nodes::cum_agg::CumAggKind,
253
},
254
255
// Parameter is the input stream
256
GatherEvery {
257
input: PhysStream,
258
n: usize,
259
offset: usize,
260
},
261
Rle(PhysStream),
262
RleId(PhysStream),
263
PeakMinMax {
264
input: PhysStream,
265
is_peak_max: bool,
266
},
267
268
OrderedUnion {
269
inputs: Vec<PhysStream>,
270
},
271
272
UnorderedUnion {
273
inputs: Vec<PhysStream>,
274
},
275
276
Zip {
277
inputs: Vec<PhysStream>,
278
zip_behavior: ZipBehavior,
279
},
280
281
#[allow(unused)]
282
Multiplexer {
283
input: PhysStream,
284
},
285
286
MultiScan {
287
scan_sources: ScanSources,
288
289
file_reader_builder: Arc<dyn FileReaderBuilder>,
290
cloud_options: Option<Arc<CloudOptions>>,
291
292
/// Columns to project from the file.
293
file_projection_builder: ProjectionBuilder,
294
/// Final output schema of morsels being sent out of MultiScan.
295
output_schema: SchemaRef,
296
297
row_index: Option<RowIndex>,
298
pre_slice: Option<Slice>,
299
predicate: Option<ExprIR>,
300
predicate_file_skip_applied: Option<PredicateFileSkip>,
301
302
hive_parts: Option<HivePartitionsDf>,
303
include_file_paths: Option<PlSmallStr>,
304
cast_columns_policy: CastColumnsPolicy,
305
missing_columns_policy: MissingColumnsPolicy,
306
forbid_extra_columns: Option<ForbidExtraColumns>,
307
308
deletion_files: Option<DeletionFilesList>,
309
table_statistics: Option<TableStatistics>,
310
311
/// Schema of columns contained in the file. Does not contain external columns (e.g. hive / row_index).
312
file_schema: SchemaRef,
313
disable_morsel_split: bool,
314
},
315
316
#[cfg(feature = "python")]
317
PythonScan {
318
options: polars_plan::plans::python::PythonOptions,
319
},
320
321
GroupBy {
322
inputs: Vec<PhysStream>,
323
// Must have the same schema when applied for each input.
324
key_per_input: Vec<Vec<ExprIR>>,
325
// Must be a 'simple' expression, a singular column feeding into a single aggregate, or Len.
326
aggs_per_input: Vec<Vec<ExprIR>>,
327
},
328
329
#[cfg(feature = "dynamic_group_by")]
330
DynamicGroupBy {
331
input: PhysStream,
332
options: DynamicGroupOptions,
333
aggs: Vec<ExprIR>,
334
slice: Option<(IdxSize, IdxSize)>,
335
},
336
337
#[cfg(feature = "dynamic_group_by")]
338
RollingGroupBy {
339
input: PhysStream,
340
index_column: PlSmallStr,
341
period: Duration,
342
offset: Duration,
343
closed: ClosedWindow,
344
slice: Option<(IdxSize, IdxSize)>,
345
aggs: Vec<ExprIR>,
346
},
347
348
EquiJoin {
349
input_left: PhysStream,
350
input_right: PhysStream,
351
left_on: Vec<ExprIR>,
352
right_on: Vec<ExprIR>,
353
args: JoinArgs,
354
},
355
356
MergeJoin {
357
input_left: PhysStream,
358
input_right: PhysStream,
359
left_on: Vec<PlSmallStr>,
360
right_on: Vec<PlSmallStr>,
361
tmp_left_key_col: Option<PlSmallStr>,
362
tmp_right_key_col: Option<PlSmallStr>,
363
descending: bool,
364
nulls_last: bool,
365
keys_row_encoded: bool,
366
args: JoinArgs,
367
},
368
369
SemiAntiJoin {
370
input_left: PhysStream,
371
input_right: PhysStream,
372
left_on: Vec<ExprIR>,
373
right_on: Vec<ExprIR>,
374
args: JoinArgs,
375
output_bool: bool,
376
},
377
378
CrossJoin {
379
input_left: PhysStream,
380
input_right: PhysStream,
381
args: JoinArgs,
382
},
383
384
AsOfJoin {
385
input_left: PhysStream,
386
input_right: PhysStream,
387
left_on: PlSmallStr,
388
right_on: PlSmallStr,
389
tmp_left_key_col: Option<PlSmallStr>,
390
tmp_right_key_col: Option<PlSmallStr>,
391
args: JoinArgs,
392
},
393
394
/// Generic fallback for (as-of-yet) unsupported streaming joins.
395
/// Fully sinks all data to in-memory data frames and uses the in-memory
396
/// engine to perform the join.
397
InMemoryJoin {
398
input_left: PhysStream,
399
input_right: PhysStream,
400
left_on: Vec<ExprIR>,
401
right_on: Vec<ExprIR>,
402
args: JoinArgs,
403
options: Option<JoinTypeOptionsIR>,
404
},
405
406
#[cfg(feature = "merge_sorted")]
407
MergeSorted {
408
input_left: PhysStream,
409
input_right: PhysStream,
410
},
411
412
#[cfg(feature = "ewma")]
413
EwmMean {
414
input: PhysStream,
415
options: polars_ops::series::EWMOptions,
416
},
417
418
#[cfg(feature = "ewma")]
419
EwmVar {
420
input: PhysStream,
421
options: polars_ops::series::EWMOptions,
422
},
423
424
#[cfg(feature = "ewma")]
425
EwmStd {
426
input: PhysStream,
427
options: polars_ops::series::EWMOptions,
428
},
429
}
430
431
fn visit_node_inputs_mut(
432
roots: Vec<PhysNodeKey>,
433
phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
434
mut visit: impl FnMut(&mut PhysStream),
435
) {
436
let mut to_visit = roots;
437
let mut seen: SecondaryMap<PhysNodeKey, ()> =
438
to_visit.iter().copied().map(|n| (n, ())).collect();
439
macro_rules! rec {
440
($n:expr) => {
441
let n = $n;
442
if seen.insert(n, ()).is_none() {
443
to_visit.push(n)
444
}
445
};
446
}
447
while let Some(node) = to_visit.pop() {
448
match &mut phys_sm[node].kind {
449
PhysNodeKind::InMemorySource { .. }
450
| PhysNodeKind::MultiScan { .. }
451
| PhysNodeKind::InputIndependentSelect { .. } => {},
452
#[cfg(feature = "python")]
453
PhysNodeKind::PythonScan { .. } => {},
454
PhysNodeKind::Select { input, .. }
455
| PhysNodeKind::WithRowIndex { input, .. }
456
| PhysNodeKind::Reduce { input, .. }
457
| PhysNodeKind::StreamingSlice { input, .. }
458
| PhysNodeKind::NegativeSlice { input, .. }
459
| PhysNodeKind::Filter { input, .. }
460
| PhysNodeKind::SimpleProjection { input, .. }
461
| PhysNodeKind::InMemorySink { input }
462
| PhysNodeKind::CallbackSink { input, .. }
463
| PhysNodeKind::FileSink { input, .. }
464
| PhysNodeKind::PartitionedSink { input, .. }
465
| PhysNodeKind::InMemoryMap { input, .. }
466
| PhysNodeKind::SortedGroupBy { input, .. }
467
| PhysNodeKind::Map { input, .. }
468
| PhysNodeKind::Sort { input, .. }
469
| PhysNodeKind::Multiplexer { input }
470
| PhysNodeKind::GatherEvery { input, .. }
471
| PhysNodeKind::Rle(input)
472
| PhysNodeKind::RleId(input)
473
| PhysNodeKind::PeakMinMax { input, .. } => {
474
rec!(input.node);
475
visit(input);
476
},
477
478
#[cfg(feature = "dynamic_group_by")]
479
PhysNodeKind::DynamicGroupBy { input, .. } => {
480
rec!(input.node);
481
visit(input);
482
},
483
#[cfg(feature = "dynamic_group_by")]
484
PhysNodeKind::RollingGroupBy { input, .. } => {
485
rec!(input.node);
486
visit(input);
487
},
488
489
#[cfg(feature = "cum_agg")]
490
PhysNodeKind::CumAgg { input, .. } => {
491
rec!(input.node);
492
visit(input);
493
},
494
495
PhysNodeKind::InMemoryJoin {
496
input_left,
497
input_right,
498
..
499
}
500
| PhysNodeKind::EquiJoin {
501
input_left,
502
input_right,
503
..
504
}
505
| PhysNodeKind::MergeJoin {
506
input_left,
507
input_right,
508
..
509
}
510
| PhysNodeKind::SemiAntiJoin {
511
input_left,
512
input_right,
513
..
514
}
515
| PhysNodeKind::CrossJoin {
516
input_left,
517
input_right,
518
..
519
}
520
| PhysNodeKind::AsOfJoin {
521
input_left,
522
input_right,
523
..
524
} => {
525
rec!(input_left.node);
526
rec!(input_right.node);
527
visit(input_left);
528
visit(input_right);
529
},
530
531
#[cfg(feature = "merge_sorted")]
532
PhysNodeKind::MergeSorted {
533
input_left,
534
input_right,
535
..
536
} => {
537
rec!(input_left.node);
538
rec!(input_right.node);
539
visit(input_left);
540
visit(input_right);
541
},
542
543
PhysNodeKind::TopK { input, k, .. } => {
544
rec!(input.node);
545
rec!(k.node);
546
visit(input);
547
visit(k);
548
},
549
550
PhysNodeKind::DynamicSlice {
551
input,
552
offset,
553
length,
554
} => {
555
rec!(input.node);
556
rec!(offset.node);
557
rec!(length.node);
558
visit(input);
559
visit(offset);
560
visit(length);
561
},
562
563
PhysNodeKind::Shift {
564
input,
565
offset,
566
fill,
567
} => {
568
rec!(input.node);
569
rec!(offset.node);
570
if let Some(fill) = fill {
571
rec!(fill.node);
572
}
573
visit(input);
574
visit(offset);
575
if let Some(fill) = fill {
576
visit(fill);
577
}
578
},
579
580
PhysNodeKind::Repeat { value, repeats } => {
581
rec!(value.node);
582
rec!(repeats.node);
583
visit(value);
584
visit(repeats);
585
},
586
587
PhysNodeKind::GroupBy { inputs, .. }
588
| PhysNodeKind::OrderedUnion { inputs }
589
| PhysNodeKind::UnorderedUnion { inputs }
590
| PhysNodeKind::Zip { inputs, .. } => {
591
for input in inputs {
592
rec!(input.node);
593
visit(input);
594
}
595
},
596
597
PhysNodeKind::SinkMultiple { sinks } => {
598
for sink in sinks {
599
rec!(*sink);
600
visit(&mut PhysStream::first(*sink));
601
}
602
},
603
604
#[cfg(feature = "ewma")]
605
PhysNodeKind::EwmMean { input, options: _ }
606
| PhysNodeKind::EwmVar { input, options: _ }
607
| PhysNodeKind::EwmStd { input, options: _ } => {
608
rec!(input.node);
609
visit(input)
610
},
611
}
612
}
613
}
614
615
fn insert_multiplexers(roots: Vec<PhysNodeKey>, phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>) {
616
let mut refcount = PlHashMap::new();
617
visit_node_inputs_mut(roots.clone(), phys_sm, |i| {
618
*refcount.entry(*i).or_insert(0) += 1;
619
});
620
621
let mut multiplexer_map: PlHashMap<PhysStream, PhysStream> = refcount
622
.into_iter()
623
.filter(|(_stream, refcount)| *refcount > 1)
624
.map(|(stream, _refcount)| {
625
let input_schema = phys_sm[stream.node].output_schema.clone();
626
let multiplexer_node = phys_sm.insert(PhysNode::new(
627
input_schema,
628
PhysNodeKind::Multiplexer { input: stream },
629
));
630
(stream, PhysStream::first(multiplexer_node))
631
})
632
.collect();
633
634
visit_node_inputs_mut(roots, phys_sm, |i| {
635
if let Some(m) = multiplexer_map.get_mut(i) {
636
*i = *m;
637
m.port += 1;
638
}
639
});
640
}
641
642
pub fn build_physical_plan(
643
root: Node,
644
ir_arena: &mut Arena<IR>,
645
expr_arena: &mut Arena<AExpr>,
646
phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
647
ctx: StreamingLowerIRContext,
648
) -> PolarsResult<PhysNodeKey> {
649
let mut schema_cache = PlHashMap::with_capacity(ir_arena.len());
650
let mut expr_cache = ExprCache::with_capacity(expr_arena.len());
651
let mut cache_nodes = PlHashMap::new();
652
let phys_root = lower_ir::lower_ir(
653
root,
654
ir_arena,
655
expr_arena,
656
phys_sm,
657
&mut schema_cache,
658
&mut expr_cache,
659
&mut cache_nodes,
660
ctx,
661
None,
662
)?;
663
insert_multiplexers(vec![phys_root.node], phys_sm);
664
Ok(phys_root.node)
665
}
666
667