// Path: blob/main/crates/polars-arrow/src/io/ipc/write/schema.rs
// 8433 views
use arrow_format::ipc::KeyValue;1use arrow_format::ipc::planus::Builder;23use super::super::IpcField;4use crate::datatypes::{5ArrowDataType, ArrowSchema, Field, IntegerType, IntervalUnit, Metadata, TimeUnit, UnionMode,6};7use crate::io::ipc::endianness::is_native_little_endian;89/// Converts a [ArrowSchema] and [IpcField]s to a flatbuffers-encoded [arrow_format::ipc::Message].10pub fn schema_to_bytes(11schema: &ArrowSchema,12ipc_fields: &[IpcField],13custom_metadata: Option<&Metadata>,14) -> Vec<u8> {15let schema = serialize_schema(schema, ipc_fields, custom_metadata);1617let message = arrow_format::ipc::Message {18version: arrow_format::ipc::MetadataVersion::V5,19header: Some(arrow_format::ipc::MessageHeader::Schema(Box::new(schema))),20body_length: 0,21custom_metadata: None,22};23let mut builder = Builder::new();24let footer_data = builder.finish(&message, None);25footer_data.to_vec()26}2728pub fn serialize_schema(29schema: &ArrowSchema,30ipc_fields: &[IpcField],31custom_schema_metadata: Option<&Metadata>,32) -> arrow_format::ipc::Schema {33let endianness = if is_native_little_endian() {34arrow_format::ipc::Endianness::Little35} else {36arrow_format::ipc::Endianness::Big37};3839let fields = schema40.iter_values()41.zip(ipc_fields.iter())42.map(|(field, ipc_field)| serialize_field(field, ipc_field))43.collect::<Vec<_>>();4445let mut custom_metadata: Vec<KeyValue> =46Vec::with_capacity(schema.metadata().len() + custom_schema_metadata.map_or(0, |x| x.len()));4748for (k, v) in schema.metadata() {49custom_metadata.push(KeyValue {50key: Some(k.to_string()),51value: Some(v.to_string()),52});53}5455if let Some(custom_schema_metadata) = custom_schema_metadata {56for (k, v) in custom_schema_metadata {57let kv = KeyValue {58key: Some(k.to_string()),59value: Some(v.to_string()),60};6162if let Some(i) = schema.metadata().keys().position(|x| x == k) {63custom_metadata[i] = kv64} else {65custom_metadata.push(kv);66}67}68}6970arrow_format::ipc::Schema {71endianness,72fields: 
Some(fields),73custom_metadata: (!custom_metadata.is_empty()).then_some(custom_metadata),74features: None, // todo add this one75}76}7778pub fn key_value(key: impl Into<String>, val: impl Into<String>) -> arrow_format::ipc::KeyValue {79arrow_format::ipc::KeyValue {80key: Some(key.into()),81value: Some(val.into()),82}83}8485fn write_metadata(metadata: &Metadata, kv_vec: &mut Vec<arrow_format::ipc::KeyValue>) {86for (k, v) in metadata {87if k.as_str() != "ARROW:extension:name" && k.as_str() != "ARROW:extension:metadata" {88kv_vec.push(key_value(k.clone().into_string(), v.clone().into_string()));89}90}91}9293fn write_extension(94name: &str,95metadata: Option<&str>,96kv_vec: &mut Vec<arrow_format::ipc::KeyValue>,97) {98if let Some(metadata) = metadata {99kv_vec.push(key_value("ARROW:extension:metadata".to_string(), metadata));100}101102kv_vec.push(key_value("ARROW:extension:name".to_string(), name));103}104105/// Create an IPC Field from an Arrow Field106pub(crate) fn serialize_field(field: &Field, ipc_field: &IpcField) -> arrow_format::ipc::Field {107// custom metadata.108let mut kv_vec = vec![];109if let ArrowDataType::Extension(ext) = field.dtype() {110write_extension(111&ext.name,112ext.metadata.as_ref().map(|x| x.as_str()),113&mut kv_vec,114);115}116117let type_ = serialize_type(field.dtype());118let children = serialize_children(field.dtype(), ipc_field);119120let dictionary = if let ArrowDataType::Dictionary(index_type, inner, is_ordered) =121field.dtype().to_storage()122{123if let ArrowDataType::Extension(ext) = inner.as_ref() {124write_extension(125ext.name.as_str(),126ext.metadata.as_ref().map(|x| x.as_str()),127&mut kv_vec,128);129}130Some(serialize_dictionary(131index_type,132ipc_field133.dictionary_id134.expect("All Dictionary types have `dict_id`"),135*is_ordered,136))137} else {138None139};140141if let Some(metadata) = &field.metadata {142write_metadata(metadata, &mut kv_vec);143}144145let custom_metadata = if !kv_vec.is_empty() {146Some(kv_vec)147} else 
{148None149};150151arrow_format::ipc::Field {152name: Some(field.name.to_string()),153nullable: field.is_nullable,154type_: Some(type_),155dictionary: dictionary.map(Box::new),156children: Some(children),157custom_metadata,158}159}160161fn serialize_time_unit(unit: &TimeUnit) -> arrow_format::ipc::TimeUnit {162match unit {163TimeUnit::Second => arrow_format::ipc::TimeUnit::Second,164TimeUnit::Millisecond => arrow_format::ipc::TimeUnit::Millisecond,165TimeUnit::Microsecond => arrow_format::ipc::TimeUnit::Microsecond,166TimeUnit::Nanosecond => arrow_format::ipc::TimeUnit::Nanosecond,167}168}169170fn serialize_type(dtype: &ArrowDataType) -> arrow_format::ipc::Type {171use ArrowDataType::*;172use arrow_format::ipc;173match dtype {174Null => ipc::Type::Null(Box::new(ipc::Null {})),175Boolean => ipc::Type::Bool(Box::new(ipc::Bool {})),176UInt8 => ipc::Type::Int(Box::new(ipc::Int {177bit_width: 8,178is_signed: false,179})),180UInt16 => ipc::Type::Int(Box::new(ipc::Int {181bit_width: 16,182is_signed: false,183})),184UInt32 => ipc::Type::Int(Box::new(ipc::Int {185bit_width: 32,186is_signed: false,187})),188UInt64 => ipc::Type::Int(Box::new(ipc::Int {189bit_width: 64,190is_signed: false,191})),192UInt128 => ipc::Type::Int(Box::new(ipc::Int {193bit_width: 128,194is_signed: false,195})),196Int8 => ipc::Type::Int(Box::new(ipc::Int {197bit_width: 8,198is_signed: true,199})),200Int16 => ipc::Type::Int(Box::new(ipc::Int {201bit_width: 16,202is_signed: true,203})),204Int32 => ipc::Type::Int(Box::new(ipc::Int {205bit_width: 32,206is_signed: true,207})),208Int64 => ipc::Type::Int(Box::new(ipc::Int {209bit_width: 64,210is_signed: true,211})),212Int128 => ipc::Type::Int(Box::new(ipc::Int {213bit_width: 128,214is_signed: true,215})),216Float16 => ipc::Type::FloatingPoint(Box::new(ipc::FloatingPoint {217precision: ipc::Precision::Half,218})),219Float32 => ipc::Type::FloatingPoint(Box::new(ipc::FloatingPoint {220precision: ipc::Precision::Single,221})),222Float64 => 
ipc::Type::FloatingPoint(Box::new(ipc::FloatingPoint {223precision: ipc::Precision::Double,224})),225Decimal(precision, scale) => ipc::Type::Decimal(Box::new(ipc::Decimal {226precision: *precision as i32,227scale: *scale as i32,228bit_width: 128,229})),230Decimal32(precision, scale) => ipc::Type::Decimal(Box::new(ipc::Decimal {231precision: *precision as i32,232scale: *scale as i32,233bit_width: 32,234})),235Decimal64(precision, scale) => ipc::Type::Decimal(Box::new(ipc::Decimal {236precision: *precision as i32,237scale: *scale as i32,238bit_width: 64,239})),240Decimal256(precision, scale) => ipc::Type::Decimal(Box::new(ipc::Decimal {241precision: *precision as i32,242scale: *scale as i32,243bit_width: 256,244})),245Binary => ipc::Type::Binary(Box::new(ipc::Binary {})),246LargeBinary => ipc::Type::LargeBinary(Box::new(ipc::LargeBinary {})),247Utf8 => ipc::Type::Utf8(Box::new(ipc::Utf8 {})),248LargeUtf8 => ipc::Type::LargeUtf8(Box::new(ipc::LargeUtf8 {})),249FixedSizeBinary(size) => ipc::Type::FixedSizeBinary(Box::new(ipc::FixedSizeBinary {250byte_width: *size as i32,251})),252Date32 => ipc::Type::Date(Box::new(ipc::Date {253unit: ipc::DateUnit::Day,254})),255Date64 => ipc::Type::Date(Box::new(ipc::Date {256unit: ipc::DateUnit::Millisecond,257})),258Duration(unit) => ipc::Type::Duration(Box::new(ipc::Duration {259unit: serialize_time_unit(unit),260})),261Time32(unit) => ipc::Type::Time(Box::new(ipc::Time {262unit: serialize_time_unit(unit),263bit_width: 32,264})),265Time64(unit) => ipc::Type::Time(Box::new(ipc::Time {266unit: serialize_time_unit(unit),267bit_width: 64,268})),269Timestamp(unit, tz) => ipc::Type::Timestamp(Box::new(ipc::Timestamp {270unit: serialize_time_unit(unit),271timezone: tz.as_ref().map(|x| x.to_string()),272})),273Interval(unit) => ipc::Type::Interval(Box::new(ipc::Interval {274unit: match unit {275IntervalUnit::YearMonth => ipc::IntervalUnit::YearMonth,276IntervalUnit::DayTime => ipc::IntervalUnit::DayTime,277IntervalUnit::MonthDayNano => 
ipc::IntervalUnit::MonthDayNano,278IntervalUnit::MonthDayMillis => unimplemented!(),279},280})),281List(_) => ipc::Type::List(Box::new(ipc::List {})),282LargeList(_) => ipc::Type::LargeList(Box::new(ipc::LargeList {})),283FixedSizeList(_, size) => ipc::Type::FixedSizeList(Box::new(ipc::FixedSizeList {284list_size: *size as i32,285})),286Union(u) => ipc::Type::Union(Box::new(ipc::Union {287mode: match u.mode {288UnionMode::Dense => ipc::UnionMode::Dense,289UnionMode::Sparse => ipc::UnionMode::Sparse,290},291type_ids: u.ids.clone(),292})),293Map(_, keys_sorted) => ipc::Type::Map(Box::new(ipc::Map {294keys_sorted: *keys_sorted,295})),296Struct(_) => ipc::Type::Struct(Box::new(ipc::Struct {})),297Dictionary(_, v, _) => serialize_type(v),298Extension(ext) => serialize_type(&ext.inner),299Utf8View => ipc::Type::Utf8View(Box::new(ipc::Utf8View {})),300BinaryView => ipc::Type::BinaryView(Box::new(ipc::BinaryView {})),301Unknown => unimplemented!(),302}303}304305fn serialize_children(306dtype: &ArrowDataType,307ipc_field: &IpcField,308) -> Vec<arrow_format::ipc::Field> {309use ArrowDataType::*;310match dtype {311Null312| Boolean313| Int8314| Int16315| Int32316| Int64317| Int128318| UInt8319| UInt16320| UInt32321| UInt64322| UInt128323| Float16324| Float32325| Float64326| Timestamp(_, _)327| Date32328| Date64329| Time32(_)330| Time64(_)331| Duration(_)332| Interval(_)333| Binary334| FixedSizeBinary(_)335| LargeBinary336| Utf8337| LargeUtf8338| Utf8View339| BinaryView340| Decimal(_, _)341| Decimal32(_, _)342| Decimal64(_, _)343| Decimal256(_, _) => vec![],344FixedSizeList(inner, _) | LargeList(inner) | List(inner) | Map(inner, _) => {345vec![serialize_field(inner, &ipc_field.fields[0])]346},347Struct(fields) => fields348.iter()349.zip(ipc_field.fields.iter())350.map(|(field, ipc)| serialize_field(field, ipc))351.collect(),352Union(u) => u353.fields354.iter()355.zip(ipc_field.fields.iter())356.map(|(field, ipc)| serialize_field(field, ipc))357.collect(),358Dictionary(_, inner, 
_) => serialize_children(inner, ipc_field),359Extension(ext) => serialize_children(&ext.inner, ipc_field),360Unknown => unimplemented!(),361}362}363364/// Create an IPC dictionary encoding365pub(crate) fn serialize_dictionary(366index_type: &IntegerType,367dict_id: i64,368dict_is_ordered: bool,369) -> arrow_format::ipc::DictionaryEncoding {370use IntegerType::*;371let is_signed = match index_type {372Int8 | Int16 | Int32 | Int64 | Int128 => true,373UInt8 | UInt16 | UInt32 | UInt64 | UInt128 => false,374};375376let bit_width = match index_type {377Int8 | UInt8 => 8,378Int16 | UInt16 => 16,379Int32 | UInt32 => 32,380Int64 | UInt64 => 64,381Int128 | UInt128 => 128,382};383384let index_type = arrow_format::ipc::Int {385bit_width,386is_signed,387};388389arrow_format::ipc::DictionaryEncoding {390id: dict_id,391index_type: Some(Box::new(index_type)),392is_ordered: dict_is_ordered,393dictionary_kind: arrow_format::ipc::DictionaryKind::DenseArray,394}395}396397398