Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/physical_plan/visualization/mod.rs
7884 views
1
use std::collections::VecDeque;
2
3
use polars_core::prelude::SortMultipleOptions;
4
use polars_ops::frame::{JoinArgs, JoinType};
5
use polars_plan::dsl::{
6
FileSinkOptions, JoinTypeOptionsIR, PartitionStrategyIR, PartitionVariantIR,
7
PartitionedSinkOptionsIR, SinkOptions, SinkTarget, SortColumnIR, UnifiedSinkArgs,
8
};
9
use polars_plan::plans::expr_ir::ExprIR;
10
use polars_plan::prelude::AExpr;
11
use polars_utils::arena::Arena;
12
use polars_utils::pl_str::PlSmallStr;
13
use polars_utils::{IdxSize, format_pl_smallstr};
14
15
pub mod models;
16
pub use models::{PhysNodeInfo, PhysicalPlanVisualizationData};
17
use slotmap::{SecondaryMap, SlotMap};
18
19
use crate::physical_plan::visualization::models::{Edge, PhysNodeProperties};
20
use crate::physical_plan::{PhysNode, PhysNodeKey, PhysNodeKind};
21
22
pub fn generate_visualization_data(
23
title: PlSmallStr,
24
roots: &[PhysNodeKey],
25
phys_sm: &SlotMap<PhysNodeKey, PhysNode>,
26
expr_arena: &Arena<AExpr>,
27
) -> PhysicalPlanVisualizationData {
28
let (nodes_list, edges) = PhysicalPlanVisualizationDataGenerator {
29
phys_sm,
30
expr_arena,
31
queue: VecDeque::from_iter(roots.iter().copied()),
32
marked_for_visit: SecondaryMap::from_iter(roots.iter().map(|r| (*r, ()))),
33
nodes_list: vec![],
34
edges: vec![],
35
}
36
.generate();
37
38
PhysicalPlanVisualizationData {
39
title,
40
num_roots: roots.len().try_into().unwrap(),
41
nodes: nodes_list,
42
edges,
43
}
44
}
45
46
struct PhysicalPlanVisualizationDataGenerator<'a> {
47
phys_sm: &'a SlotMap<PhysNodeKey, PhysNode>,
48
expr_arena: &'a Arena<AExpr>,
49
queue: VecDeque<PhysNodeKey>,
50
marked_for_visit: SecondaryMap<PhysNodeKey, ()>,
51
nodes_list: Vec<PhysNodeInfo>,
52
edges: Vec<Edge>,
53
}
54
55
impl PhysicalPlanVisualizationDataGenerator<'_> {
56
fn generate(mut self) -> (Vec<PhysNodeInfo>, Vec<Edge>) {
57
let mut node_inputs: Vec<PhysNodeKey> = vec![];
58
59
while let Some(key) = self.queue.pop_front() {
60
let node: &PhysNode = self.phys_sm.get(key).unwrap();
61
let mut phys_node_info = self.get_phys_node_info(node, &mut node_inputs);
62
let current_node_key: u64 = key.0.as_ffi();
63
phys_node_info.id = current_node_key;
64
65
for input_node in node_inputs.drain(..) {
66
self.edges
67
.push(Edge::new(current_node_key, input_node.0.as_ffi()));
68
69
let not_yet_marked = self.marked_for_visit.insert(input_node, ()).is_none();
70
if not_yet_marked {
71
self.queue.push_back(input_node);
72
}
73
}
74
75
self.nodes_list.push(phys_node_info);
76
}
77
78
assert!(self.queue.is_empty());
79
(self.nodes_list, self.edges)
80
}
81
82
fn get_phys_node_info(
83
&self,
84
phys_node: &PhysNode,
85
phys_node_inputs: &mut Vec<PhysNodeKey>,
86
) -> PhysNodeInfo {
87
match phys_node.kind() {
88
PhysNodeKind::CallbackSink {
89
input,
90
function,
91
maintain_order,
92
chunk_size,
93
} => {
94
phys_node_inputs.push(input.node);
95
96
let properties = PhysNodeProperties::CallbackSink {
97
callback_function: format_pl_smallstr!("{:?}", function),
98
maintain_order: *maintain_order,
99
chunk_size: *chunk_size,
100
};
101
102
PhysNodeInfo {
103
title: properties.variant_name(),
104
properties,
105
..Default::default()
106
}
107
},
108
PhysNodeKind::DynamicSlice {
109
input,
110
offset,
111
length,
112
} => {
113
phys_node_inputs.push(input.node);
114
phys_node_inputs.push(offset.node);
115
phys_node_inputs.push(length.node);
116
117
let properties = PhysNodeProperties::DynamicSlice;
118
119
PhysNodeInfo {
120
title: properties.variant_name(),
121
properties,
122
..Default::default()
123
}
124
},
125
PhysNodeKind::FileSink {
126
target,
127
sink_options:
128
SinkOptions {
129
sync_on_close,
130
maintain_order,
131
mkdir,
132
},
133
file_type,
134
input,
135
cloud_options,
136
} => {
137
phys_node_inputs.push(input.node);
138
139
let properties = PhysNodeProperties::FileSink {
140
target: match target {
141
SinkTarget::Path(p) => format_pl_smallstr!("Path({})", p.to_str()),
142
SinkTarget::Dyn(_) => PlSmallStr::from_static("DynWriteable"),
143
},
144
file_format: PlSmallStr::from_static(file_type.into()),
145
sync_on_close: *sync_on_close,
146
maintain_order: *maintain_order,
147
mkdir: *mkdir,
148
cloud_options: cloud_options.is_some(),
149
};
150
151
PhysNodeInfo {
152
title: properties.variant_name(),
153
properties,
154
..Default::default()
155
}
156
},
157
PhysNodeKind::Filter { input, predicate } => {
158
phys_node_inputs.push(input.node);
159
160
let properties = PhysNodeProperties::Filter {
161
predicate: format_pl_smallstr!("{}", predicate.display(self.expr_arena)),
162
};
163
164
PhysNodeInfo {
165
title: properties.variant_name(),
166
properties,
167
..Default::default()
168
}
169
},
170
PhysNodeKind::GatherEvery { input, n, offset } => {
171
phys_node_inputs.push(input.node);
172
173
let properties = PhysNodeProperties::GatherEvery {
174
n: (*n).try_into().unwrap(),
175
offset: (*offset).try_into().unwrap(),
176
};
177
178
PhysNodeInfo {
179
title: properties.variant_name(),
180
properties,
181
..Default::default()
182
}
183
},
184
PhysNodeKind::GroupBy { input, key, aggs } => {
185
phys_node_inputs.push(input.node);
186
187
let properties = PhysNodeProperties::GroupBy {
188
keys: expr_list(key, self.expr_arena),
189
aggs: expr_list(aggs, self.expr_arena),
190
};
191
192
PhysNodeInfo {
193
title: properties.variant_name(),
194
properties,
195
..Default::default()
196
}
197
},
198
#[cfg(feature = "dynamic_group_by")]
199
PhysNodeKind::DynamicGroupBy {
200
input,
201
options,
202
aggs,
203
slice,
204
} => {
205
use polars_time::DynamicGroupOptions;
206
use polars_utils::IdxSize;
207
208
phys_node_inputs.push(input.node);
209
210
let DynamicGroupOptions {
211
index_column,
212
every,
213
period,
214
offset,
215
label,
216
include_boundaries,
217
closed_window,
218
start_by,
219
} = options;
220
221
let properties = PhysNodeProperties::DynamicGroupBy {
222
index_column: index_column.clone(),
223
period: format_pl_smallstr!("{period}"),
224
every: format_pl_smallstr!("{every}"),
225
offset: format_pl_smallstr!("{offset}"),
226
start_by: PlSmallStr::from_static(start_by.into()),
227
label: PlSmallStr::from_static(label.into()),
228
include_boundaries: *include_boundaries,
229
closed_window: PlSmallStr::from_static(closed_window.into()),
230
aggs: expr_list(aggs, self.expr_arena),
231
slice: slice.map(|(o, l)| (IdxSize::into(o), IdxSize::into(l))),
232
};
233
234
PhysNodeInfo {
235
title: properties.variant_name(),
236
properties,
237
..Default::default()
238
}
239
},
240
#[cfg(feature = "dynamic_group_by")]
241
PhysNodeKind::RollingGroupBy {
242
input,
243
index_column,
244
period,
245
offset,
246
closed,
247
slice,
248
aggs,
249
} => {
250
phys_node_inputs.push(input.node);
251
252
let properties = PhysNodeProperties::RollingGroupBy {
253
index_column: index_column.clone(),
254
period: format_pl_smallstr!("{period}"),
255
offset: format_pl_smallstr!("{offset}"),
256
closed_window: PlSmallStr::from_static(closed.into()),
257
slice: slice.map(|(o, l)| (IdxSize::into(o), IdxSize::into(l))),
258
aggs: expr_list(aggs, self.expr_arena),
259
};
260
261
PhysNodeInfo {
262
title: properties.variant_name(),
263
properties,
264
..Default::default()
265
}
266
},
267
PhysNodeKind::SortedGroupBy {
268
input,
269
key,
270
aggs,
271
slice,
272
} => {
273
phys_node_inputs.push(input.node);
274
275
let properties = PhysNodeProperties::SortedGroupBy {
276
key: key.clone(),
277
aggs: expr_list(aggs, self.expr_arena),
278
slice: slice.map(|(o, l)| (IdxSize::into(o), IdxSize::into(l))),
279
};
280
281
PhysNodeInfo {
282
title: properties.variant_name(),
283
properties,
284
..Default::default()
285
}
286
},
287
PhysNodeKind::InMemoryMap {
288
input,
289
map: _, // dyn DataFrameUdf
290
format_str,
291
} => {
292
phys_node_inputs.push(input.node);
293
294
let properties = PhysNodeProperties::InMemoryMap {
295
format_str: format_str.as_deref().map_or(
296
PlSmallStr::from_static(
297
"error: prepare_visualization was not set during conversion",
298
),
299
PlSmallStr::from_str,
300
),
301
};
302
303
PhysNodeInfo {
304
title: properties.variant_name(),
305
properties,
306
..Default::default()
307
}
308
},
309
PhysNodeKind::InMemorySink { input } => {
310
phys_node_inputs.push(input.node);
311
312
let properties = PhysNodeProperties::InMemorySink;
313
314
PhysNodeInfo {
315
title: properties.variant_name(),
316
properties,
317
..Default::default()
318
}
319
},
320
PhysNodeKind::InMemorySource { df } => {
321
let properties = PhysNodeProperties::InMemorySource {
322
n_rows: df.height().try_into().unwrap(),
323
schema_names: df.schema().iter_names_cloned().collect(),
324
};
325
326
PhysNodeInfo {
327
title: properties.variant_name(),
328
properties,
329
..Default::default()
330
}
331
},
332
PhysNodeKind::InputIndependentSelect { selectors } => {
333
let properties = PhysNodeProperties::InputIndependentSelect {
334
selectors: expr_list(selectors, self.expr_arena),
335
};
336
337
PhysNodeInfo {
338
title: properties.variant_name(),
339
properties,
340
..Default::default()
341
}
342
},
343
// Joins
344
PhysNodeKind::CrossJoin {
345
input_left,
346
input_right,
347
args,
348
} => {
349
phys_node_inputs.push(input_left.node);
350
phys_node_inputs.push(input_right.node);
351
352
let JoinArgs {
353
how: _,
354
validation: _,
355
suffix,
356
slice: _,
357
nulls_equal: _,
358
coalesce: _,
359
maintain_order,
360
} = args;
361
362
let properties = PhysNodeProperties::CrossJoin {
363
maintain_order: *maintain_order,
364
suffix: suffix.clone(),
365
};
366
367
PhysNodeInfo {
368
title: properties.variant_name(),
369
properties,
370
..Default::default()
371
}
372
},
373
PhysNodeKind::EquiJoin {
374
input_left,
375
input_right,
376
left_on,
377
right_on,
378
args,
379
} => {
380
phys_node_inputs.push(input_left.node);
381
phys_node_inputs.push(input_right.node);
382
383
let JoinArgs {
384
how,
385
validation,
386
suffix,
387
// Lowers to a separate node
388
slice: _,
389
nulls_equal,
390
coalesce,
391
maintain_order,
392
} = args;
393
394
let properties = PhysNodeProperties::EquiJoin {
395
how: format_pl_smallstr!("{}", how),
396
left_on: expr_list(left_on, self.expr_arena),
397
right_on: expr_list(right_on, self.expr_arena),
398
nulls_equal: *nulls_equal,
399
coalesce: *coalesce,
400
maintain_order: *maintain_order,
401
validation: *validation,
402
suffix: suffix.clone(),
403
};
404
405
PhysNodeInfo {
406
title: properties.variant_name(),
407
properties,
408
..Default::default()
409
}
410
},
411
PhysNodeKind::InMemoryJoin {
412
input_left,
413
input_right,
414
left_on,
415
right_on,
416
args:
417
JoinArgs {
418
how,
419
validation,
420
suffix,
421
slice,
422
nulls_equal,
423
coalesce,
424
maintain_order,
425
},
426
options,
427
} => {
428
phys_node_inputs.push(input_left.node);
429
phys_node_inputs.push(input_right.node);
430
431
let properties = match how {
432
JoinType::AsOf(asof_options) => {
433
use polars_ops::frame::AsOfOptions;
434
435
#[expect(unused_variables)]
436
let AsOfOptions {
437
strategy,
438
tolerance,
439
tolerance_str,
440
left_by,
441
right_by,
442
allow_eq,
443
check_sortedness,
444
} = asof_options.as_ref();
445
446
assert_eq!(left_on.len(), 1);
447
assert_eq!(right_on.len(), 1);
448
449
PhysNodeProperties::InMemoryAsOfJoin {
450
left_on: format_pl_smallstr!("{}", left_on[0].display(self.expr_arena)),
451
right_on: format_pl_smallstr!(
452
"{}",
453
right_on[0].display(self.expr_arena)
454
),
455
left_by: left_by.clone(),
456
right_by: right_by.clone(),
457
strategy: *strategy,
458
tolerance: tolerance.as_ref().map(|scalar| {
459
[
460
format_pl_smallstr!("{}", scalar.value()),
461
format_pl_smallstr!("{:?}", scalar.dtype()),
462
]
463
}),
464
suffix: suffix.clone(),
465
slice: convert_opt_slice(slice),
466
coalesce: *coalesce,
467
allow_eq: *allow_eq,
468
check_sortedness: *check_sortedness,
469
}
470
},
471
JoinType::IEJoin => {
472
use polars_ops::frame::IEJoinOptions;
473
474
let Some(JoinTypeOptionsIR::IEJoin(IEJoinOptions {
475
operator1,
476
operator2,
477
})) = options
478
else {
479
unreachable!()
480
};
481
482
PhysNodeProperties::InMemoryIEJoin {
483
left_on: expr_list(left_on, self.expr_arena),
484
right_on: expr_list(right_on, self.expr_arena),
485
inequality_operators: if let Some(operator2) = operator2 {
486
vec![*operator1, *operator2]
487
} else {
488
vec![*operator1]
489
},
490
suffix: suffix.clone(),
491
slice: convert_opt_slice(slice),
492
}
493
},
494
JoinType::Cross => unreachable!(),
495
_ => PhysNodeProperties::InMemoryJoin {
496
how: format_pl_smallstr!("{}", how),
497
left_on: expr_list(left_on, self.expr_arena),
498
right_on: expr_list(right_on, self.expr_arena),
499
nulls_equal: *nulls_equal,
500
coalesce: *coalesce,
501
maintain_order: *maintain_order,
502
validation: *validation,
503
suffix: suffix.clone(),
504
slice: convert_opt_slice(slice),
505
},
506
};
507
508
PhysNodeInfo {
509
title: properties.variant_name(),
510
properties,
511
..Default::default()
512
}
513
},
514
PhysNodeKind::Map {
515
input,
516
map,
517
format_str,
518
} => {
519
phys_node_inputs.push(input.node);
520
521
let properties = PhysNodeProperties::Map {
522
display_str: map.display_str(),
523
format_str: format_str.as_deref().map_or(
524
PlSmallStr::from_static(
525
"error: prepare_visualization was not set during conversion",
526
),
527
PlSmallStr::from_str,
528
),
529
};
530
531
PhysNodeInfo {
532
title: properties.variant_name(),
533
properties,
534
..Default::default()
535
}
536
},
537
PhysNodeKind::MultiScan {
538
scan_sources,
539
file_reader_builder,
540
cloud_options: _,
541
file_projection_builder,
542
output_schema: _,
543
row_index,
544
pre_slice,
545
predicate,
546
predicate_file_skip_applied,
547
hive_parts,
548
include_file_paths,
549
cast_columns_policy: _,
550
missing_columns_policy: _,
551
forbid_extra_columns: _,
552
deletion_files,
553
table_statistics,
554
file_schema: _,
555
} => {
556
let pre_slice = pre_slice
557
.clone()
558
.map(|x| <(i64, usize)>::try_from(x).unwrap());
559
560
let properties = PhysNodeProperties::MultiScan {
561
scan_type: file_reader_builder.reader_name().into(),
562
num_sources: scan_sources.len().try_into().unwrap(),
563
first_source: scan_sources
564
.first()
565
.map(|x| x.to_include_path_name().into()),
566
projected_file_columns: file_projection_builder
567
.projected_names()
568
.cloned()
569
.collect(),
570
file_projection_builder_type: PlSmallStr::from_static(
571
file_projection_builder.into(),
572
),
573
row_index_name: row_index.as_ref().map(|ri| ri.name.clone()),
574
#[allow(clippy::useless_conversion)]
575
row_index_offset: row_index.as_ref().map(|ri| ri.offset.into()),
576
pre_slice: convert_opt_slice(&pre_slice),
577
predicate: predicate
578
.as_ref()
579
.map(|e| format_pl_smallstr!("{}", e.display(self.expr_arena))),
580
predicate_file_skip_applied: *predicate_file_skip_applied,
581
has_table_statistics: table_statistics.is_some(),
582
include_file_paths: include_file_paths.clone(),
583
deletion_files_type: deletion_files
584
.as_ref()
585
.map(|x| PlSmallStr::from_static(x.into())),
586
hive_columns: hive_parts
587
.as_ref()
588
.map(|x| x.df().schema().iter_names_cloned().collect()),
589
};
590
591
PhysNodeInfo {
592
title: properties.variant_name(),
593
properties,
594
..Default::default()
595
}
596
},
597
PhysNodeKind::Multiplexer { input } => {
598
phys_node_inputs.push(input.node);
599
600
let properties = PhysNodeProperties::Multiplexer;
601
602
PhysNodeInfo {
603
title: properties.variant_name(),
604
properties,
605
..Default::default()
606
}
607
},
608
PhysNodeKind::NegativeSlice {
609
input,
610
offset,
611
length,
612
} => {
613
phys_node_inputs.push(input.node);
614
615
let properties = PhysNodeProperties::NegativeSlice {
616
offset: (*offset),
617
length: (*length).try_into().unwrap(),
618
};
619
620
PhysNodeInfo {
621
title: properties.variant_name(),
622
properties,
623
..Default::default()
624
}
625
},
626
PhysNodeKind::OrderedUnion { inputs } => {
627
for input in inputs {
628
phys_node_inputs.push(input.node);
629
}
630
631
let properties = PhysNodeProperties::OrderedUnion {
632
num_inputs: inputs.len().try_into().unwrap(),
633
};
634
635
PhysNodeInfo {
636
title: properties.variant_name(),
637
properties,
638
..Default::default()
639
}
640
},
641
PhysNodeKind::PartitionedSink {
642
input,
643
base_path,
644
file_path_cb,
645
sink_options:
646
SinkOptions {
647
sync_on_close,
648
maintain_order,
649
mkdir,
650
},
651
variant,
652
file_type,
653
cloud_options: _,
654
per_partition_sort_by,
655
finish_callback,
656
} => {
657
phys_node_inputs.push(input.node);
658
659
let (
660
partition_variant_max_size,
661
partition_variant_key_exprs,
662
partition_variant_include_key,
663
) = match variant {
664
PartitionVariantIR::ByKey {
665
key_exprs,
666
include_key,
667
}
668
| PartitionVariantIR::Parted {
669
key_exprs,
670
include_key,
671
} => (
672
None,
673
Some(expr_list(key_exprs, self.expr_arena)),
674
Some(*include_key),
675
),
676
#[allow(clippy::useless_conversion)]
677
PartitionVariantIR::MaxSize(max_size) => (Some((*max_size).into()), None, None),
678
};
679
680
let (
681
per_partition_sort_exprs,
682
per_partition_sort_descending,
683
per_partition_sort_nulls_last,
684
) = per_partition_sort_by
685
.as_ref()
686
.map_or((None, None, None), |x| {
687
let (a, (b, c)): (Vec<_>, (Vec<_>, Vec<_>)) = x
688
.iter()
689
.map(|x| {
690
(
691
format_pl_smallstr!("{}", x.expr.display(self.expr_arena)),
692
(x.descending, x.nulls_last),
693
)
694
})
695
.unzip();
696
697
(Some(a), Some(b), Some(c))
698
});
699
700
let properties = PhysNodeProperties::PartitionSink {
701
base_path: base_path.to_str().into(),
702
file_path_callback: file_path_cb.as_ref().map(|x| x.display_str()),
703
partition_variant: PlSmallStr::from_static(variant.into()),
704
partition_variant_max_size,
705
partition_variant_key_exprs,
706
partition_variant_include_key,
707
file_type: PlSmallStr::from_static(file_type.into()),
708
per_partition_sort_exprs,
709
per_partition_sort_descending,
710
per_partition_sort_nulls_last,
711
finish_callback: finish_callback.as_ref().map(|x| x.display_str()),
712
sync_on_close: *sync_on_close,
713
maintain_order: *maintain_order,
714
mkdir: *mkdir,
715
};
716
717
PhysNodeInfo {
718
title: properties.variant_name(),
719
properties,
720
..Default::default()
721
}
722
},
723
PhysNodeKind::FileSink2 {
724
input,
725
options:
726
FileSinkOptions {
727
target,
728
file_format,
729
unified_sink_args:
730
UnifiedSinkArgs {
731
mkdir,
732
maintain_order,
733
sync_on_close,
734
cloud_options,
735
},
736
},
737
} => {
738
phys_node_inputs.push(input.node);
739
740
let properties = PhysNodeProperties::FileSink {
741
target: match target {
742
SinkTarget::Path(p) => format_pl_smallstr!("Path({})", p.to_str()),
743
SinkTarget::Dyn(_) => PlSmallStr::from_static("DynWriteable"),
744
},
745
file_format: PlSmallStr::from_static(file_format.as_ref().into()),
746
sync_on_close: *sync_on_close,
747
maintain_order: *maintain_order,
748
mkdir: *mkdir,
749
cloud_options: cloud_options.is_some(),
750
};
751
752
PhysNodeInfo {
753
title: properties.variant_name(),
754
properties,
755
..Default::default()
756
}
757
},
758
PhysNodeKind::PartitionedSink2 {
759
input,
760
options:
761
PartitionedSinkOptionsIR {
762
base_path,
763
file_path_provider,
764
partition_strategy,
765
finish_callback: _,
766
file_format,
767
unified_sink_args:
768
UnifiedSinkArgs {
769
mkdir,
770
maintain_order,
771
sync_on_close,
772
cloud_options,
773
},
774
max_rows_per_file,
775
approximate_bytes_per_file,
776
},
777
} => {
778
phys_node_inputs.push(input.node);
779
780
let mut partition_key_exprs: Option<Vec<PlSmallStr>> = None;
781
let mut include_keys_: Option<bool> = None;
782
let mut per_partition_sort_by_: Option<&[SortColumnIR]> = None;
783
784
match partition_strategy {
785
PartitionStrategyIR::Keyed {
786
keys,
787
include_keys,
788
keys_pre_grouped: _,
789
per_partition_sort_by,
790
} => {
791
partition_key_exprs = Some(expr_list(keys, self.expr_arena));
792
include_keys_ = Some(*include_keys);
793
per_partition_sort_by_ = Some(per_partition_sort_by.as_slice());
794
},
795
PartitionStrategyIR::FileSize => {},
796
}
797
798
let (
799
per_partition_sort_exprs,
800
per_partition_sort_descending,
801
per_partition_sort_nulls_last,
802
) = per_partition_sort_by_
803
.as_ref()
804
.map_or((None, None, None), |x| {
805
let (a, (b, c)): (Vec<_>, (Vec<_>, Vec<_>)) = x
806
.iter()
807
.map(|x| {
808
(
809
format_pl_smallstr!("{}", x.expr.display(self.expr_arena)),
810
(x.descending, x.nulls_last),
811
)
812
})
813
.unzip();
814
815
(Some(a), Some(b), Some(c))
816
});
817
818
let properties = PhysNodeProperties::PartitionSink2 {
819
base_path: base_path.to_str().into(),
820
file_path_provider: file_path_provider.clone(),
821
file_format: PlSmallStr::from_static(file_format.as_ref().into()),
822
partition_strategy: PlSmallStr::from_static(partition_strategy.into()),
823
partition_key_exprs,
824
include_keys: include_keys_,
825
per_partition_sort_exprs,
826
per_partition_sort_descending,
827
per_partition_sort_nulls_last,
828
mkdir: *mkdir,
829
maintain_order: *maintain_order,
830
sync_on_close: *sync_on_close,
831
cloud_options: cloud_options.is_some(),
832
max_rows_per_file: *max_rows_per_file,
833
approximate_bytes_per_file: *approximate_bytes_per_file,
834
};
835
836
PhysNodeInfo {
837
title: properties.variant_name(),
838
properties,
839
..Default::default()
840
}
841
},
842
PhysNodeKind::PeakMinMax { input, is_peak_max } => {
843
phys_node_inputs.push(input.node);
844
845
let properties = if *is_peak_max {
846
PhysNodeProperties::PeakMax
847
} else {
848
PhysNodeProperties::PeakMin
849
};
850
851
PhysNodeInfo {
852
title: properties.variant_name(),
853
properties,
854
..Default::default()
855
}
856
},
857
PhysNodeKind::Reduce { input, exprs } => {
858
phys_node_inputs.push(input.node);
859
860
let properties = PhysNodeProperties::Reduce {
861
exprs: expr_list(exprs, self.expr_arena),
862
};
863
864
PhysNodeInfo {
865
title: properties.variant_name(),
866
properties,
867
..Default::default()
868
}
869
},
870
PhysNodeKind::Repeat { value, repeats } => {
871
phys_node_inputs.push(value.node);
872
phys_node_inputs.push(repeats.node);
873
874
let properties = PhysNodeProperties::Repeat;
875
876
PhysNodeInfo {
877
title: properties.variant_name(),
878
properties,
879
..Default::default()
880
}
881
},
882
PhysNodeKind::Rle(input) => {
883
phys_node_inputs.push(input.node);
884
885
let properties = PhysNodeProperties::Rle;
886
887
PhysNodeInfo {
888
title: properties.variant_name(),
889
properties,
890
..Default::default()
891
}
892
},
893
PhysNodeKind::RleId(input) => {
894
phys_node_inputs.push(input.node);
895
896
let properties = PhysNodeProperties::RleId;
897
898
PhysNodeInfo {
899
title: properties.variant_name(),
900
properties,
901
..Default::default()
902
}
903
},
904
PhysNodeKind::Select {
905
input,
906
selectors,
907
extend_original,
908
} => {
909
phys_node_inputs.push(input.node);
910
911
let properties = PhysNodeProperties::Select {
912
selectors: expr_list(selectors, self.expr_arena),
913
extend_original: *extend_original,
914
};
915
916
PhysNodeInfo {
917
title: properties.variant_name(),
918
properties,
919
..Default::default()
920
}
921
},
922
PhysNodeKind::Shift {
923
input,
924
offset,
925
fill,
926
} => {
927
phys_node_inputs.push(input.node);
928
phys_node_inputs.push(offset.node);
929
930
if let Some(fill) = fill {
931
phys_node_inputs.push(fill.node);
932
}
933
934
let properties = PhysNodeProperties::Shift {
935
has_fill: fill.is_some(),
936
};
937
938
PhysNodeInfo {
939
title: properties.variant_name(),
940
properties,
941
..Default::default()
942
}
943
},
944
PhysNodeKind::SimpleProjection { input, columns } => {
945
phys_node_inputs.push(input.node);
946
947
let properties = PhysNodeProperties::SimpleProjection {
948
columns: columns.clone(),
949
};
950
951
PhysNodeInfo {
952
title: properties.variant_name(),
953
properties,
954
..Default::default()
955
}
956
},
957
PhysNodeKind::SinkMultiple { sinks } => {
958
for node in sinks {
959
phys_node_inputs.push(*node);
960
}
961
962
let properties = PhysNodeProperties::SinkMultiple {
963
num_sinks: sinks.len().try_into().unwrap(),
964
};
965
966
PhysNodeInfo {
967
title: properties.variant_name(),
968
properties,
969
..Default::default()
970
}
971
},
972
PhysNodeKind::Sort {
973
input,
974
by_column,
975
slice,
976
sort_options:
977
SortMultipleOptions {
978
descending,
979
nulls_last,
980
multithreaded,
981
maintain_order,
982
limit,
983
},
984
} => {
985
phys_node_inputs.push(input.node);
986
987
let properties = PhysNodeProperties::Sort {
988
by_exprs: expr_list(by_column, self.expr_arena),
989
slice: convert_opt_slice(slice),
990
descending: descending.clone(),
991
nulls_last: nulls_last.clone(),
992
multithreaded: *multithreaded,
993
maintain_order: *maintain_order,
994
#[allow(clippy::useless_conversion)]
995
limit: limit.map(|x| x.into()),
996
};
997
998
PhysNodeInfo {
999
title: properties.variant_name(),
1000
properties,
1001
..Default::default()
1002
}
1003
},
1004
PhysNodeKind::StreamingSlice {
1005
input,
1006
offset,
1007
length,
1008
} => {
1009
phys_node_inputs.push(input.node);
1010
1011
let properties = PhysNodeProperties::Slice {
1012
offset: (*offset).try_into().unwrap(),
1013
length: (*length).try_into().unwrap(),
1014
};
1015
1016
PhysNodeInfo {
1017
title: properties.variant_name(),
1018
properties,
1019
..Default::default()
1020
}
1021
},
1022
PhysNodeKind::TopK {
1023
input,
1024
k,
1025
by_column,
1026
reverse,
1027
nulls_last,
1028
} => {
1029
phys_node_inputs.push(input.node);
1030
phys_node_inputs.push(k.node);
1031
1032
let properties = PhysNodeProperties::TopK {
1033
by_exprs: expr_list(by_column, self.expr_arena),
1034
reverse: reverse.clone(),
1035
nulls_last: nulls_last.clone(),
1036
};
1037
1038
PhysNodeInfo {
1039
title: properties.variant_name(),
1040
properties,
1041
..Default::default()
1042
}
1043
},
1044
PhysNodeKind::WithRowIndex {
1045
input,
1046
name,
1047
offset,
1048
} => {
1049
phys_node_inputs.push(input.node);
1050
1051
let properties = PhysNodeProperties::WithRowIndex {
1052
name: name.clone(),
1053
#[allow(clippy::useless_conversion)]
1054
offset: offset.map(|x| x.into()),
1055
};
1056
1057
PhysNodeInfo {
1058
title: properties.variant_name(),
1059
properties,
1060
..Default::default()
1061
}
1062
},
1063
PhysNodeKind::Zip {
1064
inputs,
1065
zip_behavior,
1066
} => {
1067
for input in inputs {
1068
phys_node_inputs.push(input.node);
1069
}
1070
1071
let properties = PhysNodeProperties::Zip {
1072
num_inputs: inputs.len().try_into().unwrap(),
1073
zip_behavior: *zip_behavior,
1074
};
1075
1076
PhysNodeInfo {
1077
title: properties.variant_name(),
1078
properties,
1079
..Default::default()
1080
}
1081
},
1082
#[cfg(feature = "cum_agg")]
1083
PhysNodeKind::CumAgg { input, kind } => {
1084
phys_node_inputs.push(input.node);
1085
1086
let properties = PhysNodeProperties::CumAgg {
1087
kind: format_pl_smallstr!("{:?}", kind),
1088
};
1089
1090
PhysNodeInfo {
1091
title: properties.variant_name(),
1092
properties,
1093
..Default::default()
1094
}
1095
},
1096
#[cfg(feature = "ewma")]
1097
PhysNodeKind::EwmMean { input, options }
1098
| PhysNodeKind::EwmVar { input, options }
1099
| PhysNodeKind::EwmStd { input, options } => {
1100
phys_node_inputs.push(input.node);
1101
1102
let polars_ops::series::EWMOptions {
1103
alpha,
1104
adjust,
1105
bias,
1106
min_periods,
1107
ignore_nulls,
1108
} = options;
1109
1110
let properties = PhysNodeProperties::Ewm {
1111
variant: PlSmallStr::from_static(phys_node.kind().into()),
1112
alpha: *alpha,
1113
adjust: *adjust,
1114
bias: *bias,
1115
min_periods: *min_periods,
1116
ignore_nulls: *ignore_nulls,
1117
};
1118
1119
PhysNodeInfo {
1120
title: properties.variant_name(),
1121
properties,
1122
..Default::default()
1123
}
1124
},
1125
#[cfg(feature = "semi_anti_join")]
1126
PhysNodeKind::SemiAntiJoin {
1127
input_left,
1128
input_right,
1129
left_on,
1130
right_on,
1131
args,
1132
output_bool,
1133
} => {
1134
phys_node_inputs.push(input_left.node);
1135
phys_node_inputs.push(input_right.node);
1136
1137
let properties = PhysNodeProperties::SemiAntiJoin {
1138
left_on: expr_list(left_on, self.expr_arena),
1139
right_on: expr_list(right_on, self.expr_arena),
1140
nulls_equal: args.nulls_equal,
1141
output_as_bool: *output_bool,
1142
};
1143
1144
PhysNodeInfo {
1145
title: properties.variant_name(),
1146
properties,
1147
..Default::default()
1148
}
1149
},
1150
#[cfg(feature = "merge_sorted")]
1151
PhysNodeKind::MergeSorted {
1152
input_left,
1153
input_right,
1154
} => {
1155
phys_node_inputs.push(input_left.node);
1156
phys_node_inputs.push(input_right.node);
1157
1158
let properties = PhysNodeProperties::MergeSorted;
1159
1160
PhysNodeInfo {
1161
title: properties.variant_name(),
1162
properties,
1163
..Default::default()
1164
}
1165
},
1166
#[cfg(feature = "python")]
1167
PhysNodeKind::PythonScan {
1168
options:
1169
polars_plan::plans::PythonOptions {
1170
scan_fn: _,
1171
schema,
1172
output_schema: _,
1173
with_columns,
1174
python_source,
1175
n_rows,
1176
predicate,
1177
validate_schema,
1178
is_pure,
1179
},
1180
} => {
1181
use polars_plan::plans::PythonPredicate;
1182
1183
let properties = PhysNodeProperties::PythonScan {
1184
scan_source_type: python_source.clone(),
1185
n_rows: n_rows.map(|x| x.try_into().unwrap()),
1186
projection: with_columns.as_deref().map(list_str_cloned),
1187
predicate: match predicate {
1188
PythonPredicate::None => None,
1189
PythonPredicate::PyArrow(s) => Some(s.into()),
1190
PythonPredicate::Polars(p) => {
1191
Some(format_pl_smallstr!("{}", p.display(self.expr_arena)))
1192
},
1193
},
1194
schema_names: schema.iter_names_cloned().collect(),
1195
is_pure: *is_pure,
1196
validate_schema: *validate_schema,
1197
};
1198
1199
PhysNodeInfo {
1200
title: properties.variant_name(),
1201
properties,
1202
..Default::default()
1203
}
1204
},
1205
}
1206
}
1207
}
1208
1209
impl PhysNodeProperties {
1210
fn variant_name(&self) -> PlSmallStr {
1211
PlSmallStr::from_static(<&'static str>::from(self))
1212
}
1213
}
1214
1215
fn list_str_cloned<I, T>(iter: I) -> Vec<PlSmallStr>
1216
where
1217
I: IntoIterator<Item = T>,
1218
T: AsRef<str>,
1219
{
1220
iter.into_iter()
1221
.map(|x| PlSmallStr::from_str(x.as_ref()))
1222
.collect()
1223
}
1224
1225
/// Converts an optional `(offset, length)` pair into the `(i64, u64)` representation
/// used by the visualization properties.
///
/// # Panics
/// Panics if `offset` does not fit in `i64` or `length` does not fit in `u64`.
fn convert_opt_slice<T, U>(slice: &Option<(T, U)>) -> Option<(i64, u64)>
where
    T: Copy + TryInto<i64>,
    U: Copy + TryInto<u64>,
    <T as TryInto<i64>>::Error: std::fmt::Debug,
    <U as TryInto<u64>>::Error: std::fmt::Debug,
{
    let &(offset, len) = slice.as_ref()?;
    let offset: i64 = offset.try_into().unwrap();
    let len: u64 = len.try_into().unwrap();
    Some((offset, len))
}
1234
1235
fn expr_list(exprs: &[ExprIR], expr_arena: &Arena<AExpr>) -> Vec<PlSmallStr> {
1236
exprs
1237
.iter()
1238
.map(|e| format_pl_smallstr!("{}", e.display(expr_arena)))
1239
.collect()
1240
}
1241
1242