Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sources/multi_scan/mod.rs
8446 views
1
pub mod components;
2
pub mod config;
3
pub mod functions;
4
mod pipeline;
5
pub mod reader_interface;
6
7
use std::sync::{Arc, Mutex};
8
9
use pipeline::initialization::initialize_multi_scan_pipeline;
10
use polars_error::PolarsResult;
11
use polars_io::pl_async;
12
use polars_utils::format_pl_smallstr;
13
use polars_utils::pl_str::PlSmallStr;
14
15
use crate::async_executor::{self, AbortOnDropHandle, TaskPriority};
16
use crate::async_primitives::connector;
17
use crate::async_primitives::wait_group::{WaitGroup, WaitToken};
18
use crate::execute::StreamingExecutionState;
19
use crate::graph::PortState;
20
use crate::metrics::MetricsBuilder;
21
use crate::nodes::ComputeNode;
22
use crate::nodes::io_sources::multi_scan::components::bridge::BridgeState;
23
use crate::nodes::io_sources::multi_scan::config::MultiScanConfig;
24
use crate::nodes::io_sources::multi_scan::functions::{
25
calc_max_concurrent_scans, calc_n_readers_pre_init,
26
};
27
use crate::nodes::io_sources::multi_scan::pipeline::models::InitializedPipelineState;
28
use crate::pipe::PortSender;
29
30
pub struct MultiScan {
31
name: PlSmallStr,
32
state: MultiScanState,
33
metrics_builder: Option<MetricsBuilder>,
34
verbose: bool,
35
}
36
37
impl MultiScan {
38
pub fn new(config: Arc<MultiScanConfig>) -> Self {
39
let name = format_pl_smallstr!("multi-scan[{}]", config.file_reader_builder.reader_name());
40
let verbose = config.verbose;
41
42
MultiScan {
43
name,
44
state: MultiScanState::Uninitialized { config },
45
metrics_builder: None,
46
verbose,
47
}
48
}
49
}
50
51
impl ComputeNode for MultiScan {
52
fn name(&self) -> &str {
53
&self.name
54
}
55
56
fn set_metrics_builder(&mut self, metrics_builder: MetricsBuilder) {
57
self.metrics_builder = Some(metrics_builder);
58
}
59
60
fn update_state(
61
&mut self,
62
recv: &mut [crate::graph::PortState],
63
send: &mut [crate::graph::PortState],
64
_state: &StreamingExecutionState,
65
) -> polars_error::PolarsResult<()> {
66
use MultiScanState::*;
67
assert!(recv.is_empty());
68
assert_eq!(send.len(), 1);
69
70
send[0] = if send[0] == PortState::Done {
71
self.state = Finished;
72
73
PortState::Done
74
} else {
75
// Refresh first - in case there is an error we end here instead of ending when we go
76
// into spawn.
77
async_executor::task_scope(|s| {
78
pl_async::get_runtime()
79
.block_on(s.spawn_task(TaskPriority::High, self.state.refresh(self.verbose)))
80
})?;
81
82
match self.state {
83
Uninitialized { .. } | Initialized { .. } => PortState::Ready,
84
Finished => PortState::Done,
85
}
86
};
87
88
Ok(())
89
}
90
91
fn spawn<'env, 's>(
92
&'env mut self,
93
scope: &'s crate::async_executor::TaskScope<'s, 'env>,
94
recv_ports: &mut [Option<crate::pipe::RecvPort<'_>>],
95
send_ports: &mut [Option<crate::pipe::SendPort<'_>>],
96
state: &'s StreamingExecutionState,
97
join_handles: &mut Vec<crate::async_executor::JoinHandle<polars_error::PolarsResult<()>>>,
98
) {
99
assert!(recv_ports.is_empty() && send_ports.len() == 1);
100
101
let phase_morsel_tx = send_ports[0].take().unwrap().serial();
102
let verbose = self.verbose;
103
104
join_handles.push(scope.spawn_task(TaskPriority::Low, async move {
105
use MultiScanState::*;
106
107
self.state
108
.initialize(state.clone(), self.metrics_builder.as_ref());
109
self.state.refresh(verbose).await?;
110
111
match &mut self.state {
112
Uninitialized { .. } => unreachable!(),
113
114
Finished => return Ok(()),
115
116
Initialized {
117
phase_channel_tx,
118
wait_group,
119
..
120
} => {
121
use crate::async_primitives::connector::SendError;
122
123
match phase_channel_tx.try_send((phase_morsel_tx, wait_group.token())) {
124
Ok(_) => wait_group.wait().await,
125
126
// Should never: We only send the next value once the wait token is dropped.
127
Err(SendError::Full(_)) => unreachable!(),
128
129
// Bridge has disconnected from the reader side. We know this because
130
// we are still holding `phase_channel_tx`.
131
Err(SendError::Closed(_)) => {
132
if verbose {
133
eprintln!("[MultiScan]: Bridge disconnected")
134
}
135
136
let Initialized { task_handle, .. } =
137
std::mem::replace(&mut self.state, Finished)
138
else {
139
unreachable!()
140
};
141
142
task_handle.await?;
143
144
return Ok(());
145
},
146
}
147
},
148
}
149
150
self.state.refresh(verbose).await
151
}));
152
}
153
}
154
155
enum MultiScanState {
156
Uninitialized {
157
config: Arc<MultiScanConfig>,
158
},
159
160
Initialized {
161
phase_channel_tx: connector::Sender<(PortSender, WaitToken)>,
162
/// Wait group sent to the bridge, only dropped when a disconnect happens at the bridge.
163
wait_group: WaitGroup,
164
bridge_state: Arc<Mutex<BridgeState>>,
165
/// Single join handle for all background tasks. Note, this does not include the bridge.
166
task_handle: AbortOnDropHandle<PolarsResult<()>>,
167
},
168
169
Finished,
170
}
171
172
impl MultiScanState {
173
/// Initialize state if not yet initialized.
174
fn initialize(
175
&mut self,
176
execution_state: StreamingExecutionState,
177
metrics_builder: Option<&MetricsBuilder>,
178
) {
179
use MultiScanState::*;
180
181
let slf = std::mem::replace(self, Finished);
182
183
let Uninitialized { config } = slf else {
184
*self = slf;
185
return;
186
};
187
188
config
189
.file_reader_builder
190
.set_execution_state(&execution_state);
191
192
if let Some(metrics_builder) = metrics_builder {
193
let io_metrics = metrics_builder.new_io_metrics();
194
195
config.io_metrics.get_or_init(|| io_metrics.clone());
196
config.file_reader_builder.set_io_metrics(io_metrics);
197
}
198
199
let num_pipelines = execution_state.num_pipelines;
200
201
config.num_pipelines.store(num_pipelines);
202
203
config.n_readers_pre_init.store(calc_n_readers_pre_init(
204
num_pipelines,
205
config.sources.len(),
206
config.pre_slice.as_ref(),
207
));
208
209
config.max_concurrent_scans.store(calc_max_concurrent_scans(
210
num_pipelines,
211
config.sources.len(),
212
));
213
214
let InitializedPipelineState {
215
task_handle,
216
phase_channel_tx,
217
bridge_state,
218
} = initialize_multi_scan_pipeline(config, execution_state);
219
220
let wait_group = WaitGroup::default();
221
222
*self = Initialized {
223
phase_channel_tx,
224
wait_group,
225
bridge_state,
226
task_handle,
227
};
228
}
229
230
/// Refresh the state. This checks the bridge state if `self` is initialized and updates accordingly.
231
async fn refresh(&mut self, verbose: bool) -> PolarsResult<()> {
232
use MultiScanState::*;
233
use components::bridge::StopReason;
234
235
// Take, so that if we error below the state will be left as finished.
236
let slf = std::mem::replace(self, MultiScanState::Finished);
237
238
let slf = match slf {
239
Uninitialized { .. } | Finished => slf,
240
241
#[expect(clippy::blocks_in_conditions)]
242
Initialized {
243
phase_channel_tx,
244
wait_group,
245
bridge_state,
246
task_handle,
247
} => match { *bridge_state.lock().unwrap() } {
248
BridgeState::NotYetStarted | BridgeState::Running => Initialized {
249
phase_channel_tx,
250
wait_group,
251
bridge_state,
252
task_handle,
253
},
254
255
// Never the case: holding `phase_channel_tx` guarantees this.
256
BridgeState::Stopped(StopReason::ComputeNodeDisconnected) => unreachable!(),
257
258
// If we are disconnected from the reader side, it could mean an error. Joining on
259
// the handle should catch this.
260
BridgeState::Stopped(StopReason::ReadersDisconnected) => {
261
if verbose {
262
eprintln!("[MultiScanState]: Readers disconnected")
263
}
264
265
*self = Finished;
266
task_handle.await?;
267
Finished
268
},
269
},
270
};
271
272
*self = slf;
273
274
Ok(())
275
}
276
}
277
278