Path: blob/main/crates/polars-expr/src/hot_groups/single_key.rs
6940 views
use std::hash::BuildHasher;12use arrow::array::Array;3use arrow::bitmap::MutableBitmap;4use polars_utils::total_ord::{BuildHasherTotalExt, TotalEq, TotalHash};5use polars_utils::vec::PushUnchecked;67use super::*;8use crate::hash_keys::SingleKeys;9use crate::hot_groups::fixed_index_table::FixedIndexTable;1011pub struct SingleKeyHashHotGrouper<T: PolarsDataType> {12dtype: DataType,13table: FixedIndexTable<T::Physical<'static>>,14evicted_keys: Vec<T::Physical<'static>>,15null_idx: IdxSize,16random_state: PlRandomState,17}1819impl<K, T: PolarsDataType> SingleKeyHashHotGrouper<T>20where21for<'a> T: PolarsDataType<Physical<'a> = K>,22K: Default + TotalHash + TotalEq + Send + Sync + 'static,23{24pub fn new(dtype: DataType, max_groups: usize) -> Self {25Self {26dtype,27table: FixedIndexTable::new(max_groups.try_into().unwrap()),28evicted_keys: Vec::new(),29null_idx: IdxSize::MAX,30random_state: PlRandomState::default(),31}32}3334#[inline(always)]35fn insert_key<R: BuildHasher>(36&mut self,37k: T::Physical<'static>,38force_hot: bool,39random_state: &R,40) -> Option<EvictIdx> {41let h = random_state.tot_hash_one(&k);42self.table.insert_key(43h,44k,45force_hot,46|a, b| a.tot_eq(b),47|k| k,48|k, ev_k| self.evicted_keys.push(core::mem::replace(ev_k, k)),49)50}5152#[inline(always)]53fn insert_null(&mut self) -> Option<EvictIdx> {54if self.null_idx == IdxSize::MAX {55self.null_idx = self.table.push_unmapped_key(T::Physical::default());56}57Some(EvictIdx::new(self.null_idx, false))58}5960fn finalize_keys(&self, keys: Vec<T::Physical<'static>>, add_mask: bool) -> HashKeys {61let mut keys = T::Array::from_vec(62keys,63self.dtype.to_physical().to_arrow(CompatLevel::newest()),64);65if add_mask && self.null_idx < IdxSize::MAX {66let mut validity = MutableBitmap::new();67validity.extend_constant(keys.len(), true);68validity.set(self.null_idx as usize, false);69keys = keys.with_validity_typed(Some(validity.freeze()));70}7172unsafe {73let s = Series::from_chunks_and_dtype_unchecked(74PlSmallStr::EMPTY,75vec![Box::new(keys)],76&self.dtype,77);78HashKeys::Single(SingleKeys {79keys: s,80null_is_valid: self.null_idx < IdxSize::MAX,81random_state: self.random_state,82})83}84}85}8687impl<K, T> HotGrouper for SingleKeyHashHotGrouper<T>88where89for<'a> T: PolarsPhysicalType<Physical<'a> = K>,90K: Default + TotalHash + TotalEq + Clone + Send + Sync + 'static,91{92fn new_empty(&self, max_groups: usize) -> Box<dyn HotGrouper> {93Box::new(Self::new(self.dtype.clone(), max_groups))94}9596fn num_groups(&self) -> IdxSize {97self.table.len() as IdxSize98}99100fn insert_keys(101&mut self,102hash_keys: &HashKeys,103hot_idxs: &mut Vec<IdxSize>,104hot_group_idxs: &mut Vec<EvictIdx>,105cold_idxs: &mut Vec<IdxSize>,106force_hot: bool,107) {108let HashKeys::Single(hash_keys) = hash_keys else {109unreachable!()110};111112// Preserve random state if non-empty.113if !hash_keys.keys.is_empty() {114self.random_state = hash_keys.random_state;115}116117let keys: &ChunkedArray<T> = hash_keys.keys.as_phys_any().downcast_ref().unwrap();118hot_idxs.reserve(keys.len());119hot_group_idxs.reserve(keys.len());120cold_idxs.reserve(keys.len());121122let mut push_g = |idx: usize, opt_g: Option<EvictIdx>| unsafe {123if let Some(g) = opt_g {124hot_idxs.push_unchecked(idx as IdxSize);125hot_group_idxs.push_unchecked(g);126} else {127cold_idxs.push_unchecked(idx as IdxSize);128}129};130131let mut idx = 0;132for arr in keys.downcast_iter() {133if arr.has_nulls() {134if hash_keys.null_is_valid {135for opt_k in arr.iter() {136if let Some(k) = opt_k {137push_g(idx, self.insert_key(k, force_hot, &hash_keys.random_state));138} else {139push_g(idx, self.insert_null());140}141idx += 1;142}143} else {144for opt_k in arr.iter() {145if let Some(k) = opt_k {146push_g(idx, self.insert_key(k, force_hot, &hash_keys.random_state));147}148idx += 1;149}150}151} else {152for k in arr.values_iter() {153let g = self.insert_key(k, force_hot, &hash_keys.random_state);154push_g(idx, g);155idx += 1;156}157}158}159}160161fn keys(&self) -> HashKeys {162self.finalize_keys(self.table.keys().to_vec(), true)163}164165fn num_evictions(&self) -> usize {166self.evicted_keys.len()167}168169fn take_evicted_keys(&mut self) -> HashKeys {170let keys = core::mem::take(&mut self.evicted_keys);171self.finalize_keys(keys, false)172}173174fn as_any(&self) -> &dyn Any {175self176}177}178179180