Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-expr/src/idx_table/binview.rs
6940 views
1
#![allow(clippy::unnecessary_cast)] // Clippy doesn't recognize that IdxSize and u64 can be different.
2
#![allow(unsafe_op_in_unsafe_fn)]
3
4
use arrow::array::{Array, View};
5
use arrow::buffer::Buffer;
6
use polars_compute::binview_index_map::{BinaryViewIndexMap, Entry};
7
use polars_utils::idx_vec::UnitVec;
8
use polars_utils::itertools::Itertools;
9
use polars_utils::relaxed_cell::RelaxedCell;
10
use polars_utils::unitvec;
11
12
use super::*;
13
use crate::hash_keys::HashKeys;
14
15
pub struct BinviewKeyIdxTable {
16
// These AtomicU64s actually are IdxSizes, but we use the top bit of the
17
// first index in each to mark keys during probing.
18
idx_map: BinaryViewIndexMap<UnitVec<RelaxedCell<u64>>>,
19
idx_offset: IdxSize,
20
null_keys: Vec<IdxSize>,
21
nulls_emitted: RelaxedCell<bool>,
22
}
23
24
impl BinviewKeyIdxTable {
25
pub fn new() -> Self {
26
Self {
27
idx_map: BinaryViewIndexMap::default(),
28
idx_offset: 0,
29
null_keys: Vec::new(),
30
nulls_emitted: RelaxedCell::from(false),
31
}
32
}
33
34
/// # Safety
35
/// The view must be valid for the buffers.
36
#[inline(always)]
37
unsafe fn probe_one<const MARK_MATCHES: bool>(
38
&self,
39
key_idx: IdxSize,
40
hash: u64,
41
key: &View,
42
buffers: &[Buffer<u8>],
43
table_match: &mut Vec<IdxSize>,
44
probe_match: &mut Vec<IdxSize>,
45
) -> bool {
46
if let Some(idxs) = unsafe { self.idx_map.get_view(hash, key, buffers) } {
47
for idx in &idxs[..] {
48
// Create matches, making sure to clear top bit.
49
table_match.push((idx.load() & !(1 << 63)) as IdxSize);
50
probe_match.push(key_idx);
51
}
52
53
// Mark if necessary. This action is idempotent so doesn't need
54
// atomic fetch_or to do it atomically.
55
if MARK_MATCHES {
56
let first_idx = unsafe { idxs.get_unchecked(0) };
57
let first_idx_val = first_idx.load();
58
if first_idx_val >> 63 == 0 {
59
first_idx.store(first_idx_val | (1 << 63));
60
}
61
}
62
true
63
} else {
64
false
65
}
66
}
67
68
/// # Safety
69
/// The views must be valid for the buffers.
70
unsafe fn probe_impl<
71
'a,
72
const MARK_MATCHES: bool,
73
const EMIT_UNMATCHED: bool,
74
const NULL_IS_VALID: bool,
75
>(
76
&self,
77
keys: impl Iterator<Item = (IdxSize, u64, Option<&'a View>)>,
78
buffers: &[Buffer<u8>],
79
table_match: &mut Vec<IdxSize>,
80
probe_match: &mut Vec<IdxSize>,
81
limit: IdxSize,
82
) -> IdxSize {
83
let mut keys_processed = 0;
84
for (key_idx, hash, key) in keys {
85
let found_match = if let Some(key) = key {
86
self.probe_one::<MARK_MATCHES>(
87
key_idx,
88
hash,
89
key,
90
buffers,
91
table_match,
92
probe_match,
93
)
94
} else if NULL_IS_VALID {
95
for idx in &self.null_keys {
96
table_match.push(*idx);
97
probe_match.push(key_idx);
98
}
99
if MARK_MATCHES && !self.nulls_emitted.load() {
100
self.nulls_emitted.store(true);
101
}
102
!self.null_keys.is_empty()
103
} else {
104
false
105
};
106
107
if EMIT_UNMATCHED && !found_match {
108
table_match.push(IdxSize::MAX);
109
probe_match.push(key_idx);
110
}
111
112
keys_processed += 1;
113
if table_match.len() >= limit as usize {
114
break;
115
}
116
}
117
keys_processed
118
}
119
120
/// # Safety
121
/// The views must be valid for the buffers.
122
#[allow(clippy::too_many_arguments)]
123
unsafe fn probe_dispatch<'a>(
124
&self,
125
keys: impl Iterator<Item = (IdxSize, u64, Option<&'a View>)>,
126
buffers: &[Buffer<u8>],
127
table_match: &mut Vec<IdxSize>,
128
probe_match: &mut Vec<IdxSize>,
129
mark_matches: bool,
130
emit_unmatched: bool,
131
null_is_valid: bool,
132
limit: IdxSize,
133
) -> IdxSize {
134
match (mark_matches, emit_unmatched, null_is_valid) {
135
(false, false, false) => self.probe_impl::<false, false, false>(
136
keys,
137
buffers,
138
table_match,
139
probe_match,
140
limit,
141
),
142
(false, false, true) => self.probe_impl::<false, false, true>(
143
keys,
144
buffers,
145
table_match,
146
probe_match,
147
limit,
148
),
149
(false, true, false) => self.probe_impl::<false, true, false>(
150
keys,
151
buffers,
152
table_match,
153
probe_match,
154
limit,
155
),
156
(false, true, true) => {
157
self.probe_impl::<false, true, true>(keys, buffers, table_match, probe_match, limit)
158
},
159
(true, false, false) => self.probe_impl::<true, false, false>(
160
keys,
161
buffers,
162
table_match,
163
probe_match,
164
limit,
165
),
166
(true, false, true) => {
167
self.probe_impl::<true, false, true>(keys, buffers, table_match, probe_match, limit)
168
},
169
(true, true, false) => {
170
self.probe_impl::<true, true, false>(keys, buffers, table_match, probe_match, limit)
171
},
172
(true, true, true) => {
173
self.probe_impl::<true, true, true>(keys, buffers, table_match, probe_match, limit)
174
},
175
}
176
}
177
}
178
179
impl IdxTable for BinviewKeyIdxTable {
180
fn new_empty(&self) -> Box<dyn IdxTable> {
181
Box::new(Self::new())
182
}
183
184
fn reserve(&mut self, additional: usize) {
185
self.idx_map.reserve(additional);
186
}
187
188
fn num_keys(&self) -> IdxSize {
189
self.idx_map.len()
190
}
191
192
fn insert_keys(&mut self, _hash_keys: &HashKeys, _track_unmatchable: bool) {
193
// Isn't needed anymore, but also don't want to remove the code from the other implementations.
194
unimplemented!()
195
}
196
197
unsafe fn insert_keys_subset(
198
&mut self,
199
hash_keys: &HashKeys,
200
subset: &[IdxSize],
201
track_unmatchable: bool,
202
) {
203
let HashKeys::Binview(hash_keys) = hash_keys else {
204
unreachable!()
205
};
206
let new_idx_offset = (self.idx_offset as usize)
207
.checked_add(subset.len())
208
.unwrap();
209
assert!(
210
new_idx_offset < IdxSize::MAX as usize,
211
"overly large index in BinviewKeyIdxTable"
212
);
213
214
unsafe {
215
let buffers = hash_keys.keys.data_buffers();
216
let views = hash_keys.keys.views();
217
if let Some(validity) = hash_keys.keys.validity() {
218
for (i, subset_idx) in subset.iter().enumerate_idx() {
219
let hash = hash_keys.hashes.value_unchecked(*subset_idx as usize);
220
let key = views.get_unchecked(*subset_idx as usize);
221
let idx = self.idx_offset + i;
222
if validity.get_bit_unchecked(*subset_idx as usize) {
223
match self.idx_map.entry_view(hash, *key, buffers) {
224
Entry::Occupied(o) => {
225
o.into_mut().push(RelaxedCell::from(idx as u64));
226
},
227
Entry::Vacant(v) => {
228
v.insert(unitvec![RelaxedCell::from(idx as u64)]);
229
},
230
}
231
} else if track_unmatchable | hash_keys.null_is_valid {
232
self.null_keys.push(idx);
233
}
234
}
235
} else {
236
for (i, subset_idx) in subset.iter().enumerate_idx() {
237
let hash = hash_keys.hashes.value_unchecked(*subset_idx as usize);
238
let key = views.get_unchecked(*subset_idx as usize);
239
let idx = self.idx_offset + i;
240
match self.idx_map.entry_view(hash, *key, buffers) {
241
Entry::Occupied(o) => {
242
o.into_mut().push(RelaxedCell::from(idx as u64));
243
},
244
Entry::Vacant(v) => {
245
v.insert(unitvec![RelaxedCell::from(idx as u64)]);
246
},
247
}
248
}
249
}
250
}
251
252
self.idx_offset = new_idx_offset as IdxSize;
253
}
254
255
fn probe(
256
&self,
257
_hash_keys: &HashKeys,
258
_table_match: &mut Vec<IdxSize>,
259
_probe_match: &mut Vec<IdxSize>,
260
_mark_matches: bool,
261
_emit_unmatched: bool,
262
_limit: IdxSize,
263
) -> IdxSize {
264
// Isn't needed anymore, but also don't want to remove the code from the other implementations.
265
unimplemented!()
266
}
267
268
unsafe fn probe_subset(
269
&self,
270
hash_keys: &HashKeys,
271
subset: &[IdxSize],
272
table_match: &mut Vec<IdxSize>,
273
probe_match: &mut Vec<IdxSize>,
274
mark_matches: bool,
275
emit_unmatched: bool,
276
limit: IdxSize,
277
) -> IdxSize {
278
let HashKeys::Binview(hash_keys) = hash_keys else {
279
unreachable!()
280
};
281
282
unsafe {
283
let buffers = hash_keys.keys.data_buffers();
284
let views = hash_keys.keys.views();
285
if let Some(validity) = hash_keys.keys.validity() {
286
let iter = subset.iter().map(|i| {
287
(
288
*i,
289
hash_keys.hashes.value_unchecked(*i as usize),
290
if validity.get_bit_unchecked(*i as usize) {
291
Some(views.get_unchecked(*i as usize))
292
} else {
293
None
294
},
295
)
296
});
297
self.probe_dispatch(
298
iter,
299
buffers,
300
table_match,
301
probe_match,
302
mark_matches,
303
emit_unmatched,
304
hash_keys.null_is_valid,
305
limit,
306
)
307
} else {
308
let iter = subset.iter().map(|i| {
309
(
310
*i,
311
hash_keys.hashes.value_unchecked(*i as usize),
312
Some(views.get_unchecked(*i as usize)),
313
)
314
});
315
self.probe_dispatch(
316
iter,
317
buffers,
318
table_match,
319
probe_match,
320
mark_matches,
321
emit_unmatched,
322
false, // Whether or not nulls are valid doesn't matter.
323
limit,
324
)
325
}
326
}
327
}
328
329
fn unmarked_keys(
330
&self,
331
out: &mut Vec<IdxSize>,
332
mut offset: IdxSize,
333
limit: IdxSize,
334
) -> IdxSize {
335
out.clear();
336
337
let mut keys_processed = 0;
338
if !self.nulls_emitted.load() {
339
if (offset as usize) < self.null_keys.len() {
340
out.extend(
341
self.null_keys[offset as usize..]
342
.iter()
343
.copied()
344
.take(limit as usize),
345
);
346
keys_processed += out.len() as IdxSize;
347
offset += out.len() as IdxSize;
348
if out.len() >= limit as usize {
349
return keys_processed;
350
}
351
}
352
offset -= self.null_keys.len() as IdxSize;
353
}
354
355
while let Some((_, _, idxs)) = self.idx_map.get_index(offset) {
356
let first_idx = unsafe { idxs.get_unchecked(0) };
357
let first_idx_val = first_idx.load();
358
if first_idx_val >> 63 == 0 {
359
for idx in &idxs[..] {
360
out.push((idx.load() & !(1 << 63)) as IdxSize);
361
}
362
}
363
364
keys_processed += 1;
365
offset += 1;
366
if out.len() >= limit as usize {
367
break;
368
}
369
}
370
371
keys_processed
372
}
373
}
374
375