Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/mmap/array.rs
8412 views
1
use std::collections::VecDeque;
2
use std::sync::Arc;
3
4
use polars_error::{PolarsResult, polars_bail, polars_err};
5
6
use crate::array::{Array, DictionaryKey, FixedSizeListArray, ListArray, StructArray, View};
7
use crate::datatypes::ArrowDataType;
8
use crate::ffi::mmap::create_array;
9
use crate::ffi::{ArrowArray, InternalArrowArray, export_array_to_c, try_from};
10
use crate::io::ipc::IpcField;
11
use crate::io::ipc::read::{Dictionaries, IpcBuffer, Node, OutOfSpecKind};
12
use crate::offset::Offset;
13
use crate::types::NativeType;
14
use crate::{match_integer_type, with_match_primitive_type_full};
15
16
fn get_buffer_bounds(buffers: &mut VecDeque<IpcBuffer>) -> PolarsResult<(usize, usize)> {
17
let buffer = buffers.pop_front().ok_or_else(
18
|| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::ExpectedBuffer),
19
)?;
20
21
let offset: usize = buffer.offset().try_into().map_err(
22
|_| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::NegativeFooterLength),
23
)?;
24
25
let length: usize = buffer.length().try_into().map_err(
26
|_| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::NegativeFooterLength),
27
)?;
28
29
Ok((offset, length))
30
}
31
32
/// Checks that the length of `bytes` is at least `size_of::<T>() * expected_len`, and
33
/// returns a boolean indicating whether it is aligned.
34
fn check_bytes_len_and_is_aligned<T: NativeType>(
35
bytes: &[u8],
36
expected_len: usize,
37
) -> PolarsResult<bool> {
38
if bytes.len() < size_of::<T>() * expected_len {
39
polars_bail!(ComputeError: "buffer's length is too small in mmap")
40
};
41
42
Ok(bytemuck::try_cast_slice::<_, T>(bytes).is_ok())
43
}
44
45
fn get_buffer<'a, T: NativeType>(
46
data: &'a [u8],
47
block_offset: usize,
48
buffers: &mut VecDeque<IpcBuffer>,
49
num_rows: usize,
50
) -> PolarsResult<&'a [u8]> {
51
let (offset, length) = get_buffer_bounds(buffers)?;
52
53
// verify that they are in-bounds
54
let values = data
55
.get(block_offset + offset..block_offset + offset + length)
56
.ok_or_else(|| polars_err!(ComputeError: "buffer out of bounds"))?;
57
58
if !check_bytes_len_and_is_aligned::<T>(values, num_rows)? {
59
polars_bail!(ComputeError: "buffer not aligned for mmap");
60
}
61
62
Ok(values)
63
}
64
65
fn get_bytes<'a>(
66
data: &'a [u8],
67
block_offset: usize,
68
buffers: &mut VecDeque<IpcBuffer>,
69
) -> PolarsResult<&'a [u8]> {
70
let (offset, length) = get_buffer_bounds(buffers)?;
71
72
// verify that they are in-bounds
73
data.get(block_offset + offset..block_offset + offset + length)
74
.ok_or_else(|| polars_err!(ComputeError: "buffer out of bounds"))
75
}
76
77
fn get_validity<'a>(
78
data: &'a [u8],
79
block_offset: usize,
80
buffers: &mut VecDeque<IpcBuffer>,
81
null_count: usize,
82
) -> PolarsResult<Option<&'a [u8]>> {
83
let validity = get_buffer_bounds(buffers)?;
84
let (offset, length) = validity;
85
86
Ok(if null_count > 0 {
87
// verify that they are in-bounds and get its pointer
88
Some(
89
data.get(block_offset + offset..block_offset + offset + length)
90
.ok_or_else(|| polars_err!(ComputeError: "buffer out of bounds"))?,
91
)
92
} else {
93
None
94
})
95
}
96
97
fn get_num_rows_and_null_count(node: &Node) -> PolarsResult<(usize, usize)> {
98
let num_rows: usize = node
99
.length()
100
.try_into()
101
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
102
103
let null_count: usize = node
104
.null_count()
105
.try_into()
106
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
107
Ok((num_rows, null_count))
108
}
109
110
fn mmap_binary<O: Offset, T: AsRef<[u8]> + Send + Sync + 'static>(
111
data: Arc<T>,
112
node: &Node,
113
block_offset: usize,
114
buffers: &mut VecDeque<IpcBuffer>,
115
) -> PolarsResult<ArrowArray> {
116
let (num_rows, null_count) = get_num_rows_and_null_count(node)?;
117
let data_ref = data.as_ref().as_ref();
118
119
let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
120
121
let offsets = get_buffer::<O>(data_ref, block_offset, buffers, num_rows + 1)?.as_ptr();
122
let values = get_buffer::<u8>(data_ref, block_offset, buffers, 0)?.as_ptr();
123
124
// NOTE: offsets and values invariants are _not_ validated
125
Ok(unsafe {
126
create_array(
127
data,
128
num_rows,
129
null_count,
130
[validity, Some(offsets), Some(values)].into_iter(),
131
[].into_iter(),
132
None,
133
None,
134
)
135
})
136
}
137
138
fn mmap_binview<T: AsRef<[u8]> + Send + Sync + 'static>(
139
data: Arc<T>,
140
node: &Node,
141
block_offset: usize,
142
buffers: &mut VecDeque<IpcBuffer>,
143
variadic_buffer_counts: &mut VecDeque<usize>,
144
) -> PolarsResult<ArrowArray> {
145
let (num_rows, null_count) = get_num_rows_and_null_count(node)?;
146
let data_ref = data.as_ref().as_ref();
147
148
let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
149
150
let views = get_buffer::<View>(data_ref, block_offset, buffers, num_rows)?;
151
152
let n_variadic = variadic_buffer_counts
153
.pop_front()
154
.ok_or_else(|| polars_err!(ComputeError: "expected variadic_buffer_count"))?;
155
156
let mut buffer_ptrs = Vec::with_capacity(n_variadic + 2);
157
buffer_ptrs.push(validity);
158
buffer_ptrs.push(Some(views.as_ptr()));
159
160
let mut variadic_buffer_sizes = Vec::with_capacity(n_variadic);
161
for _ in 0..n_variadic {
162
let variadic_buffer = get_bytes(data_ref, block_offset, buffers)?;
163
variadic_buffer_sizes.push(variadic_buffer.len() as i64);
164
buffer_ptrs.push(Some(variadic_buffer.as_ptr()));
165
}
166
buffer_ptrs.push(Some(variadic_buffer_sizes.as_ptr().cast::<u8>()));
167
168
// Move variadic buffer sizes in an Arc, so that it stays alive.
169
let data = Arc::new((data, variadic_buffer_sizes));
170
171
// NOTE: invariants are not validated
172
Ok(unsafe {
173
create_array(
174
data,
175
num_rows,
176
null_count,
177
buffer_ptrs.into_iter(),
178
[].into_iter(),
179
None,
180
None,
181
)
182
})
183
}
184
185
fn mmap_fixed_size_binary<T: AsRef<[u8]> + Send + Sync + 'static>(
186
data: Arc<T>,
187
node: &Node,
188
block_offset: usize,
189
buffers: &mut VecDeque<IpcBuffer>,
190
dtype: &ArrowDataType,
191
) -> PolarsResult<ArrowArray> {
192
let bytes_per_row = if let ArrowDataType::FixedSizeBinary(bytes_per_row) = dtype {
193
bytes_per_row
194
} else {
195
polars_bail!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidDataType);
196
};
197
let (num_rows, null_count) = get_num_rows_and_null_count(node)?;
198
199
let data_ref = data.as_ref().as_ref();
200
201
let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
202
let values =
203
get_buffer::<u8>(data_ref, block_offset, buffers, num_rows * bytes_per_row)?.as_ptr();
204
205
Ok(unsafe {
206
create_array(
207
data,
208
num_rows,
209
null_count,
210
[validity, Some(values)].into_iter(),
211
[].into_iter(),
212
None,
213
None,
214
)
215
})
216
}
217
218
fn mmap_null<T: AsRef<[u8]> + Send + Sync + 'static>(
219
data: Arc<T>,
220
node: &Node,
221
_block_offset: usize,
222
_buffers: &mut VecDeque<IpcBuffer>,
223
) -> PolarsResult<ArrowArray> {
224
let (num_rows, null_count) = get_num_rows_and_null_count(node)?;
225
226
Ok(unsafe {
227
create_array(
228
data,
229
num_rows,
230
null_count,
231
[].into_iter(),
232
[].into_iter(),
233
None,
234
None,
235
)
236
})
237
}
238
239
fn mmap_boolean<T: AsRef<[u8]> + Send + Sync + 'static>(
240
data: Arc<T>,
241
node: &Node,
242
block_offset: usize,
243
buffers: &mut VecDeque<IpcBuffer>,
244
) -> PolarsResult<ArrowArray> {
245
let (num_rows, null_count) = get_num_rows_and_null_count(node)?;
246
247
let data_ref = data.as_ref().as_ref();
248
249
let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
250
251
let values = get_buffer_bounds(buffers)?;
252
let (offset, length) = values;
253
254
// verify that they are in-bounds and get its pointer
255
let values = data_ref[block_offset + offset..block_offset + offset + length].as_ptr();
256
257
Ok(unsafe {
258
create_array(
259
data,
260
num_rows,
261
null_count,
262
[validity, Some(values)].into_iter(),
263
[].into_iter(),
264
None,
265
None,
266
)
267
})
268
}
269
270
fn mmap_primitive<P: NativeType, T: AsRef<[u8]> + Send + Sync + 'static>(
271
data: Arc<T>,
272
node: &Node,
273
block_offset: usize,
274
buffers: &mut VecDeque<IpcBuffer>,
275
) -> PolarsResult<ArrowArray> {
276
let data_ref = data.as_ref().as_ref();
277
let (num_rows, null_count) = get_num_rows_and_null_count(node)?;
278
279
let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
280
281
let bytes = get_bytes(data_ref, block_offset, buffers)?;
282
let is_aligned = check_bytes_len_and_is_aligned::<P>(bytes, num_rows)?;
283
284
let out = if is_aligned || size_of::<T>() <= 8 {
285
assert!(
286
is_aligned,
287
"primitive type with size <= 8 bytes should have been aligned"
288
);
289
let bytes_ptr = bytes.as_ptr();
290
291
unsafe {
292
create_array(
293
data,
294
num_rows,
295
null_count,
296
[validity, Some(bytes_ptr)].into_iter(),
297
[].into_iter(),
298
None,
299
None,
300
)
301
}
302
} else {
303
let mut values: Vec<P> = Vec::with_capacity(num_rows);
304
let num_bytes = num_rows * std::mem::size_of::<P>();
305
306
assert!(bytes.len() >= num_bytes);
307
unsafe {
308
std::ptr::copy_nonoverlapping(
309
bytes.as_ptr(),
310
values.as_mut_ptr() as *mut u8,
311
num_bytes,
312
);
313
314
values.set_len(num_rows);
315
};
316
317
// Now we need to keep the new buffer alive
318
let owned_data = Arc::new((
319
// We can drop the original ref if we don't have a validity
320
validity.and(Some(data)),
321
values,
322
));
323
let bytes_ptr = owned_data.1.as_ptr() as *mut u8;
324
325
unsafe {
326
create_array(
327
owned_data,
328
num_rows,
329
null_count,
330
[validity, Some(bytes_ptr)].into_iter(),
331
[].into_iter(),
332
None,
333
None,
334
)
335
}
336
};
337
338
Ok(out)
339
}
340
341
#[allow(clippy::too_many_arguments)]
342
fn mmap_list<O: Offset, T: AsRef<[u8]> + Send + Sync + 'static>(
343
data: Arc<T>,
344
node: &Node,
345
block_offset: usize,
346
dtype: &ArrowDataType,
347
ipc_field: &IpcField,
348
dictionaries: &Dictionaries,
349
field_nodes: &mut VecDeque<Node>,
350
variadic_buffer_counts: &mut VecDeque<usize>,
351
buffers: &mut VecDeque<IpcBuffer>,
352
) -> PolarsResult<ArrowArray> {
353
let child = ListArray::<O>::try_get_child(dtype)?.dtype();
354
let (num_rows, null_count) = get_num_rows_and_null_count(node)?;
355
356
let data_ref = data.as_ref().as_ref();
357
358
let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
359
360
let offsets = get_buffer::<O>(data_ref, block_offset, buffers, num_rows + 1)?.as_ptr();
361
362
let values = get_array(
363
data.clone(),
364
block_offset,
365
child,
366
&ipc_field.fields[0],
367
dictionaries,
368
field_nodes,
369
variadic_buffer_counts,
370
buffers,
371
)?;
372
373
// NOTE: offsets and values invariants are _not_ validated
374
Ok(unsafe {
375
create_array(
376
data,
377
num_rows,
378
null_count,
379
[validity, Some(offsets)].into_iter(),
380
[values].into_iter(),
381
None,
382
None,
383
)
384
})
385
}
386
387
#[allow(clippy::too_many_arguments)]
388
fn mmap_fixed_size_list<T: AsRef<[u8]> + Send + Sync + 'static>(
389
data: Arc<T>,
390
node: &Node,
391
block_offset: usize,
392
dtype: &ArrowDataType,
393
ipc_field: &IpcField,
394
dictionaries: &Dictionaries,
395
field_nodes: &mut VecDeque<Node>,
396
variadic_buffer_counts: &mut VecDeque<usize>,
397
buffers: &mut VecDeque<IpcBuffer>,
398
) -> PolarsResult<ArrowArray> {
399
let child = FixedSizeListArray::try_child_and_size(dtype)?.0.dtype();
400
let (num_rows, null_count) = get_num_rows_and_null_count(node)?;
401
402
let data_ref = data.as_ref().as_ref();
403
404
let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
405
406
let values = get_array(
407
data.clone(),
408
block_offset,
409
child,
410
&ipc_field.fields[0],
411
dictionaries,
412
field_nodes,
413
variadic_buffer_counts,
414
buffers,
415
)?;
416
417
Ok(unsafe {
418
create_array(
419
data,
420
num_rows,
421
null_count,
422
[validity].into_iter(),
423
[values].into_iter(),
424
None,
425
None,
426
)
427
})
428
}
429
430
#[allow(clippy::too_many_arguments)]
431
fn mmap_struct<T: AsRef<[u8]> + Send + Sync + 'static>(
432
data: Arc<T>,
433
node: &Node,
434
block_offset: usize,
435
dtype: &ArrowDataType,
436
ipc_field: &IpcField,
437
dictionaries: &Dictionaries,
438
field_nodes: &mut VecDeque<Node>,
439
variadic_buffer_counts: &mut VecDeque<usize>,
440
buffers: &mut VecDeque<IpcBuffer>,
441
) -> PolarsResult<ArrowArray> {
442
let children = StructArray::try_get_fields(dtype)?;
443
let (num_rows, null_count) = get_num_rows_and_null_count(node)?;
444
445
let data_ref = data.as_ref().as_ref();
446
447
let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
448
449
let values = children
450
.iter()
451
.map(|f| &f.dtype)
452
.zip(ipc_field.fields.iter())
453
.map(|(child, ipc)| {
454
get_array(
455
data.clone(),
456
block_offset,
457
child,
458
ipc,
459
dictionaries,
460
field_nodes,
461
variadic_buffer_counts,
462
buffers,
463
)
464
})
465
.collect::<PolarsResult<Vec<_>>>()?;
466
467
Ok(unsafe {
468
create_array(
469
data,
470
num_rows,
471
null_count,
472
[validity].into_iter(),
473
values.into_iter(),
474
None,
475
None,
476
)
477
})
478
}
479
480
#[allow(clippy::too_many_arguments)]
481
fn mmap_dict<K: DictionaryKey, T: AsRef<[u8]> + Send + Sync + 'static>(
482
data: Arc<T>,
483
node: &Node,
484
block_offset: usize,
485
_: &ArrowDataType,
486
ipc_field: &IpcField,
487
dictionaries: &Dictionaries,
488
_: &mut VecDeque<Node>,
489
buffers: &mut VecDeque<IpcBuffer>,
490
) -> PolarsResult<ArrowArray> {
491
let (num_rows, null_count) = get_num_rows_and_null_count(node)?;
492
493
let data_ref = data.as_ref().as_ref();
494
495
let dictionary = dictionaries
496
.get(&ipc_field.dictionary_id.unwrap())
497
.ok_or_else(|| polars_err!(ComputeError: "out-of-spec: missing dictionary"))?
498
.clone();
499
500
let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
501
502
let values = get_buffer::<K>(data_ref, block_offset, buffers, num_rows)?.as_ptr();
503
504
Ok(unsafe {
505
create_array(
506
data,
507
num_rows,
508
null_count,
509
[validity, Some(values)].into_iter(),
510
[].into_iter(),
511
Some(export_array_to_c(dictionary)),
512
None,
513
)
514
})
515
}
516
517
#[allow(clippy::too_many_arguments)]
518
fn get_array<T: AsRef<[u8]> + Send + Sync + 'static>(
519
data: Arc<T>,
520
block_offset: usize,
521
dtype: &ArrowDataType,
522
ipc_field: &IpcField,
523
dictionaries: &Dictionaries,
524
field_nodes: &mut VecDeque<Node>,
525
variadic_buffer_counts: &mut VecDeque<usize>,
526
buffers: &mut VecDeque<IpcBuffer>,
527
) -> PolarsResult<ArrowArray> {
528
use crate::datatypes::PhysicalType::*;
529
let node = field_nodes.pop_front().ok_or_else(
530
|| polars_err!(ComputeError: "out-of-spec: {:?}", OutOfSpecKind::ExpectedBuffer),
531
)?;
532
533
match dtype.to_physical_type() {
534
Null => mmap_null(data, &node, block_offset, buffers),
535
Boolean => mmap_boolean(data, &node, block_offset, buffers),
536
Primitive(p) => with_match_primitive_type_full!(p, |$T| {
537
mmap_primitive::<$T, _>(data, &node, block_offset, buffers)
538
}),
539
Utf8 | Binary => mmap_binary::<i32, _>(data, &node, block_offset, buffers),
540
Utf8View | BinaryView => {
541
mmap_binview(data, &node, block_offset, buffers, variadic_buffer_counts)
542
},
543
FixedSizeBinary => mmap_fixed_size_binary(data, &node, block_offset, buffers, dtype),
544
LargeBinary | LargeUtf8 => mmap_binary::<i64, _>(data, &node, block_offset, buffers),
545
List => mmap_list::<i32, _>(
546
data,
547
&node,
548
block_offset,
549
dtype,
550
ipc_field,
551
dictionaries,
552
field_nodes,
553
variadic_buffer_counts,
554
buffers,
555
),
556
LargeList => mmap_list::<i64, _>(
557
data,
558
&node,
559
block_offset,
560
dtype,
561
ipc_field,
562
dictionaries,
563
field_nodes,
564
variadic_buffer_counts,
565
buffers,
566
),
567
FixedSizeList => mmap_fixed_size_list(
568
data,
569
&node,
570
block_offset,
571
dtype,
572
ipc_field,
573
dictionaries,
574
field_nodes,
575
variadic_buffer_counts,
576
buffers,
577
),
578
Struct => mmap_struct(
579
data,
580
&node,
581
block_offset,
582
dtype,
583
ipc_field,
584
dictionaries,
585
field_nodes,
586
variadic_buffer_counts,
587
buffers,
588
),
589
Dictionary(key_type) => match_integer_type!(key_type, |$T| {
590
mmap_dict::<$T, _>(
591
data,
592
&node,
593
block_offset,
594
dtype,
595
ipc_field,
596
dictionaries,
597
field_nodes,
598
buffers,
599
)
600
}),
601
_ => todo!(),
602
}
603
}
604
605
#[allow(clippy::too_many_arguments)]
606
/// Maps a memory region to an [`Array`].
607
pub(crate) unsafe fn mmap<T: AsRef<[u8]> + Send + Sync + 'static>(
608
data: Arc<T>,
609
block_offset: usize,
610
dtype: ArrowDataType,
611
ipc_field: &IpcField,
612
dictionaries: &Dictionaries,
613
field_nodes: &mut VecDeque<Node>,
614
variadic_buffer_counts: &mut VecDeque<usize>,
615
buffers: &mut VecDeque<IpcBuffer>,
616
) -> PolarsResult<Box<dyn Array>> {
617
let array = get_array(
618
data,
619
block_offset,
620
&dtype,
621
ipc_field,
622
dictionaries,
623
field_nodes,
624
variadic_buffer_counts,
625
buffers,
626
)?;
627
// The unsafety comes from the fact that `array` is not necessarily valid -
628
// the IPC file may be corrupted (e.g. invalid offsets or non-utf8 data)
629
unsafe { try_from(InternalArrowArray::new(array, dtype)) }
630
}
631
632