GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sinks/parquet.rs
use std::cmp::Reverse;
use std::io::BufWriter;
use std::pin::Pin;
use std::sync::{Arc, Mutex};

use polars_core::prelude::{ArrowSchema, CompatLevel};
use polars_core::schema::SchemaRef;
use polars_error::PolarsResult;
use polars_io::cloud::CloudOptions;
use polars_io::parquet::write::BatchedWriter;
use polars_io::prelude::{ParquetWriteOptions, get_column_write_options};
use polars_io::schema_to_arrow_checked;
use polars_parquet::parquet::error::ParquetResult;
use polars_parquet::read::ParquetError;
use polars_parquet::write::{
    ColumnWriteOptions, CompressedPage, Compressor, FileWriter, SchemaDescriptor, Version,
    WriteOptions, array_to_columns, to_parquet_schema,
};
use polars_plan::dsl::{SinkOptions, SinkTarget};
use polars_utils::priority::Priority;
use polars_utils::relaxed_cell::RelaxedCell;

use super::metrics::WriteMetrics;
use super::{
    DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE, SinkInputPort,
    SinkNode, buffer_and_distribute_columns_task,
};
use crate::async_executor::spawn;
use crate::async_primitives::connector::{Receiver, connector};
use crate::async_primitives::distributor_channel::distributor_channel;
use crate::async_primitives::linearizer::Linearizer;
use crate::execute::StreamingExecutionState;
use crate::nodes::io_sinks::phase::PhaseOutcome;
use crate::nodes::{JoinHandle, TaskPriority};

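/// Streaming sink node that writes the incoming stream to a single Parquet file.
///
/// The heavy lifting is split over several tasks (set up in `initialize` and
/// `spawn_sink` below): a buffer task groups incoming columns into row groups,
/// per-pipeline encode tasks turn each column into compressed Parquet pages, a
/// collect task linearizes those pages back into row-group order, and a single
/// IO task appends the finished row groups to the target file.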
pub struct ParquetSinkNode {
    target: SinkTarget,

    input_schema: SchemaRef,
    sink_options: SinkOptions,
    write_options: ParquetWriteOptions,

    parquet_schema: SchemaDescriptor,
    arrow_schema: ArrowSchema,
    column_options: Vec<ColumnWriteOptions>,
    cloud_options: Option<CloudOptions>,

    file_size: Arc<RelaxedCell<u64>>,
    metrics: Arc<Mutex<Option<WriteMetrics>>>,

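    // Created in `initialize`; `io_tx` is taken by `spawn_sink` (the collect task sends
    // finished row groups through it) and `io_task` is awaited in `finalize`.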
    io_tx: Option<crate::async_primitives::connector::Sender<Vec<Vec<CompressedPage>>>>,
    io_task: Option<tokio_util::task::AbortOnDropHandle<PolarsResult<()>>>,
}

impl ParquetSinkNode {
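    /// Build a Parquet sink node for `target`, deriving the Arrow schema, the Parquet
    /// schema descriptor, and the per-column write options from `input_schema` up front.
    ///
    /// A minimal construction sketch (assuming `input_schema`, `target`, `sink_options`,
    /// and `write_options` are provided by the caller; their construction is not shown):
    ///
    /// ```ignore
    /// let node = ParquetSinkNode::new(
    ///     input_schema,
    ///     target,
    ///     sink_options,
    ///     &write_options,
    ///     None,  // no cloud options
    ///     false, // do not collect write metrics
    /// )?;
    /// ```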
    pub fn new(
        input_schema: SchemaRef,
        target: SinkTarget,
        sink_options: SinkOptions,
        write_options: &ParquetWriteOptions,
        cloud_options: Option<CloudOptions>,
        collect_metrics: bool,
    ) -> PolarsResult<Self> {
        let schema = schema_to_arrow_checked(&input_schema, CompatLevel::newest(), "parquet")?;
        let column_options: Vec<ColumnWriteOptions> =
            get_column_write_options(&schema, &write_options.field_overwrites);
        let parquet_schema = to_parquet_schema(&schema, &column_options)?;
        let metrics =
            Arc::new(Mutex::new(collect_metrics.then(|| {
                WriteMetrics::new(target.to_display_string(), &input_schema)
            })));

        Ok(Self {
            target,

            input_schema,
            sink_options,
            write_options: write_options.clone(),

            parquet_schema,
            arrow_schema: schema,
            column_options,
            cloud_options,

            file_size: Arc::default(),
            metrics,

            io_tx: None,
            io_task: None,
        })
    }
}

// 512^2 = 262,144 rows: the default row group size when
// `ParquetWriteOptions::row_group_size` is not set (see `spawn_sink`).
const DEFAULT_ROW_GROUP_SIZE: usize = 1 << 18;

impl SinkNode for ParquetSinkNode {
    fn name(&self) -> &str {
        "parquet-sink"
    }

    fn is_sink_input_parallel(&self) -> bool {
        false
    }
    fn do_maintain_order(&self) -> bool {
        self.sink_options.maintain_order
    }

    fn initialize(&mut self, _state: &StreamingExecutionState) -> PolarsResult<()> {
        // Collect task -> IO task
        let (io_tx, mut io_rx) = connector::<Vec<Vec<CompressedPage>>>();

        // IO task.
        //
        // The task that actually writes to the target file. It is important that this is
        // only spawned once.
        let target = self.target.clone();
        let sink_options = self.sink_options.clone();
        let cloud_options = self.cloud_options.clone();
        let write_options = self.write_options.clone();
        let arrow_schema = self.arrow_schema.clone();
        let parquet_schema = self.parquet_schema.clone();
        let column_options = self.column_options.clone();
        let output_file_size = self.file_size.clone();
        let io_task = polars_io::pl_async::get_runtime().spawn(async move {
            let mut file = target
                .open_into_writeable_async(&sink_options, cloud_options.as_ref())
                .await?;

            let writer = BufWriter::new(&mut *file);
            let key_value_metadata = write_options.key_value_metadata;
            let write_options = WriteOptions {
                statistics: write_options.statistics,
                compression: write_options.compression.into(),
                version: Version::V1,
                data_page_size: write_options.data_page_size,
            };
            let file_writer = Mutex::new(FileWriter::new_with_parquet_schema(
                writer,
                arrow_schema,
                parquet_schema,
                write_options,
            ));
            let mut writer = BatchedWriter::new(
                file_writer,
                column_options,
                write_options,
                false,
                key_value_metadata,
            );

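            // The encode tasks have already compressed the pages; this loop only appends
            // them to the file as finished row groups before the file is finalized below.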
            let num_parquet_columns = writer.parquet_schema().leaves().len();
            while let Ok(current_row_group) = io_rx.recv().await {
                // @TODO: At the moment this is a sync write, which is not ideal because we
                // can only have so many blocking threads in the tokio threadpool.
                assert_eq!(current_row_group.len(), num_parquet_columns);
                writer.write_row_group(&current_row_group)?;
            }

            let file_size = writer.finish()?;
            drop(writer);

            file.sync_on_close(sink_options.sync_on_close)?;
            file.close()?;

            output_file_size.store(file_size);
            PolarsResult::Ok(())
        });

        self.io_tx = Some(io_tx);
        self.io_task = Some(tokio_util::task::AbortOnDropHandle::new(io_task));

        Ok(())
    }

    fn spawn_sink(
        &mut self,
        recv_port_rx: Receiver<(PhaseOutcome, SinkInputPort)>,
        state: &StreamingExecutionState,
        join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
    ) {
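        // Task topology built below:
        //   recv_port_rx -> buffer task -> distributor channel -> encode tasks (x num_pipelines)
        //     -> linearizer -> collect task -> io_tx -> IO task (spawned in `initialize`)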
        // Collect task -> IO task
        let mut io_tx = self
            .io_tx
            .take()
            .expect("not initialized / spawn called more than once");
        // Buffer task -> Encode tasks
        let (dist_tx, dist_rxs) =
            distributor_channel(state.num_pipelines, *DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE);
        // Encode tasks -> Collect task
        let (mut lin_rx, lin_txs) =
            Linearizer::new(state.num_pipelines, *DEFAULT_SINK_LINEARIZER_BUFFER_SIZE);

        let write_options = &self.write_options;

        let options = WriteOptions {
            statistics: write_options.statistics,
            compression: write_options.compression.into(),
            version: Version::V1,
            data_page_size: write_options.data_page_size,
        };

        // Buffer task.
        join_handles.push(buffer_and_distribute_columns_task(
            recv_port_rx,
            dist_tx,
            write_options
                .row_group_size
                .unwrap_or(DEFAULT_ROW_GROUP_SIZE),
            self.input_schema.clone(),
            self.metrics.clone(),
        ));

        // Encode task.
        //
        // The task encodes the columns into their corresponding Parquet encoding.
        join_handles.extend(
            dist_rxs
                .into_iter()
                .zip(lin_txs)
                .map(|(mut dist_rx, mut lin_tx)| {
                    let parquet_schema = self.parquet_schema.clone();
                    let column_options = self.column_options.clone();

                    spawn(TaskPriority::High, async move {
                        while let Ok((rg_idx, col_idx, column)) = dist_rx.recv().await {
                            let type_ = &parquet_schema.fields()[col_idx];
                            let column_options = &column_options[col_idx];

                            let array = column.as_materialized_series().rechunk();
                            let array = array.to_arrow(0, CompatLevel::newest());

                            // @TODO: This causes all struct fields to be handled on a single
                            // thread. It would be preferable to split the encoding among
                            // multiple threads.

                            // @NOTE: Since one Polars column might contain multiple Parquet
                            // columns (when it has a struct datatype), we return a
                            // Vec<Vec<CompressedPage>>.

                            // Array -> Parquet pages.
                            let encoded_columns =
                                array_to_columns(array, type_.clone(), column_options, options)?;

                            // Compress the pages.
                            let compressed_pages = encoded_columns
                                .into_iter()
                                .map(|encoded_pages| {
                                    Compressor::new_from_vec(
                                        encoded_pages.map(|result| {
                                            result.map_err(|e| {
                                                ParquetError::FeatureNotSupported(format!(
                                                    "reraised in polars: {e}",
                                                ))
                                            })
                                        }),
                                        options.compression,
                                        vec![],
                                    )
                                    .collect::<ParquetResult<Vec<_>>>()
                                })
                                .collect::<ParquetResult<Vec<_>>>()?;

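                            // Hand the encoded pages to the collect task. The
                            // `Priority(Reverse(rg_idx), ..)` key lets the linearizer emit
                            // results in ascending row-group order; if the collect task is
                            // gone, stop quietly.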
                            if lin_tx
                                .insert(Priority(Reverse(rg_idx), (col_idx, compressed_pages)))
                                .await
                                .is_err()
                            {
                                return Ok(());
                            }
                        }

                        PolarsResult::Ok(())
                    })
                }),
        );

        // Collect Task.
        //
        // Collects all the encoded data and packs it together for the IO task to write it.
        let input_schema = self.input_schema.clone();
        let num_parquet_columns = self.parquet_schema.leaves().len();
        join_handles.push(spawn(TaskPriority::High, async move {
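            // One in-flight row group: the encoded pages per input column, plus how many
            // columns have already arrived for the row group currently being assembled.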
            struct Current {
                seq: usize,
                num_columns_seen: usize,
                columns: Vec<Option<Vec<Vec<CompressedPage>>>>,
            }

            let mut current = Current {
                seq: 0,
                num_columns_seen: 0,
                columns: (0..input_schema.len()).map(|_| None).collect(),
            };

            // Linearize from all the Encoder tasks.
            while let Some(Priority(Reverse(seq), (i, compressed_pages))) = lin_rx.get().await {
                if current.num_columns_seen == 0 {
                    current.seq = seq;
                }

                debug_assert_eq!(current.seq, seq);
                debug_assert!(current.columns[i].is_none());
                current.columns[i] = Some(compressed_pages);
                current.num_columns_seen += 1;

                if current.num_columns_seen == input_schema.len() {
                    // @Optimize: Keep track of these sizes so we can correctly preallocate
                    // them.
                    let mut current_row_group: Vec<Vec<CompressedPage>> =
                        Vec::with_capacity(num_parquet_columns);
                    for column in current.columns.iter_mut() {
                        current_row_group.extend(column.take().unwrap());
                    }

                    if io_tx.send(current_row_group).await.is_err() {
                        return Ok(());
                    }
                    current.num_columns_seen = 0;
                }
            }

            Ok(())
        }));
    }

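    /// Return the write metrics collected so far (if metric collection was enabled),
    /// with the final file size reported by the IO task filled in.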
    fn get_metrics(&self) -> PolarsResult<Option<WriteMetrics>> {
        let file_size = self.file_size.load();
        let metrics = self.metrics.lock().unwrap().take();

        Ok(metrics.map(|mut m| {
            m.file_size = file_size;
            m
        }))
    }

    fn finalize(
        &mut self,
        _state: &StreamingExecutionState,
    ) -> Option<Pin<Box<dyn Future<Output = PolarsResult<()>> + Send>>> {
        // If we were never spawned, we need to make sure that the `tx` is taken. This
        // signals to the IO task that no more data is coming and prevents deadlocks.
        drop(self.io_tx.take());

        let io_task = self
            .io_task
            .take()
            .expect("not initialized / finish called more than once");

        // Wait for the IO task to complete.
        Some(Box::pin(async move {
            io_task
                .await
                .unwrap_or_else(|e| Err(std::io::Error::from(e).into()))
        }))
    }
}