Path: blob/main/crates/polars-mem-engine/src/executors/group_by_rolling.rs
8427 views
use polars_utils::unique_column_name;12use super::*;34#[cfg_attr(not(feature = "dynamic_group_by"), allow(dead_code))]5pub(crate) struct GroupByRollingExec {6pub(crate) input: Box<dyn Executor>,7pub(crate) keys: Vec<Arc<dyn PhysicalExpr>>,8pub(crate) aggs: Vec<Arc<dyn PhysicalExpr>>,9#[cfg(feature = "dynamic_group_by")]10pub(crate) options: RollingGroupOptions,11pub(crate) input_schema: SchemaRef,12pub(crate) output_schema: SchemaRef,13pub(crate) slice: Option<(i64, usize)>,14pub(crate) apply: Option<PlanCallback<DataFrame, DataFrame>>,15}1617pub(super) fn sort_and_groups(18df: &mut DataFrame,19keys: &mut Vec<Column>,20) -> PolarsResult<Vec<[IdxSize; 2]>> {21let encoded = row_encode::encode_rows_vertical_par_unordered(keys)?;22let encoded = encoded.rechunk().into_owned();23let encoded = encoded.with_name(unique_column_name());24let idx = encoded.arg_sort(SortOptions {25maintain_order: true,26..Default::default()27});2829let encoded = unsafe {30df.push_column_unchecked(encoded.into_series().into());3132// If not sorted on keys, sort.33let idx_s = idx.clone().into_series();34if !idx_s.is_sorted(Default::default()).unwrap() {35let (df_ordered, keys_ordered) = POOL.join(36|| df.take_unchecked(&idx),37|| {38keys.iter()39.map(|c| c.take_unchecked(&idx))40.collect::<Vec<_>>()41},42);43*df = df_ordered;44*keys = keys_ordered;45}4647df.columns_mut().pop().unwrap()48};49let encoded = encoded.as_materialized_series();50let encoded = encoded.binary_offset().unwrap();51let encoded = encoded.with_sorted_flag(polars_core::series::IsSorted::Ascending);52let groups = encoded.group_tuples(true, false).unwrap();5354let GroupsType::Slice { groups, .. } = groups else {55// memory would explode56unreachable!();57};58Ok(groups)59}6061impl GroupByRollingExec {62#[cfg(feature = "dynamic_group_by")]63fn execute_impl(64&mut self,65state: &ExecutionState,66mut df: DataFrame,67) -> PolarsResult<DataFrame> {68df.rechunk_mut_par();6970let mut keys = self71.keys72.iter()73.map(|e| e.evaluate(&df, state))74.collect::<PolarsResult<Vec<_>>>()?;7576let group_by = if !self.keys.is_empty() {77Some(sort_and_groups(&mut df, &mut keys)?)78} else {79None80};8182let (mut time_key, groups) = df.rolling(group_by, &self.options)?;8384if let Some(f) = &self.apply {85let gb = GroupBy::new(&df, vec![], groups, None);86return gb.apply_sliced(self.slice, move |df| f.call(df), Some(&self.output_schema));87}8889let mut groups = &groups;90#[allow(unused_assignments)]91// it is unused because we only use it to keep the lifetime of sliced_group valid92let mut sliced_groups = None;9394if let Some((offset, len)) = self.slice {95sliced_groups = Some(groups.slice(offset, len));96groups = sliced_groups.as_ref().unwrap();9798time_key = time_key.slice(offset, len);99for k in &mut keys {100*k = k.slice(offset, len);101}102}103104let agg_columns = evaluate_aggs(&df, &self.aggs, groups, state)?;105106let mut columns = Vec::with_capacity(agg_columns.len() + 1 + keys.len());107columns.extend_from_slice(&keys);108columns.push(time_key);109columns.extend(agg_columns);110111DataFrame::new_infer_height(columns)112}113}114115impl Executor for GroupByRollingExec {116#[cfg(not(feature = "dynamic_group_by"))]117fn execute(&mut self, _state: &mut ExecutionState) -> PolarsResult<DataFrame> {118panic!("activate feature dynamic_group_by")119}120121#[cfg(feature = "dynamic_group_by")]122fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {123state.should_stop()?;124#[cfg(debug_assertions)]125{126if state.verbose() {127eprintln!("run GroupbyRollingExec")128}129}130let df = self.input.execute(state)?;131let profile_name = if state.has_node_timer() {132let by = self133.keys134.iter()135.map(|s| Ok(s.to_field(&self.input_schema)?.name))136.collect::<PolarsResult<Vec<_>>>()?;137let name = comma_delimited("group_by_rolling".to_string(), &by);138Cow::Owned(name)139} else {140Cow::Borrowed("")141};142143if state.has_node_timer() {144let new_state = state.clone();145new_state.record(|| self.execute_impl(state, df), profile_name)146} else {147self.execute_impl(state, df)148}149}150}151152153