Path: blob/main/crates/polars-arrow/src/io/ipc/read/array/union.rs
6940 views
use std::collections::VecDeque;1use std::io::{Read, Seek};23use polars_error::{PolarsResult, polars_err};45use super::super::super::IpcField;6use super::super::deserialize::{read, skip};7use super::super::read_basic::*;8use super::super::{Compression, Dictionaries, IpcBuffer, Node, Version};9use crate::array::UnionArray;10use crate::datatypes::{ArrowDataType, UnionMode};11use crate::io::ipc::read::array::{try_get_array_length, try_get_field_node};1213#[allow(clippy::too_many_arguments)]14pub fn read_union<R: Read + Seek>(15field_nodes: &mut VecDeque<Node>,16variadic_buffer_counts: &mut VecDeque<usize>,17dtype: ArrowDataType,18ipc_field: &IpcField,19buffers: &mut VecDeque<IpcBuffer>,20reader: &mut R,21dictionaries: &Dictionaries,22block_offset: u64,23is_little_endian: bool,24compression: Option<Compression>,25limit: Option<usize>,26version: Version,27scratch: &mut Vec<u8>,28) -> PolarsResult<UnionArray> {29let field_node = try_get_field_node(field_nodes, &dtype)?;3031if version != Version::V5 {32let _ = buffers33.pop_front()34.ok_or_else(|| polars_err!(oos = "IPC: missing validity buffer."))?;35};3637let length = try_get_array_length(field_node, limit)?;3839let types = read_buffer(40buffers,41length,42reader,43block_offset,44is_little_endian,45compression,46scratch,47)?;4849let offsets = if let ArrowDataType::Union(u) = &dtype {50if !u.mode.is_sparse() {51Some(read_buffer(52buffers,53length,54reader,55block_offset,56is_little_endian,57compression,58scratch,59)?)60} else {61None62}63} else {64unreachable!()65};6667let fields = UnionArray::get_fields(&dtype);6869let fields = fields70.iter()71.zip(ipc_field.fields.iter())72.map(|(field, ipc_field)| {73read(74field_nodes,75variadic_buffer_counts,76field,77ipc_field,78buffers,79reader,80dictionaries,81block_offset,82is_little_endian,83compression,84None,85version,86scratch,87)88})89.collect::<PolarsResult<Vec<_>>>()?;9091UnionArray::try_new(dtype, types, fields, offsets)92}9394pub fn skip_union(95field_nodes: &mut VecDeque<Node>,96dtype: &ArrowDataType,97buffers: &mut VecDeque<IpcBuffer>,98variadic_buffer_counts: &mut VecDeque<usize>,99) -> PolarsResult<()> {100let _ = field_nodes.pop_front().ok_or_else(|| {101polars_err!(102oos = "IPC: unable to fetch the field for struct. The file or stream is corrupted."103)104})?;105106let _ = buffers107.pop_front()108.ok_or_else(|| polars_err!(oos = "IPC: missing validity buffer."))?;109if let ArrowDataType::Union(u) = dtype {110assert!(u.mode == UnionMode::Dense);111let _ = buffers112.pop_front()113.ok_or_else(|| polars_err!(oos = "IPC: missing offsets buffer."))?;114} else {115unreachable!()116};117118let fields = UnionArray::get_fields(dtype);119120fields121.iter()122.try_for_each(|field| skip(field_nodes, field.dtype(), buffers, variadic_buffer_counts))123}124125126