Path: blob/main/crates/polars-parquet/src/parquet/page/mod.rs
6940 views
use super::CowBuffer;1use crate::parquet::compression::Compression;2use crate::parquet::encoding::{Encoding, get_length};3use crate::parquet::error::{ParquetError, ParquetResult};4use crate::parquet::metadata::Descriptor;5pub use crate::parquet::parquet_bridge::{DataPageHeaderExt, PageType};6use crate::parquet::statistics::Statistics;7pub use crate::parquet::thrift_format::{8DataPageHeader as DataPageHeaderV1, DataPageHeaderV2, Encoding as FormatEncoding,9PageHeader as ParquetPageHeader,10};1112pub enum PageResult {13Single(Page),14Two { dict: DictPage, data: DataPage },15}1617/// A [`CompressedDataPage`] is compressed, encoded representation of a Parquet data page.18/// It holds actual data and thus cloning it is expensive.19#[derive(Debug)]20pub struct CompressedDataPage {21pub(crate) header: DataPageHeader,22pub(crate) buffer: CowBuffer,23pub(crate) compression: Compression,24uncompressed_page_size: usize,25pub(crate) descriptor: Descriptor,26pub num_rows: Option<usize>,27}2829impl CompressedDataPage {30/// Returns a new [`CompressedDataPage`].31pub fn new(32header: DataPageHeader,33buffer: CowBuffer,34compression: Compression,35uncompressed_page_size: usize,36descriptor: Descriptor,37num_rows: usize,38) -> Self {39Self {40header,41buffer,42compression,43uncompressed_page_size,44descriptor,45num_rows: Some(num_rows),46}47}4849/// Returns a new [`CompressedDataPage`].50pub(crate) fn new_read(51header: DataPageHeader,52buffer: CowBuffer,53compression: Compression,54uncompressed_page_size: usize,55descriptor: Descriptor,56) -> Self {57Self {58header,59buffer,60compression,61uncompressed_page_size,62descriptor,63num_rows: None,64}65}6667pub fn header(&self) -> &DataPageHeader {68&self.header69}7071pub fn uncompressed_size(&self) -> usize {72self.uncompressed_page_size73}7475pub fn compressed_size(&self) -> usize {76self.buffer.len()77}7879/// The compression of the data in this page.80/// Note that what is compressed in a page depends on its version:81/// in V1, the whole data (`[repetition levels][definition levels][values]`) is compressed; in V2 only the values are compressed.82pub fn compression(&self) -> Compression {83self.compression84}8586pub fn num_values(&self) -> usize {87self.header.num_values()88}8990pub fn num_rows(&self) -> Option<usize> {91self.num_rows92}9394/// Decodes the raw statistics into a statistics95pub fn statistics(&self) -> Option<ParquetResult<Statistics>> {96match &self.header {97DataPageHeader::V1(d) => d98.statistics99.as_ref()100.map(|x| Statistics::deserialize(x, self.descriptor.primitive_type.clone())),101DataPageHeader::V2(d) => d102.statistics103.as_ref()104.map(|x| Statistics::deserialize(x, self.descriptor.primitive_type.clone())),105}106}107108pub fn slice_mut(&mut self) -> &mut CowBuffer {109&mut self.buffer110}111}112113#[derive(Debug, Clone)]114pub enum DataPageHeader {115V1(DataPageHeaderV1),116V2(DataPageHeaderV2),117}118119impl DataPageHeader {120pub fn num_values(&self) -> usize {121match &self {122DataPageHeader::V1(d) => d.num_values as usize,123DataPageHeader::V2(d) => d.num_values as usize,124}125}126127pub fn null_count(&self) -> Option<usize> {128match &self {129DataPageHeader::V1(_) => None,130DataPageHeader::V2(d) => Some(d.num_nulls as usize),131}132}133134pub fn encoding(&self) -> FormatEncoding {135match self {136DataPageHeader::V1(d) => d.encoding,137DataPageHeader::V2(d) => d.encoding,138}139}140141pub fn is_dictionary_encoded(&self) -> bool {142matches!(self.encoding(), FormatEncoding::RLE_DICTIONARY)143}144}145146/// A [`DataPage`] is an uncompressed, encoded representation of a Parquet data page. It holds actual data147/// and thus cloning it is expensive.148#[derive(Debug, Clone)]149pub struct DataPage {150pub(super) header: DataPageHeader,151pub(super) buffer: CowBuffer,152pub descriptor: Descriptor,153pub num_rows: Option<usize>,154}155156impl DataPage {157pub fn new(158header: DataPageHeader,159buffer: CowBuffer,160descriptor: Descriptor,161num_rows: usize,162) -> Self {163Self {164header,165buffer,166descriptor,167num_rows: Some(num_rows),168}169}170171pub(crate) fn new_read(172header: DataPageHeader,173buffer: CowBuffer,174descriptor: Descriptor,175) -> Self {176Self {177header,178buffer,179descriptor,180num_rows: None,181}182}183184pub fn header(&self) -> &DataPageHeader {185&self.header186}187188pub fn buffer(&self) -> &[u8] {189&self.buffer190}191192/// Returns a mutable reference to the internal buffer.193/// Useful to recover the buffer after the page has been decoded.194pub fn buffer_mut(&mut self) -> &mut Vec<u8> {195self.buffer.to_mut()196}197198pub fn num_values(&self) -> usize {199self.header.num_values()200}201202pub fn null_count(&self) -> Option<usize> {203self.header.null_count()204}205206pub fn num_rows(&self) -> Option<usize> {207self.num_rows208}209210pub fn encoding(&self) -> Encoding {211match &self.header {212DataPageHeader::V1(d) => d.encoding(),213DataPageHeader::V2(d) => d.encoding(),214}215}216217pub fn definition_level_encoding(&self) -> Encoding {218match &self.header {219DataPageHeader::V1(d) => d.definition_level_encoding(),220DataPageHeader::V2(_) => Encoding::Rle,221}222}223224pub fn repetition_level_encoding(&self) -> Encoding {225match &self.header {226DataPageHeader::V1(d) => d.repetition_level_encoding(),227DataPageHeader::V2(_) => Encoding::Rle,228}229}230231/// Decodes the raw statistics into a statistics232pub fn statistics(&self) -> Option<ParquetResult<Statistics>> {233match &self.header {234DataPageHeader::V1(d) => d235.statistics236.as_ref()237.map(|x| Statistics::deserialize(x, self.descriptor.primitive_type.clone())),238DataPageHeader::V2(d) => d239.statistics240.as_ref()241.map(|x| Statistics::deserialize(x, self.descriptor.primitive_type.clone())),242}243}244}245246/// A [`Page`] is an uncompressed, encoded representation of a Parquet page. It may hold actual data247/// and thus cloning it may be expensive.248#[derive(Debug)]249#[allow(clippy::large_enum_variant)]250pub enum Page {251/// A [`DataPage`]252Data(DataPage),253/// A [`DictPage`]254Dict(DictPage),255}256257impl Page {258pub(crate) fn buffer_mut(&mut self) -> &mut Vec<u8> {259match self {260Self::Data(page) => page.buffer.to_mut(),261Self::Dict(page) => page.buffer.to_mut(),262}263}264265pub(crate) fn unwrap_data(self) -> DataPage {266match self {267Self::Data(page) => page,268_ => panic!(),269}270}271}272273/// A [`CompressedPage`] is a compressed, encoded representation of a Parquet page. It holds actual data274/// and thus cloning it is expensive.275#[derive(Debug)]276#[allow(clippy::large_enum_variant)]277pub enum CompressedPage {278Data(CompressedDataPage),279Dict(CompressedDictPage),280}281282impl CompressedPage {283pub(crate) fn buffer_mut(&mut self) -> &mut Vec<u8> {284match self {285CompressedPage::Data(page) => page.buffer.to_mut(),286CompressedPage::Dict(page) => page.buffer.to_mut(),287}288}289290pub(crate) fn compression(&self) -> Compression {291match self {292CompressedPage::Data(page) => page.compression(),293CompressedPage::Dict(page) => page.compression(),294}295}296297pub(crate) fn num_values(&self) -> usize {298match self {299CompressedPage::Data(page) => page.num_values(),300CompressedPage::Dict(_) => 0,301}302}303304pub(crate) fn num_rows(&self) -> Option<usize> {305match self {306CompressedPage::Data(page) => page.num_rows(),307CompressedPage::Dict(_) => Some(0),308}309}310}311312/// An uncompressed, encoded dictionary page.313#[derive(Debug, Clone)]314pub struct DictPage {315pub buffer: CowBuffer,316pub num_values: usize,317pub is_sorted: bool,318}319320impl DictPage {321pub fn new(buffer: CowBuffer, num_values: usize, is_sorted: bool) -> Self {322Self {323buffer,324num_values,325is_sorted,326}327}328}329330/// A compressed, encoded dictionary page.331#[derive(Debug)]332pub struct CompressedDictPage {333pub(crate) buffer: CowBuffer,334compression: Compression,335pub(crate) num_values: usize,336pub(crate) uncompressed_page_size: usize,337pub is_sorted: bool,338}339340impl CompressedDictPage {341pub fn new(342buffer: CowBuffer,343compression: Compression,344uncompressed_page_size: usize,345num_values: usize,346is_sorted: bool,347) -> Self {348Self {349buffer,350compression,351uncompressed_page_size,352num_values,353is_sorted,354}355}356357/// The compression of the data in this page.358pub fn compression(&self) -> Compression {359self.compression360}361}362363pub struct EncodedSplitBuffer<'a> {364/// Encoded Repetition Levels365pub rep: &'a [u8],366/// Encoded Definition Levels367pub def: &'a [u8],368/// Encoded Values369pub values: &'a [u8],370}371372/// Splits the page buffer into 3 slices corresponding to (encoded rep levels, encoded def levels, encoded values) for v1 pages.373#[inline]374pub fn split_buffer_v1(375buffer: &[u8],376has_rep: bool,377has_def: bool,378) -> ParquetResult<EncodedSplitBuffer<'_>> {379let (rep, buffer) = if has_rep {380let level_buffer_length = get_length(buffer).ok_or_else(|| {381ParquetError::oos(382"The number of bytes declared in v1 rep levels is higher than the page size",383)384})?;385386if buffer.len() < level_buffer_length + 4 {387return Err(ParquetError::oos(388"The number of bytes declared in v1 rep levels is higher than the page size",389));390}391392buffer[4..].split_at(level_buffer_length)393} else {394(&[] as &[u8], buffer)395};396397let (def, buffer) = if has_def {398let level_buffer_length = get_length(buffer).ok_or_else(|| {399ParquetError::oos(400"The number of bytes declared in v1 def levels is higher than the page size",401)402})?;403404if buffer.len() < level_buffer_length + 4 {405return Err(ParquetError::oos(406"The number of bytes declared in v1 def levels is higher than the page size",407));408}409410buffer[4..].split_at(level_buffer_length)411} else {412(&[] as &[u8], buffer)413};414415Ok(EncodedSplitBuffer {416rep,417def,418values: buffer,419})420}421422/// Splits the page buffer into 3 slices corresponding to (encoded rep levels, encoded def levels, encoded values) for v2 pages.423pub fn split_buffer_v2(424buffer: &[u8],425rep_level_buffer_length: usize,426def_level_buffer_length: usize,427) -> ParquetResult<EncodedSplitBuffer<'_>> {428let (rep, buffer) = buffer.split_at(rep_level_buffer_length);429let (def, values) = buffer.split_at(def_level_buffer_length);430431Ok(EncodedSplitBuffer { rep, def, values })432}433434/// Splits the page buffer into 3 slices corresponding to (encoded rep levels, encoded def levels, encoded values).435pub fn split_buffer(page: &DataPage) -> ParquetResult<EncodedSplitBuffer<'_>> {436match page.header() {437DataPageHeader::V1(_) => split_buffer_v1(438page.buffer(),439page.descriptor.max_rep_level > 0,440page.descriptor.max_def_level > 0,441),442DataPageHeader::V2(header) => {443let def_level_buffer_length: usize = header.definition_levels_byte_length.try_into()?;444let rep_level_buffer_length: usize = header.repetition_levels_byte_length.try_into()?;445split_buffer_v2(446page.buffer(),447rep_level_buffer_length,448def_level_buffer_length,449)450},451}452}453454455