// Path: blob/main/crates/polars-stream/src/nodes/io_sinks/partition/parted.rs
use std::pin::Pin;1use std::sync::{Arc, Mutex};23use futures::StreamExt;4use futures::stream::FuturesUnordered;5use polars_core::config;6use polars_core::prelude::row_encode::_get_rows_encoded_ca_unordered;7use polars_core::prelude::{AnyValue, Column, IntoColumn, PlHashSet};8use polars_core::schema::SchemaRef;9use polars_error::PolarsResult;10use polars_plan::dsl::{PartitionTargetCallback, SinkFinishCallback, SinkOptions};11use polars_utils::pl_str::PlSmallStr;12use polars_utils::plpath::PlPath;13use polars_utils::relaxed_cell::RelaxedCell;1415use super::{CreateNewSinkFn, PerPartitionSortBy};16use crate::async_executor::{AbortOnDropHandle, spawn};17use crate::async_primitives::connector::Receiver;18use crate::async_primitives::distributor_channel::distributor_channel;19use crate::execute::StreamingExecutionState;20use crate::nodes::io_sinks::metrics::WriteMetrics;21use crate::nodes::io_sinks::partition::{SinkSender, open_new_sink};22use crate::nodes::io_sinks::phase::PhaseOutcome;23use crate::nodes::io_sinks::{SinkInputPort, SinkNode};24use crate::nodes::{JoinHandle, Morsel, TaskPriority};2526pub struct PartedPartitionSinkNode {27input_schema: SchemaRef,28// This is not be the same as the input_schema, e.g. when include_key=false then this will not29// include the keys columns.30sink_input_schema: SchemaRef,3132key_cols: Arc<[PlSmallStr]>,33base_path: Arc<PlPath>,34file_path_cb: Option<PartitionTargetCallback>,35create_new: CreateNewSinkFn,36ext: PlSmallStr,3738sink_options: SinkOptions,39include_key: bool,4041/// The number of tasks that get used to wait for finished files. If you are write large enough42/// files (i.e. they would be formed by multiple morsels) this should almost always be 1. But43/// if you are writing many small files, this should scan up to allow for your threads to44/// saturate. 
In any sane situation this should never go past the amount of threads you have45/// available.46///47/// This is somewhat proportional to the amount of files open at any given point.48num_retire_tasks: usize,4950per_partition_sort_by: Option<PerPartitionSortBy>,51partition_metrics: Arc<Mutex<Vec<Vec<WriteMetrics>>>>,52finish_callback: Option<SinkFinishCallback>,53}5455const DEFAULT_RETIRE_TASKS: usize = 1;56impl PartedPartitionSinkNode {57#[allow(clippy::too_many_arguments)]58pub fn new(59input_schema: SchemaRef,60key_cols: Arc<[PlSmallStr]>,61base_path: Arc<PlPath>,62file_path_cb: Option<PartitionTargetCallback>,63create_new: CreateNewSinkFn,64ext: PlSmallStr,65sink_options: SinkOptions,66include_key: bool,67per_partition_sort_by: Option<PerPartitionSortBy>,68finish_callback: Option<SinkFinishCallback>,69) -> Self {70assert!(!key_cols.is_empty());7172let mut sink_input_schema = input_schema.clone();73if !include_key {74let keys_col_hm = PlHashSet::from_iter(key_cols.iter().map(|s| s.as_str()));75sink_input_schema = Arc::new(76sink_input_schema77.try_project(78input_schema79.iter_names()80.filter(|n| !keys_col_hm.contains(n.as_str()))81.cloned(),82)83.unwrap(),84);85}8687let num_retire_tasks =88std::env::var("POLARS_PARTED_SINK_RETIRE_TASKS").map_or(DEFAULT_RETIRE_TASKS, |v| {89v.parse::<usize>()90.expect("unable to parse POLARS_PARTED_SINK_RETIRE_TASKS")91.max(1)92});9394Self {95input_schema,96sink_input_schema,97key_cols,98base_path,99file_path_cb,100create_new,101ext,102sink_options,103num_retire_tasks,104include_key,105per_partition_sort_by,106partition_metrics: Arc::new(Mutex::new(Vec::with_capacity(num_retire_tasks))),107finish_callback,108}109}110}111112impl SinkNode for PartedPartitionSinkNode {113fn name(&self) -> &str {114"partition-parted-sink"115}116117fn is_sink_input_parallel(&self) -> bool {118false119}120fn do_maintain_order(&self) -> bool {121self.sink_options.maintain_order122}123124fn initialize(&mut self, _state: &StreamingExecutionState) -> 
PolarsResult<()> {125Ok(())126}127128fn spawn_sink(129&mut self,130mut recv_port_recv: Receiver<(PhaseOutcome, SinkInputPort)>,131state: &StreamingExecutionState,132join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,133) {134// Main Task -> Retire Tasks135let (mut retire_tx, retire_rxs) = distributor_channel(self.num_retire_tasks, 1);136137// Whether an error has been observed in the retire tasks.138let has_error_occurred = Arc::new(RelaxedCell::from(false));139140// Main Task.141//142// Takes the morsels coming in and passes them to underlying sink.143let task_state = state.clone();144let sink_input_schema = self.sink_input_schema.clone();145let key_cols = self.key_cols.clone();146let base_path = self.base_path.clone();147let file_path_cb = self.file_path_cb.clone();148let create_new = self.create_new.clone();149let ext = self.ext.clone();150let include_key = self.include_key;151let retire_error = has_error_occurred.clone();152let per_partition_sort_by = self.per_partition_sort_by.clone();153join_handles.push(spawn(TaskPriority::High, async move {154struct CurrentSink {155sender: SinkSender,156join_handles: FuturesUnordered<AbortOnDropHandle<PolarsResult<()>>>,157value: AnyValue<'static>,158keys: Vec<Column>,159node: Box<dyn SinkNode + Send>,160}161162let verbose = config::verbose();163let mut file_idx = 0;164let mut current_sink_opt: Option<CurrentSink> = None;165let mut lengths = Vec::new();166167while let Ok((outcome, recv_port)) = recv_port_recv.recv().await {168let mut recv_port = recv_port.serial();169while let Ok(morsel) = recv_port.recv().await {170let (mut df, seq, source_token, consume_token) = morsel.into_inner();171if df.height() == 0 {172continue;173}174175let mut c = if key_cols.len() == 1 {176let idx = df.try_get_column_index(&key_cols[0])?;177df.get_columns()[idx].clone()178} else {179let columns = df.select_columns(key_cols.iter().cloned())?;180_get_rows_encoded_ca_unordered(PlSmallStr::EMPTY, 
&columns)?.into_column()181};182183lengths.clear();184polars_ops::series::rle_lengths(&c, &mut lengths)?;185186for &length in &lengths {187if retire_error.load() {188return Ok(());189}190191let mut parted_df;192let parted_c;193(parted_df, df) = df.split_at(length as i64);194(parted_c, c) = c.split_at(length as i64);195196let value = parted_c.get(0).unwrap().into_static();197198// If we have a sink open that does not match the value, close it.199if let Some(current_sink) = current_sink_opt.take() {200if current_sink.value != value {201drop(current_sink.sender);202if retire_tx203.send((204current_sink.join_handles,205current_sink.node,206current_sink.keys,207))208.await209.is_err()210{211return Ok(());212};213} else {214current_sink_opt = Some(current_sink);215}216}217218let current_sink = match current_sink_opt.as_mut() {219Some(c) => c,220None => {221let keys = parted_df.select_columns(key_cols.iter().cloned())?;222let result = open_new_sink(223base_path.as_ref().as_ref(),224file_path_cb.as_ref(),225super::default_by_key_file_path_cb,226file_idx,227file_idx,2280,229Some(keys.as_slice()),230&create_new,231sink_input_schema.clone(),232"parted",233ext.as_str(),234verbose,235&task_state,236per_partition_sort_by.as_ref(),237)238.await?;239file_idx += 1;240let Some((join_handles, sender, node)) = result else {241return Ok(());242};243244current_sink_opt.insert(CurrentSink {245sender,246value,247join_handles,248node,249keys,250})251},252};253254if !include_key {255parted_df = parted_df.drop_many(key_cols.iter().cloned());256}257258if current_sink259.sender260.send(Morsel::new(parted_df, seq, source_token.clone()))261.await262.is_err()263{264return Ok(());265};266}267268drop(consume_token);269}270271outcome.stopped();272}273274if let Some(current_sink) = current_sink_opt.take() {275drop(current_sink.sender);276if retire_tx277.send((278current_sink.join_handles,279current_sink.node,280current_sink.keys,281))282.await283.is_err()284{285return 
Ok(());286};287}288289Ok(())290}));291292// Retire Tasks.293//294// If a file is finished someone needs to wait for the sink tasks to finish. Since we don't295// want to block the main task, we do it in separate tasks. Usually this is only 1 task,296// but it can be scaled up using an environment variable.297let has_error_occurred = &has_error_occurred;298join_handles.extend(retire_rxs.into_iter().map(|mut retire_rx| {299let global_partition_metrics = self.partition_metrics.clone();300let has_error_occurred = has_error_occurred.clone();301let task_state = state.clone();302303spawn(TaskPriority::High, async move {304let mut partition_metrics = Vec::new();305306while let Ok((mut join_handles, mut node, keys)) = retire_rx.recv().await {307while let Some(ret) = join_handles.next().await {308ret.inspect_err(|_| {309has_error_occurred.store(true);310})?;311}312if let Some(mut metrics) = node.get_metrics()? {313metrics.keys = Some(314keys.into_iter()315.map(|c| c.get(0).unwrap().into_static())316.collect(),317);318partition_metrics.push(metrics);319}320if let Some(finalize) = node.finalize(&task_state) {321finalize.await?;322}323}324325{326let mut global_written_partitions = global_partition_metrics.lock().unwrap();327global_written_partitions.push(partition_metrics);328}329330Ok(())331})332}));333}334335fn finalize(336&mut self,337_state: &StreamingExecutionState,338) -> Option<Pin<Box<dyn Future<Output = PolarsResult<()>> + Send>>> {339let finish_callback = self.finish_callback.clone();340let partition_metrics = self.partition_metrics.clone();341let sink_input_schema = self.sink_input_schema.clone();342let input_schema = self.input_schema.clone();343let key_cols = self.key_cols.clone();344345Some(Box::pin(async move {346if let Some(finish_callback) = &finish_callback {347let mut written_partitions = partition_metrics.lock().unwrap();348let written_partitions 
=349std::mem::take::<Vec<Vec<WriteMetrics>>>(written_partitions.as_mut())350.into_iter()351.flatten()352.collect();353let df = WriteMetrics::collapse_to_df(354written_partitions,355&sink_input_schema,356Some(&input_schema.try_project(key_cols.iter()).unwrap()),357);358finish_callback.call(df)?;359}360Ok(())361}))362}363}364365366