Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/optimizer/sortedness.rs
8424 views
1
use std::sync::Arc;
2
3
use polars_core::chunked_array::cast::CastOptions;
4
use polars_core::prelude::{FillNullStrategy, PlHashMap, PlHashSet};
5
use polars_core::schema::Schema;
6
use polars_core::series::IsSorted;
7
use polars_utils::arena::{Arena, Node};
8
use polars_utils::itertools::Itertools;
9
use polars_utils::pl_str::PlSmallStr;
10
use polars_utils::unique_id::UniqueId;
11
12
#[cfg(all(feature = "strings", feature = "concat_str"))]
13
use crate::plans::IRStringFunction;
14
use crate::plans::{
15
AExpr, ExprIR, FunctionIR, HintIR, IR, IRFunctionExpr, Sorted, ToFieldContext,
16
constant_evaluate, into_column,
17
};
18
19
#[derive(Debug, Clone)]
20
pub struct IRSorted(pub Arc<[Sorted]>);
21
22
/// Are the keys together sorted in any way?
23
///
24
/// Returns the way in which the keys are sorted, if they are sorted.
25
pub fn are_keys_sorted_any(
26
ir_sorted: Option<&IRSorted>,
27
keys: &[ExprIR],
28
expr_arena: &Arena<AExpr>,
29
input_schema: &Schema,
30
) -> Option<Vec<AExprSorted>> {
31
let mut sortedness = Vec::with_capacity(keys.len());
32
for (idx, key) in keys.iter().enumerate() {
33
let s = aexpr_sortedness(
34
expr_arena.get(key.node()),
35
expr_arena,
36
input_schema,
37
Some(&ir_sorted?.0[idx..]),
38
)?;
39
sortedness.push(s);
40
}
41
Some(sortedness)
42
}
43
44
pub fn is_sorted(root: Node, ir_arena: &Arena<IR>, expr_arena: &Arena<AExpr>) -> Option<IRSorted> {
45
let mut sortedness = PlHashMap::default();
46
let mut cache_proxy = PlHashMap::default();
47
let mut amort_passed_columns = PlHashSet::default();
48
49
is_sorted_rec(
50
root,
51
ir_arena,
52
expr_arena,
53
&mut sortedness,
54
&mut cache_proxy,
55
&mut amort_passed_columns,
56
)
57
}
58
59
#[recursive::recursive]
60
fn is_sorted_rec(
61
root: Node,
62
ir_arena: &Arena<IR>,
63
expr_arena: &Arena<AExpr>,
64
sortedness: &mut PlHashMap<Node, Option<IRSorted>>,
65
cache_proxy: &mut PlHashMap<UniqueId, Option<IRSorted>>,
66
amort_passed_columns: &mut PlHashSet<PlSmallStr>,
67
) -> Option<IRSorted> {
68
if let Some(s) = sortedness.get(&root) {
69
return s.clone();
70
}
71
72
macro_rules! rec {
73
($node:expr) => {{
74
is_sorted_rec(
75
$node,
76
ir_arena,
77
expr_arena,
78
sortedness,
79
cache_proxy,
80
amort_passed_columns,
81
)
82
}};
83
}
84
85
sortedness.insert(root, None);
86
87
// @NOTE: Most of the below implementations are very very conservative.
88
let sorted = match ir_arena.get(root) {
89
#[cfg(feature = "python")]
90
IR::PythonScan { .. } => None,
91
IR::Slice {
92
input,
93
offset: _,
94
len: _,
95
} => rec!(*input),
96
IR::Filter {
97
input,
98
predicate: _,
99
} => rec!(*input),
100
IR::Scan { .. } => None,
101
IR::DataFrameScan { df, .. } => {
102
let sorted_cols = df
103
.columns()
104
.iter()
105
.filter_map(|c| match c.is_sorted_flag() {
106
IsSorted::Not => None,
107
IsSorted::Ascending => Some(Sorted {
108
column: c.name().clone(),
109
descending: Some(false),
110
nulls_last: Some(c.get(0).is_ok_and(|v| !v.is_null())),
111
}),
112
IsSorted::Descending => Some(Sorted {
113
column: c.name().clone(),
114
descending: Some(true),
115
nulls_last: Some(c.get(0).is_ok_and(|v| !v.is_null())),
116
}),
117
})
118
.collect_vec();
119
(!sorted_cols.is_empty()).then(|| IRSorted(sorted_cols.into()))
120
},
121
IR::SimpleProjection { input, columns } => {
122
let (input, columns) = (*input, columns.clone());
123
match rec!(input) {
124
None => None,
125
Some(v) => {
126
let first_unsorted_key = v.0.iter().position(|v| !columns.contains(&v.column));
127
match first_unsorted_key {
128
None => Some(v),
129
Some(0) => None,
130
Some(i) => Some(IRSorted(v.0.iter().take(i).cloned().collect())),
131
}
132
},
133
}
134
},
135
IR::Select { input, expr, .. } => {
136
let input = *input;
137
let input_sorted = rec!(input);
138
139
if let Some(input_sorted) = &input_sorted {
140
// We can keep a sorted column if it was kept and not changed.
141
142
amort_passed_columns.clear();
143
amort_passed_columns.extend(expr.iter().filter_map(|e| {
144
let column = into_column(e.node(), expr_arena)?;
145
(column == e.output_name()).then(|| column.clone())
146
}));
147
148
let first_unkept_key = input_sorted
149
.0
150
.iter()
151
.position(|v| !amort_passed_columns.contains(&v.column));
152
match first_unkept_key {
153
None => Some(input_sorted.clone()),
154
Some(0) => {
155
let input_schema = ir_arena.get(input).schema(ir_arena);
156
first_expr_ir_sorted(
157
expr,
158
expr_arena,
159
input_schema.as_ref(),
160
Some(&input_sorted.0),
161
)
162
.map(|s| IRSorted([s].into()))
163
},
164
Some(i) => Some(IRSorted(input_sorted.0.iter().take(i).cloned().collect())),
165
}
166
} else {
167
let input_schema = ir_arena.get(input).schema(ir_arena);
168
first_expr_ir_sorted(expr, expr_arena, input_schema.as_ref(), None)
169
.map(|s| IRSorted([s].into()))
170
}
171
},
172
IR::HStack { input, exprs, .. } => {
173
let input = *input;
174
let input_sorted = rec!(input);
175
176
if let Some(input_sorted) = &input_sorted {
177
// We can keep a sorted column if it was not overwritten.
178
179
amort_passed_columns.clear();
180
amort_passed_columns.extend(exprs.iter().filter_map(|e| {
181
match into_column(e.node(), expr_arena) {
182
None => Some(e.output_name().clone()),
183
Some(c) if c == e.output_name() => None,
184
Some(_) => Some(e.output_name().clone()),
185
}
186
}));
187
188
let first_overwritten_key = input_sorted
189
.0
190
.iter()
191
.position(|v| amort_passed_columns.contains(&v.column));
192
match first_overwritten_key {
193
None => Some(input_sorted.clone()),
194
Some(0) => {
195
let input_schema = ir_arena.get(input).schema(ir_arena);
196
first_expr_ir_sorted(
197
exprs,
198
expr_arena,
199
input_schema.as_ref(),
200
Some(&input_sorted.0),
201
)
202
.map(|s| IRSorted([s].into()))
203
},
204
Some(i) => Some(IRSorted(input_sorted.0.iter().take(i).cloned().collect())),
205
}
206
} else {
207
let input_schema = ir_arena.get(input).schema(ir_arena);
208
first_expr_ir_sorted(exprs, expr_arena, input_schema.as_ref(), None)
209
.map(|s| IRSorted([s].into()))
210
}
211
},
212
IR::Sort {
213
input: _,
214
by_column,
215
slice: _,
216
sort_options,
217
} => {
218
let mut s = by_column
219
.iter()
220
.map_while(|e| {
221
into_column(e.node(), expr_arena).map(|c| Sorted {
222
column: c.clone(),
223
descending: Some(false),
224
nulls_last: Some(false),
225
})
226
})
227
.collect::<Vec<_>>();
228
if sort_options.descending.len() != 1 {
229
s.iter_mut()
230
.zip(sort_options.descending.iter())
231
.for_each(|(s, &d)| s.descending = Some(d));
232
} else if sort_options.descending[0] {
233
s.iter_mut().for_each(|s| s.descending = Some(true));
234
}
235
if sort_options.nulls_last.len() != 1 {
236
s.iter_mut()
237
.zip(sort_options.nulls_last.iter())
238
.for_each(|(s, &d)| s.nulls_last = Some(d));
239
} else if sort_options.nulls_last[0] {
240
s.iter_mut().for_each(|s| s.nulls_last = Some(true));
241
}
242
243
Some(IRSorted(s.into()))
244
},
245
IR::Cache { input, id } => {
246
let (input, id) = (*input, *id);
247
if let Some(s) = cache_proxy.get(&id) {
248
s.clone()
249
} else {
250
let s = rec!(input);
251
cache_proxy.insert(id, s.clone());
252
s
253
}
254
},
255
IR::GroupBy {
256
input,
257
keys,
258
options,
259
maintain_order: true,
260
..
261
} if !options.is_rolling() && !options.is_dynamic() => {
262
let input = *input;
263
let input_sorted = rec!(input)?;
264
265
amort_passed_columns.clear();
266
amort_passed_columns.extend(keys.iter().filter_map(|e| {
267
let column = into_column(e.node(), expr_arena)?;
268
(column == e.output_name()).then(|| column.clone())
269
}));
270
271
// We can keep a sorted key column if it was kept and not changed.
272
273
let first_unkept_key = input_sorted
274
.0
275
.iter()
276
.position(|v| !amort_passed_columns.contains(&v.column));
277
match first_unkept_key {
278
None => Some(input_sorted.clone()),
279
Some(0) => {
280
let input_schema = ir_arena.get(input).schema(ir_arena);
281
first_expr_ir_sorted(keys, expr_arena, input_schema.as_ref(), None)
282
.map(|s| IRSorted([s].into()))
283
},
284
Some(i) => Some(IRSorted(input_sorted.0.iter().take(i).cloned().collect())),
285
}
286
},
287
#[cfg(feature = "dynamic_group_by")]
288
IR::GroupBy { options, .. } if options.is_rolling() => {
289
let Some(rolling_options) = &options.rolling else {
290
unreachable!()
291
};
292
Some(IRSorted(
293
[Sorted {
294
column: rolling_options.index_column.clone(),
295
descending: None,
296
nulls_last: None,
297
}]
298
.into(),
299
))
300
},
301
#[cfg(feature = "dynamic_group_by")]
302
IR::GroupBy { keys, options, .. } if options.is_dynamic() => {
303
let Some(dynamic_options) = &options.dynamic else {
304
unreachable!()
305
};
306
keys.is_empty().then(|| {
307
IRSorted(
308
[Sorted {
309
column: dynamic_options.index_column.clone(),
310
descending: None,
311
nulls_last: None,
312
}]
313
.into(),
314
)
315
})
316
},
317
318
IR::GroupBy { .. } => None,
319
IR::Join { .. } => None,
320
IR::MapFunction { input, function } => match function {
321
FunctionIR::Hint(hint) => match hint {
322
HintIR::Sorted(v) => Some(IRSorted(v.clone())),
323
#[expect(unreachable_patterns)]
324
_ => rec!(*input),
325
},
326
_ => None,
327
},
328
IR::Union { .. } => None,
329
IR::HConcat { .. } => None,
330
IR::ExtContext { .. } => None,
331
IR::Sink { .. } => None,
332
IR::SinkMultiple { .. } => None,
333
#[cfg(feature = "merge_sorted")]
334
IR::MergeSorted { key, .. } => Some(IRSorted(
335
[Sorted {
336
column: key.clone(),
337
descending: None,
338
nulls_last: None,
339
}]
340
.into(),
341
)),
342
IR::Distinct { input, options } => {
343
if !options.maintain_order {
344
return None;
345
}
346
347
let input = *input;
348
rec!(input)
349
},
350
IR::Invalid => unreachable!(),
351
};
352
353
sortedness.insert(root, sorted.clone());
354
sorted
355
}
356
357
/// How the output of a single expression is sorted.
///
/// Each field is `None` when that aspect is not known.
#[derive(Debug, PartialEq)]
pub struct AExprSorted {
    /// `Some(true)` if sorted in descending order, `Some(false)` if ascending.
    pub descending: Option<bool>,
    /// `Some(true)` if nulls come last, `Some(false)` if they come first.
    pub nulls_last: Option<bool>,
}
362
363
fn first_expr_ir_sorted(
364
exprs: &[ExprIR],
365
arena: &Arena<AExpr>,
366
schema: &Schema,
367
input_sorted: Option<&[Sorted]>,
368
) -> Option<Sorted> {
369
exprs.iter().find_map(|e| {
370
aexpr_sortedness(arena.get(e.node()), arena, schema, input_sorted).map(|s| Sorted {
371
column: e.output_name().clone(),
372
descending: s.descending,
373
nulls_last: s.nulls_last,
374
})
375
})
376
}
377
378
#[recursive::recursive]
379
pub fn aexpr_sortedness(
380
aexpr: &AExpr,
381
arena: &Arena<AExpr>,
382
schema: &Schema,
383
input_sorted: Option<&[Sorted]>,
384
) -> Option<AExprSorted> {
385
match aexpr {
386
AExpr::Element => None,
387
AExpr::Explode { .. } => None,
388
AExpr::Column(col) => {
389
let fst = input_sorted?.first()?;
390
(fst.column == col).then_some(AExprSorted {
391
descending: fst.descending,
392
nulls_last: fst.nulls_last,
393
})
394
},
395
#[cfg(feature = "dtype-struct")]
396
AExpr::StructField(_) => None,
397
AExpr::Literal(lv) if lv.is_scalar() => Some(AExprSorted {
398
descending: Some(false),
399
nulls_last: Some(false),
400
}),
401
AExpr::Literal(_) => None,
402
403
AExpr::Len => Some(AExprSorted {
404
descending: Some(false),
405
nulls_last: Some(false),
406
}),
407
AExpr::Cast {
408
expr,
409
dtype,
410
options: CastOptions::Strict,
411
} if dtype.is_integer() => {
412
let expr = arena.get(*expr);
413
let expr_sortedness = aexpr_sortedness(expr, arena, schema, input_sorted)?;
414
let input_dtype = expr.to_dtype(&ToFieldContext::new(arena, schema)).ok()?;
415
if !input_dtype.is_integer() {
416
return None;
417
}
418
Some(expr_sortedness)
419
},
420
AExpr::Cast { .. } => None, // @TODO: More casts are allowed
421
AExpr::Sort { expr: _, options } => Some(AExprSorted {
422
descending: Some(options.descending),
423
nulls_last: Some(options.nulls_last),
424
}),
425
AExpr::Function {
426
input,
427
function,
428
options: _,
429
} => function_expr_sortedness(function, input, arena, schema, input_sorted),
430
AExpr::Filter { input, by: _ }
431
| AExpr::Slice {
432
input,
433
offset: _,
434
length: _,
435
} => aexpr_sortedness(arena.get(*input), arena, schema, input_sorted),
436
437
AExpr::BinaryExpr { .. }
438
| AExpr::Gather { .. }
439
| AExpr::SortBy { .. }
440
| AExpr::Agg(_)
441
| AExpr::Ternary { .. }
442
| AExpr::AnonymousAgg { .. }
443
| AExpr::AnonymousFunction { .. }
444
| AExpr::Eval { .. }
445
| AExpr::Over { .. } => None,
446
447
#[cfg(feature = "dtype-struct")]
448
AExpr::StructEval { .. } => None,
449
450
#[cfg(feature = "dynamic_group_by")]
451
AExpr::Rolling { .. } => None,
452
}
453
}
454
455
pub fn function_expr_sortedness(
456
function: &IRFunctionExpr,
457
inputs: &[ExprIR],
458
arena: &Arena<AExpr>,
459
schema: &Schema,
460
input_sorted: Option<&[Sorted]>,
461
) -> Option<AExprSorted> {
462
macro_rules! rec_ae {
463
($node:expr) => {{ aexpr_sortedness(arena.get($node), arena, schema, input_sorted) }};
464
}
465
466
match function {
467
#[cfg(feature = "rle")]
468
IRFunctionExpr::RLEID => Some(AExprSorted {
469
descending: Some(false),
470
nulls_last: Some(false),
471
}),
472
IRFunctionExpr::SetSortedFlag(is_sorted) => match is_sorted {
473
IsSorted::Ascending => Some(AExprSorted {
474
descending: Some(false),
475
nulls_last: None,
476
}),
477
IsSorted::Descending => Some(AExprSorted {
478
descending: Some(true),
479
nulls_last: None,
480
}),
481
IsSorted::Not => None,
482
},
483
484
IRFunctionExpr::Unique(true)
485
| IRFunctionExpr::DropNulls
486
| IRFunctionExpr::DropNans
487
| IRFunctionExpr::FillNullWithStrategy(
488
FillNullStrategy::Forward(None) | FillNullStrategy::Backward(None),
489
) => {
490
let [e] = inputs else {
491
return None;
492
};
493
494
rec_ae!(e.node())
495
},
496
#[cfg(feature = "mode")]
497
IRFunctionExpr::Mode {
498
maintain_order: true,
499
} => {
500
let [e] = inputs else {
501
return None;
502
};
503
504
rec_ae!(e.node())
505
},
506
507
#[cfg(feature = "range")]
508
IRFunctionExpr::Range(range) => {
509
use crate::plans::IRRangeFunction as R;
510
match range {
511
// `int_range(0, ..., step=1, dtype=UNSIGNED)`
512
R::IntRange { step: 1, dtype }
513
if dtype.is_unsigned_integer()
514
&& constant_evaluate(inputs[0].node(), arena, schema, 0)??
515
.extract_i64()
516
.is_ok_and(|v| v == 0) =>
517
{
518
Some(AExprSorted {
519
descending: Some(false),
520
nulls_last: Some(false),
521
})
522
},
523
524
_ => None,
525
}
526
},
527
528
IRFunctionExpr::Reverse => {
529
let [e] = inputs else {
530
return None;
531
};
532
533
let mut sortedness = rec_ae!(e.node())?;
534
535
if let Some(d) = &mut sortedness.descending {
536
*d = !*d;
537
}
538
if let Some(n) = &mut sortedness.nulls_last {
539
*n ^= !*n;
540
}
541
Some(sortedness)
542
},
543
544
#[cfg(all(feature = "strings", feature = "concat_str"))]
545
IRFunctionExpr::StringExpr(IRStringFunction::ConcatHorizontal {
546
ignore_nulls: false,
547
delimiter: _,
548
}) => {
549
let [e] = inputs else {
550
return None;
551
};
552
553
rec_ae!(e.node())
554
},
555
556
_ => None,
557
}
558
}
559
560