Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/parquet/read/mmap.rs
8512 views
1
use std::io::Cursor;
2
3
use arrow::array::Array;
4
use arrow::bitmap::Bitmap;
5
use arrow::datatypes::Field;
6
use polars_buffer::Buffer;
7
use polars_error::PolarsResult;
8
use polars_parquet::read::{
9
BasicDecompressor, ColumnChunkMetadata, Filter, PageReader, column_iter_to_arrays,
10
};
11
use polars_utils::mem::prefetch::prefetch_l2;
12
13
/// Store columns data in two scenarios:
14
/// 1. a local memory mapped file
15
/// 2. data fetched from cloud storage on demand, in this case
16
/// a. the key in the hashmap is the start in the file
17
/// b. the value in the hashmap is the actual data.
18
///
19
/// For the fetched case we use a two phase approach:
20
/// a. identify all the needed columns
21
/// b. asynchronously fetch them in parallel, for example using object_store
22
/// c. store the data in this data structure
23
/// d. when all the data is available deserialize on multiple threads, for example using rayon
24
pub enum ColumnStore {
25
Local(Buffer<u8>),
26
}
27
28
/// For local files memory maps all columns that are part of the parquet field `field_name`.
29
/// For cloud files the relevant memory regions should have been prefetched.
30
pub(super) fn mmap_columns<'a>(
31
store: &'a ColumnStore,
32
field_columns: &'a [&ColumnChunkMetadata],
33
) -> Vec<(&'a ColumnChunkMetadata, Buffer<u8>)> {
34
field_columns
35
.iter()
36
.map(|meta| _mmap_single_column(store, meta))
37
.collect()
38
}
39
40
fn _mmap_single_column<'a>(
41
store: &'a ColumnStore,
42
meta: &'a ColumnChunkMetadata,
43
) -> (&'a ColumnChunkMetadata, Buffer<u8>) {
44
let byte_range = meta.byte_range();
45
let chunk = match store {
46
ColumnStore::Local(mem_slice) => mem_slice
47
.clone()
48
.sliced(byte_range.start as usize..byte_range.end as usize),
49
};
50
(meta, chunk)
51
}
52
53
// similar to arrow2 serializer, except this accepts a slice instead of a vec.
54
// this allows us to memory map
55
pub fn to_deserializer(
56
columns: Vec<(&ColumnChunkMetadata, Buffer<u8>)>,
57
field: Field,
58
filter: Option<Filter>,
59
) -> PolarsResult<(Vec<Box<dyn Array>>, Bitmap)> {
60
let (columns, types): (Vec<_>, Vec<_>) = columns
61
.into_iter()
62
.map(|(column_meta, chunk)| {
63
// Advise fetching the data for the column chunk
64
prefetch_l2(&chunk);
65
66
let pages = PageReader::new(Cursor::new(chunk), column_meta, vec![], usize::MAX);
67
(
68
BasicDecompressor::new(pages, vec![]),
69
&column_meta.descriptor().descriptor.primitive_type,
70
)
71
})
72
.unzip();
73
74
column_iter_to_arrays(columns, types, field, filter)
75
}
76
77