Path: blob/main/crates/polars-stream/src/nodes/io_sources/multi_scan/mod.rs
6939 views
pub mod components;1pub mod config;2pub mod functions;3mod pipeline;4pub mod reader_interface;56use std::sync::{Arc, Mutex};78use pipeline::initialization::initialize_multi_scan_pipeline;9use polars_error::PolarsResult;10use polars_io::pl_async;11use polars_utils::format_pl_smallstr;12use polars_utils::pl_str::PlSmallStr;1314use crate::async_executor::{self, AbortOnDropHandle, TaskPriority};15use crate::async_primitives::connector;16use crate::async_primitives::wait_group::{WaitGroup, WaitToken};17use crate::execute::StreamingExecutionState;18use crate::graph::PortState;19use crate::morsel::Morsel;20use crate::nodes::ComputeNode;21use crate::nodes::io_sources::multi_scan::components::bridge::BridgeState;22use crate::nodes::io_sources::multi_scan::config::MultiScanConfig;23use crate::nodes::io_sources::multi_scan::functions::{24calc_max_concurrent_scans, calc_n_readers_pre_init,25};26use crate::nodes::io_sources::multi_scan::pipeline::models::InitializedPipelineState;2728pub struct MultiScan {29name: PlSmallStr,30state: MultiScanState,31verbose: bool,32}3334impl MultiScan {35pub fn new(config: Arc<MultiScanConfig>) -> Self {36let name = format_pl_smallstr!("multi-scan[{}]", config.file_reader_builder.reader_name());37let verbose = config.verbose;3839MultiScan {40name,41state: MultiScanState::Uninitialized { config },42verbose,43}44}45}4647impl ComputeNode for MultiScan {48fn name(&self) -> &str {49&self.name50}5152fn update_state(53&mut self,54recv: &mut [crate::graph::PortState],55send: &mut [crate::graph::PortState],56_state: &StreamingExecutionState,57) -> polars_error::PolarsResult<()> {58use MultiScanState::*;59assert!(recv.is_empty());60assert_eq!(send.len(), 1);6162send[0] = if send[0] == PortState::Done {63self.state = Finished;6465PortState::Done66} else {67// Refresh first - in case there is an error we end here instead of ending when we go68// into spawn.69async_executor::task_scope(|s| {70pl_async::get_runtime()71.block_on(s.spawn_task(TaskPriority::High, self.state.refresh(self.verbose)))72})?;7374match self.state {75Uninitialized { .. } | Initialized { .. } => PortState::Ready,76Finished => PortState::Done,77}78};7980Ok(())81}8283fn spawn<'env, 's>(84&'env mut self,85scope: &'s crate::async_executor::TaskScope<'s, 'env>,86recv_ports: &mut [Option<crate::pipe::RecvPort<'_>>],87send_ports: &mut [Option<crate::pipe::SendPort<'_>>],88state: &'s StreamingExecutionState,89join_handles: &mut Vec<crate::async_executor::JoinHandle<polars_error::PolarsResult<()>>>,90) {91assert!(recv_ports.is_empty() && send_ports.len() == 1);9293let phase_morsel_tx = send_ports[0].take().unwrap().serial();94let verbose = self.verbose;9596join_handles.push(scope.spawn_task(TaskPriority::Low, async move {97use MultiScanState::*;9899self.state.initialize(state);100self.state.refresh(verbose).await?;101102match &mut self.state {103Uninitialized { .. } => unreachable!(),104105Finished => return Ok(()),106107Initialized {108phase_channel_tx,109wait_group,110..111} => {112use crate::async_primitives::connector::SendError;113114match phase_channel_tx.try_send((phase_morsel_tx, wait_group.token())) {115Ok(_) => wait_group.wait().await,116117// Should never: We only send the next value once the wait token is dropped.118Err(SendError::Full(_)) => unreachable!(),119120// Bridge has disconnected from the reader side. We know this because121// we are still holding `phase_channel_tx`.122Err(SendError::Closed(_)) => {123if verbose {124eprintln!("[MultiScan]: Bridge disconnected")125}126127let Initialized { task_handle, .. } =128std::mem::replace(&mut self.state, Finished)129else {130unreachable!()131};132133task_handle.await?;134135return Ok(());136},137}138},139}140141self.state.refresh(verbose).await142}));143}144}145146enum MultiScanState {147Uninitialized {148config: Arc<MultiScanConfig>,149},150151Initialized {152phase_channel_tx: connector::Sender<(connector::Sender<Morsel>, WaitToken)>,153/// Wait group sent to the bridge, only dropped when a disconnect happens at the bridge.154wait_group: WaitGroup,155bridge_state: Arc<Mutex<BridgeState>>,156/// Single join handle for all background tasks. Note, this does not include the bridge.157task_handle: AbortOnDropHandle<PolarsResult<()>>,158},159160Finished,161}162163impl MultiScanState {164/// Initialize state if not yet initialized.165fn initialize(&mut self, execution_state: &StreamingExecutionState) {166use MultiScanState::*;167168let slf = std::mem::replace(self, Finished);169170let Uninitialized { config } = slf else {171*self = slf;172return;173};174175config176.file_reader_builder177.set_execution_state(execution_state);178179let num_pipelines = execution_state.num_pipelines;180181config.num_pipelines.store(num_pipelines);182183config.n_readers_pre_init.store(calc_n_readers_pre_init(184num_pipelines,185config.sources.len(),186config.pre_slice.as_ref(),187));188189config.max_concurrent_scans.store(calc_max_concurrent_scans(190num_pipelines,191config.sources.len(),192));193194let InitializedPipelineState {195task_handle,196phase_channel_tx,197bridge_state,198} = initialize_multi_scan_pipeline(config);199200let wait_group = WaitGroup::default();201202*self = Initialized {203phase_channel_tx,204wait_group,205bridge_state,206task_handle,207};208}209210/// Refresh the state. This checks the bridge state if `self` is initialized and updates accordingly.211async fn refresh(&mut self, verbose: bool) -> PolarsResult<()> {212use MultiScanState::*;213use components::bridge::StopReason;214215// Take, so that if we error below the state will be left as finished.216let slf = std::mem::replace(self, MultiScanState::Finished);217218let slf = match slf {219Uninitialized { .. } | Finished => slf,220221#[expect(clippy::blocks_in_conditions)]222Initialized {223phase_channel_tx,224wait_group,225bridge_state,226task_handle,227} => match { *bridge_state.lock().unwrap() } {228BridgeState::NotYetStarted | BridgeState::Running => Initialized {229phase_channel_tx,230wait_group,231bridge_state,232task_handle,233},234235// Never the case: holding `phase_channel_tx` guarantees this.236BridgeState::Stopped(StopReason::ComputeNodeDisconnected) => unreachable!(),237238// If we are disconnected from the reader side, it could mean an error. Joining on239// the handle should catch this.240BridgeState::Stopped(StopReason::ReadersDisconnected) => {241if verbose {242eprintln!("[MultiScanState]: Readers disconnected")243}244245*self = Finished;246task_handle.await?;247Finished248},249},250};251252*self = slf;253254Ok(())255}256}257258259