Path: blob/main/crates/polars-stream/src/physical_plan/to_graph.rs
8458 views
use std::sync::{Arc, OnceLock};12use num_traits::AsPrimitive;3use parking_lot::Mutex;4use polars_core::prelude::PlRandomState;5use polars_core::schema::{Schema, SchemaRef};6use polars_core::{POOL, config};7use polars_error::{PolarsResult, polars_bail, polars_ensure, polars_err};8use polars_expr::groups::new_hash_grouper;9use polars_expr::planner::{ExpressionConversionState, create_physical_expr};10use polars_expr::reduce::into_reduction;11use polars_expr::state::ExecutionState;12use polars_mem_engine::create_physical_plan;13use polars_mem_engine::scan_predicate::create_scan_predicate;14use polars_plan::dsl::{15FileSinkOptions, JoinOptionsIR, PartitionStrategyIR, PartitionedSinkOptionsIR, ScanSources,16};17use polars_plan::plans::expr_ir::ExprIR;18use polars_plan::plans::{AExpr, ArenaExprIter, IR, IRAggExpr};19use polars_plan::prelude::FunctionFlags;20use polars_utils::arena::{Arena, Node};21use polars_utils::format_pl_smallstr;22use polars_utils::itertools::Itertools;23use polars_utils::pl_path::PlRefPath;24use polars_utils::pl_str::PlSmallStr;25use polars_utils::relaxed_cell::RelaxedCell;26use recursive::recursive;27use slotmap::{SecondaryMap, SlotMap};2829use super::{PhysNode, PhysNodeKey, PhysNodeKind};30use crate::execute::StreamingExecutionState;31use crate::expression::StreamExpr;32use crate::graph::{Graph, GraphNodeKey};33use crate::morsel::{MorselSeq, get_ideal_morsel_size};34use crate::nodes;35use crate::nodes::io_sources::multi_scan::config::MultiScanConfig;36use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;37use crate::nodes::io_sources::multi_scan::reader_interface::capabilities::ReaderCapabilities;38use crate::nodes::joins::merge_join::MergeJoinNode;39use crate::physical_plan::lower_expr::compute_output_schema;40use crate::utils::late_materialized_df::LateMaterializedDataFrame;4142fn has_potential_recurring_entrance(node: Node, arena: &Arena<AExpr>) -> bool {43arena.iter(node).any(|(_n, ae)| match ae {44AExpr::Function { options, .. } | AExpr::AnonymousFunction { options, .. } => {45options.flags.contains(FunctionFlags::OPTIONAL_RE_ENTRANT)46},47_ => false,48})49}5051fn create_stream_expr(52expr_ir: &ExprIR,53ctx: &mut GraphConversionContext<'_>,54schema: &Arc<Schema>,55) -> PolarsResult<StreamExpr> {56let reentrant = has_potential_recurring_entrance(expr_ir.node(), ctx.expr_arena);57let phys = create_physical_expr(58expr_ir,59ctx.expr_arena,60schema,61&mut ctx.expr_conversion_state,62)?;63Ok(StreamExpr::new(phys, reentrant))64}6566struct GraphConversionContext<'a> {67phys_sm: &'a SlotMap<PhysNodeKey, PhysNode>,68expr_arena: &'a mut Arena<AExpr>,69graph: Graph,70phys_to_graph: SecondaryMap<PhysNodeKey, GraphNodeKey>,71expr_conversion_state: ExpressionConversionState,72num_pipelines: usize,73}7475pub fn physical_plan_to_graph(76root: PhysNodeKey,77phys_sm: &SlotMap<PhysNodeKey, PhysNode>,78expr_arena: &mut Arena<AExpr>,79) -> PolarsResult<(Graph, SecondaryMap<PhysNodeKey, GraphNodeKey>)> {80// Get the number of threads from the rayon thread-pool as that respects our config.81let num_pipelines = POOL.current_num_threads();82let mut ctx = GraphConversionContext {83phys_sm,84expr_arena,85graph: Graph::with_capacity(phys_sm.len()),86phys_to_graph: SecondaryMap::with_capacity(phys_sm.len()),87expr_conversion_state: ExpressionConversionState::new(false),88num_pipelines,89};9091to_graph_rec(root, &mut ctx)?;9293Ok((ctx.graph, ctx.phys_to_graph))94}9596#[recursive]97fn to_graph_rec<'a>(98phys_node_key: PhysNodeKey,99ctx: &mut GraphConversionContext<'a>,100) -> PolarsResult<GraphNodeKey> {101// This will ensure we create a proper acyclic directed graph instead of a tree.102if let Some(graph_key) = ctx.phys_to_graph.get(phys_node_key) {103return Ok(*graph_key);104}105106use PhysNodeKind::*;107let node = &ctx.phys_sm[phys_node_key];108let graph_key = match &node.kind {109InMemorySource {110df,111disable_morsel_split,112} => ctx.graph.add_node(113if *disable_morsel_split {114nodes::in_memory_source::InMemorySourceNode::new_no_morsel_split(115df.clone(),116MorselSeq::default(),117)118} else {119nodes::in_memory_source::InMemorySourceNode::new(df.clone(), MorselSeq::default())120},121[],122),123SinkMultiple { sinks } => {124// @NOTE: This is always the root node and gets ignored by the physical_plan anyway so125// we give one of the inputs back.126let node = to_graph_rec(sinks[0], ctx)?;127for sink in &sinks[1..] {128to_graph_rec(*sink, ctx)?;129}130return Ok(node);131},132133StreamingSlice {134input,135offset,136length,137} => {138let input_key = to_graph_rec(input.node, ctx)?;139ctx.graph.add_node(140nodes::streaming_slice::StreamingSliceNode::new(*offset, *length),141[(input_key, input.port)],142)143},144145NegativeSlice {146input,147offset,148length,149} => {150let input_key = to_graph_rec(input.node, ctx)?;151ctx.graph.add_node(152nodes::negative_slice::NegativeSliceNode::new(*offset, *length),153[(input_key, input.port)],154)155},156157DynamicSlice {158input,159offset,160length,161} => {162let input_key = to_graph_rec(input.node, ctx)?;163let offset_key = to_graph_rec(offset.node, ctx)?;164let length_key = to_graph_rec(length.node, ctx)?;165let offset_schema = ctx.phys_sm[offset.node].output_schema.clone();166let length_schema = ctx.phys_sm[length.node].output_schema.clone();167ctx.graph.add_node(168nodes::dynamic_slice::DynamicSliceNode::new(offset_schema, length_schema),169[170(input_key, input.port),171(offset_key, offset.port),172(length_key, length.port),173],174)175},176177Shift {178input,179offset,180fill,181} => {182let input_schema = ctx.phys_sm[input.node].output_schema.clone();183let offset_schema = ctx.phys_sm[offset.node].output_schema.clone();184let input_key = to_graph_rec(input.node, ctx)?;185let offset_key = to_graph_rec(offset.node, ctx)?;186if let Some(fill) = fill {187let fill_key = to_graph_rec(fill.node, ctx)?;188ctx.graph.add_node(189nodes::shift::ShiftNode::new(input_schema, offset_schema, true),190[191(input_key, input.port),192(offset_key, offset.port),193(fill_key, fill.port),194],195)196} else {197ctx.graph.add_node(198nodes::shift::ShiftNode::new(input_schema, offset_schema, false),199[(input_key, input.port), (offset_key, offset.port)],200)201}202},203204Filter { predicate, input } => {205let input_schema = &ctx.phys_sm[input.node].output_schema;206let phys_predicate_expr = create_stream_expr(predicate, ctx, input_schema)?;207let input_key = to_graph_rec(input.node, ctx)?;208ctx.graph.add_node(209nodes::filter::FilterNode::new(phys_predicate_expr),210[(input_key, input.port)],211)212},213214Select {215selectors,216input,217extend_original,218} => {219let input_schema = &ctx.phys_sm[input.node].output_schema;220let phys_selectors = selectors221.iter()222.map(|selector| create_stream_expr(selector, ctx, input_schema))223.collect::<PolarsResult<_>>()?;224let input_key = to_graph_rec(input.node, ctx)?;225ctx.graph.add_node(226nodes::select::SelectNode::new(227phys_selectors,228node.output_schema.clone(),229*extend_original,230),231[(input_key, input.port)],232)233},234235WithRowIndex {236input,237name,238offset,239} => {240let input_key = to_graph_rec(input.node, ctx)?;241ctx.graph.add_node(242nodes::with_row_index::WithRowIndexNode::new(name.clone(), *offset),243[(input_key, input.port)],244)245},246247InputIndependentSelect { selectors } => {248let empty_schema = Default::default();249let phys_selectors = selectors250.iter()251.map(|selector| create_stream_expr(selector, ctx, &empty_schema))252.collect::<PolarsResult<_>>()?;253ctx.graph.add_node(254nodes::input_independent_select::InputIndependentSelectNode::new(phys_selectors),255[],256)257},258259Reduce { input, exprs } => {260let input_key = to_graph_rec(input.node, ctx)?;261let input_schema = &ctx.phys_sm[input.node].output_schema;262263let mut reductions = Vec::with_capacity(exprs.len());264let mut inputs = Vec::with_capacity(reductions.len());265266for e in exprs {267let (red, input_nodes) =268into_reduction(e.node(), ctx.expr_arena, input_schema, false)?;269reductions.push(red);270271let input_phys_exprs = input_nodes272.iter()273.map(|node| {274create_stream_expr(275&ExprIR::from_node(*node, ctx.expr_arena),276ctx,277input_schema,278)279})280.try_collect_vec()?;281282inputs.push(input_phys_exprs)283}284285ctx.graph.add_node(286nodes::reduce::ReduceNode::new(inputs, reductions, node.output_schema.clone()),287[(input_key, input.port)],288)289},290SimpleProjection { input, columns } => {291let input_schema = ctx.phys_sm[input.node].output_schema.clone();292let input_key = to_graph_rec(input.node, ctx)?;293ctx.graph.add_node(294nodes::simple_projection::SimpleProjectionNode::new(columns.clone(), input_schema),295[(input_key, input.port)],296)297},298299InMemorySink { input } => {300let input_schema = ctx.phys_sm[input.node].output_schema.clone();301let input_key = to_graph_rec(input.node, ctx)?;302ctx.graph.add_node(303nodes::in_memory_sink::InMemorySinkNode::new(input_schema),304[(input_key, input.port)],305)306},307308CallbackSink {309input,310function,311maintain_order,312chunk_size,313} => {314let input_key = to_graph_rec(input.node, ctx)?;315ctx.graph.add_node(316nodes::callback_sink::CallbackSinkNode::new(317function.clone(),318*maintain_order,319*chunk_size,320),321[(input_key, input.port)],322)323},324325FileSink {326input,327options:328FileSinkOptions {329target,330file_format,331unified_sink_args,332},333} => {334use crate::nodes::io_sinks::IOSinkNode;335use crate::nodes::io_sinks::config::{IOSinkNodeConfig, IOSinkTarget};336337let input_schema = ctx.phys_sm[input.node].output_schema.clone();338let input_key = to_graph_rec(input.node, ctx)?;339340let target = IOSinkTarget::File(target.clone());341342let config = IOSinkNodeConfig {343file_format: file_format.clone(),344target,345unified_sink_args: unified_sink_args.clone(),346input_schema,347};348349ctx.graph350.add_node(IOSinkNode::new(config), [(input_key, input.port)])351},352353PartitionedSink {354input,355options:356PartitionedSinkOptionsIR {357base_path,358file_path_provider,359partition_strategy,360file_format,361unified_sink_args,362max_rows_per_file,363approximate_bytes_per_file,364},365} => {366use crate::nodes::io_sinks::IOSinkNode;367use crate::nodes::io_sinks::components::exclude_keys_projection::ExcludeKeysProjection;368use crate::nodes::io_sinks::components::hstack_columns::HStackColumns;369use crate::nodes::io_sinks::components::partitioner::{KeyedPartitioner, Partitioner};370use crate::nodes::io_sinks::components::size::{371NonZeroRowCountAndSize, RowCountAndSize,372};373use crate::nodes::io_sinks::config::{374IOSinkNodeConfig, IOSinkTarget, PartitionedTarget,375};376377let input_schema = ctx.phys_sm[input.node].output_schema.clone();378let input_key = to_graph_rec(input.node, ctx)?;379380let file_schema: SchemaRef;381let mut hstack_keys: Option<HStackColumns> = None;382let mut include_keys_in_file = false;383384let partitioner: Partitioner = match partition_strategy {385PartitionStrategyIR::Keyed {386keys,387include_keys,388keys_pre_grouped: _,389} => {390include_keys_in_file = *include_keys;391392let mut key_schema = Schema::with_capacity(keys.len());393394let key_exprs = keys395.iter()396.map(|e| {397let field = e.field(input_schema.as_ref(), ctx.expr_arena)?;398key_schema.extend([field]);399400create_stream_expr(e, ctx, &input_schema)401})402.collect::<PolarsResult<_>>()?;403404let exclude_keys_projection: Arc<[usize]> = input_schema405.iter_names()406.enumerate()407.filter_map(|(i, name)| (!key_schema.contains(name)).then_some(i))408.collect::<Arc<[_]>>();409410let exclude_keys_projection =411if exclude_keys_projection.len() == input_schema.len() {412ExcludeKeysProjection::Width(exclude_keys_projection.len())413} else {414ExcludeKeysProjection::Indices(exclude_keys_projection)415};416417let schema_excluding_keys: Schema = exclude_keys_projection418.iter_indices()419.map(|i| {420let (name, dtype) = input_schema.get_at_index(i).unwrap();421(name.clone(), dtype.clone())422})423.collect();424425let mut schema_including_keys = Arc::unwrap_or_clone(input_schema.clone());426427for (name, dtype) in key_schema.iter() {428schema_including_keys.with_column(name.clone(), dtype.clone());429}430431let schema_including_keys = Arc::new(schema_including_keys);432433hstack_keys = Some(HStackColumns::new(434&schema_including_keys,435&schema_excluding_keys,436&key_schema,437));438439file_schema = if *include_keys {440Arc::clone(&schema_including_keys)441} else {442Arc::new(schema_excluding_keys)443};444445let keyed = KeyedPartitioner {446key_exprs,447exclude_keys_projection: Some(exclude_keys_projection),448};449450Partitioner::Keyed(keyed)451},452PartitionStrategyIR::FileSize => {453file_schema = input_schema.clone();454Partitioner::FileSize455},456};457458let mut file_size_limit = RowCountAndSize::MAX;459460if *max_rows_per_file > 0 {461file_size_limit.num_rows = *max_rows_per_file462}463464if *approximate_bytes_per_file > 0 {465file_size_limit.num_bytes = *approximate_bytes_per_file466}467468let file_size_limit = (file_size_limit != RowCountAndSize::MAX)469.then_some(NonZeroRowCountAndSize::new(file_size_limit).unwrap());470471let target = IOSinkTarget::Partitioned(Box::new(PartitionedTarget {472base_path: base_path.clone(),473file_path_provider: file_path_provider.clone(),474partitioner,475hstack_keys,476include_keys_in_file,477file_schema,478file_size_limit,479}));480481let config = IOSinkNodeConfig {482file_format: file_format.clone(),483target,484unified_sink_args: unified_sink_args.clone(),485input_schema,486};487488ctx.graph489.add_node(IOSinkNode::new(config), [(input_key, input.port)])490},491492InMemoryMap {493input,494map,495format_str: _,496} => {497let input_schema = ctx.phys_sm[input.node].output_schema.clone();498let input_key = to_graph_rec(input.node, ctx)?;499ctx.graph.add_node(500nodes::in_memory_map::InMemoryMapNode::new(input_schema, map.clone()),501[(input_key, input.port)],502)503},504505Map {506input,507map,508format_str: _,509} => {510let input_key = to_graph_rec(input.node, ctx)?;511ctx.graph.add_node(512nodes::map::MapNode::new(map.clone()),513[(input_key, input.port)],514)515},516517SortedGroupBy {518input,519key,520aggs,521slice,522} => {523let input_schema = ctx.phys_sm[input.node].output_schema.clone();524let input_key = to_graph_rec(input.node, ctx)?;525let aggs = aggs526.iter()527.map(|e| {528Ok((529e.output_name().clone(),530create_stream_expr(e, ctx, &input_schema)?,531))532})533.collect::<PolarsResult<Arc<[_]>>>()?;534535ctx.graph.add_node(536nodes::sorted_group_by::SortedGroupBy::new(key.clone(), aggs, *slice, input_schema),537[(input_key, input.port)],538)539},540541Sort {542input,543by_column,544slice,545sort_options,546} => {547let input_schema = ctx.phys_sm[input.node].output_schema.clone();548let lmdf = Arc::new(LateMaterializedDataFrame::default());549let mut lp_arena = Arena::default();550let df_node = lp_arena.add(lmdf.clone().as_ir_node(input_schema.clone()));551let sort_node = lp_arena.add(IR::Sort {552input: df_node,553by_column: by_column.clone(),554slice: slice.map(|t| (t.0, t.1, None)),555sort_options: sort_options.clone(),556});557let executor = Mutex::new(create_physical_plan(558sort_node,559&mut lp_arena,560ctx.expr_arena,561Some(crate::dispatch::build_streaming_query_executor),562)?);563564let input_key = to_graph_rec(input.node, ctx)?;565ctx.graph.add_node(566nodes::in_memory_map::InMemoryMapNode::new(567input_schema,568Arc::new(move |df| {569lmdf.set_materialized_dataframe(df);570let mut state = ExecutionState::new();571executor.lock().execute(&mut state)572}),573),574[(input_key, input.port)],575)576},577578TopK {579input,580k,581by_column,582reverse,583nulls_last,584dyn_pred,585} => {586let input_key = to_graph_rec(input.node, ctx)?;587let k_key = to_graph_rec(k.node, ctx)?;588589let k_schema = ctx.phys_sm[k.node].output_schema.clone();590let input_schema = &ctx.phys_sm[input.node].output_schema;591let key_schema = compute_output_schema(input_schema, by_column, ctx.expr_arena)?;592593let key_selectors = by_column594.iter()595.map(|e| create_stream_expr(e, ctx, input_schema))596.try_collect_vec()?;597598ctx.graph.add_node(599nodes::top_k::TopKNode::new(600k_schema,601reverse.clone(),602nulls_last.clone(),603key_schema,604key_selectors,605dyn_pred.clone(),606),607[(input_key, input.port), (k_key, k.port)],608)609},610611Repeat { value, repeats } => {612let value_key = to_graph_rec(value.node, ctx)?;613let repeats_key = to_graph_rec(repeats.node, ctx)?;614let value_schema = ctx.phys_sm[value.node].output_schema.clone();615let repeats_schema = ctx.phys_sm[repeats.node].output_schema.clone();616ctx.graph.add_node(617nodes::repeat::RepeatNode::new(value_schema, repeats_schema),618[(value_key, value.port), (repeats_key, repeats.port)],619)620},621622#[cfg(feature = "cum_agg")]623CumAgg { input, kind } => {624let input_key = to_graph_rec(input.node, ctx)?;625ctx.graph.add_node(626nodes::cum_agg::CumAggNode::new(*kind),627[(input_key, input.port)],628)629},630631GatherEvery { input, n, offset } => {632let (n, offset) = (*n, *offset);633let input_key = to_graph_rec(input.node, ctx)?;634ctx.graph.add_node(635nodes::gather_every::GatherEveryNode::new(n, offset)?,636[(input_key, input.port)],637)638},639640Rle(input) => {641let input_key = to_graph_rec(input.node, ctx)?;642let input_schema = &ctx.phys_sm[input.node].output_schema;643assert_eq!(input_schema.len(), 1);644let (name, dtype) = input_schema.get_at_index(0).unwrap();645ctx.graph.add_node(646nodes::rle::RleNode::new(name.clone(), dtype.clone()),647[(input_key, input.port)],648)649},650651RleId(input) => {652let input_key = to_graph_rec(input.node, ctx)?;653let input_schema = &ctx.phys_sm[input.node].output_schema;654assert_eq!(input_schema.len(), 1);655let (_, dtype) = input_schema.get_at_index(0).unwrap();656ctx.graph.add_node(657nodes::rle_id::RleIdNode::new(dtype.clone()),658[(input_key, input.port)],659)660},661662PeakMinMax { input, is_peak_max } => {663let input_key = to_graph_rec(input.node, ctx)?;664ctx.graph.add_node(665nodes::peak_minmax::PeakMinMaxNode::new(*is_peak_max),666[(input_key, input.port)],667)668},669670OrderedUnion { inputs } => {671let input_keys = inputs672.iter()673.map(|i| PolarsResult::Ok((to_graph_rec(i.node, ctx)?, i.port)))674.try_collect_vec()?;675ctx.graph.add_node(676nodes::ordered_union::OrderedUnionNode::new(node.output_schema.clone()),677input_keys,678)679},680681UnorderedUnion { inputs } => {682let input_keys = inputs683.iter()684.map(|i| PolarsResult::Ok((to_graph_rec(i.node, ctx)?, i.port)))685.try_collect_vec()?;686ctx.graph.add_node(687nodes::unordered_union::UnorderedUnionNode::new(node.output_schema.clone()),688input_keys,689)690},691692Zip {693inputs,694zip_behavior,695} => {696let input_schemas = inputs697.iter()698.map(|i| ctx.phys_sm[i.node].output_schema.clone())699.collect_vec();700let input_keys = inputs701.iter()702.map(|i| PolarsResult::Ok((to_graph_rec(i.node, ctx)?, i.port)))703.try_collect_vec()?;704ctx.graph.add_node(705nodes::zip::ZipNode::new(*zip_behavior, input_schemas),706input_keys,707)708},709710Multiplexer { input } => {711let input_key = to_graph_rec(input.node, ctx)?;712ctx.graph.add_node(713nodes::multiplexer::MultiplexerNode::new(),714[(input_key, input.port)],715)716},717718MultiScan {719scan_sources,720file_reader_builder,721cloud_options,722file_projection_builder,723output_schema,724row_index,725pre_slice,726predicate,727predicate_file_skip_applied,728hive_parts,729missing_columns_policy,730cast_columns_policy,731include_file_paths,732forbid_extra_columns,733deletion_files,734table_statistics,735file_schema,736disable_morsel_split,737} => {738let hive_parts = hive_parts.clone();739740let predicate = predicate741.as_ref()742.map(|pred| {743create_scan_predicate(744pred,745ctx.expr_arena,746output_schema,747hive_parts.as_ref().map(|hp| hp.df().schema().as_ref()),748&mut ctx.expr_conversion_state,749true, // create_skip_batch_predicate750file_reader_builder751.reader_capabilities()752.contains(ReaderCapabilities::PARTIAL_FILTER), // create_column_predicates753)754})755.transpose()?756.map(|p| p.to_io(None, file_schema.clone()));757let predicate_file_skip_applied = *predicate_file_skip_applied;758759let sources = scan_sources.clone();760let file_reader_builder = file_reader_builder.clone();761let cloud_options = cloud_options.clone();762763let final_output_schema = output_schema.clone();764let file_projection_builder = file_projection_builder.clone();765766let row_index = row_index.clone();767let pre_slice = pre_slice.clone();768let hive_parts = hive_parts.map(Arc::new);769let include_file_paths = include_file_paths.clone();770let missing_columns_policy = *missing_columns_policy;771let forbid_extra_columns = forbid_extra_columns.clone();772let cast_columns_policy = cast_columns_policy.clone();773let deletion_files = deletion_files.clone();774let table_statistics = table_statistics.clone();775let disable_morsel_split = *disable_morsel_split;776777let verbose = config::verbose();778779ctx.graph.add_node(780nodes::io_sources::multi_scan::MultiScan::new(Arc::new(MultiScanConfig {781sources,782file_reader_builder,783cloud_options,784final_output_schema,785file_projection_builder,786row_index,787pre_slice,788predicate,789predicate_file_skip_applied,790hive_parts,791include_file_paths,792missing_columns_policy,793forbid_extra_columns,794cast_columns_policy,795deletion_files,796table_statistics,797// Initialized later798num_pipelines: RelaxedCell::new_usize(0),799n_readers_pre_init: RelaxedCell::new_usize(0),800max_concurrent_scans: RelaxedCell::new_usize(0),801disable_morsel_split,802io_metrics: OnceLock::default(),803verbose,804})),805[],806)807},808809GroupBy {810inputs,811key_per_input,812aggs_per_input,813} => {814let mut key_ports = Vec::new();815let mut key_schema_per_input = Vec::new();816let mut key_selectors_per_input = Vec::new();817let mut reductions_per_input = Vec::new();818let mut grouped_reductions = Vec::new();819let mut grouped_reduction_cols = Vec::new();820let mut has_order_sensitive_agg = false;821for ((input, key), aggs) in inputs.iter().zip(key_per_input).zip(aggs_per_input) {822let input_key = to_graph_rec(input.node, ctx)?;823key_ports.push((input_key, input.port));824825let input_schema = &ctx.phys_sm[input.node].output_schema;826let key_schema = compute_output_schema(input_schema, key, ctx.expr_arena)?;827key_schema_per_input.push(key_schema);828829let key_selectors = key830.iter()831.map(|e| create_stream_expr(e, ctx, input_schema))832.try_collect_vec()?;833key_selectors_per_input.push(key_selectors);834835let mut reductions_for_this_input = Vec::new();836for agg in aggs {837has_order_sensitive_agg |= matches!(838ctx.expr_arena.get(agg.node()),839AExpr::Agg(840IRAggExpr::First(_)841| IRAggExpr::FirstNonNull(_)842| IRAggExpr::Last(_)843| IRAggExpr::LastNonNull(_)844)845);846let (reduction, input_nodes) =847into_reduction(agg.node(), ctx.expr_arena, input_schema, true)?;848let cols = input_nodes849.iter()850.map(|node| {851let AExpr::Column(col) = ctx.expr_arena.get(*node) else {852unreachable!()853};854col.clone()855})856.collect();857reductions_for_this_input.push(grouped_reductions.len());858grouped_reductions.push(reduction);859grouped_reduction_cols.push(cols);860}861862reductions_per_input.push(reductions_for_this_input);863}864865let key_schema = key_schema_per_input.swap_remove(0);866assert!(key_schema_per_input.iter().all(|s| **s == *key_schema));867868let grouper = new_hash_grouper(key_schema.clone());869ctx.graph.add_node(870nodes::group_by::GroupByNode::new(871key_schema,872key_selectors_per_input,873reductions_per_input,874grouper,875grouped_reduction_cols,876grouped_reductions,877node.output_schema.clone(),878PlRandomState::default(),879ctx.num_pipelines,880has_order_sensitive_agg,881),882key_ports,883)884},885886#[cfg(feature = "dynamic_group_by")]887DynamicGroupBy {888input,889options,890aggs,891slice,892} => {893let input_schema = &ctx.phys_sm[input.node].output_schema;894let input_key = to_graph_rec(input.node, ctx)?;895let aggs = aggs896.iter()897.map(|e| {898Ok((899e.output_name().clone(),900create_stream_expr(e, ctx, input_schema)?,901))902})903.collect::<PolarsResult<Arc<[_]>>>()?;904ctx.graph.add_node(905nodes::dynamic_group_by::DynamicGroupBy::new(906input_schema.clone(),907options.clone(),908aggs,909*slice,910)?,911[(input_key, input.port)],912)913},914#[cfg(feature = "dynamic_group_by")]915RollingGroupBy {916input,917index_column,918period,919offset,920closed,921slice,922aggs,923} => {924let input_schema = &ctx.phys_sm[input.node].output_schema;925let input_key = to_graph_rec(input.node, ctx)?;926let aggs = aggs927.iter()928.map(|e| {929Ok((930e.output_name().clone(),931create_stream_expr(e, ctx, input_schema)?,932))933})934.collect::<PolarsResult<Arc<[_]>>>()?;935ctx.graph.add_node(936nodes::rolling_group_by::RollingGroupBy::new(937input_schema.clone(),938index_column.clone(),939*period,940*offset,941*closed,942*slice,943aggs,944)?,945[(input_key, input.port)],946)947},948949InMemoryJoin {950input_left,951input_right,952left_on,953right_on,954args,955options,956} => {957let left_input_key = to_graph_rec(input_left.node, ctx)?;958let right_input_key = to_graph_rec(input_right.node, ctx)?;959let left_input_schema = ctx.phys_sm[input_left.node].output_schema.clone();960let right_input_schema = ctx.phys_sm[input_right.node].output_schema.clone();961962let mut lp_arena = Arena::default();963let left_lmdf = Arc::new(LateMaterializedDataFrame::default());964let right_lmdf = Arc::new(LateMaterializedDataFrame::default());965966let left_node = lp_arena.add(left_lmdf.clone().as_ir_node(left_input_schema.clone()));967let right_node =968lp_arena.add(right_lmdf.clone().as_ir_node(right_input_schema.clone()));969let join_node = lp_arena.add(IR::Join {970input_left: left_node,971input_right: right_node,972schema: node.output_schema.clone(),973left_on: left_on.clone(),974right_on: right_on.clone(),975options: Arc::new(JoinOptionsIR {976allow_parallel: true,977force_parallel: false,978args: args.clone(),979options: options.clone(),980}),981});982983let executor = Mutex::new(create_physical_plan(984join_node,985&mut lp_arena,986ctx.expr_arena,987Some(crate::dispatch::build_streaming_query_executor),988)?);989990ctx.graph.add_node(991nodes::joins::in_memory::InMemoryJoinNode::new(992left_input_schema,993right_input_schema,994Arc::new(move |left, right| {995left_lmdf.set_materialized_dataframe(left);996right_lmdf.set_materialized_dataframe(right);997let mut state = ExecutionState::new();998executor.lock().execute(&mut state)999}),1000),1001[1002(left_input_key, input_left.port),1003(right_input_key, input_right.port),1004],1005)1006},10071008EquiJoin {1009input_left,1010input_right,1011left_on,1012right_on,1013args,1014}1015| SemiAntiJoin {1016input_left,1017input_right,1018left_on,1019right_on,1020args,1021output_bool: _,1022} => {1023let args = args.clone();1024let left_input_key = to_graph_rec(input_left.node, ctx)?;1025let right_input_key = to_graph_rec(input_right.node, ctx)?;1026let left_input_schema = ctx.phys_sm[input_left.node].output_schema.clone();1027let right_input_schema = ctx.phys_sm[input_right.node].output_schema.clone();10281029let left_key_schema =1030compute_output_schema(&left_input_schema, left_on, ctx.expr_arena)?;1031let right_key_schema =1032compute_output_schema(&right_input_schema, right_on, ctx.expr_arena)?;10331034// We want to make sure here that the key types match otherwise we get out garbage out1035// since the hashes will be calculated differently.1036polars_ensure!(1037left_on.len() == right_on.len() &&1038left_on.iter().zip(right_on.iter()).all(|(l, r)| {1039let l_dtype = left_key_schema.get(l.output_name()).unwrap();1040let r_dtype = right_key_schema.get(r.output_name()).unwrap();1041l_dtype == r_dtype1042}),1043SchemaMismatch: "join received different key types on left and right side"1044);10451046// We use key columns entirely by position, and allow duplicate names in key selectors,1047// so just assign arbitrary unique names for the selectors.1048let unique_left_on = left_on1049.iter()1050.enumerate()1051.map(|(i, expr)| expr.with_alias(format_pl_smallstr!("__POLARS_KEYCOL_{i}")))1052.collect_vec();1053let unique_right_on = right_on1054.iter()1055.enumerate()1056.map(|(i, expr)| expr.with_alias(format_pl_smallstr!("__POLARS_KEYCOL_{i}")))1057.collect_vec();10581059let left_key_selectors = unique_left_on1060.iter()1061.map(|e| create_stream_expr(e, ctx, &left_input_schema))1062.try_collect_vec()?;1063let right_key_selectors = unique_right_on1064.iter()1065.map(|e| create_stream_expr(e, ctx, &right_input_schema))1066.try_collect_vec()?;10671068let unique_key_schema =1069compute_output_schema(&right_input_schema, &unique_left_on, ctx.expr_arena)?;10701071match node.kind {1072#[cfg(feature = "semi_anti_join")]1073SemiAntiJoin { output_bool, .. } => ctx.graph.add_node(1074nodes::joins::semi_anti_join::SemiAntiJoinNode::new(1075unique_key_schema,1076left_key_selectors,1077right_key_selectors,1078args,1079output_bool,1080ctx.num_pipelines,1081)?,1082[1083(left_input_key, input_left.port),1084(right_input_key, input_right.port),1085],1086),1087_ => ctx.graph.add_node(1088nodes::joins::equi_join::EquiJoinNode::new(1089left_input_schema,1090right_input_schema,1091left_key_schema,1092right_key_schema,1093unique_key_schema,1094left_key_selectors,1095right_key_selectors,1096args,1097ctx.num_pipelines,1098)?,1099[1100(left_input_key, input_left.port),1101(right_input_key, input_right.port),1102],1103),1104}1105},11061107MergeJoin {1108input_left,1109input_right,1110left_on,1111right_on,1112tmp_left_key_col,1113tmp_right_key_col,1114descending,1115nulls_last,1116keys_row_encoded,1117args,1118} => {1119let args = args.clone();1120let left_input_key = to_graph_rec(input_left.node, ctx)?;1121let right_input_key = to_graph_rec(input_right.node, ctx)?;1122let left_input_schema = ctx.phys_sm[input_left.node].output_schema.clone();1123let right_input_schema = ctx.phys_sm[input_right.node].output_schema.clone();1124let output_schema = node.output_schema.clone();11251126ctx.graph.add_node(1127MergeJoinNode::new(1128left_input_schema,1129right_input_schema,1130output_schema,1131left_on.clone(),1132right_on.clone(),1133tmp_left_key_col.clone(),1134tmp_right_key_col.clone(),1135*descending,1136*nulls_last,1137*keys_row_encoded,1138args,1139)?,1140[1141(left_input_key, input_left.port),1142(right_input_key, input_right.port),1143],1144)1145},11461147CrossJoin {1148input_left,1149input_right,1150args,1151} => {1152let args = args.clone();1153let left_input_key = to_graph_rec(input_left.node, ctx)?;1154let right_input_key = to_graph_rec(input_right.node, ctx)?;1155let left_input_schema = ctx.phys_sm[input_left.node].output_schema.clone();1156let right_input_schema = ctx.phys_sm[input_right.node].output_schema.clone();11571158ctx.graph.add_node(1159nodes::joins::cross_join::CrossJoinNode::new(1160left_input_schema,1161right_input_schema,1162&args,1163),1164[1165(left_input_key, input_left.port),1166(right_input_key, input_right.port),1167],1168)1169},11701171AsOfJoin {1172input_left,1173input_right,1174left_on,1175right_on,1176tmp_left_key_col,1177tmp_right_key_col,1178args,1179} => {1180let args = args.clone();1181let left_input_key = to_graph_rec(input_left.node, ctx)?;1182let right_input_key = to_graph_rec(input_right.node, ctx)?;1183let left_input_schema = ctx.phys_sm[input_left.node].output_schema.clone();1184let right_input_schema = ctx.phys_sm[input_right.node].output_schema.clone();1185#[cfg(feature = "asof_join")]1186{1187ctx.graph.add_node(1188nodes::joins::asof_join::AsOfJoinNode::new(1189left_input_schema,1190right_input_schema,1191left_on.clone(),1192right_on.clone(),1193tmp_left_key_col.clone(),1194tmp_right_key_col.clone(),1195args,1196),1197[1198(left_input_key, input_left.port),1199(right_input_key, input_right.port),1200],1201)1202}1203#[cfg(not(feature = "asof_join"))]1204{1205unreachable!("asof_join feature is disabled")1206}1207},12081209#[cfg(feature = "merge_sorted")]1210MergeSorted {1211input_left,1212input_right,1213} => {1214let left_input_key = to_graph_rec(input_left.node, ctx)?;1215let right_input_key = to_graph_rec(input_right.node, ctx)?;1216ctx.graph.add_node(1217nodes::merge_sorted::MergeSortedNode::new(),1218[1219(left_input_key, input_left.port),1220(right_input_key, input_right.port),1221],1222)1223},12241225#[cfg(feature = "python")]1226PythonScan { options } => {1227use polars_buffer::Buffer;1228use polars_plan::dsl::python_dsl::PythonScanSource as S;1229use polars_plan::plans::PythonPredicate;1230use polars_utils::relaxed_cell::RelaxedCell;1231use pyo3::exceptions::PyStopIteration;1232use pyo3::prelude::*;1233use pyo3::types::{PyBytes, PyNone};1234use pyo3::{IntoPyObjectExt, PyTypeInfo, intern};12351236let mut options = options.clone();1237let with_columns = options.with_columns.take();1238let n_rows = options.n_rows.take();12391240let python_scan_function = options.scan_fn.take().unwrap().0;12411242let with_columns = with_columns.map(|cols| cols.iter().cloned().collect::<Vec<_>>());12431244let (pl_predicate, predicate_serialized) = polars_mem_engine::python_scan_predicate(1245&mut options,1246ctx.expr_arena,1247&mut ctx.expr_conversion_state,1248)?;12491250let output_schema = options.output_schema.unwrap_or(options.schema);1251let validate_schema = options.validate_schema;12521253let simple_projection = with_columns.as_ref().and_then(|with_columns| {1254(with_columns1255.iter()1256.zip(output_schema.iter_names())1257.any(|(a, b)| a != b))1258.then(|| output_schema.clone())1259});12601261let (name, get_batch_fn) = match options.python_source {1262S::Pyarrow => todo!(),1263S::Cuda => todo!(),1264S::IOPlugin => {1265let batch_size = Some(get_ideal_morsel_size());1266let output_schema = output_schema.clone();12671268let with_columns = with_columns.map(|x| {1269x.into_iter()1270.map(|x| x.to_string())1271.collect::<Vec<String>>()1272});12731274// Setup the IO plugin generator.1275let (generator, can_parse_predicate) = {1276Python::attach(|py| {1277let pl = PyModule::import(py, intern!(py, "polars")).unwrap();1278let utils = pl.getattr(intern!(py, "_utils")).unwrap();1279let callable =1280utils.getattr(intern!(py, "_execute_from_rust")).unwrap();12811282let mut could_serialize_predicate = true;1283let predicate = match &options.predicate {1284PythonPredicate::PyArrow(s) => s.into_bound_py_any(py).unwrap(),1285PythonPredicate::None => None::<()>.into_bound_py_any(py).unwrap(),1286PythonPredicate::Polars(_) => {1287assert!(pl_predicate.is_some(), "should be set");1288match &predicate_serialized {1289None => {1290could_serialize_predicate = false;1291PyNone::get(py).to_owned().into_any()1292},1293Some(buf) => PyBytes::new(py, buf).into_any(),1294}1295},1296};12971298let args = (1299python_scan_function,1300with_columns,1301predicate,1302n_rows,1303batch_size,1304);13051306let generator_init = callable.call1(args)?;1307let generator = generator_init.get_item(0).map_err(1308|_| polars_err!(ComputeError: "expected tuple got {generator_init}"),1309)?;1310let can_parse_predicate = generator_init.get_item(1).map_err(1311|_| polars_err!(ComputeError: "expected tuple got {generator}"),1312)?;1313let can_parse_predicate = can_parse_predicate.extract::<bool>().map_err(1314|_| polars_err!(ComputeError: "expected bool got {can_parse_predicate}"),1315)? && could_serialize_predicate;13161317let generator = generator.into_py_any(py).map_err(1318|_| polars_err!(ComputeError: "unable to grab reference to IO plugin generator"),1319)?;13201321PolarsResult::Ok((generator, can_parse_predicate))1322})1323}?;13241325let get_batch_fn = Box::new(move |state: &StreamingExecutionState| {1326let df = Python::attach(|py| {1327match generator.bind(py).call_method0(intern!(py, "__next__")) {1328Ok(out) => polars_plan::plans::python_df_to_rust(py, out).map(Some),1329Err(err)1330if err.matches(py, PyStopIteration::type_object(py))? =>1331{1332Ok(None)1333},1334Err(err) => polars_bail!(1335ComputeError: "caught exception during execution of a Python source, exception: {err}"1336),1337}1338})?;13391340let Some(mut df) = df else { return Ok(None) };13411342if let Some(simple_projection) = &simple_projection {1343df = unsafe {1344df.select_unchecked(simple_projection.iter_names())?1345.with_schema(simple_projection.clone())1346};1347}13481349if validate_schema {1350polars_ensure!(1351df.schema() == &output_schema,1352SchemaMismatch: "user provided schema: {:?} doesn't match the DataFrame schema: {:?}",1353output_schema, df.schema()1354);1355}13561357// TODO: Move this to a FilterNode so that it happens in parallel. We may need1358// to move all of the enclosing code to `lower_ir` for this.1359if let (Some(pred), false) = (&pl_predicate, can_parse_predicate) {1360let mask = pred.evaluate(&df, &state.in_memory_exec_state)?;1361df = df.filter(mask.bool()?)?;1362}13631364Ok(Some(df))1365}) as Box<_>;13661367(PlSmallStr::from_static("io_plugin"), get_batch_fn)1368},1369};13701371use polars_plan::dsl::{CastColumnsPolicy, MissingColumnsPolicy};13721373use crate::nodes::io_sources::batch::builder::BatchFnReaderBuilder;1374use crate::nodes::io_sources::batch::{BatchFnReader, GetBatchState};1375use crate::nodes::io_sources::multi_scan::components::projection::builder::ProjectionBuilder;13761377let reader = BatchFnReader {1378name: name.clone(),1379// If validate_schema is false, the schema of the morsels may not match the1380// configured schema. In this case we set this to `None` and the reader will1381// retrieve the schema from the first morsel.1382output_schema: validate_schema.then(|| output_schema.clone()),1383get_batch_state: Some(GetBatchState::from(get_batch_fn)),1384execution_state: None,1385verbose: config::verbose(),1386};13871388let file_reader_builder = Arc::new(BatchFnReaderBuilder {1389name,1390reader: std::sync::Mutex::new(Some(reader)),1391execution_state: Default::default(),1392}) as Arc<dyn FileReaderBuilder>;13931394// Give multiscan a single scan source. (It doesn't actually read from this).1395let sources = ScanSources::Paths(Buffer::from_iter([PlRefPath::new("python-scan-0")]));1396let cloud_options = None;1397let final_output_schema = output_schema.clone();1398let file_projection_builder = ProjectionBuilder::new(output_schema, None, None);1399let row_index = None;1400let pre_slice = None;1401let predicate = None;1402let predicate_file_skip_applied = None;1403let hive_parts = None;1404let include_file_paths = None;1405let missing_columns_policy = MissingColumnsPolicy::Raise;1406let forbid_extra_columns = None;1407let cast_columns_policy = CastColumnsPolicy::ERROR_ON_MISMATCH;1408let deletion_files = None;1409let table_statistics = None;1410let disable_morsel_split = false;1411let verbose = config::verbose();14121413ctx.graph.add_node(1414nodes::io_sources::multi_scan::MultiScan::new(Arc::new(MultiScanConfig {1415sources,1416file_reader_builder,1417cloud_options,1418final_output_schema,1419file_projection_builder,1420row_index,1421pre_slice,1422predicate,1423predicate_file_skip_applied,1424hive_parts,1425include_file_paths,1426missing_columns_policy,1427forbid_extra_columns,1428cast_columns_policy,1429deletion_files,1430table_statistics,1431// Initialized later1432num_pipelines: RelaxedCell::new_usize(0),1433n_readers_pre_init: RelaxedCell::new_usize(0),1434max_concurrent_scans: RelaxedCell::new_usize(0),1435disable_morsel_split,1436io_metrics: OnceLock::default(),1437verbose,1438})),1439[],1440)1441},14421443#[cfg(feature = "ewma")]1444ewm_variant @ EwmMean { input, options }1445| ewm_variant @ EwmVar { input, options }1446| ewm_variant @ EwmStd { input, options } => {1447use nodes::ewm::EwmNode;1448use polars_compute::ewm::mean::EwmMeanState;1449use polars_compute::ewm::{EwmCovState, EwmStateUpdate, EwmStdState, EwmVarState};1450use polars_core::with_match_physical_float_type;14511452let input_key = to_graph_rec(input.node, ctx)?;1453let input_schema = &ctx.phys_sm[input.node].output_schema;1454let (_, dtype) = input_schema.get_at_index(0).unwrap();14551456let state: Box<dyn EwmStateUpdate + Send> = match ewm_variant {1457EwmMean { .. } => {1458with_match_physical_float_type!(dtype, |$T| {1459let state: EwmMeanState<$T> = EwmMeanState::new(1460AsPrimitive::<$T>::as_(options.alpha),1461options.adjust,1462options.min_periods,1463options.ignore_nulls,1464);14651466Box::new(state)1467})1468},1469_ => with_match_physical_float_type!(dtype, |$T| {1470let state: EwmCovState<$T> = EwmCovState::new(1471AsPrimitive::<$T>::as_(options.alpha),1472options.adjust,1473options.bias,1474options.min_periods,1475options.ignore_nulls,1476);14771478match ewm_variant {1479EwmVar { .. } => Box::new(EwmVarState::new(state)),1480EwmStd { .. } => Box::new(EwmStdState::new(state)),1481_ => unreachable!(),1482}1483}),1484};14851486let name = match ewm_variant {1487EwmMean { .. } => "ewm-mean",1488EwmVar { .. } => "ewm-var",1489EwmStd { .. } => "ewm-std",1490_ => unreachable!(),1491};14921493let node = EwmNode::new(name, state);14941495ctx.graph.add_node(node, [(input_key, input.port)])1496},1497};14981499ctx.phys_to_graph.insert(phys_node_key, graph_key);1500Ok(graph_key)1501}150215031504