// Path: blob/main/crates/polars-parquet/src/arrow/write/binview/basic.rs
// 6940 views
use arrow::array::{Array, BinaryViewArray};1use polars_compute::min_max::MinMaxKernel;2use polars_error::PolarsResult;34use crate::parquet::encoding::delta_bitpacked;5use crate::parquet::schema::types::PrimitiveType;6use crate::parquet::statistics::{BinaryStatistics, ParquetStatistics};7use crate::read::schema::is_nullable;8use crate::write::binary::encode_non_null_values;9use crate::write::utils::invalid_encoding;10use crate::write::{EncodeNullability, Encoding, Page, StatisticsOptions, WriteOptions, utils};1112pub(crate) fn encode_plain(13array: &BinaryViewArray,14options: EncodeNullability,15buffer: &mut Vec<u8>,16) {17if options.is_optional() && array.validity().is_some() {18// @NOTE: This capacity might overestimate the amount of bytes since the buffers might19// still contain data that is not referenced by any value.20let capacity =21array.total_bytes_len() + (array.len() - array.null_count()) * size_of::<u32>();22buffer.reserve(capacity);2324encode_non_null_values(array.non_null_values_iter(), buffer);25// Append the non-null values.26} else {27// @NOTE: This capacity might overestimate the amount of bytes since the buffers might28// still contain data that is not referenced by any value.29let capacity = array.total_bytes_len() + array.len() * size_of::<u32>();30buffer.reserve(capacity);3132encode_non_null_values(array.values_iter(), buffer);33}34}3536pub(crate) fn encode_delta(37array: &BinaryViewArray,38options: EncodeNullability,39buffer: &mut Vec<u8>,40) {41if options.is_optional() && array.validity().is_some() {42let lengths = utils::ExactSizedIter::new(43array.non_null_views_iter().map(|v| v.length as i64),44array.len() - array.null_count(),45);46delta_bitpacked::encode(lengths, buffer, 1);4748for slice in array.non_null_values_iter() {49buffer.extend_from_slice(slice)50}51} else {52let lengths =53utils::ExactSizedIter::new(array.views().iter().map(|v| v.length as i64), array.len());54delta_bitpacked::encode(lengths, buffer, 
1);5556buffer.extend(array.values_iter().flatten());57}58}5960pub fn array_to_page(61array: &BinaryViewArray,62options: WriteOptions,63type_: PrimitiveType,64encoding: Encoding,65) -> PolarsResult<Page> {66let is_optional = is_nullable(&type_.field_info);67let encode_options = EncodeNullability::new(is_optional);6869let mut buffer = vec![];70// TODO! reserve capacity71utils::write_def_levels(72&mut buffer,73is_optional,74array.validity(),75array.len(),76options.version,77)?;7879let definition_levels_byte_length = buffer.len();8081match encoding {82Encoding::Plain => encode_plain(array, encode_options, &mut buffer),83Encoding::DeltaLengthByteArray => encode_delta(array, encode_options, &mut buffer),84_ => return Err(invalid_encoding(encoding, array.dtype())),85}8687let statistics = if options.has_statistics() {88Some(build_statistics(array, type_.clone(), &options.statistics))89} else {90None91};9293utils::build_plain_page(94buffer,95array.len(),96array.len(),97array.null_count(),980,99definition_levels_byte_length,100statistics,101type_,102options,103encoding,104)105.map(Page::Data)106}107108pub(crate) fn build_statistics(109array: &BinaryViewArray,110primitive_type: PrimitiveType,111options: &StatisticsOptions,112) -> ParquetStatistics {113BinaryStatistics {114primitive_type,115null_count: options.null_count.then_some(array.null_count() as i64),116distinct_count: None,117max_value: options118.max_value119.then(|| array.max_propagate_nan_kernel().map(<[u8]>::to_vec))120.flatten(),121min_value: options122.min_value123.then(|| array.min_propagate_nan_kernel().map(<[u8]>::to_vec))124.flatten(),125}126.serialize()127}128129130