Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-expr/src/reduce/count.rs
8416 views
1
#![allow(unsafe_op_in_unsafe_fn)]
2
use polars_core::error::constants::LENGTH_LIMIT_MSG;
3
4
use super::*;
5
6
/// Grouped reduction state that counts values per group.
///
/// Whether null values contribute to a group's count is controlled by
/// `include_nulls`.
pub struct CountReduce {
    /// Running count for each group, indexed by group index.
    counts: Vec<u64>,
    /// Counts moved out of `counts` when a group slot was evicted; handed
    /// over to a new reduction by `take_evictions`.
    evicted_counts: Vec<u64>,
    /// When `false`, null values are excluded from the count.
    include_nulls: bool,
}

impl CountReduce {
    /// Creates an empty reduction with no group slots allocated.
    ///
    /// `include_nulls` selects whether null values are counted.
    pub fn new(include_nulls: bool) -> Self {
        Self {
            counts: Vec::new(),
            evicted_counts: Vec::new(),
            include_nulls,
        }
    }
}
21
22
impl GroupedReduction for CountReduce {
23
fn new_empty(&self) -> Box<dyn GroupedReduction> {
24
Box::new(Self::new(self.include_nulls))
25
}
26
27
fn reserve(&mut self, additional: usize) {
28
self.counts.reserve(additional);
29
}
30
31
fn resize(&mut self, num_groups: IdxSize) {
32
self.counts.resize(num_groups as usize, 0);
33
}
34
35
fn update_group(
36
&mut self,
37
values: &[&Column],
38
group_idx: IdxSize,
39
_seq_id: u64,
40
) -> PolarsResult<()> {
41
let &[values] = values else { unreachable!() };
42
let mut count = values.len();
43
if !self.include_nulls {
44
count -= values.null_count();
45
}
46
self.counts[group_idx as usize] += count as u64;
47
Ok(())
48
}
49
50
unsafe fn update_groups_while_evicting(
51
&mut self,
52
values: &[&Column],
53
subset: &[IdxSize],
54
group_idxs: &[EvictIdx],
55
_seq_id: u64,
56
) -> PolarsResult<()> {
57
let &[values] = values else { unreachable!() };
58
assert!(subset.len() == group_idxs.len());
59
let values = values.as_materialized_series(); // @scalar-opt
60
let chunks = values.chunks();
61
assert!(chunks.len() == 1);
62
let arr = &*chunks[0];
63
if arr.has_nulls() && !self.include_nulls {
64
let valid = arr.validity().unwrap();
65
for (i, g) in subset.iter().zip(group_idxs) {
66
let grp = self.counts.get_unchecked_mut(g.idx());
67
if g.should_evict() {
68
self.evicted_counts.push(*grp);
69
*grp = 0;
70
}
71
*grp += valid.get_bit_unchecked(*i as usize) as u64;
72
}
73
} else {
74
for (_, g) in subset.iter().zip(group_idxs) {
75
let grp = self.counts.get_unchecked_mut(g.idx());
76
if g.should_evict() {
77
self.evicted_counts.push(*grp);
78
*grp = 0;
79
}
80
*grp += 1;
81
}
82
}
83
Ok(())
84
}
85
86
unsafe fn combine_subset(
87
&mut self,
88
other: &dyn GroupedReduction,
89
subset: &[IdxSize],
90
group_idxs: &[IdxSize],
91
) -> PolarsResult<()> {
92
let other = other.as_any().downcast_ref::<Self>().unwrap();
93
assert!(subset.len() == group_idxs.len());
94
unsafe {
95
// SAFETY: indices are in-bounds guaranteed by trait.
96
for (i, g) in subset.iter().zip(group_idxs) {
97
*self.counts.get_unchecked_mut(*g as usize) +=
98
*other.counts.get_unchecked(*i as usize);
99
}
100
}
101
Ok(())
102
}
103
104
fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
105
Box::new(Self {
106
counts: core::mem::take(&mut self.evicted_counts),
107
evicted_counts: Vec::new(),
108
include_nulls: self.include_nulls,
109
})
110
}
111
112
fn finalize(&mut self) -> PolarsResult<Series> {
113
let ca: IdxCa = self
114
.counts
115
.drain(..)
116
.map(|l| IdxSize::try_from(l).expect(LENGTH_LIMIT_MSG))
117
.collect_ca(PlSmallStr::EMPTY);
118
Ok(ca.into_series())
119
}
120
121
fn as_any(&self) -> &dyn Any {
122
self
123
}
124
}
125
126
/// Grouped reduction state that counts null values per group.
pub struct NullCountReduce {
    /// Running null count for each group, indexed by group index.
    counts: Vec<u64>,
    /// Counts moved out of `counts` when a group slot was evicted; handed
    /// over to a new reduction by `take_evictions`.
    evicted_counts: Vec<u64>,
}

