Path: blob/main/crates/polars-arrow/src/io/avro/read/schema.rs
7884 views
use avro_schema::schema::{Enum, Fixed, Record, Schema as AvroSchema};1use polars_error::{PolarsResult, polars_bail};2use polars_utils::pl_str::PlSmallStr;34use crate::datatypes::*;56fn external_props(schema: &AvroSchema) -> Metadata {7let mut props = Metadata::new();8match schema {9AvroSchema::Record(Record { doc: Some(doc), .. })10| AvroSchema::Enum(Enum { doc: Some(doc), .. }) => {11props.insert(12PlSmallStr::from_static("avro::doc"),13PlSmallStr::from_str(doc.as_str()),14);15},16_ => {},17}18props19}2021/// Infers an [`ArrowSchema`] from the root [`Record`].22/// This23pub fn infer_schema(record: &Record) -> PolarsResult<ArrowSchema> {24record25.fields26.iter()27.map(|field| {28let field = schema_to_field(29&field.schema,30Some(&field.name),31external_props(&field.schema),32)?;3334Ok((field.name.clone(), field))35})36.collect::<PolarsResult<ArrowSchema>>()37}3839fn schema_to_field(40schema: &AvroSchema,41name: Option<&str>,42props: Metadata,43) -> PolarsResult<Field> {44let mut nullable = false;45let dtype = match schema {46AvroSchema::Null => ArrowDataType::Null,47AvroSchema::Boolean => ArrowDataType::Boolean,48AvroSchema::Int(logical) => match logical {49Some(logical) => match logical {50avro_schema::schema::IntLogical::Date => ArrowDataType::Date32,51avro_schema::schema::IntLogical::Time => {52ArrowDataType::Time32(TimeUnit::Millisecond)53},54},55None => ArrowDataType::Int32,56},57AvroSchema::Long(logical) => match logical {58Some(logical) => match logical {59avro_schema::schema::LongLogical::Time => {60ArrowDataType::Time64(TimeUnit::Microsecond)61},62avro_schema::schema::LongLogical::TimestampMillis => ArrowDataType::Timestamp(63TimeUnit::Millisecond,64Some(PlSmallStr::from_static("00:00")),65),66avro_schema::schema::LongLogical::TimestampMicros => ArrowDataType::Timestamp(67TimeUnit::Microsecond,68Some(PlSmallStr::from_static("00:00")),69),70avro_schema::schema::LongLogical::LocalTimestampMillis => {71ArrowDataType::Timestamp(TimeUnit::Millisecond, None)72},73avro_schema::schema::LongLogical::LocalTimestampMicros => {74ArrowDataType::Timestamp(TimeUnit::Microsecond, None)75},76},77None => ArrowDataType::Int64,78},79AvroSchema::Float => ArrowDataType::Float32,80AvroSchema::Double => ArrowDataType::Float64,81AvroSchema::Bytes(logical) => match logical {82Some(logical) => match logical {83avro_schema::schema::BytesLogical::Decimal(precision, scale) => {84ArrowDataType::Decimal(*precision, *scale)85},86},87None => ArrowDataType::Binary,88},89AvroSchema::String(_) => ArrowDataType::Utf8,90AvroSchema::Array(item_schema) => ArrowDataType::List(Box::new(schema_to_field(91item_schema,92Some("item"), // default name for list items93Metadata::default(),94)?)),95AvroSchema::Map(_) => todo!("Avro maps are mapped to MapArrays"),96AvroSchema::Union(schemas) => {97// If there are only two variants and one of them is null, set the other type as the field data type98let has_nullable = schemas.iter().any(|x| x == &AvroSchema::Null);99if has_nullable && schemas.len() == 2 {100nullable = true;101if let Some(schema) = schemas102.iter()103.find(|&schema| !matches!(schema, AvroSchema::Null))104{105schema_to_field(schema, None, Metadata::default())?.dtype106} else {107polars_bail!(nyi = "Can't read avro union {schema:?}");108}109} else {110let fields = schemas111.iter()112.map(|s| schema_to_field(s, None, Metadata::default()))113.collect::<PolarsResult<Vec<Field>>>()?;114ArrowDataType::Union(Box::new(UnionType {115fields,116ids: None,117mode: UnionMode::Dense,118}))119}120},121AvroSchema::Record(Record { fields, .. }) => {122let fields = fields123.iter()124.map(|field| {125let mut props = Metadata::new();126if let Some(doc) = &field.doc {127props.insert(128PlSmallStr::from_static("avro::doc"),129PlSmallStr::from_str(doc),130);131}132schema_to_field(&field.schema, Some(&field.name), props)133})134.collect::<PolarsResult<_>>()?;135ArrowDataType::Struct(fields)136},137AvroSchema::Enum { .. } => {138return Ok(Field::new(139PlSmallStr::from_str(name.unwrap_or_default()),140ArrowDataType::Dictionary(IntegerType::Int32, Box::new(ArrowDataType::Utf8), false),141false,142));143},144AvroSchema::Fixed(Fixed { size, logical, .. }) => match logical {145Some(logical) => match logical {146avro_schema::schema::FixedLogical::Decimal(precision, scale) => {147ArrowDataType::Decimal(*precision, *scale)148},149avro_schema::schema::FixedLogical::Duration => {150ArrowDataType::Interval(IntervalUnit::MonthDayNano)151},152},153None => ArrowDataType::FixedSizeBinary(*size),154},155};156157let name = name.unwrap_or_default();158159Ok(Field::new(PlSmallStr::from_str(name), dtype, nullable).with_metadata(props))160}161162163