Path: blob/main/crates/polars-core/src/schema/iceberg.rs
6940 views
//! TODO1//!2//! This should ideally be moved to `polars-schema`, currently it cannot due to dependency on3//! `polars_core::DataType`.4use std::borrow::Cow;5use std::sync::Arc;67use arrow::datatypes::{ArrowDataType, ArrowSchema, Field as ArrowField};8use polars_error::{PolarsResult, feature_gated, polars_bail, polars_err};9use polars_utils::aliases::InitHashMaps;10use polars_utils::pl_str::PlSmallStr;1112use crate::prelude::{DataType, Field, PlIndexMap};1314/// Maps Iceberg physical IDs to columns.15///16/// Note: This doesn't use `Schema<D>` as the keys are u32's.17#[derive(Debug, Clone, Eq, PartialEq)]18#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]19#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]20pub struct IcebergSchema(PlIndexMap<u32, IcebergColumn>);21pub type IcebergSchemaRef = Arc<IcebergSchema>;2223impl IcebergSchema {24/// Constructs a schema keyed by the physical ID stored in the arrow field metadata.25pub fn from_arrow_schema(schema: &ArrowSchema) -> PolarsResult<Self> {26Self::try_from_arrow_fields_iter(schema.iter_values())27}2829pub fn try_from_arrow_fields_iter<'a, I>(iter: I) -> PolarsResult<Self>30where31I: IntoIterator<Item = &'a ArrowField>,32{33let iter = iter.into_iter();34let size_hint = iter.size_hint();3536let mut out = PlIndexMap::with_capacity(size_hint.1.unwrap_or(size_hint.0));3738for arrow_field in iter {39let col: IcebergColumn = arrow_field_to_iceberg_column_rec(arrow_field, None)?;40let existing = out.insert(col.physical_id, col);4142if let Some(existing) = existing {43polars_bail!(44Duplicate:45"IcebergSchema: duplicate physical ID {:?}",46existing,47)48}49}5051Ok(Self(out))52}53}5455#[derive(Debug, Clone, Eq, Hash, PartialEq)]56#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]57#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]58pub struct IcebergColumn {59/// Output name60pub name: PlSmallStr,61/// This is expected to map from 'PARQUET:field_id'62pub physical_id: u32,63pub type_: IcebergColumnType,64}6566#[derive(Debug, Clone, Eq, Hash, PartialEq)]67#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]68#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]69pub enum IcebergColumnType {70Primitive {71/// This must not be a nested data type.72dtype: DataType,73},74List(Box<IcebergColumn>),75/// (values, width)76FixedSizeList(Box<IcebergColumn>, usize),77Struct(IcebergSchema),78}7980impl IcebergColumnType {81pub fn to_polars_dtype(&self) -> DataType {82use IcebergColumnType::*;8384match self {85Primitive { dtype } => dtype.clone(),86List(inner) => DataType::List(Box::new(inner.type_.to_polars_dtype())),87FixedSizeList(inner, width) => {88feature_gated!("dtype-array", {89DataType::Array(Box::new(inner.type_.to_polars_dtype()), *width)90})91},92Struct(fields) => feature_gated!("dtype-struct", {93DataType::Struct(94fields95.values()96.map(|col| Field::new(col.name.clone(), col.type_.to_polars_dtype()))97.collect(),98)99}),100}101}102103pub fn is_nested(&self) -> bool {104use IcebergColumnType::*;105106match self {107List(_) | FixedSizeList(..) | Struct(_) => true,108Primitive { .. } => false,109}110}111}112113fn arrow_field_to_iceberg_column_rec(114field: &ArrowField,115field_id_override: Option<u32>,116) -> PolarsResult<IcebergColumn> {117const PARQUET_FIELD_ID_KEY: &str = "PARQUET:field_id";118const MAP_DEFAULT_ID: u32 = u32::MAX; // u32::MAX119120let physical_id: u32 = field_id_override.ok_or(Cow::Borrowed("")).or_else(|_| {121field122.metadata123.as_deref()124.ok_or(Cow::Borrowed("metadata was None"))125.and_then(|md| {126md.get(PARQUET_FIELD_ID_KEY)127.ok_or(Cow::Borrowed("key not found in metadata"))128})129.and_then(|x| {130x.parse()131.map_err(|_| Cow::Owned(format!("could not parse value as u32: '{x}'")))132})133.map_err(|failed_reason: Cow<'_, str>| {134polars_err!(135SchemaFieldNotFound:136"IcebergSchema: failed to load '{PARQUET_FIELD_ID_KEY}' for field {}: {}",137&field.name,138failed_reason,139)140})141})?;142143// Prevent accidental re-use.144#[expect(unused)]145let field_id_override: ();146147use ArrowDataType as ADT;148149let name = field.name.clone();150151let type_ = match &field.dtype {152ADT::List(field) | ADT::LargeList(field) | ADT::Map(field, _) => {153// The `field` directly under the `Map` type does not contain a physical ID, so we add one in here.154// Note that this branch also catches `(Large)List` as the `Map` columns get loaded as that type155// from Parquet (currently unsure if this is intended).156let field_id_override = field157.metadata158.as_ref()159.is_none_or(|x| !x.contains_key(PARQUET_FIELD_ID_KEY))160.then_some(MAP_DEFAULT_ID);161162IcebergColumnType::List(Box::new(arrow_field_to_iceberg_column_rec(163field,164field_id_override,165)?))166},167168#[cfg(feature = "dtype-array")]169ADT::FixedSizeList(field, width) => IcebergColumnType::FixedSizeList(170Box::new(arrow_field_to_iceberg_column_rec(field, None)?),171*width,172),173174#[cfg(feature = "dtype-struct")]175ADT::Struct(fields) => {176IcebergColumnType::Struct(IcebergSchema::try_from_arrow_fields_iter(fields)?)177},178179dtype => {180if dtype.is_nested() {181polars_bail!(182ComputeError:183"IcebergSchema: unsupported arrow type: {:?}",184dtype,185)186}187188let dtype =189DataType::from_arrow_field(&ArrowField::new(name.clone(), dtype.clone(), true));190191IcebergColumnType::Primitive { dtype }192},193};194195let out = IcebergColumn {196name,197physical_id,198type_,199};200201Ok(out)202}203204impl<T> FromIterator<T> for IcebergSchema205where206PlIndexMap<u32, IcebergColumn>: FromIterator<T>,207{208fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {209Self(PlIndexMap::<u32, IcebergColumn>::from_iter(iter))210}211}212213impl std::hash::Hash for IcebergSchema {214fn hash<H: std::hash::Hasher>(&self, state: &mut H) {215for col in self.values() {216col.hash(state);217}218}219}220221impl std::ops::Deref for IcebergSchema {222type Target = PlIndexMap<u32, IcebergColumn>;223224fn deref(&self) -> &Self::Target {225&self.0226}227}228229impl std::ops::DerefMut for IcebergSchema {230fn deref_mut(&mut self) -> &mut Self::Target {231&mut self.0232}233}234235236