Path: blob/main/crates/polars-plan/src/plans/optimizer/sortedness.rs
7887 views
use std::sync::Arc;12use polars_core::chunked_array::cast::CastOptions;3use polars_core::prelude::{FillNullStrategy, PlHashMap, PlHashSet};4use polars_core::schema::Schema;5use polars_core::series::IsSorted;6use polars_utils::arena::{Arena, Node};7use polars_utils::pl_str::PlSmallStr;8use polars_utils::unique_id::UniqueId;910use crate::plans::{11AExpr, ExprIR, FunctionIR, HintIR, IR, IRFunctionExpr, Sorted, ToFieldContext,12constant_evaluate, into_column,13};1415#[derive(Debug, Clone)]16pub struct IRSorted(pub Arc<[Sorted]>);1718/// Are the keys together sorted in any way?19pub fn are_keys_sorted_any(20ir_sorted: Option<&IRSorted>,21keys: &[ExprIR],22expr_arena: &Arena<AExpr>,23input_schema: &Schema,24) -> bool {25if let Some(ir_sorted) = ir_sorted26&& keys.len() <= ir_sorted.0.len()27&& keys28.iter()29.zip(ir_sorted.0.iter())30.all(|(k, s)| into_column(k.node(), expr_arena).is_some_and(|k| k == &s.column))31{32return true;33}3435if keys.len() == 136&& aexpr_sortedness(37expr_arena.get(keys[0].node()),38expr_arena,39input_schema,40ir_sorted,41)42.is_some()43{44return true;45}4647false48}4950pub fn is_sorted(root: Node, ir_arena: &Arena<IR>, expr_arena: &Arena<AExpr>) -> Option<IRSorted> {51let mut sortedness = PlHashMap::default();52let mut cache_proxy = PlHashMap::default();53let mut amort_passed_columns = PlHashSet::default();5455is_sorted_rec(56root,57ir_arena,58expr_arena,59&mut sortedness,60&mut cache_proxy,61&mut amort_passed_columns,62)63}6465#[recursive::recursive]66fn is_sorted_rec(67root: Node,68ir_arena: &Arena<IR>,69expr_arena: &Arena<AExpr>,70sortedness: &mut PlHashMap<Node, Option<IRSorted>>,71cache_proxy: &mut PlHashMap<UniqueId, Option<IRSorted>>,72amort_passed_columns: &mut PlHashSet<PlSmallStr>,73) -> Option<IRSorted> {74if let Some(s) = sortedness.get(&root) {75return s.clone();76}7778macro_rules! rec {79($node:expr) => {{80is_sorted_rec(81$node,82ir_arena,83expr_arena,84sortedness,85cache_proxy,86amort_passed_columns,87)88}};89}9091sortedness.insert(root, None);9293// @NOTE: Most of the below implementations are very very conservative.94let sorted = match ir_arena.get(root) {95#[cfg(feature = "python")]96IR::PythonScan { .. } => None,97IR::Slice {98input,99offset: _,100len: _,101} => rec!(*input),102IR::Filter {103input,104predicate: _,105} => rec!(*input),106IR::Scan { .. } => None,107IR::DataFrameScan { df, .. } => Some(IRSorted(108[df.get_columns()109.iter()110.find_map(|c| match c.is_sorted_flag() {111IsSorted::Not => None,112IsSorted::Ascending => Some(Sorted {113column: c.name().clone(),114descending: Some(false),115nulls_last: Some(c.get(0).is_ok_and(|v| !v.is_null())),116}),117IsSorted::Descending => Some(Sorted {118column: c.name().clone(),119descending: Some(true),120nulls_last: Some(c.get(0).is_ok_and(|v| !v.is_null())),121}),122})?]123.into(),124)),125IR::SimpleProjection { input, columns } => {126let (input, columns) = (*input, columns.clone());127match rec!(input) {128None => None,129Some(v) => {130let first_unsorted_key = v.0.iter().position(|v| !columns.contains(&v.column));131match first_unsorted_key {132None => Some(v),133Some(0) => None,134Some(i) => Some(IRSorted(v.0.iter().take(i).cloned().collect())),135}136},137}138},139IR::Select { input, expr, .. } => {140let input = *input;141let input_sorted = rec!(input);142143if let Some(input_sorted) = &input_sorted {144// We can keep a sorted column if it was kept and not changed.145146amort_passed_columns.clear();147amort_passed_columns.extend(expr.iter().filter_map(|e| {148let column = into_column(e.node(), expr_arena)?;149(column == e.output_name()).then(|| column.clone())150}));151152let first_unkept_key = input_sorted153.0154.iter()155.position(|v| !amort_passed_columns.contains(&v.column));156match first_unkept_key {157None => Some(input_sorted.clone()),158Some(0) => {159let input_schema = ir_arena.get(input).schema(ir_arena);160first_expr_ir_sorted(161expr,162expr_arena,163input_schema.as_ref(),164Some(input_sorted),165)166.map(|s| IRSorted([s].into()))167},168Some(i) => Some(IRSorted(input_sorted.0.iter().take(i).cloned().collect())),169}170} else {171let input_schema = ir_arena.get(input).schema(ir_arena);172first_expr_ir_sorted(expr, expr_arena, input_schema.as_ref(), None)173.map(|s| IRSorted([s].into()))174}175},176IR::HStack { input, exprs, .. } => {177let input = *input;178let input_sorted = rec!(input);179180if let Some(input_sorted) = &input_sorted {181// We can keep a sorted column if it was not overwritten.182183amort_passed_columns.clear();184amort_passed_columns.extend(exprs.iter().filter_map(|e| {185match into_column(e.node(), expr_arena) {186None => Some(e.output_name().clone()),187Some(c) if c == e.output_name() => None,188Some(_) => Some(e.output_name().clone()),189}190}));191192let first_overwritten_key = input_sorted193.0194.iter()195.position(|v| amort_passed_columns.contains(&v.column));196match first_overwritten_key {197None => Some(input_sorted.clone()),198Some(0) => {199let input_schema = ir_arena.get(input).schema(ir_arena);200first_expr_ir_sorted(201exprs,202expr_arena,203input_schema.as_ref(),204Some(input_sorted),205)206.map(|s| IRSorted([s].into()))207},208Some(i) => Some(IRSorted(input_sorted.0.iter().take(i).cloned().collect())),209}210} else {211let input_schema = ir_arena.get(input).schema(ir_arena);212first_expr_ir_sorted(exprs, expr_arena, input_schema.as_ref(), None)213.map(|s| IRSorted([s].into()))214}215},216IR::Sort {217input: _,218by_column,219slice: _,220sort_options,221} => {222let mut s = by_column223.iter()224.map_while(|e| {225into_column(e.node(), expr_arena).map(|c| Sorted {226column: c.clone(),227descending: Some(false),228nulls_last: Some(false),229})230})231.collect::<Vec<_>>();232if sort_options.descending.len() != 1 {233s.iter_mut()234.zip(sort_options.descending.iter())235.for_each(|(s, &d)| s.descending = Some(d));236} else if sort_options.descending[0] {237s.iter_mut().for_each(|s| s.descending = Some(true));238}239if sort_options.nulls_last.len() != 1 {240s.iter_mut()241.zip(sort_options.nulls_last.iter())242.for_each(|(s, &d)| s.nulls_last = Some(d));243} else if sort_options.nulls_last[0] {244s.iter_mut().for_each(|s| s.nulls_last = Some(true));245}246247Some(IRSorted(s.into()))248},249IR::Cache { input, id } => {250let (input, id) = (*input, *id);251if let Some(s) = cache_proxy.get(&id) {252s.clone()253} else {254let s = rec!(input);255cache_proxy.insert(id, s.clone());256s257}258},259IR::GroupBy {260input,261keys,262options,263maintain_order: true,264..265} if !options.is_rolling() && !options.is_dynamic() => {266let input = *input;267let input_sorted = rec!(input)?;268269amort_passed_columns.clear();270amort_passed_columns.extend(keys.iter().filter_map(|e| {271let column = into_column(e.node(), expr_arena)?;272(column == e.output_name()).then(|| column.clone())273}));274275// We can keep a sorted key column if it was kept and not changed.276277let first_unkept_key = input_sorted278.0279.iter()280.position(|v| !amort_passed_columns.contains(&v.column));281match first_unkept_key {282None => Some(input_sorted.clone()),283Some(0) => {284let input_schema = ir_arena.get(input).schema(ir_arena);285first_expr_ir_sorted(keys, expr_arena, input_schema.as_ref(), None)286.map(|s| IRSorted([s].into()))287},288Some(i) => Some(IRSorted(input_sorted.0.iter().take(i).cloned().collect())),289}290},291#[cfg(feature = "dynamic_group_by")]292IR::GroupBy { options, .. } if options.is_rolling() => {293let Some(rolling_options) = &options.rolling else {294unreachable!()295};296Some(IRSorted(297[Sorted {298column: rolling_options.index_column.clone(),299descending: None,300nulls_last: None,301}]302.into(),303))304},305#[cfg(feature = "dynamic_group_by")]306IR::GroupBy { keys, options, .. } if options.is_dynamic() => {307let Some(dynamic_options) = &options.dynamic else {308unreachable!()309};310keys.is_empty().then(|| {311IRSorted(312[Sorted {313column: dynamic_options.index_column.clone(),314descending: None,315nulls_last: None,316}]317.into(),318)319})320},321322IR::GroupBy { .. } => None,323IR::Join { .. } => None,324IR::MapFunction { input, function } => match function {325FunctionIR::Hint(hint) => match hint {326HintIR::Sorted(v) => Some(IRSorted(v.clone())),327#[expect(unreachable_patterns)]328_ => rec!(*input),329},330_ => None,331},332IR::Union { .. } => None,333IR::HConcat { .. } => None,334IR::ExtContext { .. } => None,335IR::Sink { .. } => None,336IR::SinkMultiple { .. } => None,337#[cfg(feature = "merge_sorted")]338IR::MergeSorted { key, .. } => Some(IRSorted(339[Sorted {340column: key.clone(),341descending: None,342nulls_last: None,343}]344.into(),345)),346IR::Distinct { input, options } => {347if !options.maintain_order {348return None;349}350351let input = *input;352rec!(input)353},354IR::Invalid => unreachable!(),355};356357sortedness.insert(root, sorted.clone());358sorted359}360361pub struct AExprSorted {362descending: Option<bool>,363nulls_last: Option<bool>,364}365366fn first_expr_ir_sorted(367exprs: &[ExprIR],368arena: &Arena<AExpr>,369schema: &Schema,370input_sorted: Option<&IRSorted>,371) -> Option<Sorted> {372exprs.iter().find_map(|e| {373aexpr_sortedness(arena.get(e.node()), arena, schema, input_sorted).map(|s| Sorted {374column: e.output_name().clone(),375descending: s.descending,376nulls_last: s.nulls_last,377})378})379}380381#[recursive::recursive]382pub fn aexpr_sortedness(383aexpr: &AExpr,384arena: &Arena<AExpr>,385schema: &Schema,386input_sorted: Option<&IRSorted>,387) -> Option<AExprSorted> {388match aexpr {389AExpr::Element => None,390AExpr::Explode { .. } => None,391AExpr::Column(col) => {392let fst = input_sorted?.0.first().unwrap();393(fst.column == col).then_some(AExprSorted {394descending: fst.descending,395nulls_last: fst.nulls_last,396})397},398AExpr::Literal(lv) if lv.is_scalar() => Some(AExprSorted {399descending: Some(false),400nulls_last: Some(false),401}),402AExpr::Literal(_) => None,403404AExpr::Len => Some(AExprSorted {405descending: Some(false),406nulls_last: Some(false),407}),408AExpr::Cast {409expr,410dtype,411options: CastOptions::Strict,412} if dtype.is_integer() => {413let expr = arena.get(*expr);414let expr_sortedness = aexpr_sortedness(expr, arena, schema, input_sorted)?;415let input_dtype = expr.to_dtype(&ToFieldContext::new(arena, schema)).ok()?;416if !input_dtype.is_integer() {417return None;418}419Some(expr_sortedness)420},421AExpr::Cast { .. } => None, // @TODO: More casts are allowed are allowed422AExpr::Sort { expr: _, options } => Some(AExprSorted {423descending: Some(options.descending),424nulls_last: Some(options.nulls_last),425}),426AExpr::Function {427input,428function,429options: _,430} => function_expr_sortedness(function, input, arena, schema, input_sorted),431AExpr::Filter { input, by: _ }432| AExpr::Slice {433input,434offset: _,435length: _,436} => aexpr_sortedness(arena.get(*input), arena, schema, input_sorted),437438AExpr::BinaryExpr { .. }439| AExpr::Gather { .. }440| AExpr::SortBy { .. }441| AExpr::Agg(_)442| AExpr::Ternary { .. }443| AExpr::AnonymousStreamingAgg { .. }444| AExpr::AnonymousFunction { .. }445| AExpr::Eval { .. }446| AExpr::Over { .. } => None,447#[cfg(feature = "dynamic_group_by")]448AExpr::Rolling { .. } => None,449}450}451452pub fn function_expr_sortedness(453function: &IRFunctionExpr,454inputs: &[ExprIR],455arena: &Arena<AExpr>,456schema: &Schema,457input_sorted: Option<&IRSorted>,458) -> Option<AExprSorted> {459macro_rules! first_input {460() => {{ aexpr_sortedness(arena.get(inputs[0].node()), arena, schema, input_sorted) }};461}462match function {463#[cfg(feature = "rle")]464IRFunctionExpr::RLEID => Some(AExprSorted {465descending: Some(false),466nulls_last: Some(false),467}),468IRFunctionExpr::SetSortedFlag(is_sorted) => match is_sorted {469IsSorted::Ascending => Some(AExprSorted {470descending: Some(false),471nulls_last: None,472}),473IsSorted::Descending => Some(AExprSorted {474descending: Some(true),475nulls_last: None,476}),477IsSorted::Not => None,478},479480IRFunctionExpr::Unique(true)481| IRFunctionExpr::DropNulls482| IRFunctionExpr::DropNans483| IRFunctionExpr::FillNullWithStrategy(484FillNullStrategy::Forward(None) | FillNullStrategy::Backward(None),485) => {486first_input!()487},488#[cfg(feature = "mode")]489IRFunctionExpr::Mode {490maintain_order: true,491} => first_input!(),492493#[cfg(feature = "range")]494IRFunctionExpr::Range(range) => {495use crate::plans::IRRangeFunction as R;496match range {497// `int_range(0, ..., step=1, dtype=UNSIGNED)`498R::IntRange { step: 1, dtype }499if dtype.is_unsigned_integer()500&& constant_evaluate(inputs[0].node(), arena, schema, 0)??501.extract_i64()502.is_ok_and(|v| v == 0) =>503{504Some(AExprSorted {505descending: Some(false),506nulls_last: Some(false),507})508},509510_ => None,511}512},513514IRFunctionExpr::Reverse => {515let mut sortedness = first_input!()?;516if let Some(d) = &mut sortedness.descending {517*d = !*d;518}519if let Some(n) = &mut sortedness.nulls_last {520*n ^= !*n;521}522Some(sortedness)523},524525_ => None,526}527}528529530