// Repository: pola-rs/polars
// Path: crates/polars-stream/src/nodes/io_sinks2/components/partitioner_pipeline.rs
1
use std::sync::Arc;
2
3
use polars_error::PolarsResult;
4
use polars_expr::state::ExecutionState;
5
6
use crate::async_executor::{self, TaskPriority};
7
use crate::async_primitives::connector;
8
use crate::morsel::Morsel;
9
use crate::nodes::io_sinks2::components::partitioner::{PartitionedDataFrames, Partitioner};
10
11
pub struct PartitionerPipeline {
12
pub morsel_rx: connector::Receiver<Morsel>,
13
pub partitioner: Arc<Partitioner>,
14
pub inflight_morsel_semaphore: Arc<tokio::sync::Semaphore>,
15
pub partitioned_dfs_tx: tokio::sync::mpsc::Sender<
16
async_executor::AbortOnDropHandle<PolarsResult<PartitionedDataFrames>>,
17
>,
18
pub in_memory_exec_state: Arc<ExecutionState>,
19
}
20
21
impl PartitionerPipeline {
22
pub async fn run(self) {
23
let PartitionerPipeline {
24
mut morsel_rx,
25
partitioner,
26
inflight_morsel_semaphore,
27
partitioned_dfs_tx,
28
in_memory_exec_state,
29
} = self;
30
31
loop {
32
// Acquire a single permit to have backpressure. This is not attached to the send as the
33
// morsels from here do not count towards the in-flight morsel limit.
34
let permit = inflight_morsel_semaphore.acquire().await.unwrap();
35
drop(permit);
36
37
let Ok(morsel) = morsel_rx.recv().await else {
38
return;
39
};
40
41
let partitioner = Arc::clone(&partitioner);
42
let in_memory_exec_state = Arc::clone(&in_memory_exec_state);
43
44
if partitioned_dfs_tx
45
.send(async_executor::AbortOnDropHandle::new(
46
async_executor::spawn(TaskPriority::Low, async move {
47
partitioner
48
.partition_morsel(morsel, in_memory_exec_state.as_ref())
49
.await
50
}),
51
))
52
.await
53
.is_err()
54
{
55
return;
56
}
57
}
58
}
59
}
60
61