Path: blob/main/crates/polars-stream/src/nodes/io_sinks2/components/partitioner.rs
7884 views
use std::borrow::Cow;12use arrow::array::{BinaryViewArray, FixedSizeBinaryArray, PrimitiveArray};3use arrow::buffer::Buffer;4use arrow::datatypes::ArrowDataType;5use polars_core::frame::DataFrame;6use polars_core::prelude::row_encode::_get_rows_encoded_ca_unordered;7use polars_core::prelude::{8BinaryOffsetChunked, Column, DataType, GroupsIndicator, IntoGroupsType, LargeBinaryArray,9};10use polars_core::{with_match_physical_integer_type, with_match_physical_numeric_type};11use polars_error::PolarsResult;12use polars_expr::hash_keys::{HashKeysVariant, hash_keys_variant_for_dtype};13use polars_expr::state::ExecutionState;14use polars_utils::IdxSize;15use polars_utils::pl_str::PlSmallStr;1617use crate::async_primitives::wait_group::WaitToken;18use crate::expression::StreamExpr;19use crate::morsel::Morsel;20use crate::nodes::io_sinks2::components::exclude_keys_projection::ExcludeKeysProjection;21use crate::nodes::io_sinks2::components::partition_key::{PartitionKey, PreComputedKeys};22use crate::nodes::io_sinks2::components::size::RowCountAndSize;2324pub struct PartitionedDataFrames {25pub partitions_vec: Vec<Partition>,26pub input_size: RowCountAndSize,27pub input_wait_token: Option<WaitToken>,28}2930pub struct Partition {31pub key: PartitionKey,32/// 1-row df with keys.33pub keys_df: DataFrame,34/// Does not include columns in `keys_df`35pub df: DataFrame,36}3738pub enum Partitioner {39/// All rows to a single partition40FileSize,41Keyed(KeyedPartitioner),42}4344impl Partitioner {45pub async fn partition_morsel(46&self,47morsel: Morsel,48in_memory_exec_state: &ExecutionState,49) -> PolarsResult<PartitionedDataFrames> {50let (df, _, _, input_wait_token) = morsel.into_inner();51let input_size = RowCountAndSize::new_from_df(&df);52let partitions_vec = match self {53Self::FileSize => vec![Partition {54key: PartitionKey::NULL,55keys_df: DataFrame::empty_with_height(1),56df,57}],58Self::Keyed(v) => v.partition_df(df, in_memory_exec_state).await?,59};6061let out = PartitionedDataFrames {62partitions_vec,63input_size,64input_wait_token,65};6667Ok(out)68}6970pub fn verbose_display(&self) -> impl std::fmt::Display + '_ {71struct DisplayWrap<'a>(&'a Partitioner);7273impl std::fmt::Display for DisplayWrap<'_> {74fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {75match self.0 {76Partitioner::FileSize => f.write_str("FileSize"),77Partitioner::Keyed(kp) => write!(78f,79"Keyed({} key{})",80kp.key_exprs.len(),81if kp.key_exprs.len() == 1 { "" } else { "s" }82),83}84}85}8687DisplayWrap(self)88}89}9091pub struct KeyedPartitioner {92/// Must be non-empty93pub key_exprs: Vec<StreamExpr>,94/// Exclude key columns from full gather. Can be `None` if all key exprs output95/// names do not overlap with existing names.96pub exclude_keys_projection: Option<ExcludeKeysProjection>,97}9899impl KeyedPartitioner {100async fn partition_df(101&self,102df: DataFrame,103in_memory_exec_state: &ExecutionState,104) -> PolarsResult<Vec<Partition>> {105assert!(!self.key_exprs.is_empty());106107let mut key_columns = Vec::with_capacity(self.key_exprs.len());108109for e in self.key_exprs.as_slice() {110key_columns.push(e.evaluate(&df, in_memory_exec_state).await?);111}112113let mut pre_computed_keys: Option<PreComputedKeys> = None;114let single_non_encode = match key_columns.as_slice() {115[c] => {116pre_computed_keys = PreComputedKeys::opt_new_non_encoded(c);117hash_keys_variant_for_dtype(c.dtype()) != HashKeysVariant::RowEncoded118},119_ => false,120};121122let row_encode = |columns: &[Column]| -> BinaryOffsetChunked {123_get_rows_encoded_ca_unordered(PlSmallStr::EMPTY, columns)124.unwrap()125.rechunk()126.into_owned()127};128129let mut keys_encoded_ca: Option<BinaryOffsetChunked> =130(!single_non_encode).then(|| row_encode(&key_columns));131132let groups = if single_non_encode {133key_columns[0]134.as_materialized_series()135.group_tuples(false, false)136} else {137keys_encoded_ca.as_ref().unwrap().group_tuples(false, false)138}139.unwrap();140141if pre_computed_keys.is_none() {142if keys_encoded_ca.is_none() && groups.len() > (df.height() / 4) {143keys_encoded_ca = Some(row_encode(&key_columns));144}145146pre_computed_keys = keys_encoded_ca147.as_ref()148.map(|x| PreComputedKeys::RowEncoded(x.downcast_as_array().clone()));149}150151let gather_source_df: Cow<DataFrame> =152if let Some(projection) = self.exclude_keys_projection.as_ref() {153let columns = df.get_columns();154155Cow::Owned(if projection.len() == 0 {156DataFrame::empty_with_height(df.height())157} else {158projection159.iter_indices()160.map(|i| columns[i].clone())161.collect()162})163} else {164Cow::Borrowed(&df)165};166167let partitions_vec: Vec<Partition> = groups168.iter()169.map(|groups_indicator| {170let first_idx = groups_indicator.first();171let df = unsafe { gather_source_df.gather_group_unchecked(&groups_indicator) };172173// Ensure 0-width is handled properly.174assert_eq!(df.height(), groups_indicator.len());175176let keys_df: DataFrame = key_columns177.iter()178.map(|c| unsafe { c.take_slice_unchecked(&[first_idx]) })179.collect();180181let key: PartitionKey = pre_computed_keys.as_ref().map_or_else(182|| PartitionKey::from_slice(row_encode(keys_df.get_columns()).get(0).unwrap()),183|keys| keys.get_key(first_idx.try_into().unwrap()),184);185186Partition { key, keys_df, df }187})188.collect();189190Ok(partitions_vec)191}192}193194195