Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/physical_plan/mod.rs
6939 views
1
use std::sync::Arc;
2
3
use polars_core::frame::DataFrame;
4
use polars_core::prelude::{IdxSize, InitHashMaps, PlHashMap, SortMultipleOptions};
5
use polars_core::schema::{Schema, SchemaRef};
6
use polars_error::PolarsResult;
7
use polars_io::RowIndex;
8
use polars_io::cloud::CloudOptions;
9
use polars_ops::frame::JoinArgs;
10
use polars_plan::dsl::deletion::DeletionFilesList;
11
use polars_plan::dsl::{
12
CastColumnsPolicy, JoinTypeOptionsIR, MissingColumnsPolicy, PartitionTargetCallback,
13
PartitionVariantIR, ScanSources, SinkFinishCallback, SinkOptions, SinkTarget, SortColumnIR,
14
};
15
use polars_plan::plans::hive::HivePartitionsDf;
16
use polars_plan::plans::{AExpr, DataFrameUdf, IR};
17
use polars_plan::prelude::expr_ir::ExprIR;
18
19
mod fmt;
20
mod io;
21
mod lower_expr;
22
mod lower_group_by;
23
mod lower_ir;
24
mod to_graph;
25
26
pub use fmt::visualize_plan;
27
use polars_plan::prelude::FileType;
28
use polars_utils::arena::{Arena, Node};
29
use polars_utils::pl_str::PlSmallStr;
30
use polars_utils::plpath::PlPath;
31
use polars_utils::slice_enum::Slice;
32
use slotmap::{SecondaryMap, SlotMap};
33
pub use to_graph::physical_plan_to_graph;
34
35
pub use self::lower_ir::StreamingLowerIRContext;
36
use crate::nodes::io_sources::multi_scan::components::forbid_extra_columns::ForbidExtraColumns;
37
use crate::nodes::io_sources::multi_scan::components::projection::builder::ProjectionBuilder;
38
use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;
39
use crate::physical_plan::lower_expr::ExprCache;
40
41
slotmap::new_key_type! {
    /// Key used for physical nodes.
    ///
    /// Identifies a [`PhysNode`] stored in the `SlotMap<PhysNodeKey, PhysNode>`
    /// that holds a physical plan; [`PhysStream`] handles refer to their
    /// producing node through this key.
    pub struct PhysNodeKey;
}
45
46
/// A node in the physical plan.
///
/// A physical plan is created when the `IR` is translated to a directed
/// acyclic graph of operations that can run on the streaming engine.
///
/// Nodes live in a `SlotMap<PhysNodeKey, PhysNode>`; a node's inputs are
/// referenced through the [`PhysStream`] handles stored in its `kind`.
#[derive(Clone, Debug)]
pub struct PhysNode {
    /// Schema of the data this node outputs (also used as the schema of any
    /// multiplexer inserted in front of one of its output streams).
    output_schema: Arc<Schema>,
    /// The operation this node performs, including its input streams.
    kind: PhysNodeKind,
}
55
56
impl PhysNode {
57
pub fn new(output_schema: Arc<Schema>, kind: PhysNodeKind) -> Self {
58
Self {
59
output_schema,
60
kind,
61
}
62
}
63
64
pub fn kind(&self) -> &PhysNodeKind {
65
&self.kind
66
}
67
}
68
69
/// A handle representing a physical stream of data with a fixed schema in the
/// physical plan. It consists of a reference to a physical node as well as the
/// output port on that node to connect to receive the stream.
#[derive(Clone, Debug, Copy, PartialEq, Eq, Hash)]
pub struct PhysStream {
    /// Key of the node that produces this stream.
    pub node: PhysNodeKey,
    /// Index of the output port on `node` that this stream is read from.
    pub port: usize,
}
77
78
impl PhysStream {
79
#[expect(unused)]
80
pub fn new(node: PhysNodeKey, port: usize) -> Self {
81
Self { node, port }
82
}
83
84
// Convenience method to refer to the first output port of a physical node.
85
pub fn first(node: PhysNodeKey) -> Self {
86
Self { node, port: 0 }
87
}
88
}
89
90
/// The operation performed by a [`PhysNode`].
///
/// Variants that consume data reference their inputs through [`PhysStream`]
/// handles; `SinkMultiple` instead references whole sink nodes by key.
#[derive(Clone, Debug)]
pub enum PhysNodeKind {
    /// Source node without input streams; its data comes from `df`.
    InMemorySource {
        df: Arc<DataFrame>,
    },

