Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/write/mod.rs
8460 views
1
//! APIs to write to Parquet format.
2
//!
3
//! # Arrow/Parquet Interoperability
4
//! As of [parquet-format v2.9](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md)
5
//! there are Arrow [DataTypes](arrow::datatypes::ArrowDataType) which do not have a parquet
6
//! representation. These include but are not limited to:
7
//! * `ArrowDataType::Timestamp(TimeUnit::Second, _)`
8
//! * `ArrowDataType::Int64`
9
//! * `ArrowDataType::Duration`
10
//! * `ArrowDataType::Date64`
11
//! * `ArrowDataType::Time32(TimeUnit::Second)`
12
//!
13
//! The use of these arrow types will result in no logical type being stored within a parquet file.
14
15
mod binary;
16
mod binview;
17
mod boolean;
18
mod dictionary;
19
mod file;
20
mod fixed_size_binary;
21
mod nested;
22
mod pages;
23
mod primitive;
24
mod row_group;
25
mod schema;
26
mod utils;
27
28
use arrow::array::*;
29
use arrow::bitmap::Bitmap;
30
use arrow::datatypes::*;
31
use arrow::types::{NativeType, days_ms, i256};
32
pub use nested::{num_values, write_rep_and_def};
33
pub use pages::{to_leaves, to_nested, to_parquet_leaves};
34
use polars_utils::float16::pf16;
35
use polars_utils::pl_str::PlSmallStr;
36
pub use utils::write_def_levels;
37
38
pub use crate::parquet::compression::{BrotliLevel, CompressionOptions, GzipLevel, ZstdLevel};
39
pub use crate::parquet::encoding::Encoding;
40
pub use crate::parquet::metadata::{
41
Descriptor, FileMetadata, KeyValue, SchemaDescriptor, ThriftFileMetadata,
42
};
43
pub use crate::parquet::page::{CompressedDataPage, CompressedPage, Page};
44
use crate::parquet::schema::Repetition;
45
use crate::parquet::schema::types::PrimitiveType as ParquetPrimitiveType;
46
pub use crate::parquet::schema::types::{
47
FieldInfo, ParquetType, PhysicalType as ParquetPhysicalType,
48
};
49
pub use crate::parquet::write::{
50
Compressor, DynIter, DynStreamingIterator, RowGroupIterColumns, Version, compress,
51
write_metadata_sidecar,
52
};
53
pub use crate::parquet::{FallibleStreamingIterator, fallible_streaming_iterator};
54
use crate::write::fixed_size_binary::build_statistics_float16;
55
56
/// The statistics to write
///
/// Each flag toggles one kind of column-chunk statistic.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct StatisticsOptions {
    /// Whether to write the minimum value statistic.
    pub min_value: bool,
    /// Whether to write the maximum value statistic.
    pub max_value: bool,
    /// Whether to write the distinct-count statistic.
    pub distinct_count: bool,
    /// Whether to write the null-count statistic.
    pub null_count: bool,
}

