Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/optimizer/sortedness.rs
7887 views
1
use std::sync::Arc;
2
3
use polars_core::chunked_array::cast::CastOptions;
4
use polars_core::prelude::{FillNullStrategy, PlHashMap, PlHashSet};
5
use polars_core::schema::Schema;
6
use polars_core::series::IsSorted;
7
use polars_utils::arena::{Arena, Node};
8
use polars_utils::pl_str::PlSmallStr;
9
use polars_utils::unique_id::UniqueId;
10
11
use crate::plans::{
12
AExpr, ExprIR, FunctionIR, HintIR, IR, IRFunctionExpr, Sorted, ToFieldContext,
13
constant_evaluate, into_column,
14
};
15
16
#[derive(Debug, Clone)]
17
pub struct IRSorted(pub Arc<[Sorted]>);
18
19
/// Are the keys together sorted in any way?
20
pub fn are_keys_sorted_any(
21
ir_sorted: Option<&IRSorted>,
22
keys: &[ExprIR],
23
expr_arena: &Arena<AExpr>,
24
input_schema: &Schema,
25
) -> bool {
26
if let Some(ir_sorted) = ir_sorted
27
&& keys.len() <= ir_sorted.0.len()
28
&& keys
29
.iter()
30
.zip(ir_sorted.0.iter())
31
.all(|(k, s)| into_column(k.node(), expr_arena).is_some_and(|k| k == &s.column))
32
{
33
return true;
34
}
35
36
if keys.len() == 1
37
&& aexpr_sortedness(
38
expr_arena.get(keys[0].node()),
39
expr_arena,
40
input_schema,
41
ir_sorted,
42
)
43
.is_some()
44
{
45
return true;
46
}
47
48
false
49
}
50
51
pub fn is_sorted(root: Node, ir_arena: &Arena<IR>, expr_arena: &Arena<AExpr>) -> Option<IRSorted> {
52
let mut sortedness = PlHashMap::default();
53
let mut cache_proxy = PlHashMap::default();
54
let mut amort_passed_columns = PlHashSet::default();
55
56
is_sorted_rec(
57
root,
58
ir_arena,
59
expr_arena,
60
&mut sortedness,
61
&mut cache_proxy,
62
&mut amort_passed_columns,
63
)
64
}
65
66
#[recursive::recursive]
67
fn is_sorted_rec(
68
root: Node,
69
ir_arena: &Arena<IR>,
70
expr_arena: &Arena<AExpr>,
71
sortedness: &mut PlHashMap<Node, Option<IRSorted>>,
72
cache_proxy: &mut PlHashMap<UniqueId, Option<IRSorted>>,
73
amort_passed_columns: &mut PlHashSet<PlSmallStr>,
74
) -> Option<IRSorted> {
75
if let Some(s) = sortedness.get(&root) {
76
return s.clone();
77
}
78
79
macro_rules! rec {
80
($node:expr) => {{
81
is_sorted_rec(
82
$node,
83
ir_arena,
84
expr_arena,
85
sortedness,
86
cache_proxy,
87
amort_passed_columns,
88
)
89
}};
90
}
91
92
sortedness.insert(root, None);
93
94
// @NOTE: Most of the below implementations are very very conservative.
95
let sorted = match ir_arena.get(root) {
96
#[cfg(feature = "python")]
97
IR::PythonScan { .. } => None,
98
IR::Slice {
99
input,
100
offset: _,
101
len: _,
102
} => rec!(*input),
103
IR::Filter {
104
input,
105
predicate: _,
106
} => rec!(*input),
107
IR::Scan { .. } => None,
108
IR::DataFrameScan { df, .. } => Some(IRSorted(
109
[df.get_columns()
110
.iter()
111
.find_map(|c| match c.is_sorted_flag() {
112
IsSorted::Not => None,
113
IsSorted::Ascending => Some(Sorted {
114
column: c.name().clone(),
115
descending: Some(false),
116
nulls_last: Some(c.get(0).is_ok_and(|v| !v.is_null())),
117
}),
118
IsSorted::Descending => Some(Sorted {
119
column: c.name().clone(),
120
descending: Some(true),
121
nulls_last: Some(c.get(0).is_ok_and(|v| !v.is_null())),
122
}),
123
})?]
124
.into(),
125
)),
126
IR::SimpleProjection { input, columns } => {
127
let (input, columns) = (*input, columns.clone());
128
match rec!(input) {
129
None => None,
130
Some(v) => {
131
let first_unsorted_key = v.0.iter().position(|v| !columns.contains(&v.column));
132
match first_unsorted_key {
133
None => Some(v),
134
Some(0) => None,
135
Some(i) => Some(IRSorted(v.0.iter().take(i).cloned().collect())),
136
}
137
},
138
}
139
},
140
IR::Select { input, expr, .. } => {
141
let input = *input;
142
let input_sorted = rec!(input);
143
144
if let Some(input_sorted) = &input_sorted {
145
// We can keep a sorted column if it was kept and not changed.
146
147
amort_passed_columns.clear();
148
amort_passed_columns.extend(expr.iter().filter_map(|e| {
149
let column = into_column(e.node(), expr_arena)?;
150
(column == e.output_name()).then(|| column.clone())
151
}));
152
153
let first_unkept_key = input_sorted
154
.0
155
.iter()
156
.position(|v| !amort_passed_columns.contains(&v.column));
157
match first_unkept_key {
158
None => Some(input_sorted.clone()),
159
Some(0) => {
160
let input_schema = ir_arena.get(input).schema(ir_arena);
161
first_expr_ir_sorted(
162
expr,
163
expr_arena,
164
input_schema.as_ref(),
165
Some(input_sorted),
166
)
167
.map(|s| IRSorted([s].into()))
168
},
169
Some(i) => Some(IRSorted(input_sorted.0.iter().take(i).cloned().collect())),
170
}
171
} else {
172
let input_schema = ir_arena.get(input).schema(ir_arena);
173
first_expr_ir_sorted(expr, expr_arena, input_schema.as_ref(), None)
174
.map(|s| IRSorted([s].into()))
175
}
176
},
177
IR::HStack { input, exprs, .. } => {
178
let input = *input;
179
let input_sorted = rec!(input);
180
181
if let Some(input_sorted) = &input_sorted {
182
// We can keep a sorted column if it was not overwritten.
183
184
amort_passed_columns.clear();
185
amort_passed_columns.extend(exprs.iter().filter_map(|e| {
186
match into_column(e.node(), expr_arena) {
187
None => Some(e.output_name().clone()),
188
Some(c) if c == e.output_name() => None,
189
Some(_) => Some(e.output_name().clone()),
190
}
191
}));
192
193
let first_overwritten_key = input_sorted
194
.0
195
.iter()
196
.position(|v| amort_passed_columns.contains(&v.column));
197
match first_overwritten_key {
198
None => Some(input_sorted.clone()),
199
Some(0) => {
200
let input_schema = ir_arena.get(input).schema(ir_arena);
201
first_expr_ir_sorted(
202
exprs,
203
expr_arena,
204
input_schema.as_ref(),
205
Some(input_sorted),
206
)
207
.map(|s| IRSorted([s].into()))
208
},
209
Some(i) => Some(IRSorted(input_sorted.0.iter().take(i).cloned().collect())),
210
}
211
} else {
212
let input_schema = ir_arena.get(input).schema(ir_arena);
213
first_expr_ir_sorted(exprs, expr_arena, input_schema.as_ref(), None)
214
.map(|s| IRSorted([s].into()))
215
}
216
},
217
IR::Sort {
218
input: _,
219
by_column,
220
slice: _,
221
sort_options,
222
} => {
223
let mut s = by_column
224
.iter()
225
.map_while(|e| {
226
into_column(e.node(), expr_arena).map(|c| Sorted {
227
column: c.clone(),
228
descending: Some(false),
229
nulls_last: Some(false),
230
})
231
})
232
.collect::<Vec<_>>();
233
if sort_options.descending.len() != 1 {
234
s.iter_mut()
235
.zip(sort_options.descending.iter())
236
.for_each(|(s, &d)| s.descending = Some(d));
237
} else if sort_options.descending[0] {
238
s.iter_mut().for_each(|s| s.descending = Some(true));
239
}
240
if sort_options.nulls_last.len() != 1 {
241
s.iter_mut()
242
.zip(sort_options.nulls_last.iter())
243
.for_each(|(s, &d)| s.nulls_last = Some(d));
244
} else if sort_options.nulls_last[0] {
245
s.iter_mut().for_each(|s| s.nulls_last = Some(true));
246
}
247
248
Some(IRSorted(s.into()))
249
},
250
IR::Cache { input, id } => {
251
let (input, id) = (*input, *id);
252
if let Some(s) = cache_proxy.get(&id) {
253
s.clone()
254
} else {
255
let s = rec!(input);
256
cache_proxy.insert(id, s.clone());
257
s
258
}
259
},
260
IR::GroupBy {
261
input,
262
keys,
263
options,
264
maintain_order: true,
265
..
266
} if !options.is_rolling() && !options.is_dynamic() => {
267
let input = *input;
268
let input_sorted = rec!(input)?;
269
270
amort_passed_columns.clear();
271
amort_passed_columns.extend(keys.iter().filter_map(|e| {
272
let column = into_column(e.node(), expr_arena)?;
273
(column == e.output_name()).then(|| column.clone())
274
}));
275
276
// We can keep a sorted key column if it was kept and not changed.
277
278
let first_unkept_key = input_sorted
279
.0
280
.iter()
281
.position(|v| !amort_passed_columns.contains(&v.column));
282
match first_unkept_key {
283
None => Some(input_sorted.clone()),
284
Some(0) => {
285
let input_schema = ir_arena.get(input).schema(ir_arena);
286
first_expr_ir_sorted(keys, expr_arena, input_schema.as_ref(), None)
287
.map(|s| IRSorted([s].into()))
288
},
289
Some(i) => Some(IRSorted(input_sorted.0.iter().take(i).cloned().collect())),
290
}
291
},
292
#[cfg(feature = "dynamic_group_by")]
293
IR::GroupBy { options, .. } if options.is_rolling() => {
294
let Some(rolling_options) = &options.rolling else {
295
unreachable!()
296
};
297
Some(IRSorted(
298
[Sorted {
299
column: rolling_options.index_column.clone(),
300
descending: None,
301
nulls_last: None,
302
}]
303
.into(),
304
))
305
},
306
#[cfg(feature = "dynamic_group_by")]
307
IR::GroupBy { keys, options, .. } if options.is_dynamic() => {
308
let Some(dynamic_options) = &options.dynamic else {
309
unreachable!()
310
};
311
keys.is_empty().then(|| {
312
IRSorted(
313
[Sorted {
314
column: dynamic_options.index_column.clone(),
315
descending: None,
316
nulls_last: None,
317
}]
318
.into(),
319
)
320
})
321
},
322
323
IR::GroupBy { .. } => None,
324
IR::Join { .. } => None,
325
IR::MapFunction { input, function } => match function {
326
FunctionIR::Hint(hint) => match hint {
327
HintIR::Sorted(v) => Some(IRSorted(v.clone())),
328
#[expect(unreachable_patterns)]
329
_ => rec!(*input),
330
},
331
_ => None,
332
},
333
IR::Union { .. } => None,
334
IR::HConcat { .. } => None,
335
IR::ExtContext { .. } => None,
336
IR::Sink { .. } => None,
337
IR::SinkMultiple { .. } => None,
338
#[cfg(feature = "merge_sorted")]
339
IR::MergeSorted { key, .. } => Some(IRSorted(
340
[Sorted {
341
column: key.clone(),
342
descending: None,
343
nulls_last: None,
344
}]
345
.into(),
346
)),
347
IR::Distinct { input, options } => {
348
if !options.maintain_order {
349
return None;
350
}
351
352
let input = *input;
353
rec!(input)
354
},
355
IR::Invalid => unreachable!(),
356
};
357
358
sortedness.insert(root, sorted.clone());
359
sorted
360
}
361
362
/// Proven sortedness of a single expression's output.
///
/// `None` in a field means that property is not known.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct AExprSorted {
    /// `Some(true)` when sorted descending, `Some(false)` when ascending.
    descending: Option<bool>,
    /// `Some(true)` when nulls come after all non-null values, `Some(false)` when before.
    nulls_last: Option<bool>,
}

