Path: blob/main/crates/polars-parquet/src/parquet/read/column/mod.rs
7885 views
use std::vec::IntoIter;12use polars_utils::idx_vec::UnitVec;34use super::{MemReader, PageReader, get_page_iterator};5use crate::parquet::error::{ParquetError, ParquetResult};6use crate::parquet::metadata::{ColumnChunkMetadata, RowGroupMetadata};7use crate::parquet::page::CompressedPage;8use crate::parquet::schema::types::ParquetType;910/// Returns a [`ColumnIterator`] of column chunks corresponding to `field`.11///12/// Contrarily to [`get_page_iterator`] that returns a single iterator of pages, this iterator13/// iterates over columns, one by one, and returns a [`PageReader`] per column.14/// For primitive fields (e.g. `i64`), [`ColumnIterator`] yields exactly one column.15/// For complex fields, it yields multiple columns.16/// `max_page_size` is the maximum number of bytes allowed.17pub fn get_column_iterator<'a>(18reader: MemReader,19row_group: &'a RowGroupMetadata,20field_name: &str,21max_page_size: usize,22) -> ColumnIterator<'a> {23let columns = row_group24.columns_under_root_iter(field_name)25.unwrap()26.rev()27.collect::<UnitVec<_>>();28ColumnIterator::new(reader, columns, max_page_size)29}3031/// State of [`MutStreamingIterator`].32#[derive(Debug)]33pub enum State<T> {34/// Iterator still has elements35Some(T),36/// Iterator finished37Finished(Vec<u8>),38}3940/// A special kind of fallible streaming iterator where `advance` consumes the iterator.41pub trait MutStreamingIterator: Sized {42type Item;43type Error;4445fn advance(self) -> std::result::Result<State<Self>, Self::Error>;46fn get(&mut self) -> Option<&mut Self::Item>;47}4849/// A [`MutStreamingIterator`] that reads column chunks one by one,50/// returning a [`PageReader`] per column.51pub struct ColumnIterator<'a> {52reader: MemReader,53columns: UnitVec<&'a ColumnChunkMetadata>,54max_page_size: usize,55}5657impl<'a> ColumnIterator<'a> {58/// Returns a new [`ColumnIterator`]59/// `max_page_size` is the maximum allowed page size60pub fn new(61reader: MemReader,62columns: UnitVec<&'a ColumnChunkMetadata>,63max_page_size: usize,64) -> Self {65Self {66reader,67columns,68max_page_size,69}70}71}7273impl<'a> Iterator for ColumnIterator<'a> {74type Item = ParquetResult<(PageReader, &'a ColumnChunkMetadata)>;7576fn next(&mut self) -> Option<Self::Item> {77if self.columns.is_empty() {78return None;79};80let column = self.columns.pop().unwrap();8182let iter =83match get_page_iterator(column, self.reader.clone(), Vec::new(), self.max_page_size) {84Err(e) => return Some(Err(e)),85Ok(v) => v,86};87Some(Ok((iter, column)))88}89}9091/// A [`MutStreamingIterator`] of pre-read column chunks92#[derive(Debug)]93pub struct ReadColumnIterator {94field: ParquetType,95chunks: Vec<(96Vec<Result<CompressedPage, ParquetError>>,97ColumnChunkMetadata,98)>,99current: Option<(100IntoIter<Result<CompressedPage, ParquetError>>,101ColumnChunkMetadata,102)>,103}104105impl ReadColumnIterator {106/// Returns a new [`ReadColumnIterator`]107pub fn new(108field: ParquetType,109chunks: Vec<(110Vec<Result<CompressedPage, ParquetError>>,111ColumnChunkMetadata,112)>,113) -> Self {114Self {115field,116chunks,117current: None,118}119}120}121122impl MutStreamingIterator for ReadColumnIterator {123type Item = (124IntoIter<Result<CompressedPage, ParquetError>>,125ColumnChunkMetadata,126);127type Error = ParquetError;128129fn advance(mut self) -> Result<State<Self>, ParquetError> {130if self.chunks.is_empty() {131return Ok(State::Finished(vec![]));132}133self.current = self134.chunks135.pop()136.map(|(pages, meta)| (pages.into_iter(), meta));137Ok(State::Some(Self {138field: self.field,139chunks: self.chunks,140current: self.current,141}))142}143144fn get(&mut self) -> Option<&mut Self::Item> {145self.current.as_mut()146}147}148149150