Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/io/avro/read/deserialize.rs
7884 views
1
use std::sync::Arc;
2
3
use avro_schema::file::Block;
4
use avro_schema::schema::{Enum, Field as AvroField, Record, Schema as AvroSchema};
5
use polars_error::{PolarsResult, polars_bail, polars_err};
6
7
use super::nested::*;
8
use super::util;
9
use crate::array::*;
10
use crate::datatypes::*;
11
use crate::record_batch::RecordBatchT;
12
use crate::types::months_days_ns;
13
use crate::with_match_primitive_type_full;
14
15
fn make_mutable(
16
dtype: &ArrowDataType,
17
avro_field: Option<&AvroSchema>,
18
capacity: usize,
19
) -> PolarsResult<Box<dyn MutableArray>> {
20
Ok(match dtype.to_physical_type() {
21
PhysicalType::Boolean => {
22
Box::new(MutableBooleanArray::with_capacity(capacity)) as Box<dyn MutableArray>
23
},
24
PhysicalType::Primitive(primitive) => with_match_primitive_type_full!(primitive, |$T| {
25
Box::new(MutablePrimitiveArray::<$T>::with_capacity(capacity).to(dtype.clone()))
26
as Box<dyn MutableArray>
27
}),
28
PhysicalType::Binary => {
29
Box::new(MutableBinaryArray::<i32>::with_capacity(capacity)) as Box<dyn MutableArray>
30
},
31
PhysicalType::Utf8 => {
32
Box::new(MutableUtf8Array::<i32>::with_capacity(capacity)) as Box<dyn MutableArray>
33
},
34
PhysicalType::Dictionary(_) => {
35
if let Some(AvroSchema::Enum(Enum { symbols, .. })) = avro_field {
36
let values = Utf8Array::<i32>::from_slice(symbols);
37
Box::new(FixedItemsUtf8Dictionary::with_capacity(values, capacity))
38
as Box<dyn MutableArray>
39
} else {
40
unreachable!()
41
}
42
},
43
_ => match dtype {
44
ArrowDataType::List(inner) => {
45
let values = make_mutable(inner.dtype(), None, 0)?;
46
Box::new(DynMutableListArray::<i32>::new_from(
47
values,
48
dtype.clone(),
49
capacity,
50
)) as Box<dyn MutableArray>
51
},
52
ArrowDataType::FixedSizeBinary(size) => {
53
Box::new(MutableFixedSizeBinaryArray::with_capacity(*size, capacity))
54
as Box<dyn MutableArray>
55
},
56
ArrowDataType::Struct(fields) => {
57
let values = fields
58
.iter()
59
.map(|field| make_mutable(field.dtype(), None, capacity))
60
.collect::<PolarsResult<Vec<_>>>()?;
61
Box::new(DynMutableStructArray::new(values, dtype.clone())) as Box<dyn MutableArray>
62
},
63
ArrowDataType::Extension(ext) => make_mutable(&ext.inner, avro_field, capacity)?,
64
other => {
65
polars_bail!(nyi = "Deserializing type {other:#?} is still not implemented")
66
},
67
},
68
})
69
}
70
71
fn is_union_null_first(avro_field: &AvroSchema) -> bool {
72
if let AvroSchema::Union(schemas) = avro_field {
73
schemas[0] == AvroSchema::Null
74
} else {
75
unreachable!()
76
}
77
}
78
79
fn deserialize_item<'a>(
80
array: &mut dyn MutableArray,
81
is_nullable: bool,
82
avro_field: &AvroSchema,
83
mut block: &'a [u8],
84
) -> PolarsResult<&'a [u8]> {
85
if is_nullable {
86
let variant = util::zigzag_i64(&mut block)?;
87
let is_null_first = is_union_null_first(avro_field);
88
if is_null_first && variant == 0 || !is_null_first && variant != 0 {
89
array.push_null();
90
return Ok(block);
91
}
92
}
93
deserialize_value(array, avro_field, block)
94
}
95
96
fn deserialize_value<'a>(
97
array: &mut dyn MutableArray,
98
avro_field: &AvroSchema,
99
mut block: &'a [u8],
100
) -> PolarsResult<&'a [u8]> {
101
let dtype = array.dtype();
102
match dtype {
103
ArrowDataType::List(inner) => {
104
let is_nullable = inner.is_nullable;
105
let avro_inner = match avro_field {
106
AvroSchema::Array(inner) => inner.as_ref(),
107
AvroSchema::Union(u) => match &u.as_slice() {
108
&[AvroSchema::Array(inner), _] | &[_, AvroSchema::Array(inner)] => {
109
inner.as_ref()
110
},
111
_ => unreachable!(),
112
},
113
_ => unreachable!(),
114
};
115
116
let array = array
117
.as_mut_any()
118
.downcast_mut::<DynMutableListArray<i32>>()
119
.unwrap();
120
// Arrays are encoded as a series of blocks.
121
loop {
122
// Each block consists of a long count value, followed by that many array items.
123
let len = util::zigzag_i64(&mut block)?;
124
let len = if len < 0 {
125
// Avro spec: If a block's count is negative, its absolute value is used,
126
// and the count is followed immediately by a long block size indicating the number of bytes in the block. This block size permits fast skipping through data, e.g., when projecting a record to a subset of its fields.
127
let _ = util::zigzag_i64(&mut block)?;
128
129
-len
130
} else {
131
len
132
};
133
134
// A block with count zero indicates the end of the array.
135
if len == 0 {
136
break;
137
}
138
139
// Each item is encoded per the array’s item schema.
140
let values = array.mut_values();
141
for _ in 0..len {
142
block = deserialize_item(values, is_nullable, avro_inner, block)?;
143
}
144
}
145
array.try_push_valid()?;
146
},
147
ArrowDataType::Struct(inner_fields) => {
148
let fields = match avro_field {
149
AvroSchema::Record(Record { fields, .. }) => fields,
150
AvroSchema::Union(u) => match &u.as_slice() {
151
&[AvroSchema::Record(Record { fields, .. }), _]
152
| &[_, AvroSchema::Record(Record { fields, .. })] => fields,
153
_ => unreachable!(),
154
},
155
_ => unreachable!(),
156
};
157
158
let is_nullable = inner_fields
159
.iter()
160
.map(|x| x.is_nullable)
161
.collect::<Vec<_>>();
162
let array = array
163
.as_mut_any()
164
.downcast_mut::<DynMutableStructArray>()
165
.unwrap();
166
167
for (index, (field, is_nullable)) in fields.iter().zip(is_nullable.iter()).enumerate() {
168
let values = array.mut_values(index);
169
block = deserialize_item(values, *is_nullable, &field.schema, block)?;
170
}
171
array.try_push_valid()?;
172
},
173
_ => match dtype.to_physical_type() {
174
PhysicalType::Boolean => {
175
let is_valid = block[0] == 1;
176
block = &block[1..];
177
let array = array
178
.as_mut_any()
179
.downcast_mut::<MutableBooleanArray>()
180
.unwrap();
181
array.push(Some(is_valid))
182
},
183
PhysicalType::Primitive(primitive) => match primitive {
184
PrimitiveType::Int32 => {
185
let value = util::zigzag_i64(&mut block)? as i32;
186
let array = array
187
.as_mut_any()
188
.downcast_mut::<MutablePrimitiveArray<i32>>()
189
.unwrap();
190
array.push(Some(value))
191
},
192
PrimitiveType::Int64 => {
193
let value = util::zigzag_i64(&mut block)?;
194
let array = array
195
.as_mut_any()
196
.downcast_mut::<MutablePrimitiveArray<i64>>()
197
.unwrap();
198
array.push(Some(value))
199
},
200
PrimitiveType::Float32 => {
201
let value = f32::from_le_bytes(block[..size_of::<f32>()].try_into().unwrap());
202
block = &block[size_of::<f32>()..];
203
let array = array
204
.as_mut_any()
205
.downcast_mut::<MutablePrimitiveArray<f32>>()
206
.unwrap();
207
array.push(Some(value))
208
},
209
PrimitiveType::Float64 => {
210
let value = f64::from_le_bytes(block[..size_of::<f64>()].try_into().unwrap());
211
block = &block[size_of::<f64>()..];
212
let array = array
213
.as_mut_any()
214
.downcast_mut::<MutablePrimitiveArray<f64>>()
215
.unwrap();
216
array.push(Some(value))
217
},
218
PrimitiveType::MonthDayNano => {
219
// https://avro.apache.org/docs/current/spec.html#Duration
220
// 12 bytes, months, days, millis in LE
221
let data = &block[..12];
222
block = &block[12..];
223
224
let value = months_days_ns::new(
225
i32::from_le_bytes([data[0], data[1], data[2], data[3]]),
226
i32::from_le_bytes([data[4], data[5], data[6], data[7]]),
227
i32::from_le_bytes([data[8], data[9], data[10], data[11]]) as i64
228
* 1_000_000,
229
);
230
231
let array = array
232
.as_mut_any()
233
.downcast_mut::<MutablePrimitiveArray<months_days_ns>>()
234
.unwrap();
235
array.push(Some(value))
236
},
237
PrimitiveType::Int128 => {
238
let avro_inner = match avro_field {
239
AvroSchema::Bytes(_) | AvroSchema::Fixed(_) => avro_field,
240
AvroSchema::Union(u) => match &u.as_slice() {
241
&[e, AvroSchema::Null] | &[AvroSchema::Null, e] => e,
242
_ => unreachable!(),
243
},
244
_ => unreachable!(),
245
};
246
let len = match avro_inner {
247
AvroSchema::Bytes(_) => {
248
util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
249
polars_err!(
250
oos = "Avro format contains a non-usize number of bytes"
251
)
252
})?
253
},
254
AvroSchema::Fixed(b) => b.size,
255
_ => unreachable!(),
256
};
257
if len > 16 {
258
polars_bail!(oos = "Avro decimal bytes return more than 16 bytes")
259
}
260
let mut bytes = [0u8; 16];
261
bytes[..len].copy_from_slice(&block[..len]);
262
block = &block[len..];
263
let data = i128::from_be_bytes(bytes) >> (8 * (16 - len));
264
let array = array
265
.as_mut_any()
266
.downcast_mut::<MutablePrimitiveArray<i128>>()
267
.unwrap();
268
array.push(Some(data))
269
},
270
_ => unreachable!(),
271
},
272
PhysicalType::Utf8 => {
273
let len: usize = util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
274
polars_err!(oos = "Avro format contains a non-usize number of bytes")
275
})?;
276
let data = simdutf8::basic::from_utf8(&block[..len])?;
277
block = &block[len..];
278
279
let array = array
280
.as_mut_any()
281
.downcast_mut::<MutableUtf8Array<i32>>()
282
.unwrap();
283
array.push(Some(data))
284
},
285
PhysicalType::Binary => {
286
let len: usize = util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
287
polars_err!(oos = "Avro format contains a non-usize number of bytes")
288
})?;
289
let data = &block[..len];
290
block = &block[len..];
291
292
let array = array
293
.as_mut_any()
294
.downcast_mut::<MutableBinaryArray<i32>>()
295
.unwrap();
296
array.push(Some(data));
297
},
298
PhysicalType::FixedSizeBinary => {
299
let array = array
300
.as_mut_any()
301
.downcast_mut::<MutableFixedSizeBinaryArray>()
302
.unwrap();
303
let len = array.size();
304
let data = &block[..len];
305
block = &block[len..];
306
array.push(Some(data));
307
},
308
PhysicalType::Dictionary(_) => {
309
let index = util::zigzag_i64(&mut block)? as i32;
310
let array = array
311
.as_mut_any()
312
.downcast_mut::<FixedItemsUtf8Dictionary>()
313
.unwrap();
314
array.push_valid(index);
315
},
316
_ => todo!(),
317
},
318
};
319
Ok(block)
320
}
321
322
fn skip_item<'a>(
323
field: &Field,
324
avro_field: &AvroSchema,
325
mut block: &'a [u8],
326
) -> PolarsResult<&'a [u8]> {
327
if field.is_nullable {
328
let variant = util::zigzag_i64(&mut block)?;
329
let is_null_first = is_union_null_first(avro_field);
330
if is_null_first && variant == 0 || !is_null_first && variant != 0 {
331
return Ok(block);
332
}
333
}
334
match &field.dtype {
335
ArrowDataType::List(inner) => {
336
let avro_inner = match avro_field {
337
AvroSchema::Array(inner) => inner.as_ref(),
338
AvroSchema::Union(u) => match &u.as_slice() {
339
&[AvroSchema::Array(inner), _] | &[_, AvroSchema::Array(inner)] => {
340
inner.as_ref()
341
},
342
_ => unreachable!(),
343
},
344
_ => unreachable!(),
345
};
346
347
loop {
348
let len = util::zigzag_i64(&mut block)?;
349
let (len, bytes) = if len < 0 {
350
// Avro spec: If a block's count is negative, its absolute value is used,
351
// and the count is followed immediately by a long block size indicating the number of bytes in the block. This block size permits fast skipping through data, e.g., when projecting a record to a subset of its fields.
352
let bytes = util::zigzag_i64(&mut block)?;
353
354
(-len, Some(bytes))
355
} else {
356
(len, None)
357
};
358
359
let bytes: Option<usize> = bytes
360
.map(|bytes| {
361
bytes
362
.try_into()
363
.map_err(|_| polars_err!(oos = "Avro block size negative or too large"))
364
})
365
.transpose()?;
366
367
if len == 0 {
368
break;
369
}
370
371
if let Some(bytes) = bytes {
372
block = &block[bytes..];
373
} else {
374
for _ in 0..len {
375
block = skip_item(inner, avro_inner, block)?;
376
}
377
}
378
}
379
},
380
ArrowDataType::Struct(inner_fields) => {
381
let fields = match avro_field {
382
AvroSchema::Record(Record { fields, .. }) => fields,
383
AvroSchema::Union(u) => match &u.as_slice() {
384
&[AvroSchema::Record(Record { fields, .. }), _]
385
| &[_, AvroSchema::Record(Record { fields, .. })] => fields,
386
_ => unreachable!(),
387
},
388
_ => unreachable!(),
389
};
390
391
for (field, avro_field) in inner_fields.iter().zip(fields.iter()) {
392
block = skip_item(field, &avro_field.schema, block)?;
393
}
394
},
395
_ => match field.dtype.to_physical_type() {
396
PhysicalType::Boolean => {
397
let _ = block[0] == 1;
398
block = &block[1..];
399
},
400
PhysicalType::Primitive(primitive) => match primitive {
401
PrimitiveType::Int32 => {
402
let _ = util::zigzag_i64(&mut block)?;
403
},
404
PrimitiveType::Int64 => {
405
let _ = util::zigzag_i64(&mut block)?;
406
},
407
PrimitiveType::Float32 => {
408
block = &block[size_of::<f32>()..];
409
},
410
PrimitiveType::Float64 => {
411
block = &block[size_of::<f64>()..];
412
},
413
PrimitiveType::MonthDayNano => {
414
block = &block[12..];
415
},
416
PrimitiveType::Int128 => {
417
let avro_inner = match avro_field {
418
AvroSchema::Bytes(_) | AvroSchema::Fixed(_) => avro_field,
419
AvroSchema::Union(u) => match &u.as_slice() {
420
&[e, AvroSchema::Null] | &[AvroSchema::Null, e] => e,
421
_ => unreachable!(),
422
},
423
_ => unreachable!(),
424
};
425
let len = match avro_inner {
426
AvroSchema::Bytes(_) => {
427
util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
428
polars_err!(
429
oos = "Avro format contains a non-usize number of bytes"
430
)
431
})?
432
},
433
AvroSchema::Fixed(b) => b.size,
434
_ => unreachable!(),
435
};
436
block = &block[len..];
437
},
438
_ => unreachable!(),
439
},
440
PhysicalType::Utf8 | PhysicalType::Binary => {
441
let len: usize = util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
442
polars_err!(oos = "Avro format contains a non-usize number of bytes")
443
})?;
444
block = &block[len..];
445
},
446
PhysicalType::FixedSizeBinary => {
447
let len = if let ArrowDataType::FixedSizeBinary(len) = &field.dtype {
448
*len
449
} else {
450
unreachable!()
451
};
452
453
block = &block[len..];
454
},
455
PhysicalType::Dictionary(_) => {
456
let _ = util::zigzag_i64(&mut block)? as i32;
457
},
458
_ => todo!(),
459
},
460
}
461
Ok(block)
462
}
463
464
/// Deserializes a [`Block`] assumed to be encoded according to [`AvroField`] into [`RecordBatchT`],
465
/// using `projection` to ignore `avro_fields`.
466
/// # Panics
467
/// `fields`, `avro_fields` and `projection` must have the same length.
468
pub fn deserialize(
469
block: &Block,
470
fields: &ArrowSchema,
471
avro_fields: &[AvroField],
472
projection: &[bool],
473
) -> PolarsResult<RecordBatchT<Box<dyn Array>>> {
474
assert_eq!(fields.len(), avro_fields.len());
475
assert_eq!(fields.len(), projection.len());
476
477
let rows = block.number_of_rows;
478
let mut block = block.data.as_ref();
479
480
// create mutables, one per field
481
let mut arrays: Vec<Box<dyn MutableArray>> = fields
482
.iter_values()
483
.zip(avro_fields.iter())
484
.zip(projection.iter())
485
.map(|((field, avro_field), projection)| {
486
if *projection {
487
make_mutable(&field.dtype, Some(&avro_field.schema), rows)
488
} else {
489
// just something; we are not going to use it
490
make_mutable(&ArrowDataType::Int32, None, 0)
491
}
492
})
493
.collect::<PolarsResult<_>>()?;
494
495
// this is _the_ expensive transpose (rows -> columns)
496
for _ in 0..rows {
497
let iter = arrays
498
.iter_mut()
499
.zip(fields.iter_values())
500
.zip(avro_fields.iter())
501
.zip(projection.iter());
502
503
for (((array, field), avro_field), projection) in iter {
504
block = if *projection {
505
deserialize_item(array.as_mut(), field.is_nullable, &avro_field.schema, block)
506
} else {
507
skip_item(field, &avro_field.schema, block)
508
}?
509
}
510
}
511
512
let projected_schema = fields
513
.iter_values()
514
.zip(projection)
515
.filter_map(|(f, p)| (*p).then_some(f))
516
.cloned()
517
.collect();
518
519
RecordBatchT::try_new(
520
rows,
521
Arc::new(projected_schema),
522
arrays
523
.iter_mut()
524
.zip(projection.iter())
525
.filter_map(|x| x.1.then(|| x.0))
526
.map(|array| array.as_box())
527
.collect(),
528
)
529
}
530
531