Book a Demo!
CoCalc Logo Icon
Store Features Docs Share Support News About Policies Sign Up Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-python/src/series/import.rs
8383 views
1
use arrow::array::{Array, PrimitiveArray};
2
use arrow::ffi;
3
use arrow::ffi::{ArrowArray, ArrowArrayStream, ArrowArrayStreamReader, ArrowSchema};
4
use polars::prelude::*;
5
use polars_ffi::version_0::SeriesExport;
6
use pyo3::exceptions::{PyTypeError, PyValueError};
7
use pyo3::prelude::*;
8
use pyo3::pybacked::PyBackedBytes;
9
use pyo3::types::{PyCapsule, PyTuple, PyType};
10
11
use super::PySeries;
12
use crate::error::PyPolarsErr;
13
14
/// Validate PyCapsule has provided name
15
fn validate_pycapsule_name(capsule: &Bound<PyCapsule>, expected_name: &str) -> PyResult<()> {
16
let capsule_name = capsule.name()?;
17
if let Some(capsule_name) = capsule_name {
18
let capsule_name = unsafe { capsule_name.as_cstr() };
19
if capsule_name.to_str() != Ok(expected_name) {
20
return Err(PyValueError::new_err(format!(
21
"Expected name '{expected_name}' in PyCapsule, instead got '{capsule_name:?}'"
22
)));
23
}
24
} else {
25
return Err(PyValueError::new_err(
26
"Expected schema PyCapsule to have name set.",
27
));
28
}
29
30
Ok(())
31
}
32
33
/// Import `__arrow_c_array__` across Python boundary
34
pub(crate) fn call_arrow_c_array<'py>(
35
ob: &Bound<'py, PyAny>,
36
) -> PyResult<(Bound<'py, PyCapsule>, Bound<'py, PyCapsule>)> {
37
if !ob.hasattr("__arrow_c_array__")? {
38
return Err(PyValueError::new_err(
39
"Expected an object with dunder __arrow_c_array__",
40
));
41
}
42
43
let tuple = ob.getattr("__arrow_c_array__")?.call0()?;
44
if !tuple.is_instance_of::<PyTuple>() {
45
return Err(PyTypeError::new_err(
46
"Expected __arrow_c_array__ to return a tuple.",
47
));
48
}
49
50
let schema_capsule = tuple.get_item(0)?.cast_into()?;
51
let array_capsule = tuple.get_item(1)?.cast_into()?;
52
Ok((schema_capsule, array_capsule))
53
}
54
55
pub(crate) fn import_array_pycapsules(
56
schema_capsule: &Bound<PyCapsule>,
57
array_capsule: &Bound<PyCapsule>,
58
) -> PyResult<(arrow::datatypes::Field, Box<dyn Array>)> {
59
let field = import_schema_pycapsule(schema_capsule)?;
60
61
validate_pycapsule_name(array_capsule, "arrow_array")?;
62
63
// # Safety
64
// array_capsule holds a valid C ArrowArray pointer, as defined by the Arrow PyCapsule
65
// Interface
66
unsafe {
67
#[allow(deprecated)]
68
let array_ptr = std::ptr::replace(array_capsule.pointer() as _, ArrowArray::empty());
69
let array = ffi::import_array_from_c(array_ptr, field.dtype().clone()).unwrap();
70
71
Ok((field, array))
72
}
73
}
74
75
pub(crate) fn import_schema_pycapsule(
76
schema_capsule: &Bound<PyCapsule>,
77
) -> PyResult<arrow::datatypes::Field> {
78
validate_pycapsule_name(schema_capsule, "arrow_schema")?;
79
80
// # Safety
81
// schema_capsule holds a valid C ArrowSchema pointer, as defined by the Arrow PyCapsule
82
// Interface
83
unsafe {
84
#[allow(deprecated)]
85
let schema_ptr = schema_capsule.reference::<ArrowSchema>();
86
let field = ffi::import_field_from_c(schema_ptr).unwrap();
87
88
Ok(field)
89
}
90
}
91
92
/// Import `__arrow_c_stream__` across Python boundary.
93
fn call_arrow_c_stream<'py>(ob: &Bound<'py, PyAny>) -> PyResult<Bound<'py, PyCapsule>> {
94
if !ob.hasattr("__arrow_c_stream__")? {
95
return Err(PyValueError::new_err(
96
"Expected an object with dunder __arrow_c_stream__",
97
));
98
}
99
100
let capsule = ob.getattr("__arrow_c_stream__")?.call0()?.cast_into()?;
101
Ok(capsule)
102
}
103
104
pub(crate) fn import_stream_pycapsule(capsule: &Bound<PyCapsule>) -> PyResult<PySeries> {
105
validate_pycapsule_name(capsule, "arrow_array_stream")?;
106
107
// # Safety
108
// capsule holds a valid C ArrowArrayStream pointer, as defined by the Arrow PyCapsule
109
// Interface
110
let mut stream = unsafe {
111
// Takes ownership of the pointed to ArrowArrayStream
112
// This acts to move the data out of the capsule pointer, setting the release callback to NULL
113
#[allow(deprecated)]
114
let stream_ptr = Box::new(std::ptr::replace(
115
capsule.pointer() as _,
116
ArrowArrayStream::empty(),
117
));
118
ArrowArrayStreamReader::try_new(stream_ptr)
119
.map_err(|err| PyValueError::new_err(err.to_string()))?
120
};
121
122
let mut produced_arrays: Vec<Box<dyn Array>> = vec![];
123
while let Some(array) = unsafe { stream.next() } {
124
produced_arrays.push(array.map_err(PyPolarsErr::from)?);
125
}
126
127
// Series::try_from fails for an empty vec of chunks
128
let s = if produced_arrays.is_empty() {
129
let polars_dt = DataType::from_arrow_field(stream.field());
130
Series::new_empty(stream.field().name.clone(), &polars_dt)
131
} else {
132
Series::try_from((stream.field(), produced_arrays)).map_err(PyPolarsErr::from)?
133
};
134
Ok(PySeries::new(s))
135
}
136
#[pymethods]
137
impl PySeries {
138
#[classmethod]
139
pub fn from_arrow_c_array(_cls: &Bound<PyType>, ob: &Bound<'_, PyAny>) -> PyResult<Self> {
140
let (schema_capsule, array_capsule) = call_arrow_c_array(ob)?;
141
let (field, array) = import_array_pycapsules(&schema_capsule, &array_capsule)?;
142
let s = Series::try_from((&field, array)).unwrap();
143
Ok(PySeries::new(s))
144
}
145
146
#[classmethod]
147
pub fn from_arrow_c_stream(_cls: &Bound<PyType>, ob: &Bound<'_, PyAny>) -> PyResult<Self> {
148
let capsule = call_arrow_c_stream(ob)?;
149
import_stream_pycapsule(&capsule)
150
}
151
152
#[classmethod]
153
/// Import a series via polars-ffi
154
/// Takes ownership of the [`SeriesExport`] at [`location`]
155
/// # Safety
156
/// [`location`] should be the address of an allocated and initialized [`SeriesExport`]
157
pub unsafe fn _import(_cls: &Bound<PyType>, location: usize) -> PyResult<Self> {
158
let location = location as *mut SeriesExport;
159
160
// # Safety
161
// `location` should be valid for reading
162
let series = unsafe {
163
let export = location.read();
164
polars_ffi::version_0::import_series(export).map_err(PyPolarsErr::from)?
165
};
166
Ok(PySeries::from(series))
167
}
168
169
#[staticmethod]
170
pub fn _import_decimal_from_iceberg_binary_repr(
171
bytes_list: &Bound<PyAny>, // list[bytes | None]
172
precision: usize,
173
scale: usize,
174
) -> PyResult<Self> {
175
// From iceberg spec:
176
// * Decimal(P, S): Stores unscaled value as two’s-complement
177
// big-endian binary, using the minimum number of bytes for the
178
// value.
179
let max_abs_decimal_value = 10_i128.pow(u32::try_from(precision).unwrap()) - 1;
180
181
let out: Vec<i128> = bytes_list
182
.try_iter()?
183
.map(|bytes| {
184
let be_bytes: Option<PyBackedBytes> = bytes?.extract()?;
185
186
let mut le_bytes: [u8; 16] = [0; _];
187
188
if let Some(be_bytes) = be_bytes.as_deref() {
189
if be_bytes.len() > le_bytes.len() {
190
return Err(PyValueError::new_err(format!(
191
"iceberg binary data for decimal exceeded 16 bytes: {}",
192
be_bytes.len()
193
)));
194
}
195
196
for (i, byte) in be_bytes.iter().rev().enumerate() {
197
le_bytes[i] = *byte;
198
}
199
}
200
201
let value = i128::from_le_bytes(le_bytes);
202
203
if value.abs() > max_abs_decimal_value {
204
return Err(PyValueError::new_err(format!(
205
"iceberg decoded value for decimal exceeded precision: \
206
value: {value}, precision: {precision}",
207
)));
208
}
209
210
Ok(value)
211
})
212
.collect::<PyResult<_>>()?;
213
214
Ok(PySeries::from(unsafe {
215
Series::from_chunks_and_dtype_unchecked(
216
PlSmallStr::EMPTY,
217
vec![PrimitiveArray::<i128>::from_vec(out).boxed()],
218
&DataType::Decimal(precision, scale),
219
)
220
}))
221
}
222
}
223
224