Path: blob/main/crates/polars-mem-engine/src/executors/projection_utils.rs
6940 views
use polars_plan::constants::CSE_REPLACED;1use polars_utils::itertools::Itertools;23use super::*;45pub(super) fn profile_name(6s: &dyn PhysicalExpr,7input_schema: &Schema,8) -> PolarsResult<PlSmallStr> {9match s.to_field(input_schema) {10Err(e) => Err(e),11Ok(fld) => Ok(fld.name),12}13}1415type IdAndExpression = (u32, Arc<dyn PhysicalExpr>);1617#[cfg(feature = "dynamic_group_by")]18fn rolling_evaluate(19df: &DataFrame,20state: &ExecutionState,21rolling: PlHashMap<&RollingGroupOptions, Vec<IdAndExpression>>,22) -> PolarsResult<Vec<Vec<(u32, Column)>>> {23POOL.install(|| {24rolling25.par_iter()26.map(|(options, partition)| {27// clear the cache for every partitioned group28let state = state.split();2930let (_time_key, groups) = df.rolling(None, options)?;3132let groups_key = format!("{options:?}");33// Set the groups so all expressions in partition can use it.34// Create a separate scope, so the lock is dropped, otherwise we deadlock when the35// rolling expression try to get read access.36state.window_cache.insert_groups(groups_key, groups);37partition38.par_iter()39.map(|(idx, expr)| expr.evaluate(df, &state).map(|s| (*idx, s)))40.collect::<PolarsResult<Vec<_>>>()41})42.collect()43})44}4546fn window_evaluate(47df: &DataFrame,48state: &ExecutionState,49window: PlHashMap<String, Vec<IdAndExpression>>,50) -> PolarsResult<Vec<Vec<(u32, Column)>>> {51if window.is_empty() {52return Ok(vec![]);53}54let n_threads = POOL.current_num_threads();5556let max_hor = window.values().map(|v| v.len()).max().unwrap_or(0);57let vert = window.len();5859// We don't want to cache and parallel horizontally and vertically as that keeps many cache60// states alive.61let (cache, par_vertical, par_horizontal) = if max_hor >= n_threads || max_hor >= vert {62(true, false, true)63} else {64(false, true, true)65};6667let apply = |partition: &[(u32, Arc<dyn PhysicalExpr>)]| {68// clear the cache for every partitioned group69let mut state = state.split();70// inform the expression it has window functions.71state.insert_has_window_function_flag();7273// caching more than one window expression is a complicated topic for another day74// see issue #252375let cache = cache76&& partition.len() > 177&& partition.iter().all(|(_, e)| {78e.as_expression()79.unwrap()80.into_iter()81.filter(|e| matches!(e, Expr::Window { .. }))82.count()83== 184});85let mut first_result = None;86// First run 1 to fill the cache. Condvars and such don't work as87// rayon threads should not be blocked.88if cache {89let first = &partition[0];90let c = first.1.evaluate(df, &state)?;91first_result = Some((first.0, c));92state.insert_cache_window_flag();93} else {94state.remove_cache_window_flag();95}9697let apply =98|index: &u32, e: &Arc<dyn PhysicalExpr>| e.evaluate(df, &state).map(|c| (*index, c));99100let slice = &partition[first_result.is_some() as usize..];101let mut results = if par_horizontal {102slice103.par_iter()104.map(|(index, e)| apply(index, e))105.collect::<PolarsResult<Vec<_>>>()?106} else {107slice108.iter()109.map(|(index, e)| apply(index, e))110.collect::<PolarsResult<Vec<_>>>()?111};112113if let Some(item) = first_result {114results.push(item)115}116117Ok(results)118};119120if par_vertical {121POOL.install(|| window.par_iter().map(|t| apply(t.1)).collect())122} else {123window.iter().map(|t| apply(t.1)).collect()124}125}126127fn execute_projection_cached_window_fns(128df: &DataFrame,129exprs: &[Arc<dyn PhysicalExpr>],130state: &ExecutionState,131) -> PolarsResult<Vec<Column>> {132// We partition by normal expression and window expression133// - the normal expressions can run in parallel134// - the window expression take more memory and often use the same group_by keys and join tuples135// so they are cached and run sequential136137// the partitioning messes with column order, so we also store the idx138// and use those to restore the original projection order139#[allow(clippy::type_complexity)]140// String: partition_name,141// u32: index,142let mut windows: PlHashMap<String, Vec<IdAndExpression>> = PlHashMap::default();143#[cfg(feature = "dynamic_group_by")]144let mut rolling: PlHashMap<&RollingGroupOptions, Vec<IdAndExpression>> = PlHashMap::default();145let mut other = Vec::with_capacity(exprs.len());146147// first we partition the window function by the values they group over.148// the group_by values should be cached149exprs.iter().enumerate_u32().for_each(|(index, phys)| {150let mut is_window = false;151if let Some(e) = phys.as_expression() {152for e in e.into_iter() {153if let Expr::Window {154partition_by,155options,156order_by,157..158} = e159{160let entry = match options {161WindowType::Over(g) => {162let g: &str = g.into();163let mut key = format!("{:?}_{}", partition_by.as_slice(), g);164if let Some((e, k)) = order_by {165polars_expr::prelude::window_function_format_order_by(166&mut key,167e.as_ref(),168k,169)170}171windows.entry(key).or_insert_with(Vec::new)172},173#[cfg(feature = "dynamic_group_by")]174WindowType::Rolling(options) => {175rolling.entry(options).or_insert_with(Vec::new)176},177};178entry.push((index, phys.clone()));179is_window = true;180break;181}182}183} else {184// Window physical expressions always have the `Expr`.185is_window = false;186}187if !is_window {188other.push((index, phys.as_ref()))189}190});191192let mut selected_columns = POOL.install(|| {193other194.par_iter()195.map(|(idx, expr)| expr.evaluate(df, state).map(|s| (*idx, s)))196.collect::<PolarsResult<Vec<_>>>()197})?;198199// Run partitioned rolling expressions.200// Per partition we run in parallel. We compute the groups before and store them once per partition.201// The rolling expression knows how to fetch the groups.202#[cfg(feature = "dynamic_group_by")]203{204let (a, b) = POOL.join(205|| rolling_evaluate(df, state, rolling),206|| window_evaluate(df, state, windows),207);208209let partitions = a?;210for part in partitions {211selected_columns.extend_from_slice(&part)212}213let partitions = b?;214for part in partitions {215selected_columns.extend_from_slice(&part)216}217}218#[cfg(not(feature = "dynamic_group_by"))]219{220let partitions = window_evaluate(df, state, windows)?;221for part in partitions {222selected_columns.extend_from_slice(&part)223}224}225226selected_columns.sort_unstable_by_key(|tpl| tpl.0);227let selected_columns = selected_columns.into_iter().map(|tpl| tpl.1).collect();228Ok(selected_columns)229}230231fn run_exprs_par(232df: &DataFrame,233exprs: &[Arc<dyn PhysicalExpr>],234state: &ExecutionState,235) -> PolarsResult<Vec<Column>> {236POOL.install(|| {237exprs238.par_iter()239.map(|expr| expr.evaluate(df, state))240.collect()241})242}243244fn run_exprs_seq(245df: &DataFrame,246exprs: &[Arc<dyn PhysicalExpr>],247state: &ExecutionState,248) -> PolarsResult<Vec<Column>> {249exprs.iter().map(|expr| expr.evaluate(df, state)).collect()250}251252pub(super) fn evaluate_physical_expressions(253df: &mut DataFrame,254exprs: &[Arc<dyn PhysicalExpr>],255state: &ExecutionState,256has_windows: bool,257run_parallel: bool,258) -> PolarsResult<Vec<Column>> {259let expr_runner = if has_windows {260execute_projection_cached_window_fns261} else if run_parallel && exprs.len() > 1 {262run_exprs_par263} else {264run_exprs_seq265};266267let selected_columns = expr_runner(df, exprs, state)?;268269if has_windows {270state.clear_window_expr_cache();271}272273Ok(selected_columns)274}275276pub(super) fn check_expand_literals(277df: &DataFrame,278phys_expr: &[Arc<dyn PhysicalExpr>],279mut selected_columns: Vec<Column>,280zero_length: bool,281options: ProjectionOptions,282) -> PolarsResult<DataFrame> {283let Some(first_len) = selected_columns.first().map(|s| s.len()) else {284return Ok(DataFrame::empty());285};286let duplicate_check = options.duplicate_check;287let should_broadcast = options.should_broadcast;288289// When we have CSE we cannot verify scalars yet.290let verify_scalar = if !df.get_columns().is_empty() {291!df.get_columns()[df.width() - 1]292.name()293.starts_with(CSE_REPLACED)294} else {295true296};297298let mut df_height = 0;299let mut has_empty = false;300let mut all_equal_len = true;301{302let mut names = PlHashSet::with_capacity(selected_columns.len());303for s in &selected_columns {304let len = s.len();305has_empty |= len == 0;306df_height = std::cmp::max(df_height, len);307if len != first_len {308all_equal_len = false;309}310let name = s.name();311312if duplicate_check && !names.insert(name) {313let msg = format!(314"the name '{name}' is duplicate\n\n\315It's possible that multiple expressions are returning the same default column \316name. If this is the case, try renaming the columns with \317`.alias(\"new_name\")` to avoid duplicate column names."318);319return Err(PolarsError::Duplicate(msg.into()));320}321}322}323324// If all series are the same length it is ok. If not we can broadcast Series of length one.325if !all_equal_len && should_broadcast {326selected_columns = selected_columns327.into_iter()328.zip(phys_expr)329.map(|(series, phys)| {330Ok(match series.len() {3310 if df_height == 1 => series,3321 => {333if !has_empty && df_height == 1 {334series335} else {336if has_empty {337polars_ensure!(df_height == 1,338ShapeMismatch: "Series length {} doesn't match the DataFrame height of {}",339series.len(), df_height340);341342}343344if verify_scalar && !phys.is_scalar() && std::env::var("POLARS_ALLOW_NON_SCALAR_EXP").as_deref() != Ok("1") {345let identifier = match phys.as_expression() {346Some(e) => format!("expression: {e}"),347None => "this Series".to_string(),348};349polars_bail!(ShapeMismatch: "Series {}, length {} doesn't match the DataFrame height of {}\n\n\350If you want {} to be broadcasted, ensure it is a scalar (for instance by adding '.first()').",351series.name(), series.len(), df_height *(!has_empty as usize), identifier352);353}354series.new_from_index(0, df_height * (!has_empty as usize) )355}356},357len if len == df_height => {358series359},360_ => {361polars_bail!(362ShapeMismatch: "Series length {} doesn't match the DataFrame height of {}",363series.len(), df_height364)365}366})367})368.collect::<PolarsResult<_>>()?369}370371// @scalar-opt372let selected_columns = selected_columns.into_iter().collect::<Vec<_>>();373374let df = unsafe { DataFrame::new_no_checks_height_from_first(selected_columns) };375376// a literal could be projected to a zero length dataframe.377// This prevents a panic.378let df = if zero_length {379let min = df.get_columns().iter().map(|s| s.len()).min();380if min.is_some() { df.head(min) } else { df }381} else {382df383};384Ok(df)385}386387388