Path: blob/main/crates/polars-stream/src/nodes/io_sinks/components/partitioner.rs
8480 views
use std::borrow::Cow;12use polars_core::frame::DataFrame;3use polars_core::prelude::row_encode::_get_rows_encoded_ca_unordered;4use polars_core::prelude::{BinaryOffsetChunked, Column, IntoGroupsType};5use polars_error::PolarsResult;6use polars_expr::hash_keys::{HashKeysVariant, hash_keys_variant_for_dtype};7use polars_expr::state::ExecutionState;8use polars_utils::pl_str::PlSmallStr;910use crate::async_primitives::wait_group::WaitToken;11use crate::expression::StreamExpr;12use crate::morsel::Morsel;13use crate::nodes::io_sinks::components::exclude_keys_projection::ExcludeKeysProjection;14use crate::nodes::io_sinks::components::partition_key::{PartitionKey, PreComputedKeys};15use crate::nodes::io_sinks::components::size::RowCountAndSize;1617pub struct PartitionedDataFrames {18pub partitions_vec: Vec<Partition>,19pub input_size: RowCountAndSize,20pub input_wait_token: Option<WaitToken>,21}2223pub struct Partition {24pub key: PartitionKey,25/// 1-row df with keys.26pub keys_df: DataFrame,27/// Does not include columns in `keys_df`28pub df: DataFrame,29}3031pub enum Partitioner {32/// All rows to a single partition33FileSize,34Keyed(KeyedPartitioner),35}3637impl Partitioner {38pub async fn partition_morsel(39&self,40morsel: Morsel,41in_memory_exec_state: &ExecutionState,42) -> PolarsResult<PartitionedDataFrames> {43let (df, _, _, input_wait_token) = morsel.into_inner();44let input_size = RowCountAndSize::new_from_df(&df);45let partitions_vec = match self {46Self::FileSize => vec![Partition {47key: PartitionKey::NULL,48keys_df: DataFrame::empty_with_height(1),49df,50}],51Self::Keyed(v) => v.partition_df(df, in_memory_exec_state).await?,52};5354let out = PartitionedDataFrames {55partitions_vec,56input_size,57input_wait_token,58};5960Ok(out)61}6263pub fn verbose_display(&self) -> impl std::fmt::Display + '_ {64struct DisplayWrap<'a>(&'a Partitioner);6566impl std::fmt::Display for DisplayWrap<'_> {67fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {68match self.0 {69Partitioner::FileSize => f.write_str("FileSize"),70Partitioner::Keyed(kp) => write!(71f,72"Keyed({} key{})",73kp.key_exprs.len(),74if kp.key_exprs.len() == 1 { "" } else { "s" }75),76}77}78}7980DisplayWrap(self)81}82}8384pub struct KeyedPartitioner {85/// Must be non-empty86pub key_exprs: Vec<StreamExpr>,87/// Exclude key columns from full gather. Can be `None` if all key exprs output88/// names do not overlap with existing names.89pub exclude_keys_projection: Option<ExcludeKeysProjection>,90}9192impl KeyedPartitioner {93async fn partition_df(94&self,95df: DataFrame,96in_memory_exec_state: &ExecutionState,97) -> PolarsResult<Vec<Partition>> {98assert!(!self.key_exprs.is_empty());99100let mut key_columns = Vec::with_capacity(self.key_exprs.len());101102for e in self.key_exprs.as_slice() {103key_columns.push(104e.evaluate_preserve_len_broadcast(&df, in_memory_exec_state)105.await?,106);107}108109let mut pre_computed_keys: Option<PreComputedKeys> = None;110let single_non_encode = match key_columns.as_slice() {111[c] => {112pre_computed_keys = PreComputedKeys::opt_new_non_encoded(c);113hash_keys_variant_for_dtype(c.dtype()) != HashKeysVariant::RowEncoded114},115_ => false,116};117118let row_encode = |columns: &[Column]| -> BinaryOffsetChunked {119_get_rows_encoded_ca_unordered(PlSmallStr::EMPTY, columns)120.unwrap()121.rechunk()122.into_owned()123};124125let mut keys_encoded_ca: Option<BinaryOffsetChunked> =126(!single_non_encode).then(|| row_encode(&key_columns));127128let groups = if single_non_encode {129key_columns[0]130.as_materialized_series()131.group_tuples(false, false)132} else {133keys_encoded_ca.as_ref().unwrap().group_tuples(false, false)134}135.unwrap();136137if pre_computed_keys.is_none() {138if keys_encoded_ca.is_none() && groups.len() > (df.height() / 4) {139keys_encoded_ca = Some(row_encode(&key_columns));140}141142pre_computed_keys = keys_encoded_ca143.as_ref()144.map(|x| PreComputedKeys::RowEncoded(x.downcast_as_array().clone()));145}146147let gather_source_df: Cow<DataFrame> =148if let Some(projection) = self.exclude_keys_projection.as_ref() {149let columns = df.columns();150151Cow::Owned(unsafe {152DataFrame::new_unchecked(153df.height(),154projection155.iter_indices()156.map(|i| columns[i].clone())157.collect(),158)159})160} else {161Cow::Borrowed(&df)162};163164let partitions_vec: Vec<Partition> = groups165.iter()166.map(|groups_indicator| {167let first_idx = groups_indicator.first();168let df = unsafe { gather_source_df.gather_group_unchecked(&groups_indicator) };169170// Ensure 0-width is handled properly.171assert_eq!(df.height(), groups_indicator.len());172173let keys_df: DataFrame = unsafe {174DataFrame::new_unchecked(1751,176key_columns177.iter()178.map(|c| c.take_slice_unchecked(&[first_idx]))179.collect(),180)181};182183let key: PartitionKey = pre_computed_keys.as_ref().map_or_else(184|| PartitionKey::from_slice(row_encode(keys_df.columns()).get(0).unwrap()),185|keys| keys.get_key(first_idx.try_into().unwrap()),186);187188Partition { key, keys_df, df }189})190.collect();191192Ok(partitions_vec)193}194}195196197