// Source: GitHub repository pola-rs/polars
// Path: blob/main/crates/polars-parquet/src/parquet/read/metadata.rs
1
use std::cmp::min;
2
use std::io::{Read, Seek, SeekFrom};
3
4
use polars_parquet_format::FileMetaData as TFileMetadata;
5
use polars_parquet_format::thrift::protocol::TCompactInputProtocol;
6
7
use super::super::metadata::FileMetadata;
8
use super::super::{DEFAULT_FOOTER_READ_SIZE, FOOTER_SIZE, HEADER_SIZE, PARQUET_MAGIC};
9
use crate::parquet::error::{ParquetError, ParquetResult};
10
11
pub(super) fn metadata_len(buffer: &[u8], len: usize) -> u32 {
12
u32::from_le_bytes(buffer[len - 8..len - 4].try_into().unwrap())
13
}
14
15
/// Returns the total length of the stream in bytes, leaving the cursor where it was.
///
/// Stand-in for the (unstable) `Seek::stream_len`.
fn stream_len(seek: &mut impl Seek) -> std::result::Result<u64, std::io::Error> {
    let start = seek.stream_position()?;
    let end = seek.seek(SeekFrom::End(0))?;

    // Already at the end: nothing to restore, so skip the third seek. The
    // comparison is usually far cheaper than a seek operation.
    if start == end {
        return Ok(end);
    }

    seek.seek(SeekFrom::Start(start))?;
    Ok(end)
}
28
29
/// Reads a [`FileMetadata`] from the reader, located at the end of the file.
30
pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> ParquetResult<FileMetadata> {
31
// check file is large enough to hold footer
32
let file_size = stream_len(reader)?;
33
read_metadata_with_size(reader, file_size)
34
}
35
36
/// Reads a [`FileMetadata`] from the reader, located at the end of the file, with known file size.
37
pub fn read_metadata_with_size<R: Read + Seek>(
38
reader: &mut R,
39
file_size: u64,
40
) -> ParquetResult<FileMetadata> {
41
if file_size < HEADER_SIZE + FOOTER_SIZE {
42
return Err(ParquetError::oos(
43
"A parquet file must contain a header and footer with at least 12 bytes",
44
));
45
}
46
47
// read and cache up to DEFAULT_FOOTER_READ_SIZE bytes from the end and process the footer
48
let default_end_len = min(DEFAULT_FOOTER_READ_SIZE, file_size) as usize;
49
reader.seek(SeekFrom::End(-(default_end_len as i64)))?;
50
51
let mut buffer = Vec::with_capacity(default_end_len);
52
reader
53
.by_ref()
54
.take(default_end_len as u64)
55
.read_to_end(&mut buffer)?;
56
57
// check this is indeed a parquet file
58
if buffer[default_end_len - 4..] != PARQUET_MAGIC {
59
return Err(ParquetError::oos("The file must end with PAR1"));
60
}
61
62
let metadata_len: u32 = metadata_len(&buffer, default_end_len);
63
let metadata_len: u64 = metadata_len as u64;
64
65
let footer_len = FOOTER_SIZE + metadata_len;
66
if footer_len > file_size {
67
return Err(ParquetError::oos(
68
"The footer size must be smaller or equal to the file's size",
69
));
70
}
71
72
let reader: &[u8] = if (footer_len as usize) < buffer.len() {
73
// the whole metadata is in the bytes we already read
74
let remaining = buffer.len() - footer_len as usize;
75
&buffer[remaining..]
76
} else {
77
// the end of file read by default is not long enough, read again including the metadata.
78
reader.seek(SeekFrom::End(-(footer_len as i64)))?;
79
80
buffer.clear();
81
buffer.try_reserve(footer_len as usize)?;
82
reader.take(footer_len).read_to_end(&mut buffer)?;
83
84
&buffer
85
};
86
87
// a highly nested but sparse struct could result in many allocations
88
let max_size = reader.len() * 2 + 1024;
89
90
deserialize_metadata(reader, max_size)
91
}
92
93
/// Parse loaded metadata bytes
94
pub fn deserialize_metadata<R: Read>(reader: R, max_size: usize) -> ParquetResult<FileMetadata> {
95
let mut prot = TCompactInputProtocol::new(reader, max_size);
96
let metadata = TFileMetadata::read_from_in_protocol(&mut prot)?;
97
98
FileMetadata::try_from_thrift(metadata)
99
}
100
101