// Path: blob/main/crates/polars-arrow/src/io/ipc/read/file.rs
// 8449 views
use std::io::{Read, Seek, SeekFrom};1use std::sync::Arc;23use arrow_format::ipc::FooterRef;4use arrow_format::ipc::planus::ReadAsRoot;5use polars_error::{PolarsResult, polars_bail, polars_err};6use polars_utils::aliases::{InitHashMaps, PlHashMap};7use polars_utils::bool::UnsafeBool;89use super::super::{ARROW_MAGIC_V1, ARROW_MAGIC_V2, CONTINUATION_MARKER};10use super::common::*;11use super::schema::fb_to_schema;12use super::{Dictionaries, OutOfSpecKind, SendableIterator};13use crate::array::Array;14use crate::datatypes::{ArrowSchemaRef, Metadata};15use crate::io::ipc::IpcSchema;16use crate::record_batch::RecordBatchT;1718/// Metadata of an Arrow IPC file, written in the footer of the file.19#[derive(Debug, Clone)]20pub struct FileMetadata {21/// The schema that is read from the file footer22pub schema: ArrowSchemaRef,2324/// The custom metadata that is read from the schema25pub custom_schema_metadata: Option<Arc<Metadata>>,2627/// The files' [`IpcSchema`]28pub ipc_schema: IpcSchema,2930/// The blocks in the file31///32/// A block indicates the regions in the file to read to get data33pub blocks: Vec<arrow_format::ipc::Block>,3435/// Dictionaries associated to each dict_id36pub dictionaries: Option<Vec<arrow_format::ipc::Block>>,3738/// The total size of the file in bytes39pub size: u64,40}4142/// Read the row count by summing the length of the of the record batches43pub fn get_row_count<R: Read + Seek>(reader: &mut R) -> PolarsResult<i64> {44let (_, footer_len) = read_footer_len(reader)?;45let footer = read_footer(reader, footer_len)?;46let (_, blocks) = deserialize_footer_blocks(&footer)?;4748get_row_count_from_blocks(reader, &blocks)49}5051/// Read the row count by summing the length of the of the record batches in blocks52pub fn get_row_count_from_blocks<R: Read + Seek>(53reader: &mut R,54blocks: &[arrow_format::ipc::Block],55) -> PolarsResult<i64> {56let mut message_scratch: Vec<u8> = Default::default();5758blocks59.iter()60.map(|block| {61let message = 
get_message_from_block(reader, block, &mut message_scratch)?;62let record_batch = get_record_batch(message)?;63record_batch.length().map_err(|e| e.into())64})65.sum()66}6768pub(crate) fn get_dictionary_batch<'a>(69message: &'a arrow_format::ipc::MessageRef,70) -> PolarsResult<arrow_format::ipc::DictionaryBatchRef<'a>> {71let header = message72.header()73.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferHeader(err)))?74.ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingMessageHeader))?;75match header {76arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => Ok(batch),77_ => polars_bail!(oos = OutOfSpecKind::UnexpectedMessageType),78}79}8081#[allow(clippy::too_many_arguments)]82pub fn read_dictionary_block<R: Read + Seek>(83reader: &mut R,84metadata: &FileMetadata,85block: &arrow_format::ipc::Block,86// When true, the underlying reader bytestream represents a standalone IPC Block87// rather than a complete IPC File.88force_zero_offset: bool,89dictionaries: &mut Dictionaries,90message_scratch: &mut Vec<u8>,91dictionary_scratch: &mut Vec<u8>,92checked: UnsafeBool,93) -> PolarsResult<()> {94let offset: u64 = if force_zero_offset {95096} else {97block98.offset99.try_into()100.map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?101};102103let length: u64 = block104.meta_data_length105.try_into()106.map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;107108let message = get_message_from_block_offset(reader, offset, message_scratch)?;109let batch = get_dictionary_batch(&message)?;110111read_dictionary(112batch,113&metadata.schema,114&metadata.ipc_schema,115dictionaries,116reader,117offset + length,118dictionary_scratch,119checked,120)121}122123/// Reads all file's dictionaries, if any124/// This function is IO-bounded125pub fn read_file_dictionaries<R: Read + Seek>(126reader: &mut R,127metadata: &FileMetadata,128scratch: &mut Vec<u8>,129checked: UnsafeBool,130) -> PolarsResult<Dictionaries> {131let mut 
dictionaries = Default::default();132133let blocks = if let Some(blocks) = &metadata.dictionaries {134blocks135} else {136return Ok(PlHashMap::new());137};138// use a temporary smaller scratch for the messages139let mut message_scratch = Default::default();140141for block in blocks {142read_dictionary_block(143reader,144metadata,145block,146false,147&mut dictionaries,148&mut message_scratch,149scratch,150checked,151)?;152}153Ok(dictionaries)154}155156pub(super) fn decode_footer_len(footer: [u8; 10], end: u64) -> PolarsResult<(u64, usize)> {157let footer_len = i32::from_le_bytes(footer[..4].try_into().unwrap());158159if footer[4..] != ARROW_MAGIC_V2 {160if footer[..4] == ARROW_MAGIC_V1 {161polars_bail!(ComputeError: "feather v1 not supported");162}163return Err(polars_err!(oos = OutOfSpecKind::InvalidFooter));164}165let footer_len = footer_len166.try_into()167.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;168169Ok((end, footer_len))170}171172/// Reads the footer's length and magic number in footer173fn read_footer_len<R: Read + Seek>(reader: &mut R) -> PolarsResult<(u64, usize)> {174// read footer length and magic number in footer175let end = reader.seek(SeekFrom::End(-10))? 
+ 10;176177let mut footer: [u8; 10] = [0; 10];178179reader.read_exact(&mut footer)?;180decode_footer_len(footer, end)181}182183fn read_footer<R: Read + Seek>(reader: &mut R, footer_len: usize) -> PolarsResult<Vec<u8>> {184// read footer185reader.seek(SeekFrom::End(-10 - footer_len as i64))?;186187let mut serialized_footer = vec![];188serialized_footer.try_reserve(footer_len)?;189reader190.by_ref()191.take(footer_len as u64)192.read_to_end(&mut serialized_footer)?;193Ok(serialized_footer)194}195196fn deserialize_footer_blocks(197footer_data: &[u8],198) -> PolarsResult<(FooterRef<'_>, Vec<arrow_format::ipc::Block>)> {199let footer = arrow_format::ipc::FooterRef::read_as_root(footer_data)200.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferFooter(err)))?;201202let blocks = footer203.record_batches()204.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))?205.ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingRecordBatches))?;206207let blocks = blocks208.iter()209.map(|blockref| Ok(<arrow_format::ipc::Block>::from(blockref)))210.collect::<PolarsResult<Vec<_>>>()?;211Ok((footer, blocks))212}213214pub(super) fn deserialize_footer_ref(footer_data: &[u8]) -> PolarsResult<FooterRef<'_>> {215arrow_format::ipc::FooterRef::read_as_root(footer_data)216.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferFooter(err)))217}218219pub(super) fn deserialize_schema_ref_from_footer(220footer: arrow_format::ipc::FooterRef<'_>,221) -> PolarsResult<arrow_format::ipc::SchemaRef<'_>> {222footer223.schema()224.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferSchema(err)))?225.ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingSchema))226}227228/// Get the IPC blocks from the footer containing record batches229pub(super) fn iter_recordbatch_blocks_from_footer(230footer: arrow_format::ipc::FooterRef<'_>,231) -> PolarsResult<impl SendableIterator<Item = PolarsResult<arrow_format::ipc::Block>> + '_> {232let blocks = 
footer233.record_batches()234.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))?235.ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingRecordBatches))?;236237Ok(blocks238.into_iter()239.map(|blockref| Ok(<arrow_format::ipc::Block>::from(blockref))))240}241242pub(super) fn iter_dictionary_blocks_from_footer(243footer: arrow_format::ipc::FooterRef<'_>,244) -> PolarsResult<Option<impl SendableIterator<Item = PolarsResult<arrow_format::ipc::Block>> + '_>>245{246let dictionaries = footer247.dictionaries()248.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferDictionaries(err)))?;249250Ok(dictionaries.map(|dicts| {251dicts252.into_iter()253.map(|blockref| Ok(<arrow_format::ipc::Block>::from(blockref)))254}))255}256257pub fn deserialize_footer(footer_data: &[u8], size: u64) -> PolarsResult<FileMetadata> {258let footer = deserialize_footer_ref(footer_data)?;259let blocks = iter_recordbatch_blocks_from_footer(footer)?.collect::<PolarsResult<Vec<_>>>()?;260let dictionaries = iter_dictionary_blocks_from_footer(footer)?261.map(|dicts| dicts.collect::<PolarsResult<Vec<_>>>())262.transpose()?;263let ipc_schema = deserialize_schema_ref_from_footer(footer)?;264let (schema, ipc_schema, custom_schema_metadata) = fb_to_schema(ipc_schema)?;265266Ok(FileMetadata {267schema: Arc::new(schema),268ipc_schema,269blocks,270dictionaries,271size,272custom_schema_metadata: custom_schema_metadata.map(Arc::new),273})274}275276/// Read the Arrow IPC file's metadata277pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> PolarsResult<FileMetadata> {278let start = reader.stream_position()?;279let (end, footer_len) = read_footer_len(reader)?;280let serialized_footer = read_footer(reader, footer_len)?;281deserialize_footer(&serialized_footer, end - start)282}283284pub(crate) fn get_record_batch(285message: arrow_format::ipc::MessageRef,286) -> PolarsResult<arrow_format::ipc::RecordBatchRef> {287let header = message288.header()289.map_err(|err| 
polars_err!(oos = OutOfSpecKind::InvalidFlatbufferHeader(err)))?290.ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingMessageHeader))?;291match header {292arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => Ok(batch),293_ => polars_bail!(oos = OutOfSpecKind::UnexpectedMessageType),294}295}296297pub fn get_message_from_block_offset<'a, R: Read + Seek>(298reader: &mut R,299offset: u64,300message_scratch: &'a mut Vec<u8>,301) -> PolarsResult<arrow_format::ipc::MessageRef<'a>> {302reader.seek(SeekFrom::Start(offset))?;303let mut meta_buf = [0; 4];304reader.read_exact(&mut meta_buf)?;305if meta_buf == CONTINUATION_MARKER {306// continuation marker encountered, read message next307reader.read_exact(&mut meta_buf)?;308}309310let meta_len = i32::from_le_bytes(meta_buf)311.try_into()312.map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;313314message_scratch.clear();315message_scratch.try_reserve(meta_len)?;316reader317.by_ref()318.take(meta_len as u64)319.read_to_end(message_scratch)?;320321arrow_format::ipc::MessageRef::read_as_root(message_scratch)322.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferMessage(err)))323}324325pub(super) fn get_message_from_block<'a, R: Read + Seek>(326reader: &mut R,327block: &arrow_format::ipc::Block,328message_scratch: &'a mut Vec<u8>,329) -> PolarsResult<arrow_format::ipc::MessageRef<'a>> {330let offset: u64 = block331.offset332.try_into()333.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;334335get_message_from_block_offset(reader, offset, message_scratch)336}337338/// Reads the record batch at position `index` from the reader.339///340/// This function is useful for random access to the file. 
For example, if341/// you have indexed the file somewhere else, this allows pruning342/// certain parts of the file.343/// # Panics344/// This function panics iff `index >= metadata.blocks.len()`345#[allow(clippy::too_many_arguments)]346pub fn read_batch<R: Read + Seek>(347reader: &mut R,348dictionaries: &Dictionaries,349metadata: &FileMetadata,350projection: Option<&[usize]>,351limit: Option<usize>,352index: usize,353// When true, the reader object is handled as an IPC Block.354force_zero_offset: bool,355message_scratch: &mut Vec<u8>,356data_scratch: &mut Vec<u8>,357checked: UnsafeBool,358) -> PolarsResult<RecordBatchT<Box<dyn Array>>> {359let block = metadata.blocks[index];360361let offset: u64 = if force_zero_offset {3620363} else {364block365.offset366.try_into()367.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?368};369370let length: u64 = block371.meta_data_length372.try_into()373.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;374375let message = get_message_from_block_offset(reader, offset, message_scratch)?;376let batch = get_record_batch(message)?;377378read_record_batch(379batch,380&metadata.schema,381&metadata.ipc_schema,382projection,383limit,384dictionaries,385message386.version()387.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferVersion(err)))?,388reader,389offset + length,390data_scratch,391checked,392)393}394395396