Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/physical_plan/fmt.rs
8460 views
1
use std::fmt::Write;
2
3
use polars_plan::dsl::PartitionStrategyIR;
4
use polars_plan::plans::expr_ir::ExprIR;
5
use polars_plan::plans::{AExpr, EscapeLabel};
6
use polars_plan::prelude::FileWriteFormat;
7
use polars_time::ClosedWindow;
8
#[cfg(feature = "dynamic_group_by")]
9
use polars_time::DynamicGroupOptions;
10
use polars_utils::arena::Arena;
11
use polars_utils::slice_enum::Slice;
12
use slotmap::{Key, SecondaryMap, SlotMap};
13
14
use super::{PhysNode, PhysNodeKey, PhysNodeKind};
15
use crate::physical_plan::ZipBehavior;
16
17
/// A style of a graph node.
///
/// Selects the extra Graphviz attributes (currently only a fill colour) that a
/// physical-plan node is drawn with.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeStyle {
    /// The node falls back to the in-memory engine.
    InMemoryFallback,
    /// The node is potentially memory-intensive.
    MemoryIntensive,
    /// A regular streaming-engine node; drawn without extra attributes.
    Generic,
}
23
24
impl NodeStyle {
25
const COLOR_IN_MEM_FALLBACK: &str = "0.0 0.3 1.0"; // Pastel red
26
const COLOR_MEM_INTENSIVE: &str = "0.16 0.3 1.0"; // Pastel yellow
27
28
/// Returns a style for a node kind.
29
pub fn for_node_kind(kind: &PhysNodeKind) -> Self {
30
use PhysNodeKind as K;
31
match kind {
32
K::InMemoryMap { .. } | K::InMemoryJoin { .. } => Self::InMemoryFallback,
33
K::InMemorySource { .. }
34
| K::InputIndependentSelect { .. }
35
| K::NegativeSlice { .. }
36
| K::InMemorySink { .. }
37
| K::Sort { .. }
38
| K::GroupBy { .. }
39
| K::EquiJoin { .. }
40
| K::SemiAntiJoin { .. }
41
| K::Multiplexer { .. } => Self::MemoryIntensive,
42
#[cfg(feature = "merge_sorted")]
43
K::MergeSorted { .. } => Self::MemoryIntensive,
44
_ => Self::Generic,
45
}
46
}
47
48
/// Returns extra styling attributes (if any) for the graph node.
49
pub fn node_attrs(&self) -> Option<String> {
50
match self {
51
Self::InMemoryFallback => Some(format!(
52
"style=filled,fillcolor=\"{}\"",
53
Self::COLOR_IN_MEM_FALLBACK
54
)),
55
Self::MemoryIntensive => Some(format!(
56
"style=filled,fillcolor=\"{}\"",
57
Self::COLOR_MEM_INTENSIVE
58
)),
59
Self::Generic => None,
60
}
61
}
62
63
/// Returns a legend explaining the node style meaning.
64
pub fn legend() -> String {
65
format!(
66
"fontname=\"Helvetica\"\nfontsize=\"10\"\nlabelloc=\"b\"\nlabel=<<BR/><BR/><B>Legend</B><BR/><BR/>◯ streaming engine node <FONT COLOR=\"{}\">⬤</FONT> potentially memory-intensive node <FONT COLOR=\"{}\">⬤</FONT> in-memory engine fallback>",
67
Self::COLOR_MEM_INTENSIVE,
68
Self::COLOR_IN_MEM_FALLBACK,
69
)
70
}
71
}
72
73
/// Escapes `s` for use inside a double-quoted Graphviz string.
///
/// Backslashes are doubled, newline characters become the two-character
/// sequence `\n`, and double quotes are prefixed with a backslash.
fn escape_graphviz(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '\n' => escaped.push_str("\\n"),
            '"' => escaped.push_str("\\\""),
            other => escaped.push(other),
        }
    }
    escaped
}
78
79
fn fmt_expr(f: &mut dyn Write, expr: &ExprIR, expr_arena: &Arena<AExpr>) -> std::fmt::Result {
80
// Remove the alias to make the display better
81
let without_alias = ExprIR::from_node(expr.node(), expr_arena);
82
write!(
83
f,
84
"{} = {}",
85
expr.output_name(),
86
without_alias.display(expr_arena)
87
)
88
}
89
90
/// Layout used by [`fmt_exprs`] when writing a list of expressions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FormatExprStyle {
    /// One `name = expr` row per expression, each terminated by a newline,
    /// with names right-aligned and expressions left-aligned to a common width.
    Select,
    /// Unpadded `name = expr` rows separated by newlines, without a trailing
    /// newline.
    NoAliases,
}
94
95
pub fn fmt_exprs_to_label(
96
exprs: &[ExprIR],
97
expr_arena: &Arena<AExpr>,
98
style: FormatExprStyle,
99
) -> String {
100
let mut buffer = String::new();
101
let mut f = EscapeLabel(&mut buffer);
102
fmt_exprs(&mut f, exprs, expr_arena, style);
103
buffer
104
}
105
106
pub fn fmt_exprs(
107
f: &mut dyn Write,
108
exprs: &[ExprIR],
109
expr_arena: &Arena<AExpr>,
110
style: FormatExprStyle,
111
) {
112
if matches!(style, FormatExprStyle::Select) {
113
let mut formatted = Vec::new();
114
115
let mut max_name_width = 0;
116
let mut max_expr_width = 0;
117
118
for e in exprs {
119
let mut name = String::new();
120
let mut expr = String::new();
121
122
// Remove the alias to make the display better
123
let without_alias = ExprIR::from_node(e.node(), expr_arena);
124
125
write!(name, "{}", e.output_name()).unwrap();
126
write!(expr, "{}", without_alias.display(expr_arena)).unwrap();
127
128
max_name_width = max_name_width.max(name.chars().count());
129
max_expr_width = max_expr_width.max(expr.chars().count());
130
131
formatted.push((name, expr));
132
}
133
134
for (name, expr) in formatted {
135
writeln!(f, "{name:>max_name_width$} = {expr:<max_expr_width$}").unwrap();
136
}
137
} else {
138
let Some(e) = exprs.first() else {
139
return;
140
};
141
142
fmt_expr(f, e, expr_arena).unwrap();
143
144
for e in &exprs[1..] {
145
f.write_str("\n").unwrap();
146
fmt_expr(f, e, expr_arena).unwrap();
147
}
148
}
149
}
150
151
#[recursive::recursive]
152
fn visualize_plan_rec(
153
node_key: PhysNodeKey,
154
phys_sm: &SlotMap<PhysNodeKey, PhysNode>,
155
expr_arena: &Arena<AExpr>,
156
visited: &mut SecondaryMap<PhysNodeKey, ()>,
157
out: &mut Vec<String>,
158
) {
159
if visited.contains_key(node_key) {
160
return;
161
}
162
visited.insert(node_key, ());
163
164
let kind = &phys_sm[node_key].kind;
165
166
use std::slice::from_ref;
167
let (label, inputs) = match kind {
168
PhysNodeKind::InMemorySource {
169
df,
170
disable_morsel_split: _,
171
} => (
172
format!(
173
"in-memory-source\\ncols: {}",
174
df.get_column_names_owned().join(", ")
175
),
176
&[][..],
177
),
178
#[cfg(feature = "python")]
179
PhysNodeKind::PythonScan { .. } => ("python-scan".to_string(), &[][..]),
180
PhysNodeKind::SinkMultiple { sinks } => {
181
for sink in sinks {
182
visualize_plan_rec(*sink, phys_sm, expr_arena, visited, out);
183
}
184
return;
185
},
186
PhysNodeKind::Select {
187
input,
188
selectors,
189
extend_original,
190
} => {
191
let label = if *extend_original {
192
"with-columns"
193
} else {
194
"select"
195
};
196
(
197
format!(
198
"{label}\\n{}",
199
fmt_exprs_to_label(selectors, expr_arena, FormatExprStyle::Select)
200
),
201
from_ref(input),
202
)
203
},
204
PhysNodeKind::WithRowIndex {
205
input,
206
name,
207
offset,
208
} => (
209
format!("with-row-index\\nname: {name}\\noffset: {offset:?}"),
210
from_ref(input),
211
),
212
PhysNodeKind::InputIndependentSelect { selectors } => (
213
format!(
214
"input-independent-select\\n{}",
215
fmt_exprs_to_label(selectors, expr_arena, FormatExprStyle::Select)
216
),
217
&[][..],
218
),
219
PhysNodeKind::Reduce { input, exprs } => (
220
format!(
221
"reduce\\n{}",
222
fmt_exprs_to_label(exprs, expr_arena, FormatExprStyle::Select)
223
),
224
from_ref(input),
225
),
226
PhysNodeKind::StreamingSlice {
227
input,
228
offset,
229
length,
230
} => (
231
format!("slice\\noffset: {offset}, length: {length}"),
232
from_ref(input),
233
),
234
PhysNodeKind::NegativeSlice {
235
input,
236
offset,
237
length,
238
} => (
239
format!("slice\\noffset: {offset}, length: {length}"),
240
from_ref(input),
241
),
242
PhysNodeKind::DynamicSlice {
243
input,
244
offset,
245
length,
246
} => ("slice".to_owned(), &[*input, *offset, *length][..]),
247
PhysNodeKind::Shift {
248
input,
249
offset,
250
fill: Some(fill),
251
} => ("shift".to_owned(), &[*input, *offset, *fill][..]),
252
PhysNodeKind::Shift {
253
input,
254
offset,
255
fill: None,
256
} => ("shift".to_owned(), &[*input, *offset][..]),
257
PhysNodeKind::Filter { input, predicate } => (
258
format!(
259
"filter\\n{}",
260
fmt_exprs_to_label(from_ref(predicate), expr_arena, FormatExprStyle::Select)
261
),
262
from_ref(input),
263
),
264
PhysNodeKind::SimpleProjection { input, columns } => (
265
format!("select\\ncols: {}", columns.join(", ")),
266
from_ref(input),
267
),
268
PhysNodeKind::InMemorySink { input } => ("in-memory-sink".to_string(), from_ref(input)),
269
PhysNodeKind::CallbackSink { input, .. } => ("callback-sink".to_string(), from_ref(input)),
270
PhysNodeKind::FileSink { input, options } => match options.file_format {
271
#[cfg(feature = "parquet")]
272
FileWriteFormat::Parquet(_) => ("parquet-sink".to_string(), from_ref(input)),
273
#[cfg(feature = "ipc")]
274
FileWriteFormat::Ipc(_) => ("ipc-sink".to_string(), from_ref(input)),
275
#[cfg(feature = "csv")]
276
FileWriteFormat::Csv(_) => ("csv-sink".to_string(), from_ref(input)),
277
#[cfg(feature = "json")]
278
FileWriteFormat::NDJson(_) => ("ndjson-sink".to_string(), from_ref(input)),
279
#[allow(unreachable_patterns)]
280
_ => todo!(),
281
},
282
PhysNodeKind::PartitionedSink { input, options } => {
283
let variant = match options.partition_strategy {
284
PartitionStrategyIR::Keyed { .. } => "partition-keyed",
285
PartitionStrategyIR::FileSize => "partition-file-size",
286
};
287
288
match options.file_format {
289
#[cfg(feature = "parquet")]
290
FileWriteFormat::Parquet(_) => (format!("{variant}[parquet]"), from_ref(input)),
291
#[cfg(feature = "ipc")]
292
FileWriteFormat::Ipc(_) => (format!("{variant}[ipc]"), from_ref(input)),
293
#[cfg(feature = "csv")]
294
FileWriteFormat::Csv(_) => (format!("{variant}[csv]"), from_ref(input)),
295
#[cfg(feature = "json")]
296
FileWriteFormat::NDJson(_) => (format!("{variant}[ndjson]"), from_ref(input)),
297
#[allow(unreachable_patterns)]
298
_ => todo!(),
299
}
300
},
301
PhysNodeKind::InMemoryMap {
302
input,
303
map: _,
304
format_str,
305
} => {
306
let mut label = String::new();
307
label.push_str("in-memory-map");
308
if let Some(format_str) = format_str {
309
label.write_str("\\n").unwrap();
310
311
let mut f = EscapeLabel(&mut label);
312
f.write_str(format_str).unwrap();
313
}
314
(label, from_ref(input))
315
},
316
PhysNodeKind::Map {
317
input,
318
map: _,
319
format_str,
320
} => {
321
let mut label = String::new();
322
label.push_str("map");
323
if let Some(format_str) = format_str {
324
label.push_str("\\n");
325
326
let mut f = EscapeLabel(&mut label);
327
f.write_str(format_str).unwrap();
328
}
329
(label, from_ref(input))
330
},
331
PhysNodeKind::SortedGroupBy {
332
input,
333
key,
334
aggs,
335
slice,
336
} => {
337
let mut s = String::new();
338
s.push_str("sorted-group-by\\n");
339
let f = &mut s;
340
write!(f, "key: {key}\\n").unwrap();
341
if let Some((offset, length)) = slice {
342
write!(f, "slice: {offset}, {length}\\n").unwrap();
343
}
344
write!(
345
f,
346
"aggs:\\n{}",
347
fmt_exprs_to_label(aggs, expr_arena, FormatExprStyle::Select)
348
)
349
.unwrap();
350
351
(s, from_ref(input))
352
},
353
PhysNodeKind::Sort {
354
input,
355
by_column,
356
slice: _,
357
sort_options: _,
358
} => (
359
format!(
360
"sort\\n{}",
361
fmt_exprs_to_label(by_column, expr_arena, FormatExprStyle::NoAliases)
362
),
363
from_ref(input),
364
),
365
PhysNodeKind::TopK {
366
input,
367
k,
368
by_column,
369
reverse,
370
nulls_last: _,
371
dyn_pred: _,
372
} => {
373
let name = if reverse.iter().all(|r| *r) {
374
"bottom-k"
375
} else {
376
"top-k"
377
};
378
(
379
format!(
380
"{name}\\n{}",
381
fmt_exprs_to_label(by_column, expr_arena, FormatExprStyle::NoAliases)
382
),
383
&[*input, *k][..],
384
)
385
},
386
PhysNodeKind::Repeat { value, repeats } => ("repeat".to_owned(), &[*value, *repeats][..]),
387
#[cfg(feature = "cum_agg")]
388
PhysNodeKind::CumAgg { input, kind } => {
389
use crate::nodes::cum_agg::CumAggKind;
390
391
(
392
format!(
393
"cum_{}",
394
match kind {
395
CumAggKind::Min => "min",
396
CumAggKind::Max => "max",
397
CumAggKind::Sum => "sum",
398
CumAggKind::Count => "count",
399
CumAggKind::Prod => "prod",
400
}
401
),
402
&[*input][..],
403
)
404
},
405
PhysNodeKind::GatherEvery { input, n, offset } => (
406
format!("gather_every\\nn: {n}, offset: {offset}"),
407
&[*input][..],
408
),
409
PhysNodeKind::Rle(input) => ("rle".to_owned(), &[*input][..]),
410
PhysNodeKind::RleId(input) => ("rle_id".to_owned(), &[*input][..]),
411
PhysNodeKind::PeakMinMax { input, is_peak_max } => (
412
if *is_peak_max { "peak_max" } else { "peak_min" }.to_owned(),
413
&[*input][..],
414
),
415
PhysNodeKind::OrderedUnion { inputs } => ("ordered-union".to_string(), inputs.as_slice()),
416
PhysNodeKind::UnorderedUnion { inputs } => {
417
("unordered-union".to_string(), inputs.as_slice())
418
},
419
PhysNodeKind::Zip {
420
inputs,
421
zip_behavior,
422
} => {
423
let label = match zip_behavior {
424
ZipBehavior::NullExtend => "zip-null-extend",
425
ZipBehavior::Broadcast => "zip-broadcast",
426
ZipBehavior::Strict => "zip-strict",
427
};
428
(label.to_string(), inputs.as_slice())
429
},
430
PhysNodeKind::Multiplexer { input } => ("multiplexer".to_string(), from_ref(input)),
431
PhysNodeKind::MultiScan {
432
scan_sources,
433
file_reader_builder,
434
cloud_options: _,
435
file_projection_builder,
436
output_schema,
437
row_index,
438
pre_slice,
439
predicate,
440
predicate_file_skip_applied: _,
441
hive_parts,
442
include_file_paths,
443
cast_columns_policy: _,
444
missing_columns_policy: _,
445
forbid_extra_columns: _,
446
deletion_files,
447
table_statistics: _,
448
file_schema: _,
449
disable_morsel_split: _,
450
} => {
451
let mut out = format!("multi-scan[{}]", file_reader_builder.reader_name());
452
let mut f = EscapeLabel(&mut out);
453
454
write!(f, "\n{} source", scan_sources.len()).unwrap();
455
456
if scan_sources.len() != 1 {
457
write!(f, "s").unwrap();
458
}
459
460
write!(
461
f,
462
"\nproject: {} total, {} from file",
463
output_schema.len(),
464
file_projection_builder.num_projections(),
465
)
466
.unwrap();
467
468
if let Some(ri) = row_index {
469
write!(f, "\nrow index: name: {}, offset: {:?}", ri.name, ri.offset).unwrap();
470
}
471
472
if let Some(col_name) = include_file_paths {
473
write!(f, "\nfile path column: {col_name}").unwrap();
474
}
475
476
if let Some(pre_slice) = pre_slice {
477
write!(f, "\nslice: offset: ").unwrap();
478
479
match pre_slice {
480
Slice::Positive { offset, len: _ } => write!(f, "{}", *offset),
481
Slice::Negative {
482
offset_from_end,
483
len: _,
484
} => write!(f, "-{}", *offset_from_end),
485
}
486
.unwrap();
487
488
write!(f, ", len: {}", pre_slice.len()).unwrap()
489
}
490
491
if let Some(predicate) = predicate {
492
write!(f, "\nfilter: {}", predicate.display(expr_arena)).unwrap();
493
}
494
495
if let Some(v) = hive_parts.as_ref().map(|h| h.df().width()) {
496
write!(f, "\nhive: {v} column").unwrap();
497
498
if v != 1 {
499
write!(f, "s").unwrap();
500
}
501
}
502
503
if let Some(deletion_files) = deletion_files {
504
write!(f, "\n{deletion_files}").unwrap();
505
}
506
507
(out, &[][..])
508
},
509
PhysNodeKind::GroupBy {
510
inputs,
511
key_per_input,
512
aggs_per_input,
513
} => {
514
let mut out = String::from("group-by");
515
for (key, aggs) in key_per_input.iter().zip(aggs_per_input) {
516
write!(
517
&mut out,
518
"\\nkey:\\n{}\\naggs:\\n{}",
519
fmt_exprs_to_label(key, expr_arena, FormatExprStyle::Select),
520
fmt_exprs_to_label(aggs, expr_arena, FormatExprStyle::Select)
521
)
522
.ok();
523
}
524
(out, inputs.as_slice())
525
},
526
#[cfg(feature = "dynamic_group_by")]
527
PhysNodeKind::DynamicGroupBy {
528
input,
529
options,
530
aggs,
531
slice,
532
} => {
533
use polars_time::prelude::{Label, StartBy};
534
535
let DynamicGroupOptions {
536
index_column,
537
every,
538
period,
539
offset,
540
label,
541
include_boundaries,
542
closed_window,
543
start_by,
544
} = options;
545
let mut s = String::new();
546
let f = &mut s;
547
f.write_str("dynamic-group-by\\n").unwrap();
548
write!(f, "index column: {index_column}\\n").unwrap();
549
write!(f, "every: {every}").unwrap();
550
if every != period {
551
write!(f, ", period: {period}").unwrap();
552
}
553
if !offset.is_zero() {
554
write!(f, ", offset: {offset}").unwrap();
555
}
556
f.write_str("\\n").unwrap();
557
if *label != Label::Left {
558
write!(f, "label: {}\\n", <&'static str>::from(label)).unwrap();
559
}
560
if *include_boundaries {
561
write!(f, "include_boundaries: true\\n").unwrap();
562
}
563
if *start_by != StartBy::WindowBound {
564
write!(f, "start_by: {}\\n", <&'static str>::from(start_by)).unwrap();
565
}
566
if *closed_window != ClosedWindow::Left {
567
write!(
568
f,
569
"closed_window: {}\\n",
570
<&'static str>::from(closed_window)
571
)
572
.unwrap();
573
}
574
if let Some((offset, length)) = slice {
575
write!(f, "slice: {offset}, {length}\\n").unwrap();
576
}
577
write!(
578
f,
579
"aggs:\\n{}",
580
fmt_exprs_to_label(aggs, expr_arena, FormatExprStyle::Select)
581
)
582
.unwrap();
583
584
(s, from_ref(input))
585
},
586
#[cfg(feature = "dynamic_group_by")]
587
PhysNodeKind::RollingGroupBy {
588
input,
589
index_column,
590
period,
591
offset,
592
closed,
593
slice,
594
aggs,
595
} => {
596
let mut s = String::new();
597
let f = &mut s;
598
f.write_str("rolling-group-by\\n").unwrap();
599
write!(f, "index column: {index_column}\\n").unwrap();
600
write!(f, "period: {period}, offset: {offset}\\n").unwrap();
601
write!(f, "closed: {}\\n", <&'static str>::from(*closed)).unwrap();
602
if let Some((offset, length)) = slice {
603
write!(f, "slice: {offset}, {length}\\n").unwrap();
604
}
605
write!(
606
f,
607
"aggs:\\n{}",
608
fmt_exprs_to_label(aggs, expr_arena, FormatExprStyle::Select)
609
)
610
.unwrap();
611
612
(s, from_ref(input))
613
},
614
PhysNodeKind::MergeJoin {
615
input_left,
616
input_right,
617
left_on,
618
right_on,
619
args,
620
..
621
} => {
622
let mut label = "merge-join".to_string();
623
let how: &'static str = (&args.how).into();
624
write!(
625
label,
626
r"\nleft_on:\n{}",
627
left_on
628
.iter()
629
.map(|s| escape_graphviz(&s[..]))
630
.collect::<Vec<_>>()
631
.join("\n"),
632
)
633
.unwrap();
634
write!(
635
label,
636
r"\nright_on:\n{}",
637
right_on
638
.iter()
639
.map(|s| escape_graphviz(&s[..]))
640
.collect::<Vec<_>>()
641
.join("\n"),
642
)
643
.unwrap();
644
write!(label, r"\nhow: {}", escape_graphviz(how)).unwrap();
645
if args.nulls_equal {
646
write!(label, r"\njoin-nulls").unwrap();
647
}
648
(label, &[*input_left, *input_right][..])
649
},
650
PhysNodeKind::InMemoryJoin {
651
input_left,
652
input_right,
653
left_on,
654
right_on,
655
args,
656
..
657
}
658
| PhysNodeKind::EquiJoin {
659
input_left,
660
input_right,
661
left_on,
662
right_on,
663
args,
664
}
665
| PhysNodeKind::SemiAntiJoin {
666
input_left,
667
input_right,
668
left_on,
669
right_on,
670
args,
671
output_bool: _,
672
} => {
673
let label = match phys_sm[node_key].kind {
674
PhysNodeKind::MergeJoin { .. } => "merge-join",
675
PhysNodeKind::EquiJoin { .. } => "equi-join",
676
PhysNodeKind::InMemoryJoin { .. } => "in-memory-join",
677
PhysNodeKind::CrossJoin { .. } => "cross-join",
678
PhysNodeKind::SemiAntiJoin {
679
output_bool: false, ..
680
} if args.how.is_semi() => "semi-join",
681
PhysNodeKind::SemiAntiJoin {
682
output_bool: false, ..
683
} if args.how.is_anti() => "anti-join",
684
PhysNodeKind::SemiAntiJoin {
685
output_bool: true, ..
686
} if args.how.is_semi() => "is-in",
687
PhysNodeKind::SemiAntiJoin {
688
output_bool: true, ..
689
} if args.how.is_anti() => "is-not-in",
690
_ => unreachable!(),
691
};
692
let mut label = label.to_string();
693
write!(
694
label,
695
r"\nleft_on:\n{}",
696
fmt_exprs_to_label(left_on, expr_arena, FormatExprStyle::NoAliases)
697
)
698
.unwrap();
699
write!(
700
label,
701
r"\nright_on:\n{}",
702
fmt_exprs_to_label(right_on, expr_arena, FormatExprStyle::NoAliases)
703
)
704
.unwrap();
705
if args.how.is_equi() {
706
write!(
707
label,
708
r"\nhow: {}",
709
escape_graphviz(&format!("{:?}", args.how))
710
)
711
.unwrap();
712
}
713
if args.nulls_equal {
714
write!(label, r"\njoin-nulls").unwrap();
715
}
716
(label, &[*input_left, *input_right][..])
717
},
718
PhysNodeKind::CrossJoin {
719
input_left,
720
input_right,
721
args: _,
722
} => ("cross-join".to_string(), &[*input_left, *input_right][..]),
723
PhysNodeKind::AsOfJoin {
724
input_left,
725
input_right,
726
..
727
} => ("asof_join".to_string(), &[*input_left, *input_right][..]),
728
#[cfg(feature = "merge_sorted")]
729
PhysNodeKind::MergeSorted {
730
input_left,
731
input_right,
732
} => ("merge-sorted".to_string(), &[*input_left, *input_right][..]),
733
#[cfg(feature = "ewma")]
734
PhysNodeKind::EwmMean { input, options: _ } => ("ewm-mean".to_string(), &[*input][..]),
735
#[cfg(feature = "ewma")]
736
PhysNodeKind::EwmVar { input, options: _ } => ("ewm-var".to_string(), &[*input][..]),
737
#[cfg(feature = "ewma")]
738
PhysNodeKind::EwmStd { input, options: _ } => ("ewm-std".to_string(), &[*input][..]),
739
};
740
741
let node_id = node_key.data().as_ffi();
742
let style = NodeStyle::for_node_kind(kind);
743
744
if let Some(attrs) = style.node_attrs() {
745
out.push(format!("{node_id} [label=\"{label}\",{attrs}];"));
746
} else {
747
out.push(format!("{node_id} [label=\"{label}\"];"));
748
}
749
for input in inputs {
750
visualize_plan_rec(input.node, phys_sm, expr_arena, visited, out);
751
out.push(format!(
752
"{} -> {};",
753
input.node.data().as_ffi(),
754
node_key.data().as_ffi()
755
));
756
}
757
}
758
759
pub fn visualize_plan(
760
root: PhysNodeKey,
761
phys_sm: &SlotMap<PhysNodeKey, PhysNode>,
762
expr_arena: &Arena<AExpr>,
763
) -> String {
764
let mut visited: SecondaryMap<PhysNodeKey, ()> = SecondaryMap::new();
765
let mut out = Vec::with_capacity(phys_sm.len() + 3);
766
out.push("digraph polars {\nrankdir=\"BT\"\nnode [fontname=\"Monospace\"]".to_string());
767
out.push(NodeStyle::legend());
768
visualize_plan_rec(root, phys_sm, expr_arena, &mut visited, &mut out);
769
out.push("}".to_string());
770
out.join("\n")
771
}
772
773