impl Default for StatisticsOptions {
    /// Min/max and null-count statistics are enabled; the distinct count is disabled.
    fn default() -> Self {
        let distinct_count = false;
        Self {
            null_count: true,
            min_value: true,
            max_value: true,
            distinct_count,
        }
    }
}
77
78
/// Nullability with which an array is encoded.
#[derive(Clone, Copy)]
pub enum EncodeNullability {
    /// The encoded values are not nullable.
    Required,
    /// The encoded values may be null.
    Optional,
}
84
85
/// Currently supported options to write to parquet
86
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
87
pub struct WriteOptions {
88
/// Whether to write statistics
89
pub statistics: StatisticsOptions,
90
/// The page and file version to use
91
pub version: Version,
92
/// The compression to apply to every page
93
pub compression: CompressionOptions,
94
/// The size to flush a page, defaults to 1024 * 1024 if None
95
pub data_page_size: Option<usize>,
96
}
97
98
use arrow::compute::aggregate::estimated_bytes_size;
99
use arrow::match_integer_type;
100
pub use file::FileWriter;
101
pub use pages::{Nested, array_to_columns, arrays_to_columns};
102
use polars_error::{PolarsResult, polars_bail};
103
pub use row_group::{RowGroupIterator, row_group_iter};
104
pub use schema::{schema_to_metadata_key, to_parquet_type};
105
106
use self::pages::{FixedSizeListNested, PrimitiveNested, StructNested};
107
use crate::write::dictionary::encode_as_dictionary_optional;
108
109
impl StatisticsOptions {
110
pub fn empty() -> Self {
111
Self {
112
min_value: false,
113
max_value: false,
114
distinct_count: false,
115
null_count: false,
116
}
117
}
118
119
pub fn full() -> Self {
120
Self {
121
min_value: true,
122
max_value: true,
123
distinct_count: true,
124
null_count: true,
125
}
126
}
127
128
pub fn is_empty(&self) -> bool {
129
!(self.min_value || self.max_value || self.distinct_count || self.null_count)
130
}
131
132
pub fn is_full(&self) -> bool {
133
self.min_value && self.max_value && self.distinct_count && self.null_count
134
}
135
}
136
137
impl WriteOptions {
138
pub fn has_statistics(&self) -> bool {
139
!self.statistics.is_empty()
140
}
141
}
142
143
impl EncodeNullability {
144
const fn new(is_optional: bool) -> Self {
145
if is_optional {
146
Self::Optional
147
} else {
148
Self::Required
149
}
150
}
151
152
fn is_optional(self) -> bool {
153
matches!(self, Self::Optional)
154
}
155
}
156
157
/// `data_page_size`: Set a target threshold for the approximate encoded size of data
158
/// pages within a column chunk (in bytes). If None, use the default data page size of 1MByte.
159
/// See: https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html
160
pub(crate) fn row_slice_ranges(
161
number_of_rows: usize,
162
byte_size: usize,
163
options: WriteOptions,
164
) -> impl Iterator<Item = (usize, usize)> {
165
const DEFAULT_PAGE_SIZE: usize = 1024 * 1024; // 1 MB
166
let max_page_size = options.data_page_size.unwrap_or(DEFAULT_PAGE_SIZE);
167
let max_page_size = max_page_size.min(2usize.pow(31) - 2usize.pow(25)); // allowed maximum page size
168
169
let bytes_per_row = if number_of_rows == 0 {
170
0
171
} else {
172
((byte_size as f64) / (number_of_rows as f64)) as usize
173
};
174
let rows_per_page = (max_page_size / (bytes_per_row + 1)).max(1);
175
176
(0..number_of_rows)
177
.step_by(rows_per_page)
178
.map(move |offset| {
179
let length = (offset + rows_per_page).min(number_of_rows) - offset;
180
(offset, length)
181
})
182
}
183
184
/// returns offset and length to slice the leaf values
185
pub fn slice_nested_leaf(nested: &[Nested]) -> (usize, usize) {
186
// find the deepest recursive dremel structure as that one determines how many values we must
187
// take
188
let mut out = (0, 0);
189
for nested in nested.iter().rev() {
190
match nested {
191
Nested::LargeList(l_nested) => {
192
let start = *l_nested.offsets.first();
193
let end = *l_nested.offsets.last();
194
return (start as usize, (end - start) as usize);
195
},
196
Nested::List(l_nested) => {
197
let start = *l_nested.offsets.first();
198
let end = *l_nested.offsets.last();
199
return (start as usize, (end - start) as usize);
200
},
201
Nested::FixedSizeList(nested) => return (0, nested.length * nested.width),
202
Nested::Primitive(nested) => out = (0, nested.length),
203
Nested::Struct(_) => {},
204
}
205
}
206
out
207
}
208
209
/// Number of bytes of a signed big-endian integer needed to store any unscaled decimal with
/// `precision` digits.
///
/// `n` bytes hold signed values up to `2^(8n - 1) - 1`, which must cover `10^precision - 1`:
///   `2^(8n - 1) >= 10^precision + 1`
///   `n = ceil((log2(10^precision + 1) + 1) / 8)`
fn decimal_length_from_precision(precision: usize) -> usize {
    let largest_magnitude = 10.0_f64.powi(precision as i32) + 1.0;
    let bits_with_sign = largest_magnitude.log2() + 1.0;
    (bits_with_sign / 8.0).ceil() as usize
}
219
220
/// Creates a parquet [`SchemaDescriptor`] from a [`ArrowSchema`].
221
pub fn to_parquet_schema(schema: &ArrowSchema) -> PolarsResult<SchemaDescriptor> {
222
let parquet_types = schema
223
.iter_values()
224
.map(to_parquet_type)
225
.collect::<PolarsResult<Vec<_>>>()?;
226
Ok(SchemaDescriptor::new(
227
PlSmallStr::from_static("root"),
228
parquet_types,
229
))
230
}
231
232
/// Slices the [`Array`] to `Box<dyn Array>` and `Vec<Nested>`.
233
pub fn slice_parquet_array(
234
primitive_array: &mut dyn Array,
235
nested: &mut [Nested],
236
mut current_offset: usize,
237
mut current_length: usize,
238
) {
239
for nested in nested.iter_mut() {
240
match nested {
241
Nested::LargeList(l_nested) => {
242
l_nested.offsets.slice(current_offset, current_length + 1);
243
if let Some(validity) = l_nested.validity.as_mut() {
244
validity.slice(current_offset, current_length)
245
};
246
247
// Update the offset/ length so that the Primitive is sliced properly.
248
current_length = l_nested.offsets.range() as usize;
249
current_offset = *l_nested.offsets.first() as usize;
250
},
251
Nested::List(l_nested) => {
252
l_nested.offsets.slice(current_offset, current_length + 1);
253
if let Some(validity) = l_nested.validity.as_mut() {
254
validity.slice(current_offset, current_length)
255
};
256
257
// Update the offset/ length so that the Primitive is sliced properly.
258
current_length = l_nested.offsets.range() as usize;
259
current_offset = *l_nested.offsets.first() as usize;
260
},
261
Nested::Struct(StructNested {
262
validity, length, ..
263
}) => {
264
*length = current_length;
265
if let Some(validity) = validity.as_mut() {
266
validity.slice(current_offset, current_length)
267
};
268
},
269
Nested::Primitive(PrimitiveNested {
270
validity, length, ..
271
}) => {
272
*length = current_length;
273
if let Some(validity) = validity.as_mut() {
274
validity.slice(current_offset, current_length)
275
};
276
primitive_array.slice(current_offset, current_length);
277
},
278
Nested::FixedSizeList(FixedSizeListNested {
279
validity,
280
length,
281
width,
282
..
283
}) => {
284
if let Some(validity) = validity.as_mut() {
285
validity.slice(current_offset, current_length)
286
};
287
*length = current_length;
288
// Update the offset/ length so that the Primitive is sliced properly.
289
current_length *= *width;
290
current_offset *= *width;
291
},
292
}
293
}
294
}
295
296
/// Get the length of [`Array`] that should be sliced.
297
pub fn get_max_length(nested: &[Nested]) -> usize {
298
let mut length = 0;
299
for nested in nested.iter() {
300
match nested {
301
Nested::LargeList(l_nested) => length += l_nested.offsets.range() as usize,
302
Nested::List(l_nested) => length += l_nested.offsets.range() as usize,
303
Nested::FixedSizeList(nested) => length += nested.length * nested.width,
304
_ => {},
305
}
306
}
307
length
308
}
309
310
/// Returns an iterator of [`Page`].
311
pub fn array_to_pages(
312
primitive_array: &dyn Array,
313
type_: ParquetPrimitiveType,
314
nested: &[Nested],
315
options: WriteOptions,
316
mut encoding: Encoding,
317
) -> PolarsResult<DynIter<'static, PolarsResult<Page>>> {
318
if let ArrowDataType::Dictionary(key_type, _, _) = primitive_array.dtype().to_storage() {
319
return match_integer_type!(key_type, |$T| {
320
dictionary::array_to_pages::<$T>(
321
primitive_array.as_any().downcast_ref().unwrap(),
322
type_,
323
&nested,
324
options,
325
encoding,
326
)
327
});
328
};
329
if let Encoding::RleDictionary = encoding {
330
// Only take this path for primitive columns
331
if matches!(nested.first(), Some(Nested::Primitive(_))) {
332
if let Some(result) =
333
encode_as_dictionary_optional(primitive_array, nested, type_.clone(), options)
334
{
335
return result;
336
}
337
}
338
339
// We didn't succeed, fallback to plain
340
encoding = Encoding::Plain;
341
}
342
343
let nested = nested.to_vec();
344
let number_of_rows = nested[0].len();
345
// note: this is not correct if the array is sliced - the estimation should happen on the
346
// primitive after sliced for parquet
347
let byte_size = estimated_bytes_size(primitive_array);
348
let primitive_array = primitive_array.to_boxed();
349
350
let pages =
351
row_slice_ranges(number_of_rows, byte_size, options).map(move |(offset, length)| {
352
let mut right_array = primitive_array.clone();
353
let mut right_nested = nested.clone();
354
slice_parquet_array(right_array.as_mut(), &mut right_nested, offset, length);
355
356
array_to_page(
357
right_array.as_ref(),
358
type_.clone(),
359
&right_nested,
360
options,
361
encoding,
362
)
363
});
364
Ok(DynIter::new(pages))
365
}
366
367
/// Converts an [`Array`] to a [`CompressedPage`] based on options, descriptor and `encoding`.
368
pub fn array_to_page(
369
array: &dyn Array,
370
type_: ParquetPrimitiveType,
371
nested: &[Nested],
372
options: WriteOptions,
373
encoding: Encoding,
374
) -> PolarsResult<Page> {
375
if nested.len() == 1 {
376
// special case where validity == def levels
377
return array_to_page_simple(array, type_, options, encoding);
378
}
379
array_to_page_nested(array, type_, nested, options, encoding)
380
}
381
382
/// Converts an [`Array`] to a [`CompressedPage`] based on options, descriptor and `encoding`.
383
pub fn array_to_page_simple(
384
array: &dyn Array,
385
type_: ParquetPrimitiveType,
386
options: WriteOptions,
387
encoding: Encoding,
388
) -> PolarsResult<Page> {
389
let dtype = array.dtype();
390
391
if type_.field_info.repetition == Repetition::Required && array.null_count() > 0 {
392
polars_bail!(InvalidOperation: "writing a missing value to required parquet column '{}'", type_.field_info.name);
393
}
394
395
match dtype {
396
// Map empty struct to boolean array with same validity.
397
ArrowDataType::Struct(fs) if fs.is_empty() => boolean::array_to_page(
398
&BooleanArray::new(
399
ArrowDataType::Boolean,
400
Bitmap::new_zeroed(array.len()),
401
array.validity().cloned(),
402
),
403
options,
404
type_,
405
encoding,
406
),
407
408
ArrowDataType::Boolean => boolean::array_to_page(
409
array.as_any().downcast_ref().unwrap(),
410
options,
411
type_,
412
encoding,
413
),
414
// casts below MUST match the casts done at the metadata (field -> parquet type).
415
ArrowDataType::UInt8 => {
416
return primitive::array_to_page_integer::<u8, i32>(
417
array.as_any().downcast_ref().unwrap(),
418
options,
419
type_,
420
encoding,
421
);
422
},
423
ArrowDataType::UInt16 => {
424
return primitive::array_to_page_integer::<u16, i32>(
425
array.as_any().downcast_ref().unwrap(),
426
options,
427
type_,
428
encoding,
429
);
430
},
431
ArrowDataType::UInt32 => {
432
return primitive::array_to_page_integer::<u32, i32>(
433
array.as_any().downcast_ref().unwrap(),
434
options,
435
type_,
436
encoding,
437
);
438
},
439
ArrowDataType::UInt64 => {
440
return primitive::array_to_page_integer::<u64, i64>(
441
array.as_any().downcast_ref().unwrap(),
442
options,
443
type_,
444
encoding,
445
);
446
},
447
ArrowDataType::Int8 => {
448
return primitive::array_to_page_integer::<i8, i32>(
449
array.as_any().downcast_ref().unwrap(),
450
options,
451
type_,
452
encoding,
453
);
454
},
455
ArrowDataType::Int16 => {
456
return primitive::array_to_page_integer::<i16, i32>(
457
array.as_any().downcast_ref().unwrap(),
458
options,
459
type_,
460
encoding,
461
);
462
},
463
ArrowDataType::Int32 | ArrowDataType::Date32 | ArrowDataType::Time32(_) => {
464
return primitive::array_to_page_integer::<i32, i32>(
465
array.as_any().downcast_ref().unwrap(),
466
options,
467
type_,
468
encoding,
469
);
470
},
471
ArrowDataType::Int64
472
| ArrowDataType::Date64
473
| ArrowDataType::Time64(_)
474
| ArrowDataType::Timestamp(_, _)
475
| ArrowDataType::Duration(_) => {
476
return primitive::array_to_page_integer::<i64, i64>(
477
array.as_any().downcast_ref().unwrap(),
478
options,
479
type_,
480
encoding,
481
);
482
},
483
ArrowDataType::Float16 => {
484
let array: &PrimitiveArray<pf16> = array.as_any().downcast_ref().unwrap();
485
let statistics = options
486
.has_statistics()
487
.then(|| build_statistics_float16(array, type_.clone(), &options.statistics));
488
let array = FixedSizeBinaryArray::new(
489
ArrowDataType::FixedSizeBinary(2),
490
array.values().clone().try_transmute().unwrap(),
491
array.validity().cloned(),
492
);
493
fixed_size_binary::array_to_page(&array, options, type_, statistics)
494
},
495
ArrowDataType::Float32 => primitive::array_to_page_plain::<f32, f32>(
496
array.as_any().downcast_ref().unwrap(),
497
options,
498
type_,
499
),
500
ArrowDataType::Float64 => primitive::array_to_page_plain::<f64, f64>(
501
array.as_any().downcast_ref().unwrap(),
502
options,
503
type_,
504
),
505
ArrowDataType::LargeUtf8 => {
506
let array =
507
polars_compute::cast::cast(array, &ArrowDataType::LargeBinary, Default::default())
508
.unwrap();
509
return binary::array_to_page::<i64>(
510
array.as_any().downcast_ref().unwrap(),
511
options,
512
type_,
513
encoding,
514
);
515
},
516
ArrowDataType::LargeBinary => {
517
return binary::array_to_page::<i64>(
518
array.as_any().downcast_ref().unwrap(),
519
options,
520
type_,
521
encoding,
522
);
523
},
524
ArrowDataType::BinaryView => {
525
return binview::array_to_page(
526
array.as_any().downcast_ref().unwrap(),
527
options,
528
type_,
529
encoding,
530
);
531
},
532
ArrowDataType::Utf8View => {
533
let array =
534
polars_compute::cast::cast(array, &ArrowDataType::BinaryView, Default::default())
535
.unwrap();
536
return binview::array_to_page(
537
array.as_any().downcast_ref().unwrap(),
538
options,
539
type_,
540
encoding,
541
);
542
},
543
ArrowDataType::Null => {
544
let array = Int32Array::new_null(ArrowDataType::Int32, array.len());
545
primitive::array_to_page_plain::<i32, i32>(&array, options, type_)
546
},
547
ArrowDataType::Interval(IntervalUnit::YearMonth) => {
548
let array = array
549
.as_any()
550
.downcast_ref::<PrimitiveArray<i32>>()
551
.unwrap();
552
let mut values = Vec::<u8>::with_capacity(12 * array.len());
553
array.values().iter().for_each(|x| {
554
let bytes = &x.to_le_bytes();
555
values.extend_from_slice(bytes);
556
values.extend_from_slice(&[0; 8]);
557
});
558
let array = FixedSizeBinaryArray::new(
559
ArrowDataType::FixedSizeBinary(12),
560
values.into(),
561
array.validity().cloned(),
562
);
563
let statistics = if options.has_statistics() {
564
Some(fixed_size_binary::build_statistics(
565
&array,
566
type_.clone(),
567
&options.statistics,
568
))
569
} else {
570
None
571
};
572
fixed_size_binary::array_to_page(&array, options, type_, statistics)
573
},
574
ArrowDataType::Interval(IntervalUnit::DayTime) => {
575
let array = array
576
.as_any()
577
.downcast_ref::<PrimitiveArray<days_ms>>()
578
.unwrap();
579
let mut values = Vec::<u8>::with_capacity(12 * array.len());
580
array.values().iter().for_each(|x| {
581
let bytes = &x.to_le_bytes();
582
values.extend_from_slice(&[0; 4]); // months
583
values.extend_from_slice(bytes); // days and seconds
584
});
585
let array = FixedSizeBinaryArray::new(
586
ArrowDataType::FixedSizeBinary(12),
587
values.into(),
588
array.validity().cloned(),
589
);
590
let statistics = if options.has_statistics() {
591
Some(fixed_size_binary::build_statistics(
592
&array,
593
type_.clone(),
594
&options.statistics,
595
))
596
} else {
597
None
598
};
599
fixed_size_binary::array_to_page(&array, options, type_, statistics)
600
},
601
ArrowDataType::FixedSizeBinary(_) => {
602
let array = array.as_any().downcast_ref().unwrap();
603
let statistics = if options.has_statistics() {
604
Some(fixed_size_binary::build_statistics(
605
array,
606
type_.clone(),
607
&options.statistics,
608
))
609
} else {
610
None
611
};
612
613
fixed_size_binary::array_to_page(array, options, type_, statistics)
614
},
615
ArrowDataType::Decimal256(precision, _) => {
616
let precision = *precision;
617
let array = array
618
.as_any()
619
.downcast_ref::<PrimitiveArray<i256>>()
620
.unwrap();
621
if precision <= 9 {
622
let values = array
623
.values()
624
.iter()
625
.map(|x| x.0.as_i32())
626
.collect::<Vec<_>>()
627
.into();
628
629
let array = PrimitiveArray::<i32>::new(
630
ArrowDataType::Int32,
631
values,
632
array.validity().cloned(),
633
);
634
return primitive::array_to_page_integer::<i32, i32>(
635
&array, options, type_, encoding,
636
);
637
} else if precision <= 18 {
638
let values = array
639
.values()
640
.iter()
641
.map(|x| x.0.as_i64())
642
.collect::<Vec<_>>()
643
.into();
644
645
let array = PrimitiveArray::<i64>::new(
646
ArrowDataType::Int64,
647
values,
648
array.validity().cloned(),
649
);
650
return primitive::array_to_page_integer::<i64, i64>(
651
&array, options, type_, encoding,
652
);
653
} else if precision <= 38 {
654
let size = decimal_length_from_precision(precision);
655
let statistics = if options.has_statistics() {
656
let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
657
array,
658
type_.clone(),
659
size,
660
&options.statistics,
661
);
662
Some(stats)
663
} else {
664
None
665
};
666
667
let mut values = Vec::<u8>::with_capacity(size * array.len());
668
array.values().iter().for_each(|x| {
669
let bytes = &x.0.low().to_be_bytes()[16 - size..];
670
values.extend_from_slice(bytes)
671
});
672
let array = FixedSizeBinaryArray::new(
673
ArrowDataType::FixedSizeBinary(size),
674
values.into(),
675
array.validity().cloned(),
676
);
677
fixed_size_binary::array_to_page(&array, options, type_, statistics)
678
} else {
679
let size = 32;
680
let array = array
681
.as_any()
682
.downcast_ref::<PrimitiveArray<i256>>()
683
.unwrap();
684
let statistics = if options.has_statistics() {
685
let stats = fixed_size_binary::build_statistics_decimal256(
686
array,
687
type_.clone(),
688
size,
689
&options.statistics,
690
);
691
Some(stats)
692
} else {
693
None
694
};
695
let mut values = Vec::<u8>::with_capacity(size * array.len());
696
array.values().iter().for_each(|x| {
697
let bytes = &x.to_be_bytes();
698
values.extend_from_slice(bytes)
699
});
700
let array = FixedSizeBinaryArray::new(
701
ArrowDataType::FixedSizeBinary(size),
702
values.into(),
703
array.validity().cloned(),
704
);
705
706
fixed_size_binary::array_to_page(&array, options, type_, statistics)
707
}
708
},
709
ArrowDataType::Decimal(precision, _) => {
710
let precision = *precision;
711
let array = array
712
.as_any()
713
.downcast_ref::<PrimitiveArray<i128>>()
714
.unwrap();
715
if precision <= 9 {
716
let values = array
717
.values()
718
.iter()
719
.map(|x| *x as i32)
720
.collect::<Vec<_>>()
721
.into();
722
723
let array = PrimitiveArray::<i32>::new(
724
ArrowDataType::Int32,
725
values,
726
array.validity().cloned(),
727
);
728
return primitive::array_to_page_integer::<i32, i32>(
729
&array, options, type_, encoding,
730
);
731
} else if precision <= 18 {
732
let values = array
733
.values()
734
.iter()
735
.map(|x| *x as i64)
736
.collect::<Vec<_>>()
737
.into();
738
739
let array = PrimitiveArray::<i64>::new(
740
ArrowDataType::Int64,
741
values,
742
array.validity().cloned(),
743
);
744
return primitive::array_to_page_integer::<i64, i64>(
745
&array, options, type_, encoding,
746
);
747
} else {
748
let size = decimal_length_from_precision(precision);
749
750
let statistics = if options.has_statistics() {
751
let stats = fixed_size_binary::build_statistics_decimal(
752
array,
753
type_.clone(),
754
size,
755
&options.statistics,
756
);
757
Some(stats)
758
} else {
759
None
760
};
761
762
let mut values = Vec::<u8>::with_capacity(size * array.len());
763
array.values().iter().for_each(|x| {
764
let bytes = &x.to_be_bytes()[16 - size..];
765
values.extend_from_slice(bytes)
766
});
767
let array = FixedSizeBinaryArray::new(
768
ArrowDataType::FixedSizeBinary(size),
769
values.into(),
770
array.validity().cloned(),
771
);
772
fixed_size_binary::array_to_page(&array, options, type_, statistics)
773
}
774
},
775
ArrowDataType::UInt128 => {
776
let array: &PrimitiveArray<u128> = array.as_any().downcast_ref().unwrap();
777
let statistics = if options.has_statistics() {
778
let stats = fixed_size_binary::build_statistics_decimal(
779
array,
780
type_.clone(),
781
16,
782
&options.statistics,
783
);
784
Some(stats)
785
} else {
786
None
787
};
788
let array = FixedSizeBinaryArray::new(
789
ArrowDataType::FixedSizeBinary(16),
790
array.values().clone().try_transmute().unwrap(),
791
array.validity().cloned(),
792
);
793
fixed_size_binary::array_to_page(&array, options, type_, statistics)
794
},
795
ArrowDataType::Int128 => {
796
let array: &PrimitiveArray<i128> = array.as_any().downcast_ref().unwrap();
797
let statistics = if options.has_statistics() {
798
let stats = fixed_size_binary::build_statistics_decimal(
799
array,
800
type_.clone(),
801
16,
802
&options.statistics,
803
);
804
Some(stats)
805
} else {
806
None
807
};
808
let array = FixedSizeBinaryArray::new(
809
ArrowDataType::FixedSizeBinary(16),
810
array.values().clone().try_transmute().unwrap(),
811
array.validity().cloned(),
812
);
813
fixed_size_binary::array_to_page(&array, options, type_, statistics)
814
},
815
ArrowDataType::Extension(ext) => {
816
let mut boxed = array.to_boxed();
817
assert!(matches!(boxed.dtype(), ArrowDataType::Extension(ext2) if ext2 == ext));
818
*boxed.dtype_mut() = ext.inner.clone();
819
return array_to_page_simple(boxed.as_ref(), type_, options, encoding);
820
},
821
other => polars_bail!(nyi = "Writing parquet pages for data type {other:?}"),
822
}
823
.map(Page::Data)
824
}
825
826
fn array_to_page_nested(
827
array: &dyn Array,
828
type_: ParquetPrimitiveType,
829
nested: &[Nested],
830
options: WriteOptions,
831
_encoding: Encoding,
832
) -> PolarsResult<Page> {
833
if type_.field_info.repetition == Repetition::Required
834
&& array.validity().is_some_and(|v| v.unset_bits() > 0)
835
{
836
polars_bail!(InvalidOperation: "writing a missing value to required parquet column '{}'", type_.field_info.name);
837
}
838
839
use ArrowDataType::*;
840
match array.dtype().to_storage() {
841
Null => {
842
let array = Int32Array::new_null(ArrowDataType::Int32, array.len());
843
primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
844
},
845
// Map empty struct to boolean array with same validity.
846
Struct(fs) if fs.is_empty() => {
847
let array = BooleanArray::new(
848
ArrowDataType::Boolean,
849
Bitmap::new_zeroed(array.len()),
850
array.validity().cloned(),
851
);
852
boolean::nested_array_to_page(&array, options, type_, nested)
853
},
854
Boolean => {
855
let array = array.as_any().downcast_ref().unwrap();
856
boolean::nested_array_to_page(array, options, type_, nested)
857
},
858
LargeUtf8 => {
859
let array =
860
polars_compute::cast::cast(array, &LargeBinary, Default::default()).unwrap();
861
let array = array.as_any().downcast_ref().unwrap();
862
binary::nested_array_to_page::<i64>(array, options, type_, nested)
863
},
864
LargeBinary => {
865
let array = array.as_any().downcast_ref().unwrap();
866
binary::nested_array_to_page::<i64>(array, options, type_, nested)
867
},
868
BinaryView => {
869
let array = array.as_any().downcast_ref().unwrap();
870
binview::nested_array_to_page(array, options, type_, nested)
871
},
872
Utf8View => {
873
let array = polars_compute::cast::cast(array, &BinaryView, Default::default()).unwrap();
874
let array = array.as_any().downcast_ref().unwrap();
875
binview::nested_array_to_page(array, options, type_, nested)
876
},
877
UInt8 => {
878
let array = array.as_any().downcast_ref().unwrap();
879
primitive::nested_array_to_page::<u8, i32>(array, options, type_, nested)
880
},
881
UInt16 => {
882
let array = array.as_any().downcast_ref().unwrap();
883
primitive::nested_array_to_page::<u16, i32>(array, options, type_, nested)
884
},
885
UInt32 => {
886
let array = array.as_any().downcast_ref().unwrap();
887
primitive::nested_array_to_page::<u32, i32>(array, options, type_, nested)
888
},
889
UInt64 => {
890
let array = array.as_any().downcast_ref().unwrap();
891
primitive::nested_array_to_page::<u64, i64>(array, options, type_, nested)
892
},
893
Int8 => {
894
let array = array.as_any().downcast_ref().unwrap();
895
primitive::nested_array_to_page::<i8, i32>(array, options, type_, nested)
896
},
897
Int16 => {
898
let array = array.as_any().downcast_ref().unwrap();
899
primitive::nested_array_to_page::<i16, i32>(array, options, type_, nested)
900
},
901
Int32 | Date32 | Time32(_) => {
902
let array = array.as_any().downcast_ref().unwrap();
903
primitive::nested_array_to_page::<i32, i32>(array, options, type_, nested)
904
},
905
Int64 | Date64 | Time64(_) | Timestamp(_, _) | Duration(_) => {
906
let array = array.as_any().downcast_ref().unwrap();
907
primitive::nested_array_to_page::<i64, i64>(array, options, type_, nested)
908
},
909
Float16 => {
910
let array: &PrimitiveArray<pf16> = array.as_any().downcast_ref().unwrap();
911
let statistics = options
912
.has_statistics()
913
.then(|| build_statistics_float16(array, type_.clone(), &options.statistics));
914
let array = FixedSizeBinaryArray::new(
915
ArrowDataType::FixedSizeBinary(2),
916
array.values().clone().try_transmute().unwrap(),
917
array.validity().cloned(),
918
);
919
fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
920
},
921
Float32 => {
922
let array = array.as_any().downcast_ref().unwrap();
923
primitive::nested_array_to_page::<f32, f32>(array, options, type_, nested)
924
},
925
Float64 => {
926
let array = array.as_any().downcast_ref().unwrap();
927
primitive::nested_array_to_page::<f64, f64>(array, options, type_, nested)
928
},
929
Decimal(precision, _) => {
930
let precision = *precision;
931
let array = array
932
.as_any()
933
.downcast_ref::<PrimitiveArray<i128>>()
934
.unwrap();
935
if precision <= 9 {
936
let values = array
937
.values()
938
.iter()
939
.map(|x| *x as i32)
940
.collect::<Vec<_>>()
941
.into();
942
943
let array = PrimitiveArray::<i32>::new(
944
ArrowDataType::Int32,
945
values,
946
array.validity().cloned(),
947
);
948
primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
949
} else if precision <= 18 {
950
let values = array
951
.values()
952
.iter()
953
.map(|x| *x as i64)
954
.collect::<Vec<_>>()
955
.into();
956
957
let array = PrimitiveArray::<i64>::new(
958
ArrowDataType::Int64,
959
values,
960
array.validity().cloned(),
961
);
962
primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
963
} else {
964
let size = decimal_length_from_precision(precision);
965
966
let statistics = if options.has_statistics() {
967
let stats = fixed_size_binary::build_statistics_decimal(
968
array,
969
type_.clone(),
970
size,
971
&options.statistics,
972
);
973
Some(stats)
974
} else {
975
None
976
};
977
978
let mut values = Vec::<u8>::with_capacity(size * array.len());
979
array.values().iter().for_each(|x| {
980
let bytes = &x.to_be_bytes()[16 - size..];
981
values.extend_from_slice(bytes)
982
});
983
let array = FixedSizeBinaryArray::new(
984
ArrowDataType::FixedSizeBinary(size),
985
values.into(),
986
array.validity().cloned(),
987
);
988
fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
989
}
990
},
991
Decimal256(precision, _) => {
992
let precision = *precision;
993
let array = array
994
.as_any()
995
.downcast_ref::<PrimitiveArray<i256>>()
996
.unwrap();
997
if precision <= 9 {
998
let values = array
999
.values()
1000
.iter()
1001
.map(|x| x.0.as_i32())
1002
.collect::<Vec<_>>()
1003
.into();
1004
1005
let array = PrimitiveArray::<i32>::new(
1006
ArrowDataType::Int32,
1007
values,
1008
array.validity().cloned(),
1009
);
1010
primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
1011
} else if precision <= 18 {
1012
let values = array
1013
.values()
1014
.iter()
1015
.map(|x| x.0.as_i64())
1016
.collect::<Vec<_>>()
1017
.into();
1018
1019
let array = PrimitiveArray::<i64>::new(
1020
ArrowDataType::Int64,
1021
values,
1022
array.validity().cloned(),
1023
);
1024
primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
1025
} else if precision <= 38 {
1026
let size = decimal_length_from_precision(precision);
1027
let statistics = if options.has_statistics() {
1028
let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
1029
array,
1030
type_.clone(),
1031
size,
1032
&options.statistics,
1033
);
1034
Some(stats)
1035
} else {
1036
None
1037
};
1038
1039
let mut values = Vec::<u8>::with_capacity(size * array.len());
1040
array.values().iter().for_each(|x| {
1041
let bytes = &x.0.low().to_be_bytes()[16 - size..];
1042
values.extend_from_slice(bytes)
1043
});
1044
let array = FixedSizeBinaryArray::new(
1045
ArrowDataType::FixedSizeBinary(size),
1046
values.into(),
1047
array.validity().cloned(),
1048
);
1049
fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1050
} else {
1051
let size = 32;
1052
let array = array
1053
.as_any()
1054
.downcast_ref::<PrimitiveArray<i256>>()
1055
.unwrap();
1056
let statistics = if options.has_statistics() {
1057
let stats = fixed_size_binary::build_statistics_decimal256(
1058
array,
1059
type_.clone(),
1060
size,
1061
&options.statistics,
1062
);
1063
Some(stats)
1064
} else {
1065
None
1066
};
1067
let mut values = Vec::<u8>::with_capacity(size * array.len());
1068
array.values().iter().for_each(|x| {
1069
let bytes = &x.to_be_bytes();
1070
values.extend_from_slice(bytes)
1071
});
1072
let array = FixedSizeBinaryArray::new(
1073
ArrowDataType::FixedSizeBinary(size),
1074
values.into(),
1075
array.validity().cloned(),
1076
);
1077
1078
fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1079
}
1080
},
1081
Int128 => {
1082
let array: &PrimitiveArray<i128> = array.as_any().downcast_ref().unwrap();
1083
// Can't write min/max statistics for signed 128-bit integer, see #25965.
1084
let mut no_mm_options = options;
1085
no_mm_options.statistics.min_value = false;
1086
no_mm_options.statistics.max_value = false;
1087
let statistics = if no_mm_options.has_statistics() {
1088
let stats = fixed_size_binary::build_statistics_decimal(
1089
array,
1090
type_.clone(),
1091
16,
1092
&no_mm_options.statistics,
1093
);
1094
Some(stats)
1095
} else {
1096
None
1097
};
1098
let array = FixedSizeBinaryArray::new(
1099
ArrowDataType::FixedSizeBinary(16),
1100
array.values().clone().try_transmute().unwrap(),
1101
array.validity().cloned(),
1102
);
1103
fixed_size_binary::nested_array_to_page(
1104
&array,
1105
no_mm_options,
1106
type_,
1107
nested,
1108
statistics,
1109
)
1110
},
1111
UInt128 => {
1112
let array: &PrimitiveArray<u128> = array.as_any().downcast_ref().unwrap();
1113
let statistics = if options.has_statistics() {
1114
let stats = fixed_size_binary::build_statistics_decimal(
1115
array,
1116
type_.clone(),
1117
16,
1118
&options.statistics,
1119
);
1120
Some(stats)
1121
} else {
1122
None
1123
};
1124
let array = FixedSizeBinaryArray::new(
1125
ArrowDataType::FixedSizeBinary(16),
1126
array.values().clone().try_transmute().unwrap(),
1127
array.validity().cloned(),
1128
);
1129
fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1130
},
1131
other => polars_bail!(nyi = "Writing nested parquet pages for data type {other:?}"),
1132
}
1133
.map(Page::Data)
1134
}
1135
1136
fn get_encodings_recursive(dtype: &ArrowDataType, encodings: &mut Vec<Encoding>) {
1137
use arrow::datatypes::PhysicalType::*;
1138
match dtype.to_physical_type() {
1139
Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8
1140
| Dictionary(_) | LargeUtf8 | BinaryView | Utf8View => {
1141
encodings.push(get_primitive_dtype_encoding(dtype))
1142
},
1143
List | FixedSizeList | LargeList => {
1144
let a = dtype.to_storage();
1145
if let ArrowDataType::List(inner) = a {
1146
get_encodings_recursive(&inner.dtype, encodings)
1147
} else if let ArrowDataType::LargeList(inner) = a {
1148
get_encodings_recursive(&inner.dtype, encodings)
1149
} else if let ArrowDataType::FixedSizeList(inner, _) = a {
1150
get_encodings_recursive(&inner.dtype, encodings)
1151
} else {
1152
unreachable!()
1153
}
1154
},
1155
Struct => {
1156
if let ArrowDataType::Struct(fields) = dtype.to_storage() {
1157
if fields.is_empty() {
1158
// 0-field struct writes as a boolean column representing outer validity.
1159
encodings.push(Encoding::Rle)
1160
}
1161
1162
for field in fields {
1163
get_encodings_recursive(&field.dtype, encodings)
1164
}
1165
} else {
1166
unreachable!()
1167
}
1168
},
1169
Map => {
1170
if let ArrowDataType::Map(field, _) = dtype.to_storage() {
1171
if let ArrowDataType::Struct(fields) = field.dtype.to_storage() {
1172
for field in fields {
1173
get_encodings_recursive(&field.dtype, encodings)
1174
}
1175
} else {
1176
unreachable!()
1177
}
1178
} else {
1179
unreachable!()
1180
}
1181
},
1182
Union => todo!(),
1183
}
1184
}
1185
1186
/// Transverses the `dtype` up to its (parquet) columns and returns a vector of
1187
/// items based on `map`.
1188
///
1189
/// This is used to assign an [`Encoding`] to every parquet column based on the columns' type (see example)
1190
pub fn get_dtype_encoding(dtype: &ArrowDataType) -> Vec<Encoding> {
1191
let mut encodings = vec![];
1192
get_encodings_recursive(dtype, &mut encodings);
1193
encodings
1194
}
1195
1196
fn get_primitive_dtype_encoding(dtype: &ArrowDataType) -> Encoding {
1197
match dtype.to_physical_type() {
1198
PhysicalType::Dictionary(_)
1199
| PhysicalType::LargeBinary
1200
| PhysicalType::LargeUtf8
1201
| PhysicalType::Utf8View
1202
| PhysicalType::BinaryView
1203
| PhysicalType::Primitive(_) => Encoding::RleDictionary,
1204
// remaining is plain
1205
_ => Encoding::Plain,
1206
}
1207
}
1208
1209