Path: blob/main/crates/polars-stream/src/physical_plan/lower_ir.rs
use std::sync::Arc;

use parking_lot::Mutex;
use polars_core::frame::{DataFrame, UniqueKeepStrategy};
use polars_core::prelude::{DataType, PlHashMap, PlHashSet};
use polars_core::scalar::Scalar;
use polars_core::schema::Schema;
use polars_core::{SchemaExtPl, config};
use polars_error::{PolarsResult, polars_bail};
use polars_expr::state::ExecutionState;
use polars_mem_engine::create_physical_plan;
use polars_plan::constants::get_literal_name;
use polars_plan::dsl::default_values::DefaultFieldValues;
use polars_plan::dsl::deletion::DeletionFilesList;
use polars_plan::dsl::{
    ExtraColumnsPolicy, FileScanIR, FileSinkType, PartitionSinkTypeIR, PartitionVariantIR,
    SinkTypeIR,
};
use polars_plan::plans::expr_ir::{ExprIR, OutputName};
use polars_plan::plans::{AExpr, FunctionIR, IR, IRAggExpr, LiteralValue, write_ir_non_recursive};
use polars_plan::prelude::GroupbyOptions;
use polars_utils::arena::{Arena, Node};
use polars_utils::itertools::Itertools;
use polars_utils::pl_str::PlSmallStr;
use polars_utils::slice_enum::Slice;
use polars_utils::unique_id::UniqueId;
use polars_utils::{IdxSize, unique_column_name};
use slotmap::SlotMap;

use super::lower_expr::build_hstack_stream;
use super::{PhysNode, PhysNodeKey, PhysNodeKind, PhysStream};
use crate::nodes::io_sources::multi_scan;
use crate::nodes::io_sources::multi_scan::components::forbid_extra_columns::ForbidExtraColumns;
use crate::nodes::io_sources::multi_scan::components::projection::builder::ProjectionBuilder;
use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;
use crate::physical_plan::lower_expr::{ExprCache, build_select_stream, lower_exprs};
use crate::physical_plan::lower_group_by::build_group_by_stream;
use crate::utils::late_materialized_df::LateMaterializedDataFrame;

/// Creates a new PhysStream which outputs a slice of the input stream.
pub fn build_slice_stream(
    input: PhysStream,
    offset: i64,
    length: usize,
    phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
) -> PhysStream {
    if offset >= 0 {
        let offset = offset as usize;
        PhysStream::first(phys_sm.insert(PhysNode::new(
            phys_sm[input.node].output_schema.clone(),
            PhysNodeKind::StreamingSlice {
                input,
                offset,
                length,
            },
        )))
    } else {
        PhysStream::first(phys_sm.insert(PhysNode::new(
            phys_sm[input.node].output_schema.clone(),
            PhysNodeKind::NegativeSlice {
                input,
                offset,
                length,
            },
        )))
    }
}

/// Creates a new PhysStream which filters the input stream.
pub(super) fn build_filter_stream(
    input: PhysStream,
    predicate: ExprIR,
    expr_arena: &mut Arena<AExpr>,
    phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
    expr_cache: &mut ExprCache,
    ctx: StreamingLowerIRContext,
) -> PolarsResult<PhysStream> {
    let predicate = predicate;
    let cols_and_predicate = phys_sm[input.node]
        .output_schema
        .iter_names()
        .cloned()
        .map(|name| {
            ExprIR::new(
                expr_arena.add(AExpr::Column(name.clone())),
                OutputName::ColumnLhs(name),
            )
        })
        .chain([predicate])
        .collect_vec();
    let (trans_input, mut trans_cols_and_predicate) = lower_exprs(
        input,
        &cols_and_predicate,
        expr_arena,
        phys_sm,
        expr_cache,
        ctx,
    )?;

    let filter_schema = phys_sm[trans_input.node].output_schema.clone();
    let filter = PhysNodeKind::Filter {
        input: trans_input,
        predicate: trans_cols_and_predicate.last().unwrap().clone(),
    };

    let post_filter = phys_sm.insert(PhysNode::new(filter_schema, filter));
    trans_cols_and_predicate.pop(); // Remove predicate.
    build_select_stream(
        PhysStream::first(post_filter),
        &trans_cols_and_predicate,
        expr_arena,
        phys_sm,
        expr_cache,
        ctx,
    )
}

/// Creates a new PhysStream with row index attached with the given name.
pub fn build_row_idx_stream(
    input: PhysStream,
    name: PlSmallStr,
    offset: Option<IdxSize>,
    phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
) -> PhysStream {
    let input_schema = &phys_sm[input.node].output_schema;
    let mut output_schema = (**input_schema).clone();
    output_schema
        .insert_at_index(0, name.clone(), DataType::IDX_DTYPE)
        .unwrap();
    let kind = PhysNodeKind::WithRowIndex {
        input,
        name,
        offset,
    };
    let with_row_idx_node_key = phys_sm.insert(PhysNode::new(Arc::new(output_schema), kind));
    PhysStream::first(with_row_idx_node_key)
}

