Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/write/mod.rs
6940 views
1
//! APIs to write to Parquet format.
2
//!
3
//! # Arrow/Parquet Interoperability
4
//! As of [parquet-format v2.9](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md)
5
//! there are Arrow [DataTypes](arrow::datatypes::ArrowDataType) which do not have a parquet
6
//! representation. These include but are not limited to:
7
//! * `ArrowDataType::Timestamp(TimeUnit::Second, _)`
8
//! * `ArrowDataType::Int64`
9
//! * `ArrowDataType::Duration`
10
//! * `ArrowDataType::Date64`
11
//! * `ArrowDataType::Time32(TimeUnit::Second)`
12
//!
13
//! The use of these arrow types will result in no logical type being stored within a parquet file.
14
15
mod binary;
16
mod binview;
17
mod boolean;
18
mod dictionary;
19
mod file;
20
mod fixed_size_binary;
21
mod nested;
22
mod pages;
23
mod primitive;
24
mod row_group;
25
mod schema;
26
mod utils;
27
28
use arrow::array::*;
29
use arrow::datatypes::*;
30
use arrow::types::{NativeType, days_ms, i256};
31
pub use nested::{num_values, write_rep_and_def};
32
pub use pages::{to_leaves, to_nested, to_parquet_leaves};
33
use polars_utils::pl_str::PlSmallStr;
34
pub use utils::write_def_levels;
35
36
pub use crate::parquet::compression::{BrotliLevel, CompressionOptions, GzipLevel, ZstdLevel};
37
pub use crate::parquet::encoding::Encoding;
38
pub use crate::parquet::metadata::{
39
Descriptor, FileMetadata, KeyValue, SchemaDescriptor, ThriftFileMetadata,
40
};
41
pub use crate::parquet::page::{CompressedDataPage, CompressedPage, Page};
42
use crate::parquet::schema::Repetition;
43
use crate::parquet::schema::types::PrimitiveType as ParquetPrimitiveType;
44
pub use crate::parquet::schema::types::{
45
FieldInfo, ParquetType, PhysicalType as ParquetPhysicalType,
46
};
47
pub use crate::parquet::write::{
48
Compressor, DynIter, DynStreamingIterator, RowGroupIterColumns, Version, compress,
49
write_metadata_sidecar,
50
};
51
pub use crate::parquet::{FallibleStreamingIterator, fallible_streaming_iterator};
52
53
/// Selects which column statistics are written.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct StatisticsOptions {
    /// Write the minimum-value statistic.
    pub min_value: bool,
    /// Write the maximum-value statistic.
    pub max_value: bool,
    /// Write the distinct-count statistic.
    pub distinct_count: bool,
    /// Write the null-count statistic.
    pub null_count: bool,
}

