Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/read/column/mod.rs
8512 views
1
use std::io::Cursor;
2
use std::vec::IntoIter;
3
4
use polars_buffer::Buffer;
5
use polars_utils::idx_vec::UnitVec;
6
7
use super::{PageReader, get_page_iterator};
8
use crate::parquet::error::{ParquetError, ParquetResult};
9
use crate::parquet::metadata::{ColumnChunkMetadata, RowGroupMetadata};
10
use crate::parquet::page::CompressedPage;
11
use crate::parquet::schema::types::ParquetType;
12
13
/// Returns a [`ColumnIterator`] of column chunks corresponding to `field`.
14
///
15
/// Contrarily to [`get_page_iterator`] that returns a single iterator of pages, this iterator
16
/// iterates over columns, one by one, and returns a [`PageReader`] per column.
17
/// For primitive fields (e.g. `i64`), [`ColumnIterator`] yields exactly one column.
18
/// For complex fields, it yields multiple columns.
19
/// `max_page_size` is the maximum number of bytes allowed.
20
pub fn get_column_iterator<'a>(
21
reader: Cursor<Buffer<u8>>,
22
row_group: &'a RowGroupMetadata,
23
field_name: &str,
24
max_page_size: usize,
25
) -> ColumnIterator<'a> {
26
let columns = row_group
27
.columns_under_root_iter(field_name)
28
.unwrap()
29
.rev()
30
.collect::<UnitVec<_>>();
31
ColumnIterator::new(reader, columns, max_page_size)
32
}
33
34
/// Outcome of advancing a [`MutStreamingIterator`].
#[derive(Debug)]
pub enum State<T> {
    /// The iterator still has elements; the payload is the advanced iterator.
    Some(T),
    /// The iterator is exhausted.
    ///
    /// Carries a byte buffer whose meaning is not fixed by this type (presumably a scratch
    /// buffer handed back for reuse; confirm with the implementors).
    Finished(Vec<u8>),
}
42
43
/// A special kind of fallible streaming iterator where `advance` consumes the iterator.
44
pub trait MutStreamingIterator: Sized {
45
type Item;
46
type Error;
47
48
fn advance(self) -> std::result::Result<State<Self>, Self::Error>;
49
fn get(&mut self) -> Option<&mut Self::Item>;
50
}
51
52
/// A [`MutStreamingIterator`] that reads column chunks one by one,
53
/// returning a [`PageReader`] per column.
54
pub struct ColumnIterator<'a> {
55
reader: Cursor<Buffer<u8>>,
56
columns: UnitVec<&'a ColumnChunkMetadata>,
57
max_page_size: usize,
58
}
59
60
impl<'a> ColumnIterator<'a> {
61
/// Returns a new [`ColumnIterator`]
62
/// `max_page_size` is the maximum allowed page size
63
pub fn new(
64
reader: Cursor<Buffer<u8>>,
65
columns: UnitVec<&'a ColumnChunkMetadata>,
66
max_page_size: usize,
67
) -> Self {
68
Self {
69
reader,
70
columns,
71
max_page_size,
72
}
73
}
74
}
75
76
impl<'a> Iterator for ColumnIterator<'a> {
77
type Item = ParquetResult<(PageReader, &'a ColumnChunkMetadata)>;
78
79
fn next(&mut self) -> Option<Self::Item> {
80
if self.columns.is_empty() {
81
return None;
82
};
83
let column = self.columns.pop().unwrap();
84
85
let iter =
86
match get_page_iterator(column, self.reader.clone(), Vec::new(), self.max_page_size) {
87
Err(e) => return Some(Err(e)),
88
Ok(v) => v,
89
};
90
Some(Ok((iter, column)))
91
}
92
}
93
94
/// A [`MutStreamingIterator`] of pre-read column chunks
95
#[derive(Debug)]
96
pub struct ReadColumnIterator {
97
field: ParquetType,
98
chunks: Vec<(
99
Vec<Result<CompressedPage, ParquetError>>,
100
ColumnChunkMetadata,
101
)>,
102
current: Option<(
103
IntoIter<Result<CompressedPage, ParquetError>>,
104
ColumnChunkMetadata,
105
)>,
106
}
107
108
impl ReadColumnIterator {
109
/// Returns a new [`ReadColumnIterator`]
110
pub fn new(
111
field: ParquetType,
112
chunks: Vec<(
113
Vec<Result<CompressedPage, ParquetError>>,
114
ColumnChunkMetadata,
115
)>,
116
) -> Self {
117
Self {
118
field,
119
chunks,
120
current: None,
121
}
122
}
123
}
124
125
impl MutStreamingIterator for ReadColumnIterator {
126
type Item = (
127
IntoIter<Result<CompressedPage, ParquetError>>,
128
ColumnChunkMetadata,
129
);
130
type Error = ParquetError;
131
132
fn advance(mut self) -> Result<State<Self>, ParquetError> {
133
if self.chunks.is_empty() {
134
return Ok(State::Finished(vec![]));
135
}
136
self.current = self
137
.chunks
138
.pop()
139
.map(|(pages, meta)| (pages.into_iter(), meta));
140
Ok(State::Some(Self {
141
field: self.field,
142
chunks: self.chunks,
143
current: self.current,
144
}))
145
}
146
147
fn get(&mut self) -> Option<&mut Self::Item> {
148
self.current.as_mut()
149
}
150
}
151
152