Path: blob/main/crates/polars-io/src/parquet/read/mmap.rs
use arrow::array::Array;
use arrow::bitmap::Bitmap;
use arrow::datatypes::Field;
use polars_error::PolarsResult;
use polars_parquet::read::{
    BasicDecompressor, ColumnChunkMetadata, Filter, PageReader, column_iter_to_arrays,
};
use polars_utils::mmap::{MemReader, MemSlice};

/// Stores column data for two scenarios:
/// 1. a local memory mapped file
/// 2. data fetched from cloud storage on demand, in which case
///    a. the key in the hashmap is the start offset in the file
///    b. the value in the hashmap is the actual data
///
/// For the fetched case we use a two-phase approach:
///    a. identify all the needed columns
///    b. asynchronously fetch them in parallel, for example using object_store
///    c. store the data in this data structure
///    d. when all the data is available, deserialize on multiple threads, for example using rayon
pub enum ColumnStore {
    Local(MemSlice),
}

/// For local files, memory maps all columns that are part of the given parquet field.
/// For cloud files, the relevant memory regions should have been prefetched.
pub(super) fn mmap_columns<'a>(
    store: &'a ColumnStore,
    field_columns: &'a [&ColumnChunkMetadata],
) -> Vec<(&'a ColumnChunkMetadata, MemSlice)> {
    field_columns
        .iter()
        .map(|meta| _mmap_single_column(store, meta))
        .collect()
}

/// Slices the raw bytes of a single column chunk out of the backing store without copying.
fn _mmap_single_column<'a>(
    store: &'a ColumnStore,
    meta: &'a ColumnChunkMetadata,
) -> (&'a ColumnChunkMetadata, MemSlice) {
    let byte_range = meta.byte_range();
    let chunk = match store {
        ColumnStore::Local(mem_slice) => {
            mem_slice.slice(byte_range.start as usize..byte_range.end as usize)
        },
    };
    (meta, chunk)
}

// Similar to the arrow2 deserializer, except this accepts a slice instead of a vec,
// which allows us to memory map.
pub fn to_deserializer(
    columns: Vec<(&ColumnChunkMetadata, MemSlice)>,
    field: Field,
    filter: Option<Filter>,
) -> PolarsResult<(Box<dyn Array>, Bitmap)> {
    let (columns, types): (Vec<_>, Vec<_>) = columns
        .into_iter()
        .map(|(column_meta, chunk)| {
            // Advise prefetching of the data for this column chunk.
            chunk.prefetch();

            let pages = PageReader::new(MemReader::new(chunk), column_meta, vec![], usize::MAX);
            (
                BasicDecompressor::new(pages, vec![]),
                &column_meta.descriptor().descriptor.primitive_type,
            )
        })
        .unzip();

    column_iter_to_arrays(columns, types, field, filter)
}
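
// ---------------------------------------------------------------------------
// Usage sketch (illustrative, not part of this module). It assumes the caller
// has already obtained a `MemSlice` over the mapped parquet file (`mem_slice`),
// the `ColumnChunkMetadata` entries belonging to one parquet field
// (`field_column_metadata`), and the corresponding arrow `Field` (`field`)
// from the file metadata; those names are placeholders. The flow through this
// module would then be:
//
//     let store = ColumnStore::Local(mem_slice);
//     // Zero-copy slices of the raw column chunk bytes.
//     let columns = mmap_columns(&store, &field_column_metadata);
//     // Decompress and decode the pages into a single arrow array, together
//     // with the bitmap returned alongside it for the optional filter.
//     let (array, mask) = to_deserializer(columns, field, None)?;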