// Path: blob/main/crates/polars-stream/src/nodes/io_sources/ipc/builder.rs
// 8446 views
use std::num::NonZeroUsize;1use std::sync::Arc;23use arrow::io::ipc::read::FileMetadata;4use polars_core::config;5use polars_io::cloud::CloudOptions;6use polars_io::ipc::IpcScanOptions;7use polars_plan::dsl::ScanSource;8use polars_utils::relaxed_cell::RelaxedCell;910use super::{DynByteSourceBuilder, IpcFileReader};11use crate::async_primitives::wait_group::WaitGroup;12#[cfg(feature = "ipc")]13use crate::metrics::IOMetrics;14use crate::nodes::io_sources::multi_scan::reader_interface::FileReader;15use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;16use crate::nodes::io_sources::multi_scan::reader_interface::capabilities::ReaderCapabilities;1718pub struct IpcReaderBuilder {19pub first_metadata: Option<Arc<FileMetadata>>,20pub options: Arc<IpcScanOptions>,21pub prefetch_limit: RelaxedCell<usize>,22pub prefetch_semaphore: std::sync::OnceLock<Arc<tokio::sync::Semaphore>>,23pub shared_prefetch_wait_group_slot: Arc<std::sync::Mutex<Option<WaitGroup>>>,24pub io_metrics: std::sync::OnceLock<Arc<IOMetrics>>,25}2627impl std::fmt::Debug for IpcReaderBuilder {28fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {29f.debug_struct("IpcBuilder")30.field("first_metadata", &self.first_metadata)31.field("options", &self.options)32.field("prefetch_semaphore", &self.prefetch_semaphore)33.finish()34}35}3637#[cfg(feature = "ipc")]38impl FileReaderBuilder for IpcReaderBuilder {39fn reader_name(&self) -> &str {40"ipc"41}4243fn reader_capabilities(&self) -> ReaderCapabilities {44use ReaderCapabilities as RC;4546RC::ROW_INDEX | RC::PRE_SLICE47}4849fn set_execution_state(&self, execution_state: &crate::execute::StreamingExecutionState) {50let prefetch_limit = std::env::var("POLARS_RECORD_BATCH_PREFETCH_SIZE")51.map(|x| {52x.parse::<NonZeroUsize>()53.unwrap_or_else(|_| {54panic!("invalid value for POLARS_RECORD_BATCH_PREFETCH_SIZE: 
{x}")55})56.get()57})58.unwrap_or(execution_state.num_pipelines.saturating_mul(2))59.max(1);6061self.prefetch_limit.store(prefetch_limit);6263if config::verbose() {64eprintln!(65"[IpcReaderBuilder]: prefetch_limit: {}",66self.prefetch_limit.load()67);68}6970self.prefetch_semaphore71.set(Arc::new(tokio::sync::Semaphore::new(prefetch_limit)))72.unwrap()73}7475fn set_io_metrics(&self, io_metrics: Arc<IOMetrics>) {76self.io_metrics.set(io_metrics).ok().unwrap()77}7879fn build_file_reader(80&self,81source: ScanSource,82cloud_options: Option<Arc<CloudOptions>>,83scan_source_idx: usize,84) -> Box<dyn FileReader> {85use crate::metrics::OptIOMetrics;86use crate::nodes::io_sources::ipc::RecordBatchPrefetchSync;8788let scan_source = source;89let config = self.options.clone();90let verbose = config::verbose();9192let metadata = if scan_source_idx == 0 {93self.first_metadata.clone()94} else {95None96};9798let byte_source_builder =99if scan_source.is_cloud_url() || polars_config::config().force_async() {100DynByteSourceBuilder::ObjectStore101} else {102DynByteSourceBuilder::Mmap103};104105let reader = IpcFileReader {106scan_source,107cloud_options,108config,109metadata,110byte_source_builder,111record_batch_prefetch_sync: RecordBatchPrefetchSync {112prefetch_limit: self.prefetch_limit.load(),113prefetch_semaphore: Arc::clone(self.prefetch_semaphore.get().unwrap()),114shared_prefetch_wait_group_slot: Arc::clone(&self.shared_prefetch_wait_group_slot),115prev_all_spawned: None,116current_all_spawned: None,117},118io_metrics: OptIOMetrics(self.io_metrics.get().cloned()),119verbose,120init_data: None,121checked: self.options.checked,122};123124Box::new(reader) as Box<dyn FileReader>125}126}127128129