// Path: blob/main/crates/polars-python/src/series/import.rs
// 8383 views
use arrow::array::{Array, PrimitiveArray};1use arrow::ffi;2use arrow::ffi::{ArrowArray, ArrowArrayStream, ArrowArrayStreamReader, ArrowSchema};3use polars::prelude::*;4use polars_ffi::version_0::SeriesExport;5use pyo3::exceptions::{PyTypeError, PyValueError};6use pyo3::prelude::*;7use pyo3::pybacked::PyBackedBytes;8use pyo3::types::{PyCapsule, PyTuple, PyType};910use super::PySeries;11use crate::error::PyPolarsErr;1213/// Validate PyCapsule has provided name14fn validate_pycapsule_name(capsule: &Bound<PyCapsule>, expected_name: &str) -> PyResult<()> {15let capsule_name = capsule.name()?;16if let Some(capsule_name) = capsule_name {17let capsule_name = unsafe { capsule_name.as_cstr() };18if capsule_name.to_str() != Ok(expected_name) {19return Err(PyValueError::new_err(format!(20"Expected name '{expected_name}' in PyCapsule, instead got '{capsule_name:?}'"21)));22}23} else {24return Err(PyValueError::new_err(25"Expected schema PyCapsule to have name set.",26));27}2829Ok(())30}3132/// Import `__arrow_c_array__` across Python boundary33pub(crate) fn call_arrow_c_array<'py>(34ob: &Bound<'py, PyAny>,35) -> PyResult<(Bound<'py, PyCapsule>, Bound<'py, PyCapsule>)> {36if !ob.hasattr("__arrow_c_array__")? 
{37return Err(PyValueError::new_err(38"Expected an object with dunder __arrow_c_array__",39));40}4142let tuple = ob.getattr("__arrow_c_array__")?.call0()?;43if !tuple.is_instance_of::<PyTuple>() {44return Err(PyTypeError::new_err(45"Expected __arrow_c_array__ to return a tuple.",46));47}4849let schema_capsule = tuple.get_item(0)?.cast_into()?;50let array_capsule = tuple.get_item(1)?.cast_into()?;51Ok((schema_capsule, array_capsule))52}5354pub(crate) fn import_array_pycapsules(55schema_capsule: &Bound<PyCapsule>,56array_capsule: &Bound<PyCapsule>,57) -> PyResult<(arrow::datatypes::Field, Box<dyn Array>)> {58let field = import_schema_pycapsule(schema_capsule)?;5960validate_pycapsule_name(array_capsule, "arrow_array")?;6162// # Safety63// array_capsule holds a valid C ArrowArray pointer, as defined by the Arrow PyCapsule64// Interface65unsafe {66#[allow(deprecated)]67let array_ptr = std::ptr::replace(array_capsule.pointer() as _, ArrowArray::empty());68let array = ffi::import_array_from_c(array_ptr, field.dtype().clone()).unwrap();6970Ok((field, array))71}72}7374pub(crate) fn import_schema_pycapsule(75schema_capsule: &Bound<PyCapsule>,76) -> PyResult<arrow::datatypes::Field> {77validate_pycapsule_name(schema_capsule, "arrow_schema")?;7879// # Safety80// schema_capsule holds a valid C ArrowSchema pointer, as defined by the Arrow PyCapsule81// Interface82unsafe {83#[allow(deprecated)]84let schema_ptr = schema_capsule.reference::<ArrowSchema>();85let field = ffi::import_field_from_c(schema_ptr).unwrap();8687Ok(field)88}89}9091/// Import `__arrow_c_stream__` across Python boundary.92fn call_arrow_c_stream<'py>(ob: &Bound<'py, PyAny>) -> PyResult<Bound<'py, PyCapsule>> {93if !ob.hasattr("__arrow_c_stream__")? 
{94return Err(PyValueError::new_err(95"Expected an object with dunder __arrow_c_stream__",96));97}9899let capsule = ob.getattr("__arrow_c_stream__")?.call0()?.cast_into()?;100Ok(capsule)101}102103pub(crate) fn import_stream_pycapsule(capsule: &Bound<PyCapsule>) -> PyResult<PySeries> {104validate_pycapsule_name(capsule, "arrow_array_stream")?;105106// # Safety107// capsule holds a valid C ArrowArrayStream pointer, as defined by the Arrow PyCapsule108// Interface109let mut stream = unsafe {110// Takes ownership of the pointed to ArrowArrayStream111// This acts to move the data out of the capsule pointer, setting the release callback to NULL112#[allow(deprecated)]113let stream_ptr = Box::new(std::ptr::replace(114capsule.pointer() as _,115ArrowArrayStream::empty(),116));117ArrowArrayStreamReader::try_new(stream_ptr)118.map_err(|err| PyValueError::new_err(err.to_string()))?119};120121let mut produced_arrays: Vec<Box<dyn Array>> = vec![];122while let Some(array) = unsafe { stream.next() } {123produced_arrays.push(array.map_err(PyPolarsErr::from)?);124}125126// Series::try_from fails for an empty vec of chunks127let s = if produced_arrays.is_empty() {128let polars_dt = DataType::from_arrow_field(stream.field());129Series::new_empty(stream.field().name.clone(), &polars_dt)130} else {131Series::try_from((stream.field(), produced_arrays)).map_err(PyPolarsErr::from)?132};133Ok(PySeries::new(s))134}135#[pymethods]136impl PySeries {137#[classmethod]138pub fn from_arrow_c_array(_cls: &Bound<PyType>, ob: &Bound<'_, PyAny>) -> PyResult<Self> {139let (schema_capsule, array_capsule) = call_arrow_c_array(ob)?;140let (field, array) = import_array_pycapsules(&schema_capsule, &array_capsule)?;141let s = Series::try_from((&field, array)).unwrap();142Ok(PySeries::new(s))143}144145#[classmethod]146pub fn from_arrow_c_stream(_cls: &Bound<PyType>, ob: &Bound<'_, PyAny>) -> PyResult<Self> {147let capsule = 
call_arrow_c_stream(ob)?;148import_stream_pycapsule(&capsule)149}150151#[classmethod]152/// Import a series via polars-ffi153/// Takes ownership of the [`SeriesExport`] at [`location`]154/// # Safety155/// [`location`] should be the address of an allocated and initialized [`SeriesExport`]156pub unsafe fn _import(_cls: &Bound<PyType>, location: usize) -> PyResult<Self> {157let location = location as *mut SeriesExport;158159// # Safety160// `location` should be valid for reading161let series = unsafe {162let export = location.read();163polars_ffi::version_0::import_series(export).map_err(PyPolarsErr::from)?164};165Ok(PySeries::from(series))166}167168#[staticmethod]169pub fn _import_decimal_from_iceberg_binary_repr(170bytes_list: &Bound<PyAny>, // list[bytes | None]171precision: usize,172scale: usize,173) -> PyResult<Self> {174// From iceberg spec:175// * Decimal(P, S): Stores unscaled value as two’s-complement176// big-endian binary, using the minimum number of bytes for the177// value.178let max_abs_decimal_value = 10_i128.pow(u32::try_from(precision).unwrap()) - 1;179180let out: Vec<i128> = bytes_list181.try_iter()?182.map(|bytes| {183let be_bytes: Option<PyBackedBytes> = bytes?.extract()?;184185let mut le_bytes: [u8; 16] = [0; _];186187if let Some(be_bytes) = be_bytes.as_deref() {188if be_bytes.len() > le_bytes.len() {189return Err(PyValueError::new_err(format!(190"iceberg binary data for decimal exceeded 16 bytes: {}",191be_bytes.len()192)));193}194195for (i, byte) in be_bytes.iter().rev().enumerate() {196le_bytes[i] = *byte;197}198}199200let value = i128::from_le_bytes(le_bytes);201202if value.abs() > max_abs_decimal_value {203return Err(PyValueError::new_err(format!(204"iceberg decoded value for decimal exceeded precision: \205value: {value}, precision: {precision}",206)));207}208209Ok(value)210})211.collect::<PyResult<_>>()?;212213Ok(PySeries::from(unsafe 
{214Series::from_chunks_and_dtype_unchecked(215PlSmallStr::EMPTY,216vec![PrimitiveArray::<i128>::from_vec(out).boxed()],217&DataType::Decimal(precision, scale),218)219}))220}221}222223224