// Path: blob/main/crates/polars-arrow/src/io/ipc/read/read_basic.rs
// 8424 views
use std::collections::VecDeque;1use std::io::{Read, Seek, SeekFrom};23use polars_buffer::Buffer;4use polars_error::{PolarsResult, polars_bail, polars_ensure, polars_err};56use super::super::compression;7use super::super::endianness::is_native_little_endian;8use super::{Compression, IpcBuffer, Node, OutOfSpecKind};9use crate::bitmap::Bitmap;10use crate::types::NativeType;1112fn read_swapped<T: NativeType, R: Read + Seek>(13reader: &mut R,14length: usize,15buffer: &mut Vec<T>,16is_little_endian: bool,17) -> PolarsResult<()> {18// Slow case where we must reverse bits.19#[expect(clippy::slow_vector_initialization)] // Avoid alloc_zeroed, leads to syscall.20let mut slice = Vec::new();21slice.resize(length * size_of::<T>(), 0);22reader.read_exact(&mut slice)?;2324let chunks = slice.chunks_exact(size_of::<T>());25if !is_little_endian {26// machine is little endian, file is big endian27buffer28.as_mut_slice()29.iter_mut()30.zip(chunks)31.try_for_each(|(slot, chunk)| {32let a: T::Bytes = match chunk.try_into() {33Ok(a) => a,34Err(_) => unreachable!(),35};36*slot = T::from_be_bytes(a);37PolarsResult::Ok(())38})?;39} else {40// machine is big endian, file is little endian41polars_bail!(ComputeError:42"Reading little endian files from big endian machines",43)44}45Ok(())46}4748fn read_uncompressed_bytes<R: Read + Seek>(49reader: &mut R,50buffer_length: usize,51is_little_endian: bool,52) -> PolarsResult<Vec<u8>> {53if is_native_little_endian() == is_little_endian {54let mut buffer = Vec::with_capacity(buffer_length);55let _ = reader56.take(buffer_length as u64)57.read_to_end(&mut buffer)58.unwrap();5960polars_ensure!(buffer.len() == buffer_length, ComputeError: "Malformed IPC file: expected compressed buffer of len {buffer_length}, got {}", buffer.len());6162Ok(buffer)63} else {64unreachable!()65}66}6768fn read_uncompressed_buffer<T: NativeType, R: Read + Seek>(69reader: &mut R,70buffer_length: usize,71length: usize,72is_little_endian: bool,73) -> PolarsResult<Vec<T>> {74let 
required_number_of_bytes = length.saturating_mul(size_of::<T>());75if required_number_of_bytes > buffer_length {76polars_bail!(77oos = OutOfSpecKind::InvalidBuffer {78length,79type_name: std::any::type_name::<T>(),80required_number_of_bytes,81buffer_length,82}83);84}8586// it is undefined behavior to call read_exact on un-initialized, https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read87// see also https://github.com/MaikKlein/ash/issues/354#issue-78173058088let mut buffer = vec![T::default(); length];8990if is_native_little_endian() == is_little_endian {91// fast case where we can just copy the contents92let slice = bytemuck::cast_slice_mut(&mut buffer);93reader.read_exact(slice)?;94} else {95read_swapped(reader, length, &mut buffer, is_little_endian)?;96}97Ok(buffer)98}99100fn read_compressed_buffer<T: NativeType, R: Read + Seek>(101reader: &mut R,102buffer_length: usize,103// Upper bound for the number of rows to be returned.104row_limit: Option<usize>,105is_little_endian: bool,106compression: Compression,107scratch: &mut Vec<u8>,108) -> PolarsResult<Vec<T>> {109if row_limit == Some(0) {110return Ok(vec![]);111}112113if is_little_endian != is_native_little_endian() {114polars_bail!(ComputeError:115"Reading compressed and big endian IPC",116)117}118119// Decompress first.120scratch.clear();121scratch.try_reserve(buffer_length)?;122reader123.by_ref()124.take(buffer_length as u64)125.read_to_end(scratch)?;126127polars_ensure!(scratch.len() == buffer_length, ComputeError: "Malformed IPC file: expected compressed buffer of len {buffer_length}, got {}", scratch.len());128129let decompressed_len_field = i64::from_le_bytes(scratch[..8].try_into().unwrap());130let decompressed_bytes: usize = if decompressed_len_field == -1 {131buffer_length - 8132} else {133decompressed_len_field.try_into().map_err(|_| {134polars_err!(ComputeError: "Malformed IPC file: got invalid decompressed length 
{decompressed_len_field}")135})?136};137138polars_ensure!(decompressed_bytes.is_multiple_of(size_of::<T>()),139ComputeError: "Malformed IPC file: got decompressed buffer length which is not a multiple of the data type");140let n_rows_in_array = decompressed_bytes / size_of::<T>();141142if decompressed_len_field == -1 {143return Ok(bytemuck::cast_slice(&scratch[8..]).to_vec());144}145146// It is undefined behavior to call read_exact on un-initialized, https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read147// see also https://github.com/MaikKlein/ash/issues/354#issue-781730580148149let n_rows_exact = row_limit150.map(|limit| std::cmp::min(limit, n_rows_in_array))151.unwrap_or(n_rows_in_array);152153let mut buffer = vec![T::default(); n_rows_exact];154let out_slice = bytemuck::cast_slice_mut(&mut buffer);155156let compression = compression157.codec()158.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferCompression(err)))?;159160match compression {161arrow_format::ipc::CompressionType::Lz4Frame => {162compression::decompress_lz4(&scratch[8..], out_slice)?;163},164arrow_format::ipc::CompressionType::Zstd => {165compression::decompress_zstd(&scratch[8..], out_slice)?;166},167}168Ok(buffer)169}170171fn read_compressed_bytes<R: Read + Seek>(172reader: &mut R,173buffer_length: usize,174is_little_endian: bool,175compression: Compression,176scratch: &mut Vec<u8>,177) -> PolarsResult<Vec<u8>> {178read_compressed_buffer::<u8, _>(179reader,180buffer_length,181None,182is_little_endian,183compression,184scratch,185)186}187188pub fn read_bytes<R: Read + Seek>(189buf: &mut VecDeque<IpcBuffer>,190reader: &mut R,191block_offset: u64,192is_little_endian: bool,193compression: Option<Compression>,194scratch: &mut Vec<u8>,195) -> PolarsResult<Buffer<u8>> {196let buf = buf197.pop_front()198.ok_or_else(|| polars_err!(oos = OutOfSpecKind::ExpectedBuffer))?;199200let offset: u64 = buf201.offset()202.try_into()203.map_err(|_| polars_err!(oos = 
OutOfSpecKind::NegativeFooterLength))?;204205let buffer_length: usize = buf206.length()207.try_into()208.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;209210reader.seek(SeekFrom::Start(block_offset + offset))?;211212if let Some(compression) = compression {213Ok(read_compressed_bytes(214reader,215buffer_length,216is_little_endian,217compression,218scratch,219)?220.into())221} else {222Ok(read_uncompressed_bytes(reader, buffer_length, is_little_endian)?.into())223}224}225226pub fn read_buffer<T: NativeType, R: Read + Seek>(227buf: &mut VecDeque<IpcBuffer>,228length: usize, // in slots229reader: &mut R,230block_offset: u64,231is_little_endian: bool,232compression: Option<Compression>,233scratch: &mut Vec<u8>,234) -> PolarsResult<Buffer<T>> {235let buf = buf236.pop_front()237.ok_or_else(|| polars_err!(oos = OutOfSpecKind::ExpectedBuffer))?;238239let offset: u64 = buf240.offset()241.try_into()242.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;243244let buffer_length: usize = buf245.length()246.try_into()247.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;248249reader.seek(SeekFrom::Start(block_offset + offset))?;250251if let Some(compression) = compression {252Ok(read_compressed_buffer(253reader,254buffer_length,255Some(length),256is_little_endian,257compression,258scratch,259)?260.into())261} else {262Ok(read_uncompressed_buffer(reader, buffer_length, length, is_little_endian)?.into())263}264}265266fn read_uncompressed_bitmap<R: Read + Seek>(267row_limit: usize,268bytes: usize,269reader: &mut R,270) -> PolarsResult<Vec<u8>> {271if row_limit > bytes * 8 {272polars_bail!(273oos = OutOfSpecKind::InvalidBitmap {274length: row_limit,275number_of_bits: bytes * 8,276}277)278}279280let mut buffer = vec![];281buffer.try_reserve(bytes)?;282reader283.by_ref()284.take(bytes as u64)285.read_to_end(&mut buffer)?;286287polars_ensure!(buffer.len() == bytes, ComputeError: "Malformed IPC file: expected compressed buffer 
of len {bytes}, got {}", buffer.len());288289Ok(buffer)290}291292fn read_compressed_bitmap<R: Read + Seek>(293row_limit: usize,294bytes: usize,295compression: Compression,296reader: &mut R,297scratch: &mut Vec<u8>,298) -> PolarsResult<Vec<u8>> {299scratch.clear();300scratch.try_reserve(bytes)?;301reader.by_ref().take(bytes as u64).read_to_end(scratch)?;302if scratch.len() != bytes {303polars_bail!(ComputeError: "Malformed IPC file: expected compressed buffer of len {bytes}, got {}", scratch.len());304}305306let decompressed_len_field = i64::from_le_bytes(scratch[..8].try_into().unwrap());307let decompressed_bytes: usize = if decompressed_len_field == -1 {308scratch.len() - 8309} else {310decompressed_len_field.try_into().map_err(|_| {311polars_err!(ComputeError: "Malformed IPC file: got invalid decompressed length {decompressed_len_field}")312})?313};314315// In addition to the slicing use case, we allow for excess bytes in untruncated buffers,316// see https://github.com/pola-rs/polars/issues/26126317// and https://github.com/apache/arrow/issues/48883318polars_ensure!(decompressed_bytes >= row_limit.div_ceil(8),319ComputeError: "Malformed IPC file: got unexpected decompressed output length {decompressed_bytes}, expected {}", row_limit.div_ceil(8));320321if decompressed_len_field == -1 {322return Ok(bytemuck::cast_slice(&scratch[8..]).to_vec());323}324325#[expect(clippy::slow_vector_initialization)] // Avoid alloc_zeroed, leads to syscall.326let mut buffer = Vec::new();327buffer.resize(decompressed_bytes, 0);328329let compression = compression330.codec()331.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferCompression(err)))?;332333match compression {334arrow_format::ipc::CompressionType::Lz4Frame => {335compression::decompress_lz4(&scratch[8..], &mut buffer)?;336},337arrow_format::ipc::CompressionType::Zstd => {338compression::decompress_zstd(&scratch[8..], &mut buffer)?;339},340}341Ok(buffer)342}343344pub fn read_bitmap<R: Read + Seek>(345buf: &mut 
VecDeque<IpcBuffer>,346row_limit: usize,347reader: &mut R,348block_offset: u64,349_: bool,350compression: Option<Compression>,351scratch: &mut Vec<u8>,352) -> PolarsResult<Bitmap> {353let buf = buf354.pop_front()355.ok_or_else(|| polars_err!(oos = OutOfSpecKind::ExpectedBuffer))?;356357let offset: u64 = buf358.offset()359.try_into()360.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;361362let bytes: usize = buf363.length()364.try_into()365.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;366367reader.seek(SeekFrom::Start(block_offset + offset))?;368369let buffer = if let Some(compression) = compression {370read_compressed_bitmap(row_limit, bytes, compression, reader, scratch)371} else {372read_uncompressed_bitmap(row_limit, bytes, reader)373}?;374375Bitmap::try_new(buffer, row_limit)376}377378#[allow(clippy::too_many_arguments)]379pub fn read_validity<R: Read + Seek>(380buffers: &mut VecDeque<IpcBuffer>,381field_node: Node,382reader: &mut R,383block_offset: u64,384is_little_endian: bool,385compression: Option<Compression>,386limit: Option<usize>,387scratch: &mut Vec<u8>,388) -> PolarsResult<Option<Bitmap>> {389let length: usize = field_node390.length()391.try_into()392.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;393let row_limit = limit.map(|limit| limit.min(length)).unwrap_or(length);394395Ok(if field_node.null_count() > 0 {396Some(read_bitmap(397buffers,398row_limit,399reader,400block_offset,401is_little_endian,402compression,403scratch,404)?)405} else {406let _ = buffers407.pop_front()408.ok_or_else(|| polars_err!(oos = OutOfSpecKind::ExpectedBuffer))?;409None410})411}412413414