// Path: blob/main/crates/polars-parquet/src/parquet/read/compression.rs
// 6940 views
use polars_parquet_format::DataPageHeaderV2;

use super::PageReader;
use crate::parquet::CowBuffer;
use crate::parquet::compression::{self, Compression, DecompressionContext};
use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::page::{
    CompressedDataPage, CompressedPage, DataPage, DataPageHeader, DictPage, Page,
};

/// Decompresses a V1 data page body into `buffer`.
///
/// V1 pages store levels and values as a single compressed section, so the
/// whole page body is decompressed in one call. `buffer` must already be
/// sized to the page's uncompressed size.
fn decompress_v1(
    compressed: &[u8],
    compression: Compression,
    buffer: &mut [u8],
    context: &mut DecompressionContext,
) -> ParquetResult<()> {
    compression::decompress(compression, compressed, buffer, context)
}

/// Decompresses a V2 data page body into `buffer`.
///
/// # Errors
/// Returns an out-of-spec error when the header's level byte lengths exceed
/// either the input or the output slice, or — for a page flagged as stored
/// uncompressed — when input and output sizes disagree.
fn decompress_v2(
    compressed: &[u8],
    page_header: &DataPageHeaderV2,
    compression: Compression,
    buffer: &mut [u8],
    context: &mut DecompressionContext,
) -> ParquetResult<()> {
    // When processing data page v2, depending on enabled compression for the
    // page, we should account for uncompressed data ('offset') of
    // repetition and definition levels.
    //
    // We always use 0 offset for other pages other than v2, `true` flag means
    // that compression will be applied if decompressor is defined
    let offset = (page_header.definition_levels_byte_length
        + page_header.repetition_levels_byte_length) as usize;
    // When is_compressed flag is missing the page is considered compressed
    let can_decompress = page_header.is_compressed.unwrap_or(true);

    if can_decompress {
        // Reject headers whose level sections would run past either slice.
        if offset > buffer.len() || offset > compressed.len() {
            return Err(ParquetError::oos(
                "V2 Page Header reported incorrect offset to compressed data",
            ));
        }

        // The level sections are stored uncompressed: copy them verbatim.
        (buffer[..offset]).copy_from_slice(&compressed[..offset]);

        // Only invoke the decompressor when a compressed section follows the
        // levels; an empty compressed section is valid.
        // https://github.com/pola-rs/polars/issues/22170
        if compressed.len() > offset {
            compression::decompress(
                compression,
                &compressed[offset..],
                &mut buffer[offset..],
                context,
            )?;
        }
    } else {
        // Page body is stored uncompressed despite the column codec: a plain
        // copy, but the sizes must match exactly.
        if buffer.len() != compressed.len() {
            return Err(ParquetError::oos(
                "V2 Page Header reported incorrect decompressed size",
            ));
        }
        buffer.copy_from_slice(compressed);
    }
    Ok(())
}
/// Decompresses the page, using `buffer` for decompression.
/// If `page.buffer.len() == 0`, there was no decompression and the buffer was moved.
/// Else, decompression took place.
pub fn decompress(
    compressed_page: CompressedPage,
    buffer: &mut Vec<u8>,
    context: &mut DecompressionContext,
) -> ParquetResult<Page> {
    Ok(match (compressed_page.compression(), compressed_page) {
        // Uncompressed data page: no work to do, move the page buffer as-is.
        (Compression::Uncompressed, CompressedPage::Data(page)) => Page::Data(DataPage::new_read(
            page.header,
            page.buffer,
            page.descriptor,
        )),
        // Compressed data page: decompress into `buffer`, then move `buffer`
        // into the returned page (the caller's Vec is left empty).
        (_, CompressedPage::Data(page)) => {
            // prepare the compression buffer
            let read_size = page.uncompressed_size();
            if read_size > buffer.capacity() {
                // Clear before resizing to avoid memcpy'ing junk we don't care about anymore rather
                // than just memset'ing zeroes.
                buffer.clear();
            }
            buffer.resize(read_size, 0);

            // V1 pages are decompressed as a whole; V2 pages keep their level
            // sections uncompressed, which `decompress_v2` accounts for.
            match page.header() {
                DataPageHeader::V1(_) => {
                    decompress_v1(&page.buffer, page.compression, buffer, context)?
                },
                DataPageHeader::V2(header) => {
                    decompress_v2(&page.buffer, header, page.compression, buffer, context)?
                },
            }
            let buffer = CowBuffer::Owned(std::mem::take(buffer));

            Page::Data(DataPage::new_read(page.header, buffer, page.descriptor))
        },
        // Uncompressed dictionary page: move the page buffer as-is.
        (Compression::Uncompressed, CompressedPage::Dict(page)) => Page::Dict(DictPage {
            buffer: page.buffer,
            num_values: page.num_values,
            is_sorted: page.is_sorted,
        }),
        // Compressed dictionary page: the whole body is a single compressed
        // section, so the V1-style whole-body decompression is used.
        (_, CompressedPage::Dict(page)) => {
            // prepare the compression buffer
            let read_size = page.uncompressed_page_size;
            if read_size > buffer.capacity() {
                // Clear before resizing to avoid memcpy'ing junk we don't care about anymore rather
                // than just memset'ing zeroes.
                buffer.clear();
            }
            buffer.resize(read_size, 0);

            decompress_v1(&page.buffer, page.compression(), buffer, context)?;
            let buffer = CowBuffer::Owned(std::mem::take(buffer));

            Page::Dict(DictPage {
                buffer,
                num_values: page.num_values,
                is_sorted: page.is_sorted,
            })
        },
    })
}
// Type-level sketch of the streaming decompressor built from this module's
// pieces; underscore-prefixed and not constructed in the visible code.
type _Decompressor<I> = streaming_decompression::Decompressor<
    CompressedPage,
    Page,
    fn(CompressedPage, &mut Vec<u8>) -> ParquetResult<Page>,
    ParquetError,
    I,
