Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sinks/partition/mod.rs
6939 views
1
use std::sync::Arc;
2
3
use futures::StreamExt;
4
use futures::stream::FuturesUnordered;
5
use polars_core::prelude::{Column, DataType, SortMultipleOptions};
6
use polars_core::scalar::Scalar;
7
use polars_core::schema::SchemaRef;
8
use polars_error::PolarsResult;
9
use polars_io::cloud::CloudOptions;
10
use polars_plan::dsl::{
11
FileType, PartitionTargetCallback, PartitionTargetCallbackResult, PartitionTargetContext,
12
SinkOptions, SinkTarget,
13
};
14
use polars_utils::format_pl_smallstr;
15
use polars_utils::plpath::PlPathRef;
16
17
use super::{DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE, SinkInputPort, SinkNode};
18
use crate::async_executor::{AbortOnDropHandle, spawn};
19
use crate::async_primitives::wait_group::WaitGroup;
20
use crate::async_primitives::{connector, distributor_channel};
21
use crate::execute::StreamingExecutionState;
22
use crate::expression::StreamExpr;
23
use crate::morsel::{MorselSeq, SourceToken};
24
use crate::nodes::io_sinks::phase::PhaseOutcome;
25
use crate::nodes::{Morsel, TaskPriority};
26
27
pub mod by_key;
28
pub mod max_size;
29
pub mod parted;
30
31
/// Sort configuration applied to each partition's gathered data before it is
/// forwarded to that partition's sink.
#[derive(Clone)]
pub struct PerPartitionSortBy {
    // Invariant: all vecs have the same length.
    /// Expressions evaluated against the partition data to produce sort-key columns.
    pub selectors: Vec<StreamExpr>,
    /// Per-key descending flag (parallel to `selectors`).
    pub descending: Vec<bool>,
    /// Per-key nulls-last flag (parallel to `selectors`).
    pub nulls_last: Vec<bool>,
    /// Whether to use a stable sort (preserve order of equal keys).
    pub maintain_order: bool,
}
39
40
/// Factory invoked once per partition to open a fresh sink node for the given
/// input schema and target. Built by [`get_create_new_fn`].
pub type CreateNewSinkFn =
    Arc<dyn Send + Sync + Fn(SchemaRef, SinkTarget) -> PolarsResult<Box<dyn SinkNode + Send>>>;
42
43
pub fn get_create_new_fn(
44
file_type: FileType,
45
sink_options: SinkOptions,
46
cloud_options: Option<CloudOptions>,
47
collect_metrics: bool,
48
) -> CreateNewSinkFn {
49
match file_type {
50
#[cfg(feature = "ipc")]
51
FileType::Ipc(ipc_writer_options) => Arc::new(move |input_schema, target| {
52
let sink = Box::new(super::ipc::IpcSinkNode::new(
53
input_schema,
54
target,
55
sink_options.clone(),
56
ipc_writer_options,
57
cloud_options.clone(),
58
)) as Box<dyn SinkNode + Send>;
59
Ok(sink)
60
}) as _,
61
#[cfg(feature = "json")]
62
FileType::Json(_ndjson_writer_options) => Arc::new(move |_input_schema, target| {
63
let sink = Box::new(super::json::NDJsonSinkNode::new(
64
target,
65
sink_options.clone(),
66
cloud_options.clone(),
67
)) as Box<dyn SinkNode + Send>;
68
Ok(sink)
69
}) as _,
70
#[cfg(feature = "parquet")]
71
FileType::Parquet(parquet_writer_options) => {
72
Arc::new(move |input_schema, target: SinkTarget| {
73
let sink = Box::new(super::parquet::ParquetSinkNode::new(
74
input_schema,
75
target,
76
sink_options.clone(),
77
&parquet_writer_options,
78
cloud_options.clone(),
79
collect_metrics,
80
)?) as Box<dyn SinkNode + Send>;
81
Ok(sink)
82
}) as _
83
},
84
#[cfg(feature = "csv")]
85
FileType::Csv(csv_writer_options) => Arc::new(move |input_schema, target| {
86
let sink = Box::new(super::csv::CsvSinkNode::new(
87
target,
88
input_schema,
89
sink_options.clone(),
90
csv_writer_options.clone(),
91
cloud_options.clone(),
92
)) as Box<dyn SinkNode + Send>;
93
Ok(sink)
94
}) as _,
95
#[cfg(not(any(
96
feature = "csv",
97
feature = "parquet",
98
feature = "json",
99
feature = "ipc"
100
)))]
101
_ => {
102
panic!("activate source feature")
103
},
104
}
105
}
106
107
/// Sending half of the channel feeding a partition's sink, abstracting over
/// the serial (single connector) and parallel (distributor) input modes.
enum SinkSender {
    Connector(connector::Sender<Morsel>),
    Distributor(distributor_channel::Sender<Morsel>),
}
111
112
impl SinkSender {
113
pub async fn send(&mut self, morsel: Morsel) -> Result<(), Morsel> {
114
match self {
115
SinkSender::Connector(sender) => sender.send(morsel).await,
116
SinkSender::Distributor(sender) => sender.send(morsel).await,
117
}
118
}
119
}
120
121
fn default_by_key_file_path_cb(
122
ext: &str,
123
_file_idx: usize,
124
_part_idx: usize,
125
in_part_idx: usize,
126
columns: Option<&[Column]>,
127
separator: char,
128
) -> PolarsResult<String> {
129
use std::fmt::Write;
130
131
let columns = columns.unwrap();
132
assert!(!columns.is_empty());
133
134
let mut file_path = String::new();
135
for c in columns {
136
let name = c.name();
137
let value = c.head(Some(1)).strict_cast(&DataType::String)?;
138
let value = value.str().unwrap();
139
let value = value
140
.get(0)
141
.unwrap_or("__HIVE_DEFAULT_PARTITION__")
142
.as_bytes();
143
let value = percent_encoding::percent_encode(value, polars_io::utils::URL_ENCODE_CHAR_SET);
144
write!(&mut file_path, "{name}={value}").unwrap();
145
file_path.push(separator);
146
}
147
write!(&mut file_path, "{in_part_idx}.{ext}").unwrap();
148
149
Ok(file_path)
150
}
151
152
/// Default path-generator signature:
/// `(ext, file_idx, part_idx, in_part_idx, keys, separator) -> relative path`.
type FilePathCallback =
    fn(&str, usize, usize, usize, Option<&[Column]>, char) -> PolarsResult<String>;
