Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/read/schema/convert.rs
6940 views
1
//! This module has two entry points: [`parquet_to_arrow_schema`] and the more configurable [`parquet_to_arrow_schema_with_options`].
2
use std::sync::Arc;
3
4
use arrow::datatypes::{ArrowDataType, ArrowSchema, Field, IntervalUnit, Metadata, TimeUnit};
5
use polars_utils::format_pl_smallstr;
6
use polars_utils::pl_str::PlSmallStr;
7
8
use crate::arrow::read::schema::SchemaInferenceOptions;
9
use crate::parquet::schema::Repetition;
10
use crate::parquet::schema::types::{
11
FieldInfo, GroupConvertedType, GroupLogicalType, IntegerType, ParquetType, PhysicalType,
12
PrimitiveConvertedType, PrimitiveLogicalType, PrimitiveType, TimeUnit as ParquetTimeUnit,
13
};
14
15
/// Converts [`ParquetType`]s to a [`Field`], ignoring parquet fields that do not contain
16
/// any physical column.
17
pub fn parquet_to_arrow_schema(fields: &[ParquetType]) -> ArrowSchema {
18
parquet_to_arrow_schema_with_options(fields, &None)
19
}
20
21
/// Like [`parquet_to_arrow_schema`] but with configurable options which affect the behavior of schema inference
22
pub fn parquet_to_arrow_schema_with_options(
23
fields: &[ParquetType],
24
options: &Option<SchemaInferenceOptions>,
25
) -> ArrowSchema {
26
fields
27
.iter()
28
.filter_map(|f| to_field(f, options.as_ref().unwrap_or(&Default::default())))
29
.map(|x| (x.name.clone(), x))
30
.collect()
31
}
32
33
fn from_int32(
34
logical_type: Option<PrimitiveLogicalType>,
35
converted_type: Option<PrimitiveConvertedType>,
36
) -> ArrowDataType {
37
use PrimitiveLogicalType::*;
38
match (logical_type, converted_type) {
39
// handle logical types first
40
(Some(Integer(t)), _) => match t {
41
IntegerType::Int8 => ArrowDataType::Int8,
42
IntegerType::Int16 => ArrowDataType::Int16,
43
IntegerType::Int32 => ArrowDataType::Int32,
44
IntegerType::UInt8 => ArrowDataType::UInt8,
45
IntegerType::UInt16 => ArrowDataType::UInt16,
46
IntegerType::UInt32 => ArrowDataType::UInt32,
47
// The above are the only possible annotations for parquet's int32. Anything else
48
// is a deviation to the parquet specification and we ignore
49
_ => ArrowDataType::Int32,
50
},
51
(Some(Decimal(precision, scale)), _) => ArrowDataType::Decimal(precision, scale),
52
(Some(Date), _) => ArrowDataType::Date32,
53
(Some(Time { unit, .. }), _) => match unit {
54
ParquetTimeUnit::Milliseconds => ArrowDataType::Time32(TimeUnit::Millisecond),
55
// MILLIS is the only possible annotation for parquet's int32. Anything else
56
// is a deviation to the parquet specification and we ignore
57
_ => ArrowDataType::Int32,
58
},
59
// handle converted types:
60
(_, Some(PrimitiveConvertedType::Uint8)) => ArrowDataType::UInt8,
61
(_, Some(PrimitiveConvertedType::Uint16)) => ArrowDataType::UInt16,
62
(_, Some(PrimitiveConvertedType::Uint32)) => ArrowDataType::UInt32,
63
(_, Some(PrimitiveConvertedType::Int8)) => ArrowDataType::Int8,
64
(_, Some(PrimitiveConvertedType::Int16)) => ArrowDataType::Int16,
65
(_, Some(PrimitiveConvertedType::Int32)) => ArrowDataType::Int32,
66
(_, Some(PrimitiveConvertedType::Date)) => ArrowDataType::Date32,
67
(_, Some(PrimitiveConvertedType::TimeMillis)) => {
68
ArrowDataType::Time32(TimeUnit::Millisecond)
69
},
70
(_, Some(PrimitiveConvertedType::Decimal(precision, scale))) => {
71
ArrowDataType::Decimal(precision, scale)
72
},
73
(_, _) => ArrowDataType::Int32,
74
}
75
}
76
77
fn from_int64(
78
logical_type: Option<PrimitiveLogicalType>,
79
converted_type: Option<PrimitiveConvertedType>,
80
) -> ArrowDataType {
81
use PrimitiveLogicalType::*;
82
match (logical_type, converted_type) {
83
// handle logical types first
84
(Some(Integer(integer)), _) => match integer {
85
IntegerType::UInt64 => ArrowDataType::UInt64,
86
IntegerType::Int64 => ArrowDataType::Int64,
87
_ => ArrowDataType::Int64,
88
},
89
(
90
Some(Timestamp {
91
is_adjusted_to_utc,
92
unit,
93
}),
94
_,
95
) => {
96
let timezone = if is_adjusted_to_utc {
97
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
98
// A TIMESTAMP with isAdjustedToUTC=true is defined as [...] elapsed since the Unix epoch
99
Some(PlSmallStr::from_static("+00:00"))
100
} else {
101
// PARQUET:
102
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
103
// A TIMESTAMP with isAdjustedToUTC=false represents [...] such
104
// timestamps should always be displayed the same way, regardless of the local time zone in effect
105
// ARROW:
106
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
107
// If the time zone is null or equal to an empty string, the data is "time
108
// zone naive" and shall be displayed *as is* to the user, not localized
109
// to the locale of the user.
110
None
111
};
112
113
match unit {
114
ParquetTimeUnit::Milliseconds => {
115
ArrowDataType::Timestamp(TimeUnit::Millisecond, timezone)
116
},
117
ParquetTimeUnit::Microseconds => {
118
ArrowDataType::Timestamp(TimeUnit::Microsecond, timezone)
119
},
120
ParquetTimeUnit::Nanoseconds => {
121
ArrowDataType::Timestamp(TimeUnit::Nanosecond, timezone)
122
},
123
}
124
},
125
(Some(Time { unit, .. }), _) => match unit {
126
ParquetTimeUnit::Microseconds => ArrowDataType::Time64(TimeUnit::Microsecond),
127
ParquetTimeUnit::Nanoseconds => ArrowDataType::Time64(TimeUnit::Nanosecond),
128
// MILLIS is only possible for int32. Appearing in int64 is a deviation
129
// to parquet's spec, which we ignore
130
_ => ArrowDataType::Int64,
131
},
132
(Some(Decimal(precision, scale)), _) => ArrowDataType::Decimal(precision, scale),
133
// handle converted types:
134
(_, Some(PrimitiveConvertedType::TimeMicros)) => {
135
ArrowDataType::Time64(TimeUnit::Microsecond)
136
},
137
(_, Some(PrimitiveConvertedType::TimestampMillis)) => {
138
ArrowDataType::Timestamp(TimeUnit::Millisecond, None)
139
},
140
(_, Some(PrimitiveConvertedType::TimestampMicros)) => {
141
ArrowDataType::Timestamp(TimeUnit::Microsecond, None)
142
},
143
(_, Some(PrimitiveConvertedType::Int64)) => ArrowDataType::Int64,
144
(_, Some(PrimitiveConvertedType::Uint64)) => ArrowDataType::UInt64,
145
(_, Some(PrimitiveConvertedType::Decimal(precision, scale))) => {
146
ArrowDataType::Decimal(precision, scale)
147
},
148
149
(_, _) => ArrowDataType::Int64,
150
}
151
}
152
153
fn from_byte_array(
154
logical_type: &Option<PrimitiveLogicalType>,
155
converted_type: &Option<PrimitiveConvertedType>,
156
) -> ArrowDataType {
157
match (logical_type, converted_type) {
158
(Some(PrimitiveLogicalType::String), _) => ArrowDataType::Utf8View,
159
(Some(PrimitiveLogicalType::Json), _) => ArrowDataType::BinaryView,
160
(Some(PrimitiveLogicalType::Bson), _) => ArrowDataType::BinaryView,
161
(Some(PrimitiveLogicalType::Enum), _) => ArrowDataType::BinaryView,
162
(_, Some(PrimitiveConvertedType::Json)) => ArrowDataType::BinaryView,
163
(_, Some(PrimitiveConvertedType::Bson)) => ArrowDataType::BinaryView,
164
(_, Some(PrimitiveConvertedType::Enum)) => ArrowDataType::BinaryView,
165
(_, Some(PrimitiveConvertedType::Utf8)) => ArrowDataType::Utf8View,
166
(_, _) => ArrowDataType::BinaryView,
167
}
168
}
169
170
fn from_fixed_len_byte_array(
171
length: usize,
172
logical_type: Option<PrimitiveLogicalType>,
173
converted_type: Option<PrimitiveConvertedType>,
174
) -> ArrowDataType {
175
match (logical_type, converted_type) {
176
(Some(PrimitiveLogicalType::Decimal(precision, scale)), _) => {
177
ArrowDataType::Decimal(precision, scale)
178
},
179
(None, Some(PrimitiveConvertedType::Decimal(precision, scale))) => {
180
ArrowDataType::Decimal(precision, scale)
181
},
182
(None, Some(PrimitiveConvertedType::Interval)) => {
183
// There is currently no reliable way of determining which IntervalUnit
184
// to return. Thus without the original Arrow schema, the results
185
// would be incorrect if all 12 bytes of the interval are populated
186
ArrowDataType::Interval(IntervalUnit::DayTime)
187
},
188
_ => ArrowDataType::FixedSizeBinary(length),
189
}
190
}
191
192
/// Maps a [`PhysicalType`] with optional metadata to a [`ArrowDataType`]
193
fn to_primitive_type_inner(
194
primitive_type: &PrimitiveType,
195
options: &SchemaInferenceOptions,
196
) -> ArrowDataType {
197
match primitive_type.physical_type {
198
PhysicalType::Boolean => ArrowDataType::Boolean,
199
PhysicalType::Int32 => {
200
from_int32(primitive_type.logical_type, primitive_type.converted_type)
201
},
202
PhysicalType::Int64 => {
203
from_int64(primitive_type.logical_type, primitive_type.converted_type)
204
},
205
PhysicalType::Int96 => ArrowDataType::Timestamp(options.int96_coerce_to_timeunit, None),
206
PhysicalType::Float => ArrowDataType::Float32,
207
PhysicalType::Double => ArrowDataType::Float64,
208
PhysicalType::ByteArray => {
209
from_byte_array(&primitive_type.logical_type, &primitive_type.converted_type)
210
},
211
PhysicalType::FixedLenByteArray(length) => from_fixed_len_byte_array(
212
length,
213
primitive_type.logical_type,
214
primitive_type.converted_type,
215
),
216
}
217
}
218
219
/// Entry point for converting parquet primitive type to arrow type.
220
///
221
/// This function takes care of repetition.
222
fn to_primitive_type(
223
primitive_type: &PrimitiveType,
224
options: &SchemaInferenceOptions,
225
) -> ArrowDataType {
226
let base_type = to_primitive_type_inner(primitive_type, options);
227
228
if primitive_type.field_info.repetition == Repetition::Repeated {
229
ArrowDataType::LargeList(Box::new(Field::new(
230
primitive_type.field_info.name.clone(),
231
base_type,
232
is_nullable(&primitive_type.field_info),
233
)))
234
} else {
235
base_type
236
}
237
}
238
239
fn non_repeated_group(
240
logical_type: &Option<GroupLogicalType>,
241
converted_type: &Option<GroupConvertedType>,
242
fields: &[ParquetType],
243
parent_name: &str,
244
options: &SchemaInferenceOptions,
245
) -> Option<ArrowDataType> {
246
debug_assert!(!fields.is_empty());
247
match (logical_type, converted_type) {
248
(Some(GroupLogicalType::List), _) => to_list(fields, parent_name, options),
249
(None, Some(GroupConvertedType::List)) => to_list(fields, parent_name, options),
250
(Some(GroupLogicalType::Map), _) => to_list(fields, parent_name, options),
251
(None, Some(GroupConvertedType::Map) | Some(GroupConvertedType::MapKeyValue)) => {
252
to_map(fields, options)
253
},
254
_ => to_struct(fields, options),
255
}
256
}
257
258
/// Converts a parquet group type to an arrow [`ArrowDataType::Struct`].
259
/// Returns [`None`] if all its fields are empty
260
fn to_struct(fields: &[ParquetType], options: &SchemaInferenceOptions) -> Option<ArrowDataType> {
261
let fields = fields
262
.iter()
263
.filter_map(|f| to_field(f, options))
264
.collect::<Vec<Field>>();
265
if fields.is_empty() {
266
None
267
} else {
268
Some(ArrowDataType::Struct(fields))
269
}
270
}
271
272
/// Converts a parquet group type to an arrow [`ArrowDataType::Struct`].
273
/// Returns [`None`] if all its fields are empty
274
fn to_map(fields: &[ParquetType], options: &SchemaInferenceOptions) -> Option<ArrowDataType> {
275
let inner = to_field(&fields[0], options)?;
276
Some(ArrowDataType::Map(Box::new(inner), false))
277
}
278
279
/// Entry point for converting parquet group type.
280
///
281
/// This function takes care of logical type and repetition.
282
fn to_group_type(
283
field_info: &FieldInfo,
284
logical_type: &Option<GroupLogicalType>,
285
converted_type: &Option<GroupConvertedType>,
286
fields: &[ParquetType],
287
parent_name: &str,
288
options: &SchemaInferenceOptions,
289
) -> Option<ArrowDataType> {
290
debug_assert!(!fields.is_empty());
291
if field_info.repetition == Repetition::Repeated {
292
Some(ArrowDataType::LargeList(Box::new(Field::new(
293
field_info.name.clone(),
294
to_struct(fields, options)?,
295
is_nullable(field_info),
296
))))
297
} else {
298
non_repeated_group(logical_type, converted_type, fields, parent_name, options)
299
}
300
}
301
302
/// Checks whether this schema is nullable.
303
pub(crate) fn is_nullable(field_info: &FieldInfo) -> bool {
304
match field_info.repetition {
305
Repetition::Optional => true,
306
Repetition::Repeated => true,
307
Repetition::Required => false,
308
}
309
}
310
311
/// Converts parquet schema to arrow field.
312
/// Returns `None` iff the parquet type has no associated primitive types,
313
/// i.e. if it is a column-less group type.
314
fn to_field(type_: &ParquetType, options: &SchemaInferenceOptions) -> Option<Field> {
315
let field_info = type_.get_field_info();
316
317
let metadata: Option<Arc<Metadata>> = field_info.id.map(|x: i32| {
318
Arc::new(
319
[(
320
PlSmallStr::from_static("PARQUET:field_id"),
321
format_pl_smallstr!("{x}"),
322
)]
323
.into(),
324
)
325
});
326
327
let mut arrow_field = Field::new(
328
field_info.name.clone(),
329
to_dtype(type_, options)?,
330
is_nullable(type_.get_field_info()),
331
);
332
333
arrow_field.metadata = metadata;
334
335
Some(arrow_field)
336
}
337
338
/// Converts a parquet list to arrow list.
339
///
340
/// To fully understand this algorithm, please refer to
341
/// [parquet doc](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md).
342
fn to_list(
343
fields: &[ParquetType],
344
parent_name: &str,
345
options: &SchemaInferenceOptions,
346
) -> Option<ArrowDataType> {
347
let item = fields.first().unwrap();
348
349
let item_type = match item {
350
ParquetType::PrimitiveType(primitive) => Some(to_primitive_type_inner(primitive, options)),
351
ParquetType::GroupType { fields, .. } => {
352
if fields.len() == 1 && item.name() != "array" && {
353
// item.name() != format!("{parent_name}_tuple")
354
let cmp = [parent_name, "_tuple"];
355
let len_1 = parent_name.len();
356
let len = len_1 + "_tuple".len();
357
358
item.name().len() != len || [&item.name()[..len_1], &item.name()[len_1..]] != cmp
359
} {
360
// extract the repetition field
361
let nested_item = fields.first().unwrap();
362
to_dtype(nested_item, options)
363
} else {
364
to_struct(fields, options)
365
}
366
},
367
}?;
368
369
// Check that the name of the list child is "list", in which case we
370
// get the child nullability and name (normally "element") from the nested
371
// group type.
372
// Without this step, the child incorrectly inherits the parent's optionality
373
let (list_item_name, item_is_optional) = match item {
374
ParquetType::GroupType {
375
field_info, fields, ..
376
} if field_info.name.as_str() == "list" && fields.len() == 1 => {
377
let field = fields.first().unwrap();
378
(
379
field.get_field_info().name.clone(),
380
field.get_field_info().repetition == Repetition::Optional,
381
)
382
},
383
_ => (
384
item.get_field_info().name.clone(),
385
item.get_field_info().repetition == Repetition::Optional,
386
),
387
};
388
389
Some(ArrowDataType::LargeList(Box::new(Field::new(
390
list_item_name,
391
item_type,
392
item_is_optional,
393
))))
394
}
395
396
/// Converts parquet schema to arrow data type.
397
///
398
/// This function discards schema name.
399
///
400
/// If this schema is a primitive type and not included in the leaves, the result is
401
/// Ok(None).
402
///
403
/// If this schema is a group type and none of its children is reserved in the
404
/// conversion, the result is Ok(None).
405
pub(crate) fn to_dtype(
406
type_: &ParquetType,
407
options: &SchemaInferenceOptions,
408
) -> Option<ArrowDataType> {
409
match type_ {
410
ParquetType::PrimitiveType(primitive) => Some(to_primitive_type(primitive, options)),
411
ParquetType::GroupType {
412
field_info,
413
logical_type,
414
converted_type,
415
fields,
416
} => {
417
if fields.is_empty() {
418
None
419
} else {
420
to_group_type(
421
field_info,
422
logical_type,
423
converted_type,
424
fields,
425
field_info.name.as_str(),
426
options,
427
)
428
}
429
},
430
}
431
}
432
433
#[cfg(test)]
434
mod tests {
435
use polars_error::*;
436
437
use super::*;
438
use crate::parquet::metadata::SchemaDescriptor;
439
440
#[test]
441
fn test_flat_primitives() -> PolarsResult<()> {
442
let message = "
443
message test_schema {
444
REQUIRED BOOLEAN boolean;
445
REQUIRED INT32 int8 (INT_8);
446
REQUIRED INT32 int16 (INT_16);
447
REQUIRED INT32 uint8 (INTEGER(8,false));
448
REQUIRED INT32 uint16 (INTEGER(16,false));
449
REQUIRED INT32 int32;
450
REQUIRED INT64 int64 ;
451
OPTIONAL DOUBLE double;
452
OPTIONAL FLOAT float;
453
OPTIONAL BINARY string (UTF8);
454
OPTIONAL BINARY string_2 (STRING);
455
}
456
";
457
let expected = &[
458
Field::new("boolean".into(), ArrowDataType::Boolean, false),
459
Field::new("int8".into(), ArrowDataType::Int8, false),
460
Field::new("int16".into(), ArrowDataType::Int16, false),
461
Field::new("uint8".into(), ArrowDataType::UInt8, false),
462
Field::new("uint16".into(), ArrowDataType::UInt16, false),
463
Field::new("int32".into(), ArrowDataType::Int32, false),
464
Field::new("int64".into(), ArrowDataType::Int64, false),
465
Field::new("double".into(), ArrowDataType::Float64, true),
466
Field::new("float".into(), ArrowDataType::Float32, true),
467
Field::new("string".into(), ArrowDataType::Utf8View, true),
468
Field::new("string_2".into(), ArrowDataType::Utf8View, true),
469
];
470
471
let parquet_schema = SchemaDescriptor::try_from_message(message)?;
472
let fields = parquet_to_arrow_schema(parquet_schema.fields());
473
let fields = fields.iter_values().cloned().collect::<Vec<_>>();
474
475
assert_eq!(fields, expected);
476
Ok(())
477
}
478
479
#[test]
480
fn test_byte_array_fields() -> PolarsResult<()> {
481
let message = "
482
message test_schema {
483
REQUIRED BYTE_ARRAY binary;
484
REQUIRED FIXED_LEN_BYTE_ARRAY (20) fixed_binary;
485
}
486
";
487
let expected = vec![
488
Field::new("binary".into(), ArrowDataType::BinaryView, false),
489
Field::new(
490
"fixed_binary".into(),
491
ArrowDataType::FixedSizeBinary(20),
492
false,
493
),
494
];
495
496
let parquet_schema = SchemaDescriptor::try_from_message(message)?;
497
let fields = parquet_to_arrow_schema(parquet_schema.fields());
498
let fields = fields.iter_values().cloned().collect::<Vec<_>>();
499
500
assert_eq!(fields, expected);
501
Ok(())
502
}
503
504
#[test]
505
fn test_duplicate_fields() -> PolarsResult<()> {
506
let message = "
507
message test_schema {
508
REQUIRED BOOLEAN boolean;
509
REQUIRED INT32 int8 (INT_8);
510
}
511
";
512
let expected = &[
513
Field::new("boolean".into(), ArrowDataType::Boolean, false),
514
Field::new("int8".into(), ArrowDataType::Int8, false),
515
];
516
517
let parquet_schema = SchemaDescriptor::try_from_message(message)?;
518
let fields = parquet_to_arrow_schema(parquet_schema.fields());
519
let fields = fields.iter_values().cloned().collect::<Vec<_>>();
520
521
assert_eq!(fields, expected);
522
Ok(())
523
}
524
525
#[ignore]
526
#[test]
527
fn test_parquet_lists() -> PolarsResult<()> {
528
let mut arrow_fields = Vec::new();
529
530
// LIST encoding example taken from parquet-format/LogicalTypes.md
531
let message_type = "
532
message test_schema {
533
REQUIRED GROUP my_list (LIST) {
534
REPEATED GROUP list {
535
OPTIONAL BINARY element (UTF8);
536
}
537
}
538
OPTIONAL GROUP my_list (LIST) {
539
REPEATED GROUP list {
540
REQUIRED BINARY element (UTF8);
541
}
542
}
543
OPTIONAL GROUP array_of_arrays (LIST) {
544
REPEATED GROUP list {
545
REQUIRED GROUP element (LIST) {
546
REPEATED GROUP list {
547
REQUIRED INT32 element;
548
}
549
}
550
}
551
}
552
OPTIONAL GROUP my_list (LIST) {
553
REPEATED GROUP element {
554
REQUIRED BINARY str (UTF8);
555
}
556
}
557
OPTIONAL GROUP my_list (LIST) {
558
REPEATED INT32 element;
559
}
560
OPTIONAL GROUP my_list (LIST) {
561
REPEATED GROUP element {
562
REQUIRED BINARY str (UTF8);
563
REQUIRED INT32 num;
564
}
565
}
566
OPTIONAL GROUP my_list (LIST) {
567
REPEATED GROUP array {
568
REQUIRED BINARY str (UTF8);
569
}
570
571
}
572
OPTIONAL GROUP my_list (LIST) {
573
REPEATED GROUP my_list_tuple {
574
REQUIRED BINARY str (UTF8);
575
}
576
}
577
REPEATED INT32 name;
578
}
579
";
580
581
// // List<String> (list non-null, elements nullable)
582
// required group my_list (LIST) {
583
// repeated group list {
584
// optional binary element (UTF8);
585
// }
586
// }
587
{
588
arrow_fields.push(Field::new(
589
"my_list".into(),
590
ArrowDataType::LargeList(Box::new(Field::new(
591
"element".into(),
592
ArrowDataType::Utf8,
593
true,
594
))),
595
false,
596
));
597
}
598
599
// // List<String> (list nullable, elements non-null)
600
// optional group my_list (LIST) {
601
// repeated group list {
602
// required binary element (UTF8);
603
// }
604
// }
605
{
606
arrow_fields.push(Field::new(
607
"my_list".into(),
608
ArrowDataType::LargeList(Box::new(Field::new(
609
"element".into(),
610
ArrowDataType::Utf8,
611
false,
612
))),
613
true,
614
));
615
}
616
617
// Element types can be nested structures. For example, a list of lists:
618
//
619
// // List<List<Integer>>
620
// optional group array_of_arrays (LIST) {
621
// repeated group list {
622
// required group element (LIST) {
623
// repeated group list {
624
// required int32 element;
625
// }
626
// }
627
// }
628
// }
629
{
630
let arrow_inner_list = ArrowDataType::LargeList(Box::new(Field::new(
631
"element".into(),
632
ArrowDataType::Int32,
633
false,
634
)));
635
arrow_fields.push(Field::new(
636
"array_of_arrays".into(),
637
ArrowDataType::LargeList(Box::new(Field::new(
638
PlSmallStr::from_static("element"),
639
arrow_inner_list,
640
false,
641
))),
642
true,
643
));
644
}
645
646
// // List<String> (list nullable, elements non-null)
647
// optional group my_list (LIST) {
648
// repeated group element {
649
// required binary str (UTF8);
650
// };
651
// }
652
{
653
arrow_fields.push(Field::new(
654
"my_list".into(),
655
ArrowDataType::LargeList(Box::new(Field::new(
656
"element".into(),
657
ArrowDataType::Utf8,
658
false,
659
))),
660
true,
661
));
662
}
663
664
// // List<Integer> (nullable list, non-null elements)
665
// optional group my_list (LIST) {
666
// repeated int32 element;
667
// }
668
{
669
arrow_fields.push(Field::new(
670
"my_list".into(),
671
ArrowDataType::LargeList(Box::new(Field::new(
672
"element".into(),
673
ArrowDataType::Int32,
674
false,
675
))),
676
true,
677
));
678
}
679
680
// // List<Tuple<String, Integer>> (nullable list, non-null elements)
681
// optional group my_list (LIST) {
682
// repeated group element {
683
// required binary str (UTF8);
684
// required int32 num;
685
// };
686
// }
687
{
688
let arrow_struct = ArrowDataType::Struct(vec![
689
Field::new("str".into(), ArrowDataType::Utf8, false),
690
Field::new("num".into(), ArrowDataType::Int32, false),
691
]);
692
arrow_fields.push(Field::new(
693
"my_list".into(),
694
ArrowDataType::LargeList(Box::new(Field::new(
695
"element".into(),
696
arrow_struct,
697
false,
698
))),
699
true,
700
));
701
}
702
703
// // List<OneTuple<String>> (nullable list, non-null elements)
704
// optional group my_list (LIST) {
705
// repeated group array {
706
// required binary str (UTF8);
707
// };
708
// }
709
// Special case: group is named array
710
{
711
let arrow_struct =
712
ArrowDataType::Struct(vec![Field::new("str".into(), ArrowDataType::Utf8, false)]);
713
arrow_fields.push(Field::new(
714
"my_list".into(),
715
ArrowDataType::LargeList(Box::new(Field::new("array".into(), arrow_struct, false))),
716
true,
717
));
718
}
719
720
// // List<OneTuple<String>> (nullable list, non-null elements)
721
// optional group my_list (LIST) {
722
// repeated group my_list_tuple {
723
// required binary str (UTF8);
724
// };
725
// }
726
// Special case: group named ends in _tuple
727
{
728
let arrow_struct =
729
ArrowDataType::Struct(vec![Field::new("str".into(), ArrowDataType::Utf8, false)]);
730
arrow_fields.push(Field::new(
731
"my_list".into(),
732
ArrowDataType::LargeList(Box::new(Field::new(
733
"my_list_tuple".into(),
734
arrow_struct,
735
false,
736
))),
737
true,
738
));
739
}
740
741
// One-level encoding: Only allows required lists with required cells
742
// repeated value_type name
743
{
744
arrow_fields.push(Field::new(
745
"name".into(),
746
ArrowDataType::LargeList(Box::new(Field::new(
747
"name".into(),
748
ArrowDataType::Int32,
749
false,
750
))),
751
false,
752
));
753
}
754
755
let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
756
let fields = parquet_to_arrow_schema(parquet_schema.fields());
757
let fields = fields.iter_values().cloned().collect::<Vec<_>>();
758
759
assert_eq!(arrow_fields, fields);
760
Ok(())
761
}
762
763
#[test]
764
fn test_parquet_list_with_struct() -> PolarsResult<()> {
765
let mut arrow_fields = Vec::new();
766
767
let message_type = "
768
message eventlog {
769
REQUIRED group events (LIST) {
770
REPEATED group array {
771
REQUIRED BYTE_ARRAY event_name (STRING);
772
REQUIRED INT64 event_time (TIMESTAMP(MILLIS,true));
773
}
774
}
775
}
776
";
777
778
{
779
let struct_fields = vec![
780
Field::new("event_name".into(), ArrowDataType::Utf8View, false),
781
Field::new(
782
"event_time".into(),
783
ArrowDataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
784
false,
785
),
786
];
787
arrow_fields.push(Field::new(
788
"events".into(),
789
ArrowDataType::LargeList(Box::new(Field::new(
790
"array".into(),
791
ArrowDataType::Struct(struct_fields),
792
false,
793
))),
794
false,
795
));
796
}
797
798
let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
799
let fields = parquet_to_arrow_schema(parquet_schema.fields());
800
let fields = fields.iter_values().cloned().collect::<Vec<_>>();
801
802
assert_eq!(arrow_fields, fields);
803
Ok(())
804
}
805
806
#[test]
807
fn test_parquet_list_nullable() -> PolarsResult<()> {
808
let mut arrow_fields = Vec::new();
809
810
let message_type = "
811
message test_schema {
812
REQUIRED GROUP my_list1 (LIST) {
813
REPEATED GROUP list {
814
OPTIONAL BINARY element (UTF8);
815
}
816
}
817
OPTIONAL GROUP my_list2 (LIST) {
818
REPEATED GROUP list {
819
REQUIRED BINARY element (UTF8);
820
}
821
}
822
REQUIRED GROUP my_list3 (LIST) {
823
REPEATED GROUP list {
824
REQUIRED BINARY element (UTF8);
825
}
826
}
827
}
828
";
829
830
// // List<String> (list non-null, elements nullable)
831
// required group my_list1 (LIST) {
832
// repeated group list {
833
// optional binary element (UTF8);
834
// }
835
// }
836
{
837
arrow_fields.push(Field::new(
838
"my_list1".into(),
839
ArrowDataType::LargeList(Box::new(Field::new(
840
"element".into(),
841
ArrowDataType::Utf8View,
842
true,
843
))),
844
false,
845
));
846
}
847
848
// // List<String> (list nullable, elements non-null)
849
// optional group my_list2 (LIST) {
850
// repeated group list {
851
// required binary element (UTF8);
852
// }
853
// }
854
{
855
arrow_fields.push(Field::new(
856
"my_list2".into(),
857
ArrowDataType::LargeList(Box::new(Field::new(
858
"element".into(),
859
ArrowDataType::Utf8View,
860
false,
861
))),
862
true,
863
));
864
}
865
866
// // List<String> (list non-null, elements non-null)
867
// repeated group my_list3 (LIST) {
868
// repeated group list {
869
// required binary element (UTF8);
870
// }
871
// }
872
{
873
arrow_fields.push(Field::new(
874
"my_list3".into(),
875
ArrowDataType::LargeList(Box::new(Field::new(
876
"element".into(),
877
ArrowDataType::Utf8View,
878
false,
879
))),
880
false,
881
));
882
}
883
884
let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
885
let fields = parquet_to_arrow_schema(parquet_schema.fields());
886
let fields = fields.iter_values().cloned().collect::<Vec<_>>();
887
888
assert_eq!(arrow_fields, fields);
889
Ok(())
890
}
891
892
#[test]
893
fn test_nested_schema() -> PolarsResult<()> {
894
let mut arrow_fields = Vec::new();
895
{
896
let group1_fields = vec![
897
Field::new("leaf1".into(), ArrowDataType::Boolean, false),
898
Field::new("leaf2".into(), ArrowDataType::Int32, false),
899
];
900
let group1_struct =
901
Field::new("group1".into(), ArrowDataType::Struct(group1_fields), false);
902
arrow_fields.push(group1_struct);
903
904
let leaf3_field = Field::new("leaf3".into(), ArrowDataType::Int64, false);
905
arrow_fields.push(leaf3_field);
906
}
907
908
let message_type = "
909
message test_schema {
910
REQUIRED GROUP group1 {
911
REQUIRED BOOLEAN leaf1;
912
REQUIRED INT32 leaf2;
913
}
914
REQUIRED INT64 leaf3;
915
}
916
";
917
918
let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
919
let fields = parquet_to_arrow_schema(parquet_schema.fields());
920
let fields = fields.iter_values().cloned().collect::<Vec<_>>();
921
922
assert_eq!(arrow_fields, fields);
923
Ok(())
924
}
925
926
#[ignore]
927
#[test]
928
fn test_repeated_nested_schema() -> PolarsResult<()> {
929
let mut arrow_fields = Vec::new();
930
{
931
arrow_fields.push(Field::new("leaf1".into(), ArrowDataType::Int32, true));
932
933
let inner_group_list = Field::new(
934
"innerGroup".into(),
935
ArrowDataType::LargeList(Box::new(Field::new(
936
"innerGroup".into(),
937
ArrowDataType::Struct(vec![Field::new(
938
"leaf3".into(),
939
ArrowDataType::Int32,
940
true,
941
)]),
942
false,
943
))),
944
false,
945
);
946
947
let outer_group_list = Field::new(
948
"outerGroup".into(),
949
ArrowDataType::LargeList(Box::new(Field::new(
950
"outerGroup".into(),
951
ArrowDataType::Struct(vec![
952
Field::new("leaf2".into(), ArrowDataType::Int32, true),
953
inner_group_list,
954
]),
955
false,
956
))),
957
false,
958
);
959
arrow_fields.push(outer_group_list);
960
}
961
962
let message_type = "
963
message test_schema {
964
OPTIONAL INT32 leaf1;
965
REPEATED GROUP outerGroup {
966
OPTIONAL INT32 leaf2;
967
REPEATED GROUP innerGroup {
968
OPTIONAL INT32 leaf3;
969
}
970
}
971
}
972
";
973
974
let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
975
let fields = parquet_to_arrow_schema(parquet_schema.fields());
976
let fields = fields.iter_values().cloned().collect::<Vec<_>>();
977
978
assert_eq!(arrow_fields, fields);
979
Ok(())
980
}
981
982
#[ignore]
983
#[test]
984
fn test_column_desc_to_field() -> PolarsResult<()> {
985
let message_type = "
986
message test_schema {
987
REQUIRED BOOLEAN boolean;
988
REQUIRED INT32 int8 (INT_8);
989
REQUIRED INT32 uint8 (INTEGER(8,false));
990
REQUIRED INT32 int16 (INT_16);
991
REQUIRED INT32 uint16 (INTEGER(16,false));
992
REQUIRED INT32 int32;
993
REQUIRED INT64 int64;
994
OPTIONAL DOUBLE double;
995
OPTIONAL FLOAT float;
996
OPTIONAL BINARY string (UTF8);
997
REPEATED BOOLEAN bools;
998
OPTIONAL INT32 date (DATE);
999
OPTIONAL INT32 time_milli (TIME_MILLIS);
1000
OPTIONAL INT64 time_micro (TIME_MICROS);
1001
OPTIONAL INT64 time_nano (TIME(NANOS,false));
1002
OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
1003
REQUIRED INT64 ts_micro (TIMESTAMP_MICROS);
1004
REQUIRED INT64 ts_nano (TIMESTAMP(NANOS,true));
1005
}
1006
";
1007
let arrow_fields = vec![
1008
Field::new("boolean".into(), ArrowDataType::Boolean, false),
1009
Field::new("int8".into(), ArrowDataType::Int8, false),
1010
Field::new("uint8".into(), ArrowDataType::UInt8, false),
1011
Field::new("int16".into(), ArrowDataType::Int16, false),
1012
Field::new("uint16".into(), ArrowDataType::UInt16, false),
1013
Field::new("int32".into(), ArrowDataType::Int32, false),
1014
Field::new("int64".into(), ArrowDataType::Int64, false),
1015
Field::new("double".into(), ArrowDataType::Float64, true),
1016
Field::new("float".into(), ArrowDataType::Float32, true),
1017
Field::new("string".into(), ArrowDataType::Utf8, true),
1018
Field::new(
1019
"bools".into(),
1020
ArrowDataType::LargeList(Box::new(Field::new(
1021
"bools".into(),
1022
ArrowDataType::Boolean,
1023
false,
1024
))),
1025
false,
1026
),
1027
Field::new("date".into(), ArrowDataType::Date32, true),
1028
Field::new(
1029
"time_milli".into(),
1030
ArrowDataType::Time32(TimeUnit::Millisecond),
1031
true,
1032
),
1033
Field::new(
1034
"time_micro".into(),
1035
ArrowDataType::Time64(TimeUnit::Microsecond),
1036
true,
1037
),
1038
Field::new(
1039
"time_nano".into(),
1040
ArrowDataType::Time64(TimeUnit::Nanosecond),
1041
true,
1042
),
1043
Field::new(
1044
"ts_milli".into(),
1045
ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
1046
true,
1047
),
1048
Field::new(
1049
"ts_micro".into(),
1050
ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
1051
false,
1052
),
1053
Field::new(
1054
"ts_nano".into(),
1055
ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
1056
false,
1057
),
1058
];
1059
1060
let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
1061
let fields = parquet_to_arrow_schema(parquet_schema.fields());
1062
let fields = fields.iter_values().cloned().collect::<Vec<_>>();
1063
1064
assert_eq!(arrow_fields, fields);
1065
Ok(())
1066
}
1067
1068
/// Checks that [`parquet_to_arrow_schema`] turns a parquet message made of primitive columns,
/// `LIST`-annotated groups, an un-annotated (struct) group and `STRING`-annotated binaries into
/// the expected list of Arrow [`Field`]s, in declaration order.
///
/// Returns an error (failing the test) if the message text cannot be parsed.
#[test]
1069
fn test_field_to_column_desc() -> PolarsResult<()> {
1070
let message_type = "
1071
message arrow_schema {
1072
REQUIRED BOOLEAN boolean;
1073
REQUIRED INT32 int8 (INT_8);
1074
REQUIRED INT32 int16 (INTEGER(16,true));
1075
REQUIRED INT32 int32;
1076
REQUIRED INT64 int64;
1077
OPTIONAL DOUBLE double;
1078
OPTIONAL FLOAT float;
1079
OPTIONAL BINARY string (STRING);
1080
OPTIONAL GROUP bools (LIST) {
1081
REPEATED GROUP list {
1082
OPTIONAL BOOLEAN element;
1083
}
1084
}
1085
REQUIRED GROUP bools_non_null (LIST) {
1086
REPEATED GROUP list {
1087
REQUIRED BOOLEAN element;
1088
}
1089
}
1090
OPTIONAL INT32 date (DATE);
1091
OPTIONAL INT32 time_milli (TIME(MILLIS,false));
1092
OPTIONAL INT64 time_micro (TIME_MICROS);
1093
OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
1094
REQUIRED INT64 ts_micro (TIMESTAMP(MICROS,false));
1095
REQUIRED GROUP struct {
1096
REQUIRED BOOLEAN bools;
1097
REQUIRED INT32 uint32 (INTEGER(32,false));
1098
REQUIRED GROUP int32 (LIST) {
1099
REPEATED GROUP list {
1100
OPTIONAL INT32 element;
1101
}
1102
}
1103
}
1104
REQUIRED BINARY dictionary_strings (STRING);
1105
}
1106
";
1107
// Expected Arrow fields, one per top-level column of the message above and in the same order.
// REQUIRED columns are expected as non-nullable (`false`), OPTIONAL ones as nullable (`true`).
1108
let arrow_fields = vec![
1109
Field::new("boolean".into(), ArrowDataType::Boolean, false),
1110
Field::new("int8".into(), ArrowDataType::Int8, false),
1111
Field::new("int16".into(), ArrowDataType::Int16, false),
1112
Field::new("int32".into(), ArrowDataType::Int32, false),
1113
Field::new("int64".into(), ArrowDataType::Int64, false),
1114
Field::new("double".into(), ArrowDataType::Float64, true),
1115
Field::new("float".into(), ArrowDataType::Float32, true),
// A BINARY column annotated with STRING is expected as `Utf8View`.
1116
Field::new("string".into(), ArrowDataType::Utf8View, true),
// LIST groups are expected as `LargeList`; the inner field keeps the name `element` and its
// nullability mirrors the element's repetition, while the outer flag mirrors the group's.
1117
Field::new(
1118
"bools".into(),
1119
ArrowDataType::LargeList(Box::new(Field::new(
1120
"element".into(),
1121
ArrowDataType::Boolean,
1122
true,
1123
))),
1124
true,
1125
),
1126
Field::new(
1127
"bools_non_null".into(),
1128
ArrowDataType::LargeList(Box::new(Field::new(
1129
"element".into(),
1130
ArrowDataType::Boolean,
1131
false,
1132
))),
1133
false,
1134
),
1135
Field::new("date".into(), ArrowDataType::Date32, true),
1136
Field::new(
1137
"time_milli".into(),
1138
ArrowDataType::Time32(TimeUnit::Millisecond),
1139
true,
1140
),
1141
Field::new(
1142
"time_micro".into(),
1143
ArrowDataType::Time64(TimeUnit::Microsecond),
1144
true,
1145
),
// Both timestamp columns are expected without a timezone (`None`).
1146
Field::new(
1147
"ts_milli".into(),
1148
ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
1149
true,
1150
),
1151
Field::new(
1152
"ts_micro".into(),
1153
ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
1154
false,
1155
),
// The un-annotated group is expected as a `Struct` whose children follow the same rules,
// including a nested LIST child.
1156
Field::new(
1157
"struct".into(),
1158
ArrowDataType::Struct(vec![
1159
Field::new("bools".into(), ArrowDataType::Boolean, false),
1160
Field::new("uint32".into(), ArrowDataType::UInt32, false),
1161
Field::new(
1162
"int32".into(),
1163
ArrowDataType::LargeList(Box::new(Field::new(
1164
"element".into(),
1165
ArrowDataType::Int32,
1166
true,
1167
))),
1168
false,
1169
),
1170
]),
1171
false,
1172
),
1173
Field::new("dictionary_strings".into(), ArrowDataType::Utf8View, false),
1174
];
1175
// Parse the message text; `?` propagates a parse failure as the test's error.
1176
let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
// Convert through the option-less entry point.
1177
let fields = parquet_to_arrow_schema(parquet_schema.fields());
// Clone the schema's values into a `Vec` so they can be compared with `arrow_fields`.
1178
let fields = fields.iter_values().cloned().collect::<Vec<_>>();
1179
1180
assert_eq!(arrow_fields, fields);
1181
Ok(())
1182
}
}
1183
1184
/// Checks that [`parquet_to_arrow_schema_with_options`] infers INT96 columns as
/// `Timestamp(tu, None)` for every tested `int96_coerce_to_timeunit` value, whether the INT96
/// column is top-level, the element of a `LIST` group, or a member of a struct group.
///
/// Returns an error (failing the test) if the message text cannot be parsed.
#[test]
1185
fn test_int96_options() -> PolarsResult<()> {
// Run the same check once per time unit.
1186
for tu in [
1187
TimeUnit::Second,
1188
TimeUnit::Microsecond,
1189
TimeUnit::Millisecond,
1190
TimeUnit::Nanosecond,
1191
] {
1192
let message_type = "
1193
message arrow_schema {
1194
REQUIRED INT96 int96_field;
1195
OPTIONAL GROUP int96_list (LIST) {
1196
REPEATED GROUP list {
1197
OPTIONAL INT96 element;
1198
}
1199
}
1200
REQUIRED GROUP int96_struct {
1201
REQUIRED INT96 int96_field;
1202
}
1203
}
1204
";
// Every INT96 column is expected as a timezone-less timestamp in the unit under test.
1205
let coerced_to = ArrowDataType::Timestamp(tu, None);
// Expected fields: the plain column, the list (nullable, with a nullable `element`), and the
// struct (non-nullable, with a non-nullable member).
1206
let arrow_fields = vec![
1207
Field::new("int96_field".into(), coerced_to.clone(), false),
1208
Field::new(
1209
"int96_list".into(),
1210
ArrowDataType::LargeList(Box::new(Field::new(
1211
"element".into(),
1212
coerced_to.clone(),
1213
true,
1214
))),
1215
true,
1216
),
1217
Field::new(
1218
"int96_struct".into(),
1219
ArrowDataType::Struct(vec![Field::new(
1220
"int96_field".into(),
1221
coerced_to.clone(),
1222
false,
1223
)]),
1224
false,
1225
),
1226
];
1227
// Parse the message text; `?` propagates a parse failure as the test's error.
1228
let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
// Convert with explicit options that set the INT96 coercion unit to `tu`.
1229
let fields = parquet_to_arrow_schema_with_options(
1230
parquet_schema.fields(),
1231
&Some(SchemaInferenceOptions {
1232
int96_coerce_to_timeunit: tu,
1233
}),
1234
);
// Clone the schema's values into a `Vec` so they can be compared with `arrow_fields`.
1235
let fields = fields.iter_values().cloned().collect::<Vec<_>>();
1236
assert_eq!(arrow_fields, fields);
1237
}
1238
Ok(())
1239
}
}
1240
}
1241
1242