Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/mmap/array.rs
6939 views
1
use std::collections::VecDeque;
2
use std::sync::Arc;
3
4
use polars_error::{PolarsResult, polars_bail, polars_err};
5
6
use crate::array::{Array, DictionaryKey, FixedSizeListArray, ListArray, StructArray, View};
7
use crate::datatypes::ArrowDataType;
8
use crate::ffi::mmap::create_array;
9
use crate::ffi::{ArrowArray, InternalArrowArray, export_array_to_c, try_from};
10
use crate::io::ipc::IpcField;
11
use crate::io::ipc::read::{Dictionaries, IpcBuffer, Node, OutOfSpecKind};
12
use crate::offset::Offset;
13
use crate::types::NativeType;
14
use crate::{match_integer_type, with_match_primitive_type_full};
15
16
fn get_buffer_bounds(buffers: &mut VecDeque<IpcBuffer>) -> PolarsResult<(usize, usize)> {
17
let buffer = buffers.pop_front().ok_or_else(
18
|| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::ExpectedBuffer),
19
)?;
20
21
let offset: usize = buffer.offset().try_into().map_err(
22
|_| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::NegativeFooterLength),
23
)?;
24
25
let length: usize = buffer.length().try_into().map_err(
26
|_| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::NegativeFooterLength),
27
)?;
28
29
Ok((offset, length))
30
}
31
32
/// Checks that the length of `bytes` is at least `size_of::<T>() * expected_len`, and
33
/// returns a boolean indicating whether it is aligned.
34
fn check_bytes_len_and_is_aligned<T: NativeType>(
35
bytes: &[u8],
36
expected_len: usize,
37
) -> PolarsResult<bool> {
38
if bytes.len() < size_of::<T>() * expected_len {
39
polars_bail!(ComputeError: "buffer's length is too small in mmap")
40
};
41
42
Ok(bytemuck::try_cast_slice::<_, T>(bytes).is_ok())
43
}
44
45
fn get_buffer<'a, T: NativeType>(
46
data: &'a [u8],
47
block_offset: usize,
48
buffers: &mut VecDeque<IpcBuffer>,
49
num_rows: usize,
50
) -> PolarsResult<&'a [u8]> {
51
let (offset, length) = get_buffer_bounds(buffers)?;
52
53
// verify that they are in-bounds
54
let values = data
55
.get(block_offset + offset..block_offset + offset + length)
56
.ok_or_else(|| polars_err!(ComputeError: "buffer out of bounds"))?;
57
58
if !check_bytes_len_and_is_aligned::<T>(values, num_rows)? {
59
polars_bail!(ComputeError: "buffer not aligned for mmap");
60
}
61
62
Ok(values)
63
}
64
65
fn get_bytes<'a>(
66
data: &'a [u8],
67
block_offset: usize,
68
buffers: &mut VecDeque<IpcBuffer>,
69
) -> PolarsResult<&'a [u8]> {
70
let (offset, length) = get_buffer_bounds(buffers)?;
71
72
// verify that they are in-bounds
73
data.get(block_offset + offset..block_offset + offset + length)
74
.ok_or_else(|| polars_err!(ComputeError: "buffer out of bounds"))
75
}
76
77
fn get_validity<'a>(
78
data: &'a [u8],
79
block_offset: usize,
80
buffers: &mut VecDeque<IpcBuffer>,
81
null_count: usize,
82
) -> PolarsResult<Option<&'a [u8]>> {
83
let validity = get_buffer_bounds(buffers)?;
84
let (offset, length) = validity;
85
86
Ok(if null_count > 0 {
87
// verify that they are in-bounds and get its pointer
88
Some(
89
data.get(block_offset + offset..block_offset + offset + length)
90
.ok_or_else(|| polars_err!(ComputeError: "buffer out of bounds"))?,
91
)
92
} else {
93
None
94
})
95
}
96
97
fn get_num_rows_and_null_count(node: &Node) -> PolarsResult<(usize, usize)> {
98
let num_rows: usize = node
99
.length()
100
.try_into()
101
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
102
103
let null_count: usize = node
104
.null_count()
105
.try_into()
106
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
107
Ok((num_rows, null_count))
108
}
109
110
fn mmap_binary<O: Offset, T: AsRef<[u8]>>(
111
data: Arc<T>,
112
node: &Node,
113
block_offset: usize,
114
buffers: &mut VecDeque<IpcBuffer>,
115
) -> PolarsResult<ArrowArray> {
116
let (num_rows, null_count) = get_num_rows_and_null_count(node)?;
117
let data_ref = data.as_ref().as_ref();
118
119
let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
120
121
let offsets = get_buffer::<O>(data_ref, block_offset, buffers, num_rows + 1)?.as_ptr();
122
let values = get_buffer::<u8>(data_ref, block_offset, buffers, 0)?.as_ptr();
123
124
// NOTE: offsets and values invariants are _not_ validated
125
Ok(unsafe {
126
create_array(
127
data,
128
num_rows,
129
null_count,
130
[validity, Some(offsets), Some(values)].into_iter(),
131
[].into_iter(),
132
None,
133
None,
134
)
135
})
136
}
137
138
fn mmap_binview<T: AsRef<[u8]>>(
139
data: Arc<T>,
140
node: &Node,
141
block_offset: usize,
142
buffers: &mut VecDeque<IpcBuffer>,
143
variadic_buffer_counts: &mut VecDeque<usize>,
144
) -> PolarsResult<ArrowArray> {
145
let (num_rows, null_count) = get_num_rows_and_null_count(node)?;
146
let data_ref = data.as_ref().as_ref();
147
148
let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
149
150
let views = get_buffer::<View>(data_ref, block_offset, buffers, num_rows)?;
151
152
let n_variadic = variadic_buffer_counts
153
.pop_front()
154
.ok_or_else(|| polars_err!(ComputeError: "expected variadic_buffer_count"))?;
155
156
let mut buffer_ptrs = Vec::with_capacity(n_variadic + 2);
157
buffer_ptrs.push(validity);
158
buffer_ptrs.push(Some(views.as_ptr()));
159
160
let mut variadic_buffer_sizes = Vec::with_capacity(n_variadic);
161
for _ in 0..n_variadic {
162
let variadic_buffer = get_bytes(data_ref, block_offset, buffers)?;
163
variadic_buffer_sizes.push(variadic_buffer.len() as i64);
164
buffer_ptrs.push(Some(variadic_buffer.as_ptr()));
165
}
166
buffer_ptrs.push(Some(variadic_buffer_sizes.as_ptr().cast::<u8>()));
167
168
// Move variadic buffer sizes in an Arc, so that it stays alive.
169
let data = Arc::new((data, variadic_buffer_sizes));
170
171
// NOTE: invariants are not validated
172
Ok(unsafe {
173
create_array(
174
data,
175
num_rows,
176
null_count,
177
buffer_ptrs.into_iter(),
178
[].into_iter(),
179
None,
180
None,
181
)
182
})
183
}
184
185
fn mmap_fixed_size_binary<T: AsRef<[u8]>>(
186
data: Arc<T>,
187
node: &Node,
188
block_offset: usize,
189
buffers: &mut VecDeque<IpcBuffer>,
190
dtype: &ArrowDataType,
191
) -> PolarsResult<ArrowArray> {
192
let bytes_per_row = if let ArrowDataType::FixedSizeBinary(bytes_per_row) = dtype {
193
bytes_per_row
194
} else {
195
polars_bail!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidDataType);
196
};
197
let (num_rows, null_count) = get_num_rows_and_null_count(node)?;
198
199
let data_ref = data.as_ref().as_ref();
200
201
let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
202
let values =
203
get_buffer::<u8>(data_ref, block_offset, buffers, num_rows * bytes_per_row)?.as_ptr();
204
205
Ok(unsafe {
206
create_array(
207
data,
208
num_rows,
209
null_count,
210
[validity, Some(values)].into_iter(),
211
[].into_iter(),
212
None,
213
None,
214
)
215
})
216
}
217
218
fn mmap_null<T: AsRef<[u8]>>(
219
data: Arc<T>,
220
node: &Node,
221
_block_offset: usize,
222
_buffers: &mut VecDeque<IpcBuffer>,
223
) -> PolarsResult<ArrowArray> {
224
let (num_rows, null_count) = get_num_rows_and_null_count(node)?;
225
226
Ok(unsafe {
227
create_array(
228
data,
229
num_rows,
230
null_count,
231
[].into_iter(),
232
[].into_iter(),
233
None,
234
None,
235
)
236
})
237
}
238
239
fn mmap_boolean<T: AsRef<[u8]>>(
240
data: Arc<T>,
241
node: &Node,
242
block_offset: usize,
243
buffers: &mut VecDeque<IpcBuffer>,
244
) -> PolarsResult<ArrowArray> {
245
let (num_rows, null_count) = get_num_rows_and_null_count(node)?;
246
247
let data_ref = data.as_ref().as_ref();
248
249
let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
250
251
let values = get_buffer_bounds(buffers)?;
252
let (offset, length) = values;
253
254
// verify that they are in-bounds and get its pointer
255
let values = data_ref[block_offset + offset..block_offset + offset + length].as_ptr();
256
257
Ok(unsafe {
258
create_array(
259
data,
260
num_rows,
261
null_count,
262
[validity, Some(values)].into_iter(),
263
[].into_iter(),
264
None,
265
None,
266
)
267
})
268
}
269
270
fn mmap_primitive<P: NativeType, T: AsRef<[u8]>>(
271
data: Arc<T>,
272
node: &Node,
273
block_offset: usize,
274
buffers: &mut VecDeque<IpcBuffer>,
275
) -> PolarsResult<ArrowArray> {
276
let data_ref = data.as_ref().as_ref();
277
let (num_rows, null_count) = get_num_rows_and_null_count(node)?;
278
279
let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
280
281
let bytes = get_bytes(data_ref, block_offset, buffers)?;
282
let is_aligned = check_bytes_len_and_is_aligned::<P>(bytes, num_rows)?;
283
284
let out = if is_aligned || size_of::<T>() <= 8 {
285
assert!(
286
is_aligned,
287
"primitive type with size <= 8 bytes should have been aligned"
288
);
289
let bytes_ptr = bytes.as_ptr();
290
291
unsafe {
292
create_array(
293
data,
294
num_rows,
295
null_count,
296
[validity, Some(bytes_ptr)].into_iter(),
297
[].into_iter(),
298
None,
299
None,
300
)
301
}
302
} else {
303
let mut values = vec![P::default(); num_rows];
304
unsafe {
305
std::ptr::copy_nonoverlapping(
306
bytes.as_ptr(),
307
values.as_mut_ptr() as *mut u8,
308
bytes.len(),
309
)
310
};
311
// Now we need to keep the new buffer alive
312
let owned_data = Arc::new((
313
// We can drop the original ref if we don't have a validity
314
validity.and(Some(data)),
315
values,
316
));
317
let bytes_ptr = owned_data.1.as_ptr() as *mut u8;
318
319
unsafe {
320
create_array(
321
owned_data,
322
num_rows,
323
null_count,
324
[validity, Some(bytes_ptr)].into_iter(),
325
[].into_iter(),
326
None,
327
None,
328
)
329
}
330
};
331
332
Ok(out)
333
}
334
335
#[allow(clippy::too_many_arguments)]
336
fn mmap_list<O: Offset, T: AsRef<[u8]>>(
337
data: Arc<T>,
338
node: &Node,
339
block_offset: usize,
340
dtype: &ArrowDataType,
341
ipc_field: &IpcField,
342
dictionaries: &Dictionaries,
343
field_nodes: &mut VecDeque<Node>,
344
variadic_buffer_counts: &mut VecDeque<usize>,
345
buffers: &mut VecDeque<IpcBuffer>,
346
) -> PolarsResult<ArrowArray> {
347
let child = ListArray::<O>::try_get_child(dtype)?.dtype();
348
let (num_rows, null_count) = get_num_rows_and_null_count(node)?;
349
350
let data_ref = data.as_ref().as_ref();
351
352
let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
353
354
let offsets = get_buffer::<O>(data_ref, block_offset, buffers, num_rows + 1)?.as_ptr();
355
356
let values = get_array(
357
data.clone(),
358
block_offset,
359
child,
360
&ipc_field.fields[0],
361
dictionaries,
362
field_nodes,
363
variadic_buffer_counts,
364
buffers,
365
)?;
366
367
// NOTE: offsets and values invariants are _not_ validated
368
Ok(unsafe {
369
create_array(
370
data,
371
num_rows,
372
null_count,
373
[validity, Some(offsets)].into_iter(),
374
[values].into_iter(),
375
None,
376
None,
377
)
378
})
379
}
380
381
#[allow(clippy::too_many_arguments)]
382
fn mmap_fixed_size_list<T: AsRef<[u8]>>(
383
data: Arc<T>,
384
node: &Node,
385
block_offset: usize,
386
dtype: &ArrowDataType,
387
ipc_field: &IpcField,
388
dictionaries: &Dictionaries,
389
field_nodes: &mut VecDeque<Node>,
390
variadic_buffer_counts: &mut VecDeque<usize>,
391
buffers: &mut VecDeque<IpcBuffer>,
392
) -> PolarsResult<ArrowArray> {
393
let child = FixedSizeListArray::try_child_and_size(dtype)?.0.dtype();
394
let (num_rows, null_count) = get_num_rows_and_null_count(node)?;
395
396
let data_ref = data.as_ref().as_ref();
397
398
let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
399
400
let values = get_array(
401
data.clone(),
402
block_offset,
403
child,
404
&ipc_field.fields[0],
405
dictionaries,
406
field_nodes,
407
variadic_buffer_counts,
408
buffers,
409
)?;
410
411
Ok(unsafe {
412
create_array(
413
data,
414
num_rows,
415
null_count,
416
[validity].into_iter(),
417
[values].into_iter(),
418
None,
419
None,
420
)
421
})
422
}
423
424
#[allow(clippy::too_many_arguments)]
425
fn mmap_struct<T: AsRef<[u8]>>(
426
data: Arc<T>,
427
node: &Node,
428
block_offset: usize,
429
dtype: &ArrowDataType,
430
ipc_field: &IpcField,
431
dictionaries: &Dictionaries,
432
field_nodes: &mut VecDeque<Node>,
433
variadic_buffer_counts: &mut VecDeque<usize>,
434
buffers: &mut VecDeque<IpcBuffer>,
435
) -> PolarsResult<ArrowArray> {
436
let children = StructArray::try_get_fields(dtype)?;
437
let (num_rows, null_count) = get_num_rows_and_null_count(node)?;
438
439
let data_ref = data.as_ref().as_ref();
440
441
let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
442
443
let values = children
444
.iter()
445
.map(|f| &f.dtype)
446
.zip(ipc_field.fields.iter())
447
.map(|(child, ipc)| {
448
get_array(
449
data.clone(),
450
block_offset,
451
child,
452
ipc,
453
dictionaries,
454
field_nodes,
455
variadic_buffer_counts,
456
buffers,
457
)
458
})
459
.collect::<PolarsResult<Vec<_>>>()?;
460
461
Ok(unsafe {
462
create_array(
463
data,
464
num_rows,
465
null_count,
466
[validity].into_iter(),
467
values.into_iter(),
468
None,
469
None,
470
)
471
})
472
}
473
474
#[allow(clippy::too_many_arguments)]
475
fn mmap_dict<K: DictionaryKey, T: AsRef<[u8]>>(
476
data: Arc<T>,
477
node: &Node,
478
block_offset: usize,
479
_: &ArrowDataType,
480
ipc_field: &IpcField,
481
dictionaries: &Dictionaries,
482
_: &mut VecDeque<Node>,
483
buffers: &mut VecDeque<IpcBuffer>,
484
) -> PolarsResult<ArrowArray> {
485
let (num_rows, null_count) = get_num_rows_and_null_count(node)?;
486
487
let data_ref = data.as_ref().as_ref();
488
489
let dictionary = dictionaries
490
.get(&ipc_field.dictionary_id.unwrap())
491
.ok_or_else(|| polars_err!(ComputeError: "out-of-spec: missing dictionary"))?
492
.clone();
493
494
let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
495
496
let values = get_buffer::<K>(data_ref, block_offset, buffers, num_rows)?.as_ptr();
497
498
Ok(unsafe {
499
create_array(
500
data,
501
num_rows,
502
null_count,
503
[validity, Some(values)].into_iter(),
504
[].into_iter(),
505
Some(export_array_to_c(dictionary)),
506
None,
507
)
508
})
509
}
510
511
#[allow(clippy::too_many_arguments)]
512
fn get_array<T: AsRef<[u8]>>(
513
data: Arc<T>,
514
block_offset: usize,
515
dtype: &ArrowDataType,
516
ipc_field: &IpcField,
517
dictionaries: &Dictionaries,
518
field_nodes: &mut VecDeque<Node>,
519
variadic_buffer_counts: &mut VecDeque<usize>,
520
buffers: &mut VecDeque<IpcBuffer>,
521
) -> PolarsResult<ArrowArray> {
522
use crate::datatypes::PhysicalType::*;
523
let node = field_nodes.pop_front().ok_or_else(
524
|| polars_err!(ComputeError: "out-of-spec: {:?}", OutOfSpecKind::ExpectedBuffer),
525
)?;
526
527
match dtype.to_physical_type() {
528
Null => mmap_null(data, &node, block_offset, buffers),
529
Boolean => mmap_boolean(data, &node, block_offset, buffers),
530
Primitive(p) => with_match_primitive_type_full!(p, |$T| {
531
mmap_primitive::<$T, _>(data, &node, block_offset, buffers)
532
}),
533
Utf8 | Binary => mmap_binary::<i32, _>(data, &node, block_offset, buffers),
534
Utf8View | BinaryView => {
535
mmap_binview(data, &node, block_offset, buffers, variadic_buffer_counts)
536
},
537
FixedSizeBinary => mmap_fixed_size_binary(data, &node, block_offset, buffers, dtype),
538
LargeBinary | LargeUtf8 => mmap_binary::<i64, _>(data, &node, block_offset, buffers),
539
List => mmap_list::<i32, _>(
540
data,
541
&node,
542
block_offset,
543
dtype,
544
ipc_field,
545
dictionaries,
546
field_nodes,
547
variadic_buffer_counts,
548
buffers,
549
),
550
LargeList => mmap_list::<i64, _>(
551
data,
552
&node,
553
block_offset,
554
dtype,
555
ipc_field,
556
dictionaries,
557
field_nodes,
558
variadic_buffer_counts,
559
buffers,
560
),
561
FixedSizeList => mmap_fixed_size_list(
562
data,
563
&node,
564
block_offset,
565
dtype,
566
ipc_field,
567
dictionaries,
568
field_nodes,
569
variadic_buffer_counts,
570
buffers,
571
),
572
Struct => mmap_struct(
573
data,
574
&node,
575
block_offset,
576
dtype,
577
ipc_field,
578
dictionaries,
579
field_nodes,
580
variadic_buffer_counts,
581
buffers,
582
),
583
Dictionary(key_type) => match_integer_type!(key_type, |$T| {
584
mmap_dict::<$T, _>(
585
data,
586
&node,
587
block_offset,
588
dtype,
589
ipc_field,
590
dictionaries,
591
field_nodes,
592
buffers,
593
)
594
}),
595
_ => todo!(),
596
}
597
}
598
599
#[allow(clippy::too_many_arguments)]
600
/// Maps a memory region to an [`Array`].
601
pub(crate) unsafe fn mmap<T: AsRef<[u8]>>(
602
data: Arc<T>,
603
block_offset: usize,
604
dtype: ArrowDataType,
605
ipc_field: &IpcField,
606
dictionaries: &Dictionaries,
607
field_nodes: &mut VecDeque<Node>,
608
variadic_buffer_counts: &mut VecDeque<usize>,
609
buffers: &mut VecDeque<IpcBuffer>,
610
) -> PolarsResult<Box<dyn Array>> {
611
let array = get_array(
612
data,
613
block_offset,
614
&dtype,
615
ipc_field,
616
dictionaries,
617
field_nodes,
618
variadic_buffer_counts,
619
buffers,
620
)?;
621
// The unsafety comes from the fact that `array` is not necessarily valid -
622
// the IPC file may be corrupted (e.g. invalid offsets or non-utf8 data)
623
unsafe { try_from(InternalArrowArray::new(array, dtype)) }
624
}
625
626