Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/write/indexes/serialize.rs
6940 views
1
use polars_parquet_format::{BoundaryOrder, ColumnIndex, OffsetIndex, PageLocation};
2
3
use crate::parquet::error::{ParquetError, ParquetResult};
4
use crate::parquet::write::page::{PageWriteSpec, is_data_page};
5
6
pub fn serialize_column_index(pages: &[PageWriteSpec]) -> ParquetResult<ColumnIndex> {
7
let mut null_pages = Vec::with_capacity(pages.len());
8
let mut min_values = Vec::with_capacity(pages.len());
9
let mut max_values = Vec::with_capacity(pages.len());
10
let mut null_counts = Vec::with_capacity(pages.len());
11
12
pages
13
.iter()
14
.filter(|x| is_data_page(x))
15
.try_for_each(|spec| {
16
if let Some(stats) = &spec.statistics {
17
let stats = stats.serialize();
18
19
let null_count = stats
20
.null_count
21
.ok_or_else(|| ParquetError::oos("null count of a page is required"))?;
22
null_counts.push(null_count);
23
24
if let Some(min_value) = stats.min_value {
25
min_values.push(min_value);
26
max_values.push(
27
stats
28
.max_value
29
.ok_or_else(|| ParquetError::oos("max value of a page is required"))?,
30
);
31
null_pages.push(false)
32
} else {
33
min_values.push(vec![0]);
34
max_values.push(vec![0]);
35
null_pages.push(true)
36
}
37
38
ParquetResult::Ok(())
39
} else {
40
Err(ParquetError::oos(
41
"options were set to write statistics but some pages miss them",
42
))
43
}
44
})?;
45
Ok(ColumnIndex {
46
null_pages,
47
min_values,
48
max_values,
49
boundary_order: BoundaryOrder::UNORDERED,
50
null_counts: Some(null_counts),
51
repetition_level_histograms: None,
52
definition_level_histograms: None,
53
})
54
}
55
56
pub fn serialize_offset_index(pages: &[PageWriteSpec]) -> ParquetResult<OffsetIndex> {
57
let mut first_row_index = 0;
58
let page_locations = pages
59
.iter()
60
.filter(|x| is_data_page(x))
61
.map(|spec| {
62
let location = PageLocation {
63
offset: spec.offset.try_into()?,
64
compressed_page_size: spec.bytes_written.try_into()?,
65
first_row_index,
66
};
67
let num_rows = spec.num_rows;
68
first_row_index += num_rows as i64;
69
Ok(location)
70
})
71
.collect::<ParquetResult<Vec<_>>>()?;
72
73
Ok(OffsetIndex {
74
page_locations,
75
unencoded_byte_array_data_bytes: None,
76
})
77
}
78
79