Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/mod.rs
8503 views
1
//! APIs to read from Parquet format.
2
3
mod binary;
4
mod binview;
5
mod boolean;
6
mod categorical;
7
mod dictionary_encoded;
8
mod fixed_size_binary;
9
mod nested;
10
mod nested_utils;
11
mod null;
12
mod primitive;
13
mod simple;
14
mod utils;
15
16
use std::io::Cursor;
17
18
use arrow::array::{Array, FixedSizeListArray, ListArray, MapArray};
19
use arrow::bitmap::Bitmap;
20
use arrow::datatypes::{ArrowDataType, Field};
21
use arrow::offset::Offsets;
22
use polars_buffer::Buffer;
23
use simple::page_iter_to_array;
24
25
pub use self::nested_utils::{InitNested, NestedState, init_nested};
26
pub use self::utils::filter::{Filter, PredicateFilter};
27
use self::utils::freeze_validity;
28
use super::*;
29
use crate::parquet::error::ParquetResult;
30
use crate::parquet::read::get_page_iterator as _get_page_iterator;
31
use crate::parquet::schema::types::PrimitiveType;
32
33
/// Creates a new iterator of compressed pages.
34
pub fn get_page_iterator(
35
column_metadata: &ColumnChunkMetadata,
36
reader: Cursor<Buffer<u8>>,
37
buffer: Vec<u8>,
38
max_header_size: usize,
39
) -> PolarsResult<PageReader> {
40
Ok(_get_page_iterator(
41
column_metadata,
42
reader,
43
buffer,
44
max_header_size,
45
)?)
46
}
47
48
/// Creates a new [`ListArray`] or [`FixedSizeListArray`].
49
pub fn create_list(
50
dtype: ArrowDataType,
51
nested: &mut NestedState,
52
values: Box<dyn Array>,
53
) -> Box<dyn Array> {
54
let (length, mut offsets, validity) = nested.pop().unwrap();
55
let validity = validity.and_then(freeze_validity);
56
match dtype.to_storage() {
57
ArrowDataType::List(_) => {
58
offsets.push(values.len() as i64);
59
60
let offsets = offsets.iter().map(|x| *x as i32).collect::<Vec<_>>();
61
62
let offsets: Offsets<i32> = offsets
63
.try_into()
64
.expect("i64 offsets do not fit in i32 offsets");
65
66
Box::new(ListArray::<i32>::new(
67
dtype,
68
offsets.into(),
69
values,
70
validity,
71
))
72
},
73
ArrowDataType::LargeList(_) => {
74
offsets.push(values.len() as i64);
75
76
Box::new(ListArray::<i64>::new(
77
dtype,
78
offsets.try_into().expect("List too large"),
79
values,
80
validity,
81
))
82
},
83
ArrowDataType::FixedSizeList(_, _) => {
84
Box::new(FixedSizeListArray::new(dtype, length, values, validity))
85
},
86
_ => unreachable!(),
87
}
88
}
89
90
/// Creates a new [`MapArray`].
91
pub fn create_map(
92
dtype: ArrowDataType,
93
nested: &mut NestedState,
94
values: Box<dyn Array>,
95
) -> Box<dyn Array> {
96
let (_, mut offsets, validity) = nested.pop().unwrap();
97
match dtype.to_storage() {
98
ArrowDataType::Map(_, _) => {
99
offsets.push(values.len() as i64);
100
let offsets = offsets.iter().map(|x| *x as i32).collect::<Vec<_>>();
101
102
let offsets: Offsets<i32> = offsets
103
.try_into()
104
.expect("i64 offsets do not fit in i32 offsets");
105
106
Box::new(MapArray::new(
107
dtype,
108
offsets.into(),
109
values,
110
validity.and_then(freeze_validity),
111
))
112
},
113
_ => unreachable!(),
114
}
115
}
116
117
fn is_primitive(dtype: &ArrowDataType) -> bool {
118
matches!(
119
dtype.to_physical_type(),
120
arrow::datatypes::PhysicalType::Primitive(_)
121
| arrow::datatypes::PhysicalType::Null
122
| arrow::datatypes::PhysicalType::Boolean
123
| arrow::datatypes::PhysicalType::Utf8
124
| arrow::datatypes::PhysicalType::LargeUtf8
125
| arrow::datatypes::PhysicalType::Binary
126
| arrow::datatypes::PhysicalType::BinaryView
127
| arrow::datatypes::PhysicalType::Utf8View
128
| arrow::datatypes::PhysicalType::LargeBinary
129
| arrow::datatypes::PhysicalType::FixedSizeBinary
130
| arrow::datatypes::PhysicalType::Dictionary(_)
131
) && !matches!(dtype, ArrowDataType::Extension(_))
132
}
133
134
fn columns_to_iter_recursive(
135
mut columns: Vec<BasicDecompressor>,
136
mut types: Vec<&PrimitiveType>,
137
field: Field,
138
init: Vec<InitNested>,
139
filter: Option<Filter>,
140
) -> ParquetResult<(NestedState, Vec<Box<dyn Array>>, Bitmap)> {
141
if init.is_empty() && is_primitive(&field.dtype) {
142
let (_, array, pred_true_mask) = page_iter_to_array(
143
columns.pop().unwrap(),
144
types.pop().unwrap(),
145
field,
146
filter,
147
None,
148
)?;
149
150
return Ok((NestedState::default(), array, pred_true_mask));
151
}
152
153
nested::columns_to_iter_recursive(columns, types, field, init, filter)
154
}
155
156
/// Returns the number of (parquet) columns that a [`ArrowDataType`] contains.
157
pub fn n_columns(dtype: &ArrowDataType) -> usize {
158
use arrow::datatypes::PhysicalType::*;
159
match dtype.to_physical_type() {
160
Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8
161
| Dictionary(_) | LargeUtf8 | BinaryView | Utf8View => 1,
162
List | FixedSizeList | LargeList => {
163
let a = dtype.to_storage();
164
if let ArrowDataType::List(inner) = a {
165
n_columns(&inner.dtype)
166
} else if let ArrowDataType::LargeList(inner) = a {
167
n_columns(&inner.dtype)
168
} else if let ArrowDataType::FixedSizeList(inner, _) = a {
169
n_columns(&inner.dtype)
170
} else {
171
unreachable!()
172
}
173
},
174
Map => {
175
let a = dtype.to_storage();
176
if let ArrowDataType::Map(inner, _) = a {
177
n_columns(&inner.dtype)
178
} else {
179
unreachable!()
180
}
181
},
182
Struct => {
183
if let ArrowDataType::Struct(fields) = dtype.to_storage() {
184
fields.iter().map(|inner| n_columns(&inner.dtype)).sum()
185
} else {
186
unreachable!()
187
}
188
},
189
_ => todo!(),
190
}
191
}
192
193
/// An iterator adapter that maps multiple iterators of [`PagesIter`] into an iterator of [`Array`]s.
194
///
195
/// For a non-nested datatypes such as [`ArrowDataType::Int32`], this function requires a single element in `columns` and `types`.
196
/// For nested types, `columns` must be composed by all parquet columns with associated types `types`.
197
///
198
/// The arrays are guaranteed to be at most of size `chunk_size` and data type `field.dtype`.
199
pub fn column_iter_to_arrays(
200
columns: Vec<BasicDecompressor>,
201
types: Vec<&PrimitiveType>,
202
field: Field,
203
filter: Option<Filter>,
204
) -> PolarsResult<(Vec<Box<dyn Array>>, Bitmap)> {
205
let (_, array, pred_true_mask) =
206
columns_to_iter_recursive(columns, types, field, vec![], filter)?;
207
Ok((array, pred_true_mask))
208
}
209
210