Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-expr/src/groups/binview.rs
6940 views
1
use arrow::array::{Array, BinaryViewArrayGeneric, View, ViewType};
2
use arrow::bitmap::{Bitmap, MutableBitmap};
3
use arrow::buffer::Buffer;
4
use polars_compute::binview_index_map::{BinaryViewIndexMap, Entry};
5
6
use super::*;
7
use crate::hash_keys::HashKeys;
8
9
/// Groups rows of a single binary-view (binary/string) key column by hash.
///
/// Unique non-null keys are deduplicated into `idx_map`, which assigns each
/// distinct (hash, view) pair a dense group index. The null key cannot be
/// represented as a view, so it is tracked separately via `null_idx`.
#[derive(Default)]
pub struct BinviewHashGrouper {
    // Maps each unique key view to its group index; the `()` payload means
    // the map is used purely for its index assignment.
    idx_map: BinaryViewIndexMap<()>,
    // Group index of the null key. `IdxSize::MAX` is the sentinel for
    // "no null key inserted yet" (see `insert_null` / `contains_null`).
    null_idx: IdxSize,
}
14
15
impl BinviewHashGrouper {
    /// Creates an empty grouper with no groups and no null key.
    pub fn new() -> Self {
        Self {
            idx_map: BinaryViewIndexMap::default(),
            // MAX is the "no null group yet" sentinel.
            null_idx: IdxSize::MAX,
        }
    }

    /// Inserts a non-null key, returning its group index. Existing keys
    /// return their previously assigned index; new keys get the next
    /// dense index from the map.
    ///
    /// # Safety
    /// The view must be valid for the given buffer set.
    #[inline(always)]
    unsafe fn insert_key(&mut self, hash: u64, view: View, buffers: &Arc<[Buffer<u8>]>) -> IdxSize {
        unsafe {
            match self.idx_map.entry_view(hash, view, buffers) {
                Entry::Occupied(o) => o.index(),
                Entry::Vacant(v) => {
                    // Capture the index before `insert` consumes the entry.
                    let index = v.index();
                    v.insert(());
                    index
                },
            }
        }
    }

    /// Inserts the null key, lazily allocating its group index on first
    /// insertion; subsequent calls return the same index.
    #[inline(always)]
    fn insert_null(&mut self) -> IdxSize {
        if self.null_idx == IdxSize::MAX {
            // The null group occupies a slot in the index map (so group
            // indices stay dense) but is not reachable by key lookup.
            self.null_idx = self.idx_map.push_unmapped_empty_entry(());
        }
        self.null_idx
    }

    /// Returns whether a non-null key is present, without inserting.
    ///
    /// # Safety
    /// The view must be valid for the given buffer set.
    #[inline(always)]
    unsafe fn contains_key(&self, hash: u64, view: &View, buffers: &Arc<[Buffer<u8>]>) -> bool {
        unsafe { self.idx_map.get_view(hash, view, buffers).is_some() }
    }

    /// Returns whether the null key has been inserted.
    #[inline(always)]
    fn contains_null(&self) -> bool {
        // Any index below the MAX sentinel means a null group was allocated.
        self.null_idx < IdxSize::MAX
    }

    /// Builds a single-column DataFrame of the unique keys from raw view
    /// parts, typed as either `[u8]` (binary) or `str` via `V`.
    ///
    /// # Safety
    /// The views must be valid for the given buffers.
    unsafe fn finalize_keys<V: ViewType + ?Sized>(
        &self,
        schema: &Schema,
        views: Buffer<View>,
        buffers: Arc<[Buffer<u8>]>,
        validity: Option<Bitmap>,
    ) -> DataFrame {
        // Single-key grouper: the output schema has exactly one column.
        let (name, dtype) = schema.get_at_index(0).unwrap();
        unsafe {
            let arrow_dtype = dtype.to_arrow(CompatLevel::newest());
            // "unknown_md": lengths/metadata are recomputed rather than
            // trusted, but view validity itself is the caller's contract.
            let keys = BinaryViewArrayGeneric::<V>::new_unchecked_unknown_md(
                arrow_dtype,
                views,
                buffers,
                validity,
                None,
            );
            let s =
                Series::from_chunks_and_dtype_unchecked(name.clone(), vec![Box::new(keys)], dtype);
            DataFrame::new(vec![Column::from(s)]).unwrap()
        }
    }
}
84
85
impl Grouper for BinviewHashGrouper {
    /// Returns a fresh, empty grouper of the same concrete type.
    fn new_empty(&self) -> Box<dyn Grouper> {
        Box::new(Self::new())
    }

