Path: blob/main/crates/polars-parquet/src/arrow/write/primitive/basic.rs
6940 views
use arrow::array::{Array, PrimitiveArray};1use arrow::scalar::PrimitiveScalar;2use arrow::types::NativeType;3use polars_error::{PolarsResult, polars_bail};45use super::super::{WriteOptions, utils};6use crate::arrow::read::schema::is_nullable;7use crate::arrow::write::utils::ExactSizedIter;8use crate::parquet::encoding::Encoding;9use crate::parquet::encoding::delta_bitpacked::encode;10use crate::parquet::page::DataPage;11use crate::parquet::schema::types::PrimitiveType;12use crate::parquet::statistics::PrimitiveStatistics;13use crate::parquet::types::NativeType as ParquetNativeType;14use crate::read::Page;15use crate::write::{EncodeNullability, StatisticsOptions};1617pub(crate) fn encode_plain<T, P>(18array: &PrimitiveArray<T>,19options: EncodeNullability,20mut buffer: Vec<u8>,21) -> Vec<u8>22where23T: NativeType,24P: ParquetNativeType,25T: num_traits::AsPrimitive<P>,26{27let is_optional = options.is_optional();2829if is_optional {30// append the non-null values31let validity = array.validity();3233if let Some(validity) = validity {34let null_count = validity.unset_bits();3536if null_count > 0 {37let mut iter = validity.iter();38let values = array.values().as_slice();3940buffer.reserve(size_of::<P::Bytes>() * (array.len() - null_count));4142let mut offset = 0;43let mut remaining_valid = array.len() - null_count;44while remaining_valid > 0 {45let num_valid = iter.take_leading_ones();46buffer.extend(47values[offset..offset + num_valid]48.iter()49.flat_map(|value| value.as_().to_le_bytes()),50);51remaining_valid -= num_valid;52offset += num_valid;5354let num_invalid = iter.take_leading_zeros();55offset += num_invalid;56}5758return buffer;59}60}61}6263buffer.reserve(size_of::<P>() * array.len());64buffer.extend(65array66.values()67.iter()68.flat_map(|value| value.as_().to_le_bytes()),69);7071buffer72}7374pub(crate) fn encode_delta<T, P>(75array: &PrimitiveArray<T>,76options: EncodeNullability,77mut buffer: Vec<u8>,78) -> Vec<u8>79where80T: NativeType,81P: ParquetNativeType,82T: num_traits::AsPrimitive<P>,83P: num_traits::AsPrimitive<i64>,84{85let is_optional = options.is_optional();8687if is_optional {88// append the non-null values89let iterator = array.non_null_values_iter().map(|x| {90let parquet_native: P = x.as_();91let integer: i64 = parquet_native.as_();92integer93});94let iterator = ExactSizedIter::new(iterator, array.len() - array.null_count());95encode(iterator, &mut buffer, 1)96} else {97// append all values98let iterator = array.values().iter().map(|x| {99let parquet_native: P = x.as_();100let integer: i64 = parquet_native.as_();101integer102});103encode(iterator, &mut buffer, 1)104}105buffer106}107108pub fn array_to_page_plain<T, P>(109array: &PrimitiveArray<T>,110options: WriteOptions,111type_: PrimitiveType,112) -> PolarsResult<DataPage>113where114T: NativeType,115P: ParquetNativeType,116T: num_traits::AsPrimitive<P>,117{118array_to_page(array, options, type_, Encoding::Plain, encode_plain)119}120121pub fn array_to_page_integer<T, P>(122array: &PrimitiveArray<T>,123options: WriteOptions,124type_: PrimitiveType,125encoding: Encoding,126) -> PolarsResult<Page>127where128T: NativeType,129P: ParquetNativeType,130T: num_traits::AsPrimitive<P>,131P: num_traits::AsPrimitive<i64>,132{133match encoding {134Encoding::Plain => array_to_page(array, options, type_, encoding, encode_plain),135Encoding::DeltaBinaryPacked => array_to_page(array, options, type_, encoding, encode_delta),136other => polars_bail!(nyi = "Encoding integer as {other:?}"),137}138.map(Page::Data)139}140141pub fn array_to_page<T, P, F: Fn(&PrimitiveArray<T>, EncodeNullability, Vec<u8>) -> Vec<u8>>(142array: &PrimitiveArray<T>,143options: WriteOptions,144type_: PrimitiveType,145encoding: Encoding,146encode: F,147) -> PolarsResult<DataPage>148where149T: NativeType,150P: ParquetNativeType,151// constraint required to build statistics152T: num_traits::AsPrimitive<P>,153{154let is_optional = is_nullable(&type_.field_info);155let encode_options = EncodeNullability::new(is_optional);156157let validity = array.validity();158159let mut buffer = vec![];160utils::write_def_levels(161&mut buffer,162is_optional,163validity,164array.len(),165options.version,166)?;167168let definition_levels_byte_length = buffer.len();169170let buffer = encode(array, encode_options, buffer);171172let statistics = if options.has_statistics() {173Some(build_statistics(array, type_.clone(), &options.statistics).serialize())174} else {175None176};177178utils::build_plain_page(179buffer,180array.len(),181array.len(),182array.null_count(),1830,184definition_levels_byte_length,185statistics,186type_,187options,188encoding,189)190}191192pub fn build_statistics<T, P>(193array: &PrimitiveArray<T>,194primitive_type: PrimitiveType,195options: &StatisticsOptions,196) -> PrimitiveStatistics<P>197where198T: NativeType,199P: ParquetNativeType,200T: num_traits::AsPrimitive<P>,201{202let (min_value, max_value) = match (options.min_value, options.max_value) {203(true, true) => {204match polars_compute::min_max::dyn_array_min_max_propagate_nan(array as &dyn Array) {205None => (None, None),206Some((l, r)) => (Some(l), Some(r)),207}208},209(true, false) => (210polars_compute::min_max::dyn_array_min_propagate_nan(array as &dyn Array),211None,212),213(false, true) => (214None,215polars_compute::min_max::dyn_array_max_propagate_nan(array as &dyn Array),216),217(false, false) => (None, None),218};219220let min_value = min_value.and_then(|s| {221s.as_any()222.downcast_ref::<PrimitiveScalar<T>>()223.unwrap()224.value()225.map(|x| x.as_())226});227let max_value = max_value.and_then(|s| {228s.as_any()229.downcast_ref::<PrimitiveScalar<T>>()230.unwrap()231.value()232.map(|x| x.as_())233});234235PrimitiveStatistics::<P> {236primitive_type,237null_count: options.null_count.then_some(array.null_count() as i64),238distinct_count: None,239max_value,240min_value,241}242}243244245