Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/write/primitive/basic.rs
6940 views
1
use arrow::array::{Array, PrimitiveArray};
2
use arrow::scalar::PrimitiveScalar;
3
use arrow::types::NativeType;
4
use polars_error::{PolarsResult, polars_bail};
5
6
use super::super::{WriteOptions, utils};
7
use crate::arrow::read::schema::is_nullable;
8
use crate::arrow::write::utils::ExactSizedIter;
9
use crate::parquet::encoding::Encoding;
10
use crate::parquet::encoding::delta_bitpacked::encode;
11
use crate::parquet::page::DataPage;
12
use crate::parquet::schema::types::PrimitiveType;
13
use crate::parquet::statistics::PrimitiveStatistics;
14
use crate::parquet::types::NativeType as ParquetNativeType;
15
use crate::read::Page;
16
use crate::write::{EncodeNullability, StatisticsOptions};
17
18
pub(crate) fn encode_plain<T, P>(
19
array: &PrimitiveArray<T>,
20
options: EncodeNullability,
21
mut buffer: Vec<u8>,
22
) -> Vec<u8>
23
where
24
T: NativeType,
25
P: ParquetNativeType,
26
T: num_traits::AsPrimitive<P>,
27
{
28
let is_optional = options.is_optional();
29
30
if is_optional {
31
// append the non-null values
32
let validity = array.validity();
33
34
if let Some(validity) = validity {
35
let null_count = validity.unset_bits();
36
37
if null_count > 0 {
38
let mut iter = validity.iter();
39
let values = array.values().as_slice();
40
41
buffer.reserve(size_of::<P::Bytes>() * (array.len() - null_count));
42
43
let mut offset = 0;
44
let mut remaining_valid = array.len() - null_count;
45
while remaining_valid > 0 {
46
let num_valid = iter.take_leading_ones();
47
buffer.extend(
48
values[offset..offset + num_valid]
49
.iter()
50
.flat_map(|value| value.as_().to_le_bytes()),
51
);
52
remaining_valid -= num_valid;
53
offset += num_valid;
54
55
let num_invalid = iter.take_leading_zeros();
56
offset += num_invalid;
57
}
58
59
return buffer;
60
}
61
}
62
}
63
64
buffer.reserve(size_of::<P>() * array.len());
65
buffer.extend(
66
array
67
.values()
68
.iter()
69
.flat_map(|value| value.as_().to_le_bytes()),
70
);
71
72
buffer
73
}
74
75
pub(crate) fn encode_delta<T, P>(
76
array: &PrimitiveArray<T>,
77
options: EncodeNullability,
78
mut buffer: Vec<u8>,
79
) -> Vec<u8>
80
where
81
T: NativeType,
82
P: ParquetNativeType,
83
T: num_traits::AsPrimitive<P>,
84
P: num_traits::AsPrimitive<i64>,
85
{
86
let is_optional = options.is_optional();
87
88
if is_optional {
89
// append the non-null values
90
let iterator = array.non_null_values_iter().map(|x| {
91
let parquet_native: P = x.as_();
92
let integer: i64 = parquet_native.as_();
93
integer
94
});
95
let iterator = ExactSizedIter::new(iterator, array.len() - array.null_count());
96
encode(iterator, &mut buffer, 1)
97
} else {
98
// append all values
99
let iterator = array.values().iter().map(|x| {
100
let parquet_native: P = x.as_();
101
let integer: i64 = parquet_native.as_();
102
integer
103
});
104
encode(iterator, &mut buffer, 1)
105
}
106
buffer
107
}
108
109
pub fn array_to_page_plain<T, P>(
110
array: &PrimitiveArray<T>,
111
options: WriteOptions,
112
type_: PrimitiveType,
113
) -> PolarsResult<DataPage>
114
where
115
T: NativeType,
116
P: ParquetNativeType,
117
T: num_traits::AsPrimitive<P>,
118
{
119
array_to_page(array, options, type_, Encoding::Plain, encode_plain)
120
}
121
122
pub fn array_to_page_integer<T, P>(
123
array: &PrimitiveArray<T>,
124
options: WriteOptions,
125
type_: PrimitiveType,
126
encoding: Encoding,
127
) -> PolarsResult<Page>
128
where
129
T: NativeType,
130
P: ParquetNativeType,
131
T: num_traits::AsPrimitive<P>,
132
P: num_traits::AsPrimitive<i64>,
133
{
134
match encoding {
135
Encoding::Plain => array_to_page(array, options, type_, encoding, encode_plain),
136
Encoding::DeltaBinaryPacked => array_to_page(array, options, type_, encoding, encode_delta),
137
other => polars_bail!(nyi = "Encoding integer as {other:?}"),
138
}
139
.map(Page::Data)
140
}
141
142
pub fn array_to_page<T, P, F: Fn(&PrimitiveArray<T>, EncodeNullability, Vec<u8>) -> Vec<u8>>(
143
array: &PrimitiveArray<T>,
144
options: WriteOptions,
145
type_: PrimitiveType,
146
encoding: Encoding,
147
encode: F,
148
) -> PolarsResult<DataPage>
149
where
150
T: NativeType,
151
P: ParquetNativeType,
152
// constraint required to build statistics
153
T: num_traits::AsPrimitive<P>,
154
{
155
let is_optional = is_nullable(&type_.field_info);
156
let encode_options = EncodeNullability::new(is_optional);
157
158
let validity = array.validity();
159
160
let mut buffer = vec![];
161
utils::write_def_levels(
162
&mut buffer,
163
is_optional,
164
validity,
165
array.len(),
166
options.version,
167
)?;
168
169
let definition_levels_byte_length = buffer.len();
170
171
let buffer = encode(array, encode_options, buffer);
172
173
let statistics = if options.has_statistics() {
174
Some(build_statistics(array, type_.clone(), &options.statistics).serialize())
175
} else {
176
None
177
};
178
179
utils::build_plain_page(
180
buffer,
181
array.len(),
182
array.len(),
183
array.null_count(),
184
0,
185
definition_levels_byte_length,
186
statistics,
187
type_,
188
options,
189
encoding,
190
)
191
}
192
193
pub fn build_statistics<T, P>(
194
array: &PrimitiveArray<T>,
195
primitive_type: PrimitiveType,
196
options: &StatisticsOptions,
197
) -> PrimitiveStatistics<P>
198
where
199
T: NativeType,
200
P: ParquetNativeType,
201
T: num_traits::AsPrimitive<P>,
202
{
203
let (min_value, max_value) = match (options.min_value, options.max_value) {
204
(true, true) => {
205
match polars_compute::min_max::dyn_array_min_max_propagate_nan(array as &dyn Array) {
206
None => (None, None),
207
Some((l, r)) => (Some(l), Some(r)),
208
}
209
},
210
(true, false) => (
211
polars_compute::min_max::dyn_array_min_propagate_nan(array as &dyn Array),
212
None,
213
),
214
(false, true) => (
215
None,
216
polars_compute::min_max::dyn_array_max_propagate_nan(array as &dyn Array),
217
),
218
(false, false) => (None, None),
219
};
220
221
let min_value = min_value.and_then(|s| {
222
s.as_any()
223
.downcast_ref::<PrimitiveScalar<T>>()
224
.unwrap()
225
.value()
226
.map(|x| x.as_())
227
});
228
let max_value = max_value.and_then(|s| {
229
s.as_any()
230
.downcast_ref::<PrimitiveScalar<T>>()
231
.unwrap()
232
.value()
233
.map(|x| x.as_())
234
});
235
236
PrimitiveStatistics::<P> {
237
primitive_type,
238
null_count: options.null_count.then_some(array.null_count() as i64),
239
distinct_count: None,
240
max_value,
241
min_value,
242
}
243
}
244
245