Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/parquet_bridge.rs
6940 views
1
// Bridges structs from thrift-generated code to rust enums.
2
3
#[cfg(feature = "serde")]
4
use serde::{Deserialize, Serialize};
5
6
use super::thrift_format::{
7
BoundaryOrder as ParquetBoundaryOrder, CompressionCodec, DataPageHeader, DataPageHeaderV2,
8
DecimalType, Encoding as ParquetEncoding, FieldRepetitionType, IntType,
9
LogicalType as ParquetLogicalType, PageType as ParquetPageType, TimeType,
10
TimeUnit as ParquetTimeUnit, TimestampType,
11
};
12
use crate::parquet::error::ParquetError;
13
14
/// The repetition of a parquet field: how values of the field relate to
/// nullability/lists. Bridges the thrift-generated `FieldRepetitionType`.
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
pub enum Repetition {
    /// When the field has no null values
    Required,
    /// When the field may have null values
    Optional,
    /// When the field may be repeated (list field)
    Repeated,
}
impl TryFrom<FieldRepetitionType> for Repetition {
27
type Error = ParquetError;
28
29
fn try_from(repetition: FieldRepetitionType) -> Result<Self, Self::Error> {
30
Ok(match repetition {
31
FieldRepetitionType::REQUIRED => Repetition::Required,
32
FieldRepetitionType::OPTIONAL => Repetition::Optional,
33
FieldRepetitionType::REPEATED => Repetition::Repeated,
34
_ => return Err(ParquetError::oos("Thrift out of range")),
35
})
36
}
37
}
38
39
impl From<Repetition> for FieldRepetitionType {
40
fn from(repetition: Repetition) -> Self {
41
match repetition {
42
Repetition::Required => FieldRepetitionType::REQUIRED,
43
Repetition::Optional => FieldRepetitionType::OPTIONAL,
44
Repetition::Repeated => FieldRepetitionType::REPEATED,
45
}
46
}
47
}
48
49
/// A compression codec, mirroring the thrift `CompressionCodec` without any
/// level information (levels live in [`CompressionOptions`]).
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
pub enum Compression {
    Uncompressed,
    Snappy,
    Gzip,
    Lzo,
    Brotli,
    Lz4,
    Zstd,
    Lz4Raw,
}
impl TryFrom<CompressionCodec> for Compression {
63
type Error = ParquetError;
64
65
fn try_from(codec: CompressionCodec) -> Result<Self, Self::Error> {
66
Ok(match codec {
67
CompressionCodec::UNCOMPRESSED => Compression::Uncompressed,
68
CompressionCodec::SNAPPY => Compression::Snappy,
69
CompressionCodec::GZIP => Compression::Gzip,
70
CompressionCodec::LZO => Compression::Lzo,
71
CompressionCodec::BROTLI => Compression::Brotli,
72
CompressionCodec::LZ4 => Compression::Lz4,
73
CompressionCodec::ZSTD => Compression::Zstd,
74
CompressionCodec::LZ4_RAW => Compression::Lz4Raw,
75
_ => return Err(ParquetError::oos("Thrift out of range")),
76
})
77
}
78
}
79
80
impl From<Compression> for CompressionCodec {
81
fn from(codec: Compression) -> Self {
82
match codec {
83
Compression::Uncompressed => CompressionCodec::UNCOMPRESSED,
84
Compression::Snappy => CompressionCodec::SNAPPY,
85
Compression::Gzip => CompressionCodec::GZIP,
86
Compression::Lzo => CompressionCodec::LZO,
87
Compression::Brotli => CompressionCodec::BROTLI,
88
Compression::Lz4 => CompressionCodec::LZ4,
89
Compression::Zstd => CompressionCodec::ZSTD,
90
Compression::Lz4Raw => CompressionCodec::LZ4_RAW,
91
}
92
}
93
}
94
95
/// Defines the compression settings for writing a parquet file.
///
/// If `None` is provided as a compression level, the codec's default
/// compression level is used.
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
pub enum CompressionOptions {
    Uncompressed,
    Snappy,
    Gzip(Option<GzipLevel>),
    Lzo,
    Brotli(Option<BrotliLevel>),
    Lz4,
    Zstd(Option<ZstdLevel>),
    Lz4Raw,
}
impl From<CompressionOptions> for Compression {
111
fn from(value: CompressionOptions) -> Self {
112
match value {
113
CompressionOptions::Uncompressed => Compression::Uncompressed,
114
CompressionOptions::Snappy => Compression::Snappy,
115
CompressionOptions::Gzip(_) => Compression::Gzip,
116
CompressionOptions::Lzo => Compression::Lzo,
117
CompressionOptions::Brotli(_) => Compression::Brotli,
118
CompressionOptions::Lz4 => Compression::Lz4,
119
CompressionOptions::Zstd(_) => Compression::Zstd,
120
CompressionOptions::Lz4Raw => Compression::Lz4Raw,
121
}
122
}
123
}
124
125
impl From<CompressionOptions> for CompressionCodec {
126
fn from(codec: CompressionOptions) -> Self {
127
match codec {
128
CompressionOptions::Uncompressed => CompressionCodec::UNCOMPRESSED,
129
CompressionOptions::Snappy => CompressionCodec::SNAPPY,
130
CompressionOptions::Gzip(_) => CompressionCodec::GZIP,
131
CompressionOptions::Lzo => CompressionCodec::LZO,
132
CompressionOptions::Brotli(_) => CompressionCodec::BROTLI,
133
CompressionOptions::Lz4 => CompressionCodec::LZ4,
134
CompressionOptions::Zstd(_) => CompressionCodec::ZSTD,
135
CompressionOptions::Lz4Raw => CompressionCodec::LZ4_RAW,
136
}
137
}
138
}
139
140
/// Defines valid compression levels.
141
pub(crate) trait CompressionLevel<T: std::fmt::Display + std::cmp::PartialOrd> {
142
const MINIMUM_LEVEL: T;
143
const MAXIMUM_LEVEL: T;
144
145
/// Tests if the provided compression level is valid.
146
fn is_valid_level(level: T) -> Result<(), ParquetError> {
147
let compression_range = Self::MINIMUM_LEVEL..=Self::MAXIMUM_LEVEL;
148
if compression_range.contains(&level) {
149
Ok(())
150
} else {
151
Err(ParquetError::InvalidParameter(format!(
152
"valid compression range {}..={} exceeded.",
153
compression_range.start(),
154
compression_range.end()
155
)))
156
}
157
}
158
}
159
160
/// Represents a valid brotli compression level.
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
pub struct BrotliLevel(u32);

