Path: blob/main/crates/polars-parquet/src/arrow/read/statistics.rs
6940 views
//! APIs exposing `crate::parquet`'s statistics as arrow's statistics.1use arrow::array::{2Array, BinaryViewArray, BooleanArray, FixedSizeBinaryArray, MutableBinaryViewArray,3MutableBooleanArray, MutableFixedSizeBinaryArray, MutablePrimitiveArray, NullArray,4PrimitiveArray, Utf8ViewArray,5};6use arrow::datatypes::{ArrowDataType, Field, IntegerType, IntervalUnit, TimeUnit};7use arrow::types::{NativeType, days_ms, f16, i256};8use ethnum::I256;9use polars_utils::IdxSize;10use polars_utils::pl_str::PlSmallStr;1112use super::{ParquetTimeUnit, RowGroupMetadata};13use crate::parquet::error::{ParquetError, ParquetResult};14use crate::parquet::schema::types::PhysicalType as ParquetPhysicalType;15use crate::parquet::statistics::Statistics as ParquetStatistics;16use crate::read::{17ColumnChunkMetadata, PrimitiveLogicalType, convert_days_ms, convert_i128, convert_i256,18convert_year_month, int96_to_i64_ns,19};2021/// Parquet statistics for a nesting level22#[derive(Debug, PartialEq)]23pub enum Statistics {24Column(Box<ColumnStatistics>),2526List(Option<Box<Statistics>>),27FixedSizeList(Option<Box<Statistics>>, usize),2829Struct(Box<[Option<Statistics>]>),30Dictionary(IntegerType, Option<Box<Statistics>>, bool),31}3233/// Arrow-deserialized parquet statistics of a leaf-column34#[derive(Debug, PartialEq)]35pub struct ColumnStatistics {36field: Field,3738logical_type: Option<PrimitiveLogicalType>,39physical_type: ParquetPhysicalType,4041/// Statistics of the leaf array of the column42statistics: ParquetStatistics,43}4445#[derive(Debug, PartialEq)]46pub enum ColumnPathSegment {47List { is_large: bool },48FixedSizeList { width: usize },49Dictionary { key: IntegerType, is_sorted: bool },50Struct { column_idx: usize },51}5253/// Arrow-deserialized parquet statistics of a leaf-column54#[derive(Debug, PartialEq)]55pub struct ArrowColumnStatistics {56pub null_count: Option<u64>,57pub distinct_count: Option<u64>,5859// While these two are Box<dyn Array>, they will only ever contain one valid value. This might60// seems dumb, and don't get me wrong it is, but arrow::Scalar is basically useless.61pub min_value: Option<Box<dyn Array>>,62pub max_value: Option<Box<dyn Array>>,63}6465/// Arrow-deserialized parquet statistics of a leaf-column66pub struct ArrowColumnStatisticsArrays {67pub null_count: PrimitiveArray<IdxSize>,68pub distinct_count: PrimitiveArray<IdxSize>,69pub min_value: Box<dyn Array>,70pub max_value: Box<dyn Array>,71}7273fn timestamp(logical_type: Option<&PrimitiveLogicalType>, time_unit: TimeUnit, x: i64) -> i64 {74let unit = if let Some(PrimitiveLogicalType::Timestamp { unit, .. }) = logical_type {75unit76} else {77return x;78};7980match (unit, time_unit) {81(ParquetTimeUnit::Milliseconds, TimeUnit::Second) => x / 1_000,82(ParquetTimeUnit::Microseconds, TimeUnit::Second) => x / 1_000_000,83(ParquetTimeUnit::Nanoseconds, TimeUnit::Second) => x * 1_000_000_000,8485(ParquetTimeUnit::Milliseconds, TimeUnit::Millisecond) => x,86(ParquetTimeUnit::Microseconds, TimeUnit::Millisecond) => x / 1_000,87(ParquetTimeUnit::Nanoseconds, TimeUnit::Millisecond) => x / 1_000_000,8889(ParquetTimeUnit::Milliseconds, TimeUnit::Microsecond) => x * 1_000,90(ParquetTimeUnit::Microseconds, TimeUnit::Microsecond) => x,91(ParquetTimeUnit::Nanoseconds, TimeUnit::Microsecond) => x / 1_000,9293(ParquetTimeUnit::Milliseconds, TimeUnit::Nanosecond) => x * 1_000_000,94(ParquetTimeUnit::Microseconds, TimeUnit::Nanosecond) => x * 1_000,95(ParquetTimeUnit::Nanoseconds, TimeUnit::Nanosecond) => x,96}97}9899impl ColumnStatistics {100pub fn into_arrow(self) -> ParquetResult<ArrowColumnStatistics> {101use ParquetStatistics as S;102let (null_count, distinct_count) = match &self.statistics {103S::Binary(s) => (s.null_count, s.distinct_count),104S::Boolean(s) => (s.null_count, s.distinct_count),105S::FixedLen(s) => (s.null_count, s.distinct_count),106S::Int32(s) => (s.null_count, s.distinct_count),107S::Int64(s) => (s.null_count, s.distinct_count),108S::Int96(s) => (s.null_count, s.distinct_count),109S::Float(s) => (s.null_count, s.distinct_count),110S::Double(s) => (s.null_count, s.distinct_count),111};112113let null_count = null_count.map(|v| v as u64);114let distinct_count = distinct_count.map(|v| v as u64);115116macro_rules! rmap {117($expect:ident, $map:expr) => {{118let s = self.statistics.$expect();119120let min = s.min_value;121let max = s.max_value;122123let min = ($map)(min)?.map(|x| Box::new(x) as Box<dyn Array>);124let max = ($map)(max)?.map(|x| Box::new(x) as Box<dyn Array>);125126(min, max)127}};128($expect:ident, @prim $from:ty $(as $to:ty)? $(, $map:expr)?) => {{129rmap!(130$expect,131|x: Option<$from>| {132$(133let x = x.map(|x| x as $to);134)?135$(136let x = x.map($map);137)?138ParquetResult::Ok(x.map(|x| PrimitiveArray::$(<$to>::)?new(139self.field.dtype().clone(),140vec![x].into(),141None,142)))143}144)145}};146(@binary $(, $map:expr)?) => {{147rmap!(148expect_binary,149|x: Option<Vec<u8>>| {150$(151let x = x.map($map);152)?153ParquetResult::Ok(x.map(|x| BinaryViewArray::from_slice([Some(x)])))154}155)156}};157(@string) => {{158rmap!(159expect_binary,160|x: Option<Vec<u8>>| {161let x = x.map(String::from_utf8).transpose().map_err(|_| {162ParquetError::oos("Invalid UTF8 in Statistics")163})?;164ParquetResult::Ok(x.map(|x| Utf8ViewArray::from_slice([Some(x)])))165}166)167}};168}169170use {ArrowDataType as D, ParquetPhysicalType as PPT};171let (min_value, max_value) = match (self.field.dtype(), &self.physical_type) {172(D::Null, _) => (None, None),173174(D::Boolean, _) => rmap!(expect_boolean, |x: Option<bool>| ParquetResult::Ok(175x.map(|x| BooleanArray::new(ArrowDataType::Boolean, vec![x].into(), None,))176)),177178(D::Int8, _) => rmap!(expect_int32, @prim i32 as i8),179(D::Int16, _) => rmap!(expect_int32, @prim i32 as i16),180(D::Int32 | D::Date32 | D::Time32(_), _) => rmap!(expect_int32, @prim i32 as i32),181182// some implementations of parquet write arrow's date64 into i32.183(D::Date64, PPT::Int32) => rmap!(expect_int32, @prim i32 as i64, |x| x * 86400000),184185(D::Int64 | D::Time64(_) | D::Duration(_), _) | (D::Date64, PPT::Int64) => {186rmap!(expect_int64, @prim i64 as i64)187},188189(D::Interval(IntervalUnit::YearMonth), _) => rmap!(190expect_binary,191@prim Vec<u8>,192|x| convert_year_month(&x)193),194(D::Interval(IntervalUnit::DayTime), _) => rmap!(195expect_binary,196@prim Vec<u8>,197|x| convert_days_ms(&x)198),199200(D::UInt8, _) => rmap!(expect_int32, @prim i32 as u8),201(D::UInt16, _) => rmap!(expect_int32, @prim i32 as u16),202(D::UInt32, PPT::Int32) => rmap!(expect_int32, @prim i32 as u32),203204// some implementations of parquet write arrow's u32 into i64.205(D::UInt32, PPT::Int64) => rmap!(expect_int64, @prim i64 as u32),206(D::UInt64, _) => rmap!(expect_int64, @prim i64 as u64),207208(D::Timestamp(time_unit, _), PPT::Int96) => {209rmap!(expect_int96, @prim [u32; 3], |x| {210timestamp(self.logical_type.as_ref(), *time_unit, int96_to_i64_ns(x))211})212},213(D::Timestamp(time_unit, _), PPT::Int64) => {214rmap!(expect_int64, @prim i64, |x| {215timestamp(self.logical_type.as_ref(), *time_unit, x)216})217},218219// Read Float16, since we don't have a f16 type in Polars we read it to a Float32.220(_, PPT::FixedLenByteArray(2))221if matches!(222self.logical_type.as_ref(),223Some(PrimitiveLogicalType::Float16)224) =>225{226rmap!(expect_fixedlen, @prim Vec<u8>, |v| f16::from_le_bytes([v[0], v[1]]).to_f32())227},228(D::Float32, _) => rmap!(expect_float, @prim f32),229(D::Float64, _) => rmap!(expect_double, @prim f64),230231(D::Decimal(_, _), PPT::Int32) => rmap!(expect_int32, @prim i32 as i128),232(D::Decimal(_, _), PPT::Int64) => rmap!(expect_int64, @prim i64 as i128),233(D::Decimal(_, _), PPT::FixedLenByteArray(n)) if *n > 16 => {234return Err(ParquetError::not_supported(format!(235"Can't decode Decimal128 type from Fixed Size Byte Array of len {n:?}",236)));237},238(D::Decimal(_, _), PPT::FixedLenByteArray(n)) => rmap!(239expect_fixedlen,240@prim Vec<u8>,241|x| convert_i128(&x, *n)242),243(D::Decimal256(_, _), PPT::Int32) => {244rmap!(expect_int32, @prim i32, |x: i32| i256(I256::new(x.into())))245},246(D::Decimal256(_, _), PPT::Int64) => {247rmap!(expect_int64, @prim i64, |x: i64| i256(I256::new(x.into())))248},249(D::Decimal256(_, _), PPT::FixedLenByteArray(n)) if *n > 16 => {250return Err(ParquetError::not_supported(format!(251"Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}",252)));253},254(D::Decimal256(_, _), PPT::FixedLenByteArray(_)) => rmap!(255expect_fixedlen,256@prim Vec<u8>,257|x| convert_i256(&x)258),259(D::Binary, _) => rmap!(@binary),260(D::LargeBinary, _) => rmap!(@binary),261(D::Utf8, _) => rmap!(@string),262(D::LargeUtf8, _) => rmap!(@string),263264(D::BinaryView, _) => rmap!(@binary),265(D::Utf8View, _) => rmap!(@string),266267(D::FixedSizeBinary(_), _) => {268rmap!(expect_fixedlen, |x: Option<Vec<u8>>| ParquetResult::Ok(269x.map(|x| FixedSizeBinaryArray::new(270self.field.dtype().clone(),271x.into(),272None273))274))275},276277other => todo!("{:?}", other),278};279280Ok(ArrowColumnStatistics {281null_count,282distinct_count,283284min_value,285max_value,286})287}288}289290/// Deserializes the statistics in the column chunks from a single `row_group`291/// into [`Statistics`] associated from `field`'s name.292///293/// # Errors294/// This function errors if the deserialization of the statistics fails (e.g. invalid utf8)295pub fn deserialize_all(296field: &Field,297row_groups: &[RowGroupMetadata],298field_idx: usize,299) -> ParquetResult<Option<ArrowColumnStatisticsArrays>> {300assert!(!row_groups.is_empty());301use ArrowDataType as D;302match field.dtype() {303// @TODO: These are all a bit more complex, skip for now.304D::List(..) | D::LargeList(..) => Ok(None),305D::Dictionary(..) => Ok(None),306D::FixedSizeList(..) => Ok(None),307D::Struct(..) => Ok(None),308309_ => {310let mut null_count = MutablePrimitiveArray::<IdxSize>::with_capacity(row_groups.len());311let mut distinct_count =312MutablePrimitiveArray::<IdxSize>::with_capacity(row_groups.len());313314let primitive_type = &row_groups[0].parquet_columns()[field_idx]315.descriptor()316.descriptor317.primitive_type;318319let logical_type = &primitive_type.logical_type;320let physical_type = &primitive_type.physical_type;321322macro_rules! rmap {323($expect:ident, $map:expr, $arr:ty$(, $arg:expr)?) => {{324let mut min_arr = <$arr>::with_capacity(row_groups.len()$(, $arg)?);325let mut max_arr = <$arr>::with_capacity(row_groups.len()$(, $arg)?);326327for rg in row_groups {328let column = &rg.parquet_columns()[field_idx];329let s = column.statistics().transpose()?;330331let (v_min, v_max, v_null_count, v_distinct_count) = match s {332None => (None, None, None, None),333Some(s) => {334let s = s.$expect();335336let min = s.min_value;337let max = s.max_value;338339let min = ($map)(min)?;340let max = ($map)(max)?;341342(343min,344max,345s.null_count.map(|v| v as IdxSize),346s.distinct_count.map(|v| v as IdxSize),347)348}349};350351min_arr.push(v_min);352max_arr.push(v_max);353null_count.push(v_null_count);354distinct_count.push(v_distinct_count);355}356357(min_arr.freeze().to_boxed(), max_arr.freeze().to_boxed())358}};359($expect:ident, $arr:ty, @prim $from:ty $(as $to:ty)? $(, $map:expr)?) => {{360rmap!(361$expect,362|x: Option<$from>| {363$(364let x = x.map(|x| x as $to);365)?366$(367let x = x.map($map);368)?369ParquetResult::Ok(x)370},371$arr372)373}};374(@binary $(, $map:expr)?) => {{375rmap!(376expect_binary,377|x: Option<Vec<u8>>| {378$(379let x = x.map($map);380)?381ParquetResult::Ok(x)382},383MutableBinaryViewArray<[u8]>384)385}};386(@string) => {{387rmap!(388expect_binary,389|x: Option<Vec<u8>>| {390let x = x.map(String::from_utf8).transpose().map_err(|_| {391ParquetError::oos("Invalid UTF8 in Statistics")392})?;393ParquetResult::Ok(x)394},395MutableBinaryViewArray<str>396)397}};398}399400use {ArrowDataType as D, ParquetPhysicalType as PPT};401let (min_value, max_value) = match (field.dtype(), physical_type) {402(D::Null, _) => (403NullArray::new(ArrowDataType::Null, row_groups.len()).to_boxed(),404NullArray::new(ArrowDataType::Null, row_groups.len()).to_boxed(),405),406407(D::Boolean, _) => rmap!(408expect_boolean,409|x: Option<bool>| ParquetResult::Ok(x),410MutableBooleanArray411),412413(D::Int8, _) => rmap!(expect_int32, MutablePrimitiveArray::<i8>, @prim i32 as i8),414(D::Int16, _) => {415rmap!(expect_int32, MutablePrimitiveArray::<i16>, @prim i32 as i16)416},417(D::Int32 | D::Date32 | D::Time32(_), _) => {418rmap!(expect_int32, MutablePrimitiveArray::<i32>, @prim i32 as i32)419},420421// some implementations of parquet write arrow's date64 into i32.422(D::Date64, PPT::Int32) => {423rmap!(expect_int32, MutablePrimitiveArray::<i64>, @prim i32 as i64, |x| x * 86400000)424},425426(D::Int64 | D::Time64(_) | D::Duration(_), _) | (D::Date64, PPT::Int64) => {427rmap!(expect_int64, MutablePrimitiveArray::<i64>, @prim i64 as i64)428},429430(D::Interval(IntervalUnit::YearMonth), _) => rmap!(431expect_binary,432MutablePrimitiveArray::<i32>,433@prim Vec<u8>,434|x| convert_year_month(&x)435),436(D::Interval(IntervalUnit::DayTime), _) => rmap!(437expect_binary,438MutablePrimitiveArray::<days_ms>,439@prim Vec<u8>,440|x| convert_days_ms(&x)441),442443(D::UInt8, _) => rmap!(expect_int32, MutablePrimitiveArray::<u8>, @prim i32 as u8),444(D::UInt16, _) => {445rmap!(expect_int32, MutablePrimitiveArray::<u16>, @prim i32 as u16)446},447(D::UInt32, PPT::Int32) => {448rmap!(expect_int32, MutablePrimitiveArray::<u32>, @prim i32 as u32)449},450451// some implementations of parquet write arrow's u32 into i64.452(D::UInt32, PPT::Int64) => {453rmap!(expect_int64, MutablePrimitiveArray::<u32>, @prim i64 as u32)454},455(D::UInt64, _) => {456rmap!(expect_int64, MutablePrimitiveArray::<u64>, @prim i64 as u64)457},458459(D::Timestamp(time_unit, _), PPT::Int96) => {460rmap!(expect_int96, MutablePrimitiveArray::<i64>, @prim [u32; 3], |x| {461timestamp(logical_type.as_ref(), *time_unit, int96_to_i64_ns(x))462})463},464(D::Timestamp(time_unit, _), PPT::Int64) => {465rmap!(expect_int64, MutablePrimitiveArray::<i64>, @prim i64, |x| {466timestamp(logical_type.as_ref(), *time_unit, x)467})468},469470// Read Float16, since we don't have a f16 type in Polars we read it to a Float32.471(_, PPT::FixedLenByteArray(2))472if matches!(logical_type.as_ref(), Some(PrimitiveLogicalType::Float16)) =>473{474rmap!(expect_fixedlen, MutablePrimitiveArray::<f32>, @prim Vec<u8>, |v| f16::from_le_bytes([v[0], v[1]]).to_f32())475},476(D::Float32, _) => rmap!(expect_float, MutablePrimitiveArray::<f32>, @prim f32),477(D::Float64, _) => rmap!(expect_double, MutablePrimitiveArray::<f64>, @prim f64),478479(D::Decimal(_, _), PPT::Int32) => {480rmap!(expect_int32, MutablePrimitiveArray::<i128>, @prim i32 as i128)481},482(D::Decimal(_, _), PPT::Int64) => {483rmap!(expect_int64, MutablePrimitiveArray::<i128>, @prim i64 as i128)484},485(D::Decimal(_, _), PPT::FixedLenByteArray(n)) if *n > 16 => {486return Err(ParquetError::not_supported(format!(487"Can't decode Decimal128 type from Fixed Size Byte Array of len {n:?}",488)));489},490(D::Decimal(_, _), PPT::FixedLenByteArray(n)) => rmap!(491expect_fixedlen,492MutablePrimitiveArray::<i128>,493@prim Vec<u8>,494|x| convert_i128(&x, *n)495),496(D::Decimal256(_, _), PPT::Int32) => {497rmap!(expect_int32, MutablePrimitiveArray::<i256>, @prim i32, |x: i32| i256(I256::new(x.into())))498},499(D::Decimal256(_, _), PPT::Int64) => {500rmap!(expect_int64, MutablePrimitiveArray::<i256>, @prim i64, |x: i64| i256(I256::new(x.into())))501},502(D::Decimal256(_, _), PPT::FixedLenByteArray(n)) if *n > 16 => {503return Err(ParquetError::not_supported(format!(504"Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}",505)));506},507(D::Decimal256(_, _), PPT::FixedLenByteArray(_)) => rmap!(508expect_fixedlen,509MutablePrimitiveArray::<i256>,510@prim Vec<u8>,511|x| convert_i256(&x)512),513(D::Binary, _) => rmap!(@binary),514(D::LargeBinary, _) => rmap!(@binary),515(D::Utf8, _) => rmap!(@string),516(D::LargeUtf8, _) => rmap!(@string),517518(D::BinaryView, _) => rmap!(@binary),519(D::Utf8View, _) => rmap!(@string),520521(D::FixedSizeBinary(width), _) => {522rmap!(523expect_fixedlen,524|x: Option<Vec<u8>>| ParquetResult::Ok(x),525MutableFixedSizeBinaryArray,526*width527)528},529530other => todo!("{:?}", other),531};532533Ok(Some(ArrowColumnStatisticsArrays {534null_count: null_count.freeze(),535distinct_count: distinct_count.freeze(),536min_value,537max_value,538}))539},540}541}542543/// Deserializes the statistics in the column chunks from a single `row_group`544/// into [`Statistics`] associated from `field`'s name.545///546/// # Errors547/// This function errors if the deserialization of the statistics fails (e.g. invalid utf8)548pub fn deserialize<'a>(549field: &Field,550columns: &mut impl ExactSizeIterator<Item = &'a ColumnChunkMetadata>,551) -> ParquetResult<Option<Statistics>> {552use ArrowDataType as D;553match field.dtype() {554D::List(field) | D::LargeList(field) => Ok(Some(Statistics::List(555deserialize(field.as_ref(), columns)?.map(Box::new),556))),557D::Dictionary(key, dtype, is_sorted) => Ok(Some(Statistics::Dictionary(558*key,559deserialize(560&Field::new(PlSmallStr::EMPTY, dtype.as_ref().clone(), true),561columns,562)?563.map(Box::new),564*is_sorted,565))),566D::FixedSizeList(field, width) => Ok(Some(Statistics::FixedSizeList(567deserialize(field.as_ref(), columns)?.map(Box::new),568*width,569))),570D::Struct(fields) => {571let field_columns = fields572.iter()573.map(|f| deserialize(f, columns))574.collect::<ParquetResult<_>>()?;575Ok(Some(Statistics::Struct(field_columns)))576},577_ => {578let column = columns.next().unwrap();579580Ok(column.statistics().transpose()?.map(|statistics| {581let primitive_type = &column.descriptor().descriptor.primitive_type;582583Statistics::Column(Box::new(ColumnStatistics {584field: field.clone(),585586logical_type: primitive_type.logical_type,587physical_type: primitive_type.physical_type,588589statistics,590}))591}))592},593}594}595596597