Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/parquet_bridge.rs
8512 views
1
// Bridges structs from Thrift-generated code to Rust enums.
2
3
pub use polars_utils::compression::*;
4
#[cfg(feature = "serde")]
5
use serde::{Deserialize, Serialize};
6
7
use super::thrift_format::{
8
BoundaryOrder as ParquetBoundaryOrder, CompressionCodec, DataPageHeader, DataPageHeaderV2,
9
DecimalType, Encoding as ParquetEncoding, FieldRepetitionType, IntType,
10
LogicalType as ParquetLogicalType, PageType as ParquetPageType, TimeType,
11
TimeUnit as ParquetTimeUnit, TimestampType,
12
};
13
use crate::parquet::error::ParquetError;
14
15
/// How often a parquet field may occur.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum Repetition {
    /// The field holds no null values.
    Required,
    /// The field may hold null values.
    Optional,
    /// The field may be repeated (used for list fields).
    Repeated,
}
26
27
impl TryFrom<FieldRepetitionType> for Repetition {
28
type Error = ParquetError;
29
30
fn try_from(repetition: FieldRepetitionType) -> Result<Self, Self::Error> {
31
Ok(match repetition {
32
FieldRepetitionType::REQUIRED => Repetition::Required,
33
FieldRepetitionType::OPTIONAL => Repetition::Optional,
34
FieldRepetitionType::REPEATED => Repetition::Repeated,
35
_ => return Err(ParquetError::oos("Thrift out of range")),
36
})
37
}
38
}
39
40
impl From<Repetition> for FieldRepetitionType {
41
fn from(repetition: Repetition) -> Self {
42
match repetition {
43
Repetition::Required => FieldRepetitionType::REQUIRED,
44
Repetition::Optional => FieldRepetitionType::OPTIONAL,
45
Repetition::Repeated => FieldRepetitionType::REPEATED,
46
}
47
}
48
}
49
50
/// Compression codecs, one variant per codec constant of the thrift
/// `CompressionCodec` that this module converts to and from.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum Compression {
    /// Counterpart of `CompressionCodec::UNCOMPRESSED`.
    Uncompressed,
    /// Counterpart of `CompressionCodec::SNAPPY`.
    Snappy,
    /// Counterpart of `CompressionCodec::GZIP`.
    Gzip,
    /// Counterpart of `CompressionCodec::LZO`.
    Lzo,
    /// Counterpart of `CompressionCodec::BROTLI`.
    Brotli,
    /// Counterpart of `CompressionCodec::LZ4`.
    Lz4,
    /// Counterpart of `CompressionCodec::ZSTD`.
    Zstd,
    /// Counterpart of `CompressionCodec::LZ4_RAW`.
    Lz4Raw,
}
62
63
impl TryFrom<CompressionCodec> for Compression {
64
type Error = ParquetError;
65
66
fn try_from(codec: CompressionCodec) -> Result<Self, Self::Error> {
67
Ok(match codec {
68
CompressionCodec::UNCOMPRESSED => Compression::Uncompressed,
69
CompressionCodec::SNAPPY => Compression::Snappy,
70
CompressionCodec::GZIP => Compression::Gzip,
71
CompressionCodec::LZO => Compression::Lzo,
72
CompressionCodec::BROTLI => Compression::Brotli,
73
CompressionCodec::LZ4 => Compression::Lz4,
74
CompressionCodec::ZSTD => Compression::Zstd,
75
CompressionCodec::LZ4_RAW => Compression::Lz4Raw,
76
_ => return Err(ParquetError::oos("Thrift out of range")),
77
})
78
}
79
}
80
81
impl From<Compression> for CompressionCodec {
82
fn from(codec: Compression) -> Self {
83
match codec {
84
Compression::Uncompressed => CompressionCodec::UNCOMPRESSED,
85
Compression::Snappy => CompressionCodec::SNAPPY,
86
Compression::Gzip => CompressionCodec::GZIP,
87
Compression::Lzo => CompressionCodec::LZO,
88
Compression::Brotli => CompressionCodec::BROTLI,
89
Compression::Lz4 => CompressionCodec::LZ4,
90
Compression::Zstd => CompressionCodec::ZSTD,
91
Compression::Lz4Raw => CompressionCodec::LZ4_RAW,
92
}
93
}
94
}
95
96
/// Defines the compression settings for writing a parquet file.
97
///
98
/// If None is provided as a compression setting, then the default compression level is used.
99
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
100
pub enum CompressionOptions {
101
Uncompressed,
102
Snappy,
103
Gzip(Option<GzipLevel>),
104
Lzo,
105
Brotli(Option<BrotliLevel>),
106
Lz4,
107
Zstd(Option<ZstdLevel>),
108
Lz4Raw,
109
}
110
111
impl From<CompressionOptions> for Compression {
112
fn from(value: CompressionOptions) -> Self {
113
match value {
114
CompressionOptions::Uncompressed => Compression::Uncompressed,
115
CompressionOptions::Snappy => Compression::Snappy,
116
CompressionOptions::Gzip(_) => Compression::Gzip,
117
CompressionOptions::Lzo => Compression::Lzo,
118
CompressionOptions::Brotli(_) => Compression::Brotli,
119
CompressionOptions::Lz4 => Compression::Lz4,
120
CompressionOptions::Zstd(_) => Compression::Zstd,
121
CompressionOptions::Lz4Raw => Compression::Lz4Raw,
122
}
123
}
124
}
125
126
impl From<CompressionOptions> for CompressionCodec {
127
fn from(codec: CompressionOptions) -> Self {
128
match codec {
129
CompressionOptions::Uncompressed => CompressionCodec::UNCOMPRESSED,
130
CompressionOptions::Snappy => CompressionCodec::SNAPPY,
131
CompressionOptions::Gzip(_) => CompressionCodec::GZIP,
132
CompressionOptions::Lzo => CompressionCodec::LZO,
133
CompressionOptions::Brotli(_) => CompressionCodec::BROTLI,
134
CompressionOptions::Lz4 => CompressionCodec::LZ4,
135
CompressionOptions::Zstd(_) => CompressionCodec::ZSTD,
136
CompressionOptions::Lz4Raw => CompressionCodec::LZ4_RAW,
137
}
138
}
139
}
140
141
/// Page kinds that this module converts to and from the thrift `PageType`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum PageType {
    /// Counterpart of the thrift `DATA_PAGE`.
    DataPage,
    /// Counterpart of the thrift `DATA_PAGE_V2`.
    DataPageV2,
    /// Counterpart of the thrift `DICTIONARY_PAGE`.
    DictionaryPage,
}
147
148
impl TryFrom<ParquetPageType> for PageType {
149
type Error = ParquetError;
150
151
fn try_from(type_: ParquetPageType) -> Result<Self, Self::Error> {
152
Ok(match type_ {
153
ParquetPageType::DATA_PAGE => PageType::DataPage,
154
ParquetPageType::DATA_PAGE_V2 => PageType::DataPageV2,
155
ParquetPageType::DICTIONARY_PAGE => PageType::DictionaryPage,
156
_ => return Err(ParquetError::oos("Thrift out of range")),
157
})
158
}
159
}
160
161
impl From<PageType> for ParquetPageType {
162
fn from(type_: PageType) -> Self {
163
match type_ {
164
PageType::DataPage => ParquetPageType::DATA_PAGE,
165
PageType::DataPageV2 => ParquetPageType::DATA_PAGE_V2,
166
PageType::DictionaryPage => ParquetPageType::DICTIONARY_PAGE,
167
}
168
}
169
}
170
171
/// Encodings that this module converts to and from the thrift `Encoding`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Encoding {
    /// The default encoding. Per physical type:
    /// * BOOLEAN - 1 bit per value; 0 is false, 1 is true.
    /// * INT32 - 4 bytes per value, little-endian.
    /// * INT64 - 8 bytes per value, little-endian.
    /// * FLOAT - 4 bytes per value, IEEE, little-endian.
    /// * DOUBLE - 8 bytes per value, IEEE, little-endian.
    /// * BYTE_ARRAY - a 4-byte little-endian length followed by the bytes.
    /// * FIXED_LEN_BYTE_ARRAY - just the bytes.
    Plain,
    /// Deprecated dictionary encoding whose dictionary values are encoded in the
    /// plain type. Use RLE_DICTIONARY in a data page and PLAIN in a dictionary
    /// page instead.
    PlainDictionary,
    /// Group-packed run-length encoding. Usable for definition/repetition levels
    /// and for booleans (on one bit: 0 is false; 1 is true).
    Rle,
    /// Bit-packed encoding. Only usable when the data has a known maximum width.
    /// Usable for definition/repetition levels.
    BitPacked,
    /// Delta encoding for integers. Usable for int columns; works best on sorted
    /// data.
    DeltaBinaryPacked,
    /// Byte-array encoding that separates the length values from the data. The
    /// lengths are encoded using DELTA_BINARY_PACKED.
    DeltaLengthByteArray,
    /// Incremental-encoded byte array. Prefix lengths are encoded using
    /// DELTA_BINARY_PACKED; suffixes are stored as delta length byte arrays.
    DeltaByteArray,
    /// Dictionary encoding in which the ids are encoded using the RLE encoding.
    RleDictionary,
    /// Encoding for floating-point data. K byte-streams are created, where K is
    /// the size in bytes of the data type. The individual bytes of each value are
    /// scattered to the corresponding stream and the streams are concatenated.
    /// This does not reduce the size of the data by itself but can lead to better
    /// compression afterwards.
    ByteStreamSplit,
}
212
213
impl TryFrom<ParquetEncoding> for Encoding {
214
type Error = ParquetError;
215
216
fn try_from(encoding: ParquetEncoding) -> Result<Self, Self::Error> {
217
Ok(match encoding {
218
ParquetEncoding::PLAIN => Encoding::Plain,
219
ParquetEncoding::PLAIN_DICTIONARY => Encoding::PlainDictionary,
220
ParquetEncoding::RLE => Encoding::Rle,
221
ParquetEncoding::BIT_PACKED => Encoding::BitPacked,
222
ParquetEncoding::DELTA_BINARY_PACKED => Encoding::DeltaBinaryPacked,
223
ParquetEncoding::DELTA_LENGTH_BYTE_ARRAY => Encoding::DeltaLengthByteArray,
224
ParquetEncoding::DELTA_BYTE_ARRAY => Encoding::DeltaByteArray,
225
ParquetEncoding::RLE_DICTIONARY => Encoding::RleDictionary,
226
ParquetEncoding::BYTE_STREAM_SPLIT => Encoding::ByteStreamSplit,
227
_ => return Err(ParquetError::oos("Thrift out of range")),
228
})
229
}
230
}
231
232
impl From<Encoding> for ParquetEncoding {
233
fn from(encoding: Encoding) -> Self {
234
match encoding {
235
Encoding::Plain => ParquetEncoding::PLAIN,
236
Encoding::PlainDictionary => ParquetEncoding::PLAIN_DICTIONARY,
237
Encoding::Rle => ParquetEncoding::RLE,
238
Encoding::BitPacked => ParquetEncoding::BIT_PACKED,
239
Encoding::DeltaBinaryPacked => ParquetEncoding::DELTA_BINARY_PACKED,
240
Encoding::DeltaLengthByteArray => ParquetEncoding::DELTA_LENGTH_BYTE_ARRAY,
241
Encoding::DeltaByteArray => ParquetEncoding::DELTA_BYTE_ARRAY,
242
Encoding::RleDictionary => ParquetEncoding::RLE_DICTIONARY,
243
Encoding::ByteStreamSplit => ParquetEncoding::BYTE_STREAM_SPLIT,
244
}
245
}
246
}
247
248
/// Annotates whether the lists of min/max elements inside a ColumnIndex are
/// ordered and, if they are, in which direction.
///
/// The default is [`BoundaryOrder::Unordered`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub enum BoundaryOrder {
    /// Counterpart of the thrift `UNORDERED`.
    #[default]
    Unordered,
    /// Counterpart of the thrift `ASCENDING`.
    Ascending,
    /// Counterpart of the thrift `DESCENDING`.
    Descending,
}
257
258
impl TryFrom<ParquetBoundaryOrder> for BoundaryOrder {
259
type Error = ParquetError;
260
261
fn try_from(encoding: ParquetBoundaryOrder) -> Result<Self, Self::Error> {
262
Ok(match encoding {
263
ParquetBoundaryOrder::UNORDERED => BoundaryOrder::Unordered,
264
ParquetBoundaryOrder::ASCENDING => BoundaryOrder::Ascending,
265
ParquetBoundaryOrder::DESCENDING => BoundaryOrder::Descending,
266
_ => return Err(ParquetError::oos("BoundaryOrder Thrift value out of range")),
267
})
268
}
269
}
270
271
impl From<BoundaryOrder> for ParquetBoundaryOrder {
272
fn from(encoding: BoundaryOrder) -> Self {
273
match encoding {
274
BoundaryOrder::Unordered => ParquetBoundaryOrder::UNORDERED,
275
BoundaryOrder::Ascending => ParquetBoundaryOrder::ASCENDING,
276
BoundaryOrder::Descending => ParquetBoundaryOrder::DESCENDING,
277
}
278
}
279
}
280
281
pub trait DataPageHeaderExt {
282
fn encoding(&self) -> Encoding;
283
fn repetition_level_encoding(&self) -> Encoding;
284
fn definition_level_encoding(&self) -> Encoding;
285
}
286
287
impl DataPageHeaderExt for DataPageHeader {
288
fn encoding(&self) -> Encoding {
289
self.encoding.try_into().unwrap()
290
}
291
292
fn repetition_level_encoding(&self) -> Encoding {
293
self.repetition_level_encoding.try_into().unwrap()
294
}
295
296
fn definition_level_encoding(&self) -> Encoding {
297
self.definition_level_encoding.try_into().unwrap()
298
}
299
}
300
301
impl DataPageHeaderExt for DataPageHeaderV2 {
302
fn encoding(&self) -> Encoding {
303
self.encoding.try_into().unwrap()
304
}
305
306
fn repetition_level_encoding(&self) -> Encoding {
307
Encoding::Rle
308
}
309
310
fn definition_level_encoding(&self) -> Encoding {
311
Encoding::Rle
312
}
313
}
314
315
/// Unit carried by the `Time` and `Timestamp` logical types.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum TimeUnit {
    /// Counterpart of the thrift `MILLIS`.
    Milliseconds,
    /// Counterpart of the thrift `MICROS`.
    Microseconds,
    /// Counterpart of the thrift `NANOS`.
    Nanoseconds,
}
322
323
impl From<ParquetTimeUnit> for TimeUnit {
324
fn from(encoding: ParquetTimeUnit) -> Self {
325
match encoding {
326
ParquetTimeUnit::MILLIS(_) => TimeUnit::Milliseconds,
327
ParquetTimeUnit::MICROS(_) => TimeUnit::Microseconds,
328
ParquetTimeUnit::NANOS(_) => TimeUnit::Nanoseconds,
329
}
330
}
331
}
332
333
impl From<TimeUnit> for ParquetTimeUnit {
334
fn from(unit: TimeUnit) -> Self {
335
match unit {
336
TimeUnit::Milliseconds => ParquetTimeUnit::MILLIS(Default::default()),
337
TimeUnit::Microseconds => ParquetTimeUnit::MICROS(Default::default()),
338
TimeUnit::Nanoseconds => ParquetTimeUnit::NANOS(Default::default()),
339
}
340
}
341
}
342
343
/// All valid logical integer types.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum IntegerType {
    /// Signed, 8 bits wide.
    Int8,
    /// Signed, 16 bits wide.
    Int16,
    /// Signed, 32 bits wide.
    Int32,
    /// Signed, 64 bits wide.
    Int64,
    /// Unsigned, 8 bits wide.
    UInt8,
    /// Unsigned, 16 bits wide.
    UInt16,
    /// Unsigned, 32 bits wide.
    UInt32,
    /// Unsigned, 64 bits wide.
    UInt64,
}
356
357
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
358
#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
359
pub enum PrimitiveLogicalType {
360
String,
361
Enum,
362
Decimal(usize, usize),
363
Date,
364
Time {
365
unit: TimeUnit,
366
is_adjusted_to_utc: bool,
367
},
368
Timestamp {
369
unit: TimeUnit,
370
is_adjusted_to_utc: bool,
371
},
372
Integer(IntegerType),
373
Unknown,
374
Json,
375
Bson,
376
Uuid,
377
Float16,
378
}
379
380
/// Logical types that convert to the thrift `MAP` and `LIST` logical types.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum GroupLogicalType {
    /// Counterpart of the thrift `MAP`.
    Map,
    /// Counterpart of the thrift `LIST`.
    List,
}
386
387
impl From<GroupLogicalType> for ParquetLogicalType {
388
fn from(type_: GroupLogicalType) -> Self {
389
match type_ {
390
GroupLogicalType::Map => ParquetLogicalType::MAP(Default::default()),
391
GroupLogicalType::List => ParquetLogicalType::LIST(Default::default()),
392
}
393
}
394
}
395
396
impl From<(i32, bool)> for IntegerType {
397
fn from((bit_width, is_signed): (i32, bool)) -> Self {
398
match (bit_width, is_signed) {
399
(8, true) => IntegerType::Int8,
400
(16, true) => IntegerType::Int16,
401
(32, true) => IntegerType::Int32,
402
(64, true) => IntegerType::Int64,
403
(8, false) => IntegerType::UInt8,
404
(16, false) => IntegerType::UInt16,
405
(32, false) => IntegerType::UInt32,
406
(64, false) => IntegerType::UInt64,
407
// The above are the only possible annotations for parquet's int32. Anything else
408
// is a deviation to the parquet specification and we ignore
409
_ => IntegerType::Int32,
410
}
411
}
412
}
413
414
impl From<IntegerType> for (usize, bool) {
415
fn from(type_: IntegerType) -> (usize, bool) {
416
match type_ {
417
IntegerType::Int8 => (8, true),
418
IntegerType::Int16 => (16, true),
419
IntegerType::Int32 => (32, true),
420
IntegerType::Int64 => (64, true),
421
IntegerType::UInt8 => (8, false),
422
IntegerType::UInt16 => (16, false),
423
IntegerType::UInt32 => (32, false),
424
IntegerType::UInt64 => (64, false),
425
}
426
}
427
}
428
429
impl TryFrom<ParquetLogicalType> for PrimitiveLogicalType {
430
type Error = ParquetError;
431
432
fn try_from(type_: ParquetLogicalType) -> Result<Self, Self::Error> {
433
Ok(match type_ {
434
ParquetLogicalType::STRING(_) => PrimitiveLogicalType::String,
435
ParquetLogicalType::ENUM(_) => PrimitiveLogicalType::Enum,
436
ParquetLogicalType::DECIMAL(decimal) => PrimitiveLogicalType::Decimal(
437
decimal.precision.try_into()?,
438
decimal.scale.try_into()?,
439
),
440
ParquetLogicalType::DATE(_) => PrimitiveLogicalType::Date,
441
ParquetLogicalType::TIME(time) => PrimitiveLogicalType::Time {
442
unit: time.unit.into(),
443
is_adjusted_to_utc: time.is_adjusted_to_u_t_c,
444
},
445
ParquetLogicalType::TIMESTAMP(time) => PrimitiveLogicalType::Timestamp {
446
unit: time.unit.into(),
447
is_adjusted_to_utc: time.is_adjusted_to_u_t_c,
448
},
449
ParquetLogicalType::INTEGER(int) => {
450
PrimitiveLogicalType::Integer((int.bit_width as i32, int.is_signed).into())
451
},
452
ParquetLogicalType::UNKNOWN(_) => PrimitiveLogicalType::Unknown,
453
ParquetLogicalType::JSON(_) => PrimitiveLogicalType::Json,
454
ParquetLogicalType::BSON(_) => PrimitiveLogicalType::Bson,
455
ParquetLogicalType::UUID(_) => PrimitiveLogicalType::Uuid,
456
ParquetLogicalType::FLOAT16(_) => PrimitiveLogicalType::Float16,
457
_ => return Err(ParquetError::oos("LogicalType value out of range")),
458
})
459
}
460
}
461
462
impl TryFrom<ParquetLogicalType> for GroupLogicalType {
463
type Error = ParquetError;
464
465
fn try_from(type_: ParquetLogicalType) -> Result<Self, Self::Error> {
466
Ok(match type_ {
467
ParquetLogicalType::LIST(_) => GroupLogicalType::List,
468
ParquetLogicalType::MAP(_) => GroupLogicalType::Map,
469
_ => return Err(ParquetError::oos("LogicalType value out of range")),
470
})
471
}
472
}
473
474
impl From<PrimitiveLogicalType> for ParquetLogicalType {
475
fn from(type_: PrimitiveLogicalType) -> Self {
476
match type_ {
477
PrimitiveLogicalType::String => ParquetLogicalType::STRING(Default::default()),
478
PrimitiveLogicalType::Enum => ParquetLogicalType::ENUM(Default::default()),
479
PrimitiveLogicalType::Decimal(precision, scale) => {
480
ParquetLogicalType::DECIMAL(DecimalType {
481
precision: precision as i32,
482
scale: scale as i32,
483
})
484
},
485
PrimitiveLogicalType::Date => ParquetLogicalType::DATE(Default::default()),
486
PrimitiveLogicalType::Time {
487
unit,
488
is_adjusted_to_utc,
489
} => ParquetLogicalType::TIME(TimeType {
490
unit: unit.into(),
491
is_adjusted_to_u_t_c: is_adjusted_to_utc,
492
}),
493
PrimitiveLogicalType::Timestamp {
494
unit,
495
is_adjusted_to_utc,
496
} => ParquetLogicalType::TIMESTAMP(TimestampType {
497
unit: unit.into(),
498
is_adjusted_to_u_t_c: is_adjusted_to_utc,
499
}),
500
PrimitiveLogicalType::Integer(integer) => {
501
let (bit_width, is_signed) = integer.into();
502
ParquetLogicalType::INTEGER(IntType {
503
bit_width: bit_width as i8,
504
is_signed,
505
})
506
},
507
PrimitiveLogicalType::Unknown => ParquetLogicalType::UNKNOWN(Default::default()),
508
PrimitiveLogicalType::Json => ParquetLogicalType::JSON(Default::default()),
509
PrimitiveLogicalType::Bson => ParquetLogicalType::BSON(Default::default()),
510
PrimitiveLogicalType::Uuid => ParquetLogicalType::UUID(Default::default()),
511
PrimitiveLogicalType::Float16 => ParquetLogicalType::FLOAT16(Default::default()),
512
}
513
}
514
}
515
516
#[cfg(test)]
mod tests {
    use super::*;

