Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/nested.rs
6940 views
1
use arrow::array::StructArray;
2
use arrow::datatypes::{
3
DTYPE_CATEGORICAL_LEGACY, DTYPE_CATEGORICAL_NEW, DTYPE_ENUM_VALUES_LEGACY,
4
DTYPE_ENUM_VALUES_NEW, IntegerType,
5
};
6
use polars_compute::cast::CastOptionsImpl;
7
8
use self::categorical::CategoricalDecoder;
9
use self::nested::deserialize::utils::freeze_validity;
10
use self::nested_utils::NestedContent;
11
use self::utils::PageDecoder;
12
use super::*;
13
use crate::parquet::error::ParquetResult;
14
15
pub fn columns_to_iter_recursive(
16
mut columns: Vec<BasicDecompressor>,
17
mut types: Vec<&PrimitiveType>,
18
field: Field,
19
mut init: Vec<InitNested>,
20
filter: Option<Filter>,
21
) -> ParquetResult<(NestedState, Box<dyn Array>, Bitmap)> {
22
if !field.dtype().is_nested() {
23
let pages = columns.pop().unwrap();
24
init.push(InitNested::Primitive(field.is_nullable));
25
let type_ = types.pop().unwrap();
26
let (nested, arr, pdm) = page_iter_to_array(pages, type_, field, filter, Some(init))?;
27
Ok((nested.unwrap(), arr, pdm))
28
} else {
29
match field.dtype() {
30
ArrowDataType::List(inner) | ArrowDataType::LargeList(inner) => {
31
init.push(InitNested::List(field.is_nullable));
32
let (mut nested, array, ptm) = columns_to_iter_recursive(
33
columns,
34
types,
35
inner.as_ref().clone(),
36
init,
37
filter,
38
)?;
39
let array = create_list(field.dtype().clone(), &mut nested, array);
40
Ok((nested, array, ptm))
41
},
42
ArrowDataType::FixedSizeList(inner, width) => {
43
init.push(InitNested::FixedSizeList(field.is_nullable, *width));
44
let (mut nested, array, ptm) = columns_to_iter_recursive(
45
columns,
46
types,
47
inner.as_ref().clone(),
48
init,
49
filter,
50
)?;
51
let array = create_list(field.dtype().clone(), &mut nested, array);
52
Ok((nested, array, ptm))
53
},
54
ArrowDataType::Struct(fields) => {
55
// This definitely does not support Filter predicate yet.
56
assert!(!matches!(&filter, Some(Filter::Predicate(_))));
57
58
// @NOTE:
59
// We go back to front here, because we constantly split off the end of the array
60
// to grab the relevant columns and types.
61
//
62
// Is this inefficient? Yes. Is this how we are going to do it for now? Yes.
63
64
let Some(last_field) = fields.last() else {
65
return Err(ParquetError::not_supported("Struct has zero fields"));
66
};
67
68
let field_to_nested_array =
69
|mut init: Vec<InitNested>,
70
columns: &mut Vec<BasicDecompressor>,
71
types: &mut Vec<&PrimitiveType>,
72
struct_field: &Field| {
73
init.push(InitNested::Struct(field.is_nullable));
74
let n = n_columns(&struct_field.dtype);
75
let columns = columns.split_off(columns.len() - n);
76
let types = types.split_off(types.len() - n);
77
78
columns_to_iter_recursive(
79
columns,
80
types,
81
struct_field.clone(),
82
init,
83
filter.clone(),
84
)
85
};
86
87
let (mut nested, last_array, _) =
88
field_to_nested_array(init.clone(), &mut columns, &mut types, last_field)?;
89
debug_assert!(matches!(nested.last().unwrap(), NestedContent::Struct));
90
let (length, _, struct_validity) = nested.pop().unwrap();
91
92
let mut field_arrays = Vec::<Box<dyn Array>>::with_capacity(fields.len());
93
field_arrays.push(last_array);
94
95
for field in fields.iter().rev().skip(1) {
96
let (mut _nested, array, _) =
97
field_to_nested_array(init.clone(), &mut columns, &mut types, field)?;
98
99
#[cfg(debug_assertions)]
100
{
101
debug_assert!(matches!(_nested.last().unwrap(), NestedContent::Struct));
102
debug_assert_eq!(
103
_nested.pop().unwrap().2.and_then(freeze_validity),
104
struct_validity.clone().and_then(freeze_validity),
105
);
106
}
107
108
field_arrays.push(array);
109
}
110
111
field_arrays.reverse();
112
let struct_validity = struct_validity.and_then(freeze_validity);
113
114
Ok((
115
nested,
116
StructArray::new(
117
ArrowDataType::Struct(fields.clone()),
118
length,
119
field_arrays,
120
struct_validity,
121
)
122
.to_boxed(),
123
Bitmap::new(),
124
))
125
},
126
ArrowDataType::Map(inner, _) => {
127
init.push(InitNested::List(field.is_nullable));
128
let (mut nested, array, ptm) = columns_to_iter_recursive(
129
columns,
130
types,
131
inner.as_ref().clone(),
132
init,
133
filter,
134
)?;
135
let array = create_map(field.dtype().clone(), &mut nested, array);
136
Ok((nested, array, ptm))
137
},
138
139
ArrowDataType::Dictionary(key_type, value_type, _) => {
140
// @note: this should only hit in two cases:
141
// - polars enum's and categorical's
142
// - int -> string which can be turned into categoricals
143
assert!(matches!(value_type.as_ref(), ArrowDataType::Utf8View));
144
145
init.push(InitNested::Primitive(field.is_nullable));
146
147
if field.metadata.as_ref().is_none_or(|md| {
148
!md.contains_key(DTYPE_ENUM_VALUES_LEGACY)
149
&& !md.contains_key(DTYPE_ENUM_VALUES_NEW)
150
&& !md.contains_key(DTYPE_CATEGORICAL_NEW)
151
&& !md.contains_key(DTYPE_CATEGORICAL_LEGACY)
152
}) {
153
let (nested, arr, ptm) = PageDecoder::new(
154
&field.name,
155
columns.pop().unwrap(),
156
ArrowDataType::Utf8View,
157
binview::BinViewDecoder::new_string(),
158
Some(init),
159
)?
160
.collect_nested(filter)?;
161
162
let arr = polars_compute::cast::cast(
163
arr.as_ref(),
164
field.dtype(),
165
CastOptionsImpl::default(),
166
)
167
.unwrap();
168
169
Ok((nested, arr, ptm))
170
} else {
171
let (nested, arr, ptm) = match key_type {
172
IntegerType::UInt8 => PageDecoder::new(
173
&field.name,
174
columns.pop().unwrap(),
175
field.dtype().clone(),
176
CategoricalDecoder::<u8>::new(),
177
Some(init),
178
)?
179
.collect_boxed(filter)?,
180
IntegerType::UInt16 => PageDecoder::new(
181
&field.name,
182
columns.pop().unwrap(),
183
field.dtype().clone(),
184
CategoricalDecoder::<u16>::new(),
185
Some(init),
186
)?
187
.collect_boxed(filter)?,
188
IntegerType::UInt32 => PageDecoder::new(
189
&field.name,
190
columns.pop().unwrap(),
191
field.dtype().clone(),
192
CategoricalDecoder::<u32>::new(),
193
Some(init),
194
)?
195
.collect_boxed(filter)?,
196
_ => unimplemented!(),
197
};
198
199
Ok((nested.unwrap(), arr, ptm))
200
}
201
},
202
other => Err(ParquetError::not_supported(format!(
203
"Deserializing type {other:?} from parquet"
204
))),
205
}
206
}
207
}
208
209