Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/schema/io_message/from_message.rs
7885 views
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements. See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership. The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License. You may obtain a copy of the License at
8
//
9
// http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied. See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
//! Parquet schema parser.
19
//! Provides methods to parse and validate string message type into Parquet
20
//! [`ParquetType`](crate::parquet::schema::types::ParquetType).
21
//!
22
//! # Example
23
//!
24
//! ```rust
25
//! use polars_parquet::parquet::schema::io_message::from_message;
26
//!
27
//! let message_type = "
28
//! message spark_schema {
29
//! OPTIONAL BYTE_ARRAY a (UTF8);
30
//! REQUIRED INT32 b;
31
//! REQUIRED DOUBLE c;
32
//! REQUIRED BOOLEAN d;
33
//! OPTIONAL group e (LIST) {
34
//! REPEATED group list {
35
//! REQUIRED INT32 element;
36
//! }
37
//! }
38
//! }
39
//! ";
40
//!
41
//! let schema = from_message(message_type).expect("Expected valid schema");
42
//! println!("{:?}", schema);
43
//! ```
44
45
use polars_parquet_format::Type;
46
use polars_utils::pl_str::PlSmallStr;
47
use types::PrimitiveLogicalType;
48
49
use super::super::types::{ParquetType, TimeUnit};
50
use super::super::*;
51
use crate::parquet::error::{ParquetError, ParquetResult};
52
use crate::parquet::schema::types::{GroupConvertedType, PrimitiveConvertedType};
53
54
/// Returns `true` when `s` is exactly (case-sensitively) one of the logical
/// type names recognised by the schema parser.
fn is_logical_type(s: &str) -> bool {
    const LOGICAL_TYPE_NAMES: &[&str] = &[
        "INTEGER",
        "MAP",
        "LIST",
        "ENUM",
        "DECIMAL",
        "DATE",
        "TIME",
        "TIMESTAMP",
        "STRING",
        "JSON",
        "BSON",
        "UUID",
        "UNKNOWN",
        "INTERVAL",
    ];
    LOGICAL_TYPE_NAMES.iter().any(|&name| name == s)
}
73
74
/// Returns `true` when `s` is exactly (case-sensitively) one of the primitive
/// converted type names recognised by the schema parser.
fn is_converted_type(s: &str) -> bool {
    const CONVERTED_TYPE_NAMES: &[&str] = &[
        "UTF8",
        "ENUM",
        "DECIMAL",
        "DATE",
        "TIME_MILLIS",
        "TIME_MICROS",
        "TIMESTAMP_MILLIS",
        "TIMESTAMP_MICROS",
        "UINT_8",
        "UINT_16",
        "UINT_32",
        "UINT_64",
        "INT_8",
        "INT_16",
        "INT_32",
        "INT_64",
        "JSON",
        "BSON",
        "INTERVAL",
    ];
    CONVERTED_TYPE_NAMES.iter().any(|&name| name == s)
}
98
99
fn converted_group_from_str(s: &str) -> ParquetResult<GroupConvertedType> {
100
Ok(match s {
101
"MAP" => GroupConvertedType::Map,
102
"MAP_KEY_VALUE" => GroupConvertedType::MapKeyValue,
103
"LIST" => GroupConvertedType::List,
104
other => {
105
return Err(ParquetError::oos(format!("Invalid converted type {other}")));
106
},
107
})
108
}
109
110
fn converted_primitive_from_str(s: &str) -> Option<PrimitiveConvertedType> {
111
use PrimitiveConvertedType::*;
112
Some(match s {
113
"UTF8" => Utf8,
114
"ENUM" => Enum,
115
"DECIMAL" => Decimal(0, 0),
116
"DATE" => Date,
117
"TIME_MILLIS" => TimeMillis,
118
"TIME_MICROS" => TimeMicros,
119
"TIMESTAMP_MILLIS" => TimestampMillis,
120
"TIMESTAMP_MICROS" => TimestampMicros,
121
"UINT_8" => Uint8,
122
"UINT_16" => Uint16,
123
"UINT_32" => Uint32,
124
"UINT_64" => Uint64,
125
"INT_8" => Int8,
126
"INT_16" => Int16,
127
"INT_32" => Int32,
128
"INT_64" => Int64,
129
"JSON" => Json,
130
"BSON" => Bson,
131
"INTERVAL" => Interval,
132
_ => return None,
133
})
134
}
135
136
fn repetition_from_str(s: &str) -> ParquetResult<Repetition> {
137
Ok(match s {
138
"REQUIRED" => Repetition::Required,
139
"OPTIONAL" => Repetition::Optional,
140
"REPEATED" => Repetition::Repeated,
141
other => return Err(ParquetError::oos(format!("Invalid repetition {other}"))),
142
})
143
}
144
145
fn type_from_str(s: &str) -> ParquetResult<Type> {
146
match s {
147
"BOOLEAN" => Ok(Type::BOOLEAN),
148
"INT32" => Ok(Type::INT32),
149
"INT64" => Ok(Type::INT64),
150
"INT96" => Ok(Type::INT96),
151
"FLOAT" => Ok(Type::FLOAT),
152
"DOUBLE" => Ok(Type::DOUBLE),
153
"BYTE_ARRAY" | "BINARY" => Ok(Type::BYTE_ARRAY),
154
"FIXED_LEN_BYTE_ARRAY" => Ok(Type::FIXED_LEN_BYTE_ARRAY),
155
other => Err(ParquetError::oos(format!("Invalid type {other}"))),
156
}
157
}
158
159
/// Parses message type as string into a Parquet [`ParquetType`](crate::parquet::schema::types::ParquetType).
160
///
161
/// This could, for example, be used to extract individual columns.
162
///
163
/// Returns Parquet general error when parsing or validation fails.
164
pub fn from_message(message_type: &str) -> ParquetResult<ParquetType> {
165
let mut parser = Parser {
166
tokenizer: &mut Tokenizer::from_str(message_type),
167
};
168
parser.parse_message_type()
169
}
170
171
/// Tokenizer that splits a message type string into tokens.
///
/// Tokens are separated by whitespace and by the characters accepted by
/// `is_schema_delim`; the delimiters themselves are kept as single-character
/// tokens. Tokens are handed out through the [`Iterator`] interface, and
/// [`Tokenizer::backtrack`] steps back so the previous token can be read again.
struct Tokenizer<'a> {
    // All tokens of the input string, in order.
    tokens: Vec<&'a str>,
    // Number of `next()` calls not yet undone by `backtrack()`. This may exceed
    // `tokens.len()` once the input is exhausted, so that `backtrack()` always
    // undoes exactly the last `next()`, even when that call returned `None`.
    index: usize,
}

