Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/io/ipc/write/schema.rs
6940 views
1
use arrow_format::ipc::planus::Builder;
2
3
use super::super::IpcField;
4
use crate::datatypes::{
5
ArrowDataType, ArrowSchema, Field, IntegerType, IntervalUnit, Metadata, TimeUnit, UnionMode,
6
};
7
use crate::io::ipc::endianness::is_native_little_endian;
8
9
/// Converts a [ArrowSchema] and [IpcField]s to a flatbuffers-encoded [arrow_format::ipc::Message].
10
pub fn schema_to_bytes(
11
schema: &ArrowSchema,
12
ipc_fields: &[IpcField],
13
custom_metadata: Option<&Metadata>,
14
) -> Vec<u8> {
15
let schema = serialize_schema(schema, ipc_fields, custom_metadata);
16
17
let message = arrow_format::ipc::Message {
18
version: arrow_format::ipc::MetadataVersion::V5,
19
header: Some(arrow_format::ipc::MessageHeader::Schema(Box::new(schema))),
20
body_length: 0,
21
custom_metadata: None, // todo: allow writing custom metadata
22
};
23
let mut builder = Builder::new();
24
let footer_data = builder.finish(&message, None);
25
footer_data.to_vec()
26
}
27
28
pub fn serialize_schema(
29
schema: &ArrowSchema,
30
ipc_fields: &[IpcField],
31
custom_schema_metadata: Option<&Metadata>,
32
) -> arrow_format::ipc::Schema {
33
let endianness = if is_native_little_endian() {
34
arrow_format::ipc::Endianness::Little
35
} else {
36
arrow_format::ipc::Endianness::Big
37
};
38
39
let fields = schema
40
.iter_values()
41
.zip(ipc_fields.iter())
42
.map(|(field, ipc_field)| serialize_field(field, ipc_field))
43
.collect::<Vec<_>>();
44
45
let custom_metadata = custom_schema_metadata.and_then(|custom_meta| {
46
let as_kv = custom_meta
47
.iter()
48
.map(|(key, val)| key_value(key.clone().into_string(), val.clone().into_string()))
49
.collect::<Vec<_>>();
50
(!as_kv.is_empty()).then_some(as_kv)
51
});
52
53
arrow_format::ipc::Schema {
54
endianness,
55
fields: Some(fields),
56
custom_metadata,
57
features: None, // todo add this one
58
}
59
}
60
61
fn key_value(key: impl Into<String>, val: impl Into<String>) -> arrow_format::ipc::KeyValue {
62
arrow_format::ipc::KeyValue {
63
key: Some(key.into()),
64
value: Some(val.into()),
65
}
66
}
67
68
fn write_metadata(metadata: &Metadata, kv_vec: &mut Vec<arrow_format::ipc::KeyValue>) {
69
for (k, v) in metadata {
70
if k.as_str() != "ARROW:extension:name" && k.as_str() != "ARROW:extension:metadata" {
71
kv_vec.push(key_value(k.clone().into_string(), v.clone().into_string()));
72
}
73
}
74
}
75
76
fn write_extension(
77
name: &str,
78
metadata: Option<&str>,
79
kv_vec: &mut Vec<arrow_format::ipc::KeyValue>,
80
) {
81
if let Some(metadata) = metadata {
82
kv_vec.push(key_value("ARROW:extension:metadata".to_string(), metadata));
83
}
84
85
kv_vec.push(key_value("ARROW:extension:name".to_string(), name));
86
}
87
88
/// Create an IPC Field from an Arrow Field
89
pub(crate) fn serialize_field(field: &Field, ipc_field: &IpcField) -> arrow_format::ipc::Field {
90
// custom metadata.
91
let mut kv_vec = vec![];
92
if let ArrowDataType::Extension(ext) = field.dtype() {
93
write_extension(
94
&ext.name,
95
ext.metadata.as_ref().map(|x| x.as_str()),
96
&mut kv_vec,
97
);
98
}
99
100
let type_ = serialize_type(field.dtype());
101
let children = serialize_children(field.dtype(), ipc_field);
102
103
let dictionary = if let ArrowDataType::Dictionary(index_type, inner, is_ordered) = field.dtype()
104
{
105
if let ArrowDataType::Extension(ext) = inner.as_ref() {
106
write_extension(
107
ext.name.as_str(),
108
ext.metadata.as_ref().map(|x| x.as_str()),
109
&mut kv_vec,
110
);
111
}
112
Some(serialize_dictionary(
113
index_type,
114
ipc_field
115
.dictionary_id
116
.expect("All Dictionary types have `dict_id`"),
117
*is_ordered,
118
))
119
} else {
120
None
121
};
122
123
if let Some(metadata) = &field.metadata {
124
write_metadata(metadata, &mut kv_vec);
125
}
126
127
let custom_metadata = if !kv_vec.is_empty() {
128
Some(kv_vec)
129
} else {
130
None
131
};
132
133
arrow_format::ipc::Field {
134
name: Some(field.name.to_string()),
135
nullable: field.is_nullable,
136
type_: Some(type_),
137
dictionary: dictionary.map(Box::new),
138
children: Some(children),
139
custom_metadata,
140
}
141
}
142
143
fn serialize_time_unit(unit: &TimeUnit) -> arrow_format::ipc::TimeUnit {
144
match unit {
145
TimeUnit::Second => arrow_format::ipc::TimeUnit::Second,
146
TimeUnit::Millisecond => arrow_format::ipc::TimeUnit::Millisecond,
147
TimeUnit::Microsecond => arrow_format::ipc::TimeUnit::Microsecond,
148
TimeUnit::Nanosecond => arrow_format::ipc::TimeUnit::Nanosecond,
149
}
150
}
151
152
fn serialize_type(dtype: &ArrowDataType) -> arrow_format::ipc::Type {
153
use ArrowDataType::*;
154
use arrow_format::ipc;
155
match dtype {
156
Null => ipc::Type::Null(Box::new(ipc::Null {})),
157
Boolean => ipc::Type::Bool(Box::new(ipc::Bool {})),
158
UInt8 => ipc::Type::Int(Box::new(ipc::Int {
159
bit_width: 8,
160
is_signed: false,
161
})),
162
UInt16 => ipc::Type::Int(Box::new(ipc::Int {
163
bit_width: 16,
164
is_signed: false,
165
})),
166
UInt32 => ipc::Type::Int(Box::new(ipc::Int {
167
bit_width: 32,
168
is_signed: false,
169
})),
170
UInt64 => ipc::Type::Int(Box::new(ipc::Int {
171
bit_width: 64,
172
is_signed: false,
173
})),
174
Int8 => ipc::Type::Int(Box::new(ipc::Int {
175
bit_width: 8,
176
is_signed: true,
177
})),
178
Int16 => ipc::Type::Int(Box::new(ipc::Int {
179
bit_width: 16,
180
is_signed: true,
181
})),
182
Int32 => ipc::Type::Int(Box::new(ipc::Int {
183
bit_width: 32,
184
is_signed: true,
185
})),
186
Int64 => ipc::Type::Int(Box::new(ipc::Int {
187
bit_width: 64,
188
is_signed: true,
189
})),
190
Int128 => ipc::Type::Int(Box::new(ipc::Int {
191
bit_width: 128,
192
is_signed: true,
193
})),
194
Float16 => ipc::Type::FloatingPoint(Box::new(ipc::FloatingPoint {
195
precision: ipc::Precision::Half,
196
})),
197
Float32 => ipc::Type::FloatingPoint(Box::new(ipc::FloatingPoint {
198
precision: ipc::Precision::Single,
199
})),
200
Float64 => ipc::Type::FloatingPoint(Box::new(ipc::FloatingPoint {
201
precision: ipc::Precision::Double,
202
})),
203
Decimal(precision, scale) => ipc::Type::Decimal(Box::new(ipc::Decimal {
204
precision: *precision as i32,
205
scale: *scale as i32,
206
bit_width: 128,
207
})),
208
Decimal32(precision, scale) => ipc::Type::Decimal(Box::new(ipc::Decimal {
209
precision: *precision as i32,
210
scale: *scale as i32,
211
bit_width: 32,
212
})),
213
Decimal64(precision, scale) => ipc::Type::Decimal(Box::new(ipc::Decimal {
214
precision: *precision as i32,
215
scale: *scale as i32,
216
bit_width: 64,
217
})),
218
Decimal256(precision, scale) => ipc::Type::Decimal(Box::new(ipc::Decimal {
219
precision: *precision as i32,
220
scale: *scale as i32,
221
bit_width: 256,
222
})),
223
Binary => ipc::Type::Binary(Box::new(ipc::Binary {})),
224
LargeBinary => ipc::Type::LargeBinary(Box::new(ipc::LargeBinary {})),
225
Utf8 => ipc::Type::Utf8(Box::new(ipc::Utf8 {})),
226
LargeUtf8 => ipc::Type::LargeUtf8(Box::new(ipc::LargeUtf8 {})),
227
FixedSizeBinary(size) => ipc::Type::FixedSizeBinary(Box::new(ipc::FixedSizeBinary {
228
byte_width: *size as i32,
229
})),
230
Date32 => ipc::Type::Date(Box::new(ipc::Date {
231
unit: ipc::DateUnit::Day,
232
})),
233
Date64 => ipc::Type::Date(Box::new(ipc::Date {
234
unit: ipc::DateUnit::Millisecond,
235
})),
236
Duration(unit) => ipc::Type::Duration(Box::new(ipc::Duration {
237
unit: serialize_time_unit(unit),
238
})),
239
Time32(unit) => ipc::Type::Time(Box::new(ipc::Time {
240
unit: serialize_time_unit(unit),
241
bit_width: 32,
242
})),
243
Time64(unit) => ipc::Type::Time(Box::new(ipc::Time {
244
unit: serialize_time_unit(unit),
245
bit_width: 64,
246
})),
247
Timestamp(unit, tz) => ipc::Type::Timestamp(Box::new(ipc::Timestamp {
248
unit: serialize_time_unit(unit),
249
timezone: tz.as_ref().map(|x| x.to_string()),
250
})),
251
Interval(unit) => ipc::Type::Interval(Box::new(ipc::Interval {
252
unit: match unit {
253
IntervalUnit::YearMonth => ipc::IntervalUnit::YearMonth,
254
IntervalUnit::DayTime => ipc::IntervalUnit::DayTime,
255
IntervalUnit::MonthDayNano => ipc::IntervalUnit::MonthDayNano,
256
},
257
})),
258
List(_) => ipc::Type::List(Box::new(ipc::List {})),
259
LargeList(_) => ipc::Type::LargeList(Box::new(ipc::LargeList {})),
260
FixedSizeList(_, size) => ipc::Type::FixedSizeList(Box::new(ipc::FixedSizeList {
261
list_size: *size as i32,
262
})),
263
Union(u) => ipc::Type::Union(Box::new(ipc::Union {
264
mode: match u.mode {
265
UnionMode::Dense => ipc::UnionMode::Dense,
266
UnionMode::Sparse => ipc::UnionMode::Sparse,
267
},
268
type_ids: u.ids.clone(),
269
})),
270
Map(_, keys_sorted) => ipc::Type::Map(Box::new(ipc::Map {
271
keys_sorted: *keys_sorted,
272
})),
273
Struct(_) => ipc::Type::Struct(Box::new(ipc::Struct {})),
274
Dictionary(_, v, _) => serialize_type(v),
275
Extension(ext) => serialize_type(&ext.inner),
276
Utf8View => ipc::Type::Utf8View(Box::new(ipc::Utf8View {})),
277
BinaryView => ipc::Type::BinaryView(Box::new(ipc::BinaryView {})),
278
Unknown => unimplemented!(),
279
}
280
}
281
282
fn serialize_children(
283
dtype: &ArrowDataType,
284
ipc_field: &IpcField,
285
) -> Vec<arrow_format::ipc::Field> {
286
use ArrowDataType::*;
287
match dtype {
288
Null
289
| Boolean
290
| Int8
291
| Int16
292
| Int32
293
| Int64
294
| UInt8
295
| UInt16
296
| UInt32
297
| UInt64
298
| Int128
299
| Float16
300
| Float32
301
| Float64
302
| Timestamp(_, _)
303
| Date32
304
| Date64
305
| Time32(_)
306
| Time64(_)
307
| Duration(_)
308
| Interval(_)
309
| Binary
310
| FixedSizeBinary(_)
311
| LargeBinary
312
| Utf8
313
| LargeUtf8
314
| Utf8View
315
| BinaryView
316
| Decimal(_, _)
317
| Decimal32(_, _)
318
| Decimal64(_, _)
319
| Decimal256(_, _) => vec![],
320
FixedSizeList(inner, _) | LargeList(inner) | List(inner) | Map(inner, _) => {
321
vec![serialize_field(inner, &ipc_field.fields[0])]
322
},
323
Struct(fields) => fields
324
.iter()
325
.zip(ipc_field.fields.iter())
326
.map(|(field, ipc)| serialize_field(field, ipc))
327
.collect(),
328
Union(u) => u
329
.fields
330
.iter()
331
.zip(ipc_field.fields.iter())
332
.map(|(field, ipc)| serialize_field(field, ipc))
333
.collect(),
334
Dictionary(_, inner, _) => serialize_children(inner, ipc_field),
335
Extension(ext) => serialize_children(&ext.inner, ipc_field),
336
Unknown => unimplemented!(),
337
}
338
}
339
340
/// Create an IPC dictionary encoding
341
pub(crate) fn serialize_dictionary(
342
index_type: &IntegerType,
343
dict_id: i64,
344
dict_is_ordered: bool,
345
) -> arrow_format::ipc::DictionaryEncoding {
346
use IntegerType::*;
347
let is_signed = match index_type {
348
Int8 | Int16 | Int32 | Int64 | Int128 => true,
349
UInt8 | UInt16 | UInt32 | UInt64 => false,
350
};
351
352
let bit_width = match index_type {
353
Int8 | UInt8 => 8,
354
Int16 | UInt16 => 16,
355
Int32 | UInt32 => 32,
356
Int64 | UInt64 => 64,
357
Int128 => 128,
358
};
359
360
let index_type = arrow_format::ipc::Int {
361
bit_width,
362
is_signed,
363
};
364
365
arrow_format::ipc::DictionaryEncoding {
366
id: dict_id,
367
index_type: Some(Box::new(index_type)),
368
is_ordered: dict_is_ordered,
369
dictionary_kind: arrow_format::ipc::DictionaryKind::DenseArray,
370
}
371
}
372
373