Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/optimizer/slice_pushdown_lp.rs
8424 views
1
use polars_core::prelude::*;
2
use polars_utils::idx_vec::UnitVec;
3
use polars_utils::slice_enum::Slice;
4
use recursive::recursive;
5
6
use crate::prelude::*;
7
8
mod inner {
9
use polars_utils::arena::Node;
10
use polars_utils::idx_vec::UnitVec;
11
use polars_utils::unitvec;
12
13
pub struct SlicePushDown {
14
scratch: UnitVec<Node>,
15
pub(super) maintain_errors: bool,
16
}
17
18
impl SlicePushDown {
19
pub fn new(maintain_errors: bool) -> Self {
20
Self {
21
scratch: unitvec![],
22
maintain_errors,
23
}
24
}
25
26
/// Returns shared scratch space after clearing.
27
pub fn empty_nodes_scratch_mut(&mut self) -> &mut UnitVec<Node> {
28
self.scratch.clear();
29
&mut self.scratch
30
}
31
}
32
}
33
34
pub(super) use inner::SlicePushDown;
35
36
/// A pending slice being pushed down the plan, equivalent to
/// `.slice(offset, len)` applied at the node that finally absorbs it.
#[derive(Copy, Clone, Debug)]
struct State {
    // Row offset; signed, so it may be negative (relative to the end).
    offset: i64,
    // Number of rows kept after applying `offset`.
    len: IdxSize,
}
41
42
impl State {
43
fn to_slice_enum(self) -> Slice {
44
let offset = self.offset;
45
let len: usize = usize::try_from(self.len).unwrap();
46
47
(offset, len).into()
48
}
49
}
50
51
/// Can push down slice when:
/// * all projections are elementwise
/// * at least 1 projection is based on a column (for height broadcast)
/// * projections not based on any column project as scalars
///
/// Returns (can_pushdown, can_pushdown_and_any_expr_has_column)
fn can_pushdown_slice_past_projections(
    exprs: &[ExprIR],
    arena: &Arena<AExpr>,
    // Shared traversal stack; cleared on entry and drained per expression.
    scratch: &mut UnitVec<Node>,
    maintain_errors: bool,
) -> (bool, bool) {
    scratch.clear();

    let mut can_pushdown_and_any_expr_has_column = false;

    // Each projection expression is checked independently; a single failing
    // expression blocks pushdown for the whole projection node.
    for expr_ir in exprs.iter() {
        scratch.push(expr_ir.node());

        // # "has_column"
        // `select(c = Literal([1, 2, 3])).slice(0, 0)` must block slice pushdown,
        // because `c` projects to a height independent from the input height. We check
        // this by observing that `c` does not have any columns in its input nodes.
        //
        // TODO: Simply checking that a column node is present does not handle e.g.:
        // `select(c = Literal([1, 2, 3]).is_in(col(a)))`, for functions like `is_in`,
        // `str.contains`, `str.contains_any` etc. - observe a column node is present
        // but the output height is not dependent on it.
        let mut has_column = false;
        let mut literals_all_scalar = true;

        // Pushability classification, reset per expression.
        let mut pd_group = ExprPushdownGroup::Pushable;

        // Depth-first traversal over this expression's nodes.
        // NOTE(review): `update_with_expr` presumably pushes `ae`'s input nodes
        // onto `scratch`, which is what drives this loop — confirm against the
        // predicate pushdown utilities.
        while let Some(node) = scratch.pop() {
            let ae = arena.get(node);

            // We re-use the logic from predicate pushdown, as slices can be seen as a form of filtering.
            // But we also do some bookkeeping here specific to slice pushdown.

            match ae {
                AExpr::Column(_) => has_column = true,
                AExpr::Literal(v) => literals_all_scalar &= v.is_scalar(),
                _ => {},
            }

            if pd_group
                .update_with_expr(scratch, ae, arena)
                .blocks_pushdown(maintain_errors)
            {
                // Blocking expression found: abort early, nothing can be pushed.
                return (false, false);
            }
        }

        // If there is no column then all literals must be scalar
        if !(has_column || literals_all_scalar) {
            return (false, false);
        }

        can_pushdown_and_any_expr_has_column |= has_column
    }

    // All expressions passed; second flag records whether any of them
    // referenced a column (needed for height broadcast correctness).
    (true, can_pushdown_and_any_expr_has_column)
}
114
115
impl SlicePushDown {
    // slice will be done at this node if we found any
    // we also stop optimization
    //
    // If `state` is set, wraps `lp` in an `IR::Slice` node; otherwise returns
    // `lp` unchanged. Inputs of `lp` are NOT visited.
    fn no_pushdown_finish_opt(
        &self,
        lp: IR,
        state: Option<State>,
        lp_arena: &mut Arena<IR>,
    ) -> PolarsResult<IR> {
        match state {
            Some(state) => {
                let input = lp_arena.add(lp);

                let lp = IR::Slice {
                    input,
                    offset: state.offset,
                    len: state.len,
                };
                Ok(lp)
            },
            None => Ok(lp),
        }
    }

