Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/read/stream.rs
6940 views
1
use std::io::SeekFrom;
2
3
use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
4
5
use super::super::metadata::FileMetadata;
6
use super::super::{DEFAULT_FOOTER_READ_SIZE, FOOTER_SIZE, PARQUET_MAGIC};
7
use super::metadata::{deserialize_metadata, metadata_len};
8
use crate::parquet::HEADER_SIZE;
9
use crate::parquet::error::{ParquetError, ParquetResult};
10
11
async fn stream_len(
12
seek: &mut (impl AsyncSeek + std::marker::Unpin),
13
) -> std::result::Result<u64, std::io::Error> {
14
let old_pos = seek.seek(SeekFrom::Current(0)).await?;
15
let len = seek.seek(SeekFrom::End(0)).await?;
16
17
// Avoid seeking a third time when we were already at the end of the
18
// stream. The branch is usually way cheaper than a seek operation.
19
if old_pos != len {
20
seek.seek(SeekFrom::Start(old_pos)).await?;
21
}
22
23
Ok(len)
24
}
25
26
/// Asynchronously reads the files' metadata
27
pub async fn read_metadata<R: AsyncRead + AsyncSeek + Send + std::marker::Unpin>(
28
reader: &mut R,
29
) -> ParquetResult<FileMetadata> {
30
let file_size = stream_len(reader).await?;
31
32
if file_size < HEADER_SIZE + FOOTER_SIZE {
33
return Err(ParquetError::oos(
34
"A parquet file must contain a header and footer with at least 12 bytes",
35
));
36
}
37
38
// read and cache up to DEFAULT_FOOTER_READ_SIZE bytes from the end and process the footer
39
let default_end_len = std::cmp::min(DEFAULT_FOOTER_READ_SIZE, file_size) as usize;
40
reader
41
.seek(SeekFrom::End(-(default_end_len as i64)))
42
.await?;
43
44
let mut buffer = vec![];
45
buffer.try_reserve(default_end_len)?;
46
reader
47
.take(default_end_len as u64)
48
.read_to_end(&mut buffer)
49
.await?;
50
51
// check this is indeed a parquet file
52
if buffer[default_end_len - 4..] != PARQUET_MAGIC {
53
return Err(ParquetError::oos("Invalid Parquet file. Corrupt footer"));
54
}
55
56
let metadata_len: u32 = metadata_len(&buffer, default_end_len);
57
let metadata_len: u64 = metadata_len as u64;
58
59
let footer_len = FOOTER_SIZE + metadata_len;
60
if footer_len > file_size {
61
return Err(ParquetError::oos(
62
"The footer size must be smaller or equal to the file's size",
63
));
64
}
65
66
let reader = if (footer_len as usize) < buffer.len() {
67
// the whole metadata is in the bytes we already read
68
let remaining = buffer.len() - footer_len as usize;
69
&buffer[remaining..]
70
} else {
71
// the end of file read by default is not long enough, read again including the metadata.
72
reader.seek(SeekFrom::End(-(footer_len as i64))).await?;
73
74
buffer.clear();
75
buffer.try_reserve(footer_len as usize)?;
76
reader
77
.take(footer_len as u64)
78
.read_to_end(&mut buffer)
79
.await?;
80
81
&buffer
82
};
83
84
// a highly nested but sparse struct could result in many allocations
85
let max_size = reader.len() * 2 + 1024;
86
87
deserialize_metadata(reader, max_size)
88
}
89
90