// Source: GitHub repository pola-rs/polars
// Path: blob/main/crates/polars-io/src/parquet/read/mmap.rs
1
use arrow::array::Array;
2
use arrow::bitmap::Bitmap;
3
use arrow::datatypes::Field;
4
use polars_error::PolarsResult;
5
use polars_parquet::read::{
6
BasicDecompressor, ColumnChunkMetadata, Filter, PageReader, column_iter_to_arrays,
7
};
8
use polars_utils::mmap::{MemReader, MemSlice};
9
10
/// Store columns data in two scenarios:
11
/// 1. a local memory mapped file
12
/// 2. data fetched from cloud storage on demand, in this case
13
/// a. the key in the hashmap is the start in the file
14
/// b. the value in the hashmap is the actual data.
15
///
16
/// For the fetched case we use a two phase approach:
17
/// a. identify all the needed columns
18
/// b. asynchronously fetch them in parallel, for example using object_store
19
/// c. store the data in this data structure
20
/// d. when all the data is available deserialize on multiple threads, for example using rayon
21
pub enum ColumnStore {
22
Local(MemSlice),
23
}
24
25
/// For local files memory maps all columns that are part of the parquet field `field_name`.
26
/// For cloud files the relevant memory regions should have been prefetched.
27
pub(super) fn mmap_columns<'a>(
28
store: &'a ColumnStore,
29
field_columns: &'a [&ColumnChunkMetadata],
30
) -> Vec<(&'a ColumnChunkMetadata, MemSlice)> {
31
field_columns
32
.iter()
33
.map(|meta| _mmap_single_column(store, meta))
34
.collect()
35
}
36
37
fn _mmap_single_column<'a>(
38
store: &'a ColumnStore,
39
meta: &'a ColumnChunkMetadata,
40
) -> (&'a ColumnChunkMetadata, MemSlice) {
41
let byte_range = meta.byte_range();
42
let chunk = match store {
43
ColumnStore::Local(mem_slice) => {
44
mem_slice.slice(byte_range.start as usize..byte_range.end as usize)
45
},
46
};
47
(meta, chunk)
48
}
49
50
// similar to arrow2 serializer, except this accepts a slice instead of a vec.
51
// this allows us to memory map
52
pub fn to_deserializer(
53
columns: Vec<(&ColumnChunkMetadata, MemSlice)>,
54
field: Field,
55
filter: Option<Filter>,
56
) -> PolarsResult<(Box<dyn Array>, Bitmap)> {
57
let (columns, types): (Vec<_>, Vec<_>) = columns
58
.into_iter()
59
.map(|(column_meta, chunk)| {
60
// Advise fetching the data for the column chunk
61
chunk.prefetch();
62
63
let pages = PageReader::new(MemReader::new(chunk), column_meta, vec![], usize::MAX);
64
(
65
BasicDecompressor::new(pages, vec![]),
66
&column_meta.descriptor().descriptor.primitive_type,
67
)
68
})
69
.unzip();
70
71
column_iter_to_arrays(columns, types, field, filter)
72
}
73
74