impl<'a> Tokenizer<'a> {
    /// Creates a tokenizer over the message type string.
    pub fn from_str(string: &'a str) -> Self {
        let tokens = string
            .split_whitespace()
            .flat_map(Self::split_token)
            .collect();
        Tokenizer { tokens, index: 0 }
    }

    /// Returns `true` for the special characters of the schema syntax.
    fn is_schema_delim(c: char) -> bool {
        matches!(c, ';' | '{' | '}' | '(' | ')' | '=' | ',')
    }

    /// Splits a whitespace-free string into tokens. The input can already be a
    /// single token or can contain delimiters, e.g. `"required"` ->
    /// `["required"]` and `"(UTF8);"` -> `["(", "UTF8", ")", ";"]`.
    fn split_token(string: &str) -> Vec<&str> {
        let mut buffer: Vec<&str> = Vec::new();
        let mut tail = string;
        while let Some(index) = tail.find(Self::is_schema_delim) {
            let (head, rest) = tail.split_at(index);
            if !head.is_empty() {
                buffer.push(head);
            }
            // Every delimiter is a single ASCII character, so it occupies
            // exactly one byte and splitting at 1 is a valid char boundary.
            let (delim, rest) = rest.split_at(1);
            buffer.push(delim);
            tail = rest;
        }
        if !tail.is_empty() {
            buffer.push(tail);
        }
        buffer
    }

    /// Undoes the most recent `next()` call.
    ///
    /// Calling this on a tokenizer that has not produced anything yet is a
    /// no-op instead of an integer underflow.
    fn backtrack(&mut self) {
        self.index = self.index.saturating_sub(1);
    }
}

impl<'a> Iterator for Tokenizer<'a> {
    type Item = &'a str;

