Path: blob/main/crates/polars-stream/src/nodes/io_sinks/partition/mod.rs
use std::sync::Arc;

use futures::StreamExt;
use futures::stream::FuturesUnordered;
use polars_core::prelude::{Column, DataType, SortMultipleOptions};
use polars_core::scalar::Scalar;
use polars_core::schema::SchemaRef;
use polars_error::PolarsResult;
use polars_io::cloud::CloudOptions;
use polars_plan::dsl::{
    FileType, PartitionTargetCallback, PartitionTargetCallbackResult, PartitionTargetContext,
    SinkOptions, SinkTarget,
};
use polars_utils::format_pl_smallstr;
use polars_utils::plpath::PlPathRef;

use super::{DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE, SinkInputPort, SinkNode};
use crate::async_executor::{AbortOnDropHandle, spawn};
use crate::async_primitives::wait_group::WaitGroup;
use crate::async_primitives::{connector, distributor_channel};
use crate::execute::StreamingExecutionState;
use crate::expression::StreamExpr;
use crate::morsel::{MorselSeq, SourceToken};
use crate::nodes::io_sinks::phase::PhaseOutcome;
use crate::nodes::{Morsel, TaskPriority};

pub mod by_key;
pub mod max_size;
pub mod parted;

#[derive(Clone)]
pub struct PerPartitionSortBy {
    // Invariant: all vecs have the same length.
    pub selectors: Vec<StreamExpr>,
    pub descending: Vec<bool>,
    pub nulls_last: Vec<bool>,
    pub maintain_order: bool,
}

pub type CreateNewSinkFn =
    Arc<dyn Send + Sync + Fn(SchemaRef, SinkTarget) -> PolarsResult<Box<dyn SinkNode + Send>>>;

pub fn get_create_new_fn(
    file_type: FileType,
    sink_options: SinkOptions,
    cloud_options: Option<CloudOptions>,
    collect_metrics: bool,
) -> CreateNewSinkFn {
    match file_type {
        #[cfg(feature = "ipc")]
        FileType::Ipc(ipc_writer_options) => Arc::new(move |input_schema, target| {
            let sink = Box::new(super::ipc::IpcSinkNode::new(
                input_schema,
                target,
                sink_options.clone(),
                ipc_writer_options,
                cloud_options.clone(),
            )) as Box<dyn SinkNode + Send>;
            Ok(sink)
        }) as _,
        #[cfg(feature = "json")]
        FileType::Json(_ndjson_writer_options) => Arc::new(move |_input_schema, target| {
            let sink = Box::new(super::json::NDJsonSinkNode::new(
                target,
                sink_options.clone(),
                cloud_options.clone(),
            )) as Box<dyn SinkNode + Send>;
            Ok(sink)
        }) as _,
        #[cfg(feature = "parquet")]
        FileType::Parquet(parquet_writer_options) => {
            Arc::new(move |input_schema, target: SinkTarget| {
                let sink = Box::new(super::parquet::ParquetSinkNode::new(
                    input_schema,
                    target,
                    sink_options.clone(),
                    &parquet_writer_options,
                    cloud_options.clone(),
                    collect_metrics,
                )?) as Box<dyn SinkNode + Send>;
                Ok(sink)
            }) as _
        },
        #[cfg(feature = "csv")]
        FileType::Csv(csv_writer_options) => Arc::new(move |input_schema, target| {
            let sink = Box::new(super::csv::CsvSinkNode::new(
                target,
                input_schema,
                sink_options.clone(),
                csv_writer_options.clone(),
                cloud_options.clone(),
            )) as Box<dyn SinkNode + Send>;
            Ok(sink)
        }) as _,
        #[cfg(not(any(
            feature = "csv",
            feature = "parquet",
            feature = "json",
            feature = "ipc"
        )))]
        _ => {
            panic!("activate source feature")
        },
    }
}

enum SinkSender {
    Connector(connector::Sender<Morsel>),
    Distributor(distributor_channel::Sender<Morsel>),
}

impl SinkSender {
    pub async fn send(&mut self, morsel: Morsel) -> Result<(), Morsel> {
        match self {
            SinkSender::Connector(sender) => sender.send(morsel).await,
            SinkSender::Distributor(sender) => sender.send(morsel).await,
        }
    }
}

fn default_by_key_file_path_cb(
    ext: &str,
    _file_idx: usize,
    _part_idx: usize,
    in_part_idx: usize,
    columns: Option<&[Column]>,
    separator: char,
) -> PolarsResult<String> {
    use std::fmt::Write;

    let columns = columns.unwrap();
    assert!(!columns.is_empty());

    let mut file_path = String::new();
    for c in columns {
        let name = c.name();
        let value = c.head(Some(1)).strict_cast(&DataType::String)?;
        let value = value.str().unwrap();
        let value = value
            .get(0)
            .unwrap_or("__HIVE_DEFAULT_PARTITION__")
            .as_bytes();
        let value = percent_encoding::percent_encode(value, polars_io::utils::URL_ENCODE_CHAR_SET);
        write!(&mut file_path, "{name}={value}").unwrap();
        file_path.push(separator);
    }
    write!(&mut file_path, "{in_part_idx}.{ext}").unwrap();

    Ok(file_path)
}

type FilePathCallback =
    fn(&str, usize, usize, usize, Option<&[Column]>, char) -> PolarsResult<String>;

