Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/write/utils.rs
6940 views
1
use arrow::bitmap::Bitmap;
2
use arrow::datatypes::ArrowDataType;
3
use polars_error::*;
4
5
use super::{Version, WriteOptions};
6
use crate::parquet::CowBuffer;
7
use crate::parquet::compression::CompressionOptions;
8
use crate::parquet::encoding::Encoding;
9
use crate::parquet::encoding::hybrid_rle::{self, encode};
10
use crate::parquet::metadata::Descriptor;
11
use crate::parquet::page::{DataPage, DataPageHeader, DataPageHeaderV1, DataPageHeaderV2};
12
use crate::parquet::schema::types::PrimitiveType;
13
use crate::parquet::statistics::ParquetStatistics;
14
15
/// writes the def levels to a `Vec<u8>` and returns it.
16
pub fn write_def_levels(
17
writer: &mut Vec<u8>,
18
is_optional: bool,
19
validity: Option<&Bitmap>,
20
len: usize,
21
version: Version,
22
) -> PolarsResult<()> {
23
if is_optional {
24
match version {
25
Version::V1 => {
26
writer.extend(&[0, 0, 0, 0]);
27
let start = writer.len();
28
29
match validity {
30
None => <bool as hybrid_rle::Encoder<bool>>::run_length_encode(
31
writer, len, true, 1,
32
)?,
33
Some(validity) => encode::<bool, _, _>(writer, validity.iter(), 1)?,
34
}
35
36
// write the first 4 bytes as length
37
let length = ((writer.len() - start) as i32).to_le_bytes();
38
(0..4).for_each(|i| writer[start - 4 + i] = length[i]);
39
},
40
Version::V2 => match validity {
41
None => {
42
<bool as hybrid_rle::Encoder<bool>>::run_length_encode(writer, len, true, 1)?
43
},
44
Some(validity) => encode::<bool, _, _>(writer, validity.iter(), 1)?,
45
},
46
}
47
48
Ok(())
49
} else {
50
// is required => no def levels
51
Ok(())
52
}
53
}
54
55
#[allow(clippy::too_many_arguments)]
56
pub fn build_plain_page(
57
buffer: Vec<u8>,
58
num_values: usize,
59
num_rows: usize,
60
null_count: usize,
61
repetition_levels_byte_length: usize,
62
definition_levels_byte_length: usize,
63
statistics: Option<ParquetStatistics>,
64
type_: PrimitiveType,
65
options: WriteOptions,
66
encoding: Encoding,
67
) -> PolarsResult<DataPage> {
68
let header = match options.version {
69
Version::V1 => DataPageHeader::V1(DataPageHeaderV1 {
70
num_values: num_values as i32,
71
encoding: encoding.into(),
72
definition_level_encoding: Encoding::Rle.into(),
73
repetition_level_encoding: Encoding::Rle.into(),
74
statistics,
75
}),
76
Version::V2 => DataPageHeader::V2(DataPageHeaderV2 {
77
num_values: num_values as i32,
78
encoding: encoding.into(),
79
num_nulls: null_count as i32,
80
num_rows: num_rows as i32,
81
definition_levels_byte_length: definition_levels_byte_length as i32,
82
repetition_levels_byte_length: repetition_levels_byte_length as i32,
83
is_compressed: Some(options.compression != CompressionOptions::Uncompressed),
84
statistics,
85
}),
86
};
87
Ok(DataPage::new(
88
header,
89
CowBuffer::Owned(buffer),
90
Descriptor {
91
primitive_type: type_,
92
max_def_level: 0,
93
max_rep_level: 0,
94
},
95
num_rows,
96
))
97
}
98
99
/// Auxiliary iterator adapter to declare the size hint of an iterator.
100
pub(super) struct ExactSizedIter<T, I: Iterator<Item = T>> {
101
iter: I,
102
remaining: usize,
103
}
104
105
impl<T, I: Iterator<Item = T> + Clone> Clone for ExactSizedIter<T, I> {
106
fn clone(&self) -> Self {
107
Self {
108
iter: self.iter.clone(),
109
remaining: self.remaining,
110
}
111
}
112
}
113
114
impl<T, I: Iterator<Item = T>> ExactSizedIter<T, I> {
115
pub fn new(iter: I, length: usize) -> Self {
116
Self {
117
iter,
118
remaining: length,
119
}
120
}
121
}
122
123
impl<T, I: Iterator<Item = T>> Iterator for ExactSizedIter<T, I> {
124
type Item = T;
125
126
#[inline]
127
fn next(&mut self) -> Option<Self::Item> {
128
self.iter.next().inspect(|_| self.remaining -= 1)
129
}
130
131
#[inline]
132
fn size_hint(&self) -> (usize, Option<usize>) {
133
(self.remaining, Some(self.remaining))
134
}
135
}
136
137
impl<T, I: Iterator<Item = T>> std::iter::ExactSizeIterator for ExactSizedIter<T, I> {}
138
139
/// Returns how many bits are required to bit-pack values up to `max`.
///
/// This is the position of the highest set bit of `max`, counted from one, so
/// `0` needs no bits and `u64::MAX` needs all 64.
#[inline]
pub fn get_bit_width(max: u64) -> u32 {
    // `checked_ilog2` yields the zero-based index of the top set bit and is
    // `None` only for zero, which needs zero bits.
    max.checked_ilog2().map_or(0, |top_bit| top_bit + 1)
}
144
145
pub(super) fn invalid_encoding(encoding: Encoding, dtype: &ArrowDataType) -> PolarsError {
146
polars_err!(InvalidOperation:
147
"Datatype {:?} cannot be encoded by {:?} encoding",
148
dtype,
149
encoding
150
)
151
}
152
153