Path: blob/main/crates/polars-arrow/src/io/avro/write/schema.rs
6940 views
use avro_schema::schema::{1BytesLogical, Field as AvroField, Fixed, FixedLogical, IntLogical, LongLogical, Record,2Schema as AvroSchema,3};4use polars_error::{PolarsResult, polars_bail};56use crate::datatypes::*;78/// Converts a [`ArrowSchema`] to an Avro [`Record`].9pub fn to_record(schema: &ArrowSchema, name: String) -> PolarsResult<Record> {10let mut name_counter: i32 = 0;11let fields = schema12.iter_values()13.map(|f| field_to_field(f, &mut name_counter))14.collect::<PolarsResult<_>>()?;15Ok(Record {16name,17namespace: None,18doc: None,19aliases: vec![],20fields,21})22}2324fn field_to_field(field: &Field, name_counter: &mut i32) -> PolarsResult<AvroField> {25let schema = type_to_schema(field.dtype(), field.is_nullable, name_counter)?;26Ok(AvroField::new(field.name.to_string(), schema))27}2829fn type_to_schema(30dtype: &ArrowDataType,31is_nullable: bool,32name_counter: &mut i32,33) -> PolarsResult<AvroSchema> {34Ok(if is_nullable {35AvroSchema::Union(vec![36AvroSchema::Null,37_type_to_schema(dtype, name_counter)?,38])39} else {40_type_to_schema(dtype, name_counter)?41})42}4344fn _get_field_name(name_counter: &mut i32) -> String {45*name_counter += 1;46format!("r{name_counter}")47}4849fn _type_to_schema(dtype: &ArrowDataType, name_counter: &mut i32) -> PolarsResult<AvroSchema> {50Ok(match dtype.to_logical_type() {51ArrowDataType::Null => AvroSchema::Null,52ArrowDataType::Boolean => AvroSchema::Boolean,53ArrowDataType::Int32 => AvroSchema::Int(None),54ArrowDataType::Int64 => AvroSchema::Long(None),55ArrowDataType::Float32 => AvroSchema::Float,56ArrowDataType::Float64 => AvroSchema::Double,57ArrowDataType::Binary => AvroSchema::Bytes(None),58ArrowDataType::LargeBinary => AvroSchema::Bytes(None),59ArrowDataType::Utf8 => AvroSchema::String(None),60ArrowDataType::LargeUtf8 => AvroSchema::String(None),61ArrowDataType::LargeList(inner) | ArrowDataType::List(inner) => {62AvroSchema::Array(Box::new(type_to_schema(63&inner.dtype,64inner.is_nullable,65name_counter,66)?))67},68ArrowDataType::Struct(fields) => AvroSchema::Record(Record::new(69_get_field_name(name_counter),70fields71.iter()72.map(|f| field_to_field(f, name_counter))73.collect::<PolarsResult<Vec<_>>>()?,74)),75ArrowDataType::Date32 => AvroSchema::Int(Some(IntLogical::Date)),76ArrowDataType::Time32(TimeUnit::Millisecond) => AvroSchema::Int(Some(IntLogical::Time)),77ArrowDataType::Time64(TimeUnit::Microsecond) => AvroSchema::Long(Some(LongLogical::Time)),78ArrowDataType::Timestamp(TimeUnit::Millisecond, None) => {79AvroSchema::Long(Some(LongLogical::LocalTimestampMillis))80},81ArrowDataType::Timestamp(TimeUnit::Microsecond, None) => {82AvroSchema::Long(Some(LongLogical::LocalTimestampMicros))83},84ArrowDataType::Interval(IntervalUnit::MonthDayNano) => {85let mut fixed = Fixed::new("", 12);86fixed.logical = Some(FixedLogical::Duration);87AvroSchema::Fixed(fixed)88},89ArrowDataType::FixedSizeBinary(size) => AvroSchema::Fixed(Fixed::new("", *size)),90ArrowDataType::Decimal(p, s) => AvroSchema::Bytes(Some(BytesLogical::Decimal(*p, *s))),91other => polars_bail!(nyi = "write {other:?} to avro"),92})93}949596