Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/read/schema/convert.rs
8489 views
1
//! This module has two entry points: [`parquet_to_arrow_schema`] and the more configurable [`parquet_to_arrow_schema_with_options`].
2
use std::sync::Arc;
3
4
use arrow::datatypes::{ArrowDataType, ArrowSchema, Field, IntervalUnit, Metadata, TimeUnit};
5
use polars_utils::format_pl_smallstr;
6
use polars_utils::pl_str::PlSmallStr;
7
8
use crate::arrow::read::schema::SchemaInferenceOptions;
9
use crate::parquet::schema::Repetition;
10
use crate::parquet::schema::types::{
11
FieldInfo, GroupConvertedType, GroupLogicalType, IntegerType, ParquetType, PhysicalType,
12
PrimitiveConvertedType, PrimitiveLogicalType, PrimitiveType, TimeUnit as ParquetTimeUnit,
13
};
14
15
/// Converts [`ParquetType`]s to a [`Field`], ignoring parquet fields that do not contain
16
/// any physical column.
17
pub fn parquet_to_arrow_schema(fields: &[ParquetType]) -> ArrowSchema {
18
parquet_to_arrow_schema_with_options(fields, &None)
19
}
20
21
/// Like [`parquet_to_arrow_schema`] but with configurable options which affect the behavior of schema inference
22
pub fn parquet_to_arrow_schema_with_options(
23
fields: &[ParquetType],
24
options: &Option<SchemaInferenceOptions>,
25
) -> ArrowSchema {
26
fields
27
.iter()
28
.filter_map(|f| to_field(f, options.as_ref().unwrap_or(&Default::default())))
29
.map(|x| (x.name.clone(), x))
30
.collect()
31
}
32
33
fn from_int32(
34
logical_type: Option<PrimitiveLogicalType>,
35
converted_type: Option<PrimitiveConvertedType>,
36
) -> ArrowDataType {
37
use PrimitiveLogicalType::*;
38
match (logical_type, converted_type) {
39
// handle logical types first
40
(Some(Integer(t)), _) => match t {
41
IntegerType::Int8 => ArrowDataType::Int8,
42
IntegerType::Int16 => ArrowDataType::Int16,
43
IntegerType::Int32 => ArrowDataType::Int32,
44
IntegerType::UInt8 => ArrowDataType::UInt8,
45
IntegerType::UInt16 => ArrowDataType::UInt16,
46
IntegerType::UInt32 => ArrowDataType::UInt32,
47
// The above are the only possible annotations for parquet's int32. Anything else
48
// is a deviation to the parquet specification and we ignore
49
_ => ArrowDataType::Int32,
50
},
51
(Some(Decimal(precision, scale)), _) => ArrowDataType::Decimal(precision, scale),
52
(Some(Date), _) => ArrowDataType::Date32,
53
(Some(Time { unit, .. }), _) => match unit {
54
ParquetTimeUnit::Milliseconds => ArrowDataType::Time32(TimeUnit::Millisecond),
55
// MILLIS is the only possible annotation for parquet's int32. Anything else
56
// is a deviation to the parquet specification and we ignore
57
_ => ArrowDataType::Int32,
58
},
59
// handle converted types:
60
(_, Some(PrimitiveConvertedType::Uint8)) => ArrowDataType::UInt8,
61
(_, Some(PrimitiveConvertedType::Uint16)) => ArrowDataType::UInt16,
62
(_, Some(PrimitiveConvertedType::Uint32)) => ArrowDataType::UInt32,
63
(_, Some(PrimitiveConvertedType::Int8)) => ArrowDataType::Int8,
64
(_, Some(PrimitiveConvertedType::Int16)) => ArrowDataType::Int16,
65
(_, Some(PrimitiveConvertedType::Int32)) => ArrowDataType::Int32,
66
(_, Some(PrimitiveConvertedType::Date)) => ArrowDataType::Date32,
67
(_, Some(PrimitiveConvertedType::TimeMillis)) => {
68
ArrowDataType::Time32(TimeUnit::Millisecond)
69
},
70
(_, Some(PrimitiveConvertedType::Decimal(precision, scale))) => {
71
ArrowDataType::Decimal(precision, scale)
72
},
73
(_, _) => ArrowDataType::Int32,
74
}
75
}
76
77
fn from_int64(
78
logical_type: Option<PrimitiveLogicalType>,
79
converted_type: Option<PrimitiveConvertedType>,
80
) -> ArrowDataType {
81
use PrimitiveLogicalType::*;
82
match (logical_type, converted_type) {
83
// handle logical types first
84
(Some(Integer(integer)), _) => match integer {
85
IntegerType::UInt64 => ArrowDataType::UInt64,
86
IntegerType::Int64 => ArrowDataType::Int64,
87
_ => ArrowDataType::Int64,
88
},
89
(
90
Some(Timestamp {
91
is_adjusted_to_utc,
92
unit,
93
}),
94
_,
95
) => {
96
let timezone = if is_adjusted_to_utc {
97
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
98
// A TIMESTAMP with isAdjustedToUTC=true is defined as [...] elapsed since the Unix epoch
99
Some(PlSmallStr::from_static("+00:00"))
100
} else {
101
// PARQUET:
102
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
103
// A TIMESTAMP with isAdjustedToUTC=false represents [...] such
104
// timestamps should always be displayed the same way, regardless of the local time zone in effect
105
// ARROW:
106
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
107
// If the time zone is null or equal to an empty string, the data is "time
108
// zone naive" and shall be displayed *as is* to the user, not localized
109
// to the locale of the user.
110
None
111
};
112
113
match unit {
114
ParquetTimeUnit::Milliseconds => {
115
ArrowDataType::Timestamp(TimeUnit::Millisecond, timezone)
116
},
117
ParquetTimeUnit::Microseconds => {
118
ArrowDataType::Timestamp(TimeUnit::Microsecond, timezone)
119
},
120
ParquetTimeUnit::Nanoseconds => {
121
ArrowDataType::Timestamp(TimeUnit::Nanosecond, timezone)
122
},
123
}
124
},
125
(Some(Time { unit, .. }), _) => match unit {
126
ParquetTimeUnit::Microseconds => ArrowDataType::Time64(TimeUnit::Microsecond),
127
ParquetTimeUnit::Nanoseconds => ArrowDataType::Time64(TimeUnit::Nanosecond),
128
// MILLIS is only possible for int32. Appearing in int64 is a deviation
129
// to parquet's spec, which we ignore
130
_ => ArrowDataType::Int64,
131
},
132
(Some(Decimal(precision, scale)), _) => ArrowDataType::Decimal(precision, scale),
133
// handle converted types:
134
(_, Some(PrimitiveConvertedType::TimeMicros)) => {
135
ArrowDataType::Time64(TimeUnit::Microsecond)
136
},
137
(_, Some(PrimitiveConvertedType::TimestampMillis)) => {
138
ArrowDataType::Timestamp(TimeUnit::Millisecond, None)
139
},
140
(_, Some(PrimitiveConvertedType::TimestampMicros)) => {
141
ArrowDataType::Timestamp(TimeUnit::Microsecond, None)
142
},
143
(_, Some(PrimitiveConvertedType::Int64)) => ArrowDataType::Int64,
144
(_, Some(PrimitiveConvertedType::Uint64)) => ArrowDataType::UInt64,
145
(_, Some(PrimitiveConvertedType::Decimal(precision, scale))) => {
146
ArrowDataType::Decimal(precision, scale)
147
},
148
149
(_, _) => ArrowDataType::Int64,
150
}
151
}
152
153
fn from_byte_array(
154
logical_type: &Option<PrimitiveLogicalType>,
155
converted_type: &Option<PrimitiveConvertedType>,
156
) -> ArrowDataType {
157
match (logical_type, converted_type) {
158
(Some(PrimitiveLogicalType::Decimal(precision, scale)), _) => {
159
ArrowDataType::Decimal(*precision, *scale)
160
},
161
(None, Some(PrimitiveConvertedType::Decimal(precision, scale))) => {
162
ArrowDataType::Decimal(*precision, *scale)
163
},
164
(Some(PrimitiveLogicalType::String), _) => ArrowDataType::Utf8View,
165
(Some(PrimitiveLogicalType::Json), _) => ArrowDataType::BinaryView,
166
(Some(PrimitiveLogicalType::Bson), _) => ArrowDataType::BinaryView,
167
(Some(PrimitiveLogicalType::Enum), _) => ArrowDataType::BinaryView,
168
(_, Some(PrimitiveConvertedType::Json)) => ArrowDataType::BinaryView,
169
(_, Some(PrimitiveConvertedType::Bson)) => ArrowDataType::BinaryView,
170
(_, Some(PrimitiveConvertedType::Enum)) => ArrowDataType::BinaryView,
171
(_, Some(PrimitiveConvertedType::Utf8)) => ArrowDataType::Utf8View,
172
(_, _) => ArrowDataType::BinaryView,
173
}
174
}
175
176
fn from_fixed_len_byte_array(
177
length: usize,
178
logical_type: Option<PrimitiveLogicalType>,
179
converted_type: Option<PrimitiveConvertedType>,
180
) -> ArrowDataType {
181
match (logical_type, converted_type) {
182
(Some(PrimitiveLogicalType::Decimal(precision, scale)), _) => {
183
ArrowDataType::Decimal(precision, scale)
184
},
185
(None, Some(PrimitiveConvertedType::Decimal(precision, scale))) => {
186
ArrowDataType::Decimal(precision, scale)
187
},
188
(None, Some(PrimitiveConvertedType::Interval)) => {
189
ArrowDataType::Interval(IntervalUnit::MonthDayMillis)
190
},
191
_ => ArrowDataType::FixedSizeBinary(length),
192
}
193
}
194
195
/// Maps a [`PhysicalType`] with optional metadata to a [`ArrowDataType`]
196
fn to_primitive_type_inner(
197
primitive_type: &PrimitiveType,
198
options: &SchemaInferenceOptions,
199
) -> ArrowDataType {
200
match primitive_type.physical_type {
201
PhysicalType::Boolean => ArrowDataType::Boolean,
202
PhysicalType::Int32 => {
203
from_int32(primitive_type.logical_type, primitive_type.converted_type)
204
},
205
PhysicalType::Int64 => {
206
from_int64(primitive_type.logical_type, primitive_type.converted_type)
207
},
208
PhysicalType::Int96 => ArrowDataType::Timestamp(options.int96_coerce_to_timeunit, None),
209
PhysicalType::Float => ArrowDataType::Float32,
210
PhysicalType::Double => ArrowDataType::Float64,
211
PhysicalType::ByteArray => {
212
from_byte_array(&primitive_type.logical_type, &primitive_type.converted_type)
213
},
214
PhysicalType::FixedLenByteArray(length) => from_fixed_len_byte_array(
215
length,
216
primitive_type.logical_type,
217
primitive_type.converted_type,
218
),
219
}
220
}
221
222
/// Entry point for converting parquet primitive type to arrow type.
223
///
224
/// This function takes care of repetition.
225
fn to_primitive_type(
226
primitive_type: &PrimitiveType,
227
options: &SchemaInferenceOptions,
228
) -> ArrowDataType {
229
let base_type = to_primitive_type_inner(primitive_type, options);
230
231
if primitive_type.field_info.repetition == Repetition::Repeated {
232
ArrowDataType::LargeList(Box::new(Field::new(
233
primitive_type.field_info.name.clone(),
234
base_type,
235
is_nullable(&primitive_type.field_info),
236
)))
237
} else {
238
base_type
239
}
240
}
241
242
fn non_repeated_group(
243
logical_type: &Option<GroupLogicalType>,
244
converted_type: &Option<GroupConvertedType>,
245
fields: &[ParquetType],
246
parent_name: &str,
247
options: &SchemaInferenceOptions,
248
) -> Option<ArrowDataType> {
249
debug_assert!(!fields.is_empty());
250
match (logical_type, converted_type) {
251
(Some(GroupLogicalType::List), _) => to_list(fields, parent_name, options),
252
(None, Some(GroupConvertedType::List)) => to_list(fields, parent_name, options),
253
(Some(GroupLogicalType::Map), _) => to_list(fields, parent_name, options),
254
(None, Some(GroupConvertedType::Map) | Some(GroupConvertedType::MapKeyValue)) => {
255
to_map(fields, options)
256
},
257
_ => to_struct(fields, options),
258
}
259
}
260
261
/// Converts a parquet group type to an arrow [`ArrowDataType::Struct`].
262
/// Returns [`None`] if all its fields are empty
263
fn to_struct(fields: &[ParquetType], options: &SchemaInferenceOptions) -> Option<ArrowDataType> {
264
let fields = fields
265
.iter()
266
.filter_map(|f| to_field(f, options))
267
.collect::<Vec<Field>>();
268
if fields.is_empty() {
269
None
270
} else {
271
Some(ArrowDataType::Struct(fields))
272
}
273
}
274
275
/// Converts a parquet group type to an arrow [`ArrowDataType::Struct`].
276
/// Returns [`None`] if all its fields are empty
277
fn to_map(fields: &[ParquetType], options: &SchemaInferenceOptions) -> Option<ArrowDataType> {
278
let inner = to_field(&fields[0], options)?;
279
Some(ArrowDataType::Map(Box::new(inner), false))
280
}
281
282
/// Entry point for converting parquet group type.
283
///
284
/// This function takes care of logical type and repetition.
285
fn to_group_type(
286
field_info: &FieldInfo,
287
logical_type: &Option<GroupLogicalType>,
288
converted_type: &Option<GroupConvertedType>,
289
fields: &[ParquetType],
290
parent_name: &str,
291
options: &SchemaInferenceOptions,
292
) -> Option<ArrowDataType> {
293
debug_assert!(!fields.is_empty());
294
if field_info.repetition == Repetition::Repeated {
295
Some(ArrowDataType::LargeList(Box::new(Field::new(
296
field_info.name.clone(),
297
to_struct(fields, options)?,
298
is_nullable(field_info),
299
))))
300
} else {
301
non_repeated_group(logical_type, converted_type, fields, parent_name, options)
302
}
303
}
304
305
/// Checks whether this schema is nullable.
306
pub(crate) fn is_nullable(field_info: &FieldInfo) -> bool {
307
match field_info.repetition {
308
Repetition::Optional => true,
309
Repetition::Repeated => true,
310
Repetition::Required => false,
311
}
312
}
313
314
/// Converts parquet schema to arrow field.
315
/// Returns `None` iff the parquet type has no associated primitive types,
316
/// i.e. if it is a column-less group type.
317
fn to_field(type_: &ParquetType, options: &SchemaInferenceOptions) -> Option<Field> {
318
let field_info = type_.get_field_info();
319
320
let metadata: Option<Arc<Metadata>> = field_info.id.map(|x: i32| {
321
Arc::new(
322
[(
323
PlSmallStr::from_static("PARQUET:field_id"),
324
format_pl_smallstr!("{x}"),
325
)]
326
.into(),
327
)
328
});
329
330
let mut arrow_field = Field::new(
331
field_info.name.clone(),
332
to_dtype(type_, options)?,
333
is_nullable(type_.get_field_info()),
334
);
335
336
arrow_field.metadata = metadata;
337
338
Some(arrow_field)
339
}
340
341
/// Converts a parquet list to arrow list.
342
///
343
/// To fully understand this algorithm, please refer to
344
/// [parquet doc](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md).
345
fn to_list(
346
fields: &[ParquetType],
347
parent_name: &str,
348
options: &SchemaInferenceOptions,
349
) -> Option<ArrowDataType> {
350
let item = fields.first().unwrap();
351
352
let item_type = match item {
353
ParquetType::PrimitiveType(primitive) => Some(to_primitive_type_inner(primitive, options)),
354
ParquetType::GroupType { fields, .. } => {
355
if fields.len() == 1 && item.name() != "array" && {
356
// item.name() != format!("{parent_name}_tuple")
357
let cmp = [parent_name, "_tuple"];
358
let len_1 = parent_name.len();
359
let len = len_1 + "_tuple".len();
360
361
item.name().len() != len || [&item.name()[..len_1], &item.name()[len_1..]] != cmp
362
} {
363
// extract the repetition field
364
let nested_item = fields.first().unwrap();
365
to_dtype(nested_item, options)
366
} else {
367
to_struct(fields, options)
368
}
369
},
370
}?;
371
372
// Check that the name of the list child is "list", in which case we
373
// get the child nullability and name (normally "element") from the nested
374
// group type.
375
// Without this step, the child incorrectly inherits the parent's optionality
376
let (list_item_name, item_is_optional) = match item {
377
ParquetType::GroupType {
378
field_info, fields, ..
379
} if field_info.name.as_str() == "list" && fields.len() == 1 => {
380
let field = fields.first().unwrap();
381
(
382
field.get_field_info().name.clone(),
383
field.get_field_info().repetition == Repetition::Optional,
384
)
385
},
386
_ => (
387
item.get_field_info().name.clone(),
388
item.get_field_info().repetition == Repetition::Optional,
389
),
390
};
391
392
Some(ArrowDataType::LargeList(Box::new(Field::new(
393
list_item_name,
394
item_type,
395
item_is_optional,
396
))))
397
}
398
399
/// Converts parquet schema to arrow data type.
400
///
401
/// This function discards schema name.
402
///
403
/// If this schema is a primitive type and not included in the leaves, the result is
404
/// Ok(None).
405
///
406
/// If this schema is a group type and none of its children is reserved in the
407
/// conversion, the result is Ok(None).
408
pub(crate) fn to_dtype(
409
type_: &ParquetType,
410
options: &SchemaInferenceOptions,
411
) -> Option<ArrowDataType> {
412
match type_ {
413
ParquetType::PrimitiveType(primitive) => Some(to_primitive_type(primitive, options)),
414
ParquetType::GroupType {
415
field_info,
416
logical_type,
417
converted_type,
418
fields,
419
} => {
420
if fields.is_empty() {
421
None
422
} else {
423
to_group_type(
424
field_info,
425
logical_type,
426
converted_type,
427
fields,
428
field_info.name.as_str(),
429
options,
430
)
431
}
432
},
433
}
434
}
435
436
#[cfg(test)]
437
mod tests {
438
use polars_error::*;
439
440
use super::*;
441
use crate::parquet::metadata::SchemaDescriptor;
442
443
#[test]
444
fn test_flat_primitives() -> PolarsResult<()> {
445
let message = "
446
message test_schema {
447
REQUIRED BOOLEAN boolean;
448
REQUIRED INT32 int8 (INT_8);
449
REQUIRED INT32 int16 (INT_16);
450
REQUIRED INT32 uint8 (INTEGER(8,false));
451
REQUIRED INT32 uint16 (INTEGER(16,false));
452
REQUIRED INT32 int32;
453
REQUIRED INT64 int64 ;
454
OPTIONAL DOUBLE double;
455
OPTIONAL FLOAT float;
456
OPTIONAL BINARY string (UTF8);
457
OPTIONAL BINARY string_2 (STRING);
458
}
459
";
460
let expected = &[
461
Field::new("boolean".into(), ArrowDataType::Boolean, false),
462
Field::new("int8".into(), ArrowDataType::Int8, false),
463
Field::new("int16".into(), ArrowDataType::Int16, false),
464
Field::new("uint8".into(), ArrowDataType::UInt8, false),
465
Field::new("uint16".into(), ArrowDataType::UInt16, false),
466
Field::new("int32".into(), ArrowDataType::Int32, false),
467
Field::new("int64".into(), ArrowDataType::Int64, false),
468
Field::new("double".into(), ArrowDataType::Float64, true),
469
Field::new("float".into(), ArrowDataType::Float32, true),
470
Field::new("string".into(), ArrowDataType::Utf8View, true),
471
Field::new("string_2".into(), ArrowDataType::Utf8View, true),
472
];
473
474
let parquet_schema = SchemaDescriptor::try_from_message(message)?;
475
let fields = parquet_to_arrow_schema(parquet_schema.fields());
476
let fields = fields.iter_values().cloned().collect::<Vec<_>>();
477
478
assert_eq!(fields, expected);
479
Ok(())
480
}
481
482
#[test]
483
fn test_byte_array_fields() -> PolarsResult<()> {
484
let message = "
485
message test_schema {
486
REQUIRED BYTE_ARRAY binary;
487
REQUIRED FIXED_LEN_BYTE_ARRAY (20) fixed_binary;
488
}
489
";
490
let expected = vec![
491
Field::new("binary".into(), ArrowDataType::BinaryView, false),
492
Field::new(
493
"fixed_binary".into(),
494
ArrowDataType::FixedSizeBinary(20),
495
false,
496
),
497
];
498
499
let parquet_schema = SchemaDescriptor::try_from_message(message)?;
500
let fields = parquet_to_arrow_schema(parquet_schema.fields());
501
let fields = fields.iter_values().cloned().collect::<Vec<_>>();
502
503
assert_eq!(fields, expected);
504
Ok(())
505
}
506
507
#[test]
508
fn test_duplicate_fields() -> PolarsResult<()> {
509
let message = "
510
message test_schema {
511
REQUIRED BOOLEAN boolean;
512
REQUIRED INT32 int8 (INT_8);
513
}
514
";
515
let expected = &[
516
Field::new("boolean".into(), ArrowDataType::Boolean, false),
517
Field::new("int8".into(), ArrowDataType::Int8, false),
518
];
519
520
let parquet_schema = SchemaDescriptor::try_from_message(message)?;
521
let fields = parquet_to_arrow_schema(parquet_schema.fields());
522
let fields = fields.iter_values().cloned().collect::<Vec<_>>();
523
524
assert_eq!(fields, expected);
525
Ok(())
526
}
527
528
#[ignore]
529
#[test]
530
fn test_parquet_lists() -> PolarsResult<()> {
531
let mut arrow_fields = Vec::new();
532
533
// LIST encoding example taken from parquet-format/LogicalTypes.md
534
let message_type = "
535
message test_schema {
536
REQUIRED GROUP my_list (LIST) {
537
REPEATED GROUP list {
538
OPTIONAL BINARY element (UTF8);
539
}
540
}
541
OPTIONAL GROUP my_list (LIST) {
542
REPEATED GROUP list {
543
REQUIRED BINARY element (UTF8);
544
}
545
}
546
OPTIONAL GROUP array_of_arrays (LIST) {
547
REPEATED GROUP list {
548
REQUIRED GROUP element (LIST) {
549
REPEATED GROUP list {
550
REQUIRED INT32 element;
551
}
552
}
553
}
554
}
555
OPTIONAL GROUP my_list (LIST) {
556
REPEATED GROUP element {
557
REQUIRED BINARY str (UTF8);
558
}
559
}
560
OPTIONAL GROUP my_list (LIST) {
561
REPEATED INT32 element;
562
}
563
OPTIONAL GROUP my_list (LIST) {
564
REPEATED GROUP element {
565
REQUIRED BINARY str (UTF8);
566
REQUIRED INT32 num;
567
}
568
}
569
OPTIONAL GROUP my_list (LIST) {
570
REPEATED GROUP array {
571
REQUIRED BINARY str (UTF8);
572
}
573
574
}
575
OPTIONAL GROUP my_list (LIST) {
576
REPEATED GROUP my_list_tuple {
577
REQUIRED BINARY str (UTF8);
578
}
579
}
580
REPEATED INT32 name;
581
}
582
";
583
584
// // List<String> (list non-null, elements nullable)
585
// required group my_list (LIST) {
586
// repeated group list {
587
// optional binary element (UTF8);
588
// }
589
// }
590
{
591
arrow_fields.push(Field::new(
592
"my_list".into(),
593
ArrowDataType::LargeList(Box::new(Field::new(
594
"element".into(),
595
ArrowDataType::Utf8,
596
true,
597
))),
598
false,
599
));
600
}
601
602
// // List<String> (list nullable, elements non-null)
603
// optional group my_list (LIST) {
604
// repeated group list {
605
// required binary element (UTF8);
606
// }
607
// }
608
{
609
arrow_fields.push(Field::new(
610
"my_list".into(),
611
ArrowDataType::LargeList(Box::new(Field::new(
612
"element".into(),
613
ArrowDataType::Utf8,
614
false,
615
))),
616
true,
617
));
618
}
619
620
// Element types can be nested structures. For example, a list of lists:
621
//
622
// // List<List<Integer>>
623
// optional group array_of_arrays (LIST) {
624
// repeated group list {
625
// required group element (LIST) {
626
// repeated group list {
627
// required int32 element;
628
// }
629
// }
630
// }
631
// }
632
{
633
let arrow_inner_list = ArrowDataType::LargeList(Box::new(Field::new(
634
"element".into(),
635
ArrowDataType::Int32,
636
false,
637
)));
638
arrow_fields.push(Field::new(
639
"array_of_arrays".into(),
640
ArrowDataType::LargeList(Box::new(Field::new(
641
PlSmallStr::from_static("element"),
642
arrow_inner_list,
643
false,
644
))),
645
true,
646
));
647
}
648
649
// // List<String> (list nullable, elements non-null)
650
// optional group my_list (LIST) {
651
// repeated group element {
652
// required binary str (UTF8);
653
// };
654
// }
655
{
656
arrow_fields.push(Field::new(
657
"my_list".into(),
658
ArrowDataType::LargeList(Box::new(Field::new(
659
"element".into(),
660
ArrowDataType::Utf8,
661
false,
662
))),
663
true,
664
));
665
}
666
667
// // List<Integer> (nullable list, non-null elements)
668
// optional group my_list (LIST) {
669
// repeated int32 element;
670
// }
671
{
672
arrow_fields.push(Field::new(
673
"my_list".into(),
674
ArrowDataType::LargeList(Box::new(Field::new(
675
"element".into(),
676
ArrowDataType::Int32,
677
false,
678
))),
679
true,
680
));
681
}
682
683
// // List<Tuple<String, Integer>> (nullable list, non-null elements)
684
// optional group my_list (LIST) {
685
// repeated group element {
686
// required binary str (UTF8);
687
// required int32 num;
688
// };
689
// }
690
{
691
let arrow_struct = ArrowDataType::Struct(vec![
692
Field::new("str".into(), ArrowDataType::Utf8, false),
693
Field::new("num".into(), ArrowDataType::Int32, false),
694
]);
695
arrow_fields.push(Field::new(
696
"my_list".into(),
697
ArrowDataType::LargeList(Box::new(Field::new(
698
"element".into(),
699
arrow_struct,
700
false,
701
))),
702
true,
703
));
704
}
705
706
// // List<OneTuple<String>> (nullable list, non-null elements)
707
// optional group my_list (LIST) {
708
// repeated group array {
709
// required binary str (UTF8);
710
// };
711
// }
712
// Special case: group is named array
713
{
714
let arrow_struct =
715
ArrowDataType::Struct(vec![Field::new("str".into(), ArrowDataType::Utf8, false)]);
716
arrow_fields.push(Field::new(
717
"my_list".into(),
718
ArrowDataType::LargeList(Box::new(Field::new("array".into(), arrow_struct, false))),
719
true,
720
));
721
}
722
723
// // List<OneTuple<String>> (nullable list, non-null elements)
724
// optional group my_list (LIST) {
725
// repeated group my_list_tuple {
726
// required binary str (UTF8);
727
// };
728
// }
729
// Special case: group named ends in _tuple
730
{
731
let arrow_struct =
732
ArrowDataType::Struct(vec![Field::new("str".into(), ArrowDataType::Utf8, false)]);
733
arrow_fields.push(Field::new(
734
"my_list".into(),
735
ArrowDataType::LargeList(Box::new(Field::new(
736
"my_list_tuple".into(),
737
arrow_struct,
738
false,
739
))),
740
true,
741
));
742
}
743
744
// One-level encoding: Only allows required lists with required cells
745
// repeated value_type name
746
{
747
arrow_fields.push(Field::new(
748
"name".into(),
749
ArrowDataType::LargeList(Box::new(Field::new(
750
"name".into(),
751
ArrowDataType::Int32,
752
false,
753
))),
754
false,
755
));
756
}
757
758
let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
759
let fields = parquet_to_arrow_schema(parquet_schema.fields());
760
let fields = fields.iter_values().cloned().collect::<Vec<_>>();
761
762
assert_eq!(arrow_fields, fields);
763
Ok(())
764
}
765
766
#[test]
767
fn test_parquet_list_with_struct() -> PolarsResult<()> {
768
let mut arrow_fields = Vec::new();
769
770
let message_type = "
771
message eventlog {
772
REQUIRED group events (LIST) {
773
REPEATED group array {
774
REQUIRED BYTE_ARRAY event_name (STRING);
775
REQUIRED INT64 event_time (TIMESTAMP(MILLIS,true));
776
}
777
}
778
}
779
";
780
781
{
782
let struct_fields = vec![
783
Field::new("event_name".into(), ArrowDataType::Utf8View, false),
784
Field::new(
785
"event_time".into(),
786
ArrowDataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
787
false,
788
),
789
];
790
arrow_fields.push(Field::new(
791
"events".into(),
792
ArrowDataType::LargeList(Box::new(Field::new(
793
"array".into(),
794
ArrowDataType::Struct(struct_fields),
795
false,
796
))),
797
false,
798
));
799
}
800
801
let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
802
let fields = parquet_to_arrow_schema(parquet_schema.fields());
803
let fields = fields.iter_values().cloned().collect::<Vec<_>>();
804
805
assert_eq!(arrow_fields, fields);
806
Ok(())
807
}
808
809
#[test]
810
fn test_parquet_list_nullable() -> PolarsResult<()> {
811
let mut arrow_fields = Vec::new();
812
813
let message_type = "
814
message test_schema {
815
REQUIRED GROUP my_list1 (LIST) {
816
REPEATED GROUP list {
817
OPTIONAL BINARY element (UTF8);
818
}
819
}
820
OPTIONAL GROUP my_list2 (LIST) {
821
REPEATED GROUP list {
822
REQUIRED BINARY element (UTF8);
823
}
824
}
825
REQUIRED GROUP my_list3 (LIST) {
826
REPEATED GROUP list {
827
REQUIRED BINARY element (UTF8);
828
}
829
}
830
}
831
";
832
833
// // List<String> (list non-null, elements nullable)
834
// required group my_list1 (LIST) {
835
// repeated group list {
836
// optional binary element (UTF8);
837
// }
838
// }
839
{
840
arrow_fields.push(Field::new(
841
"my_list1".into(),
842
ArrowDataType::LargeList(Box::new(Field::new(
843
"element".into(),
844
ArrowDataType::Utf8View,
845
true,
846
))),
847
false,
848
));
849
}
850
851
// // List<String> (list nullable, elements non-null)
852
// optional group my_list2 (LIST) {
853
// repeated group list {
854
// required binary element (UTF8);
855
// }
856
// }
857
{
858
arrow_fields.push(Field::new(
859
"my_list2".into(),
860
ArrowDataType::LargeList(Box::new(Field::new(
861
"element".into(),
862
ArrowDataType::Utf8View,
863
false,
864
))),
865
true,
866
));
867
}
868
869
// // List<String> (list non-null, elements non-null)
870
// repeated group my_list3 (LIST) {
871
// repeated group list {
872
// required binary element (UTF8);
873
// }
874
// }
875
{
876
arrow_fields.push(Field::new(
877
"my_list3".into(),
878
ArrowDataType::LargeList(Box::new(Field::new(
879
"element".into(),
880
ArrowDataType::Utf8View,
881
false,
882
))),
883
false,
884
));
885
}
886
887
let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
888
let fields = parquet_to_arrow_schema(parquet_schema.fields());
889
let fields = fields.iter_values().cloned().collect::<Vec<_>>();
890
891
assert_eq!(arrow_fields, fields);
892
Ok(())
893
}
894
895
#[test]
896
fn test_nested_schema() -> PolarsResult<()> {
897
let mut arrow_fields = Vec::new();
898
{
899
let group1_fields = vec![
900
Field::new("leaf1".into(), ArrowDataType::Boolean, false),
901
Field::new("leaf2".into(), ArrowDataType::Int32, false),
902
];
903
let group1_struct =
904
Field::new("group1".into(), ArrowDataType::Struct(group1_fields), false);
905
arrow_fields.push(group1_struct);
906
907
let leaf3_field = Field::new("leaf3".into(), ArrowDataType::Int64, false);
908
arrow_fields.push(leaf3_field);
909
}
910
911
let message_type = "
912
message test_schema {
913
REQUIRED GROUP group1 {
914
REQUIRED BOOLEAN leaf1;
915
REQUIRED INT32 leaf2;
916
}
917
REQUIRED INT64 leaf3;
918
}
919
";
920
921
let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
922
let fields = parquet_to_arrow_schema(parquet_schema.fields());
923
let fields = fields.iter_values().cloned().collect::<Vec<_>>();
924
925
assert_eq!(arrow_fields, fields);
926
Ok(())
927
}
928
929
#[ignore]
930
#[test]
931
fn test_repeated_nested_schema() -> PolarsResult<()> {
932
let mut arrow_fields = Vec::new();
933
{
934
arrow_fields.push(Field::new("leaf1".into(), ArrowDataType::Int32, true));
935
936
let inner_group_list = Field::new(
937
"innerGroup".into(),
938
ArrowDataType::LargeList(Box::new(Field::new(
939
"innerGroup".into(),
940
ArrowDataType::Struct(vec![Field::new(
941
"leaf3".into(),
942
ArrowDataType::Int32,
943
true,
944
)]),
945
false,
946
))),
947
false,
948
);
949
950
let outer_group_list = Field::new(
951
"outerGroup".into(),
952
ArrowDataType::LargeList(Box::new(Field::new(
953
"outerGroup".into(),
954
ArrowDataType::Struct(vec![
955
Field::new("leaf2".into(), ArrowDataType::Int32, true),
956
inner_group_list,
957
]),
958
false,
959
))),
960
false,
961
);
962
arrow_fields.push(outer_group_list);
963
}
964
965
let message_type = "
966
message test_schema {
967
OPTIONAL INT32 leaf1;
968
REPEATED GROUP outerGroup {
969
OPTIONAL INT32 leaf2;
970
REPEATED GROUP innerGroup {
971
OPTIONAL INT32 leaf3;
972
}
973
}
974
}
975
";
976
977
let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
978
let fields = parquet_to_arrow_schema(parquet_schema.fields());
979
let fields = fields.iter_values().cloned().collect::<Vec<_>>();
980
981
assert_eq!(arrow_fields, fields);
982
Ok(())
983
}
984
985
#[ignore]
986
#[test]
987
fn test_column_desc_to_field() -> PolarsResult<()> {
988
let message_type = "
989
message test_schema {
990
REQUIRED BOOLEAN boolean;
991
REQUIRED INT32 int8 (INT_8);
992
REQUIRED INT32 uint8 (INTEGER(8,false));
993
REQUIRED INT32 int16 (INT_16);
994
REQUIRED INT32 uint16 (INTEGER(16,false));
995
REQUIRED INT32 int32;
996
REQUIRED INT64 int64;
997
OPTIONAL DOUBLE double;
998
OPTIONAL FLOAT float;
999
OPTIONAL BINARY string (UTF8);
1000
REPEATED BOOLEAN bools;
1001
OPTIONAL INT32 date (DATE);
1002
OPTIONAL INT32 time_milli (TIME_MILLIS);
1003
OPTIONAL INT64 time_micro (TIME_MICROS);
1004
OPTIONAL INT64 time_nano (TIME(NANOS,false));
1005
OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
1006
REQUIRED INT64 ts_micro (TIMESTAMP_MICROS);
1007
REQUIRED INT64 ts_nano (TIMESTAMP(NANOS,true));
1008
}
1009
";
1010
let arrow_fields = vec![
1011
Field::new("boolean".into(), ArrowDataType::Boolean, false),
1012
Field::new("int8".into(), ArrowDataType::Int8, false),
1013
Field::new("uint8".into(), ArrowDataType::UInt8, false),
1014
Field::new("int16".into(), ArrowDataType::Int16, false),
1015
Field::new("uint16".into(), ArrowDataType::UInt16, false),
1016
Field::new("int32".into(), ArrowDataType::Int32, false),
1017
Field::new("int64".into(), ArrowDataType::Int64, false),
1018
Field::new("double".into(), ArrowDataType::Float64, true),
1019
Field::new("float".into(), ArrowDataType::Float32, true),
1020
Field::new("string".into(), ArrowDataType::Utf8, true),
1021
Field::new(
1022
"bools".into(),
1023
ArrowDataType::LargeList(Box::new(Field::new(
1024
"bools".into(),
1025
ArrowDataType::Boolean,
1026
false,
1027
))),
1028
false,
1029
),
1030
Field::new("date".into(), ArrowDataType::Date32, true),
1031
Field::new(
1032
"time_milli".into(),
1033
ArrowDataType::Time32(TimeUnit::Millisecond),
1034
true,
1035
),
1036
Field::new(
1037
"time_micro".into(),
1038
ArrowDataType::Time64(TimeUnit::Microsecond),
1039
true,
1040
),
1041
Field::new(
1042
"time_nano".into(),
1043
ArrowDataType::Time64(TimeUnit::Nanosecond),
1044
true,
1045
),
1046
Field::new(
1047
"ts_milli".into(),
1048
ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
1049
true,
1050
),
1051
Field::new(
1052
"ts_micro".into(),
1053
ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
1054
false,
1055
),
1056
Field::new(
1057
"ts_nano".into(),
1058
ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
1059
false,
1060
),
1061
];
1062
1063
let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
1064
let fields = parquet_to_arrow_schema(parquet_schema.fields());
1065
let fields = fields.iter_values().cloned().collect::<Vec<_>>();
1066
1067
assert_eq!(arrow_fields, fields);
1068
Ok(())
1069
}
1070
1071
/// Parses a Parquet message type mixing flat primitives, 3-level `LIST` groups and a
/// plain struct group, and checks that [`parquet_to_arrow_schema`] infers exactly the
/// expected Arrow fields (names, data types, nullability) in declaration order.
///
/// Notable expectations pinned here:
/// - integer annotations (`INT_8`, `INTEGER(16,true)`, `INTEGER(32,false)`) narrow the
///   Arrow integer type;
/// - `STRING`-annotated byte arrays become `Utf8View`;
/// - `LIST` groups become `LargeList` whose child field is named `"element"`;
/// - `REQUIRED` maps to non-nullable and `OPTIONAL` to nullable.
#[test]
fn test_field_to_column_desc() -> PolarsResult<()> {
    let message_type = "
message arrow_schema {
REQUIRED BOOLEAN boolean;
REQUIRED INT32 int8 (INT_8);
REQUIRED INT32 int16 (INTEGER(16,true));
REQUIRED INT32 int32;
REQUIRED INT64 int64;
OPTIONAL DOUBLE double;
OPTIONAL FLOAT float;
OPTIONAL BINARY string (STRING);
OPTIONAL GROUP bools (LIST) {
REPEATED GROUP list {
OPTIONAL BOOLEAN element;
}
}
REQUIRED GROUP bools_non_null (LIST) {
REPEATED GROUP list {
REQUIRED BOOLEAN element;
}
}
OPTIONAL INT32 date (DATE);
OPTIONAL INT32 time_milli (TIME(MILLIS,false));
OPTIONAL INT64 time_micro (TIME_MICROS);
OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
REQUIRED INT64 ts_micro (TIMESTAMP(MICROS,false));
REQUIRED GROUP struct {
REQUIRED BOOLEAN bools;
REQUIRED INT32 uint32 (INTEGER(32,false));
REQUIRED GROUP int32 (LIST) {
REPEATED GROUP list {
OPTIONAL INT32 element;
}
}
}
REQUIRED BINARY dictionary_strings (STRING);
}
";

    // Every LIST group in the message above uses the 3-level layout whose leaf is named
    // `element`, so the expected Arrow child field is always called "element".
    let large_list = |inner: ArrowDataType, inner_nullable: bool| {
        ArrowDataType::LargeList(Box::new(Field::new("element".into(), inner, inner_nullable)))
    };

    let expected: Vec<Field> = vec![
        Field::new("boolean".into(), ArrowDataType::Boolean, false),
        Field::new("int8".into(), ArrowDataType::Int8, false),
        Field::new("int16".into(), ArrowDataType::Int16, false),
        Field::new("int32".into(), ArrowDataType::Int32, false),
        Field::new("int64".into(), ArrowDataType::Int64, false),
        Field::new("double".into(), ArrowDataType::Float64, true),
        Field::new("float".into(), ArrowDataType::Float32, true),
        Field::new("string".into(), ArrowDataType::Utf8View, true),
        Field::new("bools".into(), large_list(ArrowDataType::Boolean, true), true),
        Field::new("bools_non_null".into(), large_list(ArrowDataType::Boolean, false), false),
        Field::new("date".into(), ArrowDataType::Date32, true),
        Field::new("time_milli".into(), ArrowDataType::Time32(TimeUnit::Millisecond), true),
        Field::new("time_micro".into(), ArrowDataType::Time64(TimeUnit::Microsecond), true),
        Field::new("ts_milli".into(), ArrowDataType::Timestamp(TimeUnit::Millisecond, None), true),
        Field::new("ts_micro".into(), ArrowDataType::Timestamp(TimeUnit::Microsecond, None), false),
        Field::new(
            "struct".into(),
            ArrowDataType::Struct(vec![
                Field::new("bools".into(), ArrowDataType::Boolean, false),
                Field::new("uint32".into(), ArrowDataType::UInt32, false),
                Field::new("int32".into(), large_list(ArrowDataType::Int32, true), false),
            ]),
            false,
        ),
        Field::new("dictionary_strings".into(), ArrowDataType::Utf8View, false),
    ];

    let schema = SchemaDescriptor::try_from_message(message_type)?;
    let inferred: Vec<Field> = parquet_to_arrow_schema(schema.fields())
        .iter_values()
        .cloned()
        .collect();

    assert_eq!(expected, inferred);
    Ok(())
}
1186
1187
/// Checks that INT96 columns are coerced to `Timestamp(tu, None)` for every time unit
/// `tu` passed via `SchemaInferenceOptions::int96_coerce_to_timeunit`. The coercion is
/// checked for an INT96 column at the top level, inside a `LIST`, and inside a struct.
#[test]
fn test_int96_options() -> PolarsResult<()> {
    // The Parquet message does not depend on the requested time unit, so it is parsed
    // once up front instead of on every loop iteration.
    let message_type = "
message arrow_schema {
REQUIRED INT96 int96_field;
OPTIONAL GROUP int96_list (LIST) {
REPEATED GROUP list {
OPTIONAL INT96 element;
}
}
REQUIRED GROUP int96_struct {
REQUIRED INT96 int96_field;
}
}
";
    let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;

    for tu in [
        TimeUnit::Second,
        TimeUnit::Microsecond,
        TimeUnit::Millisecond,
        TimeUnit::Nanosecond,
    ] {
        let coerced_to = ArrowDataType::Timestamp(tu, None);
        let arrow_fields = vec![
            Field::new("int96_field".into(), coerced_to.clone(), false),
            Field::new(
                "int96_list".into(),
                ArrowDataType::LargeList(Box::new(Field::new(
                    "element".into(),
                    coerced_to.clone(),
                    true,
                ))),
                true,
            ),
            Field::new(
                "int96_struct".into(),
                ArrowDataType::Struct(vec![Field::new(
                    "int96_field".into(),
                    coerced_to.clone(),
                    false,
                )]),
                false,
            ),
        ];

        let fields = parquet_to_arrow_schema_with_options(
            parquet_schema.fields(),
            &Some(SchemaInferenceOptions {
                int96_coerce_to_timeunit: tu,
            }),
        );
        let fields = fields.iter_values().cloned().collect::<Vec<_>>();
        // Name the time unit so a failure inside the loop points at the offending coercion.
        assert_eq!(arrow_fields, fields, "INT96 coercion to {tu:?}");
    }
    Ok(())
}
1243
}
1244
1245