    fn next(&mut self) -> Option<&'a str> {
        let token = self.tokens.get(self.index).copied();
        // Advance even when the input is exhausted: the parser peeks with
        // `next()` and undoes the peek with `backtrack()`, and that pairing must
        // also hold at end of input. Otherwise the backtrack would step onto
        // the last real token and it would be read a second time.
        self.index = self.index.saturating_add(1);
        token
    }
}
238
239
/// Internal Schema parser.
240
/// Traverses message type using tokenizer and parses each group/primitive type
241
/// recursively.
242
struct Parser<'a> {
243
tokenizer: &'a mut Tokenizer<'a>,
244
}
245
246
// Utility function to assert token on validity.
247
fn assert_token(token: Option<&str>, expected: &str) -> ParquetResult<()> {
248
match token {
249
Some(value) if value == expected => Ok(()),
250
Some(other) => Err(ParquetError::oos(format!(
251
"Expected '{expected}', found token '{other}'"
252
))),
253
None => Err(ParquetError::oos(format!(
254
"Expected '{expected}', but no token found (None)"
255
))),
256
}
257
}
258
259
// Utility function to parse i32 or return general error.
260
fn parse_i32(value: Option<&str>, not_found_msg: &str, parse_fail_msg: &str) -> ParquetResult<i32> {
261
value
262
.ok_or_else(|| ParquetError::oos(not_found_msg))
263
.and_then(|v| {
264
v.parse::<i32>()
265
.map_err(|_| ParquetError::oos(parse_fail_msg))
266
})
267
}
268
269
// Utility function to parse boolean or return general error.
270
#[inline]
271
fn parse_bool(
272
value: Option<&str>,
273
not_found_msg: &str,
274
parse_fail_msg: &str,
275
) -> ParquetResult<bool> {
276
value
277
.ok_or_else(|| ParquetError::oos(not_found_msg))
278
.and_then(|v| {
279
v.to_lowercase()
280
.parse::<bool>()
281
.map_err(|_| ParquetError::oos(parse_fail_msg))
282
})
283
}
284
285
// Utility function to parse TimeUnit or return general error.
286
fn parse_timeunit(
287
value: Option<&str>,
288
not_found_msg: &str,
289
parse_fail_msg: &str,
290
) -> ParquetResult<TimeUnit> {
291
value
292
.ok_or_else(|| ParquetError::oos(not_found_msg))
293
.and_then(|v| match v.to_uppercase().as_str() {
294
"MILLIS" => Ok(TimeUnit::Milliseconds),
295
"MICROS" => Ok(TimeUnit::Microseconds),
296
"NANOS" => Ok(TimeUnit::Nanoseconds),
297
_ => Err(ParquetError::oos(parse_fail_msg)),
298
})
299
}
300
301
impl Parser<'_> {
302
// Entry function to parse message type, uses internal tokenizer.
303
fn parse_message_type(&mut self) -> ParquetResult<ParquetType> {
304
// Check that message type starts with "message".
305
match self.tokenizer.next() {
306
Some("message") => {
307
let name = self
308
.tokenizer
309
.next()
310
.ok_or_else(|| ParquetError::oos("Expected name, found None"))?;
311
let fields = self.parse_child_types()?;
312
Ok(ParquetType::new_root(PlSmallStr::from_str(name), fields))
313
},
314
_ => Err(ParquetError::oos(
315
"Message type does not start with 'message'",
316
)),
317
}
318
}
319
320
// Parses child types for a current group type.
321
// This is only invoked on root and group types.
322
fn parse_child_types(&mut self) -> ParquetResult<Vec<ParquetType>> {
323
assert_token(self.tokenizer.next(), "{")?;
324
let mut vec = Vec::new();
325
while let Some(value) = self.tokenizer.next() {
326
if value == "}" {
327
break;
328
} else {
329
self.tokenizer.backtrack();
330
vec.push(self.add_type()?);
331
}
332
}
333
Ok(vec)
334
}
335
336
fn add_type(&mut self) -> ParquetResult<ParquetType> {
337
// Parse repetition
338
let repetition = self
339
.tokenizer
340
.next()
341
.ok_or_else(|| ParquetError::oos("Expected repetition, found None"))
342
.and_then(|v| repetition_from_str(&v.to_uppercase()))?;
343
344
match self.tokenizer.next() {
345
Some(group) if group.to_uppercase() == "GROUP" => self.add_group_type(repetition),
346
Some(type_string) => {
347
let physical_type = type_from_str(&type_string.to_uppercase())?;
348
self.add_primitive_type(repetition, physical_type)
349
},
350
None => Err(ParquetError::oos(
351
"Invalid type, could not extract next token",
352
)),
353
}
354
}
355
356
fn add_group_type(&mut self, repetition: Repetition) -> ParquetResult<ParquetType> {
357
// Parse name of the group type
358
let name = self
359
.tokenizer
360
.next()
361
.ok_or_else(|| ParquetError::oos("Expected name, found None"))?;
362
363
// Parse converted type if exists
364
let converted_type = if let Some("(") = self.tokenizer.next() {
365
let converted_type = self
366
.tokenizer
367
.next()
368
.ok_or_else(|| ParquetError::oos("Expected converted type, found None"))
369
.and_then(|v| converted_group_from_str(&v.to_uppercase()))?;
370
assert_token(self.tokenizer.next(), ")")?;
371
Some(converted_type)
372
} else {
373
self.tokenizer.backtrack();
374
None
375
};
376
377
// Parse optional id
378
let id = if let Some("=") = self.tokenizer.next() {
379
self.tokenizer.next().and_then(|v| v.parse::<i32>().ok())
380
} else {
381
self.tokenizer.backtrack();
382
None
383
};
384
385
let fields = self.parse_child_types()?;
386
387
Ok(ParquetType::from_converted(
388
PlSmallStr::from_str(name),
389
fields,
390
repetition,
391
converted_type,
392
id,
393
))
394
}
395
396
fn add_primitive_type(
397
&mut self,
398
repetition: Repetition,
399
physical_type: Type,
400
) -> ParquetResult<ParquetType> {
401
// Read type length if the type is FIXED_LEN_BYTE_ARRAY.
402
let length = if physical_type == Type::FIXED_LEN_BYTE_ARRAY {
403
assert_token(self.tokenizer.next(), "(")?;
404
let length = parse_i32(
405
self.tokenizer.next(),
406
"Expected length for FIXED_LEN_BYTE_ARRAY, found None",
407
"Failed to parse length for FIXED_LEN_BYTE_ARRAY",
408
)?;
409
assert_token(self.tokenizer.next(), ")")?;
410
Some(length)
411
} else {
412
None
413
};
414
415
// Parse name of the primitive type
416
let name = self
417
.tokenizer
418
.next()
419
.ok_or_else(|| ParquetError::oos("Expected name, found None"))?;
420
421
// Parse logical types
422
let (converted_type, logical_type) = if let Some("(") = self.tokenizer.next() {
423
let (is_logical_type, converted_type, token) = self
424
.tokenizer
425
.next()
426
.ok_or_else(|| ParquetError::oos("Expected converted or logical type, found None"))
427
.and_then(|v| {
428
let string = v.to_uppercase();
429
Ok(if is_logical_type(&string) {
430
(true, None, string)
431
} else if is_converted_type(&string) {
432
(false, converted_primitive_from_str(&string), string)
433
} else {
434
return Err(ParquetError::oos(format!(
435
"Expected converted or logical type, found {string}"
436
)));
437
})
438
})?;
439
440
let logical_type = if is_logical_type {
441
Some(self.parse_logical_type(&token)?)
442
} else {
443
None
444
};
445
446
// converted type decimal
447
let converted_type = match converted_type {
448
Some(PrimitiveConvertedType::Decimal(_, _)) => {
449
Some(self.parse_converted_decimal()?)
450
},
451
other => other,
452
};
453
454
assert_token(self.tokenizer.next(), ")")?;
455
(converted_type, logical_type)
456
} else {
457
self.tokenizer.backtrack();
458
(None, None)
459
};
460
461
// Parse optional id
462
let id = if let Some("=") = self.tokenizer.next() {
463
self.tokenizer.next().and_then(|v| v.parse::<i32>().ok())
464
} else {
465
self.tokenizer.backtrack();
466
None
467
};
468
assert_token(self.tokenizer.next(), ";")?;
469
470
ParquetType::try_from_primitive(
471
PlSmallStr::from_str(name),
472
(physical_type, length).try_into()?,
473
repetition,
474
converted_type,
475
logical_type,
476
id,
477
)
478
}
479
480
fn parse_converted_decimal(&mut self) -> ParquetResult<PrimitiveConvertedType> {
481
assert_token(self.tokenizer.next(), "(")?;
482
// Parse precision
483
let precision = parse_i32(
484
self.tokenizer.next(),
485
"Expected precision, found None",
486
"Failed to parse precision for DECIMAL type",
487
)?;
488
489
// Parse scale
490
let scale = if let Some(",") = self.tokenizer.next() {
491
parse_i32(
492
self.tokenizer.next(),
493
"Expected scale, found None",
494
"Failed to parse scale for DECIMAL type",
495
)?
496
} else {
497
// Scale is not provided, set it to 0.
498
self.tokenizer.backtrack();
499
0
500
};
501
502
assert_token(self.tokenizer.next(), ")")?;
503
Ok(PrimitiveConvertedType::Decimal(
504
precision.try_into()?,
505
scale.try_into()?,
506
))
507
}
508
509
fn parse_logical_type(&mut self, tpe: &str) -> ParquetResult<PrimitiveLogicalType> {
510
Ok(match tpe {
511
"ENUM" => PrimitiveLogicalType::Enum,
512
"DATE" => PrimitiveLogicalType::Date,
513
"DECIMAL" => {
514
let (precision, scale) = if let Some("(") = self.tokenizer.next() {
515
let precision = parse_i32(
516
self.tokenizer.next(),
517
"Expected precision, found None",
518
"Failed to parse precision for DECIMAL type",
519
)?;
520
let scale = if let Some(",") = self.tokenizer.next() {
521
parse_i32(
522
self.tokenizer.next(),
523
"Expected scale, found None",
524
"Failed to parse scale for DECIMAL type",
525
)?
526
} else {
527
self.tokenizer.backtrack();
528
0
529
};
530
assert_token(self.tokenizer.next(), ")")?;
531
(precision, scale)
532
} else {
533
self.tokenizer.backtrack();
534
(0, 0)
535
};
536
PrimitiveLogicalType::Decimal(precision.try_into()?, scale.try_into()?)
537
},
538
"TIME" => {
539
let (unit, is_adjusted_to_utc) = if let Some("(") = self.tokenizer.next() {
540
let unit = parse_timeunit(
541
self.tokenizer.next(),
542
"Invalid timeunit found",
543
"Failed to parse timeunit for TIME type",
544
)?;
545
let is_adjusted_to_utc = if let Some(",") = self.tokenizer.next() {
546
parse_bool(
547
self.tokenizer.next(),
548
"Invalid boolean found",
549
"Failed to parse timezone info for TIME type",
550
)?
551
} else {
552
self.tokenizer.backtrack();
553
false
554
};
555
assert_token(self.tokenizer.next(), ")")?;
556
(unit, is_adjusted_to_utc)
557
} else {
558
self.tokenizer.backtrack();
559
(TimeUnit::Milliseconds, false)
560
};
561
PrimitiveLogicalType::Time {
562
is_adjusted_to_utc,
563
unit,
564
}
565
},
566
"TIMESTAMP" => {
567
let (unit, is_adjusted_to_utc) = if let Some("(") = self.tokenizer.next() {
568
let unit = parse_timeunit(
569
self.tokenizer.next(),
570
"Invalid timeunit found",
571
"Failed to parse timeunit for TIMESTAMP type",
572
)?;
573
let is_adjusted_to_utc = if let Some(",") = self.tokenizer.next() {
574
parse_bool(
575
self.tokenizer.next(),
576
"Invalid boolean found",
577
"Failed to parse timezone info for TIMESTAMP type",
578
)?
579
} else {
580
// Invalid token for unit
581
self.tokenizer.backtrack();
582
false
583
};
584
assert_token(self.tokenizer.next(), ")")?;
585
(unit, is_adjusted_to_utc)
586
} else {
587
self.tokenizer.backtrack();
588
(TimeUnit::Milliseconds, false)
589
};
590
PrimitiveLogicalType::Timestamp {
591
is_adjusted_to_utc,
592
unit,
593
}
594
},
595
"INTEGER" => {
596
let (bit_width, is_signed) = if let Some("(") = self.tokenizer.next() {
597
let bit_width = parse_i32(
598
self.tokenizer.next(),
599
"Invalid bit_width found",
600
"Failed to parse bit_width for INTEGER type",
601
)?;
602
let is_signed = if let Some(",") = self.tokenizer.next() {
603
parse_bool(
604
self.tokenizer.next(),
605
"Invalid boolean found",
606
"Failed to parse is_signed for INTEGER type",
607
)?
608
} else {
609
// Invalid token for unit
610
self.tokenizer.backtrack();
611
return Err(ParquetError::oos("INTEGER requires sign"));
612
};
613
assert_token(self.tokenizer.next(), ")")?;
614
(bit_width, is_signed)
615
} else {
616
// Invalid token for unit
617
self.tokenizer.backtrack();
618
return Err(ParquetError::oos("INTEGER requires width and sign"));
619
};
620
PrimitiveLogicalType::Integer((bit_width, is_signed).into())
621
},
622
"STRING" => PrimitiveLogicalType::String,
623
"JSON" => PrimitiveLogicalType::Json,
624
"BSON" => PrimitiveLogicalType::Bson,
625
"UUID" => PrimitiveLogicalType::Uuid,
626
"UNKNOWN" => PrimitiveLogicalType::Unknown,
627
"INTERVAL" => return Err(ParquetError::oos("Interval logical type not yet supported")),
628
_ => unreachable!(),
629
})
630
}
631
}
632
633
#[cfg(test)]
mod tests {
    use types::IntegerType;

