Path: blob/main/crates/polars-arrow/src/io/ipc/write/stream.rs
6940 views
//! Arrow IPC File and Stream Writers1//!2//! The `FileWriter` and `StreamWriter` have similar interfaces,3//! however the `FileWriter` expects a reader that supports `Seek`ing45use std::io::Write;6use std::sync::Arc;78use polars_error::{PolarsError, PolarsResult};910use super::super::IpcField;11use super::common::{DictionaryTracker, EncodedData, WriteOptions, encode_chunk};12use super::common_sync::{write_continuation, write_message};13use super::{default_ipc_fields, schema_to_bytes};14use crate::array::Array;15use crate::datatypes::*;16use crate::record_batch::RecordBatchT;1718/// Arrow stream writer19///20/// The data written by this writer must be read in order. To signal that no more21/// data is arriving through the stream call [`self.finish()`](StreamWriter::finish);22///23/// For a usage walkthrough consult [this example](https://github.com/jorgecarleitao/polars_arrow/tree/main/examples/ipc_pyarrow).24pub struct StreamWriter<W: Write> {25/// The object to write to26writer: W,27/// IPC write options28write_options: WriteOptions,29/// Whether the stream has been finished30finished: bool,31/// Keeps track of dictionaries that have been written32dictionary_tracker: DictionaryTracker,33/// Custom schema-level metadata34custom_schema_metadata: Option<Arc<Metadata>>,3536ipc_fields: Option<Vec<IpcField>>,37}3839impl<W: Write> StreamWriter<W> {40/// Creates a new [`StreamWriter`]41pub fn new(writer: W, write_options: WriteOptions) -> Self {42Self {43writer,44write_options,45finished: false,46dictionary_tracker: DictionaryTracker {47dictionaries: Default::default(),48cannot_replace: false,49},50ipc_fields: None,51custom_schema_metadata: None,52}53}5455/// Sets custom schema metadata. Must be called before `start` is called56pub fn set_custom_schema_metadata(&mut self, custom_metadata: Arc<Metadata>) {57self.custom_schema_metadata = Some(custom_metadata);58}5960/// Starts the stream by writing a Schema message to it.61/// Use `ipc_fields` to declare dictionary ids in the schema, for dictionary-reuse62pub fn start(63&mut self,64schema: &ArrowSchema,65ipc_fields: Option<Vec<IpcField>>,66) -> PolarsResult<()> {67self.ipc_fields = Some(if let Some(ipc_fields) = ipc_fields {68ipc_fields69} else {70default_ipc_fields(schema.iter_values())71});7273let encoded_message = EncodedData {74ipc_message: schema_to_bytes(75schema,76self.ipc_fields.as_ref().unwrap(),77self.custom_schema_metadata.as_deref(),78),79arrow_data: vec![],80};81write_message(&mut self.writer, &encoded_message)?;82Ok(())83}8485/// Writes [`RecordBatchT`] to the stream86pub fn write(87&mut self,88columns: &RecordBatchT<Box<dyn Array>>,89ipc_fields: Option<&[IpcField]>,90) -> PolarsResult<()> {91if self.finished {92let io_err = std::io::Error::new(93std::io::ErrorKind::UnexpectedEof,94"Cannot write to a finished stream".to_string(),95);96return Err(PolarsError::from(io_err));97}9899// we can't make it a closure because it borrows (and it can't borrow mut and non-mut below)100#[allow(clippy::or_fun_call)]101let fields = ipc_fields.unwrap_or(self.ipc_fields.as_ref().unwrap());102103let (encoded_dictionaries, encoded_message) = encode_chunk(104columns,105fields,106&mut self.dictionary_tracker,107&self.write_options,108)?;109110for encoded_dictionary in encoded_dictionaries {111write_message(&mut self.writer, &encoded_dictionary)?;112}113114write_message(&mut self.writer, &encoded_message)?;115Ok(())116}117118/// Write continuation bytes, and mark the stream as done119pub fn finish(&mut self) -> PolarsResult<()> {120write_continuation(&mut self.writer, 0)?;121122self.finished = true;123124Ok(())125}126127/// Consumes itself, returning the inner writer.128pub fn into_inner(self) -> W {129self.writer130}131}132133134