Path: blob/main/crates/polars-arrow/src/io/ipc/write/common_sync.rs
8424 views
use std::io::Write;1use std::sync::Arc;23use arrow_format::ipc::planus::Builder;4use bytes::Bytes;5use polars_error::PolarsResult;67use super::super::{ARROW_MAGIC_V2, ARROW_MAGIC_V2_PADDED, CONTINUATION_MARKER};8use super::common::{EncodedData, pad_to_64};9use super::schema;10use crate::datatypes::*;11use crate::io::ipc::IpcField;12use crate::io::ipc::write::EncodedDataBytes;1314/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written15pub fn write_message<W: Write>(16writer: &mut W,17encoded: &EncodedData,18) -> PolarsResult<(usize, usize)> {19let arrow_data_len = encoded.arrow_data.len();2021let a = 8 - 1;22let buffer = &encoded.ipc_message;23let flatbuf_size = buffer.len();24let prefix_size = 8;25let aligned_size = (flatbuf_size + prefix_size + a) & !a;26let padding_bytes = aligned_size - flatbuf_size - prefix_size;2728write_continuation(writer, (aligned_size - prefix_size) as i32)?;2930// write the flatbuf31if flatbuf_size > 0 {32writer.write_all(buffer)?;33}34// write padding35// aligned to a 8 byte boundary, so maximum is [u8;8]36const PADDING_MAX: [u8; 8] = [0u8; 8];37writer.write_all(&PADDING_MAX[..padding_bytes])?;3839// write arrow data40let body_len = if arrow_data_len > 0 {41write_body_buffers(writer, &encoded.arrow_data)?42} else {43044};4546Ok((aligned_size, body_len))47}4849/// Encapsulate an encoded IPC message into the provided Bytes queue. Ownership50/// of the data will move. Returns the metadata and body length in bytes.51pub fn push_message(queue: &mut Vec<Bytes>, encoded: EncodedDataBytes) -> (usize, usize) {52let arrow_data_len = encoded.arrow_data.len();5354let a = 8 - 1;55let buffer = encoded.ipc_message;56let flatbuf_size = buffer.len();57let prefix_size = 8;58let aligned_size = (flatbuf_size + prefix_size + a) & !a;59let padding_bytes = aligned_size - flatbuf_size - prefix_size;6061// Continuation.62let total_len = (aligned_size - prefix_size) as i32;63queue.push(Bytes::from_static(&CONTINUATION_MARKER));64queue.push(Bytes::copy_from_slice(&total_len.to_le_bytes()[..]));6566// Write the flatbuf.67if flatbuf_size > 0 {68queue.push(buffer);69}7071// Write padding.72// This is aligned to a 8 byte boundary, so maximum is [u8;8].73const PADDING_MAX: [u8; 8] = [0u8; 8];74queue.push(Bytes::from_static(&PADDING_MAX[..padding_bytes]));7576// write arrow data77let body_len = if arrow_data_len > 0 {78let data = encoded.arrow_data;79let len = data.len();80let pad_len = pad_to_64(data.len());81let total_len = len + pad_len;8283queue.push(data);84if pad_len > 0 {85queue.push(Bytes::from(vec![0u8; pad_len]));86}87total_len88} else {89090};9192(aligned_size, body_len)93}9495fn write_body_buffers<W: Write>(mut writer: W, data: &[u8]) -> PolarsResult<usize> {96let len = data.len();97let pad_len = pad_to_64(data.len());98let total_len = len + pad_len;99100// write body buffer101writer.write_all(data)?;102if pad_len > 0 {103writer.write_all(&vec![0u8; pad_len][..])?;104}105106Ok(total_len)107}108109/// Write a record batch to the writer, writing the message size before the message110/// if the record batch is being written to a stream111pub fn write_continuation<W: Write>(writer: &mut W, total_len: i32) -> PolarsResult<usize> {112writer.write_all(&CONTINUATION_MARKER)?;113writer.write_all(&total_len.to_le_bytes()[..])?;114Ok(8)115}116117/// Push the IPC magic bytes.118pub fn push_magic(queue: &mut Vec<Bytes>, padded: bool) -> usize {119if padded {120queue.push(Bytes::from_static(&ARROW_MAGIC_V2_PADDED));1218122} else {123queue.push(Bytes::from_static(&ARROW_MAGIC_V2));1246125}126}127128/// Append a continuation marker and the `total_len` value to the Bytes queue.129pub fn push_continuation(queue: &mut Vec<Bytes>, total_len: i32) -> usize {130let mut buf = [0u8; 8];131buf[..4].copy_from_slice(&CONTINUATION_MARKER);132buf[4..].copy_from_slice(&total_len.to_le_bytes());133queue.push(Bytes::copy_from_slice(&buf));1348135}136137/// Build the IPC File Footer and accumulate the owned Bytes into the queue.138/// Returns the total number of bytes added.139pub fn push_footer(140queue: &mut Vec<Bytes>,141schema: &ArrowSchema,142ipc_fields: &[IpcField],143dictionary_blocks: Vec<arrow_format::ipc::Block>,144record_blocks: Vec<arrow_format::ipc::Block>,145// Placeholder, inherited from FileWriter for future use.146custom_schema_metadata: Option<Arc<Metadata>>,147) -> usize {148let mut total_len = 0;149150total_len += push_continuation(queue, 0);151152let schema = schema::serialize_schema(schema, ipc_fields, custom_schema_metadata.as_deref());153154let root = arrow_format::ipc::Footer {155version: arrow_format::ipc::MetadataVersion::V5,156schema: Some(Box::new(schema)),157dictionaries: Some(dictionary_blocks),158record_batches: Some(record_blocks),159custom_metadata: None,160};161let mut builder = Builder::new();162let footer_data = builder.finish(&root, None);163let footer_data = Bytes::copy_from_slice(footer_data);164let footer_data_len = footer_data.len();165166queue.push(footer_data);167total_len += footer_data_len;168169queue.push(Bytes::copy_from_slice(170&(footer_data_len as i32).to_le_bytes(),171));172total_len += 4;173174total_len175}176177178