Path: blob/main/crates/polars-parquet/src/arrow/write/mod.rs
8460 views
//! APIs to write to Parquet format.1//!2//! # Arrow/Parquet Interoperability3//! As of [parquet-format v2.9](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md)4//! there are Arrow [DataTypes](arrow::datatypes::ArrowDataType) which do not have a parquet5//! representation. These include but are not limited to:6//! * `ArrowDataType::Timestamp(TimeUnit::Second, _)`7//! * `ArrowDataType::Int64`8//! * `ArrowDataType::Duration`9//! * `ArrowDataType::Date64`10//! * `ArrowDataType::Time32(TimeUnit::Second)`11//!12//! The use of these arrow types will result in no logical type being stored within a parquet file.1314mod binary;15mod binview;16mod boolean;17mod dictionary;18mod file;19mod fixed_size_binary;20mod nested;21mod pages;22mod primitive;23mod row_group;24mod schema;25mod utils;2627use arrow::array::*;28use arrow::bitmap::Bitmap;29use arrow::datatypes::*;30use arrow::types::{NativeType, days_ms, i256};31pub use nested::{num_values, write_rep_and_def};32pub use pages::{to_leaves, to_nested, to_parquet_leaves};33use polars_utils::float16::pf16;34use polars_utils::pl_str::PlSmallStr;35pub use utils::write_def_levels;3637pub use crate::parquet::compression::{BrotliLevel, CompressionOptions, GzipLevel, ZstdLevel};38pub use crate::parquet::encoding::Encoding;39pub use crate::parquet::metadata::{40Descriptor, FileMetadata, KeyValue, SchemaDescriptor, ThriftFileMetadata,41};42pub use crate::parquet::page::{CompressedDataPage, CompressedPage, Page};43use crate::parquet::schema::Repetition;44use crate::parquet::schema::types::PrimitiveType as ParquetPrimitiveType;45pub use crate::parquet::schema::types::{46FieldInfo, ParquetType, PhysicalType as ParquetPhysicalType,47};48pub use crate::parquet::write::{49Compressor, DynIter, DynStreamingIterator, RowGroupIterColumns, Version, compress,50write_metadata_sidecar,51};52pub use crate::parquet::{FallibleStreamingIterator, fallible_streaming_iterator};53use crate::write::fixed_size_binary::build_statistics_float16;5455/// The statistics to write56#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]57#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]58#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]59pub struct StatisticsOptions {60pub min_value: bool,61pub max_value: bool,62pub distinct_count: bool,63pub null_count: bool,64}6566impl Default for StatisticsOptions {67fn default() -> Self {68Self {69min_value: true,70max_value: true,71distinct_count: false,72null_count: true,73}74}75}7677/// Options to encode an array78#[derive(Clone, Copy)]79pub enum EncodeNullability {80Required,81Optional,82}8384/// Currently supported options to write to parquet85#[derive(Debug, Clone, Copy, PartialEq, Eq)]86pub struct WriteOptions {87/// Whether to write statistics88pub statistics: StatisticsOptions,89/// The page and file version to use90pub version: Version,91/// The compression to apply to every page92pub compression: CompressionOptions,93/// The size to flush a page, defaults to 1024 * 1024 if None94pub data_page_size: Option<usize>,95}9697use arrow::compute::aggregate::estimated_bytes_size;98use arrow::match_integer_type;99pub use file::FileWriter;100pub use pages::{Nested, array_to_columns, arrays_to_columns};101use polars_error::{PolarsResult, polars_bail};102pub use row_group::{RowGroupIterator, row_group_iter};103pub use schema::{schema_to_metadata_key, to_parquet_type};104105use self::pages::{FixedSizeListNested, PrimitiveNested, StructNested};106use crate::write::dictionary::encode_as_dictionary_optional;107108impl StatisticsOptions {109pub fn empty() -> Self {110Self {111min_value: false,112max_value: false,113distinct_count: false,114null_count: false,115}116}117118pub fn full() -> Self {119Self {120min_value: true,121max_value: true,122distinct_count: true,123null_count: true,124}125}126127pub fn is_empty(&self) -> bool {128!(self.min_value || self.max_value || self.distinct_count || self.null_count)129}130131pub fn is_full(&self) -> bool {132self.min_value && self.max_value && self.distinct_count && self.null_count133}134}135136impl WriteOptions {137pub fn has_statistics(&self) -> bool {138!self.statistics.is_empty()139}140}141142impl EncodeNullability {143const fn new(is_optional: bool) -> Self {144if is_optional {145Self::Optional146} else {147Self::Required148}149}150151fn is_optional(self) -> bool {152matches!(self, Self::Optional)153}154}155156/// `data_page_size`: Set a target threshold for the approximate encoded size of data157/// pages within a column chunk (in bytes). If None, use the default data page size of 1MByte.158/// See: https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html159pub(crate) fn row_slice_ranges(160number_of_rows: usize,161byte_size: usize,162options: WriteOptions,163) -> impl Iterator<Item = (usize, usize)> {164const DEFAULT_PAGE_SIZE: usize = 1024 * 1024; // 1 MB165let max_page_size = options.data_page_size.unwrap_or(DEFAULT_PAGE_SIZE);166let max_page_size = max_page_size.min(2usize.pow(31) - 2usize.pow(25)); // allowed maximum page size167168let bytes_per_row = if number_of_rows == 0 {1690170} else {171((byte_size as f64) / (number_of_rows as f64)) as usize172};173let rows_per_page = (max_page_size / (bytes_per_row + 1)).max(1);174175(0..number_of_rows)176.step_by(rows_per_page)177.map(move |offset| {178let length = (offset + rows_per_page).min(number_of_rows) - offset;179(offset, length)180})181}182183/// returns offset and length to slice the leaf values184pub fn slice_nested_leaf(nested: &[Nested]) -> (usize, usize) {185// find the deepest recursive dremel structure as that one determines how many values we must186// take187let mut out = (0, 0);188for nested in nested.iter().rev() {189match nested {190Nested::LargeList(l_nested) => {191let start = *l_nested.offsets.first();192let end = *l_nested.offsets.last();193return (start as usize, (end - start) as usize);194},195Nested::List(l_nested) => {196let start = *l_nested.offsets.first();197let end = *l_nested.offsets.last();198return (start as usize, (end - start) as usize);199},200Nested::FixedSizeList(nested) => return (0, nested.length * nested.width),201Nested::Primitive(nested) => out = (0, nested.length),202Nested::Struct(_) => {},203}204}205out206}207208fn decimal_length_from_precision(precision: usize) -> usize {209// digits = floor(log_10(2^(8*n - 1) - 1))210// ceil(digits) = log10(2^(8*n - 1) - 1)211// 10^ceil(digits) = 2^(8*n - 1) - 1212// 10^ceil(digits) + 1 = 2^(8*n - 1)213// log2(10^ceil(digits) + 1) = (8*n - 1)214// log2(10^ceil(digits) + 1) + 1 = 8*n215// (log2(10^ceil(a) + 1) + 1) / 8 = n216(((10.0_f64.powi(precision as i32) + 1.0).log2() + 1.0) / 8.0).ceil() as usize217}218219/// Creates a parquet [`SchemaDescriptor`] from a [`ArrowSchema`].220pub fn to_parquet_schema(schema: &ArrowSchema) -> PolarsResult<SchemaDescriptor> {221let parquet_types = schema222.iter_values()223.map(to_parquet_type)224.collect::<PolarsResult<Vec<_>>>()?;225Ok(SchemaDescriptor::new(226PlSmallStr::from_static("root"),227parquet_types,228))229}230231/// Slices the [`Array`] to `Box<dyn Array>` and `Vec<Nested>`.232pub fn slice_parquet_array(233primitive_array: &mut dyn Array,234nested: &mut [Nested],235mut current_offset: usize,236mut current_length: usize,237) {238for nested in nested.iter_mut() {239match nested {240Nested::LargeList(l_nested) => {241l_nested.offsets.slice(current_offset, current_length + 1);242if let Some(validity) = l_nested.validity.as_mut() {243validity.slice(current_offset, current_length)244};245246// Update the offset/ length so that the Primitive is sliced properly.247current_length = l_nested.offsets.range() as usize;248current_offset = *l_nested.offsets.first() as usize;249},250Nested::List(l_nested) => {251l_nested.offsets.slice(current_offset, current_length + 1);252if let Some(validity) = l_nested.validity.as_mut() {253validity.slice(current_offset, current_length)254};255256// Update the offset/ length so that the Primitive is sliced properly.257current_length = l_nested.offsets.range() as usize;258current_offset = *l_nested.offsets.first() as usize;259},260Nested::Struct(StructNested {261validity, length, ..262}) => {263*length = current_length;264if let Some(validity) = validity.as_mut() {265validity.slice(current_offset, current_length)266};267},268Nested::Primitive(PrimitiveNested {269validity, length, ..270}) => {271*length = current_length;272if let Some(validity) = validity.as_mut() {273validity.slice(current_offset, current_length)274};275primitive_array.slice(current_offset, current_length);276},277Nested::FixedSizeList(FixedSizeListNested {278validity,279length,280width,281..282}) => {283if let Some(validity) = validity.as_mut() {284validity.slice(current_offset, current_length)285};286*length = current_length;287// Update the offset/ length so that the Primitive is sliced properly.288current_length *= *width;289current_offset *= *width;290},291}292}293}294295/// Get the length of [`Array`] that should be sliced.296pub fn get_max_length(nested: &[Nested]) -> usize {297let mut length = 0;298for nested in nested.iter() {299match nested {300Nested::LargeList(l_nested) => length += l_nested.offsets.range() as usize,301Nested::List(l_nested) => length += l_nested.offsets.range() as usize,302Nested::FixedSizeList(nested) => length += nested.length * nested.width,303_ => {},304}305}306length307}308309/// Returns an iterator of [`Page`].310pub fn array_to_pages(311primitive_array: &dyn Array,312type_: ParquetPrimitiveType,313nested: &[Nested],314options: WriteOptions,315mut encoding: Encoding,316) -> PolarsResult<DynIter<'static, PolarsResult<Page>>> {317if let ArrowDataType::Dictionary(key_type, _, _) = primitive_array.dtype().to_storage() {318return match_integer_type!(key_type, |$T| {319dictionary::array_to_pages::<$T>(320primitive_array.as_any().downcast_ref().unwrap(),321type_,322&nested,323options,324encoding,325)326});327};328if let Encoding::RleDictionary = encoding {329// Only take this path for primitive columns330if matches!(nested.first(), Some(Nested::Primitive(_))) {331if let Some(result) =332encode_as_dictionary_optional(primitive_array, nested, type_.clone(), options)333{334return result;335}336}337338// We didn't succeed, fallback to plain339encoding = Encoding::Plain;340}341342let nested = nested.to_vec();343let number_of_rows = nested[0].len();344// note: this is not correct if the array is sliced - the estimation should happen on the345// primitive after sliced for parquet346let byte_size = estimated_bytes_size(primitive_array);347let primitive_array = primitive_array.to_boxed();348349let pages =350row_slice_ranges(number_of_rows, byte_size, options).map(move |(offset, length)| {351let mut right_array = primitive_array.clone();352let mut right_nested = nested.clone();353slice_parquet_array(right_array.as_mut(), &mut right_nested, offset, length);354355array_to_page(356right_array.as_ref(),357type_.clone(),358&right_nested,359options,360encoding,361)362});363Ok(DynIter::new(pages))364}365366/// Converts an [`Array`] to a [`CompressedPage`] based on options, descriptor and `encoding`.367pub fn array_to_page(368array: &dyn Array,369type_: ParquetPrimitiveType,370nested: &[Nested],371options: WriteOptions,372encoding: Encoding,373) -> PolarsResult<Page> {374if nested.len() == 1 {375// special case where validity == def levels376return array_to_page_simple(array, type_, options, encoding);377}378array_to_page_nested(array, type_, nested, options, encoding)379}380381/// Converts an [`Array`] to a [`CompressedPage`] based on options, descriptor and `encoding`.382pub fn array_to_page_simple(383array: &dyn Array,384type_: ParquetPrimitiveType,385options: WriteOptions,386encoding: Encoding,387) -> PolarsResult<Page> {388let dtype = array.dtype();389390if type_.field_info.repetition == Repetition::Required && array.null_count() > 0 {391polars_bail!(InvalidOperation: "writing a missing value to required parquet column '{}'", type_.field_info.name);392}393394match dtype {395// Map empty struct to boolean array with same validity.396ArrowDataType::Struct(fs) if fs.is_empty() => boolean::array_to_page(397&BooleanArray::new(398ArrowDataType::Boolean,399Bitmap::new_zeroed(array.len()),400array.validity().cloned(),401),402options,403type_,404encoding,405),406407ArrowDataType::Boolean => boolean::array_to_page(408array.as_any().downcast_ref().unwrap(),409options,410type_,411encoding,412),413// casts below MUST match the casts done at the metadata (field -> parquet type).414ArrowDataType::UInt8 => {415return primitive::array_to_page_integer::<u8, i32>(416array.as_any().downcast_ref().unwrap(),417options,418type_,419encoding,420);421},422ArrowDataType::UInt16 => {423return primitive::array_to_page_integer::<u16, i32>(424array.as_any().downcast_ref().unwrap(),425options,426type_,427encoding,428);429},430ArrowDataType::UInt32 => {431return primitive::array_to_page_integer::<u32, i32>(432array.as_any().downcast_ref().unwrap(),433options,434type_,435encoding,436);437},438ArrowDataType::UInt64 => {439return primitive::array_to_page_integer::<u64, i64>(440array.as_any().downcast_ref().unwrap(),441options,442type_,443encoding,444);445},446ArrowDataType::Int8 => {447return primitive::array_to_page_integer::<i8, i32>(448array.as_any().downcast_ref().unwrap(),449options,450type_,451encoding,452);453},454ArrowDataType::Int16 => {455return primitive::array_to_page_integer::<i16, i32>(456array.as_any().downcast_ref().unwrap(),457options,458type_,459encoding,460);461},462ArrowDataType::Int32 | ArrowDataType::Date32 | ArrowDataType::Time32(_) => {463return primitive::array_to_page_integer::<i32, i32>(464array.as_any().downcast_ref().unwrap(),465options,466type_,467encoding,468);469},470ArrowDataType::Int64471| ArrowDataType::Date64472| ArrowDataType::Time64(_)473| ArrowDataType::Timestamp(_, _)474| ArrowDataType::Duration(_) => {475return primitive::array_to_page_integer::<i64, i64>(476array.as_any().downcast_ref().unwrap(),477options,478type_,479encoding,480);481},482ArrowDataType::Float16 => {483let array: &PrimitiveArray<pf16> = array.as_any().downcast_ref().unwrap();484let statistics = options485.has_statistics()486.then(|| build_statistics_float16(array, type_.clone(), &options.statistics));487let array = FixedSizeBinaryArray::new(488ArrowDataType::FixedSizeBinary(2),489array.values().clone().try_transmute().unwrap(),490array.validity().cloned(),491);492fixed_size_binary::array_to_page(&array, options, type_, statistics)493},494ArrowDataType::Float32 => primitive::array_to_page_plain::<f32, f32>(495array.as_any().downcast_ref().unwrap(),496options,497type_,498),499ArrowDataType::Float64 => primitive::array_to_page_plain::<f64, f64>(500array.as_any().downcast_ref().unwrap(),501options,502type_,503),504ArrowDataType::LargeUtf8 => {505let array =506polars_compute::cast::cast(array, &ArrowDataType::LargeBinary, Default::default())507.unwrap();508return binary::array_to_page::<i64>(509array.as_any().downcast_ref().unwrap(),510options,511type_,512encoding,513);514},515ArrowDataType::LargeBinary => {516return binary::array_to_page::<i64>(517array.as_any().downcast_ref().unwrap(),518options,519type_,520encoding,521);522},523ArrowDataType::BinaryView => {524return binview::array_to_page(525array.as_any().downcast_ref().unwrap(),526options,527type_,528encoding,529);530},531ArrowDataType::Utf8View => {532let array =533polars_compute::cast::cast(array, &ArrowDataType::BinaryView, Default::default())534.unwrap();535return binview::array_to_page(536array.as_any().downcast_ref().unwrap(),537options,538type_,539encoding,540);541},542ArrowDataType::Null => {543let array = Int32Array::new_null(ArrowDataType::Int32, array.len());544primitive::array_to_page_plain::<i32, i32>(&array, options, type_)545},546ArrowDataType::Interval(IntervalUnit::YearMonth) => {547let array = array548.as_any()549.downcast_ref::<PrimitiveArray<i32>>()550.unwrap();551let mut values = Vec::<u8>::with_capacity(12 * array.len());552array.values().iter().for_each(|x| {553let bytes = &x.to_le_bytes();554values.extend_from_slice(bytes);555values.extend_from_slice(&[0; 8]);556});557let array = FixedSizeBinaryArray::new(558ArrowDataType::FixedSizeBinary(12),559values.into(),560array.validity().cloned(),561);562let statistics = if options.has_statistics() {563Some(fixed_size_binary::build_statistics(564&array,565type_.clone(),566&options.statistics,567))568} else {569None570};571fixed_size_binary::array_to_page(&array, options, type_, statistics)572},573ArrowDataType::Interval(IntervalUnit::DayTime) => {574let array = array575.as_any()576.downcast_ref::<PrimitiveArray<days_ms>>()577.unwrap();578let mut values = Vec::<u8>::with_capacity(12 * array.len());579array.values().iter().for_each(|x| {580let bytes = &x.to_le_bytes();581values.extend_from_slice(&[0; 4]); // months582values.extend_from_slice(bytes); // days and seconds583});584let array = FixedSizeBinaryArray::new(585ArrowDataType::FixedSizeBinary(12),586values.into(),587array.validity().cloned(),588);589let statistics = if options.has_statistics() {590Some(fixed_size_binary::build_statistics(591&array,592type_.clone(),593&options.statistics,594))595} else {596None597};598fixed_size_binary::array_to_page(&array, options, type_, statistics)599},600ArrowDataType::FixedSizeBinary(_) => {601let array = array.as_any().downcast_ref().unwrap();602let statistics = if options.has_statistics() {603Some(fixed_size_binary::build_statistics(604array,605type_.clone(),606&options.statistics,607))608} else {609None610};611612fixed_size_binary::array_to_page(array, options, type_, statistics)613},614ArrowDataType::Decimal256(precision, _) => {615let precision = *precision;616let array = array617.as_any()618.downcast_ref::<PrimitiveArray<i256>>()619.unwrap();620if precision <= 9 {621let values = array622.values()623.iter()624.map(|x| x.0.as_i32())625.collect::<Vec<_>>()626.into();627628let array = PrimitiveArray::<i32>::new(629ArrowDataType::Int32,630values,631array.validity().cloned(),632);633return primitive::array_to_page_integer::<i32, i32>(634&array, options, type_, encoding,635);636} else if precision <= 18 {637let values = array638.values()639.iter()640.map(|x| x.0.as_i64())641.collect::<Vec<_>>()642.into();643644let array = PrimitiveArray::<i64>::new(645ArrowDataType::Int64,646values,647array.validity().cloned(),648);649return primitive::array_to_page_integer::<i64, i64>(650&array, options, type_, encoding,651);652} else if precision <= 38 {653let size = decimal_length_from_precision(precision);654let statistics = if options.has_statistics() {655let stats = fixed_size_binary::build_statistics_decimal256_with_i128(656array,657type_.clone(),658size,659&options.statistics,660);661Some(stats)662} else {663None664};665666let mut values = Vec::<u8>::with_capacity(size * array.len());667array.values().iter().for_each(|x| {668let bytes = &x.0.low().to_be_bytes()[16 - size..];669values.extend_from_slice(bytes)670});671let array = FixedSizeBinaryArray::new(672ArrowDataType::FixedSizeBinary(size),673values.into(),674array.validity().cloned(),675);676fixed_size_binary::array_to_page(&array, options, type_, statistics)677} else {678let size = 32;679let array = array680.as_any()681.downcast_ref::<PrimitiveArray<i256>>()682.unwrap();683let statistics = if options.has_statistics() {684let stats = fixed_size_binary::build_statistics_decimal256(685array,686type_.clone(),687size,688&options.statistics,689);690Some(stats)691} else {692None693};694let mut values = Vec::<u8>::with_capacity(size * array.len());695array.values().iter().for_each(|x| {696let bytes = &x.to_be_bytes();697values.extend_from_slice(bytes)698});699let array = FixedSizeBinaryArray::new(700ArrowDataType::FixedSizeBinary(size),701values.into(),702array.validity().cloned(),703);704705fixed_size_binary::array_to_page(&array, options, type_, statistics)706}707},708ArrowDataType::Decimal(precision, _) => {709let precision = *precision;710let array = array711.as_any()712.downcast_ref::<PrimitiveArray<i128>>()713.unwrap();714if precision <= 9 {715let values = array716.values()717.iter()718.map(|x| *x as i32)719.collect::<Vec<_>>()720.into();721722let array = PrimitiveArray::<i32>::new(723ArrowDataType::Int32,724values,725array.validity().cloned(),726);727return primitive::array_to_page_integer::<i32, i32>(728&array, options, type_, encoding,729);730} else if precision <= 18 {731let values = array732.values()733.iter()734.map(|x| *x as i64)735.collect::<Vec<_>>()736.into();737738let array = PrimitiveArray::<i64>::new(739ArrowDataType::Int64,740values,741array.validity().cloned(),742);743return primitive::array_to_page_integer::<i64, i64>(744&array, options, type_, encoding,745);746} else {747let size = decimal_length_from_precision(precision);748749let statistics = if options.has_statistics() {750let stats = fixed_size_binary::build_statistics_decimal(751array,752type_.clone(),753size,754&options.statistics,755);756Some(stats)757} else {758None759};760761let mut values = Vec::<u8>::with_capacity(size * array.len());762array.values().iter().for_each(|x| {763let bytes = &x.to_be_bytes()[16 - size..];764values.extend_from_slice(bytes)765});766let array = FixedSizeBinaryArray::new(767ArrowDataType::FixedSizeBinary(size),768values.into(),769array.validity().cloned(),770);771fixed_size_binary::array_to_page(&array, options, type_, statistics)772}773},774ArrowDataType::UInt128 => {775let array: &PrimitiveArray<u128> = array.as_any().downcast_ref().unwrap();776let statistics = if options.has_statistics() {777let stats = fixed_size_binary::build_statistics_decimal(778array,779type_.clone(),78016,781&options.statistics,782);783Some(stats)784} else {785None786};787let array = FixedSizeBinaryArray::new(788ArrowDataType::FixedSizeBinary(16),789array.values().clone().try_transmute().unwrap(),790array.validity().cloned(),791);792fixed_size_binary::array_to_page(&array, options, type_, statistics)793},794ArrowDataType::Int128 => {795let array: &PrimitiveArray<i128> = array.as_any().downcast_ref().unwrap();796let statistics = if options.has_statistics() {797let stats = fixed_size_binary::build_statistics_decimal(798array,799type_.clone(),80016,801&options.statistics,802);803Some(stats)804} else {805None806};807let array = FixedSizeBinaryArray::new(808ArrowDataType::FixedSizeBinary(16),809array.values().clone().try_transmute().unwrap(),810array.validity().cloned(),811);812fixed_size_binary::array_to_page(&array, options, type_, statistics)813},814ArrowDataType::Extension(ext) => {815let mut boxed = array.to_boxed();816assert!(matches!(boxed.dtype(), ArrowDataType::Extension(ext2) if ext2 == ext));817*boxed.dtype_mut() = ext.inner.clone();818return array_to_page_simple(boxed.as_ref(), type_, options, encoding);819},820other => polars_bail!(nyi = "Writing parquet pages for data type {other:?}"),821}822.map(Page::Data)823}824825fn array_to_page_nested(826array: &dyn Array,827type_: ParquetPrimitiveType,828nested: &[Nested],829options: WriteOptions,830_encoding: Encoding,831) -> PolarsResult<Page> {832if type_.field_info.repetition == Repetition::Required833&& array.validity().is_some_and(|v| v.unset_bits() > 0)834{835polars_bail!(InvalidOperation: "writing a missing value to required parquet column '{}'", type_.field_info.name);836}837838use ArrowDataType::*;839match array.dtype().to_storage() {840Null => {841let array = Int32Array::new_null(ArrowDataType::Int32, array.len());842primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)843},844// Map empty struct to boolean array with same validity.845Struct(fs) if fs.is_empty() => {846let array = BooleanArray::new(847ArrowDataType::Boolean,848Bitmap::new_zeroed(array.len()),849array.validity().cloned(),850);851boolean::nested_array_to_page(&array, options, type_, nested)852},853Boolean => {854let array = array.as_any().downcast_ref().unwrap();855boolean::nested_array_to_page(array, options, type_, nested)856},857LargeUtf8 => {858let array =859polars_compute::cast::cast(array, &LargeBinary, Default::default()).unwrap();860let array = array.as_any().downcast_ref().unwrap();861binary::nested_array_to_page::<i64>(array, options, type_, nested)862},863LargeBinary => {864let array = array.as_any().downcast_ref().unwrap();865binary::nested_array_to_page::<i64>(array, options, type_, nested)866},867BinaryView => {868let array = array.as_any().downcast_ref().unwrap();869binview::nested_array_to_page(array, options, type_, nested)870},871Utf8View => {872let array = polars_compute::cast::cast(array, &BinaryView, Default::default()).unwrap();873let array = array.as_any().downcast_ref().unwrap();874binview::nested_array_to_page(array, options, type_, nested)875},876UInt8 => {877let array = array.as_any().downcast_ref().unwrap();878primitive::nested_array_to_page::<u8, i32>(array, options, type_, nested)879},880UInt16 => {881let array = array.as_any().downcast_ref().unwrap();882primitive::nested_array_to_page::<u16, i32>(array, options, type_, nested)883},884UInt32 => {885let array = array.as_any().downcast_ref().unwrap();886primitive::nested_array_to_page::<u32, i32>(array, options, type_, nested)887},888UInt64 => {889let array = array.as_any().downcast_ref().unwrap();890primitive::nested_array_to_page::<u64, i64>(array, options, type_, nested)891},892Int8 => {893let array = array.as_any().downcast_ref().unwrap();894primitive::nested_array_to_page::<i8, i32>(array, options, type_, nested)895},896Int16 => {897let array = array.as_any().downcast_ref().unwrap();898primitive::nested_array_to_page::<i16, i32>(array, options, type_, nested)899},900Int32 | Date32 | Time32(_) => {901let array = array.as_any().downcast_ref().unwrap();902primitive::nested_array_to_page::<i32, i32>(array, options, type_, nested)903},904Int64 | Date64 | Time64(_) | Timestamp(_, _) | Duration(_) => {905let array = array.as_any().downcast_ref().unwrap();906primitive::nested_array_to_page::<i64, i64>(array, options, type_, nested)907},908Float16 => {909let array: &PrimitiveArray<pf16> = array.as_any().downcast_ref().unwrap();910let statistics = options911.has_statistics()912.then(|| build_statistics_float16(array, type_.clone(), &options.statistics));913let array = FixedSizeBinaryArray::new(914ArrowDataType::FixedSizeBinary(2),915array.values().clone().try_transmute().unwrap(),916array.validity().cloned(),917);918fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)919},920Float32 => {921let array = array.as_any().downcast_ref().unwrap();922primitive::nested_array_to_page::<f32, f32>(array, options, type_, nested)923},924Float64 => {925let array = array.as_any().downcast_ref().unwrap();926primitive::nested_array_to_page::<f64, f64>(array, options, type_, nested)927},928Decimal(precision, _) => {929let precision = *precision;930let array = array931.as_any()932.downcast_ref::<PrimitiveArray<i128>>()933.unwrap();934if precision <= 9 {935let values = array936.values()937.iter()938.map(|x| *x as i32)939.collect::<Vec<_>>()940.into();941942let array = PrimitiveArray::<i32>::new(943ArrowDataType::Int32,944values,945array.validity().cloned(),946);947primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)948} else if precision <= 18 {949let values = array950.values()951.iter()952.map(|x| *x as i64)953.collect::<Vec<_>>()954.into();955956let array = PrimitiveArray::<i64>::new(957ArrowDataType::Int64,958values,959array.validity().cloned(),960);961primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)962} else {963let size = decimal_length_from_precision(precision);964965let statistics = if options.has_statistics() {966let stats = fixed_size_binary::build_statistics_decimal(967array,968type_.clone(),969size,970&options.statistics,971);972Some(stats)973} else {974None975};976977let mut values = Vec::<u8>::with_capacity(size * array.len());978array.values().iter().for_each(|x| {979let bytes = &x.to_be_bytes()[16 - size..];980values.extend_from_slice(bytes)981});982let array = FixedSizeBinaryArray::new(983ArrowDataType::FixedSizeBinary(size),984values.into(),985array.validity().cloned(),986);987fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)988}989},990Decimal256(precision, _) => {991let precision = *precision;992let array = array993.as_any()994.downcast_ref::<PrimitiveArray<i256>>()995.unwrap();996if precision <= 9 {997let values = array998.values()999.iter()1000.map(|x| x.0.as_i32())1001.collect::<Vec<_>>()1002.into();10031004let array = PrimitiveArray::<i32>::new(1005ArrowDataType::Int32,1006values,1007array.validity().cloned(),1008);1009primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)1010} else if precision <= 18 {1011let values = array1012.values()1013.iter()1014.map(|x| x.0.as_i64())1015.collect::<Vec<_>>()1016.into();10171018let array = PrimitiveArray::<i64>::new(1019ArrowDataType::Int64,1020values,1021array.validity().cloned(),1022);1023primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)1024} else if precision <= 38 {1025let size = decimal_length_from_precision(precision);1026let statistics = if options.has_statistics() {1027let stats = fixed_size_binary::build_statistics_decimal256_with_i128(1028array,1029type_.clone(),1030size,1031&options.statistics,1032);1033Some(stats)1034} else {1035None1036};10371038let mut values = Vec::<u8>::with_capacity(size * array.len());1039array.values().iter().for_each(|x| {1040let bytes = &x.0.low().to_be_bytes()[16 - size..];1041values.extend_from_slice(bytes)1042});1043let array = FixedSizeBinaryArray::new(1044ArrowDataType::FixedSizeBinary(size),1045values.into(),1046array.validity().cloned(),1047);1048fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)1049} else {1050let size = 32;1051let array = array1052.as_any()1053.downcast_ref::<PrimitiveArray<i256>>()1054.unwrap();1055let statistics = if options.has_statistics() {1056let stats = fixed_size_binary::build_statistics_decimal256(1057array,1058type_.clone(),1059size,1060&options.statistics,1061);1062Some(stats)1063} else {1064None1065};1066let mut values = Vec::<u8>::with_capacity(size * array.len());1067array.values().iter().for_each(|x| {1068let bytes = &x.to_be_bytes();1069values.extend_from_slice(bytes)1070});1071let array = FixedSizeBinaryArray::new(1072ArrowDataType::FixedSizeBinary(size),1073values.into(),1074array.validity().cloned(),1075);10761077fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)1078}1079},1080Int128 => {1081let array: &PrimitiveArray<i128> = array.as_any().downcast_ref().unwrap();1082// Can't write min/max statistics for signed 128-bit integer, see #25965.1083let mut no_mm_options = options;1084no_mm_options.statistics.min_value = false;1085no_mm_options.statistics.max_value = false;1086let statistics = if no_mm_options.has_statistics() {1087let stats = fixed_size_binary::build_statistics_decimal(1088array,1089type_.clone(),109016,1091&no_mm_options.statistics,1092);1093Some(stats)1094} else {1095None1096};1097let array = FixedSizeBinaryArray::new(1098ArrowDataType::FixedSizeBinary(16),1099array.values().clone().try_transmute().unwrap(),1100array.validity().cloned(),1101);1102fixed_size_binary::nested_array_to_page(1103&array,1104no_mm_options,1105type_,1106nested,1107statistics,1108)1109},1110UInt128 => {1111let array: &PrimitiveArray<u128> = array.as_any().downcast_ref().unwrap();1112let statistics = if options.has_statistics() {1113let stats = fixed_size_binary::build_statistics_decimal(1114array,1115type_.clone(),111616,1117&options.statistics,1118);1119Some(stats)1120} else {1121None1122};1123let array = FixedSizeBinaryArray::new(1124ArrowDataType::FixedSizeBinary(16),1125array.values().clone().try_transmute().unwrap(),1126array.validity().cloned(),1127);1128fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)1129},1130other => polars_bail!(nyi = "Writing nested parquet pages for data type {other:?}"),1131}1132.map(Page::Data)1133}11341135fn get_encodings_recursive(dtype: &ArrowDataType, encodings: &mut Vec<Encoding>) {1136use arrow::datatypes::PhysicalType::*;1137match dtype.to_physical_type() {1138Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf81139| Dictionary(_) | LargeUtf8 | BinaryView | Utf8View => {1140encodings.push(get_primitive_dtype_encoding(dtype))1141},1142List | FixedSizeList | LargeList => {1143let a = dtype.to_storage();1144if let ArrowDataType::List(inner) = a {1145get_encodings_recursive(&inner.dtype, encodings)1146} else if let ArrowDataType::LargeList(inner) = a {1147get_encodings_recursive(&inner.dtype, encodings)1148} else if let ArrowDataType::FixedSizeList(inner, _) = a {1149get_encodings_recursive(&inner.dtype, encodings)1150} else {1151unreachable!()1152}1153},1154Struct => {1155if let ArrowDataType::Struct(fields) = dtype.to_storage() {1156if fields.is_empty() {1157// 0-field struct writes as a boolean column representing outer validity.1158encodings.push(Encoding::Rle)1159}11601161for field in fields {1162get_encodings_recursive(&field.dtype, encodings)1163}1164} else {1165unreachable!()1166}1167},1168Map => {1169if let ArrowDataType::Map(field, _) = dtype.to_storage() {1170if let ArrowDataType::Struct(fields) = field.dtype.to_storage() {1171for field in fields {1172get_encodings_recursive(&field.dtype, encodings)1173}1174} else {1175unreachable!()1176}1177} else {1178unreachable!()1179}1180},1181Union => todo!(),1182}1183}11841185/// Transverses the `dtype` up to its (parquet) columns and returns a vector of1186/// items based on `map`.1187///1188/// This is used to assign an [`Encoding`] to every parquet column based on the columns' type (see example)1189pub fn get_dtype_encoding(dtype: &ArrowDataType) -> Vec<Encoding> {1190let mut encodings = vec![];1191get_encodings_recursive(dtype, &mut encodings);1192encodings1193}11941195fn get_primitive_dtype_encoding(dtype: &ArrowDataType) -> Encoding {1196match dtype.to_physical_type() {1197PhysicalType::Dictionary(_)1198| PhysicalType::LargeBinary1199| PhysicalType::LargeUtf81200| PhysicalType::Utf8View1201| PhysicalType::BinaryView1202| PhysicalType::Primitive(_) => Encoding::RleDictionary,1203// remaining is plain1204_ => Encoding::Plain,1205}1206}120712081209