Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/io/ipc/read/stream.rs
6940 views
1
use std::io::Read;
2
3
use arrow_format::ipc::planus::ReadAsRoot;
4
use polars_error::{PolarsError, PolarsResult, polars_bail, polars_err};
5
6
use super::super::CONTINUATION_MARKER;
7
use super::common::*;
8
use super::schema::deserialize_stream_metadata;
9
use super::{Dictionaries, OutOfSpecKind};
10
use crate::array::Array;
11
use crate::datatypes::{ArrowSchema, Metadata};
12
use crate::io::ipc::IpcSchema;
13
use crate::record_batch::RecordBatchT;
14
15
/// Metadata of an Arrow IPC stream, written at the start of the stream
16
#[derive(Debug, Clone)]
17
pub struct StreamMetadata {
18
/// The schema that is read from the stream's first message
19
pub schema: ArrowSchema,
20
21
/// The custom metadata that is read from the schema
22
pub custom_schema_metadata: Option<Metadata>,
23
24
/// The IPC version of the stream
25
pub version: arrow_format::ipc::MetadataVersion,
26
27
/// The IPC fields tracking dictionaries
28
pub ipc_schema: IpcSchema,
29
}
30
31
/// Reads the metadata of the stream
32
pub fn read_stream_metadata(reader: &mut dyn std::io::Read) -> PolarsResult<StreamMetadata> {
33
// determine metadata length
34
let mut meta_size: [u8; 4] = [0; 4];
35
reader.read_exact(&mut meta_size)?;
36
let meta_length = {
37
// If a continuation marker is encountered, skip over it and read
38
// the size from the next four bytes.
39
if meta_size == CONTINUATION_MARKER {
40
reader.read_exact(&mut meta_size)?;
41
}
42
i32::from_le_bytes(meta_size)
43
};
44
45
let length: usize = meta_length
46
.try_into()
47
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
48
49
let mut buffer = vec![];
50
buffer.try_reserve(length)?;
51
reader.take(length as u64).read_to_end(&mut buffer)?;
52
53
deserialize_stream_metadata(&buffer)
54
}
55
56
/// Encodes the stream's status after each read.
57
///
58
/// A stream is an iterator, and an iterator returns `Option<Item>`. The `Item`
59
/// type in the [`StreamReader`] case is `StreamState`, which means that an Arrow
60
/// stream may yield one of three values: (1) `None`, which signals that the stream
61
/// is done; (2) [`StreamState::Some`], which signals that there was
62
/// data waiting in the stream and we read it; and finally (3)
63
/// [`Some(StreamState::Waiting)`], which means that the stream is still "live", it
64
/// just doesn't hold any data right now.
65
pub enum StreamState {
66
/// A live stream without data
67
Waiting,
68
/// Next item in the stream
69
Some(RecordBatchT<Box<dyn Array>>),
70
}
71
72
impl StreamState {
73
/// Return the data inside this wrapper.
74
///
75
/// # Panics
76
///
77
/// If the `StreamState` was `Waiting`.
78
pub fn unwrap(self) -> RecordBatchT<Box<dyn Array>> {
79
if let StreamState::Some(batch) = self {
80
batch
81
} else {
82
panic!("The batch is not available")
83
}
84
}
85
}
86
87
/// Reads the next item, yielding `None` if the stream is done,
88
/// and a [`StreamState`] otherwise.
89
fn read_next<R: Read>(
90
reader: &mut R,
91
metadata: &StreamMetadata,
92
dictionaries: &mut Dictionaries,
93
message_buffer: &mut Vec<u8>,
94
data_buffer: &mut Vec<u8>,
95
projection: &Option<ProjectionInfo>,
96
scratch: &mut Vec<u8>,
97
) -> PolarsResult<Option<StreamState>> {
98
// determine metadata length
99
let mut meta_length: [u8; 4] = [0; 4];
100
101
match reader.read_exact(&mut meta_length) {
102
Ok(()) => (),
103
Err(e) => {
104
return if e.kind() == std::io::ErrorKind::UnexpectedEof {
105
// Handle EOF without the "0xFFFFFFFF 0x00000000"
106
// valid according to:
107
// https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
108
Ok(Some(StreamState::Waiting))
109
} else {
110
Err(PolarsError::from(e))
111
};
112
},
113
}
114
115
let meta_length = {
116
// If a continuation marker is encountered, skip over it and read
117
// the size from the next four bytes.
118
if meta_length == CONTINUATION_MARKER {
119
reader.read_exact(&mut meta_length)?;
120
}
121
i32::from_le_bytes(meta_length)
122
};
123
124
let meta_length: usize = meta_length
125
.try_into()
126
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
127
128
if meta_length == 0 {
129
// the stream has ended, mark the reader as finished
130
return Ok(None);
131
}
132
133
message_buffer.clear();
134
message_buffer.try_reserve(meta_length)?;
135
reader
136
.by_ref()
137
.take(meta_length as u64)
138
.read_to_end(message_buffer)?;
139
140
let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer.as_ref())
141
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferMessage(err)))?;
142
143
let header = message
144
.header()
145
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferHeader(err)))?
146
.ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingMessageHeader))?;
147
148
let block_length: usize = message
149
.body_length()
150
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferBodyLength(err)))?
151
.try_into()
152
.map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;
153
154
match header {
155
arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => {
156
data_buffer.clear();
157
data_buffer.try_reserve(block_length)?;
158
reader
159
.by_ref()
160
.take(block_length as u64)
161
.read_to_end(data_buffer)?;
162
163
let file_size = data_buffer.len() as u64;
164
165
let mut reader = std::io::Cursor::new(data_buffer);
166
167
let chunk = read_record_batch(
168
batch,
169
&metadata.schema,
170
&metadata.ipc_schema,
171
projection.as_ref().map(|x| x.columns.as_ref()),
172
None,
173
dictionaries,
174
metadata.version,
175
&mut reader,
176
0,
177
file_size,
178
scratch,
179
);
180
181
if let Some(ProjectionInfo { map, .. }) = projection {
182
// re-order according to projection
183
chunk
184
.map(|chunk| apply_projection(chunk, map))
185
.map(|x| Some(StreamState::Some(x)))
186
} else {
187
chunk.map(|x| Some(StreamState::Some(x)))
188
}
189
},
190
arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => {
191
data_buffer.clear();
192
data_buffer.try_reserve(block_length)?;
193
reader
194
.by_ref()
195
.take(block_length as u64)
196
.read_to_end(data_buffer)?;
197
198
let file_size = data_buffer.len() as u64;
199
let mut dict_reader = std::io::Cursor::new(&data_buffer);
200
201
read_dictionary(
202
batch,
203
&metadata.schema,
204
&metadata.ipc_schema,
205
dictionaries,
206
&mut dict_reader,
207
0,
208
file_size,
209
scratch,
210
)?;
211
212
// read the next message until we encounter a RecordBatch message
213
read_next(
214
reader,
215
metadata,
216
dictionaries,
217
message_buffer,
218
data_buffer,
219
projection,
220
scratch,
221
)
222
},
223
_ => polars_bail!(oos = OutOfSpecKind::UnexpectedMessageType),
224
}
225
}
226
227
/// Arrow Stream reader.
228
///
229
/// An [`Iterator`] over an Arrow stream that yields a result of [`StreamState`]s.
230
/// This is the recommended way to read an arrow stream (by iterating over its data).
231
///
232
/// For a more thorough walkthrough consult [this example](https://github.com/jorgecarleitao/polars_arrow/tree/main/examples/ipc_pyarrow).
233
pub struct StreamReader<R: Read> {
234
reader: R,
235
metadata: StreamMetadata,
236
dictionaries: Dictionaries,
237
finished: bool,
238
data_buffer: Vec<u8>,
239
message_buffer: Vec<u8>,
240
projection: Option<ProjectionInfo>,
241
scratch: Vec<u8>,
242
}
243
244
impl<R: Read> StreamReader<R> {
245
/// Try to create a new stream reader
246
///
247
/// The first message in the stream is the schema, the reader will fail if it does not
248
/// encounter a schema.
249
/// To check if the reader is done, use `is_finished(self)`
250
pub fn new(reader: R, metadata: StreamMetadata, projection: Option<Vec<usize>>) -> Self {
251
let projection =
252
projection.map(|projection| prepare_projection(&metadata.schema, projection));
253
254
Self {
255
reader,
256
metadata,
257
dictionaries: Default::default(),
258
finished: false,
259
data_buffer: Default::default(),
260
message_buffer: Default::default(),
261
projection,
262
scratch: Default::default(),
263
}
264
}
265
266
/// Return the schema of the stream
267
pub fn metadata(&self) -> &StreamMetadata {
268
&self.metadata
269
}
270
271
/// Return the schema of the file
272
pub fn schema(&self) -> &ArrowSchema {
273
self.projection
274
.as_ref()
275
.map(|x| &x.schema)
276
.unwrap_or(&self.metadata.schema)
277
}
278
279
/// Check if the stream is finished
280
pub fn is_finished(&self) -> bool {
281
self.finished
282
}
283
284
fn maybe_next(&mut self) -> PolarsResult<Option<StreamState>> {
285
if self.finished {
286
return Ok(None);
287
}
288
let batch = read_next(
289
&mut self.reader,
290
&self.metadata,
291
&mut self.dictionaries,
292
&mut self.message_buffer,
293
&mut self.data_buffer,
294
&self.projection,
295
&mut self.scratch,
296
)?;
297
if batch.is_none() {
298
self.finished = true;
299
}
300
Ok(batch)
301
}
302
}
303
304
impl<R: Read> Iterator for StreamReader<R> {
305
type Item = PolarsResult<StreamState>;
306
307
fn next(&mut self) -> Option<Self::Item> {
308
self.maybe_next().transpose()
309
}
310
}
311
312