Path: blob/main/crates/polars-arrow/src/io/ipc/read/stream.rs
8431 views
use std::io::{Read, Seek};12use arrow_format::ipc::planus::ReadAsRoot;3use polars_error::{PolarsError, PolarsResult, polars_bail, polars_err};4use polars_utils::bool::UnsafeBool;56use super::super::CONTINUATION_MARKER;7use super::common::*;8use super::schema::deserialize_stream_metadata;9use super::{Dictionaries, OutOfSpecKind};10use crate::array::Array;11use crate::datatypes::{ArrowSchema, Metadata};12use crate::io::ipc::IpcSchema;13use crate::record_batch::RecordBatchT;1415/// Metadata of an Arrow IPC stream, written at the start of the stream16#[derive(Debug, Clone)]17pub struct StreamMetadata {18/// The schema that is read from the stream's first message19pub schema: ArrowSchema,2021/// The custom metadata that is read from the schema22pub custom_schema_metadata: Option<Metadata>,2324/// The IPC version of the stream25pub version: arrow_format::ipc::MetadataVersion,2627/// The IPC fields tracking dictionaries28pub ipc_schema: IpcSchema,29}3031/// Reads the metadata of the stream32pub fn read_stream_metadata(reader: &mut dyn std::io::Read) -> PolarsResult<StreamMetadata> {33// determine metadata length34let mut meta_size: [u8; 4] = [0; 4];35reader.read_exact(&mut meta_size)?;36let meta_length = {37// If a continuation marker is encountered, skip over it and read38// the size from the next four bytes.39if meta_size == CONTINUATION_MARKER {40reader.read_exact(&mut meta_size)?;41}42i32::from_le_bytes(meta_size)43};4445let length: usize = meta_length46.try_into()47.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;4849let mut buffer = vec![];50buffer.try_reserve(length)?;51reader.take(length as u64).read_to_end(&mut buffer)?;5253deserialize_stream_metadata(&buffer)54}5556/// Encodes the stream's status after each read.57///58/// A stream is an iterator, and an iterator returns `Option<Item>`. The `Item`59/// type in the [`StreamReader`] case is `StreamState`, which means that an Arrow60/// stream may yield one of three values: (1) `None`, which signals that the stream61/// is done; (2) [`StreamState::Some`], which signals that there was62/// data waiting in the stream and we read it; and finally (3)63/// [`Some(StreamState::Waiting)`], which means that the stream is still "live", it64/// just doesn't hold any data right now.65pub enum StreamState {66/// A live stream without data67Waiting,68/// Next item in the stream69Some(RecordBatchT<Box<dyn Array>>),70}7172impl StreamState {73/// Return the data inside this wrapper.74///75/// # Panics76///77/// If the `StreamState` was `Waiting`.78pub fn unwrap(self) -> RecordBatchT<Box<dyn Array>> {79if let StreamState::Some(batch) = self {80batch81} else {82panic!("The batch is not available")83}84}85}8687/// Reads the next item, yielding `None` if the stream is done,88/// and a [`StreamState`] otherwise.89fn read_next<R: Read + Seek>(90reader: &mut R,91metadata: &StreamMetadata,92dictionaries: &mut Dictionaries,93message_buffer: &mut Vec<u8>,94projection: &Option<ProjectionInfo>,95scratch: &mut Vec<u8>,96checked: UnsafeBool,97) -> PolarsResult<Option<StreamState>> {98// determine metadata length99let mut meta_length: [u8; 4] = [0; 4];100101match reader.read_exact(&mut meta_length) {102Ok(()) => (),103Err(e) => {104return if e.kind() == std::io::ErrorKind::UnexpectedEof {105// Handle EOF without the "0xFFFFFFFF 0x00000000"106// valid according to:107// https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format108Ok(Some(StreamState::Waiting))109} else {110Err(PolarsError::from(e))111};112},113}114115let meta_length = {116// If a continuation marker is encountered, skip over it and read117// the size from the next four bytes.118if meta_length == CONTINUATION_MARKER {119reader.read_exact(&mut meta_length)?;120}121i32::from_le_bytes(meta_length)122};123124let meta_length: usize = meta_length125.try_into()126.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;127128if meta_length == 0 {129// the stream has ended, mark the reader as finished130return Ok(None);131}132133message_buffer.clear();134message_buffer.try_reserve(meta_length)?;135reader136.by_ref()137.take(meta_length as u64)138.read_to_end(message_buffer)?;139140let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer.as_ref())141.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferMessage(err)))?;142143let header = message144.header()145.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferHeader(err)))?146.ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingMessageHeader))?;147148let block_length: usize = message149.body_length()150.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferBodyLength(err)))?151.try_into()152.map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;153154match header {155arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => {156let cur_pos = reader.stream_position()?;157158let chunk = read_record_batch(159batch,160&metadata.schema,161&metadata.ipc_schema,162projection.as_ref().map(|x| x.columns.as_ref()),163None,164dictionaries,165metadata.version,166&mut (&mut *reader).take(block_length as u64),1670,168scratch,169checked,170);171172let new_pos = reader.stream_position()?;173let read_size = new_pos - cur_pos;174175reader.seek(std::io::SeekFrom::Current(176block_length as i64 - read_size as i64,177))?;178179if let Some(ProjectionInfo { map, .. }) = projection {180// re-order according to projection181chunk182.map(|chunk| apply_projection(chunk, map))183.map(|x| Some(StreamState::Some(x)))184} else {185chunk.map(|x| Some(StreamState::Some(x)))186}187},188arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => {189let cur_pos = reader.stream_position()?;190191read_dictionary(192batch,193&metadata.schema,194&metadata.ipc_schema,195dictionaries,196&mut (&mut *reader).take(block_length as u64),1970,198scratch,199checked,200)?;201202let new_pos = reader.stream_position()?;203let read_size = new_pos - cur_pos;204205reader.seek(std::io::SeekFrom::Current(206block_length as i64 - read_size as i64,207))?;208209// read the next message until we encounter a RecordBatch message210read_next(211reader,212metadata,213dictionaries,214message_buffer,215projection,216scratch,217checked,218)219},220_ => polars_bail!(oos = OutOfSpecKind::UnexpectedMessageType),221}222}223224/// Arrow Stream reader.225///226/// An [`Iterator`] over an Arrow stream that yields a result of [`StreamState`]s.227/// This is the recommended way to read an arrow stream (by iterating over its data).228///229/// For a more thorough walkthrough consult [this example](https://github.com/jorgecarleitao/polars_arrow/tree/main/examples/ipc_pyarrow).230pub struct StreamReader<R: Read> {231reader: R,232metadata: StreamMetadata,233dictionaries: Dictionaries,234finished: bool,235message_buffer: Vec<u8>,236projection: Option<ProjectionInfo>,237scratch: Vec<u8>,238checked: UnsafeBool,239}240241impl<R: Read + Seek> StreamReader<R> {242/// Try to create a new stream reader243///244/// The first message in the stream is the schema, the reader will fail if it does not245/// encounter a schema.246/// To check if the reader is done, use `is_finished(self)`247pub fn new(reader: R, metadata: StreamMetadata, projection: Option<Vec<usize>>) -> Self {248let projection =249projection.map(|projection| prepare_projection(&metadata.schema, projection));250251Self {252reader,253metadata,254dictionaries: Default::default(),255finished: false,256message_buffer: Default::default(),257projection,258scratch: Default::default(),259checked: UnsafeBool::default(),260}261}262263/// # Safety264/// Don't do expensive checks.265/// This means the data source has to be trusted to be correct.266pub unsafe fn unchecked(mut self) -> Self {267unsafe {268self.checked = UnsafeBool::new_false();269}270self271}272273/// Return the schema of the stream274pub fn metadata(&self) -> &StreamMetadata {275&self.metadata276}277278/// Return the schema of the file279pub fn schema(&self) -> &ArrowSchema {280self.projection281.as_ref()282.map(|x| &x.schema)283.unwrap_or(&self.metadata.schema)284}285286/// Check if the stream is finished287pub fn is_finished(&self) -> bool {288self.finished289}290291fn maybe_next(&mut self) -> PolarsResult<Option<StreamState>> {292if self.finished {293return Ok(None);294}295let batch = read_next(296&mut self.reader,297&self.metadata,298&mut self.dictionaries,299&mut self.message_buffer,300&self.projection,301&mut self.scratch,302self.checked,303)?;304if batch.is_none() {305self.finished = true;306}307Ok(batch)308}309}310311impl<R: Read + Seek> Iterator for StreamReader<R> {312type Item = PolarsResult<StreamState>;313314fn next(&mut self) -> Option<Self::Item> {315self.maybe_next().transpose()316}317}318319320