Path: blob/main/crates/polars-stream/src/nodes/io_sinks/mod.rs
8446 views
use std::sync::Arc;12use polars_core::frame::DataFrame;3use polars_error::{PolarsResult, polars_ensure};4use polars_io::metrics::IOMetrics;5use polars_io::pl_async;6use polars_utils::format_pl_smallstr;7use polars_utils::pl_str::PlSmallStr;89use super::{ComputeNode, PortState};10use crate::async_executor;11use crate::async_primitives::connector;12use crate::execute::StreamingExecutionState;13use crate::metrics::MetricsBuilder;14use crate::morsel::{Morsel, MorselSeq, SourceToken};15use crate::nodes::TaskPriority;16use crate::nodes::io_sinks::components::partitioner::Partitioner;17use crate::nodes::io_sinks::config::{IOSinkNodeConfig, IOSinkTarget};18use crate::nodes::io_sinks::pipeline_initialization::partition_by::start_partition_sink_pipeline;19use crate::nodes::io_sinks::pipeline_initialization::single_file::start_single_file_sink_pipeline;20use crate::pipe::PortReceiver;21pub mod components;22pub mod config;23pub mod pipeline_initialization;24pub mod writers;2526pub struct IOSinkNode {27name: PlSmallStr,28state: IOSinkNodeState,29io_metrics: Option<Arc<IOMetrics>>,30verbose: bool,31}3233impl IOSinkNode {34pub fn new(config: impl Into<Box<IOSinkNodeConfig>>) -> Self {35let config = config.into();3637let target_type = match &config.target {38IOSinkTarget::File(_) => "single-file",39IOSinkTarget::Partitioned(p) => match &p.partitioner {40Partitioner::Keyed(_) => "partition-keyed",41Partitioner::FileSize => "partition-file-size",42},43};4445let extension = config.file_format.extension();4647let name = format_pl_smallstr!("io-sink[{target_type}[{extension}]]");48let verbose = polars_core::config::verbose();4950IOSinkNode {51name,52state: IOSinkNodeState::Uninitialized { config },53io_metrics: None,54verbose,55}56}57}5859impl ComputeNode for IOSinkNode {60fn name(&self) -> &str {61&self.name62}6364fn set_metrics_builder(&mut self, metrics_builder: MetricsBuilder) {65self.io_metrics = Some(metrics_builder.new_io_metrics());66}6768fn update_state(69&mut self,70recv: &mut [crate::graph::PortState],71send: &mut [crate::graph::PortState],72execution_state: &StreamingExecutionState,73) -> polars_error::PolarsResult<()> {74assert_eq!(recv.len(), 1);75assert!(send.is_empty());7677recv[0] = if recv[0] == PortState::Done {78// Ensure initialize / writes empty file for empty output.79self.state80.initialize(&self.name, execution_state, self.io_metrics.clone())?;8182match std::mem::replace(&mut self.state, IOSinkNodeState::Finished) {83IOSinkNodeState::Initialized {84phase_channel_tx,85task_handle,86} => {87if self.verbose {88eprintln!(89"{}: Join on task_handle (recv PortState::Done)",90self.name()91);92}93drop(phase_channel_tx);94pl_async::get_runtime().block_on(task_handle)?;95},96IOSinkNodeState::Finished => {},97IOSinkNodeState::Uninitialized { .. } => unreachable!(),98};99100PortState::Done101} else {102polars_ensure!(103!matches!(self.state, IOSinkNodeState::Finished),104ComputeError:105"unreachable: IO sink node state is 'Finished', but recv port \106state is not 'Done'."107);108109PortState::Ready110};111112Ok(())113}114115fn spawn<'env, 's>(116&'env mut self,117scope: &'s crate::async_executor::TaskScope<'s, 'env>,118recv_ports: &mut [Option<crate::pipe::RecvPort<'_>>],119send_ports: &mut [Option<crate::pipe::SendPort<'_>>],120execution_state: &'s StreamingExecutionState,121join_handles: &mut Vec<crate::async_executor::JoinHandle<polars_error::PolarsResult<()>>>,122) {123assert_eq!(recv_ports.len(), 1);124assert!(send_ports.is_empty());125126let phase_morsel_rx = recv_ports[0].take().unwrap().serial();127128join_handles.push(scope.spawn_task(TaskPriority::Low, async move {129self.state130.initialize(&self.name, execution_state, self.io_metrics.clone())?;131132let IOSinkNodeState::Initialized {133phase_channel_tx, ..134} = &mut self.state135else {136unreachable!()137};138139if phase_channel_tx.send(phase_morsel_rx).await.is_err() {140let IOSinkNodeState::Initialized {141phase_channel_tx,142task_handle,143} = std::mem::replace(&mut self.state, IOSinkNodeState::Finished)144else {145unreachable!()146};147148if self.verbose {149eprintln!(150"{}: Join on task_handle (phase_channel_tx Err)",151self.name()152);153}154155drop(phase_channel_tx);156157return Err(task_handle.await.unwrap_err());158}159160Ok(())161}));162}163}164165enum IOSinkNodeState {166Uninitialized {167config: Box<IOSinkNodeConfig>,168},169170Initialized {171phase_channel_tx: connector::Sender<PortReceiver>,172/// Join handle for all background tasks.173task_handle: async_executor::AbortOnDropHandle<PolarsResult<()>>,174},175176Finished,177}178179impl IOSinkNodeState {180/// Initialize state if not yet initialized.181fn initialize(182&mut self,183node_name: &PlSmallStr,184execution_state: &StreamingExecutionState,185io_metrics: Option<Arc<IOMetrics>>,186) -> PolarsResult<()> {187use IOSinkNodeState::*;188189if !matches!(self, Self::Uninitialized { .. }) {190return Ok(());191}192193let Uninitialized { config } = std::mem::replace(self, Finished) else {194unreachable!()195};196197let (phase_channel_tx, mut phase_channel_rx) = connector::connector::<PortReceiver>();198let (mut multi_phase_tx, multi_phase_rx) = connector::connector();199200let _ = multi_phase_tx.try_send(Morsel::new(201DataFrame::empty_with_arc_schema(config.input_schema.clone()),202MorselSeq::new(0),203SourceToken::default(),204));205206async_executor::spawn(TaskPriority::High, async move {207let mut morsel_seq: u64 = 1;208209while let Ok(mut phase_rx) = phase_channel_rx.recv().await {210while let Ok(mut morsel) = phase_rx.recv().await {211morsel.set_seq(MorselSeq::new(morsel_seq));212morsel_seq = morsel_seq.saturating_add(1);213214if multi_phase_tx.send(morsel).await.is_err() {215break;216}217}218}219});220221let task_handle = match &config.target {222IOSinkTarget::File(_) => start_single_file_sink_pipeline(223node_name.clone(),224multi_phase_rx,225*config,226execution_state,227io_metrics,228)?,229230IOSinkTarget::Partitioned { .. } => start_partition_sink_pipeline(231node_name,232multi_phase_rx,233*config,234execution_state,235io_metrics,236)?,237};238239*self = Initialized {240phase_channel_tx,241task_handle,242};243244Ok(())245}246}247248249