// Path: crates/polars-io/src/parquet/read/reader.rs
use std::io::{Read, Seek};1use std::sync::Arc;23use arrow::datatypes::ArrowSchemaRef;4use polars_core::prelude::*;5use polars_parquet::read;6use polars_utils::pl_str::PlRefStr;78use super::read_impl::read_parquet;9use super::utils::{ensure_matching_dtypes_if_found, projected_arrow_schema_to_projection_indices};10use crate::RowIndex;11use crate::mmap::MmapBytesReader;12use crate::parquet::metadata::FileMetadataRef;13use crate::prelude::*;1415/// Read Apache parquet format into a DataFrame.16#[must_use]17pub struct ParquetReader<R: Read + Seek> {18reader: R,19rechunk: bool,20slice: (usize, usize),21columns: Option<Vec<String>>,22projection: Option<Vec<usize>>,23parallel: ParallelStrategy,24schema: Option<ArrowSchemaRef>,25row_index: Option<RowIndex>,26low_memory: bool,27metadata: Option<FileMetadataRef>,28hive_partition_columns: Option<Vec<Series>>,29include_file_path: Option<(PlSmallStr, PlRefStr)>,30}3132impl<R: MmapBytesReader> ParquetReader<R> {33/// Try to reduce memory pressure at the expense of performance. If setting this does not reduce memory34/// enough, turn off parallelization.35pub fn set_low_memory(mut self, low_memory: bool) -> Self {36self.low_memory = low_memory;37self38}3940/// Read the parquet file in parallel (default). The single threaded reader consumes less memory.41pub fn read_parallel(mut self, parallel: ParallelStrategy) -> Self {42self.parallel = parallel;43self44}4546pub fn with_slice(mut self, slice: Option<(usize, usize)>) -> Self {47self.slice = slice.unwrap_or((0, usize::MAX));48self49}5051/// Columns to select/ project52pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {53self.columns = columns;54self55}5657/// Set the reader's column projection. 
This counts from 0, meaning that58/// `vec![0, 4]` would select the 1st and 5th column.59pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {60self.projection = projection;61self62}6364/// Add a row index column.65pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {66self.row_index = row_index;67self68}6970/// Checks that the file contains all the columns in `projected_arrow_schema` with the same71/// dtype, and sets the projection indices.72pub fn with_arrow_schema_projection(73mut self,74first_schema: &Arc<ArrowSchema>,75projected_arrow_schema: Option<&ArrowSchema>,76allow_missing_columns: bool,77) -> PolarsResult<Self> {78let slf_schema = self.schema()?;79let slf_schema_width = slf_schema.len();8081if allow_missing_columns {82// Must check the dtypes83ensure_matching_dtypes_if_found(84projected_arrow_schema.unwrap_or(first_schema.as_ref()),85self.schema()?.as_ref(),86)?;87self.schema = Some(Arc::new(88first_schema89.iter()90.map(|(name, field)| {91(name.clone(), slf_schema.get(name).unwrap_or(field).clone())92})93.collect(),94));95}9697let schema = self.schema()?;9899(|| {100if let Some(projected_arrow_schema) = projected_arrow_schema {101self.projection = projected_arrow_schema_to_projection_indices(102schema.as_ref(),103projected_arrow_schema,104)?;105} else {106if slf_schema_width > first_schema.len() {107polars_bail!(108SchemaMismatch:109"parquet file contained extra columns and no selection was given"110)111}112113self.projection =114projected_arrow_schema_to_projection_indices(schema.as_ref(), first_schema)?;115};116Ok(())117})()118.map_err(|e| {119if !allow_missing_columns && matches!(e, PolarsError::ColumnNotFound(_)) {120e.wrap_msg(|s| {121format!(122"error with column selection, \123consider passing `missing_columns='insert'`: {s}"124)125})126} else {127e128}129})?;130131Ok(self)132}133134/// [`Schema`] of the file.135pub fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {136self.schema = Some(match 
&self.schema {137Some(schema) => schema.clone(),138None => {139let metadata = self.get_metadata()?;140Arc::new(read::infer_schema(metadata)?)141},142});143144Ok(self.schema.clone().unwrap())145}146147/// Number of rows in the parquet file.148pub fn num_rows(&mut self) -> PolarsResult<usize> {149let metadata = self.get_metadata()?;150Ok(metadata.num_rows)151}152153pub fn with_hive_partition_columns(mut self, columns: Option<Vec<Series>>) -> Self {154self.hive_partition_columns = columns;155self156}157158pub fn with_include_file_path(159mut self,160include_file_path: Option<(PlSmallStr, PlRefStr)>,161) -> Self {162self.include_file_path = include_file_path;163self164}165166pub fn set_metadata(&mut self, metadata: FileMetadataRef) {167self.metadata = Some(metadata);168}169170pub fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> {171if self.metadata.is_none() {172self.metadata = Some(Arc::new(read::read_metadata(&mut self.reader)?));173}174Ok(self.metadata.as_ref().unwrap())175}176}177178impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {179/// Create a new [`ParquetReader`] from an existing `Reader`.180fn new(reader: R) -> Self {181ParquetReader {182reader,183rechunk: false,184slice: (0, usize::MAX),185columns: None,186projection: None,187parallel: Default::default(),188row_index: None,189low_memory: false,190metadata: None,191schema: None,192hive_partition_columns: None,193include_file_path: None,194}195}196197fn set_rechunk(mut self, rechunk: bool) -> Self {198self.rechunk = rechunk;199self200}201202fn finish(mut self) -> PolarsResult<DataFrame> {203let schema = self.schema()?;204let metadata = self.get_metadata()?.clone();205let n_rows = metadata.num_rows.min(self.slice.0 + self.slice.1);206207if let Some(cols) = &self.columns {208self.projection = Some(columns_to_projection(cols, schema.as_ref())?);209}210211let mut df = 
read_parquet(212self.reader,213self.slice,214self.projection.as_deref(),215&schema,216Some(metadata),217self.parallel,218self.row_index,219self.hive_partition_columns.as_deref(),220)?;221222if self.rechunk {223df.rechunk_mut_par();224};225226if let Some((col, value)) = &self.include_file_path {227unsafe {228df.push_column_unchecked(Column::new_scalar(229col.clone(),230Scalar::new(231DataType::String,232AnyValue::StringOwned(value.as_str().into()),233),234if df.width() > 0 { df.height() } else { n_rows },235))236};237}238239Ok(df)240}241}242243244