Path: blob/main/crates/polars-parquet/src/arrow/write/utils.rs
6940 views
use arrow::bitmap::Bitmap;1use arrow::datatypes::ArrowDataType;2use polars_error::*;34use super::{Version, WriteOptions};5use crate::parquet::CowBuffer;6use crate::parquet::compression::CompressionOptions;7use crate::parquet::encoding::Encoding;8use crate::parquet::encoding::hybrid_rle::{self, encode};9use crate::parquet::metadata::Descriptor;10use crate::parquet::page::{DataPage, DataPageHeader, DataPageHeaderV1, DataPageHeaderV2};11use crate::parquet::schema::types::PrimitiveType;12use crate::parquet::statistics::ParquetStatistics;1314/// writes the def levels to a `Vec<u8>` and returns it.15pub fn write_def_levels(16writer: &mut Vec<u8>,17is_optional: bool,18validity: Option<&Bitmap>,19len: usize,20version: Version,21) -> PolarsResult<()> {22if is_optional {23match version {24Version::V1 => {25writer.extend(&[0, 0, 0, 0]);26let start = writer.len();2728match validity {29None => <bool as hybrid_rle::Encoder<bool>>::run_length_encode(30writer, len, true, 1,31)?,32Some(validity) => encode::<bool, _, _>(writer, validity.iter(), 1)?,33}3435// write the first 4 bytes as length36let length = ((writer.len() - start) as i32).to_le_bytes();37(0..4).for_each(|i| writer[start - 4 + i] = length[i]);38},39Version::V2 => match validity {40None => {41<bool as hybrid_rle::Encoder<bool>>::run_length_encode(writer, len, true, 1)?42},43Some(validity) => encode::<bool, _, _>(writer, validity.iter(), 1)?,44},45}4647Ok(())48} else {49// is required => no def levels50Ok(())51}52}5354#[allow(clippy::too_many_arguments)]55pub fn build_plain_page(56buffer: Vec<u8>,57num_values: usize,58num_rows: usize,59null_count: usize,60repetition_levels_byte_length: usize,61definition_levels_byte_length: usize,62statistics: Option<ParquetStatistics>,63type_: PrimitiveType,64options: WriteOptions,65encoding: Encoding,66) -> PolarsResult<DataPage> {67let header = match options.version {68Version::V1 => DataPageHeader::V1(DataPageHeaderV1 {69num_values: num_values as i32,70encoding: encoding.into(),71definition_level_encoding: Encoding::Rle.into(),72repetition_level_encoding: Encoding::Rle.into(),73statistics,74}),75Version::V2 => DataPageHeader::V2(DataPageHeaderV2 {76num_values: num_values as i32,77encoding: encoding.into(),78num_nulls: null_count as i32,79num_rows: num_rows as i32,80definition_levels_byte_length: definition_levels_byte_length as i32,81repetition_levels_byte_length: repetition_levels_byte_length as i32,82is_compressed: Some(options.compression != CompressionOptions::Uncompressed),83statistics,84}),85};86Ok(DataPage::new(87header,88CowBuffer::Owned(buffer),89Descriptor {90primitive_type: type_,91max_def_level: 0,92max_rep_level: 0,93},94num_rows,95))96}9798/// Auxiliary iterator adapter to declare the size hint of an iterator.99pub(super) struct ExactSizedIter<T, I: Iterator<Item = T>> {100iter: I,101remaining: usize,102}103104impl<T, I: Iterator<Item = T> + Clone> Clone for ExactSizedIter<T, I> {105fn clone(&self) -> Self {106Self {107iter: self.iter.clone(),108remaining: self.remaining,109}110}111}112113impl<T, I: Iterator<Item = T>> ExactSizedIter<T, I> {114pub fn new(iter: I, length: usize) -> Self {115Self {116iter,117remaining: length,118}119}120}121122impl<T, I: Iterator<Item = T>> Iterator for ExactSizedIter<T, I> {123type Item = T;124125#[inline]126fn next(&mut self) -> Option<Self::Item> {127self.iter.next().inspect(|_| self.remaining -= 1)128}129130#[inline]131fn size_hint(&self) -> (usize, Option<usize>) {132(self.remaining, Some(self.remaining))133}134}135136impl<T, I: Iterator<Item = T>> std::iter::ExactSizeIterator for ExactSizedIter<T, I> {}137138/// Returns the number of bits needed to bitpack `max`139#[inline]140pub fn get_bit_width(max: u64) -> u32 {14164 - max.leading_zeros()142}143144pub(super) fn invalid_encoding(encoding: Encoding, dtype: &ArrowDataType) -> PolarsError {145polars_err!(InvalidOperation:146"Datatype {:?} cannot be encoded by {:?} encoding",147dtype,148encoding149)150}151152153