Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/write/file.rs
8512 views
1
use std::io::Write;
2
3
use polars_parquet_format::RowGroup;
4
use polars_parquet_format::thrift::protocol::TCompactOutputProtocol;
5
6
use super::indexes::{write_column_index, write_offset_index};
7
use super::page::PageWriteSpec;
8
use super::row_group::write_row_group;
9
use super::{RowGroupIterColumns, WriteOptions};
10
use crate::parquet::error::{ParquetError, ParquetResult};
11
pub use crate::parquet::metadata::KeyValue;
12
use crate::parquet::metadata::{SchemaDescriptor, ThriftFileMetadata};
13
use crate::parquet::write::State;
14
use crate::parquet::{FOOTER_SIZE, PARQUET_MAGIC};
15
16
/// Writes the 4-byte parquet magic ("PAR1") that must begin every parquet
/// file, returning how many bytes were written.
pub(super) fn start_file<W: Write>(writer: &mut W) -> ParquetResult<u64> {
    let magic = &PARQUET_MAGIC;
    writer.write_all(magic)?;
    Ok(magic.len() as u64)
}
20
21
pub(super) fn end_file<W: Write>(
22
mut writer: &mut W,
23
metadata: &ThriftFileMetadata,
24
) -> ParquetResult<u64> {
25
// Write metadata
26
let mut protocol = TCompactOutputProtocol::new(&mut writer);
27
let metadata_len = metadata.write_to_out_protocol(&mut protocol)? as i32;
28
29
// Write footer
30
let metadata_bytes = metadata_len.to_le_bytes();
31
let mut footer_buffer = [0u8; FOOTER_SIZE as usize];
32
(0..4).for_each(|i| {
33
footer_buffer[i] = metadata_bytes[i];
34
});
35
36
(&mut footer_buffer[4..]).write_all(&PARQUET_MAGIC)?;
37
writer.write_all(&footer_buffer)?;
38
writer.flush()?;
39
Ok(metadata_len as u64 + FOOTER_SIZE)
40
}
41
42
/// Builds the footer's `column_orders` list: one entry per leaf column, all
/// set to `TypeDefinedOrder`.
///
/// We only include a ColumnOrder for leaf nodes, and `TypeDefinedOrder` is
/// the only order currently supported, so every entry gets it. Even a column
/// with an undefined sort order (such as INTERVAL) is still technically the
/// defined TYPEORDER, so it is set as well.
fn create_column_orders(schema_desc: &SchemaDescriptor) -> Vec<polars_parquet_format::ColumnOrder> {
    let num_leaves = schema_desc.columns().len();
    std::iter::repeat_with(|| {
        polars_parquet_format::ColumnOrder::TYPEORDER(polars_parquet_format::TypeDefinedOrder {})
    })
    .take(num_leaves)
    .collect()
}
56
57
/// An interface to write a parquet file.
/// Use `start` to write the header, `write` to write a row group,
/// and `end` to write the footer.
pub struct FileWriter<W: Write> {
    /// Sink receiving all encoded bytes.
    writer: W,
    /// Schema of the columns written to this file.
    schema: SchemaDescriptor,
    /// Options (e.g. whether statistics/indexes are written — see `end`).
    options: WriteOptions,
    /// Optional "created_by" string embedded in the file metadata.
    created_by: Option<String>,

    /// Number of bytes written so far; doubles as the current file position
    /// used when recording column/offset index offsets.
    offset: u64,
    /// Thrift metadata of every row group written so far.
    row_groups: Vec<RowGroup>,
    /// Page write specs per row group, per column — consumed by `end` to
    /// write the column and offset indexes.
    page_specs: Vec<Vec<Vec<PageWriteSpec>>>,
    /// Used to store the current state for writing the file
    state: State,
    // when the file is written, metadata becomes available
    metadata: Option<ThriftFileMetadata>,
}
74
75
/// Writes a parquet file containing only the header and footer
76
///
77
/// This is used to write the metadata as a separate Parquet file, usually when data
78
/// is partitioned across multiple files.
79
///
80
/// Note: Recall that when combining row groups from [`ThriftFileMetadata`], the `file_path` on each
81
/// of their column chunks must be updated with their path relative to where they are written to.
82
pub fn write_metadata_sidecar<W: Write>(
83
writer: &mut W,
84
metadata: &ThriftFileMetadata,
85
) -> ParquetResult<u64> {
86
let mut len = start_file(writer)?;
87
len += end_file(writer, metadata)?;
88
Ok(len)
89
}
90
91
// Accessors
92
impl<W: Write> FileWriter<W> {
93
/// The options assigned to the file
94
pub fn options(&self) -> &WriteOptions {
95
&self.options
96
}
97
98
/// The [`SchemaDescriptor`] assigned to this file
99
pub fn schema(&self) -> &SchemaDescriptor {
100
&self.schema
101
}
102
103
/// Returns the [`ThriftFileMetadata`]. This is Some iff the [`Self::end`] has been called.
104
///
105
/// This is used to write the metadata as a separate Parquet file, usually when data
106
/// is partitioned across multiple files
107
pub fn metadata(&self) -> Option<&ThriftFileMetadata> {
108
self.metadata.as_ref()
109
}
110
}
111
112
impl<W: Write> FileWriter<W> {
    /// Returns a new [`FileWriter`].
    ///
    /// Nothing is written until [`Self::write`] or [`Self::end`] is called;
    /// the writer starts at offset 0 in state `Initialised`.
    pub fn new(
        writer: W,
        schema: SchemaDescriptor,
        options: WriteOptions,
        created_by: Option<String>,
    ) -> Self {
        Self {
            writer,
            schema,
            options,
            created_by,
            offset: 0,
            row_groups: vec![],
            page_specs: vec![],
            state: State::Initialised,
            metadata: None,
        }
    }

