Path: blob/main/crates/polars-plan/src/plans/ir/visualization/mod.rs
7889 views
use std::collections::VecDeque;12use polars_core::prelude::{PlHashMap, SortMultipleOptions};3use polars_ops::frame::{JoinArgs, JoinType};4use polars_utils::arena::{Arena, Node};5use polars_utils::format_pl_smallstr;6use polars_utils::pl_str::PlSmallStr;7use polars_utils::unique_id::UniqueId;89use crate::dsl::{10GroupbyOptions, HConcatOptions, JoinOptionsIR, JoinTypeOptionsIR, UnifiedScanArgs, UnionOptions,11};12use crate::plans::visualization::models::{Edge, IRNodeProperties};13use crate::plans::{AExpr, ExprIR, FileInfo, IR};14use crate::prelude::{DistinctOptionsIR, ProjectionOptions};1516pub mod models;17use models::{IRNodeInfo, IRVisualizationData};1819pub fn generate_visualization_data<'a>(20title: PlSmallStr,21roots: &[Node],22ir_arena: &'a Arena<IR>,23expr_arena: &'a Arena<AExpr>,24) -> IRVisualizationData {25let (nodes_list, edges) = IRVisualizationDataGenerator {26ir_arena,27expr_arena,28queue: VecDeque::from_iter(roots.iter().copied()),29nodes_list: vec![],30edges: vec![],31cache_node_to_position: Default::default(),32}33.generate();3435IRVisualizationData {36title,37num_roots: roots.len().try_into().unwrap(),38nodes: nodes_list,39edges,40}41}4243struct IRVisualizationDataGenerator<'a> {44ir_arena: &'a Arena<IR>,45expr_arena: &'a Arena<AExpr>,46queue: VecDeque<Node>,47nodes_list: Vec<IRNodeInfo>,48edges: Vec<Edge>,49/// During traversal we will encounter the same cache ID multiple times, but we only want50/// to push a single entry per cache ID.51cache_node_to_position: PlHashMap<UniqueId, usize>,52}5354impl IRVisualizationDataGenerator<'_> {55fn generate(mut self) -> (Vec<IRNodeInfo>, Vec<Edge>) {56// Use a queue to traverse in insertion order - the ID assignment relies on this.57while let Some(node) = self.queue.pop_front() {58let ir = self.ir_arena.get(node);59let mut ir_node_info = self.get_ir_node_info(ir);60let current_node_position: u64 = self.nodes_list.len().try_into().unwrap();61ir_node_info.id = current_node_position;6263for input_node in ir.inputs() {64// +1 is for the current `ir_node_info` that we haven't inserted yet.65let input_node_position = 1 + self.nodes_list.len() + self.queue.len();6667if let IR::Cache { id, input: _ } = self.ir_arena.get(input_node) {68if let Some(cache_node_position) = self.cache_node_to_position.get(id) {69self.edges70.push(Edge::new(current_node_position, *cache_node_position));71continue;72}7374self.cache_node_to_position.insert(*id, input_node_position);75}7677self.queue.push_back(input_node);78self.edges79.push(Edge::new(current_node_position, input_node_position));80}8182self.nodes_list.push(ir_node_info);83}8485assert!(self.queue.is_empty());86(self.nodes_list, self.edges)87}8889fn get_ir_node_info(&self, ir: &IR) -> IRNodeInfo {90match ir {91IR::Cache { input: _, id } => {92let properties = IRNodeProperties::Cache { id: *id };9394IRNodeInfo {95title: properties.variant_name(),96properties,97..Default::default()98}99},100IR::DataFrameScan {101df,102schema,103output_schema: _,104} => {105let properties = IRNodeProperties::DataFrameScan {106n_rows: df.height().try_into().unwrap(),107schema_names: schema.iter_names_cloned().collect(),108};109110IRNodeInfo {111title: properties.variant_name(),112properties,113..Default::default()114}115},116IR::Distinct {117input: _,118options:119DistinctOptionsIR {120subset,121maintain_order,122keep_strategy,123slice,124},125} => {126let properties = IRNodeProperties::Distinct {127subset: subset.as_deref().map(|x| x.to_vec()),128maintain_order: *maintain_order,129keep_strategy: *keep_strategy,130slice: convert_opt_slice(slice),131};132133IRNodeInfo {134title: properties.variant_name(),135properties,136..Default::default()137}138},139IR::ExtContext {140input: _,141contexts,142schema,143} => {144let properties = IRNodeProperties::ExtContext {145num_contexts: contexts.len().try_into().unwrap(),146schema_names: schema.iter_names_cloned().collect(),147};148149IRNodeInfo {150title: properties.variant_name(),151properties,152..Default::default()153}154},155IR::Filter {156input: _,157predicate,158} => {159let properties = IRNodeProperties::Filter {160predicate: format_pl_smallstr!("{}", predicate.display(self.expr_arena)),161};162163IRNodeInfo {164title: properties.variant_name(),165properties,166..Default::default()167}168},169IR::GroupBy {170input: _,171keys,172aggs,173schema: _,174maintain_order,175options,176apply,177} => {178let GroupbyOptions {179#[cfg(feature = "dynamic_group_by")]180dynamic,181#[cfg(feature = "dynamic_group_by")]182rolling,183slice,184} = options.as_ref();185186let keys = expr_list(keys, self.expr_arena);187let aggs = expr_list(aggs, self.expr_arena);188let maintain_order = *maintain_order;189let plan_callback = apply.as_ref().map(|x| format_pl_smallstr!("{:?}", x));190191let properties = match () {192#[cfg(feature = "dynamic_group_by")]193_ if dynamic.is_some() => {194let Some(polars_time::DynamicGroupOptions {195index_column,196every,197period,198offset,199label,200include_boundaries,201closed_window,202start_by,203}) = dynamic204else {205unreachable!()206};207208IRNodeProperties::DynamicGroupBy {209index_column: index_column.clone(),210every: format_pl_smallstr!("{}", every),211period: format_pl_smallstr!("{}", period),212offset: format_pl_smallstr!("{}", offset),213label: *label,214include_boundaries: *include_boundaries,215closed_window: *closed_window,216group_by: keys,217start_by: *start_by,218plan_callback,219}220},221#[cfg(feature = "dynamic_group_by")]222_ if rolling.is_some() => {223let Some(polars_time::RollingGroupOptions {224index_column,225period,226offset,227closed_window,228}) = rolling229else {230unreachable!()231};232233IRNodeProperties::RollingGroupBy {234keys,235aggs,236index_column: index_column.clone(),237period: format_pl_smallstr!("{}", period),238offset: format_pl_smallstr!("{}", offset),239closed_window: *closed_window,240slice: convert_opt_slice(slice),241plan_callback,242}243},244_ => IRNodeProperties::GroupBy {245keys,246aggs,247maintain_order,248slice: convert_opt_slice(slice),249plan_callback,250},251};252253IRNodeInfo {254title: properties.variant_name(),255properties,256..Default::default()257}258},259IR::HConcat {260inputs,261schema,262options: HConcatOptions { parallel, strict },263} => {264let properties = IRNodeProperties::HConcat {265num_inputs: inputs.len().try_into().unwrap(),266schema_names: schema.iter_names_cloned().collect(),267parallel: *parallel,268strict: *strict,269};270271IRNodeInfo {272title: properties.variant_name(),273properties,274..Default::default()275}276},277IR::HStack {278input: _,279exprs,280schema: _,281options:282ProjectionOptions {283run_parallel,284duplicate_check,285should_broadcast,286},287} => {288let properties = IRNodeProperties::HStack {289exprs: expr_list(exprs, self.expr_arena),290run_parallel: *run_parallel,291duplicate_check: *duplicate_check,292should_broadcast: *should_broadcast,293};294IRNodeInfo {295title: properties.variant_name(),296properties,297..Default::default()298}299},300IR::Invalid => {301let properties = IRNodeProperties::Invalid;302303IRNodeInfo {304title: properties.variant_name(),305properties,306..Default::default()307}308},309IR::Join {310input_left: _,311input_right: _,312schema: _,313left_on,314right_on,315options,316} => {317let JoinOptionsIR {318allow_parallel,319force_parallel,320args:321JoinArgs {322how,323validation,324suffix,325slice,326nulls_equal,327coalesce,328maintain_order,329},330options,331} = options.as_ref();332333let properties = match how {334#[cfg(feature = "asof_join")]335JoinType::AsOf(asof_options) => {336use polars_ops::frame::AsOfOptions;337338#[expect(unused_variables)]339let AsOfOptions {340strategy,341tolerance,342tolerance_str,343left_by,344right_by,345allow_eq,346check_sortedness,347} = asof_options.as_ref();348349assert_eq!(left_on.len(), 1);350assert_eq!(right_on.len(), 1);351352IRNodeProperties::AsOfJoin {353left_on: format_pl_smallstr!("{}", left_on[0].display(self.expr_arena)),354right_on: format_pl_smallstr!(355"{}",356right_on[0].display(self.expr_arena)357),358left_by: left_by.clone(),359right_by: right_by.clone(),360strategy: *strategy,361tolerance: tolerance.as_ref().map(|scalar| {362[363format_pl_smallstr!("{}", scalar.value()),364format_pl_smallstr!("{:?}", scalar.dtype()),365]366}),367suffix: suffix.clone(),368slice: convert_opt_slice(slice),369coalesce: *coalesce,370allow_eq: *allow_eq,371check_sortedness: *check_sortedness,372}373},374#[cfg(feature = "iejoin")]375JoinType::IEJoin => {376use polars_ops::frame::IEJoinOptions;377378let Some(JoinTypeOptionsIR::IEJoin(IEJoinOptions {379operator1,380operator2,381})) = options382else {383unreachable!()384};385386IRNodeProperties::IEJoin {387left_on: expr_list(left_on, self.expr_arena),388right_on: expr_list(right_on, self.expr_arena),389inequality_operators: if let Some(operator2) = operator2 {390vec![*operator1, *operator2]391} else {392vec![*operator1]393},394suffix: suffix.clone(),395slice: convert_opt_slice(slice),396}397},398JoinType::Cross => {399let predicate: Option<PlSmallStr> = options.as_ref().map(|x| {400let JoinTypeOptionsIR::CrossAndFilter { predicate } = x else {401panic!("{x:?}")402};403404format_pl_smallstr!("{}", predicate.display(self.expr_arena))405});406407IRNodeProperties::CrossJoin {408maintain_order: *maintain_order,409slice: convert_opt_slice(slice),410predicate,411suffix: suffix.clone(),412}413},414_ => IRNodeProperties::Join {415how: format_pl_smallstr!("{}", how),416left_on: expr_list(left_on, self.expr_arena),417right_on: expr_list(right_on, self.expr_arena),418nulls_equal: *nulls_equal,419coalesce: *coalesce,420maintain_order: *maintain_order,421validation: *validation,422suffix: suffix.clone(),423slice: convert_opt_slice(slice),424allow_parallel: *allow_parallel,425force_parallel: *force_parallel,426},427};428429IRNodeInfo {430title: properties.variant_name(),431properties,432..Default::default()433}434},435IR::MapFunction { input: _, function } => {436let properties = IRNodeProperties::MapFunction {437function: format_pl_smallstr!("{}", function),438};439440IRNodeInfo {441title: properties.variant_name(),442properties,443..Default::default()444}445},446IR::Scan {447sources,448file_info:449file_info @ FileInfo {450schema: _,451reader_schema: _,452row_estimation: _,453},454predicate,455predicate_file_skip_applied,456scan_type,457unified_scan_args,458hive_parts,459output_schema: _,460} => {461let UnifiedScanArgs {462schema: _,463cloud_options: _,464hive_options: _,465rechunk,466cache: _,467glob: _,468hidden_file_prefix: _,469projection,470column_mapping,471default_values,472row_index,473pre_slice,474cast_columns_policy: _,475missing_columns_policy: _,476extra_columns_policy: _,477include_file_paths,478deletion_files,479table_statistics,480row_count: _,481} = unified_scan_args.as_ref();482483let file_columns: Option<Vec<PlSmallStr>> =484file_info.iter_reader_schema_names().map(|iter| {485iter.filter(|&name| {486!(row_index.as_ref().is_some_and(|ri| name == &ri.name)487|| include_file_paths.as_ref().is_some_and(|x| name == x))488})489.cloned()490.collect()491});492493let pre_slice = pre_slice494.clone()495.map(|x| <(i64, usize)>::try_from(x).unwrap());496497let properties = IRNodeProperties::Scan {498scan_type: PlSmallStr::from_static(scan_type.as_ref().into()),499num_sources: sources.len().try_into().unwrap(),500first_source: sources.first().map(|x| x.to_include_path_name().into()),501file_columns,502projection: projection.as_deref().map(list_str_cloned),503row_index_name: row_index.as_ref().map(|ri| ri.name.clone()),504row_index_offset: row_index.as_ref().map(|ri| {505#[cfg_attr(feature = "bigidx", expect(clippy::useless_conversion))]506ri.offset.into()507}),508pre_slice: convert_opt_slice(&pre_slice),509predicate: predicate510.as_ref()511.map(|e| format_pl_smallstr!("{}", e.display(self.expr_arena))),512predicate_file_skip_applied: *predicate_file_skip_applied,513has_table_statistics: table_statistics.is_some(),514include_file_paths: include_file_paths.clone(),515column_mapping_type: column_mapping516.as_ref()517.map(|x| PlSmallStr::from_static(x.into())),518default_values_type: default_values519.as_ref()520.map(|x| PlSmallStr::from_static(x.into())),521deletion_files_type: deletion_files522.as_ref()523.map(|x| PlSmallStr::from_static(x.into())),524rechunk: *rechunk,525hive_columns: hive_parts526.as_ref()527.map(|x| x.df().schema().iter_names_cloned().collect()),528};529530IRNodeInfo {531title: properties.variant_name(),532properties,533..Default::default()534}535},536IR::Select {537input: _,538expr,539schema: _,540options:541ProjectionOptions {542run_parallel,543duplicate_check,544should_broadcast,545},546} => {547let properties = IRNodeProperties::Select {548exprs: expr_list(expr, self.expr_arena),549run_parallel: *run_parallel,550duplicate_check: *duplicate_check,551should_broadcast: *should_broadcast,552};553554IRNodeInfo {555title: properties.variant_name(),556properties,557..Default::default()558}559},560IR::SimpleProjection { input: _, columns } => {561let properties = IRNodeProperties::SimpleProjection {562columns: columns.iter_names_cloned().collect(),563};564565IRNodeInfo {566title: properties.variant_name(),567properties,568..Default::default()569}570},571IR::Sink { input: _, payload } => {572let properties = IRNodeProperties::Sink {573payload: format_pl_smallstr!("{:?}", payload),574};575576IRNodeInfo {577title: properties.variant_name(),578properties,579..Default::default()580}581},582IR::SinkMultiple { inputs } => {583let properties = IRNodeProperties::SinkMultiple {584num_inputs: inputs.len().try_into().unwrap(),585};586587IRNodeInfo {588title: properties.variant_name(),589properties,590..Default::default()591}592},593IR::Slice {594input: _,595offset,596len,597} => {598#[cfg_attr(feature = "bigidx", expect(clippy::useless_conversion))]599let properties = IRNodeProperties::Slice {600offset: (*offset).into(),601len: (*len).into(),602};603604IRNodeInfo {605title: properties.variant_name(),606properties,607..Default::default()608}609},610IR::Sort {611input: _,612by_column,613slice,614sort_options:615SortMultipleOptions {616descending,617nulls_last,618multithreaded,619maintain_order,620limit,621},622} => {623let properties = IRNodeProperties::Sort {624by_exprs: expr_list(by_column, self.expr_arena),625slice: convert_opt_slice(slice),626descending: descending.clone(),627nulls_last: nulls_last.clone(),628multithreaded: *multithreaded,629maintain_order: *maintain_order,630#[cfg_attr(feature = "bigidx", expect(clippy::useless_conversion))]631limit: limit.map(|x| x.into()),632};633634IRNodeInfo {635title: properties.variant_name(),636properties,637..Default::default()638}639},640IR::Union {641inputs: _,642options:643UnionOptions {644slice,645rows: _,646parallel,647from_partitioned_ds,648flattened_by_opt,649rechunk,650maintain_order,651},652} => {653let properties = IRNodeProperties::Union {654maintain_order: *maintain_order,655parallel: *parallel,656rechunk: *rechunk,657slice: convert_opt_slice(slice),658from_partitioned_ds: *from_partitioned_ds,659flattened_by_opt: *flattened_by_opt,660};661662IRNodeInfo {663title: properties.variant_name(),664properties,665..Default::default()666}667},668#[cfg(feature = "merge_sorted")]669IR::MergeSorted {670input_left: _,671input_right: _,672key,673} => {674let properties = IRNodeProperties::MergeSorted { key: key.clone() };675676IRNodeInfo {677title: properties.variant_name(),678properties,679..Default::default()680}681},682#[cfg(feature = "python")]683IR::PythonScan {684options:685crate::plans::PythonOptions {686scan_fn: _,687schema,688output_schema: _,689with_columns,690python_source,691n_rows,692predicate,693validate_schema,694is_pure,695},696} => {697use crate::plans::PythonPredicate;698699let properties = IRNodeProperties::PythonScan {700scan_source_type: python_source.clone(),701n_rows: n_rows.map(|x| x.try_into().unwrap()),702projection: with_columns.as_deref().map(list_str_cloned),703predicate: match predicate {704PythonPredicate::None => None,705PythonPredicate::PyArrow(s) => Some(s.into()),706PythonPredicate::Polars(p) => {707Some(format_pl_smallstr!("{}", p.display(self.expr_arena)))708},709},710schema_names: schema.iter_names_cloned().collect(),711is_pure: *is_pure,712validate_schema: *validate_schema,713};714715IRNodeInfo {716title: properties.variant_name(),717properties,718..Default::default()719}720},721}722}723}724725impl IRNodeProperties {726fn variant_name(&self) -> PlSmallStr {727PlSmallStr::from_static(<&'static str>::from(self))728}729}730731fn list_str_cloned<I, T>(iter: I) -> Vec<PlSmallStr>732where733I: IntoIterator<Item = T>,734T: AsRef<str>,735{736iter.into_iter()737.map(|x| PlSmallStr::from_str(x.as_ref()))738.collect()739}740741fn convert_opt_slice<T, U>(slice: &Option<(T, U)>) -> Option<(i64, u64)>742where743T: Copy + TryInto<i64>,744U: Copy + TryInto<u64>,745<T as TryInto<i64>>::Error: std::fmt::Debug,746<U as TryInto<u64>>::Error: std::fmt::Debug,747{748slice.map(|(offset, len)| (offset.try_into().unwrap(), len.try_into().unwrap()))749}750751fn expr_list(exprs: &[ExprIR], expr_arena: &Arena<AExpr>) -> Vec<PlSmallStr> {752exprs753.iter()754.map(|e| format_pl_smallstr!("{}", e.display(expr_arena)))755.collect()756}757758759