Path: blob/main/crates/polars-parquet/src/parquet/write/stream.rs
use std::io::Write;

use futures::{AsyncWrite, AsyncWriteExt};
use polars_parquet_format::RowGroup;
use polars_parquet_format::thrift::protocol::TCompactOutputStreamProtocol;

use super::row_group::write_row_group_async;
use super::{RowGroupIterColumns, WriteOptions};
use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::metadata::{KeyValue, SchemaDescriptor};
use crate::parquet::write::State;
use crate::parquet::write::indexes::{write_column_index_async, write_offset_index_async};
use crate::parquet::write::page::PageWriteSpec;
use crate::parquet::{FOOTER_SIZE, PARQUET_MAGIC};

async fn start_file<W: AsyncWrite + Unpin>(writer: &mut W) -> ParquetResult<u64> {
    writer.write_all(&PARQUET_MAGIC).await?;
    Ok(PARQUET_MAGIC.len() as u64)
}

async fn end_file<W: AsyncWrite + Unpin + Send>(
    mut writer: &mut W,
    metadata: polars_parquet_format::FileMetaData,
) -> ParquetResult<u64> {
    // Write file metadata
    let mut protocol = TCompactOutputStreamProtocol::new(&mut writer);
    let metadata_len = metadata.write_to_out_stream_protocol(&mut protocol).await? as i32;

    // Write footer
    let metadata_bytes = metadata_len.to_le_bytes();
    let mut footer_buffer = [0u8; FOOTER_SIZE as usize];
    (0..4).for_each(|i| {
        footer_buffer[i] = metadata_bytes[i];
    });

    (&mut footer_buffer[4..]).write_all(&PARQUET_MAGIC)?;
    writer.write_all(&footer_buffer).await?;
    writer.flush().await?;
    Ok(metadata_len as u64 + FOOTER_SIZE)
}

/// An interface to write a parquet file asynchronously.
/// Use `start` to write the header, `write` to write a row group,
/// and `end` to write the footer.
pub struct FileStreamer<W: AsyncWrite + Unpin + Send> {
    writer: W,
    schema: SchemaDescriptor,
    options: WriteOptions,
    created_by: Option<String>,

    offset: u64,
    row_groups: Vec<RowGroup>,
    page_specs: Vec<Vec<Vec<PageWriteSpec>>>,
    /// Used to store the current state for writing the file
    state: State,
}

// Accessors
impl<W: AsyncWrite + Unpin + Send> FileStreamer<W> {
    /// The options assigned to the file
    pub fn options(&self) -> &WriteOptions {
        &self.options
    }

    /// The [`SchemaDescriptor`] assigned to this file
    pub fn schema(&self) -> &SchemaDescriptor {
        &self.schema
    }
}

impl<W: AsyncWrite + Unpin + Send> FileStreamer<W> {
    /// Returns a new [`FileStreamer`].
    pub fn new(
        writer: W,
        schema: SchemaDescriptor,
        options: WriteOptions,
        created_by: Option<String>,
    ) -> Self {
        Self {
            writer,
            schema,
            options,
            created_by,
            offset: 0,
            row_groups: vec![],
            page_specs: vec![],
            state: State::Initialised,
        }
    }

    /// Writes the header of the file.
    ///
    /// This is automatically called by [`Self::write`] if not called following [`Self::new`].
    ///
    /// # Errors
    /// Returns an error if data has been written to the file.
    async fn start(&mut self) -> ParquetResult<()> {
        if self.offset == 0 {
            self.offset = start_file(&mut self.writer).await? as u64;
            self.state = State::Started;
            Ok(())
        } else {
            Err(ParquetError::InvalidParameter(
                "Start cannot be called twice".to_string(),
            ))
        }
    }

    /// Writes a row group to the file.
    pub async fn write<E>(&mut self, row_group: RowGroupIterColumns<'_, E>) -> ParquetResult<()>
    where
        ParquetError: From<E>,
        E: std::error::Error,
    {
        if self.offset == 0 {
            self.start().await?;
        }

        let ordinal = self.row_groups.len();
        let (group, specs, size) = write_row_group_async(
            &mut self.writer,
            self.offset,
            self.schema.columns(),
            row_group,
            ordinal,
        )
        .await?;
        self.offset += size;
        self.row_groups.push(group);
        self.page_specs.push(specs);
        Ok(())
    }

    /// Writes the footer of the parquet file. Returns the total size of the file.
    pub async fn end(&mut self, key_value_metadata: Option<Vec<KeyValue>>) -> ParquetResult<u64> {
        if self.offset == 0 {
            self.start().await?;
        }

        if self.state != State::Started {
            return Err(ParquetError::InvalidParameter(
                "End cannot be called twice".to_string(),
            ));
        }
        // compute file stats
        let num_rows = self.row_groups.iter().map(|group| group.num_rows).sum();

        if self.options.write_statistics {
            // write column indexes (require page statistics)
            for (group, pages) in self.row_groups.iter_mut().zip(self.page_specs.iter()) {
                for (column, pages) in group.columns.iter_mut().zip(pages.iter()) {
                    let offset = self.offset;
                    column.column_index_offset = Some(offset as i64);
                    self.offset += write_column_index_async(&mut self.writer, pages).await?;
                    let length = self.offset - offset;
                    column.column_index_length = Some(length as i32);
                }
            }
        };

        // write offset index
        for (group, pages) in self.row_groups.iter_mut().zip(self.page_specs.iter()) {
            for (column, pages) in group.columns.iter_mut().zip(pages.iter()) {
                let offset = self.offset;
                column.offset_index_offset = Some(offset as i64);
                self.offset += write_offset_index_async(&mut self.writer, pages).await?;
                column.offset_index_length = Some((self.offset - offset) as i32);
            }
        }

        let metadata = polars_parquet_format::FileMetaData::new(
            self.options.version.into(),
            self.schema.clone().into_thrift(),
            num_rows,
            self.row_groups.clone(),
            key_value_metadata,
            self.created_by.clone(),
            None,
            None,
            None,
        );

        let len = end_file(&mut self.writer, metadata).await?;
        Ok(self.offset + len)
    }

    /// Returns the underlying writer.
    pub fn into_inner(self) -> W {
        self.writer
    }
}
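The call sequence is new → write (one call per row group, which lazily writes the header) → end (which writes the page indexes, the thrift metadata and the footer). Below is a minimal usage sketch, not part of the file above, written as if it sat in this same module so the imports at the top are in scope. It assumes `futures::io::Cursor<Vec<u8>>` as the `AsyncWrite + Unpin + Send` sink and that the crate exposes a `Version` enum for `WriteOptions::version`; `build_schema` and `build_row_group` are hypothetical helpers standing in for the schema and encoded-page construction this file does not cover.

use crate::parquet::write::Version; // assumed location of the format-version enum
use futures::io::Cursor; // Cursor<Vec<u8>> implements AsyncWrite + Unpin + Send

async fn example_write() -> ParquetResult<Vec<u8>> {
    let options = WriteOptions {
        write_statistics: true, // enables the column-index pass in `end`
        version: Version::V2,   // assumed variant name
    };

    let schema: SchemaDescriptor = build_schema(); // hypothetical helper
    let mut streamer = FileStreamer::new(
        Cursor::new(Vec::<u8>::new()),
        schema,
        options,
        Some("example".to_string()),
    );

    // One row group; `write` calls `start` lazily, so the PAR1 header is emitted here.
    let row_group: RowGroupIterColumns<'_, ParquetError> = build_row_group(); // hypothetical helper
    streamer.write(row_group).await?;

    // Footer: column/offset indexes, thrift FileMetaData, metadata length and PAR1 magic.
    let _total_size = streamer.end(None).await?;

    // Recover the written bytes from the cursor.
    Ok(streamer.into_inner().into_inner())
}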