Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/io/ipc/read/file.rs
8449 views
1
use std::io::{Read, Seek, SeekFrom};
2
use std::sync::Arc;
3
4
use arrow_format::ipc::FooterRef;
5
use arrow_format::ipc::planus::ReadAsRoot;
6
use polars_error::{PolarsResult, polars_bail, polars_err};
7
use polars_utils::aliases::{InitHashMaps, PlHashMap};
8
use polars_utils::bool::UnsafeBool;
9
10
use super::super::{ARROW_MAGIC_V1, ARROW_MAGIC_V2, CONTINUATION_MARKER};
11
use super::common::*;
12
use super::schema::fb_to_schema;
13
use super::{Dictionaries, OutOfSpecKind, SendableIterator};
14
use crate::array::Array;
15
use crate::datatypes::{ArrowSchemaRef, Metadata};
16
use crate::io::ipc::IpcSchema;
17
use crate::record_batch::RecordBatchT;
18
19
/// Metadata of an Arrow IPC file, written in the footer of the file.
20
#[derive(Debug, Clone)]
21
pub struct FileMetadata {
22
/// The schema that is read from the file footer
23
pub schema: ArrowSchemaRef,
24
25
/// The custom metadata that is read from the schema
26
pub custom_schema_metadata: Option<Arc<Metadata>>,
27
28
/// The files' [`IpcSchema`]
29
pub ipc_schema: IpcSchema,
30
31
/// The blocks in the file
32
///
33
/// A block indicates the regions in the file to read to get data
34
pub blocks: Vec<arrow_format::ipc::Block>,
35
36
/// Dictionaries associated to each dict_id
37
pub dictionaries: Option<Vec<arrow_format::ipc::Block>>,
38
39
/// The total size of the file in bytes
40
pub size: u64,
41
}
42
43
/// Read the row count by summing the length of the of the record batches
44
pub fn get_row_count<R: Read + Seek>(reader: &mut R) -> PolarsResult<i64> {
45
let (_, footer_len) = read_footer_len(reader)?;
46
let footer = read_footer(reader, footer_len)?;
47
let (_, blocks) = deserialize_footer_blocks(&footer)?;
48
49
get_row_count_from_blocks(reader, &blocks)
50
}
51
52
/// Read the row count by summing the length of the of the record batches in blocks
53
pub fn get_row_count_from_blocks<R: Read + Seek>(
54
reader: &mut R,
55
blocks: &[arrow_format::ipc::Block],
56
) -> PolarsResult<i64> {
57
let mut message_scratch: Vec<u8> = Default::default();
58
59
blocks
60
.iter()
61
.map(|block| {
62
let message = get_message_from_block(reader, block, &mut message_scratch)?;
63
let record_batch = get_record_batch(message)?;
64
record_batch.length().map_err(|e| e.into())
65
})
66
.sum()
67
}
68
69
pub(crate) fn get_dictionary_batch<'a>(
70
message: &'a arrow_format::ipc::MessageRef,
71
) -> PolarsResult<arrow_format::ipc::DictionaryBatchRef<'a>> {
72
let header = message
73
.header()
74
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferHeader(err)))?
75
.ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingMessageHeader))?;
76
match header {
77
arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => Ok(batch),
78
_ => polars_bail!(oos = OutOfSpecKind::UnexpectedMessageType),
79
}
80
}
81
82
#[allow(clippy::too_many_arguments)]
83
pub fn read_dictionary_block<R: Read + Seek>(
84
reader: &mut R,
85
metadata: &FileMetadata,
86
block: &arrow_format::ipc::Block,
87
// When true, the underlying reader bytestream represents a standalone IPC Block
88
// rather than a complete IPC File.
89
force_zero_offset: bool,
90
dictionaries: &mut Dictionaries,
91
message_scratch: &mut Vec<u8>,
92
dictionary_scratch: &mut Vec<u8>,
93
checked: UnsafeBool,
94
) -> PolarsResult<()> {
95
let offset: u64 = if force_zero_offset {
96
0
97
} else {
98
block
99
.offset
100
.try_into()
101
.map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?
102
};
103
104
let length: u64 = block
105
.meta_data_length
106
.try_into()
107
.map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;
108
109
let message = get_message_from_block_offset(reader, offset, message_scratch)?;
110
let batch = get_dictionary_batch(&message)?;
111
112
read_dictionary(
113
batch,
114
&metadata.schema,
115
&metadata.ipc_schema,
116
dictionaries,
117
reader,
118
offset + length,
119
dictionary_scratch,
120
checked,
121
)
122
}
123
124
/// Reads all file's dictionaries, if any
125
/// This function is IO-bounded
126
pub fn read_file_dictionaries<R: Read + Seek>(
127
reader: &mut R,
128
metadata: &FileMetadata,
129
scratch: &mut Vec<u8>,
130
checked: UnsafeBool,
131
) -> PolarsResult<Dictionaries> {
132
let mut dictionaries = Default::default();
133
134
let blocks = if let Some(blocks) = &metadata.dictionaries {
135
blocks
136
} else {
137
return Ok(PlHashMap::new());
138
};
139
// use a temporary smaller scratch for the messages
140
let mut message_scratch = Default::default();
141
142
for block in blocks {
143
read_dictionary_block(
144
reader,
145
metadata,
146
block,
147
false,
148
&mut dictionaries,
149
&mut message_scratch,
150
scratch,
151
checked,
152
)?;
153
}
154
Ok(dictionaries)
155
}
156
157
pub(super) fn decode_footer_len(footer: [u8; 10], end: u64) -> PolarsResult<(u64, usize)> {
158
let footer_len = i32::from_le_bytes(footer[..4].try_into().unwrap());
159
160
if footer[4..] != ARROW_MAGIC_V2 {
161
if footer[..4] == ARROW_MAGIC_V1 {
162
polars_bail!(ComputeError: "feather v1 not supported");
163
}
164
return Err(polars_err!(oos = OutOfSpecKind::InvalidFooter));
165
}
166
let footer_len = footer_len
167
.try_into()
168
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
169
170
Ok((end, footer_len))
171
}
172
173
/// Reads the footer's length and magic number in footer
174
fn read_footer_len<R: Read + Seek>(reader: &mut R) -> PolarsResult<(u64, usize)> {
175
// read footer length and magic number in footer
176
let end = reader.seek(SeekFrom::End(-10))? + 10;
177
178
let mut footer: [u8; 10] = [0; 10];
179
180
reader.read_exact(&mut footer)?;
181
decode_footer_len(footer, end)
182
}
183
184
fn read_footer<R: Read + Seek>(reader: &mut R, footer_len: usize) -> PolarsResult<Vec<u8>> {
185
// read footer
186
reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
187
188
let mut serialized_footer = vec![];
189
serialized_footer.try_reserve(footer_len)?;
190
reader
191
.by_ref()
192
.take(footer_len as u64)
193
.read_to_end(&mut serialized_footer)?;
194
Ok(serialized_footer)
195
}
196
197
fn deserialize_footer_blocks(
198
footer_data: &[u8],
199
) -> PolarsResult<(FooterRef<'_>, Vec<arrow_format::ipc::Block>)> {
200
let footer = arrow_format::ipc::FooterRef::read_as_root(footer_data)
201
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferFooter(err)))?;
202
203
let blocks = footer
204
.record_batches()
205
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))?
206
.ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingRecordBatches))?;
207
208
let blocks = blocks
209
.iter()
210
.map(|blockref| Ok(<arrow_format::ipc::Block>::from(blockref)))
211
.collect::<PolarsResult<Vec<_>>>()?;
212
Ok((footer, blocks))
213
}
214
215
pub(super) fn deserialize_footer_ref(footer_data: &[u8]) -> PolarsResult<FooterRef<'_>> {
216
arrow_format::ipc::FooterRef::read_as_root(footer_data)
217
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferFooter(err)))
218
}
219
220
pub(super) fn deserialize_schema_ref_from_footer(
221
footer: arrow_format::ipc::FooterRef<'_>,
222
) -> PolarsResult<arrow_format::ipc::SchemaRef<'_>> {
223
footer
224
.schema()
225
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferSchema(err)))?
226
.ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingSchema))
227
}
228
229
/// Get the IPC blocks from the footer containing record batches
230
pub(super) fn iter_recordbatch_blocks_from_footer(
231
footer: arrow_format::ipc::FooterRef<'_>,
232
) -> PolarsResult<impl SendableIterator<Item = PolarsResult<arrow_format::ipc::Block>> + '_> {
233
let blocks = footer
234
.record_batches()
235
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))?
236
.ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingRecordBatches))?;
237
238
Ok(blocks
239
.into_iter()
240
.map(|blockref| Ok(<arrow_format::ipc::Block>::from(blockref))))
241
}
242
243
pub(super) fn iter_dictionary_blocks_from_footer(
244
footer: arrow_format::ipc::FooterRef<'_>,
245
) -> PolarsResult<Option<impl SendableIterator<Item = PolarsResult<arrow_format::ipc::Block>> + '_>>
246
{
247
let dictionaries = footer
248
.dictionaries()
249
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferDictionaries(err)))?;
250
251
Ok(dictionaries.map(|dicts| {
252
dicts
253
.into_iter()
254
.map(|blockref| Ok(<arrow_format::ipc::Block>::from(blockref)))
255
}))
256
}
257
258
pub fn deserialize_footer(footer_data: &[u8], size: u64) -> PolarsResult<FileMetadata> {
259
let footer = deserialize_footer_ref(footer_data)?;
260
let blocks = iter_recordbatch_blocks_from_footer(footer)?.collect::<PolarsResult<Vec<_>>>()?;
261
let dictionaries = iter_dictionary_blocks_from_footer(footer)?
262
.map(|dicts| dicts.collect::<PolarsResult<Vec<_>>>())
263
.transpose()?;
264
let ipc_schema = deserialize_schema_ref_from_footer(footer)?;
265
let (schema, ipc_schema, custom_schema_metadata) = fb_to_schema(ipc_schema)?;
266
267
Ok(FileMetadata {
268
schema: Arc::new(schema),
269
ipc_schema,
270
blocks,
271
dictionaries,
272
size,
273
custom_schema_metadata: custom_schema_metadata.map(Arc::new),
274
})
275
}
276
277
/// Read the Arrow IPC file's metadata
278
pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> PolarsResult<FileMetadata> {
279
let start = reader.stream_position()?;
280
let (end, footer_len) = read_footer_len(reader)?;
281
let serialized_footer = read_footer(reader, footer_len)?;
282
deserialize_footer(&serialized_footer, end - start)
283
}
284
285
/// Extracts the record batch header from an IPC `message`.
///
/// # Errors
/// Fails when the header cannot be decoded, is absent, or is any message type other than a
/// record batch.
pub(crate) fn get_record_batch(
    message: arrow_format::ipc::MessageRef,
) -> PolarsResult<arrow_format::ipc::RecordBatchRef> {
    use arrow_format::ipc::MessageHeaderRef;

    match message.header() {
        Err(err) => Err(polars_err!(oos = OutOfSpecKind::InvalidFlatbufferHeader(err))),
        Ok(None) => Err(polars_err!(oos = OutOfSpecKind::MissingMessageHeader)),
        Ok(Some(MessageHeaderRef::RecordBatch(batch))) => Ok(batch),
        Ok(Some(_)) => Err(polars_err!(oos = OutOfSpecKind::UnexpectedMessageType)),
    }
}
297
298
pub fn get_message_from_block_offset<'a, R: Read + Seek>(
299
reader: &mut R,
300
offset: u64,
301
message_scratch: &'a mut Vec<u8>,
302
) -> PolarsResult<arrow_format::ipc::MessageRef<'a>> {
303
reader.seek(SeekFrom::Start(offset))?;
304
let mut meta_buf = [0; 4];
305
reader.read_exact(&mut meta_buf)?;
306
if meta_buf == CONTINUATION_MARKER {
307
// continuation marker encountered, read message next
308
reader.read_exact(&mut meta_buf)?;
309
}
310
311
let meta_len = i32::from_le_bytes(meta_buf)
312
.try_into()
313
.map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;
314
315
message_scratch.clear();
316
message_scratch.try_reserve(meta_len)?;
317
reader
318
.by_ref()
319
.take(meta_len as u64)
320
.read_to_end(message_scratch)?;
321
322
arrow_format::ipc::MessageRef::read_as_root(message_scratch)
323
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferMessage(err)))
324
}
325
326
pub(super) fn get_message_from_block<'a, R: Read + Seek>(
327
reader: &mut R,
328
block: &arrow_format::ipc::Block,
329
message_scratch: &'a mut Vec<u8>,
330
) -> PolarsResult<arrow_format::ipc::MessageRef<'a>> {
331
let offset: u64 = block
332
.offset
333
.try_into()
334
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
335
336
get_message_from_block_offset(reader, offset, message_scratch)
337
}
338
339
/// Reads the record batch at position `index` from the reader.
340
///
341
/// This function is useful for random access to the file. For example, if
342
/// you have indexed the file somewhere else, this allows pruning
343
/// certain parts of the file.
344
/// # Panics
345
/// This function panics iff `index >= metadata.blocks.len()`
346
#[allow(clippy::too_many_arguments)]
347
pub fn read_batch<R: Read + Seek>(
348
reader: &mut R,
349
dictionaries: &Dictionaries,
350
metadata: &FileMetadata,
351
projection: Option<&[usize]>,
352
limit: Option<usize>,
353
index: usize,
354
// When true, the reader object is handled as an IPC Block.
355
force_zero_offset: bool,
356
message_scratch: &mut Vec<u8>,
357
data_scratch: &mut Vec<u8>,
358
checked: UnsafeBool,
359
) -> PolarsResult<RecordBatchT<Box<dyn Array>>> {
360
let block = metadata.blocks[index];
361
362
let offset: u64 = if force_zero_offset {
363
0
364
} else {
365
block
366
.offset
367
.try_into()
368
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?
369
};
370
371
let length: u64 = block
372
.meta_data_length
373
.try_into()
374
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
375
376
let message = get_message_from_block_offset(reader, offset, message_scratch)?;
377
let batch = get_record_batch(message)?;
378
379
read_record_batch(
380
batch,
381
&metadata.schema,
382
&metadata.ipc_schema,
383
projection,
384
limit,
385
dictionaries,
386
message
387
.version()
388
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferVersion(err)))?,
389
reader,
390
offset + length,
391
data_scratch,
392
checked,
393
)
394
}
395
396