Path: blob/main/crates/polars-stream/src/physical_plan/lower_ir.rs
8448 views
use std::sync::Arc;12use parking_lot::Mutex;3use polars_core::frame::{DataFrame, UniqueKeepStrategy};4use polars_core::prelude::{DataType, PlHashMap, PlHashSet};5use polars_core::scalar::Scalar;6use polars_core::schema::Schema;7use polars_core::{SchemaExtPl, config};8use polars_error::PolarsResult;9use polars_expr::state::ExecutionState;10use polars_mem_engine::create_physical_plan;11use polars_ops::frame::JoinType;12use polars_plan::constants::get_literal_name;13use polars_plan::dsl::default_values::DefaultFieldValues;14use polars_plan::dsl::deletion::DeletionFilesList;15use polars_plan::dsl::{CallbackSinkType, ExtraColumnsPolicy, FileScanIR, SinkTypeIR};16use polars_plan::plans::expr_ir::{ExprIR, OutputName};17use polars_plan::plans::{18AExpr, FunctionIR, IR, IRAggExpr, LiteralValue, are_keys_sorted_any, is_sorted,19write_ir_non_recursive,20};21use polars_plan::prelude::*;22use polars_utils::arena::{Arena, Node};23use polars_utils::itertools::Itertools;24use polars_utils::pl_str::PlSmallStr;25#[cfg(feature = "parquet")]26use polars_utils::relaxed_cell::RelaxedCell;27use polars_utils::row_counter::RowCounter;28use polars_utils::slice_enum::Slice;29use polars_utils::unique_id::UniqueId;30use polars_utils::{IdxSize, format_pl_smallstr, unique_column_name};31use slotmap::SlotMap;3233use super::lower_expr::build_hstack_stream;34use super::{PhysNode, PhysNodeKey, PhysNodeKind, PhysStream};35use crate::nodes::io_sources::multi_scan;36use crate::nodes::io_sources::multi_scan::components::forbid_extra_columns::ForbidExtraColumns;37use crate::nodes::io_sources::multi_scan::components::projection::builder::ProjectionBuilder;38use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;39use crate::physical_plan::ZipBehavior;40use crate::physical_plan::lower_expr::{ExprCache, build_select_stream, lower_exprs};41use crate::physical_plan::lower_group_by::build_group_by_stream;42use crate::utils::late_materialized_df::LateMaterializedDataFrame;4344/// 
Creates a new PhysStream which outputs a slice of the input stream.45pub fn build_slice_stream(46input: PhysStream,47offset: i64,48length: usize,49phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,50) -> PhysStream {51if offset >= 0 {52let offset = offset as usize;53PhysStream::first(phys_sm.insert(PhysNode::new(54phys_sm[input.node].output_schema.clone(),55PhysNodeKind::StreamingSlice {56input,57offset,58length,59},60)))61} else {62PhysStream::first(phys_sm.insert(PhysNode::new(63phys_sm[input.node].output_schema.clone(),64PhysNodeKind::NegativeSlice {65input,66offset,67length,68},69)))70}71}7273/// Creates a new PhysStream which is filters the input stream.74pub(super) fn build_filter_stream(75input: PhysStream,76predicate: ExprIR,77expr_arena: &mut Arena<AExpr>,78phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,79expr_cache: &mut ExprCache,80ctx: StreamingLowerIRContext,81) -> PolarsResult<PhysStream> {82let predicate = predicate;83let cols_and_predicate = phys_sm[input.node]84.output_schema85.iter_names()86.cloned()87.map(|name| {88ExprIR::new(89expr_arena.add(AExpr::Column(name.clone())),90OutputName::ColumnLhs(name),91)92})93.chain([predicate])94.collect_vec();95let (trans_input, mut trans_cols_and_predicate) = lower_exprs(96input,97&cols_and_predicate,98expr_arena,99phys_sm,100expr_cache,101ctx,102)?;103104let filter_schema = phys_sm[trans_input.node].output_schema.clone();105let filter = PhysNodeKind::Filter {106input: trans_input,107predicate: trans_cols_and_predicate.last().unwrap().clone(),108};109110let post_filter = phys_sm.insert(PhysNode::new(filter_schema, filter));111trans_cols_and_predicate.pop(); // Remove predicate.112build_select_stream(113PhysStream::first(post_filter),114&trans_cols_and_predicate,115expr_arena,116phys_sm,117expr_cache,118ctx,119)120}121122/// Creates a new PhysStream with row index attached with the given name.123pub fn build_row_idx_stream(124input: PhysStream,125name: PlSmallStr,126offset: Option<IdxSize>,127phys_sm: &mut 
SlotMap<PhysNodeKey, PhysNode>,128) -> PhysStream {129let input_schema = &phys_sm[input.node].output_schema;130let mut output_schema = (**input_schema).clone();131output_schema132.insert_at_index(0, name.clone(), DataType::IDX_DTYPE)133.unwrap();134let kind = PhysNodeKind::WithRowIndex {135input,136name,137offset,138};139let with_row_idx_node_key = phys_sm.insert(PhysNode::new(Arc::new(output_schema), kind));140PhysStream::first(with_row_idx_node_key)141}142143#[derive(Debug, Clone, Copy)]144pub struct StreamingLowerIRContext {145pub prepare_visualization: bool,146}147148#[recursive::recursive]149#[allow(clippy::too_many_arguments)]150pub fn lower_ir(151node: Node,152ir_arena: &mut Arena<IR>,153expr_arena: &mut Arena<AExpr>,154phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,155schema_cache: &mut PlHashMap<Node, Arc<Schema>>,156expr_cache: &mut ExprCache,157cache_nodes: &mut PlHashMap<UniqueId, PhysStream>,158ctx: StreamingLowerIRContext,159mut disable_morsel_split: Option<bool>,160) -> PolarsResult<PhysStream> {161// Helper macro to simplify recursive calls.162macro_rules! 
lower_ir {163($input:expr) => {{164// Disable for remaining execution graph if it wasn't explicitly set165// by the current IR.166disable_morsel_split.get_or_insert(false);167168lower_ir(169$input,170ir_arena,171expr_arena,172phys_sm,173schema_cache,174expr_cache,175cache_nodes,176ctx,177disable_morsel_split,178)179}};180}181182// Require the code below to explicitly set this to `true`183if disable_morsel_split == Some(true) {184disable_morsel_split.take();185}186187let ir_node = ir_arena.get(node);188let output_schema = IR::schema_with_cache(node, ir_arena, schema_cache);189let node_kind = match ir_node {190IR::SimpleProjection { input, columns } => {191disable_morsel_split.get_or_insert(true);192let columns = columns.iter_names_cloned().collect::<Vec<_>>();193let phys_input = lower_ir!(*input)?;194PhysNodeKind::SimpleProjection {195input: phys_input,196columns,197}198},199200IR::Select { input, expr, .. } => {201let selectors = expr.clone();202203if selectors204.iter()205.all(|e| matches!(expr_arena.get(e.node()), AExpr::Len | AExpr::Column(_)))206{207disable_morsel_split.get_or_insert(true);208}209210let phys_input = lower_ir!(*input)?;211return build_select_stream(212phys_input, &selectors, expr_arena, phys_sm, expr_cache, ctx,213);214},215216IR::HStack { input, exprs, .. 
} => {217let exprs = exprs.to_vec();218let phys_input = lower_ir!(*input)?;219return build_hstack_stream(phys_input, &exprs, expr_arena, phys_sm, expr_cache, ctx);220},221222IR::Slice { input, offset, len } => {223let offset = *offset;224let len = *len as usize;225let phys_input = lower_ir!(*input)?;226return Ok(build_slice_stream(phys_input, offset, len, phys_sm));227},228229IR::Filter { input, predicate } => {230let predicate = predicate.clone();231let phys_input = lower_ir!(*input)?;232return build_filter_stream(233phys_input, predicate, expr_arena, phys_sm, expr_cache, ctx,234);235},236237IR::DataFrameScan {238df,239output_schema: projection,240schema,241..242} => {243let schema = schema.clone(); // This is initially the schema of df, but can change with the projection.244let mut node_kind = PhysNodeKind::InMemorySource {245df: df.clone(),246disable_morsel_split: disable_morsel_split.unwrap_or(true),247};248249// Do we need to apply a projection?250if let Some(projection_schema) = projection {251if projection_schema.len() != schema.len()252|| projection_schema253.iter_names()254.zip(schema.iter_names())255.any(|(l, r)| l != r)256{257let phys_input = phys_sm.insert(PhysNode::new(schema, node_kind));258node_kind = PhysNodeKind::SimpleProjection {259input: PhysStream::first(phys_input),260columns: projection_schema.iter_names_cloned().collect::<Vec<_>>(),261};262}263}264265node_kind266},267268IR::Sink { input, payload } => match payload {269SinkTypeIR::Memory => {270disable_morsel_split.get_or_insert(true);271let phys_input = lower_ir!(*input)?;272PhysNodeKind::InMemorySink { input: phys_input }273},274SinkTypeIR::Callback(CallbackSinkType {275function,276maintain_order,277chunk_size,278}) => {279let function = function.clone();280let maintain_order = *maintain_order;281let chunk_size = *chunk_size;282let phys_input = lower_ir!(*input)?;283PhysNodeKind::CallbackSink {284input: 
phys_input,285function,286maintain_order,287chunk_size,288}289},290291SinkTypeIR::File(options) => {292let options = options.clone();293let input = lower_ir!(*input)?;294PhysNodeKind::FileSink { input, options }295},296297SinkTypeIR::Partitioned(options) => {298let options = options.clone();299let input = lower_ir!(*input)?;300PhysNodeKind::PartitionedSink { input, options }301},302},303304IR::SinkMultiple { inputs } => {305disable_morsel_split.get_or_insert(true);306let mut sinks = Vec::with_capacity(inputs.len());307for input in inputs.clone() {308let phys_node_stream = match ir_arena.get(input) {309IR::Sink { .. } => lower_ir!(input)?,310_ => lower_ir!(ir_arena.add(IR::Sink {311input,312payload: SinkTypeIR::Memory313}))?,314};315sinks.push(phys_node_stream.node);316}317PhysNodeKind::SinkMultiple { sinks }318},319320#[cfg(feature = "merge_sorted")]321IR::MergeSorted {322input_left,323input_right,324key,325} => {326let input_left = *input_left;327let input_right = *input_right;328let key = key.clone();329330let mut phys_left = lower_ir!(input_left)?;331let mut phys_right = lower_ir!(input_right)?;332333let left_schema = &phys_sm[phys_left.node].output_schema;334let right_schema = &phys_sm[phys_right.node].output_schema;335336left_schema.ensure_is_exact_match(right_schema).unwrap();337338let key_dtype = left_schema.try_get(key.as_str())?.clone();339340let key_name = unique_column_name();341use polars_plan::plans::{AExprBuilder, RowEncodingVariant};342343// Add the key column as the last column for both inputs.344for s in [&mut phys_left, &mut phys_right] {345let key_dtype = key_dtype.clone();346let mut expr = AExprBuilder::col(key.clone(), expr_arena);347if key_dtype.is_nested() {348expr = AExprBuilder::row_encode(349vec![expr.expr_ir(key_name.clone())],350vec![key_dtype],351RowEncodingVariant::Ordered {352descending: None,353nulls_last: None,354broadcast_nulls: None,355},356expr_arena,357);358}359360*s = 
build_hstack_stream(361*s,362&[expr.expr_ir(key_name.clone())],363expr_arena,364phys_sm,365expr_cache,366ctx,367)?;368}369370PhysNodeKind::MergeSorted {371input_left: phys_left,372input_right: phys_right,373}374},375376IR::MapFunction { input, function } => {377let function = function.clone();378let phys_input = lower_ir!(*input)?;379380match function {381FunctionIR::RowIndex {382name,383offset,384schema: _,385} => PhysNodeKind::WithRowIndex {386input: phys_input,387name,388offset,389},390391function if function.is_streamable() => {392let map = Arc::new(move |df| function.evaluate(df));393let format_str = ctx.prepare_visualization.then(|| {394let mut buffer = String::new();395write_ir_non_recursive(396&mut buffer,397ir_arena.get(node),398expr_arena,399phys_sm.get(phys_input.node).unwrap().output_schema.as_ref(),4000,401)402.unwrap();403buffer404});405PhysNodeKind::Map {406input: phys_input,407map,408format_str,409}410},411412function => {413let format_str = ctx.prepare_visualization.then(|| {414let mut buffer = String::new();415write_ir_non_recursive(416&mut buffer,417ir_arena.get(node),418expr_arena,419phys_sm.get(phys_input.node).unwrap().output_schema.as_ref(),4200,421)422.unwrap();423buffer424});425let map = Arc::new(move |df| function.evaluate(df));426PhysNodeKind::InMemoryMap {427input: phys_input,428map,429format_str,430}431},432}433},434435IR::Sort {436input,437by_column,438slice,439sort_options,440} => {441let slice = slice.clone();442let mut by_column = by_column.clone();443let mut sort_options = sort_options.clone();444let phys_input = lower_ir!(*input)?;445446// See if we can insert a top k.447let mut limit = u64::MAX;448if let Some((0, l, _)) = slice {449limit = limit.min(l as u64);450}451#[allow(clippy::unnecessary_cast)]452if let Some(l) = sort_options.limit {453limit = limit.min(l as u64);454};455456let mut stream = phys_input;457if limit < u64::MAX {458// If we need to maintain order augment with row index.459if sort_options.maintain_order {460let 
row_idx_name = unique_column_name();461stream = build_row_idx_stream(stream, row_idx_name.clone(), None, phys_sm);462463// Add row index to sort columns.464let row_idx_node = expr_arena.add(AExpr::Column(row_idx_name.clone()));465by_column.push(ExprIR::new(466row_idx_node,467OutputName::ColumnLhs(row_idx_name),468));469sort_options.descending.push(false);470sort_options.nulls_last.push(true);471472// No longer needed for the actual sort itself, handled by row index.473sort_options.maintain_order = false;474}475476let k_node =477expr_arena.add(AExpr::Literal(LiteralValue::Scalar(Scalar::from(limit))));478let k_selector = ExprIR::from_node(k_node, expr_arena);479let k_output_schema = Schema::from_iter([(get_literal_name(), DataType::UInt64)]);480let k_node = phys_sm.insert(PhysNode::new(481Arc::new(k_output_schema),482PhysNodeKind::InputIndependentSelect {483selectors: vec![k_selector],484},485));486487let mut trans_by_column;488(stream, trans_by_column) =489lower_exprs(stream, &by_column, expr_arena, phys_sm, expr_cache, ctx)?;490491trans_by_column = trans_by_column492.into_iter()493.enumerate()494.map(|(i, expr)| expr.with_alias(format_pl_smallstr!("__POLARS_KEYCOL_{}", i)))495.collect_vec();496497stream = PhysStream::first(phys_sm.insert(PhysNode {498output_schema: phys_sm[stream.node].output_schema.clone(),499kind: PhysNodeKind::TopK {500input: stream,501k: PhysStream::first(k_node),502by_column: trans_by_column,503reverse: sort_options.descending.iter().map(|x| !x).collect(),504nulls_last: sort_options.nulls_last.clone(),505dyn_pred: slice.as_ref().and_then(|t| t.2.clone()),506},507}));508}509510stream = PhysStream::first(phys_sm.insert(PhysNode {511output_schema: phys_sm[stream.node].output_schema.clone(),512kind: PhysNodeKind::Sort {513input: stream,514by_column,515slice: slice.as_ref().map(|t| (t.0, t.1)),516sort_options,517},518}));519520// Remove any temporary columns we may have added.521let exprs: Vec<_> = output_schema522.iter_names()523.map(|name| 
{524let node = expr_arena.add(AExpr::Column(name.clone()));525ExprIR::new(node, OutputName::ColumnLhs(name.clone()))526})527.collect();528stream = build_select_stream(stream, &exprs, expr_arena, phys_sm, expr_cache, ctx)?;529530return Ok(stream);531},532IR::Union { inputs, options } => {533let options = *options;534535let inputs = inputs536.clone() // Needed to borrow ir_arena mutably.537.into_iter()538.map(|input| lower_ir!(input))539.collect::<Result<_, _>>()?;540541let kind = if options.maintain_order {542PhysNodeKind::OrderedUnion { inputs }543} else {544PhysNodeKind::UnorderedUnion { inputs }545};546547let node = phys_sm.insert(PhysNode {548output_schema,549kind,550});551let mut stream = PhysStream::first(node);552553if let Some((offset, length)) = options.slice {554stream = build_slice_stream(stream, offset, length, phys_sm);555}556557return Ok(stream);558},559560IR::HConcat {561inputs,562schema: _,563options,564} => {565let zip_behavior = if options.strict {566ZipBehavior::Strict567} else if options.broadcast_unit_length {568ZipBehavior::Broadcast569} else {570ZipBehavior::NullExtend571};572let inputs = inputs573.clone() // Needed to borrow ir_arena mutably.574.into_iter()575.map(|input| lower_ir!(input))576.collect::<Result<_, _>>()?;577PhysNodeKind::Zip {578inputs,579zip_behavior,580}581},582583v @ IR::Scan { .. } => {584let IR::Scan {585sources: scan_sources,586file_info,587mut hive_parts,588output_schema: _,589scan_type,590predicate,591predicate_file_skip_applied,592unified_scan_args,593} = v.clone()594else {595unreachable!();596};597598if (scan_sources.is_empty()599&& !matches!(scan_type.as_ref(), FileScanIR::Anonymous { .. 
}))600|| unified_scan_args601.pre_slice602.as_ref()603.is_some_and(|slice| slice.len() == 0)604{605if config::verbose() {606eprintln!("lower_ir: scan IR lowered as empty InMemorySource")607}608609// If there are no sources, just provide an empty in-memory source with the right610// schema.611PhysNodeKind::InMemorySource {612df: Arc::new(DataFrame::empty_with_schema(output_schema.as_ref())),613disable_morsel_split: disable_morsel_split.unwrap_or(true),614}615} else if output_schema.is_empty()616&& let Some((physical_rows, deleted_rows)) = unified_scan_args.row_count617&& unified_scan_args.pre_slice.is_none()618&& predicate.is_none()619{620// Fast-count for scan_iceberg will hit here.621let row_counter = RowCounter::new(physical_rows, deleted_rows);622row_counter.num_rows_idxsize()?;623let num_rows = row_counter.num_rows()?;624625if config::verbose() {626eprintln!(627"lower_ir: scan IR lowered as 0-width InMemorySource with height {} ({:?})",628num_rows, &row_counter629)630}631632PhysNodeKind::InMemorySource {633df: Arc::new(DataFrame::empty_with_height(num_rows)),634disable_morsel_split: disable_morsel_split.unwrap_or(true),635}636} else {637let file_reader_builder: Arc<dyn FileReaderBuilder> = match &*scan_type {638#[cfg(feature = "parquet")]639FileScanIR::Parquet {640options,641metadata: first_metadata,642} => Arc::new(643crate::nodes::io_sources::parquet::builder::ParquetReaderBuilder {644options: Arc::new(options.clone()),645first_metadata: first_metadata.clone(),646prefetch_limit: RelaxedCell::new_usize(0),647prefetch_semaphore: std::sync::OnceLock::new(),648shared_prefetch_wait_group_slot: Default::default(),649io_metrics: std::sync::OnceLock::new(),650},651) as _,652653#[cfg(feature = "ipc")]654FileScanIR::Ipc {655options,656metadata: first_metadata,657} => Arc::new(crate::nodes::io_sources::ipc::builder::IpcReaderBuilder {658options: Arc::new(options.clone()),659first_metadata: first_metadata.clone(),660prefetch_limit: 
RelaxedCell::new_usize(0),661prefetch_semaphore: std::sync::OnceLock::new(),662shared_prefetch_wait_group_slot: Default::default(),663io_metrics: std::sync::OnceLock::new(),664}) as _,665666#[cfg(feature = "csv")]667FileScanIR::Csv { options } => Arc::new(Arc::clone(options)) as _,668669#[cfg(feature = "json")]670FileScanIR::NDJson { options } => Arc::new(671crate::nodes::io_sources::ndjson::builder::NDJsonReaderBuilder {672options: Arc::new(options.clone()),673prefetch_limit: RelaxedCell::new_usize(0),674prefetch_semaphore: std::sync::OnceLock::new(),675shared_prefetch_wait_group_slot: Default::default(),676io_metrics: std::sync::OnceLock::new(),677},678) as _,679// Arc::new(options.clone()) as _,680#[cfg(feature = "python")]681FileScanIR::PythonDataset {682dataset_object: _,683cached_ir,684} => {685use crate::physical_plan::io::python_dataset::python_dataset_scan_to_reader_builder;686let guard = cached_ir.lock().unwrap();687688let expanded_scan = guard689.as_ref()690.expect("python dataset should be resolved")691.python_scan()692.expect("should be python scan");693694python_dataset_scan_to_reader_builder(expanded_scan)695},696697#[cfg(feature = "scan_lines")]698FileScanIR::Lines { name: _ } => {699Arc::new(crate::nodes::io_sources::lines::LineReaderBuilder {700prefetch_limit: RelaxedCell::new_usize(0),701prefetch_semaphore: std::sync::OnceLock::new(),702shared_prefetch_wait_group_slot: Default::default(),703io_metrics: std::sync::OnceLock::new(),704}) as _705},706707FileScanIR::Anonymous { .. 
} => todo!("unimplemented: AnonymousScan"),708};709710{711let cloud_options = unified_scan_args.cloud_options.clone().map(Arc::new);712let file_schema = file_info.schema;713714let (projected_schema, file_schema) =715multi_scan::functions::resolve_projections::resolve_projections(716&output_schema,717&file_schema,718&mut hive_parts,719unified_scan_args720.row_index721.as_ref()722.map(|ri| ri.name.as_str()),723unified_scan_args724.include_file_paths725.as_ref()726.map(|x| x.as_str()),727);728729let file_projection_builder = ProjectionBuilder::new(730projected_schema,731unified_scan_args.column_mapping.as_ref(),732unified_scan_args733.default_values734.filter(|DefaultFieldValues::Iceberg(v)| !v.is_empty())735.map(|DefaultFieldValues::Iceberg(v)| v),736);737738// TODO: We ignore the parameter for some scan types to maintain old behavior,739// as they currently don't expose an API for it to be configured.740let extra_columns_policy = match &*scan_type {741#[cfg(feature = "parquet")]742FileScanIR::Parquet { .. 
} => unified_scan_args.extra_columns_policy,743744_ => {745if unified_scan_args.projection.is_some() {746ExtraColumnsPolicy::Ignore747} else {748ExtraColumnsPolicy::Raise749}750},751};752753let forbid_extra_columns = ForbidExtraColumns::opt_new(754&extra_columns_policy,755&file_schema,756unified_scan_args.column_mapping.as_ref(),757);758759let pre_slice = unified_scan_args.pre_slice.clone();760let disable_morsel_split = disable_morsel_split.unwrap_or(true);761762let mut multi_scan_node = PhysNodeKind::MultiScan {763scan_sources,764file_reader_builder,765cloud_options,766file_projection_builder,767output_schema: output_schema.clone(),768row_index: None,769pre_slice,770predicate,771predicate_file_skip_applied,772hive_parts,773cast_columns_policy: unified_scan_args.cast_columns_policy,774missing_columns_policy: unified_scan_args.missing_columns_policy,775forbid_extra_columns,776include_file_paths: unified_scan_args.include_file_paths,777// Set to None if empty for performance.778deletion_files: DeletionFilesList::filter_empty(779unified_scan_args.deletion_files,780),781table_statistics: unified_scan_args.table_statistics,782file_schema,783disable_morsel_split,784};785786let PhysNodeKind::MultiScan {787output_schema: multi_scan_output_schema,788row_index: row_index_to_multiscan,789pre_slice: pre_slice_to_multiscan,790predicate: predicate_to_multiscan,791..792} = &mut multi_scan_node793else {794unreachable!()795};796797let mut row_index_post = unified_scan_args.row_index;798799// * If a predicate was pushed then we always push row index800if predicate_to_multiscan.is_some()801|| matches!(pre_slice_to_multiscan, Some(Slice::Negative { .. }))802{803*row_index_to_multiscan = row_index_post.take();804}805806// TODO807// Projection pushdown could change the row index column position. Ideally it shouldn't,808// and instead just put a projection on top of the scan node in the IR. 
But for now809// we do that step here.810let mut schema_after_row_index_post = multi_scan_output_schema.clone();811let mut reorder_after_row_index_post = false;812813// Remove row index from multiscan schema if not pushed.814if let Some(ri) = row_index_post.as_ref() {815let row_index_post_position =816multi_scan_output_schema.index_of(&ri.name).unwrap();817let (_, dtype) = Arc::make_mut(multi_scan_output_schema)818.shift_remove_index(row_index_post_position)819.unwrap();820821if row_index_post_position != 0 {822reorder_after_row_index_post = true;823let mut schema =824Schema::with_capacity(multi_scan_output_schema.len() + 1);825schema.extend([(ri.name.clone(), dtype)]);826schema.extend(827multi_scan_output_schema828.iter()829.map(|(k, v)| (k.clone(), v.clone())),830);831schema_after_row_index_post = Arc::new(schema);832}833}834835// If we have no predicate and no slice or positive slice, we can reorder the row index to after836// the slice by adjusting the offset. This can remove a serial synchronization step in multiscan837// and allow the reader to still skip rows.838let row_index_post_after_slice = (|| {839let mut row_index = row_index_post.take()?;840841let positive_offset = match pre_slice_to_multiscan {842Some(Slice::Positive { offset, .. }) => Some(*offset),843None => Some(0),844Some(Slice::Negative { .. 
}) => unreachable!(),845}?;846847row_index.offset = row_index.offset.saturating_add(848IdxSize::try_from(positive_offset).unwrap_or(IdxSize::MAX),849);850851Some(row_index)852})();853854let mut stream = {855let node_key = phys_sm.insert(PhysNode::new(856multi_scan_output_schema.clone(),857multi_scan_node,858));859PhysStream::first(node_key)860};861862if let Some(ri) = row_index_post {863let node = PhysNodeKind::WithRowIndex {864input: stream,865name: ri.name,866offset: Some(ri.offset),867};868869let node_key = phys_sm.insert(PhysNode {870output_schema: schema_after_row_index_post.clone(),871kind: node,872});873874stream = PhysStream::first(node_key);875876if reorder_after_row_index_post {877let node = PhysNodeKind::SimpleProjection {878input: stream,879columns: output_schema.iter_names_cloned().collect(),880};881882let node_key = phys_sm.insert(PhysNode {883output_schema: output_schema.clone(),884kind: node,885});886887stream = PhysStream::first(node_key);888}889}890891if let Some(ri) = row_index_post_after_slice {892let node = PhysNodeKind::WithRowIndex {893input: stream,894name: ri.name,895offset: Some(ri.offset),896};897898let node_key = phys_sm.insert(PhysNode {899output_schema: schema_after_row_index_post,900kind: node,901});902903stream = PhysStream::first(node_key);904905if reorder_after_row_index_post {906let node = PhysNodeKind::SimpleProjection {907input: stream,908columns: output_schema.iter_names_cloned().collect(),909};910911let node_key = phys_sm.insert(PhysNode {912output_schema: output_schema.clone(),913kind: node,914});915916stream = PhysStream::first(node_key);917}918}919920return Ok(stream);921}922}923},924925#[cfg(feature = "python")]926v @ IR::PythonScan { options } => {927use polars_plan::dsl::python_dsl::PythonScanSource;928929match options.python_source {930PythonScanSource::Pyarrow => {931// Fallback to in-memory engine.932let input = PhysNodeKind::InMemorySource {933df: Arc::new(DataFrame::default()),934disable_morsel_split: 
disable_morsel_split.unwrap_or(true),935};936let input_key =937phys_sm.insert(PhysNode::new(Arc::new(Schema::default()), input));938let phys_input = PhysStream::first(input_key);939940let lmdf = Arc::new(LateMaterializedDataFrame::default());941let mut lp_arena = Arena::default();942let scan_lp_node = lp_arena.add(v.clone());943944let executor = Mutex::new(create_physical_plan(945scan_lp_node,946&mut lp_arena,947expr_arena,948None,949)?);950951let format_str = ctx.prepare_visualization.then(|| {952let mut buffer = String::new();953write_ir_non_recursive(954&mut buffer,955ir_arena.get(node),956expr_arena,957phys_sm.get(phys_input.node).unwrap().output_schema.as_ref(),9580,959)960.unwrap();961buffer962});963964PhysNodeKind::InMemoryMap {965input: phys_input,966map: Arc::new(move |df| {967lmdf.set_materialized_dataframe(df);968let mut state = ExecutionState::new();969executor.lock().execute(&mut state)970}),971format_str,972}973},974_ => PhysNodeKind::PythonScan {975options: options.clone(),976},977}978},979IR::Cache { input, id } => {980let id = *id;981if let Some(cached) = cache_nodes.get(&id) {982return Ok(*cached);983}984985let phys_input = lower_ir!(*input)?;986cache_nodes.insert(id, phys_input);987return Ok(phys_input);988},989990IR::GroupBy {991input,992keys,993aggs,994schema: output_schema,995apply,996maintain_order,997options,998} => {999let input = *input;1000let keys = keys.clone();1001let aggs = aggs.clone();1002let output_schema = output_schema.clone();1003let apply = apply.clone();1004let maintain_order = *maintain_order;1005let options = options.clone();10061007let phys_input = lower_ir!(input)?;10081009let input_schema = &phys_sm[phys_input.node].output_schema;1010let are_keys_sorted = are_keys_sorted_any(1011is_sorted(input, ir_arena, expr_arena).as_ref(),1012&keys,1013expr_arena,1014input_schema,1015)1016.is_some();10171018return 
build_group_by_stream(1019phys_input,1020&keys,1021&aggs,1022output_schema,1023maintain_order,1024options,1025apply,1026expr_arena,1027phys_sm,1028expr_cache,1029ctx,1030are_keys_sorted,1031);1032},1033IR::Join {1034input_left,1035input_right,1036schema: _,1037left_on,1038right_on,1039options,1040} => {1041let input_left = *input_left;1042let input_right = *input_right;1043let input_left_schema = IR::schema_with_cache(input_left, ir_arena, schema_cache);1044let input_right_schema = IR::schema_with_cache(input_right, ir_arena, schema_cache);1045let left_on = left_on.clone();1046let right_on = right_on.clone();1047let get_expr_name = |e: &ExprIR| e.output_name().clone();1048let left_on_names = left_on.iter().map(get_expr_name).collect_vec();1049let right_on_names = right_on.iter().map(get_expr_name).collect_vec();1050let args = options.args.clone();1051let options = options.options.clone();1052let left_df_sortedness = is_sorted(input_left, ir_arena, expr_arena);1053let left_on_sorted = are_keys_sorted_any(1054left_df_sortedness.as_ref(),1055&left_on,1056expr_arena,1057&input_left_schema,1058);1059let right_df_sortedness = is_sorted(input_right, ir_arena, expr_arena);1060let right_on_sorted = are_keys_sorted_any(1061right_df_sortedness.as_ref(),1062&right_on,1063expr_arena,1064&input_right_schema,1065);1066let join_keys_sorted_together =1067Option::zip(left_on_sorted.as_ref(), right_on_sorted.as_ref())1068.is_some_and(|(ls, rs)| ls == rs);1069let use_streaming_merge_join = args.how.is_equi() && join_keys_sorted_together;1070#[cfg(feature = "asof_join")]1071let use_streaming_asof_join = if let JoinType::AsOf(ref asof_options) = args.how {1072// Grouped asof-join is not yet supported in the streaming engine.1073asof_options.left_by.is_none() && asof_options.right_by.is_none()1074} else {1075false1076};1077#[cfg(not(feature = "asof_join"))]1078let use_streaming_asof_join = false;10791080let phys_left = lower_ir!(input_left)?;1081let phys_right = 
lower_ir!(input_right)?;10821083if (args.how.is_equi() || args.how.is_semi_anti() || use_streaming_asof_join)1084&& !args.validation.needs_checks()1085{1086// When lowering the expressions for the keys we need to ensure we keep around the1087// payload columns, otherwise the input nodes can get replaced by input-independent1088// nodes since the lowering code does not see we access any non-literal expressions.1089// So we add dummy expressions before lowering and remove them afterwards.10901091let mut aug_left_on = left_on.clone();1092for name in phys_sm[phys_left.node].output_schema.iter_names() {1093let col_expr = expr_arena.add(AExpr::Column(name.clone()));1094aug_left_on.push(ExprIR::new(col_expr, OutputName::ColumnLhs(name.clone())));1095}1096let mut aug_right_on = right_on.clone();1097for name in phys_sm[phys_right.node].output_schema.iter_names() {1098let col_expr = expr_arena.add(AExpr::Column(name.clone()));1099aug_right_on.push(ExprIR::new(col_expr, OutputName::ColumnLhs(name.clone())));1100}11011102let (mut trans_input_left, mut trans_left_on) = lower_exprs(1103phys_left,1104&aug_left_on,1105expr_arena,1106phys_sm,1107expr_cache,1108ctx,1109)?;1110let (mut trans_input_right, mut trans_right_on) = lower_exprs(1111phys_right,1112&aug_right_on,1113expr_arena,1114phys_sm,1115expr_cache,1116ctx,1117)?;11181119trans_left_on.drain(left_on.len()..);1120trans_right_on.drain(right_on.len()..);11211122let mut key_descending = left_on_sorted.as_ref().and_then(|v| v[0].descending);1123let key_nulls_last = left_on_sorted.as_ref().and_then(|v| v[0].nulls_last);1124let mut tmp_left_key_col = None;1125let mut tmp_right_key_col = None;1126if use_streaming_merge_join || use_streaming_asof_join {1127(trans_input_left, trans_left_on, tmp_left_key_col) = append_sorted_key_column(1128trans_input_left,1129trans_left_on,1130left_on_sorted.as_ref(),1131Some(!args.nulls_equal),1132expr_arena,1133phys_sm,1134expr_cache,1135ctx,1136)?;1137(trans_input_right, trans_right_on, 
tmp_right_key_col) =1138append_sorted_key_column(1139trans_input_right,1140trans_right_on,1141right_on_sorted.as_ref(),1142Some(!args.nulls_equal),1143expr_arena,1144phys_sm,1145expr_cache,1146ctx,1147)?;1148}11491150let node = if use_streaming_merge_join {1151let keys_are_row_encoded = left_on_names.len() > 1;1152if keys_are_row_encoded {1153key_descending = Some(false);1154}1155phys_sm.insert(PhysNode::new(1156output_schema,1157PhysNodeKind::MergeJoin {1158input_left: trans_input_left,1159input_right: trans_input_right,1160left_on: left_on_names,1161right_on: right_on_names,1162tmp_left_key_col,1163tmp_right_key_col,1164keys_row_encoded: keys_are_row_encoded,1165descending: key_descending.unwrap(),1166nulls_last: key_nulls_last.unwrap(),1167args: args.clone(),1168},1169))1170} else if args.how.is_equi() {1171phys_sm.insert(PhysNode::new(1172output_schema,1173PhysNodeKind::EquiJoin {1174input_left: trans_input_left,1175input_right: trans_input_right,1176left_on: trans_left_on,1177right_on: trans_right_on,1178args: args.clone(),1179},1180))1181} else if use_streaming_asof_join {1182assert!(left_on_names.len() == 1 && right_on_names.len() == 1);1183phys_sm.insert(PhysNode::new(1184output_schema,1185PhysNodeKind::AsOfJoin {1186input_left: trans_input_left,1187input_right: trans_input_right,1188left_on: left_on_names[0].clone(),1189right_on: right_on_names[0].clone(),1190tmp_left_key_col,1191tmp_right_key_col,1192args: args.clone(),1193},1194))1195} else {1196phys_sm.insert(PhysNode::new(1197output_schema,1198PhysNodeKind::SemiAntiJoin {1199input_left: trans_input_left,1200input_right: trans_input_right,1201left_on: trans_left_on,1202right_on: trans_right_on,1203args: args.clone(),1204output_bool: false,1205},1206))1207};1208let mut stream = PhysStream::first(node);1209if let Some((offset, len)) = args.slice {1210stream = build_slice_stream(stream, offset, len, phys_sm);1211}1212return Ok(stream);1213} else if args.how.is_cross() {1214let node = 
phys_sm.insert(PhysNode::new(1215output_schema,1216PhysNodeKind::CrossJoin {1217input_left: phys_left,1218input_right: phys_right,1219args: args.clone(),1220},1221));1222let mut stream = PhysStream::first(node);1223if let Some((offset, len)) = args.slice {1224stream = build_slice_stream(stream, offset, len, phys_sm);1225}1226return Ok(stream);1227} else {1228PhysNodeKind::InMemoryJoin {1229input_left: phys_left,1230input_right: phys_right,1231left_on,1232right_on,1233args,1234options,1235}1236}1237},12381239IR::Distinct { input, options } => {1240let options = options.clone();1241let input = *input;1242let phys_input = lower_ir!(input)?;12431244// We don't have a dedicated distinct operator (yet), lower to group1245// by with an aggregate for each column.1246let input_schema = &phys_sm[phys_input.node].output_schema;1247if input_schema.is_empty() {1248// Can't group (or have duplicates) if dataframe has zero-width.1249return Ok(phys_input);1250}12511252if options.maintain_order && options.keep_strategy == UniqueKeepStrategy::Last {1253// Unfortunately the order-preserving groupby always orders by the first occurrence1254// of the group so we can't lower this and have to fallback.1255let input_schema = phys_sm[phys_input.node].output_schema.clone();1256let lmdf = Arc::new(LateMaterializedDataFrame::default());1257let mut lp_arena = Arena::default();1258let input_lp_node = lp_arena.add(lmdf.clone().as_ir_node(input_schema));1259let distinct_lp_node = lp_arena.add(IR::Distinct {1260input: input_lp_node,1261options,1262});1263let executor = Mutex::new(create_physical_plan(1264distinct_lp_node,1265&mut lp_arena,1266expr_arena,1267Some(crate::dispatch::build_streaming_query_executor),1268)?);12691270let format_str = ctx.prepare_visualization.then(|| {1271let mut buffer = String::new();1272write_ir_non_recursive(1273&mut 
buffer,1274ir_arena.get(node),1275expr_arena,1276phys_sm.get(phys_input.node).unwrap().output_schema.as_ref(),12770,1278)1279.unwrap();1280buffer1281});1282let distinct_node = PhysNode {1283output_schema,1284kind: PhysNodeKind::InMemoryMap {1285input: phys_input,1286map: Arc::new(move |df| {1287lmdf.set_materialized_dataframe(df);1288let mut state = ExecutionState::new();1289executor.lock().execute(&mut state)1290}),1291format_str,1292},1293};12941295return Ok(PhysStream::first(phys_sm.insert(distinct_node)));1296}12971298// Create the key and aggregate expressions.1299let all_col_names = input_schema.iter_names().cloned().collect_vec();1300let key_names = if let Some(subset) = options.subset {1301subset.to_vec()1302} else {1303all_col_names.clone()1304};1305let key_name_set: PlHashSet<_> = key_names.iter().cloned().collect();13061307let mut group_by_output_schema = Schema::with_capacity(all_col_names.len() + 1);1308let keys = key_names1309.iter()1310.map(|name| {1311group_by_output_schema1312.insert(name.clone(), input_schema.get(name).unwrap().clone());1313let col_expr = expr_arena.add(AExpr::Column(name.clone()));1314ExprIR::new(col_expr, OutputName::ColumnLhs(name.clone()))1315})1316.collect_vec();13171318let mut aggs = all_col_names1319.iter()1320.filter(|name| !key_name_set.contains(*name))1321.map(|name| {1322group_by_output_schema1323.insert(name.clone(), input_schema.get(name).unwrap().clone());1324let col_expr = expr_arena.add(AExpr::Column(name.clone()));1325use UniqueKeepStrategy::*;1326let agg_expr = match options.keep_strategy {1327First | None | Any => {1328expr_arena.add(AExpr::Agg(IRAggExpr::First(col_expr)))1329},1330Last => expr_arena.add(AExpr::Agg(IRAggExpr::Last(col_expr))),1331};1332ExprIR::new(agg_expr, OutputName::ColumnLhs(name.clone()))1333})1334.collect_vec();13351336if options.keep_strategy == UniqueKeepStrategy::None {1337// Track the length so we can filter out non-unique keys later.1338let name = 
unique_column_name();1339group_by_output_schema.insert(name.clone(), DataType::IDX_DTYPE);1340aggs.push(ExprIR::new(1341expr_arena.add(AExpr::Len),1342OutputName::Alias(name),1343));1344}13451346let are_keys_sorted = are_keys_sorted_any(1347is_sorted(input, ir_arena, expr_arena).as_ref(),1348&keys,1349expr_arena,1350input_schema,1351)1352.is_some();13531354let mut stream = build_group_by_stream(1355phys_input,1356&keys,1357&aggs,1358Arc::new(group_by_output_schema),1359options.maintain_order,1360Arc::new(GroupbyOptions::default()),1361None,1362expr_arena,1363phys_sm,1364expr_cache,1365ctx,1366are_keys_sorted,1367)?;13681369if options.keep_strategy == UniqueKeepStrategy::None {1370// Filter to keep only those groups with length 1.1371let unique_name = aggs.last().unwrap().output_name();1372let left = expr_arena.add(AExpr::Column(unique_name.clone()));1373let right = expr_arena.add(AExpr::Literal(LiteralValue::new_idxsize(1)));1374let predicate_aexpr = expr_arena.add(AExpr::BinaryExpr {1375left,1376op: polars_plan::dsl::Operator::Eq,1377right,1378});1379let predicate =1380ExprIR::new(predicate_aexpr, OutputName::ColumnLhs(unique_name.clone()));1381stream =1382build_filter_stream(stream, predicate, expr_arena, phys_sm, expr_cache, ctx)?;1383}13841385// Restore column order and drop the temporary length column if any.1386let exprs = all_col_names1387.iter()1388.map(|name| {1389let col_expr = expr_arena.add(AExpr::Column(name.clone()));1390ExprIR::new(col_expr, OutputName::ColumnLhs(name.clone()))1391})1392.collect_vec();1393stream = build_select_stream(stream, &exprs, expr_arena, phys_sm, expr_cache, ctx)?;13941395// We didn't pass the slice earlier to build_group_by_stream because1396// we might have the intermediate keep = "none" filter.1397if let Some((offset, length)) = options.slice {1398stream = build_slice_stream(stream, offset, length, phys_sm);1399}14001401return Ok(stream);1402},1403IR::ExtContext { .. 
} => todo!(),
        IR::Invalid => unreachable!(),
    };

    let node_key = phys_sm.insert(PhysNode::new(output_schema, node_kind));
    Ok(PhysStream::first(node_key))
}