    use super::*;
    use crate::parquet::schema::types::PhysicalType;

    /// Tokenizes `schema` and runs the parser over it.
    fn parse(schema: &str) -> ParquetResult<ParquetType> {
        let mut tokenizer = Tokenizer::from_str(schema);
        let mut parser = Parser {
            tokenizer: &mut tokenizer,
        };
        parser.parse_message_type()
    }

    /// Builds the expected primitive field `name` without a field id.
    fn primitive(
        name: &'static str,
        physical_type: PhysicalType,
        repetition: Repetition,
        converted_type: Option<PrimitiveConvertedType>,
        logical_type: Option<PrimitiveLogicalType>,
    ) -> ParquetResult<ParquetType> {
        ParquetType::try_from_primitive(
            PlSmallStr::from_static(name),
            physical_type,
            repetition,
            converted_type,
            logical_type,
            None,
        )
    }

    #[test]
    fn test_tokenize_empty_string() {
        let mut tokenizer = Tokenizer::from_str("");
        assert!(tokenizer.next().is_none());
    }

    #[test]
    fn test_tokenize_delimiters() {
        let tokens: Vec<&str> = Tokenizer::from_str(",;{}()=").collect();
        assert_eq!(tokens, vec![",", ";", "{", "}", "(", ")", "="]);
    }

