Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/ipc/ipc_stream.rs
6939 views
1
//! # (De)serializing Arrows Streaming IPC format.
2
//!
3
//! Arrow Streaming IPC is a [binary format](https://arrow.apache.org/docs/python/ipc.html).
4
//! It is used for sending an arbitrary-length sequence of record batches.
5
//! The format must be processed from start to end, and does not support random access.
6
//! It is different from the IPC file format: if you can't deserialize a file with `IpcReader::new`, it's probably an IPC Stream File.
7
//!
8
//! ## Example
9
//!
10
//! ```rust
11
//! use polars_core::prelude::*;
12
//! use polars_io::prelude::*;
13
//! use std::io::Cursor;
14
//!
15
//!
16
//! let c0 = Column::new("days".into(), &[0, 1, 2, 3, 4]);
17
//! let c1 = Column::new("temp".into(), &[22.1, 19.9, 7., 2., 3.]);
18
//! let mut df = DataFrame::new(vec![c0, c1]).unwrap();
19
//!
20
//! // Create an in memory file handler.
21
//! // Vec<u8>: Read + Write
22
//! // Cursor<T>: Seek
23
//!
24
//! let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
25
//!
26
//! // write to the in memory buffer
27
//! IpcStreamWriter::new(&mut buf).finish(&mut df).expect("ipc writer");
28
//!
29
//! // reset the buffers index after writing to the beginning of the buffer
30
//! buf.set_position(0);
31
//!
32
//! // read the buffer into a DataFrame
33
//! let df_read = IpcStreamReader::new(buf).finish().unwrap();
34
//! assert!(df.equals(&df_read));
35
//! ```
36
use std::io::{Read, Write};
37
use std::path::PathBuf;
38
39
use arrow::datatypes::Metadata;
40
use arrow::io::ipc::read::{StreamMetadata, StreamState};
41
use arrow::io::ipc::write::WriteOptions;
42
use arrow::io::ipc::{read, write};
43
use polars_core::frame::chunk_df_for_writing;
44
use polars_core::prelude::*;
45
46
use crate::prelude::*;
47
use crate::shared::{ArrowReader, finish_reader};
48
49
/// Read Arrow's Stream IPC format into a DataFrame
///
/// # Example
/// ```
/// use polars_core::prelude::*;
/// use std::fs::File;
/// use polars_io::ipc::IpcStreamReader;
/// use polars_io::SerReader;
///
/// fn example() -> PolarsResult<DataFrame> {
///     let file = File::open("file.ipc").expect("file not found");
///
///     IpcStreamReader::new(file)
///         .finish()
/// }
/// ```
#[must_use]
pub struct IpcStreamReader<R> {
    /// File or Stream object
    reader: R,
    /// Aggregates chunks afterwards to a single chunk.
    rechunk: bool,
    // Stop reading after this many rows; `None` reads everything.
    n_rows: Option<usize>,
    // 0-based column indices to read; takes precedence over `columns`
    // (by-name selection is converted to a projection in `finish`).
    projection: Option<Vec<usize>>,
    // Column names to select; translated into `projection` during `finish`.
    columns: Option<Vec<String>>,
    // Optional row-index column to prepend to the output DataFrame.
    row_index: Option<RowIndex>,
    // Cached stream metadata so the stream header is parsed only once.
    metadata: Option<StreamMetadata>,
}
impl<R: Read> IpcStreamReader<R> {
79
/// Get schema of the Ipc Stream File
80
pub fn schema(&mut self) -> PolarsResult<Schema> {
81
Ok(Schema::from_arrow_schema(&self.metadata()?.schema))
82
}
83
84
/// Get arrow schema of the Ipc Stream File, this is faster than creating a polars schema.
85
pub fn arrow_schema(&mut self) -> PolarsResult<ArrowSchema> {
86
Ok(self.metadata()?.schema)
87
}
88
89
/// Get schema-level custom metadata of the Ipc Stream file
90
pub fn custom_metadata(&mut self) -> PolarsResult<Option<Arc<Metadata>>> {
91
Ok(self.metadata()?.custom_schema_metadata.map(Arc::new))
92
}
93
94
/// Stop reading when `n` rows are read.
95
pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
96
self.n_rows = num_rows;
97
self
98
}
99
100
/// Columns to select/ project
101
pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
102
self.columns = columns;
103
self
104
}
105
106
/// Add a row index column.
107
pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
108
self.row_index = row_index;
109
self
110
}
111
112
/// Set the reader's column projection. This counts from 0, meaning that
113
/// `vec![0, 4]` would select the 1st and 5th column.
114
pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
115
self.projection = projection;
116
self
117
}
118
119
fn metadata(&mut self) -> PolarsResult<StreamMetadata> {
120
match &self.metadata {
121
None => {
122
let metadata = read::read_stream_metadata(&mut self.reader)?;
123
self.metadata = Option::from(metadata.clone());
124
Ok(metadata)
125
},
126
Some(md) => Ok(md.clone()),
127
}
128
}
129
}
130
131
impl<R> ArrowReader for read::StreamReader<R>
132
where
133
R: Read,
134
{
135
fn next_record_batch(&mut self) -> PolarsResult<Option<RecordBatch>> {
136
self.next().map_or(Ok(None), |v| match v {
137
Ok(stream_state) => match stream_state {
138
StreamState::Waiting => Ok(None),
139
StreamState::Some(chunk) => Ok(Some(chunk)),
140
},
141
Err(err) => Err(err),
142
})
143
}
144
}
145
146
impl<R> SerReader<R> for IpcStreamReader<R>
where
    R: Read,
{
    // Default configuration: rechunk enabled, no row limit, no
    // projection/column selection, no row index, metadata not yet parsed.
    fn new(reader: R) -> Self {
        IpcStreamReader {
            reader,
            rechunk: true,
            n_rows: None,
            columns: None,
            projection: None,
            row_index: None,
            metadata: None,
        }
    }

    fn set_rechunk(mut self, rechunk: bool) -> Self {
        self.rechunk = rechunk;
        self
    }

    /// Read the entire stream into a single `DataFrame`.
    fn finish(mut self) -> PolarsResult<DataFrame> {
        let rechunk = self.rechunk;
        // Parses (and caches) the stream header; this advances `self.reader`
        // so the subsequent `StreamReader` starts at the first record batch.
        let metadata = self.metadata()?;
        let schema = &metadata.schema;

        // A by-name column selection is translated into an index projection.
        if let Some(columns) = self.columns {
            let prj = columns_to_projection(&columns, schema)?;
            self.projection = Some(prj);
        }

        // The schema handed to `finish_reader` must describe the projected
        // output, not the full on-stream schema.
        let schema = if let Some(projection) = &self.projection {
            apply_projection(&metadata.schema, projection)
        } else {
            metadata.schema.clone()
        };

        let ipc_reader =
            read::StreamReader::new(&mut self.reader, metadata.clone(), self.projection);
        finish_reader(
            ipc_reader,
            rechunk,
            self.n_rows,
            None,
            &schema,
            self.row_index,
        )
    }
}
/// Write a DataFrame to Arrow's Streaming IPC format
///
/// # Example
///
/// ```
/// use polars_core::prelude::*;
/// use polars_io::ipc::IpcStreamWriter;
/// use std::fs::File;
/// use polars_io::SerWriter;
///
/// fn example(df: &mut DataFrame) -> PolarsResult<()> {
///     let mut file = File::create("file.ipc").expect("could not create file");
///
///     let mut writer = IpcStreamWriter::new(&mut file);
///
///     let custom_metadata = [
///         ("first_name".into(), "John".into()),
///         ("last_name".into(), "Doe".into()),
///     ]
///     .into_iter()
///     .collect();
///     writer.set_custom_schema_metadata(Arc::new(custom_metadata));
///
///     writer.finish(df)
/// }
///
/// ```
#[must_use]
pub struct IpcStreamWriter<W> {
    // Underlying sink the IPC stream is written to.
    writer: W,
    // Optional per-batch compression; `None` writes uncompressed.
    compression: Option<IpcCompression>,
    // Arrow compatibility level used when converting chunks for writing.
    compat_level: CompatLevel,
    /// Custom schema-level metadata
    custom_schema_metadata: Option<Arc<Metadata>>,
}
use arrow::record_batch::RecordBatch;
233
234
use crate::RowIndex;
235
236
impl<W> IpcStreamWriter<W> {
    /// Set the compression used. Defaults to None.
    pub fn with_compression(mut self, compression: Option<IpcCompression>) -> Self {
        self.compression = compression;
        self
    }

