Path: blob/main/crates/polars-arrow/src/io/ipc/write/common_sync.rs
6940 views
use std::io::Write;12use polars_error::PolarsResult;34use super::super::CONTINUATION_MARKER;5use super::common::{EncodedData, pad_to_64};67/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written8pub fn write_message<W: Write>(9writer: &mut W,10encoded: &EncodedData,11) -> PolarsResult<(usize, usize)> {12let arrow_data_len = encoded.arrow_data.len();1314let a = 8 - 1;15let buffer = &encoded.ipc_message;16let flatbuf_size = buffer.len();17let prefix_size = 8;18let aligned_size = (flatbuf_size + prefix_size + a) & !a;19let padding_bytes = aligned_size - flatbuf_size - prefix_size;2021write_continuation(writer, (aligned_size - prefix_size) as i32)?;2223// write the flatbuf24if flatbuf_size > 0 {25writer.write_all(buffer)?;26}27// write padding28// aligned to a 8 byte boundary, so maximum is [u8;8]29const PADDING_MAX: [u8; 8] = [0u8; 8];30writer.write_all(&PADDING_MAX[..padding_bytes])?;3132// write arrow data33let body_len = if arrow_data_len > 0 {34write_body_buffers(writer, &encoded.arrow_data)?35} else {36037};3839Ok((aligned_size, body_len))40}4142fn write_body_buffers<W: Write>(mut writer: W, data: &[u8]) -> PolarsResult<usize> {43let len = data.len();44let pad_len = pad_to_64(data.len());45let total_len = len + pad_len;4647// write body buffer48writer.write_all(data)?;49if pad_len > 0 {50writer.write_all(&vec![0u8; pad_len][..])?;51}5253Ok(total_len)54}5556/// Write a record batch to the writer, writing the message size before the message57/// if the record batch is being written to a stream58pub fn write_continuation<W: Write>(writer: &mut W, total_len: i32) -> PolarsResult<usize> {59writer.write_all(&CONTINUATION_MARKER)?;60writer.write_all(&total_len.to_le_bytes()[..])?;61Ok(8)62}636465