Path: blob/main/crates/polars-stream/src/nodes/io_sources/parquet/metadata_utils.rs
6939 views
use polars_error::PolarsResult;1use polars_io::utils::byte_source::{ByteSource, DynByteSource};2use polars_utils::mmap::MemSlice;34/// Read the metadata bytes of a parquet file, does not decode the bytes. If during metadata fetch5/// the bytes of the entire file are loaded, it is returned in the second return value.6pub async fn read_parquet_metadata_bytes(7byte_source: &DynByteSource,8verbose: bool,9) -> PolarsResult<(MemSlice, Option<MemSlice>)> {10use polars_parquet::parquet::PARQUET_MAGIC;11use polars_parquet::parquet::error::ParquetError;1213const FOOTER_HEADER_SIZE: usize = polars_parquet::parquet::FOOTER_SIZE as usize;1415let file_size = byte_source.get_size().await?;1617if file_size < FOOTER_HEADER_SIZE {18return Err(ParquetError::OutOfSpec(format!(19"file size ({file_size}) is less than minimum size required to store parquet footer ({FOOTER_HEADER_SIZE})"20))21.into());22}2324let estimated_metadata_size = if let DynByteSource::MemSlice(_) = byte_source {25// Mmapped or in-memory, reads are free.26file_size27} else {28(file_size / 2048).clamp(16_384, 131_072).min(file_size)29};3031let bytes = byte_source32.get_range((file_size - estimated_metadata_size)..file_size)33.await?;3435let footer_header_bytes = bytes.slice((bytes.len() - FOOTER_HEADER_SIZE)..bytes.len());3637let (v, remaining) = footer_header_bytes.split_at(4);38let footer_size = u32::from_le_bytes(v.try_into().unwrap());3940if remaining != PARQUET_MAGIC {41return Err(ParquetError::OutOfSpec(format!(42r#"expected parquet magic bytes "{}" in footer, got "{}" instead"#,43std::str::from_utf8(&PARQUET_MAGIC).unwrap(),44String::from_utf8_lossy(remaining)45))46.into());47}4849let footer_size = footer_size as usize + FOOTER_HEADER_SIZE;5051if file_size < footer_size {52return Err(ParquetError::OutOfSpec(format!(53"file size ({file_size}) is less than the indicated footer size ({footer_size})"54))55.into());56}5758if bytes.len() < footer_size {59debug_assert!(!matches!(byte_source, DynByteSource::MemSlice(_)));60if verbose {61eprintln!(62"[ParquetFileReader]: Extra {} bytes need to be fetched for metadata \63(initial estimate = {}, actual size = {})",64footer_size - estimated_metadata_size,65bytes.len(),66footer_size,67);68}6970let mut out = Vec::with_capacity(footer_size);71let offset = file_size - footer_size;72let len = footer_size - bytes.len();73let delta_bytes = byte_source.get_range(offset..(offset + len)).await?;7475debug_assert!(out.capacity() >= delta_bytes.len() + bytes.len());7677out.extend_from_slice(&delta_bytes);78out.extend_from_slice(&bytes);7980Ok((MemSlice::from_vec(out), None))81} else {82if verbose && !matches!(byte_source, DynByteSource::MemSlice(_)) {83eprintln!(84"[ParquetFileReader]: Fetched all bytes for metadata on first try \85(initial estimate = {}, actual size = {}, excess = {}, total file size = {})",86bytes.len(),87footer_size,88estimated_metadata_size - footer_size,89file_size,90);91}9293let metadata_bytes = bytes.slice((bytes.len() - footer_size)..bytes.len());9495if bytes.len() == file_size {96Ok((metadata_bytes, Some(bytes)))97} else {98debug_assert!(!matches!(byte_source, DynByteSource::MemSlice(_)));99let metadata_bytes = if bytes.len() - footer_size >= bytes.len() {100// Re-allocate to drop the excess bytes101MemSlice::from_vec(metadata_bytes.to_vec())102} else {103metadata_bytes104};105106Ok((metadata_bytes, None))107}108}109}110111112