Path: blob/main/crates/polars-mem-engine/src/executors/projection_utils.rs
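//! Helpers for evaluating the physical expressions of a projection in the
//! in-memory engine: window and rolling expressions are partitioned by the
//! keys they group over so their group_by state can be cached, and scalar
//! results are broadcast to the frame height where needed.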
use polars_plan::constants::CSE_REPLACED;
use polars_utils::itertools::Itertools;

use super::*;

pub(super) fn profile_name(
    s: &dyn PhysicalExpr,
    input_schema: &Schema,
) -> PolarsResult<PlSmallStr> {
    match s.to_field(input_schema) {
        Err(e) => Err(e),
        Ok(fld) => Ok(fld.name),
    }
}

type IdAndExpression = (u32, Arc<dyn PhysicalExpr>);

#[cfg(feature = "dynamic_group_by")]
fn rolling_evaluate(
    df: &DataFrame,
    state: &ExecutionState,
    rolling: PlHashMap<RollingGroupOptions, Vec<IdAndExpression>>,
) -> PolarsResult<Vec<Vec<(u32, Column)>>> {
    POOL.install(|| {
        rolling
            .par_iter()
            .map(|(options, partition)| {
                // Clear the cache for every partitioned group.
                let state = state.split();

                let (_time_key, groups) = df.rolling(None, options)?;

                let groups_key = format!("{options:?}");
                // Set the groups so all expressions in the partition can use them.
                // The lock must be dropped before evaluation, otherwise we deadlock
                // when the rolling expression tries to get read access.
                state.window_cache.insert_groups(groups_key, groups);
                partition
                    .par_iter()
                    .map(|(idx, expr)| expr.evaluate(df, &state).map(|s| (*idx, s)))
                    .collect::<PolarsResult<Vec<_>>>()
            })
            .collect()
    })
}

fn window_evaluate(
    df: &DataFrame,
    state: &ExecutionState,
    window: PlHashMap<String, Vec<IdAndExpression>>,
) -> PolarsResult<Vec<Vec<(u32, Column)>>> {
    if window.is_empty() {
        return Ok(vec![]);
    }
    let n_threads = POOL.current_num_threads();

    let max_hor = window.values().map(|v| v.len()).max().unwrap_or(0);
    let vert = window.len();

    // We don't want to cache and parallelize both horizontally and vertically,
    // as that keeps too many cache states alive.
    let (cache, par_vertical, par_horizontal) = if max_hor >= n_threads || max_hor >= vert {
        (true, false, true)
    } else {
        (false, true, true)
    };

    let apply = |partition: &[(u32, Arc<dyn PhysicalExpr>)]| {
        // Clear the cache for every partitioned group.
        let mut state = state.split();
        // Inform the expressions that this projection has window functions.
        state.insert_has_window_function_flag();

        // Caching more than one window expression is a complicated topic for another day;
        // see issue #2523.
        let cache = cache
            && partition.len() > 1
            && partition.iter().all(|(_, e)| {
                e.as_expression()
                    .unwrap()
                    .into_iter()
                    .filter(|e| {
                        #[cfg(feature = "dynamic_group_by")]
                        if matches!(e, Expr::Rolling { .. }) {
                            return true;
                        }
                        matches!(e, Expr::Over { .. })
                    })
                    .count()
                    == 1
            });
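        // `first_result` will hold the eagerly evaluated first expression
        // (used to prime the window cache) together with its original index.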
        let mut first_result = None;
        // First run one expression to fill the cache. Condvars and such don't
        // work here, as rayon threads should not be blocked.
        if cache {
            let first = &partition[0];
            let c = first.1.evaluate(df, &state)?;
            first_result = Some((first.0, c));
            state.insert_cache_window_flag();
        } else {
            state.remove_cache_window_flag();
        }

        let apply =
            |index: &u32, e: &Arc<dyn PhysicalExpr>| e.evaluate(df, &state).map(|c| (*index, c));

        let slice = &partition[first_result.is_some() as usize..];
        let mut results = if par_horizontal {
            slice
                .par_iter()
                .map(|(index, e)| apply(index, e))
                .collect::<PolarsResult<Vec<_>>>()?
        } else {
            slice
                .iter()
                .map(|(index, e)| apply(index, e))
                .collect::<PolarsResult<Vec<_>>>()?
        };

        if let Some(item) = first_result {
            results.push(item)
        }

        Ok(results)
    };

    if par_vertical {
        POOL.install(|| window.par_iter().map(|t| apply(t.1)).collect())
    } else {
        window.iter().map(|t| apply(t.1)).collect()
    }
}

fn execute_projection_cached_window_fns(
    df: &DataFrame,
    exprs: &[Arc<dyn PhysicalExpr>],
    state: &ExecutionState,
) -> PolarsResult<Vec<Column>> {
    // We partition into normal expressions and window expressions:
    // - the normal expressions can run in parallel
    // - the window expressions take more memory and often use the same group_by keys
    //   and join tuples, so they are cached and run sequentially

    // The partitioning messes with column order, so we also store the index
    // and use it to restore the original projection order.
    #[allow(clippy::type_complexity)]
    // String: partition_name,
    // u32: index,
    let mut windows: PlHashMap<String, Vec<IdAndExpression>> = PlHashMap::default();
    #[cfg(feature = "dynamic_group_by")]
    let mut rolling: PlHashMap<RollingGroupOptions, Vec<IdAndExpression>> = PlHashMap::default();
    let mut other = Vec::with_capacity(exprs.len());

    // First we partition the window functions by the values they group over;
    // the group_by values should be cached.
    exprs.iter().enumerate_u32().for_each(|(index, phys)| {
        let mut is_window = false;
        if let Some(e) = phys.as_expression() {
            for e in e.into_iter() {
                match e {
                    #[cfg(feature = "dynamic_group_by")]
                    Expr::Rolling {
                        function: _,
                        index_column,
                        period,
                        offset,
                        closed_window,
                    } => {
                        if let Expr::Column(index_column) = index_column.as_ref() {
                            let options = RollingGroupOptions {
                                index_column: index_column.clone(),
                                period: *period,
                                offset: *offset,
                                closed_window: *closed_window,
                            };
                            let entry = rolling.entry(options).or_default();
                            entry.push((index, phys.clone()));
                            is_window = true;
                            break;
                        }
                    },
                    Expr::Over {
                        function: _,
                        partition_by,
                        order_by,
                        mapping,
                    } => {
                        let mapping: &str = mapping.into();
                        let mut key = format!("{:?}_{mapping}", partition_by.as_slice());
                        if let Some((e, k)) = order_by {
                            polars_expr::prelude::window_function_format_order_by(
                                &mut key,
                                e.as_ref(),
                                k,
                            )
                        }
                        let entry = windows.entry(key).or_insert_with(Vec::new);
                        entry.push((index, phys.clone()));
                        is_window = true;
                        break;
                    },
                    _ => {},
                }
            }
        } else {
            // Window physical expressions always have the `Expr`.
            is_window = false;
        }
        if !is_window {
            other.push((index, phys.as_ref()))
        }
    });

    let mut selected_columns = POOL.install(|| {
        other
            .par_iter()
            .map(|(idx, expr)| expr.evaluate(df, state).map(|s| (*idx, s)))
            .collect::<PolarsResult<Vec<_>>>()
    })?;
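    // At this point `selected_columns` holds `(index, column)` pairs for the
    // non-window expressions; the window/rolling results are appended below,
    // and the original projection order is restored by sorting on the index.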
    // Run partitioned rolling expressions.
    // Per partition we run in parallel. We compute the groups before and store
    // them once per partition. The rolling expression knows how to fetch the
    // groups.
    #[cfg(feature = "dynamic_group_by")]
    {
        let (a, b) = POOL.join(
            || rolling_evaluate(df, state, rolling),
            || window_evaluate(df, state, windows),
        );

        let partitions = a?;
        for part in partitions {
            selected_columns.extend_from_slice(&part)
        }
        let partitions = b?;
        for part in partitions {
            selected_columns.extend_from_slice(&part)
        }
    }
    #[cfg(not(feature = "dynamic_group_by"))]
    {
        let partitions = window_evaluate(df, state, windows)?;
        for part in partitions {
            selected_columns.extend_from_slice(&part)
        }
    }

    selected_columns.sort_unstable_by_key(|tpl| tpl.0);
    let selected_columns = selected_columns.into_iter().map(|tpl| tpl.1).collect();
    Ok(selected_columns)
}

fn run_exprs_par(
    df: &DataFrame,
    exprs: &[Arc<dyn PhysicalExpr>],
    state: &ExecutionState,
) -> PolarsResult<Vec<Column>> {
    POOL.install(|| {
        exprs
            .par_iter()
            .map(|expr| expr.evaluate(df, state))
            .collect()
    })
}

fn run_exprs_seq(
    df: &DataFrame,
    exprs: &[Arc<dyn PhysicalExpr>],
    state: &ExecutionState,
) -> PolarsResult<Vec<Column>> {
    exprs.iter().map(|expr| expr.evaluate(df, state)).collect()
}

pub(super) fn evaluate_physical_expressions(
    df: &mut DataFrame,
    exprs: &[Arc<dyn PhysicalExpr>],
    state: &ExecutionState,
    has_windows: bool,
    run_parallel: bool,
) -> PolarsResult<Vec<Column>> {
    let expr_runner = if has_windows {
        execute_projection_cached_window_fns
    } else if run_parallel && exprs.len() > 1 {
        run_exprs_par
    } else {
        run_exprs_seq
    };

    let selected_columns = expr_runner(df, exprs, state)?;

    if has_windows {
        state.clear_window_expr_cache();
    }

    Ok(selected_columns)
}

pub(super) fn check_expand_literals(
    df: &DataFrame,
    phys_expr: &[Arc<dyn PhysicalExpr>],
    mut selected_columns: Vec<Column>,
    is_empty: bool,
    options: ProjectionOptions,
) -> PolarsResult<DataFrame> {
    let Some(first_len) = selected_columns.first().map(|s| s.len()) else {
        return Ok(DataFrame::empty());
    };
    let duplicate_check = options.duplicate_check;
    let should_broadcast = options.should_broadcast;

    // When we have CSE we cannot verify scalars yet.
    let verify_scalar = if !df.columns().is_empty() {
        !df.columns()[df.width() - 1]
            .name()
            .starts_with(CSE_REPLACED)
    } else {
        true
    };

    let mut df_height = 0;
    let mut has_empty = false;
    let mut all_equal_len = true;
    {
        let mut names = PlHashSet::with_capacity(selected_columns.len());
        for s in &selected_columns {
            let len = s.len();
            has_empty |= len == 0;
            df_height = std::cmp::max(df_height, len);
            if len != first_len {
                all_equal_len = false;
            }
            let name = s.name();

            if duplicate_check && !names.insert(name) {
                let msg = format!(
                    "the name '{name}' is duplicate\n\n\
                    It's possible that multiple expressions are returning the same default column \
                    name. If this is the case, try renaming the columns with \
                    `.alias(\"new_name\")` to avoid duplicate column names."
                );
                return Err(PolarsError::Duplicate(msg.into()));
            }
        }
    }
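    // `df_height` is now the maximum column length, `has_empty` records whether
    // any column is zero-length, and `all_equal_len` whether broadcasting is
    // needed at all.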
    // If all series are the same length, it is ok. If not, we can broadcast
    // Series of length one.
    if !all_equal_len && should_broadcast {
        selected_columns = selected_columns
            .into_iter()
            .zip(phys_expr)
            .map(|(series, phys)| {
                Ok(match series.len() {
                    0 if df_height == 1 => series,
                    1 => {
                        if !has_empty && df_height == 1 {
                            series
                        } else {
                            if has_empty {
                                polars_ensure!(df_height == 1,
                                    ShapeMismatch: "Series length {} doesn't match the DataFrame height of {}",
                                    series.len(), df_height
                                );
                            }

                            if verify_scalar
                                && !phys.is_scalar()
                                && std::env::var("POLARS_ALLOW_NON_SCALAR_EXP").as_deref()
                                    != Ok("1")
                            {
                                let identifier = match phys.as_expression() {
                                    Some(e) => format!("expression: {e}"),
                                    None => "this Series".to_string(),
                                };
                                polars_bail!(ShapeMismatch: "Series {}, length {} doesn't match the DataFrame height of {}\n\n\
                                    If you want {} to be broadcasted, ensure it is a scalar (for instance by adding '.first()').",
                                    series.name(), series.len(), df_height * (!has_empty as usize), identifier
                                );
                            }
                            series.new_from_index(0, df_height * (!has_empty as usize))
                        }
                    },
                    len if len == df_height => series,
                    _ => {
                        polars_bail!(
                            ShapeMismatch: "Series length {} doesn't match the DataFrame height of {}",
                            series.len(), df_height
                        )
                    },
                })
            })
            .collect::<PolarsResult<_>>()?
    }

    // @scalar-opt
    let selected_columns = selected_columns.into_iter().collect::<Vec<_>>();

    let df = unsafe { DataFrame::new_unchecked_infer_height(selected_columns) };

    // A literal could be projected to a zero-length DataFrame;
    // this prevents a panic.
    let df = if is_empty {
        let min = df.columns().iter().map(|s| s.len()).min();
        if min.is_some() { df.head(min) } else { df }
    } else {
        df
    };
    Ok(df)
}
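// Illustrative sketch, not part of this module: the user-facing behaviour that
// `check_expand_literals` implements. Assumes the `polars` crate with its
// `lazy` feature enabled.
//
//     use polars::prelude::*;
//
//     fn broadcast_demo() -> PolarsResult<DataFrame> {
//         let df = df!["a" => [1i32, 2, 3]]?;
//         // `sum()` yields a length-1 scalar column; it is broadcast to the
//         // frame height when selected next to a full-length column, which is
//         // the length-1 branch handled above.
//         df.lazy()
//             .select([col("a"), col("a").sum().alias("total")])
//             .collect()
//     }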