Path: blob/main/crates/polars-expr/src/hot_groups/row_encoded.rs
6940 views
use arrow::array::{BinaryArray, PrimitiveArray};1use arrow::buffer::Buffer;2use arrow::offset::{Offsets, OffsetsBuffer};3use polars_utils::vec::PushUnchecked;45use super::*;6use crate::hash_keys::RowEncodedKeys;7use crate::hot_groups::fixed_index_table::FixedIndexTable;89pub struct RowEncodedHashHotGrouper {10key_schema: Arc<Schema>,11table: FixedIndexTable<(u64, Vec<u8>)>,12evicted_key_hashes: Vec<u64>,13evicted_key_data: Vec<u8>,14evicted_key_offsets: Offsets<i64>,15}1617impl RowEncodedHashHotGrouper {18pub fn new(key_schema: Arc<Schema>, max_groups: usize) -> Self {19Self {20key_schema,21table: FixedIndexTable::new(max_groups.try_into().unwrap()),22evicted_key_hashes: Vec::new(),23evicted_key_data: Vec::new(),24evicted_key_offsets: Offsets::new(),25}26}27}2829impl HotGrouper for RowEncodedHashHotGrouper {30fn new_empty(&self, max_groups: usize) -> Box<dyn HotGrouper> {31Box::new(Self::new(self.key_schema.clone(), max_groups))32}3334fn num_groups(&self) -> IdxSize {35self.table.len() as IdxSize36}3738fn insert_keys(39&mut self,40keys: &HashKeys,41hot_idxs: &mut Vec<IdxSize>,42hot_group_idxs: &mut Vec<EvictIdx>,43cold_idxs: &mut Vec<IdxSize>,44force_hot: bool,45) {46let HashKeys::RowEncoded(keys) = keys else {47unreachable!()48};4950hot_idxs.reserve(keys.hashes.len());51hot_group_idxs.reserve(keys.hashes.len());52cold_idxs.reserve(keys.hashes.len());5354unsafe {55keys.for_each_hash(|idx, opt_h| {56if let Some(h) = opt_h {57let key = keys.keys.value_unchecked(idx as usize);58let opt_g = self.table.insert_key(59h,60key,61force_hot,62|a, b| *a == b.1,63|k| (h, k.to_owned()),64|k, ev_k| {65self.evicted_key_hashes.push(ev_k.0);66self.evicted_key_offsets.try_push(ev_k.1.len()).unwrap();67self.evicted_key_data.extend_from_slice(&ev_k.1);68ev_k.0 = h;69ev_k.1.clear();70ev_k.1.extend_from_slice(k);71},72);73if let Some(g) = opt_g {74hot_idxs.push_unchecked(idx as IdxSize);75hot_group_idxs.push_unchecked(g);76} else {77cold_idxs.push_unchecked(idx as IdxSize);78}79}80});81}82}8384fn keys(&self) -> HashKeys {85unsafe {86let mut hashes = Vec::with_capacity(self.table.len());87let keys = LargeBinaryArray::from_trusted_len_values_iter(88self.table.keys().iter().map(|(h, k)| {89hashes.push_unchecked(*h);90k91}),92);93let hashes = PrimitiveArray::from_vec(hashes);94HashKeys::RowEncoded(RowEncodedKeys { hashes, keys })95}96}9798fn num_evictions(&self) -> usize {99self.evicted_key_offsets.len_proxy()100}101102fn take_evicted_keys(&mut self) -> HashKeys {103let hashes = PrimitiveArray::from_vec(core::mem::take(&mut self.evicted_key_hashes));104let values = Buffer::from(core::mem::take(&mut self.evicted_key_data));105let offsets = OffsetsBuffer::from(core::mem::take(&mut self.evicted_key_offsets));106let keys = BinaryArray::new(ArrowDataType::LargeBinary, offsets, values, None);107HashKeys::RowEncoded(RowEncodedKeys { hashes, keys })108}109110fn as_any(&self) -> &dyn Any {111self112}113}114115116