Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/csv/write/writer.rs
6939 views
1
use std::io::Write;
2
use std::num::NonZeroUsize;
3
4
use polars_core::POOL;
5
use polars_core::frame::DataFrame;
6
use polars_core::schema::Schema;
7
use polars_error::PolarsResult;
8
9
use super::write_impl::{write, write_bom, write_header};
10
use super::{QuoteStyle, SerializeOptions};
11
use crate::shared::SerWriter;
12
13
/// Write a DataFrame to csv.
14
///
15
/// Don't use a `Buffered` writer, the `CsvWriter` internally already buffers writes.
16
#[must_use]
17
pub struct CsvWriter<W: Write> {
18
/// File or Stream handler
19
buffer: W,
20
options: SerializeOptions,
21
header: bool,
22
bom: bool,
23
batch_size: NonZeroUsize,
24
n_threads: usize,
25
}
26
27
impl<W> SerWriter<W> for CsvWriter<W>
28
where
29
W: Write,
30
{
31
fn new(buffer: W) -> Self {
32
// 9f: all nanoseconds
33
let options = SerializeOptions {
34
time_format: Some("%T%.9f".to_string()),
35
..Default::default()
36
};
37
38
CsvWriter {
39
buffer,
40
options,
41
header: true,
42
bom: false,
43
batch_size: NonZeroUsize::new(1024).unwrap(),
44
n_threads: POOL.current_num_threads(),
45
}
46
}
47
48
fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
49
if self.bom {
50
write_bom(&mut self.buffer)?;
51
}
52
let names = df
53
.get_column_names()
54
.into_iter()
55
.map(|x| x.as_str())
56
.collect::<Vec<_>>();
57
if self.header {
58
write_header(&mut self.buffer, names.as_slice(), &self.options)?;
59
}
60
write(
61
&mut self.buffer,
62
df,
63
self.batch_size.into(),
64
&self.options,
65
self.n_threads,
66
)
67
}
68
}
69
70
impl<W> CsvWriter<W>
71
where
72
W: Write,
73
{
74
/// Set whether to write UTF-8 BOM.
75
pub fn include_bom(mut self, include_bom: bool) -> Self {
76
self.bom = include_bom;
77
self
78
}
79
80
/// Set whether to write headers.
81
pub fn include_header(mut self, include_header: bool) -> Self {
82
self.header = include_header;
83
self
84
}
85
86
/// Set the CSV file's column separator as a byte character.
87
pub fn with_separator(mut self, separator: u8) -> Self {
88
self.options.separator = separator;
89
self
90
}
91
92
/// Set the batch size to use while writing the CSV.
93
pub fn with_batch_size(mut self, batch_size: NonZeroUsize) -> Self {
94
self.batch_size = batch_size;
95
self
96
}
97
98
/// Set the CSV file's date format.
99
pub fn with_date_format(mut self, format: Option<String>) -> Self {
100
if format.is_some() {
101
self.options.date_format = format;
102
}
103
self
104
}
105
106
/// Set the CSV file's time format.
107
pub fn with_time_format(mut self, format: Option<String>) -> Self {
108
if format.is_some() {
109
self.options.time_format = format;
110
}
111
self
112
}
113
114
/// Set the CSV file's datetime format.
115
pub fn with_datetime_format(mut self, format: Option<String>) -> Self {
116
if format.is_some() {
117
self.options.datetime_format = format;
118
}
119
self
120
}
121
122
/// Set the CSV file's forced scientific notation for floats.
123
pub fn with_float_scientific(mut self, scientific: Option<bool>) -> Self {
124
if scientific.is_some() {
125
self.options.float_scientific = scientific;
126
}
127
self
128
}
129
130
/// Set the CSV file's float precision.
131
pub fn with_float_precision(mut self, precision: Option<usize>) -> Self {
132
if precision.is_some() {
133
self.options.float_precision = precision;
134
}
135
self
136
}
137
138
/// Set the CSV decimal separator.
139
pub fn with_decimal_comma(mut self, decimal_comma: bool) -> Self {
140
self.options.decimal_comma = decimal_comma;
141
self
142
}
143
144
/// Set the single byte character used for quoting.
145
pub fn with_quote_char(mut self, char: u8) -> Self {
146
self.options.quote_char = char;
147
self
148
}
149
150
/// Set the CSV file's null value representation.
151
pub fn with_null_value(mut self, null_value: String) -> Self {
152
self.options.null = null_value;
153
self
154
}
155
156
/// Set the CSV file's line terminator.
157
pub fn with_line_terminator(mut self, line_terminator: String) -> Self {
158
self.options.line_terminator = line_terminator;
159
self
160
}
161
162
/// Set the CSV file's quoting behavior.
163
/// See more on [`QuoteStyle`].
164
pub fn with_quote_style(mut self, quote_style: QuoteStyle) -> Self {
165
self.options.quote_style = quote_style;
166
self
167
}
168
169
pub fn n_threads(mut self, n_threads: usize) -> Self {
170
self.n_threads = n_threads;
171
self
172
}
173
174
pub fn batched(self, schema: &Schema) -> PolarsResult<BatchedWriter<W>> {
175
let expects_bom = self.bom;
176
let expects_header = self.header;
177
Ok(BatchedWriter {
178
writer: self,
179
has_written_bom: !expects_bom,
180
has_written_header: !expects_header,
181
schema: schema.clone(),
182
})
183
}
184
}
185
186
pub struct BatchedWriter<W: Write> {
187
writer: CsvWriter<W>,
188
has_written_bom: bool,
189
has_written_header: bool,
190
schema: Schema,
191
}
192
193
impl<W: Write> BatchedWriter<W> {
194
/// Write a batch to the csv writer.
195
///
196
/// # Panics
197
/// The caller must ensure the chunks in the given [`DataFrame`] are aligned.
198
pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
199
if !self.has_written_bom {
200
self.has_written_bom = true;
201
write_bom(&mut self.writer.buffer)?;
202
}
203
204
if !self.has_written_header {
205
self.has_written_header = true;
206
let names = df
207
.get_column_names()
208
.into_iter()
209
.map(|x| x.as_str())
210
.collect::<Vec<_>>();
211
write_header(
212
&mut self.writer.buffer,
213
names.as_slice(),
214
&self.writer.options,
215
)?;
216
}
217
218
write(
219
&mut self.writer.buffer,
220
df,
221
self.writer.batch_size.into(),
222
&self.writer.options,
223
self.writer.n_threads,
224
)?;
225
Ok(())
226
}
227
228
/// Writes the header of the csv file if not done already. Returns the total size of the file.
229
pub fn finish(&mut self) -> PolarsResult<()> {
230
if !self.has_written_bom {
231
self.has_written_bom = true;
232
write_bom(&mut self.writer.buffer)?;
233
}
234
235
if !self.has_written_header {
236
self.has_written_header = true;
237
let names = self
238
.schema
239
.iter_names()
240
.map(|x| x.as_str())
241
.collect::<Vec<_>>();
242
write_header(&mut self.writer.buffer, &names, &self.writer.options)?;
243
};
244
245
Ok(())
246
}
247
}
248
249