Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/write/compression.rs
6940 views
1
use crate::parquet::compression::CompressionOptions;
2
use crate::parquet::error::{ParquetError, ParquetResult};
3
use crate::parquet::page::{
4
CompressedDataPage, CompressedDictPage, CompressedPage, DataPage, DataPageHeader, DictPage,
5
Page,
6
};
7
use crate::parquet::{CowBuffer, FallibleStreamingIterator, compression};
8
9
/// Compresses a [`DataPage`] into a [`CompressedDataPage`].
10
fn compress_data(
11
page: DataPage,
12
mut compressed_buffer: Vec<u8>,
13
compression: CompressionOptions,
14
) -> ParquetResult<CompressedDataPage> {
15
let DataPage {
16
mut buffer,
17
header,
18
descriptor,
19
num_rows,
20
} = page;
21
let uncompressed_page_size = buffer.len();
22
let num_rows = num_rows.expect("We should have num_rows when we are writing");
23
if compression != CompressionOptions::Uncompressed {
24
match &header {
25
DataPageHeader::V1(_) => {
26
compression::compress(compression, &buffer, &mut compressed_buffer)?;
27
},
28
DataPageHeader::V2(header) => {
29
let levels_byte_length = (header.repetition_levels_byte_length
30
+ header.definition_levels_byte_length)
31
as usize;
32
compressed_buffer.extend_from_slice(&buffer[..levels_byte_length]);
33
compression::compress(
34
compression,
35
&buffer[levels_byte_length..],
36
&mut compressed_buffer,
37
)?;
38
},
39
};
40
} else {
41
std::mem::swap(buffer.to_mut(), &mut compressed_buffer);
42
}
43
44
Ok(CompressedDataPage::new(
45
header,
46
CowBuffer::Owned(compressed_buffer),
47
compression.into(),
48
uncompressed_page_size,
49
descriptor,
50
num_rows,
51
))
52
}
53
54
fn compress_dict(
55
page: DictPage,
56
mut compressed_buffer: Vec<u8>,
57
compression: CompressionOptions,
58
) -> ParquetResult<CompressedDictPage> {
59
let DictPage {
60
buffer,
61
num_values,
62
is_sorted,
63
} = page;
64
65
let uncompressed_page_size = buffer.len();
66
let compressed_buffer = if compression != CompressionOptions::Uncompressed {
67
compression::compress(compression, &buffer, &mut compressed_buffer)?;
68
CowBuffer::Owned(compressed_buffer)
69
} else {
70
buffer
71
};
72
73
Ok(CompressedDictPage::new(
74
compressed_buffer,
75
compression.into(),
76
uncompressed_page_size,
77
num_values,
78
is_sorted,
79
))
80
}
81
82
/// Compresses an [`EncodedPage`] into a [`CompressedPage`] using `compressed_buffer` as the
83
/// intermediary buffer.
84
///
85
/// `compressed_buffer` is taken by value because it becomes owned by [`CompressedPage`]
86
///
87
/// # Errors
88
/// Errors if the compressor fails
89
pub fn compress(
90
page: Page,
91
compressed_buffer: Vec<u8>,
92
compression: CompressionOptions,
93
) -> ParquetResult<CompressedPage> {
94
match page {
95
Page::Data(page) => {
96
compress_data(page, compressed_buffer, compression).map(CompressedPage::Data)
97
},
98
Page::Dict(page) => {
99
compress_dict(page, compressed_buffer, compression).map(CompressedPage::Dict)
100
},
101
}
102
}
103
104
/// A [`FallibleStreamingIterator`] that consumes [`Page`] and yields [`CompressedPage`]
105
/// holding a reusable buffer ([`Vec<u8>`]) for compression.
106
pub struct Compressor<I: Iterator<Item = ParquetResult<Page>>> {
107
iter: I,
108
compression: CompressionOptions,
109
buffer: Vec<u8>,
110
current: Option<CompressedPage>,
111
}
112
113
impl<I: Iterator<Item = ParquetResult<Page>>> Compressor<I> {
114
/// Creates a new [`Compressor`]
115
pub fn new(iter: I, compression: CompressionOptions, buffer: Vec<u8>) -> Self {
116
Self {
117
iter,
118
compression,
119
buffer,
120
current: None,
121
}
122
}
123
124
/// Creates a new [`Compressor`] (same as `new`)
125
pub fn new_from_vec(iter: I, compression: CompressionOptions, buffer: Vec<u8>) -> Self {
126
Self::new(iter, compression, buffer)
127
}
128
129
/// Deconstructs itself into its iterator and scratch buffer.
130
pub fn into_inner(mut self) -> (I, Vec<u8>) {
131
let mut buffer = if let Some(page) = self.current.as_mut() {
132
std::mem::take(page.buffer_mut())
133
} else {
134
std::mem::take(&mut self.buffer)
135
};
136
buffer.clear();
137
(self.iter, buffer)
138
}
139
}
140
141
impl<I: Iterator<Item = ParquetResult<Page>>> FallibleStreamingIterator for Compressor<I> {
142
type Item = CompressedPage;
143
type Error = ParquetError;
144
145
fn advance(&mut self) -> std::result::Result<(), Self::Error> {
146
let mut compressed_buffer = if let Some(page) = self.current.as_mut() {
147
std::mem::take(page.buffer_mut())
148
} else {
149
std::mem::take(&mut self.buffer)
150
};
151
compressed_buffer.clear();
152
153
let next = self
154
.iter
155
.next()
156
.map(|x| x.and_then(|page| compress(page, compressed_buffer, self.compression)))
157
.transpose()?;
158
self.current = next;
159
Ok(())
160
}
161
162
fn get(&self) -> Option<&Self::Item> {
163
self.current.as_ref()
164
}
165
}
166
167
impl<I: Iterator<Item = ParquetResult<Page>>> Iterator for Compressor<I> {
168
type Item = ParquetResult<CompressedPage>;
169
170
fn next(&mut self) -> Option<Self::Item> {
171
let mut compressed_buffer = if let Some(page) = self.current.as_mut() {
172
std::mem::take(page.buffer_mut())
173
} else {
174
std::mem::take(&mut self.buffer)
175
};
176
compressed_buffer.clear();
177
178
let page = self.iter.next()?;
179
let page = match page {
180
Ok(page) => page,
181
Err(err) => return Some(Err(err)),
182
};
183
184
Some(compress(page, compressed_buffer, self.compression))
185
}
186
}
187
188