Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-python/src/interop/arrow/to_py.rs
7889 views
1
use std::ffi::CString;
2
3
use arrow::datatypes::ArrowDataType;
4
use arrow::ffi;
5
use arrow::record_batch::RecordBatch;
6
use polars::datatypes::CompatLevel;
7
use polars::frame::DataFrame;
8
use polars::prelude::{ArrayRef, ArrowField, PlSmallStr, SchemaExt};
9
use polars::series::Series;
10
use polars_core::utils::arrow;
11
use polars_error::PolarsResult;
12
use pyo3::ffi::Py_uintptr_t;
13
use pyo3::prelude::*;
14
use pyo3::types::PyCapsule;
15
16
/// Arrow array to Python.
17
pub(crate) fn to_py_array(
18
array: ArrayRef,
19
field: &ArrowField,
20
pyarrow: &Bound<PyModule>,
21
) -> PyResult<Py<PyAny>> {
22
let schema = Box::new(ffi::export_field_to_c(field));
23
let array = Box::new(ffi::export_array_to_c(array));
24
25
let schema_ptr: *const ffi::ArrowSchema = &*schema;
26
let array_ptr: *const ffi::ArrowArray = &*array;
27
28
let array = pyarrow.getattr("Array")?.call_method1(
29
"_import_from_c",
30
(array_ptr as Py_uintptr_t, schema_ptr as Py_uintptr_t),
31
)?;
32
33
Ok(array.unbind())
34
}
35
36
/// RecordBatch to Python.
37
pub(crate) fn to_py_rb(
38
rb: &RecordBatch,
39
py: Python<'_>,
40
pyarrow: &Bound<PyModule>,
41
) -> PyResult<Py<PyAny>> {
42
let mut arrays = Vec::with_capacity(rb.width());
43
44
for (array, field) in rb.columns().iter().zip(rb.schema().iter_values()) {
45
let array_object = to_py_array(array.clone(), field, pyarrow)?;
46
arrays.push(array_object);
47
}
48
49
let schema = Box::new(ffi::export_field_to_c(&ArrowField {
50
name: PlSmallStr::EMPTY,
51
dtype: ArrowDataType::Struct(rb.schema().iter_values().cloned().collect()),
52
is_nullable: false,
53
metadata: None,
54
}));
55
let schema_ptr: *const ffi::ArrowSchema = &*schema;
56
57
let schema = pyarrow
58
.getattr("Schema")?
59
.call_method1("_import_from_c", (schema_ptr as Py_uintptr_t,))?;
60
let record = pyarrow
61
.getattr("RecordBatch")?
62
.call_method1("from_arrays", (arrays, py.None(), schema))?;
63
64
Ok(record.unbind())
65
}
66
67
/// Export a series to a C stream via a PyCapsule according to the Arrow PyCapsule Interface
68
/// https://arrow.apache.org/docs/dev/format/CDataInterface/PyCapsuleInterface.html
69
pub(crate) fn series_to_stream<'py>(
70
series: &Series,
71
py: Python<'py>,
72
) -> PyResult<Bound<'py, PyCapsule>> {
73
let field = series.field().to_arrow(CompatLevel::newest());
74
let series = series.clone();
75
let iter = Box::new(
76
(0..series.n_chunks()).map(move |i| Ok(series.to_arrow(i, CompatLevel::newest()))),
77
) as _;
78
79
let stream = ffi::export_iterator(iter, field);
80
let stream_capsule_name = CString::new("arrow_array_stream").unwrap();
81
PyCapsule::new(py, stream, Some(stream_capsule_name))
82
}
83
84
pub(crate) fn dataframe_to_stream<'py>(
85
df: &DataFrame,
86
py: Python<'py>,
87
) -> PyResult<Bound<'py, PyCapsule>> {
88
let iter = Box::new(DataFrameStreamIterator::new(df));
89
let field = iter.field();
90
let stream = ffi::export_iterator(iter, field);
91
let stream_capsule_name = CString::new("arrow_array_stream").unwrap();
92
PyCapsule::new(py, stream, Some(stream_capsule_name))
93
}
94
95
/// Export a polars `Schema` as an `arrow_schema` PyCapsule, encoding the whole
/// schema as a single anonymous, non-nullable struct field.
#[cfg(feature = "c_api")]
#[pyfunction]
pub(crate) fn polars_schema_to_pycapsule<'py>(
    py: Python<'py>,
    schema: crate::prelude::Wrap<polars::prelude::Schema>,
    compat_level: crate::prelude::PyCompatLevel,
) -> PyResult<Bound<'py, PyCapsule>> {
    // Convert every polars field to its Arrow counterpart at the requested
    // compatibility level.
    let fields = schema
        .0
        .iter_fields()
        .map(|f| f.to_arrow(compat_level.0))
        .collect();
    let struct_field = ArrowField::new(PlSmallStr::EMPTY, ArrowDataType::Struct(fields), false);
    let c_schema: arrow::ffi::ArrowSchema = arrow::ffi::export_field_to_c(&struct_field);

    let capsule_name = CString::new("arrow_schema").unwrap();
    PyCapsule::new(py, c_schema, Some(capsule_name))
}
117
118
/// Iterator that yields each chunk of a `DataFrame` as one Arrow struct
/// array, for export through the Arrow C stream interface.
pub struct DataFrameStreamIterator {
    // Materialized columns of the frame being streamed.
    columns: Vec<Series>,
    // Arrow struct dtype covering all columns; cloned for every batch.
    dtype: ArrowDataType,
    // Index of the next chunk to emit.
    idx: usize,
    // Total number of chunks, taken from the first column in `new`.
    n_chunks: usize,
}
124
125
impl DataFrameStreamIterator {
126
fn new(df: &DataFrame) -> Self {
127
let schema = df.schema().to_arrow(CompatLevel::newest());
128
let dtype = ArrowDataType::Struct(schema.into_iter_values().collect());
129
130
Self {
131
columns: df
132
.get_columns()
133
.iter()
134
.map(|v| v.as_materialized_series().clone())
135
.collect(),
136
dtype,
137
idx: 0,
138
n_chunks: df.first_col_n_chunks(),
139
}
140
}
141
142
fn field(&self) -> ArrowField {
143
ArrowField::new(PlSmallStr::EMPTY, self.dtype.clone(), false)
144
}
145
}
146
147
impl Iterator for DataFrameStreamIterator {
148
type Item = PolarsResult<ArrayRef>;
149
150
fn next(&mut self) -> Option<Self::Item> {
151
if self.idx >= self.n_chunks {
152
None
153
} else {
154
// create a batch of the columns with the same chunk no.
155
let batch_cols = self
156
.columns
157
.iter()
158
.map(|s| s.to_arrow(self.idx, CompatLevel::newest()))
159
.collect::<Vec<_>>();
160
self.idx += 1;
161
162
let array = arrow::array::StructArray::new(
163
self.dtype.clone(),
164
batch_cols[0].len(),
165
batch_cols,
166
None,
167
);
168
Some(Ok(Box::new(array)))
169
}
170
}
171
}
172
173