Path: blob/main/crates/polars-io/src/parquet/read/async_impl.rs
6940 views
//! Read parquet files in parallel from the Object Store without a third party crate.12use arrow::datatypes::ArrowSchemaRef;3use object_store::path::Path as ObjectPath;4use polars_core::prelude::*;5use polars_parquet::write::FileMetadata;67use crate::cloud::{8CloudLocation, CloudOptions, PolarsObjectStore, build_object_store, object_path_from_str,9};10use crate::parquet::metadata::FileMetadataRef;1112pub struct ParquetObjectStore {13store: PolarsObjectStore,14path: ObjectPath,15length: Option<usize>,16metadata: Option<FileMetadataRef>,17schema: Option<ArrowSchemaRef>,18}1920impl ParquetObjectStore {21pub async fn from_uri(22uri: &str,23options: Option<&CloudOptions>,24metadata: Option<FileMetadataRef>,25) -> PolarsResult<Self> {26let (CloudLocation { prefix, .. }, store) = build_object_store(uri, options, false).await?;27let path = object_path_from_str(&prefix)?;2829Ok(ParquetObjectStore {30store,31path,32length: None,33metadata,34schema: None,35})36}3738/// Initialize the length property of the object, unless it has already been fetched.39async fn length(&mut self) -> PolarsResult<usize> {40if self.length.is_none() {41self.length = Some(self.store.head(&self.path).await?.size as usize);42}43Ok(self.length.unwrap())44}4546/// Number of rows in the parquet file.47pub async fn num_rows(&mut self) -> PolarsResult<usize> {48let metadata = self.get_metadata().await?;49Ok(metadata.num_rows)50}5152/// Fetch the metadata of the parquet file, do not memoize it.53async fn fetch_metadata(&mut self) -> PolarsResult<FileMetadata> {54let length = self.length().await?;55fetch_metadata(&self.store, &self.path, length).await56}5758/// Fetch and memoize the metadata of the parquet file.59pub async fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> {60if self.metadata.is_none() {61self.metadata = Some(Arc::new(self.fetch_metadata().await?));62}63Ok(self.metadata.as_ref().unwrap())64}6566pub async fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {67self.schema = Some(match self.schema.as_ref() {68Some(schema) => Arc::clone(schema),69None => {70let metadata = self.get_metadata().await?;71let arrow_schema = polars_parquet::arrow::read::infer_schema(metadata)?;72Arc::new(arrow_schema)73},74});7576Ok(self.schema.clone().unwrap())77}78}7980fn read_n<const N: usize>(reader: &mut &[u8]) -> Option<[u8; N]> {81if N <= reader.len() {82let (head, tail) = reader.split_at(N);83*reader = tail;84Some(head.try_into().unwrap())85} else {86None87}88}8990fn read_i32le(reader: &mut &[u8]) -> Option<i32> {91read_n(reader).map(i32::from_le_bytes)92}9394/// Asynchronously reads the files' metadata95pub async fn fetch_metadata(96store: &PolarsObjectStore,97path: &ObjectPath,98file_byte_length: usize,99) -> PolarsResult<FileMetadata> {100let footer_header_bytes = store101.get_range(102path,103file_byte_length104.checked_sub(polars_parquet::parquet::FOOTER_SIZE as usize)105.ok_or_else(|| {106polars_parquet::parquet::error::ParquetError::OutOfSpec(107"not enough bytes to contain parquet footer".to_string(),108)109})?..file_byte_length,110)111.await?;112113let footer_byte_length: usize = {114let reader = &mut footer_header_bytes.as_ref();115let footer_byte_size = read_i32le(reader).unwrap();116let magic = read_n(reader).unwrap();117debug_assert!(reader.is_empty());118if magic != polars_parquet::parquet::PARQUET_MAGIC {119return Err(polars_parquet::parquet::error::ParquetError::OutOfSpec(120"incorrect magic in parquet footer".to_string(),121)122.into());123}124footer_byte_size.try_into().map_err(|_| {125polars_parquet::parquet::error::ParquetError::OutOfSpec(126"negative footer byte length".to_string(),127)128})?129};130131let footer_bytes = store132.get_range(133path,134file_byte_length135.checked_sub(polars_parquet::parquet::FOOTER_SIZE as usize + footer_byte_length)136.ok_or_else(|| {137polars_parquet::parquet::error::ParquetError::OutOfSpec(138"not enough bytes to contain parquet footer".to_string(),139)140})?..file_byte_length,141)142.await?;143144Ok(polars_parquet::parquet::read::deserialize_metadata(145std::io::Cursor::new(footer_bytes.as_ref()),146// TODO: Describe why this makes sense. Taken from the previous147// implementation which said "a highly nested but sparse struct could148// result in many allocations".149footer_bytes.as_ref().len() * 2 + 1024,150)?)151}152153154