Path: blob/main/crates/polars-parquet/src/arrow/write/binary/basic.rs
6940 views
use arrow::array::{Array, BinaryArray, ValueSize};1use arrow::bitmap::Bitmap;2use arrow::offset::Offset;3use polars_error::PolarsResult;45use super::super::{WriteOptions, utils};6use crate::arrow::read::schema::is_nullable;7use crate::parquet::encoding::{Encoding, delta_bitpacked};8use crate::parquet::schema::types::PrimitiveType;9use crate::parquet::statistics::{BinaryStatistics, ParquetStatistics};10use crate::write::utils::invalid_encoding;11use crate::write::{EncodeNullability, Page, StatisticsOptions};1213pub(crate) fn encode_non_null_values<'a, I: Iterator<Item = &'a [u8]>>(14iter: I,15buffer: &mut Vec<u8>,16) {17iter.for_each(|x| {18// BYTE_ARRAY: first 4 bytes denote length in littleendian.19let len = (x.len() as u32).to_le_bytes();20buffer.extend_from_slice(&len);21buffer.extend_from_slice(x);22})23}2425pub(crate) fn encode_plain<O: Offset>(26array: &BinaryArray<O>,27options: EncodeNullability,28buffer: &mut Vec<u8>,29) {30if options.is_optional() && array.validity().is_some() {31let len_before = buffer.len();32let capacity =33array.get_values_size() + (array.len() - array.null_count()) * size_of::<u32>();34buffer.reserve(capacity);35encode_non_null_values(array.non_null_values_iter(), buffer);36// Ensure we allocated properly.37debug_assert_eq!(buffer.len() - len_before, capacity);38} else {39let len_before = buffer.len();40let capacity = array.get_values_size() + array.len() * size_of::<u32>();41buffer.reserve(capacity);42encode_non_null_values(array.values_iter(), buffer);43// Ensure we allocated properly.44debug_assert_eq!(buffer.len() - len_before, capacity);45}46}4748pub fn array_to_page<O: Offset>(49array: &BinaryArray<O>,50options: WriteOptions,51type_: PrimitiveType,52encoding: Encoding,53) -> PolarsResult<Page> {54let validity = array.validity();55let is_optional = is_nullable(&type_.field_info);56let encode_options = EncodeNullability::new(is_optional);5758let mut buffer = vec![];59utils::write_def_levels(60&mut buffer,61is_optional,62validity,63array.len(),64options.version,65)?;6667let definition_levels_byte_length = buffer.len();6869match encoding {70Encoding::Plain => encode_plain(array, encode_options, &mut buffer),71Encoding::DeltaLengthByteArray => encode_delta(72array.values(),73array.offsets().buffer(),74array.validity(),75encode_options,76&mut buffer,77),78_ => return Err(invalid_encoding(encoding, array.dtype())),79}8081let statistics = if options.has_statistics() {82Some(build_statistics(array, type_.clone(), &options.statistics))83} else {84None85};8687utils::build_plain_page(88buffer,89array.len(),90array.len(),91array.null_count(),920,93definition_levels_byte_length,94statistics,95type_,96options,97encoding,98)99.map(Page::Data)100}101102pub(crate) fn build_statistics<O: Offset>(103array: &BinaryArray<O>,104primitive_type: PrimitiveType,105options: &StatisticsOptions,106) -> ParquetStatistics {107use polars_compute::min_max::MinMaxKernel;108109BinaryStatistics {110primitive_type,111null_count: options.null_count.then_some(array.null_count() as i64),112distinct_count: None,113max_value: options114.max_value115.then(|| array.max_propagate_nan_kernel().map(<[u8]>::to_vec))116.flatten(),117min_value: options118.min_value119.then(|| array.min_propagate_nan_kernel().map(<[u8]>::to_vec))120.flatten(),121}122.serialize()123}124125pub(crate) fn encode_delta<O: Offset>(126values: &[u8],127offsets: &[O],128validity: Option<&Bitmap>,129options: EncodeNullability,130buffer: &mut Vec<u8>,131) {132if options.is_optional() && validity.is_some() {133if let Some(validity) = validity {134let lengths = offsets135.windows(2)136.map(|w| (w[1] - w[0]).to_usize() as i64)137.zip(validity.iter())138.flat_map(|(x, is_valid)| if is_valid { Some(x) } else { None });139let length = offsets.len() - 1 - validity.unset_bits();140let lengths = utils::ExactSizedIter::new(lengths, length);141142delta_bitpacked::encode(lengths, buffer, 1);143} else {144let lengths = offsets.windows(2).map(|w| (w[1] - w[0]).to_usize() as i64);145delta_bitpacked::encode(lengths, buffer, 1);146}147} else {148let lengths = offsets.windows(2).map(|w| (w[1] - w[0]).to_usize() as i64);149delta_bitpacked::encode(lengths, buffer, 1);150}151152buffer.extend_from_slice(153&values[offsets.first().unwrap().to_usize()..offsets.last().unwrap().to_usize()],154)155}156157/// Returns the ordering of two binary values. This corresponds to pyarrows' ordering158/// of statistics.159#[inline(always)]160pub(crate) fn ord_binary<'a>(a: &'a [u8], b: &'a [u8]) -> std::cmp::Ordering {161a.cmp(b)162}163164165