Path: blob/main/crates/polars-parquet/src/parquet/read/stream.rs
6940 views
use std::io::SeekFrom;12use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};34use super::super::metadata::FileMetadata;5use super::super::{DEFAULT_FOOTER_READ_SIZE, FOOTER_SIZE, PARQUET_MAGIC};6use super::metadata::{deserialize_metadata, metadata_len};7use crate::parquet::HEADER_SIZE;8use crate::parquet::error::{ParquetError, ParquetResult};910async fn stream_len(11seek: &mut (impl AsyncSeek + std::marker::Unpin),12) -> std::result::Result<u64, std::io::Error> {13let old_pos = seek.seek(SeekFrom::Current(0)).await?;14let len = seek.seek(SeekFrom::End(0)).await?;1516// Avoid seeking a third time when we were already at the end of the17// stream. The branch is usually way cheaper than a seek operation.18if old_pos != len {19seek.seek(SeekFrom::Start(old_pos)).await?;20}2122Ok(len)23}2425/// Asynchronously reads the files' metadata26pub async fn read_metadata<R: AsyncRead + AsyncSeek + Send + std::marker::Unpin>(27reader: &mut R,28) -> ParquetResult<FileMetadata> {29let file_size = stream_len(reader).await?;3031if file_size < HEADER_SIZE + FOOTER_SIZE {32return Err(ParquetError::oos(33"A parquet file must contain a header and footer with at least 12 bytes",34));35}3637// read and cache up to DEFAULT_FOOTER_READ_SIZE bytes from the end and process the footer38let default_end_len = std::cmp::min(DEFAULT_FOOTER_READ_SIZE, file_size) as usize;39reader40.seek(SeekFrom::End(-(default_end_len as i64)))41.await?;4243let mut buffer = vec![];44buffer.try_reserve(default_end_len)?;45reader46.take(default_end_len as u64)47.read_to_end(&mut buffer)48.await?;4950// check this is indeed a parquet file51if buffer[default_end_len - 4..] != PARQUET_MAGIC {52return Err(ParquetError::oos("Invalid Parquet file. Corrupt footer"));53}5455let metadata_len: u32 = metadata_len(&buffer, default_end_len);56let metadata_len: u64 = metadata_len as u64;5758let footer_len = FOOTER_SIZE + metadata_len;59if footer_len > file_size {60return Err(ParquetError::oos(61"The footer size must be smaller or equal to the file's size",62));63}6465let reader = if (footer_len as usize) < buffer.len() {66// the whole metadata is in the bytes we already read67let remaining = buffer.len() - footer_len as usize;68&buffer[remaining..]69} else {70// the end of file read by default is not long enough, read again including the metadata.71reader.seek(SeekFrom::End(-(footer_len as i64))).await?;7273buffer.clear();74buffer.try_reserve(footer_len as usize)?;75reader76.take(footer_len as u64)77.read_to_end(&mut buffer)78.await?;7980&buffer81};8283// a highly nested but sparse struct could result in many allocations84let max_size = reader.len() * 2 + 1024;8586deserialize_metadata(reader, max_size)87}888990