Path: blob/main/crates/polars-arrow/src/io/ipc/read/read_basic.rs
6940 views
use std::collections::VecDeque;1use std::io::{Read, Seek, SeekFrom};23use polars_error::{PolarsResult, polars_bail, polars_err};45use super::super::compression;6use super::super::endianness::is_native_little_endian;7use super::{Compression, IpcBuffer, Node, OutOfSpecKind};8use crate::bitmap::Bitmap;9use crate::buffer::Buffer;10use crate::types::NativeType;1112fn read_swapped<T: NativeType, R: Read + Seek>(13reader: &mut R,14length: usize,15buffer: &mut Vec<T>,16is_little_endian: bool,17) -> PolarsResult<()> {18// Slow case where we must reverse bits.19#[expect(clippy::slow_vector_initialization)] // Avoid alloc_zeroed, leads to syscall.20let mut slice = Vec::new();21slice.resize(length * size_of::<T>(), 0);22reader.read_exact(&mut slice)?;2324let chunks = slice.chunks_exact(size_of::<T>());25if !is_little_endian {26// machine is little endian, file is big endian27buffer28.as_mut_slice()29.iter_mut()30.zip(chunks)31.try_for_each(|(slot, chunk)| {32let a: T::Bytes = match chunk.try_into() {33Ok(a) => a,34Err(_) => unreachable!(),35};36*slot = T::from_be_bytes(a);37PolarsResult::Ok(())38})?;39} else {40// machine is big endian, file is little endian41polars_bail!(ComputeError:42"Reading little endian files from big endian machines",43)44}45Ok(())46}4748fn read_uncompressed_bytes<R: Read + Seek>(49reader: &mut R,50buffer_length: usize,51is_little_endian: bool,52) -> PolarsResult<Vec<u8>> {53if is_native_little_endian() == is_little_endian {54let mut buffer = Vec::with_capacity(buffer_length);55let _ = reader56.take(buffer_length as u64)57.read_to_end(&mut buffer)58.unwrap();59Ok(buffer)60} else {61unreachable!()62}63}6465fn read_uncompressed_buffer<T: NativeType, R: Read + Seek>(66reader: &mut R,67buffer_length: usize,68length: usize,69is_little_endian: bool,70) -> PolarsResult<Vec<T>> {71let required_number_of_bytes = length.saturating_mul(size_of::<T>());72if required_number_of_bytes > buffer_length {73polars_bail!(74oos = OutOfSpecKind::InvalidBuffer {75length,76type_name: std::any::type_name::<T>(),77required_number_of_bytes,78buffer_length,79}80);81}8283// it is undefined behavior to call read_exact on un-initialized, https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read84// see also https://github.com/MaikKlein/ash/issues/354#issue-78173058085let mut buffer = vec![T::default(); length];8687if is_native_little_endian() == is_little_endian {88// fast case where we can just copy the contents89let slice = bytemuck::cast_slice_mut(&mut buffer);90reader.read_exact(slice)?;91} else {92read_swapped(reader, length, &mut buffer, is_little_endian)?;93}94Ok(buffer)95}9697fn read_compressed_buffer<T: NativeType, R: Read + Seek>(98reader: &mut R,99buffer_length: usize,100output_length: Option<usize>,101is_little_endian: bool,102compression: Compression,103scratch: &mut Vec<u8>,104) -> PolarsResult<Vec<T>> {105if output_length == Some(0) {106return Ok(vec![]);107}108109if is_little_endian != is_native_little_endian() {110polars_bail!(ComputeError:111"Reading compressed and big endian IPC".to_string(),112)113}114115// decompress first116scratch.clear();117scratch.try_reserve(buffer_length)?;118reader119.by_ref()120.take(buffer_length as u64)121.read_to_end(scratch)?;122123let length = output_length124.unwrap_or_else(|| i64::from_le_bytes(scratch[..8].try_into().unwrap()) as usize);125126// It is undefined behavior to call read_exact on un-initialized, https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read127// see also https://github.com/MaikKlein/ash/issues/354#issue-781730580128let mut buffer = vec![T::default(); length];129130let out_slice = bytemuck::cast_slice_mut(&mut buffer);131132let compression = compression133.codec()134.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferCompression(err)))?;135136match compression {137arrow_format::ipc::CompressionType::Lz4Frame => {138compression::decompress_lz4(&scratch[8..], out_slice)?;139},140arrow_format::ipc::CompressionType::Zstd => {141compression::decompress_zstd(&scratch[8..], out_slice)?;142},143}144Ok(buffer)145}146147fn read_compressed_bytes<R: Read + Seek>(148reader: &mut R,149buffer_length: usize,150is_little_endian: bool,151compression: Compression,152scratch: &mut Vec<u8>,153) -> PolarsResult<Vec<u8>> {154read_compressed_buffer::<u8, _>(155reader,156buffer_length,157None,158is_little_endian,159compression,160scratch,161)162}163164pub fn read_bytes<R: Read + Seek>(165buf: &mut VecDeque<IpcBuffer>,166reader: &mut R,167block_offset: u64,168is_little_endian: bool,169compression: Option<Compression>,170scratch: &mut Vec<u8>,171) -> PolarsResult<Buffer<u8>> {172let buf = buf173.pop_front()174.ok_or_else(|| polars_err!(oos = OutOfSpecKind::ExpectedBuffer))?;175176let offset: u64 = buf177.offset()178.try_into()179.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;180181let buffer_length: usize = buf182.length()183.try_into()184.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;185186reader.seek(SeekFrom::Start(block_offset + offset))?;187188if let Some(compression) = compression {189Ok(read_compressed_bytes(190reader,191buffer_length,192is_little_endian,193compression,194scratch,195)?196.into())197} else {198Ok(read_uncompressed_bytes(reader, buffer_length, is_little_endian)?.into())199}200}201202pub fn read_buffer<T: NativeType, R: Read + Seek>(203buf: &mut VecDeque<IpcBuffer>,204length: usize, // in slots205reader: &mut R,206block_offset: u64,207is_little_endian: bool,208compression: Option<Compression>,209scratch: &mut Vec<u8>,210) -> PolarsResult<Buffer<T>> {211let buf = buf212.pop_front()213.ok_or_else(|| polars_err!(oos = OutOfSpecKind::ExpectedBuffer))?;214215let offset: u64 = buf216.offset()217.try_into()218.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;219220let buffer_length: usize = buf221.length()222.try_into()223.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;224225reader.seek(SeekFrom::Start(block_offset + offset))?;226227if let Some(compression) = compression {228Ok(read_compressed_buffer(229reader,230buffer_length,231Some(length),232is_little_endian,233compression,234scratch,235)?236.into())237} else {238Ok(read_uncompressed_buffer(reader, buffer_length, length, is_little_endian)?.into())239}240}241242fn read_uncompressed_bitmap<R: Read + Seek>(243length: usize,244bytes: usize,245reader: &mut R,246) -> PolarsResult<Vec<u8>> {247if length > bytes * 8 {248polars_bail!(249oos = OutOfSpecKind::InvalidBitmap {250length,251number_of_bits: bytes * 8,252}253)254}255256let mut buffer = vec![];257buffer.try_reserve(bytes)?;258reader259.by_ref()260.take(bytes as u64)261.read_to_end(&mut buffer)?;262263Ok(buffer)264}265266fn read_compressed_bitmap<R: Read + Seek>(267length: usize,268bytes: usize,269compression: Compression,270reader: &mut R,271scratch: &mut Vec<u8>,272) -> PolarsResult<Vec<u8>> {273#[expect(clippy::slow_vector_initialization)] // Avoid alloc_zeroed, leads to syscall.274let mut buffer = Vec::new();275buffer.resize(length.div_ceil(8), 0);276277scratch.clear();278scratch.try_reserve(bytes)?;279reader.by_ref().take(bytes as u64).read_to_end(scratch)?;280281let compression = compression282.codec()283.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferCompression(err)))?;284285match compression {286arrow_format::ipc::CompressionType::Lz4Frame => {287compression::decompress_lz4(&scratch[8..], &mut buffer)?;288},289arrow_format::ipc::CompressionType::Zstd => {290compression::decompress_zstd(&scratch[8..], &mut buffer)?;291},292}293Ok(buffer)294}295296pub fn read_bitmap<R: Read + Seek>(297buf: &mut VecDeque<IpcBuffer>,298length: usize,299reader: &mut R,300block_offset: u64,301_: bool,302compression: Option<Compression>,303scratch: &mut Vec<u8>,304) -> PolarsResult<Bitmap> {305let buf = buf306.pop_front()307.ok_or_else(|| polars_err!(oos = OutOfSpecKind::ExpectedBuffer))?;308309let offset: u64 = buf310.offset()311.try_into()312.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;313314let bytes: usize = buf315.length()316.try_into()317.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;318319reader.seek(SeekFrom::Start(block_offset + offset))?;320321let buffer = if let Some(compression) = compression {322read_compressed_bitmap(length, bytes, compression, reader, scratch)323} else {324read_uncompressed_bitmap(length, bytes, reader)325}?;326327Bitmap::try_new(buffer, length)328}329330#[allow(clippy::too_many_arguments)]331pub fn read_validity<R: Read + Seek>(332buffers: &mut VecDeque<IpcBuffer>,333field_node: Node,334reader: &mut R,335block_offset: u64,336is_little_endian: bool,337compression: Option<Compression>,338limit: Option<usize>,339scratch: &mut Vec<u8>,340) -> PolarsResult<Option<Bitmap>> {341let length: usize = field_node342.length()343.try_into()344.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;345let length = limit.map(|limit| limit.min(length)).unwrap_or(length);346347Ok(if field_node.null_count() > 0 {348Some(read_bitmap(349buffers,350length,351reader,352block_offset,353is_little_endian,354compression,355scratch,356)?)357} else {358let _ = buffers359.pop_front()360.ok_or_else(|| polars_err!(oos = OutOfSpecKind::ExpectedBuffer))?;361None362})363}364365366