impl Default for BrotliLevel {
    /// Defaults to level 1.
    fn default() -> Self {
        Self(1)
    }
}

impl CompressionLevel<u32> for BrotliLevel {
    // Accepted brotli quality levels for this crate: 0..=11 inclusive.
    const MINIMUM_LEVEL: u32 = 0;
    const MAXIMUM_LEVEL: u32 = 11;
}
impl BrotliLevel {
176
/// Attempts to create a brotli compression level.
177
///
178
/// Compression levels must be valid.
179
pub fn try_new(level: u32) -> Result<Self, ParquetError> {
180
Self::is_valid_level(level).map(|_| Self(level))
181
}
182
183
/// Returns the compression level.
184
pub fn compression_level(&self) -> u32 {
185
self.0
186
}
187
}
188
189
/// Represents a valid gzip compression level.
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
pub struct GzipLevel(u8);

impl Default for GzipLevel {
    fn default() -> Self {
        // The default as of miniz_oxide 0.5.1 is 6 for compression level
        // (miniz_oxide::deflate::CompressionLevel::DefaultLevel)
        Self(6)
    }
}

impl CompressionLevel<u8> for GzipLevel {
    // Gzip levels accepted here: 0..=9, the range `flate2::Compression` takes.
    const MINIMUM_LEVEL: u8 = 0;
    const MAXIMUM_LEVEL: u8 = 9;
}
impl GzipLevel {
207
/// Attempts to create a gzip compression level.
208
///
209
/// Compression levels must be valid (i.e. be acceptable for [`flate2::Compression`]).
210
pub fn try_new(level: u8) -> Result<Self, ParquetError> {
211
Self::is_valid_level(level).map(|_| Self(level))
212
}
213
214
/// Returns the compression level.
215
pub fn compression_level(&self) -> u8 {
216
self.0
217
}
218
}
219
220
#[cfg(feature = "gzip")]
impl From<GzipLevel> for flate2::Compression {
    /// Widens the validated `u8` level into flate2's `u32` representation.
    fn from(level: GzipLevel) -> Self {
        // `u32::from` is a lossless widening of the u8 level.
        Self::new(u32::from(level.compression_level()))
    }
}
/// Represents a valid zstd compression level.
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
pub struct ZstdLevel(i32);

