// Path: blob/main/crates/polars-parquet/src/arrow/write/schema.rs
// 6940 views
use std::borrow::Cow;1use std::sync::Arc;23use arrow::datatypes::{ArrowDataType, ArrowSchema, ExtensionType, Field, TimeUnit};4use arrow::io::ipc::write::{default_ipc_fields, schema_to_bytes};5use base64::Engine as _;6use base64::engine::general_purpose;7use polars_error::{PolarsResult, polars_bail};8use polars_utils::pl_str::PlSmallStr;910use super::super::ARROW_SCHEMA_META_KEY;11use super::ColumnWriteOptions;12use crate::arrow::write::decimal_length_from_precision;13use crate::parquet::metadata::KeyValue;14use crate::parquet::schema::Repetition;15use crate::parquet::schema::types::{16GroupConvertedType, GroupLogicalType, IntegerType, ParquetType, PhysicalType,17PrimitiveConvertedType, PrimitiveLogicalType, TimeUnit as ParquetTimeUnit,18};19use crate::write::ChildWriteOptions;2021fn convert_field(field: Field) -> Field {22Field {23name: field.name,24dtype: convert_dtype(field.dtype),25is_nullable: field.is_nullable,26metadata: field.metadata,27}28}2930fn convert_dtype(dtype: ArrowDataType) -> ArrowDataType {31use ArrowDataType as D;32match dtype {33D::LargeList(field) => D::LargeList(Box::new(convert_field(*field))),34D::Struct(mut fields) => {35for field in &mut fields {36*field = convert_field(std::mem::take(field))37}38D::Struct(fields)39},40D::BinaryView => D::LargeBinary,41D::Utf8View => D::LargeUtf8,42D::Dictionary(it, dtype, sorted) => {43let dtype = convert_dtype(*dtype);44D::Dictionary(it, Box::new(dtype), sorted)45},46D::Extension(ext) => {47let dtype = convert_dtype(ext.inner);48D::Extension(Box::new(ExtensionType {49inner: dtype,50..*ext51}))52},53dt => dt,54}55}5657fn insert_field_metadata(field: &mut Cow<Field>, options: &ColumnWriteOptions) {58if !options.metadata.is_empty() {59let field = field.to_mut();60let mut metadata = field.metadata.as_deref().cloned().unwrap_or_default();6162for kv in &options.metadata {63metadata.insert(64kv.key.as_str().into(),65kv.value.as_deref().unwrap_or_default().into(),66);67}68field.metadata = 
Some(Arc::new(metadata));69}7071if let Some(v) = options.required {72if v == field.is_nullable {73let field = field.to_mut();74field.is_nullable = !v;75}76}7778use ArrowDataType as D;79match field.dtype() {80D::Struct(f) => {81let ChildWriteOptions::Struct(o) = &options.children else {82unreachable!();83};8485let mut new_fields = Vec::new();86for (i, (child_field, child_options)) in f.iter().zip(o.children.as_slice()).enumerate()87{88let mut child_field = Cow::Borrowed(child_field);89insert_field_metadata(&mut child_field, child_options);9091if let Cow::Owned(child_field) = child_field {92new_fields.reserve(f.len());93new_fields.extend(f[..i].iter().cloned());94new_fields.push(child_field);95break;96}97}9899if new_fields.is_empty() {100return;101}102103new_fields.extend(104f[new_fields.len()..]105.iter()106.zip(&o.children[new_fields.len()..])107.map(|(child_field, child_options)| {108let mut child_field = Cow::Borrowed(child_field);109insert_field_metadata(&mut child_field, child_options);110child_field.into_owned()111}),112);113field114.to_mut()115.map_dtype_mut(|dtype| *dtype = D::Struct(new_fields));116},117D::List(f) | D::FixedSizeList(f, _) | D::LargeList(f) => {118let ChildWriteOptions::ListLike(o) = &options.children else {119unreachable!();120};121122let mut child_field = Cow::Borrowed(f.as_ref());123insert_field_metadata(&mut child_field, &o.child);124125if let Cow::Owned(child_field) = child_field {126let child_field = Box::new(child_field);127field.to_mut().map_dtype_mut(|dtype| {128*dtype = match dtype {129D::List(_) => D::List(child_field),130D::LargeList(_) => D::LargeList(child_field),131D::FixedSizeList(_, width) => D::FixedSizeList(child_field, *width),132_ => unreachable!(),133}134});135}136},137_ => {},138}139}140141pub fn schema_to_metadata_key(schema: &ArrowSchema, options: &[ColumnWriteOptions]) -> KeyValue {142let mut schema_mut = None;143for (f, options) in schema.iter_values().zip(options) {144let mut field = 
Cow::Borrowed(f);145insert_field_metadata(&mut field, options);146147if let Cow::Owned(field) = field {148let schema_mut = schema_mut.get_or_insert_with(|| schema.clone());149*schema_mut.get_mut(f.name.as_str()).unwrap() = field;150}151}152153let mut schema = schema;154if let Some(schema_mut) = &schema_mut {155schema = schema_mut;156}157158// Convert schema until more arrow readers are aware of binview159let serialized_schema = if schema.iter_values().any(|field| field.dtype.is_view()) {160let schema = schema161.iter_values()162.map(|field| convert_field(field.clone()))163.map(|x| (x.name.clone(), x))164.collect();165schema_to_bytes(&schema, &default_ipc_fields(schema.iter_values()), None)166} else {167schema_to_bytes(schema, &default_ipc_fields(schema.iter_values()), None)168};169170// manually prepending the length to the schema as arrow uses the legacy IPC format171// TODO: change after addressing ARROW-9777172let schema_len = serialized_schema.len();173let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);174len_prefix_schema.extend_from_slice(&[255u8, 255, 255, 255]);175len_prefix_schema.extend_from_slice(&(schema_len as u32).to_le_bytes());176len_prefix_schema.extend_from_slice(&serialized_schema);177178let encoded = general_purpose::STANDARD.encode(&len_prefix_schema);179180KeyValue {181key: ARROW_SCHEMA_META_KEY.to_string(),182value: Some(encoded),183}184}185186/// Creates a [`ParquetType`] from a [`Field`].187pub fn to_parquet_type(field: &Field, options: &ColumnWriteOptions) -> PolarsResult<ParquetType> {188let name = field.name.clone();189let repetition = if options.required.unwrap_or(!field.is_nullable) {190Repetition::Required191} else {192Repetition::Optional193};194195let field_id = options.field_id;196197// create type from field198let (physical_type, primitive_converted_type, primitive_logical_type) = match field199.dtype()200.to_logical_type()201{202ArrowDataType::Null => 
(203PhysicalType::Int32,204None,205Some(PrimitiveLogicalType::Unknown),206),207ArrowDataType::Boolean => (PhysicalType::Boolean, None, None),208ArrowDataType::Int32 => (PhysicalType::Int32, None, None),209// ArrowDataType::Duration(_) has no parquet representation => do not apply any logical type210ArrowDataType::Int64 | ArrowDataType::Duration(_) => (PhysicalType::Int64, None, None),211// no natural representation in parquet; leave it as is.212// arrow consumers MAY use the arrow schema in the metadata to parse them.213ArrowDataType::Date64 => (PhysicalType::Int64, None, None),214ArrowDataType::Float32 => (PhysicalType::Float, None, None),215ArrowDataType::Float64 => (PhysicalType::Double, None, None),216ArrowDataType::Binary | ArrowDataType::LargeBinary | ArrowDataType::BinaryView => {217(PhysicalType::ByteArray, None, None)218},219ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Utf8View => (220PhysicalType::ByteArray,221Some(PrimitiveConvertedType::Utf8),222Some(PrimitiveLogicalType::String),223),224ArrowDataType::Date32 => (225PhysicalType::Int32,226Some(PrimitiveConvertedType::Date),227Some(PrimitiveLogicalType::Date),228),229ArrowDataType::Int8 => (230PhysicalType::Int32,231Some(PrimitiveConvertedType::Int8),232Some(PrimitiveLogicalType::Integer(IntegerType::Int8)),233),234ArrowDataType::Int16 => (235PhysicalType::Int32,236Some(PrimitiveConvertedType::Int16),237Some(PrimitiveLogicalType::Integer(IntegerType::Int16)),238),239ArrowDataType::UInt8 => (240PhysicalType::Int32,241Some(PrimitiveConvertedType::Uint8),242Some(PrimitiveLogicalType::Integer(IntegerType::UInt8)),243),244ArrowDataType::UInt16 => (245PhysicalType::Int32,246Some(PrimitiveConvertedType::Uint16),247Some(PrimitiveLogicalType::Integer(IntegerType::UInt16)),248),249ArrowDataType::UInt32 => (250PhysicalType::Int32,251Some(PrimitiveConvertedType::Uint32),252Some(PrimitiveLogicalType::Integer(IntegerType::UInt32)),253),254ArrowDataType::UInt64 => 
(255PhysicalType::Int64,256Some(PrimitiveConvertedType::Uint64),257Some(PrimitiveLogicalType::Integer(IntegerType::UInt64)),258),259// no natural representation in parquet; leave it as is.260// arrow consumers MAY use the arrow schema in the metadata to parse them.261ArrowDataType::Timestamp(TimeUnit::Second, _) => (PhysicalType::Int64, None, None),262ArrowDataType::Timestamp(time_unit, zone) => (263PhysicalType::Int64,264None,265Some(PrimitiveLogicalType::Timestamp {266is_adjusted_to_utc: matches!(zone, Some(z) if !z.as_str().is_empty()),267unit: match time_unit {268TimeUnit::Second => unreachable!(),269TimeUnit::Millisecond => ParquetTimeUnit::Milliseconds,270TimeUnit::Microsecond => ParquetTimeUnit::Microseconds,271TimeUnit::Nanosecond => ParquetTimeUnit::Nanoseconds,272},273}),274),275// no natural representation in parquet; leave it as is.276// arrow consumers MAY use the arrow schema in the metadata to parse them.277ArrowDataType::Time32(TimeUnit::Second) => (PhysicalType::Int32, None, None),278ArrowDataType::Time32(TimeUnit::Millisecond) => (279PhysicalType::Int32,280Some(PrimitiveConvertedType::TimeMillis),281Some(PrimitiveLogicalType::Time {282is_adjusted_to_utc: false,283unit: ParquetTimeUnit::Milliseconds,284}),285),286ArrowDataType::Time64(time_unit) => (287PhysicalType::Int64,288match time_unit {289TimeUnit::Microsecond => Some(PrimitiveConvertedType::TimeMicros),290TimeUnit::Nanosecond => None,291_ => unreachable!(),292},293Some(PrimitiveLogicalType::Time {294is_adjusted_to_utc: false,295unit: match time_unit {296TimeUnit::Microsecond => ParquetTimeUnit::Microseconds,297TimeUnit::Nanosecond => ParquetTimeUnit::Nanoseconds,298_ => unreachable!(),299},300}),301),302ArrowDataType::Struct(fields) => {303if fields.is_empty() {304polars_bail!(InvalidOperation:305"Unable to write struct type with no child field to Parquet. 
Consider adding a dummy child field.".to_string(),306)307}308309let ChildWriteOptions::Struct(struct_write_options) = &options.children else {310unreachable!();311};312313assert_eq!(fields.len(), struct_write_options.children.len());314315// recursively convert children to types/nodes316let fields = fields317.iter()318.zip(struct_write_options.children.as_slice())319.map(|(f, c)| to_parquet_type(f, c))320.collect::<PolarsResult<Vec<_>>>()?;321return Ok(ParquetType::from_group(322name, repetition, None, None, fields, field_id,323));324},325ArrowDataType::Dictionary(_, value, _) => {326assert!(!value.is_nested());327let dict_field = Field::new(name, value.as_ref().clone(), field.is_nullable);328return to_parquet_type(&dict_field, options);329},330ArrowDataType::FixedSizeBinary(size) => {331(PhysicalType::FixedLenByteArray(*size), None, None)332},333ArrowDataType::Decimal(precision, scale) => {334let precision = *precision;335let scale = *scale;336let logical_type = Some(PrimitiveLogicalType::Decimal(precision, scale));337338let physical_type = if precision <= 9 {339PhysicalType::Int32340} else if precision <= 18 {341PhysicalType::Int64342} else {343let len = decimal_length_from_precision(precision);344PhysicalType::FixedLenByteArray(len)345};346(347physical_type,348Some(PrimitiveConvertedType::Decimal(precision, scale)),349logical_type,350)351},352ArrowDataType::Decimal256(precision, scale) => {353let precision = *precision;354let scale = *scale;355let logical_type = Some(PrimitiveLogicalType::Decimal(precision, scale));356357if precision <= 9 {358(359PhysicalType::Int32,360Some(PrimitiveConvertedType::Decimal(precision, scale)),361logical_type,362)363} else if precision <= 18 {364(365PhysicalType::Int64,366Some(PrimitiveConvertedType::Decimal(precision, scale)),367logical_type,368)369} else if precision <= 38 {370let len = decimal_length_from_precision(precision);371(372PhysicalType::FixedLenByteArray(len),373Some(PrimitiveConvertedType::Decimal(precision, 
scale)),374logical_type,375)376} else {377(PhysicalType::FixedLenByteArray(32), None, None)378}379},380ArrowDataType::Interval(_) => (381PhysicalType::FixedLenByteArray(12),382Some(PrimitiveConvertedType::Interval),383None,384),385ArrowDataType::Int128 => (PhysicalType::FixedLenByteArray(16), None, None),386ArrowDataType::List(f)387| ArrowDataType::FixedSizeList(f, _)388| ArrowDataType::LargeList(f) => {389let mut f = f.clone();390f.name = PlSmallStr::from_static("element");391392let ChildWriteOptions::ListLike(list_write_options) = &options.children else {393unreachable!();394};395396return Ok(ParquetType::from_group(397name,398repetition,399Some(GroupConvertedType::List),400Some(GroupLogicalType::List),401vec![ParquetType::from_group(402PlSmallStr::from_static("list"),403Repetition::Repeated,404None,405None,406vec![to_parquet_type(&f, &list_write_options.child)?],407None,408)],409field_id,410));411},412other => polars_bail!(nyi = "Writing the data type {other:?} is not yet implemented"),413};414415Ok(ParquetType::try_from_primitive(416name,417physical_type,418repetition,419primitive_converted_type,420primitive_logical_type,421field_id,422)?)423}424425426