// Path: blob/main/crates/polars-stream/src/physical_plan/fmt.rs
// 8460 views
use std::fmt::Write;12use polars_plan::dsl::PartitionStrategyIR;3use polars_plan::plans::expr_ir::ExprIR;4use polars_plan::plans::{AExpr, EscapeLabel};5use polars_plan::prelude::FileWriteFormat;6use polars_time::ClosedWindow;7#[cfg(feature = "dynamic_group_by")]8use polars_time::DynamicGroupOptions;9use polars_utils::arena::Arena;10use polars_utils::slice_enum::Slice;11use slotmap::{Key, SecondaryMap, SlotMap};1213use super::{PhysNode, PhysNodeKey, PhysNodeKind};14use crate::physical_plan::ZipBehavior;1516/// A style of a graph node.17pub enum NodeStyle {18InMemoryFallback,19MemoryIntensive,20Generic,21}2223impl NodeStyle {24const COLOR_IN_MEM_FALLBACK: &str = "0.0 0.3 1.0"; // Pastel red25const COLOR_MEM_INTENSIVE: &str = "0.16 0.3 1.0"; // Pastel yellow2627/// Returns a style for a node kind.28pub fn for_node_kind(kind: &PhysNodeKind) -> Self {29use PhysNodeKind as K;30match kind {31K::InMemoryMap { .. } | K::InMemoryJoin { .. } => Self::InMemoryFallback,32K::InMemorySource { .. }33| K::InputIndependentSelect { .. }34| K::NegativeSlice { .. }35| K::InMemorySink { .. }36| K::Sort { .. }37| K::GroupBy { .. }38| K::EquiJoin { .. }39| K::SemiAntiJoin { .. }40| K::Multiplexer { .. } => Self::MemoryIntensive,41#[cfg(feature = "merge_sorted")]42K::MergeSorted { .. 
} => Self::MemoryIntensive,43_ => Self::Generic,44}45}4647/// Returns extra styling attributes (if any) for the graph node.48pub fn node_attrs(&self) -> Option<String> {49match self {50Self::InMemoryFallback => Some(format!(51"style=filled,fillcolor=\"{}\"",52Self::COLOR_IN_MEM_FALLBACK53)),54Self::MemoryIntensive => Some(format!(55"style=filled,fillcolor=\"{}\"",56Self::COLOR_MEM_INTENSIVE57)),58Self::Generic => None,59}60}6162/// Returns a legend explaining the node style meaning.63pub fn legend() -> String {64format!(65"fontname=\"Helvetica\"\nfontsize=\"10\"\nlabelloc=\"b\"\nlabel=<<BR/><BR/><B>Legend</B><BR/><BR/>◯ streaming engine node <FONT COLOR=\"{}\">⬤</FONT> potentially memory-intensive node <FONT COLOR=\"{}\">⬤</FONT> in-memory engine fallback>",66Self::COLOR_MEM_INTENSIVE,67Self::COLOR_IN_MEM_FALLBACK,68)69}70}7172fn escape_graphviz(s: &str) -> String {73s.replace('\\', "\\\\")74.replace('\n', "\\n")75.replace('"', "\\\"")76}7778fn fmt_expr(f: &mut dyn Write, expr: &ExprIR, expr_arena: &Arena<AExpr>) -> std::fmt::Result {79// Remove the alias to make the display better80let without_alias = ExprIR::from_node(expr.node(), expr_arena);81write!(82f,83"{} = {}",84expr.output_name(),85without_alias.display(expr_arena)86)87}8889pub enum FormatExprStyle {90Select,91NoAliases,92}9394pub fn fmt_exprs_to_label(95exprs: &[ExprIR],96expr_arena: &Arena<AExpr>,97style: FormatExprStyle,98) -> String {99let mut buffer = String::new();100let mut f = EscapeLabel(&mut buffer);101fmt_exprs(&mut f, exprs, expr_arena, style);102buffer103}104105pub fn fmt_exprs(106f: &mut dyn Write,107exprs: &[ExprIR],108expr_arena: &Arena<AExpr>,109style: FormatExprStyle,110) {111if matches!(style, FormatExprStyle::Select) {112let mut formatted = Vec::new();113114let mut max_name_width = 0;115let mut max_expr_width = 0;116117for e in exprs {118let mut name = String::new();119let mut expr = String::new();120121// Remove the alias to make the display better122let without_alias = 
ExprIR::from_node(e.node(), expr_arena);123124write!(name, "{}", e.output_name()).unwrap();125write!(expr, "{}", without_alias.display(expr_arena)).unwrap();126127max_name_width = max_name_width.max(name.chars().count());128max_expr_width = max_expr_width.max(expr.chars().count());129130formatted.push((name, expr));131}132133for (name, expr) in formatted {134writeln!(f, "{name:>max_name_width$} = {expr:<max_expr_width$}").unwrap();135}136} else {137let Some(e) = exprs.first() else {138return;139};140141fmt_expr(f, e, expr_arena).unwrap();142143for e in &exprs[1..] {144f.write_str("\n").unwrap();145fmt_expr(f, e, expr_arena).unwrap();146}147}148}149150#[recursive::recursive]151fn visualize_plan_rec(152node_key: PhysNodeKey,153phys_sm: &SlotMap<PhysNodeKey, PhysNode>,154expr_arena: &Arena<AExpr>,155visited: &mut SecondaryMap<PhysNodeKey, ()>,156out: &mut Vec<String>,157) {158if visited.contains_key(node_key) {159return;160}161visited.insert(node_key, ());162163let kind = &phys_sm[node_key].kind;164165use std::slice::from_ref;166let (label, inputs) = match kind {167PhysNodeKind::InMemorySource {168df,169disable_morsel_split: _,170} => (171format!(172"in-memory-source\\ncols: {}",173df.get_column_names_owned().join(", ")174),175&[][..],176),177#[cfg(feature = "python")]178PhysNodeKind::PythonScan { .. 
} => ("python-scan".to_string(), &[][..]),179PhysNodeKind::SinkMultiple { sinks } => {180for sink in sinks {181visualize_plan_rec(*sink, phys_sm, expr_arena, visited, out);182}183return;184},185PhysNodeKind::Select {186input,187selectors,188extend_original,189} => {190let label = if *extend_original {191"with-columns"192} else {193"select"194};195(196format!(197"{label}\\n{}",198fmt_exprs_to_label(selectors, expr_arena, FormatExprStyle::Select)199),200from_ref(input),201)202},203PhysNodeKind::WithRowIndex {204input,205name,206offset,207} => (208format!("with-row-index\\nname: {name}\\noffset: {offset:?}"),209from_ref(input),210),211PhysNodeKind::InputIndependentSelect { selectors } => (212format!(213"input-independent-select\\n{}",214fmt_exprs_to_label(selectors, expr_arena, FormatExprStyle::Select)215),216&[][..],217),218PhysNodeKind::Reduce { input, exprs } => (219format!(220"reduce\\n{}",221fmt_exprs_to_label(exprs, expr_arena, FormatExprStyle::Select)222),223from_ref(input),224),225PhysNodeKind::StreamingSlice {226input,227offset,228length,229} => (230format!("slice\\noffset: {offset}, length: {length}"),231from_ref(input),232),233PhysNodeKind::NegativeSlice {234input,235offset,236length,237} => (238format!("slice\\noffset: {offset}, length: {length}"),239from_ref(input),240),241PhysNodeKind::DynamicSlice {242input,243offset,244length,245} => ("slice".to_owned(), &[*input, *offset, *length][..]),246PhysNodeKind::Shift {247input,248offset,249fill: Some(fill),250} => ("shift".to_owned(), &[*input, *offset, *fill][..]),251PhysNodeKind::Shift {252input,253offset,254fill: None,255} => ("shift".to_owned(), &[*input, *offset][..]),256PhysNodeKind::Filter { input, predicate } => (257format!(258"filter\\n{}",259fmt_exprs_to_label(from_ref(predicate), expr_arena, FormatExprStyle::Select)260),261from_ref(input),262),263PhysNodeKind::SimpleProjection { input, columns } => (264format!("select\\ncols: {}", columns.join(", 
")),265from_ref(input),266),267PhysNodeKind::InMemorySink { input } => ("in-memory-sink".to_string(), from_ref(input)),268PhysNodeKind::CallbackSink { input, .. } => ("callback-sink".to_string(), from_ref(input)),269PhysNodeKind::FileSink { input, options } => match options.file_format {270#[cfg(feature = "parquet")]271FileWriteFormat::Parquet(_) => ("parquet-sink".to_string(), from_ref(input)),272#[cfg(feature = "ipc")]273FileWriteFormat::Ipc(_) => ("ipc-sink".to_string(), from_ref(input)),274#[cfg(feature = "csv")]275FileWriteFormat::Csv(_) => ("csv-sink".to_string(), from_ref(input)),276#[cfg(feature = "json")]277FileWriteFormat::NDJson(_) => ("ndjson-sink".to_string(), from_ref(input)),278#[allow(unreachable_patterns)]279_ => todo!(),280},281PhysNodeKind::PartitionedSink { input, options } => {282let variant = match options.partition_strategy {283PartitionStrategyIR::Keyed { .. } => "partition-keyed",284PartitionStrategyIR::FileSize => "partition-file-size",285};286287match options.file_format {288#[cfg(feature = "parquet")]289FileWriteFormat::Parquet(_) => (format!("{variant}[parquet]"), from_ref(input)),290#[cfg(feature = "ipc")]291FileWriteFormat::Ipc(_) => (format!("{variant}[ipc]"), from_ref(input)),292#[cfg(feature = "csv")]293FileWriteFormat::Csv(_) => (format!("{variant}[csv]"), from_ref(input)),294#[cfg(feature = "json")]295FileWriteFormat::NDJson(_) => (format!("{variant}[ndjson]"), from_ref(input)),296#[allow(unreachable_patterns)]297_ => todo!(),298}299},300PhysNodeKind::InMemoryMap {301input,302map: _,303format_str,304} => {305let mut label = String::new();306label.push_str("in-memory-map");307if let Some(format_str) = format_str {308label.write_str("\\n").unwrap();309310let mut f = EscapeLabel(&mut label);311f.write_str(format_str).unwrap();312}313(label, from_ref(input))314},315PhysNodeKind::Map {316input,317map: _,318format_str,319} => {320let mut label = String::new();321label.push_str("map");322if let Some(format_str) = format_str 
{323label.push_str("\\n");324325let mut f = EscapeLabel(&mut label);326f.write_str(format_str).unwrap();327}328(label, from_ref(input))329},330PhysNodeKind::SortedGroupBy {331input,332key,333aggs,334slice,335} => {336let mut s = String::new();337s.push_str("sorted-group-by\\n");338let f = &mut s;339write!(f, "key: {key}\\n").unwrap();340if let Some((offset, length)) = slice {341write!(f, "slice: {offset}, {length}\\n").unwrap();342}343write!(344f,345"aggs:\\n{}",346fmt_exprs_to_label(aggs, expr_arena, FormatExprStyle::Select)347)348.unwrap();349350(s, from_ref(input))351},352PhysNodeKind::Sort {353input,354by_column,355slice: _,356sort_options: _,357} => (358format!(359"sort\\n{}",360fmt_exprs_to_label(by_column, expr_arena, FormatExprStyle::NoAliases)361),362from_ref(input),363),364PhysNodeKind::TopK {365input,366k,367by_column,368reverse,369nulls_last: _,370dyn_pred: _,371} => {372let name = if reverse.iter().all(|r| *r) {373"bottom-k"374} else {375"top-k"376};377(378format!(379"{name}\\n{}",380fmt_exprs_to_label(by_column, expr_arena, FormatExprStyle::NoAliases)381),382&[*input, *k][..],383)384},385PhysNodeKind::Repeat { value, repeats } => ("repeat".to_owned(), &[*value, *repeats][..]),386#[cfg(feature = "cum_agg")]387PhysNodeKind::CumAgg { input, kind } => {388use crate::nodes::cum_agg::CumAggKind;389390(391format!(392"cum_{}",393match kind {394CumAggKind::Min => "min",395CumAggKind::Max => "max",396CumAggKind::Sum => "sum",397CumAggKind::Count => "count",398CumAggKind::Prod => "prod",399}400),401&[*input][..],402)403},404PhysNodeKind::GatherEvery { input, n, offset } => (405format!("gather_every\\nn: {n}, offset: {offset}"),406&[*input][..],407),408PhysNodeKind::Rle(input) => ("rle".to_owned(), &[*input][..]),409PhysNodeKind::RleId(input) => ("rle_id".to_owned(), &[*input][..]),410PhysNodeKind::PeakMinMax { input, is_peak_max } => (411if *is_peak_max { "peak_max" } else { "peak_min" }.to_owned(),412&[*input][..],413),414PhysNodeKind::OrderedUnion { inputs } 
=> ("ordered-union".to_string(), inputs.as_slice()),415PhysNodeKind::UnorderedUnion { inputs } => {416("unordered-union".to_string(), inputs.as_slice())417},418PhysNodeKind::Zip {419inputs,420zip_behavior,421} => {422let label = match zip_behavior {423ZipBehavior::NullExtend => "zip-null-extend",424ZipBehavior::Broadcast => "zip-broadcast",425ZipBehavior::Strict => "zip-strict",426};427(label.to_string(), inputs.as_slice())428},429PhysNodeKind::Multiplexer { input } => ("multiplexer".to_string(), from_ref(input)),430PhysNodeKind::MultiScan {431scan_sources,432file_reader_builder,433cloud_options: _,434file_projection_builder,435output_schema,436row_index,437pre_slice,438predicate,439predicate_file_skip_applied: _,440hive_parts,441include_file_paths,442cast_columns_policy: _,443missing_columns_policy: _,444forbid_extra_columns: _,445deletion_files,446table_statistics: _,447file_schema: _,448disable_morsel_split: _,449} => {450let mut out = format!("multi-scan[{}]", file_reader_builder.reader_name());451let mut f = EscapeLabel(&mut out);452453write!(f, "\n{} source", scan_sources.len()).unwrap();454455if scan_sources.len() != 1 {456write!(f, "s").unwrap();457}458459write!(460f,461"\nproject: {} total, {} from file",462output_schema.len(),463file_projection_builder.num_projections(),464)465.unwrap();466467if let Some(ri) = row_index {468write!(f, "\nrow index: name: {}, offset: {:?}", ri.name, ri.offset).unwrap();469}470471if let Some(col_name) = include_file_paths {472write!(f, "\nfile path column: {col_name}").unwrap();473}474475if let Some(pre_slice) = pre_slice {476write!(f, "\nslice: offset: ").unwrap();477478match pre_slice {479Slice::Positive { offset, len: _ } => write!(f, "{}", *offset),480Slice::Negative {481offset_from_end,482len: _,483} => write!(f, "-{}", *offset_from_end),484}485.unwrap();486487write!(f, ", len: {}", pre_slice.len()).unwrap()488}489490if let Some(predicate) = predicate {491write!(f, "\nfilter: {}", 
predicate.display(expr_arena)).unwrap();492}493494if let Some(v) = hive_parts.as_ref().map(|h| h.df().width()) {495write!(f, "\nhive: {v} column").unwrap();496497if v != 1 {498write!(f, "s").unwrap();499}500}501502if let Some(deletion_files) = deletion_files {503write!(f, "\n{deletion_files}").unwrap();504}505506(out, &[][..])507},508PhysNodeKind::GroupBy {509inputs,510key_per_input,511aggs_per_input,512} => {513let mut out = String::from("group-by");514for (key, aggs) in key_per_input.iter().zip(aggs_per_input) {515write!(516&mut out,517"\\nkey:\\n{}\\naggs:\\n{}",518fmt_exprs_to_label(key, expr_arena, FormatExprStyle::Select),519fmt_exprs_to_label(aggs, expr_arena, FormatExprStyle::Select)520)521.ok();522}523(out, inputs.as_slice())524},525#[cfg(feature = "dynamic_group_by")]526PhysNodeKind::DynamicGroupBy {527input,528options,529aggs,530slice,531} => {532use polars_time::prelude::{Label, StartBy};533534let DynamicGroupOptions {535index_column,536every,537period,538offset,539label,540include_boundaries,541closed_window,542start_by,543} = options;544let mut s = String::new();545let f = &mut s;546f.write_str("dynamic-group-by\\n").unwrap();547write!(f, "index column: {index_column}\\n").unwrap();548write!(f, "every: {every}").unwrap();549if every != period {550write!(f, ", period: {period}").unwrap();551}552if !offset.is_zero() {553write!(f, ", offset: {offset}").unwrap();554}555f.write_str("\\n").unwrap();556if *label != Label::Left {557write!(f, "label: {}\\n", <&'static str>::from(label)).unwrap();558}559if *include_boundaries {560write!(f, "include_boundaries: true\\n").unwrap();561}562if *start_by != StartBy::WindowBound {563write!(f, "start_by: {}\\n", <&'static str>::from(start_by)).unwrap();564}565if *closed_window != ClosedWindow::Left {566write!(567f,568"closed_window: {}\\n",569<&'static str>::from(closed_window)570)571.unwrap();572}573if let Some((offset, length)) = slice {574write!(f, "slice: {offset}, 
{length}\\n").unwrap();575}576write!(577f,578"aggs:\\n{}",579fmt_exprs_to_label(aggs, expr_arena, FormatExprStyle::Select)580)581.unwrap();582583(s, from_ref(input))584},585#[cfg(feature = "dynamic_group_by")]586PhysNodeKind::RollingGroupBy {587input,588index_column,589period,590offset,591closed,592slice,593aggs,594} => {595let mut s = String::new();596let f = &mut s;597f.write_str("rolling-group-by\\n").unwrap();598write!(f, "index column: {index_column}\\n").unwrap();599write!(f, "period: {period}, offset: {offset}\\n").unwrap();600write!(f, "closed: {}\\n", <&'static str>::from(*closed)).unwrap();601if let Some((offset, length)) = slice {602write!(f, "slice: {offset}, {length}\\n").unwrap();603}604write!(605f,606"aggs:\\n{}",607fmt_exprs_to_label(aggs, expr_arena, FormatExprStyle::Select)608)609.unwrap();610611(s, from_ref(input))612},613PhysNodeKind::MergeJoin {614input_left,615input_right,616left_on,617right_on,618args,619..620} => {621let mut label = "merge-join".to_string();622let how: &'static str = (&args.how).into();623write!(624label,625r"\nleft_on:\n{}",626left_on627.iter()628.map(|s| escape_graphviz(&s[..]))629.collect::<Vec<_>>()630.join("\n"),631)632.unwrap();633write!(634label,635r"\nright_on:\n{}",636right_on637.iter()638.map(|s| escape_graphviz(&s[..]))639.collect::<Vec<_>>()640.join("\n"),641)642.unwrap();643write!(label, r"\nhow: {}", escape_graphviz(how)).unwrap();644if args.nulls_equal {645write!(label, r"\njoin-nulls").unwrap();646}647(label, &[*input_left, *input_right][..])648},649PhysNodeKind::InMemoryJoin {650input_left,651input_right,652left_on,653right_on,654args,655..656}657| PhysNodeKind::EquiJoin {658input_left,659input_right,660left_on,661right_on,662args,663}664| PhysNodeKind::SemiAntiJoin {665input_left,666input_right,667left_on,668right_on,669args,670output_bool: _,671} => {672let label = match phys_sm[node_key].kind {673PhysNodeKind::MergeJoin { .. } => "merge-join",674PhysNodeKind::EquiJoin { .. 
} => "equi-join",675PhysNodeKind::InMemoryJoin { .. } => "in-memory-join",676PhysNodeKind::CrossJoin { .. } => "cross-join",677PhysNodeKind::SemiAntiJoin {678output_bool: false, ..679} if args.how.is_semi() => "semi-join",680PhysNodeKind::SemiAntiJoin {681output_bool: false, ..682} if args.how.is_anti() => "anti-join",683PhysNodeKind::SemiAntiJoin {684output_bool: true, ..685} if args.how.is_semi() => "is-in",686PhysNodeKind::SemiAntiJoin {687output_bool: true, ..688} if args.how.is_anti() => "is-not-in",689_ => unreachable!(),690};691let mut label = label.to_string();692write!(693label,694r"\nleft_on:\n{}",695fmt_exprs_to_label(left_on, expr_arena, FormatExprStyle::NoAliases)696)697.unwrap();698write!(699label,700r"\nright_on:\n{}",701fmt_exprs_to_label(right_on, expr_arena, FormatExprStyle::NoAliases)702)703.unwrap();704if args.how.is_equi() {705write!(706label,707r"\nhow: {}",708escape_graphviz(&format!("{:?}", args.how))709)710.unwrap();711}712if args.nulls_equal {713write!(label, r"\njoin-nulls").unwrap();714}715(label, &[*input_left, *input_right][..])716},717PhysNodeKind::CrossJoin {718input_left,719input_right,720args: _,721} => ("cross-join".to_string(), &[*input_left, *input_right][..]),722PhysNodeKind::AsOfJoin {723input_left,724input_right,725..726} => ("asof_join".to_string(), &[*input_left, *input_right][..]),727#[cfg(feature = "merge_sorted")]728PhysNodeKind::MergeSorted {729input_left,730input_right,731} => ("merge-sorted".to_string(), &[*input_left, *input_right][..]),732#[cfg(feature = "ewma")]733PhysNodeKind::EwmMean { input, options: _ } => ("ewm-mean".to_string(), &[*input][..]),734#[cfg(feature = "ewma")]735PhysNodeKind::EwmVar { input, options: _ } => ("ewm-var".to_string(), &[*input][..]),736#[cfg(feature = "ewma")]737PhysNodeKind::EwmStd { input, options: _ } => ("ewm-std".to_string(), &[*input][..]),738};739740let node_id = node_key.data().as_ffi();741let style = NodeStyle::for_node_kind(kind);742743if let Some(attrs) = style.node_attrs() 
{744out.push(format!("{node_id} [label=\"{label}\",{attrs}];"));745} else {746out.push(format!("{node_id} [label=\"{label}\"];"));747}748for input in inputs {749visualize_plan_rec(input.node, phys_sm, expr_arena, visited, out);750out.push(format!(751"{} -> {};",752input.node.data().as_ffi(),753node_key.data().as_ffi()754));755}756}757758pub fn visualize_plan(759root: PhysNodeKey,760phys_sm: &SlotMap<PhysNodeKey, PhysNode>,761expr_arena: &Arena<AExpr>,762) -> String {763let mut visited: SecondaryMap<PhysNodeKey, ()> = SecondaryMap::new();764let mut out = Vec::with_capacity(phys_sm.len() + 3);765out.push("digraph polars {\nrankdir=\"BT\"\nnode [fontname=\"Monospace\"]".to_string());766out.push(NodeStyle::legend());767visualize_plan_rec(root, phys_sm, expr_arena, &mut visited, &mut out);768out.push("}".to_string());769out.join("\n")770}771772773