    /// Writes the header of the file.
    ///
    /// This is automatically called by [`Self::write`] if not called following [`Self::new`].
    ///
    /// # Errors
    /// Returns an error if data has been written to the file.
    fn start(&mut self) -> ParquetResult<()> {
        // `offset == 0` is the "nothing written yet" marker — once the magic
        // is written the offset becomes non-zero, so a second call errors.
        if self.offset == 0 {
            self.offset = start_file(&mut self.writer)?;
            self.state = State::Started;
            Ok(())
        } else {
            Err(ParquetError::InvalidParameter(
                "Start cannot be called twice".to_string(),
            ))
        }
    }

    /// Writes a row group to the file.
    ///
    /// This call is IO-bounded
    ///
    /// # Errors
    /// Propagates errors from writing the header (on first call) or from
    /// serializing/writing the row group itself.
    pub fn write<E>(
        &mut self,
        num_rows: u64,
        row_group: RowGroupIterColumns<'_, E>,
    ) -> ParquetResult<()>
    where
        ParquetError: From<E>,
        E: std::error::Error,
    {
        // Lazily write the file header on the first row group.
        if self.offset == 0 {
            self.start()?;
        }
        // Row group ordinal = how many groups precede this one.
        let ordinal = self.row_groups.len();
        let (group, specs, size) = write_row_group(
            &mut self.writer,
            num_rows,
            self.offset,
            self.schema.columns(),
            row_group,
            ordinal,
        )?;
        // Track bytes written and stash the thrift metadata / page specs for
        // the footer and index writing in `end`.
        self.offset += size;
        self.row_groups.push(group);
        self.page_specs.push(specs);
        Ok(())
    }

    /// Writes the footer of the parquet file. Returns the total size of the file and the
    /// underlying writer.
    ///
    /// Writes (in order): optional column indexes, offset indexes, thrift
    /// file metadata, and the fixed footer.
    ///
    /// # Errors
    /// Returns an error if called after the file already ended, or on any
    /// serialization/IO failure.
    pub fn end(&mut self, key_value_metadata: Option<Vec<KeyValue>>) -> ParquetResult<u64> {
        // Allow ending a file that never had a row group written: emit the
        // header first so the result is still a valid (empty) parquet file.
        if self.offset == 0 {
            self.start()?;
        }

        if self.state != State::Started {
            return Err(ParquetError::InvalidParameter(
                "End cannot be called twice".to_string(),
            ));
        }
        // compute file stats
        let num_rows = self.row_groups.iter().map(|group| group.num_rows).sum();

        if self.options.write_statistics {
            // write column indexes (require page statistics)
            // NOTE: `self.offset` is mutated inside the closures — the
            // statement order (record offset, write, record length) is what
            // keeps the recorded positions consistent with the byte stream.
            self.row_groups
                .iter_mut()
                .zip(self.page_specs.iter())
                .try_for_each(|(group, pages)| {
                    group.columns.iter_mut().zip(pages.iter()).try_for_each(
                        |(column, pages)| {
                            let offset = self.offset;
                            column.column_index_offset = Some(offset as i64);
                            self.offset += write_column_index(&mut self.writer, pages)?;
                            let length = self.offset - offset;
                            column.column_index_length = Some(length as i32);
                            ParquetResult::Ok(())
                        },
                    )?;
                    ParquetResult::Ok(())
                })?;
        };

        // write offset index
        // Same offset-tracking pattern as above, but offset indexes are
        // written unconditionally (they do not require page statistics).
        self.row_groups
            .iter_mut()
            .zip(self.page_specs.iter())
            .try_for_each(|(group, pages)| {
                group
                    .columns
                    .iter_mut()
                    .zip(pages.iter())
                    .try_for_each(|(column, pages)| {
                        let offset = self.offset;
                        column.offset_index_offset = Some(offset as i64);
                        self.offset += write_offset_index(&mut self.writer, pages)?;
                        column.offset_index_length = Some((self.offset - offset) as i32);
                        ParquetResult::Ok(())
                    })?;
                ParquetResult::Ok(())
            })?;

        // Assemble the thrift file metadata from everything accumulated so
        // far; row groups are cloned because they are also kept on `self`.
        let metadata = ThriftFileMetadata::new(
            self.options.version.into(),
            self.schema.clone().into_thrift(),
            num_rows,
            self.row_groups.clone(),
            key_value_metadata,
            self.created_by.clone(),
            Some(create_column_orders(&self.schema)),
            None,
            None,
        );

        let len = end_file(&mut self.writer, &metadata)?;
        self.state = State::Finished;
        // Keep the metadata so callers can retrieve it (e.g. for sidecars).
        self.metadata = Some(metadata);
        // Total file size: bytes written before the footer plus footer bytes.
        Ok(self.offset + len)
    }

    /// Returns the underlying writer.
    pub fn into_inner(self) -> W {
        self.writer
    }

    /// Returns the underlying writer and [`ThriftFileMetadata`]
    /// # Panics
    /// This function panics if [`Self::end`] has not yet been called
    pub fn into_inner_and_metadata(self) -> (W, ThriftFileMetadata) {
        (self.writer, self.metadata.expect("File to have ended"))
    }
}
265
266