// Path: blob/main/crates/polars-io/src/ipc/ipc_reader_async.rs
// 6939 views
use std::sync::Arc;12use arrow::io::ipc::read::{FileMetadata, OutOfSpecKind, get_row_count};3use object_store::ObjectMeta;4use object_store::path::Path;5use polars_core::datatypes::IDX_DTYPE;6use polars_core::frame::DataFrame;7use polars_core::schema::{Schema, SchemaExt};8use polars_error::{PolarsResult, polars_bail, polars_err, to_compute_err};9use polars_utils::mmap::MMapSemaphore;10use polars_utils::pl_str::PlSmallStr;1112use crate::RowIndex;13use crate::cloud::{14CloudLocation, CloudOptions, PolarsObjectStore, build_object_store, object_path_from_str,15};16use crate::file_cache::{FileCacheEntry, init_entries_from_uri_list};17use crate::predicates::PhysicalIoExpr;18use crate::prelude::{IpcReader, materialize_projection};19use crate::shared::SerReader;2021/// An Arrow IPC reader implemented on top of PolarsObjectStore.22pub struct IpcReaderAsync {23store: PolarsObjectStore,24cache_entry: Arc<FileCacheEntry>,25path: Path,26}2728#[derive(Default, Clone)]29pub struct IpcReadOptions {30// Names of the columns to include in the output.31projection: Option<Arc<[PlSmallStr]>>,3233// The maximum number of rows to include in the output.34row_limit: Option<usize>,3536// Include a column with the row number under the provided name starting at the provided index.37row_index: Option<RowIndex>,3839// Only include rows that pass this predicate.40predicate: Option<Arc<dyn PhysicalIoExpr>>,41}4243impl IpcReadOptions {44pub fn with_projection(mut self, projection: Option<Arc<[PlSmallStr]>>) -> Self {45self.projection = projection;46self47}4849pub fn with_row_limit(mut self, row_limit: impl Into<Option<usize>>) -> Self {50self.row_limit = row_limit.into();51self52}5354pub fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {55self.row_index = row_index.into();56self57}5859pub fn with_predicate(mut self, predicate: impl Into<Option<Arc<dyn PhysicalIoExpr>>>) -> Self {60self.predicate = predicate.into();61self62}63}6465impl IpcReaderAsync {66pub async fn 
from_uri(67uri: &str,68cloud_options: Option<&CloudOptions>,69) -> PolarsResult<IpcReaderAsync> {70let cache_entry =71init_entries_from_uri_list([Arc::from(uri)].into_iter(), cloud_options)?[0].clone();72let (CloudLocation { prefix, .. }, store) =73build_object_store(uri, cloud_options, false).await?;7475let path = object_path_from_str(&prefix)?;7677Ok(Self {78store,79cache_entry,80path,81})82}8384async fn object_metadata(&self) -> PolarsResult<ObjectMeta> {85self.store.head(&self.path).await86}8788async fn file_size(&self) -> PolarsResult<usize> {89Ok(self.object_metadata().await?.size as usize)90}9192pub async fn metadata(&self) -> PolarsResult<FileMetadata> {93let file_size = self.file_size().await?;9495// TODO: Do a larger request and hope that the entire footer is contained within it to save one round-trip.96let footer_metadata =97self.store98.get_range(99&self.path,100file_size.checked_sub(FOOTER_METADATA_SIZE).ok_or_else(|| {101to_compute_err("ipc file size is smaller than the minimum")102})?..file_size,103)104.await?;105106let footer_size = deserialize_footer_metadata(107footer_metadata108.as_ref()109.try_into()110.map_err(to_compute_err)?,111)?;112113let footer = self114.store115.get_range(116&self.path,117file_size118.checked_sub(FOOTER_METADATA_SIZE + footer_size)119.ok_or_else(|| {120to_compute_err("invalid ipc footer metadata: footer size too large")121})?..file_size,122)123.await?;124125arrow::io::ipc::read::deserialize_footer(126footer.as_ref(),127footer_size.try_into().map_err(to_compute_err)?,128)129}130131pub async fn data(132&self,133metadata: Option<&FileMetadata>,134options: IpcReadOptions,135verbose: bool,136) -> PolarsResult<DataFrame> {137// TODO: Only download what is needed rather than the entire file by138// making use of the projection, row limit, predicate and such.139let file = tokio::task::block_in_place(|| self.cache_entry.try_open_check_latest())?;140let bytes = MMapSemaphore::new_from_file(&file).unwrap();141142let projection = 
match options.projection.as_deref() {143Some(projection) => {144fn prepare_schema(mut schema: Schema, row_index: Option<&RowIndex>) -> Schema {145if let Some(rc) = row_index {146let _ = schema.insert_at_index(0, rc.name.clone(), IDX_DTYPE);147}148schema149}150151// Retrieve the metadata for the schema so we can map column names to indices.152let fetched_metadata;153let metadata = if let Some(metadata) = metadata {154metadata155} else {156// This branch is happens when _metadata is None, which can happen if we Deserialize the execution plan.157fetched_metadata = self.metadata().await?;158&fetched_metadata159};160161let schema = prepare_schema(162Schema::from_arrow_schema(metadata.schema.as_ref()),163options.row_index.as_ref(),164);165166let hive_partitions = None;167168materialize_projection(169Some(projection),170&schema,171hive_partitions,172options.row_index.is_some(),173)174},175None => None,176};177178let reader = <IpcReader<_> as SerReader<_>>::new(std::io::Cursor::new(bytes.as_ref()))179.with_row_index(options.row_index)180.with_n_rows(options.row_limit)181.with_projection(projection);182reader.finish_with_scan_ops(options.predicate, verbose)183}184185pub async fn count_rows(&self, _metadata: Option<&FileMetadata>) -> PolarsResult<i64> {186// TODO: Only download what is needed rather than the entire file by187// making use of the projection, row limit, predicate and such.188let file = tokio::task::block_in_place(|| self.cache_entry.try_open_check_latest())?;189let bytes = MMapSemaphore::new_from_file(&file).unwrap();190get_row_count(&mut std::io::Cursor::new(bytes.as_ref()))191}192}193194const FOOTER_METADATA_SIZE: usize = 10;195196// TODO: Move to polars-arrow and deduplicate parsing of footer metadata in197// sync and async readers.198fn deserialize_footer_metadata(bytes: [u8; FOOTER_METADATA_SIZE]) -> PolarsResult<usize> {199let footer_size: usize =200i32::from_le_bytes(bytes[0..4].try_into().unwrap_or_else(|_| unreachable!()))201.try_into()202.map_err(|_| 
polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;203204if &bytes[4..] != b"ARROW1" {205polars_bail!(oos = OutOfSpecKind::InvalidFooter);206}207208Ok(footer_size)209}210211212