// Path: blob/main/crates/polars-core/src/schema/iceberg.rs
// 8450 views
//! TODO1//!2//! This should ideally be moved to `polars-schema`, currently it cannot due to dependency on3//! `polars_core::DataType`.4use std::borrow::Cow;5use std::sync::Arc;67use arrow::datatypes::{ArrowDataType, ArrowSchema, Field as ArrowField};8use polars_error::{PolarsResult, feature_gated, polars_bail, polars_err};9use polars_utils::aliases::InitHashMaps;10use polars_utils::pl_str::PlSmallStr;1112use crate::prelude::{DataType, Field, PlIndexMap};1314pub const LIST_ELEMENT_DEFAULT_ID: u32 = u32::MAX;1516/// Maps Iceberg physical IDs to columns.17///18/// Note: This doesn't use `Schema<F>` as the keys are u32's.19#[derive(Debug, Clone, Eq, PartialEq)]20#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]21#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]22pub struct IcebergSchema(PlIndexMap<u32, IcebergColumn>);23pub type IcebergSchemaRef = Arc<IcebergSchema>;2425impl IcebergSchema {26/// Constructs a schema keyed by the physical ID stored in the arrow field metadata.27pub fn from_arrow_schema(schema: &ArrowSchema) -> PolarsResult<Self> {28Self::try_from_arrow_fields_iter(schema.iter_values())29}3031pub fn try_from_arrow_fields_iter<'a, I>(iter: I) -> PolarsResult<Self>32where33I: IntoIterator<Item = &'a ArrowField>,34{35let iter = iter.into_iter();3637let mut out = PlIndexMap::with_capacity(iter.size_hint().0);3839for arrow_field in iter {40let col: IcebergColumn = arrow_field_to_iceberg_column_rec(arrow_field, None)?;41let existing = out.insert(col.physical_id, col);4243if let Some(existing) = existing {44polars_bail!(45Duplicate:46"IcebergSchema: duplicate physical ID {:?}",47existing,48)49}50}5152Ok(Self(out))53}54}5556#[derive(Debug, Clone, Eq, Hash, PartialEq)]57#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]58#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]59pub struct IcebergColumn {60/// Output name61pub name: PlSmallStr,62/// This is expected to map from 
'PARQUET:field_id'63pub physical_id: u32,64pub type_: IcebergColumnType,65}6667#[derive(Debug, Clone, Eq, Hash, PartialEq)]68#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]69#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]70pub enum IcebergColumnType {71Primitive {72/// This must not be a nested data type.73dtype: DataType,74},75List(Box<IcebergColumn>),76/// (values, width)77FixedSizeList(Box<IcebergColumn>, usize),78Struct(IcebergSchema),79}8081impl IcebergColumnType {82pub fn to_polars_dtype(&self) -> DataType {83use IcebergColumnType::*;8485match self {86Primitive { dtype } => dtype.clone(),87List(inner) => DataType::List(Box::new(inner.type_.to_polars_dtype())),88FixedSizeList(inner, width) => {89feature_gated!("dtype-array", {90DataType::Array(Box::new(inner.type_.to_polars_dtype()), *width)91})92},93Struct(fields) => feature_gated!("dtype-struct", {94DataType::Struct(95fields96.values()97.map(|col| Field::new(col.name.clone(), col.type_.to_polars_dtype()))98.collect(),99)100}),101}102}103104pub fn is_nested(&self) -> bool {105use IcebergColumnType::*;106107match self {108List(_) | FixedSizeList(..) | Struct(_) => true,109Primitive { .. 
} => false,110}111}112}113114fn arrow_field_to_iceberg_column_rec(115field: &ArrowField,116field_id_override: Option<u32>,117) -> PolarsResult<IcebergColumn> {118const PARQUET_FIELD_ID_KEY: &str = "PARQUET:field_id";119120let physical_id: u32 = field_id_override.ok_or(Cow::Borrowed("")).or_else(|_| {121field122.metadata123.as_deref()124.ok_or(Cow::Borrowed("metadata was None"))125.and_then(|md| {126md.get(PARQUET_FIELD_ID_KEY)127.ok_or(Cow::Borrowed("key not found in metadata"))128})129.and_then(|x| {130x.parse()131.map_err(|_| Cow::Owned(format!("could not parse value as u32: '{x}'")))132})133.map_err(|failed_reason: Cow<'_, str>| {134polars_err!(135SchemaFieldNotFound:136"IcebergSchema: failed to load '{PARQUET_FIELD_ID_KEY}' for field {}: {}",137&field.name,138failed_reason,139)140})141})?;142143// Prevent accidental re-use.144#[expect(unused)]145let field_id_override: ();146147use ArrowDataType as ADT;148149let name = field.name.clone();150151let type_ = match &field.dtype {152ADT::List(field) | ADT::LargeList(field) | ADT::Map(field, _) => {153// The `field` directly under the `Map` type does not contain a physical ID, so we add one in here.154// Note that this branch also catches `(Large)List` as the `Map` columns get loaded as that type155// from Parquet (currently unsure if this is intended).156let field_id_override = field157.metadata158.as_ref()159.is_none_or(|x| !x.contains_key(PARQUET_FIELD_ID_KEY))160.then_some(LIST_ELEMENT_DEFAULT_ID);161162IcebergColumnType::List(Box::new(arrow_field_to_iceberg_column_rec(163field,164field_id_override,165)?))166},167168#[cfg(feature = "dtype-array")]169ADT::FixedSizeList(field, width) => IcebergColumnType::FixedSizeList(170Box::new(arrow_field_to_iceberg_column_rec(field, None)?),171*width,172),173174#[cfg(feature = "dtype-struct")]175ADT::Struct(fields) => {176IcebergColumnType::Struct(IcebergSchema::try_from_arrow_fields_iter(fields)?)177},178179dtype => {180if let ADT::Dictionary(_key_type, value_type, _is_sorted) 
= dtype181&& !value_type.is_nested()182{183let dtype =184DataType::from_arrow_field(&ArrowField::new(name.clone(), dtype.clone(), true));185186IcebergColumnType::Primitive { dtype }187} else if dtype.is_nested() {188polars_bail!(189ComputeError:190"IcebergSchema: unsupported arrow type: {:?}",191dtype,192)193} else {194let dtype =195DataType::from_arrow_field(&ArrowField::new(name.clone(), dtype.clone(), true));196197IcebergColumnType::Primitive { dtype }198}199},200};201202let out = IcebergColumn {203name,204physical_id,205type_,206};207208Ok(out)209}210211impl<T> FromIterator<T> for IcebergSchema212where213PlIndexMap<u32, IcebergColumn>: FromIterator<T>,214{215fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {216Self(PlIndexMap::<u32, IcebergColumn>::from_iter(iter))217}218}219220impl std::hash::Hash for IcebergSchema {221fn hash<H: std::hash::Hasher>(&self, state: &mut H) {222for col in self.values() {223col.hash(state);224}225}226}227228impl std::ops::Deref for IcebergSchema {229type Target = PlIndexMap<u32, IcebergColumn>;230231fn deref(&self) -> &Self::Target {232&self.0233}234}235236impl std::ops::DerefMut for IcebergSchema {237fn deref_mut(&mut self) -> &mut Self::Target {238&mut self.0239}240}241242243