Path: blob/main/crates/polars-parquet/src/parquet/write/compression.rs
6940 views
use crate::parquet::compression::CompressionOptions;1use crate::parquet::error::{ParquetError, ParquetResult};2use crate::parquet::page::{3CompressedDataPage, CompressedDictPage, CompressedPage, DataPage, DataPageHeader, DictPage,4Page,5};6use crate::parquet::{CowBuffer, FallibleStreamingIterator, compression};78/// Compresses a [`DataPage`] into a [`CompressedDataPage`].9fn compress_data(10page: DataPage,11mut compressed_buffer: Vec<u8>,12compression: CompressionOptions,13) -> ParquetResult<CompressedDataPage> {14let DataPage {15mut buffer,16header,17descriptor,18num_rows,19} = page;20let uncompressed_page_size = buffer.len();21let num_rows = num_rows.expect("We should have num_rows when we are writing");22if compression != CompressionOptions::Uncompressed {23match &header {24DataPageHeader::V1(_) => {25compression::compress(compression, &buffer, &mut compressed_buffer)?;26},27DataPageHeader::V2(header) => {28let levels_byte_length = (header.repetition_levels_byte_length29+ header.definition_levels_byte_length)30as usize;31compressed_buffer.extend_from_slice(&buffer[..levels_byte_length]);32compression::compress(33compression,34&buffer[levels_byte_length..],35&mut compressed_buffer,36)?;37},38};39} else {40std::mem::swap(buffer.to_mut(), &mut compressed_buffer);41}4243Ok(CompressedDataPage::new(44header,45CowBuffer::Owned(compressed_buffer),46compression.into(),47uncompressed_page_size,48descriptor,49num_rows,50))51}5253fn compress_dict(54page: DictPage,55mut compressed_buffer: Vec<u8>,56compression: CompressionOptions,57) -> ParquetResult<CompressedDictPage> {58let DictPage {59buffer,60num_values,61is_sorted,62} = page;6364let uncompressed_page_size = buffer.len();65let compressed_buffer = if compression != CompressionOptions::Uncompressed {66compression::compress(compression, &buffer, &mut compressed_buffer)?;67CowBuffer::Owned(compressed_buffer)68} else {69buffer70};7172Ok(CompressedDictPage::new(73compressed_buffer,74compression.into(),75uncompressed_page_size,76num_values,77is_sorted,78))79}8081/// Compresses an [`EncodedPage`] into a [`CompressedPage`] using `compressed_buffer` as the82/// intermediary buffer.83///84/// `compressed_buffer` is taken by value because it becomes owned by [`CompressedPage`]85///86/// # Errors87/// Errors if the compressor fails88pub fn compress(89page: Page,90compressed_buffer: Vec<u8>,91compression: CompressionOptions,92) -> ParquetResult<CompressedPage> {93match page {94Page::Data(page) => {95compress_data(page, compressed_buffer, compression).map(CompressedPage::Data)96},97Page::Dict(page) => {98compress_dict(page, compressed_buffer, compression).map(CompressedPage::Dict)99},100}101}102103/// A [`FallibleStreamingIterator`] that consumes [`Page`] and yields [`CompressedPage`]104/// holding a reusable buffer ([`Vec<u8>`]) for compression.105pub struct Compressor<I: Iterator<Item = ParquetResult<Page>>> {106iter: I,107compression: CompressionOptions,108buffer: Vec<u8>,109current: Option<CompressedPage>,110}111112impl<I: Iterator<Item = ParquetResult<Page>>> Compressor<I> {113/// Creates a new [`Compressor`]114pub fn new(iter: I, compression: CompressionOptions, buffer: Vec<u8>) -> Self {115Self {116iter,117compression,118buffer,119current: None,120}121}122123/// Creates a new [`Compressor`] (same as `new`)124pub fn new_from_vec(iter: I, compression: CompressionOptions, buffer: Vec<u8>) -> Self {125Self::new(iter, compression, buffer)126}127128/// Deconstructs itself into its iterator and scratch buffer.129pub fn into_inner(mut self) -> (I, Vec<u8>) {130let mut buffer = if let Some(page) = self.current.as_mut() {131std::mem::take(page.buffer_mut())132} else {133std::mem::take(&mut self.buffer)134};135buffer.clear();136(self.iter, buffer)137}138}139140impl<I: Iterator<Item = ParquetResult<Page>>> FallibleStreamingIterator for Compressor<I> {141type Item = CompressedPage;142type Error = ParquetError;143144fn advance(&mut self) -> std::result::Result<(), Self::Error> {145let mut compressed_buffer = if let Some(page) = self.current.as_mut() {146std::mem::take(page.buffer_mut())147} else {148std::mem::take(&mut self.buffer)149};150compressed_buffer.clear();151152let next = self153.iter154.next()155.map(|x| x.and_then(|page| compress(page, compressed_buffer, self.compression)))156.transpose()?;157self.current = next;158Ok(())159}160161fn get(&self) -> Option<&Self::Item> {162self.current.as_ref()163}164}165166impl<I: Iterator<Item = ParquetResult<Page>>> Iterator for Compressor<I> {167type Item = ParquetResult<CompressedPage>;168169fn next(&mut self) -> Option<Self::Item> {170let mut compressed_buffer = if let Some(page) = self.current.as_mut() {171std::mem::take(page.buffer_mut())172} else {173std::mem::take(&mut self.buffer)174};175compressed_buffer.clear();176177let page = self.iter.next()?;178let page = match page {179Ok(page) => page,180Err(err) => return Some(Err(err)),181};182183Some(compress(page, compressed_buffer, self.compression))184}185}186187188