Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/io/ipc/write/serialize/mod.rs
6940 views
1
#![allow(clippy::ptr_arg)] // false positive in clippy, see https://github.com/rust-lang/rust-clippy/issues/8463
2
use arrow_format::ipc;
3
4
use super::super::compression;
5
use super::super::endianness::is_native_little_endian;
6
use super::common::{Compression, pad_to_64};
7
use crate::array::*;
8
use crate::bitmap::Bitmap;
9
use crate::datatypes::PhysicalType;
10
use crate::offset::{Offset, OffsetsBuffer};
11
use crate::trusted_len::TrustedLen;
12
use crate::types::NativeType;
13
use crate::{match_integer_type, with_match_primitive_type_full};
14
mod binary;
15
mod binview;
16
mod boolean;
17
mod dictionary;
18
mod fixed_size_binary;
19
mod fixed_sized_list;
20
mod list;
21
mod map;
22
mod primitive;
23
mod struct_;
24
mod union;
25
26
use binary::*;
27
use binview::*;
28
use boolean::*;
29
pub(super) use dictionary::*;
30
use fixed_size_binary::*;
31
use fixed_sized_list::*;
32
use list::*;
33
use map::*;
34
use primitive::*;
35
use struct_::*;
36
use union::*;
37
38
/// Writes an [`Array`] to `arrow_data`
39
pub fn write(
40
array: &dyn Array,
41
buffers: &mut Vec<ipc::Buffer>,
42
arrow_data: &mut Vec<u8>,
43
nodes: &mut Vec<ipc::FieldNode>,
44
offset: &mut i64,
45
is_little_endian: bool,
46
compression: Option<Compression>,
47
) {
48
nodes.push(ipc::FieldNode {
49
length: array.len() as i64,
50
null_count: array.null_count() as i64,
51
});
52
use PhysicalType::*;
53
match array.dtype().to_physical_type() {
54
Null => (),
55
Boolean => write_boolean(
56
array.as_any().downcast_ref().unwrap(),
57
buffers,
58
arrow_data,
59
offset,
60
is_little_endian,
61
compression,
62
),
63
Primitive(primitive) => with_match_primitive_type_full!(primitive, |$T| {
64
let array = array.as_any().downcast_ref().unwrap();
65
write_primitive::<$T>(array, buffers, arrow_data, offset, is_little_endian, compression)
66
}),
67
Binary => write_binary::<i32>(
68
array.as_any().downcast_ref().unwrap(),
69
buffers,
70
arrow_data,
71
offset,
72
is_little_endian,
73
compression,
74
),
75
LargeBinary => write_binary::<i64>(
76
array.as_any().downcast_ref().unwrap(),
77
buffers,
78
arrow_data,
79
offset,
80
is_little_endian,
81
compression,
82
),
83
FixedSizeBinary => write_fixed_size_binary(
84
array.as_any().downcast_ref().unwrap(),
85
buffers,
86
arrow_data,
87
offset,
88
is_little_endian,
89
compression,
90
),
91
Utf8 => write_utf8::<i32>(
92
array.as_any().downcast_ref().unwrap(),
93
buffers,
94
arrow_data,
95
offset,
96
is_little_endian,
97
compression,
98
),
99
LargeUtf8 => write_utf8::<i64>(
100
array.as_any().downcast_ref().unwrap(),
101
buffers,
102
arrow_data,
103
offset,
104
is_little_endian,
105
compression,
106
),
107
List => write_list::<i32>(
108
array.as_any().downcast_ref().unwrap(),
109
buffers,
110
arrow_data,
111
nodes,
112
offset,
113
is_little_endian,
114
compression,
115
),
116
LargeList => write_list::<i64>(
117
array.as_any().downcast_ref().unwrap(),
118
buffers,
119
arrow_data,
120
nodes,
121
offset,
122
is_little_endian,
123
compression,
124
),
125
FixedSizeList => write_fixed_size_list(
126
array.as_any().downcast_ref().unwrap(),
127
buffers,
128
arrow_data,
129
nodes,
130
offset,
131
is_little_endian,
132
compression,
133
),
134
Struct => write_struct(
135
array.as_any().downcast_ref().unwrap(),
136
buffers,
137
arrow_data,
138
nodes,
139
offset,
140
is_little_endian,
141
compression,
142
),
143
Dictionary(key_type) => match_integer_type!(key_type, |$T| {
144
write_dictionary::<$T>(
145
array.as_any().downcast_ref().unwrap(),
146
buffers,
147
arrow_data,
148
nodes,
149
offset,
150
is_little_endian,
151
compression,
152
true,
153
);
154
}),
155
Union => {
156
write_union(
157
array.as_any().downcast_ref().unwrap(),
158
buffers,
159
arrow_data,
160
nodes,
161
offset,
162
is_little_endian,
163
compression,
164
);
165
},
166
Map => {
167
write_map(
168
array.as_any().downcast_ref().unwrap(),
169
buffers,
170
arrow_data,
171
nodes,
172
offset,
173
is_little_endian,
174
compression,
175
);
176
},
177
Utf8View => write_binview(
178
array.as_any().downcast_ref::<Utf8ViewArray>().unwrap(),
179
buffers,
180
arrow_data,
181
offset,
182
is_little_endian,
183
compression,
184
),
185
BinaryView => write_binview(
186
array.as_any().downcast_ref::<BinaryViewArray>().unwrap(),
187
buffers,
188
arrow_data,
189
offset,
190
is_little_endian,
191
compression,
192
),
193
}
194
}
195
196
#[inline]
197
fn pad_buffer_to_64(buffer: &mut Vec<u8>, length: usize) {
198
let pad_len = pad_to_64(length);
199
for _ in 0..pad_len {
200
buffer.push(0u8);
201
}
202
}
203
204
/// writes `bytes` to `arrow_data` updating `buffers` and `offset` and guaranteeing a 8 byte boundary.
205
fn write_bytes(
206
bytes: &[u8],
207
buffers: &mut Vec<ipc::Buffer>,
208
arrow_data: &mut Vec<u8>,
209
offset: &mut i64,
210
compression: Option<Compression>,
211
) {
212
let start = arrow_data.len();
213
if let Some(compression) = compression {
214
arrow_data.extend_from_slice(&(bytes.len() as i64).to_le_bytes());
215
match compression {
216
Compression::LZ4 => {
217
compression::compress_lz4(bytes, arrow_data).unwrap();
218
},
219
Compression::ZSTD => {
220
compression::compress_zstd(bytes, arrow_data).unwrap();
221
},
222
}
223
} else {
224
arrow_data.extend_from_slice(bytes);
225
};
226
227
buffers.push(finish_buffer(arrow_data, start, offset));
228
}
229
230
fn write_bitmap(
231
bitmap: Option<&Bitmap>,
232
length: usize,
233
buffers: &mut Vec<ipc::Buffer>,
234
arrow_data: &mut Vec<u8>,
235
offset: &mut i64,
236
compression: Option<Compression>,
237
) {
238
match bitmap {
239
Some(bitmap) => {
240
assert_eq!(bitmap.len(), length);
241
let (slice, slice_offset, _) = bitmap.as_slice();
242
if slice_offset != 0 {
243
// case where we can't slice the bitmap as the offsets are not multiple of 8
244
let bytes = Bitmap::from_trusted_len_iter(bitmap.iter());
245
let (slice, _, _) = bytes.as_slice();
246
write_bytes(slice, buffers, arrow_data, offset, compression)
247
} else {
248
write_bytes(slice, buffers, arrow_data, offset, compression)
249
}
250
},
251
None => {
252
buffers.push(ipc::Buffer {
253
offset: *offset,
254
length: 0,
255
});
256
},
257
}
258
}
259
260
/// writes `bytes` to `arrow_data` updating `buffers` and `offset` and guaranteeing a 8 byte boundary.
261
fn write_buffer<T: NativeType>(
262
buffer: &[T],
263
buffers: &mut Vec<ipc::Buffer>,
264
arrow_data: &mut Vec<u8>,
265
offset: &mut i64,
266
is_little_endian: bool,
267
compression: Option<Compression>,
268
) {
269
let start = arrow_data.len();
270
if let Some(compression) = compression {
271
_write_compressed_buffer(buffer, arrow_data, is_little_endian, compression);
272
} else {
273
_write_buffer(buffer, arrow_data, is_little_endian);
274
};
275
276
buffers.push(finish_buffer(arrow_data, start, offset));
277
}
278
279
#[inline]
280
fn _write_buffer_from_iter<T: NativeType, I: TrustedLen<Item = T>>(
281
buffer: I,
282
arrow_data: &mut Vec<u8>,
283
is_little_endian: bool,
284
) {
285
let len = buffer.size_hint().0;
286
arrow_data.reserve(len * size_of::<T>());
287
if is_little_endian {
288
buffer
289
.map(|x| T::to_le_bytes(&x))
290
.for_each(|x| arrow_data.extend_from_slice(x.as_ref()))
291
} else {
292
buffer
293
.map(|x| T::to_be_bytes(&x))
294
.for_each(|x| arrow_data.extend_from_slice(x.as_ref()))
295
}
296
}
297
298
#[inline]
299
fn _write_compressed_buffer_from_iter<T: NativeType, I: TrustedLen<Item = T>>(
300
buffer: I,
301
arrow_data: &mut Vec<u8>,
302
is_little_endian: bool,
303
compression: Compression,
304
) {
305
let len = buffer.size_hint().0;
306
let mut swapped = Vec::with_capacity(len * size_of::<T>());
307
if is_little_endian {
308
buffer
309
.map(|x| T::to_le_bytes(&x))
310
.for_each(|x| swapped.extend_from_slice(x.as_ref()));
311
} else {
312
buffer
313
.map(|x| T::to_be_bytes(&x))
314
.for_each(|x| swapped.extend_from_slice(x.as_ref()))
315
};
316
arrow_data.extend_from_slice(&(swapped.len() as i64).to_le_bytes());
317
match compression {
318
Compression::LZ4 => {
319
compression::compress_lz4(&swapped, arrow_data).unwrap();
320
},
321
Compression::ZSTD => {
322
compression::compress_zstd(&swapped, arrow_data).unwrap();
323
},
324
}
325
}
326
327
fn _write_buffer<T: NativeType>(buffer: &[T], arrow_data: &mut Vec<u8>, is_little_endian: bool) {
328
if is_little_endian == is_native_little_endian() {
329
// in native endianness we can use the bytes directly.
330
let buffer = bytemuck::cast_slice(buffer);
331
arrow_data.extend_from_slice(buffer);
332
} else {
333
_write_buffer_from_iter(buffer.iter().copied(), arrow_data, is_little_endian)
334
}
335
}
336
337
fn _write_compressed_buffer<T: NativeType>(
338
buffer: &[T],
339
arrow_data: &mut Vec<u8>,
340
is_little_endian: bool,
341
compression: Compression,
342
) {
343
if is_little_endian == is_native_little_endian() {
344
let bytes = bytemuck::cast_slice(buffer);
345
arrow_data.extend_from_slice(&(bytes.len() as i64).to_le_bytes());
346
match compression {
347
Compression::LZ4 => {
348
compression::compress_lz4(bytes, arrow_data).unwrap();
349
},
350
Compression::ZSTD => {
351
compression::compress_zstd(bytes, arrow_data).unwrap();
352
},
353
}
354
} else {
355
todo!()
356
}
357
}
358
359
/// writes `bytes` to `arrow_data` updating `buffers` and `offset` and guaranteeing a 8 byte boundary.
360
#[inline]
361
fn write_buffer_from_iter<T: NativeType, I: TrustedLen<Item = T>>(
362
buffer: I,
363
buffers: &mut Vec<ipc::Buffer>,
364
arrow_data: &mut Vec<u8>,
365
offset: &mut i64,
366
is_little_endian: bool,
367
compression: Option<Compression>,
368
) {
369
let start = arrow_data.len();
370
371
if let Some(compression) = compression {
372
_write_compressed_buffer_from_iter(buffer, arrow_data, is_little_endian, compression);
373
} else {
374
_write_buffer_from_iter(buffer, arrow_data, is_little_endian);
375
}
376
377
buffers.push(finish_buffer(arrow_data, start, offset));
378
}
379
380
fn finish_buffer(arrow_data: &mut Vec<u8>, start: usize, offset: &mut i64) -> ipc::Buffer {
381
let buffer_len = (arrow_data.len() - start) as i64;
382
383
pad_buffer_to_64(arrow_data, arrow_data.len() - start);
384
let total_len = (arrow_data.len() - start) as i64;
385
386
let buffer = ipc::Buffer {
387
offset: *offset,
388
length: buffer_len,
389
};
390
*offset += total_len;
391
buffer
392
}
393
394