// GitHub repository: pola-rs/polars
// Path: crates/polars-parquet/src/parquet/write/page.rs
use std::io::Write;

#[cfg(feature = "async")]
use futures::{AsyncWrite, AsyncWriteExt};
use polars_parquet_format::thrift::protocol::TCompactOutputProtocol;
#[cfg(feature = "async")]
use polars_parquet_format::thrift::protocol::TCompactOutputStreamProtocol;
use polars_parquet_format::{DictionaryPageHeader, Encoding, PageType};

use crate::parquet::compression::Compression;
use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::page::{
    CompressedDataPage, CompressedDictPage, CompressedPage, DataPageHeader, ParquetPageHeader,
};
use crate::parquet::statistics::Statistics;

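/// Returns `true` if `page` was written as a v1 or v2 data page.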
pub(crate) fn is_data_page(page: &PageWriteSpec) -> bool {
    page.header.type_ == PageType::DATA_PAGE || page.header.type_ == PageType::DATA_PAGE_V2
}

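/// Returns `true` if `page` was written as a dictionary page.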
pub(crate) fn is_dict_page(page: &PageWriteSpec) -> bool {
    page.header.type_ == PageType::DICTIONARY_PAGE
}

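/// Fallibly converts the uncompressed and compressed sizes to `i32`, since the
/// Thrift page header cannot represent sizes above `i32::MAX`.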
fn maybe_bytes(uncompressed: usize, compressed: usize) -> ParquetResult<(i32, i32)> {
    let uncompressed_page_size: i32 = uncompressed.try_into().map_err(|_| {
        ParquetError::oos(format!(
            "A page can only contain i32::MAX uncompressed bytes. This one contains {uncompressed}"
        ))
    })?;

    let compressed_page_size: i32 = compressed.try_into().map_err(|_| {
        ParquetError::oos(format!(
            "A page can only contain i32::MAX compressed bytes. This one contains {compressed}"
        ))
    })?;

    Ok((uncompressed_page_size, compressed_page_size))
}

/// Contains page write metrics.
pub struct PageWriteSpec {
    pub header: ParquetPageHeader,
    #[allow(dead_code)]
    pub num_values: usize,
    /// The number of actual rows. For non-nested values, this is equal to the number of values.
    pub num_rows: usize,
    pub header_size: u64,
    pub offset: u64,
    pub bytes_written: u64,
    pub compression: Compression,
    pub statistics: Option<Statistics>,
}

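/// Writes the Thrift-encoded page header followed by the already compressed
/// page body into `writer`, returning a [`PageWriteSpec`] that records the
/// caller-supplied `offset` together with the number of bytes written.
///
/// A minimal usage sketch (the construction of `compressed_page` is elided and
/// assumed to come from this crate's compression pipeline):
///
/// ```ignore
/// let mut out = std::io::Cursor::new(vec![]);
/// let spec = write_page(&mut out, 0, &compressed_page)?;
/// assert_eq!(spec.bytes_written, out.get_ref().len() as u64);
/// ```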
pub fn write_page<W: Write>(
    writer: &mut W,
    offset: u64,
    compressed_page: &CompressedPage,
) -> ParquetResult<PageWriteSpec> {
    let num_values = compressed_page.num_values();
    let num_rows = compressed_page
        .num_rows()
        .expect("We should have num_rows when we are writing");

    let header = match &compressed_page {
        CompressedPage::Data(compressed_page) => assemble_data_page_header(compressed_page),
        CompressedPage::Dict(compressed_page) => assemble_dict_page_header(compressed_page),
    }?;

    let header_size = write_page_header(writer, &header)?;
    let mut bytes_written = header_size;

    // Write the (already compressed) page body and account for its length.
    bytes_written += match &compressed_page {
        CompressedPage::Data(compressed_page) => {
            writer.write_all(&compressed_page.buffer)?;
            compressed_page.buffer.len() as u64
        },
        CompressedPage::Dict(compressed_page) => {
            writer.write_all(&compressed_page.buffer)?;
            compressed_page.buffer.len() as u64
        },
    };

    // Only data pages carry statistics; dictionary pages have none.
    let statistics = match &compressed_page {
        CompressedPage::Data(compressed_page) => compressed_page.statistics().transpose()?,
        CompressedPage::Dict(_) => None,
    };

    Ok(PageWriteSpec {
        header,
        header_size,
        offset,
        bytes_written,
        compression: compressed_page.compression(),
        statistics,
        num_values,
        num_rows,
    })
}

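/// Async variant of [`write_page`]: writes the Thrift-encoded page header and
/// the compressed page body into `writer`, returning the write metrics.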
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub async fn write_page_async<W: AsyncWrite + Unpin + Send>(
    writer: &mut W,
    offset: u64,
    compressed_page: &CompressedPage,
) -> ParquetResult<PageWriteSpec> {
    let num_values = compressed_page.num_values();
    let num_rows = compressed_page
        .num_rows()
        .expect("We should have num_rows when we are writing");

    let header = match &compressed_page {
        CompressedPage::Data(compressed_page) => assemble_data_page_header(compressed_page),
        CompressedPage::Dict(compressed_page) => assemble_dict_page_header(compressed_page),
    }?;

    let header_size = write_page_header_async(writer, &header).await?;
    let mut bytes_written = header_size;

    // Write the (already compressed) page body and account for its length.
    bytes_written += match &compressed_page {
        CompressedPage::Data(compressed_page) => {
            writer.write_all(&compressed_page.buffer).await?;
            compressed_page.buffer.len() as u64
        },
        CompressedPage::Dict(compressed_page) => {
            writer.write_all(&compressed_page.buffer).await?;
            compressed_page.buffer.len() as u64
        },
    };

    // Only data pages carry statistics; dictionary pages have none.
    let statistics = match &compressed_page {
        CompressedPage::Data(compressed_page) => compressed_page.statistics().transpose()?,
        CompressedPage::Dict(_) => None,
    };

    Ok(PageWriteSpec {
        header,
        header_size,
        offset,
        bytes_written,
        compression: compressed_page.compression(),
        statistics,
        num_rows,
        num_values,
    })
}

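/// Assembles the Thrift [`ParquetPageHeader`] for a v1 or v2 data page,
/// filling the header variant that matches the page version.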
fn assemble_data_page_header(page: &CompressedDataPage) -> ParquetResult<ParquetPageHeader> {
    let (uncompressed_page_size, compressed_page_size) =
        maybe_bytes(page.uncompressed_size(), page.compressed_size())?;

    let mut page_header = ParquetPageHeader {
        type_: match page.header() {
            DataPageHeader::V1(_) => PageType::DATA_PAGE,
            DataPageHeader::V2(_) => PageType::DATA_PAGE_V2,
        },
        uncompressed_page_size,
        compressed_page_size,
        crc: None,
        data_page_header: None,
        index_page_header: None,
        dictionary_page_header: None,
        data_page_header_v2: None,
    };

    match page.header() {
        DataPageHeader::V1(header) => {
            page_header.data_page_header = Some(header.clone());
        },
        DataPageHeader::V2(header) => {
            page_header.data_page_header_v2 = Some(header.clone());
        },
    }
    Ok(page_header)
}

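/// Assembles the Thrift [`ParquetPageHeader`] for a dictionary page, erroring
/// if the page holds more than `i32::MAX` values.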
fn assemble_dict_page_header(page: &CompressedDictPage) -> ParquetResult<ParquetPageHeader> {
    let (uncompressed_page_size, compressed_page_size) =
        maybe_bytes(page.uncompressed_page_size, page.buffer.len())?;

    let num_values: i32 = page.num_values.try_into().map_err(|_| {
        ParquetError::oos(format!(
            "A dictionary page can only contain i32::MAX items. This one contains {}",
            page.num_values
        ))
    })?;

    Ok(ParquetPageHeader {
        type_: PageType::DICTIONARY_PAGE,
        uncompressed_page_size,
        compressed_page_size,
        crc: None,
        data_page_header: None,
        index_page_header: None,
        dictionary_page_header: Some(DictionaryPageHeader {
            num_values,
            encoding: Encoding::PLAIN,
            is_sorted: None,
        }),
        data_page_header_v2: None,
    })
}

/// Writes the page header into `writer`, returning the number of bytes used in the process.
fn write_page_header<W: Write>(
    mut writer: &mut W,
    header: &ParquetPageHeader,
) -> ParquetResult<u64> {
    let mut protocol = TCompactOutputProtocol::new(&mut writer);
    Ok(header.write_to_out_protocol(&mut protocol)? as u64)
}

/// Writes the page header into `writer`, returning the number of bytes used in the process.
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
async fn write_page_header_async<W: AsyncWrite + Unpin + Send>(
    mut writer: &mut W,
    header: &ParquetPageHeader,
) -> ParquetResult<u64> {
    let mut protocol = TCompactOutputStreamProtocol::new(&mut writer);
    Ok(header.write_to_out_stream_protocol(&mut protocol).await? as u64)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::parquet::CowBuffer;

    #[test]
    fn dict_too_large() {
        // `uncompressed_page_size` exceeds `i32::MAX`, so header assembly must fail.
        let page = CompressedDictPage::new(
            CowBuffer::Owned(vec![]),
            Compression::Uncompressed,
            i32::MAX as usize + 1,
            100,
            false,
        );
        assert!(assemble_dict_page_header(&page).is_err());
    }

    #[test]
    fn dict_too_many_values() {
        // `num_values` exceeds `i32::MAX`, so header assembly must fail.
        let page = CompressedDictPage::new(
            CowBuffer::Owned(vec![]),
            Compression::Uncompressed,
            0,
            i32::MAX as usize + 1,
            false,
        );
        assert!(assemble_dict_page_header(&page).is_err());
    }
}