Path: blob/main/crates/polars-plan/src/plans/optimizer/count_star.rs
8430 views
use std::env;12use polars_buffer::Buffer;3use polars_io::cloud::CloudOptions;4use polars_utils::pl_path::PlRefPath;56use super::*;78pub(super) struct CountStar;910impl CountStar {11pub(super) fn new() -> Self {12Self13}14}1516const ENV_VAR_NAME: &str = "POLARS_NO_FAST_FILE_COUNT";1718impl CountStar {19// Replace select count(*) from datasource with specialized map function.20pub(super) fn optimize_plan(21&mut self,22lp_arena: &mut Arena<IR>,23expr_arena: &mut Arena<AExpr>,24mut node: Node,25) -> PolarsResult<Option<IR>> {26// New-streaming always puts a sink on top.27if let IR::Sink { input, .. } = lp_arena.get(node) {28node = *input;29}3031match env::var(ENV_VAR_NAME).as_deref() {32// Setting the value to 1 disables this optimization pass.33Ok("1") => return Ok(None),34// If the options is set to 0 or not set we allow the optimization.35Ok("0") | Err(_) => (),36Ok(v) => panic!("{ENV_VAR_NAME} must be one of ('0', '1'), got: {v}"),37}3839Ok(40visit_logical_plan_for_scan_paths(node, lp_arena, expr_arena, false).map(41|count_star_expr| {42// MapFunction needs a leaf node, hence we create a dummy placeholder node43let placeholder = IR::DataFrameScan {44df: Arc::new(Default::default()),45schema: Arc::new(Default::default()),46output_schema: None,47};48let placeholder_node = lp_arena.add(placeholder);4950let alp = IR::MapFunction {51input: placeholder_node,52function: FunctionIR::FastCount {53sources: count_star_expr.sources,54scan_type: count_star_expr.scan_type,55alias: count_star_expr.alias,56},57};5859lp_arena.replace(count_star_expr.node, alp.clone());60alp61},62),63)64}65}6667struct CountStarExpr {68// Top node of the projection to replace69node: Node,70// Paths to the input files71sources: ScanSources,72cloud_options: Option<CloudOptions>,73// File Type74scan_type: Box<FileScanIR>,75// Column Alias76alias: Option<PlSmallStr>,77}7879// Visit the logical plan and return CountStarExpr with the expr information gathered80// Return None if query is not a simple COUNT(*) FROM SOURCE81fn visit_logical_plan_for_scan_paths(82node: Node,83lp_arena: &Arena<IR>,84expr_arena: &Arena<AExpr>,85inside_union: bool, // Inside union's we do not check for COUNT(*) expression86) -> Option<CountStarExpr> {87match lp_arena.get(node) {88IR::Union { inputs, .. } => {89enum MutableSources {90Paths(Vec<PlRefPath>),91Buffers(Vec<Buffer<u8>>),92}9394let mut scan_type: Option<Box<FileScanIR>> = None;95let mut cloud_options = None;96let mut sources = None;9798for input in inputs {99match visit_logical_plan_for_scan_paths(*input, lp_arena, expr_arena, true) {100Some(expr) => {101match (expr.sources, &mut sources) {102(103ScanSources::Paths(paths),104Some(MutableSources::Paths(mutable_paths)),105) => mutable_paths.extend_from_slice(&paths[..]),106(ScanSources::Paths(paths), None) => {107sources = Some(MutableSources::Paths(paths.to_vec()))108},109(110ScanSources::Buffers(buffers),111Some(MutableSources::Buffers(mutable_buffers)),112) => mutable_buffers.extend_from_slice(&buffers[..]),113(ScanSources::Buffers(buffers), None) => {114sources = Some(MutableSources::Buffers(buffers.to_vec()))115},116_ => return None,117}118119// Take the first Some(_) cloud option120// TODO: Should check the cloud types are the same.121cloud_options = cloud_options.or(expr.cloud_options);122123match &scan_type {124None => scan_type = Some(expr.scan_type),125Some(scan_type) => {126// All scans must be of the same type (e.g. csv / parquet)127if std::mem::discriminant(&**scan_type)128!= std::mem::discriminant(&*expr.scan_type)129{130return None;131}132},133};134},135None => return None,136}137}138Some(CountStarExpr {139sources: match sources {140Some(MutableSources::Paths(paths)) => ScanSources::Paths(paths.into()),141Some(MutableSources::Buffers(buffers)) => ScanSources::Buffers(buffers.into()),142None => ScanSources::default(),143},144scan_type: scan_type.unwrap(),145cloud_options,146node,147alias: None,148})149},150IR::Scan {151scan_type,152sources,153unified_scan_args,154..155} => {156// New-streaming is generally on par for all except CSV (see https://github.com/pola-rs/polars/pull/22363).157// In the future we can potentially remove the dedicated count codepaths.158159let use_fast_file_count = match scan_type.as_ref() {160#[cfg(feature = "csv")]161FileScanIR::Csv { .. } => true,162_ => false,163};164165use_fast_file_count.then(|| CountStarExpr {166sources: sources.clone(),167scan_type: scan_type.clone(),168cloud_options: unified_scan_args.cloud_options.clone(),169node,170alias: None,171})172},173// A union can insert a simple projection to ensure all projections align.174// We can ignore that if we are inside a count star.175IR::SimpleProjection { input, .. } if inside_union => {176visit_logical_plan_for_scan_paths(*input, lp_arena, expr_arena, false)177},178IR::Select { input, expr, .. } => {179if expr.len() == 1 {180let (valid, alias) = is_valid_count_expr(&expr[0], expr_arena);181if valid || inside_union {182return visit_logical_plan_for_scan_paths(*input, lp_arena, expr_arena, false)183.map(|mut expr| {184expr.alias = alias;185expr.node = node;186expr187});188}189}190None191},192_ => None,193}194}195196fn is_valid_count_expr(e: &ExprIR, expr_arena: &Arena<AExpr>) -> (bool, Option<PlSmallStr>) {197match expr_arena.get(e.node()) {198AExpr::Len => (true, e.get_alias().cloned()),199_ => (false, None),200}201}202203204