#[allow(clippy::too_many_arguments)]
async fn open_new_sink(
    base_path: PlPathRef<'_>,
    file_path_cb: Option<&PartitionTargetCallback>,
    default_file_path_cb: FilePathCallback,
    file_idx: usize,
    part_idx: usize,
    in_part_idx: usize,
    keys: Option<&[Column]>,
    create_new_sink: &CreateNewSinkFn,
    sink_input_schema: SchemaRef,
    partition_name: &'static str,
    ext: &str,
    verbose: bool,
    state: &StreamingExecutionState,
    per_partition_sort_by: Option<&PerPartitionSortBy>,
) -> PolarsResult<
    Option<(
        FuturesUnordered<AbortOnDropHandle<PolarsResult<()>>>,
        SinkSender,
        Box<dyn SinkNode + Send>,
    )>,
> {
    let separator = '/'; // note: accepted by both Windows and Linux
    let file_path = default_file_path_cb(ext, file_idx, part_idx, in_part_idx, keys, separator)?;
    let path = base_path.join(file_path.as_str());

    // If the user provided their own callback, modify the path to that.
    let target = if let Some(file_path_cb) = file_path_cb {
        let keys = keys.map_or(Vec::new(), |keys| {
            keys.iter()
                .map(|k| polars_plan::dsl::PartitionTargetContextKey {
                    name: k.name().clone(),
                    raw_value: Scalar::new(k.dtype().clone(), k.get(0).unwrap().into_static()),
                })
                .collect()
        });

        let target = file_path_cb.call(PartitionTargetContext {
            file_idx,
            part_idx,
            in_part_idx,
            keys,
            file_path,
            full_path: path,
        })?;
        match target {
            // Offset the given path by the base_path.
            PartitionTargetCallbackResult::Str(p) => SinkTarget::Path(base_path.join(p)),
            PartitionTargetCallbackResult::Dyn(t) => SinkTarget::Dyn(t),
        }
    } else {
        SinkTarget::Path(path)
    };

    if verbose {
        match &target {
            SinkTarget::Path(p) => eprintln!(
                "[partition[{partition_name}]]: Start on new file '{}'",
                p.display(),
            ),
            SinkTarget::Dyn(_) => eprintln!("[partition[{partition_name}]]: Start on new file",),
        }
    }

    let mut node = (create_new_sink)(sink_input_schema.clone(), target)?;
    let mut join_handles = Vec::new();
    let (sink_input, mut sender) = if node.is_sink_input_parallel() {
        let (tx, dist_rxs) = distributor_channel::distributor_channel(
            state.num_pipelines,
            *DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE,
        );
        let (txs, rxs) = (0..state.num_pipelines)
            .map(|_| connector::connector())
            .collect::<(Vec<_>, Vec<_>)>();
        join_handles.extend(dist_rxs.into_iter().zip(txs).map(|(mut dist_rx, mut tx)| {
            spawn(TaskPriority::High, async move {
                while let Ok(morsel) = dist_rx.recv().await {
                    if tx.send(morsel).await.is_err() {
                        break;
                    }
                }
                Ok(())
            })
        }));

        (SinkInputPort::Parallel(rxs), SinkSender::Distributor(tx))
    } else {
        let (tx, rx) = connector::connector();
        (SinkInputPort::Serial(rx), SinkSender::Connector(tx))
    };

    // Handle sorting per partition.
    if let Some(per_partition_sort_by) = per_partition_sort_by {
        let num_selectors = per_partition_sort_by.selectors.len();
        let (tx, mut rx) = connector::connector();

        let state = state.in_memory_exec_state.split();
        let selectors = per_partition_sort_by.selectors.clone();
        let descending = per_partition_sort_by.descending.clone();
        let nulls_last = per_partition_sort_by.nulls_last.clone();
        let maintain_order = per_partition_sort_by.maintain_order;

        // Tell the partitioning sink to send stuff here instead.
        let mut old_sender = std::mem::replace(&mut sender, SinkSender::Connector(tx));

        // This all happens in a single thread per partition. Acceptable for now as the main
        // usecase here is writing many partitions, not the best idea for the future.
        join_handles.push(spawn(TaskPriority::High, async move {
            // Gather all morsels for this partition. We expect at least one morsel per partition.
            let Ok(morsel) = rx.recv().await else {
                return Ok(());
            };
            let mut df = morsel.into_df();
            while let Ok(next_morsel) = rx.recv().await {
                df.vstack_mut_owned(next_morsel.into_df())?;
            }

            let mut names = Vec::with_capacity(num_selectors);
            for (i, s) in selectors.into_iter().enumerate() {
                // @NOTE: This evaluation cannot be done as chunks come in since it might contain
                // non-elementwise expressions.
                let c = s.evaluate(&df, &state).await?;
                let name = format_pl_smallstr!("__POLARS_PART_SORT_COL{i}");
                names.push(name.clone());
                df.with_column(c.with_name(name))?;
            }
            df.sort_in_place(
                names,
                SortMultipleOptions {
                    descending,
                    nulls_last,
                    multithreaded: false,
                    maintain_order,
                    limit: None,
                },
            )?;
            df = df.select_by_range(0..df.width() - num_selectors)?;

            _ = old_sender
                .send(Morsel::new(df, MorselSeq::default(), SourceToken::new()))
                .await;
            Ok(())
        }));
    }

    let (mut sink_input_tx, sink_input_rx) = connector::connector();
    node.initialize(state)?;
    node.spawn_sink(sink_input_rx, state, &mut join_handles);
    let mut join_handles =
        FuturesUnordered::from_iter(join_handles.into_iter().map(AbortOnDropHandle::new));

    let (_, outcome) = PhaseOutcome::new_shared_wait(WaitGroup::default().token());
    if sink_input_tx.send((outcome, sink_input)).await.is_err() {
        // If this sending failed, probably some error occurred.
        drop(sender);
        while let Some(res) = join_handles.next().await {
            res?;
        }

        return Ok(None);
    }

    Ok(Some((join_handles, sender, node)))
}