Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-mem-engine/src/executors/sort.rs
6940 views
1
use polars_utils::format_pl_smallstr;
2
3
use super::*;
4
5
pub(crate) struct SortExec {
6
pub(crate) input: Box<dyn Executor>,
7
pub(crate) by_column: Vec<Arc<dyn PhysicalExpr>>,
8
pub(crate) slice: Option<(i64, usize)>,
9
pub(crate) sort_options: SortMultipleOptions,
10
}
11
12
impl SortExec {
13
fn execute_impl(
14
&mut self,
15
state: &ExecutionState,
16
mut df: DataFrame,
17
) -> PolarsResult<DataFrame> {
18
state.should_stop()?;
19
df.as_single_chunk_par();
20
21
let height = df.height();
22
23
let by_columns = self
24
.by_column
25
.iter()
26
.enumerate()
27
.map(|(i, e)| {
28
let mut s = e.evaluate(&df, state)?.into_column();
29
// Polars core will try to set the sorted columns as sorted.
30
// This should only be done with simple col("foo") expressions,
31
// therefore we rename more complex expressions so that
32
// polars core does not match these.
33
if !matches!(e.as_expression(), Some(&Expr::Column(_))) {
34
s.rename(format_pl_smallstr!("_POLARS_SORT_BY_{i}"));
35
}
36
polars_ensure!(
37
s.len() == height,
38
ShapeMismatch: "sort expressions must have same \
39
length as DataFrame, got DataFrame height: {} and Series length: {}",
40
height, s.len()
41
);
42
Ok(s)
43
})
44
.collect::<PolarsResult<Vec<_>>>()?;
45
46
df.sort_impl(by_columns, self.sort_options.clone(), self.slice)
47
}
48
}
49
50
impl Executor for SortExec {
51
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
52
#[cfg(debug_assertions)]
53
{
54
if state.verbose() {
55
eprintln!("run SortExec")
56
}
57
}
58
let df = self.input.execute(state)?;
59
60
let profile_name = if state.has_node_timer() {
61
let by = self
62
.by_column
63
.iter()
64
.map(|s| Ok(s.to_field(df.schema())?.name))
65
.collect::<PolarsResult<Vec<_>>>()?;
66
let name = comma_delimited("sort".to_string(), &by);
67
Cow::Owned(name)
68
} else {
69
Cow::Borrowed("")
70
};
71
72
if state.has_node_timer() {
73
let new_state = state.clone();
74
new_state.record(|| self.execute_impl(state, df), profile_name)
75
} else {
76
self.execute_impl(state, df)
77
}
78
}
79
}
80
81