Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/physical_plan/fmt.rs
6939 views
1
use std::fmt::Write;
2
3
use polars_plan::dsl::PartitionVariantIR;
4
use polars_plan::plans::expr_ir::ExprIR;
5
use polars_plan::plans::{AExpr, EscapeLabel};
6
use polars_plan::prelude::FileType;
7
use polars_utils::arena::Arena;
8
use polars_utils::slice_enum::Slice;
9
use slotmap::{Key, SecondaryMap, SlotMap};
10
11
use super::{PhysNode, PhysNodeKey, PhysNodeKind};
12
13
/// Visual category of a node in the rendered physical-plan graph.
///
/// The category decides which fill colour (if any) the node gets in the DOT output.
enum NodeStyle {
    /// The node falls back to the in-memory engine.
    InMemoryFallback,
    /// The node is potentially memory-intensive.
    MemoryIntensive,
    /// A regular streaming-engine node without special highlighting.
    Generic,
}
19
20
impl NodeStyle {
21
const COLOR_IN_MEM_FALLBACK: &str = "0.0 0.3 1.0"; // Pastel red
22
const COLOR_MEM_INTENSIVE: &str = "0.16 0.3 1.0"; // Pastel yellow
23
24
/// Returns a style for a node kind.
25
pub fn for_node_kind(kind: &PhysNodeKind) -> Self {
26
use PhysNodeKind as K;
27
match kind {
28
K::InMemoryMap { .. } | K::InMemoryJoin { .. } => Self::InMemoryFallback,
29
K::InMemorySource { .. }
30
| K::InputIndependentSelect { .. }
31
| K::NegativeSlice { .. }
32
| K::InMemorySink { .. }
33
| K::Sort { .. }
34
| K::GroupBy { .. }
35
| K::EquiJoin { .. }
36
| K::SemiAntiJoin { .. }
37
| K::Multiplexer { .. } => Self::MemoryIntensive,
38
#[cfg(feature = "merge_sorted")]
39
K::MergeSorted { .. } => Self::MemoryIntensive,
40
_ => Self::Generic,
41
}
42
}
43
44
/// Returns extra styling attributes (if any) for the graph node.
45
pub fn node_attrs(&self) -> Option<String> {
46
match self {
47
Self::InMemoryFallback => Some(format!(
48
"style=filled,fillcolor=\"{}\"",
49
Self::COLOR_IN_MEM_FALLBACK
50
)),
51
Self::MemoryIntensive => Some(format!(
52
"style=filled,fillcolor=\"{}\"",
53
Self::COLOR_MEM_INTENSIVE
54
)),
55
Self::Generic => None,
56
}
57
}
58
59
/// Returns a legend explaining the node style meaning.
60
pub fn legend() -> String {
61
format!(
62
"fontname=\"Helvetica\"\nfontsize=\"10\"\nlabelloc=\"b\"\nlabel=<<BR/><BR/><B>Legend</B><BR/><BR/>◯ streaming engine node <FONT COLOR=\"{}\">⬤</FONT> potentially memory-intensive node <FONT COLOR=\"{}\">⬤</FONT> in-memory engine fallback>",
63
Self::COLOR_MEM_INTENSIVE,
64
Self::COLOR_IN_MEM_FALLBACK,
65
)
66
}
67
}
68
69
/// Escapes `s` so it can be embedded inside a double-quoted Graphviz DOT string.
///
/// Backslashes are doubled, double quotes are backslash-escaped and newlines are turned
/// into the two-character DOT line-break escape `\n`.
fn escape_graphviz(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '\n' => escaped.push_str("\\n"),
            '"' => escaped.push_str("\\\""),
            other => escaped.push(other),
        }
    }
    escaped
}
74
75
fn fmt_expr(f: &mut dyn Write, expr: &ExprIR, expr_arena: &Arena<AExpr>) -> std::fmt::Result {
76
// Remove the alias to make the display better
77
let without_alias = ExprIR::from_node(expr.node(), expr_arena);
78
write!(
79
f,
80
"{} = {}",
81
expr.output_name(),
82
without_alias.display(expr_arena)
83
)
84
}
85
86
/// Layout used when rendering a list of expressions into a node label.
pub enum FormatExprStyle {
    /// One `name = expr` row per expression.
    ///
    /// Rows are padded so the `=` signs line up, and each row ends in a DOT line break.
    Select,
    /// `name = expr` entries separated by DOT line breaks, without padding.
    NoAliases,
}
90
91
pub fn fmt_exprs_to_label(
92
exprs: &[ExprIR],
93
expr_arena: &Arena<AExpr>,
94
style: FormatExprStyle,
95
) -> String {
96
let mut buffer = String::new();
97
let mut f = EscapeLabel(&mut buffer);
98
fmt_exprs(&mut f, exprs, expr_arena, style);
99
buffer
100
}
101
102
pub fn fmt_exprs(
103
f: &mut dyn Write,
104
exprs: &[ExprIR],
105
expr_arena: &Arena<AExpr>,
106
style: FormatExprStyle,
107
) {
108
if matches!(style, FormatExprStyle::Select) {
109
let mut formatted = Vec::new();
110
111
let mut max_name_width = 0;
112
let mut max_expr_width = 0;
113
114
for e in exprs {
115
let mut name = String::new();
116
let mut expr = String::new();
117
118
// Remove the alias to make the display better
119
let without_alias = ExprIR::from_node(e.node(), expr_arena);
120
121
write!(name, "{}", e.output_name()).unwrap();
122
write!(expr, "{}", without_alias.display(expr_arena)).unwrap();
123
124
max_name_width = max_name_width.max(name.chars().count());
125
max_expr_width = max_expr_width.max(expr.chars().count());
126
127
formatted.push((name, expr));
128
}
129
130
for (name, expr) in formatted {
131
write!(f, "{name:>max_name_width$} = {expr:<max_expr_width$}\\n").unwrap();
132
}
133
} else {
134
let Some(e) = exprs.first() else {
135
return;
136
};
137
138
fmt_expr(f, e, expr_arena).unwrap();
139
140
for e in &exprs[1..] {
141
f.write_str("\\n").unwrap();
142
fmt_expr(f, e, expr_arena).unwrap();
143
}
144
}
145
}
146
147
#[recursive::recursive]
148
fn visualize_plan_rec(
149
node_key: PhysNodeKey,
150
phys_sm: &SlotMap<PhysNodeKey, PhysNode>,
151
expr_arena: &Arena<AExpr>,
152
visited: &mut SecondaryMap<PhysNodeKey, ()>,
153
out: &mut Vec<String>,
154
) {
155
if visited.contains_key(node_key) {
156
return;
157
}
158
visited.insert(node_key, ());
159
160
let kind = &phys_sm[node_key].kind;
161
162
use std::slice::from_ref;
163
let (label, inputs) = match kind {
164
PhysNodeKind::InMemorySource { df } => (
165
format!(
166
"in-memory-source\\ncols: {}",
167
df.get_column_names_owned().join(", ")
168
),
169
&[][..],
170
),
171
#[cfg(feature = "python")]
172
PhysNodeKind::PythonScan { .. } => ("python-scan".to_string(), &[][..]),
173
PhysNodeKind::SinkMultiple { sinks } => {
174
for sink in sinks {
175
visualize_plan_rec(*sink, phys_sm, expr_arena, visited, out);
176
}
177
return;
178
},
179
PhysNodeKind::Select {
180
input,
181
selectors,
182
extend_original,
183
} => {
184
let label = if *extend_original {
185
"with-columns"
186
} else {
187
"select"
188
};
189
(
190
format!(
191
"{label}\\n{}",
192
fmt_exprs_to_label(selectors, expr_arena, FormatExprStyle::Select)
193
),
194
from_ref(input),
195
)
196
},
197
PhysNodeKind::WithRowIndex {
198
input,
199
name,
200
offset,
201
} => (
202
format!("with-row-index\\nname: {name}\\noffset: {offset:?}"),
203
from_ref(input),
204
),
205
PhysNodeKind::InputIndependentSelect { selectors } => (
206
format!(
207
"input-independent-select\\n{}",
208
fmt_exprs_to_label(selectors, expr_arena, FormatExprStyle::Select)
209
),
210
&[][..],
211
),
212
PhysNodeKind::Reduce { input, exprs } => (
213
format!(
214
"reduce\\n{}",
215
fmt_exprs_to_label(exprs, expr_arena, FormatExprStyle::Select)
216
),
217
from_ref(input),
218
),
219
PhysNodeKind::StreamingSlice {
220
input,
221
offset,
222
length,
223
} => (
224
format!("slice\\noffset: {offset}, length: {length}"),
225
from_ref(input),
226
),
227
PhysNodeKind::NegativeSlice {
228
input,
229
offset,
230
length,
231
} => (
232
format!("slice\\noffset: {offset}, length: {length}"),
233
from_ref(input),
234
),
235
PhysNodeKind::DynamicSlice {
236
input,
237
offset,
238
length,
239
} => ("slice".to_owned(), &[*input, *offset, *length][..]),
240
PhysNodeKind::Shift {
241
input,
242
offset,
243
fill: Some(fill),
244
} => ("shift".to_owned(), &[*input, *offset, *fill][..]),
245
PhysNodeKind::Shift {
246
input,
247
offset,
248
fill: None,
249
} => ("shift".to_owned(), &[*input, *offset][..]),
250
PhysNodeKind::Filter { input, predicate } => (
251
format!(
252
"filter\\n{}",
253
fmt_exprs_to_label(from_ref(predicate), expr_arena, FormatExprStyle::Select)
254
),
255
from_ref(input),
256
),
257
PhysNodeKind::SimpleProjection { input, columns } => (
258
format!("select\\ncols: {}", columns.join(", ")),
259
from_ref(input),
260
),
261
PhysNodeKind::InMemorySink { input } => ("in-memory-sink".to_string(), from_ref(input)),
262
PhysNodeKind::FileSink {
263
input, file_type, ..
264
} => match file_type {
265
#[cfg(feature = "parquet")]
266
FileType::Parquet(_) => ("parquet-sink".to_string(), from_ref(input)),
267
#[cfg(feature = "ipc")]
268
FileType::Ipc(_) => ("ipc-sink".to_string(), from_ref(input)),
269
#[cfg(feature = "csv")]
270
FileType::Csv(_) => ("csv-sink".to_string(), from_ref(input)),
271
#[cfg(feature = "json")]
272
FileType::Json(_) => ("ndjson-sink".to_string(), from_ref(input)),
273
#[allow(unreachable_patterns)]
274
_ => todo!(),
275
},
276
PhysNodeKind::PartitionSink {
277
input,
278
file_type,
279
variant,
280
..
281
} => {
282
let variant = match variant {
283
PartitionVariantIR::ByKey { .. } => "partition-by-key-sink",
284
PartitionVariantIR::MaxSize { .. } => "partition-max-size-sink",
285
PartitionVariantIR::Parted { .. } => "partition-parted-sink",
286
};
287
288
match file_type {
289
#[cfg(feature = "parquet")]
290
FileType::Parquet(_) => (format!("{variant}[parquet]"), from_ref(input)),
291
#[cfg(feature = "ipc")]
292
FileType::Ipc(_) => (format!("{variant}[ipc]"), from_ref(input)),
293
#[cfg(feature = "csv")]
294
FileType::Csv(_) => (format!("{variant}[csv]"), from_ref(input)),
295
#[cfg(feature = "json")]
296
FileType::Json(_) => (format!("{variant}[ndjson]"), from_ref(input)),
297
#[allow(unreachable_patterns)]
298
_ => todo!(),
299
}
300
},
301
PhysNodeKind::InMemoryMap {
302
input,
303
map: _,
304
format_str,
305
} => {
306
let mut label = String::new();
307
label.push_str("in-memory-map");
308
if let Some(format_str) = format_str {
309
label.push('\n');
310
311
let mut f = EscapeLabel(&mut label);
312
write!(f, "{format_str}").unwrap();
313
}
314
(label, from_ref(input))
315
},
316
PhysNodeKind::Map { input, map: _ } => ("map".to_string(), from_ref(input)),
317
PhysNodeKind::Sort {
318
input,
319
by_column,
320
slice: _,
321
sort_options: _,
322
} => (
323
format!(
324
"sort\\n{}",
325
fmt_exprs_to_label(by_column, expr_arena, FormatExprStyle::NoAliases)
326
),
327
from_ref(input),
328
),
329
PhysNodeKind::TopK {
330
input,
331
k,
332
by_column,
333
reverse,
334
nulls_last: _,
335
} => {
336
let name = if reverse.iter().all(|r| *r) {
337
"bottom-k"
338
} else {
339
"top-k"
340
};
341
(
342
format!(
343
"{name}\\n{}",
344
fmt_exprs_to_label(by_column, expr_arena, FormatExprStyle::NoAliases)
345
),
346
&[*input, *k][..],
347
)
348
},
349
PhysNodeKind::Repeat { value, repeats } => ("repeat".to_owned(), &[*value, *repeats][..]),
350
#[cfg(feature = "cum_agg")]
351
PhysNodeKind::CumAgg { input, kind } => {
352
use crate::nodes::cum_agg::CumAggKind;
353
354
(
355
format!(
356
"cum_{}",
357
match kind {
358
CumAggKind::Min => "min",
359
CumAggKind::Max => "max",
360
CumAggKind::Sum => "sum",
361
CumAggKind::Count => "count",
362
CumAggKind::Prod => "prod",
363
}
364
),
365
&[*input][..],
366
)
367
},
368
PhysNodeKind::Rle(input) => ("rle".to_owned(), &[*input][..]),
369
PhysNodeKind::RleId(input) => ("rle_id".to_owned(), &[*input][..]),
370
PhysNodeKind::PeakMinMax { input, is_peak_max } => (
371
if *is_peak_max { "peak_max" } else { "peak_min" }.to_owned(),
372
&[*input][..],
373
),
374
PhysNodeKind::OrderedUnion { inputs } => ("ordered-union".to_string(), inputs.as_slice()),
375
PhysNodeKind::Zip {
376
inputs,
377
null_extend,
378
} => {
379
let label = if *null_extend {
380
"zip-null-extend"
381
} else {
382
"zip"
383
};
384
(label.to_string(), inputs.as_slice())
385
},
386
PhysNodeKind::Multiplexer { input } => ("multiplexer".to_string(), from_ref(input)),
387
PhysNodeKind::MultiScan {
388
scan_sources,
389
file_reader_builder,
390
cloud_options: _,
391
file_projection_builder,
392
output_schema,
393
row_index,
394
pre_slice,
395
predicate,
396
hive_parts,
397
include_file_paths,
398
cast_columns_policy: _,
399
missing_columns_policy: _,
400
forbid_extra_columns: _,
401
deletion_files,
402
file_schema: _,
403
} => {
404
let mut out = format!("multi-scan[{}]", file_reader_builder.reader_name());
405
let mut f = EscapeLabel(&mut out);
406
407
write!(f, "\n{} source", scan_sources.len()).unwrap();
408
409
if scan_sources.len() != 1 {
410
write!(f, "s").unwrap();
411
}
412
413
write!(
414
f,
415
"\nproject: {} total, {} from file",
416
output_schema.len(),
417
file_projection_builder.num_projections(),
418
)
419
.unwrap();
420
421
if let Some(ri) = row_index {
422
write!(f, "\nrow index: name: {}, offset: {:?}", ri.name, ri.offset).unwrap();
423
}
424
425
if let Some(col_name) = include_file_paths {
426
write!(f, "\nfile path column: {col_name}").unwrap();
427
}
428
429
if let Some(pre_slice) = pre_slice {
430
write!(f, "\nslice: offset: ").unwrap();
431
432
match pre_slice {
433
Slice::Positive { offset, len: _ } => write!(f, "{}", *offset),
434
Slice::Negative {
435
offset_from_end,
436
len: _,
437
} => write!(f, "-{}", *offset_from_end),
438
}
439
.unwrap();
440
441
write!(f, ", len: {}", pre_slice.len()).unwrap()
442
}
443
444
if let Some(predicate) = predicate {
445
write!(f, "\nfilter: {}", predicate.display(expr_arena)).unwrap();
446
}
447
448
if let Some(v) = hive_parts.as_ref().map(|h| h.df().width()) {
449
write!(f, "\nhive: {v} column").unwrap();
450
451
if v != 1 {
452
write!(f, "s").unwrap();
453
}
454
}
455
456
if let Some(deletion_files) = deletion_files {
457
write!(f, "\n{deletion_files}").unwrap();
458
}
459
460
(out, &[][..])
461
},
462
PhysNodeKind::GroupBy { input, key, aggs } => (
463
format!(
464
"group-by\\nkey:\\n{}\\naggs:\\n{}",
465
fmt_exprs_to_label(key, expr_arena, FormatExprStyle::Select),
466
fmt_exprs_to_label(aggs, expr_arena, FormatExprStyle::Select)
467
),
468
from_ref(input),
469
),
470
PhysNodeKind::InMemoryJoin {
471
input_left,
472
input_right,
473
left_on,
474
right_on,
475
args,
476
..
477
}
478
| PhysNodeKind::EquiJoin {
479
input_left,
480
input_right,
481
left_on,
482
right_on,
483
args,
484
}
485
| PhysNodeKind::SemiAntiJoin {
486
input_left,
487
input_right,
488
left_on,
489
right_on,
490
args,
491
output_bool: _,
492
} => {
493
let label = match phys_sm[node_key].kind {
494
PhysNodeKind::EquiJoin { .. } => "equi-join",
495
PhysNodeKind::InMemoryJoin { .. } => "in-memory-join",
496
PhysNodeKind::CrossJoin { .. } => "cross-join",
497
PhysNodeKind::SemiAntiJoin {
498
output_bool: false, ..
499
} if args.how.is_semi() => "semi-join",
500
PhysNodeKind::SemiAntiJoin {
501
output_bool: false, ..
502
} if args.how.is_anti() => "anti-join",
503
PhysNodeKind::SemiAntiJoin {
504
output_bool: true, ..
505
} if args.how.is_semi() => "is-in",
506
PhysNodeKind::SemiAntiJoin {
507
output_bool: true, ..
508
} if args.how.is_anti() => "is-not-in",
509
_ => unreachable!(),
510
};
511
let mut label = label.to_string();
512
write!(
513
label,
514
r"\nleft_on:\n{}",
515
fmt_exprs_to_label(left_on, expr_arena, FormatExprStyle::NoAliases)
516
)
517
.unwrap();
518
write!(
519
label,
520
r"\nright_on:\n{}",
521
fmt_exprs_to_label(right_on, expr_arena, FormatExprStyle::NoAliases)
522
)
523
.unwrap();
524
if args.how.is_equi() {
525
write!(
526
label,
527
r"\nhow: {}",
528
escape_graphviz(&format!("{:?}", args.how))
529
)
530
.unwrap();
531
}
532
if args.nulls_equal {
533
write!(label, r"\njoin-nulls").unwrap();
534
}
535
(label, &[*input_left, *input_right][..])
536
},
537
PhysNodeKind::CrossJoin {
538
input_left,
539
input_right,
540
args: _,
541
} => ("cross-join".to_string(), &[*input_left, *input_right][..]),
542
#[cfg(feature = "merge_sorted")]
543
PhysNodeKind::MergeSorted {
544
input_left,
545
input_right,
546
} => ("merge-sorted".to_string(), &[*input_left, *input_right][..]),
547
};
548
549
let node_id = node_key.data().as_ffi();
550
let style = NodeStyle::for_node_kind(kind);
551
552
if let Some(attrs) = style.node_attrs() {
553
out.push(format!("{node_id} [label=\"{label}\",{attrs}];"));
554
} else {
555
out.push(format!("{node_id} [label=\"{label}\"];"));
556
}
557
for input in inputs {
558
visualize_plan_rec(input.node, phys_sm, expr_arena, visited, out);
559
out.push(format!(
560
"{} -> {};",
561
input.node.data().as_ffi(),
562
node_key.data().as_ffi()
563
));
564
}
565
}
566
567
pub fn visualize_plan(
568
root: PhysNodeKey,
569
phys_sm: &SlotMap<PhysNodeKey, PhysNode>,
570
expr_arena: &Arena<AExpr>,
571
) -> String {
572
let mut visited: SecondaryMap<PhysNodeKey, ()> = SecondaryMap::new();
573
let mut out = Vec::with_capacity(phys_sm.len() + 3);
574
out.push("digraph polars {\nrankdir=\"BT\"\nnode [fontname=\"Monospace\"]".to_string());
575
out.push(NodeStyle::legend());
576
visualize_plan_rec(root, phys_sm, expr_arena, &mut visited, &mut out);
577
out.push("}".to_string());
578
out.join("\n")
579
}
580
581