// Path: blob/main/crates/polars-mem-engine/src/planner/lp.rs
// (scrape metadata: 8430 views)
use polars_core::POOL;1use polars_core::prelude::*;2use polars_expr::state::ExecutionState;3use polars_plan::plans::expr_ir::ExprIR;4use polars_plan::prelude::sink::CallbackSinkType;5use polars_utils::unique_id::UniqueId;6use recursive::recursive;78#[cfg(feature = "python")]9use self::python_dsl::PythonScanSource;10use super::*;11use crate::executors::{self, CachePrefiller, Executor, GroupByStreamingExec, SinkExecutor};12use crate::scan_predicate::functions::create_scan_predicate;1314pub type StreamingExecutorBuilder =15fn(Node, &mut Arena<IR>, &mut Arena<AExpr>) -> PolarsResult<Box<dyn Executor>>;1617fn partitionable_gb(18keys: &[ExprIR],19aggs: &[ExprIR],20input_schema: &Schema,21expr_arena: &Arena<AExpr>,22apply: &Option<PlanCallback<DataFrame, DataFrame>>,23) -> bool {24// checks:25// 1. complex expressions in the group_by itself are also not partitionable26// in this case anything more than col("foo")27// 2. a custom function cannot be partitioned28// 3. we don't bother with more than 2 keys, as the cardinality likely explodes29// by the combinations30if !keys.is_empty() && keys.len() < 3 && apply.is_none() {31// complex expressions in the group_by itself are also not partitionable32// in this case anything more than col("foo")33for key in keys {34if (expr_arena).iter(key.node()).count() > 135|| has_aexpr(key.node(), expr_arena, |ae| match ae {36AExpr::Literal(lv) => !lv.is_scalar(),37_ => false,38})39{40return false;41}42}4344can_pre_agg_exprs(aggs, expr_arena, input_schema)45} else {46false47}48}4950#[derive(Clone)]51struct ConversionState {52has_cache_child: bool,53has_cache_parent: bool,54}5556impl ConversionState {57fn new() -> PolarsResult<Self> {58Ok(ConversionState {59has_cache_child: false,60has_cache_parent: false,61})62}6364fn with_new_branch<K, F: FnOnce(&mut Self) -> K>(&mut self, func: F) -> K {65let mut new_state = self.clone();66new_state.has_cache_child = false;67let out = func(&mut new_state);68self.has_cache_child = 
new_state.has_cache_child;69out70}71}7273pub fn create_physical_plan(74root: Node,75lp_arena: &mut Arena<IR>,76expr_arena: &mut Arena<AExpr>,77build_streaming_executor: Option<StreamingExecutorBuilder>,78) -> PolarsResult<Box<dyn Executor>> {79let mut state = ConversionState::new()?;80let mut cache_nodes = Default::default();81let plan = create_physical_plan_impl(82root,83lp_arena,84expr_arena,85&mut state,86&mut cache_nodes,87build_streaming_executor,88)?;8990if cache_nodes.is_empty() {91Ok(plan)92} else {93Ok(Box::new(CachePrefiller {94caches: cache_nodes,95phys_plan: plan,96}))97}98}99100pub struct MultiplePhysicalPlans {101pub cache_prefiller: Option<Box<dyn Executor>>,102pub physical_plans: Vec<Box<dyn Executor>>,103}104pub fn create_multiple_physical_plans(105roots: &[Node],106lp_arena: &mut Arena<IR>,107expr_arena: &mut Arena<AExpr>,108build_streaming_executor: Option<StreamingExecutorBuilder>,109) -> PolarsResult<MultiplePhysicalPlans> {110let mut state = ConversionState::new()?;111let mut cache_nodes = Default::default();112let plans = state.with_new_branch(|new_state| {113roots114.iter()115.map(|&node| {116create_physical_plan_impl(117node,118lp_arena,119expr_arena,120new_state,121&mut cache_nodes,122build_streaming_executor,123)124})125.collect::<PolarsResult<Vec<_>>>()126})?;127128let cache_prefiller = (!cache_nodes.is_empty()).then(|| {129struct Empty;130impl Executor for Empty {131fn execute(&mut self, _cache: &mut ExecutionState) -> PolarsResult<DataFrame> {132Ok(DataFrame::empty())133}134}135Box::new(CachePrefiller {136caches: cache_nodes,137phys_plan: Box::new(Empty),138}) as _139});140141Ok(MultiplePhysicalPlans {142cache_prefiller,143physical_plans: plans,144})145}146147#[cfg(feature = "python")]148#[allow(clippy::type_complexity)]149pub fn python_scan_predicate(150options: &mut PythonOptions,151expr_arena: &mut Arena<AExpr>,152state: &mut ExpressionConversionState,153) -> PolarsResult<(154Option<Arc<dyn 
polars_expr::prelude::PhysicalExpr>>,155Option<Vec<u8>>,156)> {157let mut predicate_serialized = None;158let predicate = if let PythonPredicate::Polars(e) = &options.predicate {159// Clone the expression so we can release the borrow on `options`160// before mutating `options.predicate` below.161let e = e.clone();162163// Convert to a pyarrow eval string.164if matches!(options.python_source, PythonScanSource::Pyarrow) {165use polars_core::config::verbose_print_sensitive;166use polars_plan::plans::MintermIter;167168// Split into AND-minterms and convert each independently.169let mut residual_predicate_nodes: Vec<Node> = vec![];170let parts: Vec<String> = MintermIter::new(e.node(), expr_arena)171.filter_map(|node| {172let result = polars_plan::plans::python::pyarrow::predicate_to_pa(173node,174expr_arena,175Default::default(),176);177if result.is_none() {178residual_predicate_nodes.push(node);179}180result181})182.collect();183184let predicate_pa = match parts.len() {1850 => None,1861 => Some(parts.into_iter().next().unwrap()),187_ => Some(format!("({})", parts.join(" & "))),188};189190let residual_predicate_expr_ir = if let Some(eval_str) = predicate_pa {191options.predicate = PythonPredicate::PyArrow(eval_str);192193residual_predicate_nodes194.into_iter()195.fold(None, |acc, node| {196Some(acc.map_or(node, |acc_node| {197expr_arena.add(AExpr::BinaryExpr {198left: acc_node,199op: Operator::And,200right: node,201})202}))203})204.map(|node| ExprIR::from_node(node, expr_arena))205} else {206Some(e.clone())207};208209verbose_print_sensitive(|| {210let predicate_pa_verbose_msg = match &options.predicate {211PythonPredicate::PyArrow(p) => p,212_ => "<conversion failed>",213};214215format!(216"python_scan_predicate: \217predicate node: {}, \218converted pyarrow predicate: {}, \219residual predicate: {:?}",220ExprIRDisplay::display_node(e.node(), expr_arena),221predicate_pa_verbose_msg,222residual_predicate_expr_ir223.as_ref()224.map(|e| ExprIRDisplay::display_node(e.node(), 
expr_arena)),225)226});227228residual_predicate_expr_ir229.map(|expr_ir| create_physical_expr(&expr_ir, expr_arena, &options.schema, state))230.transpose()?231}232// Convert to physical expression for the case the reader cannot consume the predicate.233else {234let dsl_expr = e.to_expr(expr_arena);235predicate_serialized = polars_plan::plans::python::predicate::serialize(&dsl_expr)?;236237Some(create_physical_expr(238&e,239expr_arena,240&options.schema,241state,242)?)243}244} else {245None246};247248Ok((predicate, predicate_serialized))249}250251#[recursive]252fn create_physical_plan_impl(253root: Node,254lp_arena: &mut Arena<IR>,255expr_arena: &mut Arena<AExpr>,256state: &mut ConversionState,257// Cache nodes in order of discovery258cache_nodes: &mut PlIndexMap<UniqueId, executors::CachePrefill>,259build_streaming_executor: Option<StreamingExecutorBuilder>,260) -> PolarsResult<Box<dyn Executor>> {261use IR::*;262263let get_streaming_executor_builder = || {264build_streaming_executor.expect(265"get_streaming_executor_builder() failed (hint: missing feature new-streaming?)",266)267};268269macro_rules! recurse {270($node:expr, $state: expr) => {271create_physical_plan_impl(272$node,273lp_arena,274expr_arena,275$state,276cache_nodes,277build_streaming_executor,278)279};280}281282let logical_plan = if state.has_cache_parent283|| matches!(284lp_arena.get(root),285IR::Scan { .. } // Needed for the streaming impl286| IR::Cache { .. } // Needed for plans branching from the same cache node287| IR::GroupBy { .. } // Needed for the streaming impl288| IR::Sink { // Needed for the streaming impl289payload:290SinkTypeIR::File(_) | SinkTypeIR::Partitioned { .. 
},291..292}293) {294lp_arena.get(root).clone()295} else {296lp_arena.take(root)297};298299match logical_plan {300#[cfg(feature = "python")]301PythonScan { mut options } => {302let mut expr_conv_state = ExpressionConversionState::new(true);303let (predicate, predicate_serialized) =304python_scan_predicate(&mut options, expr_arena, &mut expr_conv_state)?;305Ok(Box::new(executors::PythonScanExec {306options,307predicate,308predicate_serialized,309}))310},311Sink { input, payload } => match payload {312SinkTypeIR::Memory => Ok(Box::new(SinkExecutor {313input: recurse!(input, state)?,314name: PlSmallStr::from_static("mem"),315f: Box::new(move |df, _state| Ok(Some(df))),316})),317SinkTypeIR::Callback(CallbackSinkType {318function,319maintain_order: _,320chunk_size,321}) => {322let chunk_size = chunk_size.map_or(usize::MAX, Into::into);323324Ok(Box::new(SinkExecutor {325input: recurse!(input, state)?,326name: PlSmallStr::from_static("batches"),327f: Box::new(move |mut buffer, _state| {328while buffer.height() > 0 {329let df;330(df, buffer) = buffer.split_at(buffer.height().min(chunk_size) as i64);331let should_stop = function.call(df)?;332if should_stop {333break;334}335}336Ok(Some(DataFrame::empty()))337}),338}))339},340SinkTypeIR::File(_) | SinkTypeIR::Partitioned { .. } => {341get_streaming_executor_builder()(root, lp_arena, expr_arena)342},343},344SinkMultiple { .. 
} => {345polars_bail!(InvalidOperation: "lazy multisinks only supported on streaming engine")346},347Union { inputs, options } => {348let inputs = state.with_new_branch(|new_state| {349inputs350.into_iter()351.map(|node| recurse!(node, new_state))352.collect::<PolarsResult<Vec<_>>>()353});354let inputs = inputs?;355Ok(Box::new(executors::UnionExec { inputs, options }))356},357HConcat {358inputs, options, ..359} => {360let inputs = state.with_new_branch(|new_state| {361inputs362.into_iter()363.map(|node| recurse!(node, new_state))364.collect::<PolarsResult<Vec<_>>>()365});366367let inputs = inputs?;368369Ok(Box::new(executors::HConcatExec { inputs, options }))370},371Slice { input, offset, len } => {372let input = recurse!(input, state)?;373Ok(Box::new(executors::SliceExec { input, offset, len }))374},375Filter { input, predicate } => {376let streamable = is_elementwise_rec(predicate.node(), expr_arena);377let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();378let input = recurse!(input, state)?;379let mut state = ExpressionConversionState::new(true);380let predicate =381create_physical_expr(&predicate, expr_arena, &input_schema, &mut state)?;382Ok(Box::new(executors::FilterExec::new(383predicate,384input,385state.has_windows,386streamable,387)))388},389#[allow(unused_variables)]390Scan {391sources,392file_info,393hive_parts,394output_schema,395scan_type,396predicate,397predicate_file_skip_applied,398unified_scan_args,399} => {400let mut expr_conversion_state = ExpressionConversionState::new(true);401402let mut create_skip_batch_predicate = unified_scan_args.table_statistics.is_some();403#[cfg(feature = "parquet")]404{405if let FileScanIR::Parquet { options, .. 
} = scan_type.as_ref() {406create_skip_batch_predicate |= options.use_statistics;407}408}409410let predicate = predicate411.map(|predicate| {412create_scan_predicate(413&predicate,414expr_arena,415output_schema.as_ref().unwrap_or(&file_info.schema),416None, // hive_schema417&mut expr_conversion_state,418create_skip_batch_predicate,419false,420)421})422.transpose()?;423424match *scan_type {425FileScanIR::Anonymous { function, .. } => {426Ok(Box::new(executors::AnonymousScanExec {427function,428predicate,429unified_scan_args,430file_info,431output_schema,432predicate_has_windows: expr_conversion_state.has_windows,433}))434},435#[cfg_attr(436not(any(437feature = "parquet",438feature = "ipc",439feature = "csv",440feature = "json",441feature = "scan_lines"442)),443expect(unreachable_patterns)444)]445_ => get_streaming_executor_builder()(root, lp_arena, expr_arena),446}447},448449Select {450expr,451input,452schema: _schema,453options,454..455} => {456let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();457let input = recurse!(input, state)?;458let mut state = ExpressionConversionState::new(POOL.current_num_threads() > expr.len());459let phys_expr =460create_physical_expressions_from_irs(&expr, expr_arena, &input_schema, &mut state)?;461462let allow_vertical_parallelism = options.should_broadcast && expr.iter().all(|e| is_elementwise_rec(e.node(), expr_arena))463// If all columns are literal we would get a 1 row per thread.464&& !phys_expr.iter().all(|p| {465p.is_literal()466});467468Ok(Box::new(executors::ProjectionExec {469input,470expr: phys_expr,471has_windows: state.has_windows,472input_schema,473#[cfg(test)]474schema: _schema,475options,476allow_vertical_parallelism,477}))478},479DataFrameScan {480df, output_schema, ..481} => Ok(Box::new(executors::DataFrameExec {482df,483projection: output_schema.map(|s| s.iter_names_cloned().collect()),484})),485Sort {486input,487by_column,488slice,489sort_options,490} => 
{491debug_assert!(!by_column.is_empty());492let input_schema = lp_arena.get(input).schema(lp_arena);493let by_column = create_physical_expressions_from_irs(494&by_column,495expr_arena,496input_schema.as_ref(),497&mut ExpressionConversionState::new(true),498)?;499let input = recurse!(input, state)?;500Ok(Box::new(executors::SortExec {501input,502by_column,503slice,504sort_options,505}))506},507Cache { input, id } => {508state.has_cache_parent = true;509state.has_cache_child = true;510511if let Some(cache) = cache_nodes.get_mut(&id) {512Ok(Box::new(cache.make_exec()))513} else {514let input = recurse!(input, state)?;515516let mut prefill = executors::CachePrefill::new_cache(input, id);517let exec = prefill.make_exec();518519cache_nodes.insert(id, prefill);520521Ok(Box::new(exec))522}523},524Distinct { input, options } => {525let input = recurse!(input, state)?;526Ok(Box::new(executors::UniqueExec { input, options }))527},528GroupBy {529input,530keys,531aggs,532apply,533schema: output_schema,534maintain_order,535options,536} => {537let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();538let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());539let phys_keys = create_physical_expressions_from_irs(540&keys,541expr_arena,542&input_schema,543&mut ExpressionConversionState::new(true),544)?;545let phys_aggs = create_physical_expressions_from_irs(546&aggs,547expr_arena,548&input_schema,549&mut ExpressionConversionState::new(true),550)?;551552let _slice = options.slice;553#[cfg(feature = "dynamic_group_by")]554if let Some(options) = options.dynamic {555let input = recurse!(input, state)?;556return Ok(Box::new(executors::GroupByDynamicExec {557input,558keys: phys_keys,559aggs: phys_aggs,560options,561input_schema,562output_schema,563slice: _slice,564apply,565}));566}567568#[cfg(feature = "dynamic_group_by")]569if let Some(options) = options.rolling {570let input = recurse!(input, state)?;571return 
Ok(Box::new(executors::GroupByRollingExec {572input,573keys: phys_keys,574aggs: phys_aggs,575options,576input_schema,577output_schema,578slice: _slice,579apply,580}));581}582583// We first check if we can partition the group_by on the latest moment.584let partitionable = partitionable_gb(&keys, &aggs, &input_schema, expr_arena, &apply);585if partitionable {586let from_partitioned_ds = lp_arena.iter(input).any(|(_, lp)| {587if let Union { options, .. } = lp {588options.from_partitioned_ds589} else {590false591}592});593let builder = get_streaming_executor_builder();594595let input = recurse!(input, state)?;596597let gb_root = if state.has_cache_parent {598lp_arena.add(lp_arena.get(root).clone())599} else {600root601};602603let executor = Box::new(GroupByStreamingExec::new(604input,605builder,606gb_root,607lp_arena,608expr_arena,609phys_keys,610phys_aggs,611maintain_order,612output_schema,613_slice,614from_partitioned_ds,615));616617Ok(executor)618} else {619let input = recurse!(input, state)?;620Ok(Box::new(executors::GroupByExec::new(621input,622phys_keys,623phys_aggs,624apply,625maintain_order,626input_schema,627output_schema,628options.slice,629)))630}631},632Join {633input_left,634input_right,635left_on,636right_on,637options,638schema,639..640} => {641let schema_left = lp_arena.get(input_left).schema(lp_arena).into_owned();642let schema_right = lp_arena.get(input_right).schema(lp_arena).into_owned();643644let (input_left, input_right) = state.with_new_branch(|new_state| {645(646recurse!(input_left, new_state),647recurse!(input_right, new_state),648)649});650let input_left = input_left?;651let input_right = input_right?;652653// Todo! remove the force option. 
It can deadlock.654let parallel = if options.force_parallel {655true656} else {657options.allow_parallel658};659660let left_on = create_physical_expressions_from_irs(661&left_on,662expr_arena,663&schema_left,664&mut ExpressionConversionState::new(true),665)?;666let right_on = create_physical_expressions_from_irs(667&right_on,668expr_arena,669&schema_right,670&mut ExpressionConversionState::new(true),671)?;672let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());673674// Convert the join options, to the physical join options. This requires the physical675// planner, so we do this last minute.676let join_type_options = options677.options678.map(|o| {679o.compile(|e| {680let phys_expr = create_physical_expr(681e,682expr_arena,683&schema,684&mut ExpressionConversionState::new(false),685)?;686687let execution_state = ExecutionState::default();688689Ok(Arc::new(move |df: DataFrame| {690let mask = phys_expr.evaluate(&df, &execution_state)?;691let mask = mask.as_materialized_series();692let mask = mask.bool()?;693df.filter_seq(mask)694}))695})696})697.transpose()?;698699Ok(Box::new(executors::JoinExec::new(700input_left,701input_right,702left_on,703right_on,704parallel,705options.args,706join_type_options,707)))708},709HStack {710input,711exprs,712schema: output_schema,713options,714} => {715let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();716let input = recurse!(input, state)?;717718let allow_vertical_parallelism = options.should_broadcast719&& exprs720.iter()721.all(|e| is_elementwise_rec(e.node(), expr_arena));722723let mut state =724ExpressionConversionState::new(POOL.current_num_threads() > exprs.len());725726let phys_exprs = create_physical_expressions_from_irs(727&exprs,728expr_arena,729&input_schema,730&mut state,731)?;732Ok(Box::new(executors::StackExec {733input,734has_windows: state.has_windows,735exprs: phys_exprs,736input_schema,737output_schema,738options,739allow_vertical_parallelism,740}))741},742MapFunction 
{743input, function, ..744} => {745let input = recurse!(input, state)?;746Ok(Box::new(executors::UdfExec { input, function }))747},748ExtContext {749input, contexts, ..750} => {751let input = recurse!(input, state)?;752let contexts = contexts753.into_iter()754.map(|node| recurse!(node, state))755.collect::<PolarsResult<_>>()?;756Ok(Box::new(executors::ExternalContext { input, contexts }))757},758SimpleProjection { input, columns } => {759let input = recurse!(input, state)?;760let exec = executors::ProjectionSimple { input, columns };761Ok(Box::new(exec))762},763#[cfg(feature = "merge_sorted")]764MergeSorted {765input_left,766input_right,767key,768} => {769let (input_left, input_right) = state.with_new_branch(|new_state| {770(771recurse!(input_left, new_state),772recurse!(input_right, new_state),773)774});775let input_left = input_left?;776let input_right = input_right?;777778let exec = executors::MergeSorted {779input_left,780input_right,781key,782};783Ok(Box::new(exec))784},785Invalid => unreachable!(),786}787}788789#[cfg(test)]790mod tests {791use super::*;792793#[test]794fn test_create_multiple_physical_plans_reused_cache() {795// Check that reusing the same cache node doesn't panic.796// CSE creates duplicate cache nodes with the same ID, but cloud reuses them.797798let mut ir = Arena::new();799800let schema = Schema::from_iter([(PlSmallStr::from_static("x"), DataType::Float32)]);801let scan = ir.add(IR::DataFrameScan {802df: Arc::new(DataFrame::empty_with_schema(&schema)),803schema: Arc::new(schema),804output_schema: None,805});806807let cache = ir.add(IR::Cache {808input: scan,809id: UniqueId::new(),810});811812let left_sink = ir.add(IR::Sink {813input: cache,814payload: SinkTypeIR::Memory,815});816let right_sink = ir.add(IR::Sink {817input: cache,818payload: SinkTypeIR::Memory,819});820821let _multiplan = create_multiple_physical_plans(822&[left_sink, right_sink],823&mut ir,824&mut Arena::new(),825None,826)827.unwrap();828}829}830831832