Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sinks/csv.rs
6939 views
1
use std::cmp::Reverse;
2
use std::pin::Pin;
3
4
use polars_core::frame::DataFrame;
5
use polars_core::schema::SchemaRef;
6
use polars_error::PolarsResult;
7
use polars_io::SerWriter;
8
use polars_io::cloud::CloudOptions;
9
use polars_io::prelude::{CsvWriter, CsvWriterOptions};
10
use polars_plan::dsl::{SinkOptions, SinkTarget};
11
use polars_utils::priority::Priority;
12
13
use super::{SinkInputPort, SinkNode};
14
use crate::async_executor::spawn;
15
use crate::async_primitives::connector::{Receiver, Sender, connector};
16
use crate::async_primitives::linearizer::Linearizer;
17
use crate::execute::StreamingExecutionState;
18
use crate::morsel::MorselSeq;
19
use crate::nodes::io_sinks::parallelize_receive_task;
20
use crate::nodes::io_sinks::phase::PhaseOutcome;
21
use crate::nodes::{JoinHandle, TaskPriority};
22
23
/// Item sent from the receive task to the IO task: a linearizer over encoded CSV byte buffers,
/// each tagged with its morsel sequence number wrapped in `Reverse` (presumably so that lower
/// sequence numbers are yielded first — TODO confirm against `Priority`'s ordering).
type IOSend = Linearizer<Priority<Reverse<MorselSeq>, Vec<u8>>>;
24
25
/// Streaming sink node that encodes its input morsels as CSV and writes them to `target`.
pub struct CsvSinkNode {
    /// Destination that the IO task opens with `open_into_writeable_async`.
    target: SinkTarget,
    /// Schema of the incoming frames; used for the header and for configuring the encoders.
    schema: SchemaRef,
    /// Generic sink options (e.g. `maintain_order`, `sync_on_close`).
    sink_options: SinkOptions,
    /// CSV-specific options: header/BOM flags plus the serialization settings.
    write_options: CsvWriterOptions,
    /// Forwarded to `open_into_writeable_async` when opening `target`
    /// (presumably only relevant for cloud targets).
    cloud_options: Option<CloudOptions>,

