Path: blob/main/crates/polars-stream/src/nodes/io_sinks2/mod.rs
7884 views
use polars_core::frame::DataFrame;1use polars_error::{PolarsResult, polars_ensure};2use polars_io::pl_async;3use polars_utils::format_pl_smallstr;4use polars_utils::pl_str::PlSmallStr;56use super::{ComputeNode, PortState};7use crate::async_executor;8use crate::async_primitives::connector;9use crate::execute::StreamingExecutionState;10use crate::morsel::{Morsel, MorselSeq, SourceToken};11use crate::nodes::TaskPriority;12use crate::nodes::io_sinks2::components::partitioner::Partitioner;13use crate::nodes::io_sinks2::config::{IOSinkNodeConfig, IOSinkTarget};14use crate::nodes::io_sinks2::pipeline_initialization::partition_by::start_partition_sink_pipeline;15use crate::nodes::io_sinks2::pipeline_initialization::single_file::start_single_file_sink_pipeline;16use crate::pipe::PortReceiver;17pub mod components;18pub mod config;19pub mod pipeline_initialization;20pub mod writers;2122pub struct IOSinkNode {23name: PlSmallStr,24state: IOSinkNodeState,25verbose: bool,26}2728impl IOSinkNode {29pub fn new(config: impl Into<Box<IOSinkNodeConfig>>) -> Self {30let config = config.into();3132let target_type = match &config.target {33IOSinkTarget::File(_) => "single-file",34IOSinkTarget::Partitioned(p) => match &p.partitioner {35Partitioner::Keyed(_) => "partition-keyed",36Partitioner::FileSize => "partition-file-size",37},38};3940let extension = config.file_format.extension();4142let name = format_pl_smallstr!("io-sink[{target_type}[{extension}]]");43let verbose = polars_core::config::verbose();4445IOSinkNode {46name,47state: IOSinkNodeState::Uninitialized { config },48verbose,49}50}51}5253impl ComputeNode for IOSinkNode {54fn name(&self) -> &str {55&self.name56}5758fn update_state(59&mut self,60recv: &mut [crate::graph::PortState],61send: &mut [crate::graph::PortState],62execution_state: &StreamingExecutionState,63) -> polars_error::PolarsResult<()> {64assert_eq!(recv.len(), 1);65assert!(send.is_empty());6667recv[0] = if recv[0] == PortState::Done {68// Ensure initialize / writes empty file for empty output.69self.state.initialize(&self.name, execution_state)?;7071match std::mem::replace(&mut self.state, IOSinkNodeState::Finished) {72IOSinkNodeState::Initialized {73phase_channel_tx,74task_handle,75} => {76if self.verbose {77eprintln!(78"{}: Join on task_handle (recv PortState::Done)",79self.name()80);81}82drop(phase_channel_tx);83pl_async::get_runtime().block_on(task_handle)?;84},85IOSinkNodeState::Finished => {},86IOSinkNodeState::Uninitialized { .. } => unreachable!(),87};8889PortState::Done90} else {91polars_ensure!(92!matches!(self.state, IOSinkNodeState::Finished),93ComputeError:94"unreachable: IO sink node state is 'Finished', but recv port \95state is not 'Done'."96);9798PortState::Ready99};100101Ok(())102}103104fn spawn<'env, 's>(105&'env mut self,106scope: &'s crate::async_executor::TaskScope<'s, 'env>,107recv_ports: &mut [Option<crate::pipe::RecvPort<'_>>],108send_ports: &mut [Option<crate::pipe::SendPort<'_>>],109execution_state: &'s StreamingExecutionState,110join_handles: &mut Vec<crate::async_executor::JoinHandle<polars_error::PolarsResult<()>>>,111) {112assert_eq!(recv_ports.len(), 1);113assert!(send_ports.is_empty());114115let phase_morsel_rx = recv_ports[0].take().unwrap().serial();116117join_handles.push(scope.spawn_task(TaskPriority::Low, async move {118self.state.initialize(&self.name, execution_state)?;119120let IOSinkNodeState::Initialized {121phase_channel_tx, ..122} = &mut self.state123else {124unreachable!()125};126127if phase_channel_tx.send(phase_morsel_rx).await.is_err() {128let IOSinkNodeState::Initialized {129phase_channel_tx,130task_handle,131} = std::mem::replace(&mut self.state, IOSinkNodeState::Finished)132else {133unreachable!()134};135136if self.verbose {137eprintln!(138"{}: Join on task_handle (phase_channel_tx Err)",139self.name()140);141}142143drop(phase_channel_tx);144145return Err(task_handle.await.unwrap_err());146}147148Ok(())149}));150}151}152153enum IOSinkNodeState {154Uninitialized {155config: Box<IOSinkNodeConfig>,156},157158Initialized {159phase_channel_tx: connector::Sender<PortReceiver>,160/// Join handle for all background tasks.161task_handle: async_executor::AbortOnDropHandle<PolarsResult<()>>,162},163164Finished,165}166167impl IOSinkNodeState {168/// Initialize state if not yet initialized.169fn initialize(170&mut self,171node_name: &PlSmallStr,172execution_state: &StreamingExecutionState,173) -> PolarsResult<()> {174use IOSinkNodeState::*;175176if !matches!(self, Self::Uninitialized { .. }) {177return Ok(());178}179180let Uninitialized { mut config } = std::mem::replace(self, Finished) else {181unreachable!()182};183184config.num_pipelines = execution_state.num_pipelines;185186let (phase_channel_tx, mut phase_channel_rx) = connector::connector::<PortReceiver>();187let (mut multi_phase_tx, multi_phase_rx) = connector::connector();188189let first_morsel = Morsel::new(190DataFrame::empty_with_arc_schema(config.input_schema.clone()),191MorselSeq::new(0),192SourceToken::default(),193);194195async_executor::spawn(TaskPriority::High, async move {196multi_phase_tx.send(first_morsel).await.unwrap();197198let mut morsel_seq: u64 = 1;199200while let Ok(mut phase_rx) = phase_channel_rx.recv().await {201while let Ok(mut morsel) = phase_rx.recv().await {202morsel.set_seq(MorselSeq::new(morsel_seq));203morsel_seq = morsel_seq.saturating_add(1);204205if multi_phase_tx.send(morsel).await.is_err() {206break;207}208}209}210});211212let task_handle = match &config.target {213IOSinkTarget::File(_) => {214start_single_file_sink_pipeline(node_name.clone(), multi_phase_rx, *config)?215},216217IOSinkTarget::Partitioned { .. } => {218start_partition_sink_pipeline(node_name, multi_phase_rx, *config, execution_state)?219},220};221222*self = Initialized {223phase_channel_tx,224task_handle,225};226227Ok(())228}229}230231232