Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/io/avro/read/schema.rs
7884 views
1
use avro_schema::schema::{Enum, Fixed, Record, Schema as AvroSchema};
2
use polars_error::{PolarsResult, polars_bail};
3
use polars_utils::pl_str::PlSmallStr;
4
5
use crate::datatypes::*;
6
7
fn external_props(schema: &AvroSchema) -> Metadata {
8
let mut props = Metadata::new();
9
match schema {
10
AvroSchema::Record(Record { doc: Some(doc), .. })
11
| AvroSchema::Enum(Enum { doc: Some(doc), .. }) => {
12
props.insert(
13
PlSmallStr::from_static("avro::doc"),
14
PlSmallStr::from_str(doc.as_str()),
15
);
16
},
17
_ => {},
18
}
19
props
20
}
21
22
/// Infers an [`ArrowSchema`] from the root [`Record`].
23
/// This
24
pub fn infer_schema(record: &Record) -> PolarsResult<ArrowSchema> {
25
record
26
.fields
27
.iter()
28
.map(|field| {
29
let field = schema_to_field(
30
&field.schema,
31
Some(&field.name),
32
external_props(&field.schema),
33
)?;
34
35
Ok((field.name.clone(), field))
36
})
37
.collect::<PolarsResult<ArrowSchema>>()
38
}
39
40
fn schema_to_field(
41
schema: &AvroSchema,
42
name: Option<&str>,
43
props: Metadata,
44
) -> PolarsResult<Field> {
45
let mut nullable = false;
46
let dtype = match schema {
47
AvroSchema::Null => ArrowDataType::Null,
48
AvroSchema::Boolean => ArrowDataType::Boolean,
49
AvroSchema::Int(logical) => match logical {
50
Some(logical) => match logical {
51
avro_schema::schema::IntLogical::Date => ArrowDataType::Date32,
52
avro_schema::schema::IntLogical::Time => {
53
ArrowDataType::Time32(TimeUnit::Millisecond)
54
},
55
},
56
None => ArrowDataType::Int32,
57
},
58
AvroSchema::Long(logical) => match logical {
59
Some(logical) => match logical {
60
avro_schema::schema::LongLogical::Time => {
61
ArrowDataType::Time64(TimeUnit::Microsecond)
62
},
63
avro_schema::schema::LongLogical::TimestampMillis => ArrowDataType::Timestamp(
64
TimeUnit::Millisecond,
65
Some(PlSmallStr::from_static("00:00")),
66
),
67
avro_schema::schema::LongLogical::TimestampMicros => ArrowDataType::Timestamp(
68
TimeUnit::Microsecond,
69
Some(PlSmallStr::from_static("00:00")),
70
),
71
avro_schema::schema::LongLogical::LocalTimestampMillis => {
72
ArrowDataType::Timestamp(TimeUnit::Millisecond, None)
73
},
74
avro_schema::schema::LongLogical::LocalTimestampMicros => {
75
ArrowDataType::Timestamp(TimeUnit::Microsecond, None)
76
},
77
},
78
None => ArrowDataType::Int64,
79
},
80
AvroSchema::Float => ArrowDataType::Float32,
81
AvroSchema::Double => ArrowDataType::Float64,
82
AvroSchema::Bytes(logical) => match logical {
83
Some(logical) => match logical {
84
avro_schema::schema::BytesLogical::Decimal(precision, scale) => {
85
ArrowDataType::Decimal(*precision, *scale)
86
},
87
},
88
None => ArrowDataType::Binary,
89
},
90
AvroSchema::String(_) => ArrowDataType::Utf8,
91
AvroSchema::Array(item_schema) => ArrowDataType::List(Box::new(schema_to_field(
92
item_schema,
93
Some("item"), // default name for list items
94
Metadata::default(),
95
)?)),
96
AvroSchema::Map(_) => todo!("Avro maps are mapped to MapArrays"),
97
AvroSchema::Union(schemas) => {
98
// If there are only two variants and one of them is null, set the other type as the field data type
99
let has_nullable = schemas.iter().any(|x| x == &AvroSchema::Null);
100
if has_nullable && schemas.len() == 2 {
101
nullable = true;
102
if let Some(schema) = schemas
103
.iter()
104
.find(|&schema| !matches!(schema, AvroSchema::Null))
105
{
106
schema_to_field(schema, None, Metadata::default())?.dtype
107
} else {
108
polars_bail!(nyi = "Can't read avro union {schema:?}");
109
}
110
} else {
111
let fields = schemas
112
.iter()
113
.map(|s| schema_to_field(s, None, Metadata::default()))
114
.collect::<PolarsResult<Vec<Field>>>()?;
115
ArrowDataType::Union(Box::new(UnionType {
116
fields,
117
ids: None,
118
mode: UnionMode::Dense,
119
}))
120
}
121
},
122
AvroSchema::Record(Record { fields, .. }) => {
123
let fields = fields
124
.iter()
125
.map(|field| {
126
let mut props = Metadata::new();
127
if let Some(doc) = &field.doc {
128
props.insert(
129
PlSmallStr::from_static("avro::doc"),
130
PlSmallStr::from_str(doc),
131
);
132
}
133
schema_to_field(&field.schema, Some(&field.name), props)
134
})
135
.collect::<PolarsResult<_>>()?;
136
ArrowDataType::Struct(fields)
137
},
138
AvroSchema::Enum { .. } => {
139
return Ok(Field::new(
140
PlSmallStr::from_str(name.unwrap_or_default()),
141
ArrowDataType::Dictionary(IntegerType::Int32, Box::new(ArrowDataType::Utf8), false),
142
false,
143
));
144
},
145
AvroSchema::Fixed(Fixed { size, logical, .. }) => match logical {
146
Some(logical) => match logical {
147
avro_schema::schema::FixedLogical::Decimal(precision, scale) => {
148
ArrowDataType::Decimal(*precision, *scale)
149
},
150
avro_schema::schema::FixedLogical::Duration => {
151
ArrowDataType::Interval(IntervalUnit::MonthDayNano)
152
},
153
},
154
None => ArrowDataType::FixedSizeBinary(*size),
155
},
156
};
157
158
let name = name.unwrap_or_default();
159
160
Ok(Field::new(PlSmallStr::from_str(name), dtype, nullable).with_metadata(props))
161
}
162
163