    Select {
        input: PhysStream,
        selectors: Vec<ExprIR>,
        extend_original: bool,
    },

    /// Evaluates `selectors` without reading from any input stream.
    InputIndependentSelect {
        selectors: Vec<ExprIR>,
    },

    WithRowIndex {
        input: PhysStream,
        name: PlSmallStr,
        offset: Option<IdxSize>,
    },

    Reduce {
        input: PhysStream,
        exprs: Vec<ExprIR>,
    },

    StreamingSlice {
        input: PhysStream,
        offset: usize,
        length: usize,
    },

    NegativeSlice {
        input: PhysStream,
        offset: i64,
        length: usize,
    },

    /// Slice whose `offset` and `length` are themselves provided by separate
    /// input streams rather than as constants.
    DynamicSlice {
        input: PhysStream,
        offset: PhysStream,
        length: PhysStream,
    },

    /// Shift whose `offset` (and optional `fill` value) are provided by
    /// separate input streams.
    Shift {
        input: PhysStream,
        offset: PhysStream,
        fill: Option<PhysStream>,
    },

    Filter {
        input: PhysStream,
        predicate: ExprIR,
    },

    SimpleProjection {
        input: PhysStream,
        columns: Vec<PlSmallStr>,
    },

    InMemorySink {
        input: PhysStream,
    },

    FileSink {
        target: SinkTarget,
        sink_options: SinkOptions,
        file_type: FileType,
        input: PhysStream,
        cloud_options: Option<CloudOptions>,
    },

    PartitionSink {
        input: PhysStream,
        base_path: Arc<PlPath>,
        file_path_cb: Option<PartitionTargetCallback>,
        sink_options: SinkOptions,
        variant: PartitionVariantIR,
        file_type: FileType,
        cloud_options: Option<CloudOptions>,
        per_partition_sort_by: Option<Vec<SortColumnIR>>,
        finish_callback: Option<SinkFinishCallback>,
    },

    /// Groups several sink nodes. The sinks are referenced by node key, not
    /// by stream.
    SinkMultiple {
        sinks: Vec<PhysNodeKey>,
    },

    /// Generic fallback for (as-of-yet) unsupported streaming mappings.
    /// Fully sinks all data to an in-memory data frame and uses the in-memory
    /// engine to perform the map.
    InMemoryMap {
        input: PhysStream,
        map: Arc<dyn DataFrameUdf>,

        /// A formatted explanation of what the in-memory map does. This is
        /// usually produced by formatting the IR.
        format_str: Option<String>,
    },

    Map {
        input: PhysStream,
        map: Arc<dyn DataFrameUdf>,
    },

    Sort {
        input: PhysStream,
        by_column: Vec<ExprIR>,
        slice: Option<(i64, usize)>,
        sort_options: SortMultipleOptions,
    },

    /// Top-k selection where `k` itself is provided by a separate input
    /// stream.
    TopK {
        input: PhysStream,
        k: PhysStream,
        by_column: Vec<ExprIR>,
        reverse: Vec<bool>,
        nulls_last: Vec<bool>,
    },

    /// Both the value to repeat and the repeat count are input streams.
    Repeat {
        value: PhysStream,
        repeats: PhysStream,
    },

    #[cfg(feature = "cum_agg")]
    CumAgg {
        input: PhysStream,
        kind: crate::nodes::cum_agg::CumAggKind,
    },

    // For both `Rle` and `RleId`, the single field is the input stream.
    Rle(PhysStream),
    RleId(PhysStream),
    PeakMinMax {
        input: PhysStream,
        is_peak_max: bool,
    },

    OrderedUnion {
        inputs: Vec<PhysStream>,
    },

    Zip {
        inputs: Vec<PhysStream>,
        /// If true shorter inputs are extended with nulls to the longest input,
        /// if false all inputs must be the same length, or have length 1 in
        /// which case they are broadcast.
        null_extend: bool,
    },

    /// Fans a single input stream out to multiple consumers. Each consumer
    /// reads from its own output port; these nodes are inserted by
    /// `insert_multiplexers` for streams that have more than one consumer.
    // NOTE(review): this variant is constructed and matched in this module;
    // the reason for `#[allow(unused)]` is not visible here — confirm it is
    // still needed.
    #[allow(unused)]
    Multiplexer {
        input: PhysStream,
    },

