// Path: blob/main/crates/polars-parquet/src/parquet/read/column/mod.rs
// 8512 views
use std::io::Cursor;1use std::vec::IntoIter;23use polars_buffer::Buffer;4use polars_utils::idx_vec::UnitVec;56use super::{PageReader, get_page_iterator};7use crate::parquet::error::{ParquetError, ParquetResult};8use crate::parquet::metadata::{ColumnChunkMetadata, RowGroupMetadata};9use crate::parquet::page::CompressedPage;10use crate::parquet::schema::types::ParquetType;1112/// Returns a [`ColumnIterator`] of column chunks corresponding to `field`.13///14/// Contrarily to [`get_page_iterator`] that returns a single iterator of pages, this iterator15/// iterates over columns, one by one, and returns a [`PageReader`] per column.16/// For primitive fields (e.g. `i64`), [`ColumnIterator`] yields exactly one column.17/// For complex fields, it yields multiple columns.18/// `max_page_size` is the maximum number of bytes allowed.19pub fn get_column_iterator<'a>(20reader: Cursor<Buffer<u8>>,21row_group: &'a RowGroupMetadata,22field_name: &str,23max_page_size: usize,24) -> ColumnIterator<'a> {25let columns = row_group26.columns_under_root_iter(field_name)27.unwrap()28.rev()29.collect::<UnitVec<_>>();30ColumnIterator::new(reader, columns, max_page_size)31}3233/// State of [`MutStreamingIterator`].34#[derive(Debug)]35pub enum State<T> {36/// Iterator still has elements37Some(T),38/// Iterator finished39Finished(Vec<u8>),40}4142/// A special kind of fallible streaming iterator where `advance` consumes the iterator.43pub trait MutStreamingIterator: Sized {44type Item;45type Error;4647fn advance(self) -> std::result::Result<State<Self>, Self::Error>;48fn get(&mut self) -> Option<&mut Self::Item>;49}5051/// A [`MutStreamingIterator`] that reads column chunks one by one,52/// returning a [`PageReader`] per column.53pub struct ColumnIterator<'a> {54reader: Cursor<Buffer<u8>>,55columns: UnitVec<&'a ColumnChunkMetadata>,56max_page_size: usize,57}5859impl<'a> ColumnIterator<'a> {60/// Returns a new [`ColumnIterator`]61/// `max_page_size` is the maximum allowed page size62pub fn 
new(63reader: Cursor<Buffer<u8>>,64columns: UnitVec<&'a ColumnChunkMetadata>,65max_page_size: usize,66) -> Self {67Self {68reader,69columns,70max_page_size,71}72}73}7475impl<'a> Iterator for ColumnIterator<'a> {76type Item = ParquetResult<(PageReader, &'a ColumnChunkMetadata)>;7778fn next(&mut self) -> Option<Self::Item> {79if self.columns.is_empty() {80return None;81};82let column = self.columns.pop().unwrap();8384let iter =85match get_page_iterator(column, self.reader.clone(), Vec::new(), self.max_page_size) {86Err(e) => return Some(Err(e)),87Ok(v) => v,88};89Some(Ok((iter, column)))90}91}9293/// A [`MutStreamingIterator`] of pre-read column chunks94#[derive(Debug)]95pub struct ReadColumnIterator {96field: ParquetType,97chunks: Vec<(98Vec<Result<CompressedPage, ParquetError>>,99ColumnChunkMetadata,100)>,101current: Option<(102IntoIter<Result<CompressedPage, ParquetError>>,103ColumnChunkMetadata,104)>,105}106107impl ReadColumnIterator {108/// Returns a new [`ReadColumnIterator`]109pub fn new(110field: ParquetType,111chunks: Vec<(112Vec<Result<CompressedPage, ParquetError>>,113ColumnChunkMetadata,114)>,115) -> Self {116Self {117field,118chunks,119current: None,120}121}122}123124impl MutStreamingIterator for ReadColumnIterator {125type Item = (126IntoIter<Result<CompressedPage, ParquetError>>,127ColumnChunkMetadata,128);129type Error = ParquetError;130131fn advance(mut self) -> Result<State<Self>, ParquetError> {132if self.chunks.is_empty() {133return Ok(State::Finished(vec![]));134}135self.current = self136.chunks137.pop()138.map(|(pages, meta)| (pages.into_iter(), meta));139Ok(State::Some(Self {140field: self.field,141chunks: self.chunks,142current: self.current,143}))144}145146fn get(&mut self) -> Option<&mut Self::Item> {147self.current.as_mut()148}149}150151152