Path: blob/main/crates/polars-stream/src/physical_plan/fmt.rs
6939 views
use std::fmt::Write;12use polars_plan::dsl::PartitionVariantIR;3use polars_plan::plans::expr_ir::ExprIR;4use polars_plan::plans::{AExpr, EscapeLabel};5use polars_plan::prelude::FileType;6use polars_utils::arena::Arena;7use polars_utils::slice_enum::Slice;8use slotmap::{Key, SecondaryMap, SlotMap};910use super::{PhysNode, PhysNodeKey, PhysNodeKind};1112/// A style of a graph node.13enum NodeStyle {14InMemoryFallback,15MemoryIntensive,16Generic,17}1819impl NodeStyle {20const COLOR_IN_MEM_FALLBACK: &str = "0.0 0.3 1.0"; // Pastel red21const COLOR_MEM_INTENSIVE: &str = "0.16 0.3 1.0"; // Pastel yellow2223/// Returns a style for a node kind.24pub fn for_node_kind(kind: &PhysNodeKind) -> Self {25use PhysNodeKind as K;26match kind {27K::InMemoryMap { .. } | K::InMemoryJoin { .. } => Self::InMemoryFallback,28K::InMemorySource { .. }29| K::InputIndependentSelect { .. }30| K::NegativeSlice { .. }31| K::InMemorySink { .. }32| K::Sort { .. }33| K::GroupBy { .. }34| K::EquiJoin { .. }35| K::SemiAntiJoin { .. }36| K::Multiplexer { .. } => Self::MemoryIntensive,37#[cfg(feature = "merge_sorted")]38K::MergeSorted { .. } => Self::MemoryIntensive,39_ => Self::Generic,40}41}4243/// Returns extra styling attributes (if any) for the graph node.44pub fn node_attrs(&self) -> Option<String> {45match self {46Self::InMemoryFallback => Some(format!(47"style=filled,fillcolor=\"{}\"",48Self::COLOR_IN_MEM_FALLBACK49)),50Self::MemoryIntensive => Some(format!(51"style=filled,fillcolor=\"{}\"",52Self::COLOR_MEM_INTENSIVE53)),54Self::Generic => None,55}56}5758/// Returns a legend explaining the node style meaning.59pub fn legend() -> String {60format!(61"fontname=\"Helvetica\"\nfontsize=\"10\"\nlabelloc=\"b\"\nlabel=<<BR/><BR/><B>Legend</B><BR/><BR/>◯ streaming engine node <FONT COLOR=\"{}\">⬤</FONT> potentially memory-intensive node <FONT COLOR=\"{}\">⬤</FONT> in-memory engine fallback>",62Self::COLOR_MEM_INTENSIVE,63Self::COLOR_IN_MEM_FALLBACK,64)65}66}6768fn escape_graphviz(s: &str) -> String {69s.replace('\\', "\\\\")70.replace('\n', "\\n")71.replace('"', "\\\"")72}7374fn fmt_expr(f: &mut dyn Write, expr: &ExprIR, expr_arena: &Arena<AExpr>) -> std::fmt::Result {75// Remove the alias to make the display better76let without_alias = ExprIR::from_node(expr.node(), expr_arena);77write!(78f,79"{} = {}",80expr.output_name(),81without_alias.display(expr_arena)82)83}8485pub enum FormatExprStyle {86Select,87NoAliases,88}8990pub fn fmt_exprs_to_label(91exprs: &[ExprIR],92expr_arena: &Arena<AExpr>,93style: FormatExprStyle,94) -> String {95let mut buffer = String::new();96let mut f = EscapeLabel(&mut buffer);97fmt_exprs(&mut f, exprs, expr_arena, style);98buffer99}100101pub fn fmt_exprs(102f: &mut dyn Write,103exprs: &[ExprIR],104expr_arena: &Arena<AExpr>,105style: FormatExprStyle,106) {107if matches!(style, FormatExprStyle::Select) {108let mut formatted = Vec::new();109110let mut max_name_width = 0;111let mut max_expr_width = 0;112113for e in exprs {114let mut name = String::new();115let mut expr = String::new();116117// Remove the alias to make the display better118let without_alias = ExprIR::from_node(e.node(), expr_arena);119120write!(name, "{}", e.output_name()).unwrap();121write!(expr, "{}", without_alias.display(expr_arena)).unwrap();122123max_name_width = max_name_width.max(name.chars().count());124max_expr_width = max_expr_width.max(expr.chars().count());125126formatted.push((name, expr));127}128129for (name, expr) in formatted {130write!(f, "{name:>max_name_width$} = {expr:<max_expr_width$}\\n").unwrap();131}132} else {133let Some(e) = exprs.first() else {134return;135};136137fmt_expr(f, e, expr_arena).unwrap();138139for e in &exprs[1..] {140f.write_str("\\n").unwrap();141fmt_expr(f, e, expr_arena).unwrap();142}143}144}145146#[recursive::recursive]147fn visualize_plan_rec(148node_key: PhysNodeKey,149phys_sm: &SlotMap<PhysNodeKey, PhysNode>,150expr_arena: &Arena<AExpr>,151visited: &mut SecondaryMap<PhysNodeKey, ()>,152out: &mut Vec<String>,153) {154if visited.contains_key(node_key) {155return;156}157visited.insert(node_key, ());158159let kind = &phys_sm[node_key].kind;160161use std::slice::from_ref;162let (label, inputs) = match kind {163PhysNodeKind::InMemorySource { df } => (164format!(165"in-memory-source\\ncols: {}",166df.get_column_names_owned().join(", ")167),168&[][..],169),170#[cfg(feature = "python")]171PhysNodeKind::PythonScan { .. } => ("python-scan".to_string(), &[][..]),172PhysNodeKind::SinkMultiple { sinks } => {173for sink in sinks {174visualize_plan_rec(*sink, phys_sm, expr_arena, visited, out);175}176return;177},178PhysNodeKind::Select {179input,180selectors,181extend_original,182} => {183let label = if *extend_original {184"with-columns"185} else {186"select"187};188(189format!(190"{label}\\n{}",191fmt_exprs_to_label(selectors, expr_arena, FormatExprStyle::Select)192),193from_ref(input),194)195},196PhysNodeKind::WithRowIndex {197input,198name,199offset,200} => (201format!("with-row-index\\nname: {name}\\noffset: {offset:?}"),202from_ref(input),203),204PhysNodeKind::InputIndependentSelect { selectors } => (205format!(206"input-independent-select\\n{}",207fmt_exprs_to_label(selectors, expr_arena, FormatExprStyle::Select)208),209&[][..],210),211PhysNodeKind::Reduce { input, exprs } => (212format!(213"reduce\\n{}",214fmt_exprs_to_label(exprs, expr_arena, FormatExprStyle::Select)215),216from_ref(input),217),218PhysNodeKind::StreamingSlice {219input,220offset,221length,222} => (223format!("slice\\noffset: {offset}, length: {length}"),224from_ref(input),225),226PhysNodeKind::NegativeSlice {227input,228offset,229length,230} => (231format!("slice\\noffset: {offset}, length: {length}"),232from_ref(input),233),234PhysNodeKind::DynamicSlice {235input,236offset,237length,238} => ("slice".to_owned(), &[*input, *offset, *length][..]),239PhysNodeKind::Shift {240input,241offset,242fill: Some(fill),243} => ("shift".to_owned(), &[*input, *offset, *fill][..]),244PhysNodeKind::Shift {245input,246offset,247fill: None,248} => ("shift".to_owned(), &[*input, *offset][..]),249PhysNodeKind::Filter { input, predicate } => (250format!(251"filter\\n{}",252fmt_exprs_to_label(from_ref(predicate), expr_arena, FormatExprStyle::Select)253),254from_ref(input),255),256PhysNodeKind::SimpleProjection { input, columns } => (257format!("select\\ncols: {}", columns.join(", ")),258from_ref(input),259),260PhysNodeKind::InMemorySink { input } => ("in-memory-sink".to_string(), from_ref(input)),261PhysNodeKind::FileSink {262input, file_type, ..263} => match file_type {264#[cfg(feature = "parquet")]265FileType::Parquet(_) => ("parquet-sink".to_string(), from_ref(input)),266#[cfg(feature = "ipc")]267FileType::Ipc(_) => ("ipc-sink".to_string(), from_ref(input)),268#[cfg(feature = "csv")]269FileType::Csv(_) => ("csv-sink".to_string(), from_ref(input)),270#[cfg(feature = "json")]271FileType::Json(_) => ("ndjson-sink".to_string(), from_ref(input)),272#[allow(unreachable_patterns)]273_ => todo!(),274},275PhysNodeKind::PartitionSink {276input,277file_type,278variant,279..280} => {281let variant = match variant {282PartitionVariantIR::ByKey { .. } => "partition-by-key-sink",283PartitionVariantIR::MaxSize { .. } => "partition-max-size-sink",284PartitionVariantIR::Parted { .. } => "partition-parted-sink",285};286287match file_type {288#[cfg(feature = "parquet")]289FileType::Parquet(_) => (format!("{variant}[parquet]"), from_ref(input)),290#[cfg(feature = "ipc")]291FileType::Ipc(_) => (format!("{variant}[ipc]"), from_ref(input)),292#[cfg(feature = "csv")]293FileType::Csv(_) => (format!("{variant}[csv]"), from_ref(input)),294#[cfg(feature = "json")]295FileType::Json(_) => (format!("{variant}[ndjson]"), from_ref(input)),296#[allow(unreachable_patterns)]297_ => todo!(),298}299},300PhysNodeKind::InMemoryMap {301input,302map: _,303format_str,304} => {305let mut label = String::new();306label.push_str("in-memory-map");307if let Some(format_str) = format_str {308label.push('\n');309310let mut f = EscapeLabel(&mut label);311write!(f, "{format_str}").unwrap();312}313(label, from_ref(input))314},315PhysNodeKind::Map { input, map: _ } => ("map".to_string(), from_ref(input)),316PhysNodeKind::Sort {317input,318by_column,319slice: _,320sort_options: _,321} => (322format!(323"sort\\n{}",324fmt_exprs_to_label(by_column, expr_arena, FormatExprStyle::NoAliases)325),326from_ref(input),327),328PhysNodeKind::TopK {329input,330k,331by_column,332reverse,333nulls_last: _,334} => {335let name = if reverse.iter().all(|r| *r) {336"bottom-k"337} else {338"top-k"339};340(341format!(342"{name}\\n{}",343fmt_exprs_to_label(by_column, expr_arena, FormatExprStyle::NoAliases)344),345&[*input, *k][..],346)347},348PhysNodeKind::Repeat { value, repeats } => ("repeat".to_owned(), &[*value, *repeats][..]),349#[cfg(feature = "cum_agg")]350PhysNodeKind::CumAgg { input, kind } => {351use crate::nodes::cum_agg::CumAggKind;352353(354format!(355"cum_{}",356match kind {357CumAggKind::Min => "min",358CumAggKind::Max => "max",359CumAggKind::Sum => "sum",360CumAggKind::Count => "count",361CumAggKind::Prod => "prod",362}363),364&[*input][..],365)366},367PhysNodeKind::Rle(input) => ("rle".to_owned(), &[*input][..]),368PhysNodeKind::RleId(input) => ("rle_id".to_owned(), &[*input][..]),369PhysNodeKind::PeakMinMax { input, is_peak_max } => (370if *is_peak_max { "peak_max" } else { "peak_min" }.to_owned(),371&[*input][..],372),373PhysNodeKind::OrderedUnion { inputs } => ("ordered-union".to_string(), inputs.as_slice()),374PhysNodeKind::Zip {375inputs,376null_extend,377} => {378let label = if *null_extend {379"zip-null-extend"380} else {381"zip"382};383(label.to_string(), inputs.as_slice())384},385PhysNodeKind::Multiplexer { input } => ("multiplexer".to_string(), from_ref(input)),386PhysNodeKind::MultiScan {387scan_sources,388file_reader_builder,389cloud_options: _,390file_projection_builder,391output_schema,392row_index,393pre_slice,394predicate,395hive_parts,396include_file_paths,397cast_columns_policy: _,398missing_columns_policy: _,399forbid_extra_columns: _,400deletion_files,401file_schema: _,402} => {403let mut out = format!("multi-scan[{}]", file_reader_builder.reader_name());404let mut f = EscapeLabel(&mut out);405406write!(f, "\n{} source", scan_sources.len()).unwrap();407408if scan_sources.len() != 1 {409write!(f, "s").unwrap();410}411412write!(413f,414"\nproject: {} total, {} from file",415output_schema.len(),416file_projection_builder.num_projections(),417)418.unwrap();419420if let Some(ri) = row_index {421write!(f, "\nrow index: name: {}, offset: {:?}", ri.name, ri.offset).unwrap();422}423424if let Some(col_name) = include_file_paths {425write!(f, "\nfile path column: {col_name}").unwrap();426}427428if let Some(pre_slice) = pre_slice {429write!(f, "\nslice: offset: ").unwrap();430431match pre_slice {432Slice::Positive { offset, len: _ } => write!(f, "{}", *offset),433Slice::Negative {434offset_from_end,435len: _,436} => write!(f, "-{}", *offset_from_end),437}438.unwrap();439440write!(f, ", len: {}", pre_slice.len()).unwrap()441}442443if let Some(predicate) = predicate {444write!(f, "\nfilter: {}", predicate.display(expr_arena)).unwrap();445}446447if let Some(v) = hive_parts.as_ref().map(|h| h.df().width()) {448write!(f, "\nhive: {v} column").unwrap();449450if v != 1 {451write!(f, "s").unwrap();452}453}454455if let Some(deletion_files) = deletion_files {456write!(f, "\n{deletion_files}").unwrap();457}458459(out, &[][..])460},461PhysNodeKind::GroupBy { input, key, aggs } => (462format!(463"group-by\\nkey:\\n{}\\naggs:\\n{}",464fmt_exprs_to_label(key, expr_arena, FormatExprStyle::Select),465fmt_exprs_to_label(aggs, expr_arena, FormatExprStyle::Select)466),467from_ref(input),468),469PhysNodeKind::InMemoryJoin {470input_left,471input_right,472left_on,473right_on,474args,475..476}477| PhysNodeKind::EquiJoin {478input_left,479input_right,480left_on,481right_on,482args,483}484| PhysNodeKind::SemiAntiJoin {485input_left,486input_right,487left_on,488right_on,489args,490output_bool: _,491} => {492let label = match phys_sm[node_key].kind {493PhysNodeKind::EquiJoin { .. } => "equi-join",494PhysNodeKind::InMemoryJoin { .. } => "in-memory-join",495PhysNodeKind::CrossJoin { .. } => "cross-join",496PhysNodeKind::SemiAntiJoin {497output_bool: false, ..498} if args.how.is_semi() => "semi-join",499PhysNodeKind::SemiAntiJoin {500output_bool: false, ..501} if args.how.is_anti() => "anti-join",502PhysNodeKind::SemiAntiJoin {503output_bool: true, ..504} if args.how.is_semi() => "is-in",505PhysNodeKind::SemiAntiJoin {506output_bool: true, ..507} if args.how.is_anti() => "is-not-in",508_ => unreachable!(),509};510let mut label = label.to_string();511write!(512label,513r"\nleft_on:\n{}",514fmt_exprs_to_label(left_on, expr_arena, FormatExprStyle::NoAliases)515)516.unwrap();517write!(518label,519r"\nright_on:\n{}",520fmt_exprs_to_label(right_on, expr_arena, FormatExprStyle::NoAliases)521)522.unwrap();523if args.how.is_equi() {524write!(525label,526r"\nhow: {}",527escape_graphviz(&format!("{:?}", args.how))528)529.unwrap();530}531if args.nulls_equal {532write!(label, r"\njoin-nulls").unwrap();533}534(label, &[*input_left, *input_right][..])535},536PhysNodeKind::CrossJoin {537input_left,538input_right,539args: _,540} => ("cross-join".to_string(), &[*input_left, *input_right][..]),541#[cfg(feature = "merge_sorted")]542PhysNodeKind::MergeSorted {543input_left,544input_right,545} => ("merge-sorted".to_string(), &[*input_left, *input_right][..]),546};547548let node_id = node_key.data().as_ffi();549let style = NodeStyle::for_node_kind(kind);550551if let Some(attrs) = style.node_attrs() {552out.push(format!("{node_id} [label=\"{label}\",{attrs}];"));553} else {554out.push(format!("{node_id} [label=\"{label}\"];"));555}556for input in inputs {557visualize_plan_rec(input.node, phys_sm, expr_arena, visited, out);558out.push(format!(559"{} -> {};",560input.node.data().as_ffi(),561node_key.data().as_ffi()562));563}564}565566pub fn visualize_plan(567root: PhysNodeKey,568phys_sm: &SlotMap<PhysNodeKey, PhysNode>,569expr_arena: &Arena<AExpr>,570) -> String {571let mut visited: SecondaryMap<PhysNodeKey, ()> = SecondaryMap::new();572let mut out = Vec::with_capacity(phys_sm.len() + 3);573out.push("digraph polars {\nrankdir=\"BT\"\nnode [fontname=\"Monospace\"]".to_string());574out.push(NodeStyle::legend());575visualize_plan_rec(root, phys_sm, expr_arena, &mut visited, &mut out);576out.push("}".to_string());577out.join("\n")578}579580581