// Path: blob/main/crates/polars-mem-engine/src/executors/group_by_rolling.rs
// 6940 views
use polars_utils::unique_column_name;12use super::*;34#[cfg_attr(not(feature = "dynamic_group_by"), allow(dead_code))]5pub(crate) struct GroupByRollingExec {6pub(crate) input: Box<dyn Executor>,7pub(crate) keys: Vec<Arc<dyn PhysicalExpr>>,8pub(crate) aggs: Vec<Arc<dyn PhysicalExpr>>,9#[cfg(feature = "dynamic_group_by")]10pub(crate) options: RollingGroupOptions,11pub(crate) input_schema: SchemaRef,12pub(crate) slice: Option<(i64, usize)>,13pub(crate) apply: Option<PlanCallback<DataFrame, DataFrame>>,14}1516pub(super) fn sort_and_groups(17df: &mut DataFrame,18keys: &mut Vec<Column>,19) -> PolarsResult<Vec<[IdxSize; 2]>> {20let encoded = row_encode::encode_rows_vertical_par_unordered(keys)?;21let encoded = encoded.rechunk().into_owned();22let encoded = encoded.with_name(unique_column_name());23let idx = encoded.arg_sort(SortOptions {24maintain_order: true,25..Default::default()26});2728let encoded = unsafe {29df.with_column_unchecked(encoded.into_series().into());3031// If not sorted on keys, sort.32let idx_s = idx.clone().into_series();33if !idx_s.is_sorted(Default::default()).unwrap() {34let (df_ordered, keys_ordered) = POOL.join(35|| df.take_unchecked(&idx),36|| {37keys.iter()38.map(|c| c.take_unchecked(&idx))39.collect::<Vec<_>>()40},41);42*df = df_ordered;43*keys = keys_ordered;44}4546df.get_columns_mut().pop().unwrap()47};48let encoded = encoded.as_materialized_series();49let encoded = encoded.binary_offset().unwrap();50let encoded = encoded.with_sorted_flag(polars_core::series::IsSorted::Ascending);51let groups = encoded.group_tuples(true, false).unwrap();5253let GroupsType::Slice { groups, .. 
} = groups else {54// memory would explode55unreachable!();56};57Ok(groups)58}5960impl GroupByRollingExec {61#[cfg(feature = "dynamic_group_by")]62fn execute_impl(63&mut self,64state: &ExecutionState,65mut df: DataFrame,66) -> PolarsResult<DataFrame> {67df.as_single_chunk_par();6869let mut keys = self70.keys71.iter()72.map(|e| e.evaluate(&df, state))73.collect::<PolarsResult<Vec<_>>>()?;7475let group_by = if !self.keys.is_empty() {76Some(sort_and_groups(&mut df, &mut keys)?)77} else {78None79};8081let (mut time_key, groups) = df.rolling(group_by, &self.options)?;8283if let Some(f) = &self.apply {84let gb = GroupBy::new(&df, vec![], groups, None);85let out = gb.apply(move |df| f.call(df))?;86return Ok(if let Some((offset, len)) = self.slice {87out.slice(offset, len)88} else {89out90});91}9293let mut groups = &groups;94#[allow(unused_assignments)]95// it is unused because we only use it to keep the lifetime of sliced_group valid96let mut sliced_groups = None;9798if let Some((offset, len)) = self.slice {99sliced_groups = Some(groups.slice(offset, len));100groups = sliced_groups.as_ref().unwrap();101102time_key = time_key.slice(offset, len);103for k in &mut keys {104*k = k.slice(offset, len);105}106}107108let agg_columns = evaluate_aggs(&df, &self.aggs, groups, state)?;109110let mut columns = Vec::with_capacity(agg_columns.len() + 1 + keys.len());111columns.extend_from_slice(&keys);112columns.push(time_key);113columns.extend(agg_columns);114115DataFrame::new(columns)116}117}118119impl Executor for GroupByRollingExec {120#[cfg(not(feature = "dynamic_group_by"))]121fn execute(&mut self, _state: &mut ExecutionState) -> PolarsResult<DataFrame> {122panic!("activate feature dynamic_group_by")123}124125#[cfg(feature = "dynamic_group_by")]126fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {127state.should_stop()?;128#[cfg(debug_assertions)]129{130if state.verbose() {131eprintln!("run GroupbyRollingExec")132}133}134let df = 
self.input.execute(state)?;135let profile_name = if state.has_node_timer() {136let by = self137.keys138.iter()139.map(|s| Ok(s.to_field(&self.input_schema)?.name))140.collect::<PolarsResult<Vec<_>>>()?;141let name = comma_delimited("group_by_rolling".to_string(), &by);142Cow::Owned(name)143} else {144Cow::Borrowed("")145};146147if state.has_node_timer() {148let new_state = state.clone();149new_state.record(|| self.execute_impl(state, df), profile_name)150} else {151self.execute_impl(state, df)152}153}154}155156157