#[derive(Debug, Clone, Copy)]
pub struct StreamingLowerIRContext {
    pub prepare_visualization: bool,
}

#[recursive::recursive]
#[allow(clippy::too_many_arguments)]
pub fn lower_ir(
    node: Node,
    ir_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
    schema_cache: &mut PlHashMap<Node, Arc<Schema>>,
    expr_cache: &mut ExprCache,
    cache_nodes: &mut PlHashMap<UniqueId, PhysStream>,
    ctx: StreamingLowerIRContext,
) -> PolarsResult<PhysStream> {
    // Helper macro to simplify recursive calls.
    macro_rules! lower_ir {
        ($input:expr) => {
            lower_ir(
                $input,
                ir_arena,
                expr_arena,
                phys_sm,
                schema_cache,
                expr_cache,
                cache_nodes,
                ctx,
            )
        };
    }

    let ir_node = ir_arena.get(node);
    let output_schema = IR::schema_with_cache(node, ir_arena, schema_cache);
    let node_kind = match ir_node {
        IR::SimpleProjection { input, columns } => {
            let columns = columns.iter_names_cloned().collect::<Vec<_>>();
            let phys_input = lower_ir!(*input)?;
            PhysNodeKind::SimpleProjection {
                input: phys_input,
                columns,
            }
        },

        IR::Select { input, expr, .. } => {
            let selectors = expr.clone();
            let phys_input = lower_ir!(*input)?;
            return build_select_stream(
                phys_input, &selectors, expr_arena, phys_sm, expr_cache, ctx,
            );
        },

        IR::HStack { input, exprs, .. } => {
            let exprs = exprs.to_vec();
            let phys_input = lower_ir!(*input)?;
            return build_hstack_stream(phys_input, &exprs, expr_arena, phys_sm, expr_cache, ctx);
        },

        IR::Slice { input, offset, len } => {
            let offset = *offset;
            let len = *len as usize;
            let phys_input = lower_ir!(*input)?;
            return Ok(build_slice_stream(phys_input, offset, len, phys_sm));
        },

        IR::Filter { input, predicate } => {
            let predicate = predicate.clone();
            let phys_input = lower_ir!(*input)?;
            return build_filter_stream(
                phys_input, predicate, expr_arena, phys_sm, expr_cache, ctx,
            );
        },

        IR::DataFrameScan {
            df,
            output_schema: projection,
            schema,
            ..
        } => {
            let schema = schema.clone(); // This is initially the schema of df, but can change with the projection.
            let mut node_kind = PhysNodeKind::InMemorySource { df: df.clone() };

            // Do we need to apply a projection?
            if let Some(projection_schema) = projection {
                if projection_schema.len() != schema.len()
                    || projection_schema
                        .iter_names()
                        .zip(schema.iter_names())
                        .any(|(l, r)| l != r)
                {
                    let phys_input = phys_sm.insert(PhysNode::new(schema, node_kind));
                    node_kind = PhysNodeKind::SimpleProjection {
                        input: PhysStream::first(phys_input),
                        columns: projection_schema.iter_names_cloned().collect::<Vec<_>>(),
                    };
                }
            }

            node_kind
        },

        IR::Sink { input, payload } => match payload {
            SinkTypeIR::Memory => {
                let phys_input = lower_ir!(*input)?;
                PhysNodeKind::InMemorySink { input: phys_input }
            },
            SinkTypeIR::File(FileSinkType {
                target,
                sink_options,
                file_type,
                cloud_options,
            }) => {
                let target = target.clone();
                let sink_options = sink_options.clone();
                let file_type = file_type.clone();
                let cloud_options = cloud_options.clone();

                let phys_input = lower_ir!(*input)?;
                PhysNodeKind::FileSink {
                    target,
                    sink_options,
                    file_type,
                    input: phys_input,
                    cloud_options,
                }
            },
            SinkTypeIR::Partition(PartitionSinkTypeIR {
                base_path,
                file_path_cb,
                sink_options,
                variant,
                file_type,
                cloud_options,
                per_partition_sort_by,
                finish_callback,
            }) => {
                let base_path = base_path.clone();
                let file_path_cb = file_path_cb.clone();
                let sink_options = sink_options.clone();
                let variant = variant.clone();
                let file_type = file_type.clone();
                let cloud_options = cloud_options.clone();
                let per_partition_sort_by = per_partition_sort_by.clone();
                let finish_callback = finish_callback.clone();

                let mut input = lower_ir!(*input)?;
                match &variant {
                    PartitionVariantIR::MaxSize(_) => {},
                    PartitionVariantIR::Parted {
                        key_exprs,
                        include_key: _,
                    }
                    | PartitionVariantIR::ByKey {
                        key_exprs,
                        include_key: _,
                    } => {
                        if key_exprs.is_empty() {
                            polars_bail!(InvalidOperation: "cannot partition by-key without key expressions");
                        }

                        let input_schema = &phys_sm[input.node].output_schema;
                        let mut select_output_schema = input_schema.as_ref().clone();
                        for key_expr in key_exprs.iter() {
                            select_output_schema.insert(
                                key_expr.output_name().clone(),
                                key_expr.dtype(input_schema.as_ref(), expr_arena)?.clone(),
                            );
                        }

                        let select_output_schema = Arc::new(select_output_schema);
                        let node = phys_sm.insert(PhysNode {
                            output_schema: select_output_schema,
                            kind: PhysNodeKind::Select {
                                input,
                                selectors: key_exprs.clone(),
                                extend_original: true,
                            },
                        });
                        input = PhysStream::first(node);
                    },
                };

                PhysNodeKind::PartitionSink {
                    input,
                    base_path,
                    file_path_cb,
                    sink_options,
                    variant,
                    file_type,
                    cloud_options,
                    per_partition_sort_by,
                    finish_callback,
                }
            },
        },

        IR::SinkMultiple { inputs } => {
            let mut sinks = Vec::with_capacity(inputs.len());
            for input in inputs.clone() {
                let phys_node_stream = match ir_arena.get(input) {
                    IR::Sink { .. } => lower_ir!(input)?,
                    _ => lower_ir!(ir_arena.add(IR::Sink {
                        input,
                        payload: SinkTypeIR::Memory
                    }))?,
                };
                sinks.push(phys_node_stream.node);
            }
            PhysNodeKind::SinkMultiple { sinks }
        },

        #[cfg(feature = "merge_sorted")]
        IR::MergeSorted {
            input_left,
            input_right,
            key,
        } => {
            let input_left = *input_left;
            let input_right = *input_right;
            let key = key.clone();

            let mut phys_left = lower_ir!(input_left)?;
            let mut phys_right = lower_ir!(input_right)?;

            let left_schema = &phys_sm[phys_left.node].output_schema;
            let right_schema = &phys_sm[phys_right.node].output_schema;

            left_schema.ensure_is_exact_match(right_schema).unwrap();

            let key_dtype = left_schema.try_get(key.as_str())?.clone();

            let key_name = unique_column_name();
            use polars_plan::plans::{AExprBuilder, RowEncodingVariant};

            // Add the key column as the last column for both inputs.
            for s in [&mut phys_left, &mut phys_right] {
                let key_dtype = key_dtype.clone();
                let mut expr = AExprBuilder::col(key.clone(), expr_arena);
                if key_dtype.is_nested() {
                    expr = expr.row_encode_unary(
                        RowEncodingVariant::Ordered {
                            descending: None,
                            nulls_last: None,
                        },
                        key_dtype,
                        expr_arena,
                    );
                }

                *s = build_hstack_stream(
                    *s,
                    &[expr.expr_ir(key_name.clone())],
                    expr_arena,
                    phys_sm,
                    expr_cache,
                    ctx,
                )?;
            }

            PhysNodeKind::MergeSorted {
                input_left: phys_left,
                input_right: phys_right,
            }
        },

        IR::MapFunction { input, function } => {
            let function = function.clone();
            let phys_input = lower_ir!(*input)?;

            match function {
                FunctionIR::RowIndex {
                    name,
                    offset,
                    schema: _,
                } => PhysNodeKind::WithRowIndex {
                    input: phys_input,
                    name,
                    offset,
                },

                function if function.is_streamable() => {
                    let map = Arc::new(move |df| function.evaluate(df));
                    PhysNodeKind::Map {
                        input: phys_input,
                        map,
                    }
                },

                function => {
                    let format_str = ctx.prepare_visualization.then(|| {
                        let mut buffer = String::new();
                        write_ir_non_recursive(
                            &mut buffer,
                            ir_arena.get(node),
                            expr_arena,
                            phys_sm.get(phys_input.node).unwrap().output_schema.as_ref(),
                            0,
                        )
                        .unwrap();
                        buffer
                    });
                    let map = Arc::new(move |df| function.evaluate(df));
                    PhysNodeKind::InMemoryMap {
                        input: phys_input,
                        map,
                        format_str,
                    }
                },
            }
        },

        IR::Sort {
            input,
            by_column,
            slice,
            sort_options,
        } => {
            let slice = *slice;
            let mut by_column = by_column.clone();
            let mut sort_options = sort_options.clone();
            let phys_input = lower_ir!(*input)?;

            // See if we can insert a top k.
            let mut limit = u64::MAX;
            if let Some((0, l)) = slice {
                limit = limit.min(l as u64);
            }
            #[allow(clippy::unnecessary_cast)]
            if let Some(l) = sort_options.limit {
                limit = limit.min(l as u64);
            };

            let mut stream = phys_input;
            if limit < u64::MAX {
                // If we need to maintain order, augment with row index.
                if sort_options.maintain_order {
                    let row_idx_name = unique_column_name();
                    stream = build_row_idx_stream(stream, row_idx_name.clone(), None, phys_sm);

                    // Add row index to sort columns.
                    let row_idx_node = expr_arena.add(AExpr::Column(row_idx_name.clone()));
                    by_column.push(ExprIR::new(
                        row_idx_node,
                        OutputName::ColumnLhs(row_idx_name),
                    ));
                    sort_options.descending.push(false);
                    sort_options.nulls_last.push(true);

                    // No longer needed for the actual sort itself, handled by row index.
                    sort_options.maintain_order = false;
                }

                let k_node =
                    expr_arena.add(AExpr::Literal(LiteralValue::Scalar(Scalar::from(limit))));
                let k_selector = ExprIR::from_node(k_node, expr_arena);
                let k_output_schema =
                    Schema::from_iter([(get_literal_name().clone(), DataType::UInt64)]);
                let k_node = phys_sm.insert(PhysNode::new(
                    Arc::new(k_output_schema),
                    PhysNodeKind::InputIndependentSelect {
                        selectors: vec![k_selector],
                    },
                ));

                let trans_by_column;
                (stream, trans_by_column) =
                    lower_exprs(stream, &by_column, expr_arena, phys_sm, expr_cache, ctx)?;

                stream = PhysStream::first(phys_sm.insert(PhysNode {
                    output_schema: phys_sm[stream.node].output_schema.clone(),
                    kind: PhysNodeKind::TopK {
                        input: stream,
                        k: PhysStream::first(k_node),
                        by_column: trans_by_column,
                        reverse: sort_options.descending.iter().map(|x| !x).collect(),
                        nulls_last: sort_options.nulls_last.clone(),
                    },
                }));
            }

            stream = PhysStream::first(phys_sm.insert(PhysNode {
                output_schema: phys_sm[stream.node].output_schema.clone(),
                kind: PhysNodeKind::Sort {
                    input: stream,
                    by_column,
                    slice,
                    sort_options,
                },
            }));

            // Remove any temporary columns we may have added.
            let exprs: Vec<_> = output_schema
                .iter_names()
                .map(|name| {
                    let node = expr_arena.add(AExpr::Column(name.clone()));
                    ExprIR::new(node, OutputName::ColumnLhs(name.clone()))
                })
                .collect();
            stream = build_select_stream(stream, &exprs, expr_arena, phys_sm, expr_cache, ctx)?;

            return Ok(stream);
        },

        IR::Union { inputs, options } => {
            let options = *options;
            let inputs = inputs
                .clone() // Needed to borrow ir_arena mutably.
                .into_iter()
                .map(|input| lower_ir!(input))
                .collect::<Result<_, _>>()?;

            let node = phys_sm.insert(PhysNode {
                output_schema,
                kind: PhysNodeKind::OrderedUnion { inputs },
            });
            let mut stream = PhysStream::first(node);
            if let Some((offset, length)) = options.slice {
                stream = build_slice_stream(stream, offset, length, phys_sm);
            }
            return Ok(stream);
        },

        IR::HConcat {
            inputs,
            schema: _,
            options: _,
        } => {
            let inputs = inputs
                .clone() // Needed to borrow ir_arena mutably.
                .into_iter()
                .map(|input| lower_ir!(input))
                .collect::<Result<_, _>>()?;
            PhysNodeKind::Zip {
                inputs,
                null_extend: true,
            }
        },

        v @ IR::Scan { .. } => {
            let IR::Scan {
                sources: scan_sources,
                file_info,
                mut hive_parts,
                output_schema: _,
                scan_type,
                predicate,
                unified_scan_args,
            } = v.clone()
            else {
                unreachable!();
            };

            if scan_sources.is_empty()
                || unified_scan_args
                    .pre_slice
                    .as_ref()
                    .is_some_and(|slice| slice.len() == 0)
            {
                if config::verbose() {
                    eprintln!("lower_ir: scan IR had empty sources")
                }

                // If there are no sources, just provide an empty in-memory source with the right
                // schema.
                PhysNodeKind::InMemorySource {
                    df: Arc::new(DataFrame::empty_with_schema(output_schema.as_ref())),
                }
            } else {
                let file_reader_builder = match &*scan_type {
                    #[cfg(feature = "parquet")]
                    FileScanIR::Parquet {
                        options,
                        metadata: first_metadata,
                    } => Arc::new(
                        crate::nodes::io_sources::parquet::builder::ParquetReaderBuilder {
                            options: Arc::new(options.clone()),
                            first_metadata: first_metadata.clone(),
                        },
                    ) as Arc<dyn FileReaderBuilder>,

                    #[cfg(feature = "ipc")]
                    FileScanIR::Ipc {
                        options: polars_io::ipc::IpcScanOptions {},
                        metadata: first_metadata,
                    } => Arc::new(crate::nodes::io_sources::ipc::builder::IpcReaderBuilder {
                        first_metadata: first_metadata.clone(),
                    }) as Arc<dyn FileReaderBuilder>,

                    #[cfg(feature = "csv")]
                    FileScanIR::Csv { options } => {
                        Arc::new(Arc::new(options.clone())) as Arc<dyn FileReaderBuilder>
                    },

                    #[cfg(feature = "json")]
                    FileScanIR::NDJson { options } => {
                        Arc::new(Arc::new(options.clone())) as Arc<dyn FileReaderBuilder>
                    },

                    #[cfg(feature = "python")]
                    FileScanIR::PythonDataset {
                        dataset_object: _,
                        cached_ir,
                    } => {
                        use crate::physical_plan::io::python_dataset::python_dataset_scan_to_reader_builder;
                        let guard = cached_ir.lock().unwrap();

                        let expanded_scan = guard
                            .as_ref()
                            .expect("python dataset should be resolved")
                            .python_scan()
                            .expect("should be python scan");

                        python_dataset_scan_to_reader_builder(expanded_scan)
                    },

                    FileScanIR::Anonymous { .. } => todo!("unimplemented: AnonymousScan"),
                };

                {
                    let cloud_options = &unified_scan_args.cloud_options;
                    let output_schema =
                        if std::env::var("POLARS_FORCE_EMPTY_PROJECT").as_deref() == Ok("1") {
                            Default::default()
                        } else {
                            output_schema
                        };

                    let cloud_options = cloud_options.clone().map(Arc::new);
                    let file_schema = file_info.schema;

                    let (projected_schema, file_schema) =
                        multi_scan::functions::resolve_projections::resolve_projections(
                            &output_schema,
                            &file_schema,
                            &mut hive_parts,
                            unified_scan_args
                                .row_index
                                .as_ref()
                                .map(|ri| ri.name.as_str()),
                            unified_scan_args
                                .include_file_paths
                                .as_ref()
                                .map(|x| x.as_str()),
                        );

                    let file_projection_builder = ProjectionBuilder::new(
                        projected_schema,
                        unified_scan_args.column_mapping.as_ref(),
                        unified_scan_args
                            .default_values
                            .filter(|DefaultFieldValues::Iceberg(v)| !v.is_empty())
                            .map(|DefaultFieldValues::Iceberg(v)| v),
                    );

                    // TODO: We ignore the parameter for some scan types to maintain old behavior,
                    // as they currently don't expose an API for it to be configured.
                    let extra_columns_policy = match &*scan_type {
                        #[cfg(feature = "parquet")]
                        FileScanIR::Parquet { .. } => unified_scan_args.extra_columns_policy,

                        _ => {
                            if unified_scan_args.projection.is_some() {
                                ExtraColumnsPolicy::Ignore
                            } else {
                                ExtraColumnsPolicy::Raise
                            }
                        },
                    };

                    let forbid_extra_columns = ForbidExtraColumns::opt_new(
                        &extra_columns_policy,
                        &file_schema,
                        unified_scan_args.column_mapping.as_ref(),
                    );

                    let mut multi_scan_node = PhysNodeKind::MultiScan {
                        scan_sources,
                        file_reader_builder,
                        cloud_options,
                        file_projection_builder,
                        output_schema: output_schema.clone(),
                        row_index: None,
                        pre_slice: None,
                        predicate: None,
                        hive_parts,
                        cast_columns_policy: unified_scan_args.cast_columns_policy,
                        missing_columns_policy: unified_scan_args.missing_columns_policy,
                        forbid_extra_columns,
                        include_file_paths: unified_scan_args.include_file_paths,
                        // Set to None if empty for performance.
                        deletion_files: DeletionFilesList::filter_empty(
                            unified_scan_args.deletion_files,
                        ),
                        file_schema,
                    };

                    let PhysNodeKind::MultiScan {
                        output_schema: multi_scan_output_schema,
                        row_index: row_index_to_multiscan,
                        pre_slice: pre_slice_to_multiscan,
                        predicate: predicate_to_multiscan,
                        ..
                    } = &mut multi_scan_node
                    else {
                        unreachable!()
                    };

                    let pre_slice = unified_scan_args.pre_slice.clone();

                    let mut row_index_post = unified_scan_args.row_index;
                    let mut pre_slice_post = pre_slice.clone();
                    let mut predicate_post = predicate;

                    // Always send predicate and slice to multiscan as they can be used to prune files. If the
                    // underlying reader does not support predicates, multiscan will apply it in post.
                    *predicate_to_multiscan = predicate_post.take();
                    // * Negative slice is resolved internally by the multiscan.
                    // * Note that this is done via a row-count pass
                    *pre_slice_to_multiscan = pre_slice_post.take();

                    // * If a predicate was pushed then we always push row index
                    if predicate_to_multiscan.is_some()
                        || matches!(pre_slice, Some(Slice::Negative { .. }))
                    {
                        *row_index_to_multiscan = row_index_post.take();
                    }

                    // TODO
                    // Projection pushdown could change the row index column position. Ideally it shouldn't,
                    // and instead just put a projection on top of the scan node in the IR. But for now
                    // we do that step here.
                    let mut schema_after_row_index_post = multi_scan_output_schema.clone();
                    let mut reorder_after_row_index_post = false;

                    // Remove row index from multiscan schema if not pushed.
                    if let Some(ri) = row_index_post.as_ref() {
                        let row_index_post_position =
                            multi_scan_output_schema.index_of(&ri.name).unwrap();
                        let (_, dtype) = Arc::make_mut(multi_scan_output_schema)
                            .shift_remove_index(row_index_post_position)
                            .unwrap();

                        if row_index_post_position != 0 {
                            reorder_after_row_index_post = true;
                            let mut schema =
                                Schema::with_capacity(multi_scan_output_schema.len() + 1);
                            schema.extend([(ri.name.clone(), dtype)]);
                            schema.extend(
                                multi_scan_output_schema
                                    .iter()
                                    .map(|(k, v)| (k.clone(), v.clone())),
                            );
                            schema_after_row_index_post = Arc::new(schema);
                        }
                    }

                    // If we have no predicate and no slice or positive slice, we can reorder the row index to after
                    // the slice by adjusting the offset. This can remove a serial synchronization step in multiscan
                    // and allow the reader to still skip rows.
                    let row_index_post_after_slice = (|| {
                        let mut row_index = row_index_post.take()?;

                        let positive_offset = match pre_slice {
                            Some(Slice::Positive { offset, .. }) => Some(offset),
                            None => Some(0),
                            Some(Slice::Negative { .. }) => unreachable!(),
                        }?;

                        row_index.offset = row_index.offset.saturating_add(
                            IdxSize::try_from(positive_offset).unwrap_or(IdxSize::MAX),
                        );

                        Some(row_index)
                    })();

                    let mut stream = {
                        let node_key = phys_sm.insert(PhysNode::new(
                            multi_scan_output_schema.clone(),
                            multi_scan_node,
                        ));
                        PhysStream::first(node_key)
                    };

                    if let Some(ri) = row_index_post {
                        let node = PhysNodeKind::WithRowIndex {
                            input: stream,
                            name: ri.name,
                            offset: Some(ri.offset),
                        };

                        let node_key = phys_sm.insert(PhysNode {
                            output_schema: schema_after_row_index_post.clone(),
                            kind: node,
                        });

                        stream = PhysStream::first(node_key);

                        if reorder_after_row_index_post {
                            let node = PhysNodeKind::SimpleProjection {
                                input: stream,
                                columns: output_schema.iter_names_cloned().collect(),
                            };

                            let node_key = phys_sm.insert(PhysNode {
                                output_schema: output_schema.clone(),
                                kind: node,
                            });

                            stream = PhysStream::first(node_key);
                        }
                    }

                    if let Some(pre_slice) = pre_slice_post {
                        // TODO: Use native Slice enum in the slice node.
                        let (offset, length) = <(i64, usize)>::try_from(pre_slice).unwrap();
                        stream = build_slice_stream(stream, offset, length, phys_sm);
                    }

                    if let Some(ri) = row_index_post_after_slice {
                        let node = PhysNodeKind::WithRowIndex {
                            input: stream,
                            name: ri.name,
                            offset: Some(ri.offset),
                        };

                        let node_key = phys_sm.insert(PhysNode {
                            output_schema: schema_after_row_index_post,
                            kind: node,
                        });

                        stream = PhysStream::first(node_key);

                        if reorder_after_row_index_post {
                            let node = PhysNodeKind::SimpleProjection {
                                input: stream,
                                columns: output_schema.iter_names_cloned().collect(),
                            };

                            let node_key = phys_sm.insert(PhysNode {
                                output_schema: output_schema.clone(),
                                kind: node,
                            });

                            stream = PhysStream::first(node_key);
                        }
                    }

                    if let Some(predicate) = predicate_post {
                        stream = build_filter_stream(
                            stream, predicate, expr_arena, phys_sm, expr_cache, ctx,
                        )?;
                    }

                    return Ok(stream);
                }
            }
        },

        #[cfg(feature = "python")]
        IR::PythonScan { options } => PhysNodeKind::PythonScan {
            options: options.clone(),
        },

        IR::Cache { input, id } => {
            let id = *id;
            if let Some(cached) = cache_nodes.get(&id) {
                return Ok(*cached);
            }

            let phys_input = lower_ir!(*input)?;
            cache_nodes.insert(id, phys_input);
            return Ok(phys_input);
        },

        IR::GroupBy {
            input,
            keys,
            aggs,
            schema: output_schema,
            apply,
            maintain_order,
            options,
        } => {
            let input = *input;
            let keys = keys.clone();
            let aggs = aggs.clone();
            let output_schema = output_schema.clone();
            let apply = apply.clone();
            let maintain_order = *maintain_order;
            let options = options.clone();

            let phys_input = lower_ir!(input)?;
            return build_group_by_stream(
                phys_input,
                &keys,
                &aggs,
                output_schema,
                maintain_order,
                options,
                apply,
                expr_arena,
                phys_sm,
                expr_cache,
                ctx,
            );
        },
        IR::Join {
            input_left,
            input_right,
            schema: _,
            left_on,
            right_on,
            options,
        } => {
            let input_left = *input_left;
            let input_right = *input_right;
            let left_on = left_on.clone();
            let right_on = right_on.clone();
            let args = options.args.clone();
            let options = options.options.clone();
            let phys_left = lower_ir!(input_left)?;
            let phys_right = lower_ir!(input_right)?;
            if (args.how.is_equi() || args.how.is_semi_anti()) && !args.validation.needs_checks() {
                // When lowering the expressions for the keys we need to ensure we keep around the
                // payload columns, otherwise the input nodes can get replaced by input-independent
                // nodes since the lowering code does not see we access any non-literal expressions.
                // So we add dummy expressions before lowering and remove them afterwards.
                let mut aug_left_on = left_on.clone();
                for name in phys_sm[phys_left.node].output_schema.iter_names() {
                    let col_expr = expr_arena.add(AExpr::Column(name.clone()));
                    aug_left_on.push(ExprIR::new(col_expr, OutputName::ColumnLhs(name.clone())));
                }
                let mut aug_right_on = right_on.clone();
                for name in phys_sm[phys_right.node].output_schema.iter_names() {
                    let col_expr = expr_arena.add(AExpr::Column(name.clone()));
                    aug_right_on.push(ExprIR::new(col_expr, OutputName::ColumnLhs(name.clone())));
                }
                let (trans_input_left, mut trans_left_on) = lower_exprs(
                    phys_left,
                    &aug_left_on,
                    expr_arena,
                    phys_sm,
                    expr_cache,
                    ctx,
                )?;
                let (trans_input_right, mut trans_right_on) = lower_exprs(
                    phys_right,
                    &aug_right_on,
                    expr_arena,
                    phys_sm,
                    expr_cache,
                    ctx,
                )?;
                trans_left_on.drain(left_on.len()..);
                trans_right_on.drain(right_on.len()..);

                let node = if args.how.is_equi() {
                    phys_sm.insert(PhysNode::new(
                        output_schema,
                        PhysNodeKind::EquiJoin {
                            input_left: trans_input_left,
                            input_right: trans_input_right,
                            left_on: trans_left_on,
                            right_on: trans_right_on,
                            args: args.clone(),
                        },
                    ))
                } else {
                    phys_sm.insert(PhysNode::new(
                        output_schema,
                        PhysNodeKind::SemiAntiJoin {
                            input_left: trans_input_left,
                            input_right: trans_input_right,
                            left_on: trans_left_on,
                            right_on: trans_right_on,
                            args: args.clone(),
                            output_bool: false,
                        },
                    ))
                };
                let mut stream = PhysStream::first(node);
                if let Some((offset, len)) = args.slice {
                    stream = build_slice_stream(stream, offset, len, phys_sm);
                }
                return Ok(stream);
            } else if args.how.is_cross() {
                let node = phys_sm.insert(PhysNode::new(
                    output_schema,
                    PhysNodeKind::CrossJoin {
                        input_left: phys_left,
                        input_right: phys_right,
                        args: args.clone(),
                    },
                ));
                let mut stream = PhysStream::first(node);
                if let Some((offset, len)) = args.slice {
                    stream = build_slice_stream(stream, offset, len, phys_sm);
                }
                return Ok(stream);
            } else {
                PhysNodeKind::InMemoryJoin {
                    input_left: phys_left,
                    input_right: phys_right,
                    left_on,
                    right_on,
                    args,
                    options,
                }
            }
        },

        IR::Distinct { input, options } => {
            let options = options.clone();
            let phys_input = lower_ir!(*input)?;

            // We don't have a dedicated distinct operator (yet), lower to group
            // by with an aggregate for each column.
            let input_schema = &phys_sm[phys_input.node].output_schema;
            if input_schema.is_empty() {
                // Can't group (or have duplicates) if dataframe has zero-width.
                return Ok(phys_input);
            }

            if options.maintain_order && options.keep_strategy == UniqueKeepStrategy::Last {
                // Unfortunately the order-preserving groupby always orders by the first occurrence
                // of the group so we can't lower this and have to fallback.
                let input_schema = phys_sm[phys_input.node].output_schema.clone();
                let lmdf = Arc::new(LateMaterializedDataFrame::default());
                let mut lp_arena = Arena::default();
                let input_lp_node = lp_arena.add(lmdf.clone().as_ir_node(input_schema));
                let distinct_lp_node = lp_arena.add(IR::Distinct {
                    input: input_lp_node,
                    options,
                });
                let executor = Mutex::new(create_physical_plan(
                    distinct_lp_node,
                    &mut lp_arena,
                    expr_arena,
                    None,
                )?);

                let format_str = ctx.prepare_visualization.then(|| {
                    let mut buffer = String::new();
                    write_ir_non_recursive(
                        &mut buffer,
                        ir_arena.get(node),
                        expr_arena,
                        phys_sm.get(phys_input.node).unwrap().output_schema.as_ref(),
                        0,
                    )
                    .unwrap();
                    buffer
                });
                let distinct_node = PhysNode {
                    output_schema,
                    kind: PhysNodeKind::InMemoryMap {
                        input: phys_input,
                        map: Arc::new(move |df| {
                            lmdf.set_materialized_dataframe(df);
                            let mut state = ExecutionState::new();
                            executor.lock().execute(&mut state)
                        }),
                        format_str,
                    },
                };

                return Ok(PhysStream::first(phys_sm.insert(distinct_node)));
            }

            // Create the key and aggregate expressions.
            let all_col_names = input_schema.iter_names().cloned().collect_vec();
            let key_names = if let Some(subset) = options.subset {
                subset.to_vec()
            } else {
                all_col_names.clone()
            };
            let key_name_set: PlHashSet<_> = key_names.iter().cloned().collect();

            let mut group_by_output_schema = Schema::with_capacity(all_col_names.len() + 1);
            let keys = key_names
                .iter()
                .map(|name| {
                    group_by_output_schema
                        .insert(name.clone(), input_schema.get(name).unwrap().clone());
                    let col_expr = expr_arena.add(AExpr::Column(name.clone()));
                    ExprIR::new(col_expr, OutputName::ColumnLhs(name.clone()))
                })
                .collect_vec();

            let mut aggs = all_col_names
                .iter()
                .filter(|name| !key_name_set.contains(*name))
                .map(|name| {
                    group_by_output_schema
                        .insert(name.clone(), input_schema.get(name).unwrap().clone());
                    let col_expr = expr_arena.add(AExpr::Column(name.clone()));
                    use UniqueKeepStrategy::*;
                    let agg_expr = match options.keep_strategy {
                        First | None | Any => {
                            expr_arena.add(AExpr::Agg(IRAggExpr::First(col_expr)))
                        },
                        Last => expr_arena.add(AExpr::Agg(IRAggExpr::Last(col_expr))),
                    };
                    ExprIR::new(agg_expr, OutputName::ColumnLhs(name.clone()))
                })
                .collect_vec();

            if options.keep_strategy == UniqueKeepStrategy::None {
                // Track the length so we can filter out non-unique keys later.
                let name = unique_column_name();
                group_by_output_schema.insert(name.clone(), DataType::IDX_DTYPE);
                aggs.push(ExprIR::new(
                    expr_arena.add(AExpr::Len),
                    OutputName::Alias(name),
                ));
            }

            let mut stream = build_group_by_stream(
                phys_input,
                &keys,
                &aggs,
                Arc::new(group_by_output_schema),
                options.maintain_order,
                Arc::new(GroupbyOptions::default()),
                None,
                expr_arena,
                phys_sm,
                expr_cache,
                ctx,
            )?;

            if options.keep_strategy == UniqueKeepStrategy::None {
                // Filter to keep only those groups with length 1.
                let unique_name = aggs.last().unwrap().output_name();
                let left = expr_arena.add(AExpr::Column(unique_name.clone()));
                let right = expr_arena.add(AExpr::Literal(LiteralValue::new_idxsize(1)));
                let predicate_aexpr = expr_arena.add(AExpr::BinaryExpr {
                    left,
                    op: polars_plan::dsl::Operator::Eq,
                    right,
                });
                let predicate =
                    ExprIR::new(predicate_aexpr, OutputName::ColumnLhs(unique_name.clone()));
                stream =
                    build_filter_stream(stream, predicate, expr_arena, phys_sm, expr_cache, ctx)?;
            }

            // Restore column order and drop the temporary length column if any.
            let exprs = all_col_names
                .iter()
                .map(|name| {
                    let col_expr = expr_arena.add(AExpr::Column(name.clone()));
                    ExprIR::new(col_expr, OutputName::ColumnLhs(name.clone()))
                })
                .collect_vec();
            stream = build_select_stream(stream, &exprs, expr_arena, phys_sm, expr_cache, ctx)?;

            // We didn't pass the slice earlier to build_group_by_stream because
            // we might have the intermediate keep = "none" filter.
            if let Some((offset, length)) = options.slice {
                stream = build_slice_stream(stream, offset, length, phys_sm);
            }

            return Ok(stream);
        },
        IR::ExtContext { .. } => todo!(),
        IR::Invalid => unreachable!(),
    };

    let node_key = phys_sm.insert(PhysNode::new(output_schema, node_kind));
    Ok(PhysStream::first(node_key))
}