Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/io/ipc/read/array/union.rs
8421 views
1
use std::collections::VecDeque;
2
use std::io::{Read, Seek};
3
4
use polars_error::{PolarsResult, polars_err};
5
use polars_utils::bool::UnsafeBool;
6
7
use super::super::super::IpcField;
8
use super::super::deserialize::{read, skip};
9
use super::super::read_basic::*;
10
use super::super::{Compression, Dictionaries, IpcBuffer, Node, Version};
11
use crate::array::UnionArray;
12
use crate::datatypes::{ArrowDataType, UnionMode};
13
use crate::io::ipc::read::array::{try_get_array_length, try_get_field_node};
14
15
#[allow(clippy::too_many_arguments)]
16
pub fn read_union<R: Read + Seek>(
17
field_nodes: &mut VecDeque<Node>,
18
variadic_buffer_counts: &mut VecDeque<usize>,
19
dtype: ArrowDataType,
20
ipc_field: &IpcField,
21
buffers: &mut VecDeque<IpcBuffer>,
22
reader: &mut R,
23
dictionaries: &Dictionaries,
24
block_offset: u64,
25
is_little_endian: bool,
26
compression: Option<Compression>,
27
limit: Option<usize>,
28
version: Version,
29
scratch: &mut Vec<u8>,
30
checked: UnsafeBool,
31
) -> PolarsResult<UnionArray> {
32
let field_node = try_get_field_node(field_nodes, &dtype)?;
33
34
if version != Version::V5 {
35
let _ = buffers
36
.pop_front()
37
.ok_or_else(|| polars_err!(oos = "IPC: missing validity buffer."))?;
38
};
39
40
let length = try_get_array_length(field_node, limit)?;
41
42
let types = read_buffer(
43
buffers,
44
length,
45
reader,
46
block_offset,
47
is_little_endian,
48
compression,
49
scratch,
50
)?;
51
52
let offsets = if let ArrowDataType::Union(u) = &dtype {
53
if !u.mode.is_sparse() {
54
Some(read_buffer(
55
buffers,
56
length,
57
reader,
58
block_offset,
59
is_little_endian,
60
compression,
61
scratch,
62
)?)
63
} else {
64
None
65
}
66
} else {
67
unreachable!()
68
};
69
70
let fields = UnionArray::get_fields(&dtype);
71
72
let fields = fields
73
.iter()
74
.zip(ipc_field.fields.iter())
75
.map(|(field, ipc_field)| {
76
read(
77
field_nodes,
78
variadic_buffer_counts,
79
field,
80
ipc_field,
81
buffers,
82
reader,
83
dictionaries,
84
block_offset,
85
is_little_endian,
86
compression,
87
None,
88
version,
89
scratch,
90
checked,
91
)
92
})
93
.collect::<PolarsResult<Vec<_>>>()?;
94
95
UnionArray::try_new(dtype, types, fields, offsets)
96
}
97
98
pub fn skip_union(
99
field_nodes: &mut VecDeque<Node>,
100
dtype: &ArrowDataType,
101
buffers: &mut VecDeque<IpcBuffer>,
102
variadic_buffer_counts: &mut VecDeque<usize>,
103
) -> PolarsResult<()> {
104
let _ = field_nodes.pop_front().ok_or_else(|| {
105
polars_err!(
106
oos = "IPC: unable to fetch the field for struct. The file or stream is corrupted."
107
)
108
})?;
109
110
let _ = buffers
111
.pop_front()
112
.ok_or_else(|| polars_err!(oos = "IPC: missing validity buffer."))?;
113
if let ArrowDataType::Union(u) = dtype {
114
assert!(u.mode == UnionMode::Dense);
115
let _ = buffers
116
.pop_front()
117
.ok_or_else(|| polars_err!(oos = "IPC: missing offsets buffer."))?;
118
} else {
119
unreachable!()
120
};
121
122
let fields = UnionArray::get_fields(dtype);
123
124
fields
125
.iter()
126
.try_for_each(|field| skip(field_nodes, field.dtype(), buffers, variadic_buffer_counts))
127
}
128
129