Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/write/file.rs
6940 views
1
use std::io::Write;
2
3
use polars_parquet_format::RowGroup;
4
use polars_parquet_format::thrift::protocol::TCompactOutputProtocol;
5
6
use super::indexes::{write_column_index, write_offset_index};
7
use super::page::PageWriteSpec;
8
use super::row_group::write_row_group;
9
use super::{RowGroupIterColumns, WriteOptions};
10
use crate::parquet::error::{ParquetError, ParquetResult};
11
pub use crate::parquet::metadata::KeyValue;
12
use crate::parquet::metadata::{SchemaDescriptor, ThriftFileMetadata};
13
use crate::parquet::write::State;
14
use crate::parquet::{FOOTER_SIZE, PARQUET_MAGIC};
15
16
/// Writes the parquet magic bytes at the start of the file and returns
/// the number of bytes written (the length of the magic).
pub(super) fn start_file<W: Write>(writer: &mut W) -> ParquetResult<u64> {
    let magic = &PARQUET_MAGIC;
    writer.write_all(magic)?;
    Ok(magic.len() as u64)
}
20
21
pub(super) fn end_file<W: Write>(
22
mut writer: &mut W,
23
metadata: &ThriftFileMetadata,
24
) -> ParquetResult<u64> {
25
// Write metadata
26
let mut protocol = TCompactOutputProtocol::new(&mut writer);
27
let metadata_len = metadata.write_to_out_protocol(&mut protocol)? as i32;
28
29
// Write footer
30
let metadata_bytes = metadata_len.to_le_bytes();
31
let mut footer_buffer = [0u8; FOOTER_SIZE as usize];
32
(0..4).for_each(|i| {
33
footer_buffer[i] = metadata_bytes[i];
34
});
35
36
(&mut footer_buffer[4..]).write_all(&PARQUET_MAGIC)?;
37
writer.write_all(&footer_buffer)?;
38
writer.flush()?;
39
Ok(metadata_len as u64 + FOOTER_SIZE)
40
}
41
42
/// Builds the `column_orders` list for the file footer: one entry per leaf
/// column of `schema_desc`.
///
/// We only include ColumnOrder for leaf nodes. Currently the only supported
/// ColumnOrder is TypeDefinedOrder, so it is set for all leaf nodes. Even if
/// a column has an undefined sort order, such as INTERVAL, this is still
/// technically the defined TYPEORDER, so it should still be set.
fn create_column_orders(schema_desc: &SchemaDescriptor) -> Vec<polars_parquet_format::ColumnOrder> {
    schema_desc
        .columns()
        .iter()
        .map(|_| {
            polars_parquet_format::ColumnOrder::TYPEORDER(
                polars_parquet_format::TypeDefinedOrder {},
            )
        })
        .collect()
}
56
57
/// An interface to write a parquet file.
58
/// Use `start` to write the header, `write` to write a row group,
59
/// and `end` to write the footer.
60
/// An interface to write a parquet file.
/// Use `start` to write the header, `write` to write a row group,
/// and `end` to write the footer.
pub struct FileWriter<W: Write> {
    /// Destination of all bytes: header, row groups, indexes and footer.
    writer: W,
    /// Schema describing the columns of every row group written.
    schema: SchemaDescriptor,
    /// Write options; e.g. `write_statistics` controls whether column
    /// indexes are written in `end`.
    options: WriteOptions,
    /// Optional `created_by` string embedded in the file metadata.
    created_by: Option<String>,

    /// Number of bytes written so far; `0` means the magic header has not
    /// been written yet (see `start`).
    offset: u64,
    /// Thrift metadata of each row group written so far.
    row_groups: Vec<RowGroup>,
    /// Per row group, per column, the specs of the written pages; used by
    /// `end` to build the column/offset indexes.
    page_specs: Vec<Vec<Vec<PageWriteSpec>>>,
    /// Used to store the current state for writing the file
    state: State,
    // when the file is written, metadata becomes available
    metadata: Option<ThriftFileMetadata>,
}
74
75
/// Writes a parquet file containing only the header and footer
76
///
77
/// This is used to write the metadata as a separate Parquet file, usually when data
78
/// is partitioned across multiple files.
79
///
80
/// Note: Recall that when combining row groups from [`ThriftFileMetadata`], the `file_path` on each
81
/// of their column chunks must be updated with their path relative to where they are written to.
82
pub fn write_metadata_sidecar<W: Write>(
83
writer: &mut W,
84
metadata: &ThriftFileMetadata,
85
) -> ParquetResult<u64> {
86
let mut len = start_file(writer)?;
87
len += end_file(writer, metadata)?;
88
Ok(len)
89
}
90
91
// Accessors
92
// Accessors
impl<W: Write> FileWriter<W> {
    /// The write options assigned to the file
    pub fn options(&self) -> &WriteOptions {
        &self.options
    }

    /// The [`SchemaDescriptor`] assigned to this file
    pub fn schema(&self) -> &SchemaDescriptor {
        &self.schema
    }

