Path: blob/main/crates/polars-stream/src/nodes/io_sources/ipc.rs
use std::cmp::Reverse;
use std::io::Cursor;
use std::ops::Range;
use std::sync::Arc;

use arrow::array::TryExtend;
use async_trait::async_trait;
use polars_core::frame::DataFrame;
use polars_core::prelude::DataType;
use polars_core::schema::{Schema, SchemaExt};
use polars_core::utils::arrow::io::ipc::read::{
    FileMetadata, ProjectionInfo, get_row_count_from_blocks, prepare_projection, read_file_metadata,
};
use polars_error::{ErrString, PolarsError, PolarsResult, polars_err};
use polars_io::RowIndex;
use polars_io::cloud::CloudOptions;
use polars_plan::dsl::{ScanSource, ScanSourceRef};
use polars_utils::IdxSize;
use polars_utils::mmap::MemSlice;
use polars_utils::priority::Priority;
use polars_utils::slice_enum::Slice;

use super::multi_scan::reader_interface::output::FileReaderOutputRecv;
use super::multi_scan::reader_interface::{BeginReadArgs, calc_row_position_after_slice};
use crate::async_executor::{AbortOnDropHandle, JoinHandle, TaskPriority, spawn};
use crate::async_primitives::distributor_channel::distributor_channel;
use crate::async_primitives::linearizer::Linearizer;
use crate::morsel::{Morsel, MorselSeq, SourceToken, get_ideal_morsel_size};
use crate::nodes::io_sources::multi_scan::reader_interface::output::FileReaderOutputSend;
use crate::nodes::io_sources::multi_scan::reader_interface::{
    FileReader, FileReaderCallbacks, Projection,
};
use crate::{DEFAULT_DISTRIBUTOR_BUFFER_SIZE, DEFAULT_LINEARIZER_BUFFER_SIZE};

pub mod builder {
    use std::sync::Arc;

    use arrow::io::ipc::read::FileMetadata;
    use polars_core::config;
    use polars_io::cloud::CloudOptions;
    use polars_plan::dsl::ScanSource;

    use super::IpcFileReader;
    use crate::nodes::io_sources::multi_scan::reader_interface::FileReader;
    use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;
    use crate::nodes::io_sources::multi_scan::reader_interface::capabilities::ReaderCapabilities;

    #[derive(Debug)]
    pub struct IpcReaderBuilder {
        #[expect(unused)]
        pub first_metadata: Option<Arc<FileMetadata>>,
    }

    #[cfg(feature = "ipc")]
    impl FileReaderBuilder for IpcReaderBuilder {
        fn reader_name(&self) -> &str {
            "ipc"
        }

        fn reader_capabilities(&self) -> ReaderCapabilities {
            use ReaderCapabilities as RC;

            RC::NEEDS_FILE_CACHE_INIT | RC::ROW_INDEX | RC::PRE_SLICE | RC::NEGATIVE_PRE_SLICE
        }

        fn build_file_reader(
            &self,
            source: ScanSource,
            cloud_options: Option<Arc<CloudOptions>>,
            #[expect(unused)] scan_source_idx: usize,
        ) -> Box<dyn FileReader> {
            let scan_source = source;
            let verbose = config::verbose();

            // FIXME: For some reason the metadata does not match on idx == 0, and we end up with
            // * ComputeError: out-of-spec: InvalidBuffersLength { buffers_size: 1508, file_size: 763 }
            //
            // let metadata: Option<Arc<FileMetadata>> = if scan_source_idx == 0 {
            //     self.first_metadata.clone()
            // } else {
            //     None
            // };
            let metadata = None;

            let reader = IpcFileReader {
                scan_source,
                cloud_options,
                metadata,
                verbose,
                init_data: None,
            };

            Box::new(reader) as Box<dyn FileReader>
        }
    }
}

const ROW_COUNT_OVERFLOW_ERR: PolarsError = PolarsError::ComputeError(ErrString::new_static(
    "\
IPC file produces more than 2^32 rows; \
consider compiling with polars-bigidx feature (polars-u64-idx package on python)",
));

struct IpcFileReader {
    scan_source: ScanSource,
    cloud_options: Option<Arc<CloudOptions>>,
    metadata: Option<Arc<FileMetadata>>,
    verbose: bool,

    init_data: Option<InitializedState>,
}
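// Illustrative call-sequence sketch (an addition for exposition, not part of the
// original source; it follows the `FileReader` trait impl further below):
//
//   let mut reader: Box<dyn FileReader> =
//       builder.build_file_reader(source, cloud_options, scan_source_idx);
//   reader.initialize().await?;                       // mmap the source + read FileMetadata
//   let (morsels, handle) = reader.begin_read(args)?; // spawn walker/decoder/distributor tasks
//
// `initialize` populates `init_data`; `begin_read` unwraps it and panics if it is
// called before initialization.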
#[derive(Clone)]
struct InitializedState {
    memslice: MemSlice,
    file_metadata: Arc<FileMetadata>,
    // Lazily initialized - getting this involves iterating record batches.
    n_rows_in_file: Option<IdxSize>,
}

/// Move `slice` forward by `n` and return the slice until then.
/// (An illustrative test sketch at the bottom of this file walks through an example.)
fn slice_take(slice: &mut Range<usize>, n: usize) -> Range<usize> {
    let offset = slice.start;
    let length = slice.len();

    assert!(offset <= n);

    let chunk_length = (n - offset).min(length);
    let rng = offset..offset + chunk_length;
    *slice = 0..length - chunk_length;

    rng
}

fn get_max_morsel_size() -> usize {
    std::env::var("POLARS_STREAMING_IPC_SOURCE_MAX_MORSEL_SIZE")
        .map_or_else(
            |_| get_ideal_morsel_size(),
            |v| {
                v.parse::<usize>().expect(
                    "POLARS_STREAMING_IPC_SOURCE_MAX_MORSEL_SIZE does not contain a valid size",
                )
            },
        )
        .max(1)
}

#[async_trait]
impl FileReader for IpcFileReader {
    async fn initialize(&mut self) -> PolarsResult<()> {
        if self.init_data.is_some() {
            return Ok(());
        }

        // check_latest: IR resolution does not download IPC.
        // TODO: Streaming reads
        if let ScanSourceRef::Path(addr) = self.scan_source.as_scan_source_ref() {
            polars_io::file_cache::init_entries_from_uri_list(
                [Arc::from(addr.to_str())].into_iter(),
                self.cloud_options.as_deref(),
            )?;
        }

        let memslice = self
            .scan_source
            .as_scan_source_ref()
            .to_memslice_async_check_latest(self.scan_source.run_async())?;

        let file_metadata = if let Some(v) = self.metadata.clone() {
            v
        } else {
            Arc::new(read_file_metadata(&mut std::io::Cursor::new(
                memslice.as_ref(),
            ))?)
        };

        self.init_data = Some(InitializedState {
            memslice,
            file_metadata,
            n_rows_in_file: None,
        });

        Ok(())
    }

    fn begin_read(
        &mut self,
        args: BeginReadArgs,
    ) -> PolarsResult<(FileReaderOutputRecv, JoinHandle<PolarsResult<()>>)> {
        let verbose = self.verbose;

        let InitializedState {
            memslice,
            file_metadata,
            n_rows_in_file: _,
        } = self.init_data.clone().unwrap();

        let BeginReadArgs {
            projection: Projection::Plain(projected_schema),
            row_index,
            pre_slice: pre_slice_arg,
            predicate: None,
            cast_columns_policy: _,
            num_pipelines,
            callbacks:
                FileReaderCallbacks {
                    file_schema_tx,
                    n_rows_in_file_tx,
                    row_position_on_end_tx,
                },
        } = args
        else {
            panic!("unsupported args: {:?}", &args)
        };

        let file_schema_pl = std::cell::LazyCell::new(|| {
            Arc::new(Schema::from_arrow_schema(file_metadata.schema.as_ref()))
        });

        let normalized_pre_slice = if let Some(pre_slice) = pre_slice_arg.clone() {
            Some(pre_slice.restrict_to_bounds(usize::try_from(self._n_rows_in_file()?).unwrap()))
        } else {
            None
        };

        if let Some(mut n_rows_in_file_tx) = n_rows_in_file_tx {
            _ = n_rows_in_file_tx.try_send(self._n_rows_in_file()?);
        }

        if let Some(mut row_position_on_end_tx) = row_position_on_end_tx {
            _ = row_position_on_end_tx
                .try_send(self._row_position_after_slice(normalized_pre_slice.clone())?);
        }

        if let Some(mut file_schema_tx) = file_schema_tx {
            _ = file_schema_tx.try_send(file_schema_pl.clone());
        }

        if normalized_pre_slice.as_ref().is_some_and(|x| x.len() == 0) {
            let (_, rx) = FileReaderOutputSend::new_serial();

            if verbose {
                eprintln!(
                    "[IpcFileReader]: early return: \
                    n_rows_in_file: {}, \
                    pre_slice: {:?}, \
                    resolved_pre_slice: {:?} \
                    ",
                    self._n_rows_in_file()?,
                    pre_slice_arg,
                    normalized_pre_slice
                )
            }

            return Ok((rx, spawn(TaskPriority::Low, std::future::ready(Ok(())))));
        }

        // Prepare parameters for tasks

        // Always create a slice. If no slice was given, just make the biggest slice possible.
        let slice: Range<usize> = normalized_pre_slice
            .clone()
            .map_or(0..usize::MAX, Range::<usize>::from);

        // Avoid materializing projection info if we are projecting all the columns of this file.
        let projection_indices: Option<Vec<usize>> = if let Some(first_mismatch_idx) =
            (0..file_metadata.schema.len().min(projected_schema.len())).find(|&i| {
                file_metadata.schema.get_at_index(i).unwrap().0
                    != projected_schema.get_at_index(i).unwrap().0
            }) {
            let mut out = Vec::with_capacity(file_metadata.schema.len());

            out.extend(0..first_mismatch_idx);

            out.extend(
                (first_mismatch_idx..projected_schema.len()).filter_map(|i| {
                    file_metadata
                        .schema
                        .index_of(projected_schema.get_at_index(i).unwrap().0)
                }),
            );

            Some(out)
        } else if file_metadata.schema.len() > projected_schema.len() {
            // Names match up to projected schema len.
            Some((0..projected_schema.len()).collect::<Vec<_>>())
        } else {
            // Name order matches up to `file_metadata.schema.len()`, so we are projecting all
            // columns in this file.
            None
        };
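        // A worked example of the mapping above: with a file schema of [a, b, c, d] and a
        // projected schema of [a, c], the first name mismatch is at index 1 (`b` != `c`),
        // so the resolved indices are [0, 2]. When the projection is simply a prefix of
        // the file schema, no per-name lookup is needed at all.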
        if verbose {
            eprintln!(
                "[IpcFileReader]: \
                project: {} / {}, \
                pre_slice: {:?}, \
                resolved_pre_slice: {:?} \
                ",
                projection_indices
                    .as_ref()
                    .map_or(file_metadata.schema.len(), |x| x.len()),
                file_metadata.schema.len(),
                pre_slice_arg,
                normalized_pre_slice
            )
        }

        let projection_info: Option<ProjectionInfo> =
            projection_indices.map(|indices| prepare_projection(&file_metadata.schema, indices));

        // Split size for morsels.
        let max_morsel_size = get_max_morsel_size();

        let metadata = file_metadata;

        /// Messages sent from Walker task to Decoder tasks.
        struct BatchMessage {
            row_idx_offset: IdxSize,
            slice: Range<usize>,
            block_range: Range<usize>,
            morsel_seq_base: u64,
        }

        let (mut morsel_sender, morsel_rx) = FileReaderOutputSend::new_serial();

        // Walker task -> Decoder tasks.
        let (mut batch_tx, batch_rxs) =
            distributor_channel::<BatchMessage>(num_pipelines, *DEFAULT_DISTRIBUTOR_BUFFER_SIZE);
        // Decoder tasks -> Distributor task.
        let (mut decoded_rx, decoded_tx) =
            Linearizer::<Priority<Reverse<MorselSeq>, DataFrame>>::new(
                num_pipelines,
                *DEFAULT_LINEARIZER_BUFFER_SIZE,
            );
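        // Task topology set up below:
        //
        //   walker --distributor_channel--> decoder (x num_pipelines) --Linearizer--> distributor
        //
        // Each decoder tags its output frames with a MorselSeq, and the Linearizer merges on
        // Priority<Reverse<MorselSeq>, _>, so the distributor re-emits morsels in order.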
        // Explicitly linearize here to redistribute morsels from large record batches.
        //
        // If record batches in the source IPC file are large, one decoder might produce many
        // morsels at the same time. At the same time, other decoders might not produce anything.
        // Therefore, we would like to distribute the output of a single decoder task over the
        // available output pipelines.
        //
        // Note, we can theoretically use `FileReaderOutputSend::parallel()` as it also linearizes
        // internally, but this behavior is an implementation detail rather than a guarantee.
        let distributor_handle = AbortOnDropHandle::new(spawn(TaskPriority::High, async move {
            // Note: We don't use this (it is handled by the bridge), but morsels require a
            // source token.
            let source_token = SourceToken::new();

            while let Some(Priority(Reverse(seq), df)) = decoded_rx.get().await {
                let morsel = Morsel::new(df, seq, source_token.clone());

                if morsel_sender.send_morsel(morsel).await.is_err() {
                    break;
                }
            }

            PolarsResult::Ok(())
        }));

        // Decoder tasks.
        //
        // Each decoder takes the IPC file and a certain number of blocks and decodes each block
        // as a record batch. Then, all record batches are concatenated into a DataFrame. If the
        // resulting DataFrame is too large, which happens when we have one very large block,
        // the DataFrame is split into smaller pieces and spread among the pipelines.
        let decoder_handles = decoded_tx
            .into_iter()
            .zip(batch_rxs)
            .map(|(mut send, mut rx)| {
                let memslice = memslice.clone();
                let metadata = metadata.clone();
                let row_index = row_index.clone();
                let projection_info = projection_info.clone();
                AbortOnDropHandle::new(spawn(TaskPriority::Low, async move {
                    // Amortize allocations.
                    let mut data_scratch = Vec::new();
                    let mut message_scratch = Vec::new();

                    let schema = projection_info.as_ref().map_or(
                        metadata.schema.as_ref(),
                        |ProjectionInfo { schema, .. }| schema,
                    );
                    let pl_schema = schema
                        .iter()
                        .map(|(n, f)| (n.clone(), DataType::from_arrow_field(f)))
                        .collect::<Schema>();

                    while let Ok(m) = rx.recv().await {
                        let BatchMessage {
                            row_idx_offset,
                            slice,
                            morsel_seq_base,
                            block_range,
                        } = m;

                        // If we don't project any columns we cannot read properly from the file,
                        // so we just create an empty frame with the proper height.
                        let mut df = if pl_schema.is_empty() {
                            DataFrame::empty_with_height(slice.len())
                        } else {
                            use polars_core::utils::arrow::io::ipc;

                            let mut reader = ipc::read::FileReader::new_with_projection_info(
                                Cursor::new(memslice.as_ref()),
                                metadata.as_ref().clone(),
                                projection_info.clone(),
                                None,
                            );

                            reader.set_current_block(block_range.start);
                            reader.set_scratches((
                                std::mem::take(&mut data_scratch),
                                std::mem::take(&mut message_scratch),
                            ));

                            // Create the DataFrame with the appropriate schema and append all the
                            // record batches to it. This will perform schema validation as well.
                            let mut df = DataFrame::empty_with_schema(&pl_schema);
                            df.try_extend(reader.by_ref().take(block_range.len()))?;

                            (data_scratch, message_scratch) = reader.take_scratches();
                            df = df.slice(slice.start as i64, slice.len());

                            df
                        };

                        if let Some(RowIndex { name, offset: _ }) = &row_index {
                            let offset = row_idx_offset + slice.start as IdxSize;
                            df = df.with_row_index(name.clone(), Some(offset))?;
                        }
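                        // For example: with max_morsel_size = 100_000, a 250_000-row batch is
                        // emitted below as ceil(250_000 / 100_000) = 3 morsels of 100_000,
                        // 100_000 and 50_000 rows, with consecutive sequence numbers starting
                        // at morsel_seq_base.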
                        // If the block is very large, we want to split the block amongst the
                        // pipelines. That will at least allow some parallelism.
                        if df.height() > max_morsel_size && verbose {
                            eprintln!(
                                "IpcFileReader encountered a (too) large record batch \
                                of {} rows. Splitting and continuing.",
                                df.height()
                            );
                        }

                        for i in 0..df.height().div_ceil(max_morsel_size) {
                            let morsel_df = df.slice((i * max_morsel_size) as i64, max_morsel_size);
                            let seq = MorselSeq::new(morsel_seq_base + i as u64);
                            if send
                                .insert(Priority(Reverse(seq), morsel_df))
                                .await
                                .is_err()
                            {
                                break;
                            }
                        }
                    }

                    PolarsResult::Ok(())
                }))
            })
            .collect::<Vec<_>>();

        let memslice = memslice;
        let metadata = metadata;
        let row_index = row_index;
        let projection_info = projection_info;

        // Walker task.
        //
        // Walks all the sources and supplies block ranges to the decoder tasks.
        let walker_handle = AbortOnDropHandle::new(spawn(TaskPriority::Low, async move {
            let mut morsel_seq: u64 = 0;
            let mut row_idx_offset: IdxSize = row_index.as_ref().map_or(0, |ri| ri.offset);
            let mut slice: Range<usize> = slice;

            struct Batch {
                row_idx_offset: IdxSize,
                block_start: usize,
                num_rows: usize,
            }

            // Batch completion parameters
            let batch_size_limit = get_ideal_morsel_size();
            let sliced_batch_size_limit = slice.len().div_ceil(num_pipelines);
            let batch_block_limit = metadata.blocks.len().div_ceil(num_pipelines);

            use polars_core::utils::arrow::io::ipc;

            let mut reader = ipc::read::FileReader::new_with_projection_info(
                Cursor::new(memslice.as_ref()),
                metadata.as_ref().clone(),
                projection_info.clone(),
                None,
            );

            if slice.start > 0 {
                // Skip over all blocks that the slice would skip anyway.
                let new_offset = reader.skip_blocks_till_limit(slice.start as u64)?;

                row_idx_offset += (slice.start as u64 - new_offset) as IdxSize;
                slice = new_offset as usize..new_offset as usize + slice.len();
            }

            'read: {
                // If we would skip the entire file, don't even try to read from it.
                if reader.get_current_block() == reader.metadata().blocks.len() {
                    break 'read;
                }

                let mut batch = Batch {
                    row_idx_offset,
                    block_start: reader.get_current_block(),
                    num_rows: 0,
                };

                // We don't yet want to commit these values to the state in case this batch gets
                // cancelled.
                let mut uncommitted_slice = slice.clone();
                let mut uncommitted_row_idx_offset = row_idx_offset;
                while !slice.is_empty() {
                    let mut is_batch_complete = false;

                    match reader.next_record_batch() {
                        None if batch.num_rows == 0 => break,

                        // If we have no more record batches available, we want to send what is
                        // left.
                        None => is_batch_complete = true,
                        Some(record_batch) => {
                            let rb_num_rows = record_batch?.length()? as usize;
                            batch.num_rows += rb_num_rows;

                            // We need to ensure that we are not overflowing the IdxSize maximum
                            // capacity.
                            let rb_num_rows = IdxSize::try_from(rb_num_rows)
                                .map_err(|_| ROW_COUNT_OVERFLOW_ERR)?;
                            uncommitted_row_idx_offset = uncommitted_row_idx_offset
                                .checked_add(rb_num_rows)
                                .ok_or(ROW_COUNT_OVERFLOW_ERR)?;
                        },
                    }

                    let current_block = reader.get_current_block();

                    // Subdivide into batches for large files.
                    is_batch_complete |= batch.num_rows >= batch_size_limit;
                    // Subdivide into batches if the file is sliced.
                    is_batch_complete |= batch.num_rows >= sliced_batch_size_limit;
                    // Subdivide into batches for small files.
                    is_batch_complete |= current_block - batch.block_start >= batch_block_limit;
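                    // For example: a 1_000-row slice with num_pipelines = 4 gives
                    // sliced_batch_size_limit = ceil(1_000 / 4) = 250, so a batch completes
                    // after 250 buffered rows even when batch_size_limit (the ideal morsel
                    // size) is larger, which keeps all pipelines busy on small sliced reads.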
We guarantee a550// lower bound here, but not an upper bound.551if is_batch_complete {552let batch_slice = slice_take(&mut uncommitted_slice, batch.num_rows);553let batch_slice_len = batch_slice.len();554let block_range = batch.block_start..current_block;555556let message = BatchMessage {557row_idx_offset: batch.row_idx_offset,558slice: batch_slice,559morsel_seq_base: morsel_seq,560block_range,561};562563if batch_tx.send(message).await.is_err() {564// This should only happen if the receiver of the decoder565// has broken off, meaning no further input will be needed.566break 'read;567}568569// Commit the changes to the state.570// Now, we know that the a decoder will process it.571//572// This might generate several morsels if the record batch is very large.573morsel_seq += batch_slice_len.div_ceil(max_morsel_size) as u64;574slice = uncommitted_slice.clone();575row_idx_offset = uncommitted_row_idx_offset;576577batch = Batch {578row_idx_offset,579block_start: current_block,580num_rows: 0,581};582}583}584} // 'read585586PolarsResult::Ok(())587}));588589Ok((590morsel_rx,591spawn(TaskPriority::Low, async move {592distributor_handle.await?;593594for handle in decoder_handles {595handle.await?;596}597598walker_handle.await?;599Ok(())600}),601))602}603604async fn n_rows_in_file(&mut self) -> PolarsResult<IdxSize> {605self._n_rows_in_file()606}607608async fn row_position_after_slice(609&mut self,610pre_slice: Option<Slice>,611) -> PolarsResult<IdxSize> {612self._row_position_after_slice(pre_slice)613}614}615616impl IpcFileReader {617fn _n_rows_in_file(&mut self) -> PolarsResult<IdxSize> {618let InitializedState {619memslice,620file_metadata,621n_rows_in_file,622} = self.init_data.as_mut().unwrap();623624if n_rows_in_file.is_none() {625let n_rows: i64 = get_row_count_from_blocks(626&mut std::io::Cursor::new(memslice.as_ref()),627&file_metadata.blocks,628)?;629630let n_rows = IdxSize::try_from(n_rows)631.map_err(|_| polars_err!(bigidx, ctx = "ipc file", size = n_rows))?;632633*n_rows_in_file = Some(n_rows);634}635636Ok(n_rows_in_file.unwrap())637}638639fn _row_position_after_slice(&mut self, pre_slice: Option<Slice>) -> PolarsResult<IdxSize> {640Ok(calc_row_position_after_slice(641self._n_rows_in_file()?,642pre_slice,643))644}645}646647648