Path: blob/main/crates/polars-arrow/src/io/ipc/append/mod.rs
6940 views
//! A struct adapter of Read+Seek+Write to append to IPC files1// read header and convert to writer information2// seek to first byte of header - 13// write new batch4// write new footer5use std::io::{Read, Seek, SeekFrom, Write};67use polars_error::{PolarsResult, polars_bail, polars_err};89use super::endianness::is_native_little_endian;10use super::read::{self, FileMetadata};11use super::write::common::DictionaryTracker;12use super::write::writer::*;13use super::write::*;1415impl<R: Read + Seek + Write> FileWriter<R> {16/// Creates a new [`FileWriter`] from an existing file, seeking to the last message17/// and appending new messages afterwards. Users call `finish` to write the footer (with both)18/// the existing and appended messages on it.19/// # Error20/// This function errors iff:21/// * the file's endianness is not the native endianness (not yet supported)22/// * the file is not a valid Arrow IPC file23pub fn try_from_file(24mut writer: R,25metadata: FileMetadata,26options: WriteOptions,27) -> PolarsResult<FileWriter<R>> {28if metadata.ipc_schema.is_little_endian != is_native_little_endian() {29polars_bail!(ComputeError: "appending to a file of a non-native endianness is not supported")30}3132let dictionaries =33read::read_file_dictionaries(&mut writer, &metadata, &mut Default::default())?;3435let last_block = metadata.blocks.last().ok_or_else(|| {36polars_err!(oos = "an Arrow IPC file must have at least 1 message (the schema message)")37})?;38let offset: u64 = last_block39.offset40.try_into()41.map_err(|_| polars_err!(oos = "the block's offset must be a positive number"))?;42let meta_data_length: u64 = last_block43.meta_data_length44.try_into()45.map_err(|_| polars_err!(oos = "the block's offset must be a positive number"))?;46let body_length: u64 = last_block47.body_length48.try_into()49.map_err(|_| polars_err!(oos = "the block's body length must be a positive number"))?;50let offset: u64 = offset + meta_data_length + body_length;5152writer.seek(SeekFrom::Start(offset))?;5354Ok(FileWriter {55writer,56options,57schema: metadata.schema,58ipc_fields: metadata.ipc_schema.fields,59block_offsets: offset as usize,60dictionary_blocks: metadata.dictionaries.unwrap_or_default(),61record_blocks: metadata.blocks,62state: State::Started, // file already exists, so we are ready63dictionary_tracker: DictionaryTracker {64dictionaries,65cannot_replace: true,66},67encoded_message: Default::default(),68custom_schema_metadata: None,69})70}71}727374