Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/ir/visualization/mod.rs
7889 views
1
use std::collections::VecDeque;
2
3
use polars_core::prelude::{PlHashMap, SortMultipleOptions};
4
use polars_ops::frame::{JoinArgs, JoinType};
5
use polars_utils::arena::{Arena, Node};
6
use polars_utils::format_pl_smallstr;
7
use polars_utils::pl_str::PlSmallStr;
8
use polars_utils::unique_id::UniqueId;
9
10
use crate::dsl::{
11
GroupbyOptions, HConcatOptions, JoinOptionsIR, JoinTypeOptionsIR, UnifiedScanArgs, UnionOptions,
12
};
13
use crate::plans::visualization::models::{Edge, IRNodeProperties};
14
use crate::plans::{AExpr, ExprIR, FileInfo, IR};
15
use crate::prelude::{DistinctOptionsIR, ProjectionOptions};
16
17
pub mod models;
18
use models::{IRNodeInfo, IRVisualizationData};
19
20
pub fn generate_visualization_data<'a>(
21
title: PlSmallStr,
22
roots: &[Node],
23
ir_arena: &'a Arena<IR>,
24
expr_arena: &'a Arena<AExpr>,
25
) -> IRVisualizationData {
26
let (nodes_list, edges) = IRVisualizationDataGenerator {
27
ir_arena,
28
expr_arena,
29
queue: VecDeque::from_iter(roots.iter().copied()),
30
nodes_list: vec![],
31
edges: vec![],
32
cache_node_to_position: Default::default(),
33
}
34
.generate();
35
36
IRVisualizationData {
37
title,
38
num_roots: roots.len().try_into().unwrap(),
39
nodes: nodes_list,
40
edges,
41
}
42
}
43
44
struct IRVisualizationDataGenerator<'a> {
45
ir_arena: &'a Arena<IR>,
46
expr_arena: &'a Arena<AExpr>,
47
queue: VecDeque<Node>,
48
nodes_list: Vec<IRNodeInfo>,
49
edges: Vec<Edge>,
50
/// During traversal we will encounter the same cache ID multiple times, but we only want
51
/// to push a single entry per cache ID.
52
cache_node_to_position: PlHashMap<UniqueId, usize>,
53
}
54
55
impl IRVisualizationDataGenerator<'_> {
56
fn generate(mut self) -> (Vec<IRNodeInfo>, Vec<Edge>) {
57
// Use a queue to traverse in insertion order - the ID assignment relies on this.
58
while let Some(node) = self.queue.pop_front() {
59
let ir = self.ir_arena.get(node);
60
let mut ir_node_info = self.get_ir_node_info(ir);
61
let current_node_position: u64 = self.nodes_list.len().try_into().unwrap();
62
ir_node_info.id = current_node_position;
63
64
for input_node in ir.inputs() {
65
// +1 is for the current `ir_node_info` that we haven't inserted yet.
66
let input_node_position = 1 + self.nodes_list.len() + self.queue.len();
67
68
if let IR::Cache { id, input: _ } = self.ir_arena.get(input_node) {
69
if let Some(cache_node_position) = self.cache_node_to_position.get(id) {
70
self.edges
71
.push(Edge::new(current_node_position, *cache_node_position));
72
continue;
73
}
74
75
self.cache_node_to_position.insert(*id, input_node_position);
76
}
77
78
self.queue.push_back(input_node);
79
self.edges
80
.push(Edge::new(current_node_position, input_node_position));
81
}
82
83
self.nodes_list.push(ir_node_info);
84
}
85
86
assert!(self.queue.is_empty());
87
(self.nodes_list, self.edges)
88
}
89
90
fn get_ir_node_info(&self, ir: &IR) -> IRNodeInfo {
91
match ir {
92
IR::Cache { input: _, id } => {
93
let properties = IRNodeProperties::Cache { id: *id };
94
95
IRNodeInfo {
96
title: properties.variant_name(),
97
properties,
98
..Default::default()
99
}
100
},
101
IR::DataFrameScan {
102
df,
103
schema,
104
output_schema: _,
105
} => {
106
let properties = IRNodeProperties::DataFrameScan {
107
n_rows: df.height().try_into().unwrap(),
108
schema_names: schema.iter_names_cloned().collect(),
109
};
110
111
IRNodeInfo {
112
title: properties.variant_name(),
113
properties,
114
..Default::default()
115
}
116
},
117
IR::Distinct {
118
input: _,
119
options:
120
DistinctOptionsIR {
121
subset,
122
maintain_order,
123
keep_strategy,
124
slice,
125
},
126
} => {
127
let properties = IRNodeProperties::Distinct {
128
subset: subset.as_deref().map(|x| x.to_vec()),
129
maintain_order: *maintain_order,
130
keep_strategy: *keep_strategy,
131
slice: convert_opt_slice(slice),
132
};
133
134
IRNodeInfo {
135
title: properties.variant_name(),
136
properties,
137
..Default::default()
138
}
139
},
140
IR::ExtContext {
141
input: _,
142
contexts,
143
schema,
144
} => {
145
let properties = IRNodeProperties::ExtContext {
146
num_contexts: contexts.len().try_into().unwrap(),
147
schema_names: schema.iter_names_cloned().collect(),
148
};
149
150
IRNodeInfo {
151
title: properties.variant_name(),
152
properties,
153
..Default::default()
154
}
155
},
156
IR::Filter {
157
input: _,
158
predicate,
159
} => {
160
let properties = IRNodeProperties::Filter {
161
predicate: format_pl_smallstr!("{}", predicate.display(self.expr_arena)),
162
};
163
164
IRNodeInfo {
165
title: properties.variant_name(),
166
properties,
167
..Default::default()
168
}
169
},
170
IR::GroupBy {
171
input: _,
172
keys,
173
aggs,
174
schema: _,
175
maintain_order,
176
options,
177
apply,
178
} => {
179
let GroupbyOptions {
180
#[cfg(feature = "dynamic_group_by")]
181
dynamic,
182
#[cfg(feature = "dynamic_group_by")]
183
rolling,
184
slice,
185
} = options.as_ref();
186
187
let keys = expr_list(keys, self.expr_arena);
188
let aggs = expr_list(aggs, self.expr_arena);
189
let maintain_order = *maintain_order;
190
let plan_callback = apply.as_ref().map(|x| format_pl_smallstr!("{:?}", x));
191
192
let properties = match () {
193
#[cfg(feature = "dynamic_group_by")]
194
_ if dynamic.is_some() => {
195
let Some(polars_time::DynamicGroupOptions {
196
index_column,
197
every,
198
period,
199
offset,
200
label,
201
include_boundaries,
202
closed_window,
203
start_by,
204
}) = dynamic
205
else {
206
unreachable!()
207
};
208
209
IRNodeProperties::DynamicGroupBy {
210
index_column: index_column.clone(),
211
every: format_pl_smallstr!("{}", every),
212
period: format_pl_smallstr!("{}", period),
213
offset: format_pl_smallstr!("{}", offset),
214
label: *label,
215
include_boundaries: *include_boundaries,
216
closed_window: *closed_window,
217
group_by: keys,
218
start_by: *start_by,
219
plan_callback,
220
}
221
},
222
#[cfg(feature = "dynamic_group_by")]
223
_ if rolling.is_some() => {
224
let Some(polars_time::RollingGroupOptions {
225
index_column,
226
period,
227
offset,
228
closed_window,
229
}) = rolling
230
else {
231
unreachable!()
232
};
233
234
IRNodeProperties::RollingGroupBy {
235
keys,
236
aggs,
237
index_column: index_column.clone(),
238
period: format_pl_smallstr!("{}", period),
239
offset: format_pl_smallstr!("{}", offset),
240
closed_window: *closed_window,
241
slice: convert_opt_slice(slice),
242
plan_callback,
243
}
244
},
245
_ => IRNodeProperties::GroupBy {
246
keys,
247
aggs,
248
maintain_order,
249
slice: convert_opt_slice(slice),
250
plan_callback,
251
},
252
};
253
254
IRNodeInfo {
255
title: properties.variant_name(),
256
properties,
257
..Default::default()
258
}
259
},
260
IR::HConcat {
261
inputs,
262
schema,
263
options: HConcatOptions { parallel, strict },
264
} => {
265
let properties = IRNodeProperties::HConcat {
266
num_inputs: inputs.len().try_into().unwrap(),
267
schema_names: schema.iter_names_cloned().collect(),
268
parallel: *parallel,
269
strict: *strict,
270
};
271
272
IRNodeInfo {
273
title: properties.variant_name(),
274
properties,
275
..Default::default()
276
}
277
},
278
IR::HStack {
279
input: _,
280
exprs,
281
schema: _,
282
options:
283
ProjectionOptions {
284
run_parallel,
285
duplicate_check,
286
should_broadcast,
287
},
288
} => {
289
let properties = IRNodeProperties::HStack {
290
exprs: expr_list(exprs, self.expr_arena),
291
run_parallel: *run_parallel,
292
duplicate_check: *duplicate_check,
293
should_broadcast: *should_broadcast,
294
};
295
IRNodeInfo {
296
title: properties.variant_name(),
297
properties,
298
..Default::default()
299
}
300
},
301
IR::Invalid => {
302
let properties = IRNodeProperties::Invalid;
303
304
IRNodeInfo {
305
title: properties.variant_name(),
306
properties,
307
..Default::default()
308
}
309
},
310
IR::Join {
311
input_left: _,
312
input_right: _,
313
schema: _,
314
left_on,
315
right_on,
316
options,
317
} => {
318
let JoinOptionsIR {
319
allow_parallel,
320
force_parallel,
321
args:
322
JoinArgs {
323
how,
324
validation,
325
suffix,
326
slice,
327
nulls_equal,
328
coalesce,
329
maintain_order,
330
},
331
options,
332
} = options.as_ref();
333
334
let properties = match how {
335
#[cfg(feature = "asof_join")]
336
JoinType::AsOf(asof_options) => {
337
use polars_ops::frame::AsOfOptions;
338
339
#[expect(unused_variables)]
340
let AsOfOptions {
341
strategy,
342
tolerance,
343
tolerance_str,
344
left_by,
345
right_by,
346
allow_eq,
347
check_sortedness,
348
} = asof_options.as_ref();
349
350
assert_eq!(left_on.len(), 1);
351
assert_eq!(right_on.len(), 1);
352
353
IRNodeProperties::AsOfJoin {
354
left_on: format_pl_smallstr!("{}", left_on[0].display(self.expr_arena)),
355
right_on: format_pl_smallstr!(
356
"{}",
357
right_on[0].display(self.expr_arena)
358
),
359
left_by: left_by.clone(),
360
right_by: right_by.clone(),
361
strategy: *strategy,
362
tolerance: tolerance.as_ref().map(|scalar| {
363
[
364
format_pl_smallstr!("{}", scalar.value()),
365
format_pl_smallstr!("{:?}", scalar.dtype()),
366
]
367
}),
368
suffix: suffix.clone(),
369
slice: convert_opt_slice(slice),
370
coalesce: *coalesce,
371
allow_eq: *allow_eq,
372
check_sortedness: *check_sortedness,
373
}
374
},
375
#[cfg(feature = "iejoin")]
376
JoinType::IEJoin => {
377
use polars_ops::frame::IEJoinOptions;
378
379
let Some(JoinTypeOptionsIR::IEJoin(IEJoinOptions {
380
operator1,
381
operator2,
382
})) = options
383
else {
384
unreachable!()
385
};
386
387
IRNodeProperties::IEJoin {
388
left_on: expr_list(left_on, self.expr_arena),
389
right_on: expr_list(right_on, self.expr_arena),
390
inequality_operators: if let Some(operator2) = operator2 {
391
vec![*operator1, *operator2]
392
} else {
393
vec![*operator1]
394
},
395
suffix: suffix.clone(),
396
slice: convert_opt_slice(slice),
397
}
398
},
399
JoinType::Cross => {
400
let predicate: Option<PlSmallStr> = options.as_ref().map(|x| {
401
let JoinTypeOptionsIR::CrossAndFilter { predicate } = x else {
402
panic!("{x:?}")
403
};
404
405
format_pl_smallstr!("{}", predicate.display(self.expr_arena))
406
});
407
408
IRNodeProperties::CrossJoin {
409
maintain_order: *maintain_order,
410
slice: convert_opt_slice(slice),
411
predicate,
412
suffix: suffix.clone(),
413
}
414
},
415
_ => IRNodeProperties::Join {
416
how: format_pl_smallstr!("{}", how),
417
left_on: expr_list(left_on, self.expr_arena),
418
right_on: expr_list(right_on, self.expr_arena),
419
nulls_equal: *nulls_equal,
420
coalesce: *coalesce,
421
maintain_order: *maintain_order,
422
validation: *validation,
423
suffix: suffix.clone(),
424
slice: convert_opt_slice(slice),
425
allow_parallel: *allow_parallel,
426
force_parallel: *force_parallel,
427
},
428
};
429
430
IRNodeInfo {
431
title: properties.variant_name(),
432
properties,
433
..Default::default()
434
}
435
},
436
IR::MapFunction { input: _, function } => {
437
let properties = IRNodeProperties::MapFunction {
438
function: format_pl_smallstr!("{}", function),
439
};
440
441
IRNodeInfo {
442
title: properties.variant_name(),
443
properties,
444
..Default::default()
445
}
446
},
447
IR::Scan {
448
sources,
449
file_info:
450
file_info @ FileInfo {
451
schema: _,
452
reader_schema: _,
453
row_estimation: _,
454
},
455
predicate,
456
predicate_file_skip_applied,
457
scan_type,
458
unified_scan_args,
459
hive_parts,
460
output_schema: _,
461
} => {
462
let UnifiedScanArgs {
463
schema: _,
464
cloud_options: _,
465
hive_options: _,
466
rechunk,
467
cache: _,
468
glob: _,
469
hidden_file_prefix: _,
470
projection,
471
column_mapping,
472
default_values,
473
row_index,
474
pre_slice,
475
cast_columns_policy: _,
476
missing_columns_policy: _,
477
extra_columns_policy: _,
478
include_file_paths,
479
deletion_files,
480
table_statistics,
481
row_count: _,
482
} = unified_scan_args.as_ref();
483
484
let file_columns: Option<Vec<PlSmallStr>> =
485
file_info.iter_reader_schema_names().map(|iter| {
486
iter.filter(|&name| {
487
!(row_index.as_ref().is_some_and(|ri| name == &ri.name)
488
|| include_file_paths.as_ref().is_some_and(|x| name == x))
489
})
490
.cloned()
491
.collect()
492
});
493
494
let pre_slice = pre_slice
495
.clone()
496
.map(|x| <(i64, usize)>::try_from(x).unwrap());
497
498
let properties = IRNodeProperties::Scan {
499
scan_type: PlSmallStr::from_static(scan_type.as_ref().into()),
500
num_sources: sources.len().try_into().unwrap(),
501
first_source: sources.first().map(|x| x.to_include_path_name().into()),
502
file_columns,
503
projection: projection.as_deref().map(list_str_cloned),
504
row_index_name: row_index.as_ref().map(|ri| ri.name.clone()),
505
row_index_offset: row_index.as_ref().map(|ri| {
506
#[cfg_attr(feature = "bigidx", expect(clippy::useless_conversion))]
507
ri.offset.into()
508
}),
509
pre_slice: convert_opt_slice(&pre_slice),
510
predicate: predicate
511
.as_ref()
512
.map(|e| format_pl_smallstr!("{}", e.display(self.expr_arena))),
513
predicate_file_skip_applied: *predicate_file_skip_applied,
514
has_table_statistics: table_statistics.is_some(),
515
include_file_paths: include_file_paths.clone(),
516
column_mapping_type: column_mapping
517
.as_ref()
518
.map(|x| PlSmallStr::from_static(x.into())),
519
default_values_type: default_values
520
.as_ref()
521
.map(|x| PlSmallStr::from_static(x.into())),
522
deletion_files_type: deletion_files
523
.as_ref()
524
.map(|x| PlSmallStr::from_static(x.into())),
525
rechunk: *rechunk,
526
hive_columns: hive_parts
527
.as_ref()
528
.map(|x| x.df().schema().iter_names_cloned().collect()),
529
};
530
531
IRNodeInfo {
532
title: properties.variant_name(),
533
properties,
534
..Default::default()
535
}
536
},
537
IR::Select {
538
input: _,
539
expr,
540
schema: _,
541
options:
542
ProjectionOptions {
543
run_parallel,
544
duplicate_check,
545
should_broadcast,
546
},
547
} => {
548
let properties = IRNodeProperties::Select {
549
exprs: expr_list(expr, self.expr_arena),
550
run_parallel: *run_parallel,
551
duplicate_check: *duplicate_check,
552
should_broadcast: *should_broadcast,
553
};
554
555
IRNodeInfo {
556
title: properties.variant_name(),
557
properties,
558
..Default::default()
559
}
560
},
561
IR::SimpleProjection { input: _, columns } => {
562
let properties = IRNodeProperties::SimpleProjection {
563
columns: columns.iter_names_cloned().collect(),
564
};
565
566
IRNodeInfo {
567
title: properties.variant_name(),
568
properties,
569
..Default::default()
570
}
571
},
572
IR::Sink { input: _, payload } => {
573
let properties = IRNodeProperties::Sink {
574
payload: format_pl_smallstr!("{:?}", payload),
575
};
576
577
IRNodeInfo {
578
title: properties.variant_name(),
579
properties,
580
..Default::default()
581
}
582
},
583
IR::SinkMultiple { inputs } => {
584
let properties = IRNodeProperties::SinkMultiple {
585
num_inputs: inputs.len().try_into().unwrap(),
586
};
587
588
IRNodeInfo {
589
title: properties.variant_name(),
590
properties,
591
..Default::default()
592
}
593
},
594
IR::Slice {
595
input: _,
596
offset,
597
len,
598
} => {
599
#[cfg_attr(feature = "bigidx", expect(clippy::useless_conversion))]
600
let properties = IRNodeProperties::Slice {
601
offset: (*offset).into(),
602
len: (*len).into(),
603
};
604
605
IRNodeInfo {
606
title: properties.variant_name(),
607
properties,
608
..Default::default()
609
}
610
},
611
IR::Sort {
612
input: _,
613
by_column,
614
slice,
615
sort_options:
616
SortMultipleOptions {
617
descending,
618
nulls_last,
619
multithreaded,
620
maintain_order,
621
limit,
622
},
623
} => {
624
let properties = IRNodeProperties::Sort {
625
by_exprs: expr_list(by_column, self.expr_arena),
626
slice: convert_opt_slice(slice),
627
descending: descending.clone(),
628
nulls_last: nulls_last.clone(),
629
multithreaded: *multithreaded,
630
maintain_order: *maintain_order,
631
#[cfg_attr(feature = "bigidx", expect(clippy::useless_conversion))]
632
limit: limit.map(|x| x.into()),
633
};
634
635
IRNodeInfo {
636
title: properties.variant_name(),
637
properties,
638
..Default::default()
639
}
640
},
641
IR::Union {
642
inputs: _,
643
options:
644
UnionOptions {
645
slice,
646
rows: _,
647
parallel,
648
from_partitioned_ds,
649
flattened_by_opt,
650
rechunk,
651
maintain_order,
652
},
653
} => {
654
let properties = IRNodeProperties::Union {
655
maintain_order: *maintain_order,
656
parallel: *parallel,
657
rechunk: *rechunk,
658
slice: convert_opt_slice(slice),
659
from_partitioned_ds: *from_partitioned_ds,
660
flattened_by_opt: *flattened_by_opt,
661
};
662
663
IRNodeInfo {
664
title: properties.variant_name(),
665
properties,
666
..Default::default()
667
}
668
},
669
#[cfg(feature = "merge_sorted")]
670
IR::MergeSorted {
671
input_left: _,
672
input_right: _,
673
key,
674
} => {
675
let properties = IRNodeProperties::MergeSorted { key: key.clone() };
676
677
IRNodeInfo {
678
title: properties.variant_name(),
679
properties,
680
..Default::default()
681
}
682
},
683
#[cfg(feature = "python")]
684
IR::PythonScan {
685
options:
686
crate::plans::PythonOptions {
687
scan_fn: _,
688
schema,
689
output_schema: _,
690
with_columns,
691
python_source,
692
n_rows,
693
predicate,
694
validate_schema,
695
is_pure,
696
},
697
} => {
698
use crate::plans::PythonPredicate;
699
700
let properties = IRNodeProperties::PythonScan {
701
scan_source_type: python_source.clone(),
702
n_rows: n_rows.map(|x| x.try_into().unwrap()),
703
projection: with_columns.as_deref().map(list_str_cloned),
704
predicate: match predicate {
705
PythonPredicate::None => None,
706
PythonPredicate::PyArrow(s) => Some(s.into()),
707
PythonPredicate::Polars(p) => {
708
Some(format_pl_smallstr!("{}", p.display(self.expr_arena)))
709
},
710
},
711
schema_names: schema.iter_names_cloned().collect(),
712
is_pure: *is_pure,
713
validate_schema: *validate_schema,
714
};
715
716
IRNodeInfo {
717
title: properties.variant_name(),
718
properties,
719
..Default::default()
720
}
721
},
722
}
723
}
724
}
725
726
impl IRNodeProperties {
727
fn variant_name(&self) -> PlSmallStr {
728
PlSmallStr::from_static(<&'static str>::from(self))
729
}
730
}
731
732
fn list_str_cloned<I, T>(iter: I) -> Vec<PlSmallStr>
733
where
734
I: IntoIterator<Item = T>,
735
T: AsRef<str>,
736
{
737
iter.into_iter()
738
.map(|x| PlSmallStr::from_str(x.as_ref()))
739
.collect()
740
}
741
742
/// Converts an optional `(offset, len)` slice into the `(i64, u64)` representation.
///
/// Returns `None` when `slice` is `None`.
///
/// # Panics
///
/// Panics if the offset does not fit in an `i64` or the length does not fit in a `u64`.
fn convert_opt_slice<T, U>(slice: &Option<(T, U)>) -> Option<(i64, u64)>
where
    T: Copy + TryInto<i64>,
    U: Copy + TryInto<u64>,
    <T as TryInto<i64>>::Error: std::fmt::Debug,
    <U as TryInto<u64>>::Error: std::fmt::Debug,
{
    // `Option<(T, U)>` is `Copy` here, so `map` works on a copy of the borrowed value.
    slice.map(|(offset, len)| {
        (
            offset.try_into().expect("slice offset must fit in an i64"),
            len.try_into().expect("slice length must fit in a u64"),
        )
    })
}
751
752
fn expr_list(exprs: &[ExprIR], expr_arena: &Arena<AExpr>) -> Vec<PlSmallStr> {
753
exprs
754
.iter()
755
.map(|e| format_pl_smallstr!("{}", e.display(expr_arena)))
756
.collect()
757
}
758
759