Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/io/avro/write/schema.rs
6940 views
1
use avro_schema::schema::{
2
BytesLogical, Field as AvroField, Fixed, FixedLogical, IntLogical, LongLogical, Record,
3
Schema as AvroSchema,
4
};
5
use polars_error::{PolarsResult, polars_bail};
6
7
use crate::datatypes::*;
8
9
/// Converts a [`ArrowSchema`] to an Avro [`Record`].
10
pub fn to_record(schema: &ArrowSchema, name: String) -> PolarsResult<Record> {
11
let mut name_counter: i32 = 0;
12
let fields = schema
13
.iter_values()
14
.map(|f| field_to_field(f, &mut name_counter))
15
.collect::<PolarsResult<_>>()?;
16
Ok(Record {
17
name,
18
namespace: None,
19
doc: None,
20
aliases: vec![],
21
fields,
22
})
23
}
24
25
fn field_to_field(field: &Field, name_counter: &mut i32) -> PolarsResult<AvroField> {
26
let schema = type_to_schema(field.dtype(), field.is_nullable, name_counter)?;
27
Ok(AvroField::new(field.name.to_string(), schema))
28
}
29
30
fn type_to_schema(
31
dtype: &ArrowDataType,
32
is_nullable: bool,
33
name_counter: &mut i32,
34
) -> PolarsResult<AvroSchema> {
35
Ok(if is_nullable {
36
AvroSchema::Union(vec![
37
AvroSchema::Null,
38
_type_to_schema(dtype, name_counter)?,
39
])
40
} else {
41
_type_to_schema(dtype, name_counter)?
42
})
43
}
44
45
/// Advances `name_counter` by one and returns the generated name `r<counter>`.
///
/// The first call on a counter starting at 0 yields `"r1"`.
fn _get_field_name(name_counter: &mut i32) -> String {
    let next = *name_counter + 1;
    *name_counter = next;
    format!("r{next}")
}
49
50
fn _type_to_schema(dtype: &ArrowDataType, name_counter: &mut i32) -> PolarsResult<AvroSchema> {
51
Ok(match dtype.to_logical_type() {
52
ArrowDataType::Null => AvroSchema::Null,
53
ArrowDataType::Boolean => AvroSchema::Boolean,
54
ArrowDataType::Int32 => AvroSchema::Int(None),
55
ArrowDataType::Int64 => AvroSchema::Long(None),
56
ArrowDataType::Float32 => AvroSchema::Float,
57
ArrowDataType::Float64 => AvroSchema::Double,
58
ArrowDataType::Binary => AvroSchema::Bytes(None),
59
ArrowDataType::LargeBinary => AvroSchema::Bytes(None),
60
ArrowDataType::Utf8 => AvroSchema::String(None),
61
ArrowDataType::LargeUtf8 => AvroSchema::String(None),
62
ArrowDataType::LargeList(inner) | ArrowDataType::List(inner) => {
63
AvroSchema::Array(Box::new(type_to_schema(
64
&inner.dtype,
65
inner.is_nullable,
66
name_counter,
67
)?))
68
},
69
ArrowDataType::Struct(fields) => AvroSchema::Record(Record::new(
70
_get_field_name(name_counter),
71
fields
72
.iter()
73
.map(|f| field_to_field(f, name_counter))
74
.collect::<PolarsResult<Vec<_>>>()?,
75
)),
76
ArrowDataType::Date32 => AvroSchema::Int(Some(IntLogical::Date)),
77
ArrowDataType::Time32(TimeUnit::Millisecond) => AvroSchema::Int(Some(IntLogical::Time)),
78
ArrowDataType::Time64(TimeUnit::Microsecond) => AvroSchema::Long(Some(LongLogical::Time)),
79
ArrowDataType::Timestamp(TimeUnit::Millisecond, None) => {
80
AvroSchema::Long(Some(LongLogical::LocalTimestampMillis))
81
},
82
ArrowDataType::Timestamp(TimeUnit::Microsecond, None) => {
83
AvroSchema::Long(Some(LongLogical::LocalTimestampMicros))
84
},
85
ArrowDataType::Interval(IntervalUnit::MonthDayNano) => {
86
let mut fixed = Fixed::new("", 12);
87
fixed.logical = Some(FixedLogical::Duration);
88
AvroSchema::Fixed(fixed)
89
},
90
ArrowDataType::FixedSizeBinary(size) => AvroSchema::Fixed(Fixed::new("", *size)),
91
ArrowDataType::Decimal(p, s) => AvroSchema::Bytes(Some(BytesLogical::Decimal(*p, *s))),
92
other => polars_bail!(nyi = "write {other:?} to avro"),
93
})
94
}
95
96