Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-core/src/schema/iceberg.rs
8450 views
1
//! TODO
2
//!
3
//! This should ideally be moved to `polars-schema`, but it currently cannot be, because it
//! depends on `polars_core::DataType`.
5
use std::borrow::Cow;
6
use std::sync::Arc;
7
8
use arrow::datatypes::{ArrowDataType, ArrowSchema, Field as ArrowField};
9
use polars_error::{PolarsResult, feature_gated, polars_bail, polars_err};
10
use polars_utils::aliases::InitHashMaps;
11
use polars_utils::pl_str::PlSmallStr;
12
13
use crate::prelude::{DataType, Field, PlIndexMap};
14
15
/// Physical ID assigned to a list element (or to the entry field directly under a `Map`) when
/// that field carries no `PARQUET:field_id` metadata entry of its own.
pub const LIST_ELEMENT_DEFAULT_ID: u32 = u32::MAX;
16
17
/// Maps Iceberg physical IDs to columns.
18
///
19
/// Note: This doesn't use `Schema<F>` as the keys are u32's.
20
#[derive(Debug, Clone, Eq, PartialEq)]
21
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
22
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
23
pub struct IcebergSchema(PlIndexMap<u32, IcebergColumn>);
24
pub type IcebergSchemaRef = Arc<IcebergSchema>;
25
26
impl IcebergSchema {
27
/// Constructs a schema keyed by the physical ID stored in the arrow field metadata.
28
pub fn from_arrow_schema(schema: &ArrowSchema) -> PolarsResult<Self> {
29
Self::try_from_arrow_fields_iter(schema.iter_values())
30
}
31
32
pub fn try_from_arrow_fields_iter<'a, I>(iter: I) -> PolarsResult<Self>
33
where
34
I: IntoIterator<Item = &'a ArrowField>,
35
{
36
let iter = iter.into_iter();
37
38
let mut out = PlIndexMap::with_capacity(iter.size_hint().0);
39
40
for arrow_field in iter {
41
let col: IcebergColumn = arrow_field_to_iceberg_column_rec(arrow_field, None)?;
42
let existing = out.insert(col.physical_id, col);
43
44
if let Some(existing) = existing {
45
polars_bail!(
46
Duplicate:
47
"IcebergSchema: duplicate physical ID {:?}",
48
existing,
49
)
50
}
51
}
52
53
Ok(Self(out))
54
}
55
}
56
57
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
58
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
59
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
60
pub struct IcebergColumn {
61
/// Output name
62
pub name: PlSmallStr,
63
/// This is expected to map from 'PARQUET:field_id'
64
pub physical_id: u32,
65
pub type_: IcebergColumnType,
66
}
67
68
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
69
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
70
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
71
pub enum IcebergColumnType {
72
Primitive {
73
/// This must not be a nested data type.
74
dtype: DataType,
75
},
76
List(Box<IcebergColumn>),
77
/// (values, width)
78
FixedSizeList(Box<IcebergColumn>, usize),
79
Struct(IcebergSchema),
80
}
81
82
impl IcebergColumnType {
83
pub fn to_polars_dtype(&self) -> DataType {
84
use IcebergColumnType::*;
85
86
match self {
87
Primitive { dtype } => dtype.clone(),
88
List(inner) => DataType::List(Box::new(inner.type_.to_polars_dtype())),
89
FixedSizeList(inner, width) => {
90
feature_gated!("dtype-array", {
91
DataType::Array(Box::new(inner.type_.to_polars_dtype()), *width)
92
})
93
},
94
Struct(fields) => feature_gated!("dtype-struct", {
95
DataType::Struct(
96
fields
97
.values()
98
.map(|col| Field::new(col.name.clone(), col.type_.to_polars_dtype()))
99
.collect(),
100
)
101
}),
102
}
103
}
104
105
pub fn is_nested(&self) -> bool {
106
use IcebergColumnType::*;
107
108
match self {
109
List(_) | FixedSizeList(..) | Struct(_) => true,
110
Primitive { .. } => false,
111
}
112
}
113
}
114
115
fn arrow_field_to_iceberg_column_rec(
116
field: &ArrowField,
117
field_id_override: Option<u32>,
118
) -> PolarsResult<IcebergColumn> {
119
const PARQUET_FIELD_ID_KEY: &str = "PARQUET:field_id";
120
121
let physical_id: u32 = field_id_override.ok_or(Cow::Borrowed("")).or_else(|_| {
122
field
123
.metadata
124
.as_deref()
125
.ok_or(Cow::Borrowed("metadata was None"))
126
.and_then(|md| {
127
md.get(PARQUET_FIELD_ID_KEY)
128
.ok_or(Cow::Borrowed("key not found in metadata"))
129
})
130
.and_then(|x| {
131
x.parse()
132
.map_err(|_| Cow::Owned(format!("could not parse value as u32: '{x}'")))
133
})
134
.map_err(|failed_reason: Cow<'_, str>| {
135
polars_err!(
136
SchemaFieldNotFound:
137
"IcebergSchema: failed to load '{PARQUET_FIELD_ID_KEY}' for field {}: {}",
138
&field.name,
139
failed_reason,
140
)
141
})
142
})?;
143
144
// Prevent accidental re-use.
145
#[expect(unused)]
146
let field_id_override: ();
147
148
use ArrowDataType as ADT;
149
150
let name = field.name.clone();
151
152
let type_ = match &field.dtype {
153
ADT::List(field) | ADT::LargeList(field) | ADT::Map(field, _) => {
154
// The `field` directly under the `Map` type does not contain a physical ID, so we add one in here.
155
// Note that this branch also catches `(Large)List` as the `Map` columns get loaded as that type
156
// from Parquet (currently unsure if this is intended).
157
let field_id_override = field
158
.metadata
159
.as_ref()
160
.is_none_or(|x| !x.contains_key(PARQUET_FIELD_ID_KEY))
161
.then_some(LIST_ELEMENT_DEFAULT_ID);
162
163
IcebergColumnType::List(Box::new(arrow_field_to_iceberg_column_rec(
164
field,
165
field_id_override,
166
)?))
167
},
168
169
#[cfg(feature = "dtype-array")]
170
ADT::FixedSizeList(field, width) => IcebergColumnType::FixedSizeList(
171
Box::new(arrow_field_to_iceberg_column_rec(field, None)?),
172
*width,
173
),
174
175
#[cfg(feature = "dtype-struct")]
176
ADT::Struct(fields) => {
177
IcebergColumnType::Struct(IcebergSchema::try_from_arrow_fields_iter(fields)?)
178
},
179
180
dtype => {
181
if let ADT::Dictionary(_key_type, value_type, _is_sorted) = dtype
182
&& !value_type.is_nested()
183
{
184
let dtype =
185
DataType::from_arrow_field(&ArrowField::new(name.clone(), dtype.clone(), true));
186
187
IcebergColumnType::Primitive { dtype }
188
} else if dtype.is_nested() {
189
polars_bail!(
190
ComputeError:
191
"IcebergSchema: unsupported arrow type: {:?}",
192
dtype,
193
)
194
} else {
195
let dtype =
196
DataType::from_arrow_field(&ArrowField::new(name.clone(), dtype.clone(), true));
197
198
IcebergColumnType::Primitive { dtype }
199
}
200
},
201
};
202
203
let out = IcebergColumn {
204
name,
205
physical_id,
206
type_,
207
};
208
209
Ok(out)
210
}
211
212
impl<T> FromIterator<T> for IcebergSchema
213
where
214
PlIndexMap<u32, IcebergColumn>: FromIterator<T>,
215
{
216
fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
217
Self(PlIndexMap::<u32, IcebergColumn>::from_iter(iter))
218
}
219
}
220
221
impl std::hash::Hash for IcebergSchema {
222
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
223
for col in self.values() {
224
col.hash(state);
225
}
226
}
227
}
228
229
impl std::ops::Deref for IcebergSchema {
230
type Target = PlIndexMap<u32, IcebergColumn>;
231
232
fn deref(&self) -> &Self::Target {
233
&self.0
234
}
235
}
236
237
impl std::ops::DerefMut for IcebergSchema {
238
fn deref_mut(&mut self) -> &mut Self::Target {
239
&mut self.0
240
}
241
}
242
243