// Path: blob/main/crates/polars-mem-engine/src/executors/group_by_dynamic.rs
// (page metadata: 6940 views)
use super::*;12#[cfg_attr(not(feature = "dynamic_group_by"), allow(dead_code))]3pub(crate) struct GroupByDynamicExec {4pub(crate) input: Box<dyn Executor>,5// we will use this later6#[allow(dead_code)]7pub(crate) keys: Vec<Arc<dyn PhysicalExpr>>,8pub(crate) aggs: Vec<Arc<dyn PhysicalExpr>>,9#[cfg(feature = "dynamic_group_by")]10pub(crate) options: DynamicGroupOptions,11pub(crate) input_schema: SchemaRef,12pub(crate) slice: Option<(i64, usize)>,13pub(crate) apply: Option<PlanCallback<DataFrame, DataFrame>>,14}1516impl GroupByDynamicExec {17#[cfg(feature = "dynamic_group_by")]18fn execute_impl(19&mut self,20state: &ExecutionState,21mut df: DataFrame,22) -> PolarsResult<DataFrame> {23use crate::executors::group_by_rolling::sort_and_groups;2425df.as_single_chunk_par();2627let mut keys = self28.keys29.iter()30.map(|e| e.evaluate(&df, state))31.collect::<PolarsResult<Vec<_>>>()?;3233let group_by = if !self.keys.is_empty() {34Some(sort_and_groups(&mut df, &mut keys)?)35} else {36None37};3839let (mut time_key, bounds, groups) = df.group_by_dynamic(group_by, &self.options)?;40POOL.install(|| {41keys.iter_mut().for_each(|key| {42unsafe { *key = key.agg_first(&groups) };43})44});45keys.extend(bounds);4647if let Some(f) = &self.apply {48let gb = GroupBy::new(&df, vec![], groups, None);49let out = gb.apply(move |df| f.call(df))?;50return Ok(if let Some((offset, len)) = self.slice {51out.slice(offset, len)52} else {53out54});55}5657let mut groups = &groups;58#[allow(unused_assignments)]59// it is unused because we only use it to keep the lifetime of sliced_group valid60let mut sliced_groups = None;6162if let Some((offset, len)) = self.slice {63sliced_groups = Some(groups.slice(offset, len));64groups = sliced_groups.as_ref().unwrap();6566time_key = time_key.slice(offset, len);6768// todo! 
optimize this, we can prevent an agg_first aggregation upstream69// the ordering has changed due to the group_by70for key in keys.iter_mut() {71*key = key.slice(offset, len)72}73}7475let agg_columns = evaluate_aggs(&df, &self.aggs, groups, state)?;7677let mut columns = Vec::with_capacity(agg_columns.len() + 1 + keys.len());78columns.extend_from_slice(&keys);79columns.push(time_key);80columns.extend(agg_columns);8182DataFrame::new(columns)83}84}8586impl Executor for GroupByDynamicExec {87#[cfg(not(feature = "dynamic_group_by"))]88fn execute(&mut self, _state: &mut ExecutionState) -> PolarsResult<DataFrame> {89panic!("activate feature dynamic_group_by")90}9192#[cfg(feature = "dynamic_group_by")]93fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {94state.should_stop()?;95#[cfg(debug_assertions)]96{97if state.verbose() {98eprintln!("run GroupbyDynamicExec")99}100}101let df = self.input.execute(state)?;102103let profile_name = if state.has_node_timer() {104let by = self105.keys106.iter()107.map(|s| Ok(s.to_field(&self.input_schema)?.name))108.collect::<PolarsResult<Vec<_>>>()?;109let name = comma_delimited("group_by_dynamic".to_string(), &by);110Cow::Owned(name)111} else {112Cow::Borrowed("")113};114115if state.has_node_timer() {116let new_state = state.clone();117new_state.record(|| self.execute_impl(state, df), profile_name)118} else {119self.execute_impl(state, df)120}121}122}123124125