Path: blob/main/crates/polars-io/src/parquet/read/utils.rs
6940 views
use std::borrow::Cow;12use polars_core::prelude::{ArrowSchema, DataFrame, DataType, IDX_DTYPE, Series};3use polars_core::schema::SchemaNamesAndDtypes;4use polars_error::{PolarsResult, polars_bail};56use crate::RowIndex;7use crate::hive::materialize_hive_partitions;8use crate::utils::apply_projection;910pub fn materialize_empty_df(11projection: Option<&[usize]>,12reader_schema: &ArrowSchema,13hive_partition_columns: Option<&[Series]>,14row_index: Option<&RowIndex>,15) -> DataFrame {16let schema = if let Some(projection) = projection {17Cow::Owned(apply_projection(reader_schema, projection))18} else {19Cow::Borrowed(reader_schema)20};21let mut df = DataFrame::empty_with_arrow_schema(&schema);2223if let Some(row_index) = row_index {24df.insert_column(0, Series::new_empty(row_index.name.clone(), &IDX_DTYPE))25.unwrap();26}2728materialize_hive_partitions(&mut df, reader_schema, hive_partition_columns);2930df31}3233pub(super) fn projected_arrow_schema_to_projection_indices(34schema: &ArrowSchema,35projected_arrow_schema: &ArrowSchema,36) -> PolarsResult<Option<Vec<usize>>> {37let mut projection_indices = Vec::with_capacity(projected_arrow_schema.len());38let mut is_full_ordered_projection = projected_arrow_schema.len() == schema.len();3940for (i, field) in projected_arrow_schema.iter_values().enumerate() {41let dtype = {42let Some((idx, _, field)) = schema.get_full(&field.name) else {43polars_bail!(ColumnNotFound: "did not find column in file: {}", field.name)44};4546projection_indices.push(idx);47is_full_ordered_projection &= idx == i;4849DataType::from_arrow_field(field)50};51let expected_dtype = DataType::from_arrow_field(field);5253if dtype.clone() != expected_dtype {54polars_bail!(55mismatch,56col = &field.name,57expected = expected_dtype,58found = dtype59);60}61}6263Ok((!is_full_ordered_projection).then_some(projection_indices))64}6566/// Utility to ensure the dtype of the column in `current_schema` matches the dtype in `schema` if67/// that column exists in `schema`.68pub fn ensure_matching_dtypes_if_found(69schema: &ArrowSchema,70current_schema: &ArrowSchema,71) -> PolarsResult<()> {72current_schema73.iter_names_and_dtypes()74.try_for_each(|(name, dtype)| {75if let Some(field) = schema.get(name) {76if dtype != &field.dtype {77// Check again with timezone normalization78// TODO: Add an ArrowDtype eq wrapper?79let lhs = DataType::from_arrow_dtype(dtype);80let rhs = DataType::from_arrow_field(field);8182if lhs != rhs {83polars_bail!(84SchemaMismatch:85"dtypes differ for column {}: {:?} != {:?}"86, name, dtype, &field.dtype87);88}89}90}91Ok(())92})93}949596