Path: blob/main/crates/polars-arrow/src/io/ipc/read/reader.rs
8421 views
use std::io::{Read, Seek};12use arrow_format::ipc::KeyValueRef;3use polars_error::{PolarsResult, polars_err};4use polars_utils::bool::UnsafeBool;56use super::common::*;7use super::file::{get_message_from_block, get_message_from_block_offset, get_record_batch};8use super::{Dictionaries, FileMetadata, read_batch, read_file_dictionaries};9use crate::array::Array;10use crate::datatypes::ArrowSchema;11use crate::record_batch::RecordBatchT;1213/// An iterator of [`RecordBatchT`]s from an Arrow IPC file.14pub struct FileReader<R: Read + Seek> {15reader: R,16metadata: FileMetadata,17// the dictionaries are going to be read18dictionaries: Option<Dictionaries>,19current_block: usize,20projection: Option<ProjectionInfo>,21remaining: usize,22data_scratch: Vec<u8>,23message_scratch: Vec<u8>,24checked: UnsafeBool,25}2627impl<R: Read + Seek> FileReader<R> {28/// Creates a new [`FileReader`]. Use `projection` to only take certain columns.29/// # Panic30/// Panics iff the projection is not in increasing order (e.g. `[1, 0]` nor `[0, 1, 1]` are valid)31pub fn new(32reader: R,33metadata: FileMetadata,34projection: Option<Vec<usize>>,35limit: Option<usize>,36) -> Self {37let projection =38projection.map(|projection| prepare_projection(&metadata.schema, projection));39Self {40reader,41metadata,42dictionaries: Default::default(),43projection,44remaining: limit.unwrap_or(usize::MAX),45current_block: 0,46data_scratch: Default::default(),47message_scratch: Default::default(),48checked: Default::default(),49}50}5152/// # Safety53/// Don't do expensive checks.54/// This means the data source has to be trusted to be correct.55pub unsafe fn unchecked(mut self) -> Self {56unsafe {57self.checked = UnsafeBool::new_false();58}59self60}6162/// Creates a new [`FileReader`]. Use `projection` to only take certain columns.63/// # Panic64/// Panics iff the projection is not in increasing order (e.g. `[1, 0]` nor `[0, 1, 1]` are valid)65pub fn new_with_projection_info(66reader: R,67metadata: FileMetadata,68projection: Option<ProjectionInfo>,69limit: Option<usize>,70) -> Self {71Self {72reader,73metadata,74dictionaries: Default::default(),75projection,76remaining: limit.unwrap_or(usize::MAX),77current_block: 0,78data_scratch: Default::default(),79message_scratch: Default::default(),80checked: Default::default(),81}82}8384/// Return the schema of the file85pub fn schema(&self) -> &ArrowSchema {86self.projection87.as_ref()88.map(|x| &x.schema)89.unwrap_or(&self.metadata.schema)90}9192/// Returns the [`FileMetadata`]93pub fn metadata(&self) -> &FileMetadata {94&self.metadata95}9697/// Consumes this FileReader, returning the underlying reader98pub fn into_inner(self) -> R {99self.reader100}101102pub fn set_current_block(&mut self, idx: usize) {103self.current_block = idx;104}105106pub fn get_current_block(&self) -> usize {107self.current_block108}109110/// Get the inner memory scratches so they can be reused in a new writer.111/// This can be utilized to save memory allocations for performance reasons.112pub fn take_projection_info(&mut self) -> Option<ProjectionInfo> {113std::mem::take(&mut self.projection)114}115116/// Get the inner memory scratches so they can be reused in a new writer.117/// This can be utilized to save memory allocations for performance reasons.118pub fn take_scratches(&mut self) -> (Vec<u8>, Vec<u8>) {119(120std::mem::take(&mut self.data_scratch),121std::mem::take(&mut self.message_scratch),122)123}124125/// Set the inner memory scratches so they can be reused in a new writer.126/// This can be utilized to save memory allocations for performance reasons.127pub fn set_scratches(&mut self, scratches: (Vec<u8>, Vec<u8>)) {128(self.data_scratch, self.message_scratch) = scratches;129}130131pub fn read_dictionaries(&mut self) -> PolarsResult<()> {132if self.dictionaries.is_none() {133self.dictionaries = Some(read_file_dictionaries(134&mut self.reader,135&self.metadata,136&mut self.data_scratch,137self.checked,138)?);139};140Ok(())141}142143/// Skip over blocks until we have seen at most `offset` rows, returning how many rows we are144/// still too see.145///146/// This will never go over the `offset`. Meaning that if the `offset < current_block.len()`,147/// the block will not be skipped.148pub fn skip_blocks_till_limit(&mut self, offset: u64) -> PolarsResult<u64> {149let mut remaining_offset = offset;150151for (i, block) in self.metadata.blocks.iter().enumerate() {152let message =153get_message_from_block(&mut self.reader, block, &mut self.message_scratch)?;154let record_batch = get_record_batch(message)?;155156let length = record_batch.length()?;157let length = length as u64;158159if length > remaining_offset {160self.current_block = i;161return Ok(remaining_offset);162}163164remaining_offset -= length;165}166167self.current_block = self.metadata.blocks.len();168Ok(remaining_offset)169}170171pub fn next_record_batch(172&mut self,173) -> Option<PolarsResult<arrow_format::ipc::RecordBatchRef<'_>>> {174let block = self.metadata.blocks.get(self.current_block)?;175self.current_block += 1;176let message = get_message_from_block(&mut self.reader, block, &mut self.message_scratch);177Some(message.and_then(|m| get_record_batch(m)))178}179}180181impl<R: Read + Seek> Iterator for FileReader<R> {182type Item = PolarsResult<RecordBatchT<Box<dyn Array>>>;183184fn next(&mut self) -> Option<Self::Item> {185// get current block186if self.current_block == self.metadata.blocks.len() {187return None;188}189190match self.read_dictionaries() {191Ok(_) => {},192Err(e) => return Some(Err(e)),193};194195let block = self.current_block;196self.current_block += 1;197198let chunk = read_batch(199&mut self.reader,200self.dictionaries.as_ref().unwrap(),201&self.metadata,202self.projection.as_ref().map(|x| x.columns.as_ref()),203Some(self.remaining),204block,205false,206&mut self.message_scratch,207&mut self.data_scratch,208self.checked,209);210self.remaining -= chunk.as_ref().map(|x| x.len()).unwrap_or_default();211212let chunk = if let Some(ProjectionInfo { map, .. }) = &self.projection {213// re-order according to projection214chunk.map(|chunk| apply_projection(chunk, map))215} else {216chunk217};218Some(chunk)219}220}221222/// A reader that has access to exactly one standalone IPC Block of an Arrow IPC file.223/// The block contains either a `RecordBatch` or a `DictionaryBatch`.224/// The `dictionaries` field must be initialized prior to decoding a `RecordBatch`.225pub struct BlockReader<R: Read + Seek> {226pub reader: R,227}228229impl<R: Read + Seek> BlockReader<R> {230pub fn new(reader: R) -> Self {231Self { reader }232}233234/// Reads the record batch header and returns its length (i.e., number of rows).235pub fn record_batch_num_rows(&mut self, message_scratch: &mut Vec<u8>) -> PolarsResult<usize> {236let offset: u64 = 0;237238let message = get_message_from_block_offset(&mut self.reader, offset, message_scratch)?;239let batch = get_record_batch(message)?;240let out = batch.length().map(|l| usize::try_from(l).unwrap())?;241Ok(out)242}243244/// Reads the record batch header and returns the custom_metadata.245pub fn record_batch_custom_metadata<'a>(246&mut self,247message_scratch: &'a mut Vec<u8>,248) -> PolarsResult<Option<Vec<KeyValueRef<'a>>>> {249let offset: u64 = 0;250let message = get_message_from_block_offset(&mut self.reader, offset, message_scratch)?;251let custom_metadata = message.custom_metadata()?;252253custom_metadata254.map(|kv_results| {255kv_results256.into_iter()257.map(|res| {258res.map_err(|e| {259polars_err!(260ComputeError:261"failed to get KeyValue from IPC custom metadata: {}",262e263)264})265})266.collect::<Result<Vec<KeyValueRef>, _>>()267})268.transpose()269}270}271272273