Path: blob/main/crates/polars-arrow/src/io/ipc/write/serialize/mod.rs
6940 views
#![allow(clippy::ptr_arg)] // false positive in clippy, see https://github.com/rust-lang/rust-clippy/issues/84631use arrow_format::ipc;23use super::super::compression;4use super::super::endianness::is_native_little_endian;5use super::common::{Compression, pad_to_64};6use crate::array::*;7use crate::bitmap::Bitmap;8use crate::datatypes::PhysicalType;9use crate::offset::{Offset, OffsetsBuffer};10use crate::trusted_len::TrustedLen;11use crate::types::NativeType;12use crate::{match_integer_type, with_match_primitive_type_full};13mod binary;14mod binview;15mod boolean;16mod dictionary;17mod fixed_size_binary;18mod fixed_sized_list;19mod list;20mod map;21mod primitive;22mod struct_;23mod union;2425use binary::*;26use binview::*;27use boolean::*;28pub(super) use dictionary::*;29use fixed_size_binary::*;30use fixed_sized_list::*;31use list::*;32use map::*;33use primitive::*;34use struct_::*;35use union::*;3637/// Writes an [`Array`] to `arrow_data`38pub fn write(39array: &dyn Array,40buffers: &mut Vec<ipc::Buffer>,41arrow_data: &mut Vec<u8>,42nodes: &mut Vec<ipc::FieldNode>,43offset: &mut i64,44is_little_endian: bool,45compression: Option<Compression>,46) {47nodes.push(ipc::FieldNode {48length: array.len() as i64,49null_count: array.null_count() as i64,50});51use PhysicalType::*;52match array.dtype().to_physical_type() {53Null => (),54Boolean => write_boolean(55array.as_any().downcast_ref().unwrap(),56buffers,57arrow_data,58offset,59is_little_endian,60compression,61),62Primitive(primitive) => with_match_primitive_type_full!(primitive, |$T| {63let array = array.as_any().downcast_ref().unwrap();64write_primitive::<$T>(array, buffers, arrow_data, offset, is_little_endian, compression)65}),66Binary => write_binary::<i32>(67array.as_any().downcast_ref().unwrap(),68buffers,69arrow_data,70offset,71is_little_endian,72compression,73),74LargeBinary => write_binary::<i64>(75array.as_any().downcast_ref().unwrap(),76buffers,77arrow_data,78offset,79is_little_endian,80compression,81),82FixedSizeBinary => write_fixed_size_binary(83array.as_any().downcast_ref().unwrap(),84buffers,85arrow_data,86offset,87is_little_endian,88compression,89),90Utf8 => write_utf8::<i32>(91array.as_any().downcast_ref().unwrap(),92buffers,93arrow_data,94offset,95is_little_endian,96compression,97),98LargeUtf8 => write_utf8::<i64>(99array.as_any().downcast_ref().unwrap(),100buffers,101arrow_data,102offset,103is_little_endian,104compression,105),106List => write_list::<i32>(107array.as_any().downcast_ref().unwrap(),108buffers,109arrow_data,110nodes,111offset,112is_little_endian,113compression,114),115LargeList => write_list::<i64>(116array.as_any().downcast_ref().unwrap(),117buffers,118arrow_data,119nodes,120offset,121is_little_endian,122compression,123),124FixedSizeList => write_fixed_size_list(125array.as_any().downcast_ref().unwrap(),126buffers,127arrow_data,128nodes,129offset,130is_little_endian,131compression,132),133Struct => write_struct(134array.as_any().downcast_ref().unwrap(),135buffers,136arrow_data,137nodes,138offset,139is_little_endian,140compression,141),142Dictionary(key_type) => match_integer_type!(key_type, |$T| {143write_dictionary::<$T>(144array.as_any().downcast_ref().unwrap(),145buffers,146arrow_data,147nodes,148offset,149is_little_endian,150compression,151true,152);153}),154Union => {155write_union(156array.as_any().downcast_ref().unwrap(),157buffers,158arrow_data,159nodes,160offset,161is_little_endian,162compression,163);164},165Map => {166write_map(167array.as_any().downcast_ref().unwrap(),168buffers,169arrow_data,170nodes,171offset,172is_little_endian,173compression,174);175},176Utf8View => write_binview(177array.as_any().downcast_ref::<Utf8ViewArray>().unwrap(),178buffers,179arrow_data,180offset,181is_little_endian,182compression,183),184BinaryView => write_binview(185array.as_any().downcast_ref::<BinaryViewArray>().unwrap(),186buffers,187arrow_data,188offset,189is_little_endian,190compression,191),192}193}194195#[inline]196fn pad_buffer_to_64(buffer: &mut Vec<u8>, length: usize) {197let pad_len = pad_to_64(length);198for _ in 0..pad_len {199buffer.push(0u8);200}201}202203/// writes `bytes` to `arrow_data` updating `buffers` and `offset` and guaranteeing a 8 byte boundary.204fn write_bytes(205bytes: &[u8],206buffers: &mut Vec<ipc::Buffer>,207arrow_data: &mut Vec<u8>,208offset: &mut i64,209compression: Option<Compression>,210) {211let start = arrow_data.len();212if let Some(compression) = compression {213arrow_data.extend_from_slice(&(bytes.len() as i64).to_le_bytes());214match compression {215Compression::LZ4 => {216compression::compress_lz4(bytes, arrow_data).unwrap();217},218Compression::ZSTD => {219compression::compress_zstd(bytes, arrow_data).unwrap();220},221}222} else {223arrow_data.extend_from_slice(bytes);224};225226buffers.push(finish_buffer(arrow_data, start, offset));227}228229fn write_bitmap(230bitmap: Option<&Bitmap>,231length: usize,232buffers: &mut Vec<ipc::Buffer>,233arrow_data: &mut Vec<u8>,234offset: &mut i64,235compression: Option<Compression>,236) {237match bitmap {238Some(bitmap) => {239assert_eq!(bitmap.len(), length);240let (slice, slice_offset, _) = bitmap.as_slice();241if slice_offset != 0 {242// case where we can't slice the bitmap as the offsets are not multiple of 8243let bytes = Bitmap::from_trusted_len_iter(bitmap.iter());244let (slice, _, _) = bytes.as_slice();245write_bytes(slice, buffers, arrow_data, offset, compression)246} else {247write_bytes(slice, buffers, arrow_data, offset, compression)248}249},250None => {251buffers.push(ipc::Buffer {252offset: *offset,253length: 0,254});255},256}257}258259/// writes `bytes` to `arrow_data` updating `buffers` and `offset` and guaranteeing a 8 byte boundary.260fn write_buffer<T: NativeType>(261buffer: &[T],262buffers: &mut Vec<ipc::Buffer>,263arrow_data: &mut Vec<u8>,264offset: &mut i64,265is_little_endian: bool,266compression: Option<Compression>,267) {268let start = arrow_data.len();269if let Some(compression) = compression {270_write_compressed_buffer(buffer, arrow_data, is_little_endian, compression);271} else {272_write_buffer(buffer, arrow_data, is_little_endian);273};274275buffers.push(finish_buffer(arrow_data, start, offset));276}277278#[inline]279fn _write_buffer_from_iter<T: NativeType, I: TrustedLen<Item = T>>(280buffer: I,281arrow_data: &mut Vec<u8>,282is_little_endian: bool,283) {284let len = buffer.size_hint().0;285arrow_data.reserve(len * size_of::<T>());286if is_little_endian {287buffer288.map(|x| T::to_le_bytes(&x))289.for_each(|x| arrow_data.extend_from_slice(x.as_ref()))290} else {291buffer292.map(|x| T::to_be_bytes(&x))293.for_each(|x| arrow_data.extend_from_slice(x.as_ref()))294}295}296297#[inline]298fn _write_compressed_buffer_from_iter<T: NativeType, I: TrustedLen<Item = T>>(299buffer: I,300arrow_data: &mut Vec<u8>,301is_little_endian: bool,302compression: Compression,303) {304let len = buffer.size_hint().0;305let mut swapped = Vec::with_capacity(len * size_of::<T>());306if is_little_endian {307buffer308.map(|x| T::to_le_bytes(&x))309.for_each(|x| swapped.extend_from_slice(x.as_ref()));310} else {311buffer312.map(|x| T::to_be_bytes(&x))313.for_each(|x| swapped.extend_from_slice(x.as_ref()))314};315arrow_data.extend_from_slice(&(swapped.len() as i64).to_le_bytes());316match compression {317Compression::LZ4 => {318compression::compress_lz4(&swapped, arrow_data).unwrap();319},320Compression::ZSTD => {321compression::compress_zstd(&swapped, arrow_data).unwrap();322},323}324}325326fn _write_buffer<T: NativeType>(buffer: &[T], arrow_data: &mut Vec<u8>, is_little_endian: bool) {327if is_little_endian == is_native_little_endian() {328// in native endianness we can use the bytes directly.329let buffer = bytemuck::cast_slice(buffer);330arrow_data.extend_from_slice(buffer);331} else {332_write_buffer_from_iter(buffer.iter().copied(), arrow_data, is_little_endian)333}334}335336fn _write_compressed_buffer<T: NativeType>(337buffer: &[T],338arrow_data: &mut Vec<u8>,339is_little_endian: bool,340compression: Compression,341) {342if is_little_endian == is_native_little_endian() {343let bytes = bytemuck::cast_slice(buffer);344arrow_data.extend_from_slice(&(bytes.len() as i64).to_le_bytes());345match compression {346Compression::LZ4 => {347compression::compress_lz4(bytes, arrow_data).unwrap();348},349Compression::ZSTD => {350compression::compress_zstd(bytes, arrow_data).unwrap();351},352}353} else {354todo!()355}356}357358/// writes `bytes` to `arrow_data` updating `buffers` and `offset` and guaranteeing a 8 byte boundary.359#[inline]360fn write_buffer_from_iter<T: NativeType, I: TrustedLen<Item = T>>(361buffer: I,362buffers: &mut Vec<ipc::Buffer>,363arrow_data: &mut Vec<u8>,364offset: &mut i64,365is_little_endian: bool,366compression: Option<Compression>,367) {368let start = arrow_data.len();369370if let Some(compression) = compression {371_write_compressed_buffer_from_iter(buffer, arrow_data, is_little_endian, compression);372} else {373_write_buffer_from_iter(buffer, arrow_data, is_little_endian);374}375376buffers.push(finish_buffer(arrow_data, start, offset));377}378379fn finish_buffer(arrow_data: &mut Vec<u8>, start: usize, offset: &mut i64) -> ipc::Buffer {380let buffer_len = (arrow_data.len() - start) as i64;381382pad_buffer_to_64(arrow_data, arrow_data.len() - start);383let total_len = (arrow_data.len() - start) as i64;384385let buffer = ipc::Buffer {386offset: *offset,387length: buffer_len,388};389*offset += total_len;390buffer391}392393394