GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/write/column_chunk.rs

use std::io::Write;

#[cfg(feature = "async")]
use futures::AsyncWrite;
use polars_parquet_format::thrift::protocol::TCompactOutputProtocol;
#[cfg(feature = "async")]
use polars_parquet_format::thrift::protocol::TCompactOutputStreamProtocol;
use polars_parquet_format::{ColumnChunk, ColumnMetaData, Type};
use polars_utils::aliases::PlHashSet;

use super::DynStreamingIterator;
#[cfg(feature = "async")]
use super::page::write_page_async;
use super::page::{PageWriteSpec, write_page};
use super::statistics::reduce;
use crate::parquet::FallibleStreamingIterator;
use crate::parquet::compression::Compression;
use crate::parquet::encoding::Encoding;
use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::metadata::ColumnDescriptor;
use crate::parquet::page::{CompressedPage, PageType};

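/// Writes the `compressed_pages` of a single column to `writer`, starting at byte `offset`,
/// followed by the column chunk's Thrift-encoded `ColumnMetaData`.
///
/// Returns the assembled `ColumnChunk`, the per-page write specs, and the total number of
/// bytes written (pages plus metadata).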
pub fn write_column_chunk<W, E>(
    writer: &mut W,
    mut offset: u64,
    descriptor: &ColumnDescriptor,
    mut compressed_pages: DynStreamingIterator<'_, CompressedPage, E>,
) -> ParquetResult<(ColumnChunk, Vec<PageWriteSpec>, u64)>
where
    W: Write,
    ParquetError: From<E>,
    E: std::error::Error,
{
    // write every page

    let initial = offset;

    let mut specs = vec![];
    while let Some(compressed_page) = compressed_pages.next()? {
        let spec = write_page(writer, offset, compressed_page)?;
        offset += spec.bytes_written;
        specs.push(spec);
    }
    let mut bytes_written = offset - initial;

    let column_chunk = build_column_chunk(&specs, descriptor)?;

    // write metadata
    let mut protocol = TCompactOutputProtocol::new(writer);
    bytes_written += column_chunk
        .meta_data
        .as_ref()
        .unwrap()
        .write_to_out_protocol(&mut protocol)? as u64;

    Ok((column_chunk, specs, bytes_written))
}

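/// Async variant of [`write_column_chunk`]: writes every page and the Thrift-encoded
/// `ColumnMetaData` to an `AsyncWrite` writer.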
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub async fn write_column_chunk_async<W, E>(
    writer: &mut W,
    mut offset: u64,
    descriptor: &ColumnDescriptor,
    mut compressed_pages: DynStreamingIterator<'_, CompressedPage, E>,
) -> ParquetResult<(ColumnChunk, Vec<PageWriteSpec>, u64)>
where
    W: AsyncWrite + Unpin + Send,
    ParquetError: From<E>,
    E: std::error::Error,
{
    let initial = offset;
    // write every page
    let mut specs = vec![];
    while let Some(compressed_page) = compressed_pages.next()? {
        let spec = write_page_async(writer, offset, compressed_page).await?;
        offset += spec.bytes_written;
        specs.push(spec);
    }
    let mut bytes_written = offset - initial;

    let column_chunk = build_column_chunk(&specs, descriptor)?;

    // write metadata
    let mut protocol = TCompactOutputStreamProtocol::new(writer);
    bytes_written += column_chunk
        .meta_data
        .as_ref()
        .unwrap()
        .write_to_out_stream_protocol(&mut protocol)
        .await? as u64;

    Ok((column_chunk, specs, bytes_written))
}

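/// Assembles the Thrift `ColumnChunk` (codec, sizes, value count, encodings and reduced
/// statistics) from the specs of the pages that were already written.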
fn build_column_chunk(
    specs: &[PageWriteSpec],
    descriptor: &ColumnDescriptor,
) -> ParquetResult<ColumnChunk> {
    // compute stats to build header at the end of the chunk

    let compression = specs
        .iter()
        .map(|spec| spec.compression)
        .collect::<PlHashSet<_>>();
    if compression.len() > 1 {
        return Err(crate::parquet::error::ParquetError::oos(
            "All pages within a column chunk must be compressed with the same codec",
        ));
    }
    let compression = compression
        .into_iter()
        .next()
        .unwrap_or(Compression::Uncompressed);

    // SPEC: the total compressed size is the total compressed size of each page + the header size
    let total_compressed_size = specs
        .iter()
        .map(|x| x.header_size as i64 + x.header.compressed_page_size as i64)
        .sum();
    // SPEC: the total uncompressed size is the total uncompressed size of each page + the header size
    let total_uncompressed_size = specs
        .iter()
        .map(|x| x.header_size as i64 + x.header.uncompressed_page_size as i64)
        .sum();
    let data_page_offset = specs.first().map(|spec| spec.offset).unwrap_or(0) as i64;
    let num_values = specs
        .iter()
        .map(|spec| {
            let type_ = spec.header.type_.try_into().unwrap();
            match type_ {
                PageType::DataPage => {
                    spec.header.data_page_header.as_ref().unwrap().num_values as i64
                },
                PageType::DataPageV2 => {
                    spec.header.data_page_header_v2.as_ref().unwrap().num_values as i64
                },
                _ => 0, // only data pages contribute
            }
        })
        .sum();
    let mut encodings = specs
        .iter()
        .flat_map(|spec| {
            let type_ = spec.header.type_.try_into().unwrap();
            match type_ {
                PageType::DataPage => vec![
                    spec.header.data_page_header.as_ref().unwrap().encoding,
                    Encoding::Rle.into(),
                ],
                PageType::DataPageV2 => {
                    vec![
                        spec.header.data_page_header_v2.as_ref().unwrap().encoding,
                        Encoding::Rle.into(),
                    ]
                },
                PageType::DictionaryPage => vec![
                    spec.header
                        .dictionary_page_header
                        .as_ref()
                        .unwrap()
                        .encoding,
                ],
            }
        })
        .collect::<PlHashSet<_>>() // unique
        .into_iter() // to vec
        .collect::<Vec<_>>();

    // Sort the encodings to have deterministic metadata
    encodings.sort();

    let statistics = specs.iter().map(|x| &x.statistics).collect::<Vec<_>>();
    let statistics = reduce(&statistics)?;
    let statistics = statistics.map(|x| x.serialize());

    let (type_, _): (Type, Option<i32>) = descriptor.descriptor.primitive_type.physical_type.into();

    let metadata = ColumnMetaData {
        type_,
        encodings,
        path_in_schema: descriptor
            .path_in_schema
            .iter()
            .map(|x| x.to_string())
            .collect::<Vec<_>>(),
        codec: compression.into(),
        num_values,
        total_uncompressed_size,
        total_compressed_size,
        key_value_metadata: None,
        data_page_offset,
        index_page_offset: None,
        dictionary_page_offset: None,
        statistics,
        encoding_stats: None,
        bloom_filter_offset: None,
        bloom_filter_length: None,
        size_statistics: None,
    };

    Ok(ColumnChunk {
        file_path: None, // same file for now.
        file_offset: data_page_offset + total_compressed_size,
        meta_data: Some(metadata),
        offset_index_offset: None,
        offset_index_length: None,
        column_index_offset: None,
        column_index_length: None,
        crypto_metadata: None,
        encrypted_column_metadata: None,
    })
}