Path: blob/main/crates/polars-stream/src/nodes/io_sources/ipc/metadata.rs
8439 views
use arrow::io::ipc::read::OutOfSpecKind;1use polars_buffer::Buffer;2use polars_error::{PolarsResult, polars_bail, polars_ensure, polars_err};3use polars_io::utils::byte_source::{ByteSource, DynByteSource};45/// Read the metadata bytes of a parquet file, does not decode the bytes. If during metadata fetch6/// the bytes of the entire file are loaded, it is returned in the second return value.7pub async fn read_ipc_metadata_bytes(8byte_source: &DynByteSource,9verbose: bool,10) -> PolarsResult<(Buffer<u8>, Option<Buffer<u8>>)> {11const FOOTER_HEADER_SIZE: usize = 10;12const ARROW_MAGIC_V1: [u8; 4] = [b'F', b'E', b'A', b'1'];13const ARROW_MAGIC_V2: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];1415let file_size = byte_source.get_size().await?;1617polars_ensure!(18file_size >= FOOTER_HEADER_SIZE,19ComputeError: "ipc file size is smaller than the minimum"20);2122let estimated_metadata_size = if let DynByteSource::Buffer(_) = byte_source {23// Mmapped or in-memory, reads are free.24file_size25} else {26(file_size / 2048).clamp(16_384, 131_072).min(file_size)27};2829let bytes = byte_source30.get_range((file_size - estimated_metadata_size)..file_size)31.await?;3233let footer_header_bytes = bytes.clone().sliced((bytes.len() - FOOTER_HEADER_SIZE)..);3435if footer_header_bytes[4..] != ARROW_MAGIC_V2 {36if footer_header_bytes[..4] == ARROW_MAGIC_V1 {37polars_bail!(ComputeError: "feather v1 not supported");38}39return Err(polars_err!(oos = OutOfSpecKind::InvalidFooter));40}4142let footer_size = u32::from_le_bytes(footer_header_bytes[..4].try_into().unwrap());43let footer_size = footer_size as usize + FOOTER_HEADER_SIZE;4445polars_ensure!(46file_size >= footer_size,47ComputeError:48"file size ({file_size}) is less than the indicated footer size ({footer_size})",49);5051if bytes.len() < footer_size {52debug_assert!(!matches!(byte_source, DynByteSource::Buffer(_)));53if verbose {54eprintln!(55"[IpcFileReader]: Extra {} bytes need to be fetched for metadata \56(initial estimate = {}, actual size = {})",57footer_size - estimated_metadata_size,58bytes.len(),59footer_size,60);61}6263let mut out = Vec::with_capacity(footer_size);64let offset = file_size - footer_size;65let len = footer_size - bytes.len();66let delta_bytes = byte_source.get_range(offset..(offset + len)).await?;6768debug_assert!(out.capacity() >= delta_bytes.len() + bytes.len());6970out.extend_from_slice(&delta_bytes);71out.extend_from_slice(&bytes);7273Ok((Buffer::from_vec(out), None))74} else {75if verbose && !matches!(byte_source, DynByteSource::Buffer(_)) {76eprintln!(77"[IpcFileReader]: Fetched all bytes for metadata on first try \78(initial estimate = {}, actual size = {}, excess = {}, total file size = {})",79bytes.len(),80footer_size,81estimated_metadata_size - footer_size,82file_size,83);84}8586let metadata_bytes = bytes.clone().sliced((bytes.len() - footer_size)..);8788if bytes.len() == file_size {89Ok((metadata_bytes, Some(bytes)))90} else {91debug_assert!(!matches!(byte_source, DynByteSource::Buffer(_)));92let metadata_bytes = if bytes.len() - footer_size >= bytes.len() {93// Re-allocate to drop the excess bytes94Buffer::from_vec(metadata_bytes.to_vec())95} else {96metadata_bytes97};9899Ok((metadata_bytes, None))100}101}102}103104105