Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-expr/src/hot_groups/row_encoded.rs
6940 views
1
use arrow::array::{BinaryArray, PrimitiveArray};
2
use arrow::buffer::Buffer;
3
use arrow::offset::{Offsets, OffsetsBuffer};
4
use polars_utils::vec::PushUnchecked;
5
6
use super::*;
7
use crate::hash_keys::RowEncodedKeys;
8
use crate::hot_groups::fixed_index_table::FixedIndexTable;
9
10
pub struct RowEncodedHashHotGrouper {
11
key_schema: Arc<Schema>,
12
table: FixedIndexTable<(u64, Vec<u8>)>,
13
evicted_key_hashes: Vec<u64>,
14
evicted_key_data: Vec<u8>,
15
evicted_key_offsets: Offsets<i64>,
16
}
17
18
impl RowEncodedHashHotGrouper {
19
pub fn new(key_schema: Arc<Schema>, max_groups: usize) -> Self {
20
Self {
21
key_schema,
22
table: FixedIndexTable::new(max_groups.try_into().unwrap()),
23
evicted_key_hashes: Vec::new(),
24
evicted_key_data: Vec::new(),
25
evicted_key_offsets: Offsets::new(),
26
}
27
}
28
}
29
30
impl HotGrouper for RowEncodedHashHotGrouper {
31
fn new_empty(&self, max_groups: usize) -> Box<dyn HotGrouper> {
32
Box::new(Self::new(self.key_schema.clone(), max_groups))
33
}
34
35
fn num_groups(&self) -> IdxSize {
36
self.table.len() as IdxSize
37
}
38
39
fn insert_keys(
40
&mut self,
41
keys: &HashKeys,
42
hot_idxs: &mut Vec<IdxSize>,
43
hot_group_idxs: &mut Vec<EvictIdx>,
44
cold_idxs: &mut Vec<IdxSize>,
45
force_hot: bool,
46
) {
47
let HashKeys::RowEncoded(keys) = keys else {
48
unreachable!()
49
};
50
51
hot_idxs.reserve(keys.hashes.len());
52
hot_group_idxs.reserve(keys.hashes.len());
53
cold_idxs.reserve(keys.hashes.len());
54
55
unsafe {
56
keys.for_each_hash(|idx, opt_h| {
57
if let Some(h) = opt_h {
58
let key = keys.keys.value_unchecked(idx as usize);
59
let opt_g = self.table.insert_key(
60
h,
61
key,
62
force_hot,
63
|a, b| *a == b.1,
64
|k| (h, k.to_owned()),
65
|k, ev_k| {
66
self.evicted_key_hashes.push(ev_k.0);
67
self.evicted_key_offsets.try_push(ev_k.1.len()).unwrap();
68
self.evicted_key_data.extend_from_slice(&ev_k.1);
69
ev_k.0 = h;
70
ev_k.1.clear();
71
ev_k.1.extend_from_slice(k);
72
},
73
);
74
if let Some(g) = opt_g {
75
hot_idxs.push_unchecked(idx as IdxSize);
76
hot_group_idxs.push_unchecked(g);
77
} else {
78
cold_idxs.push_unchecked(idx as IdxSize);
79
}
80
}
81
});
82
}
83
}
84
85
fn keys(&self) -> HashKeys {
86
unsafe {
87
let mut hashes = Vec::with_capacity(self.table.len());
88
let keys = LargeBinaryArray::from_trusted_len_values_iter(
89
self.table.keys().iter().map(|(h, k)| {
90
hashes.push_unchecked(*h);
91
k
92
}),
93
);
94
let hashes = PrimitiveArray::from_vec(hashes);
95
HashKeys::RowEncoded(RowEncodedKeys { hashes, keys })
96
}
97
}
98
99
fn num_evictions(&self) -> usize {
100
self.evicted_key_offsets.len_proxy()
101
}
102
103
fn take_evicted_keys(&mut self) -> HashKeys {
104
let hashes = PrimitiveArray::from_vec(core::mem::take(&mut self.evicted_key_hashes));
105
let values = Buffer::from(core::mem::take(&mut self.evicted_key_data));
106
let offsets = OffsetsBuffer::from(core::mem::take(&mut self.evicted_key_offsets));
107
let keys = BinaryArray::new(ArrowDataType::LargeBinary, offsets, values, None);
108
HashKeys::RowEncoded(RowEncodedKeys { hashes, keys })
109
}
110
111
fn as_any(&self) -> &dyn Any {
112
self
113
}
114
}
115
116