// Path: blob/main/crates/polars-io/src/csv/write/writer.rs
// (page metadata: 6939 views)
use std::io::Write;1use std::num::NonZeroUsize;23use polars_core::POOL;4use polars_core::frame::DataFrame;5use polars_core::schema::Schema;6use polars_error::PolarsResult;78use super::write_impl::{write, write_bom, write_header};9use super::{QuoteStyle, SerializeOptions};10use crate::shared::SerWriter;1112/// Write a DataFrame to csv.13///14/// Don't use a `Buffered` writer, the `CsvWriter` internally already buffers writes.15#[must_use]16pub struct CsvWriter<W: Write> {17/// File or Stream handler18buffer: W,19options: SerializeOptions,20header: bool,21bom: bool,22batch_size: NonZeroUsize,23n_threads: usize,24}2526impl<W> SerWriter<W> for CsvWriter<W>27where28W: Write,29{30fn new(buffer: W) -> Self {31// 9f: all nanoseconds32let options = SerializeOptions {33time_format: Some("%T%.9f".to_string()),34..Default::default()35};3637CsvWriter {38buffer,39options,40header: true,41bom: false,42batch_size: NonZeroUsize::new(1024).unwrap(),43n_threads: POOL.current_num_threads(),44}45}4647fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {48if self.bom {49write_bom(&mut self.buffer)?;50}51let names = df52.get_column_names()53.into_iter()54.map(|x| x.as_str())55.collect::<Vec<_>>();56if self.header {57write_header(&mut self.buffer, names.as_slice(), &self.options)?;58}59write(60&mut self.buffer,61df,62self.batch_size.into(),63&self.options,64self.n_threads,65)66}67}6869impl<W> CsvWriter<W>70where71W: Write,72{73/// Set whether to write UTF-8 BOM.74pub fn include_bom(mut self, include_bom: bool) -> Self {75self.bom = include_bom;76self77}7879/// Set whether to write headers.80pub fn include_header(mut self, include_header: bool) -> Self {81self.header = include_header;82self83}8485/// Set the CSV file's column separator as a byte character.86pub fn with_separator(mut self, separator: u8) -> Self {87self.options.separator = separator;88self89}9091/// Set the batch size to use while writing the CSV.92pub fn with_batch_size(mut self, batch_size: NonZeroUsize) -> Self 
{93self.batch_size = batch_size;94self95}9697/// Set the CSV file's date format.98pub fn with_date_format(mut self, format: Option<String>) -> Self {99if format.is_some() {100self.options.date_format = format;101}102self103}104105/// Set the CSV file's time format.106pub fn with_time_format(mut self, format: Option<String>) -> Self {107if format.is_some() {108self.options.time_format = format;109}110self111}112113/// Set the CSV file's datetime format.114pub fn with_datetime_format(mut self, format: Option<String>) -> Self {115if format.is_some() {116self.options.datetime_format = format;117}118self119}120121/// Set the CSV file's forced scientific notation for floats.122pub fn with_float_scientific(mut self, scientific: Option<bool>) -> Self {123if scientific.is_some() {124self.options.float_scientific = scientific;125}126self127}128129/// Set the CSV file's float precision.130pub fn with_float_precision(mut self, precision: Option<usize>) -> Self {131if precision.is_some() {132self.options.float_precision = precision;133}134self135}136137/// Set the CSV decimal separator.138pub fn with_decimal_comma(mut self, decimal_comma: bool) -> Self {139self.options.decimal_comma = decimal_comma;140self141}142143/// Set the single byte character used for quoting.144pub fn with_quote_char(mut self, char: u8) -> Self {145self.options.quote_char = char;146self147}148149/// Set the CSV file's null value representation.150pub fn with_null_value(mut self, null_value: String) -> Self {151self.options.null = null_value;152self153}154155/// Set the CSV file's line terminator.156pub fn with_line_terminator(mut self, line_terminator: String) -> Self {157self.options.line_terminator = line_terminator;158self159}160161/// Set the CSV file's quoting behavior.162/// See more on [`QuoteStyle`].163pub fn with_quote_style(mut self, quote_style: QuoteStyle) -> Self {164self.options.quote_style = quote_style;165self166}167168pub fn n_threads(mut self, n_threads: usize) -> Self 
{169self.n_threads = n_threads;170self171}172173pub fn batched(self, schema: &Schema) -> PolarsResult<BatchedWriter<W>> {174let expects_bom = self.bom;175let expects_header = self.header;176Ok(BatchedWriter {177writer: self,178has_written_bom: !expects_bom,179has_written_header: !expects_header,180schema: schema.clone(),181})182}183}184185pub struct BatchedWriter<W: Write> {186writer: CsvWriter<W>,187has_written_bom: bool,188has_written_header: bool,189schema: Schema,190}191192impl<W: Write> BatchedWriter<W> {193/// Write a batch to the csv writer.194///195/// # Panics196/// The caller must ensure the chunks in the given [`DataFrame`] are aligned.197pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {198if !self.has_written_bom {199self.has_written_bom = true;200write_bom(&mut self.writer.buffer)?;201}202203if !self.has_written_header {204self.has_written_header = true;205let names = df206.get_column_names()207.into_iter()208.map(|x| x.as_str())209.collect::<Vec<_>>();210write_header(211&mut self.writer.buffer,212names.as_slice(),213&self.writer.options,214)?;215}216217write(218&mut self.writer.buffer,219df,220self.writer.batch_size.into(),221&self.writer.options,222self.writer.n_threads,223)?;224Ok(())225}226227/// Writes the header of the csv file if not done already. Returns the total size of the file.228pub fn finish(&mut self) -> PolarsResult<()> {229if !self.has_written_bom {230self.has_written_bom = true;231write_bom(&mut self.writer.buffer)?;232}233234if !self.has_written_header {235self.has_written_header = true;236let names = self237.schema238.iter_names()239.map(|x| x.as_str())240.collect::<Vec<_>>();241write_header(&mut self.writer.buffer, &names, &self.writer.options)?;242};243244Ok(())245}246}247248249