Path: blob/main/crates/polars-parquet/src/parquet/write/page.rs
use std::io::Write;

#[cfg(feature = "async")]
use futures::{AsyncWrite, AsyncWriteExt};
use polars_parquet_format::thrift::protocol::TCompactOutputProtocol;
#[cfg(feature = "async")]
use polars_parquet_format::thrift::protocol::TCompactOutputStreamProtocol;
use polars_parquet_format::{DictionaryPageHeader, Encoding, PageType};

use crate::parquet::compression::Compression;
use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::page::{
    CompressedDataPage, CompressedDictPage, CompressedPage, DataPageHeader, ParquetPageHeader,
};
use crate::parquet::statistics::Statistics;

pub(crate) fn is_data_page(page: &PageWriteSpec) -> bool {
    page.header.type_ == PageType::DATA_PAGE || page.header.type_ == PageType::DATA_PAGE_V2
}

fn maybe_bytes(uncompressed: usize, compressed: usize) -> ParquetResult<(i32, i32)> {
    let uncompressed_page_size: i32 = uncompressed.try_into().map_err(|_| {
        ParquetError::oos(format!(
            "A page can only contain i32::MAX uncompressed bytes. This one contains {uncompressed}"
        ))
    })?;

    let compressed_page_size: i32 = compressed.try_into().map_err(|_| {
        ParquetError::oos(format!(
            "A page can only contain i32::MAX compressed bytes. This one contains {compressed}"
        ))
    })?;

    Ok((uncompressed_page_size, compressed_page_size))
}

/// Contains page write metrics.
pub struct PageWriteSpec {
    pub header: ParquetPageHeader,
    #[allow(dead_code)]
    pub num_values: usize,
    /// The number of actual rows. For non-nested values, this is equal to the number of values.
    pub num_rows: usize,
    pub header_size: u64,
    pub offset: u64,
    pub bytes_written: u64,
    pub compression: Compression,
    pub statistics: Option<Statistics>,
}

pub fn write_page<W: Write>(
    writer: &mut W,
    offset: u64,
    compressed_page: &CompressedPage,
) -> ParquetResult<PageWriteSpec> {
    let num_values = compressed_page.num_values();
    let num_rows = compressed_page
        .num_rows()
        .expect("We should have num_rows when we are writing");

    let header = match &compressed_page {
        CompressedPage::Data(compressed_page) => assemble_data_page_header(compressed_page),
        CompressedPage::Dict(compressed_page) => assemble_dict_page_header(compressed_page),
    }?;

    let header_size = write_page_header(writer, &header)?;
    let mut bytes_written = header_size;

    bytes_written += match &compressed_page {
        CompressedPage::Data(compressed_page) => {
            writer.write_all(&compressed_page.buffer)?;
            compressed_page.buffer.len() as u64
        },
        CompressedPage::Dict(compressed_page) => {
            writer.write_all(&compressed_page.buffer)?;
            compressed_page.buffer.len() as u64
        },
    };

    let statistics = match &compressed_page {
        CompressedPage::Data(compressed_page) => compressed_page.statistics().transpose()?,
        CompressedPage::Dict(_) => None,
    };

    Ok(PageWriteSpec {
        header,
        header_size,
        offset,
        bytes_written,
        compression: compressed_page.compression(),
        statistics,
        num_values,
        num_rows,
    })
}
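
// A minimal usage sketch, not part of the original file: it illustrates the
// offset bookkeeping implied by `write_page`, which writes one page at `offset`
// and reports the bytes it consumed in `PageWriteSpec::bytes_written`. The
// names `write_pages_sketch`, `sink` and `pages` are hypothetical.
#[allow(dead_code)]
fn write_pages_sketch<W: Write>(
    sink: &mut W,
    pages: &[CompressedPage],
    mut offset: u64,
) -> ParquetResult<Vec<PageWriteSpec>> {
    let mut specs = Vec::with_capacity(pages.len());
    for page in pages {
        // Write the page header and body at the current offset.
        let spec = write_page(sink, offset, page)?;
        // The next page starts right after the bytes just written.
        offset += spec.bytes_written;
        specs.push(spec);
    }
    Ok(specs)
}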

#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub async fn write_page_async<W: AsyncWrite + Unpin + Send>(
    writer: &mut W,
    offset: u64,
    compressed_page: &CompressedPage,
) -> ParquetResult<PageWriteSpec> {
    let num_values = compressed_page.num_values();
    let num_rows = compressed_page
        .num_rows()
        .expect("We should have the num_rows when we are writing");

    let header = match &compressed_page {
        CompressedPage::Data(compressed_page) => assemble_data_page_header(compressed_page),
        CompressedPage::Dict(compressed_page) => assemble_dict_page_header(compressed_page),
    }?;

    let header_size = write_page_header_async(writer, &header).await?;
    let mut bytes_written = header_size as u64;

    bytes_written += match &compressed_page {
        CompressedPage::Data(compressed_page) => {
            writer.write_all(&compressed_page.buffer).await?;
            compressed_page.buffer.len() as u64
        },
        CompressedPage::Dict(compressed_page) => {
            writer.write_all(&compressed_page.buffer).await?;
            compressed_page.buffer.len() as u64
        },
    };

    let statistics = match &compressed_page {
        CompressedPage::Data(compressed_page) => compressed_page.statistics().transpose()?,
        CompressedPage::Dict(_) => None,
    };

    Ok(PageWriteSpec {
        header,
        header_size,
        offset,
        bytes_written,
        compression: compressed_page.compression(),
        statistics,
        num_rows,
        num_values,
    })
}

fn assemble_data_page_header(page: &CompressedDataPage) -> ParquetResult<ParquetPageHeader> {
    let (uncompressed_page_size, compressed_page_size) =
        maybe_bytes(page.uncompressed_size(), page.compressed_size())?;

    let mut page_header = ParquetPageHeader {
        type_: match page.header() {
            DataPageHeader::V1(_) => PageType::DATA_PAGE,
            DataPageHeader::V2(_) => PageType::DATA_PAGE_V2,
        },
        uncompressed_page_size,
        compressed_page_size,
        crc: None,
        data_page_header: None,
        index_page_header: None,
        dictionary_page_header: None,
        data_page_header_v2: None,
    };

    match page.header() {
        DataPageHeader::V1(header) => {
            page_header.data_page_header = Some(header.clone());
        },
        DataPageHeader::V2(header) => {
            page_header.data_page_header_v2 = Some(header.clone());
        },
    }
    Ok(page_header)
}

fn assemble_dict_page_header(page: &CompressedDictPage) -> ParquetResult<ParquetPageHeader> {
    let (uncompressed_page_size, compressed_page_size) =
        maybe_bytes(page.uncompressed_page_size, page.buffer.len())?;

    let num_values: i32 = page.num_values.try_into().map_err(|_| {
        ParquetError::oos(format!(
            "A dictionary page can only contain i32::MAX items. This one contains {}",
            page.num_values
        ))
    })?;

    Ok(ParquetPageHeader {
        type_: PageType::DICTIONARY_PAGE,
        uncompressed_page_size,
        compressed_page_size,
        crc: None,
        data_page_header: None,
        index_page_header: None,
        dictionary_page_header: Some(DictionaryPageHeader {
            num_values,
            encoding: Encoding::PLAIN,
            is_sorted: None,
        }),
        data_page_header_v2: None,
    })
}

/// writes the page header into `writer`, returning the number of bytes used in the process.
fn write_page_header<W: Write>(
    mut writer: &mut W,
    header: &ParquetPageHeader,
) -> ParquetResult<u64> {
    let mut protocol = TCompactOutputProtocol::new(&mut writer);
    Ok(header.write_to_out_protocol(&mut protocol)? as u64)
}

#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
/// writes the page header into `writer`, returning the number of bytes used in the process.
async fn write_page_header_async<W: AsyncWrite + Unpin + Send>(
    mut writer: &mut W,
    header: &ParquetPageHeader,
) -> ParquetResult<u64> {
    let mut protocol = TCompactOutputStreamProtocol::new(&mut writer);
    Ok(header.write_to_out_stream_protocol(&mut protocol).await? as u64)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::parquet::CowBuffer;

    #[test]
    fn dict_too_large() {
        let page = CompressedDictPage::new(
            CowBuffer::Owned(vec![]),
            Compression::Uncompressed,
            i32::MAX as usize + 1,
            100,
            false,
        );
        assert!(assemble_dict_page_header(&page).is_err());
    }

    #[test]
    fn dict_too_many_values() {
        let page = CompressedDictPage::new(
            CowBuffer::Owned(vec![]),
            Compression::Uncompressed,
            0,
            i32::MAX as usize + 1,
            false,
        );
        assert!(assemble_dict_page_header(&page).is_err());
    }
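
    // Hedged additional checks, not part of the original test module; they only
    // use items defined in this file (`maybe_bytes`, `is_data_page`,
    // `PageWriteSpec`) and mirror the i32 overflow guards exercised above.
    #[test]
    fn page_sizes_must_fit_in_i32() {
        assert!(maybe_bytes(i32::MAX as usize + 1, 0).is_err());
        assert!(maybe_bytes(0, i32::MAX as usize + 1).is_err());
        assert!(maybe_bytes(10, 10).is_ok());
    }

    #[test]
    fn data_page_types_are_detected() {
        // Build a spec with an empty header and flip its page type.
        let mut spec = PageWriteSpec {
            header: ParquetPageHeader {
                type_: PageType::DATA_PAGE,
                uncompressed_page_size: 0,
                compressed_page_size: 0,
                crc: None,
                data_page_header: None,
                index_page_header: None,
                dictionary_page_header: None,
                data_page_header_v2: None,
            },
            num_values: 0,
            num_rows: 0,
            header_size: 0,
            offset: 0,
            bytes_written: 0,
            compression: Compression::Uncompressed,
            statistics: None,
        };
        assert!(is_data_page(&spec));
        spec.header.type_ = PageType::DATA_PAGE_V2;
        assert!(is_data_page(&spec));
        spec.header.type_ = PageType::DICTIONARY_PAGE;
        assert!(!is_data_page(&spec));
    }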
}
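
// A minimal sketch of the same bookkeeping for the async path, not part of the
// original file; it only assumes the `write_page_async` signature above, and
// `write_pages_async_sketch`, `sink` and `pages` are hypothetical names.
#[cfg(feature = "async")]
#[allow(dead_code)]
async fn write_pages_async_sketch<W: AsyncWrite + Unpin + Send>(
    sink: &mut W,
    pages: &[CompressedPage],
    mut offset: u64,
) -> ParquetResult<Vec<PageWriteSpec>> {
    let mut specs = Vec::with_capacity(pages.len());
    for page in pages {
        // Write the page at the current offset, then advance by the bytes written.
        let spec = write_page_async(sink, offset, page).await?;
        offset += spec.bytes_written;
        specs.push(spec);
    }
    Ok(specs)
}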