Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-python/src/dataframe/export.rs
7889 views
1
use arrow::datatypes::IntegerType;
2
use arrow::record_batch::RecordBatch;
3
use parking_lot::RwLockWriteGuard;
4
use polars::prelude::*;
5
use polars_compute::cast::CastOptionsImpl;
6
use pyo3::IntoPyObjectExt;
7
use pyo3::prelude::*;
8
use pyo3::types::{PyCapsule, PyList, PyTuple};
9
10
use super::PyDataFrame;
11
use crate::conversion::{ObjectValue, Wrap};
12
use crate::error::PyPolarsErr;
13
use crate::interop;
14
use crate::interop::arrow::to_py::dataframe_to_stream;
15
use crate::prelude::PyCompatLevel;
16
use crate::utils::EnterPolarsExt;
17
18
#[pymethods]
19
impl PyDataFrame {
20
/// Return row `idx` of the frame as a Python tuple holding one value per column.
///
/// Negative indices count from the end of the frame, Python-style
/// (`-1` is the last row).
///
/// # Errors
///
/// Returns an out-of-bounds error, reporting the index exactly as passed by the
/// caller, when `idx` does not resolve to an existing row. Failures while
/// fetching a value or converting it to a Python object are returned as Python
/// errors instead of panicking.
#[cfg(feature = "object")]
pub fn row_tuple<'py>(&self, idx: i64, py: Python<'py>) -> PyResult<Bound<'py, PyTuple>> {
    let df = self.df.read();
    let height = df.height();

    // Resolve negative indices relative to the end. A result that is still
    // negative fails the `usize` conversion below instead of wrapping around
    // to a huge positive index.
    let resolved = if idx < 0 { idx + height as i64 } else { idx };
    let row = match usize::try_from(resolved) {
        Ok(row) if row < height => row,
        _ => return Err(PyPolarsErr::from(polars_err!(oob = idx, height)).into()),
    };

    // Convert every cell first so that a failing conversion surfaces as a
    // `PyErr` rather than a panic.
    let values = df
        .get_columns()
        .iter()
        .map(|s| -> PyResult<Py<PyAny>> {
            match s.dtype() {
                DataType::Object(_) => {
                    let obj: Option<&ObjectValue> = s.get_object(row).map(|any| any.into());
                    obj.into_py_any(py)
                },
                _ => {
                    let av = s.get(row).map_err(PyPolarsErr::from)?;
                    Wrap(av).into_py_any(py)
                },
            }
        })
        .collect::<PyResult<Vec<_>>>()?;

    PyTuple::new(py, values)
}
42
43
/// Convert every row of the frame to a Python tuple and return them as a list.
///
/// Frames whose columns are split over more than 16 chunks are cloned and
/// rechunked into a single chunk first; the frame held by `self` is not
/// modified by this method.
///
/// # Panics
///
/// Panics if a value cannot be converted to a Python object or if a row
/// tuple cannot be created.
#[cfg(feature = "object")]
pub fn row_tuples<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyList>> {
    let guard = self.df.read();
    let mut single_chunk;
    // Rechunk if random access would become rather expensive.
    // TODO: iterate over the chunks directly instead of using random access.
    let frame = if guard.max_n_chunks() > 16 {
        single_chunk = guard.clone();
        py.enter_polars_ok(|| single_chunk.as_single_chunk_par())?;
        &single_chunk
    } else {
        &guard
    };

    let columns = frame.get_columns();

    // Builds the Python tuple for a single row index.
    let build_row = |row: usize| {
        let cells = columns.iter().map(|col| match col.dtype() {
            DataType::Null => py.None(),
            DataType::Object(_) => {
                let obj: Option<&ObjectValue> = col.get_object(row).map(|any| any.into());
                obj.into_py_any(py).unwrap()
            },
            _ => {
                // SAFETY: `row` is drawn from `0..frame.height()`, so it is in bounds.
                let value = unsafe { col.get_unchecked(row) };
                Wrap(value).into_py_any(py).unwrap()
            },
        });
        PyTuple::new(py, cells).unwrap()
    };

    PyList::new(py, (0..frame.height()).map(build_row))
}
78
79
#[allow(clippy::wrong_self_convention)]
80
pub fn to_arrow(
81
&self,
82
py: Python<'_>,
83
compat_level: PyCompatLevel,
84
) -> PyResult<Vec<Py<PyAny>>> {
85
let mut df = self.df.write();
86
let dfr = &mut *df; // Lock guard isn't Send, but mut ref is.
87
py.enter_polars_ok(|| dfr.align_chunks_par())?;
88
let df = RwLockWriteGuard::downgrade(df);
89
90
let pyarrow = py.import("pyarrow")?;
91
92
let mut chunks = df.iter_chunks(compat_level.0, true);
93
let mut rbs = Vec::with_capacity(chunks.size_hint().0);
94
// df.iter_chunks() iteration could internally try to acquire the GIL on another thread,
95
// so we make sure to run chunks.next() within enter_polars().
96
while let Some(rb) = py.enter_polars_ok(|| chunks.next())? {
97
let rb = interop::arrow::to_py::to_py_rb(&rb, py, &pyarrow)?;
98
rbs.push(rb);
99
}
100
Ok(rbs)
101
}
102
103
/// Create a `Vec` of PyArrow RecordBatch instances.
104
///
105
/// Note this will give bad results for columns with dtype `pl.Object`,
106
/// since those can't be converted correctly via PyArrow. The calling Python
107
/// code should make sure these are not included.
108
#[allow(clippy::wrong_self_convention)]
109
pub fn to_pandas(&self, py: Python) -> PyResult<Vec<Py<PyAny>>> {
110
let mut df = self.df.write();
111
let dfr = &mut *df; // Lock guard isn't Send, but mut ref is.
112
py.enter_polars_ok(|| dfr.as_single_chunk_par())?;
113
let df = RwLockWriteGuard::downgrade(df);
114
Python::attach(|py| {
115
let pyarrow = py.import("pyarrow")?;
116
let cat_columns = df
117
.get_columns()
118
.iter()
119
.enumerate()
120
.filter(|(_i, s)| {
121
matches!(
122
s.dtype(),
123
DataType::Categorical(_, _) | DataType::Enum(_, _)
124
)
125
})
126
.map(|(i, _)| i)
127
.collect::<Vec<_>>();
128
129
let enum_and_categorical_dtype = ArrowDataType::Dictionary(
130
IntegerType::Int64,
131
Box::new(ArrowDataType::LargeUtf8),
132
false,
133
);
134
135
let mut replaced_schema = None;
136
let rbs = df
137
.iter_chunks(CompatLevel::oldest(), true)
138
.map(|rb| {
139
let length = rb.len();
140
let (schema, mut arrays) = rb.into_schema_and_arrays();
141
142
// Pandas does not allow unsigned dictionary indices so we replace them.
143
replaced_schema =
144
(replaced_schema.is_none() && !cat_columns.is_empty()).then(|| {
145
let mut schema = schema.as_ref().clone();
146
for i in &cat_columns {
147
let (_, field) = schema.get_at_index_mut(*i).unwrap();
148
field.dtype = enum_and_categorical_dtype.clone();
149
}
150
Arc::new(schema)
151
});
152
153
for i in &cat_columns {
154
let arr = arrays.get_mut(*i).unwrap();
155
let out = polars_compute::cast::cast(
156
&**arr,
157
&enum_and_categorical_dtype,
158
CastOptionsImpl::default(),
159
)
160
.unwrap();
161
*arr = out;
162
}
163
let schema = replaced_schema
164
.as_ref()
165
.map_or(schema, |replaced| replaced.clone());
166
let rb = RecordBatch::new(length, schema, arrays);
167
168
interop::arrow::to_py::to_py_rb(&rb, py, &pyarrow)
169
})
170
.collect::<PyResult<_>>()?;
171
Ok(rbs)
172
})
173
}
174
175
#[allow(unused_variables)]
176
#[pyo3(signature = (requested_schema))]
177
fn __arrow_c_stream__<'py>(
178
&self,
179
py: Python<'py>,
180
requested_schema: Option<Py<PyAny>>,
181
) -> PyResult<Bound<'py, PyCapsule>> {
182
py.enter_polars_ok(|| {
183
self.df.write().as_single_chunk_par();
184
})?;
185
dataframe_to_stream(&self.df.read(), py)
186
}
187
}
188
189