Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/write/page.rs
6940 views
1
use std::io::Write;
2
3
#[cfg(feature = "async")]
4
use futures::{AsyncWrite, AsyncWriteExt};
5
use polars_parquet_format::thrift::protocol::TCompactOutputProtocol;
6
#[cfg(feature = "async")]
7
use polars_parquet_format::thrift::protocol::TCompactOutputStreamProtocol;
8
use polars_parquet_format::{DictionaryPageHeader, Encoding, PageType};
9
10
use crate::parquet::compression::Compression;
11
use crate::parquet::error::{ParquetError, ParquetResult};
12
use crate::parquet::page::{
13
CompressedDataPage, CompressedDictPage, CompressedPage, DataPageHeader, ParquetPageHeader,
14
};
15
use crate::parquet::statistics::Statistics;
16
17
pub(crate) fn is_data_page(page: &PageWriteSpec) -> bool {
18
page.header.type_ == PageType::DATA_PAGE || page.header.type_ == PageType::DATA_PAGE_V2
19
}
20
21
fn maybe_bytes(uncompressed: usize, compressed: usize) -> ParquetResult<(i32, i32)> {
22
let uncompressed_page_size: i32 = uncompressed.try_into().map_err(|_| {
23
ParquetError::oos(format!(
24
"A page can only contain i32::MAX uncompressed bytes. This one contains {uncompressed}"
25
))
26
})?;
27
28
let compressed_page_size: i32 = compressed.try_into().map_err(|_| {
29
ParquetError::oos(format!(
30
"A page can only contain i32::MAX compressed bytes. This one contains {compressed}"
31
))
32
})?;
33
34
Ok((uncompressed_page_size, compressed_page_size))
35
}
36
37
/// Contains page write metrics.
pub struct PageWriteSpec {
    /// The thrift page header that was serialized for this page.
    pub header: ParquetPageHeader,
    // Number of values in the page (kept for bookkeeping; not read anywhere visible here).
    #[allow(dead_code)]
    pub num_values: usize,
    /// The number of actual rows. For non-nested values, this is equal to the number of values.
    pub num_rows: usize,
    /// Number of bytes the serialized thrift header occupied.
    pub header_size: u64,
    /// Caller-supplied offset at which this page starts in the output.
    pub offset: u64,
    /// Total bytes written for the page: header plus (compressed) page body.
    pub bytes_written: u64,
    /// Compression codec reported by the page that was written.
    pub compression: Compression,
    /// Page statistics; only data pages carry them (dictionary pages yield `None`).
    pub statistics: Option<Statistics>,
}
50
51
pub fn write_page<W: Write>(
52
writer: &mut W,
53
offset: u64,
54
compressed_page: &CompressedPage,
55
) -> ParquetResult<PageWriteSpec> {
56
let num_values = compressed_page.num_values();
57
let num_rows = compressed_page
58
.num_rows()
59
.expect("We should have num_rows when we are writing");
60
61
let header = match &compressed_page {
62
CompressedPage::Data(compressed_page) => assemble_data_page_header(compressed_page),
63
CompressedPage::Dict(compressed_page) => assemble_dict_page_header(compressed_page),
64
}?;
65
66
let header_size = write_page_header(writer, &header)?;
67
let mut bytes_written = header_size;
68
69
bytes_written += match &compressed_page {
70
CompressedPage::Data(compressed_page) => {
71
writer.write_all(&compressed_page.buffer)?;
72
compressed_page.buffer.len() as u64
73
},
74
CompressedPage::Dict(compressed_page) => {
75
writer.write_all(&compressed_page.buffer)?;
76
compressed_page.buffer.len() as u64
77
},
78
};
79
80
let statistics = match &compressed_page {
81
CompressedPage::Data(compressed_page) => compressed_page.statistics().transpose()?,
82
CompressedPage::Dict(_) => None,
83
};
84
85
Ok(PageWriteSpec {
86
header,
87
header_size,
88
offset,
89
bytes_written,
90
compression: compressed_page.compression(),
91
statistics,
92
num_values,
93
num_rows,
94
})
95
}
96
97
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
/// Async counterpart of [`write_page`]: serializes `compressed_page` — thrift
/// header followed by the page body — into `writer`, returning the metrics of
/// what was written.
///
/// `offset` is recorded verbatim in the returned [`PageWriteSpec`].
///
/// # Panics
/// Panics if the page does not carry a row count.
pub async fn write_page_async<W: AsyncWrite + Unpin + Send>(
    writer: &mut W,
    offset: u64,
    compressed_page: &CompressedPage,
) -> ParquetResult<PageWriteSpec> {
    let num_values = compressed_page.num_values();
    let num_rows = compressed_page
        .num_rows()
        .expect("We should have the num_rows when we are writing");

    // Assemble the thrift header matching the page kind.
    let header = match &compressed_page {
        CompressedPage::Data(compressed_page) => assemble_data_page_header(compressed_page),
        CompressedPage::Dict(compressed_page) => assemble_dict_page_header(compressed_page),
    }?;

    // `write_page_header_async` already returns `u64`; the previous
    // `header_size as u64` cast was redundant (clippy: unnecessary_cast).
    let header_size = write_page_header_async(writer, &header).await?;
    let mut bytes_written = header_size;

    bytes_written += match &compressed_page {
        CompressedPage::Data(compressed_page) => {
            writer.write_all(&compressed_page.buffer).await?;
            compressed_page.buffer.len() as u64
        },
        CompressedPage::Dict(compressed_page) => {
            writer.write_all(&compressed_page.buffer).await?;
            compressed_page.buffer.len() as u64
        },
    };

    // Only data pages carry statistics.
    let statistics = match &compressed_page {
        CompressedPage::Data(compressed_page) => compressed_page.statistics().transpose()?,
        CompressedPage::Dict(_) => None,
    };

    Ok(PageWriteSpec {
        header,
        header_size,
        offset,
        bytes_written,
        compression: compressed_page.compression(),
        statistics,
        num_rows,
        num_values,
    })
}
144
145
fn assemble_data_page_header(page: &CompressedDataPage) -> ParquetResult<ParquetPageHeader> {
146
let (uncompressed_page_size, compressed_page_size) =
147
maybe_bytes(page.uncompressed_size(), page.compressed_size())?;
148
149
let mut page_header = ParquetPageHeader {
150
type_: match page.header() {
151
DataPageHeader::V1(_) => PageType::DATA_PAGE,
152
DataPageHeader::V2(_) => PageType::DATA_PAGE_V2,
153
},
154
uncompressed_page_size,
155
compressed_page_size,
156
crc: None,
157
data_page_header: None,
158
index_page_header: None,
159
dictionary_page_header: None,
160
data_page_header_v2: None,
161
};
162
163
match page.header() {
164
DataPageHeader::V1(header) => {
165
page_header.data_page_header = Some(header.clone());
166
},
167
DataPageHeader::V2(header) => {
168
page_header.data_page_header_v2 = Some(header.clone());
169
},
170
}
171
Ok(page_header)
172
}
173
174
fn assemble_dict_page_header(page: &CompressedDictPage) -> ParquetResult<ParquetPageHeader> {
175
let (uncompressed_page_size, compressed_page_size) =
176
maybe_bytes(page.uncompressed_page_size, page.buffer.len())?;
177
178
let num_values: i32 = page.num_values.try_into().map_err(|_| {
179
ParquetError::oos(format!(
180
"A dictionary page can only contain i32::MAX items. This one contains {}",
181
page.num_values
182
))
183
})?;
184
185
Ok(ParquetPageHeader {
186
type_: PageType::DICTIONARY_PAGE,
187
uncompressed_page_size,
188
compressed_page_size,
189
crc: None,
190
data_page_header: None,
191
index_page_header: None,
192
dictionary_page_header: Some(DictionaryPageHeader {
193
num_values,
194
encoding: Encoding::PLAIN,
195
is_sorted: None,
196
}),
197
data_page_header_v2: None,
198
})
199
}
200
201
/// writes the page header into `writer`, returning the number of bytes used in the process.
202
fn write_page_header<W: Write>(
203
mut writer: &mut W,
204
header: &ParquetPageHeader,
205
) -> ParquetResult<u64> {
206
let mut protocol = TCompactOutputProtocol::new(&mut writer);
207
Ok(header.write_to_out_protocol(&mut protocol)? as u64)
208
}
209
210
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
/// writes the page header into `writer`, returning the number of bytes used in the process.
async fn write_page_header_async<W: AsyncWrite + Unpin + Send>(
    mut writer: &mut W,
    header: &ParquetPageHeader,
) -> ParquetResult<u64> {
    // Streaming thrift compact protocol over the async writer; the protocol
    // reports how many bytes the serialized header occupied.
    let mut protocol = TCompactOutputStreamProtocol::new(&mut writer);
    let written = header.write_to_out_stream_protocol(&mut protocol).await?;
    Ok(written as u64)
}
220
221
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parquet::CowBuffer;

    /// Builds an empty, uncompressed dictionary page with the given
    /// uncompressed size and value count.
    fn dict_page(uncompressed_page_size: usize, num_values: usize) -> CompressedDictPage {
        CompressedDictPage::new(
            CowBuffer::Owned(vec![]),
            Compression::Uncompressed,
            uncompressed_page_size,
            num_values,
            false,
        )
    }

    // An uncompressed size above i32::MAX must be rejected by the header assembler.
    #[test]
    fn dict_too_large() {
        let page = dict_page(i32::MAX as usize + 1, 100);
        assert!(assemble_dict_page_header(&page).is_err());
    }

    // A value count above i32::MAX must be rejected by the header assembler.
    #[test]
    fn dict_too_many_values() {
        let page = dict_page(0, i32::MAX as usize + 1);
        assert!(assemble_dict_page_header(&page).is_err());
    }
}
250
251