Path: blob/main/crates/polars-stream/src/physical_plan/mod.rs
6939 views
use std::sync::Arc;12use polars_core::frame::DataFrame;3use polars_core::prelude::{IdxSize, InitHashMaps, PlHashMap, SortMultipleOptions};4use polars_core::schema::{Schema, SchemaRef};5use polars_error::PolarsResult;6use polars_io::RowIndex;7use polars_io::cloud::CloudOptions;8use polars_ops::frame::JoinArgs;9use polars_plan::dsl::deletion::DeletionFilesList;10use polars_plan::dsl::{11CastColumnsPolicy, JoinTypeOptionsIR, MissingColumnsPolicy, PartitionTargetCallback,12PartitionVariantIR, ScanSources, SinkFinishCallback, SinkOptions, SinkTarget, SortColumnIR,13};14use polars_plan::plans::hive::HivePartitionsDf;15use polars_plan::plans::{AExpr, DataFrameUdf, IR};16use polars_plan::prelude::expr_ir::ExprIR;1718mod fmt;19mod io;20mod lower_expr;21mod lower_group_by;22mod lower_ir;23mod to_graph;2425pub use fmt::visualize_plan;26use polars_plan::prelude::FileType;27use polars_utils::arena::{Arena, Node};28use polars_utils::pl_str::PlSmallStr;29use polars_utils::plpath::PlPath;30use polars_utils::slice_enum::Slice;31use slotmap::{SecondaryMap, SlotMap};32pub use to_graph::physical_plan_to_graph;3334pub use self::lower_ir::StreamingLowerIRContext;35use crate::nodes::io_sources::multi_scan::components::forbid_extra_columns::ForbidExtraColumns;36use crate::nodes::io_sources::multi_scan::components::projection::builder::ProjectionBuilder;37use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;38use crate::physical_plan::lower_expr::ExprCache;3940slotmap::new_key_type! {41/// Key used for physical nodes.42pub struct PhysNodeKey;43}4445/// A node in the physical plan.46///47/// A physical plan is created when the `IR` is translated to a directed48/// acyclic graph of operations that can run on the streaming engine.49#[derive(Clone, Debug)]50pub struct PhysNode {51output_schema: Arc<Schema>,52kind: PhysNodeKind,53}5455impl PhysNode {56pub fn new(output_schema: Arc<Schema>, kind: PhysNodeKind) -> Self {57Self {58output_schema,59kind,60}61}6263pub fn kind(&self) -> &PhysNodeKind {64&self.kind65}66}6768/// A handle representing a physical stream of data with a fixed schema in the69/// physical plan. It consists of a reference to a physical node as well as the70/// output port on that node to connect to receive the stream.71#[derive(Clone, Debug, Copy, PartialEq, Eq, Hash)]72pub struct PhysStream {73pub node: PhysNodeKey,74pub port: usize,75}7677impl PhysStream {78#[expect(unused)]79pub fn new(node: PhysNodeKey, port: usize) -> Self {80Self { node, port }81}8283// Convenience method to refer to the first output port of a physical node.84pub fn first(node: PhysNodeKey) -> Self {85Self { node, port: 0 }86}87}8889#[derive(Clone, Debug)]90pub enum PhysNodeKind {91InMemorySource {92df: Arc<DataFrame>,93},9495Select {96input: PhysStream,97selectors: Vec<ExprIR>,98extend_original: bool,99},100101InputIndependentSelect {102selectors: Vec<ExprIR>,103},104105WithRowIndex {106input: PhysStream,107name: PlSmallStr,108offset: Option<IdxSize>,109},110111Reduce {112input: PhysStream,113exprs: Vec<ExprIR>,114},115116StreamingSlice {117input: PhysStream,118offset: usize,119length: usize,120},121122NegativeSlice {123input: PhysStream,124offset: i64,125length: usize,126},127128DynamicSlice {129input: PhysStream,130offset: PhysStream,131length: PhysStream,132},133134Shift {135input: PhysStream,136offset: PhysStream,137fill: Option<PhysStream>,138},139140Filter {141input: PhysStream,142predicate: ExprIR,143},144145SimpleProjection {146input: PhysStream,147columns: Vec<PlSmallStr>,148},149150InMemorySink {151input: PhysStream,152},153154FileSink {155target: SinkTarget,156sink_options: SinkOptions,157file_type: FileType,158input: PhysStream,159cloud_options: Option<CloudOptions>,160},161162PartitionSink {163input: PhysStream,164base_path: Arc<PlPath>,165file_path_cb: Option<PartitionTargetCallback>,166sink_options: SinkOptions,167variant: PartitionVariantIR,168file_type: FileType,169cloud_options: Option<CloudOptions>,170per_partition_sort_by: Option<Vec<SortColumnIR>>,171finish_callback: Option<SinkFinishCallback>,172},173174SinkMultiple {175sinks: Vec<PhysNodeKey>,176},177178/// Generic fallback for (as-of-yet) unsupported streaming mappings.179/// Fully sinks all data to an in-memory data frame and uses the in-memory180/// engine to perform the map.181InMemoryMap {182input: PhysStream,183map: Arc<dyn DataFrameUdf>,184185/// A formatted explain of what the in-memory map. This usually calls format on the IR.186format_str: Option<String>,187},188189Map {190input: PhysStream,191map: Arc<dyn DataFrameUdf>,192},193194Sort {195input: PhysStream,196by_column: Vec<ExprIR>,197slice: Option<(i64, usize)>,198sort_options: SortMultipleOptions,199},200201TopK {202input: PhysStream,203k: PhysStream,204by_column: Vec<ExprIR>,205reverse: Vec<bool>,206nulls_last: Vec<bool>,207},208209Repeat {210value: PhysStream,211repeats: PhysStream,212},213214#[cfg(feature = "cum_agg")]215CumAgg {216input: PhysStream,217kind: crate::nodes::cum_agg::CumAggKind,218},219220// Parameter is the input stream221Rle(PhysStream),222RleId(PhysStream),223PeakMinMax {224input: PhysStream,225is_peak_max: bool,226},227228OrderedUnion {229inputs: Vec<PhysStream>,230},231232Zip {233inputs: Vec<PhysStream>,234/// If true shorter inputs are extended with nulls to the longest input,235/// if false all inputs must be the same length, or have length 1 in236/// which case they are broadcast.237null_extend: bool,238},239240#[allow(unused)]241Multiplexer {242input: PhysStream,243},244245MultiScan {246scan_sources: ScanSources,247248file_reader_builder: Arc<dyn FileReaderBuilder>,249cloud_options: Option<Arc<CloudOptions>>,250251/// Columns to project from the file.252file_projection_builder: ProjectionBuilder,253/// Final output schema of morsels being sent out of MultiScan.254output_schema: SchemaRef,255256row_index: Option<RowIndex>,257pre_slice: Option<Slice>,258predicate: Option<ExprIR>,259260hive_parts: Option<HivePartitionsDf>,261include_file_paths: Option<PlSmallStr>,262cast_columns_policy: CastColumnsPolicy,263missing_columns_policy: MissingColumnsPolicy,264forbid_extra_columns: Option<ForbidExtraColumns>,265266deletion_files: Option<DeletionFilesList>,267268/// Schema of columns contained in the file. Does not contain external columns (e.g. hive / row_index).269file_schema: SchemaRef,270},271272#[cfg(feature = "python")]273PythonScan {274options: polars_plan::plans::python::PythonOptions,275},276277GroupBy {278input: PhysStream,279key: Vec<ExprIR>,280// Must be a 'simple' expression, a singular column feeding into a single aggregate, or Len.281aggs: Vec<ExprIR>,282},283284EquiJoin {285input_left: PhysStream,286input_right: PhysStream,287left_on: Vec<ExprIR>,288right_on: Vec<ExprIR>,289args: JoinArgs,290},291292SemiAntiJoin {293input_left: PhysStream,294input_right: PhysStream,295left_on: Vec<ExprIR>,296right_on: Vec<ExprIR>,297args: JoinArgs,298output_bool: bool,299},300301CrossJoin {302input_left: PhysStream,303input_right: PhysStream,304args: JoinArgs,305},306307/// Generic fallback for (as-of-yet) unsupported streaming joins.308/// Fully sinks all data to in-memory data frames and uses the in-memory309/// engine to perform the join.310InMemoryJoin {311input_left: PhysStream,312input_right: PhysStream,313left_on: Vec<ExprIR>,314right_on: Vec<ExprIR>,315args: JoinArgs,316options: Option<JoinTypeOptionsIR>,317},318319#[cfg(feature = "merge_sorted")]320MergeSorted {321input_left: PhysStream,322input_right: PhysStream,323},324}325326fn visit_node_inputs_mut(327roots: Vec<PhysNodeKey>,328phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,329mut visit: impl FnMut(&mut PhysStream),330) {331let mut to_visit = roots;332let mut seen: SecondaryMap<PhysNodeKey, ()> =333to_visit.iter().copied().map(|n| (n, ())).collect();334macro_rules! rec {335($n:expr) => {336let n = $n;337if seen.insert(n, ()).is_none() {338to_visit.push(n)339}340};341}342while let Some(node) = to_visit.pop() {343match &mut phys_sm[node].kind {344PhysNodeKind::InMemorySource { .. }345| PhysNodeKind::MultiScan { .. }346| PhysNodeKind::InputIndependentSelect { .. } => {},347#[cfg(feature = "python")]348PhysNodeKind::PythonScan { .. } => {},349PhysNodeKind::Select { input, .. }350| PhysNodeKind::WithRowIndex { input, .. }351| PhysNodeKind::Reduce { input, .. }352| PhysNodeKind::StreamingSlice { input, .. }353| PhysNodeKind::NegativeSlice { input, .. }354| PhysNodeKind::Filter { input, .. }355| PhysNodeKind::SimpleProjection { input, .. }356| PhysNodeKind::InMemorySink { input }357| PhysNodeKind::FileSink { input, .. }358| PhysNodeKind::PartitionSink { input, .. }359| PhysNodeKind::InMemoryMap { input, .. }360| PhysNodeKind::Map { input, .. }361| PhysNodeKind::Sort { input, .. }362| PhysNodeKind::Multiplexer { input }363| PhysNodeKind::Rle(input)364| PhysNodeKind::RleId(input)365| PhysNodeKind::PeakMinMax { input, .. }366| PhysNodeKind::GroupBy { input, .. } => {367rec!(input.node);368visit(input);369},370371#[cfg(feature = "cum_agg")]372PhysNodeKind::CumAgg { input, .. } => {373rec!(input.node);374visit(input);375},376377PhysNodeKind::InMemoryJoin {378input_left,379input_right,380..381}382| PhysNodeKind::EquiJoin {383input_left,384input_right,385..386}387| PhysNodeKind::SemiAntiJoin {388input_left,389input_right,390..391}392| PhysNodeKind::CrossJoin {393input_left,394input_right,395..396} => {397rec!(input_left.node);398rec!(input_right.node);399visit(input_left);400visit(input_right);401},402403#[cfg(feature = "merge_sorted")]404PhysNodeKind::MergeSorted {405input_left,406input_right,407..408} => {409rec!(input_left.node);410rec!(input_right.node);411visit(input_left);412visit(input_right);413},414415PhysNodeKind::TopK { input, k, .. } => {416rec!(input.node);417rec!(k.node);418visit(input);419visit(k);420},421422PhysNodeKind::DynamicSlice {423input,424offset,425length,426} => {427rec!(input.node);428rec!(offset.node);429rec!(length.node);430visit(input);431visit(offset);432visit(length);433},434435PhysNodeKind::Shift {436input,437offset,438fill,439} => {440rec!(input.node);441rec!(offset.node);442if let Some(fill) = fill {443rec!(fill.node);444}445visit(input);446visit(offset);447if let Some(fill) = fill {448visit(fill);449}450},451452PhysNodeKind::Repeat { value, repeats } => {453rec!(value.node);454rec!(repeats.node);455visit(value);456visit(repeats);457},458459PhysNodeKind::OrderedUnion { inputs } | PhysNodeKind::Zip { inputs, .. } => {460for input in inputs {461rec!(input.node);462visit(input);463}464},465466PhysNodeKind::SinkMultiple { sinks } => {467for sink in sinks {468rec!(*sink);469visit(&mut PhysStream::first(*sink));470}471},472}473}474}475476fn insert_multiplexers(roots: Vec<PhysNodeKey>, phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>) {477let mut refcount = PlHashMap::new();478visit_node_inputs_mut(roots.clone(), phys_sm, |i| {479*refcount.entry(*i).or_insert(0) += 1;480});481482let mut multiplexer_map: PlHashMap<PhysStream, PhysStream> = refcount483.into_iter()484.filter(|(_stream, refcount)| *refcount > 1)485.map(|(stream, _refcount)| {486let input_schema = phys_sm[stream.node].output_schema.clone();487let multiplexer_node = phys_sm.insert(PhysNode::new(488input_schema,489PhysNodeKind::Multiplexer { input: stream },490));491(stream, PhysStream::first(multiplexer_node))492})493.collect();494495visit_node_inputs_mut(roots, phys_sm, |i| {496if let Some(m) = multiplexer_map.get_mut(i) {497*i = *m;498m.port += 1;499}500});501}502503pub fn build_physical_plan(504root: Node,505ir_arena: &mut Arena<IR>,506expr_arena: &mut Arena<AExpr>,507phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,508ctx: StreamingLowerIRContext,509) -> PolarsResult<PhysNodeKey> {510let mut schema_cache = PlHashMap::with_capacity(ir_arena.len());511let mut expr_cache = ExprCache::with_capacity(expr_arena.len());512let mut cache_nodes = PlHashMap::new();513let phys_root = lower_ir::lower_ir(514root,515ir_arena,516expr_arena,517phys_sm,518&mut schema_cache,519&mut expr_cache,520&mut cache_nodes,521ctx,522)?;523insert_multiplexers(vec![phys_root.node], phys_sm);524Ok(phys_root.node)525}526527528