Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-core/src/schema/iceberg.rs
6940 views
1
//! TODO
2
//!
3
//! This should ideally be moved to `polars-schema`, but it currently cannot be due to its dependency on
4
//! `polars_core::DataType`.
5
use std::borrow::Cow;
6
use std::sync::Arc;
7
8
use arrow::datatypes::{ArrowDataType, ArrowSchema, Field as ArrowField};
9
use polars_error::{PolarsResult, feature_gated, polars_bail, polars_err};
10
use polars_utils::aliases::InitHashMaps;
11
use polars_utils::pl_str::PlSmallStr;
12
13
use crate::prelude::{DataType, Field, PlIndexMap};
14
15
/// Maps Iceberg physical IDs to columns.
16
///
17
/// Note: This doesn't use `Schema<D>` as the keys are u32's.
18
#[derive(Debug, Clone, Eq, PartialEq)]
19
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
20
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
21
pub struct IcebergSchema(PlIndexMap<u32, IcebergColumn>);
22
pub type IcebergSchemaRef = Arc<IcebergSchema>;
23
24
impl IcebergSchema {
25
/// Constructs a schema keyed by the physical ID stored in the arrow field metadata.
26
pub fn from_arrow_schema(schema: &ArrowSchema) -> PolarsResult<Self> {
27
Self::try_from_arrow_fields_iter(schema.iter_values())
28
}
29
30
pub fn try_from_arrow_fields_iter<'a, I>(iter: I) -> PolarsResult<Self>
31
where
32
I: IntoIterator<Item = &'a ArrowField>,
33
{
34
let iter = iter.into_iter();
35
let size_hint = iter.size_hint();
36
37
let mut out = PlIndexMap::with_capacity(size_hint.1.unwrap_or(size_hint.0));
38
39
for arrow_field in iter {
40
let col: IcebergColumn = arrow_field_to_iceberg_column_rec(arrow_field, None)?;
41
let existing = out.insert(col.physical_id, col);
42
43
if let Some(existing) = existing {
44
polars_bail!(
45
Duplicate:
46
"IcebergSchema: duplicate physical ID {:?}",
47
existing,
48
)
49
}
50
}
51
52
Ok(Self(out))
53
}
54
}
55
56
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
57
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
58
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
59
pub struct IcebergColumn {
60
/// Output name
61
pub name: PlSmallStr,
62
/// This is expected to map from 'PARQUET:field_id'
63
pub physical_id: u32,
64
pub type_: IcebergColumnType,
65
}
66
67
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
68
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
69
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
70
pub enum IcebergColumnType {
71
Primitive {
72
/// This must not be a nested data type.
73
dtype: DataType,
74
},
75
List(Box<IcebergColumn>),
76
/// (values, width)
77
FixedSizeList(Box<IcebergColumn>, usize),
78
Struct(IcebergSchema),
79
}
80
81
impl IcebergColumnType {
82
pub fn to_polars_dtype(&self) -> DataType {
83
use IcebergColumnType::*;
84
85
match self {
86
Primitive { dtype } => dtype.clone(),
87
List(inner) => DataType::List(Box::new(inner.type_.to_polars_dtype())),
88
FixedSizeList(inner, width) => {
89
feature_gated!("dtype-array", {
90
DataType::Array(Box::new(inner.type_.to_polars_dtype()), *width)
91
})
92
},
93
Struct(fields) => feature_gated!("dtype-struct", {
94
DataType::Struct(
95
fields
96
.values()
97
.map(|col| Field::new(col.name.clone(), col.type_.to_polars_dtype()))
98
.collect(),
99
)
100
}),
101
}
102
}
103
104
pub fn is_nested(&self) -> bool {
105
use IcebergColumnType::*;
106
107
match self {
108
List(_) | FixedSizeList(..) | Struct(_) => true,
109
Primitive { .. } => false,
110
}
111
}
112
}
113
114
fn arrow_field_to_iceberg_column_rec(
115
field: &ArrowField,
116
field_id_override: Option<u32>,
117
) -> PolarsResult<IcebergColumn> {
118
const PARQUET_FIELD_ID_KEY: &str = "PARQUET:field_id";
119
const MAP_DEFAULT_ID: u32 = u32::MAX; // u32::MAX
120
121
let physical_id: u32 = field_id_override.ok_or(Cow::Borrowed("")).or_else(|_| {
122
field
123
.metadata
124
.as_deref()
125
.ok_or(Cow::Borrowed("metadata was None"))
126
.and_then(|md| {
127
md.get(PARQUET_FIELD_ID_KEY)
128
.ok_or(Cow::Borrowed("key not found in metadata"))
129
})
130
.and_then(|x| {
131
x.parse()
132
.map_err(|_| Cow::Owned(format!("could not parse value as u32: '{x}'")))
133
})
134
.map_err(|failed_reason: Cow<'_, str>| {
135
polars_err!(
136
SchemaFieldNotFound:
137
"IcebergSchema: failed to load '{PARQUET_FIELD_ID_KEY}' for field {}: {}",
138
&field.name,
139
failed_reason,
140
)
141
})
142
})?;
143
144
// Prevent accidental re-use.
145
#[expect(unused)]
146
let field_id_override: ();
147
148
use ArrowDataType as ADT;
149
150
let name = field.name.clone();
151
152
let type_ = match &field.dtype {
153
ADT::List(field) | ADT::LargeList(field) | ADT::Map(field, _) => {
154
// The `field` directly under the `Map` type does not contain a physical ID, so we add one in here.
155
// Note that this branch also catches `(Large)List` as the `Map` columns get loaded as that type
156
// from Parquet (currently unsure if this is intended).
157
let field_id_override = field
158
.metadata
159
.as_ref()
160
.is_none_or(|x| !x.contains_key(PARQUET_FIELD_ID_KEY))
161
.then_some(MAP_DEFAULT_ID);
162
163
IcebergColumnType::List(Box::new(arrow_field_to_iceberg_column_rec(
164
field,
165
field_id_override,
166
)?))
167
},
168
169
#[cfg(feature = "dtype-array")]
170
ADT::FixedSizeList(field, width) => IcebergColumnType::FixedSizeList(
171
Box::new(arrow_field_to_iceberg_column_rec(field, None)?),
172
*width,
173
),
174
175
#[cfg(feature = "dtype-struct")]
176
ADT::Struct(fields) => {
177
IcebergColumnType::Struct(IcebergSchema::try_from_arrow_fields_iter(fields)?)
178
},
179
180
dtype => {
181
if dtype.is_nested() {
182
polars_bail!(
183
ComputeError:
184
"IcebergSchema: unsupported arrow type: {:?}",
185
dtype,
186
)
187
}
188
189
let dtype =
190
DataType::from_arrow_field(&ArrowField::new(name.clone(), dtype.clone(), true));
191
192
IcebergColumnType::Primitive { dtype }
193
},
194
};
195
196
let out = IcebergColumn {
197
name,
198
physical_id,
199
type_,
200
};
201
202
Ok(out)
203
}
204
205
impl<T> FromIterator<T> for IcebergSchema
206
where
207
PlIndexMap<u32, IcebergColumn>: FromIterator<T>,
208
{
209
fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
210
Self(PlIndexMap::<u32, IcebergColumn>::from_iter(iter))
211
}
212
}
213
214
impl std::hash::Hash for IcebergSchema {
215
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
216
for col in self.values() {
217
col.hash(state);
218
}
219
}
220
}
221
222
impl std::ops::Deref for IcebergSchema {
223
type Target = PlIndexMap<u32, IcebergColumn>;
224
225
fn deref(&self) -> &Self::Target {
226
&self.0
227
}
228
}
229
230
impl std::ops::DerefMut for IcebergSchema {
231
fn deref_mut(&mut self) -> &mut Self::Target {
232
&mut self.0
233
}
234
}
235
236