Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/mod.rs
6940 views
1
//! APIs to read from Parquet format.
2
3
mod binview;
4
mod boolean;
5
mod categorical;
6
mod dictionary_encoded;
7
mod fixed_size_binary;
8
mod nested;
9
mod nested_utils;
10
mod null;
11
mod primitive;
12
mod simple;
13
mod utils;
14
15
use arrow::array::{Array, FixedSizeListArray, ListArray, MapArray};
16
use arrow::bitmap::Bitmap;
17
use arrow::datatypes::{ArrowDataType, Field};
18
use arrow::offset::Offsets;
19
use polars_utils::mmap::MemReader;
20
use simple::page_iter_to_array;
21
22
pub use self::nested_utils::{InitNested, NestedState, init_nested};
23
pub use self::utils::filter::{Filter, PredicateFilter};
24
use self::utils::freeze_validity;
25
use super::*;
26
use crate::parquet::error::ParquetResult;
27
use crate::parquet::read::get_page_iterator as _get_page_iterator;
28
use crate::parquet::schema::types::PrimitiveType;
29
30
/// Creates a new iterator of compressed pages.
31
pub fn get_page_iterator(
32
column_metadata: &ColumnChunkMetadata,
33
reader: MemReader,
34
buffer: Vec<u8>,
35
max_header_size: usize,
36
) -> PolarsResult<PageReader> {
37
Ok(_get_page_iterator(
38
column_metadata,
39
reader,
40
buffer,
41
max_header_size,
42
)?)
43
}
44
45
/// Creates a new [`ListArray`] or [`FixedSizeListArray`].
46
pub fn create_list(
47
dtype: ArrowDataType,
48
nested: &mut NestedState,
49
values: Box<dyn Array>,
50
) -> Box<dyn Array> {
51
let (length, mut offsets, validity) = nested.pop().unwrap();
52
let validity = validity.and_then(freeze_validity);
53
match dtype.to_logical_type() {
54
ArrowDataType::List(_) => {
55
offsets.push(values.len() as i64);
56
57
let offsets = offsets.iter().map(|x| *x as i32).collect::<Vec<_>>();
58
59
let offsets: Offsets<i32> = offsets
60
.try_into()
61
.expect("i64 offsets do not fit in i32 offsets");
62
63
Box::new(ListArray::<i32>::new(
64
dtype,
65
offsets.into(),
66
values,
67
validity,
68
))
69
},
70
ArrowDataType::LargeList(_) => {
71
offsets.push(values.len() as i64);
72
73
Box::new(ListArray::<i64>::new(
74
dtype,
75
offsets.try_into().expect("List too large"),
76
values,
77
validity,
78
))
79
},
80
ArrowDataType::FixedSizeList(_, _) => {
81
Box::new(FixedSizeListArray::new(dtype, length, values, validity))
82
},
83
_ => unreachable!(),
84
}
85
}
86
87
/// Creates a new [`MapArray`].
88
pub fn create_map(
89
dtype: ArrowDataType,
90
nested: &mut NestedState,
91
values: Box<dyn Array>,
92
) -> Box<dyn Array> {
93
let (_, mut offsets, validity) = nested.pop().unwrap();
94
match dtype.to_logical_type() {
95
ArrowDataType::Map(_, _) => {
96
offsets.push(values.len() as i64);
97
let offsets = offsets.iter().map(|x| *x as i32).collect::<Vec<_>>();
98
99
let offsets: Offsets<i32> = offsets
100
.try_into()
101
.expect("i64 offsets do not fit in i32 offsets");
102
103
Box::new(MapArray::new(
104
dtype,
105
offsets.into(),
106
values,
107
validity.and_then(freeze_validity),
108
))
109
},
110
_ => unreachable!(),
111
}
112
}
113
114
fn is_primitive(dtype: &ArrowDataType) -> bool {
115
matches!(
116
dtype.to_physical_type(),
117
arrow::datatypes::PhysicalType::Primitive(_)
118
| arrow::datatypes::PhysicalType::Null
119
| arrow::datatypes::PhysicalType::Boolean
120
| arrow::datatypes::PhysicalType::Utf8
121
| arrow::datatypes::PhysicalType::LargeUtf8
122
| arrow::datatypes::PhysicalType::Binary
123
| arrow::datatypes::PhysicalType::BinaryView
124
| arrow::datatypes::PhysicalType::Utf8View
125
| arrow::datatypes::PhysicalType::LargeBinary
126
| arrow::datatypes::PhysicalType::FixedSizeBinary
127
| arrow::datatypes::PhysicalType::Dictionary(_)
128
)
129
}
130
131
fn columns_to_iter_recursive(
132
mut columns: Vec<BasicDecompressor>,
133
mut types: Vec<&PrimitiveType>,
134
field: Field,
135
init: Vec<InitNested>,
136
filter: Option<Filter>,
137
) -> ParquetResult<(NestedState, Box<dyn Array>, Bitmap)> {
138
if init.is_empty() && is_primitive(&field.dtype) {
139
let (_, array, pred_true_mask) = page_iter_to_array(
140
columns.pop().unwrap(),
141
types.pop().unwrap(),
142
field,
143
filter,
144
None,
145
)?;
146
147
return Ok((NestedState::default(), array, pred_true_mask));
148
}
149
150
nested::columns_to_iter_recursive(columns, types, field, init, filter)
151
}
152
153
/// Returns the number of (parquet) columns that a [`ArrowDataType`] contains.
154
pub fn n_columns(dtype: &ArrowDataType) -> usize {
155
use arrow::datatypes::PhysicalType::*;
156
match dtype.to_physical_type() {
157
Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8
158
| Dictionary(_) | LargeUtf8 | BinaryView | Utf8View => 1,
159
List | FixedSizeList | LargeList => {
160
let a = dtype.to_logical_type();
161
if let ArrowDataType::List(inner) = a {
162
n_columns(&inner.dtype)
163
} else if let ArrowDataType::LargeList(inner) = a {
164
n_columns(&inner.dtype)
165
} else if let ArrowDataType::FixedSizeList(inner, _) = a {
166
n_columns(&inner.dtype)
167
} else {
168
unreachable!()
169
}
170
},
171
Map => {
172
let a = dtype.to_logical_type();
173
if let ArrowDataType::Map(inner, _) = a {
174
n_columns(&inner.dtype)
175
} else {
176
unreachable!()
177
}
178
},
179
Struct => {
180
if let ArrowDataType::Struct(fields) = dtype.to_logical_type() {
181
fields.iter().map(|inner| n_columns(&inner.dtype)).sum()
182
} else {
183
unreachable!()
184
}
185
},
186
_ => todo!(),
187
}
188
}
189
190
/// An iterator adapter that maps multiple iterators of [`PagesIter`] into an iterator of [`Array`]s.
191
///
192
/// For a non-nested datatypes such as [`ArrowDataType::Int32`], this function requires a single element in `columns` and `types`.
193
/// For nested types, `columns` must be composed by all parquet columns with associated types `types`.
194
///
195
/// The arrays are guaranteed to be at most of size `chunk_size` and data type `field.dtype`.
196
pub fn column_iter_to_arrays(
197
columns: Vec<BasicDecompressor>,
198
types: Vec<&PrimitiveType>,
199
field: Field,
200
filter: Option<Filter>,
201
) -> PolarsResult<(Box<dyn Array>, Bitmap)> {
202
let (_, array, pred_true_mask) =
203
columns_to_iter_recursive(columns, types, field, vec![], filter)?;
204
Ok((array, pred_true_mask))
205
}
206
207