Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sinks/mod.rs
8446 views
1
use std::sync::Arc;
2
3
use polars_core::frame::DataFrame;
4
use polars_error::{PolarsResult, polars_ensure};
5
use polars_io::metrics::IOMetrics;
6
use polars_io::pl_async;
7
use polars_utils::format_pl_smallstr;
8
use polars_utils::pl_str::PlSmallStr;
9
10
use super::{ComputeNode, PortState};
11
use crate::async_executor;
12
use crate::async_primitives::connector;
13
use crate::execute::StreamingExecutionState;
14
use crate::metrics::MetricsBuilder;
15
use crate::morsel::{Morsel, MorselSeq, SourceToken};
16
use crate::nodes::TaskPriority;
17
use crate::nodes::io_sinks::components::partitioner::Partitioner;
18
use crate::nodes::io_sinks::config::{IOSinkNodeConfig, IOSinkTarget};
19
use crate::nodes::io_sinks::pipeline_initialization::partition_by::start_partition_sink_pipeline;
20
use crate::nodes::io_sinks::pipeline_initialization::single_file::start_single_file_sink_pipeline;
21
use crate::pipe::PortReceiver;
22
pub mod components;
23
pub mod config;
24
pub mod pipeline_initialization;
25
pub mod writers;
26
27
pub struct IOSinkNode {
28
name: PlSmallStr,
29
state: IOSinkNodeState,
30
io_metrics: Option<Arc<IOMetrics>>,
31
verbose: bool,
32
}
33
34
impl IOSinkNode {
35
pub fn new(config: impl Into<Box<IOSinkNodeConfig>>) -> Self {
36
let config = config.into();
37
38
let target_type = match &config.target {
39
IOSinkTarget::File(_) => "single-file",
40
IOSinkTarget::Partitioned(p) => match &p.partitioner {
41
Partitioner::Keyed(_) => "partition-keyed",
42
Partitioner::FileSize => "partition-file-size",
43
},
44
};
45
46
let extension = config.file_format.extension();
47
48
let name = format_pl_smallstr!("io-sink[{target_type}[{extension}]]");
49
let verbose = polars_core::config::verbose();
50
51
IOSinkNode {
52
name,
53
state: IOSinkNodeState::Uninitialized { config },
54
io_metrics: None,
55
verbose,
56
}
57
}
58
}
59
60
impl ComputeNode for IOSinkNode {
61
fn name(&self) -> &str {
62
&self.name
63
}
64
65
fn set_metrics_builder(&mut self, metrics_builder: MetricsBuilder) {
66
self.io_metrics = Some(metrics_builder.new_io_metrics());
67
}
68
69
fn update_state(
70
&mut self,
71
recv: &mut [crate::graph::PortState],
72
send: &mut [crate::graph::PortState],
73
execution_state: &StreamingExecutionState,
74
) -> polars_error::PolarsResult<()> {
75
assert_eq!(recv.len(), 1);
76
assert!(send.is_empty());
77
78
recv[0] = if recv[0] == PortState::Done {
79
// Ensure initialize / writes empty file for empty output.
80
self.state
81
.initialize(&self.name, execution_state, self.io_metrics.clone())?;
82
83
match std::mem::replace(&mut self.state, IOSinkNodeState::Finished) {
84
IOSinkNodeState::Initialized {
85
phase_channel_tx,
86
task_handle,
87
} => {
88
if self.verbose {
89
eprintln!(
90
"{}: Join on task_handle (recv PortState::Done)",
91
self.name()
92
);
93
}
94
drop(phase_channel_tx);
95
pl_async::get_runtime().block_on(task_handle)?;
96
},
97
IOSinkNodeState::Finished => {},
98
IOSinkNodeState::Uninitialized { .. } => unreachable!(),
99
};
100
101
PortState::Done
102
} else {
103
polars_ensure!(
104
!matches!(self.state, IOSinkNodeState::Finished),
105
ComputeError:
106
"unreachable: IO sink node state is 'Finished', but recv port \
107
state is not 'Done'."
108
);
109
110
PortState::Ready
111
};
112
113
Ok(())
114
}
115
116
fn spawn<'env, 's>(
117
&'env mut self,
118
scope: &'s crate::async_executor::TaskScope<'s, 'env>,
119
recv_ports: &mut [Option<crate::pipe::RecvPort<'_>>],
120
send_ports: &mut [Option<crate::pipe::SendPort<'_>>],
121
execution_state: &'s StreamingExecutionState,
122
join_handles: &mut Vec<crate::async_executor::JoinHandle<polars_error::PolarsResult<()>>>,
123
) {
124
assert_eq!(recv_ports.len(), 1);
125
assert!(send_ports.is_empty());
126
127
let phase_morsel_rx = recv_ports[0].take().unwrap().serial();
128
129
join_handles.push(scope.spawn_task(TaskPriority::Low, async move {
130
self.state
131
.initialize(&self.name, execution_state, self.io_metrics.clone())?;
132
133
let IOSinkNodeState::Initialized {
134
phase_channel_tx, ..
135
} = &mut self.state
136
else {
137
unreachable!()
138
};
139
140
if phase_channel_tx.send(phase_morsel_rx).await.is_err() {
141
let IOSinkNodeState::Initialized {
142
phase_channel_tx,
143
task_handle,
144
} = std::mem::replace(&mut self.state, IOSinkNodeState::Finished)
145
else {
146
unreachable!()
147
};
148
149
if self.verbose {
150
eprintln!(
151
"{}: Join on task_handle (phase_channel_tx Err)",
152
self.name()
153
);
154
}
155
156
drop(phase_channel_tx);
157
158
return Err(task_handle.await.unwrap_err());
159
}
160
161
Ok(())
162
}));
163
}
164
}
165
166
enum IOSinkNodeState {
167
Uninitialized {
168
config: Box<IOSinkNodeConfig>,
169
},
170
171
Initialized {
172
phase_channel_tx: connector::Sender<PortReceiver>,
173
/// Join handle for all background tasks.
174
task_handle: async_executor::AbortOnDropHandle<PolarsResult<()>>,
175
},
176
177
Finished,
178
}
179
180
impl IOSinkNodeState {
181
/// Initialize state if not yet initialized.
182
fn initialize(
183
&mut self,
184
node_name: &PlSmallStr,
185
execution_state: &StreamingExecutionState,
186
io_metrics: Option<Arc<IOMetrics>>,
187
) -> PolarsResult<()> {
188
use IOSinkNodeState::*;
189
190
if !matches!(self, Self::Uninitialized { .. }) {
191
return Ok(());
192
}
193
194
let Uninitialized { config } = std::mem::replace(self, Finished) else {
195
unreachable!()
196
};
197
198
let (phase_channel_tx, mut phase_channel_rx) = connector::connector::<PortReceiver>();
199
let (mut multi_phase_tx, multi_phase_rx) = connector::connector();
200
201
let _ = multi_phase_tx.try_send(Morsel::new(
202
DataFrame::empty_with_arc_schema(config.input_schema.clone()),
203
MorselSeq::new(0),
204
SourceToken::default(),
205
));
206
207
async_executor::spawn(TaskPriority::High, async move {
208
let mut morsel_seq: u64 = 1;
209
210
while let Ok(mut phase_rx) = phase_channel_rx.recv().await {
211
while let Ok(mut morsel) = phase_rx.recv().await {
212
morsel.set_seq(MorselSeq::new(morsel_seq));
213
morsel_seq = morsel_seq.saturating_add(1);
214
215
if multi_phase_tx.send(morsel).await.is_err() {
216
break;
217
}
218
}
219
}
220
});
221
222
let task_handle = match &config.target {
223
IOSinkTarget::File(_) => start_single_file_sink_pipeline(
224
node_name.clone(),
225
multi_phase_rx,
226
*config,
227
execution_state,
228
io_metrics,
229
)?,
230
231
IOSinkTarget::Partitioned { .. } => start_partition_sink_pipeline(
232
node_name,
233
multi_phase_rx,
234
*config,
235
execution_state,
236
io_metrics,
237
)?,
238
};
239
240
*self = Initialized {
241
phase_channel_tx,
242
task_handle,
243
};
244
245
Ok(())
246
}
247
}
248
249