Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/parquet/write/writer.rs
8485 views
1
use std::io::Write;
2
use std::sync::Mutex;
3
4
use polars_buffer::Buffer;
5
use polars_core::frame::chunk_df_for_writing;
6
use polars_core::prelude::*;
7
use polars_parquet::write::{
8
CompressionOptions, Encoding, FileWriter, StatisticsOptions, Version, WriteOptions,
9
get_dtype_encoding, to_parquet_schema,
10
};
11
12
use super::batched_writer::BatchedWriter;
13
use super::options::ParquetCompression;
14
use super::{KeyValueMetadata, ParquetWriteOptions};
15
use crate::shared::schema_to_arrow_checked;
16
17
impl ParquetWriteOptions {
18
pub fn to_writer<F>(&self, f: F) -> ParquetWriter<F>
19
where
20
F: Write,
21
{
22
ParquetWriter::new(f)
23
.with_compression(self.compression)
24
.with_statistics(self.statistics)
25
.with_row_group_size(self.row_group_size)
26
.with_data_page_size(self.data_page_size)
27
.with_key_value_metadata(self.key_value_metadata.clone())
28
}
29
}
30
31
/// Write a DataFrame to Parquet format.
#[must_use]
pub struct ParquetWriter<W> {
    /// Underlying sink the encoded Parquet bytes are written to.
    writer: W,
    /// Data page compression
    compression: CompressionOptions,
    /// Compute and write column statistics.
    statistics: StatisticsOptions,
    /// if `None` will be 512^2 rows
    row_group_size: Option<usize>,
    /// if `None` will be 1024^2 bytes
    data_page_size: Option<usize>,
    /// Serialize columns in parallel
    parallel: bool,
    /// Custom file-level key value metadata
    key_value_metadata: Option<KeyValueMetadata>,
    /// Context info for the Parquet file being written.
    context_info: Option<PlHashMap<String, String>>,
}
50
51
impl<W> ParquetWriter<W>
52
where
53
W: Write,
54
{
55
/// Create a new writer
56
pub fn new(writer: W) -> Self
57
where
58
W: Write,
59
{
60
ParquetWriter {
61
writer,
62
compression: ParquetCompression::default().into(),
63
statistics: StatisticsOptions::default(),
64
row_group_size: None,
65
data_page_size: None,
66
parallel: true,
67
key_value_metadata: None,
68
context_info: None,
69
}
70
}
71
72
/// Set the compression used. Defaults to `Zstd`.
73
///
74
/// The default compression `Zstd` has very good performance, but may not yet been supported
75
/// by older readers. If you want more compatibility guarantees, consider using `Snappy`.
76
pub fn with_compression(mut self, compression: ParquetCompression) -> Self {
77
self.compression = compression.into();
78
self
79
}
80
81
/// Compute and write statistic
82
pub fn with_statistics(mut self, statistics: StatisticsOptions) -> Self {
83
self.statistics = statistics;
84
self
85
}
86
87
/// Set the row group size (in number of rows) during writing. This can reduce memory pressure and improve
88
/// writing performance.
89
pub fn with_row_group_size(mut self, size: Option<usize>) -> Self {
90
self.row_group_size = size;
91
self
92
}
93
94
/// Sets the maximum bytes size of a data page. If `None` will be 1024^2 bytes.
95
pub fn with_data_page_size(mut self, limit: Option<usize>) -> Self {
96
self.data_page_size = limit;
97
self
98
}
99
100
/// Serialize columns in parallel
101
pub fn set_parallel(mut self, parallel: bool) -> Self {
102
self.parallel = parallel;
103
self
104
}
105
106
/// Set custom file-level key value metadata for the Parquet file
107
pub fn with_key_value_metadata(mut self, key_value_metadata: Option<KeyValueMetadata>) -> Self {
108
self.key_value_metadata = key_value_metadata;
109
self
110
}
111
112
/// Set context information for the writer
113
pub fn with_context_info(mut self, context_info: Option<PlHashMap<String, String>>) -> Self {
114
self.context_info = context_info;
115
self
116
}
117
118
pub fn batched(self, schema: &Schema) -> PolarsResult<BatchedWriter<W>> {
119
let schema = schema_to_arrow_checked(schema, CompatLevel::newest(), "parquet")?;
120
let parquet_schema = to_parquet_schema(&schema)?;
121
let encodings = get_encodings(&schema);
122
let options = self.materialize_options();
123
let writer = Mutex::new(FileWriter::try_new(self.writer, schema, options)?);
124
125
Ok(BatchedWriter {
126
writer,
127
parquet_schema,
128
encodings,
129
options,
130
parallel: self.parallel,
131
key_value_metadata: self.key_value_metadata,
132
})
133
}
134
135
fn materialize_options(&self) -> WriteOptions {
136
WriteOptions {
137
statistics: self.statistics,
138
compression: self.compression,
139
version: Version::V1,
140
data_page_size: self.data_page_size,
141
}
142
}
143
144
/// Write the given DataFrame in the writer `W`.
145
/// Returns the total size of the file.
146
pub fn finish(self, df: &mut DataFrame) -> PolarsResult<u64> {
147
let chunked_df = chunk_df_for_writing(df, self.row_group_size.unwrap_or(512 * 512))?;
148
let mut batched = self.batched(chunked_df.schema())?;
149
batched.write_batch(&chunked_df)?;
150
batched.finish()
151
}
152
}
153
154
pub fn get_encodings(schema: &ArrowSchema) -> Buffer<Vec<Encoding>> {
155
schema
156
.iter_values()
157
.map(|f| get_dtype_encoding(&f.dtype))
158
.collect()
159
}
160
161