    /// Each primitive logical type must come back unchanged after converting to
    /// the thrift `LogicalType` and back.
    #[test]
    fn round_trip_primitive() -> Result<(), ParquetError> {
        use PrimitiveLogicalType::*;
        let cases = [
            String,
            Enum,
            Decimal(3, 1),
            Date,
            Time {
                unit: TimeUnit::Milliseconds,
                is_adjusted_to_utc: true,
            },
            Timestamp {
                unit: TimeUnit::Milliseconds,
                is_adjusted_to_utc: true,
            },
            Integer(IntegerType::Int16),
            Unknown,
            Json,
            Bson,
            Uuid,
            // `Float16` has conversions in both directions, so it is covered too.
            Float16,
        ];
        for case in cases {
            let thrift: ParquetLogicalType = case.into();
            let back: PrimitiveLogicalType = thrift.try_into()?;
            assert_eq!(back, case);
        }
        Ok(())
    }

    /// Each encoding must come back unchanged after converting to the thrift
    /// `Encoding` and back.
    #[test]
    fn round_trip_encoding() -> Result<(), ParquetError> {
        use Encoding::*;
        let cases = [
            Plain,
            PlainDictionary,
            Rle,
            BitPacked,
            DeltaBinaryPacked,
            DeltaLengthByteArray,
            DeltaByteArray,
            RleDictionary,
            ByteStreamSplit,
        ];
        for case in cases {
            let thrift: ParquetEncoding = case.into();
            let back: Encoding = thrift.try_into()?;
            assert_eq!(back, case);
        }
        Ok(())
    }

    /// Each compression codec must come back unchanged after converting to the
    /// thrift `CompressionCodec` and back.
    #[test]
    fn round_compression() -> Result<(), ParquetError> {
        use Compression::*;
        let cases = [Uncompressed, Snappy, Gzip, Lzo, Brotli, Lz4, Zstd, Lz4Raw];
        for case in cases {
            let thrift: CompressionCodec = case.into();
            let back: Compression = thrift.try_into()?;
            assert_eq!(back, case);
        }
        Ok(())
    }
}
584
585