Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/write/row_group.rs
8480 views
1
use arrow::array::Array;
2
use arrow::datatypes::ArrowSchema;
3
use arrow::record_batch::RecordBatchT;
4
use polars_buffer::Buffer;
5
use polars_error::{PolarsError, PolarsResult, polars_bail, to_compute_err};
6
7
use super::{
8
DynIter, DynStreamingIterator, Encoding, RowGroupIterColumns, SchemaDescriptor, WriteOptions,
9
array_to_columns, to_parquet_schema,
10
};
11
use crate::parquet::FallibleStreamingIterator;
12
use crate::parquet::error::ParquetError;
13
use crate::parquet::schema::types::ParquetType;
14
use crate::parquet::write::Compressor;
15
16
/// Maps a [`RecordBatchT`] and parquet-specific options to an [`RowGroupIterColumns`] used to
17
/// write to parquet
18
/// # Panics
19
/// Iff
20
/// * `encodings.len() != fields.len()` or
21
/// * `encodings.len() != chunk.arrays().len()`
22
pub fn row_group_iter<A: AsRef<dyn Array> + 'static + Send + Sync>(
23
chunk: RecordBatchT<A>,
24
encodings: Buffer<Vec<Encoding>>,
25
fields: Vec<ParquetType>,
26
options: WriteOptions,
27
) -> RowGroupIterColumns<'static, PolarsError> {
28
assert_eq!(encodings.len(), fields.len());
29
assert_eq!(encodings.len(), chunk.arrays().len());
30
DynIter::new(
31
chunk
32
.into_arrays()
33
.into_iter()
34
.zip(fields)
35
.enumerate()
36
.flat_map(move |(i, (array, type_))| {
37
let encoding = encodings[i].as_slice();
38
let encoded_columns = array_to_columns(array, type_, options, encoding).unwrap();
39
encoded_columns
40
.into_iter()
41
.map(|encoded_pages| {
42
let pages = encoded_pages;
43
44
let pages = DynIter::new(
45
pages
46
.into_iter()
47
.map(|x| x.map_err(|e| ParquetError::oos(e.to_string()))),
48
);
49
50
let compressed_pages = Compressor::new(pages, options.compression, vec![])
51
.map_err(to_compute_err);
52
Ok(DynStreamingIterator::new(compressed_pages))
53
})
54
.collect::<Vec<_>>()
55
}),
56
)
57
}
58
59
/// An iterator adapter that converts an iterator over [`RecordBatchT`] into an iterator
/// of row groups.
/// Use it to create an iterator consumable by the parquet's API.
pub struct RowGroupIterator<
    A: AsRef<dyn Array> + 'static,
    I: Iterator<Item = PolarsResult<RecordBatchT<A>>>,
> {
    // Underlying source of record batches; each batch becomes one row group.
    iter: I,
    // Parquet write options (e.g. compression) applied to every row group.
    options: WriteOptions,
    // Parquet schema derived from the Arrow schema in `try_new`.
    parquet_schema: SchemaDescriptor,
    // Per-field encodings; outer index matches the schema field order. The
    // inner `Vec` presumably holds one encoding per parquet leaf column of
    // that field (consumed by `row_group_iter`) — confirm with `array_to_columns`.
    encodings: Buffer<Vec<Encoding>>,
}
71
72
impl<A: AsRef<dyn Array> + 'static, I: Iterator<Item = PolarsResult<RecordBatchT<A>>>>
73
RowGroupIterator<A, I>
74
{
75
/// Creates a new [`RowGroupIterator`] from an iterator over [`RecordBatchT`].
76
///
77
/// # Errors
78
/// Iff
79
/// * the Arrow schema can't be converted to a valid Parquet schema.
80
/// * the length of the encodings is different from the number of fields in schema
81
pub fn try_new(
82
iter: I,
83
schema: &ArrowSchema,
84
options: WriteOptions,
85
encodings: Buffer<Vec<Encoding>>,
86
) -> PolarsResult<Self> {
87
if encodings.len() != schema.len() {
88
polars_bail!(InvalidOperation:
89
"The number of encodings must equal the number of fields",
90
)
91
}
92
let parquet_schema = to_parquet_schema(schema)?;
93
94
Ok(Self {
95
iter,
96
options,
97
parquet_schema,
98
encodings,
99
})
100
}
101
102
/// Returns the [`SchemaDescriptor`] of the [`RowGroupIterator`].
103
pub fn parquet_schema(&self) -> &SchemaDescriptor {
104
&self.parquet_schema
105
}
106
}
107
108
impl<A: AsRef<dyn Array> + 'static + Send + Sync, I: Iterator<Item = PolarsResult<RecordBatchT<A>>>>
109
Iterator for RowGroupIterator<A, I>
110
{
111
type Item = PolarsResult<RowGroupIterColumns<'static, PolarsError>>;
112
113
fn next(&mut self) -> Option<Self::Item> {
114
let options = self.options;
115
116
self.iter.next().map(|maybe_chunk| {
117
let chunk = maybe_chunk?;
118
if self.encodings.len() != chunk.arrays().len() {
119
polars_bail!(InvalidOperation:
120
"The number of arrays in the chunk must equal the number of fields in the schema"
121
)
122
};
123
let encodings = self.encodings.clone();
124
Ok(row_group_iter(
125
chunk,
126
encodings,
127
self.parquet_schema.fields().to_vec(),
128
options,
129
))
130
})
131
}
132
}
133
134