    /// Scan over (possibly many) files; a source node without input streams.
    MultiScan {
        scan_sources: ScanSources,

        file_reader_builder: Arc<dyn FileReaderBuilder>,
        cloud_options: Option<Arc<CloudOptions>>,

        /// Columns to project from the file.
        file_projection_builder: ProjectionBuilder,
        /// Final output schema of morsels being sent out of MultiScan.
        output_schema: SchemaRef,

        row_index: Option<RowIndex>,
        pre_slice: Option<Slice>,
        predicate: Option<ExprIR>,

        hive_parts: Option<HivePartitionsDf>,
        include_file_paths: Option<PlSmallStr>,
        cast_columns_policy: CastColumnsPolicy,
        missing_columns_policy: MissingColumnsPolicy,
        forbid_extra_columns: Option<ForbidExtraColumns>,

        deletion_files: Option<DeletionFilesList>,

        /// Schema of columns contained in the file. Does not contain external columns (e.g. hive / row_index).
        file_schema: SchemaRef,
    },

    /// Python-backed scan; a source node without input streams.
    #[cfg(feature = "python")]
    PythonScan {
        options: polars_plan::plans::python::PythonOptions,
    },

    GroupBy {
        input: PhysStream,
        key: Vec<ExprIR>,
        // Must be a 'simple' expression, a singular column feeding into a single aggregate, or Len.
        aggs: Vec<ExprIR>,
    },

    EquiJoin {
        input_left: PhysStream,
        input_right: PhysStream,
        left_on: Vec<ExprIR>,
        right_on: Vec<ExprIR>,
        args: JoinArgs,
    },

    SemiAntiJoin {
        input_left: PhysStream,
        input_right: PhysStream,
        left_on: Vec<ExprIR>,
        right_on: Vec<ExprIR>,
        args: JoinArgs,
        output_bool: bool,
    },

    CrossJoin {
        input_left: PhysStream,
        input_right: PhysStream,
        args: JoinArgs,
    },

    /// Generic fallback for (as-of-yet) unsupported streaming joins.
    /// Fully sinks all data to in-memory data frames and uses the in-memory
    /// engine to perform the join.
    InMemoryJoin {
        input_left: PhysStream,
        input_right: PhysStream,
        left_on: Vec<ExprIR>,
        right_on: Vec<ExprIR>,
        args: JoinArgs,
        options: Option<JoinTypeOptionsIR>,
    },

