// Path: crates/polars-parquet/src/parquet/write/file.rs
use std::io::Write;

use polars_parquet_format::RowGroup;
use polars_parquet_format::thrift::protocol::TCompactOutputProtocol;

use super::indexes::{write_column_index, write_offset_index};
use super::page::PageWriteSpec;
use super::row_group::write_row_group;
use super::{RowGroupIterColumns, WriteOptions};
use crate::parquet::error::{ParquetError, ParquetResult};
pub use crate::parquet::metadata::KeyValue;
use crate::parquet::metadata::{SchemaDescriptor, ThriftFileMetadata};
use crate::parquet::write::State;
use crate::parquet::{FOOTER_SIZE, PARQUET_MAGIC};

/// Writes the parquet magic bytes that open every parquet file.
///
/// Returns the number of bytes written (the length of the magic).
pub(super) fn start_file<W: Write>(writer: &mut W) -> ParquetResult<u64> {
    writer.write_all(&PARQUET_MAGIC)?;
    Ok(PARQUET_MAGIC.len() as u64)
}

/// Serializes `metadata` in thrift compact protocol, then writes the fixed-size
/// footer: the metadata length as a 4-byte little-endian integer followed by the
/// parquet magic bytes.
///
/// Returns the number of bytes written (serialized metadata + footer).
pub(super) fn end_file<W: Write>(
    mut writer: &mut W,
    metadata: &ThriftFileMetadata,
) -> ParquetResult<u64> {
    // Write metadata
    let mut protocol = TCompactOutputProtocol::new(&mut writer);
    // NOTE(review): the serialized length is narrowed to i32 here; metadata
    // larger than i32::MAX bytes would silently truncate — confirm an upstream
    // limit makes that unreachable.
    let metadata_len = metadata.write_to_out_protocol(&mut protocol)? as i32;

    // Write footer
    let metadata_bytes = metadata_len.to_le_bytes();
    let mut footer_buffer = [0u8; FOOTER_SIZE as usize];
    // First 4 footer bytes: little-endian metadata length.
    (0..4).for_each(|i| {
        footer_buffer[i] = metadata_bytes[i];
    });

    // Remaining footer bytes: the trailing magic.
    (&mut footer_buffer[4..]).write_all(&PARQUET_MAGIC)?;
    writer.write_all(&footer_buffer)?;
    writer.flush()?;
    Ok(metadata_len as u64 + FOOTER_SIZE)
}

/// Builds one `TYPEORDER` (`TypeDefinedOrder`) column order per leaf column of
/// `schema_desc`, for inclusion in the file metadata.
fn create_column_orders(schema_desc: &SchemaDescriptor) -> Vec<polars_parquet_format::ColumnOrder> {
    // We only include ColumnOrder for leaf nodes.
    // Currently only supported ColumnOrder is TypeDefinedOrder so we set this
    // for all leaf nodes.
    // Even if the column has an undefined sort order, such as INTERVAL, this
    // is still technically the defined TYPEORDER so it should still be set.
    (0..schema_desc.columns().len())
        .map(|_| {
            polars_parquet_format::ColumnOrder::TYPEORDER(
                polars_parquet_format::TypeDefinedOrder {},
            )
        })
        .collect()
}

/// An interface to write a parquet file.
/// Use `start` to write the header, `write` to write a row group,
/// and `end` to write the footer.
pub struct FileWriter<W: Write> {
    /// Sink that receives all bytes of the file.
    writer: W,
    /// Parquet schema of every row group written to this file.
    schema: SchemaDescriptor,
    options: WriteOptions,
    /// Recorded verbatim in the footer's `created_by` field.
    created_by: Option<String>,

    /// Number of bytes written to `writer` so far; also the absolute offset of
    /// the next write. Zero means the header has not been written yet.
    offset: u64,
    /// Thrift metadata of each row group written so far.
    row_groups: Vec<RowGroup>,
    /// Per row group, per column, the specs of the pages written — used to
    /// build the column/offset indexes in `end`.
    page_specs: Vec<Vec<Vec<PageWriteSpec>>>,
    /// Used to store the current state for writing the file
    state: State,
    // when the file is written, metadata becomes available
    metadata: Option<ThriftFileMetadata>,
}

/// Writes a parquet file containing only the header and footer
///
/// This is used to write the metadata as a separate Parquet file, usually when data
/// is partitioned across multiple files.
///
/// Note: Recall that when combining row groups from [`ThriftFileMetadata`], the `file_path` on each
/// of their column chunks must be updated with their path relative to where they are written to.
pub fn write_metadata_sidecar<W: Write>(
    writer: &mut W,
    metadata: &ThriftFileMetadata,
) -> ParquetResult<u64> {
    let mut len = start_file(writer)?;
    len += end_file(writer, metadata)?;
    Ok(len)
}

// Accessors
impl<W: Write> FileWriter<W> {
    /// The options assigned to the file
    pub fn options(&self) -> &WriteOptions {
        &self.options
    }

    /// The [`SchemaDescriptor`] assigned to this file
    pub fn schema(&self) -> &SchemaDescriptor {
        &self.schema
    }

