// Path: blob/main/crates/polars-arrow/src/io/ipc/write/writer.rs
use std::io::Write;1use std::sync::Arc;23use arrow_format::ipc::planus::Builder;4use polars_error::{PolarsResult, polars_bail};56use super::super::{ARROW_MAGIC_V2, IpcField};7use super::common::{DictionaryTracker, EncodedData, WriteOptions};8use super::common_sync::{write_continuation, write_message};9use super::{default_ipc_fields, schema, schema_to_bytes};10use crate::array::Array;11use crate::datatypes::*;12use crate::io::ipc::write::common::encode_chunk_amortized;13use crate::record_batch::RecordBatchT;1415#[derive(Clone, Copy, PartialEq, Eq)]16pub(crate) enum State {17None,18Started,19Finished,20}2122/// Arrow file writer23pub struct FileWriter<W: Write> {24/// The object to write to25pub(crate) writer: W,26/// IPC write options27pub(crate) options: WriteOptions,28/// A reference to the schema, used in validating record batches29pub(crate) schema: ArrowSchemaRef,30pub(crate) ipc_fields: Vec<IpcField>,31/// The number of bytes between each block of bytes, as an offset for random access32pub(crate) block_offsets: usize,33/// Dictionary blocks that will be written as part of the IPC footer34pub(crate) dictionary_blocks: Vec<arrow_format::ipc::Block>,35/// Record blocks that will be written as part of the IPC footer36pub(crate) record_blocks: Vec<arrow_format::ipc::Block>,37/// Whether the writer footer has been written, and the writer is finished38pub(crate) state: State,39/// Keeps track of dictionaries that have been written40pub(crate) dictionary_tracker: DictionaryTracker,41/// Buffer/scratch that is reused between writes42pub(crate) encoded_message: EncodedData,43/// Custom schema-level metadata44pub(crate) custom_schema_metadata: Option<Arc<Metadata>>,45}4647impl<W: Write> FileWriter<W> {48/// Creates a new [`FileWriter`] and writes the header to `writer`49pub fn try_new(50writer: W,51schema: ArrowSchemaRef,52ipc_fields: Option<Vec<IpcField>>,53options: WriteOptions,54) -> PolarsResult<Self> {55let mut slf = Self::new(writer, schema, ipc_fields, 
options);56slf.start()?;5758Ok(slf)59}6061/// Creates a new [`FileWriter`].62pub fn new(63writer: W,64schema: ArrowSchemaRef,65ipc_fields: Option<Vec<IpcField>>,66options: WriteOptions,67) -> Self {68let ipc_fields = if let Some(ipc_fields) = ipc_fields {69ipc_fields70} else {71default_ipc_fields(schema.iter_values())72};7374Self {75writer,76options,77schema,78ipc_fields,79block_offsets: 0,80dictionary_blocks: vec![],81record_blocks: vec![],82state: State::None,83dictionary_tracker: DictionaryTracker {84dictionaries: Default::default(),85cannot_replace: true,86},87encoded_message: Default::default(),88custom_schema_metadata: None,89}90}9192/// Consumes itself into the inner writer93pub fn into_inner(self) -> W {94self.writer95}9697/// Get the inner memory scratches so they can be reused in a new writer.98/// This can be utilized to save memory allocations for performance reasons.99pub fn get_scratches(&mut self) -> EncodedData {100std::mem::take(&mut self.encoded_message)101}102/// Set the inner memory scratches so they can be reused in a new writer.103/// This can be utilized to save memory allocations for performance reasons.104pub fn set_scratches(&mut self, scratches: EncodedData) {105self.encoded_message = scratches;106}107108/// Writes the header and first (schema) message to the file.109/// # Errors110/// Errors if the file has been started or has finished.111pub fn start(&mut self) -> PolarsResult<()> {112if self.state != State::None {113polars_bail!(oos = "The IPC file can only be started once");114}115// write magic to header116self.writer.write_all(&ARROW_MAGIC_V2[..])?;117// create an 8-byte boundary after the header118self.writer.write_all(&[0, 0])?;119// write the schema, set the written bytes to the schema120121let encoded_message = EncodedData {122ipc_message: schema_to_bytes(123&self.schema,124&self.ipc_fields,125// No need to pass metadata here, as it is already written to the footer in `finish`126None,127),128arrow_data: vec![],129};130131let 
(meta, data) = write_message(&mut self.writer, &encoded_message)?;132self.block_offsets += meta + data + 8; // 8 <=> arrow magic + 2 bytes for alignment133self.state = State::Started;134Ok(())135}136137/// Writes [`RecordBatchT`] to the file138pub fn write(139&mut self,140chunk: &RecordBatchT<Box<dyn Array>>,141ipc_fields: Option<&[IpcField]>,142) -> PolarsResult<()> {143if self.state != State::Started {144polars_bail!(145oos = "The IPC file must be started before it can be written to. Call `start` before `write`"146);147}148149let ipc_fields = if let Some(ipc_fields) = ipc_fields {150ipc_fields151} else {152self.ipc_fields.as_ref()153};154let encoded_dictionaries = encode_chunk_amortized(155chunk,156ipc_fields,157&mut self.dictionary_tracker,158&self.options,159&mut self.encoded_message,160)?;161162let encoded_message = std::mem::take(&mut self.encoded_message);163self.write_encoded(&encoded_dictionaries[..], &encoded_message)?;164self.encoded_message = encoded_message;165166Ok(())167}168169pub fn write_encoded(170&mut self,171encoded_dictionaries: &[EncodedData],172encoded_message: &EncodedData,173) -> PolarsResult<()> {174if self.state != State::Started {175polars_bail!(176oos = "The IPC file must be started before it can be written to. 
Call `start` before `write`"177);178}179180// add all dictionaries181for encoded_dictionary in encoded_dictionaries {182let (meta, data) = write_message(&mut self.writer, encoded_dictionary)?;183184let block = arrow_format::ipc::Block {185offset: self.block_offsets as i64,186meta_data_length: meta as i32,187body_length: data as i64,188};189self.dictionary_blocks.push(block);190self.block_offsets += meta + data;191}192193self.write_encoded_record_batch(encoded_message)?;194195Ok(())196}197198pub fn write_encoded_record_batch(199&mut self,200encoded_message: &EncodedData,201) -> PolarsResult<()> {202let (meta, data) = write_message(&mut self.writer, encoded_message)?;203// add a record block for the footer204let block = arrow_format::ipc::Block {205offset: self.block_offsets as i64,206meta_data_length: meta as i32, // TODO: is this still applicable?207body_length: data as i64,208};209self.record_blocks.push(block);210self.block_offsets += meta + data;211212Ok(())213}214215/// Write footer and closing tag, then mark the writer as done216pub fn finish(&mut self) -> PolarsResult<()> {217if self.state != State::Started {218polars_bail!(219oos = "The IPC file must be started before it can be finished. 
Call `start` before `finish`"220);221}222223// write EOS224write_continuation(&mut self.writer, 0)?;225226let schema = schema::serialize_schema(227&self.schema,228&self.ipc_fields,229self.custom_schema_metadata.as_deref(),230);231232let root = arrow_format::ipc::Footer {233version: arrow_format::ipc::MetadataVersion::V5,234schema: Some(Box::new(schema)),235dictionaries: Some(std::mem::take(&mut self.dictionary_blocks)),236record_batches: Some(std::mem::take(&mut self.record_blocks)),237custom_metadata: None,238};239let mut builder = Builder::new();240let footer_data = builder.finish(&root, None);241self.writer.write_all(footer_data)?;242self.writer243.write_all(&(footer_data.len() as i32).to_le_bytes())?;244self.writer.write_all(&ARROW_MAGIC_V2)?;245self.writer.flush()?;246self.state = State::Finished;247248Ok(())249}250251/// Sets custom schema metadata. Must be called before `start` is called252pub fn set_custom_schema_metadata(&mut self, custom_metadata: Arc<Metadata>) {253self.custom_schema_metadata = Some(custom_metadata);254}255}256257258