Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/write/row_group.rs
6940 views
1
use arrow::array::Array;
2
use arrow::datatypes::ArrowSchema;
3
use arrow::record_batch::RecordBatchT;
4
use polars_error::{PolarsError, PolarsResult, polars_bail, to_compute_err};
5
6
use super::{
7
ColumnWriteOptions, DynIter, DynStreamingIterator, RowGroupIterColumns, SchemaDescriptor,
8
WriteOptions, array_to_columns, to_parquet_schema,
9
};
10
use crate::parquet::FallibleStreamingIterator;
11
use crate::parquet::error::ParquetError;
12
use crate::parquet::schema::types::ParquetType;
13
use crate::parquet::write::Compressor;
14
15
/// Maps a [`RecordBatchT`] and parquet-specific options to an [`RowGroupIterColumns`] used to
16
/// write to parquet
17
/// # Panics
18
/// Iff
19
/// * `encodings.len() != fields.len()` or
20
/// * `encodings.len() != chunk.arrays().len()`
21
pub fn row_group_iter<A: AsRef<dyn Array> + 'static + Send + Sync>(
22
chunk: RecordBatchT<A>,
23
column_options: Vec<ColumnWriteOptions>,
24
fields: Vec<ParquetType>,
25
options: WriteOptions,
26
) -> RowGroupIterColumns<'static, PolarsError> {
27
assert_eq!(column_options.len(), fields.len());
28
assert_eq!(column_options.len(), chunk.arrays().len());
29
DynIter::new(
30
chunk
31
.into_arrays()
32
.into_iter()
33
.zip(fields)
34
.zip(column_options)
35
.flat_map(move |((array, type_), column_options)| {
36
let encoded_columns =
37
array_to_columns(array, type_, &column_options, options).unwrap();
38
encoded_columns
39
.into_iter()
40
.map(|encoded_pages| {
41
let pages = encoded_pages;
42
43
let pages = DynIter::new(
44
pages
45
.into_iter()
46
.map(|x| x.map_err(|e| ParquetError::oos(e.to_string()))),
47
);
48
49
let compressed_pages = Compressor::new(pages, options.compression, vec![])
50
.map_err(to_compute_err);
51
Ok(DynStreamingIterator::new(compressed_pages))
52
})
53
.collect::<Vec<_>>()
54
}),
55
)
56
}
57
58
/// An iterator adapter that converts an iterator over [`RecordBatchT`] into an iterator
/// of row groups.
/// Use it to create an iterator consumable by the parquet's API.
pub struct RowGroupIterator<
    A: AsRef<dyn Array> + 'static,
    I: Iterator<Item = PolarsResult<RecordBatchT<A>>>,
> {
    // Underlying source of chunks; each chunk becomes one row group.
    iter: I,
    // Parquet write settings (compression, etc.) applied to every row group.
    options: WriteOptions,
    // Parquet schema derived from the Arrow schema at construction time.
    parquet_schema: SchemaDescriptor,
    // Per-column write options; length must match the number of schema fields.
    column_options: Vec<ColumnWriteOptions>,
}
70
71
impl<A: AsRef<dyn Array> + 'static, I: Iterator<Item = PolarsResult<RecordBatchT<A>>>>
72
RowGroupIterator<A, I>
73
{
74
/// Creates a new [`RowGroupIterator`] from an iterator over [`RecordBatchT`].
75
///
76
/// # Errors
77
/// Iff
78
/// * the Arrow schema can't be converted to a valid Parquet schema.
79
/// * the length of the encodings is different from the number of fields in schema
80
pub fn try_new(
81
iter: I,
82
schema: &ArrowSchema,
83
options: WriteOptions,
84
column_options: Vec<ColumnWriteOptions>,
85
) -> PolarsResult<Self> {
86
if column_options.len() != schema.len() {
87
polars_bail!(InvalidOperation:
88
"The number of column options must equal the number of fields".to_string(),
89
)
90
}
91
let parquet_schema = to_parquet_schema(schema, &column_options)?;
92
93
Ok(Self {
94
iter,
95
options,
96
parquet_schema,
97
column_options,
98
})
99
}
100
101
/// Returns the [`SchemaDescriptor`] of the [`RowGroupIterator`].
102
pub fn parquet_schema(&self) -> &SchemaDescriptor {
103
&self.parquet_schema
104
}
105
}
106
107
impl<A: AsRef<dyn Array> + 'static + Send + Sync, I: Iterator<Item = PolarsResult<RecordBatchT<A>>>>
108
Iterator for RowGroupIterator<A, I>
109
{
110
type Item = PolarsResult<RowGroupIterColumns<'static, PolarsError>>;
111
112
fn next(&mut self) -> Option<Self::Item> {
113
let options = self.options;
114
115
self.iter.next().map(|maybe_chunk| {
116
let chunk = maybe_chunk?;
117
if self.column_options.len() != chunk.arrays().len() {
118
polars_bail!(InvalidOperation:
119
"The number of arrays in the chunk must equal the number of fields in the schema"
120
)
121
};
122
let encodings = self.column_options.clone();
123
Ok(row_group_iter(
124
chunk,
125
encodings,
126
self.parquet_schema.fields().to_vec(),
127
options,
128
))
129
})
130
}
131
}
132
133