// Source: crates/polars-io/src/csv/write/write_impl.rs (polars repository, `main` branch).
mod serializer;12use arrow::array::NullArray;3use arrow::legacy::time_zone::Tz;4use polars_core::POOL;5use polars_core::prelude::*;6use polars_error::polars_ensure;7use polars_utils::reuse_vec::reuse_vec;8use rayon::prelude::*;9use serializer::{serializer_for, string_serializer};1011use crate::csv::write::SerializeOptions;1213type ColumnSerializer<'a> =14dyn crate::csv::write::write_impl::serializer::Serializer<'a> + Send + 'a;1516/// Writes CSV from DataFrames.17pub struct CsvSerializer {18serializers: Vec<Box<ColumnSerializer<'static>>>,19options: Arc<SerializeOptions>,20datetime_formats: Arc<[PlSmallStr]>,21time_zones: Arc<[Option<Tz>]>,22}2324impl Clone for CsvSerializer {25fn clone(&self) -> Self {26Self {27serializers: vec![],28options: self.options.clone(),29datetime_formats: self.datetime_formats.clone(),30time_zones: self.time_zones.clone(),31}32}33}3435impl CsvSerializer {36pub fn new(schema: SchemaRef, options: Arc<SerializeOptions>) -> PolarsResult<Self> {37for dtype in schema.iter_values() {38let nested = match dtype {39DataType::List(_) => true,40#[cfg(feature = "dtype-struct")]41DataType::Struct(_) => true,42#[cfg(feature = "object")]43DataType::Object(_) => {44return Err(PolarsError::ComputeError(45"csv writer does not support object dtype".into(),46));47},48_ => false,49};50polars_ensure!(51!nested,52ComputeError: "CSV format does not support nested data",53);54}5556// Check that the double quote is valid UTF-8.57polars_ensure!(58std::str::from_utf8(&[options.quote_char, options.quote_char]).is_ok(),59ComputeError: "quote char results in invalid utf-8",60);6162let (datetime_formats, time_zones): (Vec<PlSmallStr>, Vec<Option<Tz>>) = schema63.iter_values()64.map(|dtype| {65let (datetime_format_str, time_zone) = match dtype {66DataType::Datetime(TimeUnit::Milliseconds, tz) => {67let (format, tz_parsed) = match tz {68#[cfg(feature = "timezones")]69Some(tz) => 
(70options71.datetime_format72.as_deref()73.unwrap_or("%FT%H:%M:%S.%3f%z"),74tz.parse::<Tz>().ok(),75),76_ => (77options78.datetime_format79.as_deref()80.unwrap_or("%FT%H:%M:%S.%3f"),81None,82),83};84(format, tz_parsed)85},86DataType::Datetime(TimeUnit::Microseconds, tz) => {87let (format, tz_parsed) = match tz {88#[cfg(feature = "timezones")]89Some(tz) => (90options91.datetime_format92.as_deref()93.unwrap_or("%FT%H:%M:%S.%6f%z"),94tz.parse::<Tz>().ok(),95),96_ => (97options98.datetime_format99.as_deref()100.unwrap_or("%FT%H:%M:%S.%6f"),101None,102),103};104(format, tz_parsed)105},106DataType::Datetime(TimeUnit::Nanoseconds, tz) => {107let (format, tz_parsed) = match tz {108#[cfg(feature = "timezones")]109Some(tz) => (110options111.datetime_format112.as_deref()113.unwrap_or("%FT%H:%M:%S.%9f%z"),114tz.parse::<Tz>().ok(),115),116_ => (117options118.datetime_format119.as_deref()120.unwrap_or("%FT%H:%M:%S.%9f"),121None,122),123};124(format, tz_parsed)125},126_ => ("", None),127};128129(datetime_format_str.into(), time_zone)130})131.collect();132133Ok(Self {134serializers: vec![],135options,136datetime_formats: Arc::from_iter(datetime_formats),137time_zones: Arc::from_iter(time_zones),138})139}140141/// # Panics142/// Panics if a column has >1 chunk.143pub fn serialize_to_csv<'a>(144&'a mut self,145df: &'a DataFrame,146buffer: &mut Vec<u8>,147) -> PolarsResult<()> {148if df.height() == 0 || df.width() == 0 {149return Ok(());150}151152let options = Arc::clone(&self.options);153let options = options.as_ref();154155let mut serializers_vec = reuse_vec(std::mem::take(&mut self.serializers));156let serializers = self.build_serializers(df.columns(), &mut serializers_vec)?;157158for _ in 0..df.height() {159serializers[0].serialize(buffer, options);160for serializer in &mut serializers[1..] 
{161buffer.push(options.separator);162serializer.serialize(buffer, options);163}164165buffer.extend_from_slice(options.line_terminator.as_bytes());166}167168self.serializers = reuse_vec(serializers_vec);169170Ok(())171}172173/// # Panics174/// Panics if a column has >1 chunk.175fn build_serializers<'a, 'b>(176&'a mut self,177columns: &'a [Column],178serializers: &'b mut Vec<Box<ColumnSerializer<'a>>>,179) -> PolarsResult<&'b mut [Box<ColumnSerializer<'a>>]> {180serializers.clear();181serializers.reserve(columns.len());182183for (i, c) in columns.iter().enumerate() {184assert_eq!(c.n_chunks(), 1);185186serializers.push(serializer_for(187c.as_materialized_series().chunks()[0].as_ref(),188Arc::as_ref(&self.options),189c.dtype(),190self.datetime_formats[i].as_str(),191self.time_zones[i],192)?)193}194195Ok(serializers)196}197}198199pub(crate) fn write(200mut writer: impl std::io::Write,201df: &DataFrame,202chunk_size: usize,203options: Arc<SerializeOptions>,204n_threads: usize,205) -> PolarsResult<()> {206let len = df.height();207let total_rows_per_pool_iter = n_threads * chunk_size;208209let mut n_rows_finished = 0;210211let csv_serializer = CsvSerializer::new(Arc::clone(df.schema()), options)?;212213let mut buffers: Vec<(Vec<u8>, CsvSerializer)> = (0..n_threads)214.map(|_| (Vec::new(), csv_serializer.clone()))215.collect();216while n_rows_finished < len {217let buf_writer =218|thread_no, write_buffer: &mut Vec<_>, csv_serializer: &mut CsvSerializer| {219let thread_offset = thread_no * chunk_size;220let total_offset = n_rows_finished + thread_offset;221let mut df = df.slice(total_offset as i64, chunk_size);222// the `series.iter` needs rechunked series.223// we don't do this on the whole as this probably needs much less rechunking224// so will be faster.225// and allows writing `pl.concat([df] * 100, rechunk=False).write_csv()` as the rechunk226// would go OOM227df.rechunk_mut();228229csv_serializer.serialize_to_csv(&df, write_buffer)?;230231Ok(())232};233234if 
n_threads > 1 {235POOL.install(|| {236buffers237.par_iter_mut()238.enumerate()239.map(|(i, (w, s))| buf_writer(i, w, s))240.collect::<PolarsResult<()>>()241})?;242} else {243let (w, s) = &mut buffers[0];244buf_writer(0, w, s)?;245}246247for (write_buffer, _) in &mut buffers {248writer.write_all(write_buffer)?;249write_buffer.clear();250}251252n_rows_finished += total_rows_per_pool_iter;253}254Ok(())255}256257/// Writes a CSV header to `writer`.258pub fn csv_header(names: &[&str], options: &SerializeOptions) -> PolarsResult<Vec<u8>> {259let mut header = Vec::new();260261// A hack, but it works for this case.262let fake_arr = NullArray::new(ArrowDataType::Null, 0);263let mut names_serializer = string_serializer(264|iter: &mut std::slice::Iter<&str>| iter.next().copied(),265options,266|_| names.iter(),267&fake_arr,268);269for i in 0..names.len() {270names_serializer.serialize(&mut header, options);271if i != names.len() - 1 {272header.push(options.separator);273}274}275header.extend_from_slice(options.line_terminator.as_bytes());276Ok(header)277}278279pub const UTF8_BOM: [u8; 3] = [0xEF, 0xBB, 0xBF];280281282