    #[test]
    fn test_tokenize_delimiters_with_whitespaces() {
        let tokens: Vec<&str> = Tokenizer::from_str(" , ; { } ( ) = ").collect();
        assert_eq!(tokens, vec![",", ";", "{", "}", "(", ")", "="]);
    }

    #[test]
    fn test_tokenize_words() {
        let tokens: Vec<&str> = Tokenizer::from_str("abc def ghi jkl mno").collect();
        assert_eq!(tokens, vec!["abc", "def", "ghi", "jkl", "mno"]);
    }

    #[test]
    fn test_tokenize_backtrack() {
        let mut tokenizer = Tokenizer::from_str("abc;");
        assert_eq!(tokenizer.next(), Some("abc"));
        assert_eq!(tokenizer.next(), Some(";"));
        // Stepping back must hand out the last token a second time.
        tokenizer.backtrack();
        assert_eq!(tokenizer.next(), Some(";"));
        assert_eq!(tokenizer.next(), None);
    }

    #[test]
    fn test_tokenize_message_type() {
        let schema = "
            message schema {
                required int32 a;
                optional binary c (UTF8);
                required group d {
                    required int32 a;
                    optional binary c (UTF8);
                }
                required group e (LIST) {
                    repeated group list {
                        required int32 element;
                    }
                }
            }
        ";
        let tokens: Vec<&str> = Tokenizer::from_str(schema).collect();
        let expected = vec![
            "message", "schema", "{", "required", "int32", "a", ";", "optional", "binary", "c",
            "(", "UTF8", ")", ";", "required", "group", "d", "{", "required", "int32", "a", ";",
            "optional", "binary", "c", "(", "UTF8", ")", ";", "}", "required", "group", "e", "(",
            "LIST", ")", "{", "repeated", "group", "list", "{", "required", "int32", "element",
            ";", "}", "}", "}",
        ];
        assert_eq!(tokens, expected);
    }

