Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/ir/dot.rs
8458 views
1
use std::fmt;
2
use std::path::PathBuf;
3
4
use polars_core::prelude::{InitHashMaps, PlHashSet};
5
use polars_core::schema::Schema;
6
use polars_utils::pl_str::PlSmallStr;
7
use polars_utils::unique_id::UniqueId;
8
use recursive::recursive;
9
10
use super::format::ExprIRSliceDisplay;
11
use crate::prelude::ir::format::ColumnsDisplay;
12
use crate::prelude::*;
13
14
pub struct IRDotDisplay<'a> {
15
lp: IRPlanRef<'a>,
16
}
17
18
const INDENT: &str = " ";
19
20
#[derive(Clone, Copy)]
21
enum DotNode {
22
Plain(usize),
23
Cache(UniqueId),
24
}
25
26
impl fmt::Display for DotNode {
27
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
28
match self {
29
DotNode::Plain(n) => write!(f, "p{n}"),
30
DotNode::Cache(n) => write!(f, "\"{n}\""),
31
}
32
}
33
}
34
35
#[inline(always)]
36
fn write_label<'a, 'b>(
37
f: &'a mut fmt::Formatter<'b>,
38
id: DotNode,
39
mut w: impl FnMut(&mut EscapeLabel<'a>) -> fmt::Result,
40
) -> fmt::Result {
41
write!(f, "{INDENT}{id}[label=\"")?;
42
43
let mut escaped = EscapeLabel(f);
44
w(&mut escaped)?;
45
let EscapeLabel(f) = escaped;
46
47
writeln!(f, "\"]")?;
48
49
Ok(())
50
}
51
52
impl<'a> IRDotDisplay<'a> {
53
pub fn new(lp: IRPlanRef<'a>) -> Self {
54
Self { lp }
55
}
56
57
fn with_root(&self, root: Node) -> Self {
58
Self {
59
lp: self.lp.with_root(root),
60
}
61
}
62
63
fn display_expr(&self, expr: &'a ExprIR) -> ExprIRDisplay<'a> {
64
expr.display(self.lp.expr_arena)
65
}
66
67
fn display_exprs(&self, exprs: &'a [ExprIR]) -> ExprIRSliceDisplay<'a, ExprIR> {
68
ExprIRSliceDisplay {
69
exprs,
70
expr_arena: self.lp.expr_arena,
71
}
72
}
73
74
#[recursive]
75
fn _format(
76
&self,
77
f: &mut fmt::Formatter<'_>,
78
parent: Option<DotNode>,
79
last: &mut usize,
80
visited_caches: &mut PlHashSet<UniqueId>,
81
) -> std::fmt::Result {
82
use fmt::Write;
83
84
let root = self.lp.root();
85
let id = if let IR::Cache { id, .. } = root {
86
DotNode::Cache(*id)
87
} else {
88
*last += 1;
89
DotNode::Plain(*last)
90
};
91
92
if let Some(parent) = parent {
93
writeln!(f, "{INDENT}{id} -> {parent}")?;
94
}
95
96
macro_rules! recurse {
97
($input:expr) => {
98
self.with_root($input)
99
._format(f, Some(id), last, visited_caches)?;
100
};
101
}
102
103
use IR::*;
104
match root {
105
Union { inputs, .. } => {
106
for input in inputs {
107
recurse!(*input);
108
}
109
110
write_label(f, id, |f| f.write_str("UNION"))?;
111
},
112
HConcat { inputs, .. } => {
113
for input in inputs {
114
recurse!(*input);
115
}
116
117
write_label(f, id, |f| f.write_str("HCONCAT"))?;
118
},
119
Cache {
120
input,
121
id: cache_id,
122
..
123
} => {
124
if !visited_caches.contains(cache_id) {
125
visited_caches.insert(*cache_id);
126
127
recurse!(*input);
128
129
write_label(f, id, |f| f.write_str("CACHE"))?;
130
}
131
},
132
Filter { predicate, input } => {
133
recurse!(*input);
134
135
let pred = self.display_expr(predicate);
136
write_label(f, id, |f| write!(f, "FILTER BY {pred}"))?;
137
},
138
#[cfg(feature = "python")]
139
PythonScan { options } => {
140
let predicate = match &options.predicate {
141
PythonPredicate::Polars(e) => format!("{}", self.display_expr(e)),
142
PythonPredicate::PyArrow(s) => s.clone(),
143
PythonPredicate::None => "none".to_string(),
144
};
145
let with_columns = NumColumns(options.with_columns.as_ref().map(|s| s.as_ref()));
146
let total_columns = options.schema.len();
147
148
write_label(f, id, |f| {
149
write!(
150
f,
151
"PYTHON SCAN\nπ {with_columns}/{total_columns};\nσ {predicate}"
152
)
153
})?
154
},
155
Select {
156
expr,
157
input,
158
schema,
159
..
160
} => {
161
recurse!(*input);
162
write_label(f, id, |f| write!(f, "π {}/{}", expr.len(), schema.len()))?;
163
},
164
Sort {
165
input, by_column, ..
166
} => {
167
let by_column = self.display_exprs(by_column);
168
recurse!(*input);
169
write_label(f, id, |f| write!(f, "SORT BY {by_column}"))?;
170
},
171
GroupBy {
172
input, keys, aggs, ..
173
} => {
174
let keys = self.display_exprs(keys);
175
let aggs = self.display_exprs(aggs);
176
recurse!(*input);
177
write_label(f, id, |f| write!(f, "AGG {aggs}\nBY\n{keys}"))?;
178
},
179
HStack { input, exprs, .. } => {
180
let exprs = self.display_exprs(exprs);
181
recurse!(*input);
182
write_label(f, id, |f| write!(f, "WITH COLUMNS {exprs}"))?;
183
},
184
Slice { input, offset, len } => {
185
recurse!(*input);
186
write_label(f, id, |f| write!(f, "SLICE offset: {offset}; len: {len}"))?;
187
},
188
Distinct { input, options, .. } => {
189
recurse!(*input);
190
write_label(f, id, |f| {
191
f.write_str("DISTINCT")?;
192
193
if let Some(subset) = &options.subset {
194
f.write_str(" BY ")?;
195
196
let mut subset = subset.iter();
197
198
if let Some(fst) = subset.next() {
199
f.write_str(fst)?;
200
for name in subset {
201
write!(f, ", \"{name}\"")?;
202
}
203
} else {
204
f.write_str("None")?;
205
}
206
}
207
208
Ok(())
209
})?;
210
},
211
DataFrameScan {
212
schema,
213
output_schema,
214
..
215
} => {
216
let num_columns = NumColumnsSchema(output_schema.as_ref().map(|p| p.as_ref()));
217
let total_columns = schema.len();
218
219
write_label(f, id, |f| {
220
write!(f, "TABLE\nπ {num_columns}/{total_columns}")
221
})?;
222
},
223
Scan {
224
sources,
225
file_info,
226
hive_parts: _,
227
predicate,
228
predicate_file_skip_applied: _,
229
scan_type,
230
unified_scan_args,
231
output_schema: _,
232
} => {
233
let name: &str = (&**scan_type).into();
234
let path = ScanSourcesDisplay(sources);
235
let with_columns = unified_scan_args
236
.projection
237
.as_ref()
238
.map(|cols| cols.as_ref());
239
let with_columns = NumColumns(with_columns);
240
let total_columns =
241
file_info.schema.len() - usize::from(unified_scan_args.row_index.is_some());
242
243
write_label(f, id, |f| {
244
write!(f, "{name} SCAN {path}\nπ {with_columns}/{total_columns};",)?;
245
246
if let Some(predicate) = predicate.as_ref() {
247
write!(f, "\nσ {}", self.display_expr(predicate))?;
248
}
249
250
if let Some(row_index) = unified_scan_args.row_index.as_ref() {
251
write!(f, "\nrow index: {} (+{})", row_index.name, row_index.offset)?;
252
}
253
254
Ok(())
255
})?;
256
},
257
Join {
258
input_left,
259
input_right,
260
left_on,
261
right_on,
262
options,
263
..
264
} => {
265
recurse!(*input_left);
266
recurse!(*input_right);
267
268
write_label(f, id, |f| {
269
write!(f, "JOIN {}", options.args.how)?;
270
271
if !left_on.is_empty() {
272
let left_on = self.display_exprs(left_on);
273
let right_on = self.display_exprs(right_on);
274
write!(f, "\nleft: {left_on};\nright: {right_on}")?
275
}
276
Ok(())
277
})?;
278
},
279
MapFunction {
280
input, function, ..
281
} => {
282
recurse!(*input);
283
write_label(f, id, |f| write!(f, "{function}"))?;
284
},
285
ExtContext { input, .. } => {
286
recurse!(*input);
287
write_label(f, id, |f| f.write_str("EXTERNAL_CONTEXT"))?;
288
},
289
Sink { input, payload, .. } => {
290
recurse!(*input);
291
292
write_label(f, id, |f| {
293
f.write_str(match payload {
294
SinkTypeIR::Memory => "SINK (MEMORY)",
295
SinkTypeIR::Callback { .. } => "SINK (CALLBACK)",
296
SinkTypeIR::File { .. } => "SINK (FILE)",
297
SinkTypeIR::Partitioned { .. } => "SINK (PARTITION)",
298
})
299
})?;
300
},
301
SinkMultiple { inputs } => {
302
for input in inputs {
303
recurse!(*input);
304
}
305
306
write_label(f, id, |f| f.write_str("SINK MULTIPLE"))?;
307
},
308
SimpleProjection { input, columns } => {
309
let num_columns = columns.as_ref().len();
310
let total_columns = self.lp.lp_arena.get(*input).schema(self.lp.lp_arena).len();
311
312
let columns = ColumnsDisplay(columns.as_ref());
313
recurse!(*input);
314
write_label(f, id, |f| {
315
write!(f, "simple π {num_columns}/{total_columns}\n[{columns}]")
316
})?;
317
},
318
#[cfg(feature = "merge_sorted")]
319
MergeSorted {
320
input_left,
321
input_right,
322
key,
323
} => {
324
recurse!(*input_left);
325
recurse!(*input_right);
326
327
write_label(f, id, |f| write!(f, "MERGE_SORTED ON '{key}'",))?;
328
},
329
Invalid => write_label(f, id, |f| f.write_str("INVALID"))?,
330
}
331
332
Ok(())
333
}
334
}
335
336
// A few utility structures for formatting
337
pub struct PathsDisplay<'a>(pub &'a [PathBuf]);
338
pub struct ScanSourcesDisplay<'a>(pub &'a ScanSources);
339
struct NumColumns<'a>(Option<&'a [PlSmallStr]>);
340
struct NumColumnsSchema<'a>(Option<&'a Schema>);
341
342
impl fmt::Display for ScanSourceRef<'_> {
343
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
344
match self {
345
ScanSourceRef::Path(path) => path.fmt(f),
346
ScanSourceRef::File(_) => f.write_str("open-file"),
347
ScanSourceRef::Buffer(buff) => write!(f, "{} in-mem bytes", buff.len()),
348
}
349
}
350
}
351
352
impl fmt::Display for ScanSourcesDisplay<'_> {
353
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
354
match self.0.len() {
355
0 => write!(f, "[]"),
356
1 => write!(f, "[{}]", self.0.at(0)),
357
2 => write!(f, "[{}, {}]", self.0.at(0), self.0.at(1)),
358
_ => write!(
359
f,
360
"[{}, ... {} other sources]",
361
self.0.at(0),
362
self.0.len() - 1,
363
),
364
}
365
}
366
}
367
368
impl fmt::Display for PathsDisplay<'_> {
369
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
370
match self.0.len() {
371
0 => write!(f, "[]"),
372
1 => write!(f, "[{}]", self.0[0].display()),
373
2 => write!(f, "[{}, {}]", self.0[0].display(), self.0[1].display()),
374
_ => write!(
375
f,
376
"[{}, ... {} other files]",
377
self.0[0].display(),
378
self.0.len() - 1,
379
),
380
}
381
}
382
}
383
384
impl fmt::Display for NumColumns<'_> {
385
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
386
match self.0 {
387
None => f.write_str("*"),
388
Some(columns) => columns.len().fmt(f),
389
}
390
}
391
}
392
393
impl fmt::Display for NumColumnsSchema<'_> {
394
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
395
match self.0 {
396
None => f.write_str("*"),
397
Some(columns) => columns.len().fmt(f),
398
}
399
}
400
}
401
402
/// Adapter that forwards text to the wrapped [`fmt::Write`] sink while escaping it for use
/// inside a double-quoted DOT label.
///
/// `"` becomes `\"`, a newline becomes `\n`, and `\` becomes `\\`; all other text passes through
/// unchanged.
pub struct EscapeLabel<'a>(pub &'a mut dyn fmt::Write);

impl fmt::Write for EscapeLabel<'_> {
    fn write_str(&mut self, s: &str) -> fmt::Result {
        // Byte offset of the first character not yet forwarded to the sink.
        let mut pending = 0;

        for (pos, ch) in s.char_indices() {
            let escaped = match ch {
                '"' => r#"\""#,
                '\n' => r"\n",
                '\\' => r"\\",
                _ => continue,
            };
            self.0.write_str(&s[pending..pos])?;
            self.0.write_str(escaped)?;
            pending = pos + ch.len_utf8();
        }

        self.0.write_str(&s[pending..])
    }
}
432
433
impl fmt::Display for IRDotDisplay<'_> {
434
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
435
writeln!(f, "digraph polars_query {{")?;
436
writeln!(f, "{INDENT}rankdir=\"BT\"")?;
437
writeln!(f, "{INDENT}node [fontname=\"Monospace\", shape=\"box\"]")?;
438
439
let mut last = 0;
440
let mut visited_caches = PlHashSet::new();
441
self._format(f, None, &mut last, &mut visited_caches)?;
442
443
writeln!(f, "}}")?;
444
445
Ok(())
446
}
447
}
448
449