Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/simple.rs
8509 views
use arrow::array::{Array, BinaryViewArray, FixedSizeBinaryArray, PrimitiveArray, StructArray};1use arrow::bitmap::Bitmap;2use arrow::datatypes::{3ArrowDataType, DTYPE_CATEGORICAL_LEGACY, DTYPE_CATEGORICAL_NEW, DTYPE_ENUM_VALUES_LEGACY,4DTYPE_ENUM_VALUES_NEW, Field, IntegerType, IntervalUnit, TimeUnit,5};6use arrow::types::{days_ms, i256};7use ethnum::I256;8use polars_compute::cast::CastOptionsImpl;9use polars_utils::float16::pf16;10use polars_utils::pl_str::PlSmallStr;1112use super::utils::filter::Filter;13use super::{14BasicDecompressor, InitNested, NestedState, boolean, fixed_size_binary, null, primitive,15};16use crate::parquet::error::ParquetResult;17use crate::parquet::schema::types::{18PhysicalType, PrimitiveLogicalType, PrimitiveType, TimeUnit as ParquetTimeUnit,19};20use crate::parquet::types::int96_to_i64_ns;21use crate::read::ParquetError;22use crate::read::deserialize::categorical::CategoricalDecoder;23use crate::read::deserialize::utils::PageDecoder;24use crate::read::deserialize::{binary, binview};2526/// An iterator adapter that maps an iterator of Pages a boxed [`Array`] of [`ArrowDataType`]27/// `dtype` with a maximum of `num_rows` elements.28pub fn page_iter_to_array(29pages: BasicDecompressor,30type_: &PrimitiveType,31field: Field,32filter: Option<Filter>,33init_nested: Option<Vec<InitNested>>,34) -> ParquetResult<(Option<NestedState>, Vec<Box<dyn Array>>, Bitmap)> {35use ArrowDataType::*;3637let physical_type = &type_.physical_type;38let logical_type = &type_.logical_type;39let is_pl_empty_struct = field.is_pl_pq_empty_struct();40let dtype = field.dtype;4142Ok(match (physical_type, dtype.to_storage()) {43(_, Null) => PageDecoder::new(&field.name, pages, dtype, null::NullDecoder, init_nested)?44.collect_boxed(filter)?,4546// Empty structs are roundtrippable by mapping to Boolean array.47(PhysicalType::Boolean, Struct(fs)) if fs.is_empty() && is_pl_empty_struct => {48let (nested, array, ptm) = PageDecoder::new(49&field.name,50pages,51ArrowDataType::Boolean,52boolean::BooleanDecoder,53init_nested,54)?55.collect(filter)?;5657let array = array58.into_iter()59.map(|mut array| {60StructArray::new(61dtype.clone(),62array.len(),63Vec::new(),64array.take_validity(),65)66.to_boxed()67})68.collect::<Vec<Box<dyn Array>>>();6970(nested, array, ptm)71},72(PhysicalType::Boolean, Boolean) => PageDecoder::new(73&field.name,74pages,75dtype,76boolean::BooleanDecoder,77init_nested,78)?79.collect_boxed(filter)?,80(PhysicalType::Int32, UInt8) => PageDecoder::new(81&field.name,82pages,83dtype,84primitive::IntDecoder::<i32, u8, _>::cast_as(),85init_nested,86)?87.collect_boxed(filter)?,88(PhysicalType::Int32, UInt16) => PageDecoder::new(89&field.name,90pages,91dtype,92primitive::IntDecoder::<i32, u16, _>::cast_as(),93init_nested,94)?95.collect_boxed(filter)?,96(PhysicalType::Int32, UInt32) => PageDecoder::new(97&field.name,98pages,99dtype,100primitive::IntDecoder::<i32, u32, _>::cast_as(),101init_nested,102)?103.collect_boxed(filter)?,104(PhysicalType::Int64, UInt32) => PageDecoder::new(105&field.name,106pages,107dtype,108primitive::IntDecoder::<i64, u32, _>::cast_as(),109init_nested,110)?111.collect_boxed(filter)?,112(PhysicalType::Int32, Int8) => PageDecoder::new(113&field.name,114pages,115dtype,116primitive::IntDecoder::<i32, i8, _>::cast_as(),117init_nested,118)?119.collect_boxed(filter)?,120(PhysicalType::Int32, Int16) => PageDecoder::new(121&field.name,122pages,123dtype,124primitive::IntDecoder::<i32, i16, _>::cast_as(),125init_nested,126)?127.collect_boxed(filter)?,128(PhysicalType::Int32, Int32 | Date32 | Time32(_)) => PageDecoder::new(129&field.name,130pages,131dtype,132primitive::IntDecoder::<i32, _, _>::unit(),133init_nested,134)?135.collect_boxed(filter)?,136(PhysicalType::Int64 | PhysicalType::Int96, Timestamp(time_unit, _)) => {137let time_unit = *time_unit;138return timestamp(139&field.name,140pages,141physical_type,142logical_type,143dtype,144filter,145time_unit,146init_nested,147);148},149(PhysicalType::FixedLenByteArray(_), FixedSizeBinary(_)) => {150let size = FixedSizeBinaryArray::get_size(&dtype);151152PageDecoder::new(153&field.name,154pages,155dtype,156fixed_size_binary::BinaryDecoder { size },157init_nested,158)?159.collect_boxed(filter)?160},161(PhysicalType::FixedLenByteArray(12), Interval(IntervalUnit::YearMonth)) => {162// @TODO: Make a separate decoder for this163164let n = 12;165let (nested, array, ptm) = PageDecoder::new(166&field.name,167pages,168ArrowDataType::FixedSizeBinary(n),169fixed_size_binary::BinaryDecoder { size: n },170init_nested,171)?172.collect(filter)?;173174let array = array175.into_iter()176.map(|array| {177let values = array178.values()179.chunks_exact(n)180.map(|value: &[u8]| i32::from_le_bytes(value[..4].try_into().unwrap()))181.collect::<Vec<_>>();182let validity = array.validity().cloned();183Ok(184PrimitiveArray::<i32>::try_new(dtype.clone(), values.into(), validity)?185.to_boxed(),186)187})188.collect::<ParquetResult<Vec<Box<dyn Array>>>>()?;189190(nested, array, ptm)191},192(PhysicalType::FixedLenByteArray(12), Interval(IntervalUnit::DayTime)) => {193// @TODO: Make a separate decoder for this194195let n = 12;196let (nested, array, ptm) = PageDecoder::new(197&field.name,198pages,199ArrowDataType::FixedSizeBinary(n),200fixed_size_binary::BinaryDecoder { size: n },201init_nested,202)?203.collect(filter)?;204205let array = array206.into_iter()207.map(|array| {208let values = array209.values()210.chunks_exact(n)211.map(super::super::convert_days_ms)212.collect::<Vec<_>>();213let validity = array.validity().cloned();214Ok(215PrimitiveArray::<days_ms>::try_new(dtype.clone(), values.into(), validity)?216.to_boxed(),217)218})219.collect::<ParquetResult<Vec<Box<dyn Array>>>>()?;220221(nested, array, ptm)222},223(PhysicalType::FixedLenByteArray(12), Interval(IntervalUnit::MonthDayMillis)) => {224// @TODO: Make a separate decoder for this225226const N_BYTES: usize = 12;227let (nested, array, ptm) = PageDecoder::new(228&field.name,229pages,230ArrowDataType::FixedSizeBinary(N_BYTES),231fixed_size_binary::BinaryDecoder { size: N_BYTES },232init_nested,233)?234.collect(filter)?;235236let out = array237.into_iter()238.map(|arr| convert_interval_bytes_to_month_day_nano_struct(arr).boxed())239.collect();240241(nested, out, ptm)242},243(PhysicalType::FixedLenByteArray(16), UInt128) => {244let n = 16;245let (nested, array, ptm) = PageDecoder::new(246&field.name,247pages,248ArrowDataType::FixedSizeBinary(n),249fixed_size_binary::BinaryDecoder { size: n },250init_nested,251)?252.collect(filter)?;253254let array = array255.into_iter()256.map(|array| {257let (_, values, validity) = array.into_inner();258let values = values.try_transmute().expect(259"this should work since the parquet decoder has alignment constraints",260);261Ok(262PrimitiveArray::<u128>::try_new(dtype.clone(), values, validity)?263.to_boxed(),264)265})266.collect::<ParquetResult<Vec<Box<dyn Array>>>>()?;267268(nested, array, ptm)269},270(PhysicalType::FixedLenByteArray(16), Int128) => {271let n = 16;272let (nested, array, ptm) = PageDecoder::new(273&field.name,274pages,275ArrowDataType::FixedSizeBinary(n),276fixed_size_binary::BinaryDecoder { size: n },277init_nested,278)?279.collect(filter)?;280281let array = array282.into_iter()283.map(|array| {284let (_, values, validity) = array.into_inner();285let values = values.try_transmute().expect(286"this should work since the parquet decoder has alignment constraints",287);288Ok(289PrimitiveArray::<i128>::try_new(dtype.clone(), values, validity)?290.to_boxed(),291)292})293.collect::<ParquetResult<Vec<Box<dyn Array>>>>()?;294295(nested, array, ptm)296},297(PhysicalType::Int32, Decimal(_, _)) => PageDecoder::new(298&field.name,299pages,300dtype,301primitive::IntDecoder::<i32, i128, _>::cast_into(),302init_nested,303)?304.collect_boxed(filter)?,305(PhysicalType::Int64, Decimal(_, _)) => PageDecoder::new(306&field.name,307pages,308dtype,309primitive::IntDecoder::<i64, i128, _>::cast_into(),310init_nested,311)?312.collect_boxed(filter)?,313(PhysicalType::FixedLenByteArray(n), Decimal(_, _)) if *n > 16 => {314return Err(ParquetError::not_supported(format!(315"not implemented: can't decode Decimal128 type from Fixed Size Byte Array of len {n:?}"316)));317},318(PhysicalType::FixedLenByteArray(n), Decimal(_, _)) => {319// @TODO: Make a separate decoder for this320321let n = *n;322323let (nested, array, ptm) = PageDecoder::new(324&field.name,325pages,326ArrowDataType::FixedSizeBinary(n),327fixed_size_binary::BinaryDecoder { size: n },328init_nested,329)?330.collect(filter)?;331332let array = array333.into_iter()334.map(|array| {335let values = array336.values()337.chunks_exact(n)338.map(|value: &[u8]| super::super::convert_i128(value, n))339.collect::<Vec<_>>();340let validity = array.validity().cloned();341Ok(342PrimitiveArray::<i128>::try_new(dtype.clone(), values.into(), validity)?343.to_boxed(),344)345})346.collect::<ParquetResult<Vec<Box<dyn Array>>>>()?;347348(nested, array, ptm)349},350(PhysicalType::ByteArray, Decimal(_, _)) => {351// @TODO: Make a separate decoder for this352353let (nested, array, ptm) = PageDecoder::new(354&field.name,355pages,356ArrowDataType::BinaryView,357binview::BinViewDecoder::new(false),358init_nested,359)?360.collect(filter)?;361362let array = array363.into_iter()364.map(|array| {365let array = array.as_any().downcast_ref::<BinaryViewArray>().unwrap();366let values = array367.values_iter()368.map(|value: &[u8]| {369if value.len() <= 16 {370Ok(super::super::convert_i128(value, value.len()))371} else {372Err(ParquetError::OutOfSpec(373"value has too many bytes for Decimal128".to_string(),374))375}376})377.collect::<ParquetResult<Vec<_>>>()?;378let validity = array.validity().cloned();379Ok(380PrimitiveArray::<i128>::try_new(dtype.clone(), values.into(), validity)?381.to_boxed(),382)383})384.collect::<ParquetResult<Vec<Box<dyn Array>>>>()?;385386(nested, array, ptm)387},388(PhysicalType::Int32, Decimal256(_, _)) => PageDecoder::new(389&field.name,390pages,391dtype,392primitive::IntDecoder::closure(|x: i32| i256(I256::new(x as i128))),393init_nested,394)?395.collect_boxed(filter)?,396(PhysicalType::Int64, Decimal256(_, _)) => PageDecoder::new(397&field.name,398pages,399dtype,400primitive::IntDecoder::closure(|x: i64| i256(I256::new(x as i128))),401init_nested,402)?403.collect_boxed(filter)?,404(PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n <= 16 => {405// @TODO: Make a separate decoder for this406407let n = *n;408409let (nested, array, ptm) = PageDecoder::new(410&field.name,411pages,412ArrowDataType::FixedSizeBinary(n),413fixed_size_binary::BinaryDecoder { size: n },414init_nested,415)?416.collect(filter)?;417418let array = array419.into_iter()420.map(|array| {421let values = array422.values()423.chunks_exact(n)424.map(|value: &[u8]| i256(I256::new(super::super::convert_i128(value, n))))425.collect::<Vec<_>>();426let validity = array.validity().cloned();427Ok(428PrimitiveArray::<i256>::try_new(dtype.clone(), values.into(), validity)?429.to_boxed(),430)431})432.collect::<ParquetResult<Vec<Box<dyn Array>>>>()?;433434(nested, array, ptm)435},436(PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n <= 32 => {437// @TODO: Make a separate decoder for this438439let n = *n;440441let (nested, array, ptm) = PageDecoder::new(442&field.name,443pages,444ArrowDataType::FixedSizeBinary(n),445fixed_size_binary::BinaryDecoder { size: n },446init_nested,447)?448.collect(filter)?;449450let array = array451.into_iter()452.map(|array| {453let values = array454.values()455.chunks_exact(n)456.map(super::super::convert_i256)457.collect::<Vec<_>>();458let validity = array.validity().cloned();459Ok(460PrimitiveArray::<i256>::try_new(dtype.clone(), values.into(), validity)?461.to_boxed(),462)463})464.collect::<ParquetResult<Vec<Box<dyn Array>>>>()?;465466(nested, array, ptm)467},468(PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n > 32 => {469return Err(ParquetError::not_supported(format!(470"Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}",471)));472},473(PhysicalType::Int32, Date64) => PageDecoder::new(474&field.name,475pages,476dtype,477primitive::IntDecoder::closure(|x: i32| i64::from(x) * 86400000),478init_nested,479)?480.collect_boxed(filter)?,481(PhysicalType::Int64, Date64) => PageDecoder::new(482&field.name,483pages,484dtype,485primitive::IntDecoder::<i64, _, _>::unit(),486init_nested,487)?488.collect_boxed(filter)?,489(PhysicalType::Int64, Int64 | Time64(_) | Duration(_)) => PageDecoder::new(490&field.name,491pages,492dtype,493primitive::IntDecoder::<i64, _, _>::unit(),494init_nested,495)?496.collect_boxed(filter)?,497498(PhysicalType::Int64, UInt64) => PageDecoder::new(499&field.name,500pages,501dtype,502primitive::IntDecoder::<i64, u64, _>::cast_as(),503init_nested,504)?505.collect_boxed(filter)?,506507(PhysicalType::FixedLenByteArray(2), Float16) => PageDecoder::new(508&field.name,509pages,510dtype,511primitive::FloatDecoder::<pf16, _, _>::unit(),512init_nested,513)?514.collect_boxed(filter)?,515(PhysicalType::Float, Float32) => PageDecoder::new(516&field.name,517pages,518dtype,519primitive::FloatDecoder::<f32, _, _>::unit(),520init_nested,521)?522.collect_boxed(filter)?,523(PhysicalType::Double, Float64) => PageDecoder::new(524&field.name,525pages,526dtype,527primitive::FloatDecoder::<f64, _, _>::unit(),528init_nested,529)?530.collect_boxed(filter)?,531// Decoder for BinaryOffset532(PhysicalType::ByteArray, LargeBinary) => PageDecoder::new(533&field.name,534pages,535dtype,536binary::BinaryDecoder,537init_nested,538)?539.collect_boxed(filter)?,540// Don't compile this code with `i32` as we don't use this in polars541(PhysicalType::ByteArray, LargeUtf8) => {542let is_string = matches!(dtype, LargeUtf8);543PageDecoder::new(544&field.name,545pages,546dtype,547binview::BinViewDecoder::new(is_string),548init_nested,549)?550.collect_boxed(filter)?551},552(_, Binary | Utf8) => unreachable!(),553(PhysicalType::ByteArray, BinaryView | Utf8View) => {554let is_string = matches!(dtype, Utf8View);555PageDecoder::new(556&field.name,557pages,558dtype,559binview::BinViewDecoder::new(is_string),560init_nested,561)?562.collect_boxed(filter)?563},564(_, Dictionary(key_type, value_type, _)) => {565// @NOTE: This should only hit in two cases:566// - Polars enum's and categorical's567// - Int -> String which can be turned into categoricals568assert_eq!(value_type.as_ref(), &ArrowDataType::Utf8View);569570if field.metadata.is_some_and(|md| {571md.contains_key(DTYPE_ENUM_VALUES_LEGACY)572|| md.contains_key(DTYPE_ENUM_VALUES_NEW)573|| md.contains_key(DTYPE_CATEGORICAL_NEW)574|| md.contains_key(DTYPE_CATEGORICAL_LEGACY)575}) && matches!(576key_type,577IntegerType::UInt8 | IntegerType::UInt16 | IntegerType::UInt32578) {579match key_type {580IntegerType::UInt8 => PageDecoder::new(581&field.name,582pages,583dtype,584CategoricalDecoder::<u8>::new(),585init_nested,586)?587.collect_boxed(filter)?,588IntegerType::UInt16 => PageDecoder::new(589&field.name,590pages,591dtype,592CategoricalDecoder::<u16>::new(),593init_nested,594)?595.collect_boxed(filter)?,596IntegerType::UInt32 => PageDecoder::new(597&field.name,598pages,599dtype,600CategoricalDecoder::<u32>::new(),601init_nested,602)?603.collect_boxed(filter)?,604_ => unreachable!(),605}606} else {607let (nested, array, ptm) = PageDecoder::new(608&field.name,609pages,610ArrowDataType::Utf8View,611binview::BinViewDecoder::new_string(),612init_nested,613)?614.collect(filter)?;615616let array = array617.into_iter()618.map(|array| {619polars_compute::cast::cast(620array.as_ref(),621&dtype,622CastOptionsImpl::default(),623)624.unwrap()625})626.collect();627628(nested, array, ptm)629}630},631(from, to) => {632return Err(ParquetError::not_supported(format!(633"reading parquet type {from:?} to {to:?} still not implemented",634)));635},636})637}638639/// Unify the timestamp unit from parquet TimeUnit into arrow's TimeUnit640/// Returns (a int64 factor, is_multiplier)641fn unify_timestamp_unit(642logical_type: &Option<PrimitiveLogicalType>,643time_unit: TimeUnit,644) -> (i64, bool) {645if let Some(PrimitiveLogicalType::Timestamp { unit, .. }) = logical_type {646match (*unit, time_unit) {647(ParquetTimeUnit::Milliseconds, TimeUnit::Millisecond)648| (ParquetTimeUnit::Microseconds, TimeUnit::Microsecond)649| (ParquetTimeUnit::Nanoseconds, TimeUnit::Nanosecond) => (1, true),650651(ParquetTimeUnit::Milliseconds, TimeUnit::Second)652| (ParquetTimeUnit::Microseconds, TimeUnit::Millisecond)653| (ParquetTimeUnit::Nanoseconds, TimeUnit::Microsecond) => (1000, false),654655(ParquetTimeUnit::Microseconds, TimeUnit::Second)656| (ParquetTimeUnit::Nanoseconds, TimeUnit::Millisecond) => (1_000_000, false),657658(ParquetTimeUnit::Nanoseconds, TimeUnit::Second) => (1_000_000_000, false),659660(ParquetTimeUnit::Milliseconds, TimeUnit::Microsecond)661| (ParquetTimeUnit::Microseconds, TimeUnit::Nanosecond) => (1_000, true),662663(ParquetTimeUnit::Milliseconds, TimeUnit::Nanosecond) => (1_000_000, true),664}665} else {666(1, true)667}668}669670#[inline]671pub fn int96_to_i64_us(value: [u32; 3]) -> i64 {672const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;673const SECONDS_PER_DAY: i64 = 86_400;674const MICROS_PER_SECOND: i64 = 1_000_000;675676let day = value[2] as i64;677let microseconds = (((value[1] as i64) << 32) + value[0] as i64) / 1_000;678let seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;679680seconds * MICROS_PER_SECOND + microseconds681}682683#[inline]684pub fn int96_to_i64_ms(value: [u32; 3]) -> i64 {685const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;686const SECONDS_PER_DAY: i64 = 86_400;687const MILLIS_PER_SECOND: i64 = 1_000;688689let day = value[2] as i64;690let milliseconds = (((value[1] as i64) << 32) + value[0] as i64) / 1_000_000;691let seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;692693seconds * MILLIS_PER_SECOND + milliseconds694}695696#[inline]697pub fn int96_to_i64_s(value: [u32; 3]) -> i64 {698const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;699const SECONDS_PER_DAY: i64 = 86_400;700701let day = value[2] as i64;702let seconds = (((value[1] as i64) << 32) + value[0] as i64) / 1_000_000_000;703let day_seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;704705day_seconds + seconds706}707708#[expect(clippy::too_many_arguments)]709fn timestamp(710field_name: &str,711pages: BasicDecompressor,712physical_type: &PhysicalType,713logical_type: &Option<PrimitiveLogicalType>,714dtype: ArrowDataType,715filter: Option<Filter>,716time_unit: TimeUnit,717nested: Option<Vec<InitNested>>,718) -> ParquetResult<(Option<NestedState>, Vec<Box<dyn Array>>, Bitmap)> {719if physical_type == &PhysicalType::Int96 {720return match time_unit {721TimeUnit::Nanosecond => PageDecoder::new(722field_name,723pages,724dtype,725primitive::FloatDecoder::closure(|x: [u32; 3]| int96_to_i64_ns(x)),726nested,727)?728.collect_boxed(filter),729TimeUnit::Microsecond => PageDecoder::new(730field_name,731pages,732dtype,733primitive::FloatDecoder::closure(|x: [u32; 3]| int96_to_i64_us(x)),734nested,735)?736.collect_boxed(filter),737TimeUnit::Millisecond => PageDecoder::new(738field_name,739pages,740dtype,741primitive::FloatDecoder::closure(|x: [u32; 3]| int96_to_i64_ms(x)),742nested,743)?744.collect_boxed(filter),745TimeUnit::Second => PageDecoder::new(746field_name,747pages,748dtype,749primitive::FloatDecoder::closure(|x: [u32; 3]| int96_to_i64_s(x)),750nested,751)?752.collect_boxed(filter),753};754};755756if physical_type != &PhysicalType::Int64 {757return Err(ParquetError::not_supported(758"can't decode a timestamp from a non-int64 parquet type",759));760}761762let (factor, is_multiplier) = unify_timestamp_unit(logical_type, time_unit);763match (factor, is_multiplier) {764(1, _) => PageDecoder::new(765field_name,766pages,767dtype,768primitive::IntDecoder::<i64, _, _>::unit(),769nested,770)?771.collect_boxed(filter),772(a, true) => PageDecoder::new(773field_name,774pages,775dtype,776primitive::IntDecoder::closure(|x: i64| x * a),777nested,778)?779.collect_boxed(filter),780(a, false) => PageDecoder::new(781field_name,782pages,783dtype,784primitive::IntDecoder::closure(|x: i64| x / a),785nested,786)?787.collect_boxed(filter),788}789}790791/// Converts directly from Parquet INTERVAL to Struct.792fn convert_interval_bytes_to_month_day_nano_struct(793month_day_millis_bytes: FixedSizeBinaryArray,794) -> StructArray {795const ROW_WIDTH: usize = 12;796797let bytes: &[u8] = month_day_millis_bytes.values();798let output_length = bytes.len() / ROW_WIDTH;799800assert_eq!(bytes.len(), output_length * ROW_WIDTH);801802let (months_out, days_out, nanoseconds_out): (Vec<i32>, Vec<i32>, Vec<i64>) = (0803..output_length)804.map(|i| {805let bytes: [u8; ROW_WIDTH] =806unsafe { bytes.get_unchecked(i * ROW_WIDTH..(i + 1) * ROW_WIDTH) }807.try_into()808.unwrap();809810let months: i32 = i32::from_le_bytes(bytes[..4].try_into().unwrap());811let days: i32 = i32::from_le_bytes(bytes[4..8].try_into().unwrap());812let nanoseconds: i64 = (i32::from_le_bytes(bytes[8..12].try_into().unwrap()) as i64)813.checked_mul(1_000_000i64) // Convert milliseconds to nanoseconds.814.unwrap();815816(months, days, nanoseconds)817})818.collect();819820let struct_fields = vec![821Field::new(822PlSmallStr::from_static("months"),823ArrowDataType::Int32,824true,825),826Field::new(PlSmallStr::from_static("days"), ArrowDataType::Int32, true),827Field::new(828PlSmallStr::from_static("nanoseconds"),829ArrowDataType::Duration(TimeUnit::Nanosecond),830true,831),832];833834let struct_value_arrays = vec![835PrimitiveArray::<i32>::from_vec(months_out).boxed(),836PrimitiveArray::<i32>::from_vec(days_out).boxed(),837PrimitiveArray::<i64>::try_new(838ArrowDataType::Duration(TimeUnit::Nanosecond),839nanoseconds_out.into(),840None,841)842.unwrap()843.boxed(),844];845846StructArray::new(847ArrowDataType::Struct(struct_fields),848output_length,849struct_value_arrays,850month_day_millis_bytes.validity().cloned(),851)852}853854855