Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/write/row_group.rs
8512 views
1
use std::io::Write;
2
3
#[cfg(feature = "async")]
4
use futures::AsyncWrite;
5
use polars_parquet_format::{ColumnChunk, RowGroup};
6
7
use super::column_chunk::write_column_chunk;
8
#[cfg(feature = "async")]
9
use super::column_chunk::write_column_chunk_async;
10
use super::page::{PageWriteSpec, is_data_page};
11
use super::{DynIter, DynStreamingIterator};
12
use crate::parquet::error::{ParquetError, ParquetResult};
13
use crate::parquet::metadata::{ColumnChunkMetadata, ColumnDescriptor};
14
use crate::parquet::page::CompressedPage;
15
16
/// Byte offsets of the pages of a single column chunk, as recorded in the
/// Parquet file metadata. Used to derive a row group's `file_offset`.
pub struct ColumnOffsetsMetadata {
    /// Offset of the dictionary page, if the chunk has one.
    pub dictionary_page_offset: Option<i64>,
    /// Offset of the first data page.
    pub data_page_offset: Option<i64>,
}
20
21
impl ColumnOffsetsMetadata {
22
pub fn from_column_chunk(column_chunk: &ColumnChunk) -> ColumnOffsetsMetadata {
23
ColumnOffsetsMetadata {
24
dictionary_page_offset: column_chunk
25
.meta_data
26
.as_ref()
27
.map(|meta| meta.dictionary_page_offset)
28
.unwrap_or(None),
29
data_page_offset: column_chunk
30
.meta_data
31
.as_ref()
32
.map(|meta| meta.data_page_offset),
33
}
34
}
35
36
pub fn from_column_chunk_metadata(
37
column_chunk_metadata: &ColumnChunkMetadata,
38
) -> ColumnOffsetsMetadata {
39
ColumnOffsetsMetadata {
40
dictionary_page_offset: column_chunk_metadata.dictionary_page_offset(),
41
data_page_offset: Some(column_chunk_metadata.data_page_offset()),
42
}
43
}
44
45
pub fn calc_row_group_file_offset(&self) -> Option<i64> {
46
self.dictionary_page_offset
47
.filter(|x| *x > 0_i64)
48
.or(self.data_page_offset)
49
}
50
}
51
52
fn compute_num_rows(columns: &[(ColumnChunk, Vec<PageWriteSpec>)]) -> ParquetResult<i64> {
53
columns
54
.first()
55
.map(|(_, specs)| {
56
let mut num_rows = 0;
57
specs
58
.iter()
59
.filter(|x| is_data_page(x))
60
.try_for_each(|spec| {
61
num_rows += spec.num_rows as i64;
62
ParquetResult::Ok(())
63
})?;
64
ParquetResult::Ok(num_rows)
65
})
66
.unwrap_or(Ok(0))
67
}
68
69
/// Writes one row group to `writer` and returns its Thrift metadata.
///
/// `columns` yields one streaming iterator of [`CompressedPage`]s per column,
/// paired positionally with `descriptors`.
///
/// # Arguments
/// * `num_rows` - caller-provided row count; `u64::MAX` (or any value that
///   does not fit in `i64`) acts as a sentinel for "unknown", in which case
///   the count is inferred from the written data pages of the first column.
/// * `offset` - byte position in the file at which this row group starts.
/// * `ordinal` - index of this row group in the file; omitted from the
///   metadata if it does not fit the Thrift field's integer type.
///
/// Returns the [`RowGroup`] metadata, the [`PageWriteSpec`]s of every column,
/// and the number of bytes written.
pub fn write_row_group<
    'a,
    W,
    E, // external error any of the iterators may emit
