Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-expr/src/expressions/sort.rs
6940 views
1
use polars_core::POOL;
2
use polars_core::prelude::*;
3
use polars_ops::chunked_array::ListNameSpaceImpl;
4
use polars_utils::idx_vec::IdxVec;
5
use rayon::prelude::*;
6
7
use super::*;
8
use crate::expressions::{AggState, AggregationContext, PhysicalExpr};
9
10
pub struct SortExpr {
11
pub(crate) physical_expr: Arc<dyn PhysicalExpr>,
12
pub(crate) options: SortOptions,
13
expr: Expr,
14
}
15
16
impl SortExpr {
17
pub fn new(physical_expr: Arc<dyn PhysicalExpr>, options: SortOptions, expr: Expr) -> Self {
18
Self {
19
physical_expr,
20
options,
21
expr,
22
}
23
}
24
}
25
26
/// Map arg_sort result back to the indices on the `GroupIdx`
27
pub(crate) fn map_sorted_indices_to_group_idx(sorted_idx: &IdxCa, idx: &[IdxSize]) -> IdxVec {
28
sorted_idx
29
.cont_slice()
30
.unwrap()
31
.iter()
32
.map(|&i| unsafe { *idx.get_unchecked(i as usize) })
33
.collect()
34
}
35
36
/// Map an `arg_sort` result back to absolute row indices for a slice-based group.
///
/// A slice group starts at row `first`, so a position `p` inside the group
/// corresponds to row `first + p`.
pub(crate) fn map_sorted_indices_to_group_slice(sorted_idx: &IdxCa, first: IdxSize) -> IdxVec {
    let positions = sorted_idx.cont_slice().unwrap();
    positions.iter().map(|&pos| first + pos).collect()
}
44
45
impl PhysicalExpr for SortExpr {
46
fn as_expression(&self) -> Option<&Expr> {
47
Some(&self.expr)
48
}
49
50
fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
51
let series = self.physical_expr.evaluate(df, state)?;
52
series.sort_with(self.options)
53
}
54
55
#[allow(clippy::ptr_arg)]
56
fn evaluate_on_groups<'a>(
57
&self,
58
df: &DataFrame,
59
groups: &'a GroupPositions,
60
state: &ExecutionState,
61
) -> PolarsResult<AggregationContext<'a>> {
62
let mut ac = self.physical_expr.evaluate_on_groups(df, groups, state)?;
63
match ac.agg_state() {
64
AggState::AggregatedList(s) => {
65
let ca = s.list().unwrap();
66
let out = ca.lst_sort(self.options)?;
67
ac.with_values(out.into_column(), true, Some(&self.expr))?;
68
},
69
_ => {
70
let series = ac.flat_naive().into_owned();
71
72
let mut sort_options = self.options;
73
sort_options.multithreaded = false;
74
let groups = POOL.install(|| {
75
match ac.groups().as_ref().as_ref() {
76
GroupsType::Idx(groups) => {
77
groups
78
.par_iter()
79
.map(|(first, idx)| {
80
// SAFETY: group tuples are always in bounds.
81
let group = unsafe { series.take_slice_unchecked(idx) };
82
83
let sorted_idx = group.arg_sort(sort_options);
84
let new_idx = map_sorted_indices_to_group_idx(&sorted_idx, idx);
85
(new_idx.first().copied().unwrap_or(first), new_idx)
86
})
87
.collect()
88
},
89
GroupsType::Slice { groups, .. } => groups
90
.par_iter()
91
.map(|&[first, len]| {
92
let group = series.slice(first as i64, len as usize);
93
let sorted_idx = group.arg_sort(sort_options);
94
let new_idx = map_sorted_indices_to_group_slice(&sorted_idx, first);
95
(new_idx.first().copied().unwrap_or(first), new_idx)
96
})
97
.collect(),
98
}
99
});
100
let groups = GroupsType::Idx(groups);
101
ac.with_groups(groups.into_sliceable());
102
},
103
}
104
105
Ok(ac)
106
}
107
108
fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
109
self.physical_expr.to_field(input_schema)
110
}
111
112
fn is_scalar(&self) -> bool {
113
false
114
}
115
}
116
117