Path: blob/main/crates/polars-stream/src/nodes/io_sources/multi_scan/mod.rs
8446 views
pub mod components;1pub mod config;2pub mod functions;3mod pipeline;4pub mod reader_interface;56use std::sync::{Arc, Mutex};78use pipeline::initialization::initialize_multi_scan_pipeline;9use polars_error::PolarsResult;10use polars_io::pl_async;11use polars_utils::format_pl_smallstr;12use polars_utils::pl_str::PlSmallStr;1314use crate::async_executor::{self, AbortOnDropHandle, TaskPriority};15use crate::async_primitives::connector;16use crate::async_primitives::wait_group::{WaitGroup, WaitToken};17use crate::execute::StreamingExecutionState;18use crate::graph::PortState;19use crate::metrics::MetricsBuilder;20use crate::nodes::ComputeNode;21use crate::nodes::io_sources::multi_scan::components::bridge::BridgeState;22use crate::nodes::io_sources::multi_scan::config::MultiScanConfig;23use crate::nodes::io_sources::multi_scan::functions::{24calc_max_concurrent_scans, calc_n_readers_pre_init,25};26use crate::nodes::io_sources::multi_scan::pipeline::models::InitializedPipelineState;27use crate::pipe::PortSender;2829pub struct MultiScan {30name: PlSmallStr,31state: MultiScanState,32metrics_builder: Option<MetricsBuilder>,33verbose: bool,34}3536impl MultiScan {37pub fn new(config: Arc<MultiScanConfig>) -> Self {38let name = format_pl_smallstr!("multi-scan[{}]", config.file_reader_builder.reader_name());39let verbose = config.verbose;4041MultiScan {42name,43state: MultiScanState::Uninitialized { config },44metrics_builder: None,45verbose,46}47}48}4950impl ComputeNode for MultiScan {51fn name(&self) -> &str {52&self.name53}5455fn set_metrics_builder(&mut self, metrics_builder: MetricsBuilder) {56self.metrics_builder = Some(metrics_builder);57}5859fn update_state(60&mut self,61recv: &mut [crate::graph::PortState],62send: &mut [crate::graph::PortState],63_state: &StreamingExecutionState,64) -> polars_error::PolarsResult<()> {65use MultiScanState::*;66assert!(recv.is_empty());67assert_eq!(send.len(), 1);6869send[0] = if send[0] == PortState::Done {70self.state = Finished;7172PortState::Done73} else {74// Refresh first - in case there is an error we end here instead of ending when we go75// into spawn.76async_executor::task_scope(|s| {77pl_async::get_runtime()78.block_on(s.spawn_task(TaskPriority::High, self.state.refresh(self.verbose)))79})?;8081match self.state {82Uninitialized { .. } | Initialized { .. } => PortState::Ready,83Finished => PortState::Done,84}85};8687Ok(())88}8990fn spawn<'env, 's>(91&'env mut self,92scope: &'s crate::async_executor::TaskScope<'s, 'env>,93recv_ports: &mut [Option<crate::pipe::RecvPort<'_>>],94send_ports: &mut [Option<crate::pipe::SendPort<'_>>],95state: &'s StreamingExecutionState,96join_handles: &mut Vec<crate::async_executor::JoinHandle<polars_error::PolarsResult<()>>>,97) {98assert!(recv_ports.is_empty() && send_ports.len() == 1);99100let phase_morsel_tx = send_ports[0].take().unwrap().serial();101let verbose = self.verbose;102103join_handles.push(scope.spawn_task(TaskPriority::Low, async move {104use MultiScanState::*;105106self.state107.initialize(state.clone(), self.metrics_builder.as_ref());108self.state.refresh(verbose).await?;109110match &mut self.state {111Uninitialized { .. } => unreachable!(),112113Finished => return Ok(()),114115Initialized {116phase_channel_tx,117wait_group,118..119} => {120use crate::async_primitives::connector::SendError;121122match phase_channel_tx.try_send((phase_morsel_tx, wait_group.token())) {123Ok(_) => wait_group.wait().await,124125// Should never: We only send the next value once the wait token is dropped.126Err(SendError::Full(_)) => unreachable!(),127128// Bridge has disconnected from the reader side. We know this because129// we are still holding `phase_channel_tx`.130Err(SendError::Closed(_)) => {131if verbose {132eprintln!("[MultiScan]: Bridge disconnected")133}134135let Initialized { task_handle, .. } =136std::mem::replace(&mut self.state, Finished)137else {138unreachable!()139};140141task_handle.await?;142143return Ok(());144},145}146},147}148149self.state.refresh(verbose).await150}));151}152}153154enum MultiScanState {155Uninitialized {156config: Arc<MultiScanConfig>,157},158159Initialized {160phase_channel_tx: connector::Sender<(PortSender, WaitToken)>,161/// Wait group sent to the bridge, only dropped when a disconnect happens at the bridge.162wait_group: WaitGroup,163bridge_state: Arc<Mutex<BridgeState>>,164/// Single join handle for all background tasks. Note, this does not include the bridge.165task_handle: AbortOnDropHandle<PolarsResult<()>>,166},167168Finished,169}170171impl MultiScanState {172/// Initialize state if not yet initialized.173fn initialize(174&mut self,175execution_state: StreamingExecutionState,176metrics_builder: Option<&MetricsBuilder>,177) {178use MultiScanState::*;179180let slf = std::mem::replace(self, Finished);181182let Uninitialized { config } = slf else {183*self = slf;184return;185};186187config188.file_reader_builder189.set_execution_state(&execution_state);190191if let Some(metrics_builder) = metrics_builder {192let io_metrics = metrics_builder.new_io_metrics();193194config.io_metrics.get_or_init(|| io_metrics.clone());195config.file_reader_builder.set_io_metrics(io_metrics);196}197198let num_pipelines = execution_state.num_pipelines;199200config.num_pipelines.store(num_pipelines);201202config.n_readers_pre_init.store(calc_n_readers_pre_init(203num_pipelines,204config.sources.len(),205config.pre_slice.as_ref(),206));207208config.max_concurrent_scans.store(calc_max_concurrent_scans(209num_pipelines,210config.sources.len(),211));212213let InitializedPipelineState {214task_handle,215phase_channel_tx,216bridge_state,217} = initialize_multi_scan_pipeline(config, execution_state);218219let wait_group = WaitGroup::default();220221*self = Initialized {222phase_channel_tx,223wait_group,224bridge_state,225task_handle,226};227}228229/// Refresh the state. This checks the bridge state if `self` is initialized and updates accordingly.230async fn refresh(&mut self, verbose: bool) -> PolarsResult<()> {231use MultiScanState::*;232use components::bridge::StopReason;233234// Take, so that if we error below the state will be left as finished.235let slf = std::mem::replace(self, MultiScanState::Finished);236237let slf = match slf {238Uninitialized { .. } | Finished => slf,239240#[expect(clippy::blocks_in_conditions)]241Initialized {242phase_channel_tx,243wait_group,244bridge_state,245task_handle,246} => match { *bridge_state.lock().unwrap() } {247BridgeState::NotYetStarted | BridgeState::Running => Initialized {248phase_channel_tx,249wait_group,250bridge_state,251task_handle,252},253254// Never the case: holding `phase_channel_tx` guarantees this.255BridgeState::Stopped(StopReason::ComputeNodeDisconnected) => unreachable!(),256257// If we are disconnected from the reader side, it could mean an error. Joining on258// the handle should catch this.259BridgeState::Stopped(StopReason::ReadersDisconnected) => {260if verbose {261eprintln!("[MultiScanState]: Readers disconnected")262}263264*self = Finished;265task_handle.await?;266Finished267},268},269};270271*self = slf;272273Ok(())274}275}276277278