Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/write/nested/mod.rs
6940 views
1
mod dremel;
2
3
pub use dremel::num_values;
4
use polars_error::PolarsResult;
5
6
use super::Nested;
7
use crate::parquet::encoding::hybrid_rle::encode;
8
use crate::parquet::read::levels::get_bit_width;
9
use crate::parquet::write::Version;
10
11
fn write_levels_v1<F: FnOnce(&mut Vec<u8>) -> PolarsResult<()>>(
12
buffer: &mut Vec<u8>,
13
encode: F,
14
) -> PolarsResult<()> {
15
buffer.extend_from_slice(&[0; 4]);
16
let start = buffer.len();
17
18
encode(buffer)?;
19
20
let end = buffer.len();
21
let length = end - start;
22
23
// write the first 4 bytes as length
24
let length = (length as i32).to_le_bytes();
25
(0..4).for_each(|i| buffer[start - 4 + i] = length[i]);
26
Ok(())
27
}
28
29
/// writes the rep levels to a `Vec<u8>`.
30
fn write_rep_levels(buffer: &mut Vec<u8>, nested: &[Nested], version: Version) -> PolarsResult<()> {
31
let max_level = max_rep_level(nested) as i16;
32
if max_level == 0 {
33
return Ok(());
34
}
35
let num_bits = get_bit_width(max_level);
36
37
let levels = dremel::BufferedDremelIter::new(nested).map(|d| u32::from(d.rep));
38
39
match version {
40
Version::V1 => {
41
write_levels_v1(buffer, |buffer: &mut Vec<u8>| {
42
encode::<u32, _, _>(buffer, levels, num_bits)?;
43
Ok(())
44
})?;
45
},
46
Version::V2 => {
47
encode::<u32, _, _>(buffer, levels, num_bits)?;
48
},
49
}
50
51
Ok(())
52
}
53
54
/// writes the def levels to a `Vec<u8>`.
55
fn write_def_levels(buffer: &mut Vec<u8>, nested: &[Nested], version: Version) -> PolarsResult<()> {
56
let max_level = max_def_level(nested) as i16;
57
if max_level == 0 {
58
return Ok(());
59
}
60
let num_bits = get_bit_width(max_level);
61
62
let levels = dremel::BufferedDremelIter::new(nested).map(|d| u32::from(d.def));
63
64
match version {
65
Version::V1 => write_levels_v1(buffer, move |buffer: &mut Vec<u8>| {
66
encode::<u32, _, _>(buffer, levels, num_bits)?;
67
Ok(())
68
}),
69
Version::V2 => Ok(encode::<u32, _, _>(buffer, levels, num_bits)?),
70
}
71
}
72
73
fn max_def_level(nested: &[Nested]) -> usize {
74
nested
75
.iter()
76
.map(|nested| match nested {
77
Nested::Primitive(nested) => nested.is_optional as usize,
78
Nested::List(nested) => 1 + (nested.is_optional as usize),
79
Nested::LargeList(nested) => 1 + (nested.is_optional as usize),
80
Nested::Struct(nested) => nested.is_optional as usize,
81
Nested::FixedSizeList(nested) => 1 + nested.is_optional as usize,
82
})
83
.sum()
84
}
85
86
fn max_rep_level(nested: &[Nested]) -> usize {
87
nested
88
.iter()
89
.map(|nested| match nested {
90
Nested::FixedSizeList(_) | Nested::LargeList(_) | Nested::List(_) => 1,
91
Nested::Primitive(_) | Nested::Struct(_) => 0,
92
})
93
.sum()
94
}
95
96
/// Write `repetition_levels` and `definition_levels` to buffer.
97
pub fn write_rep_and_def(
98
page_version: Version,
99
nested: &[Nested],
100
buffer: &mut Vec<u8>,
101
) -> PolarsResult<(usize, usize)> {
102
write_rep_levels(buffer, nested, page_version)?;
103
let repetition_levels_byte_length = buffer.len();
104
105
write_def_levels(buffer, nested, page_version)?;
106
let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length;
107
108
Ok((repetition_levels_byte_length, definition_levels_byte_length))
109
}
110
111