Path: blob/main/crates/polars-arrow/src/io/ipc/read/reader.rs
6940 views
use std::io::{Read, Seek};12use polars_error::PolarsResult;34use super::common::*;5use super::file::{get_message_from_block, get_record_batch};6use super::{Dictionaries, FileMetadata, read_batch, read_file_dictionaries};7use crate::array::Array;8use crate::datatypes::ArrowSchema;9use crate::record_batch::RecordBatchT;1011/// An iterator of [`RecordBatchT`]s from an Arrow IPC file.12pub struct FileReader<R: Read + Seek> {13reader: R,14metadata: FileMetadata,15// the dictionaries are going to be read16dictionaries: Option<Dictionaries>,17current_block: usize,18projection: Option<ProjectionInfo>,19remaining: usize,20data_scratch: Vec<u8>,21message_scratch: Vec<u8>,22}2324impl<R: Read + Seek> FileReader<R> {25/// Creates a new [`FileReader`]. Use `projection` to only take certain columns.26/// # Panic27/// Panics iff the projection is not in increasing order (e.g. `[1, 0]` nor `[0, 1, 1]` are valid)28pub fn new(29reader: R,30metadata: FileMetadata,31projection: Option<Vec<usize>>,32limit: Option<usize>,33) -> Self {34let projection =35projection.map(|projection| prepare_projection(&metadata.schema, projection));36Self {37reader,38metadata,39dictionaries: Default::default(),40projection,41remaining: limit.unwrap_or(usize::MAX),42current_block: 0,43data_scratch: Default::default(),44message_scratch: Default::default(),45}46}4748/// Creates a new [`FileReader`]. Use `projection` to only take certain columns.49/// # Panic50/// Panics iff the projection is not in increasing order (e.g. `[1, 0]` nor `[0, 1, 1]` are valid)51pub fn new_with_projection_info(52reader: R,53metadata: FileMetadata,54projection: Option<ProjectionInfo>,55limit: Option<usize>,56) -> Self {57Self {58reader,59metadata,60dictionaries: Default::default(),61projection,62remaining: limit.unwrap_or(usize::MAX),63current_block: 0,64data_scratch: Default::default(),65message_scratch: Default::default(),66}67}6869/// Return the schema of the file70pub fn schema(&self) -> &ArrowSchema {71self.projection72.as_ref()73.map(|x| &x.schema)74.unwrap_or(&self.metadata.schema)75}7677/// Returns the [`FileMetadata`]78pub fn metadata(&self) -> &FileMetadata {79&self.metadata80}8182/// Consumes this FileReader, returning the underlying reader83pub fn into_inner(self) -> R {84self.reader85}8687pub fn set_current_block(&mut self, idx: usize) {88self.current_block = idx;89}9091pub fn get_current_block(&self) -> usize {92self.current_block93}9495/// Get the inner memory scratches so they can be reused in a new writer.96/// This can be utilized to save memory allocations for performance reasons.97pub fn take_projection_info(&mut self) -> Option<ProjectionInfo> {98std::mem::take(&mut self.projection)99}100101/// Get the inner memory scratches so they can be reused in a new writer.102/// This can be utilized to save memory allocations for performance reasons.103pub fn take_scratches(&mut self) -> (Vec<u8>, Vec<u8>) {104(105std::mem::take(&mut self.data_scratch),106std::mem::take(&mut self.message_scratch),107)108}109110/// Set the inner memory scratches so they can be reused in a new writer.111/// This can be utilized to save memory allocations for performance reasons.112pub fn set_scratches(&mut self, scratches: (Vec<u8>, Vec<u8>)) {113(self.data_scratch, self.message_scratch) = scratches;114}115116fn read_dictionaries(&mut self) -> PolarsResult<()> {117if self.dictionaries.is_none() {118self.dictionaries = Some(read_file_dictionaries(119&mut self.reader,120&self.metadata,121&mut self.data_scratch,122)?);123};124Ok(())125}126127/// Skip over blocks until we have seen at most `offset` rows, returning how many rows we are128/// still too see.129///130/// This will never go over the `offset`. Meaning that if the `offset < current_block.len()`,131/// the block will not be skipped.132pub fn skip_blocks_till_limit(&mut self, offset: u64) -> PolarsResult<u64> {133let mut remaining_offset = offset;134135for (i, block) in self.metadata.blocks.iter().enumerate() {136let message =137get_message_from_block(&mut self.reader, block, &mut self.message_scratch)?;138let record_batch = get_record_batch(message)?;139140let length = record_batch.length()?;141let length = length as u64;142143if length > remaining_offset {144self.current_block = i;145return Ok(remaining_offset);146}147148remaining_offset -= length;149}150151self.current_block = self.metadata.blocks.len();152Ok(remaining_offset)153}154155pub fn next_record_batch(156&mut self,157) -> Option<PolarsResult<arrow_format::ipc::RecordBatchRef<'_>>> {158let block = self.metadata.blocks.get(self.current_block)?;159self.current_block += 1;160let message = get_message_from_block(&mut self.reader, block, &mut self.message_scratch);161Some(message.and_then(|m| get_record_batch(m)))162}163}164165impl<R: Read + Seek> Iterator for FileReader<R> {166type Item = PolarsResult<RecordBatchT<Box<dyn Array>>>;167168fn next(&mut self) -> Option<Self::Item> {169// get current block170if self.current_block == self.metadata.blocks.len() {171return None;172}173174match self.read_dictionaries() {175Ok(_) => {},176Err(e) => return Some(Err(e)),177};178179let block = self.current_block;180self.current_block += 1;181182let chunk = read_batch(183&mut self.reader,184self.dictionaries.as_ref().unwrap(),185&self.metadata,186self.projection.as_ref().map(|x| x.columns.as_ref()),187Some(self.remaining),188block,189&mut self.message_scratch,190&mut self.data_scratch,191);192self.remaining -= chunk.as_ref().map(|x| x.len()).unwrap_or_default();193194let chunk = if let Some(ProjectionInfo { map, .. }) = &self.projection {195// re-order according to projection196chunk.map(|chunk| apply_projection(chunk, map))197} else {198chunk199};200Some(chunk)201}202}203204205