Path: blob/main/crates/polars-arrow/src/io/avro/write/serialize.rs
6940 views
use avro_schema::schema::{Record, Schema as AvroSchema};1use avro_schema::write::encode;23use super::super::super::iterator::*;4use crate::array::*;5use crate::bitmap::utils::ZipValidity;6use crate::datatypes::{ArrowDataType, IntervalUnit, PhysicalType, PrimitiveType};7use crate::offset::Offset;8use crate::types::months_days_ns;910// Zigzag representation of false and true respectively.11const IS_NULL: u8 = 0;12const IS_VALID: u8 = 2;1314/// A type alias for a boxed [`StreamingIterator`], used to write arrays into avro rows15/// (i.e. a column -> row transposition of types known at run-time)16pub type BoxSerializer<'a> = Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>;1718fn utf8_required<O: Offset>(array: &Utf8Array<O>) -> BoxSerializer<'_> {19Box::new(BufStreamingIterator::new(20array.values_iter(),21|x, buf| {22encode::zigzag_encode(x.len() as i64, buf).unwrap();23buf.extend_from_slice(x.as_bytes());24},25vec![],26))27}2829fn utf8_optional<O: Offset>(array: &Utf8Array<O>) -> BoxSerializer<'_> {30Box::new(BufStreamingIterator::new(31array.iter(),32|x, buf| {33if let Some(x) = x {34buf.push(IS_VALID);35encode::zigzag_encode(x.len() as i64, buf).unwrap();36buf.extend_from_slice(x.as_bytes());37} else {38buf.push(IS_NULL);39}40},41vec![],42))43}4445fn binary_required<O: Offset>(array: &BinaryArray<O>) -> BoxSerializer<'_> {46Box::new(BufStreamingIterator::new(47array.values_iter(),48|x, buf| {49encode::zigzag_encode(x.len() as i64, buf).unwrap();50buf.extend_from_slice(x);51},52vec![],53))54}5556fn binary_optional<O: Offset>(array: &BinaryArray<O>) -> BoxSerializer<'_> {57Box::new(BufStreamingIterator::new(58array.iter(),59|x, buf| {60if let Some(x) = x {61buf.push(IS_VALID);62encode::zigzag_encode(x.len() as i64, buf).unwrap();63buf.extend_from_slice(x);64} else {65buf.push(IS_NULL);66}67},68vec![],69))70}7172fn fixed_size_binary_required(array: &FixedSizeBinaryArray) -> BoxSerializer<'_> {73Box::new(BufStreamingIterator::new(74array.values_iter(),75|x, buf| {76buf.extend_from_slice(x);77},78vec![],79))80}8182fn fixed_size_binary_optional(array: &FixedSizeBinaryArray) -> BoxSerializer<'_> {83Box::new(BufStreamingIterator::new(84array.iter(),85|x, buf| {86if let Some(x) = x {87buf.push(IS_VALID);88buf.extend_from_slice(x);89} else {90buf.push(IS_NULL);91}92},93vec![],94))95}9697fn list_required<'a, O: Offset>(array: &'a ListArray<O>, schema: &AvroSchema) -> BoxSerializer<'a> {98let mut inner = new_serializer(array.values().as_ref(), schema);99let lengths = array100.offsets()101.buffer()102.windows(2)103.map(|w| (w[1] - w[0]).to_usize() as i64);104105Box::new(BufStreamingIterator::new(106lengths,107move |length, buf| {108encode::zigzag_encode(length, buf).unwrap();109let mut rows = 0;110while let Some(item) = inner.next() {111buf.extend_from_slice(item);112rows += 1;113if rows == length {114encode::zigzag_encode(0, buf).unwrap();115break;116}117}118},119vec![],120))121}122123fn list_optional<'a, O: Offset>(array: &'a ListArray<O>, schema: &AvroSchema) -> BoxSerializer<'a> {124let mut inner = new_serializer(array.values().as_ref(), schema);125let lengths = array126.offsets()127.buffer()128.windows(2)129.map(|w| (w[1] - w[0]).to_usize() as i64);130let lengths = ZipValidity::new_with_validity(lengths, array.validity());131132Box::new(BufStreamingIterator::new(133lengths,134move |length, buf| {135if let Some(length) = length {136buf.push(IS_VALID);137encode::zigzag_encode(length, buf).unwrap();138let mut rows = 0;139while let Some(item) = inner.next() {140buf.extend_from_slice(item);141rows += 1;142if rows == length {143encode::zigzag_encode(0, buf).unwrap();144break;145}146}147} else {148buf.push(IS_NULL);149}150},151vec![],152))153}154155fn struct_required<'a>(array: &'a StructArray, schema: &Record) -> BoxSerializer<'a> {156let schemas = schema.fields.iter().map(|x| &x.schema);157let mut inner = array158.values()159.iter()160.zip(schemas)161.map(|(x, schema)| new_serializer(x.as_ref(), schema))162.collect::<Vec<_>>();163164Box::new(BufStreamingIterator::new(1650..array.len(),166move |_, buf| {167inner168.iter_mut()169.for_each(|item| buf.extend_from_slice(item.next().unwrap()))170},171vec![],172))173}174175fn struct_optional<'a>(array: &'a StructArray, schema: &Record) -> BoxSerializer<'a> {176let schemas = schema.fields.iter().map(|x| &x.schema);177let mut inner = array178.values()179.iter()180.zip(schemas)181.map(|(x, schema)| new_serializer(x.as_ref(), schema))182.collect::<Vec<_>>();183184let iterator = ZipValidity::new_with_validity(0..array.len(), array.validity());185186Box::new(BufStreamingIterator::new(187iterator,188move |maybe, buf| {189if maybe.is_some() {190buf.push(IS_VALID);191inner192.iter_mut()193.for_each(|item| buf.extend_from_slice(item.next().unwrap()))194} else {195buf.push(IS_NULL);196// skip the item197inner.iter_mut().for_each(|item| {198let _ = item.next().unwrap();199});200}201},202vec![],203))204}205206/// Creates a [`StreamingIterator`] trait object that presents items from `array`207/// encoded according to `schema`.208/// # Panic209/// This function panics iff the `dtype` is not supported (use [`can_serialize`] to check)210/// # Implementation211/// This function performs minimal CPU work: it dynamically dispatches based on the schema212/// and arrow type.213pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSerializer<'a> {214let dtype = array.dtype().to_physical_type();215216match (dtype, schema) {217(PhysicalType::Boolean, AvroSchema::Boolean) => {218let values = array.as_any().downcast_ref::<BooleanArray>().unwrap();219Box::new(BufStreamingIterator::new(220values.values_iter(),221|x, buf| {222buf.push(x as u8);223},224vec![],225))226},227(PhysicalType::Boolean, AvroSchema::Union(_)) => {228let values = array.as_any().downcast_ref::<BooleanArray>().unwrap();229Box::new(BufStreamingIterator::new(230values.iter(),231|x, buf| {232if let Some(x) = x {233buf.extend_from_slice(&[IS_VALID, x as u8]);234} else {235buf.push(IS_NULL);236}237},238vec![],239))240},241(PhysicalType::Utf8, AvroSchema::Union(_)) => {242utf8_optional::<i32>(array.as_any().downcast_ref().unwrap())243},244(PhysicalType::LargeUtf8, AvroSchema::Union(_)) => {245utf8_optional::<i64>(array.as_any().downcast_ref().unwrap())246},247(PhysicalType::Utf8, AvroSchema::String(_)) => {248utf8_required::<i32>(array.as_any().downcast_ref().unwrap())249},250(PhysicalType::LargeUtf8, AvroSchema::String(_)) => {251utf8_required::<i64>(array.as_any().downcast_ref().unwrap())252},253(PhysicalType::Binary, AvroSchema::Union(_)) => {254binary_optional::<i32>(array.as_any().downcast_ref().unwrap())255},256(PhysicalType::LargeBinary, AvroSchema::Union(_)) => {257binary_optional::<i64>(array.as_any().downcast_ref().unwrap())258},259(PhysicalType::FixedSizeBinary, AvroSchema::Union(_)) => {260fixed_size_binary_optional(array.as_any().downcast_ref().unwrap())261},262(PhysicalType::Binary, AvroSchema::Bytes(_)) => {263binary_required::<i32>(array.as_any().downcast_ref().unwrap())264},265(PhysicalType::LargeBinary, AvroSchema::Bytes(_)) => {266binary_required::<i64>(array.as_any().downcast_ref().unwrap())267},268(PhysicalType::FixedSizeBinary, AvroSchema::Fixed(_)) => {269fixed_size_binary_required(array.as_any().downcast_ref().unwrap())270},271272(PhysicalType::Primitive(PrimitiveType::Int32), AvroSchema::Union(_)) => {273let values = array274.as_any()275.downcast_ref::<PrimitiveArray<i32>>()276.unwrap();277Box::new(BufStreamingIterator::new(278values.iter(),279|x, buf| {280if let Some(x) = x {281buf.push(IS_VALID);282encode::zigzag_encode(*x as i64, buf).unwrap();283} else {284buf.push(IS_NULL);285}286},287vec![],288))289},290(PhysicalType::Primitive(PrimitiveType::Int32), AvroSchema::Int(_)) => {291let values = array292.as_any()293.downcast_ref::<PrimitiveArray<i32>>()294.unwrap();295Box::new(BufStreamingIterator::new(296values.values().iter(),297|x, buf| {298encode::zigzag_encode(*x as i64, buf).unwrap();299},300vec![],301))302},303(PhysicalType::Primitive(PrimitiveType::Int64), AvroSchema::Union(_)) => {304let values = array305.as_any()306.downcast_ref::<PrimitiveArray<i64>>()307.unwrap();308Box::new(BufStreamingIterator::new(309values.iter(),310|x, buf| {311if let Some(x) = x {312buf.push(IS_VALID);313encode::zigzag_encode(*x, buf).unwrap();314} else {315buf.push(IS_NULL);316}317},318vec![],319))320},321(PhysicalType::Primitive(PrimitiveType::Int64), AvroSchema::Long(_)) => {322let values = array323.as_any()324.downcast_ref::<PrimitiveArray<i64>>()325.unwrap();326Box::new(BufStreamingIterator::new(327values.values().iter(),328|x, buf| {329encode::zigzag_encode(*x, buf).unwrap();330},331vec![],332))333},334(PhysicalType::Primitive(PrimitiveType::Float32), AvroSchema::Union(_)) => {335let values = array336.as_any()337.downcast_ref::<PrimitiveArray<f32>>()338.unwrap();339Box::new(BufStreamingIterator::new(340values.iter(),341|x, buf| {342if let Some(x) = x {343buf.push(IS_VALID);344buf.extend(x.to_le_bytes())345} else {346buf.push(IS_NULL);347}348},349vec![],350))351},352(PhysicalType::Primitive(PrimitiveType::Float32), AvroSchema::Float) => {353let values = array354.as_any()355.downcast_ref::<PrimitiveArray<f32>>()356.unwrap();357Box::new(BufStreamingIterator::new(358values.values().iter(),359|x, buf| {360buf.extend_from_slice(&x.to_le_bytes());361},362vec![],363))364},365(PhysicalType::Primitive(PrimitiveType::Float64), AvroSchema::Union(_)) => {366let values = array367.as_any()368.downcast_ref::<PrimitiveArray<f64>>()369.unwrap();370Box::new(BufStreamingIterator::new(371values.iter(),372|x, buf| {373if let Some(x) = x {374buf.push(IS_VALID);375buf.extend(x.to_le_bytes())376} else {377buf.push(IS_NULL);378}379},380vec![],381))382},383(PhysicalType::Primitive(PrimitiveType::Float64), AvroSchema::Double) => {384let values = array385.as_any()386.downcast_ref::<PrimitiveArray<f64>>()387.unwrap();388Box::new(BufStreamingIterator::new(389values.values().iter(),390|x, buf| {391buf.extend_from_slice(&x.to_le_bytes());392},393vec![],394))395},396(PhysicalType::Primitive(PrimitiveType::Int128), AvroSchema::Bytes(_)) => {397let values = array398.as_any()399.downcast_ref::<PrimitiveArray<i128>>()400.unwrap();401Box::new(BufStreamingIterator::new(402values.values().iter(),403|x, buf| {404let len = ((x.leading_zeros() / 8) - ((x.leading_zeros() / 8) % 2)) as usize;405encode::zigzag_encode((16 - len) as i64, buf).unwrap();406buf.extend_from_slice(&x.to_be_bytes()[len..]);407},408vec![],409))410},411(PhysicalType::Primitive(PrimitiveType::Int128), AvroSchema::Union(_)) => {412let values = array413.as_any()414.downcast_ref::<PrimitiveArray<i128>>()415.unwrap();416Box::new(BufStreamingIterator::new(417values.iter(),418|x, buf| {419if let Some(x) = x {420buf.push(IS_VALID);421let len =422((x.leading_zeros() / 8) - ((x.leading_zeros() / 8) % 2)) as usize;423encode::zigzag_encode((16 - len) as i64, buf).unwrap();424buf.extend_from_slice(&x.to_be_bytes()[len..]);425} else {426buf.push(IS_NULL);427}428},429vec![],430))431},432(PhysicalType::Primitive(PrimitiveType::MonthDayNano), AvroSchema::Fixed(_)) => {433let values = array434.as_any()435.downcast_ref::<PrimitiveArray<months_days_ns>>()436.unwrap();437Box::new(BufStreamingIterator::new(438values.values().iter(),439interval_write,440vec![],441))442},443(PhysicalType::Primitive(PrimitiveType::MonthDayNano), AvroSchema::Union(_)) => {444let values = array445.as_any()446.downcast_ref::<PrimitiveArray<months_days_ns>>()447.unwrap();448Box::new(BufStreamingIterator::new(449values.iter(),450|x, buf| {451if let Some(x) = x {452buf.push(IS_VALID);453interval_write(x, buf)454} else {455buf.push(IS_NULL);456}457},458vec![],459))460},461462(PhysicalType::List, AvroSchema::Array(schema)) => {463list_required::<i32>(array.as_any().downcast_ref().unwrap(), schema.as_ref())464},465(PhysicalType::LargeList, AvroSchema::Array(schema)) => {466list_required::<i64>(array.as_any().downcast_ref().unwrap(), schema.as_ref())467},468(PhysicalType::List, AvroSchema::Union(inner)) => {469let schema = if let AvroSchema::Array(schema) = &inner[1] {470schema.as_ref()471} else {472unreachable!("The schema declaration does not match the deserialization")473};474list_optional::<i32>(array.as_any().downcast_ref().unwrap(), schema)475},476(PhysicalType::LargeList, AvroSchema::Union(inner)) => {477let schema = if let AvroSchema::Array(schema) = &inner[1] {478schema.as_ref()479} else {480unreachable!("The schema declaration does not match the deserialization")481};482list_optional::<i64>(array.as_any().downcast_ref().unwrap(), schema)483},484(PhysicalType::Struct, AvroSchema::Record(inner)) => {485struct_required(array.as_any().downcast_ref().unwrap(), inner)486},487(PhysicalType::Struct, AvroSchema::Union(inner)) => {488let inner = if let AvroSchema::Record(inner) = &inner[1] {489inner490} else {491unreachable!("The schema declaration does not match the deserialization")492};493struct_optional(array.as_any().downcast_ref().unwrap(), inner)494},495(a, b) => todo!("{:?} -> {:?} not supported", a, b),496}497}498499/// Whether [`new_serializer`] supports `dtype`.500pub fn can_serialize(dtype: &ArrowDataType) -> bool {501use ArrowDataType::*;502match dtype.to_logical_type() {503List(inner) => return can_serialize(&inner.dtype),504LargeList(inner) => return can_serialize(&inner.dtype),505Struct(inner) => return inner.iter().all(|inner| can_serialize(&inner.dtype)),506_ => {},507};508509matches!(510dtype,511Boolean512| Int32513| Int64514| Float32515| Float64516| Decimal(_, _)517| Utf8518| Binary519| FixedSizeBinary(_)520| LargeUtf8521| LargeBinary522| Interval(IntervalUnit::MonthDayNano)523)524}525526#[inline]527fn interval_write(x: &months_days_ns, buf: &mut Vec<u8>) {528// https://avro.apache.org/docs/current/spec.html#Duration529// 12 bytes, months, days, millis in LE530buf.reserve(12);531buf.extend(x.months().to_le_bytes());532buf.extend(x.days().to_le_bytes());533buf.extend(((x.ns() / 1_000_000) as i32).to_le_bytes());534}535536537