154
155
/// Open a fresh sink for one partition and wire up its input channel(s).
///
/// Resolves the target path (via `default_file_path_cb`, optionally overridden
/// by a user-supplied `file_path_cb`), creates the sink node through
/// `create_new_sink`, spawns its tasks, and returns the handles together with
/// the sender the partitioning node should feed morsels into.
///
/// Returns `Ok(None)` when the sink refuses its input port — presumably because
/// the sink already failed; the join handles are then drained so the underlying
/// error (if any) is propagated.
#[allow(clippy::too_many_arguments)]
async fn open_new_sink(
    base_path: PlPathRef<'_>,
    file_path_cb: Option<&PartitionTargetCallback>,
    default_file_path_cb: FilePathCallback,
    file_idx: usize,
    part_idx: usize,
    in_part_idx: usize,
    keys: Option<&[Column]>,
    create_new_sink: &CreateNewSinkFn,
    sink_input_schema: SchemaRef,
    partition_name: &'static str,
    ext: &str,
    verbose: bool,
    state: &StreamingExecutionState,
    per_partition_sort_by: Option<&PerPartitionSortBy>,
) -> PolarsResult<
    Option<(
        FuturesUnordered<AbortOnDropHandle<PolarsResult<()>>>,
        SinkSender,
        Box<dyn SinkNode + Send>,
    )>,