impl Default for StatisticsOptions {
    /// Min, max and null-count statistics are enabled; distinct counts are disabled.
    fn default() -> Self {
        Self {
            distinct_count: false,
            min_value: true,
            max_value: true,
            null_count: true,
        }
    }
}
74
75
/// Whether a column's values are encoded as non-nullable (`Required`) or nullable
/// (`Optional`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EncodeNullability {
    /// The column is encoded as non-nullable.
    Required,
    /// The column is encoded as nullable.
    Optional,
}
81
82
/// Currently supported options to write to parquet
83
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
84
pub struct WriteOptions {
85
/// Whether to write statistics
86
pub statistics: StatisticsOptions,
87
/// The page and file version to use
88
pub version: Version,
89
/// The compression to apply to every page
90
pub compression: CompressionOptions,
91
/// The size to flush a page, defaults to 1024 * 1024 if None
92
pub data_page_size: Option<usize>,
93
}
94
95
#[derive(Clone)]
96
pub struct ColumnWriteOptions {
97
pub field_id: Option<i32>,
98
pub metadata: Vec<KeyValue>,
99
pub required: Option<bool>,
100
pub children: ChildWriteOptions,
101
}
102
103
#[derive(Clone)]
104
pub enum ChildWriteOptions {
105
Leaf(FieldWriteOptions),
106
ListLike(Box<ListLikeFieldWriteOptions>),
107
Struct(Box<StructFieldWriteOptions>),
108
}
109
110
impl ColumnWriteOptions {
111
pub fn to_leaves<'a>(&'a self, out: &mut Vec<&'a FieldWriteOptions>) {
112
match &self.children {
113
ChildWriteOptions::Leaf(o) => out.push(o),
114
ChildWriteOptions::ListLike(o) => o.child.to_leaves(out),
115
ChildWriteOptions::Struct(o) => {
116
for o in &o.children {
117
o.to_leaves(out);
118
}
119
},
120
}
121
}
122
}
123
124
#[derive(Clone)]
125
pub struct FieldWriteOptions {
126
pub encoding: Encoding,
127
}
128
129
impl ColumnWriteOptions {
130
pub fn default_with(children: ChildWriteOptions) -> Self {
131
Self {
132
field_id: None,
133
metadata: Vec::new(),
134
required: None,
135
children,
136
}
137
}
138
}
139
140
impl FieldWriteOptions {
141
pub fn default_with_encoding(encoding: Encoding) -> Self {
142
Self { encoding }
143
}
144
145
pub fn into_default_column_write_options(self) -> ColumnWriteOptions {
146
ColumnWriteOptions::default_with(ChildWriteOptions::Leaf(self))
147
}
148
}
149
150
#[derive(Clone)]
151
pub struct ListLikeFieldWriteOptions {
152
pub child: ColumnWriteOptions,
153
}
154
155
#[derive(Clone)]
156
pub struct StructFieldWriteOptions {
157
pub children: Vec<ColumnWriteOptions>,
158
}
159
160
use arrow::compute::aggregate::estimated_bytes_size;
161
use arrow::match_integer_type;
162
pub use file::FileWriter;
163
pub use pages::{Nested, array_to_columns, arrays_to_columns};
164
use polars_error::{PolarsResult, polars_bail};
165
pub use row_group::{RowGroupIterator, row_group_iter};
166
pub use schema::{schema_to_metadata_key, to_parquet_type};
167
168
use self::pages::{FixedSizeListNested, PrimitiveNested, StructNested};
169
use crate::write::dictionary::encode_as_dictionary_optional;
170
171
impl StatisticsOptions {
172
pub fn empty() -> Self {
173
Self {
174
min_value: false,
175
max_value: false,
176
distinct_count: false,
177
null_count: false,
178
}
179
}
180
181
pub fn full() -> Self {
182
Self {
183
min_value: true,
184
max_value: true,
185
distinct_count: true,
186
null_count: true,
187
}
188
}
189
190
pub fn is_empty(&self) -> bool {
191
!(self.min_value || self.max_value || self.distinct_count || self.null_count)
192
}
193
194
pub fn is_full(&self) -> bool {
195
self.min_value && self.max_value && self.distinct_count && self.null_count
196
}
197
}
198
199
impl WriteOptions {
200
pub fn has_statistics(&self) -> bool {
201
!self.statistics.is_empty()
202
}
203
}
204
205
impl EncodeNullability {
206
const fn new(is_optional: bool) -> Self {
207
if is_optional {
208
Self::Optional
209
} else {
210
Self::Required
211
}
212
}
213
214
fn is_optional(self) -> bool {
215
matches!(self, Self::Optional)
216
}
217
}
218
219
/// returns offset and length to slice the leaf values
220
pub fn slice_nested_leaf(nested: &[Nested]) -> (usize, usize) {
221
// find the deepest recursive dremel structure as that one determines how many values we must
222
// take
223
let mut out = (0, 0);
224
for nested in nested.iter().rev() {
225
match nested {
226
Nested::LargeList(l_nested) => {
227
let start = *l_nested.offsets.first();
228
let end = *l_nested.offsets.last();
229
return (start as usize, (end - start) as usize);
230
},
231
Nested::List(l_nested) => {
232
let start = *l_nested.offsets.first();
233
let end = *l_nested.offsets.last();
234
return (start as usize, (end - start) as usize);
235
},
236
Nested::FixedSizeList(nested) => return (0, nested.length * nested.width),
237
Nested::Primitive(nested) => out = (0, nested.length),
238
Nested::Struct(_) => {},
239
}
240
}
241
out
242
}
243
244
/// Returns the number of bytes used to store a decimal of `precision` digits as a signed
/// two's-complement fixed-size binary.
///
/// An `n`-byte signed integer reaches `2^(8n - 1) - 1`, so `n` has to satisfy
/// `8n - 1 >= log2(10^precision)`. This evaluates
/// `n = ceil((log2(10^precision + 1) + 1) / 8)`.
fn decimal_length_from_precision(precision: usize) -> usize {
    let bound = 10.0_f64.powi(precision as i32) + 1.0;
    let bits = bound.log2() + 1.0;
    (bits / 8.0).ceil() as usize
}
254
255
/// Creates a parquet [`SchemaDescriptor`] from a [`ArrowSchema`].
256
pub fn to_parquet_schema(
257
schema: &ArrowSchema,
258
column_options: &[ColumnWriteOptions],
259
) -> PolarsResult<SchemaDescriptor> {
260
let parquet_types = schema
261
.iter_values()
262
.zip(column_options)
263
.map(|(field, options)| to_parquet_type(field, options))
264
.collect::<PolarsResult<Vec<_>>>()?;
265
Ok(SchemaDescriptor::new(
266
PlSmallStr::from_static("root"),
267
parquet_types,
268
))
269
}
270
271
/// Slices the [`Array`] to `Box<dyn Array>` and `Vec<Nested>`.
272
pub fn slice_parquet_array(
273
primitive_array: &mut dyn Array,
274
nested: &mut [Nested],
275
mut current_offset: usize,
276
mut current_length: usize,
277
) {
278
for nested in nested.iter_mut() {
279
match nested {
280
Nested::LargeList(l_nested) => {
281
l_nested.offsets.slice(current_offset, current_length + 1);
282
if let Some(validity) = l_nested.validity.as_mut() {
283
validity.slice(current_offset, current_length)
284
};
285
286
// Update the offset/ length so that the Primitive is sliced properly.
287
current_length = l_nested.offsets.range() as usize;
288
current_offset = *l_nested.offsets.first() as usize;
289
},
290
Nested::List(l_nested) => {
291
l_nested.offsets.slice(current_offset, current_length + 1);
292
if let Some(validity) = l_nested.validity.as_mut() {
293
validity.slice(current_offset, current_length)
294
};
295
296
// Update the offset/ length so that the Primitive is sliced properly.
297
current_length = l_nested.offsets.range() as usize;
298
current_offset = *l_nested.offsets.first() as usize;
299
},
300
Nested::Struct(StructNested {
301
validity, length, ..
302
}) => {
303
*length = current_length;
304
if let Some(validity) = validity.as_mut() {
305
validity.slice(current_offset, current_length)
306
};
307
},
308
Nested::Primitive(PrimitiveNested {
309
validity, length, ..
310
}) => {
311
*length = current_length;
312
if let Some(validity) = validity.as_mut() {
313
validity.slice(current_offset, current_length)
314
};
315
primitive_array.slice(current_offset, current_length);
316
},
317
Nested::FixedSizeList(FixedSizeListNested {
318
validity,
319
length,
320
width,
321
..
322
}) => {
323
if let Some(validity) = validity.as_mut() {
324
validity.slice(current_offset, current_length)
325
};
326
*length = current_length;
327
// Update the offset/ length so that the Primitive is sliced properly.
328
current_length *= *width;
329
current_offset *= *width;
330
},
331
}
332
}
333
}
334
335
/// Get the length of [`Array`] that should be sliced.
336
pub fn get_max_length(nested: &[Nested]) -> usize {
337
let mut length = 0;
338
for nested in nested.iter() {
339
match nested {
340
Nested::LargeList(l_nested) => length += l_nested.offsets.range() as usize,
341
Nested::List(l_nested) => length += l_nested.offsets.range() as usize,
342
Nested::FixedSizeList(nested) => length += nested.length * nested.width,
343
_ => {},
344
}
345
}
346
length
347
}
348
349
/// Returns an iterator of [`Page`].
350
pub fn array_to_pages(
351
primitive_array: &dyn Array,
352
type_: ParquetPrimitiveType,
353
nested: &[Nested],
354
options: WriteOptions,
355
field_options: &FieldWriteOptions,
356
) -> PolarsResult<DynIter<'static, PolarsResult<Page>>> {
357
let mut encoding = field_options.encoding;
358
if let ArrowDataType::Dictionary(key_type, _, _) = primitive_array.dtype().to_logical_type() {
359
return match_integer_type!(key_type, |$T| {
360
dictionary::array_to_pages::<$T>(
361
primitive_array.as_any().downcast_ref().unwrap(),
362
type_,
363
&nested,
364
options,
365
encoding,
366
)
367
});
368
};
369
if let Encoding::RleDictionary = encoding {
370
// Only take this path for primitive columns
371
if matches!(nested.first(), Some(Nested::Primitive(_))) {
372
if let Some(result) =
373
encode_as_dictionary_optional(primitive_array, nested, type_.clone(), options)
374
{
375
return result;
376
}
377
}
378
379
// We didn't succeed, fallback to plain
380
encoding = Encoding::Plain;
381
}
382
383
let nested = nested.to_vec();
384
385
let number_of_rows = nested[0].len();
386
387
// note: this is not correct if the array is sliced - the estimation should happen on the
388
// primitive after sliced for parquet
389
let byte_size = estimated_bytes_size(primitive_array);
390
391
const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
392
let max_page_size = options.data_page_size.unwrap_or(DEFAULT_PAGE_SIZE);
393
let max_page_size = max_page_size.min(2usize.pow(31) - 2usize.pow(25)); // allowed maximum page size
394
let bytes_per_row = if number_of_rows == 0 {
395
0
396
} else {
397
((byte_size as f64) / (number_of_rows as f64)) as usize
398
};
399
let rows_per_page = (max_page_size / (bytes_per_row + 1)).max(1);
400
401
let row_iter = (0..number_of_rows)
402
.step_by(rows_per_page)
403
.map(move |offset| {
404
let length = if offset + rows_per_page > number_of_rows {
405
number_of_rows - offset
406
} else {
407
rows_per_page
408
};
409
(offset, length)
410
});
411
412
let primitive_array = primitive_array.to_boxed();
413
414
let pages = row_iter.map(move |(offset, length)| {
415
let mut right_array = primitive_array.clone();
416
let mut right_nested = nested.clone();
417
slice_parquet_array(right_array.as_mut(), &mut right_nested, offset, length);
418
419
array_to_page(
420
right_array.as_ref(),
421
type_.clone(),
422
&right_nested,
423
options,
424
encoding,
425
)
426
});
427
Ok(DynIter::new(pages))
428
}
429
430
/// Converts an [`Array`] to a [`CompressedPage`] based on options, descriptor and `encoding`.
431
pub fn array_to_page(
432
array: &dyn Array,
433
type_: ParquetPrimitiveType,
434
nested: &[Nested],
435
options: WriteOptions,
436
encoding: Encoding,
437
) -> PolarsResult<Page> {
438
if nested.len() == 1 {
439
// special case where validity == def levels
440
return array_to_page_simple(array, type_, options, encoding);
441
}
442
array_to_page_nested(array, type_, nested, options, encoding)
443
}
444
445
/// Converts an [`Array`] to a [`CompressedPage`] based on options, descriptor and `encoding`.
446
pub fn array_to_page_simple(
447
array: &dyn Array,
448
type_: ParquetPrimitiveType,
449
options: WriteOptions,
450
encoding: Encoding,
451
) -> PolarsResult<Page> {
452
let dtype = array.dtype();
453
454
if type_.field_info.repetition == Repetition::Required && array.null_count() > 0 {
455
polars_bail!(InvalidOperation: "writing a missing value to required parquet column '{}'", type_.field_info.name);
456
}
457
458
match dtype.to_logical_type() {
459
ArrowDataType::Boolean => boolean::array_to_page(
460
array.as_any().downcast_ref().unwrap(),
461
options,
462
type_,
463
encoding,
464
),
465
// casts below MUST match the casts done at the metadata (field -> parquet type).
466
ArrowDataType::UInt8 => {
467
return primitive::array_to_page_integer::<u8, i32>(
468
array.as_any().downcast_ref().unwrap(),
469
options,
470
type_,
471
encoding,
472
);
473
},
474
ArrowDataType::UInt16 => {
475
return primitive::array_to_page_integer::<u16, i32>(
476
array.as_any().downcast_ref().unwrap(),
477
options,
478
type_,
479
encoding,
480
);
481
},
482
ArrowDataType::UInt32 => {
483
return primitive::array_to_page_integer::<u32, i32>(
484
array.as_any().downcast_ref().unwrap(),
485
options,
486
type_,
487
encoding,
488
);
489
},
490
ArrowDataType::UInt64 => {
491
return primitive::array_to_page_integer::<u64, i64>(
492
array.as_any().downcast_ref().unwrap(),
493
options,
494
type_,
495
encoding,
496
);
497
},
498
ArrowDataType::Int8 => {
499
return primitive::array_to_page_integer::<i8, i32>(
500
array.as_any().downcast_ref().unwrap(),
501
options,
502
type_,
503
encoding,
504
);
505
},
506
ArrowDataType::Int16 => {
507
return primitive::array_to_page_integer::<i16, i32>(
508
array.as_any().downcast_ref().unwrap(),
509
options,
510
type_,
511
encoding,
512
);
513
},
514
ArrowDataType::Int32 | ArrowDataType::Date32 | ArrowDataType::Time32(_) => {
515
return primitive::array_to_page_integer::<i32, i32>(
516
array.as_any().downcast_ref().unwrap(),
517
options,
518
type_,
519
encoding,
520
);
521
},
522
ArrowDataType::Int64
523
| ArrowDataType::Date64
524
| ArrowDataType::Time64(_)
525
| ArrowDataType::Timestamp(_, _)
526
| ArrowDataType::Duration(_) => {
527
return primitive::array_to_page_integer::<i64, i64>(
528
array.as_any().downcast_ref().unwrap(),
529
options,
530
type_,
531
encoding,
532
);
533
},
534
ArrowDataType::Float32 => primitive::array_to_page_plain::<f32, f32>(
535
array.as_any().downcast_ref().unwrap(),
536
options,
537
type_,
538
),
539
ArrowDataType::Float64 => primitive::array_to_page_plain::<f64, f64>(
540
array.as_any().downcast_ref().unwrap(),
541
options,
542
type_,
543
),
544
ArrowDataType::LargeUtf8 => {
545
let array =
546
polars_compute::cast::cast(array, &ArrowDataType::LargeBinary, Default::default())
547
.unwrap();
548
return binary::array_to_page::<i64>(
549
array.as_any().downcast_ref().unwrap(),
550
options,
551
type_,
552
encoding,
553
);
554
},
555
ArrowDataType::LargeBinary => {
556
return binary::array_to_page::<i64>(
557
array.as_any().downcast_ref().unwrap(),
558
options,
559
type_,
560
encoding,
561
);
562
},
563
ArrowDataType::BinaryView => {
564
return binview::array_to_page(
565
array.as_any().downcast_ref().unwrap(),
566
options,
567
type_,
568
encoding,
569
);
570
},
571
ArrowDataType::Utf8View => {
572
let array =
573
polars_compute::cast::cast(array, &ArrowDataType::BinaryView, Default::default())
574
.unwrap();
575
return binview::array_to_page(
576
array.as_any().downcast_ref().unwrap(),
577
options,
578
type_,
579
encoding,
580
);
581
},
582
ArrowDataType::Null => {
583
let array = Int32Array::new_null(ArrowDataType::Int32, array.len());
584
primitive::array_to_page_plain::<i32, i32>(&array, options, type_)
585
},
586
ArrowDataType::Interval(IntervalUnit::YearMonth) => {
587
let array = array
588
.as_any()
589
.downcast_ref::<PrimitiveArray<i32>>()
590
.unwrap();
591
let mut values = Vec::<u8>::with_capacity(12 * array.len());
592
array.values().iter().for_each(|x| {
593
let bytes = &x.to_le_bytes();
594
values.extend_from_slice(bytes);
595
values.extend_from_slice(&[0; 8]);
596
});
597
let array = FixedSizeBinaryArray::new(
598
ArrowDataType::FixedSizeBinary(12),
599
values.into(),
600
array.validity().cloned(),
601
);
602
let statistics = if options.has_statistics() {
603
Some(fixed_size_binary::build_statistics(
604
&array,
605
type_.clone(),
606
&options.statistics,
607
))
608
} else {
609
None
610
};
611
fixed_size_binary::array_to_page(&array, options, type_, statistics)
612
},
613
ArrowDataType::Interval(IntervalUnit::DayTime) => {
614
let array = array
615
.as_any()
616
.downcast_ref::<PrimitiveArray<days_ms>>()
617
.unwrap();
618
let mut values = Vec::<u8>::with_capacity(12 * array.len());
619
array.values().iter().for_each(|x| {
620
let bytes = &x.to_le_bytes();
621
values.extend_from_slice(&[0; 4]); // months
622
values.extend_from_slice(bytes); // days and seconds
623
});
624
let array = FixedSizeBinaryArray::new(
625
ArrowDataType::FixedSizeBinary(12),
626
values.into(),
627
array.validity().cloned(),
628
);
629
let statistics = if options.has_statistics() {
630
Some(fixed_size_binary::build_statistics(
631
&array,
632
type_.clone(),
633
&options.statistics,
634
))
635
} else {
636
None
637
};
638
fixed_size_binary::array_to_page(&array, options, type_, statistics)
639
},
640
ArrowDataType::FixedSizeBinary(_) => {
641
let array = array.as_any().downcast_ref().unwrap();
642
let statistics = if options.has_statistics() {
643
Some(fixed_size_binary::build_statistics(
644
array,
645
type_.clone(),
646
&options.statistics,
647
))
648
} else {
649
None
650
};
651
652
fixed_size_binary::array_to_page(array, options, type_, statistics)
653
},
654
ArrowDataType::Decimal256(precision, _) => {
655
let precision = *precision;
656
let array = array
657
.as_any()
658
.downcast_ref::<PrimitiveArray<i256>>()
659
.unwrap();
660
if precision <= 9 {
661
let values = array
662
.values()
663
.iter()
664
.map(|x| x.0.as_i32())
665
.collect::<Vec<_>>()
666
.into();
667
668
let array = PrimitiveArray::<i32>::new(
669
ArrowDataType::Int32,
670
values,
671
array.validity().cloned(),
672
);
673
return primitive::array_to_page_integer::<i32, i32>(
674
&array, options, type_, encoding,
675
);
676
} else if precision <= 18 {
677
let values = array
678
.values()
679
.iter()
680
.map(|x| x.0.as_i64())
681
.collect::<Vec<_>>()
682
.into();
683
684
let array = PrimitiveArray::<i64>::new(
685
ArrowDataType::Int64,
686
values,
687
array.validity().cloned(),
688
);
689
return primitive::array_to_page_integer::<i64, i64>(
690
&array, options, type_, encoding,
691
);
692
} else if precision <= 38 {
693
let size = decimal_length_from_precision(precision);
694
let statistics = if options.has_statistics() {
695
let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
696
array,
697
type_.clone(),
698
size,
699
&options.statistics,
700
);
701
Some(stats)
702
} else {
703
None
704
};
705
706
let mut values = Vec::<u8>::with_capacity(size * array.len());
707
array.values().iter().for_each(|x| {
708
let bytes = &x.0.low().to_be_bytes()[16 - size..];
709
values.extend_from_slice(bytes)
710
});
711
let array = FixedSizeBinaryArray::new(
712
ArrowDataType::FixedSizeBinary(size),
713
values.into(),
714
array.validity().cloned(),
715
);
716
fixed_size_binary::array_to_page(&array, options, type_, statistics)
717
} else {
718
let size = 32;
719
let array = array
720
.as_any()
721
.downcast_ref::<PrimitiveArray<i256>>()
722
.unwrap();
723
let statistics = if options.has_statistics() {
724
let stats = fixed_size_binary::build_statistics_decimal256(
725
array,
726
type_.clone(),
727
size,
728
&options.statistics,
729
);
730
Some(stats)
731
} else {
732
None
733
};
734
let mut values = Vec::<u8>::with_capacity(size * array.len());
735
array.values().iter().for_each(|x| {
736
let bytes = &x.to_be_bytes();
737
values.extend_from_slice(bytes)
738
});
739
let array = FixedSizeBinaryArray::new(
740
ArrowDataType::FixedSizeBinary(size),
741
values.into(),
742
array.validity().cloned(),
743
);
744
745
fixed_size_binary::array_to_page(&array, options, type_, statistics)
746
}
747
},
748
ArrowDataType::Decimal(precision, _) => {
749
let precision = *precision;
750
let array = array
751
.as_any()
752
.downcast_ref::<PrimitiveArray<i128>>()
753
.unwrap();
754
if precision <= 9 {
755
let values = array
756
.values()
757
.iter()
758
.map(|x| *x as i32)
759
.collect::<Vec<_>>()
760
.into();
761
762
let array = PrimitiveArray::<i32>::new(
763
ArrowDataType::Int32,
764
values,
765
array.validity().cloned(),
766
);
767
return primitive::array_to_page_integer::<i32, i32>(
768
&array, options, type_, encoding,
769
);
770
} else if precision <= 18 {
771
let values = array
772
.values()
773
.iter()
774
.map(|x| *x as i64)
775
.collect::<Vec<_>>()
776
.into();
777
778
let array = PrimitiveArray::<i64>::new(
779
ArrowDataType::Int64,
780
values,
781
array.validity().cloned(),
782
);
783
return primitive::array_to_page_integer::<i64, i64>(
784
&array, options, type_, encoding,
785
);
786
} else {
787
let size = decimal_length_from_precision(precision);
788
789
let statistics = if options.has_statistics() {
790
let stats = fixed_size_binary::build_statistics_decimal(
791
array,
792
type_.clone(),
793
size,
794
&options.statistics,
795
);
796
Some(stats)
797
} else {
798
None
799
};
800
801
let mut values = Vec::<u8>::with_capacity(size * array.len());
802
array.values().iter().for_each(|x| {
803
let bytes = &x.to_be_bytes()[16 - size..];
804
values.extend_from_slice(bytes)
805
});
806
let array = FixedSizeBinaryArray::new(
807
ArrowDataType::FixedSizeBinary(size),
808
values.into(),
809
array.validity().cloned(),
810
);
811
fixed_size_binary::array_to_page(&array, options, type_, statistics)
812
}
813
},
814
ArrowDataType::Int128 => {
815
let array: &PrimitiveArray<i128> = array.as_any().downcast_ref().unwrap();
816
let statistics = if options.has_statistics() {
817
let stats = fixed_size_binary::build_statistics_decimal(
818
array,
819
type_.clone(),
820
16,
821
&options.statistics,
822
);
823
Some(stats)
824
} else {
825
None
826
};
827
let array = FixedSizeBinaryArray::new(
828
ArrowDataType::FixedSizeBinary(16),
829
array.values().clone().try_transmute().unwrap(),
830
array.validity().cloned(),
831
);
832
fixed_size_binary::array_to_page(&array, options, type_, statistics)
833
},
834
other => polars_bail!(nyi = "Writing parquet pages for data type {other:?}"),
835
}
836
.map(Page::Data)
837
}
838
839
fn array_to_page_nested(
840
array: &dyn Array,
841
type_: ParquetPrimitiveType,
842
nested: &[Nested],
843
options: WriteOptions,
844
_encoding: Encoding,
845
) -> PolarsResult<Page> {
846
if type_.field_info.repetition == Repetition::Required
847
&& array.validity().is_some_and(|v| v.unset_bits() > 0)
848
{
849
polars_bail!(InvalidOperation: "writing a missing value to required parquet column '{}'", type_.field_info.name);
850
}
851
852
use ArrowDataType::*;
853
match array.dtype().to_logical_type() {
854
Null => {
855
let array = Int32Array::new_null(ArrowDataType::Int32, array.len());
856
primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
857
},
858
Boolean => {
859
let array = array.as_any().downcast_ref().unwrap();
860
boolean::nested_array_to_page(array, options, type_, nested)
861
},
862
LargeUtf8 => {
863
let array =
864
polars_compute::cast::cast(array, &LargeBinary, Default::default()).unwrap();
865
let array = array.as_any().downcast_ref().unwrap();
866
binary::nested_array_to_page::<i64>(array, options, type_, nested)
867
},
868
LargeBinary => {
869
let array = array.as_any().downcast_ref().unwrap();
870
binary::nested_array_to_page::<i64>(array, options, type_, nested)
871
},
872
BinaryView => {
873
let array = array.as_any().downcast_ref().unwrap();
874
binview::nested_array_to_page(array, options, type_, nested)
875
},
876
Utf8View => {
877
let array = polars_compute::cast::cast(array, &BinaryView, Default::default()).unwrap();
878
let array = array.as_any().downcast_ref().unwrap();
879
binview::nested_array_to_page(array, options, type_, nested)
880
},
881
UInt8 => {
882
let array = array.as_any().downcast_ref().unwrap();
883
primitive::nested_array_to_page::<u8, i32>(array, options, type_, nested)
884
},
885
UInt16 => {
886
let array = array.as_any().downcast_ref().unwrap();
887
primitive::nested_array_to_page::<u16, i32>(array, options, type_, nested)
888
},
889
UInt32 => {
890
let array = array.as_any().downcast_ref().unwrap();
891
primitive::nested_array_to_page::<u32, i32>(array, options, type_, nested)
892
},
893
UInt64 => {
894
let array = array.as_any().downcast_ref().unwrap();
895
primitive::nested_array_to_page::<u64, i64>(array, options, type_, nested)
896
},
897
Int8 => {
898
let array = array.as_any().downcast_ref().unwrap();
899
primitive::nested_array_to_page::<i8, i32>(array, options, type_, nested)
900
},
901
Int16 => {
902
let array = array.as_any().downcast_ref().unwrap();
903
primitive::nested_array_to_page::<i16, i32>(array, options, type_, nested)
904
},
905
Int32 | Date32 | Time32(_) => {
906
let array = array.as_any().downcast_ref().unwrap();
907
primitive::nested_array_to_page::<i32, i32>(array, options, type_, nested)
908
},
909
Int64 | Date64 | Time64(_) | Timestamp(_, _) | Duration(_) => {
910
let array = array.as_any().downcast_ref().unwrap();
911
primitive::nested_array_to_page::<i64, i64>(array, options, type_, nested)
912
},
913
Float32 => {
914
let array = array.as_any().downcast_ref().unwrap();
915
primitive::nested_array_to_page::<f32, f32>(array, options, type_, nested)
916
},
917
Float64 => {
918
let array = array.as_any().downcast_ref().unwrap();
919
primitive::nested_array_to_page::<f64, f64>(array, options, type_, nested)
920
},
921
Decimal(precision, _) => {
922
let precision = *precision;
923
let array = array
924
.as_any()
925
.downcast_ref::<PrimitiveArray<i128>>()
926
.unwrap();
927
if precision <= 9 {
928
let values = array
929
.values()
930
.iter()
931
.map(|x| *x as i32)
932
.collect::<Vec<_>>()
933
.into();
934
935
let array = PrimitiveArray::<i32>::new(
936
ArrowDataType::Int32,
937
values,
938
array.validity().cloned(),
939
);
940
primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
941
} else if precision <= 18 {
942
let values = array
943
.values()
944
.iter()
945
.map(|x| *x as i64)
946
.collect::<Vec<_>>()
947
.into();
948
949
let array = PrimitiveArray::<i64>::new(
950
ArrowDataType::Int64,
951
values,
952
array.validity().cloned(),
953
);
954
primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
955
} else {
956
let size = decimal_length_from_precision(precision);
957
958
let statistics = if options.has_statistics() {
959
let stats = fixed_size_binary::build_statistics_decimal(
960
array,
961
type_.clone(),
962
size,
963
&options.statistics,
964
);
965
Some(stats)
966
} else {
967
None
968
};
969
970
let mut values = Vec::<u8>::with_capacity(size * array.len());
971
array.values().iter().for_each(|x| {
972
let bytes = &x.to_be_bytes()[16 - size..];
973
values.extend_from_slice(bytes)
974
});
975
let array = FixedSizeBinaryArray::new(
976
ArrowDataType::FixedSizeBinary(size),
977
values.into(),
978
array.validity().cloned(),
979
);
980
fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
981
}
982
},
983
Decimal256(precision, _) => {
984
let precision = *precision;
985
let array = array
986
.as_any()
987
.downcast_ref::<PrimitiveArray<i256>>()
988
.unwrap();
989
if precision <= 9 {
990
let values = array
991
.values()
992
.iter()
993
.map(|x| x.0.as_i32())
994
.collect::<Vec<_>>()
995
.into();
996
997
let array = PrimitiveArray::<i32>::new(
998
ArrowDataType::Int32,
999
values,
1000
array.validity().cloned(),
1001
);
1002
primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)
1003
} else if precision <= 18 {
1004
let values = array
1005
.values()
1006
.iter()
1007
.map(|x| x.0.as_i64())
1008
.collect::<Vec<_>>()
1009
.into();
1010
1011
let array = PrimitiveArray::<i64>::new(
1012
ArrowDataType::Int64,
1013
values,
1014
array.validity().cloned(),
1015
);
1016
primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)
1017
} else if precision <= 38 {
1018
let size = decimal_length_from_precision(precision);
1019
let statistics = if options.has_statistics() {
1020
let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
1021
array,
1022
type_.clone(),
1023
size,
1024
&options.statistics,
1025
);
1026
Some(stats)
1027
} else {
1028
None
1029
};
1030
1031
let mut values = Vec::<u8>::with_capacity(size * array.len());
1032
array.values().iter().for_each(|x| {
1033
let bytes = &x.0.low().to_be_bytes()[16 - size..];
1034
values.extend_from_slice(bytes)
1035
});
1036
let array = FixedSizeBinaryArray::new(
1037
ArrowDataType::FixedSizeBinary(size),
1038
values.into(),
1039
array.validity().cloned(),
1040
);
1041
fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1042
} else {
1043
let size = 32;
1044
let array = array
1045
.as_any()
1046
.downcast_ref::<PrimitiveArray<i256>>()
1047
.unwrap();
1048
let statistics = if options.has_statistics() {
1049
let stats = fixed_size_binary::build_statistics_decimal256(
1050
array,
1051
type_.clone(),
1052
size,
1053
&options.statistics,
1054
);
1055
Some(stats)
1056
} else {
1057
None
1058
};
1059
let mut values = Vec::<u8>::with_capacity(size * array.len());
1060
array.values().iter().for_each(|x| {
1061
let bytes = &x.to_be_bytes();
1062
values.extend_from_slice(bytes)
1063
});
1064
let array = FixedSizeBinaryArray::new(
1065
ArrowDataType::FixedSizeBinary(size),
1066
values.into(),
1067
array.validity().cloned(),
1068
);
1069
1070
fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1071
}
1072
},
1073
Int128 => {
1074
let array: &PrimitiveArray<i128> = array.as_any().downcast_ref().unwrap();
1075
let statistics = if options.has_statistics() {
1076
let stats = fixed_size_binary::build_statistics_decimal(
1077
array,
1078
type_.clone(),
1079
16,
1080
&options.statistics,
1081
);
1082
Some(stats)
1083
} else {
1084
None
1085
};
1086
let array = FixedSizeBinaryArray::new(
1087
ArrowDataType::FixedSizeBinary(16),
1088
array.values().clone().try_transmute().unwrap(),
1089
array.validity().cloned(),
1090
);
1091
fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
1092
},
1093
other => polars_bail!(nyi = "Writing nested parquet pages for data type {other:?}"),
1094
}
1095
.map(Page::Data)
1096
}
1097
1098
fn transverse_recursive<T, F: Fn(&ArrowDataType) -> T + Clone>(
1099
dtype: &ArrowDataType,
1100
map: F,
1101
encodings: &mut Vec<T>,
1102
) {
1103
use arrow::datatypes::PhysicalType::*;
1104
match dtype.to_physical_type() {
1105
Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8
1106
| Dictionary(_) | LargeUtf8 | BinaryView | Utf8View => encodings.push(map(dtype)),
1107
List | FixedSizeList | LargeList => {
1108
let a = dtype.to_logical_type();
1109
if let ArrowDataType::List(inner) = a {
1110
transverse_recursive(&inner.dtype, map, encodings)
1111
} else if let ArrowDataType::LargeList(inner) = a {
1112
transverse_recursive(&inner.dtype, map, encodings)
1113
} else if let ArrowDataType::FixedSizeList(inner, _) = a {
1114
transverse_recursive(&inner.dtype, map, encodings)
1115
} else {
1116
unreachable!()
1117
}
1118
},
1119
Struct => {
1120
if let ArrowDataType::Struct(fields) = dtype.to_logical_type() {
1121
for field in fields {
1122
transverse_recursive(&field.dtype, map.clone(), encodings)
1123
}
1124
} else {
1125
unreachable!()
1126
}
1127
},
1128
Map => {
1129
if let ArrowDataType::Map(field, _) = dtype.to_logical_type() {
1130
if let ArrowDataType::Struct(fields) = field.dtype.to_logical_type() {
1131
for field in fields {
1132
transverse_recursive(&field.dtype, map.clone(), encodings)
1133
}
1134
} else {
1135
unreachable!()
1136
}
1137
} else {
1138
unreachable!()
1139
}
1140
},
1141
Union => todo!(),
1142
}
1143
}
1144
1145
/// Transverses the `dtype` up to its (parquet) columns and returns a vector of
1146
/// items based on `map`.
1147
///
1148
/// This is used to assign an [`Encoding`] to every parquet column based on the columns' type (see example)
1149
pub fn transverse<T, F: Fn(&ArrowDataType) -> T + Clone>(dtype: &ArrowDataType, map: F) -> Vec<T> {
1150
let mut encodings = vec![];
1151
transverse_recursive(dtype, map, &mut encodings);
1152
encodings
1153
}
1154
1155