    /// slice will be done at this node, but we continue optimization
    ///
    /// Recurses into every input of `lp` with a fresh (empty) state, then
    /// materializes the pending slice above `lp` via `no_pushdown_finish_opt`.
    fn no_pushdown_restart_opt(
        &mut self,
        lp: IR,
        state: Option<State>,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<IR> {
        let inputs = lp.get_inputs();

        let new_inputs = inputs
            .into_iter()
            .map(|node| {
                let alp = lp_arena.take(node);
                // No state, so we do not push down the slice here.
                let state = None;
                let alp = self.pushdown(alp, state, lp_arena, expr_arena)?;
                lp_arena.replace(node, alp);
                Ok(node)
            })
            .collect::<PolarsResult<UnitVec<_>>>()?;
        let lp = lp.with_inputs(new_inputs);

        self.no_pushdown_finish_opt(lp, state, lp_arena)
    }

    /// slice will be pushed down.
    ///
    /// Forwards the current `state` unchanged into every input of `lp`;
    /// no `Slice` node is inserted at this level.
    fn pushdown_and_continue(
        &mut self,
        lp: IR,
        state: Option<State>,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<IR> {
        let inputs = lp.get_inputs();

        let new_inputs = inputs
            .into_iter()
            .map(|node| {
                let alp = lp_arena.take(node);
                let alp = self.pushdown(alp, state, lp_arena, expr_arena)?;
                lp_arena.replace(node, alp);
                Ok(node)
            })
            .collect::<PolarsResult<UnitVec<_>>>()?;
        Ok(lp.with_inputs(new_inputs))
    }

    /// Main dispatch: tries to absorb `state` (a pending slice) into `lp`,
    /// push it further down, or materialize it as an `IR::Slice` above a
    /// blocking node. `#[recursive]` guards against stack overflow on deep
    /// plans.
    #[recursive]
    fn pushdown(
        &mut self,
        lp: IR,
        state: Option<State>,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<IR> {
        use IR::*;

        match (lp, state) {
            #[cfg(feature = "python")]
            (PythonScan {
                mut options,
            },
            // TODO! we currently skip slice pushdown if there is a predicate.
            // we can modify the readers to only limit after predicates have been applied
            Some(state)) if state.offset == 0 && matches!(options.predicate, PythonPredicate::None) => {
                options.n_rows = Some(state.len as usize);
                let lp = PythonScan {
                    options,
                };
                Ok(lp)
            }

            // Absorb the slice into the scan itself (reader-level pre-slice)
            // when the scan type supports it and no predicate is attached.
            (Scan {
                sources,
                file_info,
                hive_parts,
                output_schema,
                mut unified_scan_args,
                predicate,
                predicate_file_skip_applied,
                scan_type,
            }, Some(state)) if predicate.is_none() && match &*scan_type {
                #[cfg(feature = "parquet")]
                FileScanIR::Parquet { .. } => true,

                #[cfg(feature = "ipc")]
                FileScanIR::Ipc { .. } => true,

                #[cfg(feature = "csv")]
                FileScanIR::Csv { .. } => true,

                #[cfg(feature = "json")]
                FileScanIR::NDJson { .. } => true,

                #[cfg(feature = "python")]
                FileScanIR::PythonDataset { .. } => true,

                #[cfg(feature = "scan_lines")]
                FileScanIR::Lines { .. } => true,

                // TODO: This can be `true` after Anonymous scan dispatches to new-streaming.
                FileScanIR::Anonymous { .. } => state.offset == 0,
            } => {
                unified_scan_args.pre_slice = Some(state.to_slice_enum());

                let lp = Scan {
                    sources,
                    file_info,
                    hive_parts,
                    output_schema,
                    scan_type,
                    unified_scan_args,
                    predicate,
                    predicate_file_skip_applied,
                };

                Ok(lp)
            },

            // In-memory frame: apply the slice eagerly to the data.
            (DataFrameScan {df, schema, output_schema, }, Some(state)) => {
                let df = df.slice(state.offset, state.len as usize);
                let lp = DataFrameScan {
                    df: Arc::new(df),
                    schema,
                    output_schema,
                };
                Ok(lp)
            }
            // Union: each input can be limited to at most `offset + len` rows
            // (only valid for non-negative offsets); the exact slice is kept
            // on the union's options and applied after concatenation.
            (Union {mut inputs, mut options }, opt_state) => {
                let subplan_slice: Option<State> = opt_state
                    .filter(|x| x.offset >= 0)
                    // `checked_add` returns None on overflow, in which case no
                    // per-input limit is pushed down.
                    .and_then(|x| x.len.checked_add(x.offset.try_into().unwrap()))
                    .map(|len| State {
                        offset: 0,
                        len,
                    });

                for input in &mut inputs {
                    let input_lp = lp_arena.take(*input);
                    let input_lp = self.pushdown(input_lp, subplan_slice, lp_arena, expr_arena)?;
                    lp_arena.replace(*input, input_lp);
                }
                options.slice = opt_state.map(|x| (x.offset, x.len.try_into().unwrap()));
                let lp = Union {inputs, options};
                Ok(lp)
            },
            // Join absorbs the slice into its args (applied after joining);
            // CrossAndFilter is excluded and falls through to the blocking arm.
            (Join {
                input_left,
                input_right,
                schema,
                left_on,
                right_on,
                mut options
            }, Some(state)) if !matches!(options.options, Some(JoinTypeOptionsIR::CrossAndFilter { .. })) => {
                // first restart optimization in both inputs and get the updated LP
                let lp_left = lp_arena.take(input_left);
                let lp_left = self.pushdown(lp_left, None, lp_arena, expr_arena)?;
                let input_left = lp_arena.add(lp_left);

                let lp_right = lp_arena.take(input_right);
                let lp_right = self.pushdown(lp_right, None, lp_arena, expr_arena)?;
                let input_right = lp_arena.add(lp_right);

                // then assign the slice state to the join operation

                let mut_options = Arc::make_mut(&mut options);
                mut_options.args.slice = Some((state.offset, state.len as usize));

                Ok(Join {
                    input_left,
                    input_right,
                    schema,
                    left_on,
                    right_on,
                    options
                })
            }
            // GroupBy absorbs the slice into its options (applied after aggregation).
            (GroupBy { input, keys, aggs, schema, apply, maintain_order, mut options }, Some(state)) => {
                // first restart optimization in inputs and get the updated LP
                let input_lp = lp_arena.take(input);
                let input_lp = self.pushdown(input_lp, None, lp_arena, expr_arena)?;
                let input= lp_arena.add(input_lp);

                let mut_options= Arc::make_mut(&mut options);
                mut_options.slice = Some((state.offset, state.len as usize));

                Ok(GroupBy {
                    input,
                    keys,
                    aggs,
                    schema,
                    apply,
                    maintain_order,
                    options
                })
            }
            // Distinct absorbs the slice into its options.
            (Distinct {input, mut options}, Some(state)) => {
                // first restart optimization in inputs and get the updated LP
                let input_lp = lp_arena.take(input);
                let input_lp = self.pushdown(input_lp, None, lp_arena, expr_arena)?;
                let input= lp_arena.add(input_lp);
                options.slice = Some((state.offset, state.len as usize));
                Ok(Distinct {
                    input,
                    options,
                })
            }
            // Sort absorbs the slice directly (enables top-k style execution).
            (Sort {input, by_column, slice, sort_options}, Some(state)) => {
                // The slice argument on Sort should be inserted by slice pushdown,
                // so it shouldn't exist yet (or be idempotently the same).
                let new_slice = Some((state.offset, state.len as usize));
                assert!(slice.is_none() || slice == new_slice);

                // first restart optimization in inputs and get the updated LP
                let input_lp = lp_arena.take(input);
                let input_lp = self.pushdown(input_lp, None, lp_arena, expr_arena)?;
                let input = lp_arena.add(input_lp);

                Ok(Sort {
                    input,
                    by_column,
                    slice: new_slice,
                    sort_options
                })
            }
            // Two stacked slices: try to fuse them into one before recursing.
            (Slice {
                input,
                offset,
                mut len
            }, Some(outer_slice)) => {
                let alp = lp_arena.take(input);

                // Both are positive, can combine into a single slice.
                if outer_slice.offset >= 0 && offset >= 0 {
                    let state = State {
                        offset: offset.checked_add(outer_slice.offset).unwrap(),
                        // i128 comparison avoids signed/unsigned cast pitfalls
                        // before the subtraction below.
                        len: if len as i128 > outer_slice.offset as i128 {
                            (len - outer_slice.offset as IdxSize).min(outer_slice.len)
                        } else {
                            0
                        },
                    };
                    return self.pushdown(alp, Some(state), lp_arena, expr_arena);
                }

                // If offset is negative the length can never be greater than it.
                if offset < 0 {
                    #[allow(clippy::unnecessary_cast)] // Necessary when IdxSize = u64.
                    if len as u64 > offset.unsigned_abs() as u64 {
                        len = offset.unsigned_abs() as IdxSize;
                    }
                }

                // Both are negative, can also combine (but not so simply).
                if outer_slice.offset < 0 && offset < 0 {
                    // We use 128-bit arithmetic to avoid overflows, clamping at the end.
                    // Positions are expressed relative to the end of the data
                    // (hence the `_rel_end` suffixes).
                    let inner_start_rel_end = offset as i128;
                    let inner_stop_rel_end = inner_start_rel_end + len as i128;
                    let naive_outer_start_rel_end = inner_stop_rel_end + outer_slice.offset as i128;
                    let naive_outer_stop_rel_end = naive_outer_start_rel_end + outer_slice.len as i128;
                    // The fused window may not start before / extend past the inner window.
                    let clamped_outer_start_rel_end = naive_outer_start_rel_end.max(inner_start_rel_end);
                    let clamped_outer_stop_rel_end = naive_outer_stop_rel_end.max(clamped_outer_start_rel_end);

                    let state = State {
                        offset: clamped_outer_start_rel_end.clamp(i64::MIN as i128, i64::MAX as i128) as i64,
                        len: (clamped_outer_stop_rel_end - clamped_outer_start_rel_end).min(IdxSize::MAX as i128) as IdxSize,
                    };
                    return self.pushdown(alp, Some(state), lp_arena, expr_arena);
                }

                // Mixed signs: cannot fuse. Push the inner slice down and keep
                // the outer slice materialized at this node.
                let inner_slice = Some(State { offset, len });
                let lp = self.pushdown(alp, inner_slice, lp_arena, expr_arena)?;
                let input = lp_arena.add(lp);
                Ok(Slice {
                    input,
                    offset: outer_slice.offset,
                    len: outer_slice.len
                })
            }
            // A slice node with no pending state: convert it into pushdown state.
            (Slice {
                input,
                offset,
                mut len
            }, None) => {
                let alp = lp_arena.take(input);

                // If offset is negative the length can never be greater than it.
                if offset < 0 {
                    #[allow(clippy::unnecessary_cast)] // Necessary when IdxSize = u64.
                    if len as u64 > offset.unsigned_abs() as u64 {
                        len = offset.unsigned_abs() as IdxSize;
                    }
                }

                let state = Some(State {
                    offset,
                    len
                });
                self.pushdown(alp, state, lp_arena, expr_arena)
            }
            // [Do not pushdown] boundary
            // here we do not pushdown.
            // we reset the state and then start the optimization again
            // (DataFrameScan / Sort / GroupBy / Distinct / Join with state were
            // already handled by the earlier, more specific arms above.)
            m @ (Filter { .. }, _)
            // other blocking nodes
            | m @ (DataFrameScan {..}, _)
            | m @ (Sort {..}, _)
            | m @ (MapFunction {function: FunctionIR::Explode {..}, ..}, _)
            | m @ (Cache {..}, _)
            | m @ (Distinct {..}, _)
            | m @ (GroupBy{..},_)
            // blocking in streaming
            | m @ (Join{..},_)
            => {
                let (lp, state) = m;
                self.no_pushdown_restart_opt(lp, state, lp_arena, expr_arena)
            },
            #[cfg(feature = "pivot")]
            m @ (MapFunction {function: FunctionIR::Unpivot {..}, ..}, _) => {
                let (lp, state) = m;
                self.no_pushdown_restart_opt(lp, state, lp_arena, expr_arena)
            },
            // [Pushdown]
            // Reuses the predicate pushdown permission: functions that allow
            // predicate pushdown also allow slice pushdown here.
            (MapFunction {input, function}, _) if function.allow_predicate_pd() => {
                let lp = MapFunction {input, function};
                self.pushdown_and_continue(lp, state, lp_arena, expr_arena)
            },
            // [NO Pushdown]
            m @ (MapFunction {..}, _) => {
                let (lp, state) = m;
                self.no_pushdown_restart_opt(lp, state, lp_arena, expr_arena)
            }
            // [Pushdown]
            // these nodes will be pushed down.
            // State is None, we can continue
            m @ (Select {..}, None)
            | m @ (HStack {..}, None)
            | m @ (SimpleProjection {..}, _)
            => {
                let (lp, state) = m;
                self.pushdown_and_continue(lp, state, lp_arena, expr_arena)
            }
            // there is state, inspect the projection to determine how to deal with it
            (Select {input, expr, schema, options}, Some(_)) => {
                let maintain_errors = self.maintain_errors;
                // `.1`: a slice may only pass a `select` if at least one
                // expression is column-based (output height follows input).
                if can_pushdown_slice_past_projections(&expr, expr_arena, self.empty_nodes_scratch_mut(), maintain_errors).1 {
                    let lp = Select {input, expr, schema, options};
                    self.pushdown_and_continue(lp, state, lp_arena, expr_arena)
                }
                // don't push down slice, but restart optimization
                else {
                    let lp = Select {input, expr, schema, options};
                    self.no_pushdown_restart_opt(lp, state, lp_arena, expr_arena)
                }
            }
            (HStack {input, exprs, schema, options}, _) => {
                let maintain_errors = self.maintain_errors;
                let (can_pushdown, can_pushdown_and_any_expr_has_column) = can_pushdown_slice_past_projections(&exprs, expr_arena, self.empty_nodes_scratch_mut(), maintain_errors);

                if can_pushdown_and_any_expr_has_column || (
                    // If the schema length is greater then an input column is being projected, so
                    // the exprs in with_columns do not need to have an input column name.
                    schema.len() > exprs.len() && can_pushdown
                )
                {
                    let lp = HStack {input, exprs, schema, options};
                    self.pushdown_and_continue(lp, state, lp_arena, expr_arena)
                }
                // don't push down slice, but restart optimization
                else {
                    let lp = HStack {input, exprs, schema, options};
                    self.no_pushdown_restart_opt(lp, state, lp_arena, expr_arena)
                }
            }
            (HConcat {inputs, schema, options}, _) => {
                // Slice can always be pushed down for horizontal concatenation
                let lp = HConcat {inputs, schema, options};
                self.pushdown_and_continue(lp, state, lp_arena, expr_arena)
            }
            (lp @ Sink { .. }, _) | (lp @ SinkMultiple { .. }, _) => {
                // Slice can always be pushed down for sinks
                self.pushdown_and_continue(lp, state, lp_arena, expr_arena)
            }
            // Any other node: stop here and materialize the slice (if any)
            // directly above it.
            (catch_all, state) => {
                self.no_pushdown_finish_opt(catch_all, state, lp_arena)
            }
        }
    }

    /// Entry point of the pass: run pushdown over the whole plan with no
    /// pending slice.
    pub fn optimize(
        &mut self,
        logical_plan: IR,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<IR> {
        self.pushdown(logical_plan, None, lp_arena, expr_arena)
    }
}
538
539