Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/io/ipc/read/array/struct_.rs
6940 views
1
use std::collections::VecDeque;
2
use std::io::{Read, Seek};
3
4
use polars_error::{PolarsResult, polars_err};
5
6
use super::super::super::IpcField;
7
use super::super::deserialize::{read, skip};
8
use super::super::read_basic::*;
9
use super::super::{Compression, Dictionaries, IpcBuffer, Node, Version};
10
use super::try_get_array_length;
11
use crate::array::StructArray;
12
use crate::datatypes::ArrowDataType;
13
use crate::io::ipc::read::array::try_get_field_node;
14
15
#[allow(clippy::too_many_arguments)]
16
pub fn read_struct<R: Read + Seek>(
17
field_nodes: &mut VecDeque<Node>,
18
variadic_buffer_counts: &mut VecDeque<usize>,
19
dtype: ArrowDataType,
20
ipc_field: &IpcField,
21
buffers: &mut VecDeque<IpcBuffer>,
22
reader: &mut R,
23
dictionaries: &Dictionaries,
24
block_offset: u64,
25
is_little_endian: bool,
26
compression: Option<Compression>,
27
limit: Option<usize>,
28
version: Version,
29
scratch: &mut Vec<u8>,
30
) -> PolarsResult<StructArray> {
31
let field_node = try_get_field_node(field_nodes, &dtype)?;
32
let length = try_get_array_length(field_node, limit)?;
33
34
let validity = read_validity(
35
buffers,
36
field_node,
37
reader,
38
block_offset,
39
is_little_endian,
40
compression,
41
limit,
42
scratch,
43
)?;
44
45
let fields = StructArray::get_fields(&dtype);
46
47
let values = fields
48
.iter()
49
.zip(ipc_field.fields.iter())
50
.map(|(field, ipc_field)| {
51
read(
52
field_nodes,
53
variadic_buffer_counts,
54
field,
55
ipc_field,
56
buffers,
57
reader,
58
dictionaries,
59
block_offset,
60
is_little_endian,
61
compression,
62
limit,
63
version,
64
scratch,
65
)
66
})
67
.collect::<PolarsResult<Vec<_>>>()?;
68
69
StructArray::try_new(dtype, length, values, validity)
70
}
71
72
pub fn skip_struct(
73
field_nodes: &mut VecDeque<Node>,
74
dtype: &ArrowDataType,
75
buffers: &mut VecDeque<IpcBuffer>,
76
variadic_buffer_counts: &mut VecDeque<usize>,
77
) -> PolarsResult<()> {
78
let _ = field_nodes.pop_front().ok_or_else(|| {
79
polars_err!(
80
oos = "IPC: unable to fetch the field for struct. The file or stream is corrupted."
81
)
82
})?;
83
84
let _ = buffers
85
.pop_front()
86
.ok_or_else(|| polars_err!(oos = "IPC: missing validity buffer."))?;
87
88
let fields = StructArray::get_fields(dtype);
89
90
fields
91
.iter()
92
.try_for_each(|field| skip(field_nodes, field.dtype(), buffers, variadic_buffer_counts))
93
}
94
95