Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-python/src/lazyframe/visitor/nodes.rs
7890 views
1
#[cfg(feature = "iejoin")]
2
use polars::prelude::JoinTypeOptionsIR;
3
use polars::prelude::deletion::DeletionFilesList;
4
use polars::prelude::python_dsl::PythonScanSource;
5
use polars::prelude::{ColumnMapping, PredicateFileSkip};
6
use polars_core::prelude::IdxSize;
7
use polars_io::cloud::CloudOptions;
8
use polars_ops::prelude::JoinType;
9
use polars_plan::plans::IR;
10
use polars_plan::prelude::{FileScanIR, FunctionIR, PythonPredicate, UnifiedScanArgs};
11
use pyo3::IntoPyObjectExt;
12
use pyo3::exceptions::{PyNotImplementedError, PyValueError};
13
use pyo3::prelude::*;
14
use pyo3::types::{PyDict, PyList, PyString};
15
16
use super::expr_nodes::PyGroupbyOptions;
17
use crate::PyDataFrame;
18
use crate::lazyframe::visit::PyExprIR;
19
20
fn scan_type_to_pyobject(
21
py: Python<'_>,
22
scan_type: &FileScanIR,
23
cloud_options: &Option<CloudOptions>,
24
) -> PyResult<Py<PyAny>> {
25
match scan_type {
26
#[cfg(feature = "csv")]
27
FileScanIR::Csv { options } => {
28
let options = serde_json::to_string(options)
29
.map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
30
let cloud_options = serde_json::to_string(cloud_options)
31
.map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
32
Ok(("csv", options, cloud_options).into_py_any(py)?)
33
},
34
#[cfg(feature = "parquet")]
35
FileScanIR::Parquet { options, .. } => {
36
let options = serde_json::to_string(options)
37
.map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
38
let cloud_options = serde_json::to_string(cloud_options)
39
.map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
40
Ok(("parquet", options, cloud_options).into_py_any(py)?)
41
},
42
#[cfg(feature = "ipc")]
43
FileScanIR::Ipc { .. } => Err(PyNotImplementedError::new_err("ipc scan")),
44
#[cfg(feature = "json")]
45
FileScanIR::NDJson { options, .. } => {
46
let options = serde_json::to_string(options)
47
.map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
48
Ok(("ndjson", options).into_py_any(py)?)
49
},
50
#[cfg(feature = "scan_lines")]
51
FileScanIR::Lines { name } => Ok(("lines", name.as_str()).into_py_any(py)?),
52
FileScanIR::PythonDataset { .. } => {
53
Err(PyNotImplementedError::new_err("python dataset scan"))
54
},
55
FileScanIR::Anonymous { .. } => Err(PyNotImplementedError::new_err("anonymous scan")),
56
}
57
}
58
59
#[pyclass(frozen)]
/// Scan a table with an optional predicate from a python function
pub struct PythonScan {
    /// Tuple `(scan_fn, with_columns, python_source, predicate, n_rows)` as built by
    /// `into_py`; absent entries are `None`. `python_source` is one of `"pyarrow"`,
    /// `"cuda"` or `"io_plugin"`, and `predicate` is `("pyarrow", ...)` or
    /// `("polars", expr_node_index)`.
    #[pyo3(get)]
    options: Py<PyAny>,
}
65
66
#[pyclass(frozen)]
/// Slice the table
pub struct Slice {
    /// Node index of the input plan.
    #[pyo3(get)]
    input: usize,
    /// Slice offset, copied from `IR::Slice`.
    #[pyo3(get)]
    offset: i64,
    /// Slice length, copied from `IR::Slice`.
    #[pyo3(get)]
    len: IdxSize,
}
76
77
#[pyclass(frozen)]
/// Filter the table with a boolean expression
pub struct Filter {
    /// Node index of the input plan.
    #[pyo3(get)]
    input: usize,
    /// The filter predicate expression.
    #[pyo3(get)]
    predicate: PyExprIR,
}
85
86
#[pyclass(frozen)]
#[derive(Clone)]
/// Read-only Python view over the unified arguments of a file scan.
pub struct PyFileOptions {
    /// The wrapped scan arguments (cloned from the `IR::Scan` node).
    inner: UnifiedScanArgs,
}
91
92
#[pymethods]
93
impl PyFileOptions {
94
#[getter]
95
fn n_rows(&self) -> Option<(i64, usize)> {
96
self.inner
97
.pre_slice
98
.clone()
99
.map(|slice| <(i64, usize)>::try_from(slice).unwrap())
100
}
101
#[getter]
102
fn with_columns(&self) -> Option<Vec<&str>> {
103
self.inner
104
.projection
105
.as_ref()?
106
.iter()
107
.map(|x| x.as_str())
108
.collect::<Vec<_>>()
109
.into()
110
}
111
#[getter]
112
fn cache(&self, _py: Python<'_>) -> bool {
113
self.inner.cache
114
}
115
#[getter]
116
fn row_index(&self) -> Option<(&str, IdxSize)> {
117
self.inner
118
.row_index
119
.as_ref()
120
.map(|n| (n.name.as_str(), n.offset))
121
}
122
#[getter]
123
fn rechunk(&self, _py: Python<'_>) -> bool {
124
self.inner.rechunk
125
}
126
#[getter]
127
fn hive_options(&self, _py: Python<'_>) -> PyResult<Py<PyAny>> {
128
Err(PyNotImplementedError::new_err("hive options"))
129
}
130
#[getter]
131
fn include_file_paths(&self, _py: Python<'_>) -> Option<&str> {
132
self.inner.include_file_paths.as_deref()
133
}
134
135
/// One of:
136
/// * None
137
/// * ("iceberg-position-delete", dict[int, list[str]])
138
#[getter]
139
fn deletion_files(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
140
Ok(match &self.inner.deletion_files {
141
None => py.None().into_any(),
142
143
Some(DeletionFilesList::IcebergPositionDelete(paths)) => {
144
let out = PyDict::new(py);
145
146
for (k, v) in paths.iter() {
147
out.set_item(*k, v.as_ref())?;
148
}
149
150
("iceberg-position-delete", out)
151
.into_pyobject(py)?
152
.into_any()
153
.unbind()
154
},
155
})
156
}
157
158
/// One of:
159
/// * None
160
/// * ("iceberg-column-mapping", <unimplemented>)
161
#[getter]
162
fn column_mapping(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
163
Ok(match &self.inner.column_mapping {
164
None => py.None().into_any(),
165
166
Some(ColumnMapping::Iceberg { .. }) => unimplemented!(),
167
})
168
}
169
}
170
171
#[pyclass(frozen)]
/// Scan a table from file
pub struct Scan {
    /// Source paths as a Python list of strings (converted manually so that
    /// `uri://` prefixes are preserved).
    #[pyo3(get)]
    paths: Py<PyAny>,
    /// Currently always `None` when built by `into_py` (file info is not exposed yet).
    #[pyo3(get)]
    file_info: Py<PyAny>,
    /// Scan predicate; `None` if there is none or if predicate file skipping
    /// reported no residual predicate.
    #[pyo3(get)]
    predicate: Option<PyExprIR>,
    /// The unified scan arguments of the node.
    #[pyo3(get)]
    file_options: PyFileOptions,
    /// Format descriptor tuple as produced by `scan_type_to_pyobject`.
    #[pyo3(get)]
    scan_type: Py<PyAny>,
}
185
186
#[pyclass(frozen)]
/// Scan a table from an existing dataframe
pub struct DataFrameScan {
    /// The in-memory DataFrame being scanned.
    #[pyo3(get)]
    df: PyDataFrame,
    /// Output column names, or `None` when the node has no output schema.
    #[pyo3(get)]
    projection: Py<PyAny>,
    /// Currently always `None` when built by `into_py`.
    #[pyo3(get)]
    selection: Option<PyExprIR>,
}
196
197
#[pyclass(frozen)]
/// Project out columns from a table
///
/// Only the input is exposed; the projected column list is not.
pub struct SimpleProjection {
    /// Node index of the input plan.
    #[pyo3(get)]
    input: usize,
}
203
204
#[pyclass(frozen)]
/// Column selection
pub struct Select {
    /// Node index of the input plan.
    #[pyo3(get)]
    input: usize,
    /// The selected expressions.
    #[pyo3(get)]
    expr: Vec<PyExprIR>,
    /// The `should_broadcast` flag from the select options.
    #[pyo3(get)]
    should_broadcast: bool,
}
214
215
#[pyclass(frozen)]
/// Sort the table
pub struct Sort {
    /// Node index of the input plan.
    #[pyo3(get)]
    input: usize,
    /// Sort key expressions.
    #[pyo3(get)]
    by_column: Vec<PyExprIR>,
    /// `(maintain_order, nulls_last, descending)`; the two lists presumably hold
    /// one flag per sort key — confirm against `SortMultipleOptions`.
    #[pyo3(get)]
    sort_options: (bool, Vec<bool>, Vec<bool>),
    /// Optional `(offset, len)` slice attached to the sort node.
    #[pyo3(get)]
    slice: Option<(i64, usize)>,
}
227
228
#[pyclass(frozen)]
/// Cache the input at this point in the LP
pub struct Cache {
    /// Node index of the input plan.
    #[pyo3(get)]
    input: usize,
    /// Cache identifier as a 128-bit integer (`id.as_u128()`). The trailing
    /// underscore presumably avoids clashing with Python's `id` builtin.
    #[pyo3(get)]
    id_: u128,
}
236
237
#[pyclass(frozen)]
/// Groupby aggregation
pub struct GroupBy {
    /// Node index of the input plan.
    #[pyo3(get)]
    input: usize,
    /// Group key expressions.
    #[pyo3(get)]
    keys: Vec<PyExprIR>,
    /// Aggregation expressions.
    #[pyo3(get)]
    aggs: Vec<PyExprIR>,
    /// Always `()`: group-bys with an `apply` function are rejected by `into_py`.
    #[pyo3(get)]
    apply: (),
    /// Whether group order is maintained.
    #[pyo3(get)]
    maintain_order: bool,
    /// A `PyGroupbyOptions` object wrapping the node's group-by options.
    #[pyo3(get)]
    options: Py<PyAny>,
}
253
254
#[pyclass(frozen)]
/// Join operation
pub struct Join {
    /// Node index of the left input plan.
    #[pyo3(get)]
    input_left: usize,
    /// Node index of the right input plan.
    #[pyo3(get)]
    input_right: usize,
    /// Left join key expressions.
    #[pyo3(get)]
    left_on: Vec<PyExprIR>,
    /// Right join key expressions.
    #[pyo3(get)]
    right_on: Vec<PyExprIR>,
    /// Tuple `(how, nulls_equal, slice, suffix, coalesce, maintain_order)`, where `how`
    /// is the join-type name, or `(name, operator1, operator2 | None)` for IEJoin.
    #[pyo3(get)]
    options: Py<PyAny>,
}
268
269
#[pyclass(frozen)]
/// Merge sorted operation
pub struct MergeSorted {
    /// Node index of the left input plan.
    #[pyo3(get)]
    input_left: usize,
    /// Node index of the right input plan.
    #[pyo3(get)]
    input_right: usize,
    /// Name of the key column to merge on.
    #[pyo3(get)]
    key: String,
}
279
280
#[pyclass(frozen)]
/// Adding columns to the table without a Join
pub struct HStack {
    /// Node index of the input plan.
    #[pyo3(get)]
    input: usize,
    /// Expressions producing the added columns.
    #[pyo3(get)]
    exprs: Vec<PyExprIR>,
    /// The `should_broadcast` flag from the projection options.
    #[pyo3(get)]
    should_broadcast: bool,
}
290
291
#[pyclass(frozen)]
/// Like Select, but all operations produce a single row.
///
/// NOTE(review): `into_py` in this file never constructs this node — confirm
/// whether it is still produced anywhere.
pub struct Reduce {
    /// Node index of the input plan.
    #[pyo3(get)]
    input: usize,
    /// The reducing expressions.
    #[pyo3(get)]
    exprs: Vec<PyExprIR>,
}
299
300
#[pyclass(frozen)]
/// Remove duplicates from the table
pub struct Distinct {
    /// Node index of the input plan.
    #[pyo3(get)]
    input: usize,
    /// Tuple `(keep_strategy, subset | None, maintain_order, slice)`.
    #[pyo3(get)]
    options: Py<PyAny>,
}
308
#[pyclass(frozen)]
/// A (User Defined) Function
pub struct MapFunction {
    /// Node index of the input plan.
    #[pyo3(get)]
    input: usize,
    /// Tuple whose first element names the function (e.g. `("rechunk",)` or
    /// `("unnest", columns, separator)`), followed by its arguments.
    #[pyo3(get)]
    function: Py<PyAny>,
}
316
#[pyclass(frozen)]
/// Vertical concatenation of multiple plans.
pub struct Union {
    /// Node indices of the input plans.
    #[pyo3(get)]
    inputs: Vec<usize>,
    /// Only the union's `(offset, len)` slice option is exposed; other options are not.
    #[pyo3(get)]
    options: Option<(i64, usize)>,
}
323
#[pyclass(frozen)]
/// Horizontal concatenation of multiple plans
pub struct HConcat {
    /// Node indices of the input plans.
    #[pyo3(get)]
    inputs: Vec<usize>,
    /// Placeholder; always `()` (options are not exposed).
    #[pyo3(get)]
    options: (),
}
331
#[pyclass(frozen)]
/// This allows expressions to access other tables
pub struct ExtContext {
    /// Node index of the input plan.
    #[pyo3(get)]
    input: usize,
    /// Node indices of the additional context plans.
    #[pyo3(get)]
    contexts: Vec<usize>,
}
339
340
#[pyclass(frozen)]
/// Write the result of the input plan to a sink.
pub struct Sink {
    /// Node index of the input plan.
    #[pyo3(get)]
    input: usize,
    /// The sink payload serialized as a JSON string.
    #[pyo3(get)]
    payload: Py<PyAny>,
}
347
348
/// Convert a single IR node into its Python visitor object.
///
/// Child plans are not converted recursively: inputs are exposed as raw node
/// indices (`Node.0`), and expressions as `PyExprIR` values.
///
/// # Errors
/// * `PyNotImplementedError` for unsupported nodes/options: hive-partitioned scans,
///   BytesIO sources, `apply` inside group-by, as-of and nested-loop joins, opaque
///   map functions, hints, `SinkMultiple` and `Invalid`.
/// * `PyValueError` if a scan type or sink payload fails JSON serialization.
/// * Any error raised while converting values to Python objects.
pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult<Py<PyAny>> {
    match plan {
        IR::PythonScan { options } => {
            let python_src = match options.python_source {
                PythonScanSource::Pyarrow => "pyarrow",
                PythonScanSource::Cuda => "cuda",
                PythonScanSource::IOPlugin => "io_plugin",
            };

            // options = (scan_fn | None, with_columns | None, python_src, predicate | None, n_rows | None)
            PythonScan {
                options: (
                    options
                        .scan_fn
                        .as_ref()
                        .map_or_else(|| py.None(), |s| s.0.clone_ref(py)),
                    options.with_columns.as_ref().map_or_else(
                        || Ok(py.None()),
                        |cols| {
                            cols.iter()
                                .map(|x| x.as_str())
                                .collect::<Vec<_>>()
                                .into_py_any(py)
                        },
                    )?,
                    python_src,
                    // A Polars predicate is exposed as its expression node index.
                    match &options.predicate {
                        PythonPredicate::None => py.None(),
                        PythonPredicate::PyArrow(s) => ("pyarrow", s).into_py_any(py)?,
                        PythonPredicate::Polars(e) => ("polars", e.node().0).into_py_any(py)?,
                    },
                    options
                        .n_rows
                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?,
                )
                    .into_py_any(py)?,
            }
            .into_py_any(py)
        },
        IR::Slice { input, offset, len } => Slice {
            input: input.0,
            offset: *offset,
            len: *len,
        }
        .into_py_any(py),
        IR::Filter { input, predicate } => Filter {
            input: input.0,
            predicate: predicate.into(),
        }
        .into_py_any(py),
        // Hive-partitioned scans are rejected here, before the general `IR::Scan` arm.
        IR::Scan {
            hive_parts: Some(_),
            ..
        } => Err(PyNotImplementedError::new_err(
            "scan with hive partitioning",
        )),
        IR::Scan {
            sources,
            file_info: _,
            hive_parts: _,
            predicate,
            predicate_file_skip_applied,
            output_schema: _,
            scan_type,
            unified_scan_args,
        } => {
            Scan {
                paths: {
                    let paths = sources
                        .into_paths()
                        .ok_or_else(|| PyNotImplementedError::new_err("scan with BytesIO"))?;

                    let out = PyList::new(py, [] as [(); 0])?;

                    // Manual conversion to preserve `uri://...` - converting Rust `Path` to `PosixPath`
                    // will corrupt to `uri:/...`
                    for path in paths.iter() {
                        out.append(path.to_str())?;
                    }

                    out.into_py_any(py)?
                },
                // TODO: file info
                file_info: py.None(),
                // Expose the predicate only if predicate file skipping did not report
                // `no_residual_predicate: true`.
                predicate: predicate
                    .as_ref()
                    .filter(|_| {
                        !matches!(
                            predicate_file_skip_applied,
                            Some(PredicateFileSkip {
                                no_residual_predicate: true,
                                original_len: _,
                            })
                        )
                    })
                    .map(|e| e.into()),
                file_options: PyFileOptions {
                    inner: (**unified_scan_args).clone(),
                },
                scan_type: scan_type_to_pyobject(py, scan_type, &unified_scan_args.cloud_options)?,
            }
        }
        .into_py_any(py),
        IR::DataFrameScan {
            df,
            schema: _,
            output_schema,
        } => DataFrameScan {
            df: PyDataFrame::new((**df).clone()),
            // Output column names, or `None` when there is no output schema.
            projection: output_schema.as_ref().map_or_else(
                || Ok(py.None()),
                |s| {
                    s.iter_names()
                        .map(|s| s.as_str())
                        .collect::<Vec<_>>()
                        .into_py_any(py)
                },
            )?,
            selection: None,
        }
        .into_py_any(py),
        IR::SimpleProjection { input, columns: _ } => {
            SimpleProjection { input: input.0 }.into_py_any(py)
        },
        IR::Select {
            input,
            expr,
            schema: _,
            options,
        } => Select {
            expr: expr.iter().map(|e| e.into()).collect(),
            input: input.0,
            should_broadcast: options.should_broadcast,
        }
        .into_py_any(py),
        IR::Sort {
            input,
            by_column,
            slice,
            sort_options,
        } => Sort {
            input: input.0,
            by_column: by_column.iter().map(|e| e.into()).collect(),
            // (maintain_order, nulls_last, descending)
            sort_options: (
                sort_options.maintain_order,
                sort_options.nulls_last.clone(),
                sort_options.descending.clone(),
            ),
            slice: *slice,
        }
        .into_py_any(py),
        IR::Cache { input, id } => Cache {
            input: input.0,
            id_: id.as_u128(),
        }
        .into_py_any(py),
        IR::GroupBy {
            input,
            keys,
            aggs,
            schema: _,
            apply,
            maintain_order,
            options,
        } => GroupBy {
            input: input.0,
            keys: keys.iter().map(|e| e.into()).collect(),
            aggs: aggs.iter().map(|e| e.into()).collect(),
            // A group-by with an `apply` function is not supported; otherwise `apply` is `()`.
            apply: apply.as_ref().map_or(Ok(()), |_| {
                Err(PyNotImplementedError::new_err(format!(
                    "apply inside GroupBy {plan:?}"
                )))
            })?,
            maintain_order: *maintain_order,
            options: PyGroupbyOptions::new(options.as_ref().clone()).into_py_any(py)?,
        }
        .into_py_any(py),
        IR::Join {
            input_left,
            input_right,
            schema: _,
            left_on,
            right_on,
            options,
        } => Join {
            input_left: input_left.0,
            input_right: input_right.0,
            left_on: left_on.iter().map(|e| e.into()).collect(),
            right_on: right_on.iter().map(|e| e.into()).collect(),
            // (how, nulls_equal, slice, suffix, coalesce, maintain_order), where `how` is the
            // join-type name, or `(name, operator1, operator2 | None)` for IEJoin.
            options: {
                let how = &options.args.how;
                let name = Into::<&str>::into(how).into_pyobject(py)?;
                (
                    match how {
                        #[cfg(feature = "asof_join")]
                        JoinType::AsOf(_) => {
                            return Err(PyNotImplementedError::new_err("asof join"));
                        },
                        #[cfg(feature = "iejoin")]
                        JoinType::IEJoin => {
                            // Assumes an IEJoin node always carries IEJoin options; any
                            // other state is treated as an invariant violation (panics).
                            let Some(JoinTypeOptionsIR::IEJoin(ie_options)) = &options.options
                            else {
                                unreachable!()
                            };
                            (
                                name,
                                crate::Wrap(ie_options.operator1).into_py_any(py)?,
                                ie_options.operator2.as_ref().map_or_else(
                                    || Ok(py.None()),
                                    |op| crate::Wrap(*op).into_py_any(py),
                                )?,
                            )
                                .into_py_any(py)?
                        },
                        // This is a cross join fused with a predicate. Shown in the IR::explain as
                        // NESTED LOOP JOIN
                        JoinType::Cross if options.options.is_some() => {
                            return Err(PyNotImplementedError::new_err("nested loop join"));
                        },
                        _ => name.into_any().unbind(),
                    },
                    options.args.nulls_equal,
                    options.args.slice,
                    options.args.suffix().as_str(),
                    // Coalesce setting resolved for this join type.
                    options.args.coalesce.coalesce(how),
                    Into::<&str>::into(options.args.maintain_order),
                )
                    .into_py_any(py)?
            },
        }
        .into_py_any(py),
        IR::HStack {
            input,
            exprs,
            schema: _,
            options,
        } => HStack {
            input: input.0,
            exprs: exprs.iter().map(|e| e.into()).collect(),
            should_broadcast: options.should_broadcast,
        }
        .into_py_any(py),
        IR::Distinct { input, options } => Distinct {
            input: input.0,
            // (keep_strategy, subset | None, maintain_order, slice)
            options: (
                Into::<&str>::into(options.keep_strategy),
                options.subset.as_ref().map_or_else(
                    || Ok(py.None()),
                    |f| {
                        f.iter()
                            .map(|s| s.as_ref())
                            .collect::<Vec<&str>>()
                            .into_py_any(py)
                    },
                )?,
                options.maintain_order,
                options.slice,
            )
                .into_py_any(py)?,
        }
        .into_py_any(py),
        IR::MapFunction { input, function } => MapFunction {
            input: input.0,
            // Each supported function becomes a tuple tagged with its name.
            function: match function {
                FunctionIR::OpaquePython(_) => {
                    return Err(PyNotImplementedError::new_err("opaque python mapfunction"));
                },
                FunctionIR::Opaque {
                    function: _,
                    schema: _,
                    predicate_pd: _,
                    projection_pd: _,
                    streamable: _,
                    fmt_str: _,
                } => return Err(PyNotImplementedError::new_err("opaque rust mapfunction")),
                FunctionIR::Unnest { columns, separator } => (
                    "unnest",
                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                    separator.as_ref().map(|s| s.to_string()),
                )
                    .into_py_any(py)?,
                FunctionIR::Rechunk => ("rechunk",).into_py_any(py)?,
                FunctionIR::Explode {
                    columns,
                    options,
                    schema: _,
                } => (
                    "explode",
                    columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                    options.empty_as_null,
                    options.keep_nulls,
                )
                    .into_py_any(py)?,
                #[cfg(feature = "pivot")]
                FunctionIR::Unpivot { args, schema: _ } => (
                    "unpivot",
                    args.index.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
                    args.on.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
                    args.variable_name.as_str().into_py_any(py)?,
                    args.value_name.as_str().into_py_any(py)?,
                )
                    .into_py_any(py)?,
                // A missing row-index offset is reported as 0.
                FunctionIR::RowIndex {
                    name,
                    schema: _,
                    offset,
                } => ("row_index", name.to_string(), offset.unwrap_or(0)).into_py_any(py)?,
                FunctionIR::FastCount {
                    sources,
                    scan_type,
                    cloud_options,
                    alias,
                } => {
                    let sources = sources
                        .into_paths()
                        .ok_or_else(|| {
                            PyNotImplementedError::new_err("FastCount with BytesIO sources")
                        })?
                        .iter()
                        .map(|p| p.to_str())
                        .collect::<Vec<_>>()
                        .into_py_any(py)?;

                    let scan_type = scan_type_to_pyobject(py, scan_type, cloud_options)?;

                    let alias = alias
                        .as_ref()
                        .map(|a| a.as_str())
                        .map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?;

                    ("fast_count", sources, scan_type, alias).into_py_any(py)?
                },
                FunctionIR::Hint(_) => return Err(PyNotImplementedError::new_err("hint ir")),
            },
        }
        .into_py_any(py),
        IR::Union { inputs, options } => Union {
            inputs: inputs.iter().map(|n| n.0).collect(),
            // TODO: rest of options
            options: options.slice,
        }
        .into_py_any(py),
        IR::HConcat {
            inputs,
            schema: _,
            options: _,
        } => HConcat {
            inputs: inputs.iter().map(|n| n.0).collect(),
            options: (),
        }
        .into_py_any(py),
        IR::ExtContext {
            input,
            contexts,
            schema: _,
        } => ExtContext {
            input: input.0,
            contexts: contexts.iter().map(|n| n.0).collect(),
        }
        .into_py_any(py),
        // The sink payload is handed to Python as a JSON string.
        IR::Sink { input, payload } => Sink {
            input: input.0,
            payload: PyString::new(
                py,
                &serde_json::to_string(payload)
                    .map_err(|err| PyValueError::new_err(format!("{err:?}")))?,
            )
            .into(),
        }
        .into_py_any(py),
        IR::SinkMultiple { .. } => Err(PyNotImplementedError::new_err(
            "Not expecting to see a SinkMultiple node",
        )),
        #[cfg(feature = "merge_sorted")]
        IR::MergeSorted {
            input_left,
            input_right,
            key,
        } => MergeSorted {
            input_left: input_left.0,
            input_right: input_right.0,
            key: key.to_string(),
        }
        .into_py_any(py),
        IR::Invalid => Err(PyNotImplementedError::new_err("Invalid")),
    }
}
734
735