GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/ipc/ipc_file.rs
//! # (De)serializing Arrow's IPC format.
//!
//! Arrow IPC is a [binary format](https://arrow.apache.org/docs/python/ipc.html).
//! It is the recommended way to serialize and deserialize Polars DataFrames, as it stays
//! most true to the data schema.
//!
//! ## Example
//!
//! ```rust
//! use polars_core::prelude::*;
//! use polars_io::prelude::*;
//! use std::io::Cursor;
//!
//! let s0 = Column::new("days".into(), &[0, 1, 2, 3, 4]);
//! let s1 = Column::new("temp".into(), &[22.1, 19.9, 7., 2., 3.]);
//! let mut df = DataFrame::new_infer_height(vec![s0, s1]).unwrap();
//!
//! // Create an in-memory file handle.
//! // Vec<u8>: Read + Write
//! // Cursor<T>: Seek
//! let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
//!
//! // Write to the in-memory buffer.
//! IpcWriter::new(&mut buf).finish(&mut df).expect("ipc writer");
//!
//! // Reset the buffer's position to the beginning after writing.
//! buf.set_position(0);
//!
//! // Read the buffer back into a DataFrame.
//! let df_read = IpcReader::new(buf).finish().unwrap();
//! assert!(df.equals(&df_read));
//! ```
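//!
//! Writing to and reading from a file on disk works the same way. A minimal
//! sketch, assuming a writable placeholder path `example.ipc`:
//!
//! ```no_run
//! use polars_core::prelude::*;
//! use polars_io::prelude::*;
//! use std::fs::File;
//!
//! let s0 = Column::new("days".into(), &[0, 1, 2]);
//! let mut df = DataFrame::new_infer_height(vec![s0]).unwrap();
//!
//! // Write the DataFrame to an IPC file on disk.
//! let mut file = File::create("example.ipc").unwrap();
//! IpcWriter::new(&mut file).finish(&mut df).unwrap();
//!
//! // Open the file again and read it back into a DataFrame.
//! let file = File::open("example.ipc").unwrap();
//! let df_read = IpcReader::new(file).finish().unwrap();
//! assert!(df.equals(&df_read));
//! ```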
use std::io::{Read, Seek};
use std::path::PathBuf;

use arrow::datatypes::{ArrowSchemaRef, Metadata};
use arrow::io::ipc::read::{self, get_row_count};
use arrow::record_batch::RecordBatch;
use polars_core::prelude::*;
use polars_utils::bool::UnsafeBool;
use polars_utils::pl_str::PlRefStr;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

use crate::RowIndex;
use crate::hive::materialize_hive_partitions;
use crate::mmap::MmapBytesReader;
use crate::predicates::PhysicalIoExpr;
use crate::prelude::*;
use crate::shared::{ArrowReader, finish_reader};

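/// Options for scanning Arrow IPC files.
///
/// A minimal sketch of overriding a default (the field values shown are
/// illustrative, not recommendations):
///
/// ```ignore
/// let opts = IpcScanOptions {
///     record_batch_statistics: true,
///     ..Default::default()
/// };
/// ```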
#[derive(Clone, Debug, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct IpcScanOptions {
    /// Read StatisticsFlags from the record batch custom metadata.
    #[cfg_attr(feature = "serde", serde(default))]
    pub record_batch_statistics: bool,
    #[cfg_attr(feature = "serde", serde(default))]
    pub checked: UnsafeBool,
}

#[expect(clippy::derivable_impls)]
impl Default for IpcScanOptions {
    fn default() -> Self {
        Self {
            record_batch_statistics: false,
            checked: Default::default(),
        }
    }
}

/// Read Arrow's IPC format into a DataFrame.
///
/// # Example
/// ```
/// use polars_core::prelude::*;
/// use std::fs::File;
/// use polars_io::ipc::IpcReader;
/// use polars_io::SerReader;
///
/// fn example() -> PolarsResult<DataFrame> {
///     let file = File::open("file.ipc").expect("file not found");
///
///     IpcReader::new(file)
///         .finish()
/// }
/// ```
#[must_use]
pub struct IpcReader<R: MmapBytesReader> {
    /// File or stream object.
    pub(super) reader: R,
    /// Aggregates chunks afterwards to a single chunk.
    rechunk: bool,
    pub(super) n_rows: Option<usize>,
    pub(super) projection: Option<Vec<usize>>,
    pub(crate) columns: Option<Vec<String>>,
    hive_partition_columns: Option<Vec<Series>>,
    include_file_path: Option<(PlSmallStr, PlRefStr)>,
    pub(super) row_index: Option<RowIndex>,
    // Stores the path that is used as a key to the semaphore that makes sure we
    // don't write to the memory-mapped file.
    pub(super) memory_map: Option<PathBuf>,
    metadata: Option<read::FileMetadata>,
    schema: Option<ArrowSchemaRef>,
}

fn check_mmap_err(err: PolarsError) -> PolarsResult<()> {
    if let PolarsError::ComputeError(s) = &err {
        if s.as_ref() == "memory_map can only be done on uncompressed IPC files" {
            eprintln!(
                "Could not memory_map compressed IPC file, defaulting to normal read. \
                Toggle off 'memory_map' to silence this warning."
            );
            return Ok(());
        }
    }
    Err(err)
}

impl<R: MmapBytesReader> IpcReader<R> {
    fn get_metadata(&mut self) -> PolarsResult<&read::FileMetadata> {
        if self.metadata.is_none() {
            let metadata = read::read_file_metadata(&mut self.reader)?;
            self.schema = Some(metadata.schema.clone());
            self.metadata = Some(metadata);
        }
        Ok(self.metadata.as_ref().unwrap())
    }

    /// Get the Arrow schema of the IPC file.
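    ///
    /// # Example
    ///
    /// A minimal sketch; `file.ipc` is a placeholder path.
    ///
    /// ```no_run
    /// use polars_core::prelude::*;
    /// use polars_io::ipc::IpcReader;
    /// use polars_io::SerReader;
    /// use std::fs::File;
    ///
    /// fn example() -> PolarsResult<()> {
    ///     let mut reader = IpcReader::new(File::open("file.ipc").expect("file not found"));
    ///     // Inspect the schema without materializing the data.
    ///     let schema = reader.schema()?;
    ///     Ok(())
    /// }
    /// ```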
    pub fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
        self.get_metadata()?;
        Ok(self.schema.as_ref().unwrap().clone())
    }

    /// Get the schema-level custom metadata of the IPC file.
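    ///
    /// A sketch, assuming the returned `Metadata` iterates as key/value pairs;
    /// `file.ipc` is a placeholder path.
    ///
    /// ```ignore
    /// use polars_io::ipc::IpcReader;
    /// use polars_io::SerReader;
    /// use std::fs::File;
    ///
    /// let mut reader = IpcReader::new(File::open("file.ipc").expect("file not found"));
    /// if let Some(metadata) = reader.custom_metadata().expect("read metadata") {
    ///     for (key, value) in metadata.iter() {
    ///         println!("{key}: {value}");
    ///     }
    /// }
    /// ```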
139
pub fn custom_metadata(&mut self) -> PolarsResult<Option<Arc<Metadata>>> {
140
self.get_metadata()?;
141
Ok(self
142
.metadata
143
.as_ref()
144
.and_then(|meta| meta.custom_schema_metadata.clone()))
145
}
146
147
    /// Stop reading when `n` rows are read.
    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
        self.n_rows = num_rows;
        self
    }

    /// Columns to select/project.
    pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
        self.columns = columns;
        self
    }

    pub fn with_hive_partition_columns(mut self, columns: Option<Vec<Series>>) -> Self {
        self.hive_partition_columns = columns;
        self
    }

    pub fn with_include_file_path(
        mut self,
        include_file_path: Option<(PlSmallStr, PlRefStr)>,
    ) -> Self {
        self.include_file_path = include_file_path;
        self
    }

    /// Add a row index column.
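    ///
    /// The added column counts rows starting at `offset`. A sketch; `file.ipc`
    /// and the index name `row_nr` are placeholders:
    ///
    /// ```no_run
    /// use polars_core::prelude::*;
    /// use polars_io::ipc::IpcReader;
    /// use polars_io::{RowIndex, SerReader};
    /// use std::fs::File;
    ///
    /// fn example() -> PolarsResult<DataFrame> {
    ///     let file = File::open("file.ipc").expect("file not found");
    ///     IpcReader::new(file)
    ///         .with_row_index(Some(RowIndex { name: "row_nr".into(), offset: 0 }))
    ///         .finish()
    /// }
    /// ```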
    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.row_index = row_index;
        self
    }

    /// Set the reader's column projection. This counts from 0, meaning that
    /// `vec![0, 4]` would select the 1st and 5th columns.
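    ///
    /// A sketch; `file.ipc` is a placeholder path:
    ///
    /// ```no_run
    /// use polars_core::prelude::*;
    /// use polars_io::ipc::IpcReader;
    /// use polars_io::SerReader;
    /// use std::fs::File;
    ///
    /// fn example() -> PolarsResult<DataFrame> {
    ///     let file = File::open("file.ipc").expect("file not found");
    ///     // Read only the 1st and 5th columns.
    ///     IpcReader::new(file)
    ///         .with_projection(Some(vec![0, 4]))
    ///         .finish()
    /// }
    /// ```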
    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
        self.projection = projection;
        self
    }

    /// Set whether the file should be memory-mapped. This only works with uncompressed files.
    /// The file path must be passed to register the memory-mapped file.
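    ///
    /// A sketch; `file.ipc` is a placeholder path and must point to an
    /// uncompressed IPC file:
    ///
    /// ```no_run
    /// use polars_core::prelude::*;
    /// use polars_io::ipc::IpcReader;
    /// use polars_io::SerReader;
    /// use std::fs::File;
    /// use std::path::PathBuf;
    ///
    /// fn example() -> PolarsResult<DataFrame> {
    ///     let path = PathBuf::from("file.ipc");
    ///     let file = File::open(&path).expect("file not found");
    ///     IpcReader::new(file)
    ///         .memory_mapped(Some(path))
    ///         .finish()
    /// }
    /// ```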
    pub fn memory_mapped(mut self, path_buf: Option<PathBuf>) -> Self {
        self.memory_map = path_buf;
        self
    }

    // todo! hoist to lazy crate
    #[cfg(feature = "lazy")]
    pub fn finish_with_scan_ops(
        mut self,
        predicate: Option<Arc<dyn PhysicalIoExpr>>,
        verbose: bool,
    ) -> PolarsResult<DataFrame> {
        if self.memory_map.is_some() && self.reader.to_file().is_some() {
            if verbose {
                eprintln!("memory map ipc file")
            }
            match self.finish_memmapped(predicate.clone()) {
                Ok(df) => return Ok(df),
                Err(err) => check_mmap_err(err)?,
            }
        }
        let rechunk = self.rechunk;
        let metadata = read::read_file_metadata(&mut self.reader)?;

        // NOTE: For some code paths this already happened. See
        // https://github.com/pola-rs/polars/pull/14984#discussion_r1520125000
        // where this was introduced.
        if let Some(columns) = &self.columns {
            self.projection = Some(columns_to_projection(columns, &metadata.schema)?);
        }

        let schema = if let Some(projection) = &self.projection {
            Arc::new(apply_projection(&metadata.schema, projection))
        } else {
            metadata.schema.clone()
        };

        let reader = read::FileReader::new(self.reader, metadata, self.projection, self.n_rows);

        finish_reader(reader, rechunk, None, predicate, &schema, self.row_index)
    }
}

impl<R: MmapBytesReader> ArrowReader for read::FileReader<R>
where
    R: Read + Seek,
{
    fn next_record_batch(&mut self) -> PolarsResult<Option<RecordBatch>> {
        self.next().map_or(Ok(None), |v| v.map(Some))
    }
}

impl<R: MmapBytesReader> SerReader<R> for IpcReader<R> {
    fn new(reader: R) -> Self {
        IpcReader {
            reader,
            rechunk: true,
            n_rows: None,
            columns: None,
            hive_partition_columns: None,
            include_file_path: None,
            projection: None,
            row_index: None,
            memory_map: None,
            metadata: None,
            schema: None,
        }
    }

    fn set_rechunk(mut self, rechunk: bool) -> Self {
        self.rechunk = rechunk;
        self
    }

    fn finish(mut self) -> PolarsResult<DataFrame> {
        let reader_schema = if let Some(ref schema) = self.schema {
            schema.clone()
        } else {
            self.get_metadata()?.schema.clone()
        };
        let reader_schema = reader_schema.as_ref();

        let hive_partition_columns = self.hive_partition_columns.take();
        let include_file_path = self.include_file_path.take();

        // In case only hive columns are projected, the df would be empty, but we need the row count
        // of the file in order to project the correct number of rows for the hive columns.
        let mut df = (|| {
            if self.projection.as_ref().is_some_and(|x| x.is_empty()) {
                let row_count = if let Some(v) = self.n_rows {
                    v
                } else {
                    get_row_count(&mut self.reader)? as usize
                };
                let mut df = DataFrame::empty_with_height(row_count);

                if let Some(ri) = &self.row_index {
                    unsafe { df.with_row_index_mut(ri.name.clone(), Some(ri.offset)) };
                }
                return PolarsResult::Ok(df);
            }

            if self.memory_map.is_some() && self.reader.to_file().is_some() {
                match self.finish_memmapped(None) {
                    Ok(df) => {
                        return Ok(df);
                    },
                    Err(err) => check_mmap_err(err)?,
                }
            }
            let rechunk = self.rechunk;
            let schema = self.get_metadata()?.schema.clone();

            if let Some(columns) = &self.columns {
                let prj = columns_to_projection(columns, schema.as_ref())?;
                self.projection = Some(prj);
            }

            let schema = if let Some(projection) = &self.projection {
                Arc::new(apply_projection(schema.as_ref(), projection))
            } else {
                schema
            };

            let metadata = self.get_metadata()?.clone();

            let ipc_reader =
                read::FileReader::new(self.reader, metadata, self.projection, self.n_rows);
            let df = finish_reader(ipc_reader, rechunk, None, None, &schema, self.row_index)?;
            Ok(df)
        })()?;

        if let Some(hive_cols) = hive_partition_columns {
            materialize_hive_partitions(&mut df, reader_schema, Some(hive_cols.as_slice()));
        }

        if let Some((col, value)) = include_file_path {
            unsafe {
                df.push_column_unchecked(Column::new_scalar(
                    col,
                    Scalar::new(
                        DataType::String,
                        AnyValue::StringOwned(value.as_str().into()),
                    ),
                    df.height(),
                ))
            };
        }

        Ok(df)
    }
}