// Path: blob/main/crates/polars-arrow/src/io/ipc/read/stream.rs
// 6940 views
use std::io::Read;12use arrow_format::ipc::planus::ReadAsRoot;3use polars_error::{PolarsError, PolarsResult, polars_bail, polars_err};45use super::super::CONTINUATION_MARKER;6use super::common::*;7use super::schema::deserialize_stream_metadata;8use super::{Dictionaries, OutOfSpecKind};9use crate::array::Array;10use crate::datatypes::{ArrowSchema, Metadata};11use crate::io::ipc::IpcSchema;12use crate::record_batch::RecordBatchT;1314/// Metadata of an Arrow IPC stream, written at the start of the stream15#[derive(Debug, Clone)]16pub struct StreamMetadata {17/// The schema that is read from the stream's first message18pub schema: ArrowSchema,1920/// The custom metadata that is read from the schema21pub custom_schema_metadata: Option<Metadata>,2223/// The IPC version of the stream24pub version: arrow_format::ipc::MetadataVersion,2526/// The IPC fields tracking dictionaries27pub ipc_schema: IpcSchema,28}2930/// Reads the metadata of the stream31pub fn read_stream_metadata(reader: &mut dyn std::io::Read) -> PolarsResult<StreamMetadata> {32// determine metadata length33let mut meta_size: [u8; 4] = [0; 4];34reader.read_exact(&mut meta_size)?;35let meta_length = {36// If a continuation marker is encountered, skip over it and read37// the size from the next four bytes.38if meta_size == CONTINUATION_MARKER {39reader.read_exact(&mut meta_size)?;40}41i32::from_le_bytes(meta_size)42};4344let length: usize = meta_length45.try_into()46.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;4748let mut buffer = vec![];49buffer.try_reserve(length)?;50reader.take(length as u64).read_to_end(&mut buffer)?;5152deserialize_stream_metadata(&buffer)53}5455/// Encodes the stream's status after each read.56///57/// A stream is an iterator, and an iterator returns `Option<Item>`. 
The `Item`58/// type in the [`StreamReader`] case is `StreamState`, which means that an Arrow59/// stream may yield one of three values: (1) `None`, which signals that the stream60/// is done; (2) [`StreamState::Some`], which signals that there was61/// data waiting in the stream and we read it; and finally (3)62/// [`Some(StreamState::Waiting)`], which means that the stream is still "live", it63/// just doesn't hold any data right now.64pub enum StreamState {65/// A live stream without data66Waiting,67/// Next item in the stream68Some(RecordBatchT<Box<dyn Array>>),69}7071impl StreamState {72/// Return the data inside this wrapper.73///74/// # Panics75///76/// If the `StreamState` was `Waiting`.77pub fn unwrap(self) -> RecordBatchT<Box<dyn Array>> {78if let StreamState::Some(batch) = self {79batch80} else {81panic!("The batch is not available")82}83}84}8586/// Reads the next item, yielding `None` if the stream is done,87/// and a [`StreamState`] otherwise.88fn read_next<R: Read>(89reader: &mut R,90metadata: &StreamMetadata,91dictionaries: &mut Dictionaries,92message_buffer: &mut Vec<u8>,93data_buffer: &mut Vec<u8>,94projection: &Option<ProjectionInfo>,95scratch: &mut Vec<u8>,96) -> PolarsResult<Option<StreamState>> {97// determine metadata length98let mut meta_length: [u8; 4] = [0; 4];99100match reader.read_exact(&mut meta_length) {101Ok(()) => (),102Err(e) => {103return if e.kind() == std::io::ErrorKind::UnexpectedEof {104// Handle EOF without the "0xFFFFFFFF 0x00000000"105// valid according to:106// https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format107Ok(Some(StreamState::Waiting))108} else {109Err(PolarsError::from(e))110};111},112}113114let meta_length = {115// If a continuation marker is encountered, skip over it and read116// the size from the next four bytes.117if meta_length == CONTINUATION_MARKER {118reader.read_exact(&mut meta_length)?;119}120i32::from_le_bytes(meta_length)121};122123let meta_length: usize = 
meta_length124.try_into()125.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;126127if meta_length == 0 {128// the stream has ended, mark the reader as finished129return Ok(None);130}131132message_buffer.clear();133message_buffer.try_reserve(meta_length)?;134reader135.by_ref()136.take(meta_length as u64)137.read_to_end(message_buffer)?;138139let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer.as_ref())140.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferMessage(err)))?;141142let header = message143.header()144.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferHeader(err)))?145.ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingMessageHeader))?;146147let block_length: usize = message148.body_length()149.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferBodyLength(err)))?150.try_into()151.map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;152153match header {154arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => {155data_buffer.clear();156data_buffer.try_reserve(block_length)?;157reader158.by_ref()159.take(block_length as u64)160.read_to_end(data_buffer)?;161162let file_size = data_buffer.len() as u64;163164let mut reader = std::io::Cursor::new(data_buffer);165166let chunk = read_record_batch(167batch,168&metadata.schema,169&metadata.ipc_schema,170projection.as_ref().map(|x| x.columns.as_ref()),171None,172dictionaries,173metadata.version,174&mut reader,1750,176file_size,177scratch,178);179180if let Some(ProjectionInfo { map, .. 
}) = projection {181// re-order according to projection182chunk183.map(|chunk| apply_projection(chunk, map))184.map(|x| Some(StreamState::Some(x)))185} else {186chunk.map(|x| Some(StreamState::Some(x)))187}188},189arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => {190data_buffer.clear();191data_buffer.try_reserve(block_length)?;192reader193.by_ref()194.take(block_length as u64)195.read_to_end(data_buffer)?;196197let file_size = data_buffer.len() as u64;198let mut dict_reader = std::io::Cursor::new(&data_buffer);199200read_dictionary(201batch,202&metadata.schema,203&metadata.ipc_schema,204dictionaries,205&mut dict_reader,2060,207file_size,208scratch,209)?;210211// read the next message until we encounter a RecordBatch message212read_next(213reader,214metadata,215dictionaries,216message_buffer,217data_buffer,218projection,219scratch,220)221},222_ => polars_bail!(oos = OutOfSpecKind::UnexpectedMessageType),223}224}225226/// Arrow Stream reader.227///228/// An [`Iterator`] over an Arrow stream that yields a result of [`StreamState`]s.229/// This is the recommended way to read an arrow stream (by iterating over its data).230///231/// For a more thorough walkthrough consult [this example](https://github.com/jorgecarleitao/polars_arrow/tree/main/examples/ipc_pyarrow).232pub struct StreamReader<R: Read> {233reader: R,234metadata: StreamMetadata,235dictionaries: Dictionaries,236finished: bool,237data_buffer: Vec<u8>,238message_buffer: Vec<u8>,239projection: Option<ProjectionInfo>,240scratch: Vec<u8>,241}242243impl<R: Read> StreamReader<R> {244/// Try to create a new stream reader245///246/// The first message in the stream is the schema, the reader will fail if it does not247/// encounter a schema.248/// To check if the reader is done, use `is_finished(self)`249pub fn new(reader: R, metadata: StreamMetadata, projection: Option<Vec<usize>>) -> Self {250let projection =251projection.map(|projection| prepare_projection(&metadata.schema, projection));252253Self 
{254reader,255metadata,256dictionaries: Default::default(),257finished: false,258data_buffer: Default::default(),259message_buffer: Default::default(),260projection,261scratch: Default::default(),262}263}264265/// Return the schema of the stream266pub fn metadata(&self) -> &StreamMetadata {267&self.metadata268}269270/// Return the schema of the file271pub fn schema(&self) -> &ArrowSchema {272self.projection273.as_ref()274.map(|x| &x.schema)275.unwrap_or(&self.metadata.schema)276}277278/// Check if the stream is finished279pub fn is_finished(&self) -> bool {280self.finished281}282283fn maybe_next(&mut self) -> PolarsResult<Option<StreamState>> {284if self.finished {285return Ok(None);286}287let batch = read_next(288&mut self.reader,289&self.metadata,290&mut self.dictionaries,291&mut self.message_buffer,292&mut self.data_buffer,293&self.projection,294&mut self.scratch,295)?;296if batch.is_none() {297self.finished = true;298}299Ok(batch)300}301}302303impl<R: Read> Iterator for StreamReader<R> {304type Item = PolarsResult<StreamState>;305306fn next(&mut self) -> Option<Self::Item> {307self.maybe_next().transpose()308}309}310311312