Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-expr/src/hot_groups/single_key.rs
6940 views
1
use std::hash::BuildHasher;
2
3
use arrow::array::Array;
4
use arrow::bitmap::MutableBitmap;
5
use polars_utils::total_ord::{BuildHasherTotalExt, TotalEq, TotalHash};
6
use polars_utils::vec::PushUnchecked;
7
8
use super::*;
9
use crate::hash_keys::SingleKeys;
10
use crate::hot_groups::fixed_index_table::FixedIndexTable;
11
12
pub struct SingleKeyHashHotGrouper<T: PolarsDataType> {
13
dtype: DataType,
14
table: FixedIndexTable<T::Physical<'static>>,
15
evicted_keys: Vec<T::Physical<'static>>,
16
null_idx: IdxSize,
17
random_state: PlRandomState,
18
}
19
20
impl<K, T: PolarsDataType> SingleKeyHashHotGrouper<T>
21
where
22
for<'a> T: PolarsDataType<Physical<'a> = K>,
23
K: Default + TotalHash + TotalEq + Send + Sync + 'static,
24
{
25
pub fn new(dtype: DataType, max_groups: usize) -> Self {
26
Self {
27
dtype,
28
table: FixedIndexTable::new(max_groups.try_into().unwrap()),
29
evicted_keys: Vec::new(),
30
null_idx: IdxSize::MAX,
31
random_state: PlRandomState::default(),
32
}
33
}
34
35
#[inline(always)]
36
fn insert_key<R: BuildHasher>(
37
&mut self,
38
k: T::Physical<'static>,
39
force_hot: bool,
40
random_state: &R,
41
) -> Option<EvictIdx> {
42
let h = random_state.tot_hash_one(&k);
43
self.table.insert_key(
44
h,
45
k,
46
force_hot,
47
|a, b| a.tot_eq(b),
48
|k| k,
49
|k, ev_k| self.evicted_keys.push(core::mem::replace(ev_k, k)),
50
)
51
}
52
53
#[inline(always)]
54
fn insert_null(&mut self) -> Option<EvictIdx> {
55
if self.null_idx == IdxSize::MAX {
56
self.null_idx = self.table.push_unmapped_key(T::Physical::default());
57
}
58
Some(EvictIdx::new(self.null_idx, false))
59
}
60
61
fn finalize_keys(&self, keys: Vec<T::Physical<'static>>, add_mask: bool) -> HashKeys {
62
let mut keys = T::Array::from_vec(
63
keys,
64
self.dtype.to_physical().to_arrow(CompatLevel::newest()),
65
);
66
if add_mask && self.null_idx < IdxSize::MAX {
67
let mut validity = MutableBitmap::new();
68
validity.extend_constant(keys.len(), true);
69
validity.set(self.null_idx as usize, false);
70
keys = keys.with_validity_typed(Some(validity.freeze()));
71
}
72
73
unsafe {
74
let s = Series::from_chunks_and_dtype_unchecked(
75
PlSmallStr::EMPTY,
76
vec![Box::new(keys)],
77
&self.dtype,
78
);
79
HashKeys::Single(SingleKeys {
80
keys: s,
81
null_is_valid: self.null_idx < IdxSize::MAX,
82
random_state: self.random_state,
83
})
84
}
85
}
86
}
87
88
impl<K, T> HotGrouper for SingleKeyHashHotGrouper<T>
89
where
90
for<'a> T: PolarsPhysicalType<Physical<'a> = K>,
91
K: Default + TotalHash + TotalEq + Clone + Send + Sync + 'static,
92
{
93
fn new_empty(&self, max_groups: usize) -> Box<dyn HotGrouper> {
94
Box::new(Self::new(self.dtype.clone(), max_groups))
95
}
96
97
fn num_groups(&self) -> IdxSize {
98
self.table.len() as IdxSize
99
}
100
101
fn insert_keys(
102
&mut self,
103
hash_keys: &HashKeys,
104
hot_idxs: &mut Vec<IdxSize>,
105
hot_group_idxs: &mut Vec<EvictIdx>,
106
cold_idxs: &mut Vec<IdxSize>,
107
force_hot: bool,
108
) {
109
let HashKeys::Single(hash_keys) = hash_keys else {
110
unreachable!()
111
};
112
113
// Preserve random state if non-empty.
114
if !hash_keys.keys.is_empty() {
115
self.random_state = hash_keys.random_state;
116
}
117
118
let keys: &ChunkedArray<T> = hash_keys.keys.as_phys_any().downcast_ref().unwrap();
119
hot_idxs.reserve(keys.len());
120
hot_group_idxs.reserve(keys.len());
121
cold_idxs.reserve(keys.len());
122
123
let mut push_g = |idx: usize, opt_g: Option<EvictIdx>| unsafe {
124
if let Some(g) = opt_g {
125
hot_idxs.push_unchecked(idx as IdxSize);
126
hot_group_idxs.push_unchecked(g);
127
} else {
128
cold_idxs.push_unchecked(idx as IdxSize);
129
}
130
};
131
132
let mut idx = 0;
133
for arr in keys.downcast_iter() {
134
if arr.has_nulls() {
135
if hash_keys.null_is_valid {
136
for opt_k in arr.iter() {
137
if let Some(k) = opt_k {
138
push_g(idx, self.insert_key(k, force_hot, &hash_keys.random_state));
139
} else {
140
push_g(idx, self.insert_null());
141
}
142
idx += 1;
143
}
144
} else {
145
for opt_k in arr.iter() {
146
if let Some(k) = opt_k {
147
push_g(idx, self.insert_key(k, force_hot, &hash_keys.random_state));
148
}
149
idx += 1;
150
}
151
}
152
} else {
153
for k in arr.values_iter() {
154
let g = self.insert_key(k, force_hot, &hash_keys.random_state);
155
push_g(idx, g);
156
idx += 1;
157
}
158
}
159
}
160
}
161
162
fn keys(&self) -> HashKeys {
163
self.finalize_keys(self.table.keys().to_vec(), true)
164
}
165
166
fn num_evictions(&self) -> usize {
167
self.evicted_keys.len()
168
}
169
170
fn take_evicted_keys(&mut self) -> HashKeys {
171
let keys = core::mem::take(&mut self.evicted_keys);
172
self.finalize_keys(keys, false)
173
}
174
175
fn as_any(&self) -> &dyn Any {
176
self
177
}
178
}
179
180