// Source: pola-rs/polars (GitHub repository)
// Path: crates/polars-arrow/src/io/ipc/read/array/union.rs
1
use std::collections::VecDeque;
2
use std::io::{Read, Seek};
3
4
use polars_error::{PolarsResult, polars_err};
5
6
use super::super::super::IpcField;
7
use super::super::deserialize::{read, skip};
8
use super::super::read_basic::*;
9
use super::super::{Compression, Dictionaries, IpcBuffer, Node, Version};
10
use crate::array::UnionArray;
11
use crate::datatypes::{ArrowDataType, UnionMode};
12
use crate::io::ipc::read::array::{try_get_array_length, try_get_field_node};
13
14
#[allow(clippy::too_many_arguments)]
15
pub fn read_union<R: Read + Seek>(
16
field_nodes: &mut VecDeque<Node>,
17
variadic_buffer_counts: &mut VecDeque<usize>,
18
dtype: ArrowDataType,
19
ipc_field: &IpcField,
20
buffers: &mut VecDeque<IpcBuffer>,
21
reader: &mut R,
22
dictionaries: &Dictionaries,
23
block_offset: u64,
24
is_little_endian: bool,
25
compression: Option<Compression>,
26
limit: Option<usize>,
27
version: Version,
28
scratch: &mut Vec<u8>,
29
) -> PolarsResult<UnionArray> {
30
let field_node = try_get_field_node(field_nodes, &dtype)?;
31
32
if version != Version::V5 {
33
let _ = buffers
34
.pop_front()
35
.ok_or_else(|| polars_err!(oos = "IPC: missing validity buffer."))?;
36
};
37
38
let length = try_get_array_length(field_node, limit)?;
39
40
let types = read_buffer(
41
buffers,
42
length,
43
reader,
44
block_offset,
45
is_little_endian,
46
compression,
47
scratch,
48
)?;
49
50
let offsets = if let ArrowDataType::Union(u) = &dtype {
51
if !u.mode.is_sparse() {
52
Some(read_buffer(
53
buffers,
54
length,
55
reader,
56
block_offset,
57
is_little_endian,
58
compression,
59
scratch,
60
)?)
61
} else {
62
None
63
}
64
} else {
65
unreachable!()
66
};
67
68
let fields = UnionArray::get_fields(&dtype);
69
70
let fields = fields
71
.iter()
72
.zip(ipc_field.fields.iter())
73
.map(|(field, ipc_field)| {
74
read(
75
field_nodes,
76
variadic_buffer_counts,
77
field,
78
ipc_field,
79
buffers,
80
reader,
81
dictionaries,
82
block_offset,
83
is_little_endian,
84
compression,
85
None,
86
version,
87
scratch,
88
)
89
})
90
.collect::<PolarsResult<Vec<_>>>()?;
91
92
UnionArray::try_new(dtype, types, fields, offsets)
93
}
94
95
pub fn skip_union(
96
field_nodes: &mut VecDeque<Node>,
97
dtype: &ArrowDataType,
98
buffers: &mut VecDeque<IpcBuffer>,
99
variadic_buffer_counts: &mut VecDeque<usize>,
100
) -> PolarsResult<()> {
101
let _ = field_nodes.pop_front().ok_or_else(|| {
102
polars_err!(
103
oos = "IPC: unable to fetch the field for struct. The file or stream is corrupted."
104
)
105
})?;
106
107
let _ = buffers
108
.pop_front()
109
.ok_or_else(|| polars_err!(oos = "IPC: missing validity buffer."))?;
110
if let ArrowDataType::Union(u) = dtype {
111
assert!(u.mode == UnionMode::Dense);
112
let _ = buffers
113
.pop_front()
114
.ok_or_else(|| polars_err!(oos = "IPC: missing offsets buffer."))?;
115
} else {
116
unreachable!()
117
};
118
119
let fields = UnionArray::get_fields(dtype);
120
121
fields
122
.iter()
123
.try_for_each(|field| skip(field_nodes, field.dtype(), buffers, variadic_buffer_counts))
124
}
125
126