Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/simple.rs
6940 views
1
use arrow::array::{Array, FixedSizeBinaryArray, PrimitiveArray};
2
use arrow::bitmap::Bitmap;
3
use arrow::datatypes::{
4
ArrowDataType, DTYPE_CATEGORICAL_LEGACY, DTYPE_CATEGORICAL_NEW, DTYPE_ENUM_VALUES_LEGACY,
5
DTYPE_ENUM_VALUES_NEW, Field, IntegerType, IntervalUnit, TimeUnit,
6
};
7
use arrow::types::{NativeType, days_ms, i256};
8
use ethnum::I256;
9
use polars_compute::cast::CastOptionsImpl;
10
11
use super::utils::filter::Filter;
12
use super::{
13
BasicDecompressor, InitNested, NestedState, boolean, fixed_size_binary, null, primitive,
14
};
15
use crate::parquet::error::ParquetResult;
16
use crate::parquet::schema::types::{
17
PhysicalType, PrimitiveLogicalType, PrimitiveType, TimeUnit as ParquetTimeUnit,
18
};
19
use crate::parquet::types::int96_to_i64_ns;
20
use crate::read::ParquetError;
21
use crate::read::deserialize::binview;
22
use crate::read::deserialize::categorical::CategoricalDecoder;
23
use crate::read::deserialize::utils::PageDecoder;
24
25
/// An iterator adapter that maps an iterator of Pages a boxed [`Array`] of [`ArrowDataType`]
26
/// `dtype` with a maximum of `num_rows` elements.
27
pub fn page_iter_to_array(
28
pages: BasicDecompressor,
29
type_: &PrimitiveType,
30
field: Field,
31
filter: Option<Filter>,
32
init_nested: Option<Vec<InitNested>>,
33
) -> ParquetResult<(Option<NestedState>, Box<dyn Array>, Bitmap)> {
34
use ArrowDataType::*;
35
36
let physical_type = &type_.physical_type;
37
let logical_type = &type_.logical_type;
38
let dtype = field.dtype;
39
40
Ok(match (physical_type, dtype.to_logical_type()) {
41
(_, Null) => PageDecoder::new(&field.name, pages, dtype, null::NullDecoder, init_nested)?
42
.collect_boxed(filter)?,
43
(PhysicalType::Boolean, Boolean) => PageDecoder::new(
44
&field.name,
45
pages,
46
dtype,
47
boolean::BooleanDecoder,
48
init_nested,
49
)?
50
.collect_boxed(filter)?,
51
(PhysicalType::Int32, UInt8) => PageDecoder::new(
52
&field.name,
53
pages,
54
dtype,
55
primitive::IntDecoder::<i32, u8, _>::cast_as(),
56
init_nested,
57
)?
58
.collect_boxed(filter)?,
59
(PhysicalType::Int32, UInt16) => PageDecoder::new(
60
&field.name,
61
pages,
62
dtype,
63
primitive::IntDecoder::<i32, u16, _>::cast_as(),
64
init_nested,
65
)?
66
.collect_boxed(filter)?,
67
(PhysicalType::Int32, UInt32) => PageDecoder::new(
68
&field.name,
69
pages,
70
dtype,
71
primitive::IntDecoder::<i32, u32, _>::cast_as(),
72
init_nested,
73
)?
74
.collect_boxed(filter)?,
75
(PhysicalType::Int64, UInt32) => PageDecoder::new(
76
&field.name,
77
pages,
78
dtype,
79
primitive::IntDecoder::<i64, u32, _>::cast_as(),
80
init_nested,
81
)?
82
.collect_boxed(filter)?,
83
(PhysicalType::Int32, Int8) => PageDecoder::new(
84
&field.name,
85
pages,
86
dtype,
87
primitive::IntDecoder::<i32, i8, _>::cast_as(),
88
init_nested,
89
)?
90
.collect_boxed(filter)?,
91
(PhysicalType::Int32, Int16) => PageDecoder::new(
92
&field.name,
93
pages,
94
dtype,
95
primitive::IntDecoder::<i32, i16, _>::cast_as(),
96
init_nested,
97
)?
98
.collect_boxed(filter)?,
99
(PhysicalType::Int32, Int32 | Date32 | Time32(_)) => PageDecoder::new(
100
&field.name,
101
pages,
102
dtype,
103
primitive::IntDecoder::<i32, _, _>::unit(),
104
init_nested,
105
)?
106
.collect_boxed(filter)?,
107
(PhysicalType::Int64 | PhysicalType::Int96, Timestamp(time_unit, _)) => {
108
let time_unit = *time_unit;
109
return timestamp(
110
&field.name,
111
pages,
112
physical_type,
113
logical_type,
114
dtype,
115
filter,
116
time_unit,
117
init_nested,
118
);
119
},
120
(PhysicalType::FixedLenByteArray(_), FixedSizeBinary(_)) => {
121
let size = FixedSizeBinaryArray::get_size(&dtype);
122
123
PageDecoder::new(
124
&field.name,
125
pages,
126
dtype,
127
fixed_size_binary::BinaryDecoder { size },
128
init_nested,
129
)?
130
.collect_boxed(filter)?
131
},
132
(PhysicalType::FixedLenByteArray(12), Interval(IntervalUnit::YearMonth)) => {
133
// @TODO: Make a separate decoder for this
134
135
let n = 12;
136
let (nested, array, ptm) = PageDecoder::new(
137
&field.name,
138
pages,
139
ArrowDataType::FixedSizeBinary(n),
140
fixed_size_binary::BinaryDecoder { size: n },
141
init_nested,
142
)?
143
.collect(filter)?;
144
145
let values = array
146
.values()
147
.chunks_exact(n)
148
.map(|value: &[u8]| i32::from_le_bytes(value[..4].try_into().unwrap()))
149
.collect::<Vec<_>>();
150
let validity = array.validity().cloned();
151
152
(
153
nested,
154
PrimitiveArray::<i32>::try_new(dtype.clone(), values.into(), validity)?.to_boxed(),
155
ptm,
156
)
157
},
158
(PhysicalType::FixedLenByteArray(12), Interval(IntervalUnit::DayTime)) => {
159
// @TODO: Make a separate decoder for this
160
161
let n = 12;
162
let (nested, array, ptm) = PageDecoder::new(
163
&field.name,
164
pages,
165
ArrowDataType::FixedSizeBinary(n),
166
fixed_size_binary::BinaryDecoder { size: n },
167
init_nested,
168
)?
169
.collect(filter)?;
170
171
let values = array
172
.values()
173
.chunks_exact(n)
174
.map(super::super::convert_days_ms)
175
.collect::<Vec<_>>();
176
let validity = array.validity().cloned();
177
178
(
179
nested,
180
PrimitiveArray::<days_ms>::try_new(dtype.clone(), values.into(), validity)?
181
.to_boxed(),
182
ptm,
183
)
184
},
185
(PhysicalType::FixedLenByteArray(16), Int128) => {
186
let n = 16;
187
let (nested, array, ptm) = PageDecoder::new(
188
&field.name,
189
pages,
190
ArrowDataType::FixedSizeBinary(n),
191
fixed_size_binary::BinaryDecoder { size: n },
192
init_nested,
193
)?
194
.collect(filter)?;
195
196
let (_, values, validity) = array.into_inner();
197
let values = values
198
.try_transmute()
199
.expect("this should work since the parquet decoder has alignment constraints");
200
201
(
202
nested,
203
PrimitiveArray::<i128>::try_new(dtype.clone(), values, validity)?.to_boxed(),
204
ptm,
205
)
206
},
207
(PhysicalType::Int32, Decimal(_, _)) => PageDecoder::new(
208
&field.name,
209
pages,
210
dtype,
211
primitive::IntDecoder::<i32, i128, _>::cast_into(),
212
init_nested,
213
)?
214
.collect_boxed(filter)?,
215
(PhysicalType::Int64, Decimal(_, _)) => PageDecoder::new(
216
&field.name,
217
pages,
218
dtype,
219
primitive::IntDecoder::<i64, i128, _>::cast_into(),
220
init_nested,
221
)?
222
.collect_boxed(filter)?,
223
(PhysicalType::FixedLenByteArray(n), Decimal(_, _)) if *n > 16 => {
224
return Err(ParquetError::not_supported(format!(
225
"not implemented: can't decode Decimal128 type from Fixed Size Byte Array of len {n:?}"
226
)));
227
},
228
(PhysicalType::FixedLenByteArray(n), Decimal(_, _)) => {
229
// @TODO: Make a separate decoder for this
230
231
let n = *n;
232
233
let (nested, array, ptm) = PageDecoder::new(
234
&field.name,
235
pages,
236
ArrowDataType::FixedSizeBinary(n),
237
fixed_size_binary::BinaryDecoder { size: n },
238
init_nested,
239
)?
240
.collect(filter)?;
241
242
let values = array
243
.values()
244
.chunks_exact(n)
245
.map(|value: &[u8]| super::super::convert_i128(value, n))
246
.collect::<Vec<_>>();
247
let validity = array.validity().cloned();
248
249
(
250
nested,
251
PrimitiveArray::<i128>::try_new(dtype.clone(), values.into(), validity)?.to_boxed(),
252
ptm,
253
)
254
},
255
(PhysicalType::Int32, Decimal256(_, _)) => PageDecoder::new(
256
&field.name,
257
pages,
258
dtype,
259
primitive::IntDecoder::closure(|x: i32| i256(I256::new(x as i128))),
260
init_nested,
261
)?
262
.collect_boxed(filter)?,
263
(PhysicalType::Int64, Decimal256(_, _)) => PageDecoder::new(
264
&field.name,
265
pages,
266
dtype,
267
primitive::IntDecoder::closure(|x: i64| i256(I256::new(x as i128))),
268
init_nested,
269
)?
270
.collect_boxed(filter)?,
271
(PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n <= 16 => {
272
// @TODO: Make a separate decoder for this
273
274
let n = *n;
275
276
let (nested, array, ptm) = PageDecoder::new(
277
&field.name,
278
pages,
279
ArrowDataType::FixedSizeBinary(n),
280
fixed_size_binary::BinaryDecoder { size: n },
281
init_nested,
282
)?
283
.collect(filter)?;
284
285
let values = array
286
.values()
287
.chunks_exact(n)
288
.map(|value: &[u8]| i256(I256::new(super::super::convert_i128(value, n))))
289
.collect::<Vec<_>>();
290
let validity = array.validity().cloned();
291
292
(
293
nested,
294
PrimitiveArray::<i256>::try_new(dtype.clone(), values.into(), validity)?.to_boxed(),
295
ptm,
296
)
297
},
298
(PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n <= 32 => {
299
// @TODO: Make a separate decoder for this
300
301
let n = *n;
302
303
let (nested, array, ptm) = PageDecoder::new(
304
&field.name,
305
pages,
306
ArrowDataType::FixedSizeBinary(n),
307
fixed_size_binary::BinaryDecoder { size: n },
308
init_nested,
309
)?
310
.collect(filter)?;
311
312
let values = array
313
.values()
314
.chunks_exact(n)
315
.map(super::super::convert_i256)
316
.collect::<Vec<_>>();
317
let validity = array.validity().cloned();
318
319
(
320
nested,
321
PrimitiveArray::<i256>::try_new(dtype.clone(), values.into(), validity)?.to_boxed(),
322
ptm,
323
)
324
},
325
(PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n > 32 => {
326
return Err(ParquetError::not_supported(format!(
327
"Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}",
328
)));
329
},
330
(PhysicalType::Int32, Date64) => PageDecoder::new(
331
&field.name,
332
pages,
333
dtype,
334
primitive::IntDecoder::closure(|x: i32| i64::from(x) * 86400000),
335
init_nested,
336
)?
337
.collect_boxed(filter)?,
338
(PhysicalType::Int64, Date64) => PageDecoder::new(
339
&field.name,
340
pages,
341
dtype,
342
primitive::IntDecoder::<i64, _, _>::unit(),
343
init_nested,
344
)?
345
.collect_boxed(filter)?,
346
(PhysicalType::Int64, Int64 | Time64(_) | Duration(_)) => PageDecoder::new(
347
&field.name,
348
pages,
349
dtype,
350
primitive::IntDecoder::<i64, _, _>::unit(),
351
init_nested,
352
)?
353
.collect_boxed(filter)?,
354
355
(PhysicalType::Int64, UInt64) => PageDecoder::new(
356
&field.name,
357
pages,
358
dtype,
359
primitive::IntDecoder::<i64, u64, _>::cast_as(),
360
init_nested,
361
)?
362
.collect_boxed(filter)?,
363
364
// Float16
365
(PhysicalType::FixedLenByteArray(2), Float32) => {
366
// @NOTE: To reduce code bloat, we just use the FixedSizeBinary decoder.
367
368
let (nested, mut fsb_array, ptm) = PageDecoder::new(
369
&field.name,
370
pages,
371
ArrowDataType::FixedSizeBinary(2),
372
fixed_size_binary::BinaryDecoder { size: 2 },
373
init_nested,
374
)?
375
.collect(filter)?;
376
377
let validity = fsb_array.take_validity();
378
let values = fsb_array.values().as_slice();
379
assert_eq!(values.len() % 2, 0);
380
let values = values.chunks_exact(2);
381
let values = values
382
.map(|v| {
383
// SAFETY: We know that `v` is always of size two.
384
let le_bytes: [u8; 2] = unsafe { v.try_into().unwrap_unchecked() };
385
let v = arrow::types::f16::from_le_bytes(le_bytes);
386
v.to_f32()
387
})
388
.collect();
389
390
(
391
nested,
392
PrimitiveArray::<f32>::new(dtype, values, validity).to_boxed(),
393
ptm,
394
)
395
},
396
397
(PhysicalType::Float, Float32) => PageDecoder::new(
398
&field.name,
399
pages,
400
dtype,
401
primitive::FloatDecoder::<f32, _, _>::unit(),
402
init_nested,
403
)?
404
.collect_boxed(filter)?,
405
(PhysicalType::Double, Float64) => PageDecoder::new(
406
&field.name,
407
pages,
408
dtype,
409
primitive::FloatDecoder::<f64, _, _>::unit(),
410
init_nested,
411
)?
412
.collect_boxed(filter)?,
413
// Don't compile this code with `i32` as we don't use this in polars
414
(PhysicalType::ByteArray, LargeBinary | LargeUtf8) => {
415
let is_string = matches!(dtype, LargeUtf8);
416
PageDecoder::new(
417
&field.name,
418
pages,
419
dtype,
420
binview::BinViewDecoder { is_string },
421
init_nested,
422
)?
423
.collect(filter)?
424
},
425
(_, Binary | Utf8) => unreachable!(),
426
(PhysicalType::ByteArray, BinaryView | Utf8View) => {
427
let is_string = matches!(dtype, Utf8View);
428
PageDecoder::new(
429
&field.name,
430
pages,
431
dtype,
432
binview::BinViewDecoder { is_string },
433
init_nested,
434
)?
435
.collect(filter)?
436
},
437
(_, Dictionary(key_type, value_type, _)) => {
438
// @NOTE: This should only hit in two cases:
439
// - Polars enum's and categorical's
440
// - Int -> String which can be turned into categoricals
441
assert_eq!(value_type.as_ref(), &ArrowDataType::Utf8View);
442
443
if field.metadata.is_some_and(|md| {
444
md.contains_key(DTYPE_ENUM_VALUES_LEGACY)
445
|| md.contains_key(DTYPE_ENUM_VALUES_NEW)
446
|| md.contains_key(DTYPE_CATEGORICAL_NEW)
447
|| md.contains_key(DTYPE_CATEGORICAL_LEGACY)
448
}) && matches!(
449
key_type,
450
IntegerType::UInt8 | IntegerType::UInt16 | IntegerType::UInt32
451
) {
452
match key_type {
453
IntegerType::UInt8 => PageDecoder::new(
454
&field.name,
455
pages,
456
dtype,
457
CategoricalDecoder::<u8>::new(),
458
init_nested,
459
)?
460
.collect_boxed(filter)?,
461
IntegerType::UInt16 => PageDecoder::new(
462
&field.name,
463
pages,
464
dtype,
465
CategoricalDecoder::<u16>::new(),
466
init_nested,
467
)?
468
.collect_boxed(filter)?,
469
IntegerType::UInt32 => PageDecoder::new(
470
&field.name,
471
pages,
472
dtype,
473
CategoricalDecoder::<u32>::new(),
474
init_nested,
475
)?
476
.collect_boxed(filter)?,
477
_ => unreachable!(),
478
}
479
} else {
480
let (nested, array, ptm) = PageDecoder::new(
481
&field.name,
482
pages,
483
ArrowDataType::Utf8View,
484
binview::BinViewDecoder::new_string(),
485
init_nested,
486
)?
487
.collect(filter)?;
488
489
(
490
nested,
491
polars_compute::cast::cast(array.as_ref(), &dtype, CastOptionsImpl::default())
492
.unwrap(),
493
ptm,
494
)
495
}
496
},
497
(from, to) => {
498
return Err(ParquetError::not_supported(format!(
499
"reading parquet type {from:?} to {to:?} still not implemented",
500
)));
501
},
502
})
503
}
504
505
/// Unify the timestamp unit from parquet TimeUnit into arrow's TimeUnit
506
/// Returns (a int64 factor, is_multiplier)
507
fn unify_timestamp_unit(
508
logical_type: &Option<PrimitiveLogicalType>,
509
time_unit: TimeUnit,
510
) -> (i64, bool) {
511
if let Some(PrimitiveLogicalType::Timestamp { unit, .. }) = logical_type {
512
match (*unit, time_unit) {
513
(ParquetTimeUnit::Milliseconds, TimeUnit::Millisecond)
514
| (ParquetTimeUnit::Microseconds, TimeUnit::Microsecond)
515
| (ParquetTimeUnit::Nanoseconds, TimeUnit::Nanosecond) => (1, true),
516
517
(ParquetTimeUnit::Milliseconds, TimeUnit::Second)
518
| (ParquetTimeUnit::Microseconds, TimeUnit::Millisecond)
519
| (ParquetTimeUnit::Nanoseconds, TimeUnit::Microsecond) => (1000, false),
520
521
(ParquetTimeUnit::Microseconds, TimeUnit::Second)
522
| (ParquetTimeUnit::Nanoseconds, TimeUnit::Millisecond) => (1_000_000, false),
523
524
(ParquetTimeUnit::Nanoseconds, TimeUnit::Second) => (1_000_000_000, false),
525
526
(ParquetTimeUnit::Milliseconds, TimeUnit::Microsecond)
527
| (ParquetTimeUnit::Microseconds, TimeUnit::Nanosecond) => (1_000, true),
528
529
(ParquetTimeUnit::Milliseconds, TimeUnit::Nanosecond) => (1_000_000, true),
530
}
531
} else {
532
(1, true)
533
}
534
}
535
536
/// Converts a parquet `INT96` timestamp (`[nanos_lo, nanos_hi, julian_day]`) into microseconds
/// since the Unix epoch.
///
/// The sub-day nanoseconds are truncated toward zero to whole microseconds.
#[inline]
pub fn int96_to_i64_us(value: [u32; 3]) -> i64 {
    const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
    const MICROS_PER_DAY: i64 = 86_400 * 1_000_000;
    const NANOS_PER_MICRO: i64 = 1_000;

    let [nanos_lo, nanos_hi, julian_day] = value;
    // The low word has no bits in common with the shifted high word, so OR equals addition.
    let nanos_of_day = (i64::from(nanos_hi) << 32) | i64::from(nanos_lo);
    let days_since_epoch = i64::from(julian_day) - JULIAN_DAY_OF_EPOCH;

    days_since_epoch * MICROS_PER_DAY + nanos_of_day / NANOS_PER_MICRO
}
548
549
/// Converts a parquet `INT96` timestamp (`[nanos_lo, nanos_hi, julian_day]`) into milliseconds
/// since the Unix epoch.
///
/// The sub-day nanoseconds are truncated toward zero to whole milliseconds.
#[inline]
pub fn int96_to_i64_ms(value: [u32; 3]) -> i64 {
    const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
    const MILLIS_PER_DAY: i64 = 86_400 * 1_000;
    const NANOS_PER_MILLI: i64 = 1_000_000;

    let [nanos_lo, nanos_hi, julian_day] = value;
    // The low word has no bits in common with the shifted high word, so OR equals addition.
    let nanos_of_day = (i64::from(nanos_hi) << 32) | i64::from(nanos_lo);
    let days_since_epoch = i64::from(julian_day) - JULIAN_DAY_OF_EPOCH;

    days_since_epoch * MILLIS_PER_DAY + nanos_of_day / NANOS_PER_MILLI
}
561
562
/// Converts a parquet `INT96` timestamp (`[nanos_lo, nanos_hi, julian_day]`) into seconds
/// since the Unix epoch.
///
/// The sub-day nanoseconds are truncated toward zero to whole seconds.
#[inline]
pub fn int96_to_i64_s(value: [u32; 3]) -> i64 {
    const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
    const SECONDS_PER_DAY: i64 = 86_400;
    const NANOS_PER_SECOND: i64 = 1_000_000_000;

    let [nanos_lo, nanos_hi, julian_day] = value;
    // The low word has no bits in common with the shifted high word, so OR equals addition.
    let nanos_of_day = (i64::from(nanos_hi) << 32) | i64::from(nanos_lo);
    let days_since_epoch = i64::from(julian_day) - JULIAN_DAY_OF_EPOCH;

    days_since_epoch * SECONDS_PER_DAY + nanos_of_day / NANOS_PER_SECOND
}
573
574
#[expect(clippy::too_many_arguments)]
575
fn timestamp(
576
field_name: &str,
577
pages: BasicDecompressor,
578
physical_type: &PhysicalType,
579
logical_type: &Option<PrimitiveLogicalType>,
580
dtype: ArrowDataType,
581
filter: Option<Filter>,
582
time_unit: TimeUnit,
583
nested: Option<Vec<InitNested>>,
584
) -> ParquetResult<(Option<NestedState>, Box<dyn Array>, Bitmap)> {
585
if physical_type == &PhysicalType::Int96 {
586
return match time_unit {
587
TimeUnit::Nanosecond => PageDecoder::new(
588
field_name,
589
pages,
590
dtype,
591
primitive::FloatDecoder::closure(|x: [u32; 3]| int96_to_i64_ns(x)),
592
nested,
593
)?
594
.collect_boxed(filter),
595
TimeUnit::Microsecond => PageDecoder::new(
596
field_name,
597
pages,
598
dtype,
599
primitive::FloatDecoder::closure(|x: [u32; 3]| int96_to_i64_us(x)),
600
nested,
601
)?
602
.collect_boxed(filter),
603
TimeUnit::Millisecond => PageDecoder::new(
604
field_name,
605
pages,
606
dtype,
607
primitive::FloatDecoder::closure(|x: [u32; 3]| int96_to_i64_ms(x)),
608
nested,
609
)?
610
.collect_boxed(filter),
611
TimeUnit::Second => PageDecoder::new(
612
field_name,
613
pages,
614
dtype,
615
primitive::FloatDecoder::closure(|x: [u32; 3]| int96_to_i64_s(x)),
616
nested,
617
)?
618
.collect_boxed(filter),
619
};
620
};
621
622
if physical_type != &PhysicalType::Int64 {
623
return Err(ParquetError::not_supported(
624
"can't decode a timestamp from a non-int64 parquet type",
625
));
626
}
627
628
let (factor, is_multiplier) = unify_timestamp_unit(logical_type, time_unit);
629
match (factor, is_multiplier) {
630
(1, _) => PageDecoder::new(
631
field_name,
632
pages,
633
dtype,
634
primitive::IntDecoder::<i64, _, _>::unit(),
635
nested,
636
)?
637
.collect_boxed(filter),
638
(a, true) => PageDecoder::new(
639
field_name,
640
pages,
641
dtype,
642
primitive::IntDecoder::closure(|x: i64| x * a),
643
nested,
644
)?
645
.collect_boxed(filter),
646
(a, false) => PageDecoder::new(
647
field_name,
648
pages,
649
dtype,
650
primitive::IntDecoder::closure(|x: i64| x / a),
651
nested,
652
)?
653
.collect_boxed(filter),
654
}
655
}
656
657