Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-expr/src/hot_groups/binview.rs
6940 views
1
use arrow::array::builder::StaticArrayBuilder;
2
use arrow::array::{BinaryViewArrayGenericBuilder, PrimitiveArray, View};
3
use arrow::bitmap::MutableBitmap;
4
use arrow::buffer::Buffer;
5
use polars_utils::vec::PushUnchecked;
6
7
use super::*;
8
use crate::hash_keys::BinviewKeys;
9
use crate::hot_groups::fixed_index_table::FixedIndexTable;
10
11
pub struct BinviewHashHotGrouper {
12
// The views in this table when not inline are stored in the vec.
13
table: FixedIndexTable<(u64, View, Vec<u8>)>,
14
evicted_key_hashes: Vec<u64>,
15
evicted_keys: BinaryViewArrayGenericBuilder<[u8]>,
16
null_idx: IdxSize,
17
}
18
19
impl BinviewHashHotGrouper {
20
pub fn new(max_groups: usize) -> Self {
21
Self {
22
table: FixedIndexTable::new(max_groups.try_into().unwrap()),
23
evicted_key_hashes: Vec::new(),
24
evicted_keys: BinaryViewArrayGenericBuilder::new(ArrowDataType::BinaryView),
25
null_idx: IdxSize::MAX,
26
}
27
}
28
29
/// # Safety
30
/// The view must be valid for the given buffer set.
31
#[inline(always)]
32
unsafe fn insert_key(
33
&mut self,
34
hash: u64,
35
view: View,
36
force_hot: bool,
37
buffers: &Arc<[Buffer<u8>]>,
38
) -> Option<EvictIdx> {
39
unsafe {
40
let mut evict = |ev_h: &u64, ev_view: &View, ev_buffer: &Vec<u8>| {
41
self.evicted_key_hashes.push(*ev_h);
42
if ev_view.is_inline() {
43
self.evicted_keys.push_inline_view_ignore_validity(*ev_view);
44
} else {
45
self.evicted_keys
46
.push_value_ignore_validity(ev_buffer.as_slice());
47
}
48
};
49
if view.is_inline() {
50
self.table.insert_key(
51
hash,
52
(),
53
force_hot,
54
|_, b| view == b.1,
55
|_| (hash, view, Vec::new()),
56
|_, ev_k| {
57
let (ev_h, ev_view, ev_buffer) = ev_k;
58
evict(ev_h, ev_view, ev_buffer);
59
*ev_h = hash;
60
*ev_view = view;
61
ev_buffer.clear();
62
},
63
)
64
} else {
65
let bytes = view.get_external_slice_unchecked(buffers);
66
self.table.insert_key(
67
hash,
68
(),
69
force_hot,
70
|_, b| {
71
// We only reach here if the hash matched, so jump straight to full comparison.
72
bytes == b.2
73
},
74
|_| (hash, view, bytes.to_vec()),
75
|_, ev_k| {
76
let (ev_h, ev_view, ev_buffer) = ev_k;
77
evict(ev_h, ev_view, ev_buffer);
78
*ev_h = hash;
79
*ev_view = view;
80
ev_buffer.clear();
81
ev_buffer.extend_from_slice(bytes);
82
},
83
)
84
}
85
}
86
}
87
88
#[inline(always)]
89
fn insert_null(&mut self) -> Option<EvictIdx> {
90
if self.null_idx == IdxSize::MAX {
91
self.null_idx = self
92
.table
93
.push_unmapped_key((0, View::default(), Vec::new()));
94
}
95
Some(EvictIdx::new(self.null_idx, false))
96
}
97
}
98
99
impl HotGrouper for BinviewHashHotGrouper {
100
fn new_empty(&self, max_groups: usize) -> Box<dyn HotGrouper> {
101
Box::new(Self::new(max_groups))
102
}
103
104
fn num_groups(&self) -> IdxSize {
105
self.table.len() as IdxSize
106
}
107
108
fn insert_keys(
109
&mut self,
110
hash_keys: &HashKeys,
111
hot_idxs: &mut Vec<IdxSize>,
112
hot_group_idxs: &mut Vec<EvictIdx>,
113
cold_idxs: &mut Vec<IdxSize>,
114
force_hot: bool,
115
) {
116
let HashKeys::Binview(hash_keys) = hash_keys else {
117
unreachable!()
118
};
119
120
hot_idxs.reserve(hash_keys.keys.len());
121
hot_group_idxs.reserve(hash_keys.keys.len());
122
cold_idxs.reserve(hash_keys.keys.len());
123
124
let mut push_g = |idx: usize, opt_g: Option<EvictIdx>| unsafe {
125
if let Some(g) = opt_g {
126
hot_idxs.push_unchecked(idx as IdxSize);
127
hot_group_idxs.push_unchecked(g);
128
} else {
129
cold_idxs.push_unchecked(idx as IdxSize);
130
}
131
};
132
133
unsafe {
134
let views = hash_keys.keys.views().as_slice();
135
let buffers = hash_keys.keys.data_buffers();
136
if hash_keys.null_is_valid {
137
hash_keys.for_each_hash(|idx, opt_h| {
138
if let Some(h) = opt_h {
139
let view = views.get_unchecked(idx as usize);
140
push_g(idx as usize, self.insert_key(h, *view, force_hot, buffers));
141
} else {
142
push_g(idx as usize, self.insert_null());
143
}
144
});
145
} else {
146
hash_keys.for_each_hash(|idx, opt_h| {
147
if let Some(h) = opt_h {
148
let view = views.get_unchecked(idx as usize);
149
push_g(idx as usize, self.insert_key(h, *view, force_hot, buffers));
150
}
151
});
152
}
153
}
154
}
155
156
fn keys(&self) -> HashKeys {
157
unsafe {
158
let mut hashes = Vec::with_capacity(self.table.len());
159
let mut keys_builder = BinaryViewArrayGenericBuilder::new(ArrowDataType::BinaryView);
160
keys_builder.reserve(self.table.len());
161
for (h, view, buf) in self.table.keys() {
162
hashes.push_unchecked(*h);
163
if view.is_inline() {
164
keys_builder.push_inline_view_ignore_validity(*view);
165
} else {
166
keys_builder.push_value_ignore_validity(buf.as_slice());
167
}
168
}
169
170
let hashes = PrimitiveArray::from_vec(hashes);
171
let mut keys = keys_builder.freeze();
172
let null_is_valid = self.null_idx < IdxSize::MAX;
173
if null_is_valid {
174
let mut validity = MutableBitmap::new();
175
validity.extend_constant(keys.len(), true);
176
validity.set(self.null_idx as usize, false);
177
keys = keys.with_validity_typed(Some(validity.freeze()));
178
}
179
HashKeys::Binview(BinviewKeys {
180
hashes,
181
keys,
182
null_is_valid,
183
})
184
}
185
}
186
187
fn num_evictions(&self) -> usize {
188
self.evicted_keys.len()
189
}
190
191
fn take_evicted_keys(&mut self) -> HashKeys {
192
let hashes = core::mem::take(&mut self.evicted_key_hashes);
193
let keys = self.evicted_keys.freeze_reset();
194
HashKeys::Binview(BinviewKeys {
195
hashes: PrimitiveArray::from_vec(hashes),
196
keys,
197
null_is_valid: false,
198
})
199
}
200
201
fn as_any(&self) -> &dyn Any {
202
self
203
}
204
}
205
206