> {
    let separator = '/'; // note: accepted by both Windows and Linux
    let file_path = default_file_path_cb(ext, file_idx, part_idx, in_part_idx, keys, separator)?;
    let path = base_path.join(file_path.as_str());

    // If the user provided their own callback, modify the path to that.
    let target = if let Some(file_path_cb) = file_path_cb {
        // Expose the key name/value pairs to the callback context.
        let keys = keys.map_or(Vec::new(), |keys| {
            keys.iter()
                .map(|k| polars_plan::dsl::PartitionTargetContextKey {
                    name: k.name().clone(),
                    raw_value: Scalar::new(k.dtype().clone(), k.get(0).unwrap().into_static()),
                })
                .collect()
        });

        let target = file_path_cb.call(PartitionTargetContext {
            file_idx,
            part_idx,
            in_part_idx,
            keys,
            file_path,
            full_path: path,
        })?;
        match target {
            // Offset the given path by the base_path.
            PartitionTargetCallbackResult::Str(p) => SinkTarget::Path(base_path.join(p)),
            PartitionTargetCallbackResult::Dyn(t) => SinkTarget::Dyn(t),
        }
    } else {
        SinkTarget::Path(path)
    };

    if verbose {
        match &target {
            SinkTarget::Path(p) => eprintln!(
                "[partition[{partition_name}]]: Start on new file '{}'",
                p.display(),
            ),
            SinkTarget::Dyn(_) => eprintln!("[partition[{partition_name}]]: Start on new file",),
        }
    }

    let mut node = (create_new_sink)(sink_input_schema.clone(), target)?;
    let mut join_handles = Vec::new();
    // Parallel sinks get a distributor fanning morsels out over one connector
    // per pipeline; serial sinks get a single connector.
    let (sink_input, mut sender) = if node.is_sink_input_parallel() {
        let (tx, dist_rxs) = distributor_channel::distributor_channel(
            state.num_pipelines,
            *DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE,
        );
        let (txs, rxs) = (0..state.num_pipelines)
            .map(|_| connector::connector())
            .collect::<(Vec<_>, Vec<_>)>();
        // Relay tasks: distributor receiver -> per-pipeline connector.
        join_handles.extend(dist_rxs.into_iter().zip(txs).map(|(mut dist_rx, mut tx)| {
            spawn(TaskPriority::High, async move {
                while let Ok(morsel) = dist_rx.recv().await {
                    if tx.send(morsel).await.is_err() {
                        break;
                    }
                }
                Ok(())
            })
        }));

        (SinkInputPort::Parallel(rxs), SinkSender::Distributor(tx))
    } else {
        let (tx, rx) = connector::connector();
        (SinkInputPort::Serial(rx), SinkSender::Connector(tx))
    };

    // Handle sorting per partition.
    if let Some(per_partition_sort_by) = per_partition_sort_by {
        let num_selectors = per_partition_sort_by.selectors.len();
        let (tx, mut rx) = connector::connector();

        let state = state.in_memory_exec_state.split();
        let selectors = per_partition_sort_by.selectors.clone();
        let descending = per_partition_sort_by.descending.clone();
        let nulls_last = per_partition_sort_by.nulls_last.clone();
        let maintain_order = per_partition_sort_by.maintain_order;

        // Tell the partitioning sink to send stuff here instead.
        let mut old_sender = std::mem::replace(&mut sender, SinkSender::Connector(tx));

        // This all happens in a single thread per partition. Acceptable for now as the main
        // usecase here is writing many partitions, not the best idea for the future.
        join_handles.push(spawn(TaskPriority::High, async move {
            // Gather all morsels for this partition. We expect at least one morsel per partition.
            let Ok(morsel) = rx.recv().await else {
                return Ok(());
            };
            let mut df = morsel.into_df();
            while let Ok(next_morsel) = rx.recv().await {
                df.vstack_mut_owned(next_morsel.into_df())?;
            }

            let mut names = Vec::with_capacity(num_selectors);
            for (i, s) in selectors.into_iter().enumerate() {
                // @NOTE: This evaluation cannot be done as chunks come in since it might contain
                // non-elementwise expressions.
                let c = s.evaluate(&df, &state).await?;
                let name = format_pl_smallstr!("__POLARS_PART_SORT_COL{i}");
                names.push(name.clone());
                df.with_column(c.with_name(name))?;
            }
            df.sort_in_place(
                names,
                SortMultipleOptions {
                    descending,
                    nulls_last,
                    multithreaded: false,
                    maintain_order,
                    limit: None,
                },
            )?;
            // Drop the temporary sort-key columns appended above.
            df = df.select_by_range(0..df.width() - num_selectors)?;

            // Send failure here means the sink went away; errors surface via
            // its join handles, so the result is intentionally ignored.
            _ = old_sender
                .send(Morsel::new(df, MorselSeq::default(), SourceToken::new()))
                .await;
            Ok(())
        }));
    }

    let (mut sink_input_tx, sink_input_rx) = connector::connector();
    node.initialize(state)?;
    node.spawn_sink(sink_input_rx, state, &mut join_handles);
    let mut join_handles =
        FuturesUnordered::from_iter(join_handles.into_iter().map(AbortOnDropHandle::new));

    let (_, outcome) = PhaseOutcome::new_shared_wait(WaitGroup::default().token());
    if sink_input_tx.send((outcome, sink_input)).await.is_err() {
        // If this sending failed, probably some error occurred.
        drop(sender);
        // Drain the spawned tasks so any underlying error is propagated.
        while let Some(res) = join_handles.next().await {
            res?;
        }

        return Ok(None);
    }

    Ok(Some((join_handles, sender, node)))
}
320
321