Path: blob/main/crates/polars-python/src/dataframe/export.rs
use arrow::datatypes::IntegerType;
use arrow::record_batch::RecordBatch;
use parking_lot::RwLockWriteGuard;
use polars::prelude::*;
use polars_compute::cast::CastOptionsImpl;
use pyo3::IntoPyObjectExt;
use pyo3::prelude::*;
use pyo3::types::{PyCapsule, PyList, PyTuple};

use super::PyDataFrame;
use crate::conversion::{ObjectValue, Wrap};
use crate::error::PyPolarsErr;
use crate::interop;
use crate::interop::arrow::to_py::dataframe_to_stream;
use crate::prelude::PyCompatLevel;
use crate::utils::EnterPolarsExt;

#[pymethods]
impl PyDataFrame {
    #[cfg(feature = "object")]
    pub fn row_tuple<'py>(&self, idx: i64, py: Python<'py>) -> PyResult<Bound<'py, PyTuple>> {
        let df = self.df.read();
        let idx = if idx < 0 {
            (df.height() as i64 + idx) as usize
        } else {
            idx as usize
        };
        if idx >= df.height() {
            return Err(PyPolarsErr::from(polars_err!(oob = idx, df.height())).into());
        }
        PyTuple::new(
            py,
            df.get_columns().iter().map(|s| match s.dtype() {
                DataType::Object(_) => {
                    let obj: Option<&ObjectValue> = s.get_object(idx).map(|any| any.into());
                    obj.into_py_any(py).unwrap()
                },
                _ => Wrap(s.get(idx).unwrap()).into_py_any(py).unwrap(),
            }),
        )
    }

    #[cfg(feature = "object")]
    pub fn row_tuples<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyList>> {
        let df = self.df.read();
        let mut rechunked;
        // Rechunk if random access would become rather expensive.
        // TODO: iterate over the chunks directly instead of using random access.
        let df = if df.max_n_chunks() > 16 {
            rechunked = df.clone();
            py.enter_polars_ok(|| rechunked.as_single_chunk_par())?;
            &rechunked
        } else {
            &df
        };
        PyList::new(
            py,
            (0..df.height()).map(|idx| {
                PyTuple::new(
                    py,
                    df.get_columns().iter().map(|c| match c.dtype() {
                        DataType::Null => py.None(),
                        DataType::Object(_) => {
                            let obj: Option<&ObjectValue> = c.get_object(idx).map(|any| any.into());
                            obj.into_py_any(py).unwrap()
                        },
                        _ => {
                            // SAFETY: we are in bounds.
                            let av = unsafe { c.get_unchecked(idx) };
                            Wrap(av).into_py_any(py).unwrap()
                        },
                    }),
                )
                .unwrap()
            }),
        )
    }

    #[allow(clippy::wrong_self_convention)]
    pub fn to_arrow(
        &self,
        py: Python<'_>,
        compat_level: PyCompatLevel,
    ) -> PyResult<Vec<Py<PyAny>>> {
        let mut df = self.df.write();
        let dfr = &mut *df; // Lock guard isn't Send, but mut ref is.
        py.enter_polars_ok(|| dfr.align_chunks_par())?;
        let df = RwLockWriteGuard::downgrade(df);

        let pyarrow = py.import("pyarrow")?;

        let mut chunks = df.iter_chunks(compat_level.0, true);
        let mut rbs = Vec::with_capacity(chunks.size_hint().0);
        // df.iter_chunks() iteration could internally try to acquire the GIL on another thread,
        // so we make sure to run chunks.next() within enter_polars().
        while let Some(rb) = py.enter_polars_ok(|| chunks.next())? {
            let rb = interop::arrow::to_py::to_py_rb(&rb, py, &pyarrow)?;
            rbs.push(rb);
        }
        Ok(rbs)
    }

    /// Create a `Vec` of PyArrow RecordBatch instances.
    ///
    /// Note this will give bad results for columns with dtype `pl.Object`,
    /// since those can't be converted correctly via PyArrow. The calling Python
    /// code should make sure these are not included.
    #[allow(clippy::wrong_self_convention)]
    pub fn to_pandas(&self, py: Python) -> PyResult<Vec<Py<PyAny>>> {
        let mut df = self.df.write();
        let dfr = &mut *df; // Lock guard isn't Send, but mut ref is.
        py.enter_polars_ok(|| dfr.as_single_chunk_par())?;
        let df = RwLockWriteGuard::downgrade(df);
        Python::attach(|py| {
            let pyarrow = py.import("pyarrow")?;
            let cat_columns = df
                .get_columns()
                .iter()
                .enumerate()
                .filter(|(_i, s)| {
                    matches!(
                        s.dtype(),
                        DataType::Categorical(_, _) | DataType::Enum(_, _)
                    )
                })
                .map(|(i, _)| i)
                .collect::<Vec<_>>();

            let enum_and_categorical_dtype = ArrowDataType::Dictionary(
                IntegerType::Int64,
                Box::new(ArrowDataType::LargeUtf8),
                false,
            );

            let mut replaced_schema = None;
            let rbs = df
                .iter_chunks(CompatLevel::oldest(), true)
                .map(|rb| {
                    let length = rb.len();
                    let (schema, mut arrays) = rb.into_schema_and_arrays();

                    // Pandas does not allow unsigned dictionary indices so we replace them.
                    replaced_schema =
                        (replaced_schema.is_none() && !cat_columns.is_empty()).then(|| {
                            let mut schema = schema.as_ref().clone();
                            for i in &cat_columns {
                                let (_, field) = schema.get_at_index_mut(*i).unwrap();
                                field.dtype = enum_and_categorical_dtype.clone();
                            }
                            Arc::new(schema)
                        });

                    for i in &cat_columns {
                        let arr = arrays.get_mut(*i).unwrap();
                        let out = polars_compute::cast::cast(
                            &**arr,
                            &enum_and_categorical_dtype,
                            CastOptionsImpl::default(),
                        )
                        .unwrap();
                        *arr = out;
                    }
                    let schema = replaced_schema
                        .as_ref()
                        .map_or(schema, |replaced| replaced.clone());
                    let rb = RecordBatch::new(length, schema, arrays);

                    interop::arrow::to_py::to_py_rb(&rb, py, &pyarrow)
                })
                .collect::<PyResult<_>>()?;
            Ok(rbs)
        })
    }

    #[allow(unused_variables)]
    #[pyo3(signature = (requested_schema))]
    fn __arrow_c_stream__<'py>(
        &self,
        py: Python<'py>,
        requested_schema: Option<Py<PyAny>>,
    ) -> PyResult<Bound<'py, PyCapsule>> {
        py.enter_polars_ok(|| {
            self.df.write().as_single_chunk_par();
        })?;
        dataframe_to_stream(&self.df.read(), py)
    }
}
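
// Illustration-only sketch (not part of the upstream file) of the parking_lot
// write-then-downgrade pattern used by `to_arrow` and `to_pandas` above: mutate
// the frame in place under the write lock, then atomically downgrade to a read
// guard for the read-only export phase so no writer can slip in between the two
// steps. `Frame` is a hypothetical stand-in for `DataFrame`; only the
// `parking_lot` crate (already a dependency of this module) is assumed.
#[cfg(test)]
mod lock_downgrade_sketch {
    use parking_lot::{RwLock, RwLockWriteGuard};

    struct Frame {
        chunks: Vec<Vec<i64>>,
    }

    #[test]
    fn downgrade_after_rechunk() {
        let df = RwLock::new(Frame {
            chunks: vec![vec![1, 2], vec![3]],
        });

        // Write phase: "rechunk" by concatenating all chunks under the write lock.
        let mut guard = df.write();
        let merged: Vec<i64> = guard.chunks.drain(..).flatten().collect();
        guard.chunks = vec![merged];

        // Downgrade to a read guard: other readers may now proceed, while the
        // rechunked state is guaranteed to still be what the export phase sees.
        let guard = RwLockWriteGuard::downgrade(guard);
        assert_eq!(guard.chunks.len(), 1);
        assert_eq!(guard.chunks[0], vec![1, 2, 3]);
    }
}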