// Source: GitHub repository pola-rs/polars
// Path: blob/main/crates/polars-parquet/src/parquet/read/column/mod.rs
1
use std::vec::IntoIter;
2
3
use polars_utils::idx_vec::UnitVec;
4
5
use super::{MemReader, PageReader, get_page_iterator};
6
use crate::parquet::error::{ParquetError, ParquetResult};
7
use crate::parquet::metadata::{ColumnChunkMetadata, RowGroupMetadata};
8
use crate::parquet::page::CompressedPage;
9
use crate::parquet::schema::types::ParquetType;
10
11
/// Returns a [`ColumnIterator`] of column chunks corresponding to `field`.
12
///
13
/// Contrarily to [`get_page_iterator`] that returns a single iterator of pages, this iterator
14
/// iterates over columns, one by one, and returns a [`PageReader`] per column.
15
/// For primitive fields (e.g. `i64`), [`ColumnIterator`] yields exactly one column.
16
/// For complex fields, it yields multiple columns.
17
/// `max_page_size` is the maximum number of bytes allowed.
18
pub fn get_column_iterator<'a>(
19
reader: MemReader,
20
row_group: &'a RowGroupMetadata,
21
field_name: &str,
22
max_page_size: usize,
23
) -> ColumnIterator<'a> {
24
let columns = row_group
25
.columns_under_root_iter(field_name)
26
.unwrap()
27
.rev()
28
.collect::<UnitVec<_>>();
29
ColumnIterator::new(reader, columns, max_page_size)
30
}
31
32
/// Outcome of advancing a [`MutStreamingIterator`].
#[derive(Debug)]
pub enum State<T> {
    /// The iterator still has elements; wraps the value to continue with.
    Some(T),
    /// The iterator is finished; carries a byte buffer returned to the caller.
    Finished(Vec<u8>),
}
40
41
/// A special kind of fallible streaming iterator where `advance` consumes the iterator.
42
pub trait MutStreamingIterator: Sized {
43
type Item;
44
type Error;
45
46
fn advance(self) -> std::result::Result<State<Self>, Self::Error>;
47
fn get(&mut self) -> Option<&mut Self::Item>;
48
}
49
50
/// A [`MutStreamingIterator`] that reads column chunks one by one,
51
/// returning a [`PageReader`] per column.
52
pub struct ColumnIterator<'a> {
53
reader: MemReader,
54
columns: UnitVec<&'a ColumnChunkMetadata>,
55
max_page_size: usize,
56
}
57
58
impl<'a> ColumnIterator<'a> {
59
/// Returns a new [`ColumnIterator`]
60
/// `max_page_size` is the maximum allowed page size
61
pub fn new(
62
reader: MemReader,
63
columns: UnitVec<&'a ColumnChunkMetadata>,
64
max_page_size: usize,
65
) -> Self {
66
Self {
67
reader,
68
columns,
69
max_page_size,
70
}
71
}
72
}
73
74
impl<'a> Iterator for ColumnIterator<'a> {
75
type Item = ParquetResult<(PageReader, &'a ColumnChunkMetadata)>;
76
77
fn next(&mut self) -> Option<Self::Item> {
78
if self.columns.is_empty() {
79
return None;
80
};
81
let column = self.columns.pop().unwrap();
82
83
let iter =
84
match get_page_iterator(column, self.reader.clone(), Vec::new(), self.max_page_size) {
85
Err(e) => return Some(Err(e)),
86
Ok(v) => v,
87
};
88
Some(Ok((iter, column)))
89
}
90
}
91
92
/// A [`MutStreamingIterator`] of pre-read column chunks
93
#[derive(Debug)]
94
pub struct ReadColumnIterator {
95
field: ParquetType,
96
chunks: Vec<(
97
Vec<Result<CompressedPage, ParquetError>>,
98
ColumnChunkMetadata,
99
)>,
100
current: Option<(
101
IntoIter<Result<CompressedPage, ParquetError>>,
102
ColumnChunkMetadata,
103
)>,
104
}
105
106
impl ReadColumnIterator {
107
/// Returns a new [`ReadColumnIterator`]
108
pub fn new(
109
field: ParquetType,
110
chunks: Vec<(
111
Vec<Result<CompressedPage, ParquetError>>,
112
ColumnChunkMetadata,
113
)>,
114
) -> Self {
115
Self {
116
field,
117
chunks,
118
current: None,
119
}
120
}
121
}
122
123
impl MutStreamingIterator for ReadColumnIterator {
124
type Item = (
125
IntoIter<Result<CompressedPage, ParquetError>>,
126
ColumnChunkMetadata,
127
);
128
type Error = ParquetError;
129
130
fn advance(mut self) -> Result<State<Self>, ParquetError> {
131
if self.chunks.is_empty() {
132
return Ok(State::Finished(vec![]));
133
}
134
self.current = self
135
.chunks
136
.pop()
137
.map(|(pages, meta)| (pages.into_iter(), meta));
138
Ok(State::Some(Self {
139
field: self.field,
140
chunks: self.chunks,
141
current: self.current,
142
}))
143
}
144
145
fn get(&mut self) -> Option<&mut Self::Item> {
146
self.current.as_mut()
147
}
148
}
149
150