Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-python/src/dataset/dataset_provider_funcs.rs
7889 views
1
//! Note: Currently only used for iceberg.
2
use std::sync::Arc;
3
4
use polars::prelude::{DslPlan, PlSmallStr, Schema, SchemaRef};
5
use polars_core::config;
6
use polars_error::PolarsResult;
7
use polars_utils::python_function::PythonObject;
8
use pyo3::conversion::FromPyObjectBound;
9
use pyo3::exceptions::PyValueError;
10
use pyo3::pybacked::PyBackedStr;
11
use pyo3::types::{PyAnyMethods, PyDict, PyList, PyListMethods};
12
use pyo3::{Py, PyAny, PyResult, Python, intern};
13
14
use crate::interop::arrow::to_rust::field_to_rust;
15
use crate::prelude::{Wrap, get_lf};
16
17
pub fn name(dataset_object: &PythonObject) -> PlSmallStr {
18
Python::attach(|py| {
19
PyResult::Ok(PlSmallStr::from_str(
20
&dataset_object
21
.getattr(py, intern!(py, "__class__"))?
22
.getattr(py, intern!(py, "__name__"))?
23
.extract::<PyBackedStr>(py)?,
24
))
25
})
26
.unwrap()
27
}
28
29
pub fn schema(dataset_object: &PythonObject) -> PolarsResult<SchemaRef> {
30
Python::attach(|py| {
31
let pyarrow_schema_cls = py
32
.import("pyarrow")
33
.ok()
34
.and_then(|pa| pa.getattr("Schema").ok());
35
36
let schema_obj = dataset_object.getattr(py, "schema")?.call0(py)?;
37
38
let schema_cls = schema_obj.getattr(py, "__class__")?;
39
40
// PyIceberg returns arrow schemas, we convert them here.
41
if let Some(pyarrow_schema_cls) = pyarrow_schema_cls {
42
if schema_cls.is(&pyarrow_schema_cls) {
43
if config::verbose() {
44
eprintln!("python dataset: convert from arrow schema");
45
}
46
47
let mut iter = schema_obj
48
.bind(py)
49
.try_iter()?
50
.map(|x| x.and_then(field_to_rust));
51
52
let mut last_err = None;
53
54
let schema =
55
Schema::from_iter_check_duplicates(std::iter::from_fn(|| match iter.next() {
56
Some(Ok(v)) => Some(v),
57
Some(Err(e)) => {
58
last_err = Some(e);
59
None
60
},
61
None => None,
62
}))?;
63
64
if let Some(last_err) = last_err {
65
return Err(last_err.into());
66
}
67
68
return Ok(Arc::new(schema));
69
}
70
}
71
72
let Wrap(schema) = Wrap::<Schema>::from_py_object_bound(schema_obj.bind_borrowed(py))?;
73
74
Ok(Arc::new(schema))
75
})
76
}
77
78
pub fn to_dataset_scan(
79
dataset_object: &PythonObject,
80
existing_resolved_version_key: Option<&str>,
81
limit: Option<usize>,
82
projection: Option<&[PlSmallStr]>,
83
filter_columns: Option<&[PlSmallStr]>,
84
pyarrow_predicate: Option<&str>,
85
) -> PolarsResult<Option<(DslPlan, PlSmallStr)>> {
86
Python::attach(|py| {
87
let kwargs = PyDict::new(py);
88
89
kwargs.set_item(
90
intern!(py, "existing_resolved_version_key"),
91
existing_resolved_version_key,
92
)?;
93
94
if let Some(limit) = limit {
95
kwargs.set_item(intern!(py, "limit"), limit)?;
96
}
97
98
if let Some(projection) = projection {
99
let projection_list = PyList::empty(py);
100
101
for name in projection {
102
projection_list.append(name.as_str())?;
103
}
104
105
kwargs.set_item(intern!(py, "projection"), projection_list)?;
106
}
107
108
if let Some(filter_columns) = filter_columns {
109
let filter_columns_list = PyList::empty(py);
110
111
for name in filter_columns {
112
filter_columns_list.append(name.as_str())?;
113
}
114
115
kwargs.set_item(intern!(py, "filter_columns"), filter_columns_list)?;
116
}
117
118
kwargs.set_item(intern!(py, "pyarrow_predicate"), pyarrow_predicate)?;
119
120
let Some((scan, version)): Option<(Py<PyAny>, Wrap<PlSmallStr>)> = dataset_object
121
.getattr(py, intern!(py, "to_dataset_scan"))?
122
.call(py, (), Some(&kwargs))?
123
.extract(py)?
124
else {
125
return Ok(None);
126
};
127
128
let Ok(lf) = get_lf(scan.bind(py)) else {
129
return Err(
130
PyValueError::new_err(format!("cannot extract LazyFrame from {}", &scan)).into(),
131
);
132
};
133
134
Ok(Some((lf.logical_plan, version.0)))
135
})
136
}
137
138