// Path: blob/main/crates/polars-python/src/lazyframe/visitor/nodes.rs
// 7890 views
#[cfg(feature = "iejoin")]1use polars::prelude::JoinTypeOptionsIR;2use polars::prelude::deletion::DeletionFilesList;3use polars::prelude::python_dsl::PythonScanSource;4use polars::prelude::{ColumnMapping, PredicateFileSkip};5use polars_core::prelude::IdxSize;6use polars_io::cloud::CloudOptions;7use polars_ops::prelude::JoinType;8use polars_plan::plans::IR;9use polars_plan::prelude::{FileScanIR, FunctionIR, PythonPredicate, UnifiedScanArgs};10use pyo3::IntoPyObjectExt;11use pyo3::exceptions::{PyNotImplementedError, PyValueError};12use pyo3::prelude::*;13use pyo3::types::{PyDict, PyList, PyString};1415use super::expr_nodes::PyGroupbyOptions;16use crate::PyDataFrame;17use crate::lazyframe::visit::PyExprIR;1819fn scan_type_to_pyobject(20py: Python<'_>,21scan_type: &FileScanIR,22cloud_options: &Option<CloudOptions>,23) -> PyResult<Py<PyAny>> {24match scan_type {25#[cfg(feature = "csv")]26FileScanIR::Csv { options } => {27let options = serde_json::to_string(options)28.map_err(|err| PyValueError::new_err(format!("{err:?}")))?;29let cloud_options = serde_json::to_string(cloud_options)30.map_err(|err| PyValueError::new_err(format!("{err:?}")))?;31Ok(("csv", options, cloud_options).into_py_any(py)?)32},33#[cfg(feature = "parquet")]34FileScanIR::Parquet { options, .. } => {35let options = serde_json::to_string(options)36.map_err(|err| PyValueError::new_err(format!("{err:?}")))?;37let cloud_options = serde_json::to_string(cloud_options)38.map_err(|err| PyValueError::new_err(format!("{err:?}")))?;39Ok(("parquet", options, cloud_options).into_py_any(py)?)40},41#[cfg(feature = "ipc")]42FileScanIR::Ipc { .. } => Err(PyNotImplementedError::new_err("ipc scan")),43#[cfg(feature = "json")]44FileScanIR::NDJson { options, .. 
} => {45let options = serde_json::to_string(options)46.map_err(|err| PyValueError::new_err(format!("{err:?}")))?;47Ok(("ndjson", options).into_py_any(py)?)48},49#[cfg(feature = "scan_lines")]50FileScanIR::Lines { name } => Ok(("lines", name.as_str()).into_py_any(py)?),51FileScanIR::PythonDataset { .. } => {52Err(PyNotImplementedError::new_err("python dataset scan"))53},54FileScanIR::Anonymous { .. } => Err(PyNotImplementedError::new_err("anonymous scan")),55}56}5758#[pyclass(frozen)]59/// Scan a table with an optional predicate from a python function60pub struct PythonScan {61#[pyo3(get)]62options: Py<PyAny>,63}6465#[pyclass(frozen)]66/// Slice the table67pub struct Slice {68#[pyo3(get)]69input: usize,70#[pyo3(get)]71offset: i64,72#[pyo3(get)]73len: IdxSize,74}7576#[pyclass(frozen)]77/// Filter the table with a boolean expression78pub struct Filter {79#[pyo3(get)]80input: usize,81#[pyo3(get)]82predicate: PyExprIR,83}8485#[pyclass(frozen)]86#[derive(Clone)]87pub struct PyFileOptions {88inner: UnifiedScanArgs,89}9091#[pymethods]92impl PyFileOptions {93#[getter]94fn n_rows(&self) -> Option<(i64, usize)> {95self.inner96.pre_slice97.clone()98.map(|slice| <(i64, usize)>::try_from(slice).unwrap())99}100#[getter]101fn with_columns(&self) -> Option<Vec<&str>> {102self.inner103.projection104.as_ref()?105.iter()106.map(|x| x.as_str())107.collect::<Vec<_>>()108.into()109}110#[getter]111fn cache(&self, _py: Python<'_>) -> bool {112self.inner.cache113}114#[getter]115fn row_index(&self) -> Option<(&str, IdxSize)> {116self.inner117.row_index118.as_ref()119.map(|n| (n.name.as_str(), n.offset))120}121#[getter]122fn rechunk(&self, _py: Python<'_>) -> bool {123self.inner.rechunk124}125#[getter]126fn hive_options(&self, _py: Python<'_>) -> PyResult<Py<PyAny>> {127Err(PyNotImplementedError::new_err("hive options"))128}129#[getter]130fn include_file_paths(&self, _py: Python<'_>) -> Option<&str> {131self.inner.include_file_paths.as_deref()132}133134/// One of:135/// * None136/// * 
("iceberg-position-delete", dict[int, list[str]])137#[getter]138fn deletion_files(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {139Ok(match &self.inner.deletion_files {140None => py.None().into_any(),141142Some(DeletionFilesList::IcebergPositionDelete(paths)) => {143let out = PyDict::new(py);144145for (k, v) in paths.iter() {146out.set_item(*k, v.as_ref())?;147}148149("iceberg-position-delete", out)150.into_pyobject(py)?151.into_any()152.unbind()153},154})155}156157/// One of:158/// * None159/// * ("iceberg-column-mapping", <unimplemented>)160#[getter]161fn column_mapping(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {162Ok(match &self.inner.column_mapping {163None => py.None().into_any(),164165Some(ColumnMapping::Iceberg { .. }) => unimplemented!(),166})167}168}169170#[pyclass(frozen)]171/// Scan a table from file172pub struct Scan {173#[pyo3(get)]174paths: Py<PyAny>,175#[pyo3(get)]176file_info: Py<PyAny>,177#[pyo3(get)]178predicate: Option<PyExprIR>,179#[pyo3(get)]180file_options: PyFileOptions,181#[pyo3(get)]182scan_type: Py<PyAny>,183}184185#[pyclass(frozen)]186/// Scan a table from an existing dataframe187pub struct DataFrameScan {188#[pyo3(get)]189df: PyDataFrame,190#[pyo3(get)]191projection: Py<PyAny>,192#[pyo3(get)]193selection: Option<PyExprIR>,194}195196#[pyclass(frozen)]197/// Project out columns from a table198pub struct SimpleProjection {199#[pyo3(get)]200input: usize,201}202203#[pyclass(frozen)]204/// Column selection205pub struct Select {206#[pyo3(get)]207input: usize,208#[pyo3(get)]209expr: Vec<PyExprIR>,210#[pyo3(get)]211should_broadcast: bool,212}213214#[pyclass(frozen)]215/// Sort the table216pub struct Sort {217#[pyo3(get)]218input: usize,219#[pyo3(get)]220by_column: Vec<PyExprIR>,221#[pyo3(get)]222sort_options: (bool, Vec<bool>, Vec<bool>),223#[pyo3(get)]224slice: Option<(i64, usize)>,225}226227#[pyclass(frozen)]228/// Cache the input at this point in the LP229pub struct Cache {230#[pyo3(get)]231input: usize,232#[pyo3(get)]233id_: 
u128,234}235236#[pyclass(frozen)]237/// Groupby aggregation238pub struct GroupBy {239#[pyo3(get)]240input: usize,241#[pyo3(get)]242keys: Vec<PyExprIR>,243#[pyo3(get)]244aggs: Vec<PyExprIR>,245#[pyo3(get)]246apply: (),247#[pyo3(get)]248maintain_order: bool,249#[pyo3(get)]250options: Py<PyAny>,251}252253#[pyclass(frozen)]254/// Join operation255pub struct Join {256#[pyo3(get)]257input_left: usize,258#[pyo3(get)]259input_right: usize,260#[pyo3(get)]261left_on: Vec<PyExprIR>,262#[pyo3(get)]263right_on: Vec<PyExprIR>,264#[pyo3(get)]265options: Py<PyAny>,266}267268#[pyclass(frozen)]269/// Merge sorted operation270pub struct MergeSorted {271#[pyo3(get)]272input_left: usize,273#[pyo3(get)]274input_right: usize,275#[pyo3(get)]276key: String,277}278279#[pyclass(frozen)]280/// Adding columns to the table without a Join281pub struct HStack {282#[pyo3(get)]283input: usize,284#[pyo3(get)]285exprs: Vec<PyExprIR>,286#[pyo3(get)]287should_broadcast: bool,288}289290#[pyclass(frozen)]291/// Like Select, but all operations produce a single row.292pub struct Reduce {293#[pyo3(get)]294input: usize,295#[pyo3(get)]296exprs: Vec<PyExprIR>,297}298299#[pyclass(frozen)]300/// Remove duplicates from the table301pub struct Distinct {302#[pyo3(get)]303input: usize,304#[pyo3(get)]305options: Py<PyAny>,306}307#[pyclass(frozen)]308/// A (User Defined) Function309pub struct MapFunction {310#[pyo3(get)]311input: usize,312#[pyo3(get)]313function: Py<PyAny>,314}315#[pyclass(frozen)]316pub struct Union {317#[pyo3(get)]318inputs: Vec<usize>,319#[pyo3(get)]320options: Option<(i64, usize)>,321}322#[pyclass(frozen)]323/// Horizontal concatenation of multiple plans324pub struct HConcat {325#[pyo3(get)]326inputs: Vec<usize>,327#[pyo3(get)]328options: (),329}330#[pyclass(frozen)]331/// This allows expressions to access other tables332pub struct ExtContext {333#[pyo3(get)]334input: usize,335#[pyo3(get)]336contexts: Vec<usize>,337}338339#[pyclass(frozen)]340pub struct Sink {341#[pyo3(get)]342input: 
usize,343#[pyo3(get)]344payload: Py<PyAny>,345}346347pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult<Py<PyAny>> {348match plan {349IR::PythonScan { options } => {350let python_src = match options.python_source {351PythonScanSource::Pyarrow => "pyarrow",352PythonScanSource::Cuda => "cuda",353PythonScanSource::IOPlugin => "io_plugin",354};355356PythonScan {357options: (358options359.scan_fn360.as_ref()361.map_or_else(|| py.None(), |s| s.0.clone_ref(py)),362options.with_columns.as_ref().map_or_else(363|| Ok(py.None()),364|cols| {365cols.iter()366.map(|x| x.as_str())367.collect::<Vec<_>>()368.into_py_any(py)369},370)?,371python_src,372match &options.predicate {373PythonPredicate::None => py.None(),374PythonPredicate::PyArrow(s) => ("pyarrow", s).into_py_any(py)?,375PythonPredicate::Polars(e) => ("polars", e.node().0).into_py_any(py)?,376},377options378.n_rows379.map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?,380)381.into_py_any(py)?,382}383.into_py_any(py)384},385IR::Slice { input, offset, len } => Slice {386input: input.0,387offset: *offset,388len: *len,389}390.into_py_any(py),391IR::Filter { input, predicate } => Filter {392input: input.0,393predicate: predicate.into(),394}395.into_py_any(py),396IR::Scan {397hive_parts: Some(_),398..399} => Err(PyNotImplementedError::new_err(400"scan with hive partitioning",401)),402IR::Scan {403sources,404file_info: _,405hive_parts: _,406predicate,407predicate_file_skip_applied,408output_schema: _,409scan_type,410unified_scan_args,411} => {412Scan {413paths: {414let paths = sources415.into_paths()416.ok_or_else(|| PyNotImplementedError::new_err("scan with BytesIO"))?;417418let out = PyList::new(py, [] as [(); 0])?;419420// Manual conversion to preserve `uri://...` - converting Rust `Path` to `PosixPath`421// will corrupt to `uri:/...`422for path in paths.iter() {423out.append(path.to_str())?;424}425426out.into_py_any(py)?427},428// TODO: file info429file_info: py.None(),430predicate: 
predicate431.as_ref()432.filter(|_| {433!matches!(434predicate_file_skip_applied,435Some(PredicateFileSkip {436no_residual_predicate: true,437original_len: _,438})439)440})441.map(|e| e.into()),442file_options: PyFileOptions {443inner: (**unified_scan_args).clone(),444},445scan_type: scan_type_to_pyobject(py, scan_type, &unified_scan_args.cloud_options)?,446}447}448.into_py_any(py),449IR::DataFrameScan {450df,451schema: _,452output_schema,453} => DataFrameScan {454df: PyDataFrame::new((**df).clone()),455projection: output_schema.as_ref().map_or_else(456|| Ok(py.None()),457|s| {458s.iter_names()459.map(|s| s.as_str())460.collect::<Vec<_>>()461.into_py_any(py)462},463)?,464selection: None,465}466.into_py_any(py),467IR::SimpleProjection { input, columns: _ } => {468SimpleProjection { input: input.0 }.into_py_any(py)469},470IR::Select {471input,472expr,473schema: _,474options,475} => Select {476expr: expr.iter().map(|e| e.into()).collect(),477input: input.0,478should_broadcast: options.should_broadcast,479}480.into_py_any(py),481IR::Sort {482input,483by_column,484slice,485sort_options,486} => Sort {487input: input.0,488by_column: by_column.iter().map(|e| e.into()).collect(),489sort_options: (490sort_options.maintain_order,491sort_options.nulls_last.clone(),492sort_options.descending.clone(),493),494slice: *slice,495}496.into_py_any(py),497IR::Cache { input, id } => Cache {498input: input.0,499id_: id.as_u128(),500}501.into_py_any(py),502IR::GroupBy {503input,504keys,505aggs,506schema: _,507apply,508maintain_order,509options,510} => GroupBy {511input: input.0,512keys: keys.iter().map(|e| e.into()).collect(),513aggs: aggs.iter().map(|e| e.into()).collect(),514apply: apply.as_ref().map_or(Ok(()), |_| {515Err(PyNotImplementedError::new_err(format!(516"apply inside GroupBy {plan:?}"517)))518})?,519maintain_order: *maintain_order,520options: PyGroupbyOptions::new(options.as_ref().clone()).into_py_any(py)?,521}522.into_py_any(py),523IR::Join 
{524input_left,525input_right,526schema: _,527left_on,528right_on,529options,530} => Join {531input_left: input_left.0,532input_right: input_right.0,533left_on: left_on.iter().map(|e| e.into()).collect(),534right_on: right_on.iter().map(|e| e.into()).collect(),535options: {536let how = &options.args.how;537let name = Into::<&str>::into(how).into_pyobject(py)?;538(539match how {540#[cfg(feature = "asof_join")]541JoinType::AsOf(_) => {542return Err(PyNotImplementedError::new_err("asof join"));543},544#[cfg(feature = "iejoin")]545JoinType::IEJoin => {546let Some(JoinTypeOptionsIR::IEJoin(ie_options)) = &options.options547else {548unreachable!()549};550(551name,552crate::Wrap(ie_options.operator1).into_py_any(py)?,553ie_options.operator2.as_ref().map_or_else(554|| Ok(py.None()),555|op| crate::Wrap(*op).into_py_any(py),556)?,557)558.into_py_any(py)?559},560// This is a cross join fused with a predicate. Shown in the IR::explain as561// NESTED LOOP JOIN562JoinType::Cross if options.options.is_some() => {563return Err(PyNotImplementedError::new_err("nested loop join"));564},565_ => name.into_any().unbind(),566},567options.args.nulls_equal,568options.args.slice,569options.args.suffix().as_str(),570options.args.coalesce.coalesce(how),571Into::<&str>::into(options.args.maintain_order),572)573.into_py_any(py)?574},575}576.into_py_any(py),577IR::HStack {578input,579exprs,580schema: _,581options,582} => HStack {583input: input.0,584exprs: exprs.iter().map(|e| e.into()).collect(),585should_broadcast: options.should_broadcast,586}587.into_py_any(py),588IR::Distinct { input, options } => Distinct {589input: input.0,590options: (591Into::<&str>::into(options.keep_strategy),592options.subset.as_ref().map_or_else(593|| Ok(py.None()),594|f| {595f.iter()596.map(|s| s.as_ref())597.collect::<Vec<&str>>()598.into_py_any(py)599},600)?,601options.maintain_order,602options.slice,603)604.into_py_any(py)?,605}606.into_py_any(py),607IR::MapFunction { input, function } => MapFunction {608input: 
input.0,609function: match function {610FunctionIR::OpaquePython(_) => {611return Err(PyNotImplementedError::new_err("opaque python mapfunction"));612},613FunctionIR::Opaque {614function: _,615schema: _,616predicate_pd: _,617projection_pd: _,618streamable: _,619fmt_str: _,620} => return Err(PyNotImplementedError::new_err("opaque rust mapfunction")),621FunctionIR::Unnest { columns, separator } => (622"unnest",623columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),624separator.as_ref().map(|s| s.to_string()),625)626.into_py_any(py)?,627FunctionIR::Rechunk => ("rechunk",).into_py_any(py)?,628FunctionIR::Explode {629columns,630options,631schema: _,632} => (633"explode",634columns.iter().map(|s| s.to_string()).collect::<Vec<_>>(),635options.empty_as_null,636options.keep_nulls,637)638.into_py_any(py)?,639#[cfg(feature = "pivot")]640FunctionIR::Unpivot { args, schema: _ } => (641"unpivot",642args.index.iter().map(|s| s.as_str()).collect::<Vec<_>>(),643args.on.iter().map(|s| s.as_str()).collect::<Vec<_>>(),644args.variable_name.as_str().into_py_any(py)?,645args.value_name.as_str().into_py_any(py)?,646)647.into_py_any(py)?,648FunctionIR::RowIndex {649name,650schema: _,651offset,652} => ("row_index", name.to_string(), offset.unwrap_or(0)).into_py_any(py)?,653FunctionIR::FastCount {654sources,655scan_type,656cloud_options,657alias,658} => {659let sources = sources660.into_paths()661.ok_or_else(|| {662PyNotImplementedError::new_err("FastCount with BytesIO sources")663})?664.iter()665.map(|p| p.to_str())666.collect::<Vec<_>>()667.into_py_any(py)?;668669let scan_type = scan_type_to_pyobject(py, scan_type, cloud_options)?;670671let alias = alias672.as_ref()673.map(|a| a.as_str())674.map_or_else(|| Ok(py.None()), |s| s.into_py_any(py))?;675676("fast_count", sources, scan_type, alias).into_py_any(py)?677},678FunctionIR::Hint(_) => return Err(PyNotImplementedError::new_err("hint ir")),679},680}681.into_py_any(py),682IR::Union { inputs, options } => Union {683inputs: 
inputs.iter().map(|n| n.0).collect(),684// TODO: rest of options685options: options.slice,686}687.into_py_any(py),688IR::HConcat {689inputs,690schema: _,691options: _,692} => HConcat {693inputs: inputs.iter().map(|n| n.0).collect(),694options: (),695}696.into_py_any(py),697IR::ExtContext {698input,699contexts,700schema: _,701} => ExtContext {702input: input.0,703contexts: contexts.iter().map(|n| n.0).collect(),704}705.into_py_any(py),706IR::Sink { input, payload } => Sink {707input: input.0,708payload: PyString::new(709py,710&serde_json::to_string(payload)711.map_err(|err| PyValueError::new_err(format!("{err:?}")))?,712)713.into(),714}715.into_py_any(py),716IR::SinkMultiple { .. } => Err(PyNotImplementedError::new_err(717"Not expecting to see a SinkMultiple node",718)),719#[cfg(feature = "merge_sorted")]720IR::MergeSorted {721input_left,722input_right,723key,724} => MergeSorted {725input_left: input_left.0,726input_right: input_right.0,727key: key.to_string(),728}729.into_py_any(py),730IR::Invalid => Err(PyNotImplementedError::new_err("Invalid")),731}732}733734735