Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/io/ipc/write/schema.rs
8433 views
1
use arrow_format::ipc::KeyValue;
2
use arrow_format::ipc::planus::Builder;
3
4
use super::super::IpcField;
5
use crate::datatypes::{
6
ArrowDataType, ArrowSchema, Field, IntegerType, IntervalUnit, Metadata, TimeUnit, UnionMode,
7
};
8
use crate::io::ipc::endianness::is_native_little_endian;
9
10
/// Converts a [ArrowSchema] and [IpcField]s to a flatbuffers-encoded [arrow_format::ipc::Message].
11
pub fn schema_to_bytes(
12
schema: &ArrowSchema,
13
ipc_fields: &[IpcField],
14
custom_metadata: Option<&Metadata>,
15
) -> Vec<u8> {
16
let schema = serialize_schema(schema, ipc_fields, custom_metadata);
17
18
let message = arrow_format::ipc::Message {
19
version: arrow_format::ipc::MetadataVersion::V5,
20
header: Some(arrow_format::ipc::MessageHeader::Schema(Box::new(schema))),
21
body_length: 0,
22
custom_metadata: None,
23
};
24
let mut builder = Builder::new();
25
let footer_data = builder.finish(&message, None);
26
footer_data.to_vec()
27
}
28
29
pub fn serialize_schema(
30
schema: &ArrowSchema,
31
ipc_fields: &[IpcField],
32
custom_schema_metadata: Option<&Metadata>,
33
) -> arrow_format::ipc::Schema {
34
let endianness = if is_native_little_endian() {
35
arrow_format::ipc::Endianness::Little
36
} else {
37
arrow_format::ipc::Endianness::Big
38
};
39
40
let fields = schema
41
.iter_values()
42
.zip(ipc_fields.iter())
43
.map(|(field, ipc_field)| serialize_field(field, ipc_field))
44
.collect::<Vec<_>>();
45
46
let mut custom_metadata: Vec<KeyValue> =
47
Vec::with_capacity(schema.metadata().len() + custom_schema_metadata.map_or(0, |x| x.len()));
48
49
for (k, v) in schema.metadata() {
50
custom_metadata.push(KeyValue {
51
key: Some(k.to_string()),
52
value: Some(v.to_string()),
53
});
54
}
55
56
if let Some(custom_schema_metadata) = custom_schema_metadata {
57
for (k, v) in custom_schema_metadata {
58
let kv = KeyValue {
59
key: Some(k.to_string()),
60
value: Some(v.to_string()),
61
};
62
63
if let Some(i) = schema.metadata().keys().position(|x| x == k) {
64
custom_metadata[i] = kv
65
} else {
66
custom_metadata.push(kv);
67
}
68
}
69
}
70
71
arrow_format::ipc::Schema {
72
endianness,
73
fields: Some(fields),
74
custom_metadata: (!custom_metadata.is_empty()).then_some(custom_metadata),
75
features: None, // todo add this one
76
}
77
}
78
79
pub fn key_value(key: impl Into<String>, val: impl Into<String>) -> arrow_format::ipc::KeyValue {
80
arrow_format::ipc::KeyValue {
81
key: Some(key.into()),
82
value: Some(val.into()),
83
}
84
}
85
86
fn write_metadata(metadata: &Metadata, kv_vec: &mut Vec<arrow_format::ipc::KeyValue>) {
87
for (k, v) in metadata {
88
if k.as_str() != "ARROW:extension:name" && k.as_str() != "ARROW:extension:metadata" {
89
kv_vec.push(key_value(k.clone().into_string(), v.clone().into_string()));
90
}
91
}
92
}
93
94
fn write_extension(
95
name: &str,
96
metadata: Option<&str>,
97
kv_vec: &mut Vec<arrow_format::ipc::KeyValue>,
98
) {
99
if let Some(metadata) = metadata {
100
kv_vec.push(key_value("ARROW:extension:metadata".to_string(), metadata));
101
}
102
103
kv_vec.push(key_value("ARROW:extension:name".to_string(), name));
104
}
105
106
/// Create an IPC Field from an Arrow Field
107
pub(crate) fn serialize_field(field: &Field, ipc_field: &IpcField) -> arrow_format::ipc::Field {
108
// custom metadata.
109
let mut kv_vec = vec![];
110
if let ArrowDataType::Extension(ext) = field.dtype() {
111
write_extension(
112
&ext.name,
113
ext.metadata.as_ref().map(|x| x.as_str()),
114
&mut kv_vec,
115
);
116
}
117
118
let type_ = serialize_type(field.dtype());
119
let children = serialize_children(field.dtype(), ipc_field);
120
121
let dictionary = if let ArrowDataType::Dictionary(index_type, inner, is_ordered) =
122
field.dtype().to_storage()
123
{
124
if let ArrowDataType::Extension(ext) = inner.as_ref() {
125
write_extension(
126
ext.name.as_str(),
127
ext.metadata.as_ref().map(|x| x.as_str()),
128
&mut kv_vec,
129
);
130
}
131
Some(serialize_dictionary(
132
index_type,
133
ipc_field
134
.dictionary_id
135
.expect("All Dictionary types have `dict_id`"),
136
*is_ordered,
137
))
138
} else {
139
None
140
};
141
142
if let Some(metadata) = &field.metadata {
143
write_metadata(metadata, &mut kv_vec);
144
}
145
146
let custom_metadata = if !kv_vec.is_empty() {
147
Some(kv_vec)
148
} else {
149
None
150
};
151
152
arrow_format::ipc::Field {
153
name: Some(field.name.to_string()),
154
nullable: field.is_nullable,
155
type_: Some(type_),
156
dictionary: dictionary.map(Box::new),
157
children: Some(children),
158
custom_metadata,
159
}
160
}
161
162
fn serialize_time_unit(unit: &TimeUnit) -> arrow_format::ipc::TimeUnit {
163
match unit {
164
TimeUnit::Second => arrow_format::ipc::TimeUnit::Second,
165
TimeUnit::Millisecond => arrow_format::ipc::TimeUnit::Millisecond,
166
TimeUnit::Microsecond => arrow_format::ipc::TimeUnit::Microsecond,
167
TimeUnit::Nanosecond => arrow_format::ipc::TimeUnit::Nanosecond,
168
}
169
}
170
171
/// Maps an [`ArrowDataType`] to its IPC type descriptor.
///
/// `Dictionary` is described by its value type and `Extension` by its inner type; both
/// recurse into this function.
///
/// # Panics
///
/// Panics via `unimplemented!()` for `Unknown` and for `Interval(MonthDayMillis)`.
fn serialize_type(dtype: &ArrowDataType) -> arrow_format::ipc::Type {
    use ArrowDataType::*;
    use arrow_format::ipc;

    // Constructors for the type families whose members differ only by parameters.
    let int = |bit_width, is_signed| {
        ipc::Type::Int(Box::new(ipc::Int {
            bit_width,
            is_signed,
        }))
    };
    let float = |precision| ipc::Type::FloatingPoint(Box::new(ipc::FloatingPoint { precision }));
    let decimal = |precision, scale, bit_width| {
        ipc::Type::Decimal(Box::new(ipc::Decimal {
            precision,
            scale,
            bit_width,
        }))
    };
    let date = |unit| ipc::Type::Date(Box::new(ipc::Date { unit }));
    let time = |unit: &TimeUnit, bit_width| {
        ipc::Type::Time(Box::new(ipc::Time {
            unit: serialize_time_unit(unit),
            bit_width,
        }))
    };

    match dtype {
        Null => ipc::Type::Null(Box::new(ipc::Null {})),
        Boolean => ipc::Type::Bool(Box::new(ipc::Bool {})),

        // Integers.
        UInt8 => int(8, false),
        UInt16 => int(16, false),
        UInt32 => int(32, false),
        UInt64 => int(64, false),
        UInt128 => int(128, false),
        Int8 => int(8, true),
        Int16 => int(16, true),
        Int32 => int(32, true),
        Int64 => int(64, true),
        Int128 => int(128, true),

        // Floating point.
        Float16 => float(ipc::Precision::Half),
        Float32 => float(ipc::Precision::Single),
        Float64 => float(ipc::Precision::Double),

        // Decimals: the variant determines the bit width.
        Decimal(precision, scale) => decimal(*precision as i32, *scale as i32, 128),
        Decimal32(precision, scale) => decimal(*precision as i32, *scale as i32, 32),
        Decimal64(precision, scale) => decimal(*precision as i32, *scale as i32, 64),
        Decimal256(precision, scale) => decimal(*precision as i32, *scale as i32, 256),

        // Binary and string types.
        Binary => ipc::Type::Binary(Box::new(ipc::Binary {})),
        LargeBinary => ipc::Type::LargeBinary(Box::new(ipc::LargeBinary {})),
        Utf8 => ipc::Type::Utf8(Box::new(ipc::Utf8 {})),
        LargeUtf8 => ipc::Type::LargeUtf8(Box::new(ipc::LargeUtf8 {})),
        Utf8View => ipc::Type::Utf8View(Box::new(ipc::Utf8View {})),
        BinaryView => ipc::Type::BinaryView(Box::new(ipc::BinaryView {})),
        FixedSizeBinary(size) => ipc::Type::FixedSizeBinary(Box::new(ipc::FixedSizeBinary {
            byte_width: *size as i32,
        })),

        // Temporal types.
        Date32 => date(ipc::DateUnit::Day),
        Date64 => date(ipc::DateUnit::Millisecond),
        Duration(unit) => ipc::Type::Duration(Box::new(ipc::Duration {
            unit: serialize_time_unit(unit),
        })),
        Time32(unit) => time(unit, 32),
        Time64(unit) => time(unit, 64),
        Timestamp(unit, tz) => ipc::Type::Timestamp(Box::new(ipc::Timestamp {
            unit: serialize_time_unit(unit),
            timezone: tz.as_ref().map(|zone| zone.to_string()),
        })),
        Interval(unit) => {
            let unit = match unit {
                IntervalUnit::YearMonth => ipc::IntervalUnit::YearMonth,
                IntervalUnit::DayTime => ipc::IntervalUnit::DayTime,
                IntervalUnit::MonthDayNano => ipc::IntervalUnit::MonthDayNano,
                IntervalUnit::MonthDayMillis => unimplemented!(),
            };
            ipc::Type::Interval(Box::new(ipc::Interval { unit }))
        },

        // Nested types: only the container is described here; children are
        // handled separately by `serialize_children`.
        List(_) => ipc::Type::List(Box::new(ipc::List {})),
        LargeList(_) => ipc::Type::LargeList(Box::new(ipc::LargeList {})),
        FixedSizeList(_, size) => ipc::Type::FixedSizeList(Box::new(ipc::FixedSizeList {
            list_size: *size as i32,
        })),
        Struct(_) => ipc::Type::Struct(Box::new(ipc::Struct {})),
        Map(_, keys_sorted) => ipc::Type::Map(Box::new(ipc::Map {
            keys_sorted: *keys_sorted,
        })),
        Union(u) => {
            let mode = match u.mode {
                UnionMode::Dense => ipc::UnionMode::Dense,
                UnionMode::Sparse => ipc::UnionMode::Sparse,
            };
            ipc::Type::Union(Box::new(ipc::Union {
                mode,
                type_ids: u.ids.clone(),
            }))
        },

        // Wrappers delegate to the type they wrap.
        Dictionary(_, values, _) => serialize_type(values),
        Extension(ext) => serialize_type(&ext.inner),

        Unknown => unimplemented!(),
    }
}
305
306
fn serialize_children(
307
dtype: &ArrowDataType,
308
ipc_field: &IpcField,
309
) -> Vec<arrow_format::ipc::Field> {
310
use ArrowDataType::*;
311
match dtype {
312
Null
313
| Boolean
314
| Int8
315
| Int16
316
| Int32
317
| Int64
318
| Int128
319
| UInt8
320
| UInt16
321
| UInt32
322
| UInt64
323
| UInt128
324
| Float16
325
| Float32
326
| Float64
327
| Timestamp(_, _)
328
| Date32
329
| Date64
330
| Time32(_)
331
| Time64(_)
332
| Duration(_)
333
| Interval(_)
334
| Binary
335
| FixedSizeBinary(_)
336
| LargeBinary
337
| Utf8
338
| LargeUtf8
339
| Utf8View
340
| BinaryView
341
| Decimal(_, _)
342
| Decimal32(_, _)
343
| Decimal64(_, _)
344
| Decimal256(_, _) => vec![],
345
FixedSizeList(inner, _) | LargeList(inner) | List(inner) | Map(inner, _) => {
346
vec![serialize_field(inner, &ipc_field.fields[0])]
347
},
348
Struct(fields) => fields
349
.iter()
350
.zip(ipc_field.fields.iter())
351
.map(|(field, ipc)| serialize_field(field, ipc))
352
.collect(),
353
Union(u) => u
354
.fields
355
.iter()
356
.zip(ipc_field.fields.iter())
357
.map(|(field, ipc)| serialize_field(field, ipc))
358
.collect(),
359
Dictionary(_, inner, _) => serialize_children(inner, ipc_field),
360
Extension(ext) => serialize_children(&ext.inner, ipc_field),
361
Unknown => unimplemented!(),
362
}
363
}
364
365
/// Create an IPC dictionary encoding
366
pub(crate) fn serialize_dictionary(
367
index_type: &IntegerType,
368
dict_id: i64,
369
dict_is_ordered: bool,
370
) -> arrow_format::ipc::DictionaryEncoding {
371
use IntegerType::*;
372
let is_signed = match index_type {
373
Int8 | Int16 | Int32 | Int64 | Int128 => true,
374
UInt8 | UInt16 | UInt32 | UInt64 | UInt128 => false,
375
};
376
377
let bit_width = match index_type {
378
Int8 | UInt8 => 8,
379
Int16 | UInt16 => 16,
380
Int32 | UInt32 => 32,
381
Int64 | UInt64 => 64,
382
Int128 | UInt128 => 128,
383
};
384
385
let index_type = arrow_format::ipc::Int {
386
bit_width,
387
is_signed,
388
};
389
390
arrow_format::ipc::DictionaryEncoding {
391
id: dict_id,
392
index_type: Some(Box::new(index_type)),
393
is_ordered: dict_is_ordered,
394
dictionary_kind: arrow_format::ipc::DictionaryKind::DenseArray,
395
}
396
}
397
398