Path: blob/main/crates/polars-io/src/parquet/read/reader.rs
use std::io::{Read, Seek};
use std::sync::Arc;

use arrow::datatypes::ArrowSchemaRef;
use polars_core::prelude::*;
use polars_parquet::read;

use super::read_impl::read_parquet;
use super::utils::{ensure_matching_dtypes_if_found, projected_arrow_schema_to_projection_indices};
use crate::RowIndex;
use crate::mmap::MmapBytesReader;
use crate::parquet::metadata::FileMetadataRef;
use crate::prelude::*;

/// Read Apache parquet format into a DataFrame.
#[must_use]
pub struct ParquetReader<R: Read + Seek> {
    reader: R,
    rechunk: bool,
    slice: (usize, usize),
    columns: Option<Vec<String>>,
    projection: Option<Vec<usize>>,
    parallel: ParallelStrategy,
    schema: Option<ArrowSchemaRef>,
    row_index: Option<RowIndex>,
    low_memory: bool,
    metadata: Option<FileMetadataRef>,
    hive_partition_columns: Option<Vec<Series>>,
    include_file_path: Option<(PlSmallStr, Arc<str>)>,
}

impl<R: MmapBytesReader> ParquetReader<R> {
    /// Try to reduce memory pressure at the expense of performance. If setting this does not reduce memory
    /// enough, turn off parallelization.
    pub fn set_low_memory(mut self, low_memory: bool) -> Self {
        self.low_memory = low_memory;
        self
    }

    /// Read the parquet file in parallel (default). The single-threaded reader consumes less memory.
    pub fn read_parallel(mut self, parallel: ParallelStrategy) -> Self {
        self.parallel = parallel;
        self
    }

    pub fn with_slice(mut self, slice: Option<(usize, usize)>) -> Self {
        self.slice = slice.unwrap_or((0, usize::MAX));
        self
    }

    /// Columns to select/project.
    pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
        self.columns = columns;
        self
    }

    /// Set the reader's column projection. This counts from 0, meaning that
    /// `vec![0, 4]` would select the 1st and 5th column.
    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
        self.projection = projection;
        self
    }

    /// Add a row index column.
    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.row_index = row_index;
        self
    }

    /// Checks that the file contains all the columns in `projected_arrow_schema` with the same
    /// dtype, and sets the projection indices.
    pub fn with_arrow_schema_projection(
        mut self,
        first_schema: &Arc<ArrowSchema>,
        projected_arrow_schema: Option<&ArrowSchema>,
        allow_missing_columns: bool,
    ) -> PolarsResult<Self> {
        let slf_schema = self.schema()?;
        let slf_schema_width = slf_schema.len();

        if allow_missing_columns {
            // Must check the dtypes
            ensure_matching_dtypes_if_found(
                projected_arrow_schema.unwrap_or(first_schema.as_ref()),
                self.schema()?.as_ref(),
            )?;
            self.schema = Some(Arc::new(
                first_schema
                    .iter()
                    .map(|(name, field)| {
                        (name.clone(), slf_schema.get(name).unwrap_or(field).clone())
                    })
                    .collect(),
            ));
        }

        let schema = self.schema()?;

        (|| {
            if let Some(projected_arrow_schema) = projected_arrow_schema {
                self.projection = projected_arrow_schema_to_projection_indices(
                    schema.as_ref(),
                    projected_arrow_schema,
                )?;
            } else {
                if slf_schema_width > first_schema.len() {
                    polars_bail!(
                        SchemaMismatch:
                        "parquet file contained extra columns and no selection was given"
                    )
                }

                self.projection =
                    projected_arrow_schema_to_projection_indices(schema.as_ref(), first_schema)?;
            };
            Ok(())
        })()
        .map_err(|e| {
            if !allow_missing_columns && matches!(e, PolarsError::ColumnNotFound(_)) {
                e.wrap_msg(|s| {
                    format!(
                        "error with column selection, \
                        consider passing `missing_columns='insert'`: {s}"
                    )
                })
            } else {
                e
            }
        })?;

        Ok(self)
    }

    /// [`Schema`] of the file.
    pub fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
        self.schema = Some(match &self.schema {
            Some(schema) => schema.clone(),
            None => {
                let metadata = self.get_metadata()?;
                Arc::new(read::infer_schema(metadata)?)
            },
        });

        Ok(self.schema.clone().unwrap())
    }

    /// Number of rows in the parquet file.
    pub fn num_rows(&mut self) -> PolarsResult<usize> {
        let metadata = self.get_metadata()?;
        Ok(metadata.num_rows)
    }

    pub fn with_hive_partition_columns(mut self, columns: Option<Vec<Series>>) -> Self {
        self.hive_partition_columns = columns;
        self
    }

    pub fn with_include_file_path(
        mut self,
        include_file_path: Option<(PlSmallStr, Arc<str>)>,
    ) -> Self {
        self.include_file_path = include_file_path;
        self
    }

    pub fn set_metadata(&mut self, metadata: FileMetadataRef) {
        self.metadata = Some(metadata);
    }

    pub fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> {
        if self.metadata.is_none() {
            self.metadata = Some(Arc::new(read::read_metadata(&mut self.reader)?));
        }
        Ok(self.metadata.as_ref().unwrap())
    }
}

impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
    /// Create a new [`ParquetReader`] from an existing `Reader`.
    fn new(reader: R) -> Self {
        ParquetReader {
            reader,
            rechunk: false,
            slice: (0, usize::MAX),
            columns: None,
            projection: None,
            parallel: Default::default(),
            row_index: None,
            low_memory: false,
            metadata: None,
            schema: None,
            hive_partition_columns: None,
            include_file_path: None,
        }
    }

    fn set_rechunk(mut self, rechunk: bool) -> Self {
        self.rechunk = rechunk;
        self
    }

    fn finish(mut self) -> PolarsResult<DataFrame> {
        let schema = self.schema()?;
        let metadata = self.get_metadata()?.clone();
        let n_rows = metadata.num_rows.min(self.slice.0 + self.slice.1);

        if let Some(cols) = &self.columns {
            self.projection = Some(columns_to_projection(cols, schema.as_ref())?);
        }

        let mut df = read_parquet(
            self.reader,
            self.slice,
            self.projection.as_deref(),
            &schema,
            Some(metadata),
            self.parallel,
            self.row_index,
            self.hive_partition_columns.as_deref(),
        )?;

        if self.rechunk {
            df.as_single_chunk_par();
        };

        if let Some((col, value)) = &self.include_file_path {
            unsafe {
                df.with_column_unchecked(Column::new_scalar(
                    col.clone(),
                    Scalar::new(
                        DataType::String,
                        AnyValue::StringOwned(value.as_ref().into()),
                    ),
                    if df.width() > 0 { df.height() } else { n_rows },
                ))
            };
        }

        Ok(df)
    }
}
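
// A minimal usage sketch of the builder API above: inspect a file's metadata,
// then read a projected slice with a row index column. The path `data.parquet`
// and the columns `a`/`b` are hypothetical assumptions, so the test is ignored
// by default; it only illustrates how the methods compose.
#[cfg(test)]
mod usage_sketch {
    use std::fs::File;

    use super::*;

    #[test]
    #[ignore = "requires a local `data.parquet` file"]
    fn read_projected_slice() -> PolarsResult<()> {
        // Peek at the file without materializing any row data.
        let mut reader = ParquetReader::new(File::open("data.parquet")?);
        let total_rows = reader.num_rows()?;
        let schema = reader.schema()?;
        // Assumes the hypothetical file has at least two columns and some rows.
        assert!(schema.len() >= 2 && total_rows > 0);

        // Read at most the first 100 rows of columns `a` and `b`,
        // prepending a row index column named "row_nr".
        let df = ParquetReader::new(File::open("data.parquet")?)
            .with_columns(Some(vec!["a".to_string(), "b".to_string()]))
            .with_slice(Some((0, 100)))
            .with_row_index(Some(RowIndex {
                name: "row_nr".into(),
                offset: 0,
            }))
            .set_rechunk(true)
            .finish()?;

        // Two selected columns plus the row index column.
        assert_eq!(df.width(), 3);
        assert!(df.height() <= 100);
        Ok(())
    }
}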