    /// Sending half of the channel to the IO task. Set by `initialize`, taken by `spawn_sink`,
    /// and dropped by `finalize` if the sink was never spawned.
    io_tx: Option<Sender<IOSend>>,
    /// Handle to the IO task that writes the encoded buffers. Set by `initialize` and awaited
    /// by `finalize`; `AbortOnDropHandle` aborts the task if the handle is dropped.
    io_task: Option<tokio_util::task::AbortOnDropHandle<PolarsResult<()>>>,
}
35
impl CsvSinkNode {
36
pub fn new(
37
target: SinkTarget,
38
schema: SchemaRef,
39
sink_options: SinkOptions,
40
write_options: CsvWriterOptions,
41
cloud_options: Option<CloudOptions>,
42
) -> Self {
43
Self {
44
target,
45
schema,
46
sink_options,
47
write_options,
48
cloud_options,
49
50
io_tx: None,
51
io_task: None,
52
}
53
}
54
}
55
56
impl SinkNode for CsvSinkNode {
57
fn name(&self) -> &str {
58
"csv-sink"
59
}
60
61
fn is_sink_input_parallel(&self) -> bool {
62
true
63
}
64
65
fn initialize(&mut self, _state: &StreamingExecutionState) -> PolarsResult<()> {
66
let (io_tx, mut io_rx) = connector::<Linearizer<Priority<Reverse<MorselSeq>, Vec<u8>>>>();
67
68
// IO task.
69
//
70
// Task that will actually do write to the target file.
71
let target = self.target.clone();
72
let sink_options = self.sink_options.clone();
73
let schema = self.schema.clone();
74
let options = self.write_options.clone();
75
let cloud_options = self.cloud_options.clone();
76
let io_task = polars_io::pl_async::get_runtime().spawn(async move {
77
use tokio::io::AsyncWriteExt;
78
79
let mut file = target
80
.open_into_writeable_async(&sink_options, cloud_options.as_ref())
81
.await?;
82
83
// Write the header
84
if options.include_header || options.include_bom {
85
let mut writer = CsvWriter::new(&mut *file)
86
.include_bom(options.include_bom)
87
.include_header(options.include_header)
88
.with_separator(options.serialize_options.separator)
89
.with_line_terminator(options.serialize_options.line_terminator.clone())
90
.with_quote_char(options.serialize_options.quote_char)
91
.with_datetime_format(options.serialize_options.datetime_format.clone())
92
.with_date_format(options.serialize_options.date_format.clone())
93
.with_time_format(options.serialize_options.time_format.clone())
94
.with_float_scientific(options.serialize_options.float_scientific)
95
.with_float_precision(options.serialize_options.float_precision)
96
.with_decimal_comma(options.serialize_options.decimal_comma)
97
.with_null_value(options.serialize_options.null.clone())
98
.with_quote_style(options.serialize_options.quote_style)
99
.n_threads(1) // Disable rayon parallelism
100
.batched(&schema)?;
101
writer.write_batch(&DataFrame::empty_with_schema(&schema))?;
102
}
103
104
let mut file = file.try_into_async_writeable()?;
105
106
while let Ok(mut lin_rx) = io_rx.recv().await {
107
while let Some(Priority(_, buffer)) = lin_rx.get().await {
108
file.write_all(&buffer).await?;
109
}
110
}
111
112
file.sync_on_close(sink_options.sync_on_close).await?;
113
file.close().await?;
114
115
PolarsResult::Ok(())
116
});
117
118
self.io_tx = Some(io_tx);
119
self.io_task = Some(tokio_util::task::AbortOnDropHandle::new(io_task));
120
121
Ok(())
122
}
123
124
fn spawn_sink(
125
&mut self,
126
recv_port_rx: Receiver<(PhaseOutcome, SinkInputPort)>,
127
state: &StreamingExecutionState,
128
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
129
) {
130
let io_tx = self
131
.io_tx
132
.take()
133
.expect("not initialized / spawn called more than once");
134
let pass_rxs = parallelize_receive_task(
135
join_handles,
136
recv_port_rx,
137
state.num_pipelines,
138
self.sink_options.maintain_order,
139
io_tx,
140
);
141
142
// 16MB
143
const DEFAULT_ALLOCATION_SIZE: usize = 1 << 24;
144
145
// Encode task.
146
//
147
// Task encodes the columns into their corresponding CSV encoding.
148
join_handles.extend(pass_rxs.into_iter().map(|mut pass_rx| {
149
let schema = self.schema.clone();
150
let options = self.write_options.clone();
151
152
spawn(TaskPriority::High, async move {
153
// Amortize the allocations over time. If we see that we need to do way larger
154
// allocations, we adjust to that over time.
155
let mut allocation_size = DEFAULT_ALLOCATION_SIZE;
156
let options = options.clone();
157
158
while let Ok((mut rx, mut lin_tx)) = pass_rx.recv().await {
159
while let Ok(morsel) = rx.recv().await {
160
let (df, seq, _, consume_token) = morsel.into_inner();
161
162
let mut buffer = Vec::with_capacity(allocation_size);
163
let mut writer = CsvWriter::new(&mut buffer)
164
.include_bom(false) // Handled once in the IO task.
165
.include_header(false) // Handled once in the IO task.
166
.with_separator(options.serialize_options.separator)
167
.with_line_terminator(options.serialize_options.line_terminator.clone())
168
.with_quote_char(options.serialize_options.quote_char)
169
.with_datetime_format(options.serialize_options.datetime_format.clone())
170
.with_date_format(options.serialize_options.date_format.clone())
171
.with_time_format(options.serialize_options.time_format.clone())
172
.with_float_scientific(options.serialize_options.float_scientific)
173
.with_float_precision(options.serialize_options.float_precision)
174
.with_decimal_comma(options.serialize_options.decimal_comma)
175
.with_null_value(options.serialize_options.null.clone())
176
.with_quote_style(options.serialize_options.quote_style)
177
.n_threads(1) // Disable rayon parallelism
178
.batched(&schema)?;
179
180
writer.write_batch(&df)?;
181
182
allocation_size = allocation_size.max(buffer.len());
183
if lin_tx.insert(Priority(Reverse(seq), buffer)).await.is_err() {
184
return Ok(());
185
}
186
drop(consume_token); // Keep the consume_token until here to increase the
187
// backpressure.
188
}
189
}
190
191
PolarsResult::Ok(())
192
})
193
}));
194
}
195
196
fn finalize(
197
&mut self,
198
_state: &StreamingExecutionState,
199
) -> Option<Pin<Box<dyn Future<Output = PolarsResult<()>> + Send>>> {
200
// If we were never spawned, we need to make sure that the `tx` is taken. This signals to
201
// the IO task that it is done and prevents deadlocks.
202
drop(self.io_tx.take());
203
204
let io_task = self
205
.io_task
206
.take()
207
.expect("not initialized / finish called more than once");
208
209
// Wait for the IO task to complete.
210
Some(Box::pin(async move {
211
io_task
212
.await
213
.unwrap_or_else(|e| Err(std::io::Error::from(e).into()))
214
}))
215
}
216
}
217
218