// Path: blob/main/crates/polars-parquet/src/parquet/write/page.rs
// 8512 views
use std::io::Write;12#[cfg(feature = "async")]3use futures::{AsyncWrite, AsyncWriteExt};4use polars_parquet_format::thrift::protocol::TCompactOutputProtocol;5#[cfg(feature = "async")]6use polars_parquet_format::thrift::protocol::TCompactOutputStreamProtocol;7use polars_parquet_format::{DictionaryPageHeader, Encoding, PageType};89use crate::parquet::compression::Compression;10use crate::parquet::error::{ParquetError, ParquetResult};11use crate::parquet::page::{12CompressedDataPage, CompressedDictPage, CompressedPage, DataPageHeader, ParquetPageHeader,13};14use crate::parquet::statistics::Statistics;1516pub(crate) fn is_data_page(page: &PageWriteSpec) -> bool {17page.header.type_ == PageType::DATA_PAGE || page.header.type_ == PageType::DATA_PAGE_V218}1920pub(crate) fn is_dict_page(page: &PageWriteSpec) -> bool {21page.header.type_ == PageType::DICTIONARY_PAGE22}2324fn maybe_bytes(uncompressed: usize, compressed: usize) -> ParquetResult<(i32, i32)> {25let uncompressed_page_size: i32 = uncompressed.try_into().map_err(|_| {26ParquetError::oos(format!(27"A page can only contain i32::MAX uncompressed bytes. This one contains {uncompressed}"28))29})?;3031let compressed_page_size: i32 = compressed.try_into().map_err(|_| {32ParquetError::oos(format!(33"A page can only contain i32::MAX compressed bytes. This one contains {compressed}"34))35})?;3637Ok((uncompressed_page_size, compressed_page_size))38}3940/// Contains page write metrics.41pub struct PageWriteSpec {42pub header: ParquetPageHeader,43#[allow(dead_code)]44pub num_values: usize,45/// The number of actual rows. 
For non-nested values, this is equal to the number of values.46pub num_rows: usize,47pub header_size: u64,48pub offset: u64,49pub bytes_written: u64,50pub compression: Compression,51pub statistics: Option<Statistics>,52}5354pub fn write_page<W: Write>(55writer: &mut W,56offset: u64,57compressed_page: &CompressedPage,58) -> ParquetResult<PageWriteSpec> {59let num_values = compressed_page.num_values();60let num_rows = compressed_page61.num_rows()62.expect("We should have num_rows when we are writing");6364let header = match &compressed_page {65CompressedPage::Data(compressed_page) => assemble_data_page_header(compressed_page),66CompressedPage::Dict(compressed_page) => assemble_dict_page_header(compressed_page),67}?;6869let header_size = write_page_header(writer, &header)?;70let mut bytes_written = header_size;7172bytes_written += match &compressed_page {73CompressedPage::Data(compressed_page) => {74writer.write_all(&compressed_page.buffer)?;75compressed_page.buffer.len() as u6476},77CompressedPage::Dict(compressed_page) => {78writer.write_all(&compressed_page.buffer)?;79compressed_page.buffer.len() as u6480},81};8283let statistics = match &compressed_page {84CompressedPage::Data(compressed_page) => compressed_page.statistics().transpose()?,85CompressedPage::Dict(_) => None,86};8788Ok(PageWriteSpec {89header,90header_size,91offset,92bytes_written,93compression: compressed_page.compression(),94statistics,95num_values,96num_rows,97})98}99100#[cfg(feature = "async")]101#[cfg_attr(docsrs, doc(cfg(feature = "async")))]102pub async fn write_page_async<W: AsyncWrite + Unpin + Send>(103writer: &mut W,104offset: u64,105compressed_page: &CompressedPage,106) -> ParquetResult<PageWriteSpec> {107let num_values = compressed_page.num_values();108let num_rows = compressed_page109.num_rows()110.expect("We should have the num_rows when we are writing");111112let header = match &compressed_page {113CompressedPage::Data(compressed_page) => 
assemble_data_page_header(compressed_page),114CompressedPage::Dict(compressed_page) => assemble_dict_page_header(compressed_page),115}?;116117let header_size = write_page_header_async(writer, &header).await?;118let mut bytes_written = header_size as u64;119120bytes_written += match &compressed_page {121CompressedPage::Data(compressed_page) => {122writer.write_all(&compressed_page.buffer).await?;123compressed_page.buffer.len() as u64124},125CompressedPage::Dict(compressed_page) => {126writer.write_all(&compressed_page.buffer).await?;127compressed_page.buffer.len() as u64128},129};130131let statistics = match &compressed_page {132CompressedPage::Data(compressed_page) => compressed_page.statistics().transpose()?,133CompressedPage::Dict(_) => None,134};135136Ok(PageWriteSpec {137header,138header_size,139offset,140bytes_written,141compression: compressed_page.compression(),142statistics,143num_rows,144num_values,145})146}147148fn assemble_data_page_header(page: &CompressedDataPage) -> ParquetResult<ParquetPageHeader> {149let (uncompressed_page_size, compressed_page_size) =150maybe_bytes(page.uncompressed_size(), page.compressed_size())?;151152let mut page_header = ParquetPageHeader {153type_: match page.header() {154DataPageHeader::V1(_) => PageType::DATA_PAGE,155DataPageHeader::V2(_) => PageType::DATA_PAGE_V2,156},157uncompressed_page_size,158compressed_page_size,159crc: None,160data_page_header: None,161index_page_header: None,162dictionary_page_header: None,163data_page_header_v2: None,164};165166match page.header() {167DataPageHeader::V1(header) => {168page_header.data_page_header = Some(header.clone());169},170DataPageHeader::V2(header) => {171page_header.data_page_header_v2 = Some(header.clone());172},173}174Ok(page_header)175}176177fn assemble_dict_page_header(page: &CompressedDictPage) -> ParquetResult<ParquetPageHeader> {178let (uncompressed_page_size, compressed_page_size) =179maybe_bytes(page.uncompressed_page_size, page.buffer.len())?;180181let num_values: 
i32 = page.num_values.try_into().map_err(|_| {182ParquetError::oos(format!(183"A dictionary page can only contain i32::MAX items. This one contains {}",184page.num_values185))186})?;187188Ok(ParquetPageHeader {189type_: PageType::DICTIONARY_PAGE,190uncompressed_page_size,191compressed_page_size,192crc: None,193data_page_header: None,194index_page_header: None,195dictionary_page_header: Some(DictionaryPageHeader {196num_values,197encoding: Encoding::PLAIN,198is_sorted: None,199}),200data_page_header_v2: None,201})202}203204/// writes the page header into `writer`, returning the number of bytes used in the process.205fn write_page_header<W: Write>(206mut writer: &mut W,207header: &ParquetPageHeader,208) -> ParquetResult<u64> {209let mut protocol = TCompactOutputProtocol::new(&mut writer);210Ok(header.write_to_out_protocol(&mut protocol)? as u64)211}212213#[cfg(feature = "async")]214#[cfg_attr(docsrs, doc(cfg(feature = "async")))]215/// writes the page header into `writer`, returning the number of bytes used in the process.216async fn write_page_header_async<W: AsyncWrite + Unpin + Send>(217mut writer: &mut W,218header: &ParquetPageHeader,219) -> ParquetResult<u64> {220let mut protocol = TCompactOutputStreamProtocol::new(&mut writer);221Ok(header.write_to_out_stream_protocol(&mut protocol).await? as u64)222}223224#[cfg(test)]225mod tests {226use super::*;227use crate::parquet::CowBuffer;228229#[test]230fn dict_too_large() {231let page = CompressedDictPage::new(232CowBuffer::Owned(vec![]),233Compression::Uncompressed,234i32::MAX as usize + 1,235100,236false,237);238assert!(assemble_dict_page_header(&page).is_err());239}240241#[test]242fn dict_too_many_values() {243let page = CompressedDictPage::new(244CowBuffer::Owned(vec![]),245Compression::Uncompressed,2460,247i32::MAX as usize + 1,248false,249);250assert!(assemble_dict_page_header(&page).is_err());251}252}253254255