// Path: blob/main/crates/polars-stream/src/physical_plan/mod.rs
// 8433 views
use std::num::NonZeroUsize;1use std::sync::Arc;23use polars_core::frame::DataFrame;4use polars_core::prelude::{IdxSize, InitHashMaps, PlHashMap, SortMultipleOptions};5use polars_core::schema::{Schema, SchemaRef};6use polars_error::PolarsResult;7use polars_io::RowIndex;8use polars_io::cloud::CloudOptions;9use polars_ops::frame::JoinArgs;10use polars_plan::dsl::deletion::DeletionFilesList;11use polars_plan::dsl::{12CastColumnsPolicy, FileSinkOptions, JoinTypeOptionsIR, MissingColumnsPolicy,13PartitionedSinkOptionsIR, PredicateFileSkip, ScanSources, TableStatistics,14};15use polars_plan::plans::expr_ir::ExprIR;16use polars_plan::plans::hive::HivePartitionsDf;17use polars_plan::plans::{AExpr, DataFrameUdf, IR};1819mod fmt;20mod io;21mod lower_expr;22mod lower_group_by;23mod lower_ir;24mod to_graph;2526pub use fmt::{NodeStyle, visualize_plan};27use polars_plan::prelude::PlanCallback;28#[cfg(feature = "dynamic_group_by")]29use polars_time::DynamicGroupOptions;30use polars_time::{ClosedWindow, Duration};31use polars_utils::arena::{Arena, Node};32use polars_utils::pl_str::PlSmallStr;33use polars_utils::slice_enum::Slice;34use slotmap::{SecondaryMap, SlotMap};35pub use to_graph::physical_plan_to_graph;3637pub use self::lower_ir::StreamingLowerIRContext;38use crate::nodes::io_sources::multi_scan::components::forbid_extra_columns::ForbidExtraColumns;39use crate::nodes::io_sources::multi_scan::components::projection::builder::ProjectionBuilder;40use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;41use crate::physical_plan::lower_expr::ExprCache;4243slotmap::new_key_type! 
{44/// Key used for physical nodes.45pub struct PhysNodeKey;46}4748impl PhysNodeKey {49pub fn as_ffi(&self) -> u64 {50self.0.as_ffi()51}52}5354/// A node in the physical plan.55///56/// A physical plan is created when the `IR` is translated to a directed57/// acyclic graph of operations that can run on the streaming engine.58#[derive(Clone, Debug)]59pub struct PhysNode {60output_schema: Arc<Schema>,61kind: PhysNodeKind,62}6364impl PhysNode {65pub fn new(output_schema: Arc<Schema>, kind: PhysNodeKind) -> Self {66Self {67output_schema,68kind,69}70}7172pub fn kind(&self) -> &PhysNodeKind {73&self.kind74}75}7677/// A handle representing a physical stream of data with a fixed schema in the78/// physical plan. It consists of a reference to a physical node as well as the79/// output port on that node to connect to receive the stream.80#[derive(Clone, Debug, Copy, PartialEq, Eq, Hash)]81pub struct PhysStream {82pub node: PhysNodeKey,83pub port: usize,84}8586impl PhysStream {87#[allow(unused)]88pub fn new(node: PhysNodeKey, port: usize) -> Self {89Self { node, port }90}9192// Convenience method to refer to the first output port of a physical node.93pub fn first(node: PhysNodeKey) -> Self {94Self { node, port: 0 }95}96}9798/// Behaviour when handling multiple DataFrames with different heights.99100#[derive(Clone, Debug, Copy)]101#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]102#[cfg_attr(103feature = "physical_plan_visualization_schema",104derive(schemars::JsonSchema)105)]106pub enum ZipBehavior {107/// Fill the shorter DataFrames with nulls to the height of the longest DataFrame.108NullExtend,109/// All inputs must be the same height, or have length 1 in which case they are broadcast.110Broadcast,111/// Raise an error if the DataFrames have different heights.112Strict,113}114115#[derive(Clone, Debug)]116pub enum PhysNodeKind {117InMemorySource {118df: Arc<DataFrame>,119disable_morsel_split: bool,120},121122Select {123input: 
PhysStream,124selectors: Vec<ExprIR>,125extend_original: bool,126},127128InputIndependentSelect {129selectors: Vec<ExprIR>,130},131132WithRowIndex {133input: PhysStream,134name: PlSmallStr,135offset: Option<IdxSize>,136},137138Reduce {139input: PhysStream,140exprs: Vec<ExprIR>,141},142143StreamingSlice {144input: PhysStream,145offset: usize,146length: usize,147},148149NegativeSlice {150input: PhysStream,151offset: i64,152length: usize,153},154155DynamicSlice {156input: PhysStream,157offset: PhysStream,158length: PhysStream,159},160161Shift {162input: PhysStream,163offset: PhysStream,164fill: Option<PhysStream>,165},166167Filter {168input: PhysStream,169predicate: ExprIR,170},171172SimpleProjection {173input: PhysStream,174columns: Vec<PlSmallStr>,175},176177InMemorySink {178input: PhysStream,179},180181CallbackSink {182input: PhysStream,183function: PlanCallback<DataFrame, bool>,184maintain_order: bool,185chunk_size: Option<NonZeroUsize>,186},187188FileSink {189input: PhysStream,190options: FileSinkOptions,191},192193PartitionedSink {194input: PhysStream,195options: PartitionedSinkOptionsIR,196},197198SinkMultiple {199sinks: Vec<PhysNodeKey>,200},201202/// Generic fallback for (as-of-yet) unsupported streaming mappings.203/// Fully sinks all data to an in-memory data frame and uses the in-memory204/// engine to perform the map.205InMemoryMap {206input: PhysStream,207map: Arc<dyn DataFrameUdf>,208209/// A formatted explain of what the in-memory map. This usually calls format on the IR.210format_str: Option<String>,211},212213Map {214input: PhysStream,215map: Arc<dyn DataFrameUdf>,216217/// A formatted explain of what the in-memory map. 
This usually calls format on the IR.218format_str: Option<String>,219},220221SortedGroupBy {222input: PhysStream,223key: PlSmallStr,224aggs: Vec<ExprIR>,225slice: Option<(IdxSize, IdxSize)>,226},227228Sort {229input: PhysStream,230by_column: Vec<ExprIR>,231slice: Option<(i64, usize)>,232sort_options: SortMultipleOptions,233},234235TopK {236input: PhysStream,237k: PhysStream,238by_column: Vec<ExprIR>,239reverse: Vec<bool>,240nulls_last: Vec<bool>,241},242243Repeat {244value: PhysStream,245repeats: PhysStream,246},247248#[cfg(feature = "cum_agg")]249CumAgg {250input: PhysStream,251kind: crate::nodes::cum_agg::CumAggKind,252},253254// Parameter is the input stream255GatherEvery {256input: PhysStream,257n: usize,258offset: usize,259},260Rle(PhysStream),261RleId(PhysStream),262PeakMinMax {263input: PhysStream,264is_peak_max: bool,265},266267OrderedUnion {268inputs: Vec<PhysStream>,269},270271UnorderedUnion {272inputs: Vec<PhysStream>,273},274275Zip {276inputs: Vec<PhysStream>,277zip_behavior: ZipBehavior,278},279280#[allow(unused)]281Multiplexer {282input: PhysStream,283},284285MultiScan {286scan_sources: ScanSources,287288file_reader_builder: Arc<dyn FileReaderBuilder>,289cloud_options: Option<Arc<CloudOptions>>,290291/// Columns to project from the file.292file_projection_builder: ProjectionBuilder,293/// Final output schema of morsels being sent out of MultiScan.294output_schema: SchemaRef,295296row_index: Option<RowIndex>,297pre_slice: Option<Slice>,298predicate: Option<ExprIR>,299predicate_file_skip_applied: Option<PredicateFileSkip>,300301hive_parts: Option<HivePartitionsDf>,302include_file_paths: Option<PlSmallStr>,303cast_columns_policy: CastColumnsPolicy,304missing_columns_policy: MissingColumnsPolicy,305forbid_extra_columns: Option<ForbidExtraColumns>,306307deletion_files: Option<DeletionFilesList>,308table_statistics: Option<TableStatistics>,309310/// Schema of columns contained in the file. Does not contain external columns (e.g. 
hive / row_index).311file_schema: SchemaRef,312disable_morsel_split: bool,313},314315#[cfg(feature = "python")]316PythonScan {317options: polars_plan::plans::python::PythonOptions,318},319320GroupBy {321inputs: Vec<PhysStream>,322// Must have the same schema when applied for each input.323key_per_input: Vec<Vec<ExprIR>>,324// Must be a 'simple' expression, a singular column feeding into a single aggregate, or Len.325aggs_per_input: Vec<Vec<ExprIR>>,326},327328#[cfg(feature = "dynamic_group_by")]329DynamicGroupBy {330input: PhysStream,331options: DynamicGroupOptions,332aggs: Vec<ExprIR>,333slice: Option<(IdxSize, IdxSize)>,334},335336#[cfg(feature = "dynamic_group_by")]337RollingGroupBy {338input: PhysStream,339index_column: PlSmallStr,340period: Duration,341offset: Duration,342closed: ClosedWindow,343slice: Option<(IdxSize, IdxSize)>,344aggs: Vec<ExprIR>,345},346347EquiJoin {348input_left: PhysStream,349input_right: PhysStream,350left_on: Vec<ExprIR>,351right_on: Vec<ExprIR>,352args: JoinArgs,353},354355MergeJoin {356input_left: PhysStream,357input_right: PhysStream,358left_on: Vec<PlSmallStr>,359right_on: Vec<PlSmallStr>,360tmp_left_key_col: Option<PlSmallStr>,361tmp_right_key_col: Option<PlSmallStr>,362descending: bool,363nulls_last: bool,364keys_row_encoded: bool,365args: JoinArgs,366},367368SemiAntiJoin {369input_left: PhysStream,370input_right: PhysStream,371left_on: Vec<ExprIR>,372right_on: Vec<ExprIR>,373args: JoinArgs,374output_bool: bool,375},376377CrossJoin {378input_left: PhysStream,379input_right: PhysStream,380args: JoinArgs,381},382383AsOfJoin {384input_left: PhysStream,385input_right: PhysStream,386left_on: PlSmallStr,387right_on: PlSmallStr,388tmp_left_key_col: Option<PlSmallStr>,389tmp_right_key_col: Option<PlSmallStr>,390args: JoinArgs,391},392393/// Generic fallback for (as-of-yet) unsupported streaming joins.394/// Fully sinks all data to in-memory data frames and uses the in-memory395/// engine to perform the join.396InMemoryJoin 
{397input_left: PhysStream,398input_right: PhysStream,399left_on: Vec<ExprIR>,400right_on: Vec<ExprIR>,401args: JoinArgs,402options: Option<JoinTypeOptionsIR>,403},404405#[cfg(feature = "merge_sorted")]406MergeSorted {407input_left: PhysStream,408input_right: PhysStream,409},410411#[cfg(feature = "ewma")]412EwmMean {413input: PhysStream,414options: polars_ops::series::EWMOptions,415},416417#[cfg(feature = "ewma")]418EwmVar {419input: PhysStream,420options: polars_ops::series::EWMOptions,421},422423#[cfg(feature = "ewma")]424EwmStd {425input: PhysStream,426options: polars_ops::series::EWMOptions,427},428}429430fn visit_node_inputs_mut(431roots: Vec<PhysNodeKey>,432phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,433mut visit: impl FnMut(&mut PhysStream),434) {435let mut to_visit = roots;436let mut seen: SecondaryMap<PhysNodeKey, ()> =437to_visit.iter().copied().map(|n| (n, ())).collect();438macro_rules! rec {439($n:expr) => {440let n = $n;441if seen.insert(n, ()).is_none() {442to_visit.push(n)443}444};445}446while let Some(node) = to_visit.pop() {447match &mut phys_sm[node].kind {448PhysNodeKind::InMemorySource { .. }449| PhysNodeKind::MultiScan { .. }450| PhysNodeKind::InputIndependentSelect { .. } => {},451#[cfg(feature = "python")]452PhysNodeKind::PythonScan { .. } => {},453PhysNodeKind::Select { input, .. }454| PhysNodeKind::WithRowIndex { input, .. }455| PhysNodeKind::Reduce { input, .. }456| PhysNodeKind::StreamingSlice { input, .. }457| PhysNodeKind::NegativeSlice { input, .. }458| PhysNodeKind::Filter { input, .. }459| PhysNodeKind::SimpleProjection { input, .. }460| PhysNodeKind::InMemorySink { input }461| PhysNodeKind::CallbackSink { input, .. }462| PhysNodeKind::FileSink { input, .. }463| PhysNodeKind::PartitionedSink { input, .. }464| PhysNodeKind::InMemoryMap { input, .. }465| PhysNodeKind::SortedGroupBy { input, .. }466| PhysNodeKind::Map { input, .. }467| PhysNodeKind::Sort { input, .. 
}468| PhysNodeKind::Multiplexer { input }469| PhysNodeKind::GatherEvery { input, .. }470| PhysNodeKind::Rle(input)471| PhysNodeKind::RleId(input)472| PhysNodeKind::PeakMinMax { input, .. } => {473rec!(input.node);474visit(input);475},476477#[cfg(feature = "dynamic_group_by")]478PhysNodeKind::DynamicGroupBy { input, .. } => {479rec!(input.node);480visit(input);481},482#[cfg(feature = "dynamic_group_by")]483PhysNodeKind::RollingGroupBy { input, .. } => {484rec!(input.node);485visit(input);486},487488#[cfg(feature = "cum_agg")]489PhysNodeKind::CumAgg { input, .. } => {490rec!(input.node);491visit(input);492},493494PhysNodeKind::InMemoryJoin {495input_left,496input_right,497..498}499| PhysNodeKind::EquiJoin {500input_left,501input_right,502..503}504| PhysNodeKind::MergeJoin {505input_left,506input_right,507..508}509| PhysNodeKind::SemiAntiJoin {510input_left,511input_right,512..513}514| PhysNodeKind::CrossJoin {515input_left,516input_right,517..518}519| PhysNodeKind::AsOfJoin {520input_left,521input_right,522..523} => {524rec!(input_left.node);525rec!(input_right.node);526visit(input_left);527visit(input_right);528},529530#[cfg(feature = "merge_sorted")]531PhysNodeKind::MergeSorted {532input_left,533input_right,534..535} => {536rec!(input_left.node);537rec!(input_right.node);538visit(input_left);539visit(input_right);540},541542PhysNodeKind::TopK { input, k, .. 
} => {543rec!(input.node);544rec!(k.node);545visit(input);546visit(k);547},548549PhysNodeKind::DynamicSlice {550input,551offset,552length,553} => {554rec!(input.node);555rec!(offset.node);556rec!(length.node);557visit(input);558visit(offset);559visit(length);560},561562PhysNodeKind::Shift {563input,564offset,565fill,566} => {567rec!(input.node);568rec!(offset.node);569if let Some(fill) = fill {570rec!(fill.node);571}572visit(input);573visit(offset);574if let Some(fill) = fill {575visit(fill);576}577},578579PhysNodeKind::Repeat { value, repeats } => {580rec!(value.node);581rec!(repeats.node);582visit(value);583visit(repeats);584},585586PhysNodeKind::GroupBy { inputs, .. }587| PhysNodeKind::OrderedUnion { inputs }588| PhysNodeKind::UnorderedUnion { inputs }589| PhysNodeKind::Zip { inputs, .. } => {590for input in inputs {591rec!(input.node);592visit(input);593}594},595596PhysNodeKind::SinkMultiple { sinks } => {597for sink in sinks {598rec!(*sink);599visit(&mut PhysStream::first(*sink));600}601},602603#[cfg(feature = "ewma")]604PhysNodeKind::EwmMean { input, options: _ }605| PhysNodeKind::EwmVar { input, options: _ }606| PhysNodeKind::EwmStd { input, options: _ } => {607rec!(input.node);608visit(input)609},610}611}612}613614fn insert_multiplexers(roots: Vec<PhysNodeKey>, phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>) {615let mut refcount = PlHashMap::new();616visit_node_inputs_mut(roots.clone(), phys_sm, |i| {617*refcount.entry(*i).or_insert(0) += 1;618});619620let mut multiplexer_map: PlHashMap<PhysStream, PhysStream> = refcount621.into_iter()622.filter(|(_stream, refcount)| *refcount > 1)623.map(|(stream, _refcount)| {624let input_schema = phys_sm[stream.node].output_schema.clone();625let multiplexer_node = phys_sm.insert(PhysNode::new(626input_schema,627PhysNodeKind::Multiplexer { input: stream },628));629(stream, PhysStream::first(multiplexer_node))630})631.collect();632633visit_node_inputs_mut(roots, phys_sm, |i| {634if let Some(m) = multiplexer_map.get_mut(i) 
{635*i = *m;636m.port += 1;637}638});639}640641pub fn build_physical_plan(642root: Node,643ir_arena: &mut Arena<IR>,644expr_arena: &mut Arena<AExpr>,645phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,646ctx: StreamingLowerIRContext,647) -> PolarsResult<PhysNodeKey> {648let mut schema_cache = PlHashMap::with_capacity(ir_arena.len());649let mut expr_cache = ExprCache::with_capacity(expr_arena.len());650let mut cache_nodes = PlHashMap::new();651let phys_root = lower_ir::lower_ir(652root,653ir_arena,654expr_arena,655phys_sm,656&mut schema_cache,657&mut expr_cache,658&mut cache_nodes,659ctx,660None,661)?;662insert_multiplexers(vec![phys_root.node], phys_sm);663Ok(phys_root.node)664}665666667