Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/io/ipc/write/serialize/mod.rs
8424 views
1
#![allow(clippy::ptr_arg)] // false positive in clippy, see https://github.com/rust-lang/rust-clippy/issues/8463
2
use arrow_format::ipc;
3
4
use super::super::compression;
5
use super::super::endianness::is_native_little_endian;
6
use super::common::{Compression, pad_to_64};
7
use crate::array::*;
8
use crate::bitmap::Bitmap;
9
use crate::datatypes::PhysicalType;
10
use crate::offset::{Offset, OffsetsBuffer};
11
use crate::trusted_len::TrustedLen;
12
use crate::types::NativeType;
13
use crate::{match_integer_type, with_match_primitive_type_full};
14
mod binary;
15
mod binview;
16
mod boolean;
17
mod fixed_size_binary;
18
mod fixed_sized_list;
19
mod list;
20
mod map;
21
mod primitive;
22
mod struct_;
23
mod union;
24
25
use binary::*;
26
use binview::*;
27
use boolean::*;
28
use fixed_size_binary::*;
29
use fixed_sized_list::*;
30
use list::*;
31
use map::*;
32
use primitive::*;
33
use struct_::*;
34
use union::*;
35
36
/// Writes an [`Array`] to `arrow_data`
37
pub fn write(
38
array: &dyn Array,
39
buffers: &mut Vec<ipc::Buffer>,
40
arrow_data: &mut Vec<u8>,
41
nodes: &mut Vec<ipc::FieldNode>,
42
offset: &mut i64,
43
is_little_endian: bool,
44
compression: Option<Compression>,
45
) {
46
nodes.push(ipc::FieldNode {
47
length: array.len() as i64,
48
null_count: array.null_count() as i64,
49
});
50
use PhysicalType::*;
51
match array.dtype().to_physical_type() {
52
Null => (),
53
Boolean => write_boolean(
54
array.as_any().downcast_ref().unwrap(),
55
buffers,
56
arrow_data,
57
offset,
58
is_little_endian,
59
compression,
60
),
61
Primitive(primitive) => with_match_primitive_type_full!(primitive, |$T| {
62
let array = array.as_any().downcast_ref().unwrap();
63
write_primitive::<$T>(array, buffers, arrow_data, offset, is_little_endian, compression)
64
}),
65
Binary => write_binary::<i32>(
66
array.as_any().downcast_ref().unwrap(),
67
buffers,
68
arrow_data,
69
offset,
70
is_little_endian,
71
compression,
72
),
73
LargeBinary => write_binary::<i64>(
74
array.as_any().downcast_ref().unwrap(),
75
buffers,
76
arrow_data,
77
offset,
78
is_little_endian,
79
compression,
80
),
81
FixedSizeBinary => write_fixed_size_binary(
82
array.as_any().downcast_ref().unwrap(),
83
buffers,
84
arrow_data,
85
offset,
86
is_little_endian,
87
compression,
88
),
89
Utf8 => write_utf8::<i32>(
90
array.as_any().downcast_ref().unwrap(),
91
buffers,
92
arrow_data,
93
offset,
94
is_little_endian,
95
compression,
96
),
97
LargeUtf8 => write_utf8::<i64>(
98
array.as_any().downcast_ref().unwrap(),
99
buffers,
100
arrow_data,
101
offset,
102
is_little_endian,
103
compression,
104
),
105
List => write_list::<i32>(
106
array.as_any().downcast_ref().unwrap(),
107
buffers,
108
arrow_data,
109
nodes,
110
offset,
111
is_little_endian,
112
compression,
113
),
114
LargeList => write_list::<i64>(
115
array.as_any().downcast_ref().unwrap(),
116
buffers,
117
arrow_data,
118
nodes,
119
offset,
120
is_little_endian,
121
compression,
122
),
123
FixedSizeList => write_fixed_size_list(
124
array.as_any().downcast_ref().unwrap(),
125
buffers,
126
arrow_data,
127
nodes,
128
offset,
129
is_little_endian,
130
compression,
131
),
132
Struct => write_struct(
133
array.as_any().downcast_ref().unwrap(),
134
buffers,
135
arrow_data,
136
nodes,
137
offset,
138
is_little_endian,
139
compression,
140
),
141
Dictionary(key_type) => match_integer_type!(key_type, |$T| {
142
let array: &DictionaryArray<$T> = array.as_any().downcast_ref().unwrap();
143
let keys_array: &PrimitiveArray<$T> = array.keys().as_any().downcast_ref().unwrap();
144
145
write_primitive::<$T>(
146
keys_array,
147
buffers,
148
arrow_data,
149
offset,
150
is_little_endian,
151
compression
152
)
153
}),
154
Union => {
155
write_union(
156
array.as_any().downcast_ref().unwrap(),
157
buffers,
158
arrow_data,
159
nodes,
160
offset,
161
is_little_endian,
162
compression,
163
);
164
},
165
Map => {
166
write_map(
167
array.as_any().downcast_ref().unwrap(),
168
buffers,
169
arrow_data,
170
nodes,
171
offset,
172
is_little_endian,
173
compression,
174
);
175
},
176
Utf8View => write_binview(
177
array.as_any().downcast_ref::<Utf8ViewArray>().unwrap(),
178
buffers,
179
arrow_data,
180
offset,
181
is_little_endian,
182
compression,
183
),
184
BinaryView => write_binview(
185
array.as_any().downcast_ref::<BinaryViewArray>().unwrap(),
186
buffers,
187
arrow_data,
188
offset,
189
is_little_endian,
190
compression,
191
),
192
}
193
}
194
195
#[inline]
196
fn pad_buffer_to_64(buffer: &mut Vec<u8>, length: usize) {
197
let pad_len = pad_to_64(length);
198
for _ in 0..pad_len {
199
buffer.push(0u8);
200
}
201
}
202
203
/// writes `bytes` to `arrow_data` updating `buffers` and `offset` and guaranteeing a 8 byte boundary.
204
fn write_bytes(
205
bytes: &[u8],
206
buffers: &mut Vec<ipc::Buffer>,
207
arrow_data: &mut Vec<u8>,
208
offset: &mut i64,
209
compression: Option<Compression>,
210
) {
211
let start = arrow_data.len();
212
if let Some(compression) = compression {
213
arrow_data.extend_from_slice(&(bytes.len() as i64).to_le_bytes());
214
match compression {
215
Compression::LZ4 => {
216
compression::compress_lz4(bytes, arrow_data).unwrap();
217
},
218
Compression::ZSTD(level) => {
219
compression::compress_zstd(bytes, arrow_data, level).unwrap();
220
},
221
}
222
} else {
223
arrow_data.extend_from_slice(bytes);
224
};
225
226
buffers.push(finish_buffer(arrow_data, start, offset));
227
}
228
229
fn write_bitmap(
230
bitmap: Option<&Bitmap>,
231
length: usize,
232
buffers: &mut Vec<ipc::Buffer>,
233
arrow_data: &mut Vec<u8>,
234
offset: &mut i64,
235
compression: Option<Compression>,
236
) {
237
match bitmap {
238
Some(bitmap) => {
239
assert_eq!(bitmap.len(), length);
240
let (slice, slice_offset, _) = bitmap.as_slice();
241
if slice_offset != 0 {
242
// case where we can't slice the bitmap as the offsets are not multiple of 8
243
let bytes = Bitmap::from_trusted_len_iter(bitmap.iter());
244
let (slice, _, _) = bytes.as_slice();
245
write_bytes(slice, buffers, arrow_data, offset, compression)
246
} else {
247
write_bytes(slice, buffers, arrow_data, offset, compression)
248
}
249
},
250
None => {
251
buffers.push(ipc::Buffer {
252
offset: *offset,
253
length: 0,
254
});
255
},
256
}
257
}
258
259
/// writes `bytes` to `arrow_data` updating `buffers` and `offset` and guaranteeing a 8 byte boundary.
260
fn write_buffer<T: NativeType>(
261
buffer: &[T],
262
buffers: &mut Vec<ipc::Buffer>,
263
arrow_data: &mut Vec<u8>,
264
offset: &mut i64,
265
is_little_endian: bool,
266
compression: Option<Compression>,
267
) {
268
let start = arrow_data.len();
269
if let Some(compression) = compression {
270
_write_compressed_buffer(buffer, arrow_data, is_little_endian, compression);
271
} else {
272
_write_buffer(buffer, arrow_data, is_little_endian);
273
};
274
275
buffers.push(finish_buffer(arrow_data, start, offset));
276
}
277
278
#[inline]
279
fn _write_buffer_from_iter<T: NativeType, I: TrustedLen<Item = T>>(
280
buffer: I,
281
arrow_data: &mut Vec<u8>,
282
is_little_endian: bool,
283
) {
284
let len = buffer.size_hint().0;
285
arrow_data.reserve(len * size_of::<T>());
286
if is_little_endian {
287
buffer
288
.map(|x| T::to_le_bytes(&x))
289
.for_each(|x| arrow_data.extend_from_slice(x.as_ref()))
290
} else {
291
buffer
292
.map(|x| T::to_be_bytes(&x))
293
.for_each(|x| arrow_data.extend_from_slice(x.as_ref()))
294
}
295
}
296
297
#[inline]
298
fn _write_compressed_buffer_from_iter<T: NativeType, I: TrustedLen<Item = T>>(
299
buffer: I,
300
arrow_data: &mut Vec<u8>,
301
is_little_endian: bool,
302
compression: Compression,
303
) {
304
let len = buffer.size_hint().0;
305
let mut swapped = Vec::with_capacity(len * size_of::<T>());
306
if is_little_endian {
307
buffer
308
.map(|x| T::to_le_bytes(&x))
309
.for_each(|x| swapped.extend_from_slice(x.as_ref()));
310
} else {
311
buffer
312
.map(|x| T::to_be_bytes(&x))
313
.for_each(|x| swapped.extend_from_slice(x.as_ref()))
314
};
315
arrow_data.extend_from_slice(&(swapped.len() as i64).to_le_bytes());
316
match compression {
317
Compression::LZ4 => {
318
compression::compress_lz4(&swapped, arrow_data).unwrap();
319
},
320
Compression::ZSTD(level) => {
321
compression::compress_zstd(&swapped, arrow_data, level).unwrap();
322
},
323
}
324
}
325
326
fn _write_buffer<T: NativeType>(buffer: &[T], arrow_data: &mut Vec<u8>, is_little_endian: bool) {
327
if is_little_endian == is_native_little_endian() {
328
// in native endianness we can use the bytes directly.
329
let buffer = bytemuck::cast_slice(buffer);
330
arrow_data.extend_from_slice(buffer);
331
} else {
332
_write_buffer_from_iter(buffer.iter().copied(), arrow_data, is_little_endian)
333
}
334
}
335
336
fn _write_compressed_buffer<T: NativeType>(
337
buffer: &[T],
338
arrow_data: &mut Vec<u8>,
339
is_little_endian: bool,
340
compression: Compression,
341
) {
342
if is_little_endian == is_native_little_endian() {
343
let bytes = bytemuck::cast_slice(buffer);
344
arrow_data.extend_from_slice(&(bytes.len() as i64).to_le_bytes());
345
match compression {
346
Compression::LZ4 => {
347
compression::compress_lz4(bytes, arrow_data).unwrap();
348
},
349
Compression::ZSTD(level) => {
350
compression::compress_zstd(bytes, arrow_data, level).unwrap();
351
},
352
}
353
} else {
354
todo!()
355
}
356
}
357
358
/// writes `bytes` to `arrow_data` updating `buffers` and `offset` and guaranteeing a 8 byte boundary.
359
#[inline]
360
fn write_buffer_from_iter<T: NativeType, I: TrustedLen<Item = T>>(
361
buffer: I,
362
buffers: &mut Vec<ipc::Buffer>,
363
arrow_data: &mut Vec<u8>,
364
offset: &mut i64,
365
is_little_endian: bool,
366
compression: Option<Compression>,
367
) {
368
let start = arrow_data.len();
369
370
if let Some(compression) = compression {
371
_write_compressed_buffer_from_iter(buffer, arrow_data, is_little_endian, compression);
372
} else {
373
_write_buffer_from_iter(buffer, arrow_data, is_little_endian);
374
}
375
376
buffers.push(finish_buffer(arrow_data, start, offset));
377
}
378
379
fn finish_buffer(arrow_data: &mut Vec<u8>, start: usize, offset: &mut i64) -> ipc::Buffer {
380
let buffer_len = (arrow_data.len() - start) as i64;
381
382
pad_buffer_to_64(arrow_data, arrow_data.len() - start);
383
let total_len = (arrow_data.len() - start) as i64;
384
385
let buffer = ipc::Buffer {
386
offset: *offset,
387
length: buffer_len,
388
};
389
*offset += total_len;
390
buffer
391
}
392
393