    /// Reserves capacity for `additional` more unique keys.
    fn reserve(&mut self, additional: usize) {
        self.idx_map.reserve(additional);
    }

    /// Number of groups so far (including the null group, if allocated,
    /// since it occupies a slot in `idx_map`).
    fn num_groups(&self) -> IdxSize {
        self.idx_map.len()
    }

    /// Inserts the keys at positions `subset` of `hash_keys`, optionally
    /// appending each row's group index to `group_idxs`.
    ///
    /// NOTE(review): no `# Safety` section in source — the unchecked
    /// accesses below imply every index in `subset` must be in bounds for
    /// `hash_keys`; confirm against the `Grouper` trait's contract.
    unsafe fn insert_keys_subset(
        &mut self,
        hash_keys: &HashKeys,
        subset: &[IdxSize],
        group_idxs: Option<&mut Vec<IdxSize>>,
    ) {
        // This grouper only handles binview keys by construction.
        let HashKeys::Binview(hash_keys) = hash_keys else {
            unreachable!()
        };

        unsafe {
            let views = hash_keys.keys.views().as_slice();
            let buffers = hash_keys.keys.data_buffers();
            if let Some(validity) = hash_keys.keys.validity() {
                if hash_keys.null_is_valid {
                    // Nulls form their own group: every row yields an index.
                    let groups = subset.iter().map(|idx| {
                        if validity.get_bit_unchecked(*idx as usize) {
                            let hash = hash_keys.hashes.value_unchecked(*idx as usize);
                            let view = views.get_unchecked(*idx as usize);
                            self.insert_key(hash, *view, buffers)
                        } else {
                            self.insert_null()
                        }
                    });
                    if let Some(group_idxs) = group_idxs {
                        group_idxs.reserve(subset.len());
                        group_idxs.extend(groups);
                    } else {
                        // Iterator is lazy; drain it so insertions still happen.
                        groups.for_each(drop);
                    }
                } else {
                    // Nulls are skipped entirely: they neither create a group
                    // nor emit a group index.
                    let groups = subset.iter().filter_map(|idx| {
                        if validity.get_bit_unchecked(*idx as usize) {
                            let hash = hash_keys.hashes.value_unchecked(*idx as usize);
                            let view = views.get_unchecked(*idx as usize);
                            Some(self.insert_key(hash, *view, buffers))
                        } else {
                            None
                        }
                    });
                    if let Some(group_idxs) = group_idxs {
                        group_idxs.reserve(subset.len());
                        group_idxs.extend(groups);
                    } else {
                        groups.for_each(drop);
                    }
                }
            } else {
                // No validity bitmap: all keys are non-null.
                let groups = subset.iter().map(|idx| {
                    let hash = hash_keys.hashes.value_unchecked(*idx as usize);
                    let view = views.get_unchecked(*idx as usize);
                    self.insert_key(hash, *view, buffers)
                });
                if let Some(group_idxs) = group_idxs {
                    group_idxs.reserve(subset.len());
                    group_idxs.extend(groups);
                } else {
                    groups.for_each(drop);
                }
            }
        }
    }

    /// Materializes the unique keys as a one-column DataFrame, ordered by
    /// group index.
    fn get_keys_in_group_order(&self, schema: &Schema) -> DataFrame {
        // Copy the map's internal buffers into owned arrow Buffers so the
        // resulting array is self-contained.
        let buffers: Arc<[_]> = self
            .idx_map
            .buffers()
            .iter()
            .map(|b| Buffer::from(b.to_vec()))
            .collect();
        // Iteration order of the index map is group-index order.
        let views = self.idx_map.iter_hash_views().map(|(_h, v)| v).collect();
        let validity = if self.null_idx < IdxSize::MAX {
            // Exactly one null group: all-true bitmap with a single false
            // bit at the null group's index.
            let mut validity = MutableBitmap::new();
            validity.extend_constant(self.idx_map.len() as usize, true);
            validity.set(self.null_idx as usize, false);
            Some(validity.freeze())
        } else {
            None
        };

        unsafe {
            // Dispatch on the logical dtype to pick the view element type.
            let (_name, dt) = schema.get_at_index(0).unwrap();
            match dt {
                DataType::Binary => self.finalize_keys::<[u8]>(schema, views, buffers, validity),
                DataType::String => self.finalize_keys::<str>(schema, views, buffers, validity),
                _ => unreachable!(),
            }
        }
    }

