Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/simple.rs
8509 views
1
use arrow::array::{Array, BinaryViewArray, FixedSizeBinaryArray, PrimitiveArray, StructArray};
2
use arrow::bitmap::Bitmap;
3
use arrow::datatypes::{
4
ArrowDataType, DTYPE_CATEGORICAL_LEGACY, DTYPE_CATEGORICAL_NEW, DTYPE_ENUM_VALUES_LEGACY,
5
DTYPE_ENUM_VALUES_NEW, Field, IntegerType, IntervalUnit, TimeUnit,
6
};
7
use arrow::types::{days_ms, i256};
8
use ethnum::I256;
9
use polars_compute::cast::CastOptionsImpl;
10
use polars_utils::float16::pf16;
11
use polars_utils::pl_str::PlSmallStr;
12
13
use super::utils::filter::Filter;
14
use super::{
15
BasicDecompressor, InitNested, NestedState, boolean, fixed_size_binary, null, primitive,
16
};
17
use crate::parquet::error::ParquetResult;
18
use crate::parquet::schema::types::{
19
PhysicalType, PrimitiveLogicalType, PrimitiveType, TimeUnit as ParquetTimeUnit,
20
};
21
use crate::parquet::types::int96_to_i64_ns;
22
use crate::read::ParquetError;
23
use crate::read::deserialize::categorical::CategoricalDecoder;
24
use crate::read::deserialize::utils::PageDecoder;
25
use crate::read::deserialize::{binary, binview};
26
27
/// An iterator adapter that maps an iterator of Pages a boxed [`Array`] of [`ArrowDataType`]
28
/// `dtype` with a maximum of `num_rows` elements.
29
pub fn page_iter_to_array(
30
pages: BasicDecompressor,
31
type_: &PrimitiveType,
32
field: Field,
33
filter: Option<Filter>,
34
init_nested: Option<Vec<InitNested>>,
35
) -> ParquetResult<(Option<NestedState>, Vec<Box<dyn Array>>, Bitmap)> {
36
use ArrowDataType::*;
37
38
let physical_type = &type_.physical_type;
39
let logical_type = &type_.logical_type;
40
let is_pl_empty_struct = field.is_pl_pq_empty_struct();
41
let dtype = field.dtype;
42
43
Ok(match (physical_type, dtype.to_storage()) {
44
(_, Null) => PageDecoder::new(&field.name, pages, dtype, null::NullDecoder, init_nested)?
45
.collect_boxed(filter)?,
46
47
// Empty structs are roundtrippable by mapping to Boolean array.
48
(PhysicalType::Boolean, Struct(fs)) if fs.is_empty() && is_pl_empty_struct => {
49
let (nested, array, ptm) = PageDecoder::new(
50
&field.name,
51
pages,
52
ArrowDataType::Boolean,
53
boolean::BooleanDecoder,
54
init_nested,
55
)?
56
.collect(filter)?;
57
58
let array = array
59
.into_iter()
60
.map(|mut array| {
61
StructArray::new(
62
dtype.clone(),
63
array.len(),
64
Vec::new(),
65
array.take_validity(),
66
)
67
.to_boxed()
68
})
69
.collect::<Vec<Box<dyn Array>>>();
70
71
(nested, array, ptm)
72
},
73
(PhysicalType::Boolean, Boolean) => PageDecoder::new(
74
&field.name,
75
pages,
76
dtype,
77
boolean::BooleanDecoder,
78
init_nested,
79
)?
80
.collect_boxed(filter)?,
81
(PhysicalType::Int32, UInt8) => PageDecoder::new(
82
&field.name,
83
pages,
84
dtype,
85
primitive::IntDecoder::<i32, u8, _>::cast_as(),
86
init_nested,
87
)?
88
.collect_boxed(filter)?,
89
(PhysicalType::Int32, UInt16) => PageDecoder::new(
90
&field.name,
91
pages,
92
dtype,
93
primitive::IntDecoder::<i32, u16, _>::cast_as(),
94
init_nested,
95
)?
96
.collect_boxed(filter)?,
97
(PhysicalType::Int32, UInt32) => PageDecoder::new(
98
&field.name,
99
pages,
100
dtype,
101
primitive::IntDecoder::<i32, u32, _>::cast_as(),
102
init_nested,
103
)?
104
.collect_boxed(filter)?,
105
(PhysicalType::Int64, UInt32) => PageDecoder::new(
106
&field.name,
107
pages,
108
dtype,
109
primitive::IntDecoder::<i64, u32, _>::cast_as(),
110
init_nested,
111
)?
112
.collect_boxed(filter)?,
113
(PhysicalType::Int32, Int8) => PageDecoder::new(
114
&field.name,
115
pages,
116
dtype,
117
primitive::IntDecoder::<i32, i8, _>::cast_as(),
118
init_nested,
119
)?
120
.collect_boxed(filter)?,
121
(PhysicalType::Int32, Int16) => PageDecoder::new(
122
&field.name,
123
pages,
124
dtype,
125
primitive::IntDecoder::<i32, i16, _>::cast_as(),
126
init_nested,
127
)?
128
.collect_boxed(filter)?,
129
(PhysicalType::Int32, Int32 | Date32 | Time32(_)) => PageDecoder::new(
130
&field.name,
131
pages,
132
dtype,
133
primitive::IntDecoder::<i32, _, _>::unit(),
134
init_nested,
135
)?
136
.collect_boxed(filter)?,
137
(PhysicalType::Int64 | PhysicalType::Int96, Timestamp(time_unit, _)) => {
138
let time_unit = *time_unit;
139
return timestamp(
140
&field.name,
141
pages,
142
physical_type,
143
logical_type,
144
dtype,
145
filter,
146
time_unit,
147
init_nested,
148
);
149
},
150
(PhysicalType::FixedLenByteArray(_), FixedSizeBinary(_)) => {
151
let size = FixedSizeBinaryArray::get_size(&dtype);
152
153
PageDecoder::new(
154
&field.name,
155
pages,
156
dtype,
157
fixed_size_binary::BinaryDecoder { size },
158
init_nested,
159
)?
160
.collect_boxed(filter)?
161
},
162
(PhysicalType::FixedLenByteArray(12), Interval(IntervalUnit::YearMonth)) => {
163
// @TODO: Make a separate decoder for this
164
165
let n = 12;
166
let (nested, array, ptm) = PageDecoder::new(
167
&field.name,
168
pages,
169
ArrowDataType::FixedSizeBinary(n),
170
fixed_size_binary::BinaryDecoder { size: n },
171
init_nested,
172
)?
173
.collect(filter)?;
174
175
let array = array
176
.into_iter()
177
.map(|array| {
178
let values = array
179
.values()
180
.chunks_exact(n)
181
.map(|value: &[u8]| i32::from_le_bytes(value[..4].try_into().unwrap()))
182
.collect::<Vec<_>>();
183
let validity = array.validity().cloned();
184
Ok(
185
PrimitiveArray::<i32>::try_new(dtype.clone(), values.into(), validity)?
186
.to_boxed(),
187
)
188
})
189
.collect::<ParquetResult<Vec<Box<dyn Array>>>>()?;
190
191
(nested, array, ptm)
192
},
193
(PhysicalType::FixedLenByteArray(12), Interval(IntervalUnit::DayTime)) => {
194
// @TODO: Make a separate decoder for this
195
196
let n = 12;
197
let (nested, array, ptm) = PageDecoder::new(
198
&field.name,
199
pages,
200
ArrowDataType::FixedSizeBinary(n),
201
fixed_size_binary::BinaryDecoder { size: n },
202
init_nested,
203
)?
204
.collect(filter)?;
205
206
let array = array
207
.into_iter()
208
.map(|array| {
209
let values = array
210
.values()
211
.chunks_exact(n)
212
.map(super::super::convert_days_ms)
213
.collect::<Vec<_>>();
214
let validity = array.validity().cloned();
215
Ok(
216
PrimitiveArray::<days_ms>::try_new(dtype.clone(), values.into(), validity)?
217
.to_boxed(),
218
)
219
})
220
.collect::<ParquetResult<Vec<Box<dyn Array>>>>()?;
221
222
(nested, array, ptm)
223
},
224
(PhysicalType::FixedLenByteArray(12), Interval(IntervalUnit::MonthDayMillis)) => {
225
// @TODO: Make a separate decoder for this
226
227
const N_BYTES: usize = 12;
228
let (nested, array, ptm) = PageDecoder::new(
229
&field.name,
230
pages,
231
ArrowDataType::FixedSizeBinary(N_BYTES),
232
fixed_size_binary::BinaryDecoder { size: N_BYTES },
233
init_nested,
234
)?
235
.collect(filter)?;
236
237
let out = array
238
.into_iter()
239
.map(|arr| convert_interval_bytes_to_month_day_nano_struct(arr).boxed())
240
.collect();
241
242
(nested, out, ptm)
243
},
244
(PhysicalType::FixedLenByteArray(16), UInt128) => {
245
let n = 16;
246
let (nested, array, ptm) = PageDecoder::new(
247
&field.name,
248
pages,
249
ArrowDataType::FixedSizeBinary(n),
250
fixed_size_binary::BinaryDecoder { size: n },
251
init_nested,
252
)?
253
.collect(filter)?;
254
255
let array = array
256
.into_iter()
257
.map(|array| {
258
let (_, values, validity) = array.into_inner();
259
let values = values.try_transmute().expect(
260
"this should work since the parquet decoder has alignment constraints",
261
);
262
Ok(
263
PrimitiveArray::<u128>::try_new(dtype.clone(), values, validity)?
264
.to_boxed(),
265
)
266
})
267
.collect::<ParquetResult<Vec<Box<dyn Array>>>>()?;
268
269
(nested, array, ptm)
270
},
271
(PhysicalType::FixedLenByteArray(16), Int128) => {
272
let n = 16;
273
let (nested, array, ptm) = PageDecoder::new(
274
&field.name,
275
pages,
276
ArrowDataType::FixedSizeBinary(n),
277
fixed_size_binary::BinaryDecoder { size: n },
278
init_nested,
279
)?
280
.collect(filter)?;
281
282
let array = array
283
.into_iter()
284
.map(|array| {
285
let (_, values, validity) = array.into_inner();
286
let values = values.try_transmute().expect(
287
"this should work since the parquet decoder has alignment constraints",
288
);
289
Ok(
290
PrimitiveArray::<i128>::try_new(dtype.clone(), values, validity)?
291
.to_boxed(),
292
)
293
})
294
.collect::<ParquetResult<Vec<Box<dyn Array>>>>()?;
295
296
(nested, array, ptm)
297
},
298
(PhysicalType::Int32, Decimal(_, _)) => PageDecoder::new(
299
&field.name,
300
pages,
301
dtype,
302
primitive::IntDecoder::<i32, i128, _>::cast_into(),
303
init_nested,
304
)?
305
.collect_boxed(filter)?,
306
(PhysicalType::Int64, Decimal(_, _)) => PageDecoder::new(
307
&field.name,
308
pages,
309
dtype,
310
primitive::IntDecoder::<i64, i128, _>::cast_into(),
311
init_nested,
312
)?
313
.collect_boxed(filter)?,
314
(PhysicalType::FixedLenByteArray(n), Decimal(_, _)) if *n > 16 => {
315
return Err(ParquetError::not_supported(format!(
316
"not implemented: can't decode Decimal128 type from Fixed Size Byte Array of len {n:?}"
317
)));
318
},
319
(PhysicalType::FixedLenByteArray(n), Decimal(_, _)) => {
320
// @TODO: Make a separate decoder for this
321
322
let n = *n;
323
324
let (nested, array, ptm) = PageDecoder::new(
325
&field.name,
326
pages,
327
ArrowDataType::FixedSizeBinary(n),
328
fixed_size_binary::BinaryDecoder { size: n },
329
init_nested,
330
)?
331
.collect(filter)?;
332
333
let array = array
334
.into_iter()
335
.map(|array| {
336
let values = array
337
.values()
338
.chunks_exact(n)
339
.map(|value: &[u8]| super::super::convert_i128(value, n))
340
.collect::<Vec<_>>();
341
let validity = array.validity().cloned();
342
Ok(
343
PrimitiveArray::<i128>::try_new(dtype.clone(), values.into(), validity)?
344
.to_boxed(),
345
)
346
})
347
.collect::<ParquetResult<Vec<Box<dyn Array>>>>()?;
348
349
(nested, array, ptm)
350
},
351
(PhysicalType::ByteArray, Decimal(_, _)) => {
352
// @TODO: Make a separate decoder for this
353
354
let (nested, array, ptm) = PageDecoder::new(
355
&field.name,
356
pages,
357
ArrowDataType::BinaryView,
358
binview::BinViewDecoder::new(false),
359
init_nested,
360
)?
361
.collect(filter)?;
362
363
let array = array
364
.into_iter()
365
.map(|array| {
366
let array = array.as_any().downcast_ref::<BinaryViewArray>().unwrap();
367
let values = array
368
.values_iter()
369
.map(|value: &[u8]| {
370
if value.len() <= 16 {
371
Ok(super::super::convert_i128(value, value.len()))
372
} else {
373
Err(ParquetError::OutOfSpec(
374
"value has too many bytes for Decimal128".to_string(),
375
))
376
}
377
})
378
.collect::<ParquetResult<Vec<_>>>()?;
379
let validity = array.validity().cloned();
380
Ok(
381
PrimitiveArray::<i128>::try_new(dtype.clone(), values.into(), validity)?
382
.to_boxed(),
383
)
384
})
385
.collect::<ParquetResult<Vec<Box<dyn Array>>>>()?;
386
387
(nested, array, ptm)
388
},
389
(PhysicalType::Int32, Decimal256(_, _)) => PageDecoder::new(
390
&field.name,
391
pages,
392
dtype,
393
primitive::IntDecoder::closure(|x: i32| i256(I256::new(x as i128))),
394
init_nested,
395
)?
396
.collect_boxed(filter)?,
397
(PhysicalType::Int64, Decimal256(_, _)) => PageDecoder::new(
398
&field.name,
399
pages,
400
dtype,
401
primitive::IntDecoder::closure(|x: i64| i256(I256::new(x as i128))),
402
init_nested,
403
)?
404
.collect_boxed(filter)?,
405
(PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n <= 16 => {
406
// @TODO: Make a separate decoder for this
407
408
let n = *n;
409
410
let (nested, array, ptm) = PageDecoder::new(
411
&field.name,
412
pages,
413
ArrowDataType::FixedSizeBinary(n),
414
fixed_size_binary::BinaryDecoder { size: n },
415
init_nested,
416
)?
417
.collect(filter)?;
418
419
let array = array
420
.into_iter()
421
.map(|array| {
422
let values = array
423
.values()
424
.chunks_exact(n)
425
.map(|value: &[u8]| i256(I256::new(super::super::convert_i128(value, n))))
426
.collect::<Vec<_>>();
427
let validity = array.validity().cloned();
428
Ok(
429
PrimitiveArray::<i256>::try_new(dtype.clone(), values.into(), validity)?
430
.to_boxed(),
431
)
432
})
433
.collect::<ParquetResult<Vec<Box<dyn Array>>>>()?;
434
435
(nested, array, ptm)
436
},
437
(PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n <= 32 => {
438
// @TODO: Make a separate decoder for this
439
440
let n = *n;
441
442
let (nested, array, ptm) = PageDecoder::new(
443
&field.name,
444
pages,
445
ArrowDataType::FixedSizeBinary(n),
446
fixed_size_binary::BinaryDecoder { size: n },
447
init_nested,
448
)?
449
.collect(filter)?;
450
451
let array = array
452
.into_iter()
453
.map(|array| {
454
let values = array
455
.values()
456
.chunks_exact(n)
457
.map(super::super::convert_i256)
458
.collect::<Vec<_>>();
459
let validity = array.validity().cloned();
460
Ok(
461
PrimitiveArray::<i256>::try_new(dtype.clone(), values.into(), validity)?
462
.to_boxed(),
463
)
464
})
465
.collect::<ParquetResult<Vec<Box<dyn Array>>>>()?;
466
467
(nested, array, ptm)
468
},
469
(PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n > 32 => {
470
return Err(ParquetError::not_supported(format!(
471
"Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}",
472
)));
473
},
474
(PhysicalType::Int32, Date64) => PageDecoder::new(
475
&field.name,
476
pages,
477
dtype,
478
primitive::IntDecoder::closure(|x: i32| i64::from(x) * 86400000),
479
init_nested,
480
)?
481
.collect_boxed(filter)?,
482
(PhysicalType::Int64, Date64) => PageDecoder::new(
483
&field.name,
484
pages,
485
dtype,
486
primitive::IntDecoder::<i64, _, _>::unit(),
487
init_nested,
488
)?
489
.collect_boxed(filter)?,
490
(PhysicalType::Int64, Int64 | Time64(_) | Duration(_)) => PageDecoder::new(
491
&field.name,
492
pages,
493
dtype,
494
primitive::IntDecoder::<i64, _, _>::unit(),
495
init_nested,
496
)?
497
.collect_boxed(filter)?,
498
499
(PhysicalType::Int64, UInt64) => PageDecoder::new(
500
&field.name,
501
pages,
502
dtype,
503
primitive::IntDecoder::<i64, u64, _>::cast_as(),
504
init_nested,
505
)?
506
.collect_boxed(filter)?,
507
508
(PhysicalType::FixedLenByteArray(2), Float16) => PageDecoder::new(
509
&field.name,
510
pages,
511
dtype,
512
primitive::FloatDecoder::<pf16, _, _>::unit(),
513
init_nested,
514
)?
515
.collect_boxed(filter)?,
516
(PhysicalType::Float, Float32) => PageDecoder::new(
517
&field.name,
518
pages,
519
dtype,
520
primitive::FloatDecoder::<f32, _, _>::unit(),
521
init_nested,
522
)?
523
.collect_boxed(filter)?,
524
(PhysicalType::Double, Float64) => PageDecoder::new(
525
&field.name,
526
pages,
527
dtype,
528
primitive::FloatDecoder::<f64, _, _>::unit(),
529
init_nested,
530
)?
531
.collect_boxed(filter)?,
532
// Decoder for BinaryOffset
533
(PhysicalType::ByteArray, LargeBinary) => PageDecoder::new(
534
&field.name,
535
pages,
536
dtype,
537
binary::BinaryDecoder,
538
init_nested,
539
)?
540
.collect_boxed(filter)?,
541
// Don't compile this code with `i32` as we don't use this in polars
542
(PhysicalType::ByteArray, LargeUtf8) => {
543
let is_string = matches!(dtype, LargeUtf8);
544
PageDecoder::new(
545
&field.name,
546
pages,
547
dtype,
548
binview::BinViewDecoder::new(is_string),
549
init_nested,
550
)?
551
.collect_boxed(filter)?
552
},
553
(_, Binary | Utf8) => unreachable!(),
554
(PhysicalType::ByteArray, BinaryView | Utf8View) => {
555
let is_string = matches!(dtype, Utf8View);
556
PageDecoder::new(
557
&field.name,
558
pages,
559
dtype,
560
binview::BinViewDecoder::new(is_string),
561
init_nested,
562
)?
563
.collect_boxed(filter)?
564
},
565
(_, Dictionary(key_type, value_type, _)) => {
566
// @NOTE: This should only hit in two cases:
567
// - Polars enum's and categorical's
568
// - Int -> String which can be turned into categoricals
569
assert_eq!(value_type.as_ref(), &ArrowDataType::Utf8View);
570
571
if field.metadata.is_some_and(|md| {
572
md.contains_key(DTYPE_ENUM_VALUES_LEGACY)
573
|| md.contains_key(DTYPE_ENUM_VALUES_NEW)
574
|| md.contains_key(DTYPE_CATEGORICAL_NEW)
575
|| md.contains_key(DTYPE_CATEGORICAL_LEGACY)
576
}) && matches!(
577
key_type,
578
IntegerType::UInt8 | IntegerType::UInt16 | IntegerType::UInt32
579
) {
580
match key_type {
581
IntegerType::UInt8 => PageDecoder::new(
582
&field.name,
583
pages,
584
dtype,
585
CategoricalDecoder::<u8>::new(),
586
init_nested,
587
)?
588
.collect_boxed(filter)?,
589
IntegerType::UInt16 => PageDecoder::new(
590
&field.name,
591
pages,
592
dtype,
593
CategoricalDecoder::<u16>::new(),
594
init_nested,
595
)?
596
.collect_boxed(filter)?,
597
IntegerType::UInt32 => PageDecoder::new(
598
&field.name,
599
pages,
600
dtype,
601
CategoricalDecoder::<u32>::new(),
602
init_nested,
603
)?
604
.collect_boxed(filter)?,
605
_ => unreachable!(),
606
}
607
} else {
608
let (nested, array, ptm) = PageDecoder::new(
609
&field.name,
610
pages,
611
ArrowDataType::Utf8View,
612
binview::BinViewDecoder::new_string(),
613
init_nested,
614
)?
615
.collect(filter)?;
616
617
let array = array
618
.into_iter()
619
.map(|array| {
620
polars_compute::cast::cast(
621
array.as_ref(),
622
&dtype,
623
CastOptionsImpl::default(),
624
)
625
.unwrap()
626
})
627
.collect();
628
629
(nested, array, ptm)
630
}
631
},
632
(from, to) => {
633
return Err(ParquetError::not_supported(format!(
634
"reading parquet type {from:?} to {to:?} still not implemented",
635
)));
636
},
637
})
638
}
639
640
/// Unify the timestamp unit from parquet TimeUnit into arrow's TimeUnit
641
/// Returns (a int64 factor, is_multiplier)
642
fn unify_timestamp_unit(
643
logical_type: &Option<PrimitiveLogicalType>,
644
time_unit: TimeUnit,
645
) -> (i64, bool) {
646
if let Some(PrimitiveLogicalType::Timestamp { unit, .. }) = logical_type {
647
match (*unit, time_unit) {
648
(ParquetTimeUnit::Milliseconds, TimeUnit::Millisecond)
649
| (ParquetTimeUnit::Microseconds, TimeUnit::Microsecond)
650
| (ParquetTimeUnit::Nanoseconds, TimeUnit::Nanosecond) => (1, true),
651
652
(ParquetTimeUnit::Milliseconds, TimeUnit::Second)
653
| (ParquetTimeUnit::Microseconds, TimeUnit::Millisecond)
654
| (ParquetTimeUnit::Nanoseconds, TimeUnit::Microsecond) => (1000, false),
655
656
(ParquetTimeUnit::Microseconds, TimeUnit::Second)
657
| (ParquetTimeUnit::Nanoseconds, TimeUnit::Millisecond) => (1_000_000, false),
658
659
(ParquetTimeUnit::Nanoseconds, TimeUnit::Second) => (1_000_000_000, false),
660
661
(ParquetTimeUnit::Milliseconds, TimeUnit::Microsecond)
662
| (ParquetTimeUnit::Microseconds, TimeUnit::Nanosecond) => (1_000, true),
663
664
(ParquetTimeUnit::Milliseconds, TimeUnit::Nanosecond) => (1_000_000, true),
665
}
666
} else {
667
(1, true)
668
}
669
}
670
671
/// Converts a parquet INT96 timestamp into microseconds since the Unix epoch.
///
/// `value` holds the nanoseconds within the day split into a low and a high 32-bit
/// word, followed by the Julian day number. Sub-microsecond precision is truncated.
#[inline]
pub fn int96_to_i64_us(value: [u32; 3]) -> i64 {
    const UNIX_EPOCH_JULIAN_DAY: i64 = 2_440_588;
    const MICROS_PER_DAY: i64 = 86_400 * 1_000_000;

    let [low_word, high_word, julian_day] = value;
    let nanos_in_day = (i64::from(high_word) << 32) | i64::from(low_word);

    (i64::from(julian_day) - UNIX_EPOCH_JULIAN_DAY) * MICROS_PER_DAY + nanos_in_day / 1_000
}
683
684
/// Converts a parquet INT96 timestamp into milliseconds since the Unix epoch.
///
/// `value` holds the nanoseconds within the day split into a low and a high 32-bit
/// word, followed by the Julian day number. Sub-millisecond precision is truncated.
#[inline]
pub fn int96_to_i64_ms(value: [u32; 3]) -> i64 {
    const UNIX_EPOCH_JULIAN_DAY: i64 = 2_440_588;
    const MILLIS_PER_DAY: i64 = 86_400 * 1_000;

    let [low_word, high_word, julian_day] = value;
    let nanos_in_day = (i64::from(high_word) << 32) | i64::from(low_word);

    (i64::from(julian_day) - UNIX_EPOCH_JULIAN_DAY) * MILLIS_PER_DAY + nanos_in_day / 1_000_000
}
696
697
/// Converts a parquet INT96 timestamp into whole seconds since the Unix epoch.
///
/// `value` holds the nanoseconds within the day split into a low and a high 32-bit
/// word, followed by the Julian day number. Sub-second precision is truncated.
#[inline]
pub fn int96_to_i64_s(value: [u32; 3]) -> i64 {
    const UNIX_EPOCH_JULIAN_DAY: i64 = 2_440_588;
    const SECONDS_PER_DAY: i64 = 86_400;

    let [low_word, high_word, julian_day] = value;
    let nanos_in_day = (i64::from(high_word) << 32) | i64::from(low_word);

    (i64::from(julian_day) - UNIX_EPOCH_JULIAN_DAY) * SECONDS_PER_DAY
        + nanos_in_day / 1_000_000_000
}
708
709
#[expect(clippy::too_many_arguments)]
710
fn timestamp(
711
field_name: &str,
712
pages: BasicDecompressor,
713
physical_type: &PhysicalType,
714
logical_type: &Option<PrimitiveLogicalType>,
715
dtype: ArrowDataType,
716
filter: Option<Filter>,
717
time_unit: TimeUnit,
718
nested: Option<Vec<InitNested>>,
719
) -> ParquetResult<(Option<NestedState>, Vec<Box<dyn Array>>, Bitmap)> {
720
if physical_type == &PhysicalType::Int96 {
721
return match time_unit {
722
TimeUnit::Nanosecond => PageDecoder::new(
723
field_name,
724
pages,
725
dtype,
726
primitive::FloatDecoder::closure(|x: [u32; 3]| int96_to_i64_ns(x)),
727
nested,
728
)?
729
.collect_boxed(filter),
730
TimeUnit::Microsecond => PageDecoder::new(
731
field_name,
732
pages,
733
dtype,
734
primitive::FloatDecoder::closure(|x: [u32; 3]| int96_to_i64_us(x)),
735
nested,
736
)?
737
.collect_boxed(filter),
738
TimeUnit::Millisecond => PageDecoder::new(
739
field_name,
740
pages,
741
dtype,
742
primitive::FloatDecoder::closure(|x: [u32; 3]| int96_to_i64_ms(x)),
743
nested,
744
)?
745
.collect_boxed(filter),
746
TimeUnit::Second => PageDecoder::new(
747
field_name,
748
pages,
749
dtype,
750
primitive::FloatDecoder::closure(|x: [u32; 3]| int96_to_i64_s(x)),
751
nested,
752
)?
753
.collect_boxed(filter),
754
};
755
};
756
757
if physical_type != &PhysicalType::Int64 {
758
return Err(ParquetError::not_supported(
759
"can't decode a timestamp from a non-int64 parquet type",
760
));
761
}
762
763
let (factor, is_multiplier) = unify_timestamp_unit(logical_type, time_unit);
764
match (factor, is_multiplier) {
765
(1, _) => PageDecoder::new(
766
field_name,
767
pages,
768
dtype,
769
primitive::IntDecoder::<i64, _, _>::unit(),
770
nested,
771
)?
772
.collect_boxed(filter),
773
(a, true) => PageDecoder::new(
774
field_name,
775
pages,
776
dtype,
777
primitive::IntDecoder::closure(|x: i64| x * a),
778
nested,
779
)?
780
.collect_boxed(filter),
781
(a, false) => PageDecoder::new(
782
field_name,
783
pages,
784
dtype,
785
primitive::IntDecoder::closure(|x: i64| x / a),
786
nested,
787
)?
788
.collect_boxed(filter),
789
}
790
}
791
792
/// Converts directly from Parquet INTERVAL to Struct.
793
fn convert_interval_bytes_to_month_day_nano_struct(
794
month_day_millis_bytes: FixedSizeBinaryArray,
795
) -> StructArray {
796
const ROW_WIDTH: usize = 12;
797
798
let bytes: &[u8] = month_day_millis_bytes.values();
799
let output_length = bytes.len() / ROW_WIDTH;
800
801
assert_eq!(bytes.len(), output_length * ROW_WIDTH);
802
803
let (months_out, days_out, nanoseconds_out): (Vec<i32>, Vec<i32>, Vec<i64>) = (0
804
..output_length)
805
.map(|i| {
806
let bytes: [u8; ROW_WIDTH] =
807
unsafe { bytes.get_unchecked(i * ROW_WIDTH..(i + 1) * ROW_WIDTH) }
808
.try_into()
809
.unwrap();
810
811
let months: i32 = i32::from_le_bytes(bytes[..4].try_into().unwrap());
812
let days: i32 = i32::from_le_bytes(bytes[4..8].try_into().unwrap());
813
let nanoseconds: i64 = (i32::from_le_bytes(bytes[8..12].try_into().unwrap()) as i64)
814
.checked_mul(1_000_000i64) // Convert milliseconds to nanoseconds.
815
.unwrap();
816
817
(months, days, nanoseconds)
818
})
819
.collect();
820
821
let struct_fields = vec![
822
Field::new(
823
PlSmallStr::from_static("months"),
824
ArrowDataType::Int32,
825
true,
826
),
827
Field::new(PlSmallStr::from_static("days"), ArrowDataType::Int32, true),
828
Field::new(
829
PlSmallStr::from_static("nanoseconds"),
830
ArrowDataType::Duration(TimeUnit::Nanosecond),
831
true,
832
),
833
];
834
835
let struct_value_arrays = vec![
836
PrimitiveArray::<i32>::from_vec(months_out).boxed(),
837
PrimitiveArray::<i32>::from_vec(days_out).boxed(),
838
PrimitiveArray::<i64>::try_new(
839
ArrowDataType::Duration(TimeUnit::Nanosecond),
840
nanoseconds_out.into(),
841
None,
842
)
843
.unwrap()
844
.boxed(),
845
];
846
847
StructArray::new(
848
ArrowDataType::Struct(struct_fields),
849
output_length,
850
struct_value_arrays,
851
month_day_millis_bytes.validity().cloned(),
852
)
853
}
854
855