impl CompressionLevel<i32> for ZstdLevel {
    // zstd binds to C, and hence zstd::compression_level_range() is not const as this calls the
    // underlying C library. The bounds are therefore hard-coded here.
    const MINIMUM_LEVEL: i32 = 1;
    const MAXIMUM_LEVEL: i32 = 22;
}
impl ZstdLevel {
239
/// Attempts to create a zstd compression level from a given compression level.
240
///
241
/// Compression levels must be valid (i.e. be acceptable for [`zstd::compression_level_range`]).
242
pub fn try_new(level: i32) -> Result<Self, ParquetError> {
243
Self::is_valid_level(level).map(|_| Self(level))
244
}
245
246
/// Returns the compression level.
247
pub fn compression_level(&self) -> i32 {
248
self.0
249
}
250
}
251
252
#[cfg(feature = "zstd")]
impl Default for ZstdLevel {
    /// Uses the zstd crate's own default compression level.
    fn default() -> Self {
        Self(zstd::DEFAULT_COMPRESSION_LEVEL)
    }
}
/// The parquet page types supported by this crate, bridging thrift's
/// `PageType`; any other thrift value is rejected on conversion.
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
pub enum PageType {
    DataPage,
    DataPageV2,
    DictionaryPage,
}
impl TryFrom<ParquetPageType> for PageType {
267
type Error = ParquetError;
268
269
fn try_from(type_: ParquetPageType) -> Result<Self, Self::Error> {
270
Ok(match type_ {
271
ParquetPageType::DATA_PAGE => PageType::DataPage,
272
ParquetPageType::DATA_PAGE_V2 => PageType::DataPageV2,
273
ParquetPageType::DICTIONARY_PAGE => PageType::DictionaryPage,
274
_ => return Err(ParquetError::oos("Thrift out of range")),
275
})
276
}
277
}
278
279
impl From<PageType> for ParquetPageType {
280
fn from(type_: PageType) -> Self {
281
match type_ {
282
PageType::DataPage => ParquetPageType::DATA_PAGE,
283
PageType::DataPageV2 => ParquetPageType::DATA_PAGE_V2,
284
PageType::DictionaryPage => ParquetPageType::DICTIONARY_PAGE,
285
}
286
}
287
}
288
289
/// The encoding of a parquet page, bridging thrift's `Encoding` enum.
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
pub enum Encoding {
    /// Default encoding.
    /// BOOLEAN - 1 bit per value. 0 is false; 1 is true.
    /// INT32 - 4 bytes per value. Stored as little-endian.
    /// INT64 - 8 bytes per value. Stored as little-endian.
    /// FLOAT - 4 bytes per value. IEEE. Stored as little-endian.
    /// DOUBLE - 8 bytes per value. IEEE. Stored as little-endian.
    /// BYTE_ARRAY - 4 byte length stored as little endian, followed by bytes.
    /// FIXED_LEN_BYTE_ARRAY - Just the bytes.
    Plain,
    /// Deprecated: Dictionary encoding. The values in the dictionary are encoded in the
    /// plain type.
    /// In a data page use RLE_DICTIONARY instead;
    /// in a dictionary page use PLAIN instead.
    PlainDictionary,
    /// Group packed run length encoding. Usable for definition/repetition levels
    /// encoding and Booleans (on one bit: 0 is false; 1 is true.)
    Rle,
    /// Bit packed encoding. This can only be used if the data has a known max
    /// width. Usable for definition/repetition levels encoding.
    BitPacked,
    /// Delta encoding for integers. This can be used for int columns and works best
    /// on sorted data.
    DeltaBinaryPacked,
    /// Encoding for byte arrays to separate the length values and the data. The lengths
    /// are encoded using DELTA_BINARY_PACKED.
    DeltaLengthByteArray,
    /// Incremental-encoded byte array. Prefix lengths are encoded using DELTA_BINARY_PACKED.
    /// Suffixes are stored as delta length byte arrays.
    DeltaByteArray,
    /// Dictionary encoding: the ids are encoded using the RLE encoding.
    RleDictionary,
    /// Encoding for floating-point data.
    /// K byte-streams are created where K is the size in bytes of the data type.
    /// The individual bytes of an FP value are scattered to the corresponding stream and
    /// the streams are concatenated.
    /// This itself does not reduce the size of the data but can lead to better compression
    /// afterwards.
    ByteStreamSplit,
}
impl TryFrom<ParquetEncoding> for Encoding {
332
type Error = ParquetError;
333
334
fn try_from(encoding: ParquetEncoding) -> Result<Self, Self::Error> {
335
Ok(match encoding {
336
ParquetEncoding::PLAIN => Encoding::Plain,
337
ParquetEncoding::PLAIN_DICTIONARY => Encoding::PlainDictionary,
338
ParquetEncoding::RLE => Encoding::Rle,
339
ParquetEncoding::BIT_PACKED => Encoding::BitPacked,
340
ParquetEncoding::DELTA_BINARY_PACKED => Encoding::DeltaBinaryPacked,
341
ParquetEncoding::DELTA_LENGTH_BYTE_ARRAY => Encoding::DeltaLengthByteArray,
342
ParquetEncoding::DELTA_BYTE_ARRAY => Encoding::DeltaByteArray,
343
ParquetEncoding::RLE_DICTIONARY => Encoding::RleDictionary,
344
ParquetEncoding::BYTE_STREAM_SPLIT => Encoding::ByteStreamSplit,
345
_ => return Err(ParquetError::oos("Thrift out of range")),
346
})
347
}
348
}
349
350
impl From<Encoding> for ParquetEncoding {
351
fn from(encoding: Encoding) -> Self {
352
match encoding {
353
Encoding::Plain => ParquetEncoding::PLAIN,
354
Encoding::PlainDictionary => ParquetEncoding::PLAIN_DICTIONARY,
355
Encoding::Rle => ParquetEncoding::RLE,
356
Encoding::BitPacked => ParquetEncoding::BIT_PACKED,
357
Encoding::DeltaBinaryPacked => ParquetEncoding::DELTA_BINARY_PACKED,
358
Encoding::DeltaLengthByteArray => ParquetEncoding::DELTA_LENGTH_BYTE_ARRAY,
359
Encoding::DeltaByteArray => ParquetEncoding::DELTA_BYTE_ARRAY,
360
Encoding::RleDictionary => ParquetEncoding::RLE_DICTIONARY,
361
Encoding::ByteStreamSplit => ParquetEncoding::BYTE_STREAM_SPLIT,
362
}
363
}
364
}
365
366
/// Enum to annotate whether lists of min/max elements inside ColumnIndex
/// are ordered and if so, in which direction.
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
pub enum BoundaryOrder {
    /// No ordering is guaranteed.
    Unordered,
    /// The min/max elements are in ascending order.
    Ascending,
    /// The min/max elements are in descending order.
    Descending,
}
impl Default for BoundaryOrder {
376
fn default() -> Self {
377
Self::Unordered
378
}
379
}
380
381
impl TryFrom<ParquetBoundaryOrder> for BoundaryOrder {
382
type Error = ParquetError;
383
384
fn try_from(encoding: ParquetBoundaryOrder) -> Result<Self, Self::Error> {
385
Ok(match encoding {
386
ParquetBoundaryOrder::UNORDERED => BoundaryOrder::Unordered,
387
ParquetBoundaryOrder::ASCENDING => BoundaryOrder::Ascending,
388
ParquetBoundaryOrder::DESCENDING => BoundaryOrder::Descending,
389
_ => return Err(ParquetError::oos("BoundaryOrder Thrift value out of range")),
390
})
391
}
392
}
393
394
impl From<BoundaryOrder> for ParquetBoundaryOrder {
395
fn from(encoding: BoundaryOrder) -> Self {
396
match encoding {
397
BoundaryOrder::Unordered => ParquetBoundaryOrder::UNORDERED,
398
BoundaryOrder::Ascending => ParquetBoundaryOrder::ASCENDING,
399
BoundaryOrder::Descending => ParquetBoundaryOrder::DESCENDING,
400
}
401
}
402
}
403
404
/// Accessors shared by v1 and v2 data-page headers that return the crate's
/// [`Encoding`] instead of the raw thrift value.
pub trait DataPageHeaderExt {
    /// The encoding of the page's values.
    fn encoding(&self) -> Encoding;
    /// The encoding of the page's repetition levels.
    fn repetition_level_encoding(&self) -> Encoding;
    /// The encoding of the page's definition levels.
    fn definition_level_encoding(&self) -> Encoding;
}
impl DataPageHeaderExt for DataPageHeader {
411
fn encoding(&self) -> Encoding {
412
self.encoding.try_into().unwrap()
413
}
414
415
fn repetition_level_encoding(&self) -> Encoding {
416
self.repetition_level_encoding.try_into().unwrap()
417
}
418
419
fn definition_level_encoding(&self) -> Encoding {
420
self.definition_level_encoding.try_into().unwrap()
421
}
422
}
423
424
impl DataPageHeaderExt for DataPageHeaderV2 {
425
fn encoding(&self) -> Encoding {
426
self.encoding.try_into().unwrap()
427
}
428
429
fn repetition_level_encoding(&self) -> Encoding {
430
Encoding::Rle
431
}
432
433
fn definition_level_encoding(&self) -> Encoding {
434
Encoding::Rle
435
}
436
}
437
438
/// Time resolution of parquet `Time`/`Timestamp` logical types, bridging
/// thrift's `TimeUnit`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
pub enum TimeUnit {
    Milliseconds,
    Microseconds,
    Nanoseconds,
}
impl From<ParquetTimeUnit> for TimeUnit {
447
fn from(encoding: ParquetTimeUnit) -> Self {
448
match encoding {
449
ParquetTimeUnit::MILLIS(_) => TimeUnit::Milliseconds,
450
ParquetTimeUnit::MICROS(_) => TimeUnit::Microseconds,
451
ParquetTimeUnit::NANOS(_) => TimeUnit::Nanoseconds,
452
}
453
}
454
}
455
456
impl From<TimeUnit> for ParquetTimeUnit {
457
fn from(unit: TimeUnit) -> Self {
458
match unit {
459
TimeUnit::Milliseconds => ParquetTimeUnit::MILLIS(Default::default()),
460
TimeUnit::Microseconds => ParquetTimeUnit::MICROS(Default::default()),
461
TimeUnit::Nanoseconds => ParquetTimeUnit::NANOS(Default::default()),
462
}
463
}
464
}
465
466
/// Enum of all valid logical integer types: signed and unsigned widths of
/// 8, 16, 32 and 64 bits.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
pub enum IntegerType {
    Int8,
    Int16,
    Int32,
    Int64,
    UInt8,
    UInt16,
    UInt32,
    UInt64,
}
/// Logical types that annotate parquet primitive (physical) types, bridging
/// the primitive subset of thrift's `LogicalType`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
pub enum PrimitiveLogicalType {
    String,
    Enum,
    /// Decimal as `(precision, scale)`.
    Decimal(usize, usize),
    Date,
    Time {
        unit: TimeUnit,
        is_adjusted_to_utc: bool,
    },
    Timestamp {
        unit: TimeUnit,
        is_adjusted_to_utc: bool,
    },
    Integer(IntegerType),
    Unknown,
    Json,
    Bson,
    Uuid,
    Float16,
}
/// Logical types that annotate parquet group (nested) types, bridging the
/// group subset of thrift's `LogicalType`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
pub enum GroupLogicalType {
    Map,
    List,
}
impl From<GroupLogicalType> for ParquetLogicalType {
511
fn from(type_: GroupLogicalType) -> Self {
512
match type_ {
513
GroupLogicalType::Map => ParquetLogicalType::MAP(Default::default()),
514
GroupLogicalType::List => ParquetLogicalType::LIST(Default::default()),
515
}
516
}
517
}
518
519
impl From<(i32, bool)> for IntegerType {
520
fn from((bit_width, is_signed): (i32, bool)) -> Self {
521
match (bit_width, is_signed) {
522
(8, true) => IntegerType::Int8,
523
(16, true) => IntegerType::Int16,
524
(32, true) => IntegerType::Int32,
525
(64, true) => IntegerType::Int64,
526
(8, false) => IntegerType::UInt8,
527
(16, false) => IntegerType::UInt16,
528
(32, false) => IntegerType::UInt32,
529
(64, false) => IntegerType::UInt64,
530
// The above are the only possible annotations for parquet's int32. Anything else
531
// is a deviation to the parquet specification and we ignore
532
_ => IntegerType::Int32,
533
}
534
}
535
}
536
537
impl From<IntegerType> for (usize, bool) {
538
fn from(type_: IntegerType) -> (usize, bool) {
539
match type_ {
540
IntegerType::Int8 => (8, true),
541
IntegerType::Int16 => (16, true),
542
IntegerType::Int32 => (32, true),
543
IntegerType::Int64 => (64, true),
544
IntegerType::UInt8 => (8, false),
545
IntegerType::UInt16 => (16, false),
546
IntegerType::UInt32 => (32, false),
547
IntegerType::UInt64 => (64, false),
548
}
549
}
550
}
551
552
impl TryFrom<ParquetLogicalType> for PrimitiveLogicalType {
553
type Error = ParquetError;
554
555
fn try_from(type_: ParquetLogicalType) -> Result<Self, Self::Error> {
556
Ok(match type_ {
557
ParquetLogicalType::STRING(_) => PrimitiveLogicalType::String,
558
ParquetLogicalType::ENUM(_) => PrimitiveLogicalType::Enum,
559
ParquetLogicalType::DECIMAL(decimal) => PrimitiveLogicalType::Decimal(
560
decimal.precision.try_into()?,
561
decimal.scale.try_into()?,
562
),
563
ParquetLogicalType::DATE(_) => PrimitiveLogicalType::Date,
564
ParquetLogicalType::TIME(time) => PrimitiveLogicalType::Time {
565
unit: time.unit.into(),
566
is_adjusted_to_utc: time.is_adjusted_to_u_t_c,
567
},
568
ParquetLogicalType::TIMESTAMP(time) => PrimitiveLogicalType::Timestamp {
569
unit: time.unit.into(),
570
is_adjusted_to_utc: time.is_adjusted_to_u_t_c,
571
},
572
ParquetLogicalType::INTEGER(int) => {
573
PrimitiveLogicalType::Integer((int.bit_width as i32, int.is_signed).into())
574
},
575
ParquetLogicalType::UNKNOWN(_) => PrimitiveLogicalType::Unknown,
576
ParquetLogicalType::JSON(_) => PrimitiveLogicalType::Json,
577
ParquetLogicalType::BSON(_) => PrimitiveLogicalType::Bson,
578
ParquetLogicalType::UUID(_) => PrimitiveLogicalType::Uuid,
579
ParquetLogicalType::FLOAT16(_) => PrimitiveLogicalType::Float16,
580
_ => return Err(ParquetError::oos("LogicalType value out of range")),
581
})
582
}
583
}
584
585
impl TryFrom<ParquetLogicalType> for GroupLogicalType {
586
type Error = ParquetError;
587
588
fn try_from(type_: ParquetLogicalType) -> Result<Self, Self::Error> {
589
Ok(match type_ {
590
ParquetLogicalType::LIST(_) => GroupLogicalType::List,
591
ParquetLogicalType::MAP(_) => GroupLogicalType::Map,
592
_ => return Err(ParquetError::oos("LogicalType value out of range")),
593
})
594
}
595
}
596
597
impl From<PrimitiveLogicalType> for ParquetLogicalType {
598
fn from(type_: PrimitiveLogicalType) -> Self {
599
match type_ {
600
PrimitiveLogicalType::String => ParquetLogicalType::STRING(Default::default()),
601
PrimitiveLogicalType::Enum => ParquetLogicalType::ENUM(Default::default()),
602
PrimitiveLogicalType::Decimal(precision, scale) => {
603
ParquetLogicalType::DECIMAL(DecimalType {
604
precision: precision as i32,
605
scale: scale as i32,
606
})
607
},
608
PrimitiveLogicalType::Date => ParquetLogicalType::DATE(Default::default()),
609
PrimitiveLogicalType::Time {
610
unit,
611
is_adjusted_to_utc,
612
} => ParquetLogicalType::TIME(TimeType {
613
unit: unit.into(),
614
is_adjusted_to_u_t_c: is_adjusted_to_utc,
615
}),
616
PrimitiveLogicalType::Timestamp {
617
unit,
618
is_adjusted_to_utc,
619
} => ParquetLogicalType::TIMESTAMP(TimestampType {
620
unit: unit.into(),
621
is_adjusted_to_u_t_c: is_adjusted_to_utc,
622
}),
623
PrimitiveLogicalType::Integer(integer) => {
624
let (bit_width, is_signed) = integer.into();
625
ParquetLogicalType::INTEGER(IntType {
626
bit_width: bit_width as i8,
627
is_signed,
628
})
629
},
630
PrimitiveLogicalType::Unknown => ParquetLogicalType::UNKNOWN(Default::default()),
631
PrimitiveLogicalType::Json => ParquetLogicalType::JSON(Default::default()),
632
PrimitiveLogicalType::Bson => ParquetLogicalType::BSON(Default::default()),
633
PrimitiveLogicalType::Uuid => ParquetLogicalType::UUID(Default::default()),
634
PrimitiveLogicalType::Float16 => ParquetLogicalType::FLOAT16(Default::default()),
635
}
636
}
637
}
638
639
#[cfg(test)]
mod tests {
    use super::*;