>;

// A page only needs decompression work when its codec is not `Uncompressed`.
impl streaming_decompression::Compressed for CompressedPage {
    #[inline]
    fn is_compressed(&self) -> bool {
        self.compression() != Compression::Uncompressed
    }
}

// Expose the decompressed page's internal buffer so the streaming
// decompression machinery can reclaim and reuse it.
impl streaming_decompression::Decompressed for Page {
    #[inline]
    fn buffer_mut(&mut self) -> &mut Vec<u8> {
        self.buffer_mut()
    }
}

/// A [`FallibleStreamingIterator`] that decompresses [`CompressedPage`] into [`DataPage`].
/// # Implementation
/// This decompressor uses an internal [`Vec<u8>`] to perform decompressions which
/// is reused across pages, so that a single allocation is required.
/// If the pages are not compressed, the internal buffer is not used.
pub struct BasicDecompressor {
    // Source of compressed pages for one column chunk.
    reader: PageReader,
    // Scratch buffer reused across page decompressions.
    buffer: Vec<u8>,
    // Codec state threaded through decompression calls; starts as `Unset`.
    context: DecompressionContext,
}

impl BasicDecompressor {
    /// Create a new [`BasicDecompressor`]
    pub fn new(reader: PageReader, buffer: Vec<u8>) -> Self {
        Self {
            reader,
            buffer,
            context: DecompressionContext::Unset,
        }
    }

    /// The total number of values is given from the `ColumnChunk` metadata.
    ///
    /// - Nested column: equal to the number of non-null values at the lowest nesting level.
    /// - Unnested column: equal to the number of non-null rows.
    pub fn total_num_values(&self) -> usize {
        self.reader.total_num_values()
    }

    /// Returns its internal buffer, consuming itself.
    pub fn into_inner(self) -> Vec<u8> {
        self.buffer
    }
    /// Reads the column chunk's dictionary page, if any, decompressing it
    /// into a freshly allocated buffer (the reusable scratch buffer is not
    /// used here).
    pub fn read_dict_page(&mut self) -> ParquetResult<Option<DictPage>> {
        match self.reader.read_dict()? {
            None => Ok(None),
            Some(p) => {
                let num_values = p.num_values;
                // NOTE(review): the capacity hint is a value count, not a
                // byte count; `decompress` resizes as needed — confirm intent.
                let page = decompress(
                    CompressedPage::Dict(p),
                    &mut Vec::with_capacity(num_values),
                    &mut self.context,
                )?;

                match page {
                    Page::Dict(d) => Ok(Some(d)),
                    // `decompress` of a Dict input always yields a Dict page.
                    Page::Data(_) => unreachable!(),
                }
            },
        }
    }

