// Path: blob/main/crates/polars-parquet/src/parquet/page/mod.rs
// 8512 views
use super::CowBuffer;1use crate::parquet::compression::Compression;2use crate::parquet::encoding::{Encoding, get_length};3use crate::parquet::error::{ParquetError, ParquetResult};4use crate::parquet::metadata::Descriptor;5pub use crate::parquet::parquet_bridge::{DataPageHeaderExt, PageType};6use crate::parquet::statistics::Statistics;7pub use crate::parquet::thrift_format::{8DataPageHeader as DataPageHeaderV1, DataPageHeaderV2, Encoding as FormatEncoding,9PageHeader as ParquetPageHeader,10};1112pub enum PageResult {13Single(Page),14Two { dict: DictPage, data: DataPage },15}1617/// A [`CompressedDataPage`] is compressed, encoded representation of a Parquet data page.18/// It holds actual data and thus cloning it is expensive.19#[derive(Debug)]20pub struct CompressedDataPage {21pub(crate) header: DataPageHeader,22pub(crate) buffer: CowBuffer,23pub(crate) compression: Compression,24uncompressed_page_size: usize,25pub(crate) descriptor: Descriptor,26pub num_rows: Option<usize>,27}2829impl CompressedDataPage {30/// Returns a new [`CompressedDataPage`].31pub fn new(32header: DataPageHeader,33buffer: CowBuffer,34compression: Compression,35uncompressed_page_size: usize,36descriptor: Descriptor,37num_rows: usize,38) -> Self {39Self {40header,41buffer,42compression,43uncompressed_page_size,44descriptor,45num_rows: Some(num_rows),46}47}4849/// Returns a new [`CompressedDataPage`].50pub(crate) fn new_read(51header: DataPageHeader,52buffer: CowBuffer,53compression: Compression,54uncompressed_page_size: usize,55descriptor: Descriptor,56) -> Self {57Self {58header,59buffer,60compression,61uncompressed_page_size,62descriptor,63num_rows: None,64}65}6667pub fn header(&self) -> &DataPageHeader {68&self.header69}7071pub fn uncompressed_size(&self) -> usize {72self.uncompressed_page_size73}7475pub fn compressed_size(&self) -> usize {76self.buffer.len()77}7879/// The compression of the data in this page.80/// Note that what is compressed in a page depends on its version:81/// in V1, the 
whole data (`[repetition levels][definition levels][values]`) is compressed; in V2 only the values are compressed.82pub fn compression(&self) -> Compression {83self.compression84}8586pub fn num_values(&self) -> usize {87self.header.num_values()88}8990pub fn num_rows(&self) -> Option<usize> {91self.num_rows92}9394pub fn null_count(&self) -> Option<usize> {95self.header().null_count()96}9798/// Decodes the raw statistics into a statistics99pub fn statistics(&self) -> Option<ParquetResult<Statistics>> {100match &self.header {101DataPageHeader::V1(d) => d102.statistics103.as_ref()104.map(|x| Statistics::deserialize(x, self.descriptor.primitive_type.clone())),105DataPageHeader::V2(d) => d106.statistics107.as_ref()108.map(|x| Statistics::deserialize(x, self.descriptor.primitive_type.clone())),109}110}111112pub fn slice_mut(&mut self) -> &mut CowBuffer {113&mut self.buffer114}115}116117#[derive(Debug, Clone)]118pub enum DataPageHeader {119V1(DataPageHeaderV1),120V2(DataPageHeaderV2),121}122123impl DataPageHeader {124pub fn num_values(&self) -> usize {125match &self {126DataPageHeader::V1(d) => d.num_values as usize,127DataPageHeader::V2(d) => d.num_values as usize,128}129}130131pub fn null_count(&self) -> Option<usize> {132match &self {133DataPageHeader::V1(_) => None,134DataPageHeader::V2(d) => Some(d.num_nulls as usize),135}136}137138pub fn encoding(&self) -> FormatEncoding {139match self {140DataPageHeader::V1(d) => d.encoding,141DataPageHeader::V2(d) => d.encoding,142}143}144145pub fn is_dictionary_encoded(&self) -> bool {146matches!(self.encoding(), FormatEncoding::RLE_DICTIONARY)147}148}149150/// A [`DataPage`] is an uncompressed, encoded representation of a Parquet data page. 
It holds actual data151/// and thus cloning it is expensive.152#[derive(Debug, Clone)]153pub struct DataPage {154pub(super) header: DataPageHeader,155pub(super) buffer: CowBuffer,156pub descriptor: Descriptor,157pub num_rows: Option<usize>,158}159160impl DataPage {161pub fn new(162header: DataPageHeader,163buffer: CowBuffer,164descriptor: Descriptor,165num_rows: usize,166) -> Self {167Self {168header,169buffer,170descriptor,171num_rows: Some(num_rows),172}173}174175pub(crate) fn new_read(176header: DataPageHeader,177buffer: CowBuffer,178descriptor: Descriptor,179) -> Self {180Self {181header,182buffer,183descriptor,184num_rows: None,185}186}187188pub fn header(&self) -> &DataPageHeader {189&self.header190}191192pub fn buffer(&self) -> &[u8] {193&self.buffer194}195196/// Returns a mutable reference to the internal buffer.197/// Useful to recover the buffer after the page has been decoded.198pub fn buffer_mut(&mut self) -> &mut Vec<u8> {199self.buffer.to_mut()200}201202pub fn num_values(&self) -> usize {203self.header.num_values()204}205206pub fn null_count(&self) -> Option<usize> {207self.header.null_count()208}209210pub fn num_rows(&self) -> Option<usize> {211self.num_rows212}213214pub fn encoding(&self) -> Encoding {215match &self.header {216DataPageHeader::V1(d) => d.encoding(),217DataPageHeader::V2(d) => d.encoding(),218}219}220221pub fn definition_level_encoding(&self) -> Encoding {222match &self.header {223DataPageHeader::V1(d) => d.definition_level_encoding(),224DataPageHeader::V2(_) => Encoding::Rle,225}226}227228pub fn repetition_level_encoding(&self) -> Encoding {229match &self.header {230DataPageHeader::V1(d) => d.repetition_level_encoding(),231DataPageHeader::V2(_) => Encoding::Rle,232}233}234235/// Decodes the raw statistics into a statistics236pub fn statistics(&self) -> Option<ParquetResult<Statistics>> {237match &self.header {238DataPageHeader::V1(d) => d239.statistics240.as_ref()241.map(|x| Statistics::deserialize(x, 
self.descriptor.primitive_type.clone())),242DataPageHeader::V2(d) => d243.statistics244.as_ref()245.map(|x| Statistics::deserialize(x, self.descriptor.primitive_type.clone())),246}247}248}249250/// A [`Page`] is an uncompressed, encoded representation of a Parquet page. It may hold actual data251/// and thus cloning it may be expensive.252#[derive(Debug)]253#[allow(clippy::large_enum_variant)]254pub enum Page {255/// A [`DataPage`]256Data(DataPage),257/// A [`DictPage`]258Dict(DictPage),259}260261impl Page {262pub(crate) fn buffer_mut(&mut self) -> &mut Vec<u8> {263match self {264Self::Data(page) => page.buffer.to_mut(),265Self::Dict(page) => page.buffer.to_mut(),266}267}268}269270/// A [`CompressedPage`] is a compressed, encoded representation of a Parquet page. It holds actual data271/// and thus cloning it is expensive.272#[derive(Debug)]273#[allow(clippy::large_enum_variant)]274pub enum CompressedPage {275Data(CompressedDataPage),276Dict(CompressedDictPage),277}278279impl CompressedPage {280pub(crate) fn buffer_mut(&mut self) -> &mut Vec<u8> {281match self {282CompressedPage::Data(page) => page.buffer.to_mut(),283CompressedPage::Dict(page) => page.buffer.to_mut(),284}285}286287pub(crate) fn compression(&self) -> Compression {288match self {289CompressedPage::Data(page) => page.compression(),290CompressedPage::Dict(page) => page.compression(),291}292}293294pub(crate) fn num_values(&self) -> usize {295match self {296CompressedPage::Data(page) => page.num_values(),297CompressedPage::Dict(_) => 0,298}299}300301pub(crate) fn num_rows(&self) -> Option<usize> {302match self {303CompressedPage::Data(page) => page.num_rows(),304CompressedPage::Dict(_) => Some(0),305}306}307}308309/// An uncompressed, encoded dictionary page.310#[derive(Debug, Clone)]311pub struct DictPage {312pub buffer: CowBuffer,313pub num_values: usize,314pub is_sorted: bool,315}316317impl DictPage {318pub fn new(buffer: CowBuffer, num_values: usize, is_sorted: bool) -> Self {319Self 
{320buffer,321num_values,322is_sorted,323}324}325}326327/// A compressed, encoded dictionary page.328#[derive(Debug)]329pub struct CompressedDictPage {330pub(crate) buffer: CowBuffer,331compression: Compression,332pub(crate) num_values: usize,333pub(crate) uncompressed_page_size: usize,334pub is_sorted: bool,335}336337impl CompressedDictPage {338pub fn new(339buffer: CowBuffer,340compression: Compression,341uncompressed_page_size: usize,342num_values: usize,343is_sorted: bool,344) -> Self {345Self {346buffer,347compression,348uncompressed_page_size,349num_values,350is_sorted,351}352}353354/// The compression of the data in this page.355pub fn compression(&self) -> Compression {356self.compression357}358}359360pub struct EncodedSplitBuffer<'a> {361/// Encoded Repetition Levels362pub rep: &'a [u8],363/// Encoded Definition Levels364pub def: &'a [u8],365/// Encoded Values366pub values: &'a [u8],367}368369/// Splits the page buffer into 3 slices corresponding to (encoded rep levels, encoded def levels, encoded values) for v1 pages.370#[inline]371pub fn split_buffer_v1(372buffer: &[u8],373has_rep: bool,374has_def: bool,375) -> ParquetResult<EncodedSplitBuffer<'_>> {376let (rep, buffer) = if has_rep {377let level_buffer_length = get_length(buffer).ok_or_else(|| {378ParquetError::oos(379"The number of bytes declared in v1 rep levels is higher than the page size",380)381})?;382383if buffer.len() < level_buffer_length + 4 {384return Err(ParquetError::oos(385"The number of bytes declared in v1 rep levels is higher than the page size",386));387}388389buffer[4..].split_at(level_buffer_length)390} else {391(&[] as &[u8], buffer)392};393394let (def, buffer) = if has_def {395let level_buffer_length = get_length(buffer).ok_or_else(|| {396ParquetError::oos(397"The number of bytes declared in v1 def levels is higher than the page size",398)399})?;400401if buffer.len() < level_buffer_length + 4 {402return Err(ParquetError::oos(403"The number of bytes declared in v1 def levels is 
higher than the page size",404));405}406407buffer[4..].split_at(level_buffer_length)408} else {409(&[] as &[u8], buffer)410};411412Ok(EncodedSplitBuffer {413rep,414def,415values: buffer,416})417}418419/// Splits the page buffer into 3 slices corresponding to (encoded rep levels, encoded def levels, encoded values) for v2 pages.420pub fn split_buffer_v2(421buffer: &[u8],422rep_level_buffer_length: usize,423def_level_buffer_length: usize,424) -> ParquetResult<EncodedSplitBuffer<'_>> {425let (rep, buffer) = buffer.split_at(rep_level_buffer_length);426let (def, values) = buffer.split_at(def_level_buffer_length);427428Ok(EncodedSplitBuffer { rep, def, values })429}430431/// Splits the page buffer into 3 slices corresponding to (encoded rep levels, encoded def levels, encoded values).432pub fn split_buffer(page: &DataPage) -> ParquetResult<EncodedSplitBuffer<'_>> {433match page.header() {434DataPageHeader::V1(_) => split_buffer_v1(435page.buffer(),436page.descriptor.max_rep_level > 0,437page.descriptor.max_def_level > 0,438),439DataPageHeader::V2(header) => {440let def_level_buffer_length: usize = header.definition_levels_byte_length.try_into()?;441let rep_level_buffer_length: usize = header.repetition_levels_byte_length.try_into()?;442split_buffer_v2(443page.buffer(),444rep_level_buffer_length,445def_level_buffer_length,446)447},448}449}450451452