Path: blob/main/crates/polars-arrow/src/io/ipc/write/schema.rs
6940 views
use arrow_format::ipc::planus::Builder;12use super::super::IpcField;3use crate::datatypes::{4ArrowDataType, ArrowSchema, Field, IntegerType, IntervalUnit, Metadata, TimeUnit, UnionMode,5};6use crate::io::ipc::endianness::is_native_little_endian;78/// Converts a [ArrowSchema] and [IpcField]s to a flatbuffers-encoded [arrow_format::ipc::Message].9pub fn schema_to_bytes(10schema: &ArrowSchema,11ipc_fields: &[IpcField],12custom_metadata: Option<&Metadata>,13) -> Vec<u8> {14let schema = serialize_schema(schema, ipc_fields, custom_metadata);1516let message = arrow_format::ipc::Message {17version: arrow_format::ipc::MetadataVersion::V5,18header: Some(arrow_format::ipc::MessageHeader::Schema(Box::new(schema))),19body_length: 0,20custom_metadata: None, // todo: allow writing custom metadata21};22let mut builder = Builder::new();23let footer_data = builder.finish(&message, None);24footer_data.to_vec()25}2627pub fn serialize_schema(28schema: &ArrowSchema,29ipc_fields: &[IpcField],30custom_schema_metadata: Option<&Metadata>,31) -> arrow_format::ipc::Schema {32let endianness = if is_native_little_endian() {33arrow_format::ipc::Endianness::Little34} else {35arrow_format::ipc::Endianness::Big36};3738let fields = schema39.iter_values()40.zip(ipc_fields.iter())41.map(|(field, ipc_field)| serialize_field(field, ipc_field))42.collect::<Vec<_>>();4344let custom_metadata = custom_schema_metadata.and_then(|custom_meta| {45let as_kv = custom_meta46.iter()47.map(|(key, val)| key_value(key.clone().into_string(), val.clone().into_string()))48.collect::<Vec<_>>();49(!as_kv.is_empty()).then_some(as_kv)50});5152arrow_format::ipc::Schema {53endianness,54fields: Some(fields),55custom_metadata,56features: None, // todo add this one57}58}5960fn key_value(key: impl Into<String>, val: impl Into<String>) -> arrow_format::ipc::KeyValue {61arrow_format::ipc::KeyValue {62key: Some(key.into()),63value: Some(val.into()),64}65}6667fn write_metadata(metadata: &Metadata, kv_vec: &mut Vec<arrow_format::ipc::KeyValue>) {68for (k, v) in metadata {69if k.as_str() != "ARROW:extension:name" && k.as_str() != "ARROW:extension:metadata" {70kv_vec.push(key_value(k.clone().into_string(), v.clone().into_string()));71}72}73}7475fn write_extension(76name: &str,77metadata: Option<&str>,78kv_vec: &mut Vec<arrow_format::ipc::KeyValue>,79) {80if let Some(metadata) = metadata {81kv_vec.push(key_value("ARROW:extension:metadata".to_string(), metadata));82}8384kv_vec.push(key_value("ARROW:extension:name".to_string(), name));85}8687/// Create an IPC Field from an Arrow Field88pub(crate) fn serialize_field(field: &Field, ipc_field: &IpcField) -> arrow_format::ipc::Field {89// custom metadata.90let mut kv_vec = vec![];91if let ArrowDataType::Extension(ext) = field.dtype() {92write_extension(93&ext.name,94ext.metadata.as_ref().map(|x| x.as_str()),95&mut kv_vec,96);97}9899let type_ = serialize_type(field.dtype());100let children = serialize_children(field.dtype(), ipc_field);101102let dictionary = if let ArrowDataType::Dictionary(index_type, inner, is_ordered) = field.dtype()103{104if let ArrowDataType::Extension(ext) = inner.as_ref() {105write_extension(106ext.name.as_str(),107ext.metadata.as_ref().map(|x| x.as_str()),108&mut kv_vec,109);110}111Some(serialize_dictionary(112index_type,113ipc_field114.dictionary_id115.expect("All Dictionary types have `dict_id`"),116*is_ordered,117))118} else {119None120};121122if let Some(metadata) = &field.metadata {123write_metadata(metadata, &mut kv_vec);124}125126let custom_metadata = if !kv_vec.is_empty() {127Some(kv_vec)128} else {129None130};131132arrow_format::ipc::Field {133name: Some(field.name.to_string()),134nullable: field.is_nullable,135type_: Some(type_),136dictionary: dictionary.map(Box::new),137children: Some(children),138custom_metadata,139}140}141142fn serialize_time_unit(unit: &TimeUnit) -> arrow_format::ipc::TimeUnit {143match unit {144TimeUnit::Second => arrow_format::ipc::TimeUnit::Second,145TimeUnit::Millisecond => arrow_format::ipc::TimeUnit::Millisecond,146TimeUnit::Microsecond => arrow_format::ipc::TimeUnit::Microsecond,147TimeUnit::Nanosecond => arrow_format::ipc::TimeUnit::Nanosecond,148}149}150151fn serialize_type(dtype: &ArrowDataType) -> arrow_format::ipc::Type {152use ArrowDataType::*;153use arrow_format::ipc;154match dtype {155Null => ipc::Type::Null(Box::new(ipc::Null {})),156Boolean => ipc::Type::Bool(Box::new(ipc::Bool {})),157UInt8 => ipc::Type::Int(Box::new(ipc::Int {158bit_width: 8,159is_signed: false,160})),161UInt16 => ipc::Type::Int(Box::new(ipc::Int {162bit_width: 16,163is_signed: false,164})),165UInt32 => ipc::Type::Int(Box::new(ipc::Int {166bit_width: 32,167is_signed: false,168})),169UInt64 => ipc::Type::Int(Box::new(ipc::Int {170bit_width: 64,171is_signed: false,172})),173Int8 => ipc::Type::Int(Box::new(ipc::Int {174bit_width: 8,175is_signed: true,176})),177Int16 => ipc::Type::Int(Box::new(ipc::Int {178bit_width: 16,179is_signed: true,180})),181Int32 => ipc::Type::Int(Box::new(ipc::Int {182bit_width: 32,183is_signed: true,184})),185Int64 => ipc::Type::Int(Box::new(ipc::Int {186bit_width: 64,187is_signed: true,188})),189Int128 => ipc::Type::Int(Box::new(ipc::Int {190bit_width: 128,191is_signed: true,192})),193Float16 => ipc::Type::FloatingPoint(Box::new(ipc::FloatingPoint {194precision: ipc::Precision::Half,195})),196Float32 => ipc::Type::FloatingPoint(Box::new(ipc::FloatingPoint {197precision: ipc::Precision::Single,198})),199Float64 => ipc::Type::FloatingPoint(Box::new(ipc::FloatingPoint {200precision: ipc::Precision::Double,201})),202Decimal(precision, scale) => ipc::Type::Decimal(Box::new(ipc::Decimal {203precision: *precision as i32,204scale: *scale as i32,205bit_width: 128,206})),207Decimal32(precision, scale) => ipc::Type::Decimal(Box::new(ipc::Decimal {208precision: *precision as i32,209scale: *scale as i32,210bit_width: 32,211})),212Decimal64(precision, scale) => ipc::Type::Decimal(Box::new(ipc::Decimal {213precision: *precision as i32,214scale: *scale as i32,215bit_width: 64,216})),217Decimal256(precision, scale) => ipc::Type::Decimal(Box::new(ipc::Decimal {218precision: *precision as i32,219scale: *scale as i32,220bit_width: 256,221})),222Binary => ipc::Type::Binary(Box::new(ipc::Binary {})),223LargeBinary => ipc::Type::LargeBinary(Box::new(ipc::LargeBinary {})),224Utf8 => ipc::Type::Utf8(Box::new(ipc::Utf8 {})),225LargeUtf8 => ipc::Type::LargeUtf8(Box::new(ipc::LargeUtf8 {})),226FixedSizeBinary(size) => ipc::Type::FixedSizeBinary(Box::new(ipc::FixedSizeBinary {227byte_width: *size as i32,228})),229Date32 => ipc::Type::Date(Box::new(ipc::Date {230unit: ipc::DateUnit::Day,231})),232Date64 => ipc::Type::Date(Box::new(ipc::Date {233unit: ipc::DateUnit::Millisecond,234})),235Duration(unit) => ipc::Type::Duration(Box::new(ipc::Duration {236unit: serialize_time_unit(unit),237})),238Time32(unit) => ipc::Type::Time(Box::new(ipc::Time {239unit: serialize_time_unit(unit),240bit_width: 32,241})),242Time64(unit) => ipc::Type::Time(Box::new(ipc::Time {243unit: serialize_time_unit(unit),244bit_width: 64,245})),246Timestamp(unit, tz) => ipc::Type::Timestamp(Box::new(ipc::Timestamp {247unit: serialize_time_unit(unit),248timezone: tz.as_ref().map(|x| x.to_string()),249})),250Interval(unit) => ipc::Type::Interval(Box::new(ipc::Interval {251unit: match unit {252IntervalUnit::YearMonth => ipc::IntervalUnit::YearMonth,253IntervalUnit::DayTime => ipc::IntervalUnit::DayTime,254IntervalUnit::MonthDayNano => ipc::IntervalUnit::MonthDayNano,255},256})),257List(_) => ipc::Type::List(Box::new(ipc::List {})),258LargeList(_) => ipc::Type::LargeList(Box::new(ipc::LargeList {})),259FixedSizeList(_, size) => ipc::Type::FixedSizeList(Box::new(ipc::FixedSizeList {260list_size: *size as i32,261})),262Union(u) => ipc::Type::Union(Box::new(ipc::Union {263mode: match u.mode {264UnionMode::Dense => ipc::UnionMode::Dense,265UnionMode::Sparse => ipc::UnionMode::Sparse,266},267type_ids: u.ids.clone(),268})),269Map(_, keys_sorted) => ipc::Type::Map(Box::new(ipc::Map {270keys_sorted: *keys_sorted,271})),272Struct(_) => ipc::Type::Struct(Box::new(ipc::Struct {})),273Dictionary(_, v, _) => serialize_type(v),274Extension(ext) => serialize_type(&ext.inner),275Utf8View => ipc::Type::Utf8View(Box::new(ipc::Utf8View {})),276BinaryView => ipc::Type::BinaryView(Box::new(ipc::BinaryView {})),277Unknown => unimplemented!(),278}279}280281fn serialize_children(282dtype: &ArrowDataType,283ipc_field: &IpcField,284) -> Vec<arrow_format::ipc::Field> {285use ArrowDataType::*;286match dtype {287Null288| Boolean289| Int8290| Int16291| Int32292| Int64293| UInt8294| UInt16295| UInt32296| UInt64297| Int128298| Float16299| Float32300| Float64301| Timestamp(_, _)302| Date32303| Date64304| Time32(_)305| Time64(_)306| Duration(_)307| Interval(_)308| Binary309| FixedSizeBinary(_)310| LargeBinary311| Utf8312| LargeUtf8313| Utf8View314| BinaryView315| Decimal(_, _)316| Decimal32(_, _)317| Decimal64(_, _)318| Decimal256(_, _) => vec![],319FixedSizeList(inner, _) | LargeList(inner) | List(inner) | Map(inner, _) => {320vec![serialize_field(inner, &ipc_field.fields[0])]321},322Struct(fields) => fields323.iter()324.zip(ipc_field.fields.iter())325.map(|(field, ipc)| serialize_field(field, ipc))326.collect(),327Union(u) => u328.fields329.iter()330.zip(ipc_field.fields.iter())331.map(|(field, ipc)| serialize_field(field, ipc))332.collect(),333Dictionary(_, inner, _) => serialize_children(inner, ipc_field),334Extension(ext) => serialize_children(&ext.inner, ipc_field),335Unknown => unimplemented!(),336}337}338339/// Create an IPC dictionary encoding340pub(crate) fn serialize_dictionary(341index_type: &IntegerType,342dict_id: i64,343dict_is_ordered: bool,344) -> arrow_format::ipc::DictionaryEncoding {345use IntegerType::*;346let is_signed = match index_type {347Int8 | Int16 | Int32 | Int64 | Int128 => true,348UInt8 | UInt16 | UInt32 | UInt64 => false,349};350351let bit_width = match index_type {352Int8 | UInt8 => 8,353Int16 | UInt16 => 16,354Int32 | UInt32 => 32,355Int64 | UInt64 => 64,356Int128 => 128,357};358359let index_type = arrow_format::ipc::Int {360bit_width,361is_signed,362};363364arrow_format::ipc::DictionaryEncoding {365id: dict_id,366index_type: Some(Box::new(index_type)),367is_ordered: dict_is_ordered,368dictionary_kind: arrow_format::ipc::DictionaryKind::DenseArray,369}370}371372373