Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sources/multi_scan/mod.rs
6939 views
1
pub mod components;
2
pub mod config;
3
pub mod functions;
4
mod pipeline;
5
pub mod reader_interface;
6
7
use std::sync::{Arc, Mutex};
8
9
use pipeline::initialization::initialize_multi_scan_pipeline;
10
use polars_error::PolarsResult;
11
use polars_io::pl_async;
12
use polars_utils::format_pl_smallstr;
13
use polars_utils::pl_str::PlSmallStr;
14
15
use crate::async_executor::{self, AbortOnDropHandle, TaskPriority};
16
use crate::async_primitives::connector;
17
use crate::async_primitives::wait_group::{WaitGroup, WaitToken};
18
use crate::execute::StreamingExecutionState;
19
use crate::graph::PortState;
20
use crate::morsel::Morsel;
21
use crate::nodes::ComputeNode;
22
use crate::nodes::io_sources::multi_scan::components::bridge::BridgeState;
23
use crate::nodes::io_sources::multi_scan::config::MultiScanConfig;
24
use crate::nodes::io_sources::multi_scan::functions::{
25
calc_max_concurrent_scans, calc_n_readers_pre_init,
26
};
27
use crate::nodes::io_sources::multi_scan::pipeline::models::InitializedPipelineState;
28
29
pub struct MultiScan {
30
name: PlSmallStr,
31
state: MultiScanState,
32
verbose: bool,
33
}
34
35
impl MultiScan {
36
pub fn new(config: Arc<MultiScanConfig>) -> Self {
37
let name = format_pl_smallstr!("multi-scan[{}]", config.file_reader_builder.reader_name());
38
let verbose = config.verbose;
39
40
MultiScan {
41
name,
42
state: MultiScanState::Uninitialized { config },
43
verbose,
44
}
45
}
46
}
47
48
impl ComputeNode for MultiScan {
49
fn name(&self) -> &str {
50
&self.name
51
}
52
53
fn update_state(
54
&mut self,
55
recv: &mut [crate::graph::PortState],
56
send: &mut [crate::graph::PortState],
57
_state: &StreamingExecutionState,
58
) -> polars_error::PolarsResult<()> {
59
use MultiScanState::*;
60
assert!(recv.is_empty());
61
assert_eq!(send.len(), 1);
62
63
send[0] = if send[0] == PortState::Done {
64
self.state = Finished;
65
66
PortState::Done
67
} else {
68
// Refresh first - in case there is an error we end here instead of ending when we go
69
// into spawn.
70
async_executor::task_scope(|s| {
71
pl_async::get_runtime()
72
.block_on(s.spawn_task(TaskPriority::High, self.state.refresh(self.verbose)))
73
})?;
74
75
match self.state {
76
Uninitialized { .. } | Initialized { .. } => PortState::Ready,
77
Finished => PortState::Done,
78
}
79
};
80
81
Ok(())
82
}
83
84
fn spawn<'env, 's>(
85
&'env mut self,
86
scope: &'s crate::async_executor::TaskScope<'s, 'env>,
87
recv_ports: &mut [Option<crate::pipe::RecvPort<'_>>],
88
send_ports: &mut [Option<crate::pipe::SendPort<'_>>],
89
state: &'s StreamingExecutionState,
90
join_handles: &mut Vec<crate::async_executor::JoinHandle<polars_error::PolarsResult<()>>>,
91
) {
92
assert!(recv_ports.is_empty() && send_ports.len() == 1);
93
94
let phase_morsel_tx = send_ports[0].take().unwrap().serial();
95
let verbose = self.verbose;
96
97
join_handles.push(scope.spawn_task(TaskPriority::Low, async move {
98
use MultiScanState::*;
99
100
self.state.initialize(state);
101
self.state.refresh(verbose).await?;
102
103
match &mut self.state {
104
Uninitialized { .. } => unreachable!(),
105
106
Finished => return Ok(()),
107
108
Initialized {
109
phase_channel_tx,
110
wait_group,
111
..
112
} => {
113
use crate::async_primitives::connector::SendError;
114
115
match phase_channel_tx.try_send((phase_morsel_tx, wait_group.token())) {
116
Ok(_) => wait_group.wait().await,
117
118
// Should never: We only send the next value once the wait token is dropped.
119
Err(SendError::Full(_)) => unreachable!(),
120
121
// Bridge has disconnected from the reader side. We know this because
122
// we are still holding `phase_channel_tx`.
123
Err(SendError::Closed(_)) => {
124
if verbose {
125
eprintln!("[MultiScan]: Bridge disconnected")
126
}
127
128
let Initialized { task_handle, .. } =
129
std::mem::replace(&mut self.state, Finished)
130
else {
131
unreachable!()
132
};
133
134
task_handle.await?;
135
136
return Ok(());
137
},
138
}
139
},
140
}
141
142
self.state.refresh(verbose).await
143
}));
144
}
145
}
146
147
enum MultiScanState {
148
Uninitialized {
149
config: Arc<MultiScanConfig>,
150
},
151
152
Initialized {
153
phase_channel_tx: connector::Sender<(connector::Sender<Morsel>, WaitToken)>,
154
/// Wait group sent to the bridge, only dropped when a disconnect happens at the bridge.
155
wait_group: WaitGroup,
156
bridge_state: Arc<Mutex<BridgeState>>,
157
/// Single join handle for all background tasks. Note, this does not include the bridge.
158
task_handle: AbortOnDropHandle<PolarsResult<()>>,
159
},
160
161
Finished,
162
}
163
164
impl MultiScanState {
165
/// Initialize state if not yet initialized.
166
fn initialize(&mut self, execution_state: &StreamingExecutionState) {
167
use MultiScanState::*;
168
169
let slf = std::mem::replace(self, Finished);
170
171
let Uninitialized { config } = slf else {
172
*self = slf;
173
return;
174
};
175
176
config
177
.file_reader_builder
178
.set_execution_state(execution_state);
179
180
let num_pipelines = execution_state.num_pipelines;
181
182
config.num_pipelines.store(num_pipelines);
183
184
config.n_readers_pre_init.store(calc_n_readers_pre_init(
185
num_pipelines,
186
config.sources.len(),
187
config.pre_slice.as_ref(),
188
));
189
190
config.max_concurrent_scans.store(calc_max_concurrent_scans(
191
num_pipelines,
192
config.sources.len(),
193
));
194
195
let InitializedPipelineState {
196
task_handle,
197
phase_channel_tx,
198
bridge_state,
199
} = initialize_multi_scan_pipeline(config);
200
201
let wait_group = WaitGroup::default();
202
203
*self = Initialized {
204
phase_channel_tx,
205
wait_group,
206
bridge_state,
207
task_handle,
208
};
209
}
210
211
/// Refresh the state. This checks the bridge state if `self` is initialized and updates accordingly.
212
async fn refresh(&mut self, verbose: bool) -> PolarsResult<()> {
213
use MultiScanState::*;
214
use components::bridge::StopReason;
215
216
// Take, so that if we error below the state will be left as finished.
217
let slf = std::mem::replace(self, MultiScanState::Finished);
218
219
let slf = match slf {
220
Uninitialized { .. } | Finished => slf,
221
222
#[expect(clippy::blocks_in_conditions)]
223
Initialized {
224
phase_channel_tx,
225
wait_group,
226
bridge_state,
227
task_handle,
228
} => match { *bridge_state.lock().unwrap() } {
229
BridgeState::NotYetStarted | BridgeState::Running => Initialized {
230
phase_channel_tx,
231
wait_group,
232
bridge_state,
233
task_handle,
234
},
235
236
// Never the case: holding `phase_channel_tx` guarantees this.
237
BridgeState::Stopped(StopReason::ComputeNodeDisconnected) => unreachable!(),
238
239
// If we are disconnected from the reader side, it could mean an error. Joining on
240
// the handle should catch this.
241
BridgeState::Stopped(StopReason::ReadersDisconnected) => {
242
if verbose {
243
eprintln!("[MultiScanState]: Readers disconnected")
244
}
245
246
*self = Finished;
247
task_handle.await?;
248
Finished
249
},
250
},
251
};
252
253
*self = slf;
254
255
Ok(())
256
}
257
}
258
259