Path: blob/main/crates/polars-arrow/src/io/ipc/read/file.rs
6940 views
use std::io::{Read, Seek, SeekFrom};1use std::sync::Arc;23use arrow_format::ipc::FooterRef;4use arrow_format::ipc::planus::ReadAsRoot;5use polars_error::{PolarsResult, polars_bail, polars_err};6use polars_utils::aliases::{InitHashMaps, PlHashMap};78use super::super::{ARROW_MAGIC_V1, ARROW_MAGIC_V2, CONTINUATION_MARKER};9use super::common::*;10use super::schema::fb_to_schema;11use super::{Dictionaries, OutOfSpecKind, SendableIterator};12use crate::array::Array;13use crate::datatypes::{ArrowSchemaRef, Metadata};14use crate::io::ipc::IpcSchema;15use crate::record_batch::RecordBatchT;1617/// Metadata of an Arrow IPC file, written in the footer of the file.18#[derive(Debug, Clone)]19pub struct FileMetadata {20/// The schema that is read from the file footer21pub schema: ArrowSchemaRef,2223/// The custom metadata that is read from the schema24pub custom_schema_metadata: Option<Arc<Metadata>>,2526/// The files' [`IpcSchema`]27pub ipc_schema: IpcSchema,2829/// The blocks in the file30///31/// A block indicates the regions in the file to read to get data32pub blocks: Vec<arrow_format::ipc::Block>,3334/// Dictionaries associated to each dict_id35pub(crate) dictionaries: Option<Vec<arrow_format::ipc::Block>>,3637/// The total size of the file in bytes38pub size: u64,39}4041/// Read the row count by summing the length of the of the record batches42pub fn get_row_count<R: Read + Seek>(reader: &mut R) -> PolarsResult<i64> {43let (_, footer_len) = read_footer_len(reader)?;44let footer = read_footer(reader, footer_len)?;45let (_, blocks) = deserialize_footer_blocks(&footer)?;4647get_row_count_from_blocks(reader, &blocks)48}4950/// Read the row count by summing the length of the of the record batches in blocks51pub fn get_row_count_from_blocks<R: Read + Seek>(52reader: &mut R,53blocks: &[arrow_format::ipc::Block],54) -> PolarsResult<i64> {55let mut message_scratch: Vec<u8> = Default::default();5657blocks58.iter()59.map(|block| {60let message = get_message_from_block(reader, block, &mut message_scratch)?;61let record_batch = get_record_batch(message)?;62record_batch.length().map_err(|e| e.into())63})64.sum()65}6667pub(crate) fn get_dictionary_batch<'a>(68message: &'a arrow_format::ipc::MessageRef,69) -> PolarsResult<arrow_format::ipc::DictionaryBatchRef<'a>> {70let header = message71.header()72.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferHeader(err)))?73.ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingMessageHeader))?;74match header {75arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => Ok(batch),76_ => polars_bail!(oos = OutOfSpecKind::UnexpectedMessageType),77}78}7980fn read_dictionary_block<R: Read + Seek>(81reader: &mut R,82metadata: &FileMetadata,83block: &arrow_format::ipc::Block,84dictionaries: &mut Dictionaries,85message_scratch: &mut Vec<u8>,86dictionary_scratch: &mut Vec<u8>,87) -> PolarsResult<()> {88let message = get_message_from_block(reader, block, message_scratch)?;89let batch = get_dictionary_batch(&message)?;9091let offset: u64 = block92.offset93.try_into()94.map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;9596let length: u64 = block97.meta_data_length98.try_into()99.map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;100101read_dictionary(102batch,103&metadata.schema,104&metadata.ipc_schema,105dictionaries,106reader,107offset + length,108metadata.size,109dictionary_scratch,110)111}112113/// Reads all file's dictionaries, if any114/// This function is IO-bounded115pub fn read_file_dictionaries<R: Read + Seek>(116reader: &mut R,117metadata: &FileMetadata,118scratch: &mut Vec<u8>,119) -> PolarsResult<Dictionaries> {120let mut dictionaries = Default::default();121122let blocks = if let Some(blocks) = &metadata.dictionaries {123blocks124} else {125return Ok(PlHashMap::new());126};127// use a temporary smaller scratch for the messages128let mut message_scratch = Default::default();129130for block in blocks {131read_dictionary_block(132reader,133metadata,134block,135&mut dictionaries,136&mut message_scratch,137scratch,138)?;139}140Ok(dictionaries)141}142143pub(super) fn decode_footer_len(footer: [u8; 10], end: u64) -> PolarsResult<(u64, usize)> {144let footer_len = i32::from_le_bytes(footer[..4].try_into().unwrap());145146if footer[4..] != ARROW_MAGIC_V2 {147if footer[..4] == ARROW_MAGIC_V1 {148polars_bail!(ComputeError: "feather v1 not supported");149}150return Err(polars_err!(oos = OutOfSpecKind::InvalidFooter));151}152let footer_len = footer_len153.try_into()154.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;155156Ok((end, footer_len))157}158159/// Reads the footer's length and magic number in footer160fn read_footer_len<R: Read + Seek>(reader: &mut R) -> PolarsResult<(u64, usize)> {161// read footer length and magic number in footer162let end = reader.seek(SeekFrom::End(-10))? + 10;163164let mut footer: [u8; 10] = [0; 10];165166reader.read_exact(&mut footer)?;167decode_footer_len(footer, end)168}169170fn read_footer<R: Read + Seek>(reader: &mut R, footer_len: usize) -> PolarsResult<Vec<u8>> {171// read footer172reader.seek(SeekFrom::End(-10 - footer_len as i64))?;173174let mut serialized_footer = vec![];175serialized_footer.try_reserve(footer_len)?;176reader177.by_ref()178.take(footer_len as u64)179.read_to_end(&mut serialized_footer)?;180Ok(serialized_footer)181}182183fn deserialize_footer_blocks(184footer_data: &[u8],185) -> PolarsResult<(FooterRef<'_>, Vec<arrow_format::ipc::Block>)> {186let footer = arrow_format::ipc::FooterRef::read_as_root(footer_data)187.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferFooter(err)))?;188189let blocks = footer190.record_batches()191.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))?192.ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingRecordBatches))?;193194let blocks = blocks195.iter()196.map(|blockref| Ok(<arrow_format::ipc::Block>::from(blockref)))197.collect::<PolarsResult<Vec<_>>>()?;198Ok((footer, blocks))199}200201pub(super) fn deserialize_footer_ref(footer_data: &[u8]) -> PolarsResult<FooterRef<'_>> {202arrow_format::ipc::FooterRef::read_as_root(footer_data)203.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferFooter(err)))204}205206pub(super) fn deserialize_schema_ref_from_footer(207footer: arrow_format::ipc::FooterRef<'_>,208) -> PolarsResult<arrow_format::ipc::SchemaRef<'_>> {209footer210.schema()211.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferSchema(err)))?212.ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingSchema))213}214215/// Get the IPC blocks from the footer containing record batches216pub(super) fn iter_recordbatch_blocks_from_footer(217footer: arrow_format::ipc::FooterRef<'_>,218) -> PolarsResult<impl SendableIterator<Item = PolarsResult<arrow_format::ipc::Block>> + '_> {219let blocks = footer220.record_batches()221.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))?222.ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingRecordBatches))?;223224Ok(blocks225.into_iter()226.map(|blockref| Ok(<arrow_format::ipc::Block>::from(blockref))))227}228229pub(super) fn iter_dictionary_blocks_from_footer(230footer: arrow_format::ipc::FooterRef<'_>,231) -> PolarsResult<Option<impl SendableIterator<Item = PolarsResult<arrow_format::ipc::Block>> + '_>>232{233let dictionaries = footer234.dictionaries()235.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferDictionaries(err)))?;236237Ok(dictionaries.map(|dicts| {238dicts239.into_iter()240.map(|blockref| Ok(<arrow_format::ipc::Block>::from(blockref)))241}))242}243244pub fn deserialize_footer(footer_data: &[u8], size: u64) -> PolarsResult<FileMetadata> {245let footer = deserialize_footer_ref(footer_data)?;246let blocks = iter_recordbatch_blocks_from_footer(footer)?.collect::<PolarsResult<Vec<_>>>()?;247let dictionaries = iter_dictionary_blocks_from_footer(footer)?248.map(|dicts| dicts.collect::<PolarsResult<Vec<_>>>())249.transpose()?;250let ipc_schema = deserialize_schema_ref_from_footer(footer)?;251let (schema, ipc_schema, custom_schema_metadata) = fb_to_schema(ipc_schema)?;252253Ok(FileMetadata {254schema: Arc::new(schema),255ipc_schema,256blocks,257dictionaries,258size,259custom_schema_metadata: custom_schema_metadata.map(Arc::new),260})261}262263/// Read the Arrow IPC file's metadata264pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> PolarsResult<FileMetadata> {265let start = reader.stream_position()?;266let (end, footer_len) = read_footer_len(reader)?;267let serialized_footer = read_footer(reader, footer_len)?;268deserialize_footer(&serialized_footer, end - start)269}270271pub(crate) fn get_record_batch(272message: arrow_format::ipc::MessageRef,273) -> PolarsResult<arrow_format::ipc::RecordBatchRef> {274let header = message275.header()276.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferHeader(err)))?277.ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingMessageHeader))?;278match header {279arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => Ok(batch),280_ => polars_bail!(oos = OutOfSpecKind::UnexpectedMessageType),281}282}283284fn get_message_from_block_offset<'a, R: Read + Seek>(285reader: &mut R,286offset: u64,287message_scratch: &'a mut Vec<u8>,288) -> PolarsResult<arrow_format::ipc::MessageRef<'a>> {289// read length290reader.seek(SeekFrom::Start(offset))?;291let mut meta_buf = [0; 4];292reader.read_exact(&mut meta_buf)?;293if meta_buf == CONTINUATION_MARKER {294// continuation marker encountered, read message next295reader.read_exact(&mut meta_buf)?;296}297let meta_len = i32::from_le_bytes(meta_buf)298.try_into()299.map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;300301message_scratch.clear();302message_scratch.try_reserve(meta_len)?;303reader304.by_ref()305.take(meta_len as u64)306.read_to_end(message_scratch)?;307308arrow_format::ipc::MessageRef::read_as_root(message_scratch)309.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferMessage(err)))310}311312pub(super) fn get_message_from_block<'a, R: Read + Seek>(313reader: &mut R,314block: &arrow_format::ipc::Block,315message_scratch: &'a mut Vec<u8>,316) -> PolarsResult<arrow_format::ipc::MessageRef<'a>> {317let offset: u64 = block318.offset319.try_into()320.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;321322get_message_from_block_offset(reader, offset, message_scratch)323}324325/// Reads the record batch at position `index` from the reader.326///327/// This function is useful for random access to the file. For example, if328/// you have indexed the file somewhere else, this allows pruning329/// certain parts of the file.330/// # Panics331/// This function panics iff `index >= metadata.blocks.len()`332#[allow(clippy::too_many_arguments)]333pub fn read_batch<R: Read + Seek>(334reader: &mut R,335dictionaries: &Dictionaries,336metadata: &FileMetadata,337projection: Option<&[usize]>,338limit: Option<usize>,339index: usize,340message_scratch: &mut Vec<u8>,341data_scratch: &mut Vec<u8>,342) -> PolarsResult<RecordBatchT<Box<dyn Array>>> {343let block = metadata.blocks[index];344345let offset: u64 = block346.offset347.try_into()348.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;349350let length: u64 = block351.meta_data_length352.try_into()353.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;354355let message = get_message_from_block_offset(reader, offset, message_scratch)?;356let batch = get_record_batch(message)?;357358read_record_batch(359batch,360&metadata.schema,361&metadata.ipc_schema,362projection,363limit,364dictionaries,365message366.version()367.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferVersion(err)))?,368reader,369offset + length,370metadata.size,371data_scratch,372)373}374375376