// Source: GitHub repository pola-rs/polars (captured from a CoCalc share page).
// Path: blob/main/crates/polars-stream/src/nodes/io_sinks2/mod.rs
1
use polars_core::frame::DataFrame;
2
use polars_error::{PolarsResult, polars_ensure};
3
use polars_io::pl_async;
4
use polars_utils::format_pl_smallstr;
5
use polars_utils::pl_str::PlSmallStr;
6
7
use super::{ComputeNode, PortState};
8
use crate::async_executor;
9
use crate::async_primitives::connector;
10
use crate::execute::StreamingExecutionState;
11
use crate::morsel::{Morsel, MorselSeq, SourceToken};
12
use crate::nodes::TaskPriority;
13
use crate::nodes::io_sinks2::components::partitioner::Partitioner;
14
use crate::nodes::io_sinks2::config::{IOSinkNodeConfig, IOSinkTarget};
15
use crate::nodes::io_sinks2::pipeline_initialization::partition_by::start_partition_sink_pipeline;
16
use crate::nodes::io_sinks2::pipeline_initialization::single_file::start_single_file_sink_pipeline;
17
use crate::pipe::PortReceiver;
18
pub mod components;
19
pub mod config;
20
pub mod pipeline_initialization;
21
pub mod writers;
22
23
pub struct IOSinkNode {
24
name: PlSmallStr,
25
state: IOSinkNodeState,
26
verbose: bool,
27
}
28
29
impl IOSinkNode {
30
pub fn new(config: impl Into<Box<IOSinkNodeConfig>>) -> Self {
31
let config = config.into();
32
33
let target_type = match &config.target {
34
IOSinkTarget::File(_) => "single-file",
35
IOSinkTarget::Partitioned(p) => match &p.partitioner {
36
Partitioner::Keyed(_) => "partition-keyed",
37
Partitioner::FileSize => "partition-file-size",
38
},
39
};
40
41
let extension = config.file_format.extension();
42
43
let name = format_pl_smallstr!("io-sink[{target_type}[{extension}]]");
44
let verbose = polars_core::config::verbose();
45
46
IOSinkNode {
47
name,
48
state: IOSinkNodeState::Uninitialized { config },
49
verbose,
50
}
51
}
52
}
53
54
impl ComputeNode for IOSinkNode {
55
fn name(&self) -> &str {
56
&self.name
57
}
58
59
fn update_state(
60
&mut self,
61
recv: &mut [crate::graph::PortState],
62
send: &mut [crate::graph::PortState],
63
execution_state: &StreamingExecutionState,
64
) -> polars_error::PolarsResult<()> {
65
assert_eq!(recv.len(), 1);
66
assert!(send.is_empty());
67
68
recv[0] = if recv[0] == PortState::Done {
69
// Ensure initialize / writes empty file for empty output.
70
self.state.initialize(&self.name, execution_state)?;
71
72
match std::mem::replace(&mut self.state, IOSinkNodeState::Finished) {
73
IOSinkNodeState::Initialized {
74
phase_channel_tx,
75
task_handle,
76
} => {
77
if self.verbose {
78
eprintln!(
79
"{}: Join on task_handle (recv PortState::Done)",
80
self.name()
81
);
82
}
83
drop(phase_channel_tx);
84
pl_async::get_runtime().block_on(task_handle)?;
85
},
86
IOSinkNodeState::Finished => {},
87
IOSinkNodeState::Uninitialized { .. } => unreachable!(),
88
};
89
90
PortState::Done
91
} else {
92
polars_ensure!(
93
!matches!(self.state, IOSinkNodeState::Finished),
94
ComputeError:
95
"unreachable: IO sink node state is 'Finished', but recv port \
96
state is not 'Done'."
97
);
98
99
PortState::Ready
100
};
101
102
Ok(())
103
}
104
105
fn spawn<'env, 's>(
106
&'env mut self,
107
scope: &'s crate::async_executor::TaskScope<'s, 'env>,
108
recv_ports: &mut [Option<crate::pipe::RecvPort<'_>>],
109
send_ports: &mut [Option<crate::pipe::SendPort<'_>>],
110
execution_state: &'s StreamingExecutionState,
111
join_handles: &mut Vec<crate::async_executor::JoinHandle<polars_error::PolarsResult<()>>>,
112
) {
113
assert_eq!(recv_ports.len(), 1);
114
assert!(send_ports.is_empty());
115
116
let phase_morsel_rx = recv_ports[0].take().unwrap().serial();
117
118
join_handles.push(scope.spawn_task(TaskPriority::Low, async move {
119
self.state.initialize(&self.name, execution_state)?;
120
121
let IOSinkNodeState::Initialized {
122
phase_channel_tx, ..
123
} = &mut self.state
124
else {
125
unreachable!()
126
};
127
128
if phase_channel_tx.send(phase_morsel_rx).await.is_err() {
129
let IOSinkNodeState::Initialized {
130
phase_channel_tx,
131
task_handle,
132
} = std::mem::replace(&mut self.state, IOSinkNodeState::Finished)
133
else {
134
unreachable!()
135
};
136
137
if self.verbose {
138
eprintln!(
139
"{}: Join on task_handle (phase_channel_tx Err)",
140
self.name()
141
);
142
}
143
144
drop(phase_channel_tx);
145
146
return Err(task_handle.await.unwrap_err());
147
}
148
149
Ok(())
150
}));
151
}
152
}
153
154
enum IOSinkNodeState {
155
Uninitialized {
156
config: Box<IOSinkNodeConfig>,
157
},
158
159
Initialized {
160
phase_channel_tx: connector::Sender<PortReceiver>,
161
/// Join handle for all background tasks.
162
task_handle: async_executor::AbortOnDropHandle<PolarsResult<()>>,
163
},
164
165
Finished,
166
}
167
168
impl IOSinkNodeState {
169
/// Initialize state if not yet initialized.
170
fn initialize(
171
&mut self,
172
node_name: &PlSmallStr,
173
execution_state: &StreamingExecutionState,
174
) -> PolarsResult<()> {
175
use IOSinkNodeState::*;
176
177
if !matches!(self, Self::Uninitialized { .. }) {
178
return Ok(());
179
}
180
181
let Uninitialized { mut config } = std::mem::replace(self, Finished) else {
182
unreachable!()
183
};
184
185
config.num_pipelines = execution_state.num_pipelines;
186
187
let (phase_channel_tx, mut phase_channel_rx) = connector::connector::<PortReceiver>();
188
let (mut multi_phase_tx, multi_phase_rx) = connector::connector();
189
190
let first_morsel = Morsel::new(
191
DataFrame::empty_with_arc_schema(config.input_schema.clone()),
192
MorselSeq::new(0),
193
SourceToken::default(),
194
);
195
196
async_executor::spawn(TaskPriority::High, async move {
197
multi_phase_tx.send(first_morsel).await.unwrap();
198
199
let mut morsel_seq: u64 = 1;
200
201
while let Ok(mut phase_rx) = phase_channel_rx.recv().await {
202
while let Ok(mut morsel) = phase_rx.recv().await {
203
morsel.set_seq(MorselSeq::new(morsel_seq));
204
morsel_seq = morsel_seq.saturating_add(1);
205
206
if multi_phase_tx.send(morsel).await.is_err() {
207
break;
208
}
209
}
210
}
211
});
212
213
let task_handle = match &config.target {
214
IOSinkTarget::File(_) => {
215
start_single_file_sink_pipeline(node_name.clone(), multi_phase_rx, *config)?
216
},
217
218
IOSinkTarget::Partitioned { .. } => {
219
start_partition_sink_pipeline(node_name, multi_phase_rx, *config, execution_state)?
220
},
221
};
222
223
*self = Initialized {
224
phase_channel_tx,
225
task_handle,
226
};
227
228
Ok(())
229
}
230
}
231
232