    /// Set the Arrow compatibility level used when serializing chunks.
    pub fn with_compat_level(mut self, compat_level: CompatLevel) -> Self {
        self.compat_level = compat_level;
        self
    }

    /// Sets custom schema metadata. Must be called before `start` is called
    pub fn set_custom_schema_metadata(&mut self, custom_metadata: Arc<Metadata>) {
        self.custom_schema_metadata = Some(custom_metadata);
    }
}
impl<W> SerWriter<W> for IpcStreamWriter<W>
255
where
256
W: Write,
257
{
258
fn new(writer: W) -> Self {
259
IpcStreamWriter {
260
writer,
261
compression: None,
262
compat_level: CompatLevel::oldest(),
263
custom_schema_metadata: None,
264
}
265
}
266
267
fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
268
let mut ipc_stream_writer = write::StreamWriter::new(
269
&mut self.writer,
270
WriteOptions {
271
compression: self.compression.map(|c| c.into()),
272
},
273
);
274
275
if let Some(custom_metadata) = &self.custom_schema_metadata {
276
ipc_stream_writer.set_custom_schema_metadata(Arc::clone(custom_metadata));
277
}
278
279
ipc_stream_writer.start(&df.schema().to_arrow(self.compat_level), None)?;
280
let df = chunk_df_for_writing(df, 512 * 512)?;
281
let iter = df.iter_chunks(self.compat_level, true);
282
283
for batch in iter {
284
ipc_stream_writer.write(&batch, None)?
285
}
286
ipc_stream_writer.finish()?;
287
Ok(())
288
}
289
}
290
291
/// Options for writing Arrow Streaming IPC files.
pub struct IpcStreamWriterOption {
    // Optional per-batch compression; `None` writes uncompressed.
    compression: Option<IpcCompression>,
    // File extension to use; defaults to ".ipc".
    extension: PathBuf,
}
impl IpcStreamWriterOption {
297
pub fn new() -> Self {
298
Self {
299
compression: None,
300
extension: PathBuf::from(".ipc"),
301
}
302
}
303
304
/// Set the compression used. Defaults to None.
305
pub fn with_compression(mut self, compression: Option<IpcCompression>) -> Self {
306
self.compression = compression;
307
self
308
}
309
310
/// Set the extension. Defaults to ".ipc".
311
pub fn with_extension(mut self, extension: PathBuf) -> Self {
312
self.extension = extension;
313
self
314
}
315
}
316
317
impl Default for IpcStreamWriterOption {
318
fn default() -> Self {
319
Self::new()
320
}
321
}
322
323