    #[cfg(feature = "merge_sorted")]
    MergeSorted {
        input_left: PhysStream,
        input_right: PhysStream,
    },
}
326
327
fn visit_node_inputs_mut(
328
roots: Vec<PhysNodeKey>,
329
phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
330
mut visit: impl FnMut(&mut PhysStream),
331
) {
332
let mut to_visit = roots;
333
let mut seen: SecondaryMap<PhysNodeKey, ()> =
334
to_visit.iter().copied().map(|n| (n, ())).collect();
335
macro_rules! rec {
336
($n:expr) => {
337
let n = $n;
338
if seen.insert(n, ()).is_none() {
339
to_visit.push(n)
340
}
341
};
342
}
343
while let Some(node) = to_visit.pop() {
344
match &mut phys_sm[node].kind {
345
PhysNodeKind::InMemorySource { .. }
346
| PhysNodeKind::MultiScan { .. }
347
| PhysNodeKind::InputIndependentSelect { .. } => {},
348
#[cfg(feature = "python")]
349
PhysNodeKind::PythonScan { .. } => {},
350
PhysNodeKind::Select { input, .. }
351
| PhysNodeKind::WithRowIndex { input, .. }
352
| PhysNodeKind::Reduce { input, .. }
353
| PhysNodeKind::StreamingSlice { input, .. }
354
| PhysNodeKind::NegativeSlice { input, .. }
355
| PhysNodeKind::Filter { input, .. }
356
| PhysNodeKind::SimpleProjection { input, .. }
357
| PhysNodeKind::InMemorySink { input }
358
| PhysNodeKind::FileSink { input, .. }
359
| PhysNodeKind::PartitionSink { input, .. }
360
| PhysNodeKind::InMemoryMap { input, .. }
361
| PhysNodeKind::Map { input, .. }
362
| PhysNodeKind::Sort { input, .. }
363
| PhysNodeKind::Multiplexer { input }
364
| PhysNodeKind::Rle(input)
365
| PhysNodeKind::RleId(input)
366
| PhysNodeKind::PeakMinMax { input, .. }
367
| PhysNodeKind::GroupBy { input, .. } => {
368
rec!(input.node);
369
visit(input);
370
},
371
372
#[cfg(feature = "cum_agg")]
373
PhysNodeKind::CumAgg { input, .. } => {
374
rec!(input.node);
375
visit(input);
376
},
377
378
PhysNodeKind::InMemoryJoin {
379
input_left,
380
input_right,
381
..
382
}
383
| PhysNodeKind::EquiJoin {
384
input_left,
385
input_right,
386
..
387
}
388
| PhysNodeKind::SemiAntiJoin {
389
input_left,
390
input_right,
391
..
392
}
393
| PhysNodeKind::CrossJoin {
394
input_left,
395
input_right,
396
..
397
} => {
398
rec!(input_left.node);
399
rec!(input_right.node);
400
visit(input_left);
401
visit(input_right);
402
},
403
404
#[cfg(feature = "merge_sorted")]
405
PhysNodeKind::MergeSorted {
406
input_left,
407
input_right,
408
..
409
} => {
410
rec!(input_left.node);
411
rec!(input_right.node);
412
visit(input_left);
413
visit(input_right);
414
},
415
416
PhysNodeKind::TopK { input, k, .. } => {
417
rec!(input.node);
418
rec!(k.node);
419
visit(input);
420
visit(k);
421
},
422
423
PhysNodeKind::DynamicSlice {
424
input,
425
offset,
426
length,
427
} => {
428
rec!(input.node);
429
rec!(offset.node);
430
rec!(length.node);
431
visit(input);
432
visit(offset);
433
visit(length);
434
},
435
436
PhysNodeKind::Shift {
437
input,
438
offset,
439
fill,
440
} => {
441
rec!(input.node);
442
rec!(offset.node);
443
if let Some(fill) = fill {
444
rec!(fill.node);
445
}
446
visit(input);
447
visit(offset);
448
if let Some(fill) = fill {
449
visit(fill);
450
}
451
},
452
453
PhysNodeKind::Repeat { value, repeats } => {
454
rec!(value.node);
455
rec!(repeats.node);
456
visit(value);
457
visit(repeats);
458
},
459
460
PhysNodeKind::OrderedUnion { inputs } | PhysNodeKind::Zip { inputs, .. } => {
461
for input in inputs {
462
rec!(input.node);
463
visit(input);
464
}
465
},
466
467
PhysNodeKind::SinkMultiple { sinks } => {
468
for sink in sinks {
469
rec!(*sink);
470
visit(&mut PhysStream::first(*sink));
471
}
472
},
473
}
474
}
475
}
476
477
fn insert_multiplexers(roots: Vec<PhysNodeKey>, phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>) {
478
let mut refcount = PlHashMap::new();
479
visit_node_inputs_mut(roots.clone(), phys_sm, |i| {
480
*refcount.entry(*i).or_insert(0) += 1;
481
});
482
483
let mut multiplexer_map: PlHashMap<PhysStream, PhysStream> = refcount
484
.into_iter()
485
.filter(|(_stream, refcount)| *refcount > 1)
486
.map(|(stream, _refcount)| {
487
let input_schema = phys_sm[stream.node].output_schema.clone();
488
let multiplexer_node = phys_sm.insert(PhysNode::new(
489
input_schema,
490
PhysNodeKind::Multiplexer { input: stream },
491
));
492
(stream, PhysStream::first(multiplexer_node))
493
})
494
.collect();
495
496
visit_node_inputs_mut(roots, phys_sm, |i| {
497
if let Some(m) = multiplexer_map.get_mut(i) {
498
*i = *m;
499
m.port += 1;
500
}
501
});
502
}
503
504
pub fn build_physical_plan(
505
root: Node,
506
ir_arena: &mut Arena<IR>,
507
expr_arena: &mut Arena<AExpr>,
508
phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
509
ctx: StreamingLowerIRContext,
510
) -> PolarsResult<PhysNodeKey> {
511
let mut schema_cache = PlHashMap::with_capacity(ir_arena.len());
512
let mut expr_cache = ExprCache::with_capacity(expr_arena.len());
513
let mut cache_nodes = PlHashMap::new();
514
let phys_root = lower_ir::lower_ir(
515
root,
516
ir_arena,
517
expr_arena,
518
phys_sm,
519
&mut schema_cache,
520
&mut expr_cache,
521
&mut cache_nodes,
522
ctx,
523
)?;
524
insert_multiplexers(vec![phys_root.node], phys_sm);
525
Ok(phys_root.node)
526
}
527
528