    #[test]
    fn test_assert_token() {
        assert!(assert_token(Some("a"), "a").is_ok());
        assert!(assert_token(Some("a"), "b").is_err());
        assert!(assert_token(None, "b").is_err());
    }

    #[test]
    fn test_parse_message_type_invalid() {
        let error = parse("test").unwrap_err();
        assert_eq!(
            error.to_string(),
            "File out of specification: Message type does not start with 'message'"
        );
    }

    #[test]
    fn test_parse_message_type_no_name() {
        let error = parse("message").unwrap_err();
        assert_eq!(
            error.to_string(),
            "File out of specification: Expected name, found None"
        );
    }

    #[test]
    fn test_parse_message_type_fixed_byte_array() {
        // The length is mandatory for FIXED_LEN_BYTE_ARRAY.
        let without_length = "
            message schema {
                REQUIRED FIXED_LEN_BYTE_ARRAY col;
            }
        ";
        assert!(parse(without_length).is_err());

        let with_length = "
            message schema {
                REQUIRED FIXED_LEN_BYTE_ARRAY(16) col;
            }
        ";
        assert!(parse(with_length).is_ok());
    }

    #[test]
    fn test_parse_message_type_decimal() {
        // It is okay for decimal to omit precision and scale with right syntax.
        // Here we test wrong syntax of decimal type.

        // Invalid decimal syntax
        let unbalanced = "
            message root {
                optional int32 f1 (DECIMAL();
            }
        ";
        assert!(parse(unbalanced).is_err());

        // Invalid decimal, need precision and scale
        let empty_arguments = "
            message root {
                optional int32 f1 (DECIMAL());
            }
        ";
        assert!(parse(empty_arguments).is_err());

        // Invalid decimal because of `,` - has precision, needs scale
        let missing_scale = "
            message root {
                optional int32 f1 (DECIMAL(8,));
            }
        ";
        assert!(parse(missing_scale).is_err());
    }

    #[test]
    fn test_parse_decimal_wrong() {
        // Invalid decimal because, we always require either precision or scale to be
        // specified as part of converted type
        let bare_decimal = "
            message root {
                optional int32 f3 (DECIMAL);
            }
        ";
        assert!(parse(bare_decimal).is_err());

        // Valid decimal (precision, scale)
        let valid = "
            message root {
                optional int32 f1 (DECIMAL(8, 3));
                optional int32 f2 (DECIMAL(8));
            }
        ";
        assert!(parse(valid).is_ok());
    }

    #[test]
    fn test_parse_message_type_compare_1() -> ParquetResult<()> {
        let schema = "
            message root {
                optional fixed_len_byte_array(5) f1 (DECIMAL(9, 3));
                optional fixed_len_byte_array (16) f2 (DECIMAL (38, 18));
            }
        ";
        let message = parse(schema).unwrap();

        let expected = ParquetType::new_root(
            PlSmallStr::from_static("root"),
            vec![
                primitive(
                    "f1",
                    PhysicalType::FixedLenByteArray(5),
                    Repetition::Optional,
                    None,
                    Some(PrimitiveLogicalType::Decimal(9, 3)),
                )?,
                primitive(
                    "f2",
                    PhysicalType::FixedLenByteArray(16),
                    Repetition::Optional,
                    None,
                    Some(PrimitiveLogicalType::Decimal(38, 18)),
                )?,
            ],
        );

        assert_eq!(message, expected);
        Ok(())
    }

    #[test]
    fn test_parse_message_type_compare_2() -> ParquetResult<()> {
        let schema = "
            message root {
                required group a0 {
                    optional group a1 (LIST) {
                        repeated binary a2 (UTF8);
                    }

                    optional group b1 (LIST) {
                        repeated group b2 {
                            optional int32 b3;
                            optional double b4;
                        }
                    }
                }
            }
        ";
        let message = parse(schema).unwrap();

        // Expected tree, built from the leaves up.
        let a2 = primitive(
            "a2",
            PhysicalType::ByteArray,
            Repetition::Repeated,
            Some(PrimitiveConvertedType::Utf8),
            None,
        )?;
        let a1 = ParquetType::from_converted(
            "a1".into(),
            vec![a2],
            Repetition::Optional,
            Some(GroupConvertedType::List),
            None,
        );

        let b3 = ParquetType::from_physical("b3".into(), PhysicalType::Int32);
        let b4 = ParquetType::from_physical("b4".into(), PhysicalType::Double);
        let b2 = ParquetType::from_converted(
            "b2".into(),
            vec![b3, b4],
            Repetition::Repeated,
            None,
            None,
        );
        let b1 = ParquetType::from_converted(
            "b1".into(),
            vec![b2],
            Repetition::Optional,
            Some(GroupConvertedType::List),
            None,
        );

        let a0 = ParquetType::from_converted(
            "a0".into(),
            vec![a1, b1],
            Repetition::Required,
            None,
            None,
        );
        let expected = ParquetType::new_root("root".into(), vec![a0]);

        assert_eq!(message, expected);
        Ok(())
    }

    #[test]
    fn test_parse_message_type_compare_3() -> ParquetResult<()> {
        let schema = "
            message root {
                required int32 _1 (INT_8);
                required int32 _2 (INT_16);
                required float _3;
                required double _4;
                optional int32 _5 (DATE);
                optional binary _6 (UTF8);
            }
        ";
        let message = parse(schema).unwrap();

        let fields = vec![
            primitive(
                "_1",
                PhysicalType::Int32,
                Repetition::Required,
                Some(PrimitiveConvertedType::Int8),
                None,
            )?,
            primitive(
                "_2",
                PhysicalType::Int32,
                Repetition::Required,
                Some(PrimitiveConvertedType::Int16),
                None,
            )?,
            primitive("_3", PhysicalType::Float, Repetition::Required, None, None)?,
            primitive("_4", PhysicalType::Double, Repetition::Required, None, None)?,
            primitive(
                "_5",
                PhysicalType::Int32,
                Repetition::Optional,
                None,
                Some(PrimitiveLogicalType::Date),
            )?,
            primitive(
                "_6",
                PhysicalType::ByteArray,
                Repetition::Optional,
                Some(PrimitiveConvertedType::Utf8),
                None,
            )?,
        ];

        let expected = ParquetType::new_root("root".into(), fields);
        assert_eq!(message, expected);
        Ok(())
    }

    #[test]
    fn test_parse_message_type_compare_4() -> ParquetResult<()> {
        let schema = "
            message root {
                required int32 _1 (INTEGER(8,true));
                required int32 _2 (INTEGER(16,false));
                required float _3;
                required double _4;
                optional int32 _5 (DATE);
                optional int32 _6 (TIME(MILLIS,false));
                optional int64 _7 (TIME(MICROS,true));
                optional int64 _8 (TIMESTAMP(MILLIS,true));
                optional int64 _9 (TIMESTAMP(NANOS,false));
                optional binary _10 (STRING);
            }
        ";
        let message = parse(schema)?;

        let fields = vec![
            primitive(
                "_1",
                PhysicalType::Int32,
                Repetition::Required,
                None,
                Some(PrimitiveLogicalType::Integer(IntegerType::Int8)),
            )?,
            primitive(
                "_2",
                PhysicalType::Int32,
                Repetition::Required,
                None,
                Some(PrimitiveLogicalType::Integer(IntegerType::UInt16)),
            )?,
            primitive("_3", PhysicalType::Float, Repetition::Required, None, None)?,
            primitive("_4", PhysicalType::Double, Repetition::Required, None, None)?,
            primitive(
                "_5",
                PhysicalType::Int32,
                Repetition::Optional,
                None,
                Some(PrimitiveLogicalType::Date),
            )?,
            primitive(
                "_6",
                PhysicalType::Int32,
                Repetition::Optional,
                None,
                Some(PrimitiveLogicalType::Time {
                    is_adjusted_to_utc: false,
                    unit: TimeUnit::Milliseconds,
                }),
            )?,
            primitive(
                "_7",
                PhysicalType::Int64,
                Repetition::Optional,
                None,
                Some(PrimitiveLogicalType::Time {
                    is_adjusted_to_utc: true,
                    unit: TimeUnit::Microseconds,
                }),
            )?,
            primitive(
                "_8",
                PhysicalType::Int64,
                Repetition::Optional,
                None,
                Some(PrimitiveLogicalType::Timestamp {
                    is_adjusted_to_utc: true,
                    unit: TimeUnit::Milliseconds,
                }),
            )?,
            primitive(
                "_9",
                PhysicalType::Int64,
                Repetition::Optional,
                None,
                Some(PrimitiveLogicalType::Timestamp {
                    is_adjusted_to_utc: false,
                    unit: TimeUnit::Nanoseconds,
                }),
            )?,
            primitive(
                "_10",
                PhysicalType::ByteArray,
                Repetition::Optional,
                None,
                Some(PrimitiveLogicalType::String),
            )?,
        ];

        let expected = ParquetType::new_root("root".into(), fields);
        assert_eq!(message, expected);
        Ok(())
    }
}
1173
1174