Path: blob/main/crates/polars-io/src/ipc/ipc_reader_async.rs
8424 views
use std::sync::Arc;12use arrow::io::ipc::read::{FileMetadata, OutOfSpecKind, get_row_count};3use object_store::ObjectMeta;4use object_store::path::Path;5use polars_core::datatypes::IDX_DTYPE;6use polars_core::frame::DataFrame;7use polars_core::schema::{Schema, SchemaExt};8use polars_error::{PolarsResult, polars_bail, polars_err, to_compute_err};9use polars_utils::mmap::MMapSemaphore;10use polars_utils::pl_path::PlRefPath;11use polars_utils::pl_str::PlSmallStr;1213use crate::RowIndex;14use crate::cloud::{15CloudLocation, CloudOptions, PolarsObjectStore, build_object_store, object_path_from_str,16};17use crate::file_cache::{FileCacheEntry, init_entries_from_uri_list};18use crate::predicates::PhysicalIoExpr;19use crate::prelude::{IpcReader, materialize_projection};20use crate::shared::SerReader;2122/// An Arrow IPC reader implemented on top of PolarsObjectStore.23pub struct IpcReaderAsync {24store: PolarsObjectStore,25cache_entry: Arc<FileCacheEntry>,26path: Path,27}2829#[derive(Default, Clone)]30pub struct IpcReadOptions {31// Names of the columns to include in the output.32projection: Option<Arc<[PlSmallStr]>>,3334// The maximum number of rows to include in the output.35row_limit: Option<usize>,3637// Include a column with the row number under the provided name starting at the provided index.38row_index: Option<RowIndex>,3940// Only include rows that pass this predicate.41predicate: Option<Arc<dyn PhysicalIoExpr>>,42}4344impl IpcReadOptions {45pub fn with_projection(mut self, projection: Option<Arc<[PlSmallStr]>>) -> Self {46self.projection = projection;47self48}4950pub fn with_row_limit(mut self, row_limit: impl Into<Option<usize>>) -> Self {51self.row_limit = row_limit.into();52self53}5455pub fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {56self.row_index = row_index.into();57self58}5960pub fn with_predicate(mut self, predicate: impl Into<Option<Arc<dyn PhysicalIoExpr>>>) -> Self {61self.predicate = predicate.into();62self63}64}6566impl IpcReaderAsync {67pub async fn from_uri(68uri: PlRefPath,69cloud_options: Option<&CloudOptions>,70) -> PolarsResult<IpcReaderAsync> {71let cache_entry =72init_entries_from_uri_list([uri.clone()].into_iter(), cloud_options).await?[0].clone();73let (CloudLocation { prefix, .. }, store) =74build_object_store(uri, cloud_options, false).await?;7576let path = object_path_from_str(&prefix)?;7778Ok(Self {79store,80cache_entry,81path,82})83}8485async fn object_metadata(&self) -> PolarsResult<ObjectMeta> {86self.store.head(&self.path).await87}8889async fn file_size(&self) -> PolarsResult<usize> {90Ok(self.object_metadata().await?.size as usize)91}9293pub async fn metadata(&self) -> PolarsResult<FileMetadata> {94let file_size = self.file_size().await?;9596// TODO: Do a larger request and hope that the entire footer is contained within it to save one round-trip.97let footer_metadata =98self.store99.get_range(100&self.path,101file_size.checked_sub(FOOTER_METADATA_SIZE).ok_or_else(|| {102to_compute_err("ipc file size is smaller than the minimum")103})?..file_size,104)105.await?;106107let footer_size = deserialize_footer_metadata(108footer_metadata109.as_ref()110.try_into()111.map_err(to_compute_err)?,112)?;113114let footer = self115.store116.get_range(117&self.path,118file_size119.checked_sub(FOOTER_METADATA_SIZE + footer_size)120.ok_or_else(|| {121to_compute_err("invalid ipc footer metadata: footer size too large")122})?..file_size,123)124.await?;125126arrow::io::ipc::read::deserialize_footer(127footer.as_ref(),128footer_size.try_into().map_err(to_compute_err)?,129)130}131132pub async fn data(133&self,134metadata: Option<&FileMetadata>,135options: IpcReadOptions,136verbose: bool,137) -> PolarsResult<DataFrame> {138// TODO: Only download what is needed rather than the entire file by139// making use of the projection, row limit, predicate and such.140let file = tokio::task::block_in_place(|| self.cache_entry.try_open_check_latest())?;141let bytes = MMapSemaphore::new_from_file(&file).unwrap();142143let projection = match options.projection.as_deref() {144Some(projection) => {145fn prepare_schema(mut schema: Schema, row_index: Option<&RowIndex>) -> Schema {146if let Some(rc) = row_index {147let _ = schema.insert_at_index(0, rc.name.clone(), IDX_DTYPE);148}149schema150}151152// Retrieve the metadata for the schema so we can map column names to indices.153let fetched_metadata;154let metadata = if let Some(metadata) = metadata {155metadata156} else {157// This branch is happens when _metadata is None, which can happen if we Deserialize the execution plan.158fetched_metadata = self.metadata().await?;159&fetched_metadata160};161162let schema = prepare_schema(163Schema::from_arrow_schema(metadata.schema.as_ref()),164options.row_index.as_ref(),165);166167let hive_partitions = None;168169materialize_projection(170Some(projection),171&schema,172hive_partitions,173options.row_index.is_some(),174)175},176None => None,177};178179let reader = <IpcReader<_> as SerReader<_>>::new(std::io::Cursor::new(bytes.as_ref()))180.with_row_index(options.row_index)181.with_n_rows(options.row_limit)182.with_projection(projection);183reader.finish_with_scan_ops(options.predicate, verbose)184}185186pub async fn count_rows(&self, _metadata: Option<&FileMetadata>) -> PolarsResult<i64> {187// TODO: Only download what is needed rather than the entire file by188// making use of the projection, row limit, predicate and such.189let file = tokio::task::block_in_place(|| self.cache_entry.try_open_check_latest())?;190let bytes = MMapSemaphore::new_from_file(&file).unwrap();191get_row_count(&mut std::io::Cursor::new(bytes.as_ref()))192}193}194195const FOOTER_METADATA_SIZE: usize = 10;196197// TODO: Move to polars-arrow and deduplicate parsing of footer metadata in198// sync and async readers.199fn deserialize_footer_metadata(bytes: [u8; FOOTER_METADATA_SIZE]) -> PolarsResult<usize> {200let footer_size: usize =201i32::from_le_bytes(bytes[0..4].try_into().unwrap_or_else(|_| unreachable!()))202.try_into()203.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;204205if &bytes[4..] != b"ARROW1" {206polars_bail!(oos = OutOfSpecKind::InvalidFooter);207}208209Ok(footer_size)210}211212213