    /// Returns the [`ThriftFileMetadata`]. This is `Some` iff [`Self::end`] has been called.
    ///
    /// This is used to write the metadata as a separate Parquet file, usually when data
    /// is partitioned across multiple files
    pub fn metadata(&self) -> Option<&ThriftFileMetadata> {
        self.metadata.as_ref()
    }
}

impl<W: Write> FileWriter<W> {
    /// Returns a new [`FileWriter`].
    pub fn new(
        writer: W,
        schema: SchemaDescriptor,
        options: WriteOptions,
        created_by: Option<String>,
    ) -> Self {
        Self {
            writer,
            schema,
            options,
            created_by,
            offset: 0,
            row_groups: vec![],
            page_specs: vec![],
            state: State::Initialised,
            metadata: None,
        }
    }

    /// Writes the header of the file.
    ///
    /// This is automatically called by [`Self::write`] if not called following [`Self::new`].
    ///
    /// # Errors
    /// Returns an error if data has been written to the file.
    fn start(&mut self) -> ParquetResult<()> {
        // `offset == 0` is the "nothing written yet" sentinel; any other value
        // means the header (and possibly more) is already on disk.
        if self.offset == 0 {
            self.offset = start_file(&mut self.writer)?;
            self.state = State::Started;
            Ok(())
        } else {
            Err(ParquetError::InvalidParameter(
                "Start cannot be called twice".to_string(),
            ))
        }
    }

    /// Writes a row group to the file.
    ///
    /// This call is IO-bounded
    pub fn write<E>(
        &mut self,
        num_rows: u64,
        row_group: RowGroupIterColumns<'_, E>,
    ) -> ParquetResult<()>
    where
        ParquetError: From<E>,
        E: std::error::Error,
    {
        // Lazily write the header on the first row group.
        if self.offset == 0 {
            self.start()?;
        }
        // Ordinal of this row group within the file (0-based).
        let ordinal = self.row_groups.len();
        let (group, specs, size) = write_row_group(
            &mut self.writer,
            num_rows,
            self.offset,
            self.schema.columns(),
            row_group,
            ordinal,
        )?;
        self.offset += size;
        self.row_groups.push(group);
        self.page_specs.push(specs);
        Ok(())
    }

    /// Writes the footer of the parquet file. Returns the total number of bytes
    /// written to the underlying writer (i.e. the size of the file).
    ///
    /// # Errors
    /// Returns an error if called more than once.
    pub fn end(&mut self, key_value_metadata: Option<Vec<KeyValue>>) -> ParquetResult<u64> {
        // An empty file still needs a valid header before the footer.
        if self.offset == 0 {
            self.start()?;
        }

        if self.state != State::Started {
            return Err(ParquetError::InvalidParameter(
                "End cannot be called twice".to_string(),
            ));
        }
        // compute file stats
        let num_rows = self.row_groups.iter().map(|group| group.num_rows).sum();

        if self.options.write_statistics {
            // write column indexes (require page statistics)
            // Column-index offsets/lengths are recorded back into each column's
            // thrift metadata as the indexes are written.
            self.row_groups
                .iter_mut()
                .zip(self.page_specs.iter())
                .try_for_each(|(group, pages)| {
                    group.columns.iter_mut().zip(pages.iter()).try_for_each(
                        |(column, pages)| {
                            let offset = self.offset;
                            column.column_index_offset = Some(offset as i64);
                            self.offset += write_column_index(&mut self.writer, pages)?;
                            let length = self.offset - offset;
                            column.column_index_length = Some(length as i32);
                            ParquetResult::Ok(())
                        },
                    )?;
                    ParquetResult::Ok(())
                })?;
        };

        // write offset index
        // Offset indexes are written unconditionally, after all column indexes.
        self.row_groups
            .iter_mut()
            .zip(self.page_specs.iter())
            .try_for_each(|(group, pages)| {
                group
                    .columns
                    .iter_mut()
                    .zip(pages.iter())
                    .try_for_each(|(column, pages)| {
                        let offset = self.offset;
                        column.offset_index_offset = Some(offset as i64);
                        self.offset += write_offset_index(&mut self.writer, pages)?;
                        column.offset_index_length = Some((self.offset - offset) as i32);
                        ParquetResult::Ok(())
                    })?;
                ParquetResult::Ok(())
            })?;

        // Assemble the footer metadata from everything accumulated so far.
        let metadata = ThriftFileMetadata::new(
            self.options.version.into(),
            self.schema.clone().into_thrift(),
            num_rows,
            self.row_groups.clone(),
            key_value_metadata,
            self.created_by.clone(),
            Some(create_column_orders(&self.schema)),
            None,
            None,
        );

        let len = end_file(&mut self.writer, &metadata)?;
        self.state = State::Finished;
        self.metadata = Some(metadata);
        // Total file size: bytes written before the footer plus the footer itself.
        Ok(self.offset + len)
    }

    /// Returns the underlying writer.
    pub fn into_inner(self) -> W {
        self.writer
    }

    /// Returns the underlying writer and [`ThriftFileMetadata`]
    /// # Panics
    /// This function panics if [`Self::end`] has not yet been called
    pub fn into_inner_and_metadata(self) -> (W, ThriftFileMetadata) {
        (self.writer, self.metadata.expect("File to have ended"))
    }
}