Path: blob/main/crates/polars-arrow/src/io/ipc/read/array/union.rs
8421 views
use std::collections::VecDeque;1use std::io::{Read, Seek};23use polars_error::{PolarsResult, polars_err};4use polars_utils::bool::UnsafeBool;56use super::super::super::IpcField;7use super::super::deserialize::{read, skip};8use super::super::read_basic::*;9use super::super::{Compression, Dictionaries, IpcBuffer, Node, Version};10use crate::array::UnionArray;11use crate::datatypes::{ArrowDataType, UnionMode};12use crate::io::ipc::read::array::{try_get_array_length, try_get_field_node};1314#[allow(clippy::too_many_arguments)]15pub fn read_union<R: Read + Seek>(16field_nodes: &mut VecDeque<Node>,17variadic_buffer_counts: &mut VecDeque<usize>,18dtype: ArrowDataType,19ipc_field: &IpcField,20buffers: &mut VecDeque<IpcBuffer>,21reader: &mut R,22dictionaries: &Dictionaries,23block_offset: u64,24is_little_endian: bool,25compression: Option<Compression>,26limit: Option<usize>,27version: Version,28scratch: &mut Vec<u8>,29checked: UnsafeBool,30) -> PolarsResult<UnionArray> {31let field_node = try_get_field_node(field_nodes, &dtype)?;3233if version != Version::V5 {34let _ = buffers35.pop_front()36.ok_or_else(|| polars_err!(oos = "IPC: missing validity buffer."))?;37};3839let length = try_get_array_length(field_node, limit)?;4041let types = read_buffer(42buffers,43length,44reader,45block_offset,46is_little_endian,47compression,48scratch,49)?;5051let offsets = if let ArrowDataType::Union(u) = &dtype {52if !u.mode.is_sparse() {53Some(read_buffer(54buffers,55length,56reader,57block_offset,58is_little_endian,59compression,60scratch,61)?)62} else {63None64}65} else {66unreachable!()67};6869let fields = UnionArray::get_fields(&dtype);7071let fields = fields72.iter()73.zip(ipc_field.fields.iter())74.map(|(field, ipc_field)| {75read(76field_nodes,77variadic_buffer_counts,78field,79ipc_field,80buffers,81reader,82dictionaries,83block_offset,84is_little_endian,85compression,86None,87version,88scratch,89checked,90)91})92.collect::<PolarsResult<Vec<_>>>()?;9394UnionArray::try_new(dtype, types, fields, offsets)95}9697pub fn skip_union(98field_nodes: &mut VecDeque<Node>,99dtype: &ArrowDataType,100buffers: &mut VecDeque<IpcBuffer>,101variadic_buffer_counts: &mut VecDeque<usize>,102) -> PolarsResult<()> {103let _ = field_nodes.pop_front().ok_or_else(|| {104polars_err!(105oos = "IPC: unable to fetch the field for struct. The file or stream is corrupted."106)107})?;108109let _ = buffers110.pop_front()111.ok_or_else(|| polars_err!(oos = "IPC: missing validity buffer."))?;112if let ArrowDataType::Union(u) = dtype {113assert!(u.mode == UnionMode::Dense);114let _ = buffers115.pop_front()116.ok_or_else(|| polars_err!(oos = "IPC: missing offsets buffer."))?;117} else {118unreachable!()119};120121let fields = UnionArray::get_fields(dtype);122123fields124.iter()125.try_for_each(|field| skip(field_nodes, field.dtype(), buffers, variadic_buffer_counts))126}127128129