Path: blob/main/crates/polars-parquet/src/arrow/read/statistics.rs
8492 views
//! APIs exposing `crate::parquet`'s statistics as arrow's statistics.1use arrow::array::{2Array, BinaryViewArray, BooleanArray, FixedSizeBinaryArray, MutableBinaryViewArray,3MutableBooleanArray, MutableFixedSizeBinaryArray, MutablePrimitiveArray, NullArray,4PrimitiveArray, Utf8ViewArray,5};6use arrow::datatypes::{ArrowDataType, Field, IntegerType, IntervalUnit, TimeUnit};7use arrow::types::{days_ms, i256};8use ethnum::I256;9use num_traits::{AsPrimitive, FromBytes};10use polars_utils::IdxSize;11use polars_utils::float16::pf16;12use polars_utils::pl_str::PlSmallStr;1314use super::{ParquetTimeUnit, RowGroupMetadata};15use crate::parquet::error::{ParquetError, ParquetResult};16use crate::parquet::schema::types::PhysicalType as ParquetPhysicalType;17use crate::parquet::statistics::Statistics as ParquetStatistics;18use crate::read::{19ColumnChunkMetadata, PrimitiveLogicalType, convert_days_ms, convert_i128, convert_i256,20convert_year_month, int96_to_i64_ns,21};2223/// Parquet statistics for a nesting level24#[derive(Debug, PartialEq)]25pub enum Statistics {26Column(Box<ColumnStatistics>),2728List(Option<Box<Statistics>>),29FixedSizeList(Option<Box<Statistics>>, usize),3031Struct(Box<[Option<Statistics>]>),32Dictionary(IntegerType, Option<Box<Statistics>>, bool),33}3435/// Arrow-deserialized parquet statistics of a leaf-column36#[derive(Debug, PartialEq)]37pub struct ColumnStatistics {38field: Field,3940logical_type: Option<PrimitiveLogicalType>,41physical_type: ParquetPhysicalType,4243/// Statistics of the leaf array of the column44statistics: ParquetStatistics,45}4647#[derive(Debug, PartialEq)]48pub enum ColumnPathSegment {49List { is_large: bool },50FixedSizeList { width: usize },51Dictionary { key: IntegerType, is_sorted: bool },52Struct { column_idx: usize },53}5455/// Arrow-deserialized parquet statistics of a leaf-column56#[derive(Debug, PartialEq)]57pub struct ArrowColumnStatistics {58pub null_count: Option<u64>,59pub distinct_count: Option<u64>,6061// While these two are Box<dyn Array>, they will only ever contain one valid value. This might62// seems dumb, and don't get me wrong it is, but arrow::Scalar is basically useless.63pub min_value: Option<Box<dyn Array>>,64pub max_value: Option<Box<dyn Array>>,65}6667/// Arrow-deserialized parquet statistics of a leaf-column68pub struct ArrowColumnStatisticsArrays {69pub null_count: PrimitiveArray<IdxSize>,70pub distinct_count: PrimitiveArray<IdxSize>,71pub min_value: Box<dyn Array>,72pub max_value: Box<dyn Array>,73}7475fn timestamp(logical_type: Option<&PrimitiveLogicalType>, time_unit: TimeUnit, x: i64) -> i64 {76let unit = if let Some(PrimitiveLogicalType::Timestamp { unit, .. }) = logical_type {77unit78} else {79return x;80};8182match (unit, time_unit) {83(ParquetTimeUnit::Milliseconds, TimeUnit::Second) => x / 1_000,84(ParquetTimeUnit::Microseconds, TimeUnit::Second) => x / 1_000_000,85(ParquetTimeUnit::Nanoseconds, TimeUnit::Second) => x * 1_000_000_000,8687(ParquetTimeUnit::Milliseconds, TimeUnit::Millisecond) => x,88(ParquetTimeUnit::Microseconds, TimeUnit::Millisecond) => x / 1_000,89(ParquetTimeUnit::Nanoseconds, TimeUnit::Millisecond) => x / 1_000_000,9091(ParquetTimeUnit::Milliseconds, TimeUnit::Microsecond) => x * 1_000,92(ParquetTimeUnit::Microseconds, TimeUnit::Microsecond) => x,93(ParquetTimeUnit::Nanoseconds, TimeUnit::Microsecond) => x / 1_000,9495(ParquetTimeUnit::Milliseconds, TimeUnit::Nanosecond) => x * 1_000_000,96(ParquetTimeUnit::Microseconds, TimeUnit::Nanosecond) => x * 1_000,97(ParquetTimeUnit::Nanoseconds, TimeUnit::Nanosecond) => x,98}99}100101impl ColumnStatistics {102pub fn into_arrow(self) -> ParquetResult<ArrowColumnStatistics> {103use ParquetStatistics as S;104let (null_count, distinct_count) = match &self.statistics {105S::Binary(s) => (s.null_count, s.distinct_count),106S::Boolean(s) => (s.null_count, s.distinct_count),107S::FixedLen(s) => (s.null_count, s.distinct_count),108S::Int32(s) => (s.null_count, s.distinct_count),109S::Int64(s) => (s.null_count, s.distinct_count),110S::Int96(s) => (s.null_count, s.distinct_count),111S::Float(s) => (s.null_count, s.distinct_count),112S::Double(s) => (s.null_count, s.distinct_count),113};114115let null_count = null_count.map(|v| v as u64);116let distinct_count = distinct_count.map(|v| v as u64);117118macro_rules! rmap {119($expect:ident, $map:expr) => {{120let s = self.statistics.$expect();121122let min = s.min_value;123let max = s.max_value;124125let min = ($map)(min)?.map(|x| Box::new(x) as Box<dyn Array>);126let max = ($map)(max)?.map(|x| Box::new(x) as Box<dyn Array>);127128(min, max)129}};130($expect:ident, @prim $from:ty $(as $to:ty)? $(, $map:expr)?) => {{131rmap!(132$expect,133|x: Option<$from>| {134$(135let x = x.map(|x| AsPrimitive::<$to>::as_(x));136)?137$(138let x = x.map($map);139)?140ParquetResult::Ok(x.map(|x| PrimitiveArray::$(<$to>::)?new(141self.field.dtype().clone(),142vec![x].into(),143None,144)))145}146)147}};148(@binary $(, $map:expr)?) => {{149rmap!(150expect_binary,151|x: Option<Vec<u8>>| {152$(153let x = x.map($map);154)?155ParquetResult::Ok(x.map(|x| BinaryViewArray::from_slice([Some(x)])))156}157)158}};159(@string) => {{160rmap!(161expect_binary,162|x: Option<Vec<u8>>| {163let x = x.map(String::from_utf8).transpose().map_err(|_| {164ParquetError::oos("Invalid UTF8 in Statistics")165})?;166ParquetResult::Ok(x.map(|x| Utf8ViewArray::from_slice([Some(x)])))167}168)169}};170}171172use {ArrowDataType as D, ParquetPhysicalType as PPT};173let (min_value, max_value) = match (self.field.dtype(), &self.physical_type) {174(D::Null, _) => (None, None),175176(D::Boolean, _) => rmap!(expect_boolean, |x: Option<bool>| ParquetResult::Ok(177x.map(|x| BooleanArray::new(ArrowDataType::Boolean, vec![x].into(), None,))178)),179180(D::Int8, _) => rmap!(expect_int32, @prim i32 as i8),181(D::Int16, _) => rmap!(expect_int32, @prim i32 as i16),182(D::Int32 | D::Date32 | D::Time32(_), _) => rmap!(expect_int32, @prim i32 as i32),183184// some implementations of parquet write arrow's date64 into i32.185(D::Date64, PPT::Int32) => rmap!(expect_int32, @prim i32 as i64, |x| x * 86400000),186187(D::Int64 | D::Time64(_) | D::Duration(_), _) | (D::Date64, PPT::Int64) => {188rmap!(expect_int64, @prim i64 as i64)189},190191(D::Interval(IntervalUnit::YearMonth), _) => rmap!(192expect_binary,193@prim Vec<u8>,194|x| convert_year_month(&x)195),196(D::Interval(IntervalUnit::DayTime), _) => rmap!(197expect_binary,198@prim Vec<u8>,199|x| convert_days_ms(&x)200),201202(D::UInt8, _) => rmap!(expect_int32, @prim i32 as u8),203(D::UInt16, _) => rmap!(expect_int32, @prim i32 as u16),204(D::UInt32, PPT::Int32) => rmap!(expect_int32, @prim i32 as u32),205206// some implementations of parquet write arrow's u32 into i64.207(D::UInt32, PPT::Int64) => rmap!(expect_int64, @prim i64 as u32),208(D::UInt64, _) => rmap!(expect_int64, @prim i64 as u64),209210(D::Timestamp(time_unit, _), PPT::Int96) => {211rmap!(expect_int96, @prim [u32; 3], |x| {212timestamp(self.logical_type.as_ref(), *time_unit, int96_to_i64_ns(x))213})214},215(D::Timestamp(time_unit, _), PPT::Int64) => {216rmap!(expect_int64, @prim i64, |x| {217timestamp(self.logical_type.as_ref(), *time_unit, x)218})219},220221(D::Float16, PPT::FixedLenByteArray(2))222if matches!(223self.logical_type.as_ref(),224Some(PrimitiveLogicalType::Float16)225) =>226{227rmap!(expect_fixedlen, @prim Vec<u8>, |v| pf16::from_le_bytes(&[v[0], v[1]]))228},229(D::Float32, _) => rmap!(expect_float, @prim f32),230(D::Float64, _) => rmap!(expect_double, @prim f64),231232(D::Decimal(_, _), PPT::Int32) => rmap!(expect_int32, @prim i32 as i128),233(D::Decimal(_, _), PPT::Int64) => rmap!(expect_int64, @prim i64 as i128),234(D::Decimal(_, _), PPT::FixedLenByteArray(n)) if *n > 16 => {235return Err(ParquetError::not_supported(format!(236"Can't decode Decimal128 type from Fixed Size Byte Array of len {n:?}",237)));238},239(D::Decimal(_, _), PPT::FixedLenByteArray(n)) => rmap!(240expect_fixedlen,241@prim Vec<u8>,242|x| convert_i128(&x, *n)243),244(D::Decimal256(_, _), PPT::Int32) => {245rmap!(expect_int32, @prim i32, |x: i32| i256(I256::new(x.into())))246},247(D::Decimal256(_, _), PPT::Int64) => {248rmap!(expect_int64, @prim i64, |x: i64| i256(I256::new(x.into())))249},250(D::Decimal256(_, _), PPT::FixedLenByteArray(n)) if *n > 16 => {251return Err(ParquetError::not_supported(format!(252"Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}",253)));254},255(D::Decimal256(_, _), PPT::FixedLenByteArray(_)) => rmap!(256expect_fixedlen,257@prim Vec<u8>,258|x| convert_i256(&x)259),260(D::Binary, _) => rmap!(@binary),261(D::LargeBinary, _) => rmap!(@binary),262(D::Utf8, _) => rmap!(@string),263(D::LargeUtf8, _) => rmap!(@string),264265(D::BinaryView, _) => rmap!(@binary),266(D::Utf8View, _) => rmap!(@string),267268(D::FixedSizeBinary(_), _) => {269rmap!(expect_fixedlen, |x: Option<Vec<u8>>| ParquetResult::Ok(270x.map(|x| FixedSizeBinaryArray::new(271self.field.dtype().clone(),272x.into(),273None274))275))276},277278other => todo!("{:?}", other),279};280281Ok(ArrowColumnStatistics {282null_count,283distinct_count,284285min_value,286max_value,287})288}289}290291/// Deserializes the statistics in the column chunks from a single `row_group`292/// into [`Statistics`] associated from `field`'s name.293///294/// # Errors295/// This function errors if the deserialization of the statistics fails (e.g. invalid utf8)296pub fn deserialize_all(297field: &Field,298row_groups: &[RowGroupMetadata],299field_idx: usize,300) -> ParquetResult<Option<ArrowColumnStatisticsArrays>> {301assert!(!row_groups.is_empty());302use ArrowDataType as D;303match field.dtype() {304// @TODO: These are all a bit more complex, skip for now.305D::List(..) | D::LargeList(..) => Ok(None),306D::Dictionary(..) => Ok(None),307D::FixedSizeList(..) => Ok(None),308D::Struct(..) => Ok(None),309310_ => {311let mut null_count = MutablePrimitiveArray::<IdxSize>::with_capacity(row_groups.len());312let mut distinct_count =313MutablePrimitiveArray::<IdxSize>::with_capacity(row_groups.len());314315let primitive_type = &row_groups[0].parquet_columns()[field_idx]316.descriptor()317.descriptor318.primitive_type;319320let logical_type = &primitive_type.logical_type;321let physical_type = &primitive_type.physical_type;322323macro_rules! rmap {324($expect:ident, $map:expr, $arr:ty$(, $arg:expr)?) => {{325let mut min_arr = <$arr>::with_capacity(row_groups.len()$(, $arg)?);326let mut max_arr = <$arr>::with_capacity(row_groups.len()$(, $arg)?);327328for rg in row_groups {329let column = &rg.parquet_columns()[field_idx];330let s = column.statistics().transpose()?;331332let (v_min, v_max, v_null_count, v_distinct_count) = match s {333None => (None, None, None, None),334Some(s) => {335let s = s.$expect();336337let min = s.min_value;338let max = s.max_value;339340let min = ($map)(min)?;341let max = ($map)(max)?;342343(344min,345max,346s.null_count.map(|v| v as IdxSize),347s.distinct_count.map(|v| v as IdxSize),348)349}350};351352min_arr.push(v_min);353max_arr.push(v_max);354null_count.push(v_null_count);355distinct_count.push(v_distinct_count);356}357358(min_arr.freeze().to_boxed(), max_arr.freeze().to_boxed())359}};360($expect:ident, $arr:ty, @prim $from:ty $(as $to:ty)? $(, $map:expr)?) => {{361rmap!(362$expect,363|x: Option<$from>| {364$(365let x = x.map(|x| AsPrimitive::<$to>::as_(x));366)?367$(368let x = x.map($map);369)?370ParquetResult::Ok(x)371},372$arr373)374}};375(@binary $(, $map:expr)?) => {{376rmap!(377expect_binary,378|x: Option<Vec<u8>>| {379$(380let x = x.map($map);381)?382ParquetResult::Ok(x)383},384MutableBinaryViewArray<[u8]>385)386}};387(@string) => {{388rmap!(389expect_binary,390|x: Option<Vec<u8>>| {391let x = x.map(String::from_utf8).transpose().map_err(|_| {392ParquetError::oos("Invalid UTF8 in Statistics")393})?;394ParquetResult::Ok(x)395},396MutableBinaryViewArray<str>397)398}};399}400401use {ArrowDataType as D, ParquetPhysicalType as PPT};402let (min_value, max_value) = match (field.dtype(), physical_type) {403(D::Null, _) => (404NullArray::new(ArrowDataType::Null, row_groups.len()).to_boxed(),405NullArray::new(ArrowDataType::Null, row_groups.len()).to_boxed(),406),407408(D::Boolean, _) => rmap!(409expect_boolean,410|x: Option<bool>| ParquetResult::Ok(x),411MutableBooleanArray412),413414(D::Int8, _) => rmap!(expect_int32, MutablePrimitiveArray::<i8>, @prim i32 as i8),415(D::Int16, _) => {416rmap!(expect_int32, MutablePrimitiveArray::<i16>, @prim i32 as i16)417},418(D::Int32 | D::Date32 | D::Time32(_), _) => {419rmap!(expect_int32, MutablePrimitiveArray::<i32>, @prim i32 as i32)420},421422// some implementations of parquet write arrow's date64 into i32.423(D::Date64, PPT::Int32) => {424rmap!(expect_int32, MutablePrimitiveArray::<i64>, @prim i32 as i64, |x| x * 86400000)425},426427(D::Int64 | D::Time64(_) | D::Duration(_), _) | (D::Date64, PPT::Int64) => {428rmap!(expect_int64, MutablePrimitiveArray::<i64>, @prim i64 as i64)429},430431(D::Interval(IntervalUnit::YearMonth), _) => rmap!(432expect_binary,433MutablePrimitiveArray::<i32>,434@prim Vec<u8>,435|x| convert_year_month(&x)436),437(D::Interval(IntervalUnit::DayTime), _) => rmap!(438expect_binary,439MutablePrimitiveArray::<days_ms>,440@prim Vec<u8>,441|x| convert_days_ms(&x)442),443444(D::UInt8, _) => rmap!(expect_int32, MutablePrimitiveArray::<u8>, @prim i32 as u8),445(D::UInt16, _) => {446rmap!(expect_int32, MutablePrimitiveArray::<u16>, @prim i32 as u16)447},448(D::UInt32, PPT::Int32) => {449rmap!(expect_int32, MutablePrimitiveArray::<u32>, @prim i32 as u32)450},451452// some implementations of parquet write arrow's u32 into i64.453(D::UInt32, PPT::Int64) => {454rmap!(expect_int64, MutablePrimitiveArray::<u32>, @prim i64 as u32)455},456(D::UInt64, _) => {457rmap!(expect_int64, MutablePrimitiveArray::<u64>, @prim i64 as u64)458},459460(D::Timestamp(time_unit, _), PPT::Int96) => {461rmap!(expect_int96, MutablePrimitiveArray::<i64>, @prim [u32; 3], |x| {462timestamp(logical_type.as_ref(), *time_unit, int96_to_i64_ns(x))463})464},465(D::Timestamp(time_unit, _), PPT::Int64) => {466rmap!(expect_int64, MutablePrimitiveArray::<i64>, @prim i64, |x| {467timestamp(logical_type.as_ref(), *time_unit, x)468})469},470471(D::Float16, _) => {472rmap!(expect_fixedlen, MutablePrimitiveArray::<pf16>, @prim Vec<u8>, |v| {473let le_bytes: [u8; 2] = [v[0], v[1]];474pf16::from_le_bytes(&le_bytes)475})476},477(D::Float32, _) => rmap!(expect_float, MutablePrimitiveArray::<f32>, @prim f32),478(D::Float64, _) => rmap!(expect_double, MutablePrimitiveArray::<f64>, @prim f64),479480(D::Decimal(_, _), PPT::Int32) => {481rmap!(expect_int32, MutablePrimitiveArray::<i128>, @prim i32 as i128)482},483(D::Decimal(_, _), PPT::Int64) => {484rmap!(expect_int64, MutablePrimitiveArray::<i128>, @prim i64 as i128)485},486(D::Decimal(_, _), PPT::FixedLenByteArray(n)) if *n > 16 => {487return Err(ParquetError::not_supported(format!(488"Can't decode Decimal128 type from Fixed Size Byte Array of len {n:?}",489)));490},491(D::Decimal(_, _), PPT::FixedLenByteArray(n)) => rmap!(492expect_fixedlen,493MutablePrimitiveArray::<i128>,494@prim Vec<u8>,495|x| convert_i128(&x, *n)496),497(D::Decimal256(_, _), PPT::Int32) => {498rmap!(expect_int32, MutablePrimitiveArray::<i256>, @prim i32, |x: i32| i256(I256::new(x.into())))499},500(D::Decimal256(_, _), PPT::Int64) => {501rmap!(expect_int64, MutablePrimitiveArray::<i256>, @prim i64, |x: i64| i256(I256::new(x.into())))502},503(D::Decimal256(_, _), PPT::FixedLenByteArray(n)) if *n > 16 => {504return Err(ParquetError::not_supported(format!(505"Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}",506)));507},508(D::Decimal256(_, _), PPT::FixedLenByteArray(_)) => rmap!(509expect_fixedlen,510MutablePrimitiveArray::<i256>,511@prim Vec<u8>,512|x| convert_i256(&x)513),514(D::Binary, _) => rmap!(@binary),515(D::LargeBinary, _) => rmap!(@binary),516(D::Utf8, _) => rmap!(@string),517(D::LargeUtf8, _) => rmap!(@string),518519(D::BinaryView, _) => rmap!(@binary),520(D::Utf8View, _) => rmap!(@string),521522(D::FixedSizeBinary(width), _) => {523rmap!(524expect_fixedlen,525|x: Option<Vec<u8>>| ParquetResult::Ok(x),526MutableFixedSizeBinaryArray,527*width528)529},530531other => todo!("{:?}", other),532};533534Ok(Some(ArrowColumnStatisticsArrays {535null_count: null_count.freeze(),536distinct_count: distinct_count.freeze(),537min_value,538max_value,539}))540},541}542}543544/// Deserializes the statistics in the column chunks from a single `row_group`545/// into [`Statistics`] associated from `field`'s name.546///547/// # Errors548/// This function errors if the deserialization of the statistics fails (e.g. invalid utf8)549pub fn deserialize<'a>(550field: &Field,551columns: &mut impl ExactSizeIterator<Item = &'a ColumnChunkMetadata>,552) -> ParquetResult<Option<Statistics>> {553use ArrowDataType as D;554match field.dtype() {555D::List(field) | D::LargeList(field) => Ok(Some(Statistics::List(556deserialize(field.as_ref(), columns)?.map(Box::new),557))),558D::Dictionary(key, dtype, is_sorted) => Ok(Some(Statistics::Dictionary(559*key,560deserialize(561&Field::new(PlSmallStr::EMPTY, dtype.as_ref().clone(), true),562columns,563)?564.map(Box::new),565*is_sorted,566))),567D::FixedSizeList(field, width) => Ok(Some(Statistics::FixedSizeList(568deserialize(field.as_ref(), columns)?.map(Box::new),569*width,570))),571D::Struct(fields) => {572let field_columns = fields573.iter()574.map(|f| deserialize(f, columns))575.collect::<ParquetResult<_>>()?;576Ok(Some(Statistics::Struct(field_columns)))577},578_ => {579let column = columns.next().unwrap();580581Ok(column.statistics().transpose()?.map(|statistics| {582let primitive_type = &column.descriptor().descriptor.primitive_type;583584Statistics::Column(Box::new(ColumnStatistics {585field: field.clone(),586587logical_type: primitive_type.logical_type,588physical_type: primitive_type.physical_type,589590statistics,591}))592}))593},594}595}596597598