GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sinks/partition/parted.rs
use std::pin::Pin;
use std::sync::{Arc, Mutex};

use futures::StreamExt;
use futures::stream::FuturesUnordered;
use polars_core::config;
use polars_core::prelude::row_encode::_get_rows_encoded_ca_unordered;
use polars_core::prelude::{AnyValue, Column, IntoColumn, PlHashSet};
use polars_core::schema::SchemaRef;
use polars_error::PolarsResult;
use polars_plan::dsl::{PartitionTargetCallback, SinkFinishCallback, SinkOptions};
use polars_utils::pl_str::PlSmallStr;
use polars_utils::plpath::PlPath;
use polars_utils::relaxed_cell::RelaxedCell;

use super::{CreateNewSinkFn, PerPartitionSortBy};
use crate::async_executor::{AbortOnDropHandle, spawn};
use crate::async_primitives::connector::Receiver;
use crate::async_primitives::distributor_channel::distributor_channel;
use crate::execute::StreamingExecutionState;
use crate::nodes::io_sinks::metrics::WriteMetrics;
use crate::nodes::io_sinks::partition::{SinkSender, open_new_sink};
use crate::nodes::io_sinks::phase::PhaseOutcome;
use crate::nodes::io_sinks::{SinkInputPort, SinkNode};
use crate::nodes::{JoinHandle, Morsel, TaskPriority};

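/// Sink node for "parted" partitioning: each run of consecutive rows with the same key
/// value is written to its own file. Equal keys are expected to arrive adjacent (e.g.
/// pre-sorted input); if a key value shows up again later, a new file is opened for it.
///
/// Rough task layout:
///
/// ```text
/// morsels ──► main task ──► per-partition sink ──► retire task(s)
/// ```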
pub struct PartedPartitionSinkNode {
    input_schema: SchemaRef,
    // This is not necessarily the same as the input_schema, e.g. when include_key=false
    // this will not include the key columns.
    sink_input_schema: SchemaRef,

    key_cols: Arc<[PlSmallStr]>,
    base_path: Arc<PlPath>,
    file_path_cb: Option<PartitionTargetCallback>,
    create_new: CreateNewSinkFn,
    ext: PlSmallStr,

    sink_options: SinkOptions,
    include_key: bool,

    /// The number of tasks used to wait for finished files. If you are writing large
    /// enough files (i.e. they are formed by multiple morsels), this should almost
    /// always be 1. But if you are writing many small files, this should scale up to
    /// allow your threads to saturate. In any sane situation this should never exceed
    /// the number of threads you have available.
    ///
    /// This is somewhat proportional to the number of files open at any given point.
    num_retire_tasks: usize,

    per_partition_sort_by: Option<PerPartitionSortBy>,
    partition_metrics: Arc<Mutex<Vec<Vec<WriteMetrics>>>>,
    finish_callback: Option<SinkFinishCallback>,
}

const DEFAULT_RETIRE_TASKS: usize = 1;
impl PartedPartitionSinkNode {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        input_schema: SchemaRef,
        key_cols: Arc<[PlSmallStr]>,
        base_path: Arc<PlPath>,
        file_path_cb: Option<PartitionTargetCallback>,
        create_new: CreateNewSinkFn,
        ext: PlSmallStr,
        sink_options: SinkOptions,
        include_key: bool,
        per_partition_sort_by: Option<PerPartitionSortBy>,
        finish_callback: Option<SinkFinishCallback>,
    ) -> Self {
        assert!(!key_cols.is_empty());

        let mut sink_input_schema = input_schema.clone();
        if !include_key {
            let keys_col_hm = PlHashSet::from_iter(key_cols.iter().map(|s| s.as_str()));
            sink_input_schema = Arc::new(
                sink_input_schema
                    .try_project(
                        input_schema
                            .iter_names()
                            .filter(|n| !keys_col_hm.contains(n.as_str()))
                            .cloned(),
                    )
                    .unwrap(),
            );
        }

        let num_retire_tasks =
            std::env::var("POLARS_PARTED_SINK_RETIRE_TASKS").map_or(DEFAULT_RETIRE_TASKS, |v| {
                v.parse::<usize>()
                    .expect("unable to parse POLARS_PARTED_SINK_RETIRE_TASKS")
                    .max(1)
            });
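        // E.g. `POLARS_PARTED_SINK_RETIRE_TASKS=4` requests four retire tasks; values
        // below 1 are clamped to 1 by the `.max(1)` above.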

        Self {
            input_schema,
            sink_input_schema,
            key_cols,
            base_path,
            file_path_cb,
            create_new,
            ext,
            sink_options,
            num_retire_tasks,
            include_key,
            per_partition_sort_by,
            partition_metrics: Arc::new(Mutex::new(Vec::with_capacity(num_retire_tasks))),
            finish_callback,
        }
    }
}

impl SinkNode for PartedPartitionSinkNode {
    fn name(&self) -> &str {
        "partition-parted-sink"
    }

    fn is_sink_input_parallel(&self) -> bool {
        false
    }
    fn do_maintain_order(&self) -> bool {
        self.sink_options.maintain_order
    }

    fn initialize(&mut self, _state: &StreamingExecutionState) -> PolarsResult<()> {
        Ok(())
    }

    fn spawn_sink(
        &mut self,
        mut recv_port_recv: Receiver<(PhaseOutcome, SinkInputPort)>,
        state: &StreamingExecutionState,
        join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
    ) {
        // Main Task -> Retire Tasks
        let (mut retire_tx, retire_rxs) = distributor_channel(self.num_retire_tasks, 1);
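        // Fans finished sinks out over `num_retire_tasks` retire-task receivers (see
        // the retire tasks spawned below).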

        // Whether an error has been observed in the retire tasks.
        let has_error_occurred = Arc::new(RelaxedCell::from(false));

        // Main Task.
        //
        // Takes the morsels coming in and passes them to the underlying sink.
        let task_state = state.clone();
        let sink_input_schema = self.sink_input_schema.clone();
        let key_cols = self.key_cols.clone();
        let base_path = self.base_path.clone();
        let file_path_cb = self.file_path_cb.clone();
        let create_new = self.create_new.clone();
        let ext = self.ext.clone();
        let include_key = self.include_key;
        let retire_error = has_error_occurred.clone();
        let per_partition_sort_by = self.per_partition_sort_by.clone();
        join_handles.push(spawn(TaskPriority::High, async move {
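            // State for the file currently being written: the channel feeding it
            // morsels, the join handles of its worker tasks, the key value that owns
            // it, the key columns (kept for metrics), and the sink node itself.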
            struct CurrentSink {
                sender: SinkSender,
                join_handles: FuturesUnordered<AbortOnDropHandle<PolarsResult<()>>>,
                value: AnyValue<'static>,
                keys: Vec<Column>,
                node: Box<dyn SinkNode + Send>,
            }

            let verbose = config::verbose();
            let mut file_idx = 0;
            let mut current_sink_opt: Option<CurrentSink> = None;
            let mut lengths = Vec::new();

            while let Ok((outcome, recv_port)) = recv_port_recv.recv().await {
                let mut recv_port = recv_port.serial();
                while let Ok(morsel) = recv_port.recv().await {
                    let (mut df, seq, source_token, consume_token) = morsel.into_inner();
                    if df.height() == 0 {
                        continue;
                    }

                    let mut c = if key_cols.len() == 1 {
                        let idx = df.try_get_column_index(&key_cols[0])?;
                        df.get_columns()[idx].clone()
                    } else {
                        let columns = df.select_columns(key_cols.iter().cloned())?;
                        _get_rows_encoded_ca_unordered(PlSmallStr::EMPTY, &columns)?.into_column()
                    };

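                    // Compute the length of each run of consecutive identical keys;
                    // every run below becomes (part of) one partition file.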
                    lengths.clear();
                    polars_ops::series::rle_lengths(&c, &mut lengths)?;

                    for &length in &lengths {
                        if retire_error.load() {
                            return Ok(());
                        }

                        let mut parted_df;
                        let parted_c;
                        (parted_df, df) = df.split_at(length as i64);
                        (parted_c, c) = c.split_at(length as i64);

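                        // All rows in this slice share one key value, so the first
                        // element identifies the partition.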
                        let value = parted_c.get(0).unwrap().into_static();

                        // If we have a sink open that does not match the value, close it.
                        if let Some(current_sink) = current_sink_opt.take() {
                            if current_sink.value != value {
                                drop(current_sink.sender);
                                if retire_tx
                                    .send((
                                        current_sink.join_handles,
                                        current_sink.node,
                                        current_sink.keys,
                                    ))
                                    .await
                                    .is_err()
                                {
                                    return Ok(());
                                };
                            } else {
                                current_sink_opt = Some(current_sink);
                            }
                        }

                        let current_sink = match current_sink_opt.as_mut() {
                            Some(c) => c,
                            None => {
                                let keys = parted_df.select_columns(key_cols.iter().cloned())?;
                                let result = open_new_sink(
                                    base_path.as_ref().as_ref(),
                                    file_path_cb.as_ref(),
                                    super::default_by_key_file_path_cb,
                                    file_idx,
                                    file_idx,
                                    0,
                                    Some(keys.as_slice()),
                                    &create_new,
                                    sink_input_schema.clone(),
                                    "parted",
                                    ext.as_str(),
                                    verbose,
                                    &task_state,
                                    per_partition_sort_by.as_ref(),
                                )
                                .await?;
                                file_idx += 1;
                                let Some((join_handles, sender, node)) = result else {
                                    return Ok(());
                                };

                                current_sink_opt.insert(CurrentSink {
                                    sender,
                                    value,
                                    join_handles,
                                    node,
                                    keys,
                                })
                            },
                        };

                        if !include_key {
                            parted_df = parted_df.drop_many(key_cols.iter().cloned());
                        }

                        if current_sink
                            .sender
                            .send(Morsel::new(parted_df, seq, source_token.clone()))
                            .await
                            .is_err()
                        {
                            return Ok(());
                        };
                    }

                    drop(consume_token);
                }

                outcome.stopped();
            }

            if let Some(current_sink) = current_sink_opt.take() {
                drop(current_sink.sender);
                if retire_tx
                    .send((
                        current_sink.join_handles,
                        current_sink.node,
                        current_sink.keys,
                    ))
                    .await
                    .is_err()
                {
                    return Ok(());
                };
            }

            Ok(())
        }));

        // Retire Tasks.
        //
        // When a file is finished, someone needs to wait for its sink tasks to finish.
        // Since we don't want to block the main task, we do this in separate tasks.
        // Usually this is only 1 task, but it can be scaled up using an environment
        // variable (POLARS_PARTED_SINK_RETIRE_TASKS).
        let has_error_occurred = &has_error_occurred;
        join_handles.extend(retire_rxs.into_iter().map(|mut retire_rx| {
            let global_partition_metrics = self.partition_metrics.clone();
            let has_error_occurred = has_error_occurred.clone();
            let task_state = state.clone();

            spawn(TaskPriority::High, async move {
                let mut partition_metrics = Vec::new();

                while let Ok((mut join_handles, mut node, keys)) = retire_rx.recv().await {
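                    // Drain the finished sink's worker tasks; record any failure so
                    // the main task can stop early.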
                    while let Some(ret) = join_handles.next().await {
                        ret.inspect_err(|_| {
                            has_error_occurred.store(true);
                        })?;
                    }
                    if let Some(mut metrics) = node.get_metrics()? {
                        metrics.keys = Some(
                            keys.into_iter()
                                .map(|c| c.get(0).unwrap().into_static())
                                .collect(),
                        );
                        partition_metrics.push(metrics);
                    }
                    if let Some(finalize) = node.finalize(&task_state) {
                        finalize.await?;
                    }
                }

                {
                    let mut global_written_partitions = global_partition_metrics.lock().unwrap();
                    global_written_partitions.push(partition_metrics);
                }

                Ok(())
            })
        }));
    }

    fn finalize(
        &mut self,
        _state: &StreamingExecutionState,
    ) -> Option<Pin<Box<dyn Future<Output = PolarsResult<()>> + Send>>> {
        let finish_callback = self.finish_callback.clone();
        let partition_metrics = self.partition_metrics.clone();
        let sink_input_schema = self.sink_input_schema.clone();
        let input_schema = self.input_schema.clone();
        let key_cols = self.key_cols.clone();

        Some(Box::pin(async move {
            if let Some(finish_callback) = &finish_callback {
                let mut written_partitions = partition_metrics.lock().unwrap();
                let written_partitions =
                    std::mem::take::<Vec<Vec<WriteMetrics>>>(written_partitions.as_mut())
                        .into_iter()
                        .flatten()
                        .collect();
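                // Collapse the collected per-partition metrics into a DataFrame and
                // hand it to the user's finish callback.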
                let df = WriteMetrics::collapse_to_df(
                    written_partitions,
                    &sink_input_schema,
                    Some(&input_schema.try_project(key_cols.iter()).unwrap()),
                );
                finish_callback.call(df)?;
            }
            Ok(())
        }))
    }
}