Path: blob/main/crates/polars-parquet/src/parquet/write/statistics.rs
8512 views
use crate::parquet::error::{ParquetError, ParquetResult};1use crate::parquet::schema::types::PhysicalType;2use crate::parquet::statistics::*;3use crate::parquet::types::NativeType;45#[inline]6fn reduce_single<T, F: Fn(T, T) -> T>(lhs: Option<T>, rhs: Option<T>, op: F) -> Option<T> {7match (lhs, rhs) {8(None, None) => None,9(Some(x), None) => Some(x),10(None, Some(x)) => Some(x),11(Some(x), Some(y)) => Some(op(x, y)),12}13}1415#[inline]16fn reduce_vec8(lhs: Option<Vec<u8>>, rhs: &Option<Vec<u8>>, max: bool) -> Option<Vec<u8>> {17let take_min = !max;1819match (lhs, rhs) {20(None, None) => None,21(Some(x), None) => Some(x),22(None, Some(x)) => Some(x.clone()),23(Some(x), Some(y)) => Some(if (&x <= y) == take_min { x } else { y.clone() }),24}25}2627pub fn reduce(stats: &[&Option<Statistics>]) -> ParquetResult<Option<Statistics>> {28if stats.is_empty() {29return Ok(None);30}31let stats = stats32.iter()33.filter_map(|x| x.as_ref())34.collect::<Vec<&Statistics>>();35if stats.is_empty() {36return Ok(None);37};3839let same_type = stats40.iter()41.skip(1)42.all(|x| x.physical_type() == stats[0].physical_type());43if !same_type {44return Err(ParquetError::oos(45"The statistics do not have the same dtype",46));47};4849use PhysicalType as T;50let stats = match stats[0].physical_type() {51T::Boolean => reduce_boolean(stats.iter().map(|x| x.expect_as_boolean())).into(),52T::Int32 => reduce_primitive::<i32, _>(stats.iter().map(|x| x.expect_as_int32())).into(),53T::Int64 => reduce_primitive(stats.iter().map(|x| x.expect_as_int64())).into(),54T::Float => reduce_primitive(stats.iter().map(|x| x.expect_as_float())).into(),55T::Double => reduce_primitive(stats.iter().map(|x| x.expect_as_double())).into(),56T::ByteArray => reduce_binary(stats.iter().map(|x| x.expect_as_binary())).into(),57T::FixedLenByteArray(_) => {58reduce_fix_len_binary(stats.iter().map(|x| x.expect_as_fixedlen())).into()59},60_ => todo!(),61};6263Ok(Some(stats))64}6566fn reduce_binary<'a, I: Iterator<Item = &'a BinaryStatistics>>(mut stats: I) -> BinaryStatistics {67let initial = stats.next().unwrap().clone();68stats.fold(initial, |mut acc, new| {69acc.min_value = reduce_vec8(acc.min_value, &new.min_value, false);70acc.max_value = reduce_vec8(acc.max_value, &new.max_value, true);71acc.null_count = reduce_single(acc.null_count, new.null_count, |x, y| x + y);72acc.distinct_count = None;73acc74})75}7677fn reduce_fix_len_binary<'a, I: Iterator<Item = &'a FixedLenStatistics>>(78mut stats: I,79) -> FixedLenStatistics {80let initial = stats.next().unwrap().clone();81stats.fold(initial, |mut acc, new| {82acc.min_value = reduce_vec8(acc.min_value, &new.min_value, false);83acc.max_value = reduce_vec8(acc.max_value, &new.max_value, true);84acc.null_count = reduce_single(acc.null_count, new.null_count, |x, y| x + y);85acc.distinct_count = None;86acc87})88}8990fn reduce_boolean<'a, I: Iterator<Item = &'a BooleanStatistics>>(91mut stats: I,92) -> BooleanStatistics {93let initial = stats.next().unwrap().clone();94stats.fold(initial, |mut acc, new| {95acc.min_value = reduce_single(96acc.min_value,97new.min_value,98|x, y| if x & !(y) { y } else { x },99);100acc.max_value = reduce_single(101acc.max_value,102new.max_value,103|x, y| if x & !(y) { x } else { y },104);105acc.null_count = reduce_single(acc.null_count, new.null_count, |x, y| x + y);106acc.distinct_count = None;107acc108})109}110111fn reduce_primitive<112'a,113T: NativeType + std::cmp::PartialOrd,114I: Iterator<Item = &'a PrimitiveStatistics<T>>,115>(116mut stats: I,117) -> PrimitiveStatistics<T> {118let initial = stats.next().unwrap().clone();119stats.fold(initial, |mut acc, new| {120acc.min_value = reduce_single(121acc.min_value,122new.min_value,123|x, y| if x > y { y } else { x },124);125acc.max_value = reduce_single(126acc.max_value,127new.max_value,128|x, y| if x > y { x } else { y },129);130acc.null_count = reduce_single(acc.null_count, new.null_count, |x, y| x + y);131acc.distinct_count = None;132acc133})134}135136#[cfg(test)]137mod tests {138use super::*;139use crate::parquet::schema::types::PrimitiveType;140141#[test]142fn binary() -> ParquetResult<()> {143let iter = [144BinaryStatistics {145primitive_type: PrimitiveType::from_physical("bla".into(), PhysicalType::ByteArray),146null_count: Some(0),147distinct_count: None,148min_value: Some(vec![1, 2]),149max_value: Some(vec![3, 4]),150},151BinaryStatistics {152primitive_type: PrimitiveType::from_physical("bla".into(), PhysicalType::ByteArray),153null_count: Some(0),154distinct_count: None,155min_value: Some(vec![4, 5]),156max_value: None,157},158];159let a = reduce_binary(iter.iter());160161assert_eq!(162a,163BinaryStatistics {164primitive_type: PrimitiveType::from_physical("bla".into(), PhysicalType::ByteArray,),165null_count: Some(0),166distinct_count: None,167min_value: Some(vec![1, 2]),168max_value: Some(vec![3, 4]),169},170);171172Ok(())173}174175#[test]176fn fixed_len_binary() -> ParquetResult<()> {177let iter = [178FixedLenStatistics {179primitive_type: PrimitiveType::from_physical(180"bla".into(),181PhysicalType::FixedLenByteArray(2),182),183null_count: Some(0),184distinct_count: None,185min_value: Some(vec![1, 2]),186max_value: Some(vec![3, 4]),187},188FixedLenStatistics {189primitive_type: PrimitiveType::from_physical(190"bla".into(),191PhysicalType::FixedLenByteArray(2),192),193null_count: Some(0),194distinct_count: None,195min_value: Some(vec![4, 5]),196max_value: None,197},198];199let a = reduce_fix_len_binary(iter.iter());200201assert_eq!(202a,203FixedLenStatistics {204primitive_type: PrimitiveType::from_physical(205"bla".into(),206PhysicalType::FixedLenByteArray(2),207),208null_count: Some(0),209distinct_count: None,210min_value: Some(vec![1, 2]),211max_value: Some(vec![3, 4]),212},213);214215Ok(())216}217218#[test]219fn boolean() -> ParquetResult<()> {220let iter = [221BooleanStatistics {222null_count: Some(0),223distinct_count: None,224min_value: Some(false),225max_value: Some(false),226},227BooleanStatistics {228null_count: Some(0),229distinct_count: None,230min_value: Some(true),231max_value: Some(true),232},233];234let a = reduce_boolean(iter.iter());235236assert_eq!(237a,238BooleanStatistics {239null_count: Some(0),240distinct_count: None,241min_value: Some(false),242max_value: Some(true),243},244);245246Ok(())247}248249#[test]250fn primitive() -> ParquetResult<()> {251let iter = [PrimitiveStatistics {252null_count: Some(2),253distinct_count: None,254min_value: Some(30),255max_value: Some(70),256primitive_type: PrimitiveType::from_physical("bla".into(), PhysicalType::Int32),257}];258let a = reduce_primitive(iter.iter());259260assert_eq!(261a,262PrimitiveStatistics {263null_count: Some(2),264distinct_count: None,265min_value: Some(30),266max_value: Some(70),267primitive_type: PrimitiveType::from_physical("bla".into(), PhysicalType::Int32,),268},269);270271Ok(())272}273274#[test]275fn binary_prefix_ordering() -> ParquetResult<()> {276// Here [1, 2] is a prefix of [1, 2, 0].277// Lexicographically: [1, 2] < [1, 2, 0],278// so min must be [1, 2] and max must be [1, 2, 0].279let iter = [280BinaryStatistics {281primitive_type: PrimitiveType::from_physical("bla".into(), PhysicalType::ByteArray),282null_count: Some(0),283distinct_count: None,284min_value: Some(vec![1, 2]),285max_value: Some(vec![1, 2]),286},287BinaryStatistics {288primitive_type: PrimitiveType::from_physical("bla".into(), PhysicalType::ByteArray),289null_count: Some(0),290distinct_count: None,291min_value: Some(vec![1, 2, 0]),292max_value: Some(vec![1, 2, 0]),293},294];295296let a = reduce_binary(iter.iter());297298assert_eq!(a.min_value, Some(vec![1, 2]));299assert_eq!(a.max_value, Some(vec![1, 2, 0]));300assert_eq!(a.null_count, Some(0));301assert_eq!(a.distinct_count, None);302303Ok(())304}305306#[test]307fn test_reduce_vec8_equal_prefix_min_max() -> ParquetResult<()> {308let a = vec![1, 2];309let b = vec![1, 2, 0];310311// For max=true, we expect the longer (lexicographically larger) value.312let max_val = reduce_vec8(Some(a.clone()), &Some(b.clone()), true).unwrap();313assert_eq!(max_val, b);314315// For max=false, we expect the shorter (lexicographically smaller) value.316let min_val = reduce_vec8(Some(a.clone()), &Some(b), false).unwrap();317assert_eq!(min_val, a);318319Ok(())320}321}322323324