Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/write/binary/basic.rs
6940 views
1
use arrow::array::{Array, BinaryArray, ValueSize};
2
use arrow::bitmap::Bitmap;
3
use arrow::offset::Offset;
4
use polars_error::PolarsResult;
5
6
use super::super::{WriteOptions, utils};
7
use crate::arrow::read::schema::is_nullable;
8
use crate::parquet::encoding::{Encoding, delta_bitpacked};
9
use crate::parquet::schema::types::PrimitiveType;
10
use crate::parquet::statistics::{BinaryStatistics, ParquetStatistics};
11
use crate::write::utils::invalid_encoding;
12
use crate::write::{EncodeNullability, Page, StatisticsOptions};
13
14
/// Appends every value yielded by `iter` to `buffer`, each one written as a
/// 4-byte little-endian length prefix followed by the value's raw bytes.
///
/// The length is narrowed with `as u32`, so a value longer than `u32::MAX`
/// bytes would be written with a truncated prefix.
pub(crate) fn encode_non_null_values<'a, I: Iterator<Item = &'a [u8]>>(
    iter: I,
    buffer: &mut Vec<u8>,
) {
    for value in iter {
        // BYTE_ARRAY: the first 4 bytes denote the length in little-endian.
        let prefix = u32::to_le_bytes(value.len() as u32);
        buffer.extend_from_slice(&prefix);
        buffer.extend_from_slice(value);
    }
}
25
26
pub(crate) fn encode_plain<O: Offset>(
27
array: &BinaryArray<O>,
28
options: EncodeNullability,
29
buffer: &mut Vec<u8>,
30
) {
31
if options.is_optional() && array.validity().is_some() {
32
let len_before = buffer.len();
33
let capacity =
34
array.get_values_size() + (array.len() - array.null_count()) * size_of::<u32>();
35
buffer.reserve(capacity);
36
encode_non_null_values(array.non_null_values_iter(), buffer);
37
// Ensure we allocated properly.
38
debug_assert_eq!(buffer.len() - len_before, capacity);
39
} else {
40
let len_before = buffer.len();
41
let capacity = array.get_values_size() + array.len() * size_of::<u32>();
42
buffer.reserve(capacity);
43
encode_non_null_values(array.values_iter(), buffer);
44
// Ensure we allocated properly.
45
debug_assert_eq!(buffer.len() - len_before, capacity);
46
}
47
}
48
49
pub fn array_to_page<O: Offset>(
50
array: &BinaryArray<O>,
51
options: WriteOptions,
52
type_: PrimitiveType,
53
encoding: Encoding,
54
) -> PolarsResult<Page> {
55
let validity = array.validity();
56
let is_optional = is_nullable(&type_.field_info);
57
let encode_options = EncodeNullability::new(is_optional);
58
59
let mut buffer = vec![];
60
utils::write_def_levels(
61
&mut buffer,
62
is_optional,
63
validity,
64
array.len(),
65
options.version,
66
)?;
67
68
let definition_levels_byte_length = buffer.len();
69
70
match encoding {
71
Encoding::Plain => encode_plain(array, encode_options, &mut buffer),
72
Encoding::DeltaLengthByteArray => encode_delta(
73
array.values(),
74
array.offsets().buffer(),
75
array.validity(),
76
encode_options,
77
&mut buffer,
78
),
79
_ => return Err(invalid_encoding(encoding, array.dtype())),
80
}
81
82
let statistics = if options.has_statistics() {
83
Some(build_statistics(array, type_.clone(), &options.statistics))
84
} else {
85
None
86
};
87
88
utils::build_plain_page(
89
buffer,
90
array.len(),
91
array.len(),
92
array.null_count(),
93
0,
94
definition_levels_byte_length,
95
statistics,
96
type_,
97
options,
98
encoding,
99
)
100
.map(Page::Data)
101
}
102
103
pub(crate) fn build_statistics<O: Offset>(
104
array: &BinaryArray<O>,
105
primitive_type: PrimitiveType,
106
options: &StatisticsOptions,
107
) -> ParquetStatistics {
108
use polars_compute::min_max::MinMaxKernel;
109
110
BinaryStatistics {
111
primitive_type,
112
null_count: options.null_count.then_some(array.null_count() as i64),
113
distinct_count: None,
114
max_value: options
115
.max_value
116
.then(|| array.max_propagate_nan_kernel().map(<[u8]>::to_vec))
117
.flatten(),
118
min_value: options
119
.min_value
120
.then(|| array.min_propagate_nan_kernel().map(<[u8]>::to_vec))
121
.flatten(),
122
}
123
.serialize()
124
}
125
126
pub(crate) fn encode_delta<O: Offset>(
127
values: &[u8],
128
offsets: &[O],
129
validity: Option<&Bitmap>,
130
options: EncodeNullability,
131
buffer: &mut Vec<u8>,
132
) {
133
if options.is_optional() && validity.is_some() {
134
if let Some(validity) = validity {
135
let lengths = offsets
136
.windows(2)
137
.map(|w| (w[1] - w[0]).to_usize() as i64)
138
.zip(validity.iter())
139
.flat_map(|(x, is_valid)| if is_valid { Some(x) } else { None });
140
let length = offsets.len() - 1 - validity.unset_bits();
141
let lengths = utils::ExactSizedIter::new(lengths, length);
142
143
delta_bitpacked::encode(lengths, buffer, 1);
144
} else {
145
let lengths = offsets.windows(2).map(|w| (w[1] - w[0]).to_usize() as i64);
146
delta_bitpacked::encode(lengths, buffer, 1);
147
}
148
} else {
149
let lengths = offsets.windows(2).map(|w| (w[1] - w[0]).to_usize() as i64);
150
delta_bitpacked::encode(lengths, buffer, 1);
151
}
152
153
buffer.extend_from_slice(
154
&values[offsets.first().unwrap().to_usize()..offsets.last().unwrap().to_usize()],
155
)
156
}
157
158
/// Compares two binary values using the lexicographic, byte-wise ordering of
/// slices. This corresponds to pyarrow's ordering of statistics.
#[inline(always)]
pub(crate) fn ord_binary<'a>(a: &'a [u8], b: &'a [u8]) -> std::cmp::Ordering {
    Ord::cmp(a, b)
}
164
165