Path: blob/main/crates/polars-expr/src/expressions/sort.rs
6940 views
use polars_core::POOL;1use polars_core::prelude::*;2use polars_ops::chunked_array::ListNameSpaceImpl;3use polars_utils::idx_vec::IdxVec;4use rayon::prelude::*;56use super::*;7use crate::expressions::{AggState, AggregationContext, PhysicalExpr};89pub struct SortExpr {10pub(crate) physical_expr: Arc<dyn PhysicalExpr>,11pub(crate) options: SortOptions,12expr: Expr,13}1415impl SortExpr {16pub fn new(physical_expr: Arc<dyn PhysicalExpr>, options: SortOptions, expr: Expr) -> Self {17Self {18physical_expr,19options,20expr,21}22}23}2425/// Map arg_sort result back to the indices on the `GroupIdx`26pub(crate) fn map_sorted_indices_to_group_idx(sorted_idx: &IdxCa, idx: &[IdxSize]) -> IdxVec {27sorted_idx28.cont_slice()29.unwrap()30.iter()31.map(|&i| unsafe { *idx.get_unchecked(i as usize) })32.collect()33}3435pub(crate) fn map_sorted_indices_to_group_slice(sorted_idx: &IdxCa, first: IdxSize) -> IdxVec {36sorted_idx37.cont_slice()38.unwrap()39.iter()40.map(|&i| i + first)41.collect()42}4344impl PhysicalExpr for SortExpr {45fn as_expression(&self) -> Option<&Expr> {46Some(&self.expr)47}4849fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {50let series = self.physical_expr.evaluate(df, state)?;51series.sort_with(self.options)52}5354#[allow(clippy::ptr_arg)]55fn evaluate_on_groups<'a>(56&self,57df: &DataFrame,58groups: &'a GroupPositions,59state: &ExecutionState,60) -> PolarsResult<AggregationContext<'a>> {61let mut ac = self.physical_expr.evaluate_on_groups(df, groups, state)?;62match ac.agg_state() {63AggState::AggregatedList(s) => {64let ca = s.list().unwrap();65let out = ca.lst_sort(self.options)?;66ac.with_values(out.into_column(), true, Some(&self.expr))?;67},68_ => {69let series = ac.flat_naive().into_owned();7071let mut sort_options = self.options;72sort_options.multithreaded = false;73let groups = POOL.install(|| {74match ac.groups().as_ref().as_ref() {75GroupsType::Idx(groups) => {76groups77.par_iter()78.map(|(first, idx)| {79// SAFETY: group tuples are always in bounds.80let group = unsafe { series.take_slice_unchecked(idx) };8182let sorted_idx = group.arg_sort(sort_options);83let new_idx = map_sorted_indices_to_group_idx(&sorted_idx, idx);84(new_idx.first().copied().unwrap_or(first), new_idx)85})86.collect()87},88GroupsType::Slice { groups, .. } => groups89.par_iter()90.map(|&[first, len]| {91let group = series.slice(first as i64, len as usize);92let sorted_idx = group.arg_sort(sort_options);93let new_idx = map_sorted_indices_to_group_slice(&sorted_idx, first);94(new_idx.first().copied().unwrap_or(first), new_idx)95})96.collect(),97}98});99let groups = GroupsType::Idx(groups);100ac.with_groups(groups.into_sliceable());101},102}103104Ok(ac)105}106107fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {108self.physical_expr.to_field(input_schema)109}110111fn is_scalar(&self) -> bool {112false113}114}115116117