Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/write/statistics.rs
8512 views
1
use crate::parquet::error::{ParquetError, ParquetResult};
2
use crate::parquet::schema::types::PhysicalType;
3
use crate::parquet::statistics::*;
4
use crate::parquet::types::NativeType;
5
6
/// Combines two optional values with `op`.
///
/// When both sides are present the result is `Some(op(lhs, rhs))`. When only
/// one side is present that value is passed through untouched, and when
/// neither is present the result is `None`.
#[inline]
fn reduce_single<T, F: Fn(T, T) -> T>(lhs: Option<T>, rhs: Option<T>, op: F) -> Option<T> {
    match lhs {
        // A present left side is either merged with the right side or kept alone.
        Some(left) => Some(match rhs {
            Some(right) => op(left, right),
            None => left,
        }),
        // Without a left side the right side (present or not) is the answer.
        None => rhs,
    }
}
15
16
/// Picks the smaller or larger of two optional byte strings.
///
/// Byte strings are compared with the standard slice ordering. With
/// `max == false` the smaller value is returned, with `max == true` the larger
/// one. If only one side is present it is returned as is (the right-hand side
/// is cloned because it is borrowed); if neither is present the result is
/// `None`.
#[inline]
fn reduce_vec8(lhs: Option<Vec<u8>>, rhs: &Option<Vec<u8>>, max: bool) -> Option<Vec<u8>> {
    match (lhs, rhs) {
        (Some(current), Some(candidate)) => {
            let current_is_le = current.as_slice() <= candidate.as_slice();
            // For a minimum the current value is kept when it is `<=` the candidate;
            // for a maximum it is kept only when it is strictly greater.
            Some(if current_is_le != max {
                current
            } else {
                candidate.clone()
            })
        },
        (only_lhs, None) => only_lhs,
        (None, Some(candidate)) => Some(candidate.clone()),
    }
}
27
28
pub fn reduce(stats: &[&Option<Statistics>]) -> ParquetResult<Option<Statistics>> {
29
if stats.is_empty() {
30
return Ok(None);
31
}
32
let stats = stats
33
.iter()
34
.filter_map(|x| x.as_ref())
35
.collect::<Vec<&Statistics>>();
36
if stats.is_empty() {
37
return Ok(None);
38
};
39
40
let same_type = stats
41
.iter()
42
.skip(1)
43
.all(|x| x.physical_type() == stats[0].physical_type());
44
if !same_type {
45
return Err(ParquetError::oos(
46
"The statistics do not have the same dtype",
47
));
48
};
49
50
use PhysicalType as T;
51
let stats = match stats[0].physical_type() {
52
T::Boolean => reduce_boolean(stats.iter().map(|x| x.expect_as_boolean())).into(),
53
T::Int32 => reduce_primitive::<i32, _>(stats.iter().map(|x| x.expect_as_int32())).into(),
54
T::Int64 => reduce_primitive(stats.iter().map(|x| x.expect_as_int64())).into(),
55
T::Float => reduce_primitive(stats.iter().map(|x| x.expect_as_float())).into(),
56
T::Double => reduce_primitive(stats.iter().map(|x| x.expect_as_double())).into(),
57
T::ByteArray => reduce_binary(stats.iter().map(|x| x.expect_as_binary())).into(),
58
T::FixedLenByteArray(_) => {
59
reduce_fix_len_binary(stats.iter().map(|x| x.expect_as_fixedlen())).into()
60
},
61
_ => todo!(),
62
};
63
64
Ok(Some(stats))
65
}
66
67
fn reduce_binary<'a, I: Iterator<Item = &'a BinaryStatistics>>(mut stats: I) -> BinaryStatistics {
68
let initial = stats.next().unwrap().clone();
69
stats.fold(initial, |mut acc, new| {
70
acc.min_value = reduce_vec8(acc.min_value, &new.min_value, false);
71
acc.max_value = reduce_vec8(acc.max_value, &new.max_value, true);
72
acc.null_count = reduce_single(acc.null_count, new.null_count, |x, y| x + y);
73
acc.distinct_count = None;
74
acc
75
})
76
}
77
78
fn reduce_fix_len_binary<'a, I: Iterator<Item = &'a FixedLenStatistics>>(
79
mut stats: I,
80
) -> FixedLenStatistics {
81
let initial = stats.next().unwrap().clone();
82
stats.fold(initial, |mut acc, new| {
83
acc.min_value = reduce_vec8(acc.min_value, &new.min_value, false);
84
acc.max_value = reduce_vec8(acc.max_value, &new.max_value, true);
85
acc.null_count = reduce_single(acc.null_count, new.null_count, |x, y| x + y);
86
acc.distinct_count = None;
87
acc
88
})
89
}
90
91
fn reduce_boolean<'a, I: Iterator<Item = &'a BooleanStatistics>>(
92
mut stats: I,
93
) -> BooleanStatistics {
94
let initial = stats.next().unwrap().clone();
95
stats.fold(initial, |mut acc, new| {
96
acc.min_value = reduce_single(
97
acc.min_value,
98
new.min_value,
99
|x, y| if x & !(y) { y } else { x },
100
);
101
acc.max_value = reduce_single(
102
acc.max_value,
103
new.max_value,
104
|x, y| if x & !(y) { x } else { y },
105
);
106
acc.null_count = reduce_single(acc.null_count, new.null_count, |x, y| x + y);
107
acc.distinct_count = None;
108
acc
109
})
110
}
111
112
fn reduce_primitive<
113
'a,
114
T: NativeType + std::cmp::PartialOrd,
115
I: Iterator<Item = &'a PrimitiveStatistics<T>>,
116
>(
117
mut stats: I,
118
) -> PrimitiveStatistics<T> {
119
let initial = stats.next().unwrap().clone();
120
stats.fold(initial, |mut acc, new| {
121
acc.min_value = reduce_single(
122
acc.min_value,
123
new.min_value,
124
|x, y| if x > y { y } else { x },
125
);
126
acc.max_value = reduce_single(
127
acc.max_value,
128
new.max_value,
129
|x, y| if x > y { x } else { y },
130
);
131
acc.null_count = reduce_single(acc.null_count, new.null_count, |x, y| x + y);
132
acc.distinct_count = None;
133
acc
134
})
135
}
136
137
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parquet::schema::types::PrimitiveType;

    /// Byte-array statistics for a column named "bla" with a null count of zero.
    fn binary_stats(min_value: Option<Vec<u8>>, max_value: Option<Vec<u8>>) -> BinaryStatistics {
        BinaryStatistics {
            primitive_type: PrimitiveType::from_physical("bla".into(), PhysicalType::ByteArray),
            null_count: Some(0),
            distinct_count: None,
            min_value,
            max_value,
        }
    }

    /// Fixed-length (2 bytes) statistics for a column named "bla" with a null count of zero.
    fn fixed_len_stats(
        min_value: Option<Vec<u8>>,
        max_value: Option<Vec<u8>>,
    ) -> FixedLenStatistics {
        FixedLenStatistics {
            primitive_type: PrimitiveType::from_physical(
                "bla".into(),
                PhysicalType::FixedLenByteArray(2),
            ),
            null_count: Some(0),
            distinct_count: None,
            min_value,
            max_value,
        }
    }

    /// Boolean statistics with a null count of zero.
    fn boolean_stats(min_value: bool, max_value: bool) -> BooleanStatistics {
        BooleanStatistics {
            null_count: Some(0),
            distinct_count: None,
            min_value: Some(min_value),
            max_value: Some(max_value),
        }
    }

    #[test]
    fn binary() -> ParquetResult<()> {
        // The second entry has no maximum, so the first entry's maximum is kept.
        let inputs = [
            binary_stats(Some(vec![1, 2]), Some(vec![3, 4])),
            binary_stats(Some(vec![4, 5]), None),
        ];

        let merged = reduce_binary(inputs.iter());

        assert_eq!(merged, binary_stats(Some(vec![1, 2]), Some(vec![3, 4])));

        Ok(())
    }

    #[test]
    fn fixed_len_binary() -> ParquetResult<()> {
        // The second entry has no maximum, so the first entry's maximum is kept.
        let inputs = [
            fixed_len_stats(Some(vec![1, 2]), Some(vec![3, 4])),
            fixed_len_stats(Some(vec![4, 5]), None),
        ];

        let merged = reduce_fix_len_binary(inputs.iter());

        assert_eq!(merged, fixed_len_stats(Some(vec![1, 2]), Some(vec![3, 4])));

        Ok(())
    }

    #[test]
    fn boolean() -> ParquetResult<()> {
        let inputs = [boolean_stats(false, false), boolean_stats(true, true)];

        let merged = reduce_boolean(inputs.iter());

        assert_eq!(merged, boolean_stats(false, true));

        Ok(())
    }

    #[test]
    fn primitive() -> ParquetResult<()> {
        // A single entry must come back unchanged.
        let inputs = [PrimitiveStatistics {
            null_count: Some(2),
            distinct_count: None,
            min_value: Some(30),
            max_value: Some(70),
            primitive_type: PrimitiveType::from_physical("bla".into(), PhysicalType::Int32),
        }];

        let merged = reduce_primitive(inputs.iter());

        let expected = PrimitiveStatistics {
            null_count: Some(2),
            distinct_count: None,
            min_value: Some(30),
            max_value: Some(70),
            primitive_type: PrimitiveType::from_physical("bla".into(), PhysicalType::Int32),
        };
        assert_eq!(merged, expected);

        Ok(())
    }

    #[test]
    fn binary_prefix_ordering() -> ParquetResult<()> {
        // [1, 2] is a prefix of [1, 2, 0], and lexicographically [1, 2] < [1, 2, 0],
        // so the minimum must be [1, 2] and the maximum must be [1, 2, 0].
        let inputs = [
            binary_stats(Some(vec![1, 2]), Some(vec![1, 2])),
            binary_stats(Some(vec![1, 2, 0]), Some(vec![1, 2, 0])),
        ];

        let merged = reduce_binary(inputs.iter());

        assert_eq!(merged.min_value, Some(vec![1, 2]));
        assert_eq!(merged.max_value, Some(vec![1, 2, 0]));
        assert_eq!(merged.null_count, Some(0));
        assert_eq!(merged.distinct_count, None);

        Ok(())
    }

    #[test]
    fn test_reduce_vec8_equal_prefix_min_max() -> ParquetResult<()> {
        let shorter = vec![1, 2];
        let longer = vec![1, 2, 0];

        // With max = true the longer (lexicographically larger) value wins.
        let max_val = reduce_vec8(Some(shorter.clone()), &Some(longer.clone()), true).unwrap();
        assert_eq!(max_val, longer);

        // With max = false the shorter (lexicographically smaller) value wins.
        let min_val = reduce_vec8(Some(shorter.clone()), &Some(longer), false).unwrap();
        assert_eq!(min_val, shorter);

        Ok(())
    }
}
323
324