/// Make the join key available as a single column on `phys_input`.
///
/// Exactly one of three things happens:
///
/// * There is more than one key expression, or the first key has a nested
///   dtype: the keys are combined through `AExprBuilder::row_encode` with
///   `RowEncodingVariant::Ordered`. The encoded expression is hstacked onto
///   the stream under a freshly generated unique column name and becomes the
///   only entry of the returned key expressions.
/// * There is a single key that is not a bare `AExpr::Column`: the key is
///   re-aliased to a freshly generated unique column name and hstacked onto
///   the stream.
/// * There is a single key that is a bare column: stream and keys are
///   returned untouched and no temporary column is created.
///
/// The `descending` / `nulls_last` settings handed to the row encoder are
/// taken from `keys_sorted`, and only when every entry carries a value for
/// the respective flag; otherwise `None` is passed. If `keys_sorted` is
/// `None`, the sortedness of the key is decided by the default sortedness
/// behavior of `RowEncodingVariant::Ordered`.
///
/// Returns the (possibly extended) stream, the key expressions to use from
/// here on, and the name of the temporary key column if one was added.
///
/// # Panics
///
/// Panics if `key_exprs` is empty, since the first key is indexed directly.
#[allow(clippy::too_many_arguments)]
fn append_sorted_key_column(
    phys_input: PhysStream,
    mut key_exprs: Vec<ExprIR>,
    keys_sorted: Option<&Vec<AExprSorted>>,
    broadcast_nulls: Option<bool>,
    expr_arena: &mut Arena<AExpr>,
    phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
    expr_cache: &mut ExprCache,
    ctx: StreamingLowerIRContext,
) -> PolarsResult<(PhysStream, Vec<ExprIR>, Option<PlSmallStr>)> {
    // Own a handle to the schema so that `phys_sm` is free to be mutated below.
    let schema = phys_sm[phys_input.node].output_schema.clone();

    // The dtype of the first key is only inspected when it is the sole key.
    let needs_row_encoding =
        key_exprs.len() > 1 || key_exprs[0].dtype(&schema, expr_arena)?.is_nested();

    if needs_row_encoding {
        let key_col_name = unique_column_name();

        // A flag is forwarded only if it is known for every single key.
        let descending = keys_sorted
            .and_then(|sorted| sorted.iter().map(|s| s.descending).collect::<Option<Vec<_>>>());
        let nulls_last = keys_sorted
            .and_then(|sorted| sorted.iter().map(|s| s.nulls_last).collect::<Option<Vec<_>>>());

        // Resolve the key dtypes up front; afterwards the keys themselves can
        // be handed over to the row encoder by value.
        let tfc = ToFieldContext::new(expr_arena, &schema);
        let key_dtypes = key_exprs
            .iter()
            .map(|e| expr_arena.get(e.node()).to_dtype(&tfc))
            .try_collect_vec()?;

        let encoded_key = AExprBuilder::row_encode(
            key_exprs,
            key_dtypes,
            RowEncodingVariant::Ordered {
                descending,
                nulls_last,
                broadcast_nulls,
            },
            expr_arena,
        )
        .expr_ir(key_col_name.clone());

        // From here on the row-encoded column is the one and only key.
        key_exprs = vec![encoded_key];
        let stream =
            build_hstack_stream(phys_input, &key_exprs, expr_arena, phys_sm, expr_cache, ctx)?;
        return Ok((stream, key_exprs, Some(key_col_name)));
    }

    // A plain column reference can be used as the key as-is.
    if matches!(expr_arena.get(key_exprs[0].node()), AExpr::Column(_)) {
        return Ok((phys_input, key_exprs, None));
    }

    // Any other single expression is materialized under a temporary name.
    let key_col_name = unique_column_name();
    key_exprs[0] = key_exprs[0].with_alias(key_col_name.clone());
    let stream =
        build_hstack_stream(phys_input, &key_exprs, expr_arena, phys_sm, expr_cache, ctx)?;
    Ok((stream, key_exprs, Some(key_col_name)))
}