Path: blob/main/crates/polars-io/src/csv/write/writer.rs
8430 views
use std::io::Write;1use std::num::NonZeroUsize;2use std::sync::Arc;34use polars_core::POOL;5use polars_core::frame::DataFrame;6use polars_core::schema::Schema;7use polars_error::PolarsResult;8use polars_utils::pl_str::PlSmallStr;910use super::write_impl::{UTF8_BOM, csv_header, write};11use super::{QuoteStyle, SerializeOptions};12use crate::shared::SerWriter;1314/// Write a DataFrame to csv.15///16/// Don't use a `Buffered` writer, the `CsvWriter` internally already buffers writes.17#[must_use]18pub struct CsvWriter<W: Write> {19/// File or Stream handler20buffer: W,21options: Arc<SerializeOptions>,22header: bool,23bom: bool,24batch_size: NonZeroUsize,25n_threads: usize,26}2728impl<W> SerWriter<W> for CsvWriter<W>29where30W: Write,31{32fn new(buffer: W) -> Self {33let options = SerializeOptions::default();3435CsvWriter {36buffer,37options: options.into(),38header: true,39bom: false,40batch_size: NonZeroUsize::new(1024).unwrap(),41n_threads: POOL.current_num_threads(),42}43}4445fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {46if self.bom {47self.buffer.write_all(&UTF8_BOM)?;48}49let names = df50.get_column_names()51.into_iter()52.map(|x| x.as_str())53.collect::<Vec<_>>();54if self.header {55self.buffer56.write_all(&csv_header(names.as_slice(), &self.options)?)?;57}58write(59&mut self.buffer,60df,61self.batch_size.into(),62self.options.clone(),63self.n_threads,64)65}66}6768impl<W> CsvWriter<W>69where70W: Write,71{72fn options_mut(&mut self) -> &mut SerializeOptions {73Arc::make_mut(&mut self.options)74}7576/// Set whether to write UTF-8 UTF8_BOM.77pub fn include_bom(mut self, include_bom: bool) -> Self {78self.bom = include_bom;79self80}8182/// Set whether to write headers.83pub fn include_header(mut self, include_header: bool) -> Self {84self.header = include_header;85self86}8788/// Set the CSV file's column separator as a byte character.89pub fn with_separator(mut self, separator: u8) -> Self {90self.options_mut().separator = separator;91self92}9394/// Set the batch size to use while writing the CSV.95pub fn with_batch_size(mut self, batch_size: NonZeroUsize) -> Self {96self.batch_size = batch_size;97self98}99100/// Set the CSV file's date format.101pub fn with_date_format(mut self, format: Option<PlSmallStr>) -> Self {102if format.is_some() {103self.options_mut().date_format = format;104}105self106}107108/// Set the CSV file's time format.109pub fn with_time_format(mut self, format: Option<PlSmallStr>) -> Self {110if format.is_some() {111self.options_mut().time_format = format;112}113self114}115116/// Set the CSV file's datetime format.117pub fn with_datetime_format(mut self, format: Option<PlSmallStr>) -> Self {118if format.is_some() {119self.options_mut().datetime_format = format;120}121self122}123124/// Set the CSV file's forced scientific notation for floats.125pub fn with_float_scientific(mut self, scientific: Option<bool>) -> Self {126if scientific.is_some() {127self.options_mut().float_scientific = scientific;128}129self130}131132/// Set the CSV file's float precision.133pub fn with_float_precision(mut self, precision: Option<usize>) -> Self {134if precision.is_some() {135self.options_mut().float_precision = precision;136}137self138}139140/// Set the CSV decimal separator.141pub fn with_decimal_comma(mut self, decimal_comma: bool) -> Self {142self.options_mut().decimal_comma = decimal_comma;143self144}145146/// Set the single byte character used for quoting.147pub fn with_quote_char(mut self, char: u8) -> Self {148self.options_mut().quote_char = char;149self150}151152/// Set the CSV file's null value representation.153pub fn with_null_value(mut self, null_value: PlSmallStr) -> Self {154self.options_mut().null = null_value;155self156}157158/// Set the CSV file's line terminator.159pub fn with_line_terminator(mut self, line_terminator: PlSmallStr) -> Self {160self.options_mut().line_terminator = line_terminator;161self162}163164/// Set the CSV file's quoting behavior.165/// See more on [`QuoteStyle`].166pub fn with_quote_style(mut self, quote_style: QuoteStyle) -> Self {167self.options_mut().quote_style = quote_style;168self169}170171pub fn n_threads(mut self, n_threads: usize) -> Self {172self.n_threads = n_threads;173self174}175176pub fn batched(self, schema: &Schema) -> PolarsResult<BatchedWriter<W>> {177let expects_bom = self.bom;178let expects_header = self.header;179Ok(BatchedWriter {180writer: self,181has_written_bom: !expects_bom,182has_written_header: !expects_header,183schema: schema.clone(),184})185}186}187188pub struct BatchedWriter<W: Write> {189writer: CsvWriter<W>,190has_written_bom: bool,191has_written_header: bool,192schema: Schema,193}194195impl<W: Write> BatchedWriter<W> {196/// Write a batch to the csv writer.197///198/// # Panics199/// The caller must ensure the chunks in the given [`DataFrame`] are aligned.200pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {201if !self.has_written_bom {202self.has_written_bom = true;203self.writer.buffer.write_all(&UTF8_BOM)?;204}205206if !self.has_written_header {207self.has_written_header = true;208let names = df209.get_column_names()210.into_iter()211.map(|x| x.as_str())212.collect::<Vec<_>>();213214self.writer215.buffer216.write_all(&csv_header(names.as_slice(), &self.writer.options)?)?;217}218219write(220&mut self.writer.buffer,221df,222self.writer.batch_size.into(),223self.writer.options.clone(),224self.writer.n_threads,225)?;226Ok(())227}228229/// Writes the header of the csv file if not done already. Returns the total size of the file.230pub fn finish(&mut self) -> PolarsResult<()> {231if !self.has_written_bom {232self.has_written_bom = true;233self.writer.buffer.write_all(&UTF8_BOM)?;234}235236if !self.has_written_header {237self.has_written_header = true;238let names = self239.schema240.iter_names()241.map(|x| x.as_str())242.collect::<Vec<_>>();243244self.writer245.buffer246.write_all(&csv_header(&names, &self.writer.options)?)?;247};248249Ok(())250}251}252253254