Path: blob/main/crates/polars-parquet/src/parquet/write/file.rs
6940 views
use std::io::Write;12use polars_parquet_format::RowGroup;3use polars_parquet_format::thrift::protocol::TCompactOutputProtocol;45use super::indexes::{write_column_index, write_offset_index};6use super::page::PageWriteSpec;7use super::row_group::write_row_group;8use super::{RowGroupIterColumns, WriteOptions};9use crate::parquet::error::{ParquetError, ParquetResult};10pub use crate::parquet::metadata::KeyValue;11use crate::parquet::metadata::{SchemaDescriptor, ThriftFileMetadata};12use crate::parquet::write::State;13use crate::parquet::{FOOTER_SIZE, PARQUET_MAGIC};1415pub(super) fn start_file<W: Write>(writer: &mut W) -> ParquetResult<u64> {16writer.write_all(&PARQUET_MAGIC)?;17Ok(PARQUET_MAGIC.len() as u64)18}1920pub(super) fn end_file<W: Write>(21mut writer: &mut W,22metadata: &ThriftFileMetadata,23) -> ParquetResult<u64> {24// Write metadata25let mut protocol = TCompactOutputProtocol::new(&mut writer);26let metadata_len = metadata.write_to_out_protocol(&mut protocol)? as i32;2728// Write footer29let metadata_bytes = metadata_len.to_le_bytes();30let mut footer_buffer = [0u8; FOOTER_SIZE as usize];31(0..4).for_each(|i| {32footer_buffer[i] = metadata_bytes[i];33});3435(&mut footer_buffer[4..]).write_all(&PARQUET_MAGIC)?;36writer.write_all(&footer_buffer)?;37writer.flush()?;38Ok(metadata_len as u64 + FOOTER_SIZE)39}4041fn create_column_orders(schema_desc: &SchemaDescriptor) -> Vec<polars_parquet_format::ColumnOrder> {42// We only include ColumnOrder for leaf nodes.43// Currently only supported ColumnOrder is TypeDefinedOrder so we set this44// for all leaf nodes.45// Even if the column has an undefined sort order, such as INTERVAL, this46// is still technically the defined TYPEORDER so it should still be set.47(0..schema_desc.columns().len())48.map(|_| {49polars_parquet_format::ColumnOrder::TYPEORDER(50polars_parquet_format::TypeDefinedOrder {},51)52})53.collect()54}5556/// An interface to write a parquet file.57/// Use `start` to write the header, `write` to write a row group,58/// and `end` to write the footer.59pub struct FileWriter<W: Write> {60writer: W,61schema: SchemaDescriptor,62options: WriteOptions,63created_by: Option<String>,6465offset: u64,66row_groups: Vec<RowGroup>,67page_specs: Vec<Vec<Vec<PageWriteSpec>>>,68/// Used to store the current state for writing the file69state: State,70// when the file is written, metadata becomes available71metadata: Option<ThriftFileMetadata>,72}7374/// Writes a parquet file containing only the header and footer75///76/// This is used to write the metadata as a separate Parquet file, usually when data77/// is partitioned across multiple files.78///79/// Note: Recall that when combining row groups from [`ThriftFileMetadata`], the `file_path` on each80/// of their column chunks must be updated with their path relative to where they are written to.81pub fn write_metadata_sidecar<W: Write>(82writer: &mut W,83metadata: &ThriftFileMetadata,84) -> ParquetResult<u64> {85let mut len = start_file(writer)?;86len += end_file(writer, metadata)?;87Ok(len)88}8990// Accessors91impl<W: Write> FileWriter<W> {92/// The options assigned to the file93pub fn options(&self) -> &WriteOptions {94&self.options95}9697/// The [`SchemaDescriptor`] assigned to this file98pub fn schema(&self) -> &SchemaDescriptor {99&self.schema100}101102/// Returns the [`ThriftFileMetadata`]. This is Some iff the [`Self::end`] has been called.103///104/// This is used to write the metadata as a separate Parquet file, usually when data105/// is partitioned across multiple files106pub fn metadata(&self) -> Option<&ThriftFileMetadata> {107self.metadata.as_ref()108}109}110111impl<W: Write> FileWriter<W> {112/// Returns a new [`FileWriter`].113pub fn new(114writer: W,115schema: SchemaDescriptor,116options: WriteOptions,117created_by: Option<String>,118) -> Self {119Self {120writer,121schema,122options,123created_by,124offset: 0,125row_groups: vec![],126page_specs: vec![],127state: State::Initialised,128metadata: None,129}130}131132/// Writes the header of the file.133///134/// This is automatically called by [`Self::write`] if not called following [`Self::new`].135///136/// # Errors137/// Returns an error if data has been written to the file.138fn start(&mut self) -> ParquetResult<()> {139if self.offset == 0 {140self.offset = start_file(&mut self.writer)?;141self.state = State::Started;142Ok(())143} else {144Err(ParquetError::InvalidParameter(145"Start cannot be called twice".to_string(),146))147}148}149150/// Writes a row group to the file.151///152/// This call is IO-bounded153pub fn write<E>(&mut self, row_group: RowGroupIterColumns<'_, E>) -> ParquetResult<()>154where155ParquetError: From<E>,156E: std::error::Error,157{158if self.offset == 0 {159self.start()?;160}161let ordinal = self.row_groups.len();162let (group, specs, size) = write_row_group(163&mut self.writer,164self.offset,165self.schema.columns(),166row_group,167ordinal,168)?;169self.offset += size;170self.row_groups.push(group);171self.page_specs.push(specs);172Ok(())173}174175/// Writes the footer of the parquet file. Returns the total size of the file and the176/// underlying writer.177pub fn end(&mut self, key_value_metadata: Option<Vec<KeyValue>>) -> ParquetResult<u64> {178if self.offset == 0 {179self.start()?;180}181182if self.state != State::Started {183return Err(ParquetError::InvalidParameter(184"End cannot be called twice".to_string(),185));186}187// compute file stats188let num_rows = self.row_groups.iter().map(|group| group.num_rows).sum();189190if self.options.write_statistics {191// write column indexes (require page statistics)192self.row_groups193.iter_mut()194.zip(self.page_specs.iter())195.try_for_each(|(group, pages)| {196group.columns.iter_mut().zip(pages.iter()).try_for_each(197|(column, pages)| {198let offset = self.offset;199column.column_index_offset = Some(offset as i64);200self.offset += write_column_index(&mut self.writer, pages)?;201let length = self.offset - offset;202column.column_index_length = Some(length as i32);203ParquetResult::Ok(())204},205)?;206ParquetResult::Ok(())207})?;208};209210// write offset index211self.row_groups212.iter_mut()213.zip(self.page_specs.iter())214.try_for_each(|(group, pages)| {215group216.columns217.iter_mut()218.zip(pages.iter())219.try_for_each(|(column, pages)| {220let offset = self.offset;221column.offset_index_offset = Some(offset as i64);222self.offset += write_offset_index(&mut self.writer, pages)?;223column.offset_index_length = Some((self.offset - offset) as i32);224ParquetResult::Ok(())225})?;226ParquetResult::Ok(())227})?;228229let metadata = ThriftFileMetadata::new(230self.options.version.into(),231self.schema.clone().into_thrift(),232num_rows,233self.row_groups.clone(),234key_value_metadata,235self.created_by.clone(),236Some(create_column_orders(&self.schema)),237None,238None,239);240241let len = end_file(&mut self.writer, &metadata)?;242self.state = State::Finished;243self.metadata = Some(metadata);244Ok(self.offset + len)245}246247/// Returns the underlying writer.248pub fn into_inner(self) -> W {249self.writer250}251252/// Returns the underlying writer and [`ThriftFileMetadata`]253/// # Panics254/// This function panics if [`Self::end`] has not yet been called255pub fn into_inner_and_metadata(self) -> (W, ThriftFileMetadata) {256(self.writer, self.metadata.expect("File to have ended"))257}258}259260261