Path: blob/main/crates/polars-parquet/src/parquet/write/statistics.rs
6940 views
use crate::parquet::error::{ParquetError, ParquetResult};1use crate::parquet::schema::types::PhysicalType;2use crate::parquet::statistics::*;3use crate::parquet::types::NativeType;45#[inline]6fn reduce_single<T, F: Fn(T, T) -> T>(lhs: Option<T>, rhs: Option<T>, op: F) -> Option<T> {7match (lhs, rhs) {8(None, None) => None,9(Some(x), None) => Some(x),10(None, Some(x)) => Some(x),11(Some(x), Some(y)) => Some(op(x, y)),12}13}1415#[inline]16fn reduce_vec8(lhs: Option<Vec<u8>>, rhs: &Option<Vec<u8>>, max: bool) -> Option<Vec<u8>> {17match (lhs, rhs) {18(None, None) => None,19(Some(x), None) => Some(x),20(None, Some(x)) => Some(x.clone()),21(Some(x), Some(y)) => Some(ord_binary(x, y.clone(), max)),22}23}2425pub fn reduce(stats: &[&Option<Statistics>]) -> ParquetResult<Option<Statistics>> {26if stats.is_empty() {27return Ok(None);28}29let stats = stats30.iter()31.filter_map(|x| x.as_ref())32.collect::<Vec<&Statistics>>();33if stats.is_empty() {34return Ok(None);35};3637let same_type = stats38.iter()39.skip(1)40.all(|x| x.physical_type() == stats[0].physical_type());41if !same_type {42return Err(ParquetError::oos(43"The statistics do not have the same dtype",44));45};4647use PhysicalType as T;48let stats = match stats[0].physical_type() {49T::Boolean => reduce_boolean(stats.iter().map(|x| x.expect_as_boolean())).into(),50T::Int32 => reduce_primitive::<i32, _>(stats.iter().map(|x| x.expect_as_int32())).into(),51T::Int64 => reduce_primitive(stats.iter().map(|x| x.expect_as_int64())).into(),52T::Float => reduce_primitive(stats.iter().map(|x| x.expect_as_float())).into(),53T::Double => reduce_primitive(stats.iter().map(|x| x.expect_as_double())).into(),54T::ByteArray => reduce_binary(stats.iter().map(|x| x.expect_as_binary())).into(),55T::FixedLenByteArray(_) => {56reduce_fix_len_binary(stats.iter().map(|x| x.expect_as_fixedlen())).into()57},58_ => todo!(),59};6061Ok(Some(stats))62}6364fn reduce_binary<'a, I: Iterator<Item = &'a BinaryStatistics>>(mut stats: I) -> BinaryStatistics {65let initial = stats.next().unwrap().clone();66stats.fold(initial, |mut acc, new| {67acc.min_value = reduce_vec8(acc.min_value, &new.min_value, false);68acc.max_value = reduce_vec8(acc.max_value, &new.max_value, true);69acc.null_count = reduce_single(acc.null_count, new.null_count, |x, y| x + y);70acc.distinct_count = None;71acc72})73}7475fn reduce_fix_len_binary<'a, I: Iterator<Item = &'a FixedLenStatistics>>(76mut stats: I,77) -> FixedLenStatistics {78let initial = stats.next().unwrap().clone();79stats.fold(initial, |mut acc, new| {80acc.min_value = reduce_vec8(acc.min_value, &new.min_value, false);81acc.max_value = reduce_vec8(acc.max_value, &new.max_value, true);82acc.null_count = reduce_single(acc.null_count, new.null_count, |x, y| x + y);83acc.distinct_count = None;84acc85})86}8788fn ord_binary(a: Vec<u8>, b: Vec<u8>, max: bool) -> Vec<u8> {89for (v1, v2) in a.iter().zip(b.iter()) {90match v1.cmp(v2) {91std::cmp::Ordering::Greater => {92if max {93return a;94} else {95return b;96}97},98std::cmp::Ordering::Less => {99if max {100return b;101} else {102return a;103}104},105_ => {},106}107}108a109}110111fn reduce_boolean<'a, I: Iterator<Item = &'a BooleanStatistics>>(112mut stats: I,113) -> BooleanStatistics {114let initial = stats.next().unwrap().clone();115stats.fold(initial, |mut acc, new| {116acc.min_value = reduce_single(117acc.min_value,118new.min_value,119|x, y| if x & !(y) { y } else { x },120);121acc.max_value = reduce_single(122acc.max_value,123new.max_value,124|x, y| if x & !(y) { x } else { y },125);126acc.null_count = reduce_single(acc.null_count, new.null_count, |x, y| x + y);127acc.distinct_count = None;128acc129})130}131132fn reduce_primitive<133'a,134T: NativeType + std::cmp::PartialOrd,135I: Iterator<Item = &'a PrimitiveStatistics<T>>,136>(137mut stats: I,138) -> PrimitiveStatistics<T> {139let initial = stats.next().unwrap().clone();140stats.fold(initial, |mut acc, new| {141acc.min_value = reduce_single(142acc.min_value,143new.min_value,144|x, y| if x > y { y } else { x },145);146acc.max_value = reduce_single(147acc.max_value,148new.max_value,149|x, y| if x > y { x } else { y },150);151acc.null_count = reduce_single(acc.null_count, new.null_count, |x, y| x + y);152acc.distinct_count = None;153acc154})155}156157#[cfg(test)]158mod tests {159use super::*;160use crate::parquet::schema::types::PrimitiveType;161162#[test]163fn binary() -> ParquetResult<()> {164let iter = vec![165BinaryStatistics {166primitive_type: PrimitiveType::from_physical("bla".into(), PhysicalType::ByteArray),167null_count: Some(0),168distinct_count: None,169min_value: Some(vec![1, 2]),170max_value: Some(vec![3, 4]),171},172BinaryStatistics {173primitive_type: PrimitiveType::from_physical("bla".into(), PhysicalType::ByteArray),174null_count: Some(0),175distinct_count: None,176min_value: Some(vec![4, 5]),177max_value: None,178},179];180let a = reduce_binary(iter.iter());181182assert_eq!(183a,184BinaryStatistics {185primitive_type: PrimitiveType::from_physical("bla".into(), PhysicalType::ByteArray,),186null_count: Some(0),187distinct_count: None,188min_value: Some(vec![1, 2]),189max_value: Some(vec![3, 4]),190},191);192193Ok(())194}195196#[test]197fn fixed_len_binary() -> ParquetResult<()> {198let iter = vec![199FixedLenStatistics {200primitive_type: PrimitiveType::from_physical(201"bla".into(),202PhysicalType::FixedLenByteArray(2),203),204null_count: Some(0),205distinct_count: None,206min_value: Some(vec![1, 2]),207max_value: Some(vec![3, 4]),208},209FixedLenStatistics {210primitive_type: PrimitiveType::from_physical(211"bla".into(),212PhysicalType::FixedLenByteArray(2),213),214null_count: Some(0),215distinct_count: None,216min_value: Some(vec![4, 5]),217max_value: None,218},219];220let a = reduce_fix_len_binary(iter.iter());221222assert_eq!(223a,224FixedLenStatistics {225primitive_type: PrimitiveType::from_physical(226"bla".into(),227PhysicalType::FixedLenByteArray(2),228),229null_count: Some(0),230distinct_count: None,231min_value: Some(vec![1, 2]),232max_value: Some(vec![3, 4]),233},234);235236Ok(())237}238239#[test]240fn boolean() -> ParquetResult<()> {241let iter = [242BooleanStatistics {243null_count: Some(0),244distinct_count: None,245min_value: Some(false),246max_value: Some(false),247},248BooleanStatistics {249null_count: Some(0),250distinct_count: None,251min_value: Some(true),252max_value: Some(true),253},254];255let a = reduce_boolean(iter.iter());256257assert_eq!(258a,259BooleanStatistics {260null_count: Some(0),261distinct_count: None,262min_value: Some(false),263max_value: Some(true),264},265);266267Ok(())268}269270#[test]271fn primitive() -> ParquetResult<()> {272let iter = [PrimitiveStatistics {273null_count: Some(2),274distinct_count: None,275min_value: Some(30),276max_value: Some(70),277primitive_type: PrimitiveType::from_physical("bla".into(), PhysicalType::Int32),278}];279let a = reduce_primitive(iter.iter());280281assert_eq!(282a,283PrimitiveStatistics {284null_count: Some(2),285distinct_count: None,286min_value: Some(30),287max_value: Some(70),288primitive_type: PrimitiveType::from_physical("bla".into(), PhysicalType::Int32,),289},290);291292Ok(())293}294}295296297