Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sources/parquet/metadata_utils.rs
6939 views
1
use polars_error::PolarsResult;
2
use polars_io::utils::byte_source::{ByteSource, DynByteSource};
3
use polars_utils::mmap::MemSlice;
4
5
/// Read the metadata bytes of a parquet file, does not decode the bytes. If during metadata fetch
6
/// the bytes of the entire file are loaded, it is returned in the second return value.
7
pub async fn read_parquet_metadata_bytes(
8
byte_source: &DynByteSource,
9
verbose: bool,
10
) -> PolarsResult<(MemSlice, Option<MemSlice>)> {
11
use polars_parquet::parquet::PARQUET_MAGIC;
12
use polars_parquet::parquet::error::ParquetError;
13
14
const FOOTER_HEADER_SIZE: usize = polars_parquet::parquet::FOOTER_SIZE as usize;
15
16
let file_size = byte_source.get_size().await?;
17
18
if file_size < FOOTER_HEADER_SIZE {
19
return Err(ParquetError::OutOfSpec(format!(
20
"file size ({file_size}) is less than minimum size required to store parquet footer ({FOOTER_HEADER_SIZE})"
21
))
22
.into());
23
}
24
25
let estimated_metadata_size = if let DynByteSource::MemSlice(_) = byte_source {
26
// Mmapped or in-memory, reads are free.
27
file_size
28
} else {
29
(file_size / 2048).clamp(16_384, 131_072).min(file_size)
30
};
31
32
let bytes = byte_source
33
.get_range((file_size - estimated_metadata_size)..file_size)
34
.await?;
35
36
let footer_header_bytes = bytes.slice((bytes.len() - FOOTER_HEADER_SIZE)..bytes.len());
37
38
let (v, remaining) = footer_header_bytes.split_at(4);
39
let footer_size = u32::from_le_bytes(v.try_into().unwrap());
40
41
if remaining != PARQUET_MAGIC {
42
return Err(ParquetError::OutOfSpec(format!(
43
r#"expected parquet magic bytes "{}" in footer, got "{}" instead"#,
44
std::str::from_utf8(&PARQUET_MAGIC).unwrap(),
45
String::from_utf8_lossy(remaining)
46
))
47
.into());
48
}
49
50
let footer_size = footer_size as usize + FOOTER_HEADER_SIZE;
51
52
if file_size < footer_size {
53
return Err(ParquetError::OutOfSpec(format!(
54
"file size ({file_size}) is less than the indicated footer size ({footer_size})"
55
))
56
.into());
57
}
58
59
if bytes.len() < footer_size {
60
debug_assert!(!matches!(byte_source, DynByteSource::MemSlice(_)));
61
if verbose {
62
eprintln!(
63
"[ParquetFileReader]: Extra {} bytes need to be fetched for metadata \
64
(initial estimate = {}, actual size = {})",
65
footer_size - estimated_metadata_size,
66
bytes.len(),
67
footer_size,
68
);
69
}
70
71
let mut out = Vec::with_capacity(footer_size);
72
let offset = file_size - footer_size;
73
let len = footer_size - bytes.len();
74
let delta_bytes = byte_source.get_range(offset..(offset + len)).await?;
75
76
debug_assert!(out.capacity() >= delta_bytes.len() + bytes.len());
77
78
out.extend_from_slice(&delta_bytes);
79
out.extend_from_slice(&bytes);
80
81
Ok((MemSlice::from_vec(out), None))
82
} else {
83
if verbose && !matches!(byte_source, DynByteSource::MemSlice(_)) {
84
eprintln!(
85
"[ParquetFileReader]: Fetched all bytes for metadata on first try \
86
(initial estimate = {}, actual size = {}, excess = {}, total file size = {})",
87
bytes.len(),
88
footer_size,
89
estimated_metadata_size - footer_size,
90
file_size,
91
);
92
}
93
94
let metadata_bytes = bytes.slice((bytes.len() - footer_size)..bytes.len());
95
96
if bytes.len() == file_size {
97
Ok((metadata_bytes, Some(bytes)))
98
} else {
99
debug_assert!(!matches!(byte_source, DynByteSource::MemSlice(_)));
100
let metadata_bytes = if bytes.len() - footer_size >= bytes.len() {
101
// Re-allocate to drop the excess bytes
102
MemSlice::from_vec(metadata_bytes.to_vec())
103
} else {
104
metadata_bytes
105
};
106
107
Ok((metadata_bytes, None))
108
}
109
}
110
}
111
112