Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/io/ipc/read/array/map.rs
6940 views
1
use std::collections::VecDeque;
2
use std::io::{Read, Seek};
3
4
use polars_error::{PolarsResult, polars_err};
5
6
use super::super::super::IpcField;
7
use super::super::deserialize::{read, skip};
8
use super::super::read_basic::*;
9
use super::super::{Compression, Dictionaries, IpcBuffer, Node, Version};
10
use crate::array::MapArray;
11
use crate::buffer::Buffer;
12
use crate::datatypes::ArrowDataType;
13
use crate::io::ipc::read::array::{try_get_array_length, try_get_field_node};
14
15
#[allow(clippy::too_many_arguments)]
16
pub fn read_map<R: Read + Seek>(
17
field_nodes: &mut VecDeque<Node>,
18
variadic_buffer_counts: &mut VecDeque<usize>,
19
dtype: ArrowDataType,
20
ipc_field: &IpcField,
21
buffers: &mut VecDeque<IpcBuffer>,
22
reader: &mut R,
23
dictionaries: &Dictionaries,
24
block_offset: u64,
25
is_little_endian: bool,
26
compression: Option<Compression>,
27
limit: Option<usize>,
28
version: Version,
29
scratch: &mut Vec<u8>,
30
) -> PolarsResult<MapArray> {
31
let field_node = try_get_field_node(field_nodes, &dtype)?;
32
33
let validity = read_validity(
34
buffers,
35
field_node,
36
reader,
37
block_offset,
38
is_little_endian,
39
compression,
40
limit,
41
scratch,
42
)?;
43
44
let length = try_get_array_length(field_node, limit)?;
45
46
let offsets = read_buffer::<i32, _>(
47
buffers,
48
1 + length,
49
reader,
50
block_offset,
51
is_little_endian,
52
compression,
53
scratch,
54
)
55
// Older versions of the IPC format sometimes do not report an offset
56
.or_else(|_| PolarsResult::Ok(Buffer::<i32>::from(vec![0i32])))?;
57
58
let field = MapArray::get_field(&dtype);
59
60
let last_offset: usize = offsets.last().copied().unwrap() as usize;
61
62
let field = read(
63
field_nodes,
64
variadic_buffer_counts,
65
field,
66
&ipc_field.fields[0],
67
buffers,
68
reader,
69
dictionaries,
70
block_offset,
71
is_little_endian,
72
compression,
73
Some(last_offset),
74
version,
75
scratch,
76
)?;
77
MapArray::try_new(dtype, offsets.try_into()?, field, validity)
78
}
79
80
pub fn skip_map(
81
field_nodes: &mut VecDeque<Node>,
82
dtype: &ArrowDataType,
83
buffers: &mut VecDeque<IpcBuffer>,
84
variadic_buffer_counts: &mut VecDeque<usize>,
85
) -> PolarsResult<()> {
86
let _ = field_nodes.pop_front().ok_or_else(|| {
87
polars_err!(
88
oos = "IPC: unable to fetch the field for map. The file or stream is corrupted."
89
)
90
})?;
91
92
let _ = buffers
93
.pop_front()
94
.ok_or_else(|| polars_err!(oos = "IPC: missing validity buffer."))?;
95
let _ = buffers
96
.pop_front()
97
.ok_or_else(|| polars_err!(oos = "IPC: missing offsets buffer."))?;
98
99
let dtype = MapArray::get_field(dtype).dtype();
100
101
skip(field_nodes, dtype, buffers, variadic_buffer_counts)
102
}
103
104