Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/ir/inputs.rs
6940 views
1
use std::iter;
2
3
use super::*;
4
5
impl IR {
6
/// Returns a node with updated expressions.
7
///
8
/// Panics if the expression count doesn't match
9
/// [`Self::exprs`]/[`Self::exprs_mut`]/[`Self::copy_exprs`].
10
pub fn with_exprs<E>(mut self, exprs: E) -> Self
11
where
12
E: IntoIterator<Item = ExprIR>,
13
{
14
let mut exprs_mut = self.exprs_mut();
15
let mut new_exprs = exprs.into_iter();
16
17
for (expr, new_expr) in exprs_mut.by_ref().zip(new_exprs.by_ref()) {
18
*expr = new_expr;
19
}
20
21
assert!(exprs_mut.next().is_none(), "not enough exprs");
22
assert!(new_exprs.next().is_none(), "too many exprs");
23
24
drop(exprs_mut);
25
26
self
27
}
28
29
/// Returns a node with updated inputs.
30
///
31
/// Panics if the input count doesn't match
32
/// [`Self::inputs`]/[`Self::inputs_mut`]/[`Self::copy_inputs`]/[`Self::get_inputs`].
33
pub fn with_inputs<I>(mut self, inputs: I) -> Self
34
where
35
I: IntoIterator<Item = Node>,
36
{
37
let mut inputs_mut = self.inputs_mut();
38
let mut new_inputs = inputs.into_iter();
39
40
for (input, new_input) in inputs_mut.by_ref().zip(new_inputs.by_ref()) {
41
*input = new_input;
42
}
43
44
assert!(inputs_mut.next().is_none(), "not enough inputs");
45
assert!(new_inputs.next().is_none(), "too many inputs");
46
47
drop(inputs_mut);
48
49
self
50
}
51
52
pub fn exprs(&'_ self) -> Exprs<'_> {
53
use IR::*;
54
match self {
55
Slice { .. } => Exprs::Empty,
56
Cache { .. } => Exprs::Empty,
57
Distinct { .. } => Exprs::Empty,
58
Union { .. } => Exprs::Empty,
59
MapFunction { .. } => Exprs::Empty,
60
DataFrameScan { .. } => Exprs::Empty,
61
HConcat { .. } => Exprs::Empty,
62
ExtContext { .. } => Exprs::Empty,
63
SimpleProjection { .. } => Exprs::Empty,
64
SinkMultiple { .. } => Exprs::Empty,
65
#[cfg(feature = "merge_sorted")]
66
MergeSorted { .. } => Exprs::Empty,
67
68
#[cfg(feature = "python")]
69
PythonScan { options } => match &options.predicate {
70
PythonPredicate::Polars(predicate) => Exprs::single(predicate),
71
_ => Exprs::Empty,
72
},
73
74
Scan { predicate, .. } => match predicate {
75
Some(predicate) => Exprs::single(predicate),
76
_ => Exprs::Empty,
77
},
78
79
Filter { predicate, .. } => Exprs::single(predicate),
80
81
Sort { by_column, .. } => Exprs::slice(by_column),
82
Select { expr, .. } => Exprs::slice(expr),
83
HStack { exprs, .. } => Exprs::slice(exprs),
84
85
GroupBy { keys, aggs, .. } => Exprs::double_slice(keys, aggs),
86
87
Join {
88
left_on,
89
right_on,
90
options,
91
..
92
} => match &options.options {
93
Some(JoinTypeOptionsIR::CrossAndFilter { predicate }) => Exprs::Boxed(Box::new(
94
left_on
95
.iter()
96
.chain(right_on.iter())
97
.chain(iter::once(predicate)),
98
)),
99
_ => Exprs::double_slice(left_on, right_on),
100
},
101
102
Sink { payload, .. } => match payload {
103
SinkTypeIR::Memory => Exprs::Empty,
104
SinkTypeIR::File(_) => Exprs::Empty,
105
SinkTypeIR::Partition(p) => {
106
let key_iter = match &p.variant {
107
PartitionVariantIR::Parted { key_exprs, .. }
108
| PartitionVariantIR::ByKey { key_exprs, .. } => key_exprs.iter(),
109
_ => [].iter(),
110
};
111
let sort_by_iter = match &p.per_partition_sort_by {
112
Some(sort_by) => sort_by.iter(),
113
_ => [].iter(),
114
}
115
.map(|s| &s.expr);
116
Exprs::Boxed(Box::new(key_iter.chain(sort_by_iter)))
117
},
118
},
119
120
Invalid => unreachable!(),
121
}
122
}
123
124
pub fn exprs_mut(&'_ mut self) -> ExprsMut<'_> {
125
use IR::*;
126
match self {
127
Slice { .. } => ExprsMut::Empty,
128
Cache { .. } => ExprsMut::Empty,
129
Distinct { .. } => ExprsMut::Empty,
130
Union { .. } => ExprsMut::Empty,
131
MapFunction { .. } => ExprsMut::Empty,
132
DataFrameScan { .. } => ExprsMut::Empty,
133
HConcat { .. } => ExprsMut::Empty,
134
ExtContext { .. } => ExprsMut::Empty,
135
SimpleProjection { .. } => ExprsMut::Empty,
136
SinkMultiple { .. } => ExprsMut::Empty,
137
#[cfg(feature = "merge_sorted")]
138
MergeSorted { .. } => ExprsMut::Empty,
139
140
#[cfg(feature = "python")]
141
PythonScan { options } => match &mut options.predicate {
142
PythonPredicate::Polars(predicate) => ExprsMut::single(predicate),
143
_ => ExprsMut::Empty,
144
},
145
146
Scan { predicate, .. } => match predicate {
147
Some(predicate) => ExprsMut::single(predicate),
148
_ => ExprsMut::Empty,
149
},
150
151
Filter { predicate, .. } => ExprsMut::single(predicate),
152
153
Sort { by_column, .. } => ExprsMut::slice(by_column),
154
Select { expr, .. } => ExprsMut::slice(expr),
155
HStack { exprs, .. } => ExprsMut::slice(exprs),
156
157
GroupBy { keys, aggs, .. } => ExprsMut::double_slice(keys, aggs),
158
159
Join {
160
left_on,
161
right_on,
162
options,
163
..
164
} => match Arc::make_mut(options).options.as_mut() {
165
Some(JoinTypeOptionsIR::CrossAndFilter { predicate }) => ExprsMut::Boxed(Box::new(
166
left_on
167
.iter_mut()
168
.chain(right_on.iter_mut())
169
.chain(iter::once(predicate)),
170
)),
171
_ => ExprsMut::double_slice(left_on, right_on),
172
},
173
174
Sink { payload, .. } => match payload {
175
SinkTypeIR::Memory => ExprsMut::Empty,
176
SinkTypeIR::File(_) => ExprsMut::Empty,
177
SinkTypeIR::Partition(p) => {
178
let key_iter = match &mut p.variant {
179
PartitionVariantIR::Parted { key_exprs, .. }
180
| PartitionVariantIR::ByKey { key_exprs, .. } => key_exprs.iter_mut(),
181
_ => [].iter_mut(),
182
};
183
let sort_by_iter = match &mut p.per_partition_sort_by {
184
Some(sort_by) => sort_by.iter_mut(),
185
_ => [].iter_mut(),
186
}
187
.map(|s| &mut s.expr);
188
ExprsMut::Boxed(Box::new(key_iter.chain(sort_by_iter)))
189
},
190
},
191
192
Invalid => unreachable!(),
193
}
194
}
195
196
/// Copy the exprs in this LP node to an existing container.
197
pub fn copy_exprs<T>(&self, container: &mut T)
198
where
199
T: Extend<ExprIR>,
200
{
201
container.extend(self.exprs().cloned())
202
}
203
204
pub fn inputs(&'_ self) -> Inputs<'_> {
205
use IR::*;
206
match self {
207
Union { inputs, .. } | HConcat { inputs, .. } | SinkMultiple { inputs } => {
208
Inputs::slice(inputs)
209
},
210
Slice { input, .. } => Inputs::single(*input),
211
Filter { input, .. } => Inputs::single(*input),
212
Select { input, .. } => Inputs::single(*input),
213
SimpleProjection { input, .. } => Inputs::single(*input),
214
Sort { input, .. } => Inputs::single(*input),
215
Cache { input, .. } => Inputs::single(*input),
216
GroupBy { input, .. } => Inputs::single(*input),
217
Join {
218
input_left,
219
input_right,
220
..
221
} => Inputs::double(*input_left, *input_right),
222
HStack { input, .. } => Inputs::single(*input),
223
Distinct { input, .. } => Inputs::single(*input),
224
MapFunction { input, .. } => Inputs::single(*input),
225
Sink { input, .. } => Inputs::single(*input),
226
ExtContext {
227
input, contexts, ..
228
} => Inputs::Boxed(Box::new(iter::once(*input).chain(contexts.iter().copied()))),
229
Scan { .. } => Inputs::Empty,
230
DataFrameScan { .. } => Inputs::Empty,
231
#[cfg(feature = "python")]
232
PythonScan { .. } => Inputs::Empty,
233
#[cfg(feature = "merge_sorted")]
234
MergeSorted {
235
input_left,
236
input_right,
237
..
238
} => Inputs::double(*input_left, *input_right),
239
Invalid => unreachable!(),
240
}
241
}
242
243
pub fn inputs_mut(&'_ mut self) -> InputsMut<'_> {
244
use IR::*;
245
match self {
246
Union { inputs, .. } | HConcat { inputs, .. } | SinkMultiple { inputs } => {
247
InputsMut::slice(inputs)
248
},
249
Slice { input, .. } => InputsMut::single(input),
250
Filter { input, .. } => InputsMut::single(input),
251
Select { input, .. } => InputsMut::single(input),
252
SimpleProjection { input, .. } => InputsMut::single(input),
253
Sort { input, .. } => InputsMut::single(input),
254
Cache { input, .. } => InputsMut::single(input),
255
GroupBy { input, .. } => InputsMut::single(input),
256
Join {
257
input_left,
258
input_right,
259
..
260
} => InputsMut::double(input_left, input_right),
261
HStack { input, .. } => InputsMut::single(input),
262
Distinct { input, .. } => InputsMut::single(input),
263
MapFunction { input, .. } => InputsMut::single(input),
264
Sink { input, .. } => InputsMut::single(input),
265
ExtContext {
266
input, contexts, ..
267
} => InputsMut::Boxed(Box::new(iter::once(input).chain(contexts.iter_mut()))),
268
Scan { .. } => InputsMut::Empty,
269
DataFrameScan { .. } => InputsMut::Empty,
270
#[cfg(feature = "python")]
271
PythonScan { .. } => InputsMut::Empty,
272
#[cfg(feature = "merge_sorted")]
273
MergeSorted {
274
input_left,
275
input_right,
276
..
277
} => InputsMut::double(input_left, input_right),
278
Invalid => unreachable!(),
279
}
280
}
281
282
/// Push inputs of the LP in of this node to an existing container.
283
/// Most plans have typically one input. A join has two and a scan (CsvScan)
284
/// or an in-memory DataFrame has none. A Union has multiple.
285
pub fn copy_inputs<T>(&self, container: &mut T)
286
where
287
T: Extend<Node>,
288
{
289
container.extend(self.inputs())
290
}
291
292
/// Collects the nodes yielded by [`Self::inputs`] into a `UnitVec`.
pub fn get_inputs(&self) -> UnitVec<Node> {
    self.inputs().collect::<UnitVec<Node>>()
}
295
296
/// Returns the first node yielded by [`Self::inputs`], or `None` when this
/// node has no inputs.
pub(crate) fn get_input(&self) -> Option<Node> {
    let mut input_nodes = self.inputs();
    input_nodes.next()
}
299
}
300
301
pub enum Inputs<'a> {
302
Empty,
303
Single(iter::Once<Node>),
304
Double(std::array::IntoIter<Node, 2>),
305
Slice(iter::Copied<std::slice::Iter<'a, Node>>),
306
Boxed(Box<dyn Iterator<Item = Node> + 'a>),
307
}
308
309
impl<'a> Inputs<'a> {
310
fn single(node: Node) -> Self {
311
Self::Single(iter::once(node))
312
}
313
314
fn double(left: Node, right: Node) -> Self {
315
Self::Double([left, right].into_iter())
316
}
317
318
fn slice(inputs: &'a [Node]) -> Self {
319
Self::Slice(inputs.iter().copied())
320
}
321
}
322
323
impl<'a> Iterator for Inputs<'a> {
324
type Item = Node;
325
326
fn next(&mut self) -> Option<Self::Item> {
327
match self {
328
Self::Empty => None,
329
Self::Single(it) => it.next(),
330
Self::Double(it) => it.next(),
331
Self::Slice(it) => it.next(),
332
Self::Boxed(it) => it.next(),
333
}
334
}
335
}
336
337
pub enum InputsMut<'a> {
338
Empty,
339
Single(iter::Once<&'a mut Node>),
340
Double(std::array::IntoIter<&'a mut Node, 2>),
341
Slice(std::slice::IterMut<'a, Node>),
342
Boxed(Box<dyn Iterator<Item = &'a mut Node> + 'a>),
343
}
344
345
impl<'a> InputsMut<'a> {
346
fn single(node: &'a mut Node) -> Self {
347
Self::Single(iter::once(node))
348
}
349
350
fn double(left: &'a mut Node, right: &'a mut Node) -> Self {
351
Self::Double([left, right].into_iter())
352
}
353
354
fn slice(inputs: &'a mut [Node]) -> Self {
355
Self::Slice(inputs.iter_mut())
356
}
357
}
358
359
impl<'a> Iterator for InputsMut<'a> {
360
type Item = &'a mut Node;
361
362
fn next(&mut self) -> Option<Self::Item> {
363
match self {
364
Self::Empty => None,
365
Self::Single(it) => it.next(),
366
Self::Double(it) => it.next(),
367
Self::Slice(it) => it.next(),
368
Self::Boxed(it) => it.next(),
369
}
370
}
371
}
372
373
pub enum Exprs<'a> {
374
Empty,
375
Single(iter::Once<&'a ExprIR>),
376
Slice(std::slice::Iter<'a, ExprIR>),
377
DoubleSlice(iter::Chain<std::slice::Iter<'a, ExprIR>, std::slice::Iter<'a, ExprIR>>),
378
Boxed(Box<dyn Iterator<Item = &'a ExprIR> + 'a>),
379
}
380
381
impl<'a> Exprs<'a> {
382
fn single(expr: &'a ExprIR) -> Self {
383
Self::Single(iter::once(expr))
384
}
385
386
fn slice(inputs: &'a [ExprIR]) -> Self {
387
Self::Slice(inputs.iter())
388
}
389
390
fn double_slice(left: &'a [ExprIR], right: &'a [ExprIR]) -> Self {
391
Self::DoubleSlice(left.iter().chain(right.iter()))
392
}
393
}
394
395
impl<'a> Iterator for Exprs<'a> {
396
type Item = &'a ExprIR;
397
398
fn next(&mut self) -> Option<Self::Item> {
399
match self {
400
Self::Empty => None,
401
Self::Single(it) => it.next(),
402
Self::Slice(it) => it.next(),
403
Self::DoubleSlice(it) => it.next(),
404
Self::Boxed(it) => it.next(),
405
}
406
}
407
}
408
409
pub enum ExprsMut<'a> {
410
Empty,
411
Single(iter::Once<&'a mut ExprIR>),
412
Slice(std::slice::IterMut<'a, ExprIR>),
413
DoubleSlice(iter::Chain<std::slice::IterMut<'a, ExprIR>, std::slice::IterMut<'a, ExprIR>>),
414
Boxed(Box<dyn Iterator<Item = &'a mut ExprIR> + 'a>),
415
}
416
417
impl<'a> ExprsMut<'a> {
418
fn single(expr: &'a mut ExprIR) -> Self {
419
Self::Single(iter::once(expr))
420
}
421
422
fn slice(inputs: &'a mut [ExprIR]) -> Self {
423
Self::Slice(inputs.iter_mut())
424
}
425
426
fn double_slice(left: &'a mut [ExprIR], right: &'a mut [ExprIR]) -> Self {
427
Self::DoubleSlice(left.iter_mut().chain(right.iter_mut()))
428
}
429
}
430
431
impl<'a> Iterator for ExprsMut<'a> {
432
type Item = &'a mut ExprIR;
433
434
fn next(&mut self) -> Option<Self::Item> {
435
match self {
436
Self::Empty => None,
437
Self::Single(it) => it.next(),
438
Self::Slice(it) => it.next(),
439
Self::DoubleSlice(it) => it.next(),
440
Self::Boxed(it) => it.next(),
441
}
442
}
443
}
444
445