Path: blob/main/crates/polars-stream/src/nodes/io_sources/ipc/mod.rs
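//! Streaming source node for Arrow IPC files.
//!
//! A read is pipelined across three tasks: a prefetch task fetches record
//! batch bytes from the byte source, a decode task turns them into
//! `DataFrame`s, and a distributor splits the decoded frames into morsels
//! for the downstream pipelines.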
use std::io::Cursor;
use std::ops::Range;
use std::sync::Arc;

use arrow::io::ipc::read::{Dictionaries, read_dictionary_block};
use async_trait::async_trait;
use polars_core::prelude::DataType;
use polars_core::schema::{Schema, SchemaExt};
use polars_core::utils::arrow::io::ipc::read::{
    BlockReader, FileMetadata, ProjectionInfo, prepare_projection, read_file_metadata,
};
use polars_error::{ErrString, PolarsError, PolarsResult};
use polars_io::cloud::CloudOptions;
use polars_io::ipc::IpcScanOptions;
use polars_io::pl_async;
use polars_io::utils::byte_source::{
    BufferByteSource, ByteSource, DynByteSource, DynByteSourceBuilder,
};
use polars_io::utils::slice::SplitSlicePosition;
use polars_plan::dsl::ScanSource;
use polars_utils::IdxSize;
use polars_utils::bool::UnsafeBool;
use polars_utils::mem::prefetch::get_memory_prefetch_func;
use polars_utils::slice_enum::Slice;
use record_batch_data_fetch::RecordBatchDataFetcher;
use record_batch_decode::RecordBatchDecoder;

use super::multi_scan::reader_interface::BeginReadArgs;
use super::multi_scan::reader_interface::output::FileReaderOutputRecv;
use crate::async_executor::{self, JoinHandle, TaskPriority};
use crate::async_primitives::wait_group::{WaitGroup, WaitToken};
use crate::metrics::OptIOMetrics;
use crate::morsel::{Morsel, MorselSeq, SourceToken, get_ideal_morsel_size};
use crate::nodes::io_sources::ipc::metadata::read_ipc_metadata_bytes;
use crate::nodes::io_sources::multi_scan::reader_interface::output::FileReaderOutputSend;
use crate::nodes::io_sources::multi_scan::reader_interface::{
    FileReader, FileReaderCallbacks, Projection,
};
use crate::nodes::io_sources::parquet::init::split_to_morsels;
use crate::utils::tokio_handle_ext::AbortOnDropHandle;

pub mod builder;
mod metadata;
mod record_batch_data_fetch;
mod record_batch_decode;

const ROW_COUNT_OVERFLOW_ERR: PolarsError = PolarsError::ComputeError(ErrString::new_static(
    "\
IPC file produces more than 2^32 rows; \
consider compiling with polars-bigidx feature (pip install polars[rt64])",
));

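/// IPC implementation of the multi-scan `FileReader` interface: `initialize`
/// resolves the byte source, file metadata, and dictionaries, after which
/// `begin_read` spawns the prefetch/decode/distribute tasks.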
struct IpcFileReader {
    scan_source: ScanSource,
    cloud_options: Option<Arc<CloudOptions>>,
    config: Arc<IpcScanOptions>,
    metadata: Option<Arc<FileMetadata>>,
    byte_source_builder: DynByteSourceBuilder,
    record_batch_prefetch_sync: RecordBatchPrefetchSync,
    io_metrics: OptIOMetrics,
    verbose: bool,
    init_data: Option<InitializedState>,
    checked: UnsafeBool,
}

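/// Synchronization state for record batch prefetching, shared across readers:
/// the semaphore bounds the number of in-flight prefetches, and the wait group
/// slot chains readers so that each one starts spawning prefetches only after
/// the previous reader has finished spawning its own.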
struct RecordBatchPrefetchSync {
    prefetch_limit: usize,
    prefetch_semaphore: Arc<tokio::sync::Semaphore>,
    shared_prefetch_wait_group_slot: Arc<std::sync::Mutex<Option<WaitGroup>>>,

    /// Waits for the previous reader to finish spawning prefetches.
    prev_all_spawned: Option<WaitGroup>,
    /// Dropped once the current reader has finished spawning prefetches.
    current_all_spawned: Option<WaitToken>,
}

#[derive(Clone)]
struct InitializedState {
    file_metadata: Arc<FileMetadata>,
    byte_source: Arc<DynByteSource>,
    dictionaries: Arc<Option<Dictionaries>>,
}

#[async_trait]
impl FileReader for IpcFileReader {
    async fn initialize(&mut self) -> PolarsResult<()> {
        if self.init_data.is_some() {
            return Ok(());
        }

        let verbose = self.verbose;
        let scan_source = self.scan_source.clone();
        let byte_source_builder = self.byte_source_builder.clone();
        let cloud_options = self.cloud_options.clone();
        let io_metrics = self.io_metrics.clone();

        let byte_source = pl_async::get_runtime()
            .spawn(async move {
                scan_source
                    .as_scan_source_ref()
                    .to_dyn_byte_source(
                        &byte_source_builder,
                        cloud_options.as_deref(),
                        io_metrics.0,
                    )
                    .await
            })
            .await
            .unwrap()?;

        let mut byte_source = Arc::new(byte_source);

        let file_metadata = if let Some(v) = self.metadata.clone() {
            v
        } else {
            let (metadata_bytes, opt_full_bytes) = {
                let byte_source = byte_source.clone();

                pl_async::get_runtime()
                    .spawn(async move { read_ipc_metadata_bytes(&byte_source, verbose).await })
                    .await
                    .unwrap()?
            };

            if let Some(full_bytes) = opt_full_bytes {
                byte_source = Arc::new(DynByteSource::Buffer(BufferByteSource(full_bytes)));
            }

            Arc::new(read_file_metadata(&mut std::io::Cursor::new(
                metadata_bytes,
            ))?)
        };

        let dictionaries = {
            let byte_source_async = byte_source.clone();
            let metadata_async = file_metadata.clone();
            let checked = self.checked;
            let dictionaries = pl_async::get_runtime()
                .spawn(async move {
                    read_dictionaries(&byte_source_async, metadata_async, verbose, checked).await
                })
                .await
                .unwrap()?;
            Arc::new(Some(dictionaries))
        };

        self.init_data = Some(InitializedState {
            file_metadata,
            byte_source,
            dictionaries,
        });

        Ok(())
    }

    fn prepare_read(&mut self) -> PolarsResult<()> {
        let wait_group_this_reader = WaitGroup::default();
        let prefetch_all_spawned_token = wait_group_this_reader.token();

        let prev_wait_group: Option<WaitGroup> = self
            .record_batch_prefetch_sync
            .shared_prefetch_wait_group_slot
            .try_lock()
            .unwrap()
            .replace(wait_group_this_reader);

        self.record_batch_prefetch_sync.prev_all_spawned = prev_wait_group;
        self.record_batch_prefetch_sync.current_all_spawned = Some(prefetch_all_spawned_token);

        Ok(())
    }

    fn begin_read(
        &mut self,
        args: BeginReadArgs,
    ) -> PolarsResult<(FileReaderOutputRecv, JoinHandle<PolarsResult<()>>)> {
        let verbose = self.verbose;

        // Initialize.
        let InitializedState {
            file_metadata,
            byte_source,
            dictionaries,
        } = self.init_data.clone().unwrap();

        let BeginReadArgs {
            projection: Projection::Plain(projected_schema),
            row_index,
            pre_slice: pre_slice_arg,
            predicate: None,
            cast_columns_policy: _,
            num_pipelines,
            disable_morsel_split,
            callbacks:
                FileReaderCallbacks {
                    file_schema_tx,
                    n_rows_in_file_tx,
                    row_position_on_end_tx,
                },
        } = args
        else {
            panic!("unsupported args: {:?}", &args)
        };

        debug_assert!(!matches!(pre_slice_arg, Some(Slice::Negative { .. })));

        let file_schema_pl = std::cell::LazyCell::new(|| {
            Arc::new(Schema::from_arrow_schema(file_metadata.schema.as_ref()))
        });

        // Handle callbacks that are ready now.
        if let Some(file_schema_tx) = file_schema_tx {
            _ = file_schema_tx.send(file_schema_pl.clone());
        }

        // @NOTE: Negative slicing takes a 2-pass approach. The first pass gets the total row count
        // by setting slice.len to 0, and uses this to convert the negative slice into a positive slice.
        // The second pass uses the positive slice to fetch and slice the data.
        let fetch_metadata_only = pre_slice_arg.as_ref().is_some_and(|x| x.len() == 0);

        // Always create a slice. If no slice was given, just make the biggest slice possible.
        let slice_range: Range<usize> = pre_slice_arg
            .clone()
            .map_or(0..usize::MAX, Range::<usize>::from);
        let n_rows_limit = if pre_slice_arg.is_some() {
            Some(slice_range.end)
        } else {
            None
        };

        // Avoid materializing projection info if we are projecting all the columns of this file.
        let projection_indices: Option<Vec<usize>> = if let Some(first_mismatch_idx) =
            (0..file_metadata.schema.len().min(projected_schema.len())).find(|&i| {
                file_metadata.schema.get_at_index(i).unwrap().0
                    != projected_schema.get_at_index(i).unwrap().0
            }) {
            let mut out = Vec::with_capacity(file_metadata.schema.len());

            out.extend(0..first_mismatch_idx);

            out.extend(
                (first_mismatch_idx..projected_schema.len()).filter_map(|i| {
                    file_metadata
                        .schema
                        .index_of(projected_schema.get_at_index(i).unwrap().0)
                }),
            );

            Some(out)
        } else if file_metadata.schema.len() > projected_schema.len() {
            // Names match up to projected schema len.
            Some((0..projected_schema.len()).collect::<Vec<_>>())
        } else {
            // Name order matches up to `file_metadata.schema.len()`, we are projecting all columns
            // in this file.
            None
        };

        // Unstable.
        let read_statistics_flags = self.config.record_batch_statistics;

        if verbose {
            eprintln!(
                "[IpcFileReader]: \
                project: {} / {}, \
                pre_slice: {:?}, \
                read_record_batch_statistics_flags: {}\
                ",
                projection_indices
                    .as_ref()
                    .map_or(file_metadata.schema.len(), |x| x.len()),
                file_metadata.schema.len(),
                pre_slice_arg,
                read_statistics_flags
            )
        }

        let projection_info: Option<ProjectionInfo> =
            projection_indices.map(|indices| prepare_projection(&file_metadata.schema, indices));
        let projection_info = Arc::new(projection_info);

        let schema = projection_info.as_ref().as_ref().map_or(
            file_metadata.schema.as_ref(),
            |ProjectionInfo { schema, .. }| schema,
        );
        let pl_schema = Arc::new(
            schema
                .iter()
                .map(|(n, f)| (n.clone(), DataType::from_arrow_field(f)))
                .collect::<Schema>(),
        );

        // Prepare parameters for prefetching.
        let memory_prefetch_func = get_memory_prefetch_func(verbose);

        let record_batch_prefetch_size = self
            .record_batch_prefetch_sync
            .prefetch_limit
            .min(file_metadata.blocks.len())
            .max(1);

        let io_runtime = polars_io::pl_async::get_runtime();
        let ideal_morsel_size = get_ideal_morsel_size();

        if verbose {
            eprintln!(
                "[IpcFileReader]: num_pipelines: {num_pipelines}, record_batch_prefetch_size: {record_batch_prefetch_size}, ideal_morsel_size: {ideal_morsel_size}"
            );
            eprintln!(
                "[IpcFileReader]: record batch count: {:?}",
                file_metadata.blocks.len()
            );
        }

        let record_batch_decoder = Arc::new(RecordBatchDecoder {
            file_metadata: file_metadata.clone(),
            pl_schema,
            projection_info,
            dictionaries: dictionaries.clone(),
            row_index,
            read_statistics_flags,
            checked: self.checked,
        });

        // Set up channels.
        let (prefetch_send, mut prefetch_recv) =
            tokio::sync::mpsc::channel(record_batch_prefetch_size);
        let (decode_send, mut decode_recv) = tokio::sync::mpsc::channel(num_pipelines);
        let (mut morsel_send, morsel_recv) = FileReaderOutputSend::new_serial();

        let rb_prefetch_semaphore = Arc::clone(&self.record_batch_prefetch_sync.prefetch_semaphore);
        let rb_prefetch_prev_all_spawned =
            Option::take(&mut self.record_batch_prefetch_sync.prev_all_spawned);
        let rb_prefetch_current_all_spawned =
            Option::take(&mut self.record_batch_prefetch_sync.current_all_spawned);

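        // The permits riding along with each prefetch originate in
        // `RecordBatchDataFetcher` (which holds the shared prefetch
        // semaphore); they travel with the data through decode and are only
        // dropped by the distributor, bounding how many record batches are
        // resident at once.
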
        // Task: Prefetch.
        let byte_source = byte_source.clone();
        let metadata = file_metadata.clone();
        let prefetch_task = AbortOnDropHandle(io_runtime.spawn(async move {
            let mut record_batch_data_fetcher = RecordBatchDataFetcher {
                memory_prefetch_func,
                metadata,
                byte_source,
                record_batch_idx: 0,
                fetch_metadata_only,
                n_rows_limit,
                n_rows_in_file_tx,
                row_position_on_end_tx,
                prefetch_send,
                rb_prefetch_semaphore,
                rb_prefetch_current_all_spawned,
            };

            if let Some(rb_prefetch_prev_all_spawned) = rb_prefetch_prev_all_spawned {
                rb_prefetch_prev_all_spawned.wait().await;
            }

            record_batch_data_fetcher.run().await?;

            PolarsResult::Ok(())
        }));

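        // Note: Decoding is spawned onto the CPU executor at high priority,
        // while fetching stays on the I/O runtime. Batches before the
        // requested slice are still fetched (so the running row count stays
        // accurate) but skipped rather than decoded; once past the end of the
        // slice the loop stops.
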
        // Task: Decode.
        let decode_task = AbortOnDropHandle(io_runtime.spawn(async move {
            let mut current_row_offset: IdxSize = 0;

            while let Some((prefetch_task, permit)) = prefetch_recv.recv().await {
                let mut record_batch_data = prefetch_task.await.unwrap()?;
                record_batch_data.row_offset = Some(current_row_offset);

                // Fetch every record batch so we can track the total row count.
                let rb_num_rows = record_batch_data.num_rows;
                let rb_num_rows =
                    IdxSize::try_from(rb_num_rows).map_err(|_| ROW_COUNT_OVERFLOW_ERR)?;

                // Only pass to decoder if we need the data.
                let record_batch_position = SplitSlicePosition::split_slice_at_file(
                    current_row_offset as usize,
                    rb_num_rows as usize,
                    slice_range.clone(),
                );

                current_row_offset = current_row_offset
                    .checked_add(rb_num_rows)
                    .ok_or(ROW_COUNT_OVERFLOW_ERR)?;

                match record_batch_position {
                    SplitSlicePosition::Before => continue,
                    SplitSlicePosition::Overlapping(rows_offset, rows_len) => {
                        let record_batch_decoder = record_batch_decoder.clone();
                        let decode_fut = async_executor::spawn(TaskPriority::High, async move {
                            record_batch_decoder
                                .record_batch_data_to_df(record_batch_data, rows_offset, rows_len)
                                .await
                        });
                        if decode_send.send((decode_fut, permit)).await.is_err() {
                            break;
                        }
                    },
                    SplitSlicePosition::After => break,
                };
            }

            PolarsResult::Ok(())
        }));

        // Task: Distributor.
        // Distributes morsels across pipelines. This does not perform any CPU or I/O bound work -
        // it is purely a dispatch loop. Run on the computational executor to reduce context switches.
        let last_morsel_min_split = num_pipelines;
        let distribute_task = async_executor::spawn(TaskPriority::High, async move {
            let mut morsel_seq = MorselSeq::default();
            // Note: We don't use this (it is handled by the bridge). But morsels require a source token.
            let source_token = SourceToken::new();

            // Decode first non-empty morsel.
            let mut next = None;
            loop {
                let Some((decode_fut, permit)) = decode_recv.recv().await else {
                    break;
                };
                let df = decode_fut.await?;
                if df.height() == 0 {
                    continue;
                }

                if disable_morsel_split {
                    if morsel_send
                        .send_morsel(Morsel::new(df, morsel_seq, source_token.clone()))
                        .await
                        .is_err()
                    {
                        return Ok(());
                    }
                    drop(permit);
                    morsel_seq = morsel_seq.successor();
                    continue;
                }

                next = Some((df, permit));
                break;
            }

            while let Some((df, permit)) = next.take() {
                // Try to decode the next non-empty morsel first, so we know
                // whether the df is the last morsel.

                // Important: Drop this before awaiting the next one, or we could
                // deadlock if the permit limit is 1.
                drop(permit);
                loop {
                    let Some((decode_fut, permit)) = decode_recv.recv().await else {
                        break;
                    };
                    let next_df = decode_fut.await?;
                    if next_df.height() == 0 {
                        continue;
                    }
                    next = Some((next_df, permit));
                    break;
                }

                for df in split_to_morsels(
                    &df,
                    ideal_morsel_size,
                    next.is_none(),
                    last_morsel_min_split,
                ) {
                    if morsel_send
                        .send_morsel(Morsel::new(df, morsel_seq, source_token.clone()))
                        .await
                        .is_err()
                    {
                        return Ok(());
                    }
                    morsel_seq = morsel_seq.successor();
                }
            }
            PolarsResult::Ok(())
        });

        // Orchestration.
        let join_task = io_runtime.spawn(async move {
            prefetch_task.await.unwrap()?;
            decode_task.await.unwrap()?;
            distribute_task.await?;
            Ok(())
        });

        let handle = AbortOnDropHandle(join_task);

        Ok((
            morsel_recv,
            async_executor::spawn(TaskPriority::Low, async move { handle.await.unwrap() }),
        ))
    }
}

async fn read_dictionaries(
    byte_source: &DynByteSource,
    file_metadata: Arc<FileMetadata>,
    verbose: bool,
    checked: UnsafeBool,
) -> PolarsResult<Dictionaries> {
    let blocks = if let Some(blocks) = &file_metadata.dictionaries {
        blocks
    } else {
        return Ok(Dictionaries::default());
    };

    if verbose {
        eprintln!("[IpcFileReader]: reading dictionaries ({:?})", blocks.len());
    }

    let mut dictionaries = Dictionaries::default();

    let mut message_scratch = Vec::new();
    let mut dictionary_scratch = Vec::new();

    for block in blocks {
        let range = block.offset as usize
            ..block.offset as usize + block.meta_data_length as usize + block.body_length as usize;
        let bytes = byte_source.get_range(range).await?;

        let mut reader = BlockReader::new(Cursor::new(bytes.as_ref()));

        read_dictionary_block(
            &mut reader.reader,
            file_metadata.as_ref(),
            block,
            true,
            &mut dictionaries,
            &mut message_scratch,
            &mut dictionary_scratch,
            checked,
        )?;
    }

    Ok(dictionaries)
}