Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/metadata/column_chunk_metadata.rs
6940 views
1
use polars_parquet_format::{ColumnChunk, ColumnMetaData, Encoding};
2
3
use super::column_descriptor::ColumnDescriptor;
4
use crate::parquet::compression::Compression;
5
use crate::parquet::error::{ParquetError, ParquetResult};
6
use crate::parquet::schema::types::PhysicalType;
7
use crate::parquet::statistics::Statistics;
8
9
#[cfg(feature = "serde")]
10
mod serde_types {
11
pub use std::io::Cursor;
12
13
pub use polars_parquet_format::thrift::protocol::{
14
TCompactInputProtocol, TCompactOutputProtocol,
15
};
16
pub use serde::de::Error as DeserializeError;
17
pub use serde::ser::Error as SerializeError;
18
pub use serde::{Deserialize, Deserializer, Serialize, Serializer};
19
}
20
#[cfg(feature = "serde")]
21
use serde_types::*;
22
23
/// Metadata for a column chunk.
24
///
25
/// This contains the `ColumnDescriptor` associated with the chunk so that deserializers have
26
/// access to the descriptor (e.g. physical, converted, logical).
27
///
28
/// This struct is intentionally not `Clone`, as it is a huge struct.
29
#[derive(Debug)]
30
#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
31
pub struct ColumnChunkMetadata {
32
#[cfg_attr(feature = "serde", serde(serialize_with = "serialize_column_chunk"))]
33
#[cfg_attr(
34
feature = "serde",
35
serde(deserialize_with = "deserialize_column_chunk")
36
)]
37
column_chunk: ColumnChunk,
38
column_descr: ColumnDescriptor,
39
}
40
41
/// Serializes a thrift [`ColumnChunk`] as an opaque byte blob encoded with the thrift compact
/// protocol. This is the counterpart of `deserialize_column_chunk`, which decodes the blob with
/// `TCompactInputProtocol`.
///
/// # Errors
/// Returns `S::Error` if thrift encoding fails or if `serializer` rejects the bytes.
#[cfg(feature = "serde")]
fn serialize_column_chunk<S>(
    column_chunk: &ColumnChunk,
    serializer: S,
) -> std::result::Result<S::Ok, S::Error>
where
    S: Serializer,
{
    let mut buf: Vec<u8> = Vec::new();
    {
        // Wrap `&mut Vec<u8>`, not `&mut buf[..]`. A cursor over a slice has a fixed capacity
        // (zero here, since `buf` starts empty), so every write would be dropped or fail with
        // `WriteZero`. `Cursor<&mut Vec<u8>>` grows the vector as bytes are written.
        let mut protocol = TCompactOutputProtocol::new(Cursor::new(&mut buf));
        column_chunk
            .write_to_out_protocol(&mut protocol)
            .map_err(S::Error::custom)?;
    }
    serializer.serialize_bytes(&buf)
}
57
58
/// Decodes a thrift [`ColumnChunk`] from the byte blob written by `serialize_column_chunk`,
/// using the thrift compact protocol. The decoder is given no size limit (`usize::MAX`).
///
/// # Errors
/// Returns `D::Error` if `deserialize_map_bytes` fails or if the bytes do not decode as a
/// thrift `ColumnChunk`.
#[cfg(feature = "serde")]
fn deserialize_column_chunk<'de, D>(deserializer: D) -> std::result::Result<ColumnChunk, D::Error>
where
    D: Deserializer<'de>,
{
    use polars_utils::pl_serialize::deserialize_map_bytes;

    // The closure produces its own `Result`. The trailing `?` strips the outer layer added by
    // `deserialize_map_bytes`, so the decoding outcome is returned unchanged.
    deserialize_map_bytes(deserializer, |raw| {
        let mut input = raw.as_ref();
        let mut compact = TCompactInputProtocol::new(&mut input, usize::MAX);
        ColumnChunk::read_from_in_protocol(&mut compact).map_err(D::Error::custom)
    })?
}
71
72
// Represents common operations for a column chunk.
73
impl ColumnChunkMetadata {
74
/// Returns a new [`ColumnChunkMetadata`]
75
pub fn new(column_chunk: ColumnChunk, column_descr: ColumnDescriptor) -> Self {
76
Self {
77
column_chunk,
78
column_descr,
79
}
80
}
81
82
/// File where the column chunk is stored.
83
///
84
/// If not set, assumed to belong to the same file as the metadata.
85
/// This path is relative to the current file.
86
pub fn file_path(&self) -> &Option<String> {
87
&self.column_chunk.file_path
88
}
89
90
/// Byte offset in `file_path()`.
91
pub fn file_offset(&self) -> i64 {
92
self.column_chunk.file_offset
93
}
94
95
/// Returns this column's [`ColumnChunk`]
96
pub fn column_chunk(&self) -> &ColumnChunk {
97
&self.column_chunk
98
}
99
100
/// The column's [`ColumnMetaData`]
101
pub fn metadata(&self) -> &ColumnMetaData {
102
self.column_chunk.meta_data.as_ref().unwrap()
103
}
104
105
/// The [`ColumnDescriptor`] for this column. This descriptor contains the physical and logical type
106
/// of the pages.
107
pub fn descriptor(&self) -> &ColumnDescriptor {
108
&self.column_descr
109
}
110
111
/// The [`PhysicalType`] of this column.
112
pub fn physical_type(&self) -> PhysicalType {
113
self.column_descr.descriptor.primitive_type.physical_type
114
}
115
116
/// Decodes the raw statistics into [`Statistics`].
117
pub fn statistics(&self) -> Option<ParquetResult<Statistics>> {
118
self.metadata().statistics.as_ref().map(|x| {
119
Statistics::deserialize(x, self.column_descr.descriptor.primitive_type.clone())
120
})
121
}
122
123
/// Total number of values in this column chunk. Note that this is not necessarily the number
124
/// of rows. E.g. the (nested) array `[[1, 2], [3]]` has 2 rows and 3 values.
125
pub fn num_values(&self) -> i64 {
126
self.metadata().num_values
127
}
128
129
/// [`Compression`] for this column.
130
pub fn compression(&self) -> Compression {
131
self.metadata().codec.try_into().unwrap()
132
}
133
134
/// Returns the total compressed data size of this column chunk.
135
pub fn compressed_size(&self) -> i64 {
136
self.metadata().total_compressed_size
137
}
138
139
/// Returns the total uncompressed data size of this column chunk.
140
pub fn uncompressed_size(&self) -> i64 {
141
self.metadata().total_uncompressed_size
142
}
143
144
/// Returns the offset for the column data.
145
pub fn data_page_offset(&self) -> i64 {
146
self.metadata().data_page_offset
147
}
148
149
/// Returns `true` if this column chunk contains a index page, `false` otherwise.
150
pub fn has_index_page(&self) -> bool {
151
self.metadata().index_page_offset.is_some()
152
}
153
154
/// Returns the offset for the index page.
155
pub fn index_page_offset(&self) -> Option<i64> {
156
self.metadata().index_page_offset
157
}
158
159
/// Returns the offset for the dictionary page, if any.
160
pub fn dictionary_page_offset(&self) -> Option<i64> {
161
self.metadata().dictionary_page_offset
162
}
163
164
/// Returns the encoding for this column
165
pub fn column_encoding(&self) -> &Vec<Encoding> {
166
&self.metadata().encodings
167
}
168
169
/// Returns the offset and length in bytes of the column chunk within the file
170
pub fn byte_range(&self) -> core::ops::Range<u64> {
171
// this has been validated in [`try_from_thrift`]
172
column_metadata_byte_range(self.metadata())
173
}
174
175
/// Method to convert from Thrift.
176
pub(crate) fn try_from_thrift(
177
column_descr: ColumnDescriptor,
178
column_chunk: ColumnChunk,
179
) -> ParquetResult<Self> {
180
// validate metadata
181
if let Some(meta) = &column_chunk.meta_data {
182
let _: u64 = meta.total_compressed_size.try_into()?;
183
184
if let Some(offset) = meta.dictionary_page_offset {
185
let _: u64 = offset.try_into()?;
186
}
187
let _: u64 = meta.data_page_offset.try_into()?;
188
189
let _: Compression = meta.codec.try_into()?;
190
} else {
191
return Err(ParquetError::oos("Column chunk requires metadata"));
192
}
193
194
Ok(Self {
195
column_chunk,
196
column_descr,
197
})
198
}
199
200
/// Method to convert to Thrift.
201
pub fn into_thrift(self) -> ColumnChunk {
202
self.column_chunk
203
}
204
}
205
206
pub(super) fn column_metadata_byte_range(
207
column_metadata: &ColumnMetaData,
208
) -> core::ops::Range<u64> {
209
let offset = if let Some(dict_page_offset) = column_metadata.dictionary_page_offset {
210
dict_page_offset as u64
211
} else {
212
column_metadata.data_page_offset as u64
213
};
214
let len = column_metadata.total_compressed_size as u64;
215
offset..offset.checked_add(len).unwrap()
216
}
217
218