Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/write/column_chunk.rs
8512 views
1
use std::io::Write;
2
3
#[cfg(feature = "async")]
4
use futures::AsyncWrite;
5
use polars_parquet_format::thrift::protocol::TCompactOutputProtocol;
6
#[cfg(feature = "async")]
7
use polars_parquet_format::thrift::protocol::TCompactOutputStreamProtocol;
8
use polars_parquet_format::{ColumnChunk, ColumnMetaData, Type};
9
use polars_utils::aliases::PlHashSet;
10
11
use super::DynStreamingIterator;
12
#[cfg(feature = "async")]
13
use super::page::write_page_async;
14
use super::page::{PageWriteSpec, is_dict_page, write_page};
15
use super::statistics::reduce;
16
use crate::parquet::FallibleStreamingIterator;
17
use crate::parquet::compression::Compression;
18
use crate::parquet::encoding::Encoding;
19
use crate::parquet::error::{ParquetError, ParquetResult};
20
use crate::parquet::metadata::ColumnDescriptor;
21
use crate::parquet::page::{CompressedPage, PageType};
22
23
pub fn write_column_chunk<W, E>(
24
writer: &mut W,
25
mut offset: u64,
26
descriptor: &ColumnDescriptor,
27
mut compressed_pages: DynStreamingIterator<'_, CompressedPage, E>,
28
) -> ParquetResult<(ColumnChunk, Vec<PageWriteSpec>, u64)>
29
where
30
W: Write,
31
ParquetError: From<E>,
32
E: std::error::Error,
33
{
34
// write every page
35
36
let initial = offset;
37
38
let mut specs = vec![];
39
while let Some(compressed_page) = compressed_pages.next()? {
40
let spec = write_page(writer, offset, compressed_page)?;
41
offset += spec.bytes_written;
42
specs.push(spec);
43
}
44
let mut bytes_written = offset - initial;
45
46
let column_chunk = build_column_chunk(&specs, descriptor)?;
47
48
// write metadata
49
let mut protocol = TCompactOutputProtocol::new(writer);
50
bytes_written += column_chunk
51
.meta_data
52
.as_ref()
53
.unwrap()
54
.write_to_out_protocol(&mut protocol)? as u64;
55
56
Ok((column_chunk, specs, bytes_written))
57
}
58
59
/// Async counterpart of [`write_column_chunk`]: writes every compressed page
/// of one column chunk to `writer`, then the chunk's serialized
/// `ColumnMetaData` via the thrift compact *stream* protocol.
///
/// `offset` is the absolute position in the file where the first page lands.
///
/// Returns the assembled [`ColumnChunk`], the per-page write specs, and the
/// total number of bytes written (pages + metadata).
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub async fn write_column_chunk_async<W, E>(
    writer: &mut W,
    mut offset: u64,
    descriptor: &ColumnDescriptor,
    mut compressed_pages: DynStreamingIterator<'_, CompressedPage, E>,
) -> ParquetResult<(ColumnChunk, Vec<PageWriteSpec>, u64)>
where
    W: AsyncWrite + Unpin + Send,
    ParquetError: From<E>,
    E: std::error::Error,
{
    let start_offset = offset;

    // Emit every page, recording one spec per page.
    let mut page_specs = Vec::new();
    while let Some(page) = compressed_pages.next()? {
        let spec = write_page_async(writer, offset, page).await?;
        offset += spec.bytes_written;
        page_specs.push(spec);
    }
    let mut bytes_written = offset - start_offset;

    let column_chunk = build_column_chunk(&page_specs, descriptor)?;

    // Serialize the chunk metadata directly after the pages.
    // `build_column_chunk` always populates `meta_data`, so the unwrap holds.
    let mut protocol = TCompactOutputStreamProtocol::new(writer);
    let metadata_len = column_chunk
        .meta_data
        .as_ref()
        .unwrap()
        .write_to_out_stream_protocol(&mut protocol)
        .await?;
    bytes_written += metadata_len as u64;

    Ok((column_chunk, page_specs, bytes_written))
}
95
96
/// Builds the thrift [`ColumnChunk`] describing a chunk whose pages have
/// already been written, as captured by `specs` (one spec per written page).
///
/// The returned chunk always has `meta_data` set to `Some`.
///
/// # Errors
/// Returns an out-of-spec error if the pages do not all share a single
/// compression codec, and propagates any failure from reducing the per-page
/// statistics.
fn build_column_chunk(
    specs: &[PageWriteSpec],
    descriptor: &ColumnDescriptor,
) -> ParquetResult<ColumnChunk> {
    // compute stats to build header at the end of the chunk

    // All pages of a chunk must share one codec; collecting into a set
    // exposes any mix.
    let compression = specs
        .iter()
        .map(|spec| spec.compression)
        .collect::<PlHashSet<_>>();
    if compression.len() > 1 {
        return Err(crate::parquet::error::ParquetError::oos(
            "All pages within a column chunk must be compressed with the same codec",
        ));
    }
    // A chunk with no pages is reported as uncompressed.
    let compression = compression
        .into_iter()
        .next()
        .unwrap_or(Compression::Uncompressed);

    // SPEC: the total compressed size is the total compressed size of each page + the header size
    let total_compressed_size = specs
        .iter()
        .map(|x| x.header_size as i64 + x.header.compressed_page_size as i64)
        .sum();
    // SPEC: the total uncompressed size is the total uncompressed size of each page + the header size
    let total_uncompressed_size = specs
        .iter()
        .map(|x| x.header_size as i64 + x.header.uncompressed_page_size as i64)
        .sum();
    // Per the Parquet spec, if a dictionary page exists it is always the first page
    // in a column chunk. So we check specs[0] for a dictionary page.
    // Ref: https://github.com/apache/parquet-format/blob/master/README.md#column-chunks
    let dictionary_page_offset = specs
        .first()
        .filter(|spec| is_dict_page(spec))
        .map(|spec| spec.offset as i64);

    // If we found a dictionary page at specs[0], the first data page is at specs[1].
    let data_page_offset = if dictionary_page_offset.is_some() {
        specs.get(1).map(|spec| spec.offset).unwrap_or(0) as i64
    } else {
        specs.first().map(|spec| spec.offset).unwrap_or(0) as i64
    };
    // Total value count across data pages (v1 and v2). The headers come from
    // this writer, so `type_` is always a known page type and the per-type
    // header is present — hence the unwraps.
    let num_values = specs
        .iter()
        .map(|spec| {
            let type_ = spec.header.type_.try_into().unwrap();
            match type_ {
                PageType::DataPage => {
                    spec.header.data_page_header.as_ref().unwrap().num_values as i64
                },
                PageType::DataPageV2 => {
                    spec.header.data_page_header_v2.as_ref().unwrap().num_values as i64
                },
                _ => 0, // only data pages contribute
            }
        })
        .sum();
    // Deduplicated set of encodings used by the chunk's pages. RLE is always
    // reported alongside each data page's encoding (presumably because levels
    // are RLE-encoded — matches what this writer emits; verify against the
    // page writer if in doubt).
    let mut encodings = specs
        .iter()
        .flat_map(|spec| {
            let type_ = spec.header.type_.try_into().unwrap();
            match type_ {
                PageType::DataPage => vec![
                    spec.header.data_page_header.as_ref().unwrap().encoding,
                    Encoding::Rle.into(),
                ],
                PageType::DataPageV2 => {
                    vec![
                        spec.header.data_page_header_v2.as_ref().unwrap().encoding,
                        Encoding::Rle.into(),
                    ]
                },
                PageType::DictionaryPage => vec![
                    spec.header
                        .dictionary_page_header
                        .as_ref()
                        .unwrap()
                        .encoding,
                ],
            }
        })
        .collect::<PlHashSet<_>>() // unique
        .into_iter() // to vec
        .collect::<Vec<_>>();

    // Sort the encodings to have deterministic metadata
    encodings.sort();

    // Reduce the per-page statistics into chunk-level statistics, then
    // serialize them into their thrift representation.
    let statistics = specs.iter().map(|x| &x.statistics).collect::<Vec<_>>();
    let statistics = reduce(&statistics)?;
    let statistics = statistics.map(|x| x.serialize());

    // Only the physical type is needed here; the converted-type component is
    // discarded.
    let (type_, _): (Type, Option<i32>) = descriptor.descriptor.primitive_type.physical_type.into();

    let metadata = ColumnMetaData {
        type_,
        encodings,
        path_in_schema: descriptor
            .path_in_schema
            .iter()
            .map(|x| x.to_string())
            .collect::<Vec<_>>(),
        codec: compression.into(),
        num_values,
        total_uncompressed_size,
        total_compressed_size,
        key_value_metadata: None,
        data_page_offset,
        index_page_offset: None,
        dictionary_page_offset,
        statistics,
        encoding_stats: None,
        bloom_filter_offset: None,
        bloom_filter_length: None,
        size_statistics: None,
    };

    Ok(ColumnChunk {
        file_path: None, // same file for now.
        // NOTE(review): with a dictionary page present, `data_page_offset +
        // total_compressed_size` overshoots the chunk's end (the total also
        // includes the dictionary page before `data_page_offset`); the field
        // is deprecated in recent parquet-format — confirm no consumer relies
        // on it before changing.
        file_offset: data_page_offset + total_compressed_size,
        meta_data: Some(metadata),
        offset_index_offset: None,
        offset_index_length: None,
        column_index_offset: None,
        column_index_length: None,
        crypto_metadata: None,
        encrypted_column_metadata: None,
    })
}
227
228