fn first_expr_ir_sorted(
368
exprs: &[ExprIR],
369
arena: &Arena<AExpr>,
370
schema: &Schema,
371
input_sorted: Option<&IRSorted>,
372
) -> Option<Sorted> {
373
exprs.iter().find_map(|e| {
374
aexpr_sortedness(arena.get(e.node()), arena, schema, input_sorted).map(|s| Sorted {
375
column: e.output_name().clone(),
376
descending: s.descending,
377
nulls_last: s.nulls_last,
378
})
379
})
380
}
381
382
#[recursive::recursive]
383
pub fn aexpr_sortedness(
384
aexpr: &AExpr,
385
arena: &Arena<AExpr>,
386
schema: &Schema,
387
input_sorted: Option<&IRSorted>,
388
) -> Option<AExprSorted> {
389
match aexpr {
390
AExpr::Element => None,
391
AExpr::Explode { .. } => None,
392
AExpr::Column(col) => {
393
let fst = input_sorted?.0.first().unwrap();
394
(fst.column == col).then_some(AExprSorted {
395
descending: fst.descending,
396
nulls_last: fst.nulls_last,
397
})
398
},
399
AExpr::Literal(lv) if lv.is_scalar() => Some(AExprSorted {
400
descending: Some(false),
401
nulls_last: Some(false),
402
}),
403
AExpr::Literal(_) => None,
404
405
AExpr::Len => Some(AExprSorted {
406
descending: Some(false),
407
nulls_last: Some(false),
408
}),
409
AExpr::Cast {
410
expr,
411
dtype,
412
options: CastOptions::Strict,
413
} if dtype.is_integer() => {
414
let expr = arena.get(*expr);
415
let expr_sortedness = aexpr_sortedness(expr, arena, schema, input_sorted)?;
416
let input_dtype = expr.to_dtype(&ToFieldContext::new(arena, schema)).ok()?;
417
if !input_dtype.is_integer() {
418
return None;
419
}
420
Some(expr_sortedness)
421
},
422
AExpr::Cast { .. } => None, // @TODO: More casts are allowed are allowed
423
AExpr::Sort { expr: _, options } => Some(AExprSorted {
424
descending: Some(options.descending),
425
nulls_last: Some(options.nulls_last),
426
}),
427
AExpr::Function {
428
input,
429
function,
430
options: _,
431
} => function_expr_sortedness(function, input, arena, schema, input_sorted),
432
AExpr::Filter { input, by: _ }
433
| AExpr::Slice {
434
input,
435
offset: _,
436
length: _,
437
} => aexpr_sortedness(arena.get(*input), arena, schema, input_sorted),
438
439
AExpr::BinaryExpr { .. }
440
| AExpr::Gather { .. }
441
| AExpr::SortBy { .. }
442
| AExpr::Agg(_)
443
| AExpr::Ternary { .. }
444
| AExpr::AnonymousStreamingAgg { .. }
445
| AExpr::AnonymousFunction { .. }
446
| AExpr::Eval { .. }
447
| AExpr::Over { .. } => None,
448
#[cfg(feature = "dynamic_group_by")]
449
AExpr::Rolling { .. } => None,
450
}
451
}
452
453
pub fn function_expr_sortedness(
454
function: &IRFunctionExpr,
455
inputs: &[ExprIR],
456
arena: &Arena<AExpr>,
457
schema: &Schema,
458
input_sorted: Option<&IRSorted>,
459
) -> Option<AExprSorted> {
460
macro_rules! first_input {
461
() => {{ aexpr_sortedness(arena.get(inputs[0].node()), arena, schema, input_sorted) }};
462
}
463
match function {
464
#[cfg(feature = "rle")]
465
IRFunctionExpr::RLEID => Some(AExprSorted {
466
descending: Some(false),
467
nulls_last: Some(false),
468
}),
469
IRFunctionExpr::SetSortedFlag(is_sorted) => match is_sorted {
470
IsSorted::Ascending => Some(AExprSorted {
471
descending: Some(false),
472
nulls_last: None,
473
}),
474
IsSorted::Descending => Some(AExprSorted {
475
descending: Some(true),
476
nulls_last: None,
477
}),
478
IsSorted::Not => None,
479
},
480
481
IRFunctionExpr::Unique(true)
482
| IRFunctionExpr::DropNulls
483
| IRFunctionExpr::DropNans
484
| IRFunctionExpr::FillNullWithStrategy(
485
FillNullStrategy::Forward(None) | FillNullStrategy::Backward(None),
486
) => {
487
first_input!()
488
},
489
#[cfg(feature = "mode")]
490
IRFunctionExpr::Mode {
491
maintain_order: true,
492
} => first_input!(),
493
494
#[cfg(feature = "range")]
495
IRFunctionExpr::Range(range) => {
496
use crate::plans::IRRangeFunction as R;
497
match range {
498
// `int_range(0, ..., step=1, dtype=UNSIGNED)`
499
R::IntRange { step: 1, dtype }
500
if dtype.is_unsigned_integer()
501
&& constant_evaluate(inputs[0].node(), arena, schema, 0)??
502
.extract_i64()
503
.is_ok_and(|v| v == 0) =>
504
{
505
Some(AExprSorted {
506
descending: Some(false),
507
nulls_last: Some(false),
508
})
509
},
510
511
_ => None,
512
}
513
},
514
515
IRFunctionExpr::Reverse => {
516
let mut sortedness = first_input!()?;
517
if let Some(d) = &mut sortedness.descending {
518
*d = !*d;
519
}
520
if let Some(n) = &mut sortedness.nulls_last {
521
*n ^= !*n;
522
}
523
Some(sortedness)
524
},
525
526
_ => None,
527
}
528
}
529
530