// Path: blob/main/crates/polars-stream/src/physical_plan/to_graph.rs
// 6939 views
use std::sync::Arc;12use parking_lot::Mutex;3use polars_core::prelude::PlRandomState;4use polars_core::schema::Schema;5use polars_core::{POOL, config};6use polars_error::{PolarsResult, polars_bail, polars_ensure, polars_err};7use polars_expr::groups::new_hash_grouper;8use polars_expr::planner::{ExpressionConversionState, create_physical_expr};9use polars_expr::reduce::into_reduction;10use polars_expr::state::ExecutionState;11use polars_mem_engine::{create_physical_plan, create_scan_predicate};12use polars_plan::dsl::{JoinOptionsIR, PartitionVariantIR, ScanSources};13use polars_plan::plans::expr_ir::ExprIR;14use polars_plan::plans::{AExpr, ArenaExprIter, Context, IR, IRAggExpr};15use polars_plan::prelude::{FileType, FunctionFlags};16use polars_utils::arena::{Arena, Node};17use polars_utils::format_pl_smallstr;18use polars_utils::itertools::Itertools;19use polars_utils::pl_str::PlSmallStr;20use polars_utils::plpath::PlPath;21use polars_utils::relaxed_cell::RelaxedCell;22use recursive::recursive;23use slotmap::{SecondaryMap, SlotMap};2425use super::{PhysNode, PhysNodeKey, PhysNodeKind};26use crate::execute::StreamingExecutionState;27use crate::expression::StreamExpr;28use crate::graph::{Graph, GraphNodeKey};29use crate::morsel::{MorselSeq, get_ideal_morsel_size};30use crate::nodes;31use crate::nodes::io_sinks::SinkComputeNode;32use crate::nodes::io_sinks::partition::PerPartitionSortBy;33use crate::nodes::io_sources::multi_scan::config::MultiScanConfig;34use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;35use crate::nodes::io_sources::multi_scan::reader_interface::capabilities::ReaderCapabilities;36use crate::physical_plan::lower_expr::compute_output_schema;37use crate::utils::late_materialized_df::LateMaterializedDataFrame;3839fn has_potential_recurring_entrance(node: Node, arena: &Arena<AExpr>) -> bool {40arena.iter(node).any(|(_n, ae)| match ae {41AExpr::Function { options, .. } | AExpr::AnonymousFunction { options, .. 
} => {42options.flags.contains(FunctionFlags::OPTIONAL_RE_ENTRANT)43},44_ => false,45})46}4748fn create_stream_expr(49expr_ir: &ExprIR,50ctx: &mut GraphConversionContext<'_>,51schema: &Arc<Schema>,52) -> PolarsResult<StreamExpr> {53let reentrant = has_potential_recurring_entrance(expr_ir.node(), ctx.expr_arena);54let phys = create_physical_expr(55expr_ir,56Context::Default,57ctx.expr_arena,58schema,59&mut ctx.expr_conversion_state,60)?;61Ok(StreamExpr::new(phys, reentrant))62}6364struct GraphConversionContext<'a> {65phys_sm: &'a SlotMap<PhysNodeKey, PhysNode>,66expr_arena: &'a mut Arena<AExpr>,67graph: Graph,68phys_to_graph: SecondaryMap<PhysNodeKey, GraphNodeKey>,69expr_conversion_state: ExpressionConversionState,70num_pipelines: usize,71}7273pub fn physical_plan_to_graph(74root: PhysNodeKey,75phys_sm: &SlotMap<PhysNodeKey, PhysNode>,76expr_arena: &mut Arena<AExpr>,77) -> PolarsResult<(Graph, SecondaryMap<PhysNodeKey, GraphNodeKey>)> {78// Get the number of threads from the rayon thread-pool as that respects our config.79let num_pipelines = POOL.current_num_threads();80let mut ctx = GraphConversionContext {81phys_sm,82expr_arena,83graph: Graph::with_capacity(phys_sm.len()),84phys_to_graph: SecondaryMap::with_capacity(phys_sm.len()),85expr_conversion_state: ExpressionConversionState::new(false),86num_pipelines,87};8889to_graph_rec(root, &mut ctx)?;9091Ok((ctx.graph, ctx.phys_to_graph))92}9394#[recursive]95fn to_graph_rec<'a>(96phys_node_key: PhysNodeKey,97ctx: &mut GraphConversionContext<'a>,98) -> PolarsResult<GraphNodeKey> {99// This will ensure we create a proper acyclic directed graph instead of a tree.100if let Some(graph_key) = ctx.phys_to_graph.get(phys_node_key) {101return Ok(*graph_key);102}103104use PhysNodeKind::*;105let node = &ctx.phys_sm[phys_node_key];106let graph_key = match &node.kind {107InMemorySource { df } => ctx.graph.add_node(108nodes::in_memory_source::InMemorySourceNode::new(df.clone(), MorselSeq::default()),109[],110),111SinkMultiple { 
sinks } => {112// @NOTE: This is always the root node and gets ignored by the physical_plan anyway so113// we give one of the inputs back.114let node = to_graph_rec(sinks[0], ctx)?;115for sink in &sinks[1..] {116to_graph_rec(*sink, ctx)?;117}118return Ok(node);119},120121StreamingSlice {122input,123offset,124length,125} => {126let input_key = to_graph_rec(input.node, ctx)?;127ctx.graph.add_node(128nodes::streaming_slice::StreamingSliceNode::new(*offset, *length),129[(input_key, input.port)],130)131},132133NegativeSlice {134input,135offset,136length,137} => {138let input_key = to_graph_rec(input.node, ctx)?;139ctx.graph.add_node(140nodes::negative_slice::NegativeSliceNode::new(*offset, *length),141[(input_key, input.port)],142)143},144145DynamicSlice {146input,147offset,148length,149} => {150let input_key = to_graph_rec(input.node, ctx)?;151let offset_key = to_graph_rec(offset.node, ctx)?;152let length_key = to_graph_rec(length.node, ctx)?;153let offset_schema = ctx.phys_sm[offset.node].output_schema.clone();154let length_schema = ctx.phys_sm[length.node].output_schema.clone();155ctx.graph.add_node(156nodes::dynamic_slice::DynamicSliceNode::new(offset_schema, length_schema),157[158(input_key, input.port),159(offset_key, offset.port),160(length_key, length.port),161],162)163},164165Shift {166input,167offset,168fill,169} => {170let input_schema = ctx.phys_sm[input.node].output_schema.clone();171let offset_schema = ctx.phys_sm[offset.node].output_schema.clone();172let input_key = to_graph_rec(input.node, ctx)?;173let offset_key = to_graph_rec(offset.node, ctx)?;174if let Some(fill) = fill {175let fill_key = to_graph_rec(fill.node, ctx)?;176ctx.graph.add_node(177nodes::shift::ShiftNode::new(input_schema, offset_schema, true),178[179(input_key, input.port),180(offset_key, offset.port),181(fill_key, fill.port),182],183)184} else {185ctx.graph.add_node(186nodes::shift::ShiftNode::new(input_schema, offset_schema, false),187[(input_key, input.port), (offset_key, 
offset.port)],188)189}190},191192Filter { predicate, input } => {193let input_schema = &ctx.phys_sm[input.node].output_schema;194let phys_predicate_expr = create_stream_expr(predicate, ctx, input_schema)?;195let input_key = to_graph_rec(input.node, ctx)?;196ctx.graph.add_node(197nodes::filter::FilterNode::new(phys_predicate_expr),198[(input_key, input.port)],199)200},201202Select {203selectors,204input,205extend_original,206} => {207let input_schema = &ctx.phys_sm[input.node].output_schema;208let phys_selectors = selectors209.iter()210.map(|selector| create_stream_expr(selector, ctx, input_schema))211.collect::<PolarsResult<_>>()?;212let input_key = to_graph_rec(input.node, ctx)?;213ctx.graph.add_node(214nodes::select::SelectNode::new(215phys_selectors,216node.output_schema.clone(),217*extend_original,218),219[(input_key, input.port)],220)221},222223WithRowIndex {224input,225name,226offset,227} => {228let input_key = to_graph_rec(input.node, ctx)?;229ctx.graph.add_node(230nodes::with_row_index::WithRowIndexNode::new(name.clone(), *offset),231[(input_key, input.port)],232)233},234235InputIndependentSelect { selectors } => {236let empty_schema = Default::default();237let phys_selectors = selectors238.iter()239.map(|selector| create_stream_expr(selector, ctx, &empty_schema))240.collect::<PolarsResult<_>>()?;241ctx.graph.add_node(242nodes::input_independent_select::InputIndependentSelectNode::new(phys_selectors),243[],244)245},246247Reduce { input, exprs } => {248let input_key = to_graph_rec(input.node, ctx)?;249let input_schema = &ctx.phys_sm[input.node].output_schema;250251let mut reductions = Vec::with_capacity(exprs.len());252let mut inputs = Vec::with_capacity(reductions.len());253254for e in exprs {255let (red, input_node) = into_reduction(e.node(), ctx.expr_arena, input_schema)?;256reductions.push(red);257258let input_phys = create_stream_expr(259&ExprIR::from_node(input_node, 
ctx.expr_arena),260ctx,261input_schema,262)?;263264inputs.push(input_phys)265}266267ctx.graph.add_node(268nodes::reduce::ReduceNode::new(inputs, reductions, node.output_schema.clone()),269[(input_key, input.port)],270)271},272SimpleProjection { input, columns } => {273let input_schema = ctx.phys_sm[input.node].output_schema.clone();274let input_key = to_graph_rec(input.node, ctx)?;275ctx.graph.add_node(276nodes::simple_projection::SimpleProjectionNode::new(columns.clone(), input_schema),277[(input_key, input.port)],278)279},280281InMemorySink { input } => {282let input_schema = ctx.phys_sm[input.node].output_schema.clone();283let input_key = to_graph_rec(input.node, ctx)?;284ctx.graph.add_node(285nodes::in_memory_sink::InMemorySinkNode::new(input_schema),286[(input_key, input.port)],287)288},289290FileSink {291target,292sink_options,293file_type,294input,295cloud_options,296} => {297let sink_options = sink_options.clone();298let input_schema = ctx.phys_sm[input.node].output_schema.clone();299let input_key = to_graph_rec(input.node, ctx)?;300301match file_type {302#[cfg(feature = "ipc")]303FileType::Ipc(ipc_writer_options) => ctx.graph.add_node(304SinkComputeNode::from(nodes::io_sinks::ipc::IpcSinkNode::new(305input_schema,306target.clone(),307sink_options,308*ipc_writer_options,309cloud_options.clone(),310)),311[(input_key, input.port)],312),313#[cfg(feature = "json")]314FileType::Json(_) => ctx.graph.add_node(315SinkComputeNode::from(nodes::io_sinks::json::NDJsonSinkNode::new(316target.clone(),317sink_options,318cloud_options.clone(),319)),320[(input_key, input.port)],321),322#[cfg(feature = "parquet")]323FileType::Parquet(parquet_writer_options) => ctx.graph.add_node(324SinkComputeNode::from(nodes::io_sinks::parquet::ParquetSinkNode::new(325input_schema,326target.clone(),327sink_options,328parquet_writer_options,329cloud_options.clone(),330false,331)?),332[(input_key, input.port)],333),334#[cfg(feature = "csv")]335FileType::Csv(csv_writer_options) => 
ctx.graph.add_node(336SinkComputeNode::from(nodes::io_sinks::csv::CsvSinkNode::new(337target.clone(),338input_schema,339sink_options,340csv_writer_options.clone(),341cloud_options.clone(),342)),343[(input_key, input.port)],344),345#[cfg(not(any(346feature = "csv",347feature = "parquet",348feature = "json",349feature = "ipc"350)))]351_ => {352panic!("activate source feature")353},354}355},356357PartitionSink {358input,359base_path,360file_path_cb,361sink_options,362variant,363file_type,364cloud_options,365per_partition_sort_by,366finish_callback,367} => {368let input_schema = ctx.phys_sm[input.node].output_schema.clone();369let input_key = to_graph_rec(input.node, ctx)?;370371let base_path = base_path.clone();372let file_path_cb = file_path_cb.clone();373let ext = PlSmallStr::from_static(file_type.extension());374let create_new = nodes::io_sinks::partition::get_create_new_fn(375file_type.clone(),376sink_options.clone(),377cloud_options.clone(),378finish_callback.is_some(),379);380381let per_partition_sort_by = match per_partition_sort_by.as_ref() {382None => None,383Some(c) => {384let (selectors, descending, nulls_last) = c385.iter()386.map(|c| {387Ok((388create_stream_expr(&c.expr, ctx, &input_schema)?,389c.descending,390c.nulls_last,391))392})393.collect::<PolarsResult<(Vec<_>, Vec<_>, Vec<_>)>>()?;394395Some(PerPartitionSortBy {396selectors,397descending,398nulls_last,399maintain_order: true,400})401},402};403404let sink_compute_node = match variant {405PartitionVariantIR::MaxSize(max_size) => SinkComputeNode::from(406nodes::io_sinks::partition::max_size::MaxSizePartitionSinkNode::new(407input_schema,408*max_size,409base_path,410file_path_cb,411create_new,412ext,413sink_options.clone(),414per_partition_sort_by,415finish_callback.clone(),416),417),418PartitionVariantIR::Parted {419key_exprs,420include_key,421} => SinkComputeNode::from(422nodes::io_sinks::partition::parted::PartedPartitionSinkNode::new(423input_schema,424key_exprs.iter().map(|e| 
e.output_name().clone()).collect(),425base_path,426file_path_cb,427create_new,428ext,429sink_options.clone(),430*include_key,431per_partition_sort_by,432finish_callback.clone(),433),434),435PartitionVariantIR::ByKey {436key_exprs,437include_key,438} => SinkComputeNode::from(439nodes::io_sinks::partition::by_key::PartitionByKeySinkNode::new(440input_schema,441key_exprs.iter().map(|e| e.output_name().clone()).collect(),442base_path,443file_path_cb,444create_new,445ext,446sink_options.clone(),447*include_key,448per_partition_sort_by,449finish_callback.clone(),450),451),452};453454ctx.graph455.add_node(sink_compute_node, [(input_key, input.port)])456},457458InMemoryMap {459input,460map,461format_str: _,462} => {463let input_schema = ctx.phys_sm[input.node].output_schema.clone();464let input_key = to_graph_rec(input.node, ctx)?;465ctx.graph.add_node(466nodes::in_memory_map::InMemoryMapNode::new(input_schema, map.clone()),467[(input_key, input.port)],468)469},470471Map { input, map } => {472let input_key = to_graph_rec(input.node, ctx)?;473ctx.graph.add_node(474nodes::map::MapNode::new(map.clone()),475[(input_key, input.port)],476)477},478479Sort {480input,481by_column,482slice,483sort_options,484} => {485let input_schema = ctx.phys_sm[input.node].output_schema.clone();486let lmdf = Arc::new(LateMaterializedDataFrame::default());487let mut lp_arena = Arena::default();488let df_node = lp_arena.add(lmdf.clone().as_ir_node(input_schema.clone()));489let sort_node = lp_arena.add(IR::Sort {490input: df_node,491by_column: by_column.clone(),492slice: *slice,493sort_options: sort_options.clone(),494});495let executor = Mutex::new(create_physical_plan(496sort_node,497&mut lp_arena,498ctx.expr_arena,499None,500)?);501502let input_key = to_graph_rec(input.node, ctx)?;503ctx.graph.add_node(504nodes::in_memory_map::InMemoryMapNode::new(505input_schema,506Arc::new(move |df| {507lmdf.set_materialized_dataframe(df);508let mut state = ExecutionState::new();509executor.lock().execute(&mut 
state)510}),511),512[(input_key, input.port)],513)514},515516TopK {517input,518k,519by_column,520reverse,521nulls_last,522} => {523let input_key = to_graph_rec(input.node, ctx)?;524let k_key = to_graph_rec(k.node, ctx)?;525526let k_schema = ctx.phys_sm[k.node].output_schema.clone();527let input_schema = &ctx.phys_sm[input.node].output_schema;528let key_schema = compute_output_schema(input_schema, by_column, ctx.expr_arena)?;529530let key_selectors = by_column531.iter()532.map(|e| create_stream_expr(e, ctx, input_schema))533.try_collect_vec()?;534535ctx.graph.add_node(536nodes::top_k::TopKNode::new(537k_schema,538reverse.clone(),539nulls_last.clone(),540key_schema,541key_selectors,542),543[(input_key, input.port), (k_key, k.port)],544)545},546547Repeat { value, repeats } => {548let value_key = to_graph_rec(value.node, ctx)?;549let repeats_key = to_graph_rec(repeats.node, ctx)?;550let value_schema = ctx.phys_sm[value.node].output_schema.clone();551let repeats_schema = ctx.phys_sm[repeats.node].output_schema.clone();552ctx.graph.add_node(553nodes::repeat::RepeatNode::new(value_schema, repeats_schema),554[(value_key, value.port), (repeats_key, repeats.port)],555)556},557558#[cfg(feature = "cum_agg")]559CumAgg { input, kind } => {560let input_key = to_graph_rec(input.node, ctx)?;561ctx.graph.add_node(562nodes::cum_agg::CumAggNode::new(*kind),563[(input_key, input.port)],564)565},566567Rle(input) => {568let input_key = to_graph_rec(input.node, ctx)?;569let input_schema = &ctx.phys_sm[input.node].output_schema;570assert_eq!(input_schema.len(), 1);571let (name, dtype) = input_schema.get_at_index(0).unwrap();572ctx.graph.add_node(573nodes::rle::RleNode::new(name.clone(), dtype.clone()),574[(input_key, input.port)],575)576},577578RleId(input) => {579let input_key = to_graph_rec(input.node, ctx)?;580let input_schema = &ctx.phys_sm[input.node].output_schema;581assert_eq!(input_schema.len(), 1);582let (_, dtype) = 
input_schema.get_at_index(0).unwrap();583ctx.graph.add_node(584nodes::rle_id::RleIdNode::new(dtype.clone()),585[(input_key, input.port)],586)587},588589PeakMinMax { input, is_peak_max } => {590let input_key = to_graph_rec(input.node, ctx)?;591ctx.graph.add_node(592nodes::peak_minmax::PeakMinMaxNode::new(*is_peak_max),593[(input_key, input.port)],594)595},596597OrderedUnion { inputs } => {598let input_keys = inputs599.iter()600.map(|i| PolarsResult::Ok((to_graph_rec(i.node, ctx)?, i.port)))601.try_collect_vec()?;602ctx.graph603.add_node(nodes::ordered_union::OrderedUnionNode::new(), input_keys)604},605606Zip {607inputs,608null_extend,609} => {610let input_schemas = inputs611.iter()612.map(|i| ctx.phys_sm[i.node].output_schema.clone())613.collect_vec();614let input_keys = inputs615.iter()616.map(|i| PolarsResult::Ok((to_graph_rec(i.node, ctx)?, i.port)))617.try_collect_vec()?;618ctx.graph.add_node(619nodes::zip::ZipNode::new(*null_extend, input_schemas),620input_keys,621)622},623624Multiplexer { input } => {625let input_key = to_graph_rec(input.node, ctx)?;626ctx.graph.add_node(627nodes::multiplexer::MultiplexerNode::new(),628[(input_key, input.port)],629)630},631632MultiScan {633scan_sources,634file_reader_builder,635cloud_options,636file_projection_builder,637output_schema,638row_index,639pre_slice,640predicate,641hive_parts,642missing_columns_policy,643cast_columns_policy,644include_file_paths,645forbid_extra_columns,646deletion_files,647file_schema,648} => {649let hive_parts = hive_parts.clone();650651let predicate = predicate652.as_ref()653.map(|pred| {654create_scan_predicate(655pred,656ctx.expr_arena,657output_schema,658hive_parts.as_ref().map(|hp| hp.df().schema().as_ref()),659&mut ctx.expr_conversion_state,660true, // create_skip_batch_predicate661file_reader_builder662.reader_capabilities()663.contains(ReaderCapabilities::PARTIAL_FILTER), // create_column_predicates664)665})666.transpose()?667.map(|p| p.to_io(None, file_schema.clone()));668669let sources = 
scan_sources.clone();670let file_reader_builder = file_reader_builder.clone();671let cloud_options = cloud_options.clone();672673let final_output_schema = output_schema.clone();674let file_projection_builder = file_projection_builder.clone();675676let row_index = row_index.clone();677let pre_slice = pre_slice.clone();678let hive_parts = hive_parts.map(Arc::new);679let include_file_paths = include_file_paths.clone();680let missing_columns_policy = *missing_columns_policy;681let forbid_extra_columns = forbid_extra_columns.clone();682let cast_columns_policy = cast_columns_policy.clone();683let deletion_files = deletion_files.clone();684685let verbose = config::verbose();686687ctx.graph.add_node(688nodes::io_sources::multi_scan::MultiScan::new(Arc::new(MultiScanConfig {689sources,690file_reader_builder,691cloud_options,692final_output_schema,693file_projection_builder,694row_index,695pre_slice,696predicate,697hive_parts,698include_file_paths,699missing_columns_policy,700forbid_extra_columns,701cast_columns_policy,702deletion_files,703// Initialized later704num_pipelines: RelaxedCell::new_usize(0),705n_readers_pre_init: RelaxedCell::new_usize(0),706max_concurrent_scans: RelaxedCell::new_usize(0),707verbose,708})),709[],710)711},712713GroupBy { input, key, aggs } => {714let input_key = to_graph_rec(input.node, ctx)?;715716let input_schema = &ctx.phys_sm[input.node].output_schema;717let key_schema = compute_output_schema(input_schema, key, ctx.expr_arena)?;718let grouper = new_hash_grouper(key_schema.clone());719720let key_selectors = key721.iter()722.map(|e| create_stream_expr(e, ctx, input_schema))723.try_collect_vec()?;724725let mut grouped_reductions = Vec::new();726let mut grouped_reduction_cols = Vec::new();727let mut has_order_sensitive_agg = false;728for agg in aggs {729has_order_sensitive_agg |= matches!(730ctx.expr_arena.get(agg.node()),731AExpr::Agg(IRAggExpr::First(..) 
| IRAggExpr::Last(..))732);733let (reduction, input_node) =734into_reduction(agg.node(), ctx.expr_arena, input_schema)?;735let AExpr::Column(col) = ctx.expr_arena.get(input_node) else {736unreachable!()737};738grouped_reductions.push(reduction);739grouped_reduction_cols.push(col.clone());740}741742ctx.graph.add_node(743nodes::group_by::GroupByNode::new(744key_schema,745key_selectors,746grouper,747grouped_reduction_cols,748grouped_reductions,749node.output_schema.clone(),750PlRandomState::default(),751ctx.num_pipelines,752has_order_sensitive_agg,753),754[(input_key, input.port)],755)756},757758InMemoryJoin {759input_left,760input_right,761left_on,762right_on,763args,764options,765} => {766let left_input_key = to_graph_rec(input_left.node, ctx)?;767let right_input_key = to_graph_rec(input_right.node, ctx)?;768let left_input_schema = ctx.phys_sm[input_left.node].output_schema.clone();769let right_input_schema = ctx.phys_sm[input_right.node].output_schema.clone();770771let mut lp_arena = Arena::default();772let left_lmdf = Arc::new(LateMaterializedDataFrame::default());773let right_lmdf = Arc::new(LateMaterializedDataFrame::default());774775let left_node = lp_arena.add(left_lmdf.clone().as_ir_node(left_input_schema.clone()));776let right_node =777lp_arena.add(right_lmdf.clone().as_ir_node(right_input_schema.clone()));778let join_node = lp_arena.add(IR::Join {779input_left: left_node,780input_right: right_node,781schema: node.output_schema.clone(),782left_on: left_on.clone(),783right_on: right_on.clone(),784options: Arc::new(JoinOptionsIR {785allow_parallel: true,786force_parallel: false,787args: args.clone(),788options: options.clone(),789rows_left: (None, 0),790rows_right: (None, 0),791}),792});793794let executor = Mutex::new(create_physical_plan(795join_node,796&mut lp_arena,797ctx.expr_arena,798None,799)?);800801ctx.graph.add_node(802nodes::joins::in_memory::InMemoryJoinNode::new(803left_input_schema,804right_input_schema,805Arc::new(move |left, right| 
{806left_lmdf.set_materialized_dataframe(left);807right_lmdf.set_materialized_dataframe(right);808let mut state = ExecutionState::new();809executor.lock().execute(&mut state)810}),811),812[813(left_input_key, input_left.port),814(right_input_key, input_right.port),815],816)817},818819EquiJoin {820input_left,821input_right,822left_on,823right_on,824args,825}826| SemiAntiJoin {827input_left,828input_right,829left_on,830right_on,831args,832output_bool: _,833} => {834let args = args.clone();835let left_input_key = to_graph_rec(input_left.node, ctx)?;836let right_input_key = to_graph_rec(input_right.node, ctx)?;837let left_input_schema = ctx.phys_sm[input_left.node].output_schema.clone();838let right_input_schema = ctx.phys_sm[input_right.node].output_schema.clone();839840let left_key_schema =841compute_output_schema(&left_input_schema, left_on, ctx.expr_arena)?;842let right_key_schema =843compute_output_schema(&right_input_schema, right_on, ctx.expr_arena)?;844845// We want to make sure here that the key types match otherwise we get out garbage out846// since the hashes will be calculated differently.847polars_ensure!(848left_on.len() == right_on.len() &&849left_on.iter().zip(right_on.iter()).all(|(l, r)| {850let l_dtype = left_key_schema.get(l.output_name()).unwrap();851let r_dtype = right_key_schema.get(r.output_name()).unwrap();852l_dtype == r_dtype853}),854SchemaMismatch: "join received different key types on left and right side"855);856857// We use key columns entirely by position, and allow duplicate names in key selectors,858// so just assign arbitrary unique names for the selectors.859let unique_left_on = left_on860.iter()861.enumerate()862.map(|(i, expr)| expr.with_alias(format_pl_smallstr!("__POLARS_KEYCOL_{i}")))863.collect_vec();864let unique_right_on = right_on865.iter()866.enumerate()867.map(|(i, expr)| expr.with_alias(format_pl_smallstr!("__POLARS_KEYCOL_{i}")))868.collect_vec();869870let left_key_selectors = unique_left_on871.iter()872.map(|e| 
create_stream_expr(e, ctx, &left_input_schema))873.try_collect_vec()?;874let right_key_selectors = unique_right_on875.iter()876.map(|e| create_stream_expr(e, ctx, &right_input_schema))877.try_collect_vec()?;878879let unique_key_schema =880compute_output_schema(&right_input_schema, &unique_left_on, ctx.expr_arena)?;881882match node.kind {883#[cfg(feature = "semi_anti_join")]884SemiAntiJoin { output_bool, .. } => ctx.graph.add_node(885nodes::joins::semi_anti_join::SemiAntiJoinNode::new(886unique_key_schema,887left_key_selectors,888right_key_selectors,889args,890output_bool,891ctx.num_pipelines,892)?,893[894(left_input_key, input_left.port),895(right_input_key, input_right.port),896],897),898_ => ctx.graph.add_node(899nodes::joins::equi_join::EquiJoinNode::new(900left_input_schema,901right_input_schema,902left_key_schema,903right_key_schema,904unique_key_schema,905left_key_selectors,906right_key_selectors,907args,908ctx.num_pipelines,909)?,910[911(left_input_key, input_left.port),912(right_input_key, input_right.port),913],914),915}916},917918CrossJoin {919input_left,920input_right,921args,922} => {923let args = args.clone();924let left_input_key = to_graph_rec(input_left.node, ctx)?;925let right_input_key = to_graph_rec(input_right.node, ctx)?;926let left_input_schema = ctx.phys_sm[input_left.node].output_schema.clone();927let right_input_schema = ctx.phys_sm[input_right.node].output_schema.clone();928929ctx.graph.add_node(930nodes::joins::cross_join::CrossJoinNode::new(931left_input_schema,932right_input_schema,933&args,934),935[936(left_input_key, input_left.port),937(right_input_key, input_right.port),938],939)940},941942#[cfg(feature = "merge_sorted")]943MergeSorted {944input_left,945input_right,946} => {947let left_input_key = to_graph_rec(input_left.node, ctx)?;948let right_input_key = to_graph_rec(input_right.node, ctx)?;949ctx.graph.add_node(950nodes::merge_sorted::MergeSortedNode::new(),951[952(left_input_key, input_left.port),953(right_input_key, 
input_right.port),954],955)956},957958#[cfg(feature = "python")]959PythonScan { options } => {960use polars_plan::dsl::python_dsl::PythonScanSource as S;961use polars_plan::plans::PythonPredicate;962use polars_utils::relaxed_cell::RelaxedCell;963use pyo3::exceptions::PyStopIteration;964use pyo3::prelude::*;965use pyo3::types::{PyBytes, PyNone};966use pyo3::{IntoPyObjectExt, PyTypeInfo, intern};967968let mut options = options.clone();969let with_columns = options.with_columns.take();970let n_rows = options.n_rows.take();971972let python_scan_function = options.scan_fn.take().unwrap().0;973974let with_columns = with_columns.map(|cols| cols.iter().cloned().collect::<Vec<_>>());975976let (pl_predicate, predicate_serialized) = polars_mem_engine::python_scan_predicate(977&mut options,978ctx.expr_arena,979&mut ctx.expr_conversion_state,980)?;981982let output_schema = options.output_schema.unwrap_or(options.schema);983let validate_schema = options.validate_schema;984985let simple_projection = with_columns.as_ref().and_then(|with_columns| {986(with_columns987.iter()988.zip(output_schema.iter_names())989.any(|(a, b)| a != b))990.then(|| output_schema.clone())991});992993let (name, get_batch_fn) = match options.python_source {994S::Pyarrow => todo!(),995S::Cuda => todo!(),996S::IOPlugin => {997let batch_size = Some(get_ideal_morsel_size());998let output_schema = output_schema.clone();9991000let with_columns = with_columns.map(|x| {1001x.into_iter()1002.map(|x| x.to_string())1003.collect::<Vec<String>>()1004});10051006// Setup the IO plugin generator.1007let (generator, can_parse_predicate) = {1008Python::with_gil(|py| {1009let pl = PyModule::import(py, intern!(py, "polars")).unwrap();1010let utils = pl.getattr(intern!(py, "_utils")).unwrap();1011let callable =1012utils.getattr(intern!(py, "_execute_from_rust")).unwrap();10131014let mut could_serialize_predicate = true;1015let predicate = match &options.predicate {1016PythonPredicate::PyArrow(s) => 
s.into_bound_py_any(py).unwrap(),1017PythonPredicate::None => None::<()>.into_bound_py_any(py).unwrap(),1018PythonPredicate::Polars(_) => {1019assert!(pl_predicate.is_some(), "should be set");1020match &predicate_serialized {1021None => {1022could_serialize_predicate = false;1023PyNone::get(py).to_owned().into_any()1024},1025Some(buf) => PyBytes::new(py, buf).into_any(),1026}1027},1028};10291030let args = (1031python_scan_function,1032with_columns,1033predicate,1034n_rows,1035batch_size,1036);10371038let generator_init = callable.call1(args)?;1039let generator = generator_init.get_item(0).map_err(1040|_| polars_err!(ComputeError: "expected tuple got {generator_init}"),1041)?;1042let can_parse_predicate = generator_init.get_item(1).map_err(1043|_| polars_err!(ComputeError: "expected tuple got {generator}"),1044)?;1045let can_parse_predicate = can_parse_predicate.extract::<bool>().map_err(1046|_| polars_err!(ComputeError: "expected bool got {can_parse_predicate}"),1047)? && could_serialize_predicate;10481049let generator = generator.into_py_any(py).map_err(1050|_| polars_err!(ComputeError: "unable to grab reference to IO plugin generator"),1051)?;10521053PolarsResult::Ok((generator, can_parse_predicate))1054})1055}?;10561057let get_batch_fn = Box::new(move |state: &StreamingExecutionState| {1058let df = Python::with_gil(|py| {1059match generator.bind(py).call_method0(intern!(py, "__next__")) {1060Ok(out) => polars_plan::plans::python_df_to_rust(py, out).map(Some),1061Err(err)1062if err.matches(py, PyStopIteration::type_object(py))? 
=>1063{1064Ok(None)1065},1066Err(err) => polars_bail!(1067ComputeError: "caught exception during execution of a Python source, exception: {err}"1068),1069}1070})?;10711072let Some(mut df) = df else { return Ok(None) };10731074if let Some(simple_projection) = &simple_projection {1075df = df.project(simple_projection.clone())?;1076}10771078if validate_schema {1079polars_ensure!(1080df.schema() == &output_schema,1081SchemaMismatch: "user provided schema: {:?} doesn't match the DataFrame schema: {:?}",1082output_schema, df.schema()1083);1084}10851086// TODO: Move this to a FilterNode so that it happens in parallel. We may need1087// to move all of the enclosing code to `lower_ir` for this.1088if let (Some(pred), false) = (&pl_predicate, can_parse_predicate) {1089let mask = pred.evaluate(&df, &state.in_memory_exec_state)?;1090df = df.filter(mask.bool()?)?;1091}10921093Ok(Some(df))1094}) as Box<_>;10951096(PlSmallStr::from_static("io_plugin"), get_batch_fn)1097},1098};10991100use polars_plan::dsl::{CastColumnsPolicy, MissingColumnsPolicy};11011102use crate::nodes::io_sources::batch::builder::BatchFnReaderBuilder;1103use crate::nodes::io_sources::batch::{BatchFnReader, GetBatchState};1104use crate::nodes::io_sources::multi_scan::components::projection::builder::ProjectionBuilder;11051106let reader = BatchFnReader {1107name: name.clone(),1108// If validate_schema is false, the schema of the morsels may not match the1109// configured schema. In this case we set this to `None` and the reader will1110// retrieve the schema from the first morsel.1111output_schema: validate_schema.then(|| output_schema.clone()),1112get_batch_state: Some(GetBatchState::from(get_batch_fn)),1113execution_state: None,1114verbose: config::verbose(),1115};11161117let file_reader_builder = Arc::new(BatchFnReaderBuilder {1118name,1119reader: std::sync::Mutex::new(Some(reader)),1120execution_state: Default::default(),1121}) as Arc<dyn FileReaderBuilder>;11221123// Give multiscan a single scan source. 
(It doesn't actually read from this).1124let sources = ScanSources::Paths(Arc::from([PlPath::from_str("python-scan-0")]));1125let cloud_options = None;1126let final_output_schema = output_schema.clone();1127let file_projection_builder = ProjectionBuilder::new(output_schema, None, None);1128let row_index = None;1129let pre_slice = None;1130let predicate = None;1131let hive_parts = None;1132let include_file_paths = None;1133let missing_columns_policy = MissingColumnsPolicy::Raise;1134let forbid_extra_columns = None;1135let cast_columns_policy = CastColumnsPolicy::ERROR_ON_MISMATCH;1136let deletion_files = None;1137let verbose = config::verbose();11381139ctx.graph.add_node(1140nodes::io_sources::multi_scan::MultiScan::new(Arc::new(MultiScanConfig {1141sources,1142file_reader_builder,1143cloud_options,1144final_output_schema,1145file_projection_builder,1146row_index,1147pre_slice,1148predicate,1149hive_parts,1150include_file_paths,1151missing_columns_policy,1152forbid_extra_columns,1153cast_columns_policy,1154deletion_files,1155// Initialized later1156num_pipelines: RelaxedCell::new_usize(0),1157n_readers_pre_init: RelaxedCell::new_usize(0),1158max_concurrent_scans: RelaxedCell::new_usize(0),1159verbose,1160})),1161[],1162)1163},1164};11651166ctx.phys_to_graph.insert(phys_node_key, graph_key);1167Ok(graph_key)1168}116911701171