Path: blob/main/crates/polars-arrow/src/io/ipc/write/serialize/mod.rs
#![allow(clippy::ptr_arg)] // false positive in clippy, see https://github.com/rust-lang/rust-clippy/issues/8463
use arrow_format::ipc;

use super::super::compression;
use super::super::endianness::is_native_little_endian;
use super::common::{Compression, pad_to_64};
use crate::array::*;
use crate::bitmap::Bitmap;
use crate::datatypes::PhysicalType;
use crate::offset::{Offset, OffsetsBuffer};
use crate::trusted_len::TrustedLen;
use crate::types::NativeType;
use crate::{match_integer_type, with_match_primitive_type_full};
mod binary;
mod binview;
mod boolean;
mod fixed_size_binary;
mod fixed_sized_list;
mod list;
mod map;
mod primitive;
mod struct_;
mod union;

use binary::*;
use binview::*;
use boolean::*;
use fixed_size_binary::*;
use fixed_sized_list::*;
use list::*;
use map::*;
use primitive::*;
use struct_::*;
use union::*;

/// Writes an [`Array`] to `arrow_data`
pub fn write(
    array: &dyn Array,
    buffers: &mut Vec<ipc::Buffer>,
    arrow_data: &mut Vec<u8>,
    nodes: &mut Vec<ipc::FieldNode>,
    offset: &mut i64,
    is_little_endian: bool,
    compression: Option<Compression>,
) {
    nodes.push(ipc::FieldNode {
        length: array.len() as i64,
        null_count: array.null_count() as i64,
    });
    use PhysicalType::*;
    match array.dtype().to_physical_type() {
        Null => (),
        Boolean => write_boolean(
            array.as_any().downcast_ref().unwrap(),
            buffers,
            arrow_data,
            offset,
            is_little_endian,
            compression,
        ),
        Primitive(primitive) => with_match_primitive_type_full!(primitive, |$T| {
            let array = array.as_any().downcast_ref().unwrap();
            write_primitive::<$T>(array, buffers, arrow_data, offset, is_little_endian, compression)
        }),
        Binary => write_binary::<i32>(
            array.as_any().downcast_ref().unwrap(),
            buffers,
            arrow_data,
            offset,
            is_little_endian,
            compression,
        ),
        LargeBinary => write_binary::<i64>(
            array.as_any().downcast_ref().unwrap(),
            buffers,
            arrow_data,
            offset,
            is_little_endian,
            compression,
        ),
        FixedSizeBinary => write_fixed_size_binary(
            array.as_any().downcast_ref().unwrap(),
            buffers,
            arrow_data,
            offset,
            is_little_endian,
            compression,
        ),
        Utf8 => write_utf8::<i32>(
            array.as_any().downcast_ref().unwrap(),
            buffers,
            arrow_data,
            offset,
            is_little_endian,
            compression,
        ),
        LargeUtf8 => write_utf8::<i64>(
            array.as_any().downcast_ref().unwrap(),
            buffers,
            arrow_data,
            offset,
            is_little_endian,
            compression,
        ),
        List => write_list::<i32>(
            array.as_any().downcast_ref().unwrap(),
            buffers,
            arrow_data,
            nodes,
            offset,
            is_little_endian,
            compression,
        ),
        LargeList => write_list::<i64>(
            array.as_any().downcast_ref().unwrap(),
            buffers,
            arrow_data,
            nodes,
            offset,
            is_little_endian,
            compression,
        ),
        FixedSizeList => write_fixed_size_list(
            array.as_any().downcast_ref().unwrap(),
            buffers,
            arrow_data,
            nodes,
            offset,
            is_little_endian,
            compression,
        ),
        Struct => write_struct(
            array.as_any().downcast_ref().unwrap(),
            buffers,
            arrow_data,
            nodes,
            offset,
            is_little_endian,
            compression,
        ),
        Dictionary(key_type) => match_integer_type!(key_type, |$T| {
            let array: &DictionaryArray<$T> = array.as_any().downcast_ref().unwrap();
            let keys_array: &PrimitiveArray<$T> = array.keys().as_any().downcast_ref().unwrap();

            write_primitive::<$T>(
                keys_array,
                buffers,
                arrow_data,
                offset,
                is_little_endian,
                compression
            )
        }),
        Union => {
            write_union(
                array.as_any().downcast_ref().unwrap(),
                buffers,
                arrow_data,
                nodes,
                offset,
                is_little_endian,
                compression,
            );
        },
        Map => {
            write_map(
                array.as_any().downcast_ref().unwrap(),
                buffers,
                arrow_data,
                nodes,
                offset,
                is_little_endian,
                compression,
            );
        },
        Utf8View => write_binview(
            array.as_any().downcast_ref::<Utf8ViewArray>().unwrap(),
            buffers,
            arrow_data,
            offset,
            is_little_endian,
            compression,
        ),
        BinaryView => write_binview(
            array.as_any().downcast_ref::<BinaryViewArray>().unwrap(),
            buffers,
            arrow_data,
            offset,
            is_little_endian,
            compression,
        ),
    }
}

/// pads `buffer` with `pad_to_64(length)` trailing zeros.
#[inline]
fn pad_buffer_to_64(buffer: &mut Vec<u8>, length: usize) {
    let pad_len = pad_to_64(length);
    for _ in 0..pad_len {
        buffer.push(0u8);
    }
}

/// writes `bytes` to `arrow_data` updating `buffers` and `offset` and guaranteeing an 8-byte boundary.
fn write_bytes(
    bytes: &[u8],
    buffers: &mut Vec<ipc::Buffer>,
    arrow_data: &mut Vec<u8>,
    offset: &mut i64,
    compression: Option<Compression>,
) {
    let start = arrow_data.len();
    if let Some(compression) = compression {
        arrow_data.extend_from_slice(&(bytes.len() as i64).to_le_bytes());
        match compression {
            Compression::LZ4 => {
                compression::compress_lz4(bytes, arrow_data).unwrap();
            },
            Compression::ZSTD(level) => {
                compression::compress_zstd(bytes, arrow_data, level).unwrap();
            },
        }
    } else {
        arrow_data.extend_from_slice(bytes);
    };

    buffers.push(finish_buffer(arrow_data, start, offset));
}

/// writes the validity `bitmap` (or an empty buffer if `None`) to `arrow_data`.
fn write_bitmap(
    bitmap: Option<&Bitmap>,
    length: usize,
    buffers: &mut Vec<ipc::Buffer>,
    arrow_data: &mut Vec<u8>,
    offset: &mut i64,
    compression: Option<Compression>,
) {
    match bitmap {
        Some(bitmap) => {
            assert_eq!(bitmap.len(), length);
            let (slice, slice_offset, _) = bitmap.as_slice();
            if slice_offset != 0 {
                // case where we can't slice the bitmap as the offsets are not multiple of 8
                let bytes = Bitmap::from_trusted_len_iter(bitmap.iter());
                let (slice, _, _) = bytes.as_slice();
                write_bytes(slice, buffers, arrow_data, offset, compression)
            } else {
                write_bytes(slice, buffers, arrow_data, offset, compression)
            }
        },
        None => {
            buffers.push(ipc::Buffer {
                offset: *offset,
                length: 0,
            });
        },
    }
}

/// writes `buffer` to `arrow_data` updating `buffers` and `offset` and guaranteeing an 8-byte boundary.
fn write_buffer<T: NativeType>(
    buffer: &[T],
    buffers: &mut Vec<ipc::Buffer>,
    arrow_data: &mut Vec<u8>,
    offset: &mut i64,
    is_little_endian: bool,
    compression: Option<Compression>,
) {
    let start = arrow_data.len();
    if let Some(compression) = compression {
        _write_compressed_buffer(buffer, arrow_data, is_little_endian, compression);
    } else {
        _write_buffer(buffer, arrow_data, is_little_endian);
    };

    buffers.push(finish_buffer(arrow_data, start, offset));
}

#[inline]
fn _write_buffer_from_iter<T: NativeType, I: TrustedLen<Item = T>>(
    buffer: I,
    arrow_data: &mut Vec<u8>,
    is_little_endian: bool,
) {
    let len = buffer.size_hint().0;
    arrow_data.reserve(len * size_of::<T>());
    if is_little_endian {
        buffer
            .map(|x| T::to_le_bytes(&x))
            .for_each(|x| arrow_data.extend_from_slice(x.as_ref()))
    } else {
        buffer
            .map(|x| T::to_be_bytes(&x))
            .for_each(|x| arrow_data.extend_from_slice(x.as_ref()))
    }
}

#[inline]
fn _write_compressed_buffer_from_iter<T: NativeType, I: TrustedLen<Item = T>>(
    buffer: I,
    arrow_data: &mut Vec<u8>,
    is_little_endian: bool,
    compression: Compression,
) {
    let len = buffer.size_hint().0;
    let mut swapped = Vec::with_capacity(len * size_of::<T>());
    if is_little_endian {
        buffer
            .map(|x| T::to_le_bytes(&x))
            .for_each(|x| swapped.extend_from_slice(x.as_ref()));
    } else {
        buffer
            .map(|x| T::to_be_bytes(&x))
            .for_each(|x| swapped.extend_from_slice(x.as_ref()))
    };
    arrow_data.extend_from_slice(&(swapped.len() as i64).to_le_bytes());
    match compression {
        Compression::LZ4 => {
            compression::compress_lz4(&swapped, arrow_data).unwrap();
        },
        Compression::ZSTD(level) => {
            compression::compress_zstd(&swapped, arrow_data, level).unwrap();
        },
    }
}

fn _write_buffer<T: NativeType>(buffer: &[T], arrow_data: &mut Vec<u8>, is_little_endian: bool) {
    if is_little_endian == is_native_little_endian() {
        // in native endianness we can use the bytes directly.
        let buffer = bytemuck::cast_slice(buffer);
        arrow_data.extend_from_slice(buffer);
    } else {
        _write_buffer_from_iter(buffer.iter().copied(), arrow_data, is_little_endian)
    }
}

fn _write_compressed_buffer<T: NativeType>(
    buffer: &[T],
    arrow_data: &mut Vec<u8>,
    is_little_endian: bool,
    compression: Compression,
) {
    if is_little_endian == is_native_little_endian() {
        let bytes = bytemuck::cast_slice(buffer);
        arrow_data.extend_from_slice(&(bytes.len() as i64).to_le_bytes());
        match compression {
            Compression::LZ4 => {
                compression::compress_lz4(bytes, arrow_data).unwrap();
            },
            Compression::ZSTD(level) => {
                compression::compress_zstd(bytes, arrow_data, level).unwrap();
            },
        }
    } else {
        todo!()
    }
}

/// writes `buffer` to `arrow_data` updating `buffers` and `offset` and guaranteeing an 8-byte boundary.
#[inline]
fn write_buffer_from_iter<T: NativeType, I: TrustedLen<Item = T>>(
    buffer: I,
    buffers: &mut Vec<ipc::Buffer>,
    arrow_data: &mut Vec<u8>,
    offset: &mut i64,
    is_little_endian: bool,
    compression: Option<Compression>,
) {
    let start = arrow_data.len();

    if let Some(compression) = compression {
        _write_compressed_buffer_from_iter(buffer, arrow_data, is_little_endian, compression);
    } else {
        _write_buffer_from_iter(buffer, arrow_data, is_little_endian);
    }

    buffers.push(finish_buffer(arrow_data, start, offset));
}

/// builds the [`ipc::Buffer`] for the bytes written since `start`: the recorded length
/// excludes padding, while `offset` is advanced by the padded (total) length.
fn finish_buffer(arrow_data: &mut Vec<u8>, start: usize, offset: &mut i64) -> ipc::Buffer {
    let buffer_len = (arrow_data.len() - start) as i64;

    pad_buffer_to_64(arrow_data, arrow_data.len() - start);
    let total_len = (arrow_data.len() - start) as i64;

    let buffer = ipc::Buffer {
        offset: *offset,
        length: buffer_len,
    };
    *offset += total_len;
    buffer
}
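// A minimal illustrative sketch (not part of the upstream module): it demonstrates the
// bookkeeping performed by `write_bytes` + `finish_buffer` on the uncompressed path,
// assuming `pad_to_64` returns the number of padding bytes needed to reach the next
// multiple of 64. The test module name is hypothetical.
#[cfg(test)]
mod write_bytes_sketch {
    use super::*;

    #[test]
    fn write_bytes_records_unpadded_length_and_advances_offset_by_padded_size() {
        let mut buffers = Vec::new();
        let mut arrow_data = Vec::new();
        let mut offset = 0i64;

        // Write three raw bytes without compression.
        write_bytes(&[1u8, 2, 3], &mut buffers, &mut arrow_data, &mut offset, None);

        // The pushed `ipc::Buffer` records the unpadded length at the previous offset...
        assert_eq!(buffers[0].offset, 0);
        assert_eq!(buffers[0].length, 3);
        // ...while `arrow_data` is zero-padded to the alignment boundary and the running
        // offset advances by the padded size.
        assert_eq!(arrow_data.len(), 64);
        assert_eq!(offset, 64);
    }
}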