Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/write/schema.rs
8446 views
1
use std::sync::{Arc, LazyLock};
2
3
use arrow::datatypes::{
4
ArrowDataType, ArrowSchema, ExtensionType, Field, PARQUET_EMPTY_STRUCT, TimeUnit,
5
};
6
use arrow::io::ipc::write::{default_ipc_fields, schema_to_bytes};
7
use base64::Engine as _;
8
use base64::engine::general_purpose;
9
use polars_error::{PolarsResult, polars_bail};
10
use polars_utils::pl_str::PlSmallStr;
11
12
use super::super::ARROW_SCHEMA_META_KEY;
13
use crate::arrow::write::decimal_length_from_precision;
14
use crate::parquet::metadata::KeyValue;
15
use crate::parquet::schema::Repetition;
16
use crate::parquet::schema::types::{
17
GroupConvertedType, GroupLogicalType, IntegerType, ParquetType, PhysicalType,
18
PrimitiveConvertedType, PrimitiveLogicalType, TimeUnit as ParquetTimeUnit,
19
};
20
21
fn convert_field(field: Field) -> Field {
22
let mut metadata = field.metadata;
23
24
if let ArrowDataType::Struct(fields) = &field.dtype
25
&& fields.is_empty()
26
{
27
Arc::make_mut(metadata.get_or_insert_default()).insert(
28
PlSmallStr::from_static(PARQUET_EMPTY_STRUCT),
29
PlSmallStr::EMPTY,
30
);
31
}
32
33
Field {
34
name: field.name,
35
dtype: convert_dtype(field.dtype),
36
is_nullable: field.is_nullable,
37
metadata,
38
}
39
}
40
41
fn convert_dtype(dtype: ArrowDataType) -> ArrowDataType {
42
use ArrowDataType as D;
43
match dtype {
44
D::LargeList(field) => D::LargeList(Box::new(convert_field(*field))),
45
D::Struct(mut fields) => {
46
for field in &mut fields {
47
*field = convert_field(std::mem::take(field))
48
}
49
D::Struct(fields)
50
},
51
D::Dictionary(it, dtype, sorted) => {
52
let dtype = convert_dtype(*dtype);
53
D::Dictionary(it, Box::new(dtype), sorted)
54
},
55
D::Extension(ext) => {
56
let dtype = convert_dtype(ext.inner);
57
D::Extension(Box::new(ExtensionType {
58
inner: dtype,
59
..*ext
60
}))
61
},
62
dt => dt,
63
}
64
}
65
66
pub fn schema_to_metadata_key(schema: &ArrowSchema) -> KeyValue {
67
let serialized_schema = if schema.iter_values().any(|field| field.dtype.is_nested()) {
68
let schema = schema
69
.iter_values()
70
.map(|field| convert_field(field.clone()))
71
.map(|x| (x.name.clone(), x))
72
.collect();
73
schema_to_bytes(&schema, &default_ipc_fields(schema.iter_values()), None)
74
} else {
75
schema_to_bytes(schema, &default_ipc_fields(schema.iter_values()), None)
76
};
77
78
// manually prepending the length to the schema as arrow uses the legacy IPC format
79
// TODO: change after addressing ARROW-9777
80
let schema_len = serialized_schema.len();
81
let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
82
len_prefix_schema.extend_from_slice(&[255u8, 255, 255, 255]);
83
len_prefix_schema.extend_from_slice(&(schema_len as u32).to_le_bytes());
84
len_prefix_schema.extend_from_slice(&serialized_schema);
85
86
let encoded = general_purpose::STANDARD.encode(&len_prefix_schema);
87
88
KeyValue {
89
key: ARROW_SCHEMA_META_KEY.to_string(),
90
value: Some(encoded),
91
}
92
}
93
94
/// Creates a [`ParquetType`] from a [`Field`].
95
pub fn to_parquet_type(field: &Field) -> PolarsResult<ParquetType> {
96
let name = field.name.clone();
97
let repetition = if field.is_nullable {
98
Repetition::Optional
99
} else {
100
Repetition::Required
101
};
102
103
let field_id: Option<i32> = None;
104
105
// create type from field
106
let (physical_type, primitive_converted_type, primitive_logical_type) = match field
107
.dtype()
108
.to_storage()
109
{
110
ArrowDataType::Null => (
111
PhysicalType::Int32,
112
None,
113
Some(PrimitiveLogicalType::Unknown),
114
),
115
ArrowDataType::Boolean => (PhysicalType::Boolean, None, None),
116
ArrowDataType::Int32 => (PhysicalType::Int32, None, None),
117
// ArrowDataType::Duration(_) has no parquet representation => do not apply any logical type
118
ArrowDataType::Int64 | ArrowDataType::Duration(_) => (PhysicalType::Int64, None, None),
119
// no natural representation in parquet; leave it as is.
120
// arrow consumers MAY use the arrow schema in the metadata to parse them.
121
ArrowDataType::Date64 => (PhysicalType::Int64, None, None),
122
ArrowDataType::Float16 => (
123
PhysicalType::FixedLenByteArray(2),
124
None,
125
Some(PrimitiveLogicalType::Float16),
126
),
127
ArrowDataType::Float32 => (PhysicalType::Float, None, None),
128
ArrowDataType::Float64 => (PhysicalType::Double, None, None),
129
ArrowDataType::Binary | ArrowDataType::LargeBinary | ArrowDataType::BinaryView => {
130
(PhysicalType::ByteArray, None, None)
131
},
132
ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Utf8View => (
133
PhysicalType::ByteArray,
134
Some(PrimitiveConvertedType::Utf8),
135
Some(PrimitiveLogicalType::String),
136
),
137
ArrowDataType::Date32 => (
138
PhysicalType::Int32,
139
Some(PrimitiveConvertedType::Date),
140
Some(PrimitiveLogicalType::Date),
141
),
142
ArrowDataType::Int8 => (
143
PhysicalType::Int32,
144
Some(PrimitiveConvertedType::Int8),
145
Some(PrimitiveLogicalType::Integer(IntegerType::Int8)),
146
),
147
ArrowDataType::Int16 => (
148
PhysicalType::Int32,
149
Some(PrimitiveConvertedType::Int16),
150
Some(PrimitiveLogicalType::Integer(IntegerType::Int16)),
151
),
152
ArrowDataType::UInt8 => (
153
PhysicalType::Int32,
154
Some(PrimitiveConvertedType::Uint8),
155
Some(PrimitiveLogicalType::Integer(IntegerType::UInt8)),
156
),
157
ArrowDataType::UInt16 => (
158
PhysicalType::Int32,
159
Some(PrimitiveConvertedType::Uint16),
160
Some(PrimitiveLogicalType::Integer(IntegerType::UInt16)),
161
),
162
ArrowDataType::UInt32 => (
163
PhysicalType::Int32,
164
Some(PrimitiveConvertedType::Uint32),
165
Some(PrimitiveLogicalType::Integer(IntegerType::UInt32)),
166
),
167
ArrowDataType::UInt64 => (
168
PhysicalType::Int64,
169
Some(PrimitiveConvertedType::Uint64),
170
Some(PrimitiveLogicalType::Integer(IntegerType::UInt64)),
171
),
172
// no natural representation in parquet; leave it as is.
173
// arrow consumers MAY use the arrow schema in the metadata to parse them.
174
ArrowDataType::Timestamp(TimeUnit::Second, _) => (PhysicalType::Int64, None, None),
175
ArrowDataType::Timestamp(time_unit, zone) => (
176
PhysicalType::Int64,
177
None,
178
Some(PrimitiveLogicalType::Timestamp {
179
is_adjusted_to_utc: matches!(zone, Some(z) if !z.as_str().is_empty()),
180
unit: match time_unit {
181
TimeUnit::Second => unreachable!(),
182
TimeUnit::Millisecond => ParquetTimeUnit::Milliseconds,
183
TimeUnit::Microsecond => ParquetTimeUnit::Microseconds,
184
TimeUnit::Nanosecond => ParquetTimeUnit::Nanoseconds,
185
},
186
}),
187
),
188
// no natural representation in parquet; leave it as is.
189
// arrow consumers MAY use the arrow schema in the metadata to parse them.
190
ArrowDataType::Time32(TimeUnit::Second) => (PhysicalType::Int32, None, None),
191
ArrowDataType::Time32(TimeUnit::Millisecond) => (
192
PhysicalType::Int32,
193
Some(PrimitiveConvertedType::TimeMillis),
194
Some(PrimitiveLogicalType::Time {
195
is_adjusted_to_utc: false,
196
unit: ParquetTimeUnit::Milliseconds,
197
}),
198
),
199
ArrowDataType::Time64(time_unit) => (
200
PhysicalType::Int64,
201
match time_unit {
202
TimeUnit::Microsecond => Some(PrimitiveConvertedType::TimeMicros),
203
TimeUnit::Nanosecond => None,
204
_ => unreachable!(),
205
},
206
Some(PrimitiveLogicalType::Time {
207
is_adjusted_to_utc: false,
208
unit: match time_unit {
209
TimeUnit::Microsecond => ParquetTimeUnit::Microseconds,
210
TimeUnit::Nanosecond => ParquetTimeUnit::Nanoseconds,
211
_ => unreachable!(),
212
},
213
}),
214
),
215
ArrowDataType::Struct(fields) => {
216
if fields.is_empty() {
217
static ALLOW_EMPTY_STRUCTS: LazyLock<bool> = LazyLock::new(|| {
218
std::env::var("POLARS_ALLOW_PQ_EMPTY_STRUCT").is_ok_and(|v| v.as_str() == "1")
219
});
220
221
if *ALLOW_EMPTY_STRUCTS {
222
return Ok(ParquetType::try_from_primitive(
223
name,
224
PhysicalType::Boolean,
225
repetition,
226
None,
227
None,
228
field_id,
229
)?);
230
} else {
231
polars_bail!(
232
InvalidOperation:
233
"Unable to write struct type with no child field to Parquet. Consider adding a dummy child field.",
234
)
235
}
236
}
237
238
// recursively convert children to types/nodes
239
let fields = fields
240
.iter()
241
.map(to_parquet_type)
242
.collect::<PolarsResult<Vec<_>>>()?;
243
return Ok(ParquetType::from_group(
244
name, repetition, None, None, fields, field_id,
245
));
246
},
247
ArrowDataType::Dictionary(_, value, _) => {
248
assert!(!value.is_nested());
249
let dict_field = Field::new(name, value.as_ref().clone(), field.is_nullable);
250
return to_parquet_type(&dict_field);
251
},
252
ArrowDataType::FixedSizeBinary(size) => {
253
(PhysicalType::FixedLenByteArray(*size), None, None)
254
},
255
ArrowDataType::Decimal(precision, scale) => {
256
let precision = *precision;
257
let scale = *scale;
258
let logical_type = Some(PrimitiveLogicalType::Decimal(precision, scale));
259
260
let physical_type = if precision <= 9 {
261
PhysicalType::Int32
262
} else if precision <= 18 {
263
PhysicalType::Int64
264
} else {
265
let len = decimal_length_from_precision(precision);
266
PhysicalType::FixedLenByteArray(len)
267
};
268
(
269
physical_type,
270
Some(PrimitiveConvertedType::Decimal(precision, scale)),
271
logical_type,
272
)
273
},
274
ArrowDataType::Decimal256(precision, scale) => {
275
let precision = *precision;
276
let scale = *scale;
277
let logical_type = Some(PrimitiveLogicalType::Decimal(precision, scale));
278
279
if precision <= 9 {
280
(
281
PhysicalType::Int32,
282
Some(PrimitiveConvertedType::Decimal(precision, scale)),
283
logical_type,
284
)
285
} else if precision <= 18 {
286
(
287
PhysicalType::Int64,
288
Some(PrimitiveConvertedType::Decimal(precision, scale)),
289
logical_type,
290
)
291
} else if precision <= 38 {
292
let len = decimal_length_from_precision(precision);
293
(
294
PhysicalType::FixedLenByteArray(len),
295
Some(PrimitiveConvertedType::Decimal(precision, scale)),
296
logical_type,
297
)
298
} else {
299
(PhysicalType::FixedLenByteArray(32), None, None)
300
}
301
},
302
ArrowDataType::Interval(_) => (
303
PhysicalType::FixedLenByteArray(12),
304
Some(PrimitiveConvertedType::Interval),
305
None,
306
),
307
ArrowDataType::UInt128 | ArrowDataType::Int128 => {
308
(PhysicalType::FixedLenByteArray(16), None, None)
309
},
310
ArrowDataType::List(f)
311
| ArrowDataType::FixedSizeList(f, _)
312
| ArrowDataType::LargeList(f) => {
313
let mut f = f.clone();
314
f.name = PlSmallStr::from_static("element");
315
316
return Ok(ParquetType::from_group(
317
name,
318
repetition,
319
Some(GroupConvertedType::List),
320
Some(GroupLogicalType::List),
321
vec![ParquetType::from_group(
322
PlSmallStr::from_static("list"),
323
Repetition::Repeated,
324
None,
325
None,
326
vec![to_parquet_type(&f)?],
327
None,
328
)],
329
field_id,
330
));
331
},
332
other => polars_bail!(nyi = "Writing the data type {other:?} is not yet implemented"),
333
};
334
335
Ok(ParquetType::try_from_primitive(
336
name,
337
physical_type,
338
repetition,
339
primitive_converted_type,
340
primitive_logical_type,
341
field_id,
342
)?)
343
}
344
345