Path: blob/main/crates/polars-python/src/interop/arrow/to_py.rs
7889 views
use std::ffi::CString;12use arrow::datatypes::ArrowDataType;3use arrow::ffi;4use arrow::record_batch::RecordBatch;5use polars::datatypes::CompatLevel;6use polars::frame::DataFrame;7use polars::prelude::{ArrayRef, ArrowField, PlSmallStr, SchemaExt};8use polars::series::Series;9use polars_core::utils::arrow;10use polars_error::PolarsResult;11use pyo3::ffi::Py_uintptr_t;12use pyo3::prelude::*;13use pyo3::types::PyCapsule;1415/// Arrow array to Python.16pub(crate) fn to_py_array(17array: ArrayRef,18field: &ArrowField,19pyarrow: &Bound<PyModule>,20) -> PyResult<Py<PyAny>> {21let schema = Box::new(ffi::export_field_to_c(field));22let array = Box::new(ffi::export_array_to_c(array));2324let schema_ptr: *const ffi::ArrowSchema = &*schema;25let array_ptr: *const ffi::ArrowArray = &*array;2627let array = pyarrow.getattr("Array")?.call_method1(28"_import_from_c",29(array_ptr as Py_uintptr_t, schema_ptr as Py_uintptr_t),30)?;3132Ok(array.unbind())33}3435/// RecordBatch to Python.36pub(crate) fn to_py_rb(37rb: &RecordBatch,38py: Python<'_>,39pyarrow: &Bound<PyModule>,40) -> PyResult<Py<PyAny>> {41let mut arrays = Vec::with_capacity(rb.width());4243for (array, field) in rb.columns().iter().zip(rb.schema().iter_values()) {44let array_object = to_py_array(array.clone(), field, pyarrow)?;45arrays.push(array_object);46}4748let schema = Box::new(ffi::export_field_to_c(&ArrowField {49name: PlSmallStr::EMPTY,50dtype: ArrowDataType::Struct(rb.schema().iter_values().cloned().collect()),51is_nullable: false,52metadata: None,53}));54let schema_ptr: *const ffi::ArrowSchema = &*schema;5556let schema = pyarrow57.getattr("Schema")?58.call_method1("_import_from_c", (schema_ptr as Py_uintptr_t,))?;59let record = pyarrow60.getattr("RecordBatch")?61.call_method1("from_arrays", (arrays, py.None(), schema))?;6263Ok(record.unbind())64}6566/// Export a series to a C stream via a PyCapsule according to the Arrow PyCapsule Interface67/// https://arrow.apache.org/docs/dev/format/CDataInterface/PyCapsuleInterface.html68pub(crate) fn series_to_stream<'py>(69series: &Series,70py: Python<'py>,71) -> PyResult<Bound<'py, PyCapsule>> {72let field = series.field().to_arrow(CompatLevel::newest());73let series = series.clone();74let iter = Box::new(75(0..series.n_chunks()).map(move |i| Ok(series.to_arrow(i, CompatLevel::newest()))),76) as _;7778let stream = ffi::export_iterator(iter, field);79let stream_capsule_name = CString::new("arrow_array_stream").unwrap();80PyCapsule::new(py, stream, Some(stream_capsule_name))81}8283pub(crate) fn dataframe_to_stream<'py>(84df: &DataFrame,85py: Python<'py>,86) -> PyResult<Bound<'py, PyCapsule>> {87let iter = Box::new(DataFrameStreamIterator::new(df));88let field = iter.field();89let stream = ffi::export_iterator(iter, field);90let stream_capsule_name = CString::new("arrow_array_stream").unwrap();91PyCapsule::new(py, stream, Some(stream_capsule_name))92}9394#[cfg(feature = "c_api")]95#[pyfunction]96pub(crate) fn polars_schema_to_pycapsule<'py>(97py: Python<'py>,98schema: crate::prelude::Wrap<polars::prelude::Schema>,99compat_level: crate::prelude::PyCompatLevel,100) -> PyResult<Bound<'py, PyCapsule>> {101let schema: arrow::ffi::ArrowSchema = arrow::ffi::export_field_to_c(&ArrowField::new(102PlSmallStr::EMPTY,103ArrowDataType::Struct(104schema105.0106.iter_fields()107.map(|x| x.to_arrow(compat_level.0))108.collect(),109),110false,111));112113let capsule_name = CString::new("arrow_schema").unwrap();114PyCapsule::new(py, schema, Some(capsule_name))115}116117pub struct DataFrameStreamIterator {118columns: Vec<Series>,119dtype: ArrowDataType,120idx: usize,121n_chunks: usize,122}123124impl DataFrameStreamIterator {125fn new(df: &DataFrame) -> Self {126let schema = df.schema().to_arrow(CompatLevel::newest());127let dtype = ArrowDataType::Struct(schema.into_iter_values().collect());128129Self {130columns: df131.get_columns()132.iter()133.map(|v| v.as_materialized_series().clone())134.collect(),135dtype,136idx: 0,137n_chunks: df.first_col_n_chunks(),138}139}140141fn field(&self) -> ArrowField {142ArrowField::new(PlSmallStr::EMPTY, self.dtype.clone(), false)143}144}145146impl Iterator for DataFrameStreamIterator {147type Item = PolarsResult<ArrayRef>;148149fn next(&mut self) -> Option<Self::Item> {150if self.idx >= self.n_chunks {151None152} else {153// create a batch of the columns with the same chunk no.154let batch_cols = self155.columns156.iter()157.map(|s| s.to_arrow(self.idx, CompatLevel::newest()))158.collect::<Vec<_>>();159self.idx += 1;160161let array = arrow::array::StructArray::new(162self.dtype.clone(),163batch_cols[0].len(),164batch_cols,165None,166);167Some(Ok(Box::new(array)))168}169}170}171172173