impl NullCountReduce {
    /// Creates an empty reduction with no group slots allocated.
    pub fn new() -> Self {
        Self {
            counts: Vec::new(),
            evicted_counts: Vec::new(),
        }
    }
}

impl Default for NullCountReduce {
    /// Equivalent to [`NullCountReduce::new`].
    fn default() -> Self {
        Self::new()
    }
}
139
140
impl GroupedReduction for NullCountReduce {
141
fn new_empty(&self) -> Box<dyn GroupedReduction> {
142
Box::new(Self::new())
143
}
144
145
fn reserve(&mut self, additional: usize) {
146
self.counts.reserve(additional);
147
}
148
149
fn resize(&mut self, num_groups: IdxSize) {
150
self.counts.resize(num_groups as usize, 0);
151
}
152
153
fn update_group(
154
&mut self,
155
values: &[&Column],
156
group_idx: IdxSize,
157
_seq_id: u64,
158
) -> PolarsResult<()> {
159
let &[values] = values else { unreachable!() };
160
self.counts[group_idx as usize] += values.null_count() as u64;
161
Ok(())
162
}
163
164
unsafe fn update_groups_while_evicting(
165
&mut self,
166
values: &[&Column],
167
subset: &[IdxSize],
168
group_idxs: &[EvictIdx],
169
_seq_id: u64,
170
) -> PolarsResult<()> {
171
let &[values] = values else { unreachable!() };
172
assert!(subset.len() == group_idxs.len());
173
let values = values.as_materialized_series(); // @scalar-opt
174
let chunks = values.chunks();
175
assert!(chunks.len() == 1);
176
let arr = &*chunks[0];
177
if arr.has_nulls() {
178
let valid = arr.validity().unwrap();
179
for (i, g) in subset.iter().zip(group_idxs) {
180
let grp = self.counts.get_unchecked_mut(g.idx());
181
if g.should_evict() {
182
self.evicted_counts.push(*grp);
183
*grp = 0;
184
}
185
*grp += (!valid.get_bit_unchecked(*i as usize)) as u64;
186
}
187
} else {
188
for (_, g) in subset.iter().zip(group_idxs) {
189
let grp = self.counts.get_unchecked_mut(g.idx());
190
if g.should_evict() {
191
self.evicted_counts.push(*grp);
192
*grp = 0;
193
}
194
}
195
}
196
Ok(())
197
}
198
199
unsafe fn combine_subset(
200
&mut self,
201
other: &dyn GroupedReduction,
202
subset: &[IdxSize],
203
group_idxs: &[IdxSize],
204
) -> PolarsResult<()> {
205
let other = other.as_any().downcast_ref::<Self>().unwrap();
206
assert!(subset.len() == group_idxs.len());
207
unsafe {
208
// SAFETY: indices are in-bounds guaranteed by trait.
209
for (i, g) in subset.iter().zip(group_idxs) {
210
*self.counts.get_unchecked_mut(*g as usize) +=
211
*other.counts.get_unchecked(*i as usize);
212
}
213
}
214
Ok(())
215
}
216
217
fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
218
Box::new(Self {
219
counts: core::mem::take(&mut self.evicted_counts),
220
evicted_counts: Vec::new(),
221
})
222
}
223
224
fn finalize(&mut self) -> PolarsResult<Series> {
225
let ca: IdxCa = self
226
.counts
227
.drain(..)
228
.map(|l| IdxSize::try_from(l).expect(LENGTH_LIMIT_MSG))
229
.collect_ca(PlSmallStr::EMPTY);
230
Ok(ca.into_series())
231
}
232
233
fn as_any(&self) -> &dyn Any {
234
self
235
}
236
}
237
238