Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/ffi/stream.rs
6939 views
1
use std::ffi::{CStr, CString};
2
use std::ops::DerefMut;
3
4
use polars_error::{PolarsError, PolarsResult, polars_bail, polars_err};
5
6
use super::{
7
ArrowArray, ArrowArrayStream, ArrowSchema, export_array_to_c, export_field_to_c,
8
import_array_from_c, import_field_from_c,
9
};
10
use crate::array::Array;
11
use crate::datatypes::Field;
12
13
impl Drop for ArrowArrayStream {
14
fn drop(&mut self) {
15
match self.release {
16
None => (),
17
Some(release) => unsafe { release(self) },
18
};
19
}
20
}
21
22
// Manually asserts that an `ArrowArrayStream` may be moved to another thread.
// The struct holds a raw `private_data` pointer, so the compiler does not derive `Send` for it.
// NOTE(review): streams built by `export_iterator` store a `Box<dyn Iterator<..>>` without a
// `Send` bound behind `private_data` — confirm callers never move such a stream across threads
// unless the wrapped iterator is itself `Send`.
unsafe impl Send for ArrowArrayStream {}
23
24
impl ArrowArrayStream {
25
/// Creates an empty [`ArrowArrayStream`] used to import from a producer.
26
pub fn empty() -> Self {
27
Self {
28
get_schema: None,
29
get_next: None,
30
get_last_error: None,
31
release: None,
32
private_data: std::ptr::null_mut(),
33
}
34
}
35
}
36
37
unsafe fn handle_error(iter: &mut ArrowArrayStream) -> PolarsError {
38
let error = unsafe { (iter.get_last_error.unwrap())(&mut *iter) };
39
40
if error.is_null() {
41
return polars_err!(ComputeError: "got unspecified external error");
42
}
43
44
let error = unsafe { CStr::from_ptr(error) };
45
polars_err!(ComputeError: "got external error: {}", error.to_str().unwrap())
46
}
47
48
/// Implements an iterator of [`Array`] consumed from the [C stream interface](https://arrow.apache.org/docs/format/CStreamInterface.html).
49
pub struct ArrowArrayStreamReader<Iter: DerefMut<Target = ArrowArrayStream>> {
50
iter: Iter,
51
field: Field,
52
}
53
54
impl<Iter: DerefMut<Target = ArrowArrayStream>> ArrowArrayStreamReader<Iter> {
55
/// Returns a new [`ArrowArrayStreamReader`]
56
/// # Error
57
/// Errors iff the [`ArrowArrayStream`] is out of specification,
58
/// or was already released prior to calling this function.
59
///
60
/// # Safety
61
/// This method is intrinsically `unsafe` since it assumes that the `ArrowArrayStream`
62
/// contains a valid Arrow C stream interface.
63
/// In particular:
64
/// * The `ArrowArrayStream` fulfills the invariants of the C stream interface
65
/// * The schema `get_schema` produces fulfills the C data interface
66
pub unsafe fn try_new(mut iter: Iter) -> PolarsResult<Self> {
67
if iter.release.is_none() {
68
polars_bail!(InvalidOperation: "the C stream was already released")
69
};
70
71
if iter.get_next.is_none() {
72
polars_bail!(InvalidOperation: "the C stream must contain a non-null get_next")
73
};
74
75
if iter.get_last_error.is_none() {
76
polars_bail!(InvalidOperation: "The C stream MUST contain a non-null get_last_error")
77
};
78
79
let mut field = ArrowSchema::empty();
80
let status = if let Some(f) = iter.get_schema {
81
unsafe { (f)(&mut *iter, &mut field) }
82
} else {
83
polars_bail!(InvalidOperation:
84
"The C stream MUST contain a non-null get_schema"
85
)
86
};
87
88
if status != 0 {
89
return Err(unsafe { handle_error(&mut iter) });
90
}
91
92
let field = unsafe { import_field_from_c(&field)? };
93
94
Ok(Self { iter, field })
95
}
96
97
/// Returns the field provided by the stream
98
pub fn field(&self) -> &Field {
99
&self.field
100
}
101
102
/// Advances this iterator by one array
103
/// # Error
104
/// Errors iff:
105
/// * The C stream interface returns an error
106
/// * The C stream interface returns an invalid array (that we can identify, see Safety below)
107
///
108
/// # Safety
109
/// Calling this iterator's `next` assumes that the [`ArrowArrayStream`] produces arrow arrays
110
/// that fulfill the C data interface
111
pub unsafe fn next(&mut self) -> Option<PolarsResult<Box<dyn Array>>> {
112
let mut array = ArrowArray::empty();
113
let status = unsafe { (self.iter.get_next.unwrap())(&mut *self.iter, &mut array) };
114
115
if status != 0 {
116
return Some(Err(unsafe { handle_error(&mut self.iter) }));
117
}
118
119
// last paragraph of https://arrow.apache.org/docs/format/CStreamInterface.html#c.ArrowArrayStream.get_next
120
array.release?;
121
122
// SAFETY: assumed from the C stream interface
123
unsafe { import_array_from_c(array, self.field.dtype.clone()) }
124
.map(Some)
125
.transpose()
126
}
127
}
128
129
struct PrivateData {
130
iter: Box<dyn Iterator<Item = PolarsResult<Box<dyn Array>>>>,
131
field: Field,
132
error: Option<CString>,
133
}
134
135
unsafe extern "C" fn get_next(iter: *mut ArrowArrayStream, array: *mut ArrowArray) -> i32 {
136
if iter.is_null() {
137
return 2001;
138
}
139
let private = &mut *((*iter).private_data as *mut PrivateData);
140
141
match private.iter.next() {
142
Some(Ok(item)) => {
143
// check that the array has the same dtype as field
144
let item_dt = item.dtype();
145
let expected_dt = private.field.dtype();
146
if item_dt != expected_dt {
147
private.error = Some(CString::new(format!("The iterator produced an item of data type {item_dt:?} but the producer expects data type {expected_dt:?}").as_bytes().to_vec()).unwrap());
148
return 2001; // custom application specific error (since this is never a result of this interface)
149
}
150
151
std::ptr::write(array, export_array_to_c(item));
152
153
private.error = None;
154
0
155
},
156
Some(Err(err)) => {
157
private.error = Some(CString::new(err.to_string().as_bytes().to_vec()).unwrap());
158
2001 // custom application specific error (since this is never a result of this interface)
159
},
160
None => {
161
let a = ArrowArray::empty();
162
std::ptr::write_unaligned(array, a);
163
private.error = None;
164
0
165
},
166
}
167
}
168
169
unsafe extern "C" fn get_schema(iter: *mut ArrowArrayStream, schema: *mut ArrowSchema) -> i32 {
170
if iter.is_null() {
171
return 2001;
172
}
173
let private = &mut *((*iter).private_data as *mut PrivateData);
174
175
std::ptr::write(schema, export_field_to_c(&private.field));
176
0
177
}
178
179
unsafe extern "C" fn get_last_error(iter: *mut ArrowArrayStream) -> *const ::std::os::raw::c_char {
180
if iter.is_null() {
181
return std::ptr::null();
182
}
183
let private = &mut *((*iter).private_data as *mut PrivateData);
184
185
private
186
.error
187
.as_ref()
188
.map(|x| x.as_ptr())
189
.unwrap_or(std::ptr::null())
190
}
191
192
unsafe extern "C" fn release(iter: *mut ArrowArrayStream) {
193
if iter.is_null() {
194
return;
195
}
196
let _ = Box::from_raw((*iter).private_data as *mut PrivateData);
197
(*iter).release = None;
198
// private drops automatically
199
}
200
201
/// Exports an iterator to the [C stream interface](https://arrow.apache.org/docs/format/CStreamInterface.html)
202
pub fn export_iterator(
203
iter: Box<dyn Iterator<Item = PolarsResult<Box<dyn Array>>>>,
204
field: Field,
205
) -> ArrowArrayStream {
206
let private_data = Box::new(PrivateData {
207
iter,
208
field,
209
error: None,
210
});
211
212
ArrowArrayStream {
213
get_schema: Some(get_schema),
214
get_next: Some(get_next),
215
get_last_error: Some(get_last_error),
216
release: Some(release),
217
private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void,
218
}
219
}
220
221