GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/write/row_group.rs
use std::io::Write;

#[cfg(feature = "async")]
use futures::AsyncWrite;
use polars_parquet_format::{ColumnChunk, RowGroup};

use super::column_chunk::write_column_chunk;
#[cfg(feature = "async")]
use super::column_chunk::write_column_chunk_async;
use super::page::{PageWriteSpec, is_data_page};
use super::{DynIter, DynStreamingIterator};
use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::metadata::{ColumnChunkMetadata, ColumnDescriptor};
use crate::parquet::page::CompressedPage;

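/// Byte offsets of a column chunk's dictionary page (if any) and first data page,
/// taken from its metadata; used below to derive the row group's `file_offset`.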
pub struct ColumnOffsetsMetadata {
    pub dictionary_page_offset: Option<i64>,
    pub data_page_offset: Option<i64>,
}

impl ColumnOffsetsMetadata {
    pub fn from_column_chunk(column_chunk: &ColumnChunk) -> ColumnOffsetsMetadata {
        ColumnOffsetsMetadata {
            dictionary_page_offset: column_chunk
                .meta_data
                .as_ref()
                .map(|meta| meta.dictionary_page_offset)
                .unwrap_or(None),
            data_page_offset: column_chunk
                .meta_data
                .as_ref()
                .map(|meta| meta.data_page_offset),
        }
    }

    pub fn from_column_chunk_metadata(
        column_chunk_metadata: &ColumnChunkMetadata,
    ) -> ColumnOffsetsMetadata {
        ColumnOffsetsMetadata {
            dictionary_page_offset: column_chunk_metadata.dictionary_page_offset(),
            data_page_offset: Some(column_chunk_metadata.data_page_offset()),
        }
    }

    pub fn calc_row_group_file_offset(&self) -> Option<i64> {
        self.dictionary_page_offset
            .filter(|x| *x > 0_i64)
            .or(self.data_page_offset)
    }
}

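/// Sums the row counts of the data pages of the first column chunk; every column
/// chunk in a row group covers the same rows, so the first column suffices.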
fn compute_num_rows(columns: &[(ColumnChunk, Vec<PageWriteSpec>)]) -> ParquetResult<i64> {
    columns
        .first()
        .map(|(_, specs)| {
            let mut num_rows = 0;
            specs
                .iter()
                .filter(|x| is_data_page(x))
                .try_for_each(|spec| {
                    num_rows += spec.num_rows as i64;
                    ParquetResult::Ok(())
                })?;
            ParquetResult::Ok(num_rows)
        })
        .unwrap_or(Ok(0))
}

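/// Writes a row group to `writer`, one column chunk at a time, starting at byte `offset`.
/// Returns the assembled `RowGroup` metadata, the page write specs of every column,
/// and the number of bytes written.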
pub fn write_row_group<
    'a,
    W,
    E, // external error any of the iterators may emit
>(
    writer: &mut W,
    mut offset: u64,
    descriptors: &[ColumnDescriptor],
    columns: DynIter<'a, std::result::Result<DynStreamingIterator<'a, CompressedPage, E>, E>>,
    ordinal: usize,
) -> ParquetResult<(RowGroup, Vec<Vec<PageWriteSpec>>, u64)>
where
    W: Write,
    ParquetError: From<E>,
    E: std::error::Error,
{
    let column_iter = descriptors.iter().zip(columns);

    let initial = offset;
    // Write each column chunk back-to-back, advancing `offset` by the size of each one.
    let columns = column_iter
        .map(|(descriptor, page_iter)| {
            let (column, page_specs, size) =
                write_column_chunk(writer, offset, descriptor, page_iter?)?;
            offset += size;
            Ok((column, page_specs))
        })
        .collect::<ParquetResult<Vec<_>>>()?;
    let bytes_written = offset - initial;

    let num_rows = compute_num_rows(&columns)?;

    // compute row group stats
    let file_offset = columns
        .first()
        .map(|(column_chunk, _)| {
            ColumnOffsetsMetadata::from_column_chunk(column_chunk).calc_row_group_file_offset()
        })
        .unwrap_or(None);

    let total_byte_size = columns
        .iter()
        .map(|(c, _)| c.meta_data.as_ref().unwrap().total_uncompressed_size)
        .sum();
    let total_compressed_size = columns
        .iter()
        .map(|(c, _)| c.meta_data.as_ref().unwrap().total_compressed_size)
        .sum();

    let (columns, specs) = columns.into_iter().unzip();

    Ok((
        RowGroup {
            columns,
            total_byte_size,
            num_rows,
            sorting_columns: None,
            file_offset,
            total_compressed_size: Some(total_compressed_size),
            ordinal: ordinal.try_into().ok(),
        },
        specs,
        bytes_written,
    ))
}

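/// Async variant of [`write_row_group`] for writers implementing `futures::AsyncWrite`.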
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub async fn write_row_group_async<
    'a,
    W,
    E, // external error any of the iterators may emit
>(
    writer: &mut W,
    mut offset: u64,
    descriptors: &[ColumnDescriptor],
    columns: DynIter<'a, std::result::Result<DynStreamingIterator<'a, CompressedPage, E>, E>>,
    ordinal: usize,
) -> ParquetResult<(RowGroup, Vec<Vec<PageWriteSpec>>, u64)>
where
    W: AsyncWrite + Unpin + Send,
    ParquetError: From<E>,
    E: std::error::Error,
{
    let column_iter = descriptors.iter().zip(columns);

    let initial = offset;
    let mut columns = vec![];
    // Write each column chunk back-to-back, awaiting each write and advancing `offset`.
    for (descriptor, page_iter) in column_iter {
        let (column, page_specs, size) =
            write_column_chunk_async(writer, offset, descriptor, page_iter?).await?;
        offset += size;
        columns.push((column, page_specs));
    }
    let bytes_written = offset - initial;

    let num_rows = compute_num_rows(&columns)?;

    // compute row group stats
    let file_offset = columns
        .first()
        .map(|(column_chunk, _)| {
            ColumnOffsetsMetadata::from_column_chunk(column_chunk).calc_row_group_file_offset()
        })
        .unwrap_or(None);

    let total_byte_size = columns
        .iter()
        .map(|(c, _)| c.meta_data.as_ref().unwrap().total_uncompressed_size)
        .sum();
    let total_compressed_size = columns
        .iter()
        .map(|(c, _)| c.meta_data.as_ref().unwrap().total_compressed_size)
        .sum();

    let (columns, specs) = columns.into_iter().unzip();

    Ok((
        RowGroup {
            columns,
            total_byte_size,
            num_rows: num_rows as i64,
            sorting_columns: None,
            file_offset,
            total_compressed_size: Some(total_compressed_size),
            ordinal: ordinal.try_into().ok(),
        },
        specs,
        bytes_written,
    ))
}