    /// For each key in `hash_keys`, checks membership in the partitioned
    /// groupers and pushes the matching row indices (XOR'd with `invert`)
    /// into `probe_matches`.
    ///
    /// # Safety
    /// All groupers must be a BinviewHashGrouper.
    unsafe fn probe_partitioned_groupers(
        &self,
        groupers: &[Box<dyn Grouper>],
        hash_keys: &HashKeys,
        partitioner: &HashPartitioner,
        invert: bool,
        probe_matches: &mut Vec<IdxSize>,
    ) {
        let HashKeys::Binview(hash_keys) = hash_keys else {
            unreachable!()
        };
        assert!(partitioner.num_partitions() == groupers.len());

        unsafe {
            let null_p = partitioner.null_partition();
            let buffers = hash_keys.keys.data_buffers();
            let views = hash_keys.keys.views().as_slice();
            hash_keys.for_each_hash(|idx, opt_h| {
                let has_group = if let Some(h) = opt_h {
                    // Route the key to its partition's grouper by hash.
                    let p = partitioner.hash_to_partition(h);
                    let dyn_grouper: &dyn Grouper = &**groupers.get_unchecked(p);
                    // SAFETY (caller contract): every grouper is a
                    // BinviewHashGrouper, so this downcast-by-cast is sound.
                    let grouper =
                        &*(dyn_grouper as *const dyn Grouper as *const BinviewHashGrouper);
                    let view = views.get_unchecked(idx as usize);
                    grouper.contains_key(h, view, buffers)
                } else {
                    // Null keys all live in the designated null partition.
                    let dyn_grouper: &dyn Grouper = &**groupers.get_unchecked(null_p);
                    let grouper =
                        &*(dyn_grouper as *const dyn Grouper as *const BinviewHashGrouper);
                    grouper.contains_null()
                };

                // `invert` flips the match condition (anti-join style).
                if has_group != invert {
                    probe_matches.push(idx);
                }
            });
        }
    }

    /// Like `probe_partitioned_groupers`, but emits one bit per row
    /// (match XOR `invert`) instead of collecting matching indices.
    ///
    /// # Safety
    /// All groupers must be a BinviewHashGrouper.
    unsafe fn contains_key_partitioned_groupers(
        &self,
        groupers: &[Box<dyn Grouper>],
        hash_keys: &HashKeys,
        partitioner: &HashPartitioner,
        invert: bool,
        contains_key: &mut BitmapBuilder,
    ) {
        let HashKeys::Binview(hash_keys) = hash_keys else {
            unreachable!()
        };
        assert!(partitioner.num_partitions() == groupers.len());

        unsafe {
            let null_p = partitioner.null_partition();
            let buffers = hash_keys.keys.data_buffers();
            let views = hash_keys.keys.views().as_slice();
            hash_keys.for_each_hash(|idx, opt_h| {
                let has_group = if let Some(h) = opt_h {
                    let p = partitioner.hash_to_partition(h);
                    let dyn_grouper: &dyn Grouper = &**groupers.get_unchecked(p);
                    // SAFETY (caller contract): every grouper is a
                    // BinviewHashGrouper, so this downcast-by-cast is sound.
                    let grouper =
                        &*(dyn_grouper as *const dyn Grouper as *const BinviewHashGrouper);
                    let view = views.get_unchecked(idx as usize);
                    grouper.contains_key(h, view, buffers)
                } else {
                    let dyn_grouper: &dyn Grouper = &**groupers.get_unchecked(null_p);
                    let grouper =
                        &*(dyn_grouper as *const dyn Grouper as *const BinviewHashGrouper);
                    grouper.contains_null()
                };

                contains_key.push(has_group != invert);
            });
        }
    }

    /// Enables downcasting from `dyn Grouper` back to the concrete type.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
272
273