Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sinks/writers/csv/mod.rs
8479 views
1
use std::sync::Arc;
2
3
use polars_core::config;
4
use polars_core::schema::SchemaRef;
5
use polars_error::PolarsResult;
6
use polars_io::pl_async;
7
use polars_io::prelude::{CsvSerializer, CsvWriterOptions};
8
use polars_utils::index::NonZeroIdxSize;
9
10
use crate::async_executor::{self, TaskPriority};
11
use crate::async_primitives::connector;
12
use crate::nodes::io_sinks::components::sink_morsel::{SinkMorsel, SinkMorselPermit};
13
use crate::nodes::io_sinks::components::size::{
14
NonZeroRowCountAndSize, RowCountAndSize, TakeableRowsProvider,
15
};
16
use crate::nodes::io_sinks::writers::interface::{
17
FileOpenTaskHandle, FileWriterStarter, ideal_sink_morsel_size_env,
18
};
19
use crate::utils::tokio_handle_ext;
20
21
mod io_writer;
22
mod morsel_serializer;
23
24
pub struct CsvWriterStarter {
25
pub options: Arc<CsvWriterOptions>,
26
/// `Mutex` is to handle `dyn ColumnSerializer` not being `Sync`.
27
pub base_serializer: std::sync::Mutex<CsvSerializer>,
28
pub schema: SchemaRef,
29
pub initialized_state: std::sync::Mutex<Option<InitializedState>>,
30
}
31
32
#[derive(Clone)]
33
pub struct InitializedState {
34
pub ideal_morsel_size: NonZeroRowCountAndSize,
35
pub base_allocation_size: usize,
36
}
37
38
impl CsvWriterStarter {
39
fn initialized_state(&self) -> InitializedState {
40
let mut initialized_state = self.initialized_state.lock().unwrap();
41
42
if initialized_state.is_none() {
43
let (env_num_rows, env_num_bytes) = ideal_sink_morsel_size_env();
44
45
let ideal_morsel_size = RowCountAndSize {
46
num_rows: env_num_rows.unwrap_or(25 * 1024),
47
num_bytes: env_num_bytes.unwrap_or(8 * 1024 * 1024),
48
};
49
50
let serialized_row_size_estimate = u64::saturating_mul(self.schema.len() as _, 25);
51
52
let base_allocation_size: usize = u64::min(
53
64 * 1024 * 1024,
54
u64::min(
55
ideal_morsel_size.num_bytes.div_ceil(2).saturating_mul(5),
56
u64::saturating_mul(
57
serialized_row_size_estimate,
58
ideal_morsel_size.num_rows as _,
59
),
60
),
61
) as _;
62
63
if config::verbose() {
64
eprintln!("[CsvWriterStarter]: base_allocation_size: {base_allocation_size}")
65
}
66
67
let ideal_morsel_size = NonZeroRowCountAndSize::new(ideal_morsel_size).unwrap();
68
69
*initialized_state = Some(InitializedState {
70
ideal_morsel_size,
71
base_allocation_size,
72
})
73
}
74
75
initialized_state.clone().unwrap()
76
}
77
}
78
79
impl FileWriterStarter for CsvWriterStarter {
80
fn writer_name(&self) -> &str {
81
"csv"
82
}
83
84
fn takeable_rows_provider(&self) -> TakeableRowsProvider {
85
TakeableRowsProvider {
86
max_size: self.initialized_state().ideal_morsel_size,
87
byte_size_min_rows: NonZeroIdxSize::new(256).unwrap(),
88
allow_non_max_size: true,
89
}
90
}
91
92
fn start_file_writer(
93
&self,
94
morsel_rx: connector::Receiver<SinkMorsel>,
95
file: FileOpenTaskHandle,
96
num_pipelines: std::num::NonZeroUsize,
97
) -> PolarsResult<async_executor::JoinHandle<PolarsResult<()>>> {
98
let (filled_serializer_tx, filled_serializer_rx) = tokio::sync::mpsc::channel::<(
99
async_executor::AbortOnDropHandle<PolarsResult<morsel_serializer::MorselSerializer>>,
100
SinkMorselPermit,
101
)>(num_pipelines.get());
102
103
let max_serializers = num_pipelines.get();
104
let (reuse_serializer_tx, reuse_serializer_rx) =
105
tokio::sync::mpsc::channel::<morsel_serializer::MorselSerializer>(max_serializers);
106
107
let io_handle = tokio_handle_ext::AbortOnDropHandle(
108
pl_async::get_runtime().spawn(
109
io_writer::IOWriter {
110
file,
111
filled_serializer_rx,
112
reuse_serializer_tx,
113
schema: Arc::clone(&self.schema),
114
options: Arc::clone(&self.options),
115
}
116
.run(),
117
),
118
);
119
120
let base_csv_serializer = { self.base_serializer.lock().unwrap().clone() };
121
let base_allocation_size = self.initialized_state().base_allocation_size;
122
123
let serializer_handle = async_executor::spawn(
124
TaskPriority::High,
125
morsel_serializer::MorselSerializerPipeline {
126
morsel_rx,
127
filled_serializer_tx,
128
reuse_serializer_rx,
129
base_csv_serializer,
130
base_allocation_size,
131
max_serializers,
132
}
133
.run(),
134
);
135
136
Ok(async_executor::spawn(TaskPriority::Low, async move {
137
io_handle.await.unwrap()?;
138
serializer_handle.await;
139
Ok(())
140
}))
141
}
142
}
143
144