Path: blob/main/crates/polars-mem-engine/src/executors/group_by_dynamic.rs
8422 views
use super::*;12#[cfg_attr(not(feature = "dynamic_group_by"), allow(dead_code))]3pub(crate) struct GroupByDynamicExec {4pub(crate) input: Box<dyn Executor>,5// we will use this later6#[allow(dead_code)]7pub(crate) keys: Vec<Arc<dyn PhysicalExpr>>,8pub(crate) aggs: Vec<Arc<dyn PhysicalExpr>>,9#[cfg(feature = "dynamic_group_by")]10pub(crate) options: DynamicGroupOptions,11pub(crate) input_schema: SchemaRef,12pub(crate) output_schema: SchemaRef,13pub(crate) slice: Option<(i64, usize)>,14pub(crate) apply: Option<PlanCallback<DataFrame, DataFrame>>,15}1617impl GroupByDynamicExec {18#[cfg(feature = "dynamic_group_by")]19fn execute_impl(20&mut self,21state: &ExecutionState,22mut df: DataFrame,23) -> PolarsResult<DataFrame> {24use crate::executors::group_by_rolling::sort_and_groups;2526df.rechunk_mut_par();2728let mut keys = self29.keys30.iter()31.map(|e| e.evaluate(&df, state))32.collect::<PolarsResult<Vec<_>>>()?;3334let group_by = if !self.keys.is_empty() {35Some(sort_and_groups(&mut df, &mut keys)?)36} else {37None38};3940let (mut time_key, bounds, groups) = df.group_by_dynamic(group_by, &self.options)?;41POOL.install(|| {42keys.iter_mut().for_each(|key| {43unsafe { *key = key.agg_first(&groups) };44})45});46keys.extend(bounds);4748if let Some(f) = &self.apply {49let gb = GroupBy::new(&df, vec![], groups, None);50return gb.apply_sliced(self.slice, move |df| f.call(df), Some(&self.output_schema));51}5253let mut groups = &groups;54#[allow(unused_assignments)]55// it is unused because we only use it to keep the lifetime of sliced_group valid56let mut sliced_groups = None;5758if let Some((offset, len)) = self.slice {59sliced_groups = Some(groups.slice(offset, len));60groups = sliced_groups.as_ref().unwrap();6162time_key = time_key.slice(offset, len);6364// todo! optimize this, we can prevent an agg_first aggregation upstream65// the ordering has changed due to the group_by66for key in keys.iter_mut() {67*key = key.slice(offset, len)68}69}7071let agg_columns = evaluate_aggs(&df, &self.aggs, groups, state)?;7273let mut columns = Vec::with_capacity(agg_columns.len() + 1 + keys.len());74columns.extend_from_slice(&keys);75columns.push(time_key);76columns.extend(agg_columns);7778DataFrame::new_infer_height(columns)79}80}8182impl Executor for GroupByDynamicExec {83#[cfg(not(feature = "dynamic_group_by"))]84fn execute(&mut self, _state: &mut ExecutionState) -> PolarsResult<DataFrame> {85panic!("activate feature dynamic_group_by")86}8788#[cfg(feature = "dynamic_group_by")]89fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {90state.should_stop()?;91#[cfg(debug_assertions)]92{93if state.verbose() {94eprintln!("run GroupbyDynamicExec")95}96}97let df = self.input.execute(state)?;9899let profile_name = if state.has_node_timer() {100let by = self101.keys102.iter()103.map(|s| Ok(s.to_field(&self.input_schema)?.name))104.collect::<PolarsResult<Vec<_>>>()?;105let name = comma_delimited("group_by_dynamic".to_string(), &by);106Cow::Owned(name)107} else {108Cow::Borrowed("")109};110111if state.has_node_timer() {112let new_state = state.clone();113new_state.record(|| self.execute_impl(state, df), profile_name)114} else {115self.execute_impl(state, df)116}117}118}119120121