// Path: blob/main/crates/polars-parquet/src/arrow/write/schema.rs
// (8446 views)
use std::sync::{Arc, LazyLock};12use arrow::datatypes::{3ArrowDataType, ArrowSchema, ExtensionType, Field, PARQUET_EMPTY_STRUCT, TimeUnit,4};5use arrow::io::ipc::write::{default_ipc_fields, schema_to_bytes};6use base64::Engine as _;7use base64::engine::general_purpose;8use polars_error::{PolarsResult, polars_bail};9use polars_utils::pl_str::PlSmallStr;1011use super::super::ARROW_SCHEMA_META_KEY;12use crate::arrow::write::decimal_length_from_precision;13use crate::parquet::metadata::KeyValue;14use crate::parquet::schema::Repetition;15use crate::parquet::schema::types::{16GroupConvertedType, GroupLogicalType, IntegerType, ParquetType, PhysicalType,17PrimitiveConvertedType, PrimitiveLogicalType, TimeUnit as ParquetTimeUnit,18};1920fn convert_field(field: Field) -> Field {21let mut metadata = field.metadata;2223if let ArrowDataType::Struct(fields) = &field.dtype24&& fields.is_empty()25{26Arc::make_mut(metadata.get_or_insert_default()).insert(27PlSmallStr::from_static(PARQUET_EMPTY_STRUCT),28PlSmallStr::EMPTY,29);30}3132Field {33name: field.name,34dtype: convert_dtype(field.dtype),35is_nullable: field.is_nullable,36metadata,37}38}3940fn convert_dtype(dtype: ArrowDataType) -> ArrowDataType {41use ArrowDataType as D;42match dtype {43D::LargeList(field) => D::LargeList(Box::new(convert_field(*field))),44D::Struct(mut fields) => {45for field in &mut fields {46*field = convert_field(std::mem::take(field))47}48D::Struct(fields)49},50D::Dictionary(it, dtype, sorted) => {51let dtype = convert_dtype(*dtype);52D::Dictionary(it, Box::new(dtype), sorted)53},54D::Extension(ext) => {55let dtype = convert_dtype(ext.inner);56D::Extension(Box::new(ExtensionType {57inner: dtype,58..*ext59}))60},61dt => dt,62}63}6465pub fn schema_to_metadata_key(schema: &ArrowSchema) -> KeyValue {66let serialized_schema = if schema.iter_values().any(|field| field.dtype.is_nested()) {67let schema = schema68.iter_values()69.map(|field| convert_field(field.clone()))70.map(|x| (x.name.clone(), 
x))71.collect();72schema_to_bytes(&schema, &default_ipc_fields(schema.iter_values()), None)73} else {74schema_to_bytes(schema, &default_ipc_fields(schema.iter_values()), None)75};7677// manually prepending the length to the schema as arrow uses the legacy IPC format78// TODO: change after addressing ARROW-977779let schema_len = serialized_schema.len();80let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);81len_prefix_schema.extend_from_slice(&[255u8, 255, 255, 255]);82len_prefix_schema.extend_from_slice(&(schema_len as u32).to_le_bytes());83len_prefix_schema.extend_from_slice(&serialized_schema);8485let encoded = general_purpose::STANDARD.encode(&len_prefix_schema);8687KeyValue {88key: ARROW_SCHEMA_META_KEY.to_string(),89value: Some(encoded),90}91}9293/// Creates a [`ParquetType`] from a [`Field`].94pub fn to_parquet_type(field: &Field) -> PolarsResult<ParquetType> {95let name = field.name.clone();96let repetition = if field.is_nullable {97Repetition::Optional98} else {99Repetition::Required100};101102let field_id: Option<i32> = None;103104// create type from field105let (physical_type, primitive_converted_type, primitive_logical_type) = match field106.dtype()107.to_storage()108{109ArrowDataType::Null => (110PhysicalType::Int32,111None,112Some(PrimitiveLogicalType::Unknown),113),114ArrowDataType::Boolean => (PhysicalType::Boolean, None, None),115ArrowDataType::Int32 => (PhysicalType::Int32, None, None),116// ArrowDataType::Duration(_) has no parquet representation => do not apply any logical type117ArrowDataType::Int64 | ArrowDataType::Duration(_) => (PhysicalType::Int64, None, None),118// no natural representation in parquet; leave it as is.119// arrow consumers MAY use the arrow schema in the metadata to parse them.120ArrowDataType::Date64 => (PhysicalType::Int64, None, None),121ArrowDataType::Float16 => (122PhysicalType::FixedLenByteArray(2),123None,124Some(PrimitiveLogicalType::Float16),125),126ArrowDataType::Float32 => (PhysicalType::Float, None, 
None),127ArrowDataType::Float64 => (PhysicalType::Double, None, None),128ArrowDataType::Binary | ArrowDataType::LargeBinary | ArrowDataType::BinaryView => {129(PhysicalType::ByteArray, None, None)130},131ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Utf8View => (132PhysicalType::ByteArray,133Some(PrimitiveConvertedType::Utf8),134Some(PrimitiveLogicalType::String),135),136ArrowDataType::Date32 => (137PhysicalType::Int32,138Some(PrimitiveConvertedType::Date),139Some(PrimitiveLogicalType::Date),140),141ArrowDataType::Int8 => (142PhysicalType::Int32,143Some(PrimitiveConvertedType::Int8),144Some(PrimitiveLogicalType::Integer(IntegerType::Int8)),145),146ArrowDataType::Int16 => (147PhysicalType::Int32,148Some(PrimitiveConvertedType::Int16),149Some(PrimitiveLogicalType::Integer(IntegerType::Int16)),150),151ArrowDataType::UInt8 => (152PhysicalType::Int32,153Some(PrimitiveConvertedType::Uint8),154Some(PrimitiveLogicalType::Integer(IntegerType::UInt8)),155),156ArrowDataType::UInt16 => (157PhysicalType::Int32,158Some(PrimitiveConvertedType::Uint16),159Some(PrimitiveLogicalType::Integer(IntegerType::UInt16)),160),161ArrowDataType::UInt32 => (162PhysicalType::Int32,163Some(PrimitiveConvertedType::Uint32),164Some(PrimitiveLogicalType::Integer(IntegerType::UInt32)),165),166ArrowDataType::UInt64 => (167PhysicalType::Int64,168Some(PrimitiveConvertedType::Uint64),169Some(PrimitiveLogicalType::Integer(IntegerType::UInt64)),170),171// no natural representation in parquet; leave it as is.172// arrow consumers MAY use the arrow schema in the metadata to parse them.173ArrowDataType::Timestamp(TimeUnit::Second, _) => (PhysicalType::Int64, None, None),174ArrowDataType::Timestamp(time_unit, zone) => (175PhysicalType::Int64,176None,177Some(PrimitiveLogicalType::Timestamp {178is_adjusted_to_utc: matches!(zone, Some(z) if !z.as_str().is_empty()),179unit: match time_unit {180TimeUnit::Second => unreachable!(),181TimeUnit::Millisecond => 
ParquetTimeUnit::Milliseconds,182TimeUnit::Microsecond => ParquetTimeUnit::Microseconds,183TimeUnit::Nanosecond => ParquetTimeUnit::Nanoseconds,184},185}),186),187// no natural representation in parquet; leave it as is.188// arrow consumers MAY use the arrow schema in the metadata to parse them.189ArrowDataType::Time32(TimeUnit::Second) => (PhysicalType::Int32, None, None),190ArrowDataType::Time32(TimeUnit::Millisecond) => (191PhysicalType::Int32,192Some(PrimitiveConvertedType::TimeMillis),193Some(PrimitiveLogicalType::Time {194is_adjusted_to_utc: false,195unit: ParquetTimeUnit::Milliseconds,196}),197),198ArrowDataType::Time64(time_unit) => (199PhysicalType::Int64,200match time_unit {201TimeUnit::Microsecond => Some(PrimitiveConvertedType::TimeMicros),202TimeUnit::Nanosecond => None,203_ => unreachable!(),204},205Some(PrimitiveLogicalType::Time {206is_adjusted_to_utc: false,207unit: match time_unit {208TimeUnit::Microsecond => ParquetTimeUnit::Microseconds,209TimeUnit::Nanosecond => ParquetTimeUnit::Nanoseconds,210_ => unreachable!(),211},212}),213),214ArrowDataType::Struct(fields) => {215if fields.is_empty() {216static ALLOW_EMPTY_STRUCTS: LazyLock<bool> = LazyLock::new(|| {217std::env::var("POLARS_ALLOW_PQ_EMPTY_STRUCT").is_ok_and(|v| v.as_str() == "1")218});219220if *ALLOW_EMPTY_STRUCTS {221return Ok(ParquetType::try_from_primitive(222name,223PhysicalType::Boolean,224repetition,225None,226None,227field_id,228)?);229} else {230polars_bail!(231InvalidOperation:232"Unable to write struct type with no child field to Parquet. 
Consider adding a dummy child field.",233)234}235}236237// recursively convert children to types/nodes238let fields = fields239.iter()240.map(to_parquet_type)241.collect::<PolarsResult<Vec<_>>>()?;242return Ok(ParquetType::from_group(243name, repetition, None, None, fields, field_id,244));245},246ArrowDataType::Dictionary(_, value, _) => {247assert!(!value.is_nested());248let dict_field = Field::new(name, value.as_ref().clone(), field.is_nullable);249return to_parquet_type(&dict_field);250},251ArrowDataType::FixedSizeBinary(size) => {252(PhysicalType::FixedLenByteArray(*size), None, None)253},254ArrowDataType::Decimal(precision, scale) => {255let precision = *precision;256let scale = *scale;257let logical_type = Some(PrimitiveLogicalType::Decimal(precision, scale));258259let physical_type = if precision <= 9 {260PhysicalType::Int32261} else if precision <= 18 {262PhysicalType::Int64263} else {264let len = decimal_length_from_precision(precision);265PhysicalType::FixedLenByteArray(len)266};267(268physical_type,269Some(PrimitiveConvertedType::Decimal(precision, scale)),270logical_type,271)272},273ArrowDataType::Decimal256(precision, scale) => {274let precision = *precision;275let scale = *scale;276let logical_type = Some(PrimitiveLogicalType::Decimal(precision, scale));277278if precision <= 9 {279(280PhysicalType::Int32,281Some(PrimitiveConvertedType::Decimal(precision, scale)),282logical_type,283)284} else if precision <= 18 {285(286PhysicalType::Int64,287Some(PrimitiveConvertedType::Decimal(precision, scale)),288logical_type,289)290} else if precision <= 38 {291let len = decimal_length_from_precision(precision);292(293PhysicalType::FixedLenByteArray(len),294Some(PrimitiveConvertedType::Decimal(precision, scale)),295logical_type,296)297} else {298(PhysicalType::FixedLenByteArray(32), None, None)299}300},301ArrowDataType::Interval(_) => (302PhysicalType::FixedLenByteArray(12),303Some(PrimitiveConvertedType::Interval),304None,305),306ArrowDataType::UInt128 | 
ArrowDataType::Int128 => {307(PhysicalType::FixedLenByteArray(16), None, None)308},309ArrowDataType::List(f)310| ArrowDataType::FixedSizeList(f, _)311| ArrowDataType::LargeList(f) => {312let mut f = f.clone();313f.name = PlSmallStr::from_static("element");314315return Ok(ParquetType::from_group(316name,317repetition,318Some(GroupConvertedType::List),319Some(GroupLogicalType::List),320vec![ParquetType::from_group(321PlSmallStr::from_static("list"),322Repetition::Repeated,323None,324None,325vec![to_parquet_type(&f)?],326None,327)],328field_id,329));330},331other => polars_bail!(nyi = "Writing the data type {other:?} is not yet implemented"),332};333334Ok(ParquetType::try_from_primitive(335name,336physical_type,337repetition,338primitive_converted_type,339primitive_logical_type,340field_id,341)?)342}343344345