    /// Offers `page`'s owned buffer back for reuse as the scratch buffer.
    ///
    /// Ignored when the page merely borrows its buffer, or when the current
    /// scratch buffer already has strictly larger capacity.
    pub fn reuse_page_buffer(&mut self, page: DataPage) {
        let buffer = match page.buffer {
            CowBuffer::Borrowed(_) => return,
            CowBuffer::Owned(vec) => vec,
        };

        if self.buffer.capacity() > buffer.capacity() {
            return;
        };

        self.buffer = buffer;
    }
}

/// A still-compressed data page yielded by [`BasicDecompressor`], to be
/// decompressed on demand via [`DataPageItem::decompress`].
pub struct DataPageItem {
    page: CompressedDataPage,
}

impl DataPageItem {
    /// Number of values in the (compressed) page.
    pub fn num_values(&self) -> usize {
        self.page.num_values()
    }

    /// The underlying compressed page.
    pub fn page(&self) -> &CompressedDataPage {
        &self.page
    }

    /// Decompresses this page using `decompressor`'s scratch buffer and
    /// codec context.
    ///
    /// # Panics
    /// Panics if decompression yields anything but a data page, which would
    /// be a bug in [`decompress`].
    pub fn decompress(self, decompressor: &mut BasicDecompressor) -> ParquetResult<DataPage> {
        let p = decompress(
            CompressedPage::Data(self.page),
            &mut decompressor.buffer,
            &mut decompressor.context,
        )?;
        let Page::Data(p) = p else {
            panic!("Decompressing a data page should result in a data page");
        };

        Ok(p)
    }
}

impl Iterator for BasicDecompressor {
    type Item = ParquetResult<DataPageItem>;

    fn next(&mut self) -> Option<Self::Item> {
        let page = match self.reader.next() {
            None => return None,
            Some(Err(e)) => return Some(Err(e)),
            Some(Ok(p)) => p,
        };

        // A dictionary page is only valid as the first page of a column
        // chunk; it is consumed via `read_dict_page`, not this iterator.
        let CompressedPage::Data(page) = page else {
            return Some(Err(ParquetError::oos(
                "Found dictionary page beyond the first page of a column chunk",
            )));
        };

        Some(Ok(DataPageItem { page }))
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.reader.size_hint()
    }
}

#[cfg(test)]
mod tests {
    use polars_parquet_format::Encoding;

    use super::*;

    #[test]
    fn test_decompress_v2_empty_datapage() {
        // Every codec must tolerate an empty compressed section after the
        // uncompressed level bytes.
        let compressions = [
            Compression::Snappy,
            Compression::Gzip,
            Compression::Lzo,
            Compression::Brotli,
            Compression::Lz4,
            Compression::Zstd,
            Compression::Lz4Raw,
        ];

        // this datapage has an empty compressed section after the first two bytes (uncompressed definition levels)
        let compressed: &mut Vec<u8> = &mut vec![0x03, 0x00];
        let page_header = DataPageHeaderV2::new(1, 1, 1, Encoding::PLAIN, 2, 0, true, None);
        let buffer: &mut Vec<u8> = &mut vec![0, 2];

        compressions.iter().for_each(|compression| {
            // With nothing after the level bytes, output must equal input
            // regardless of the codec.
            test_decompress_v2_datapage(compressed, &page_header, *compression, buffer, compressed)
        });
    }

    /// Runs `decompress_v2` and asserts the output buffer equals `expected`.
    fn test_decompress_v2_datapage(
        compressed: &[u8],
        page_header: &DataPageHeaderV2,
        compression: Compression,
        buffer: &mut [u8],
        expected: &[u8],
    ) {
        decompress_v2(
            compressed,
            page_header,
            compression,
            buffer,
            &mut DecompressionContext::Unset,
        )
        .unwrap();
        assert_eq!(buffer, expected);
    }
}