// Path: blob/main/crates/polars-io/src/parquet/write/writer.rs
// 8485 views
use std::io::Write;1use std::sync::Mutex;23use polars_buffer::Buffer;4use polars_core::frame::chunk_df_for_writing;5use polars_core::prelude::*;6use polars_parquet::write::{7CompressionOptions, Encoding, FileWriter, StatisticsOptions, Version, WriteOptions,8get_dtype_encoding, to_parquet_schema,9};1011use super::batched_writer::BatchedWriter;12use super::options::ParquetCompression;13use super::{KeyValueMetadata, ParquetWriteOptions};14use crate::shared::schema_to_arrow_checked;1516impl ParquetWriteOptions {17pub fn to_writer<F>(&self, f: F) -> ParquetWriter<F>18where19F: Write,20{21ParquetWriter::new(f)22.with_compression(self.compression)23.with_statistics(self.statistics)24.with_row_group_size(self.row_group_size)25.with_data_page_size(self.data_page_size)26.with_key_value_metadata(self.key_value_metadata.clone())27}28}2930/// Write a DataFrame to Parquet format.31#[must_use]32pub struct ParquetWriter<W> {33writer: W,34/// Data page compression35compression: CompressionOptions,36/// Compute and write column statistics.37statistics: StatisticsOptions,38/// if `None` will be 512^2 rows39row_group_size: Option<usize>,40/// if `None` will be 1024^2 bytes41data_page_size: Option<usize>,42/// Serialize columns in parallel43parallel: bool,44/// Custom file-level key value metadata45key_value_metadata: Option<KeyValueMetadata>,46/// Context info for the Parquet file being written.47context_info: Option<PlHashMap<String, String>>,48}4950impl<W> ParquetWriter<W>51where52W: Write,53{54/// Create a new writer55pub fn new(writer: W) -> Self56where57W: Write,58{59ParquetWriter {60writer,61compression: ParquetCompression::default().into(),62statistics: StatisticsOptions::default(),63row_group_size: None,64data_page_size: None,65parallel: true,66key_value_metadata: None,67context_info: None,68}69}7071/// Set the compression used. Defaults to `Zstd`.72///73/// The default compression `Zstd` has very good performance, but may not yet been supported74/// by older readers. 
If you want more compatibility guarantees, consider using `Snappy`.75pub fn with_compression(mut self, compression: ParquetCompression) -> Self {76self.compression = compression.into();77self78}7980/// Compute and write statistic81pub fn with_statistics(mut self, statistics: StatisticsOptions) -> Self {82self.statistics = statistics;83self84}8586/// Set the row group size (in number of rows) during writing. This can reduce memory pressure and improve87/// writing performance.88pub fn with_row_group_size(mut self, size: Option<usize>) -> Self {89self.row_group_size = size;90self91}9293/// Sets the maximum bytes size of a data page. If `None` will be 1024^2 bytes.94pub fn with_data_page_size(mut self, limit: Option<usize>) -> Self {95self.data_page_size = limit;96self97}9899/// Serialize columns in parallel100pub fn set_parallel(mut self, parallel: bool) -> Self {101self.parallel = parallel;102self103}104105/// Set custom file-level key value metadata for the Parquet file106pub fn with_key_value_metadata(mut self, key_value_metadata: Option<KeyValueMetadata>) -> Self {107self.key_value_metadata = key_value_metadata;108self109}110111/// Set context information for the writer112pub fn with_context_info(mut self, context_info: Option<PlHashMap<String, String>>) -> Self {113self.context_info = context_info;114self115}116117pub fn batched(self, schema: &Schema) -> PolarsResult<BatchedWriter<W>> {118let schema = schema_to_arrow_checked(schema, CompatLevel::newest(), "parquet")?;119let parquet_schema = to_parquet_schema(&schema)?;120let encodings = get_encodings(&schema);121let options = self.materialize_options();122let writer = Mutex::new(FileWriter::try_new(self.writer, schema, options)?);123124Ok(BatchedWriter {125writer,126parquet_schema,127encodings,128options,129parallel: self.parallel,130key_value_metadata: self.key_value_metadata,131})132}133134fn materialize_options(&self) -> WriteOptions {135WriteOptions {136statistics: self.statistics,137compression: 
self.compression,138version: Version::V1,139data_page_size: self.data_page_size,140}141}142143/// Write the given DataFrame in the writer `W`.144/// Returns the total size of the file.145pub fn finish(self, df: &mut DataFrame) -> PolarsResult<u64> {146let chunked_df = chunk_df_for_writing(df, self.row_group_size.unwrap_or(512 * 512))?;147let mut batched = self.batched(chunked_df.schema())?;148batched.write_batch(&chunked_df)?;149batched.finish()150}151}152153pub fn get_encodings(schema: &ArrowSchema) -> Buffer<Vec<Encoding>> {154schema155.iter_values()156.map(|f| get_dtype_encoding(&f.dtype))157.collect()158}159160161