GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sinks/mod.rs

use std::pin::Pin;
use std::sync::{Arc, LazyLock, Mutex};

use futures::StreamExt;
use futures::stream::FuturesUnordered;
use polars_core::config;
use polars_core::frame::DataFrame;
use polars_core::prelude::Column;
use polars_core::schema::SchemaRef;
use polars_error::PolarsResult;

use self::metrics::WriteMetrics;
use super::{ComputeNode, JoinHandle, Morsel, PortState, RecvPort, SendPort, TaskScope};
use crate::async_executor::{AbortOnDropHandle, spawn};
use crate::async_primitives::connector::{Receiver, Sender, connector};
use crate::async_primitives::distributor_channel;
use crate::async_primitives::linearizer::{Inserter, Linearizer};
use crate::async_primitives::wait_group::WaitGroup;
use crate::execute::StreamingExecutionState;
use crate::nodes::TaskPriority;

mod metrics;
mod phase;
use phase::PhaseOutcome;

#[cfg(feature = "csv")]
pub mod csv;
#[cfg(feature = "ipc")]
pub mod ipc;
#[cfg(feature = "json")]
pub mod json;
#[cfg(feature = "parquet")]
pub mod parquet;
pub mod partition;

// This needs to be low to increase the backpressure.
static DEFAULT_SINK_LINEARIZER_BUFFER_SIZE: LazyLock<usize> = LazyLock::new(|| {
    std::env::var("POLARS_DEFAULT_SINK_LINEARIZER_BUFFER_SIZE")
        .map(|x| x.parse().unwrap())
        .unwrap_or(1)
});

static DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE: LazyLock<usize> = LazyLock::new(|| {
    std::env::var("POLARS_DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE")
        .map(|x| x.parse().unwrap())
        .unwrap_or(1)
});
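
// Both buffer sizes can be overridden through the environment variables named
// above: an unset variable falls back to 1, while a value that does not parse
// as a `usize` panics in the `.unwrap()`.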

pub enum SinkInputPort {
    Serial(Receiver<Morsel>),
    Parallel(Vec<Receiver<Morsel>>),
}

impl SinkInputPort {
    pub fn serial(self) -> Receiver<Morsel> {
        match self {
            Self::Serial(s) => s,
            _ => panic!(),
        }
    }

    pub fn parallel(self) -> Vec<Receiver<Morsel>> {
        match self {
            Self::Parallel(s) => s,
            _ => panic!(),
        }
    }
}
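
// `serial` and `parallel` panic when called on the wrong variant; which variant
// a sink receives is decided by `SinkNode::is_sink_input_parallel` in
// `SinkComputeNode::spawn` below.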

/// Spawn a task that linearizes and buffers morsels until a given maximum chunk size is reached
/// and then distributes the columns amongst worker tasks.
fn buffer_and_distribute_columns_task(
    mut recv_port_rx: Receiver<(PhaseOutcome, SinkInputPort)>,
    mut dist_tx: distributor_channel::Sender<(usize, usize, Column)>,
    chunk_size: usize,
    schema: SchemaRef,
    metrics: Arc<Mutex<Option<WriteMetrics>>>,
) -> JoinHandle<PolarsResult<()>> {
    spawn(TaskPriority::High, async move {
        let mut seq = 0usize;
        let mut buffer = DataFrame::empty_with_schema(schema.as_ref());

        let mut metrics_ = metrics.lock().unwrap().take();
        while let Ok((outcome, rx)) = recv_port_rx.recv().await {
            let mut rx = rx.serial();
            while let Ok(morsel) = rx.recv().await {
                let (df, _, _, consume_token) = morsel.into_inner();

                if let Some(metrics) = metrics_.as_mut() {
                    metrics.append(&df)?;
                }

                // @NOTE: This also performs schema validation.
                buffer.vstack_mut(&df)?;

                while buffer.height() >= chunk_size {
                    let df;
                    (df, buffer) = buffer.split_at(buffer.height().min(chunk_size) as i64);

                    for (i, column) in df.take_columns().into_iter().enumerate() {
                        if dist_tx.send((seq, i, column)).await.is_err() {
                            return Ok(());
                        }
                    }
                    seq += 1;
                }
                drop(consume_token); // Increase the backpressure. Only free up a pipeline when the
                                     // morsel has started encoding in its entirety. This still
                                     // allows for parallelism of Morsels, but prevents large
                                     // bunches of Morsels from stacking up here.
            }

            outcome.stopped();
        }
        if let Some(metrics_) = metrics_ {
            *metrics.lock().unwrap() = Some(metrics_);
        }

        // Don't write an empty row group at the end.
        if buffer.is_empty() {
            return Ok(());
        }

        // Flush the remaining rows.
        assert!(buffer.height() <= chunk_size);
        for (i, column) in buffer.take_columns().into_iter().enumerate() {
            if dist_tx.send((seq, i, column)).await.is_err() {
                return Ok(());
            }
        }

        PolarsResult::Ok(())
    })
}
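
// Each buffered chunk of at most `chunk_size` rows is decomposed into
// `(seq, i, column)` triples, where `seq` is the chunk's sequence number and
// `i` the column index, so individual columns can be encoded by different
// workers and put back in the original order downstream.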

#[allow(clippy::type_complexity)]
pub fn parallelize_receive_task<T: Ord + Send + 'static>(
    join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
    mut recv_port_rx: Receiver<(PhaseOutcome, SinkInputPort)>,
    num_pipelines: usize,
    maintain_order: bool,
    mut io_tx: Sender<Linearizer<T>>,
) -> Vec<Receiver<(Receiver<Morsel>, Inserter<T>)>> {
    // Phase Handling Task -> Encode Tasks.
    let (mut pass_txs, pass_rxs) = (0..num_pipelines)
        .map(|_| connector())
        .collect::<(Vec<_>, Vec<_>)>();

    join_handles.push(spawn(TaskPriority::High, async move {
        while let Ok((outcome, port_rxs)) = recv_port_rx.recv().await {
            let port_rxs = port_rxs.parallel();
            let (lin_rx, lin_txs) = Linearizer::<T>::new_with_maintain_order(
                num_pipelines,
                *DEFAULT_SINK_LINEARIZER_BUFFER_SIZE,
                maintain_order,
            );

            for ((pass_tx, port_rx), lin_tx) in pass_txs.iter_mut().zip(port_rxs).zip(lin_txs) {
                if pass_tx.send((port_rx, lin_tx)).await.is_err() {
                    return Ok(());
                }
            }
            if io_tx.send(lin_rx).await.is_err() {
                return Ok(());
            }

            outcome.stopped();
        }

        Ok(())
    }));

    pass_rxs
}
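
// For every phase, each of the `num_pipelines` encode tasks obtains a
// `(Receiver<Morsel>, Inserter<T>)` pair from the returned receivers, while the
// IO task is handed the matching `Linearizer<T>`, so the encoded payloads can
// be merged back (in input order when `maintain_order` is set) before writing.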

pub trait SinkNode {
    fn name(&self) -> &str;

    fn is_sink_input_parallel(&self) -> bool;

    fn do_maintain_order(&self) -> bool {
        true
    }

    fn spawn_sink(
        &mut self,
        recv_ports_recv: Receiver<(PhaseOutcome, SinkInputPort)>,
        state: &StreamingExecutionState,
        join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
    );

    /// Callback that gets called once before the sink is spawned.
    fn initialize(&mut self, state: &StreamingExecutionState) -> PolarsResult<()> {
        _ = state;
        Ok(())
    }

    /// Callback for when the query has finished successfully.
    ///
    /// This should only be called when the writing is finished and all the join handles have been
    /// awaited.
    fn finalize(
        &mut self,
        state: &StreamingExecutionState,
    ) -> Option<Pin<Box<dyn Future<Output = PolarsResult<()>> + Send>>> {
        _ = state;
        None
    }

    /// Fetch metrics for a specific sink.
    ///
    /// This should only be called when the writing is finished and all the join handles have been
    /// awaited.
    fn get_metrics(&self) -> PolarsResult<Option<WriteMetrics>> {
        Ok(None)
    }
}
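
// Illustrative sketch (hypothetical, not part of this module): a minimal
// `SinkNode` that drains its serial input and discards every morsel, relying on
// the default `initialize`/`finalize`/`get_metrics` implementations. The name
// `NullSink` is made up for the example.
//
//     struct NullSink;
//
//     impl SinkNode for NullSink {
//         fn name(&self) -> &str {
//             "null-sink"
//         }
//
//         fn is_sink_input_parallel(&self) -> bool {
//             false
//         }
//
//         fn spawn_sink(
//             &mut self,
//             mut recv_ports_recv: Receiver<(PhaseOutcome, SinkInputPort)>,
//             _state: &StreamingExecutionState,
//             join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
//         ) {
//             join_handles.push(spawn(TaskPriority::High, async move {
//                 while let Ok((outcome, port)) = recv_ports_recv.recv().await {
//                     let mut rx = port.serial();
//                     while let Ok(_morsel) = rx.recv().await {}
//                     outcome.stopped();
//                 }
//                 Ok(())
//             }));
//         }
//     }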

/// The state needed to manage a spawned [`SinkNode`].
struct StartedSinkComputeNode {
    input_send: Sender<(PhaseOutcome, SinkInputPort)>,
    join_handles: FuturesUnordered<AbortOnDropHandle<PolarsResult<()>>>,
}

/// A [`ComputeNode`] to wrap a [`SinkNode`].
pub struct SinkComputeNode {
    sink: Box<dyn SinkNode + Send>,
    started: Option<StartedSinkComputeNode>,
    state: SinkState,
}

enum SinkState {
    /// Initial state of a [`SinkComputeNode`].
    ///
    /// This still requires `sink.initialize` to be called on the `SinkNode`.
    Uninitialized,

    /// Active state of a [`SinkComputeNode`].
    ///
    /// When finished, the `sink.finalize` method should be called.
    Initialized,

    /// Final state for the [`SinkComputeNode`].
    ///
    /// Receive port is Done and [`SinkNode`] is finalized.
    Finished,
}
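
// The transitions happen in `ComputeNode::update_state` below: `Uninitialized`
// becomes `Initialized` after `sink.initialize` runs, and `Initialized` becomes
// `Finished` once the receive port reports `Done` and the finalization task has
// been scheduled.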

impl SinkComputeNode {
    pub fn new(sink: Box<dyn SinkNode + Send>) -> Self {
        Self {
            sink,
            started: None,
            state: SinkState::Uninitialized,
        }
    }
}

impl<T: SinkNode + Send + 'static> From<T> for SinkComputeNode {
    fn from(value: T) -> Self {
        Self::new(Box::new(value))
    }
}
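
// The `From` impl lets a concrete `SinkNode` be wrapped directly, e.g.
// `SinkComputeNode::from(some_sink)`, without the caller boxing it manually.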

impl ComputeNode for SinkComputeNode {
    fn name(&self) -> &str {
        self.sink.name()
    }

    fn update_state(
        &mut self,
        recv: &mut [PortState],
        _send: &mut [PortState],
        state: &StreamingExecutionState,
    ) -> PolarsResult<()> {
        // Ensure that initialize is only called once.
        if matches!(self.state, SinkState::Uninitialized) {
            self.sink.initialize(state)?;
            self.state = SinkState::Initialized;
        }

        if recv[0] != PortState::Done {
            recv[0] = PortState::Ready;
        }

        if recv[0] == PortState::Done && !matches!(self.state, SinkState::Finished) {
            let started = self.started.take();
            let finalize = self.sink.finalize(state);

            state.spawn_subphase_task(async move {
                // We need to join on all started tasks before finalizing the node because the
                // unfinished tasks might still need access to the node.
                //
                // Note that if the sink never received any data, this `started` might be None.
                // However, we still need to finalize the node, otherwise no file will be
                // created.
                if let Some(mut started) = started {
                    drop(started.input_send);
                    // Either the task finished or some error occurred.
                    while let Some(ret) = started.join_handles.next().await {
                        ret?;
                    }
                }

                if let Some(finalize) = finalize {
                    finalize.await?;
                }

                PolarsResult::Ok(())
            });

            self.state = SinkState::Finished;
        }

        Ok(())
    }

    fn spawn<'env, 's>(
        &'env mut self,
        scope: &'s TaskScope<'s, 'env>,
        recv_ports: &mut [Option<RecvPort<'_>>],
        send_ports: &mut [Option<SendPort<'_>>],
        state: &'s StreamingExecutionState,
        join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
    ) {
        assert_eq!(recv_ports.len(), 1);
        assert!(send_ports.is_empty());

        let name = self.name().to_string();
        let started = self.started.get_or_insert_with(|| {
            let (tx, rx) = connector();
            let mut join_handles = Vec::new();

            self.sink.spawn_sink(rx, state, &mut join_handles);
            // One of the tasks might throw an error, in which case we need to cancel all
            // handles and find the error.
            let join_handles: FuturesUnordered<_> =
                join_handles.drain(..).map(AbortOnDropHandle::new).collect();

            StartedSinkComputeNode {
                input_send: tx,
                join_handles,
            }
        });

        let wait_group = WaitGroup::default();
        let recv = recv_ports[0].take().unwrap();
        let sink_input = if self.sink.is_sink_input_parallel() {
            SinkInputPort::Parallel(recv.parallel())
        } else {
            SinkInputPort::Serial(recv.serial_with_maintain_order(self.sink.do_maintain_order()))
        };
        join_handles.push(scope.spawn_task(TaskPriority::High, async move {
            let (token, outcome) = PhaseOutcome::new_shared_wait(wait_group.token());
            if started.input_send.send((outcome, sink_input)).await.is_ok() {
                // Wait for the phase to finish.
                wait_group.wait().await;
                if !token.did_finish() {
                    return Ok(());
                }

                if config::verbose() {
                    eprintln!("[{name}]: Last data sent.");
                }
            }

            // Either the task finished or some error occurred.
            while let Some(ret) = started.join_handles.next().await {
                ret?;
            }

            Ok(())
        }));
    }
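
    // The started sink is created lazily on the first phase and reused for the
    // following ones: each call to `spawn` sends exactly one
    // `(PhaseOutcome, SinkInputPort)` pair into `input_send` and then waits for
    // the sink to finish that phase.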

    fn get_output(&mut self) -> PolarsResult<Option<DataFrame>> {
        Ok(None)
    }
}