// Path: blob/main/crates/polars-io/src/parquet/read/mmap.rs (8512 views)
use std::io::Cursor;12use arrow::array::Array;3use arrow::bitmap::Bitmap;4use arrow::datatypes::Field;5use polars_buffer::Buffer;6use polars_error::PolarsResult;7use polars_parquet::read::{8BasicDecompressor, ColumnChunkMetadata, Filter, PageReader, column_iter_to_arrays,9};10use polars_utils::mem::prefetch::prefetch_l2;1112/// Store columns data in two scenarios:13/// 1. a local memory mapped file14/// 2. data fetched from cloud storage on demand, in this case15/// a. the key in the hashmap is the start in the file16/// b. the value in the hashmap is the actual data.17///18/// For the fetched case we use a two phase approach:19/// a. identify all the needed columns20/// b. asynchronously fetch them in parallel, for example using object_store21/// c. store the data in this data structure22/// d. when all the data is available deserialize on multiple threads, for example using rayon23pub enum ColumnStore {24Local(Buffer<u8>),25}2627/// For local files memory maps all columns that are part of the parquet field `field_name`.28/// For cloud files the relevant memory regions should have been prefetched.29pub(super) fn mmap_columns<'a>(30store: &'a ColumnStore,31field_columns: &'a [&ColumnChunkMetadata],32) -> Vec<(&'a ColumnChunkMetadata, Buffer<u8>)> {33field_columns34.iter()35.map(|meta| _mmap_single_column(store, meta))36.collect()37}3839fn _mmap_single_column<'a>(40store: &'a ColumnStore,41meta: &'a ColumnChunkMetadata,42) -> (&'a ColumnChunkMetadata, Buffer<u8>) {43let byte_range = meta.byte_range();44let chunk = match store {45ColumnStore::Local(mem_slice) => mem_slice46.clone()47.sliced(byte_range.start as usize..byte_range.end as usize),48};49(meta, chunk)50}5152// similar to arrow2 serializer, except this accepts a slice instead of a vec.53// this allows us to memory map54pub fn to_deserializer(55columns: Vec<(&ColumnChunkMetadata, Buffer<u8>)>,56field: Field,57filter: Option<Filter>,58) -> PolarsResult<(Vec<Box<dyn Array>>, Bitmap)> {59let (columns, types): 
(Vec<_>, Vec<_>) = columns60.into_iter()61.map(|(column_meta, chunk)| {62// Advise fetching the data for the column chunk63prefetch_l2(&chunk);6465let pages = PageReader::new(Cursor::new(chunk), column_meta, vec![], usize::MAX);66(67BasicDecompressor::new(pages, vec![]),68&column_meta.descriptor().descriptor.primitive_type,69)70})71.unzip();7273column_iter_to_arrays(columns, types, field, filter)74}757677