GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/metadata/row_metadata.rs
use std::sync::Arc;

use hashbrown::hash_map::RawEntryMut;
use polars_parquet_format::{RowGroup, SortingColumn};
use polars_utils::aliases::{InitHashMaps, PlHashMap};
use polars_utils::idx_vec::UnitVec;
use polars_utils::pl_str::PlSmallStr;
use polars_utils::unitvec;

use super::column_chunk_metadata::{ColumnChunkMetadata, column_metadata_byte_range};
use super::schema_descriptor::SchemaDescriptor;
use crate::parquet::error::{ParquetError, ParquetResult};

type ColumnLookup = PlHashMap<PlSmallStr, UnitVec<usize>>;

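// Note (illustrative, not part of the original file): Parquet stores nested
// fields as separate leaf columns that share the name of their root field.
// For a schema with a struct field `a { b, c }` followed by a flat field `d`,
// the lookup built below would hold:
//
//     "a" -> [0, 1]
//     "d" -> [2]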
trait InitColumnLookup {
    fn add_column(&mut self, index: usize, column: &ColumnChunkMetadata);
}

impl InitColumnLookup for ColumnLookup {
    #[inline(always)]
    fn add_column(&mut self, index: usize, column: &ColumnChunkMetadata) {
        let root_name = &column.descriptor().path_in_schema[0];

        match self.raw_entry_mut().from_key(root_name) {
            RawEntryMut::Vacant(slot) => {
                slot.insert(root_name.clone(), unitvec![index]);
            },
            RawEntryMut::Occupied(mut slot) => {
                slot.get_mut().push(index);
            },
        };
    }
}
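
// Sketch (hypothetical driver code, mirroring `try_from_thrift` below): the
// lookup is populated once per leaf column while the column chunks are parsed:
//
//     let mut lookup = ColumnLookup::with_capacity(columns.len());
//     for (i, column) in columns.iter().enumerate() {
//         lookup.add_column(i, column);
//     }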

/// Metadata for a row group.
#[derive(Debug, Clone, Default)]
pub struct RowGroupMetadata {
    // Moving `ColumnChunkMetadata` is very expensive because the values are rather big. So, we
    // `Arc` the `Vec` instead of using an `Arc` slice. This way we don't have to move the vec
    // values into an `Arc` when collecting.
    columns: Arc<Vec<ColumnChunkMetadata>>,
    column_lookup: PlHashMap<PlSmallStr, UnitVec<usize>>,
    num_rows: usize,
    total_byte_size: usize,
    full_byte_range: core::ops::Range<u64>,
    sorting_columns: Option<Vec<SortingColumn>>,
}
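
// Invariants (inferred from `try_from_thrift` below, noted here for clarity):
// every index stored in `column_lookup` points into `columns`, and
// `full_byte_range` covers the byte range of every column chunk in this row
// group.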

impl RowGroupMetadata {
    #[inline(always)]
    pub fn n_columns(&self) -> usize {
        self.columns.len()
    }

    /// Fetch all columns under this root name, if the root name exists.
    pub fn columns_under_root_iter(
        &self,
        root_name: &str,
    ) -> Option<impl ExactSizeIterator<Item = &ColumnChunkMetadata> + DoubleEndedIterator> {
        self.column_lookup
            .get(root_name)
            .map(|x| x.iter().map(|&x| &self.columns[x]))
    }
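
    // Example (hypothetical caller): collect all leaf columns of a struct
    // field named "a".
    //
    //     let leaves: Vec<&ColumnChunkMetadata> = rg
    //         .columns_under_root_iter("a")
    //         .map(|iter| iter.collect())
    //         .unwrap_or_default();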

    /// Fetch the indices of all columns under this root name, if the root name exists.
    pub fn columns_idxs_under_root_iter<'a>(&'a self, root_name: &str) -> Option<&'a [usize]> {
        self.column_lookup.get(root_name).map(|x| x.as_slice())
    }

    pub fn parquet_columns(&self) -> &[ColumnChunkMetadata] {
        self.columns.as_ref().as_slice()
    }

    /// Number of rows in this row group.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Total byte size of all uncompressed column data in this row group.
    pub fn total_byte_size(&self) -> usize {
        self.total_byte_size
    }

    /// Total size of all compressed column data in this row group.
    pub fn compressed_size(&self) -> usize {
        self.columns
            .iter()
            .map(|c| c.compressed_size() as usize)
            .sum::<usize>()
    }

    pub fn full_byte_range(&self) -> core::ops::Range<u64> {
        self.full_byte_range.clone()
    }

    pub fn byte_ranges_iter(&self) -> impl '_ + ExactSizeIterator<Item = core::ops::Range<u64>> {
        self.columns.iter().map(|x| x.byte_range())
    }
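
    // Sketch (illustrative): an I/O layer could use these ranges to compute
    // how many bytes a full read of this row group would fetch:
    //
    //     let bytes_to_fetch: u64 = rg.byte_ranges_iter().map(|r| r.end - r.start).sum();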

    pub fn sorting_columns(&self) -> Option<&[SortingColumn]> {
        self.sorting_columns.as_deref()
    }

    /// Convert a Thrift-level [`RowGroup`] into [`RowGroupMetadata`].
    pub(crate) fn try_from_thrift(
        schema_descr: &SchemaDescriptor,
        rg: RowGroup,
    ) -> ParquetResult<RowGroupMetadata> {
        if schema_descr.columns().len() != rg.columns.len() {
            return Err(ParquetError::oos(format!(
                "The number of columns in the row group ({}) must be equal to the number of columns in the schema ({})",
                rg.columns.len(),
                schema_descr.columns().len()
            )));
        }
        let total_byte_size = rg.total_byte_size.try_into()?;
        let num_rows = rg.num_rows.try_into()?;

        let mut column_lookup = ColumnLookup::with_capacity(rg.columns.len());
        let mut full_byte_range = if let Some(first_column_chunk) = rg.columns.first() {
            let Some(metadata) = &first_column_chunk.meta_data else {
                return Err(ParquetError::oos("Column chunk requires metadata"));
            };
            column_metadata_byte_range(metadata)
        } else {
            0..0
        };
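
        // Note (added comment): `full_byte_range` is only seeded from the
        // first column chunk here; the `map` below widens it to cover the
        // byte range of every column chunk in the row group.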

        let sorting_columns = rg.sorting_columns.clone();

        let columns = rg
            .columns
            .into_iter()
            .zip(schema_descr.columns())
            .enumerate()
            .map(|(i, (column_chunk, descriptor))| {
                let column =
                    ColumnChunkMetadata::try_from_thrift(descriptor.clone(), column_chunk)?;

                column_lookup.add_column(i, &column);

                let byte_range = column.byte_range();
                full_byte_range = full_byte_range.start.min(byte_range.start)
                    ..full_byte_range.end.max(byte_range.end);

                Ok(column)
            })
            .collect::<ParquetResult<Vec<_>>>()?;
        let columns = Arc::new(columns);

        Ok(RowGroupMetadata {
            columns,
            column_lookup,
            num_rows,
            total_byte_size,
            full_byte_range,
            sorting_columns,
        })
    }
}
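
// End-to-end sketch (hypothetical: `row_groups` would come from parsing the
// file-level metadata; none of these bindings exist in this file):
//
//     for rg in row_groups.iter() {
//         println!(
//             "rows: {}, columns: {}, compressed bytes: {}",
//             rg.num_rows(),
//             rg.n_columns(),
//             rg.compressed_size(),
//         );
//     }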