Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/write/stream.rs
6940 views
1
use std::io::Write;
2
3
use futures::{AsyncWrite, AsyncWriteExt};
4
use polars_parquet_format::RowGroup;
5
use polars_parquet_format::thrift::protocol::TCompactOutputStreamProtocol;
6
7
use super::row_group::write_row_group_async;
8
use super::{RowGroupIterColumns, WriteOptions};
9
use crate::parquet::error::{ParquetError, ParquetResult};
10
use crate::parquet::metadata::{KeyValue, SchemaDescriptor};
11
use crate::parquet::write::State;
12
use crate::parquet::write::indexes::{write_column_index_async, write_offset_index_async};
13
use crate::parquet::write::page::PageWriteSpec;
14
use crate::parquet::{FOOTER_SIZE, PARQUET_MAGIC};
15
16
/// Writes the 4-byte parquet magic ("PAR1") that opens every file and
/// returns how many bytes were written.
async fn start_file<W: AsyncWrite + Unpin>(writer: &mut W) -> ParquetResult<u64> {
    let magic = &PARQUET_MAGIC;
    writer.write_all(magic).await?;
    Ok(magic.len() as u64)
}
20
21
async fn end_file<W: AsyncWrite + Unpin + Send>(
22
mut writer: &mut W,
23
metadata: polars_parquet_format::FileMetaData,
24
) -> ParquetResult<u64> {
25
// Write file metadata
26
let mut protocol = TCompactOutputStreamProtocol::new(&mut writer);
27
let metadata_len = metadata.write_to_out_stream_protocol(&mut protocol).await? as i32;
28
29
// Write footer
30
let metadata_bytes = metadata_len.to_le_bytes();
31
let mut footer_buffer = [0u8; FOOTER_SIZE as usize];
32
(0..4).for_each(|i| {
33
footer_buffer[i] = metadata_bytes[i];
34
});
35
36
(&mut footer_buffer[4..]).write_all(&PARQUET_MAGIC)?;
37
writer.write_all(&footer_buffer).await?;
38
writer.flush().await?;
39
Ok(metadata_len as u64 + FOOTER_SIZE)
40
}
41
42
/// An interface to write a parquet file asynchronously.
/// Use `start` to write the header, `write` to write a row group,
/// and `end` to write the footer.
pub struct FileStreamer<W: AsyncWrite + Unpin + Send> {
    /// Destination all bytes are written to.
    writer: W,
    /// Parquet schema describing the columns of every row group.
    schema: SchemaDescriptor,
    /// File-level write options (version, statistics, …).
    options: WriteOptions,
    /// Optional `created_by` string embedded in the file metadata.
    created_by: Option<String>,

    /// Total number of bytes written so far; 0 means the header has not
    /// been written yet (see `start`).
    offset: u64,
    /// Thrift metadata of every row group written so far, emitted in the footer.
    row_groups: Vec<RowGroup>,
    /// Per row group, per column, the specs of each written page — used to
    /// build column/offset indexes in `end`.
    page_specs: Vec<Vec<Vec<PageWriteSpec>>>,
    /// Used to store the current state for writing the file
    state: State,
}
57
58
// Accessors
59
impl<W: AsyncWrite + Unpin + Send> FileStreamer<W> {
60
/// The options assigned to the file
61
pub fn options(&self) -> &WriteOptions {
62
&self.options
63
}
64
65
/// The [`SchemaDescriptor`] assigned to this file
66
pub fn schema(&self) -> &SchemaDescriptor {
67
&self.schema
68
}
69
}
70
71
impl<W: AsyncWrite + Unpin + Send> FileStreamer<W> {
72
/// Returns a new [`FileStreamer`].
73
pub fn new(
74
writer: W,
75
schema: SchemaDescriptor,
76
options: WriteOptions,
77
created_by: Option<String>,
78
) -> Self {
79
Self {
80
writer,
81
schema,
82
options,
83
created_by,
84
offset: 0,
85
row_groups: vec![],
86
page_specs: vec![],
87
state: State::Initialised,
88
}
89
}
90
91
/// Writes the header of the file.
92
///
93
/// This is automatically called by [`Self::write`] if not called following [`Self::new`].
94
///
95
/// # Errors
96
/// Returns an error if data has been written to the file.
97
async fn start(&mut self) -> ParquetResult<()> {
98
if self.offset == 0 {
99
self.offset = start_file(&mut self.writer).await? as u64;
100
self.state = State::Started;
101
Ok(())
102
} else {
103
Err(ParquetError::InvalidParameter(
104
"Start cannot be called twice".to_string(),
105
))
106
}
107
}
108
109
/// Writes a row group to the file.
110
pub async fn write<E>(&mut self, row_group: RowGroupIterColumns<'_, E>) -> ParquetResult<()>
111
where
112
ParquetError: From<E>,
113
E: std::error::Error,
114
{
115
if self.offset == 0 {
116
self.start().await?;
117
}
118
119
let ordinal = self.row_groups.len();
120
let (group, specs, size) = write_row_group_async(
121
&mut self.writer,
122
self.offset,
123
self.schema.columns(),
124
row_group,
125
ordinal,
126
)
127
.await?;
128
self.offset += size;
129
self.row_groups.push(group);
130
self.page_specs.push(specs);
131
Ok(())
132
}
133
134
/// Writes the footer of the parquet file. Returns the total size of the file and the
135
/// underlying writer.
136
pub async fn end(&mut self, key_value_metadata: Option<Vec<KeyValue>>) -> ParquetResult<u64> {
137
if self.offset == 0 {
138
self.start().await?;
139
}
140
141
if self.state != State::Started {
142
return Err(ParquetError::InvalidParameter(
143
"End cannot be called twice".to_string(),
144
));
145
}
146
// compute file stats
147
let num_rows = self.row_groups.iter().map(|group| group.num_rows).sum();
148
149
if self.options.write_statistics {
150
// write column indexes (require page statistics)
151
for (group, pages) in self.row_groups.iter_mut().zip(self.page_specs.iter()) {
152
for (column, pages) in group.columns.iter_mut().zip(pages.iter()) {
153
let offset = self.offset;
154
column.column_index_offset = Some(offset as i64);
155
self.offset += write_column_index_async(&mut self.writer, pages).await?;
156
let length = self.offset - offset;
157
column.column_index_length = Some(length as i32);
158
}
159
}
160
};
161
162
// write offset index
163
for (group, pages) in self.row_groups.iter_mut().zip(self.page_specs.iter()) {
164
for (column, pages) in group.columns.iter_mut().zip(pages.iter()) {
165
let offset = self.offset;
166
column.offset_index_offset = Some(offset as i64);
167
self.offset += write_offset_index_async(&mut self.writer, pages).await?;
168
column.offset_index_length = Some((self.offset - offset) as i32);
169
}
170
}
171
172
let metadata = polars_parquet_format::FileMetaData::new(
173
self.options.version.into(),
174
self.schema.clone().into_thrift(),
175
num_rows,
176
self.row_groups.clone(),
177
key_value_metadata,
178
self.created_by.clone(),
179
None,
180
None,
181
None,
182
);
183
184
let len = end_file(&mut self.writer, metadata).await?;
185
Ok(self.offset + len)
186
}
187
188
/// Returns the underlying writer.
189
pub fn into_inner(self) -> W {
190
self.writer
191
}
192
}
193
194