Path: blob/main/crates/polars-stream/src/nodes/io_sinks2/components/partitioner_pipeline.rs
7884 views
use std::sync::Arc;12use polars_error::PolarsResult;3use polars_expr::state::ExecutionState;45use crate::async_executor::{self, TaskPriority};6use crate::async_primitives::connector;7use crate::morsel::Morsel;8use crate::nodes::io_sinks2::components::partitioner::{PartitionedDataFrames, Partitioner};910pub struct PartitionerPipeline {11pub morsel_rx: connector::Receiver<Morsel>,12pub partitioner: Arc<Partitioner>,13pub inflight_morsel_semaphore: Arc<tokio::sync::Semaphore>,14pub partitioned_dfs_tx: tokio::sync::mpsc::Sender<15async_executor::AbortOnDropHandle<PolarsResult<PartitionedDataFrames>>,16>,17pub in_memory_exec_state: Arc<ExecutionState>,18}1920impl PartitionerPipeline {21pub async fn run(self) {22let PartitionerPipeline {23mut morsel_rx,24partitioner,25inflight_morsel_semaphore,26partitioned_dfs_tx,27in_memory_exec_state,28} = self;2930loop {31// Acquire a single permit to have backpressure. This is not attached to the send as the32// morsels from here do not count towards the in-flight morsel limit.33let permit = inflight_morsel_semaphore.acquire().await.unwrap();34drop(permit);3536let Ok(morsel) = morsel_rx.recv().await else {37return;38};3940let partitioner = Arc::clone(&partitioner);41let in_memory_exec_state = Arc::clone(&in_memory_exec_state);4243if partitioned_dfs_tx44.send(async_executor::AbortOnDropHandle::new(45async_executor::spawn(TaskPriority::Low, async move {46partitioner47.partition_morsel(morsel, in_memory_exec_state.as_ref())48.await49}),50))51.await52.is_err()53{54return;55}56}57}58}596061