Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/write/statistics.rs
6940 views
1
use crate::parquet::error::{ParquetError, ParquetResult};
2
use crate::parquet::schema::types::PhysicalType;
3
use crate::parquet::statistics::*;
4
use crate::parquet::types::NativeType;
5
6
/// Combines two optional values.
///
/// When both sides are present the result is `op(lhs, rhs)`; when only one side is present
/// that value is passed through unchanged; when neither is present the result is `None`.
#[inline]
fn reduce_single<T, F: Fn(T, T) -> T>(lhs: Option<T>, rhs: Option<T>, op: F) -> Option<T> {
    match lhs {
        // Nothing on the left: whatever the right holds (possibly `None`) is the answer.
        None => rhs,
        Some(left) => Some(match rhs {
            Some(right) => op(left, right),
            None => left,
        }),
    }
}
15
16
#[inline]
17
fn reduce_vec8(lhs: Option<Vec<u8>>, rhs: &Option<Vec<u8>>, max: bool) -> Option<Vec<u8>> {
18
match (lhs, rhs) {
19
(None, None) => None,
20
(Some(x), None) => Some(x),
21
(None, Some(x)) => Some(x.clone()),
22
(Some(x), Some(y)) => Some(ord_binary(x, y.clone(), max)),
23
}
24
}
25
26
pub fn reduce(stats: &[&Option<Statistics>]) -> ParquetResult<Option<Statistics>> {
27
if stats.is_empty() {
28
return Ok(None);
29
}
30
let stats = stats
31
.iter()
32
.filter_map(|x| x.as_ref())
33
.collect::<Vec<&Statistics>>();
34
if stats.is_empty() {
35
return Ok(None);
36
};
37
38
let same_type = stats
39
.iter()
40
.skip(1)
41
.all(|x| x.physical_type() == stats[0].physical_type());
42
if !same_type {
43
return Err(ParquetError::oos(
44
"The statistics do not have the same dtype",
45
));
46
};
47
48
use PhysicalType as T;
49
let stats = match stats[0].physical_type() {
50
T::Boolean => reduce_boolean(stats.iter().map(|x| x.expect_as_boolean())).into(),
51
T::Int32 => reduce_primitive::<i32, _>(stats.iter().map(|x| x.expect_as_int32())).into(),
52
T::Int64 => reduce_primitive(stats.iter().map(|x| x.expect_as_int64())).into(),
53
T::Float => reduce_primitive(stats.iter().map(|x| x.expect_as_float())).into(),
54
T::Double => reduce_primitive(stats.iter().map(|x| x.expect_as_double())).into(),
55
T::ByteArray => reduce_binary(stats.iter().map(|x| x.expect_as_binary())).into(),
56
T::FixedLenByteArray(_) => {
57
reduce_fix_len_binary(stats.iter().map(|x| x.expect_as_fixedlen())).into()
58
},
59
_ => todo!(),
60
};
61
62
Ok(Some(stats))
63
}
64
65
fn reduce_binary<'a, I: Iterator<Item = &'a BinaryStatistics>>(mut stats: I) -> BinaryStatistics {
66
let initial = stats.next().unwrap().clone();
67
stats.fold(initial, |mut acc, new| {
68
acc.min_value = reduce_vec8(acc.min_value, &new.min_value, false);
69
acc.max_value = reduce_vec8(acc.max_value, &new.max_value, true);
70
acc.null_count = reduce_single(acc.null_count, new.null_count, |x, y| x + y);
71
acc.distinct_count = None;
72
acc
73
})
74
}
75
76
fn reduce_fix_len_binary<'a, I: Iterator<Item = &'a FixedLenStatistics>>(
77
mut stats: I,
78
) -> FixedLenStatistics {
79
let initial = stats.next().unwrap().clone();
80
stats.fold(initial, |mut acc, new| {
81
acc.min_value = reduce_vec8(acc.min_value, &new.min_value, false);
82
acc.max_value = reduce_vec8(acc.max_value, &new.max_value, true);
83
acc.null_count = reduce_single(acc.null_count, new.null_count, |x, y| x + y);
84
acc.distinct_count = None;
85
acc
86
})
87
}
88
89
/// Picks one of two byte strings by unsigned lexicographic order.
///
/// Returns the larger of `a` and `b` when `max` is `true`, otherwise the smaller. When the
/// two compare equal, `a` is returned.
///
/// The comparison covers the whole strings, so a strict prefix sorts before the longer
/// string (`[1] < [1, 2]`). Comparing only the overlapping bytes would leave that case
/// undecided and could pick the wrong side.
fn ord_binary(a: Vec<u8>, b: Vec<u8>, max: bool) -> Vec<u8> {
    let keep_a = match a.cmp(&b) {
        std::cmp::Ordering::Equal => true,
        std::cmp::Ordering::Greater => max,
        std::cmp::Ordering::Less => !max,
    };
    if keep_a {
        a
    } else {
        b
    }
}
111
112
fn reduce_boolean<'a, I: Iterator<Item = &'a BooleanStatistics>>(
113
mut stats: I,
114
) -> BooleanStatistics {
115
let initial = stats.next().unwrap().clone();
116
stats.fold(initial, |mut acc, new| {
117
acc.min_value = reduce_single(
118
acc.min_value,
119
new.min_value,
120
|x, y| if x & !(y) { y } else { x },
121
);
122
acc.max_value = reduce_single(
123
acc.max_value,
124
new.max_value,
125
|x, y| if x & !(y) { x } else { y },
126
);
127
acc.null_count = reduce_single(acc.null_count, new.null_count, |x, y| x + y);
128
acc.distinct_count = None;
129
acc
130
})
131
}
132
133
fn reduce_primitive<
134
'a,
135
T: NativeType + std::cmp::PartialOrd,
136
I: Iterator<Item = &'a PrimitiveStatistics<T>>,
137
>(
138
mut stats: I,
139
) -> PrimitiveStatistics<T> {
140
let initial = stats.next().unwrap().clone();
141
stats.fold(initial, |mut acc, new| {
142
acc.min_value = reduce_single(
143
acc.min_value,
144
new.min_value,
145
|x, y| if x > y { y } else { x },
146
);
147
acc.max_value = reduce_single(
148
acc.max_value,
149
new.max_value,
150
|x, y| if x > y { x } else { y },
151
);
152
acc.null_count = reduce_single(acc.null_count, new.null_count, |x, y| x + y);
153
acc.distinct_count = None;
154
acc
155
})
156
}
157
158
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parquet::schema::types::PrimitiveType;

    /// Primitive type used by every fixture: a column named "bla" of the given physical type.
    fn column(physical_type: PhysicalType) -> PrimitiveType {
        PrimitiveType::from_physical("bla".into(), physical_type)
    }

    /// Binary statistics with zero nulls, no distinct count, and the given bounds.
    fn binary_stats(min_value: Option<Vec<u8>>, max_value: Option<Vec<u8>>) -> BinaryStatistics {
        BinaryStatistics {
            primitive_type: column(PhysicalType::ByteArray),
            null_count: Some(0),
            distinct_count: None,
            min_value,
            max_value,
        }
    }

    /// Fixed-length (2 bytes) statistics with zero nulls, no distinct count, and the given
    /// bounds.
    fn fixed_len_stats(
        min_value: Option<Vec<u8>>,
        max_value: Option<Vec<u8>>,
    ) -> FixedLenStatistics {
        FixedLenStatistics {
            primitive_type: column(PhysicalType::FixedLenByteArray(2)),
            null_count: Some(0),
            distinct_count: None,
            min_value,
            max_value,
        }
    }

    /// Boolean statistics with zero nulls, no distinct count, and the given bounds.
    fn boolean_stats(min_value: bool, max_value: bool) -> BooleanStatistics {
        BooleanStatistics {
            null_count: Some(0),
            distinct_count: None,
            min_value: Some(min_value),
            max_value: Some(max_value),
        }
    }

    /// Int32 statistics with two nulls and bounds `[30, 70]`.
    fn int32_stats() -> PrimitiveStatistics<i32> {
        PrimitiveStatistics {
            null_count: Some(2),
            distinct_count: None,
            min_value: Some(30),
            max_value: Some(70),
            primitive_type: column(PhysicalType::Int32),
        }
    }

    #[test]
    fn binary() -> ParquetResult<()> {
        // The second entry has no maximum, so the first entry's maximum must survive.
        let inputs = vec![
            binary_stats(Some(vec![1, 2]), Some(vec![3, 4])),
            binary_stats(Some(vec![4, 5]), None),
        ];

        let merged = reduce_binary(inputs.iter());

        assert_eq!(merged, binary_stats(Some(vec![1, 2]), Some(vec![3, 4])));
        Ok(())
    }

    #[test]
    fn fixed_len_binary() -> ParquetResult<()> {
        // The second entry has no maximum, so the first entry's maximum must survive.
        let inputs = vec![
            fixed_len_stats(Some(vec![1, 2]), Some(vec![3, 4])),
            fixed_len_stats(Some(vec![4, 5]), None),
        ];

        let merged = reduce_fix_len_binary(inputs.iter());

        assert_eq!(merged, fixed_len_stats(Some(vec![1, 2]), Some(vec![3, 4])));
        Ok(())
    }

    #[test]
    fn boolean() -> ParquetResult<()> {
        // An all-false chunk merged with an all-true chunk spans the full range.
        let inputs = [boolean_stats(false, false), boolean_stats(true, true)];

        let merged = reduce_boolean(inputs.iter());

        assert_eq!(merged, boolean_stats(false, true));
        Ok(())
    }

    #[test]
    fn primitive() -> ParquetResult<()> {
        // A single entry must come back unchanged.
        let inputs = [int32_stats()];

        let merged = reduce_primitive(inputs.iter());

        assert_eq!(merged, int32_stats());
        Ok(())
    }
}
296
297