Path: blob/main/crates/polars-io/src/parquet/read/async_impl.rs
8448 views
//! Read parquet files in parallel from the Object Store without a third party crate.12use arrow::datatypes::ArrowSchemaRef;3use object_store::path::Path as ObjectPath;4use polars_core::prelude::*;5use polars_parquet::write::FileMetadata;6use polars_utils::pl_path::PlRefPath;78use crate::cloud::{9CloudLocation, CloudOptions, PolarsObjectStore, build_object_store, object_path_from_str,10};11use crate::parquet::metadata::FileMetadataRef;1213pub struct ParquetObjectStore {14store: PolarsObjectStore,15path: ObjectPath,16length: Option<usize>,17metadata: Option<FileMetadataRef>,18schema: Option<ArrowSchemaRef>,19}2021impl ParquetObjectStore {22pub async fn from_uri(23uri: PlRefPath,24options: Option<&CloudOptions>,25metadata: Option<FileMetadataRef>,26) -> PolarsResult<Self> {27let (CloudLocation { prefix, .. }, store) = build_object_store(uri, options, false).await?;28let path = object_path_from_str(&prefix)?;2930Ok(ParquetObjectStore {31store,32path,33length: None,34metadata,35schema: None,36})37}3839/// Initialize the length property of the object, unless it has already been fetched.40async fn length(&mut self) -> PolarsResult<usize> {41if self.length.is_none() {42self.length = Some(self.store.head(&self.path).await?.size as usize);43}44Ok(self.length.unwrap())45}4647/// Number of rows in the parquet file.48pub async fn num_rows(&mut self) -> PolarsResult<usize> {49let metadata = self.get_metadata().await?;50Ok(metadata.num_rows)51}5253/// Fetch the metadata of the parquet file, do not memoize it.54async fn fetch_metadata(&mut self) -> PolarsResult<FileMetadata> {55let length = self.length().await?;56fetch_metadata(&self.store, &self.path, length).await57}5859/// Fetch and memoize the metadata of the parquet file.60pub async fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> {61if self.metadata.is_none() {62self.metadata = Some(Arc::new(self.fetch_metadata().await?));63}64Ok(self.metadata.as_ref().unwrap())65}6667pub async fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {68self.schema = Some(match self.schema.as_ref() {69Some(schema) => Arc::clone(schema),70None => {71let metadata = self.get_metadata().await?;72let arrow_schema = polars_parquet::arrow::read::infer_schema(metadata)?;73Arc::new(arrow_schema)74},75});7677Ok(self.schema.clone().unwrap())78}79}8081fn read_n<const N: usize>(reader: &mut &[u8]) -> Option<[u8; N]> {82if N <= reader.len() {83let (head, tail) = reader.split_at(N);84*reader = tail;85Some(head.try_into().unwrap())86} else {87None88}89}9091fn read_i32le(reader: &mut &[u8]) -> Option<i32> {92read_n(reader).map(i32::from_le_bytes)93}9495/// Asynchronously reads the files' metadata96pub async fn fetch_metadata(97store: &PolarsObjectStore,98path: &ObjectPath,99file_byte_length: usize,100) -> PolarsResult<FileMetadata> {101let footer_header_bytes = store102.get_range(103path,104file_byte_length105.checked_sub(polars_parquet::parquet::FOOTER_SIZE as usize)106.ok_or_else(|| {107polars_parquet::parquet::error::ParquetError::OutOfSpec(108"not enough bytes to contain parquet footer".to_string(),109)110})?..file_byte_length,111)112.await?;113114let footer_byte_length: usize = {115let reader = &mut footer_header_bytes.as_ref();116let footer_byte_size = read_i32le(reader).unwrap();117let magic = read_n(reader).unwrap();118debug_assert!(reader.is_empty());119if magic != polars_parquet::parquet::PARQUET_MAGIC {120return Err(polars_parquet::parquet::error::ParquetError::OutOfSpec(121"incorrect magic in parquet footer".to_string(),122)123.into());124}125footer_byte_size.try_into().map_err(|_| {126polars_parquet::parquet::error::ParquetError::OutOfSpec(127"negative footer byte length".to_string(),128)129})?130};131132let footer_bytes = store133.get_range(134path,135file_byte_length136.checked_sub(polars_parquet::parquet::FOOTER_SIZE as usize + footer_byte_length)137.ok_or_else(|| {138polars_parquet::parquet::error::ParquetError::OutOfSpec(139"not enough bytes to contain parquet footer".to_string(),140)141})?..file_byte_length,142)143.await?;144145Ok(polars_parquet::parquet::read::deserialize_metadata(146std::io::Cursor::new(footer_bytes.as_ref()),147// TODO: Describe why this makes sense. Taken from the previous148// implementation which said "a highly nested but sparse struct could149// result in many allocations".150footer_bytes.as_ref().len() * 2 + 1024,151)?)152}153154155