>(
    writer: &mut W,
    num_rows: u64,
    mut offset: u64,
    descriptors: &[ColumnDescriptor],
    columns: DynIter<'a, std::result::Result<DynStreamingIterator<'a, CompressedPage, E>, E>>,
    ordinal: usize,
) -> ParquetResult<(RowGroup, Vec<Vec<PageWriteSpec>>, u64)>
where
    W: Write,
    ParquetError: From<E>,
    E: std::error::Error,
{
    let column_iter = descriptors.iter().zip(columns);

    let initial = offset;
    // Write each column chunk sequentially, advancing `offset` by the number
    // of bytes each chunk occupies.
    let columns = column_iter
        .map(|(descriptor, page_iter)| {
            let (column, page_specs, size) =
                write_column_chunk(writer, offset, descriptor, page_iter?)?;
            offset += size;
            Ok((column, page_specs))
        })
        .collect::<ParquetResult<Vec<_>>>()?;
    let bytes_written = offset - initial;

    // Trust the caller's row count when it is not the `u64::MAX` sentinel and
    // fits in `i64`; otherwise fall back to counting rows from the pages.
    let num_rows = if num_rows != u64::MAX
        && let Ok(v) = i64::try_from(num_rows)
    {
        if cfg!(debug_assertions) {
            // Cross-check the caller's count against the pages actually
            // written (an empty row group legitimately infers 0).
            let inferred = compute_num_rows(&columns)?;
            assert!(v == inferred || (columns.is_empty() && inferred == 0));
        }
        v
    } else {
        compute_num_rows(&columns)?
    };

    // compute row group stats
    let file_offset = columns
        .first()
        .map(|(column_chunk, _)| {
            ColumnOffsetsMetadata::from_column_chunk(column_chunk).calc_row_group_file_offset()
        })
        .unwrap_or(None);

    let total_byte_size = columns
        .iter()
        .map(|(c, _)| c.meta_data.as_ref().unwrap().total_uncompressed_size)
        .sum();
    let total_compressed_size = columns
        .iter()
        .map(|(c, _)| c.meta_data.as_ref().unwrap().total_compressed_size)
        .sum();

    let (columns, specs) = columns.into_iter().unzip();

    Ok((
        RowGroup {
            columns,
            total_byte_size,
            num_rows,
            sorting_columns: None,
            file_offset,
            total_compressed_size: Some(total_compressed_size),
            ordinal: ordinal.try_into().ok(),
        },
        specs,
        bytes_written,
    ))
}
144
145
/// Asynchronous variant of `write_row_group`: writes one row group to an
/// async `writer` and returns its Thrift metadata.
///
/// Unlike the sync version, the row count is always inferred from the written
/// data pages of the first column.
///
/// Returns the [`RowGroup`] metadata, the [`PageWriteSpec`]s of every column,
/// and the number of bytes written.
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub async fn write_row_group_async<
    'a,
    W,
    E, // external error any of the iterators may emit
>(
    writer: &mut W,
    mut offset: u64,
    descriptors: &[ColumnDescriptor],
    columns: DynIter<'a, std::result::Result<DynStreamingIterator<'a, CompressedPage, E>, E>>,
    ordinal: usize,
) -> ParquetResult<(RowGroup, Vec<Vec<PageWriteSpec>>, u64)>
where
    W: AsyncWrite + Unpin + Send,
    ParquetError: From<E>,
    E: std::error::Error,
{
    let column_iter = descriptors.iter().zip(columns);

    let initial = offset;
    // Write each column chunk sequentially, advancing `offset` by the number
    // of bytes each chunk occupies.
    let mut columns = vec![];
    for (descriptor, page_iter) in column_iter {
        let (column, page_specs, size) =
            write_column_chunk_async(writer, offset, descriptor, page_iter?).await?;
        offset += size;
        columns.push((column, page_specs));
    }
    let bytes_written = offset - initial;

    let num_rows = compute_num_rows(&columns)?;

    // compute row group stats
    let file_offset = columns
        .first()
        .map(|(column_chunk, _)| {
            ColumnOffsetsMetadata::from_column_chunk(column_chunk).calc_row_group_file_offset()
        })
        .unwrap_or(None);

    let total_byte_size = columns
        .iter()
        .map(|(c, _)| c.meta_data.as_ref().unwrap().total_uncompressed_size)
        .sum();
    let total_compressed_size = columns
        .iter()
        .map(|(c, _)| c.meta_data.as_ref().unwrap().total_compressed_size)
        .sum();

    let (columns, specs) = columns.into_iter().unzip();

    Ok((
        RowGroup {
            columns,
            total_byte_size,
            // `compute_num_rows` already yields `i64`; the old `as i64` cast
            // was a no-op.
            num_rows,
            sorting_columns: None,
            file_offset,
            total_compressed_size: Some(total_compressed_size),
            ordinal: ordinal.try_into().ok(),
        },
        specs,
        bytes_written,
    ))
}
210
211