// Path: crates/polars-python/src/dataset/dataset_provider_funcs.rs
//! Note: Currently only used for iceberg.1use std::sync::Arc;23use polars::prelude::{DslPlan, PlSmallStr, Schema, SchemaRef};4use polars_core::config;5use polars_error::PolarsResult;6use polars_utils::python_function::PythonObject;7use pyo3::conversion::FromPyObjectBound;8use pyo3::exceptions::PyValueError;9use pyo3::pybacked::PyBackedStr;10use pyo3::types::{PyAnyMethods, PyDict, PyList, PyListMethods};11use pyo3::{Py, PyAny, PyResult, Python, intern};1213use crate::interop::arrow::to_rust::field_to_rust;14use crate::prelude::{Wrap, get_lf};1516pub fn name(dataset_object: &PythonObject) -> PlSmallStr {17Python::attach(|py| {18PyResult::Ok(PlSmallStr::from_str(19&dataset_object20.getattr(py, intern!(py, "__class__"))?21.getattr(py, intern!(py, "__name__"))?22.extract::<PyBackedStr>(py)?,23))24})25.unwrap()26}2728pub fn schema(dataset_object: &PythonObject) -> PolarsResult<SchemaRef> {29Python::attach(|py| {30let pyarrow_schema_cls = py31.import("pyarrow")32.ok()33.and_then(|pa| pa.getattr("Schema").ok());3435let schema_obj = dataset_object.getattr(py, "schema")?.call0(py)?;3637let schema_cls = schema_obj.getattr(py, "__class__")?;3839// PyIceberg returns arrow schemas, we convert them here.40if let Some(pyarrow_schema_cls) = pyarrow_schema_cls {41if schema_cls.is(&pyarrow_schema_cls) {42if config::verbose() {43eprintln!("python dataset: convert from arrow schema");44}4546let mut iter = schema_obj47.bind(py)48.try_iter()?49.map(|x| x.and_then(field_to_rust));5051let mut last_err = None;5253let schema =54Schema::from_iter_check_duplicates(std::iter::from_fn(|| match iter.next() {55Some(Ok(v)) => Some(v),56Some(Err(e)) => {57last_err = Some(e);58None59},60None => None,61}))?;6263if let Some(last_err) = last_err {64return Err(last_err.into());65}6667return Ok(Arc::new(schema));68}69}7071let Wrap(schema) = Wrap::<Schema>::from_py_object_bound(schema_obj.bind_borrowed(py))?;7273Ok(Arc::new(schema))74})75}7677pub fn to_dataset_scan(78dataset_object: 
&PythonObject,79existing_resolved_version_key: Option<&str>,80limit: Option<usize>,81projection: Option<&[PlSmallStr]>,82filter_columns: Option<&[PlSmallStr]>,83pyarrow_predicate: Option<&str>,84) -> PolarsResult<Option<(DslPlan, PlSmallStr)>> {85Python::attach(|py| {86let kwargs = PyDict::new(py);8788kwargs.set_item(89intern!(py, "existing_resolved_version_key"),90existing_resolved_version_key,91)?;9293if let Some(limit) = limit {94kwargs.set_item(intern!(py, "limit"), limit)?;95}9697if let Some(projection) = projection {98let projection_list = PyList::empty(py);99100for name in projection {101projection_list.append(name.as_str())?;102}103104kwargs.set_item(intern!(py, "projection"), projection_list)?;105}106107if let Some(filter_columns) = filter_columns {108let filter_columns_list = PyList::empty(py);109110for name in filter_columns {111filter_columns_list.append(name.as_str())?;112}113114kwargs.set_item(intern!(py, "filter_columns"), filter_columns_list)?;115}116117kwargs.set_item(intern!(py, "pyarrow_predicate"), pyarrow_predicate)?;118119let Some((scan, version)): Option<(Py<PyAny>, Wrap<PlSmallStr>)> = dataset_object120.getattr(py, intern!(py, "to_dataset_scan"))?121.call(py, (), Some(&kwargs))?122.extract(py)?123else {124return Ok(None);125};126127let Ok(lf) = get_lf(scan.bind(py)) else {128return Err(129PyValueError::new_err(format!("cannot extract LazyFrame from {}", &scan)).into(),130);131};132133Ok(Some((lf.logical_plan, version.0)))134})135}136137138