// Path: blob/main/crates/polars-parquet/src/arrow/write/mod.rs
// 6940 views
//! APIs to write to Parquet format.1//!2//! # Arrow/Parquet Interoperability3//! As of [parquet-format v2.9](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md)4//! there are Arrow [DataTypes](arrow::datatypes::ArrowDataType) which do not have a parquet5//! representation. These include but are not limited to:6//! * `ArrowDataType::Timestamp(TimeUnit::Second, _)`7//! * `ArrowDataType::Int64`8//! * `ArrowDataType::Duration`9//! * `ArrowDataType::Date64`10//! * `ArrowDataType::Time32(TimeUnit::Second)`11//!12//! The use of these arrow types will result in no logical type being stored within a parquet file.1314mod binary;15mod binview;16mod boolean;17mod dictionary;18mod file;19mod fixed_size_binary;20mod nested;21mod pages;22mod primitive;23mod row_group;24mod schema;25mod utils;2627use arrow::array::*;28use arrow::datatypes::*;29use arrow::types::{NativeType, days_ms, i256};30pub use nested::{num_values, write_rep_and_def};31pub use pages::{to_leaves, to_nested, to_parquet_leaves};32use polars_utils::pl_str::PlSmallStr;33pub use utils::write_def_levels;3435pub use crate::parquet::compression::{BrotliLevel, CompressionOptions, GzipLevel, ZstdLevel};36pub use crate::parquet::encoding::Encoding;37pub use crate::parquet::metadata::{38Descriptor, FileMetadata, KeyValue, SchemaDescriptor, ThriftFileMetadata,39};40pub use crate::parquet::page::{CompressedDataPage, CompressedPage, Page};41use crate::parquet::schema::Repetition;42use crate::parquet::schema::types::PrimitiveType as ParquetPrimitiveType;43pub use crate::parquet::schema::types::{44FieldInfo, ParquetType, PhysicalType as ParquetPhysicalType,45};46pub use crate::parquet::write::{47Compressor, DynIter, DynStreamingIterator, RowGroupIterColumns, Version, compress,48write_metadata_sidecar,49};50pub use crate::parquet::{FallibleStreamingIterator, fallible_streaming_iterator};5152/// The statistics to write53#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]54#[cfg_attr(feature = "serde", 
derive(serde::Serialize, serde::Deserialize))]55#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]56pub struct StatisticsOptions {57pub min_value: bool,58pub max_value: bool,59pub distinct_count: bool,60pub null_count: bool,61}6263impl Default for StatisticsOptions {64fn default() -> Self {65Self {66min_value: true,67max_value: true,68distinct_count: false,69null_count: true,70}71}72}7374/// Options to encode an array75#[derive(Clone, Copy)]76pub enum EncodeNullability {77Required,78Optional,79}8081/// Currently supported options to write to parquet82#[derive(Debug, Clone, Copy, PartialEq, Eq)]83pub struct WriteOptions {84/// Whether to write statistics85pub statistics: StatisticsOptions,86/// The page and file version to use87pub version: Version,88/// The compression to apply to every page89pub compression: CompressionOptions,90/// The size to flush a page, defaults to 1024 * 1024 if None91pub data_page_size: Option<usize>,92}9394#[derive(Clone)]95pub struct ColumnWriteOptions {96pub field_id: Option<i32>,97pub metadata: Vec<KeyValue>,98pub required: Option<bool>,99pub children: ChildWriteOptions,100}101102#[derive(Clone)]103pub enum ChildWriteOptions {104Leaf(FieldWriteOptions),105ListLike(Box<ListLikeFieldWriteOptions>),106Struct(Box<StructFieldWriteOptions>),107}108109impl ColumnWriteOptions {110pub fn to_leaves<'a>(&'a self, out: &mut Vec<&'a FieldWriteOptions>) {111match &self.children {112ChildWriteOptions::Leaf(o) => out.push(o),113ChildWriteOptions::ListLike(o) => o.child.to_leaves(out),114ChildWriteOptions::Struct(o) => {115for o in &o.children {116o.to_leaves(out);117}118},119}120}121}122123#[derive(Clone)]124pub struct FieldWriteOptions {125pub encoding: Encoding,126}127128impl ColumnWriteOptions {129pub fn default_with(children: ChildWriteOptions) -> Self {130Self {131field_id: None,132metadata: Vec::new(),133required: None,134children,135}136}137}138139impl FieldWriteOptions {140pub fn default_with_encoding(encoding: Encoding) -> Self 
{141Self { encoding }142}143144pub fn into_default_column_write_options(self) -> ColumnWriteOptions {145ColumnWriteOptions::default_with(ChildWriteOptions::Leaf(self))146}147}148149#[derive(Clone)]150pub struct ListLikeFieldWriteOptions {151pub child: ColumnWriteOptions,152}153154#[derive(Clone)]155pub struct StructFieldWriteOptions {156pub children: Vec<ColumnWriteOptions>,157}158159use arrow::compute::aggregate::estimated_bytes_size;160use arrow::match_integer_type;161pub use file::FileWriter;162pub use pages::{Nested, array_to_columns, arrays_to_columns};163use polars_error::{PolarsResult, polars_bail};164pub use row_group::{RowGroupIterator, row_group_iter};165pub use schema::{schema_to_metadata_key, to_parquet_type};166167use self::pages::{FixedSizeListNested, PrimitiveNested, StructNested};168use crate::write::dictionary::encode_as_dictionary_optional;169170impl StatisticsOptions {171pub fn empty() -> Self {172Self {173min_value: false,174max_value: false,175distinct_count: false,176null_count: false,177}178}179180pub fn full() -> Self {181Self {182min_value: true,183max_value: true,184distinct_count: true,185null_count: true,186}187}188189pub fn is_empty(&self) -> bool {190!(self.min_value || self.max_value || self.distinct_count || self.null_count)191}192193pub fn is_full(&self) -> bool {194self.min_value && self.max_value && self.distinct_count && self.null_count195}196}197198impl WriteOptions {199pub fn has_statistics(&self) -> bool {200!self.statistics.is_empty()201}202}203204impl EncodeNullability {205const fn new(is_optional: bool) -> Self {206if is_optional {207Self::Optional208} else {209Self::Required210}211}212213fn is_optional(self) -> bool {214matches!(self, Self::Optional)215}216}217218/// returns offset and length to slice the leaf values219pub fn slice_nested_leaf(nested: &[Nested]) -> (usize, usize) {220// find the deepest recursive dremel structure as that one determines how many values we must221// take222let mut out = (0, 0);223for nested 
in nested.iter().rev() {224match nested {225Nested::LargeList(l_nested) => {226let start = *l_nested.offsets.first();227let end = *l_nested.offsets.last();228return (start as usize, (end - start) as usize);229},230Nested::List(l_nested) => {231let start = *l_nested.offsets.first();232let end = *l_nested.offsets.last();233return (start as usize, (end - start) as usize);234},235Nested::FixedSizeList(nested) => return (0, nested.length * nested.width),236Nested::Primitive(nested) => out = (0, nested.length),237Nested::Struct(_) => {},238}239}240out241}242243fn decimal_length_from_precision(precision: usize) -> usize {244// digits = floor(log_10(2^(8*n - 1) - 1))245// ceil(digits) = log10(2^(8*n - 1) - 1)246// 10^ceil(digits) = 2^(8*n - 1) - 1247// 10^ceil(digits) + 1 = 2^(8*n - 1)248// log2(10^ceil(digits) + 1) = (8*n - 1)249// log2(10^ceil(digits) + 1) + 1 = 8*n250// (log2(10^ceil(a) + 1) + 1) / 8 = n251(((10.0_f64.powi(precision as i32) + 1.0).log2() + 1.0) / 8.0).ceil() as usize252}253254/// Creates a parquet [`SchemaDescriptor`] from a [`ArrowSchema`].255pub fn to_parquet_schema(256schema: &ArrowSchema,257column_options: &[ColumnWriteOptions],258) -> PolarsResult<SchemaDescriptor> {259let parquet_types = schema260.iter_values()261.zip(column_options)262.map(|(field, options)| to_parquet_type(field, options))263.collect::<PolarsResult<Vec<_>>>()?;264Ok(SchemaDescriptor::new(265PlSmallStr::from_static("root"),266parquet_types,267))268}269270/// Slices the [`Array`] to `Box<dyn Array>` and `Vec<Nested>`.271pub fn slice_parquet_array(272primitive_array: &mut dyn Array,273nested: &mut [Nested],274mut current_offset: usize,275mut current_length: usize,276) {277for nested in nested.iter_mut() {278match nested {279Nested::LargeList(l_nested) => {280l_nested.offsets.slice(current_offset, current_length + 1);281if let Some(validity) = l_nested.validity.as_mut() {282validity.slice(current_offset, current_length)283};284285// Update the offset/ length so that the Primitive is 
sliced properly.286current_length = l_nested.offsets.range() as usize;287current_offset = *l_nested.offsets.first() as usize;288},289Nested::List(l_nested) => {290l_nested.offsets.slice(current_offset, current_length + 1);291if let Some(validity) = l_nested.validity.as_mut() {292validity.slice(current_offset, current_length)293};294295// Update the offset/ length so that the Primitive is sliced properly.296current_length = l_nested.offsets.range() as usize;297current_offset = *l_nested.offsets.first() as usize;298},299Nested::Struct(StructNested {300validity, length, ..301}) => {302*length = current_length;303if let Some(validity) = validity.as_mut() {304validity.slice(current_offset, current_length)305};306},307Nested::Primitive(PrimitiveNested {308validity, length, ..309}) => {310*length = current_length;311if let Some(validity) = validity.as_mut() {312validity.slice(current_offset, current_length)313};314primitive_array.slice(current_offset, current_length);315},316Nested::FixedSizeList(FixedSizeListNested {317validity,318length,319width,320..321}) => {322if let Some(validity) = validity.as_mut() {323validity.slice(current_offset, current_length)324};325*length = current_length;326// Update the offset/ length so that the Primitive is sliced properly.327current_length *= *width;328current_offset *= *width;329},330}331}332}333334/// Get the length of [`Array`] that should be sliced.335pub fn get_max_length(nested: &[Nested]) -> usize {336let mut length = 0;337for nested in nested.iter() {338match nested {339Nested::LargeList(l_nested) => length += l_nested.offsets.range() as usize,340Nested::List(l_nested) => length += l_nested.offsets.range() as usize,341Nested::FixedSizeList(nested) => length += nested.length * nested.width,342_ => {},343}344}345length346}347348/// Returns an iterator of [`Page`].349pub fn array_to_pages(350primitive_array: &dyn Array,351type_: ParquetPrimitiveType,352nested: &[Nested],353options: WriteOptions,354field_options: 
&FieldWriteOptions,355) -> PolarsResult<DynIter<'static, PolarsResult<Page>>> {356let mut encoding = field_options.encoding;357if let ArrowDataType::Dictionary(key_type, _, _) = primitive_array.dtype().to_logical_type() {358return match_integer_type!(key_type, |$T| {359dictionary::array_to_pages::<$T>(360primitive_array.as_any().downcast_ref().unwrap(),361type_,362&nested,363options,364encoding,365)366});367};368if let Encoding::RleDictionary = encoding {369// Only take this path for primitive columns370if matches!(nested.first(), Some(Nested::Primitive(_))) {371if let Some(result) =372encode_as_dictionary_optional(primitive_array, nested, type_.clone(), options)373{374return result;375}376}377378// We didn't succeed, fallback to plain379encoding = Encoding::Plain;380}381382let nested = nested.to_vec();383384let number_of_rows = nested[0].len();385386// note: this is not correct if the array is sliced - the estimation should happen on the387// primitive after sliced for parquet388let byte_size = estimated_bytes_size(primitive_array);389390const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;391let max_page_size = options.data_page_size.unwrap_or(DEFAULT_PAGE_SIZE);392let max_page_size = max_page_size.min(2usize.pow(31) - 2usize.pow(25)); // allowed maximum page size393let bytes_per_row = if number_of_rows == 0 {3940395} else {396((byte_size as f64) / (number_of_rows as f64)) as usize397};398let rows_per_page = (max_page_size / (bytes_per_row + 1)).max(1);399400let row_iter = (0..number_of_rows)401.step_by(rows_per_page)402.map(move |offset| {403let length = if offset + rows_per_page > number_of_rows {404number_of_rows - offset405} else {406rows_per_page407};408(offset, length)409});410411let primitive_array = primitive_array.to_boxed();412413let pages = row_iter.map(move |(offset, length)| {414let mut right_array = primitive_array.clone();415let mut right_nested = nested.clone();416slice_parquet_array(right_array.as_mut(), &mut right_nested, offset, 
length);417418array_to_page(419right_array.as_ref(),420type_.clone(),421&right_nested,422options,423encoding,424)425});426Ok(DynIter::new(pages))427}428429/// Converts an [`Array`] to a [`CompressedPage`] based on options, descriptor and `encoding`.430pub fn array_to_page(431array: &dyn Array,432type_: ParquetPrimitiveType,433nested: &[Nested],434options: WriteOptions,435encoding: Encoding,436) -> PolarsResult<Page> {437if nested.len() == 1 {438// special case where validity == def levels439return array_to_page_simple(array, type_, options, encoding);440}441array_to_page_nested(array, type_, nested, options, encoding)442}443444/// Converts an [`Array`] to a [`CompressedPage`] based on options, descriptor and `encoding`.445pub fn array_to_page_simple(446array: &dyn Array,447type_: ParquetPrimitiveType,448options: WriteOptions,449encoding: Encoding,450) -> PolarsResult<Page> {451let dtype = array.dtype();452453if type_.field_info.repetition == Repetition::Required && array.null_count() > 0 {454polars_bail!(InvalidOperation: "writing a missing value to required parquet column '{}'", type_.field_info.name);455}456457match dtype.to_logical_type() {458ArrowDataType::Boolean => boolean::array_to_page(459array.as_any().downcast_ref().unwrap(),460options,461type_,462encoding,463),464// casts below MUST match the casts done at the metadata (field -> parquet type).465ArrowDataType::UInt8 => {466return primitive::array_to_page_integer::<u8, i32>(467array.as_any().downcast_ref().unwrap(),468options,469type_,470encoding,471);472},473ArrowDataType::UInt16 => {474return primitive::array_to_page_integer::<u16, i32>(475array.as_any().downcast_ref().unwrap(),476options,477type_,478encoding,479);480},481ArrowDataType::UInt32 => {482return primitive::array_to_page_integer::<u32, i32>(483array.as_any().downcast_ref().unwrap(),484options,485type_,486encoding,487);488},489ArrowDataType::UInt64 => {490return primitive::array_to_page_integer::<u64, 
i64>(491array.as_any().downcast_ref().unwrap(),492options,493type_,494encoding,495);496},497ArrowDataType::Int8 => {498return primitive::array_to_page_integer::<i8, i32>(499array.as_any().downcast_ref().unwrap(),500options,501type_,502encoding,503);504},505ArrowDataType::Int16 => {506return primitive::array_to_page_integer::<i16, i32>(507array.as_any().downcast_ref().unwrap(),508options,509type_,510encoding,511);512},513ArrowDataType::Int32 | ArrowDataType::Date32 | ArrowDataType::Time32(_) => {514return primitive::array_to_page_integer::<i32, i32>(515array.as_any().downcast_ref().unwrap(),516options,517type_,518encoding,519);520},521ArrowDataType::Int64522| ArrowDataType::Date64523| ArrowDataType::Time64(_)524| ArrowDataType::Timestamp(_, _)525| ArrowDataType::Duration(_) => {526return primitive::array_to_page_integer::<i64, i64>(527array.as_any().downcast_ref().unwrap(),528options,529type_,530encoding,531);532},533ArrowDataType::Float32 => primitive::array_to_page_plain::<f32, f32>(534array.as_any().downcast_ref().unwrap(),535options,536type_,537),538ArrowDataType::Float64 => primitive::array_to_page_plain::<f64, f64>(539array.as_any().downcast_ref().unwrap(),540options,541type_,542),543ArrowDataType::LargeUtf8 => {544let array =545polars_compute::cast::cast(array, &ArrowDataType::LargeBinary, Default::default())546.unwrap();547return binary::array_to_page::<i64>(548array.as_any().downcast_ref().unwrap(),549options,550type_,551encoding,552);553},554ArrowDataType::LargeBinary => {555return binary::array_to_page::<i64>(556array.as_any().downcast_ref().unwrap(),557options,558type_,559encoding,560);561},562ArrowDataType::BinaryView => {563return binview::array_to_page(564array.as_any().downcast_ref().unwrap(),565options,566type_,567encoding,568);569},570ArrowDataType::Utf8View => {571let array =572polars_compute::cast::cast(array, &ArrowDataType::BinaryView, Default::default())573.unwrap();574return 
binview::array_to_page(575array.as_any().downcast_ref().unwrap(),576options,577type_,578encoding,579);580},581ArrowDataType::Null => {582let array = Int32Array::new_null(ArrowDataType::Int32, array.len());583primitive::array_to_page_plain::<i32, i32>(&array, options, type_)584},585ArrowDataType::Interval(IntervalUnit::YearMonth) => {586let array = array587.as_any()588.downcast_ref::<PrimitiveArray<i32>>()589.unwrap();590let mut values = Vec::<u8>::with_capacity(12 * array.len());591array.values().iter().for_each(|x| {592let bytes = &x.to_le_bytes();593values.extend_from_slice(bytes);594values.extend_from_slice(&[0; 8]);595});596let array = FixedSizeBinaryArray::new(597ArrowDataType::FixedSizeBinary(12),598values.into(),599array.validity().cloned(),600);601let statistics = if options.has_statistics() {602Some(fixed_size_binary::build_statistics(603&array,604type_.clone(),605&options.statistics,606))607} else {608None609};610fixed_size_binary::array_to_page(&array, options, type_, statistics)611},612ArrowDataType::Interval(IntervalUnit::DayTime) => {613let array = array614.as_any()615.downcast_ref::<PrimitiveArray<days_ms>>()616.unwrap();617let mut values = Vec::<u8>::with_capacity(12 * array.len());618array.values().iter().for_each(|x| {619let bytes = &x.to_le_bytes();620values.extend_from_slice(&[0; 4]); // months621values.extend_from_slice(bytes); // days and seconds622});623let array = FixedSizeBinaryArray::new(624ArrowDataType::FixedSizeBinary(12),625values.into(),626array.validity().cloned(),627);628let statistics = if options.has_statistics() {629Some(fixed_size_binary::build_statistics(630&array,631type_.clone(),632&options.statistics,633))634} else {635None636};637fixed_size_binary::array_to_page(&array, options, type_, statistics)638},639ArrowDataType::FixedSizeBinary(_) => {640let array = array.as_any().downcast_ref().unwrap();641let statistics = if options.has_statistics() 
{642Some(fixed_size_binary::build_statistics(643array,644type_.clone(),645&options.statistics,646))647} else {648None649};650651fixed_size_binary::array_to_page(array, options, type_, statistics)652},653ArrowDataType::Decimal256(precision, _) => {654let precision = *precision;655let array = array656.as_any()657.downcast_ref::<PrimitiveArray<i256>>()658.unwrap();659if precision <= 9 {660let values = array661.values()662.iter()663.map(|x| x.0.as_i32())664.collect::<Vec<_>>()665.into();666667let array = PrimitiveArray::<i32>::new(668ArrowDataType::Int32,669values,670array.validity().cloned(),671);672return primitive::array_to_page_integer::<i32, i32>(673&array, options, type_, encoding,674);675} else if precision <= 18 {676let values = array677.values()678.iter()679.map(|x| x.0.as_i64())680.collect::<Vec<_>>()681.into();682683let array = PrimitiveArray::<i64>::new(684ArrowDataType::Int64,685values,686array.validity().cloned(),687);688return primitive::array_to_page_integer::<i64, i64>(689&array, options, type_, encoding,690);691} else if precision <= 38 {692let size = decimal_length_from_precision(precision);693let statistics = if options.has_statistics() {694let stats = fixed_size_binary::build_statistics_decimal256_with_i128(695array,696type_.clone(),697size,698&options.statistics,699);700Some(stats)701} else {702None703};704705let mut values = Vec::<u8>::with_capacity(size * array.len());706array.values().iter().for_each(|x| {707let bytes = &x.0.low().to_be_bytes()[16 - size..];708values.extend_from_slice(bytes)709});710let array = FixedSizeBinaryArray::new(711ArrowDataType::FixedSizeBinary(size),712values.into(),713array.validity().cloned(),714);715fixed_size_binary::array_to_page(&array, options, type_, statistics)716} else {717let size = 32;718let array = array719.as_any()720.downcast_ref::<PrimitiveArray<i256>>()721.unwrap();722let statistics = if options.has_statistics() {723let stats = 
fixed_size_binary::build_statistics_decimal256(724array,725type_.clone(),726size,727&options.statistics,728);729Some(stats)730} else {731None732};733let mut values = Vec::<u8>::with_capacity(size * array.len());734array.values().iter().for_each(|x| {735let bytes = &x.to_be_bytes();736values.extend_from_slice(bytes)737});738let array = FixedSizeBinaryArray::new(739ArrowDataType::FixedSizeBinary(size),740values.into(),741array.validity().cloned(),742);743744fixed_size_binary::array_to_page(&array, options, type_, statistics)745}746},747ArrowDataType::Decimal(precision, _) => {748let precision = *precision;749let array = array750.as_any()751.downcast_ref::<PrimitiveArray<i128>>()752.unwrap();753if precision <= 9 {754let values = array755.values()756.iter()757.map(|x| *x as i32)758.collect::<Vec<_>>()759.into();760761let array = PrimitiveArray::<i32>::new(762ArrowDataType::Int32,763values,764array.validity().cloned(),765);766return primitive::array_to_page_integer::<i32, i32>(767&array, options, type_, encoding,768);769} else if precision <= 18 {770let values = array771.values()772.iter()773.map(|x| *x as i64)774.collect::<Vec<_>>()775.into();776777let array = PrimitiveArray::<i64>::new(778ArrowDataType::Int64,779values,780array.validity().cloned(),781);782return primitive::array_to_page_integer::<i64, i64>(783&array, options, type_, encoding,784);785} else {786let size = decimal_length_from_precision(precision);787788let statistics = if options.has_statistics() {789let stats = fixed_size_binary::build_statistics_decimal(790array,791type_.clone(),792size,793&options.statistics,794);795Some(stats)796} else {797None798};799800let mut values = Vec::<u8>::with_capacity(size * array.len());801array.values().iter().for_each(|x| {802let bytes = &x.to_be_bytes()[16 - size..];803values.extend_from_slice(bytes)804});805let array = 
FixedSizeBinaryArray::new(806ArrowDataType::FixedSizeBinary(size),807values.into(),808array.validity().cloned(),809);810fixed_size_binary::array_to_page(&array, options, type_, statistics)811}812},813ArrowDataType::Int128 => {814let array: &PrimitiveArray<i128> = array.as_any().downcast_ref().unwrap();815let statistics = if options.has_statistics() {816let stats = fixed_size_binary::build_statistics_decimal(817array,818type_.clone(),81916,820&options.statistics,821);822Some(stats)823} else {824None825};826let array = FixedSizeBinaryArray::new(827ArrowDataType::FixedSizeBinary(16),828array.values().clone().try_transmute().unwrap(),829array.validity().cloned(),830);831fixed_size_binary::array_to_page(&array, options, type_, statistics)832},833other => polars_bail!(nyi = "Writing parquet pages for data type {other:?}"),834}835.map(Page::Data)836}837838fn array_to_page_nested(839array: &dyn Array,840type_: ParquetPrimitiveType,841nested: &[Nested],842options: WriteOptions,843_encoding: Encoding,844) -> PolarsResult<Page> {845if type_.field_info.repetition == Repetition::Required846&& array.validity().is_some_and(|v| v.unset_bits() > 0)847{848polars_bail!(InvalidOperation: "writing a missing value to required parquet column '{}'", type_.field_info.name);849}850851use ArrowDataType::*;852match array.dtype().to_logical_type() {853Null => {854let array = Int32Array::new_null(ArrowDataType::Int32, array.len());855primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)856},857Boolean => {858let array = array.as_any().downcast_ref().unwrap();859boolean::nested_array_to_page(array, options, type_, nested)860},861LargeUtf8 => {862let array =863polars_compute::cast::cast(array, &LargeBinary, Default::default()).unwrap();864let array = array.as_any().downcast_ref().unwrap();865binary::nested_array_to_page::<i64>(array, options, type_, nested)866},867LargeBinary => {868let array = 
array.as_any().downcast_ref().unwrap();869binary::nested_array_to_page::<i64>(array, options, type_, nested)870},871BinaryView => {872let array = array.as_any().downcast_ref().unwrap();873binview::nested_array_to_page(array, options, type_, nested)874},875Utf8View => {876let array = polars_compute::cast::cast(array, &BinaryView, Default::default()).unwrap();877let array = array.as_any().downcast_ref().unwrap();878binview::nested_array_to_page(array, options, type_, nested)879},880UInt8 => {881let array = array.as_any().downcast_ref().unwrap();882primitive::nested_array_to_page::<u8, i32>(array, options, type_, nested)883},884UInt16 => {885let array = array.as_any().downcast_ref().unwrap();886primitive::nested_array_to_page::<u16, i32>(array, options, type_, nested)887},888UInt32 => {889let array = array.as_any().downcast_ref().unwrap();890primitive::nested_array_to_page::<u32, i32>(array, options, type_, nested)891},892UInt64 => {893let array = array.as_any().downcast_ref().unwrap();894primitive::nested_array_to_page::<u64, i64>(array, options, type_, nested)895},896Int8 => {897let array = array.as_any().downcast_ref().unwrap();898primitive::nested_array_to_page::<i8, i32>(array, options, type_, nested)899},900Int16 => {901let array = array.as_any().downcast_ref().unwrap();902primitive::nested_array_to_page::<i16, i32>(array, options, type_, nested)903},904Int32 | Date32 | Time32(_) => {905let array = array.as_any().downcast_ref().unwrap();906primitive::nested_array_to_page::<i32, i32>(array, options, type_, nested)907},908Int64 | Date64 | Time64(_) | Timestamp(_, _) | Duration(_) => {909let array = array.as_any().downcast_ref().unwrap();910primitive::nested_array_to_page::<i64, i64>(array, options, type_, nested)911},912Float32 => {913let array = array.as_any().downcast_ref().unwrap();914primitive::nested_array_to_page::<f32, f32>(array, options, type_, nested)915},916Float64 => {917let array = 
array.as_any().downcast_ref().unwrap();918primitive::nested_array_to_page::<f64, f64>(array, options, type_, nested)919},920Decimal(precision, _) => {921let precision = *precision;922let array = array923.as_any()924.downcast_ref::<PrimitiveArray<i128>>()925.unwrap();926if precision <= 9 {927let values = array928.values()929.iter()930.map(|x| *x as i32)931.collect::<Vec<_>>()932.into();933934let array = PrimitiveArray::<i32>::new(935ArrowDataType::Int32,936values,937array.validity().cloned(),938);939primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)940} else if precision <= 18 {941let values = array942.values()943.iter()944.map(|x| *x as i64)945.collect::<Vec<_>>()946.into();947948let array = PrimitiveArray::<i64>::new(949ArrowDataType::Int64,950values,951array.validity().cloned(),952);953primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)954} else {955let size = decimal_length_from_precision(precision);956957let statistics = if options.has_statistics() {958let stats = fixed_size_binary::build_statistics_decimal(959array,960type_.clone(),961size,962&options.statistics,963);964Some(stats)965} else {966None967};968969let mut values = Vec::<u8>::with_capacity(size * array.len());970array.values().iter().for_each(|x| {971let bytes = &x.to_be_bytes()[16 - size..];972values.extend_from_slice(bytes)973});974let array = FixedSizeBinaryArray::new(975ArrowDataType::FixedSizeBinary(size),976values.into(),977array.validity().cloned(),978);979fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)980}981},982Decimal256(precision, _) => {983let precision = *precision;984let array = array985.as_any()986.downcast_ref::<PrimitiveArray<i256>>()987.unwrap();988if precision <= 9 {989let values = array990.values()991.iter()992.map(|x| x.0.as_i32())993.collect::<Vec<_>>()994.into();995996let array = 
PrimitiveArray::<i32>::new(997ArrowDataType::Int32,998values,999array.validity().cloned(),1000);1001primitive::nested_array_to_page::<i32, i32>(&array, options, type_, nested)1002} else if precision <= 18 {1003let values = array1004.values()1005.iter()1006.map(|x| x.0.as_i64())1007.collect::<Vec<_>>()1008.into();10091010let array = PrimitiveArray::<i64>::new(1011ArrowDataType::Int64,1012values,1013array.validity().cloned(),1014);1015primitive::nested_array_to_page::<i64, i64>(&array, options, type_, nested)1016} else if precision <= 38 {1017let size = decimal_length_from_precision(precision);1018let statistics = if options.has_statistics() {1019let stats = fixed_size_binary::build_statistics_decimal256_with_i128(1020array,1021type_.clone(),1022size,1023&options.statistics,1024);1025Some(stats)1026} else {1027None1028};10291030let mut values = Vec::<u8>::with_capacity(size * array.len());1031array.values().iter().for_each(|x| {1032let bytes = &x.0.low().to_be_bytes()[16 - size..];1033values.extend_from_slice(bytes)1034});1035let array = FixedSizeBinaryArray::new(1036ArrowDataType::FixedSizeBinary(size),1037values.into(),1038array.validity().cloned(),1039);1040fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)1041} else {1042let size = 32;1043let array = array1044.as_any()1045.downcast_ref::<PrimitiveArray<i256>>()1046.unwrap();1047let statistics = if options.has_statistics() {1048let stats = fixed_size_binary::build_statistics_decimal256(1049array,1050type_.clone(),1051size,1052&options.statistics,1053);1054Some(stats)1055} else {1056None1057};1058let mut values = Vec::<u8>::with_capacity(size * array.len());1059array.values().iter().for_each(|x| {1060let bytes = &x.to_be_bytes();1061values.extend_from_slice(bytes)1062});1063let array = FixedSizeBinaryArray::new(1064ArrowDataType::FixedSizeBinary(size),1065values.into(),1066array.validity().cloned(),1067);10681069fixed_size_binary::nested_array_to_page(&array, options, type_, nested, 
statistics)1070}1071},1072Int128 => {1073let array: &PrimitiveArray<i128> = array.as_any().downcast_ref().unwrap();1074let statistics = if options.has_statistics() {1075let stats = fixed_size_binary::build_statistics_decimal(1076array,1077type_.clone(),107816,1079&options.statistics,1080);1081Some(stats)1082} else {1083None1084};1085let array = FixedSizeBinaryArray::new(1086ArrowDataType::FixedSizeBinary(16),1087array.values().clone().try_transmute().unwrap(),1088array.validity().cloned(),1089);1090fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)1091},1092other => polars_bail!(nyi = "Writing nested parquet pages for data type {other:?}"),1093}1094.map(Page::Data)1095}10961097fn transverse_recursive<T, F: Fn(&ArrowDataType) -> T + Clone>(1098dtype: &ArrowDataType,1099map: F,1100encodings: &mut Vec<T>,1101) {1102use arrow::datatypes::PhysicalType::*;1103match dtype.to_physical_type() {1104Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf81105| Dictionary(_) | LargeUtf8 | BinaryView | Utf8View => encodings.push(map(dtype)),1106List | FixedSizeList | LargeList => {1107let a = dtype.to_logical_type();1108if let ArrowDataType::List(inner) = a {1109transverse_recursive(&inner.dtype, map, encodings)1110} else if let ArrowDataType::LargeList(inner) = a {1111transverse_recursive(&inner.dtype, map, encodings)1112} else if let ArrowDataType::FixedSizeList(inner, _) = a {1113transverse_recursive(&inner.dtype, map, encodings)1114} else {1115unreachable!()1116}1117},1118Struct => {1119if let ArrowDataType::Struct(fields) = dtype.to_logical_type() {1120for field in fields {1121transverse_recursive(&field.dtype, map.clone(), encodings)1122}1123} else {1124unreachable!()1125}1126},1127Map => {1128if let ArrowDataType::Map(field, _) = dtype.to_logical_type() {1129if let ArrowDataType::Struct(fields) = field.dtype.to_logical_type() {1130for field in fields {1131transverse_recursive(&field.dtype, map.clone(), 
encodings)1132}1133} else {1134unreachable!()1135}1136} else {1137unreachable!()1138}1139},1140Union => todo!(),1141}1142}11431144/// Transverses the `dtype` up to its (parquet) columns and returns a vector of1145/// items based on `map`.1146///1147/// This is used to assign an [`Encoding`] to every parquet column based on the columns' type (see example)1148pub fn transverse<T, F: Fn(&ArrowDataType) -> T + Clone>(dtype: &ArrowDataType, map: F) -> Vec<T> {1149let mut encodings = vec![];1150transverse_recursive(dtype, map, &mut encodings);1151encodings1152}115311541155