// Source: pola-rs/polars, crates/polars-plan/src/plans/optimizer/slice_pushdown_lp.rs
use polars_core::prelude::*;
use polars_utils::idx_vec::UnitVec;
use polars_utils::slice_enum::Slice;
use recursive::recursive;

use crate::prelude::*;

mod inner {
    use polars_utils::arena::Node;
    use polars_utils::idx_vec::UnitVec;
    use polars_utils::unitvec;

    pub struct SlicePushDown {
        #[expect(unused)]
        pub new_streaming: bool,
        scratch: UnitVec<Node>,
        pub(super) maintain_errors: bool,
    }

    impl SlicePushDown {
        pub fn new(maintain_errors: bool, new_streaming: bool) -> Self {
            Self {
                new_streaming,
                scratch: unitvec![],
                maintain_errors,
            }
        }

        /// Returns shared scratch space after clearing.
        pub fn empty_nodes_scratch_mut(&mut self) -> &mut UnitVec<Node> {
            self.scratch.clear();
            &mut self.scratch
        }
    }
}

pub(super) use inner::SlicePushDown;

#[derive(Copy, Clone, Debug)]
struct State {
    offset: i64,
    len: IdxSize,
}

impl State {
    fn to_slice_enum(self) -> Slice {
        let offset = self.offset;
        let len: usize = usize::try_from(self.len).unwrap();

        (offset, len).into()
    }
}
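
// Note: `(offset, len).into()` goes through `Slice`'s `From<(i64, usize)>`
// conversion; presumably this is what keeps negative (from-the-end) offsets
// representable all the way down to the scan readers.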

/// Can push down slice when:
/// * all projections are elementwise
/// * at least 1 projection is based on a column (for height broadcast)
/// * projections not based on any column project as scalars
///
/// Returns (can_pushdown, can_pushdown_and_any_expr_has_column)
fn can_pushdown_slice_past_projections(
    exprs: &[ExprIR],
    arena: &Arena<AExpr>,
    scratch: &mut UnitVec<Node>,
    maintain_errors: bool,
) -> (bool, bool) {
    scratch.clear();

    let mut can_pushdown_and_any_expr_has_column = false;

    for expr_ir in exprs.iter() {
        scratch.push(expr_ir.node());

        // # "has_column"
        // `select(c = Literal([1, 2, 3])).slice(0, 0)` must block slice pushdown,
        // because `c` projects to a height independent from the input height. We check
        // this by observing that `c` does not have any columns in its input nodes.
        //
        // TODO: Simply checking that a column node is present does not handle e.g.:
        // `select(c = Literal([1, 2, 3]).is_in(col(a)))`, for functions like `is_in`,
        // `str.contains`, `str.contains_any` etc. - observe a column node is present
        // but the output height is not dependent on it.
        let mut has_column = false;
        let mut literals_all_scalar = true;

        let mut pd_group = ExprPushdownGroup::Pushable;

        while let Some(node) = scratch.pop() {
            let ae = arena.get(node);

            // We re-use the logic from predicate pushdown, as slices can be seen as a form of filtering.
            // But we also do some bookkeeping here specific to slice pushdown.

            match ae {
                AExpr::Column(_) => has_column = true,
                AExpr::Literal(v) => literals_all_scalar &= v.is_scalar(),
                _ => {},
            }

            if pd_group
                .update_with_expr(scratch, ae, arena)
                .blocks_pushdown(maintain_errors)
            {
                return (false, false);
            }
        }

        // If there is no column, then all literals must be scalar.
        if !(has_column || literals_all_scalar) {
            return (false, false);
        }

        can_pushdown_and_any_expr_has_column |= has_column;
    }

    (true, can_pushdown_and_any_expr_has_column)
}
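
// Illustrative examples (not from the original source): a plan like
// `df.select(col("a") + 1).slice(5, 10)` is elementwise and column-based, so
// slicing the input first is equivalent and the slice may pass the projection.
// A plan like `df.select(lit(series))` with a multi-element `series` has a
// height independent of the input, so the slice must stay above the projection.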

impl SlicePushDown {
    // The slice will be applied at this node if we found one;
    // we also stop optimization here.
    fn no_pushdown_finish_opt(
        &self,
        lp: IR,
        state: Option<State>,
        lp_arena: &mut Arena<IR>,
    ) -> PolarsResult<IR> {
        match state {
            Some(state) => {
                let input = lp_arena.add(lp);

                let lp = IR::Slice {
                    input,
                    offset: state.offset,
                    len: state.len,
                };
                Ok(lp)
            },
            None => Ok(lp),
        }
    }

    /// slice will be done at this node, but we continue optimization
    fn no_pushdown_restart_opt(
        &mut self,
        lp: IR,
        state: Option<State>,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<IR> {
        let inputs = lp.get_inputs();

        let new_inputs = inputs
            .into_iter()
            .map(|node| {
                let alp = lp_arena.take(node);
                // No state, so we do not push down the slice here.
                let state = None;
                let alp = self.pushdown(alp, state, lp_arena, expr_arena)?;
                lp_arena.replace(node, alp);
                Ok(node)
            })
            .collect::<PolarsResult<UnitVec<_>>>()?;
        let lp = lp.with_inputs(new_inputs);

        self.no_pushdown_finish_opt(lp, state, lp_arena)
    }

    /// slice will be pushed down.
    fn pushdown_and_continue(
        &mut self,
        lp: IR,
        state: Option<State>,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<IR> {
        let inputs = lp.get_inputs();

        let new_inputs = inputs
            .into_iter()
            .map(|node| {
                let alp = lp_arena.take(node);
                let alp = self.pushdown(alp, state, lp_arena, expr_arena)?;
                lp_arena.replace(node, alp);
                Ok(node)
            })
            .collect::<PolarsResult<UnitVec<_>>>()?;
        Ok(lp.with_inputs(new_inputs))
    }

    #[recursive]
    fn pushdown(
        &mut self,
        lp: IR,
        state: Option<State>,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<IR> {
        use IR::*;

        match (lp, state) {
            #[cfg(feature = "python")]
            (PythonScan {
                mut options,
            },
            // TODO! we currently skip slice pushdown if there is a predicate.
            // we can modify the readers to only limit after predicates have been applied
            Some(state)) if state.offset == 0 && matches!(options.predicate, PythonPredicate::None) => {
                options.n_rows = Some(state.len as usize);
                let lp = PythonScan {
                    options,
                };
                Ok(lp)
            },
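
            // For these file scans the slice becomes `pre_slice` in the unified scan
            // arguments, so the reader can skip/stop at the right rows instead of
            // materializing the whole file and slicing afterwards.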
            (Scan {
                sources,
                file_info,
                hive_parts,
                output_schema,
                mut unified_scan_args,
                predicate,
                scan_type,
            }, Some(state)) if predicate.is_none() && match &*scan_type {
                #[cfg(feature = "parquet")]
                FileScanIR::Parquet { .. } => true,

                #[cfg(feature = "ipc")]
                FileScanIR::Ipc { .. } => true,

                #[cfg(feature = "csv")]
                FileScanIR::Csv { .. } => true,

                #[cfg(feature = "json")]
                FileScanIR::NDJson { .. } => true,

                #[cfg(feature = "python")]
                FileScanIR::PythonDataset { .. } => true,

                // TODO: This can be `true` after Anonymous scan dispatches to new-streaming.
                FileScanIR::Anonymous { .. } => state.offset == 0,
            } => {
                unified_scan_args.pre_slice = Some(state.to_slice_enum());

                let lp = Scan {
                    sources,
                    file_info,
                    hive_parts,
                    output_schema,
                    scan_type,
                    unified_scan_args,
                    predicate,
                };

                Ok(lp)
            },
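
            // An in-memory DataFrame is simply sliced eagerly; the slice node then
            // disappears from the plan entirely.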
            (DataFrameScan { df, schema, output_schema }, Some(state)) => {
                let df = df.slice(state.offset, state.len as usize);
                let lp = DataFrameScan {
                    df: Arc::new(df),
                    schema,
                    output_schema,
                };
                Ok(lp)
            },
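
            // With a zero offset the length bound can also be pushed into each input,
            // since each contributes at most `len` rows to the result; the union node
            // still applies the exact slice afterwards via `options.slice`.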
            (Union { mut inputs, mut options }, Some(state)) => {
                if state.offset == 0 {
                    for input in &mut inputs {
                        let input_lp = lp_arena.take(*input);
                        let input_lp = self.pushdown(input_lp, Some(state), lp_arena, expr_arena)?;
                        lp_arena.replace(*input, input_lp);
                    }
                }
                options.slice = Some((state.offset, state.len as usize));
                let lp = Union { inputs, options };
                Ok(lp)
            },
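
            // A join can change row count and order, so the slice cannot be pushed
            // into either input; both sides restart optimization from a clean state
            // and the join applies the slice itself via `args.slice`. The fused
            // cross-join-and-filter variant is excluded by the guard, presumably
            // because its output height is only known after the fused filter runs.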
            (Join {
                input_left,
                input_right,
                schema,
                left_on,
                right_on,
                mut options
            }, Some(state)) if !matches!(options.options, Some(JoinTypeOptionsIR::CrossAndFilter { .. })) => {
                // first restart optimization in both inputs and get the updated LP
                let lp_left = lp_arena.take(input_left);
                let lp_left = self.pushdown(lp_left, None, lp_arena, expr_arena)?;
                let input_left = lp_arena.add(lp_left);

                let lp_right = lp_arena.take(input_right);
                let lp_right = self.pushdown(lp_right, None, lp_arena, expr_arena)?;
                let input_right = lp_arena.add(lp_right);

                // then assign the slice state to the join operation

                let mut_options = Arc::make_mut(&mut options);
                mut_options.args.slice = Some((state.offset, state.len as usize));

                Ok(Join {
                    input_left,
                    input_right,
                    schema,
                    left_on,
                    right_on,
                    options
                })
            },
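
            // Group-by, distinct and sort follow the same pattern: restart
            // optimization in the input without slice state, then record the slice on
            // the node's own options so the operation can apply it while executing
            // (e.g. a sort can stop after producing the top `len` rows).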
            (GroupBy { input, keys, aggs, schema, apply, maintain_order, mut options }, Some(state)) => {
                // first restart optimization in inputs and get the updated LP
                let input_lp = lp_arena.take(input);
                let input_lp = self.pushdown(input_lp, None, lp_arena, expr_arena)?;
                let input = lp_arena.add(input_lp);

                let mut_options = Arc::make_mut(&mut options);
                mut_options.slice = Some((state.offset, state.len as usize));

                Ok(GroupBy {
                    input,
                    keys,
                    aggs,
                    schema,
                    apply,
                    maintain_order,
                    options
                })
            },
            (Distinct { input, mut options }, Some(state)) => {
                // first restart optimization in inputs and get the updated LP
                let input_lp = lp_arena.take(input);
                let input_lp = self.pushdown(input_lp, None, lp_arena, expr_arena)?;
                let input = lp_arena.add(input_lp);
                options.slice = Some((state.offset, state.len as usize));
                Ok(Distinct {
                    input,
                    options,
                })
            },
            (Sort { input, by_column, mut slice, sort_options }, Some(state)) => {
                // first restart optimization in inputs and get the updated LP
                let input_lp = lp_arena.take(input);
                let input_lp = self.pushdown(input_lp, None, lp_arena, expr_arena)?;
                let input = lp_arena.add(input_lp);

                slice = Some((state.offset, state.len as usize));
                Ok(Sort {
                    input,
                    by_column,
                    slice,
                    sort_options
                })
            },
            (Slice {
                input,
                offset,
                mut len
            }, Some(outer_slice)) => {
                let alp = lp_arena.take(input);

                // Both are positive, can combine into a single slice.
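                // E.g. slice(2, 10) followed by an outer slice(3, 4) selects rows
                // 5..9 of the input: offset = 2 + 3 = 5, len = min(10 - 3, 4) = 4.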
                if outer_slice.offset >= 0 && offset >= 0 {
                    let state = State {
                        offset: offset.checked_add(outer_slice.offset).unwrap(),
                        len: if len as i64 > outer_slice.offset {
                            (len - outer_slice.offset as IdxSize).min(outer_slice.len)
                        } else {
                            0
                        },
                    };
                    return self.pushdown(alp, Some(state), lp_arena, expr_arena);
                }

                // If the offset is negative, the length can never exceed its magnitude.
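                // E.g. slice(-5, 8) starts 5 rows from the end, so at most 5 rows
                // exist to take; len is clamped to 5.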
                if offset < 0 {
                    if len as i64 > -offset {
                        len = (-offset) as IdxSize;
                    }
                }

                // Both are negative, can also combine (but not so simply).
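                // E.g. on 10 rows, slice(-5, 3) yields rows 5..8, and a following
                // slice(-2, 1) picks row 6. The combined state (offset -4, len 1)
                // selects the same row without needing to know the height.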
                if outer_slice.offset < 0 && offset < 0 {
                    let inner_start_rel_end = offset;
                    let inner_stop_rel_end = inner_start_rel_end + len as i64;
                    let naive_outer_start_rel_end = inner_stop_rel_end + outer_slice.offset;
                    let naive_outer_stop_rel_end = naive_outer_start_rel_end + outer_slice.len as i64;
                    let clamped_outer_start_rel_end = naive_outer_start_rel_end.max(inner_start_rel_end);
                    let clamped_outer_stop_rel_end = naive_outer_stop_rel_end.max(clamped_outer_start_rel_end);

                    let state = State {
                        offset: clamped_outer_start_rel_end,
                        len: (clamped_outer_stop_rel_end - clamped_outer_start_rel_end) as IdxSize,
                    };
                    return self.pushdown(alp, Some(state), lp_arena, expr_arena);
                }
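
                // Mixed signs cannot be combined without knowing the input height:
                // push the inner slice down on its own and keep the outer slice here
                // as a plan node.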
                let inner_slice = Some(State { offset, len });
                let lp = self.pushdown(alp, inner_slice, lp_arena, expr_arena)?;
                let input = lp_arena.add(lp);
                Ok(Slice {
                    input,
                    offset: outer_slice.offset,
                    len: outer_slice.len
                })
            },
            (Slice {
                input,
                offset,
                mut len
            }, None) => {
                let alp = lp_arena.take(input);

                // If the offset is negative, the length can never exceed its magnitude.
                if offset < 0 {
                    if len as i64 > -offset {
                        len = (-offset) as IdxSize;
                    }
                }

                let state = Some(State {
                    offset,
                    len
                });
                self.pushdown(alp, state, lp_arena, expr_arena)
            },
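
            // The `Some(state)` arms above take precedence, so nodes like
            // DataFrameScan, Sort, Distinct, GroupBy and Join only reach the blocking
            // arms below when there is no pending slice to push.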
            // [Do not pushdown] boundary
            // Here we do not push down;
            // we reset the state and then start the optimization again.
            m @ (Filter { .. }, _)
            // other blocking nodes
            | m @ (DataFrameScan { .. }, _)
            | m @ (Sort { .. }, _)
            | m @ (MapFunction { function: FunctionIR::Explode { .. }, .. }, _)
            | m @ (Cache { .. }, _)
            | m @ (Distinct { .. }, _)
            | m @ (GroupBy { .. }, _)
            // blocking in streaming
            | m @ (Join { .. }, _) => {
                let (lp, state) = m;
                self.no_pushdown_restart_opt(lp, state, lp_arena, expr_arena)
            },
            #[cfg(feature = "pivot")]
            m @ (MapFunction { function: FunctionIR::Unpivot { .. }, .. }, _) => {
                let (lp, state) = m;
                self.no_pushdown_restart_opt(lp, state, lp_arena, expr_arena)
            },
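
            // Map functions reuse the predicate-pushdown capability flag: a slice is
            // a positional filter, so if a predicate may pass a function unchanged, a
            // slice may as well.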
            // [Pushdown]
            (MapFunction { input, function }, _) if function.allow_predicate_pd() => {
                let lp = MapFunction { input, function };
                self.pushdown_and_continue(lp, state, lp_arena, expr_arena)
            },
            // [No pushdown]
            m @ (MapFunction { .. }, _) => {
                let (lp, state) = m;
                self.no_pushdown_restart_opt(lp, state, lp_arena, expr_arena)
            },
            // [Pushdown]
            // these nodes will be pushed down.
            // State is None, we can continue
            m @ (Select { .. }, None)
            | m @ (HStack { .. }, None)
            | m @ (SimpleProjection { .. }, _) => {
                let (lp, state) = m;
                self.pushdown_and_continue(lp, state, lp_arena, expr_arena)
            },
            // there is state, inspect the projection to determine how to deal with it
            (Select { input, expr, schema, options }, Some(_)) => {
                let maintain_errors = self.maintain_errors;
                if can_pushdown_slice_past_projections(
                    &expr,
                    expr_arena,
                    self.empty_nodes_scratch_mut(),
                    maintain_errors,
                )
                .1
                {
                    let lp = Select { input, expr, schema, options };
                    self.pushdown_and_continue(lp, state, lp_arena, expr_arena)
                }
                // don't push down slice, but restart optimization
                else {
                    let lp = Select { input, expr, schema, options };
                    self.no_pushdown_restart_opt(lp, state, lp_arena, expr_arena)
                }
            },
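
            // `select` replaces the schema, so it requires the stricter second
            // component above: some expression must depend on an input column.
            // `with_columns` keeps the input columns, so it can relax this whenever
            // the schema shows that an input column survives.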
            (HStack { input, exprs, schema, options }, _) => {
                let maintain_errors = self.maintain_errors;
                let (can_pushdown, can_pushdown_and_any_expr_has_column) =
                    can_pushdown_slice_past_projections(
                        &exprs,
                        expr_arena,
                        self.empty_nodes_scratch_mut(),
                        maintain_errors,
                    );

                if can_pushdown_and_any_expr_has_column
                    || (
                        // If the schema is longer than the expr list, an input column
                        // is also being projected, so the exprs in with_columns do not
                        // need to reference an input column themselves.
                        schema.len() > exprs.len() && can_pushdown
                    )
                {
                    let lp = HStack { input, exprs, schema, options };
                    self.pushdown_and_continue(lp, state, lp_arena, expr_arena)
                }
                // don't push down slice, but restart optimization
                else {
                    let lp = HStack { input, exprs, schema, options };
                    self.no_pushdown_restart_opt(lp, state, lp_arena, expr_arena)
                }
            },
            (HConcat { inputs, schema, options }, _) => {
                // Slice can always be pushed down for horizontal concatenation.
                let lp = HConcat { inputs, schema, options };
                self.pushdown_and_continue(lp, state, lp_arena, expr_arena)
            },
            (lp @ Sink { .. }, _) | (lp @ SinkMultiple { .. }, _) => {
                // Slice can always be pushed down for sinks.
                self.pushdown_and_continue(lp, state, lp_arena, expr_arena)
            },
            (catch_all, state) => {
                self.no_pushdown_finish_opt(catch_all, state, lp_arena)
            },
        }
    }

    pub fn optimize(
        &mut self,
        logical_plan: IR,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<IR> {
        self.pushdown(logical_plan, None, lp_arena, expr_arena)
    }
}
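
#[cfg(test)]
mod slice_combine_sketch {
    //! A minimal sketch (not part of the original file) checking the
    //! positive/positive slice-combination arithmetic from `pushdown` against
    //! direct slicing of row indices. Plain `u64` stands in for `IdxSize` to
    //! keep the test self-contained.

    #[test]
    fn combine_positive_slices() {
        let (inner_offset, inner_len): (i64, u64) = (2, 10);
        let (outer_offset, outer_len): (i64, u64) = (3, 4);

        // The combined state, as computed by the optimizer for two positive slices.
        let offset = inner_offset.checked_add(outer_offset).unwrap();
        let len = if inner_len as i64 > outer_offset {
            (inner_len - outer_offset as u64).min(outer_len)
        } else {
            0
        };

        // Direct evaluation: slice twice, then once with the combined state.
        // Offsets are non-negative here, matching the branch under test.
        let slice = |v: &[u64], off: i64, n: u64| -> Vec<u64> {
            v.iter().copied().skip(off as usize).take(n as usize).collect()
        };
        let rows: Vec<u64> = (0..20).collect();
        let twice = slice(&slice(&rows, inner_offset, inner_len), outer_offset, outer_len);
        let once = slice(&rows, offset, len);
        assert_eq!(twice, once);
    }
}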