Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/io/ipc/read/file.rs
6940 views
1
use std::io::{Read, Seek, SeekFrom};
2
use std::sync::Arc;
3
4
use arrow_format::ipc::FooterRef;
5
use arrow_format::ipc::planus::ReadAsRoot;
6
use polars_error::{PolarsResult, polars_bail, polars_err};
7
use polars_utils::aliases::{InitHashMaps, PlHashMap};
8
9
use super::super::{ARROW_MAGIC_V1, ARROW_MAGIC_V2, CONTINUATION_MARKER};
10
use super::common::*;
11
use super::schema::fb_to_schema;
12
use super::{Dictionaries, OutOfSpecKind, SendableIterator};
13
use crate::array::Array;
14
use crate::datatypes::{ArrowSchemaRef, Metadata};
15
use crate::io::ipc::IpcSchema;
16
use crate::record_batch::RecordBatchT;
17
18
/// Metadata of an Arrow IPC file, written in the footer of the file.
19
#[derive(Debug, Clone)]
20
pub struct FileMetadata {
21
/// The schema that is read from the file footer
22
pub schema: ArrowSchemaRef,
23
24
/// The custom metadata that is read from the schema
25
pub custom_schema_metadata: Option<Arc<Metadata>>,
26
27
/// The files' [`IpcSchema`]
28
pub ipc_schema: IpcSchema,
29
30
/// The blocks in the file
31
///
32
/// A block indicates the regions in the file to read to get data
33
pub blocks: Vec<arrow_format::ipc::Block>,
34
35
/// Dictionaries associated to each dict_id
36
pub(crate) dictionaries: Option<Vec<arrow_format::ipc::Block>>,
37
38
/// The total size of the file in bytes
39
pub size: u64,
40
}
41
42
/// Read the row count by summing the length of the of the record batches
43
pub fn get_row_count<R: Read + Seek>(reader: &mut R) -> PolarsResult<i64> {
44
let (_, footer_len) = read_footer_len(reader)?;
45
let footer = read_footer(reader, footer_len)?;
46
let (_, blocks) = deserialize_footer_blocks(&footer)?;
47
48
get_row_count_from_blocks(reader, &blocks)
49
}
50
51
/// Read the row count by summing the length of the of the record batches in blocks
52
pub fn get_row_count_from_blocks<R: Read + Seek>(
53
reader: &mut R,
54
blocks: &[arrow_format::ipc::Block],
55
) -> PolarsResult<i64> {
56
let mut message_scratch: Vec<u8> = Default::default();
57
58
blocks
59
.iter()
60
.map(|block| {
61
let message = get_message_from_block(reader, block, &mut message_scratch)?;
62
let record_batch = get_record_batch(message)?;
63
record_batch.length().map_err(|e| e.into())
64
})
65
.sum()
66
}
67
68
pub(crate) fn get_dictionary_batch<'a>(
69
message: &'a arrow_format::ipc::MessageRef,
70
) -> PolarsResult<arrow_format::ipc::DictionaryBatchRef<'a>> {
71
let header = message
72
.header()
73
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferHeader(err)))?
74
.ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingMessageHeader))?;
75
match header {
76
arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => Ok(batch),
77
_ => polars_bail!(oos = OutOfSpecKind::UnexpectedMessageType),
78
}
79
}
80
81
fn read_dictionary_block<R: Read + Seek>(
82
reader: &mut R,
83
metadata: &FileMetadata,
84
block: &arrow_format::ipc::Block,
85
dictionaries: &mut Dictionaries,
86
message_scratch: &mut Vec<u8>,
87
dictionary_scratch: &mut Vec<u8>,
88
) -> PolarsResult<()> {
89
let message = get_message_from_block(reader, block, message_scratch)?;
90
let batch = get_dictionary_batch(&message)?;
91
92
let offset: u64 = block
93
.offset
94
.try_into()
95
.map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;
96
97
let length: u64 = block
98
.meta_data_length
99
.try_into()
100
.map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;
101
102
read_dictionary(
103
batch,
104
&metadata.schema,
105
&metadata.ipc_schema,
106
dictionaries,
107
reader,
108
offset + length,
109
metadata.size,
110
dictionary_scratch,
111
)
112
}
113
114
/// Reads all file's dictionaries, if any
115
/// This function is IO-bounded
116
pub fn read_file_dictionaries<R: Read + Seek>(
117
reader: &mut R,
118
metadata: &FileMetadata,
119
scratch: &mut Vec<u8>,
120
) -> PolarsResult<Dictionaries> {
121
let mut dictionaries = Default::default();
122
123
let blocks = if let Some(blocks) = &metadata.dictionaries {
124
blocks
125
} else {
126
return Ok(PlHashMap::new());
127
};
128
// use a temporary smaller scratch for the messages
129
let mut message_scratch = Default::default();
130
131
for block in blocks {
132
read_dictionary_block(
133
reader,
134
metadata,
135
block,
136
&mut dictionaries,
137
&mut message_scratch,
138
scratch,
139
)?;
140
}
141
Ok(dictionaries)
142
}
143
144
pub(super) fn decode_footer_len(footer: [u8; 10], end: u64) -> PolarsResult<(u64, usize)> {
145
let footer_len = i32::from_le_bytes(footer[..4].try_into().unwrap());
146
147
if footer[4..] != ARROW_MAGIC_V2 {
148
if footer[..4] == ARROW_MAGIC_V1 {
149
polars_bail!(ComputeError: "feather v1 not supported");
150
}
151
return Err(polars_err!(oos = OutOfSpecKind::InvalidFooter));
152
}
153
let footer_len = footer_len
154
.try_into()
155
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
156
157
Ok((end, footer_len))
158
}
159
160
/// Reads the footer's length and magic number in footer
161
fn read_footer_len<R: Read + Seek>(reader: &mut R) -> PolarsResult<(u64, usize)> {
162
// read footer length and magic number in footer
163
let end = reader.seek(SeekFrom::End(-10))? + 10;
164
165
let mut footer: [u8; 10] = [0; 10];
166
167
reader.read_exact(&mut footer)?;
168
decode_footer_len(footer, end)
169
}
170
171
fn read_footer<R: Read + Seek>(reader: &mut R, footer_len: usize) -> PolarsResult<Vec<u8>> {
172
// read footer
173
reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
174
175
let mut serialized_footer = vec![];
176
serialized_footer.try_reserve(footer_len)?;
177
reader
178
.by_ref()
179
.take(footer_len as u64)
180
.read_to_end(&mut serialized_footer)?;
181
Ok(serialized_footer)
182
}
183
184
fn deserialize_footer_blocks(
185
footer_data: &[u8],
186
) -> PolarsResult<(FooterRef<'_>, Vec<arrow_format::ipc::Block>)> {
187
let footer = arrow_format::ipc::FooterRef::read_as_root(footer_data)
188
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferFooter(err)))?;
189
190
let blocks = footer
191
.record_batches()
192
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))?
193
.ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingRecordBatches))?;
194
195
let blocks = blocks
196
.iter()
197
.map(|blockref| Ok(<arrow_format::ipc::Block>::from(blockref)))
198
.collect::<PolarsResult<Vec<_>>>()?;
199
Ok((footer, blocks))
200
}
201
202
pub(super) fn deserialize_footer_ref(footer_data: &[u8]) -> PolarsResult<FooterRef<'_>> {
203
arrow_format::ipc::FooterRef::read_as_root(footer_data)
204
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferFooter(err)))
205
}
206
207
pub(super) fn deserialize_schema_ref_from_footer(
208
footer: arrow_format::ipc::FooterRef<'_>,
209
) -> PolarsResult<arrow_format::ipc::SchemaRef<'_>> {
210
footer
211
.schema()
212
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferSchema(err)))?
213
.ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingSchema))
214
}
215
216
/// Get the IPC blocks from the footer containing record batches
217
pub(super) fn iter_recordbatch_blocks_from_footer(
218
footer: arrow_format::ipc::FooterRef<'_>,
219
) -> PolarsResult<impl SendableIterator<Item = PolarsResult<arrow_format::ipc::Block>> + '_> {
220
let blocks = footer
221
.record_batches()
222
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))?
223
.ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingRecordBatches))?;
224
225
Ok(blocks
226
.into_iter()
227
.map(|blockref| Ok(<arrow_format::ipc::Block>::from(blockref))))
228
}
229
230
pub(super) fn iter_dictionary_blocks_from_footer(
231
footer: arrow_format::ipc::FooterRef<'_>,
232
) -> PolarsResult<Option<impl SendableIterator<Item = PolarsResult<arrow_format::ipc::Block>> + '_>>
233
{
234
let dictionaries = footer
235
.dictionaries()
236
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferDictionaries(err)))?;
237
238
Ok(dictionaries.map(|dicts| {
239
dicts
240
.into_iter()
241
.map(|blockref| Ok(<arrow_format::ipc::Block>::from(blockref)))
242
}))
243
}
244
245
pub fn deserialize_footer(footer_data: &[u8], size: u64) -> PolarsResult<FileMetadata> {
246
let footer = deserialize_footer_ref(footer_data)?;
247
let blocks = iter_recordbatch_blocks_from_footer(footer)?.collect::<PolarsResult<Vec<_>>>()?;
248
let dictionaries = iter_dictionary_blocks_from_footer(footer)?
249
.map(|dicts| dicts.collect::<PolarsResult<Vec<_>>>())
250
.transpose()?;
251
let ipc_schema = deserialize_schema_ref_from_footer(footer)?;
252
let (schema, ipc_schema, custom_schema_metadata) = fb_to_schema(ipc_schema)?;
253
254
Ok(FileMetadata {
255
schema: Arc::new(schema),
256
ipc_schema,
257
blocks,
258
dictionaries,
259
size,
260
custom_schema_metadata: custom_schema_metadata.map(Arc::new),
261
})
262
}
263
264
/// Read the Arrow IPC file's metadata
265
pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> PolarsResult<FileMetadata> {
266
let start = reader.stream_position()?;
267
let (end, footer_len) = read_footer_len(reader)?;
268
let serialized_footer = read_footer(reader, footer_len)?;
269
deserialize_footer(&serialized_footer, end - start)
270
}
271
272
pub(crate) fn get_record_batch(
273
message: arrow_format::ipc::MessageRef,
274
) -> PolarsResult<arrow_format::ipc::RecordBatchRef> {
275
let header = message
276
.header()
277
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferHeader(err)))?
278
.ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingMessageHeader))?;
279
match header {
280
arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => Ok(batch),
281
_ => polars_bail!(oos = OutOfSpecKind::UnexpectedMessageType),
282
}
283
}
284
285
fn get_message_from_block_offset<'a, R: Read + Seek>(
286
reader: &mut R,
287
offset: u64,
288
message_scratch: &'a mut Vec<u8>,
289
) -> PolarsResult<arrow_format::ipc::MessageRef<'a>> {
290
// read length
291
reader.seek(SeekFrom::Start(offset))?;
292
let mut meta_buf = [0; 4];
293
reader.read_exact(&mut meta_buf)?;
294
if meta_buf == CONTINUATION_MARKER {
295
// continuation marker encountered, read message next
296
reader.read_exact(&mut meta_buf)?;
297
}
298
let meta_len = i32::from_le_bytes(meta_buf)
299
.try_into()
300
.map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;
301
302
message_scratch.clear();
303
message_scratch.try_reserve(meta_len)?;
304
reader
305
.by_ref()
306
.take(meta_len as u64)
307
.read_to_end(message_scratch)?;
308
309
arrow_format::ipc::MessageRef::read_as_root(message_scratch)
310
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferMessage(err)))
311
}
312
313
pub(super) fn get_message_from_block<'a, R: Read + Seek>(
314
reader: &mut R,
315
block: &arrow_format::ipc::Block,
316
message_scratch: &'a mut Vec<u8>,
317
) -> PolarsResult<arrow_format::ipc::MessageRef<'a>> {
318
let offset: u64 = block
319
.offset
320
.try_into()
321
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
322
323
get_message_from_block_offset(reader, offset, message_scratch)
324
}
325
326
/// Reads the record batch at position `index` from the reader.
327
///
328
/// This function is useful for random access to the file. For example, if
329
/// you have indexed the file somewhere else, this allows pruning
330
/// certain parts of the file.
331
/// # Panics
332
/// This function panics iff `index >= metadata.blocks.len()`
333
#[allow(clippy::too_many_arguments)]
334
pub fn read_batch<R: Read + Seek>(
335
reader: &mut R,
336
dictionaries: &Dictionaries,
337
metadata: &FileMetadata,
338
projection: Option<&[usize]>,
339
limit: Option<usize>,
340
index: usize,
341
message_scratch: &mut Vec<u8>,
342
data_scratch: &mut Vec<u8>,
343
) -> PolarsResult<RecordBatchT<Box<dyn Array>>> {
344
let block = metadata.blocks[index];
345
346
let offset: u64 = block
347
.offset
348
.try_into()
349
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
350
351
let length: u64 = block
352
.meta_data_length
353
.try_into()
354
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
355
356
let message = get_message_from_block_offset(reader, offset, message_scratch)?;
357
let batch = get_record_batch(message)?;
358
359
read_record_batch(
360
batch,
361
&metadata.schema,
362
&metadata.ipc_schema,
363
projection,
364
limit,
365
dictionaries,
366
message
367
.version()
368
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferVersion(err)))?,
369
reader,
370
offset + length,
371
metadata.size,
372
data_scratch,
373
)
374
}
375
376