Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/nested.rs
8458 views
1
use arrow::array::StructArray;
2
use arrow::datatypes::{
3
DTYPE_CATEGORICAL_LEGACY, DTYPE_CATEGORICAL_NEW, DTYPE_ENUM_VALUES_LEGACY,
4
DTYPE_ENUM_VALUES_NEW, IntegerType,
5
};
6
use polars_compute::cast::CastOptionsImpl;
7
8
use self::categorical::CategoricalDecoder;
9
use self::nested::deserialize::utils::freeze_validity;
10
use self::nested_utils::NestedContent;
11
use self::utils::PageDecoder;
12
use super::*;
13
use crate::parquet::error::ParquetResult;
14
15
pub fn columns_to_iter_recursive(
16
mut columns: Vec<BasicDecompressor>,
17
mut types: Vec<&PrimitiveType>,
18
field: Field,
19
mut init: Vec<InitNested>,
20
filter: Option<Filter>,
21
) -> ParquetResult<(NestedState, Vec<Box<dyn Array>>, Bitmap)> {
22
if !field.dtype().is_nested() || field.is_pl_pq_empty_struct() {
23
let pages = columns.pop().unwrap();
24
init.push(InitNested::Primitive(field.is_nullable));
25
let type_ = types.pop().unwrap();
26
let (nested, arr, pdm) = page_iter_to_array(pages, type_, field, filter, Some(init))?;
27
Ok((nested.unwrap(), arr, pdm))
28
} else {
29
match field.dtype() {
30
ArrowDataType::List(inner) | ArrowDataType::LargeList(inner) => {
31
init.push(InitNested::List(field.is_nullable));
32
let (mut nested, array, ptm) = columns_to_iter_recursive(
33
columns,
34
types,
35
inner.as_ref().clone(),
36
init,
37
filter,
38
)?;
39
let array = array
40
.into_iter()
41
.map(|array| create_list(field.dtype().clone(), &mut nested, array))
42
.collect();
43
Ok((nested, array, ptm))
44
},
45
ArrowDataType::FixedSizeList(inner, width) => {
46
init.push(InitNested::FixedSizeList(field.is_nullable, *width));
47
let (mut nested, array, ptm) = columns_to_iter_recursive(
48
columns,
49
types,
50
inner.as_ref().clone(),
51
init,
52
filter,
53
)?;
54
let array = array
55
.into_iter()
56
.map(|array| create_list(field.dtype().clone(), &mut nested, array))
57
.collect();
58
Ok((nested, array, ptm))
59
},
60
ArrowDataType::Struct(fields) => {
61
// This definitely does not support Filter predicate yet.
62
assert!(!matches!(&filter, Some(Filter::Predicate(_))));
63
64
// @NOTE:
65
// We go back to front here, because we constantly split off the end of the array
66
// to grab the relevant columns and types.
67
//
68
// Is this inefficient? Yes. Is this how we are going to do it for now? Yes.
69
70
let Some(last_field) = fields.last() else {
71
return Err(ParquetError::not_supported("Struct has zero fields"));
72
};
73
74
let field_to_nested_array =
75
|mut init: Vec<InitNested>,
76
columns: &mut Vec<BasicDecompressor>,
77
types: &mut Vec<&PrimitiveType>,
78
struct_field: &Field| {
79
init.push(InitNested::Struct(field.is_nullable));
80
let n = n_columns(&struct_field.dtype);
81
let columns = columns.split_off(columns.len() - n);
82
let types = types.split_off(types.len() - n);
83
84
columns_to_iter_recursive(
85
columns,
86
types,
87
struct_field.clone(),
88
init,
89
filter.clone(),
90
)
91
};
92
93
let (mut nested, mut last_array, _) =
94
field_to_nested_array(init.clone(), &mut columns, &mut types, last_field)?;
95
debug_assert!(matches!(nested.last().unwrap(), NestedContent::Struct));
96
let (length, _, struct_validity) = nested.pop().unwrap();
97
98
let mut field_arrays = Vec::<Box<dyn Array>>::with_capacity(fields.len());
99
assert_eq!(last_array.len(), 1);
100
field_arrays.push(last_array.pop().unwrap());
101
102
for field in fields.iter().rev().skip(1) {
103
let (mut _nested, mut array, _) =
104
field_to_nested_array(init.clone(), &mut columns, &mut types, field)?;
105
106
#[cfg(debug_assertions)]
107
{
108
debug_assert!(matches!(_nested.last().unwrap(), NestedContent::Struct));
109
debug_assert_eq!(
110
_nested.pop().unwrap().2.and_then(freeze_validity),
111
struct_validity.clone().and_then(freeze_validity),
112
);
113
}
114
115
assert_eq!(array.len(), 1);
116
field_arrays.push(array.pop().unwrap());
117
}
118
119
field_arrays.reverse();
120
let struct_validity = struct_validity.and_then(freeze_validity);
121
122
Ok((
123
nested,
124
vec![
125
StructArray::new(
126
ArrowDataType::Struct(fields.clone()),
127
length,
128
field_arrays,
129
struct_validity,
130
)
131
.to_boxed(),
132
],
133
Bitmap::new(),
134
))
135
},
136
ArrowDataType::Map(inner, _) => {
137
init.push(InitNested::List(field.is_nullable));
138
let (mut nested, array, ptm) = columns_to_iter_recursive(
139
columns,
140
types,
141
inner.as_ref().clone(),
142
init,
143
filter,
144
)?;
145
let array = array
146
.into_iter()
147
.map(|array| create_map(field.dtype().clone(), &mut nested, array))
148
.collect();
149
Ok((nested, array, ptm))
150
},
151
152
ArrowDataType::Dictionary(key_type, value_type, _) => {
153
// @note: this should only hit in two cases:
154
// - polars enum's and categorical's
155
// - int -> string which can be turned into categoricals
156
assert!(matches!(value_type.as_ref(), ArrowDataType::Utf8View));
157
158
init.push(InitNested::Primitive(field.is_nullable));
159
160
if field.metadata.as_ref().is_none_or(|md| {
161
!md.contains_key(DTYPE_ENUM_VALUES_LEGACY)
162
&& !md.contains_key(DTYPE_ENUM_VALUES_NEW)
163
&& !md.contains_key(DTYPE_CATEGORICAL_NEW)
164
&& !md.contains_key(DTYPE_CATEGORICAL_LEGACY)
165
}) {
166
let (nested, arrays, ptm) = PageDecoder::new(
167
&field.name,
168
columns.pop().unwrap(),
169
ArrowDataType::Utf8View,
170
binview::BinViewDecoder::new_string(),
171
Some(init),
172
)?
173
.collect_nested(filter)?;
174
175
let arrays = arrays
176
.into_iter()
177
.map(|arr| {
178
polars_compute::cast::cast(
179
arr.as_ref(),
180
field.dtype(),
181
CastOptionsImpl::default(),
182
)
183
.unwrap()
184
})
185
.collect();
186
187
Ok((nested, arrays, ptm))
188
} else {
189
let (nested, arr, ptm) = match key_type {
190
IntegerType::UInt8 => PageDecoder::new(
191
&field.name,
192
columns.pop().unwrap(),
193
field.dtype().clone(),
194
CategoricalDecoder::<u8>::new(),
195
Some(init),
196
)?
197
.collect_boxed(filter)?,
198
IntegerType::UInt16 => PageDecoder::new(
199
&field.name,
200
columns.pop().unwrap(),
201
field.dtype().clone(),
202
CategoricalDecoder::<u16>::new(),
203
Some(init),
204
)?
205
.collect_boxed(filter)?,
206
IntegerType::UInt32 => PageDecoder::new(
207
&field.name,
208
columns.pop().unwrap(),
209
field.dtype().clone(),
210
CategoricalDecoder::<u32>::new(),
211
Some(init),
212
)?
213
.collect_boxed(filter)?,
214
_ => unimplemented!(),
215
};
216
217
Ok((nested.unwrap(), arr, ptm))
218
}
219
},
220
221
ArrowDataType::Extension(ext) => {
222
// Perform deserialization for the storage type.
223
let (nested, mut array, ptm) = columns_to_iter_recursive(
224
columns,
225
types,
226
field.with_dtype(ext.inner.clone()),
227
init,
228
filter,
229
)?;
230
231
// Restore the extension type.
232
for arr in &mut array {
233
assert!(arr.dtype() == &ext.inner);
234
*arr.dtype_mut() = field.dtype.clone();
235
}
236
237
Ok((nested, array, ptm))
238
},
239
240
other => Err(ParquetError::not_supported(format!(
241
"Deserializing type {other:?} from parquet"
242
))),
243
}
244
}
245
}
246
247