    /// Every primitive logical type must survive the conversion into thrift
    /// and back unchanged.
    #[test]
    fn round_trip_primitive() -> Result<(), ParquetError> {
        use PrimitiveLogicalType::*;
        let a = vec![
            String,
            Enum,
            Decimal(3, 1),
            Date,
            Time {
                unit: TimeUnit::Milliseconds,
                is_adjusted_to_utc: true,
            },
            Timestamp {
                unit: TimeUnit::Milliseconds,
                is_adjusted_to_utc: true,
            },
            Integer(IntegerType::Int16),
            Unknown,
            Json,
            Bson,
            Uuid,
            // Was previously missing from the round-trip coverage.
            Float16,
        ];
        for a in a {
            let c: ParquetLogicalType = a.into();
            let e: PrimitiveLogicalType = c.try_into()?;
            assert_eq!(e, a);
        }
        Ok(())
    }

    /// Every encoding must survive the conversion into thrift and back.
    #[test]
    fn round_trip_encoding() -> Result<(), ParquetError> {
        use Encoding::*;
        let a = vec![
            Plain,
            PlainDictionary,
            Rle,
            BitPacked,
            DeltaBinaryPacked,
            DeltaLengthByteArray,
            DeltaByteArray,
            RleDictionary,
            ByteStreamSplit,
        ];
        for a in a {
            let c: ParquetEncoding = a.into();
            let e: Encoding = c.try_into()?;
            assert_eq!(e, a);
        }
        Ok(())
    }

    /// Every compression codec must survive the conversion into thrift and back.
    #[test]
    fn round_compression() -> Result<(), ParquetError> {
        use Compression::*;
        let a = vec![Uncompressed, Snappy, Gzip, Lzo, Brotli, Lz4, Zstd, Lz4Raw];
        for a in a {
            let c: CompressionCodec = a.into();
            let e: Compression = c.try_into()?;
            assert_eq!(e, a);
        }
        Ok(())
    }
}