    /// Returns the [`ThriftFileMetadata`]. This is Some iff the [`Self::end`] has been called.
    ///
    /// This is used to write the metadata as a separate Parquet file, usually when data
    /// is partitioned across multiple files
    pub fn metadata(&self) -> Option<&ThriftFileMetadata> {
        self.metadata.as_ref()
    }
}
111
112
impl<W: Write> FileWriter<W> {
    /// Returns a new [`FileWriter`].
    ///
    /// Nothing is written until [`Self::write`] or [`Self::end`] is called;
    /// the writer starts in `State::Initialised` with `offset == 0`.
    pub fn new(
        writer: W,
        schema: SchemaDescriptor,
        options: WriteOptions,
        created_by: Option<String>,
    ) -> Self {
        Self {
            writer,
            schema,
            options,
            created_by,
            offset: 0,
            row_groups: vec![],
            page_specs: vec![],
            state: State::Initialised,
            metadata: None,
        }
    }

    /// Writes the header of the file.
    ///
    /// This is automatically called by [`Self::write`] if not called following [`Self::new`].
    ///
    /// # Errors
    /// Returns an error if data has been written to the file.
    fn start(&mut self) -> ParquetResult<()> {
        // `offset == 0` doubles as the "header not yet written" flag.
        if self.offset == 0 {
            self.offset = start_file(&mut self.writer)?;
            self.state = State::Started;
            Ok(())
        } else {
            Err(ParquetError::InvalidParameter(
                "Start cannot be called twice".to_string(),
            ))
        }
    }

    /// Writes a row group to the file.
    ///
    /// This call is IO-bounded
    ///
    /// # Errors
    /// Propagates errors from the underlying writer or from the column
    /// iterators (any `E` convertible into [`ParquetError`]).
    pub fn write<E>(&mut self, row_group: RowGroupIterColumns<'_, E>) -> ParquetResult<()>
    where
        ParquetError: From<E>,
        E: std::error::Error,
    {
        // Lazily write the magic header on the first row group.
        if self.offset == 0 {
            self.start()?;
        }
        // Ordinal of this row group within the file (0-based).
        let ordinal = self.row_groups.len();
        let (group, specs, size) = write_row_group(
            &mut self.writer,
            self.offset,
            self.schema.columns(),
            row_group,
            ordinal,
        )?;
        // Track bytes written plus the metadata needed for the footer.
        self.offset += size;
        self.row_groups.push(group);
        self.page_specs.push(specs);
        Ok(())
    }

    /// Writes the footer of the parquet file. Returns the total size of the file and the
    /// underlying writer.
    ///
    /// # Errors
    /// Returns an error if called more than once, or if writing the indexes
    /// or footer fails.
    pub fn end(&mut self, key_value_metadata: Option<Vec<KeyValue>>) -> ParquetResult<u64> {
        // A file with zero row groups still needs the magic header.
        if self.offset == 0 {
            self.start()?;
        }

        if self.state != State::Started {
            return Err(ParquetError::InvalidParameter(
                "End cannot be called twice".to_string(),
            ));
        }
        // compute file stats
        let num_rows = self.row_groups.iter().map(|group| group.num_rows).sum();

        if self.options.write_statistics {
            // write column indexes (require page statistics)
            // For each column chunk, record where its column index lands
            // (`self.offset` before writing) and how long it is.
            self.row_groups
                .iter_mut()
                .zip(self.page_specs.iter())
                .try_for_each(|(group, pages)| {
                    group.columns.iter_mut().zip(pages.iter()).try_for_each(
                        |(column, pages)| {
                            let offset = self.offset;
                            column.column_index_offset = Some(offset as i64);
                            self.offset += write_column_index(&mut self.writer, pages)?;
                            let length = self.offset - offset;
                            column.column_index_length = Some(length as i32);
                            ParquetResult::Ok(())
                        },
                    )?;
                    ParquetResult::Ok(())
                })?;
        };

        // write offset index
        // Offset indexes are always written, independently of statistics;
        // same bookkeeping as above: record offset before, length after.
        self.row_groups
            .iter_mut()
            .zip(self.page_specs.iter())
            .try_for_each(|(group, pages)| {
                group
                    .columns
                    .iter_mut()
                    .zip(pages.iter())
                    .try_for_each(|(column, pages)| {
                        let offset = self.offset;
                        column.offset_index_offset = Some(offset as i64);
                        self.offset += write_offset_index(&mut self.writer, pages)?;
                        column.offset_index_length = Some((self.offset - offset) as i32);
                        ParquetResult::Ok(())
                    })?;
                ParquetResult::Ok(())
            })?;

        // Assemble the footer metadata; must happen after the index passes
        // so the row groups carry the index offsets/lengths just set.
        let metadata = ThriftFileMetadata::new(
            self.options.version.into(),
            self.schema.clone().into_thrift(),
            num_rows,
            self.row_groups.clone(),
            key_value_metadata,
            self.created_by.clone(),
            Some(create_column_orders(&self.schema)),
            None,
            None,
        );

        let len = end_file(&mut self.writer, &metadata)?;
        self.state = State::Finished;
        // Keep the metadata so `metadata()`/`into_inner_and_metadata` can
        // expose it (e.g. for writing a metadata sidecar file).
        self.metadata = Some(metadata);
        Ok(self.offset + len)
    }

    /// Returns the underlying writer.
    pub fn into_inner(self) -> W {
        self.writer
    }

    /// Returns the underlying writer and [`ThriftFileMetadata`]
    /// # Panics
    /// This function panics if [`Self::end`] has not yet been called
    pub fn into_inner_and_metadata(self) -> (W, ThriftFileMetadata) {
        (self.writer, self.metadata.expect("File to have ended"))
    }
}
260
261