Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-expr/src/reduce/first_last_nonnull.rs
7884 views
1
use polars_core::frame::row::AnyValueBufferTrusted;
2
use polars_core::with_match_physical_numeric_polars_type;
3
4
use super::first_last::{First, Last, replace_opt_bytes};
5
use super::*;
6
7
pub fn new_first_nonnull_reduction(dtype: DataType) -> Box<dyn GroupedReduction> {
8
new_nonnull_reduction_with_policy(dtype, First)
9
}
10
11
pub fn new_last_nonnull_reduction(dtype: DataType) -> Box<dyn GroupedReduction> {
12
new_nonnull_reduction_with_policy(dtype, Last)
13
}
14
15
fn new_nonnull_reduction_with_policy<P: NonNullPolicy + 'static>(
16
dtype: DataType,
17
policy: P,
18
) -> Box<dyn GroupedReduction> {
19
use DataType::*;
20
use VecGroupedReduction as VGR;
21
match dtype {
22
Boolean => Box::new(VecGroupedReduction::new(
23
dtype,
24
BoolFirstLastNonNullReducer(policy),
25
)),
26
_ if dtype.is_primitive_numeric()
27
|| dtype.is_temporal()
28
|| dtype.is_decimal()
29
|| dtype.is_categorical() =>
30
{
31
with_match_physical_numeric_polars_type!(dtype.to_physical(), |$T| {
32
Box::new(VGR::new(dtype, NumFirstLastNonNullReducer::<_, $T>(policy, PhantomData)))
33
})
34
},
35
String | Binary => Box::new(VecGroupedReduction::new(
36
dtype,
37
BinaryFirstLastNonNullReducer(policy),
38
)),
39
_ => Box::new(GenericFirstLastNonNullGroupedReduction::new(dtype, policy)),
40
}
41
}
42
43
/// Runtime discriminant for the selection policy, so shared code paths can
/// choose between `first_non_null` and `last_non_null`.
enum FirstOrLast {
    First,
    Last,
}
/// Strategy trait implemented by the `First` and `Last` policies.
trait NonNullPolicy: Copy + Send + Sync + 'static {
    /// Which end of the data this policy selects from.
    fn is_first_or_last(self) -> FirstOrLast;
    /// Index of the candidate element in a fully non-null array of length `len`.
    fn index(self, len: usize) -> usize;
    /// Whether a value observed at sequence id `new` may replace the value
    /// currently stored with sequence id `old`; `seen` indicates whether a
    /// non-null value has been found for the group yet.
    fn might_replace(self, new: u64, old: u64, seen: bool) -> bool;
}
impl NonNullPolicy for First {
54
fn is_first_or_last(self) -> FirstOrLast {
55
FirstOrLast::First
56
}
57
58
fn index(self, _len: usize) -> usize {
59
0
60
}
61
62
fn might_replace(self, new: u64, old: u64, seen: bool) -> bool {
63
// Subtracting 1 with wrapping leaves all order unchanged, except it
64
// makes 0 (no value) the largest possible.
65
// If an item has not yet been found, we still might replace, even if we are higher idx.
66
!seen || (new.wrapping_sub(1) < old.wrapping_sub(1))
67
}
68
}
69
70
impl NonNullPolicy for Last {
71
fn is_first_or_last(self) -> FirstOrLast {
72
FirstOrLast::Last
73
}
74
75
fn index(self, len: usize) -> usize {
76
len - 1
77
}
78
79
fn might_replace(self, new: u64, old: u64, seen: bool) -> bool {
80
// If an item has not yet been found, we still might replace, even if we are lower idx.
81
!seen || (new >= old)
82
}
83
}
84
85
/// First/last-non-null reducer for numeric physical types; `P` selects the
/// policy and `T` is the Polars numeric type being reduced.
struct NumFirstLastNonNullReducer<P: NonNullPolicy, T>(P, PhantomData<T>);
/// Per-group reduction state.
#[derive(Clone, Debug, Default)]
struct ValueForNonNull<T: Clone> {
    // Best value found so far; None until a non-null value is observed.
    value: Option<T>,
    // Sequence id the value came from; 0 (the default) means 'no value yet'.
    seq: u64,
    // Whether a non-null value has been observed for this group.
    seen: bool,
}
impl<P: NonNullPolicy, T> Clone for NumFirstLastNonNullReducer<P, T> {
95
fn clone(&self) -> Self {
96
Self(self.0, PhantomData)
97
}
98
}
99
100
impl<P, T> Reducer for NumFirstLastNonNullReducer<P, T>
where
    P: NonNullPolicy,
    T: PolarsNumericType,
{
    type Dtype = T;
    type Value = ValueForNonNull<T::Native>;

    fn init(&self) -> Self::Value {
        // value = None, seq = 0 ('no value'), seen = false.
        ValueForNonNull::default()
    }

    fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
        // Reduce on the physical representation; `finish` restores the
        // logical dtype.
        s.to_physical_repr()
    }

    // Merge another group's state into `old` if the policy prefers it.
    fn combine(&self, old: &mut Self::Value, new: &Self::Value) {
        if new.value.is_some() && self.0.might_replace(new.seq, old.seq, old.seen) {
            old.value = new.value;
            old.seq = new.seq;
            old.seen = true;
        }
    }

    // Reduce a single scalar observed at sequence id `seq_id`; nulls never
    // replace an existing value.
    fn reduce_one(&self, old: &mut Self::Value, new: Option<T::Native>, seq_id: u64) {
        if new.is_some() && self.0.might_replace(seq_id, old.seq, old.seen) {
            old.value = new;
            old.seq = seq_id;
            old.seen = true;
        }
    }

    // Reduce an entire chunk at once: pick the first/last non-null element of
    // `ca` per the policy and try to replace the current state with it.
    fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64) {
        if ca.is_empty() {
            return;
        }

        if self.0.might_replace(seq_id, v.seq, v.seen) {
            let val = if ca.has_nulls() {
                match self.0.is_first_or_last() {
                    FirstOrLast::First => ca.first_non_null(),
                    FirstOrLast::Last => ca.last_non_null(),
                }
            } else {
                // Fully non-null: the candidate is simply the first/last element.
                Some(self.0.index(ca.len()))
            }
            // SAFETY: idx is valid.
            .and_then(|idx| unsafe { ca.get_unchecked(idx) });

            if val.is_some() {
                v.value = val;
                v.seq = seq_id;
                v.seen = true;
            }
        }
    }

    // Materialize the per-group states into a Series of the logical dtype.
    fn finish(
        &self,
        v: Vec<Self::Value>,
        m: Option<Bitmap>,
        dtype: &DataType,
    ) -> PolarsResult<Series> {
        assert!(m.is_none()); // This should only be used with VecGroupedReduction.
        let ca: ChunkedArray<T> = v
            .into_iter()
            .map(|red_val| red_val.value)
            .collect_ca(PlSmallStr::EMPTY);
        let s = ca.into_series();
        unsafe { s.from_physical_unchecked(dtype) }
    }
}
/// First/last-non-null reducer for String/Binary data (reduced as bytes).
struct BinaryFirstLastNonNullReducer<P: NonNullPolicy>(P);
impl<P: NonNullPolicy> Clone for BinaryFirstLastNonNullReducer<P> {
176
fn clone(&self) -> Self {
177
Self(self.0)
178
}
179
}
180
181
impl<P> Reducer for BinaryFirstLastNonNullReducer<P>
where
    P: NonNullPolicy,
{
    type Dtype = BinaryType;
    type Value = ValueForNonNull<Vec<u8>>;

    fn init(&self) -> Self::Value {
        // value = None, seq = 0 ('no value'), seen = false.
        ValueForNonNull::default()
    }

    fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
        // Reduce String data through its Binary representation.
        Cow::Owned(s.cast(&DataType::Binary).unwrap())
    }

    // Merge another group's state into `old` if the policy prefers it.
    fn combine(&self, old: &mut Self::Value, new: &Self::Value) {
        if new.value.is_some() && self.0.might_replace(new.seq, old.seq, old.seen) {
            // clone_from reuses old's allocation where possible.
            old.value.clone_from(&new.value);
            old.seq = new.seq;
            old.seen = true;
        }
    }

    // Reduce a single scalar observed at sequence id `seq_id`; nulls never
    // replace an existing value.
    fn reduce_one(&self, old: &mut Self::Value, new: Option<&[u8]>, seq_id: u64) {
        if new.is_some() && self.0.might_replace(seq_id, old.seq, old.seen) {
            replace_opt_bytes(&mut old.value, new);
            old.seq = seq_id;
            old.seen = true;
        }
    }

    // Reduce an entire chunk at once: pick the first/last non-null element of
    // `ca` per the policy and try to replace the current state with it.
    fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64) {
        if ca.is_empty() {
            return;
        }
        if self.0.might_replace(seq_id, v.seq, v.seen) {
            let val = if ca.has_nulls() {
                match self.0.is_first_or_last() {
                    FirstOrLast::First => ca.first_non_null(),
                    FirstOrLast::Last => ca.last_non_null(),
                }
            } else {
                // Fully non-null: the candidate is simply the first/last element.
                Some(self.0.index(ca.len()))
            }
            .and_then(|idx| ca.get(idx));

            if val.is_some() {
                replace_opt_bytes(&mut v.value, val);
                v.seq = seq_id;
                v.seen = true;
            }
        }
    }

    // Materialize the per-group states into a Series, casting back to the
    // requested dtype (e.g. Binary -> String).
    fn finish(
        &self,
        v: Vec<Self::Value>,
        m: Option<Bitmap>,
        dtype: &DataType,
    ) -> PolarsResult<Series> {
        assert!(m.is_none()); // This should only be used with VecGroupedReduction.
        let ca: BinaryChunked = v
            .into_iter()
            .map(|ValueForNonNull { value, .. }| value)
            .collect_ca(PlSmallStr::EMPTY);
        ca.into_series().cast(dtype)
    }
}
/// First/last-non-null reducer for Boolean data.
#[derive(Clone)]
struct BoolFirstLastNonNullReducer<P: NonNullPolicy>(P);
impl<P> Reducer for BoolFirstLastNonNullReducer<P>
254
where
255
P: NonNullPolicy,
256
{
257
type Dtype = BooleanType;
258
type Value = ValueForNonNull<bool>;
259
260
fn init(&self) -> Self::Value {
261
ValueForNonNull::default()
262
}
263
264
fn combine(&self, old: &mut Self::Value, new: &Self::Value) {
265
if new.value.is_some() && self.0.might_replace(new.seq, old.seq, old.seen) {
266
old.value = new.value;
267
old.seq = new.seq;
268
old.seen = new.seen;
269
}
270
}
271
272
fn reduce_one(&self, old: &mut Self::Value, new: Option<bool>, seq_id: u64) {
273
if new.is_some() && self.0.might_replace(seq_id, old.seq, old.seen) {
274
old.value = new;
275
old.seq = seq_id;
276
old.seen = true;
277
}
278
}
279
280
fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64) {
281
if ca.is_empty() {
282
return;
283
}
284
if self.0.might_replace(seq_id, v.seq, v.seen) {
285
let val = if ca.has_nulls() {
286
match self.0.is_first_or_last() {
287
FirstOrLast::First => ca.first_non_null(),
288
FirstOrLast::Last => ca.last_non_null(),
289
}
290
} else {
291
Some(self.0.index(ca.len()))
292
}
293
.and_then(|idx| ca.get(idx));
294
295
if val.is_some() {
296
v.value = val;
297
v.seq = seq_id;
298
v.seen = true;
299
}
300
}
301
}
302
303
fn finish(
304
&self,
305
v: Vec<Self::Value>,
306
m: Option<Bitmap>,
307
_dtype: &DataType,
308
) -> PolarsResult<Series> {
309
assert!(m.is_none()); // This should only be used with VecGroupedReduction.
310
let ca: BooleanChunked = v
311
.into_iter()
312
.map(|ValueForNonNull { value, .. }| value)
313
.collect_ca(PlSmallStr::EMPTY);
314
Ok(ca.into_series())
315
}
316
}
317
318
/// Fallback grouped first/last-non-null reduction that works for any dtype by
/// storing values as owned `AnyValue`s.
struct GenericFirstLastNonNullGroupedReduction<P: NonNullPolicy> {
    // Input dtype; `finalize` restores it from the physical representation.
    in_dtype: DataType,
    // First/last selection policy.
    policy: P,
    // Current best value per group.
    values: Vec<AnyValue<'static>>,
    // Sequence id per group; 0 means 'no value yet'.
    seqs: Vec<u64>,
    // Per-group flag: has a non-null value been observed?
    seen: MutableBitmap,
    // State of evicted groups, drained by `take_evictions`.
    evicted_values: Vec<AnyValue<'static>>,
    evicted_seqs: Vec<u64>,
    evicted_seen: BitmapBuilder,
}
impl<P: NonNullPolicy> GenericFirstLastNonNullGroupedReduction<P> {
    /// Creates an empty reduction state for `in_dtype` with the given policy.
    fn new(in_dtype: DataType, policy: P) -> Self {
        Self {
            in_dtype,
            policy,
            values: Vec::new(),
            seqs: Vec::new(),
            seen: MutableBitmap::new(),
            evicted_values: Vec::new(),
            evicted_seqs: Vec::new(),
            evicted_seen: BitmapBuilder::new(),
        }
    }
}
impl<P: NonNullPolicy + 'static> GroupedReduction for GenericFirstLastNonNullGroupedReduction<P> {
345
fn new_empty(&self) -> Box<dyn GroupedReduction> {
346
Box::new(Self::new(self.in_dtype.clone(), self.policy))
347
}
348
349
fn reserve(&mut self, additional: usize) {
350
self.values.reserve(additional);
351
self.seqs.reserve(additional);
352
self.seen.reserve(additional);
353
}
354
355
fn resize(&mut self, num_groups: IdxSize) {
356
self.values.resize(num_groups as usize, AnyValue::Null);
357
self.seqs.resize(num_groups as usize, 0);
358
self.seen.resize(num_groups as usize, false);
359
}
360
361
fn update_group(
362
&mut self,
363
values: &[&Column],
364
group_idx: IdxSize,
365
seq_id: u64,
366
) -> PolarsResult<()> {
367
let &[values] = values else { unreachable!() };
368
assert!(values.dtype() == &self.in_dtype);
369
if !values.is_empty() {
370
let seq_id = seq_id + 1; // We use 0 for 'no value'.
371
if self.policy.might_replace(
372
seq_id,
373
self.seqs[group_idx as usize],
374
self.seen.get(group_idx as usize),
375
) {
376
let val = if values.has_nulls() {
377
match self.policy.is_first_or_last() {
378
FirstOrLast::First => values
379
.as_materialized_series_maintain_scalar()
380
.first_non_null()
381
.into_value(),
382
FirstOrLast::Last => values
383
.as_materialized_series_maintain_scalar()
384
.last_non_null()
385
.into_value(),
386
}
387
} else {
388
// SAFETY: index is valid.
389
unsafe { values.get_unchecked(self.policy.index(values.len())) }
390
}
391
.into_static();
392
393
if !val.is_null() {
394
self.values[group_idx as usize] = val;
395
self.seqs[group_idx as usize] = seq_id;
396
self.seen.set(group_idx as usize, true);
397
}
398
}
399
}
400
Ok(())
401
}
402
403
unsafe fn update_groups_while_evicting(
404
&mut self,
405
values: &[&Column],
406
subset: &[IdxSize],
407
group_idxs: &[EvictIdx],
408
seq_id: u64,
409
) -> PolarsResult<()> {
410
let &[values] = values else { unreachable!() };
411
assert!(values.dtype() == &self.in_dtype);
412
assert!(subset.len() == group_idxs.len());
413
let seq_id = seq_id + 1; // We use 0 for 'no value'.
414
for (i, g) in subset.iter().zip(group_idxs) {
415
let idx = g.idx();
416
let grp_val = self.values.get_unchecked_mut(idx);
417
let grp_seq = self.seqs.get_unchecked_mut(idx);
418
if g.should_evict() {
419
self.evicted_values
420
.push(core::mem::replace(grp_val, AnyValue::Null));
421
self.evicted_seqs.push(core::mem::replace(grp_seq, 0));
422
self.evicted_seen.push(self.seen.get_unchecked(idx));
423
self.seen.set_unchecked(idx, false);
424
}
425
if self
426
.policy
427
.might_replace(seq_id, *grp_seq, self.seen.get_unchecked(idx))
428
{
429
let val = values.get_unchecked(*i as usize).into_static();
430
if !val.is_null() {
431
*grp_val = values.get_unchecked(*i as usize).into_static();
432
*grp_seq = seq_id;
433
self.seen.set_unchecked(idx, true);
434
}
435
}
436
}
437
Ok(())
438
}
439
440
unsafe fn combine_subset(
441
&mut self,
442
other: &dyn GroupedReduction,
443
subset: &[IdxSize],
444
group_idxs: &[IdxSize],
445
) -> PolarsResult<()> {
446
let other = other.as_any().downcast_ref::<Self>().unwrap();
447
assert!(self.in_dtype == other.in_dtype);
448
assert!(subset.len() == group_idxs.len());
449
for (i, g) in group_idxs.iter().enumerate() {
450
let si = *subset.get_unchecked(i) as usize;
451
if self.policy.might_replace(
452
*other.seqs.get_unchecked(si),
453
*self.seqs.get_unchecked(*g as usize),
454
self.seen.get_unchecked(*g as usize),
455
) {
456
let val = other.values.get_unchecked(si);
457
if !val.is_null() {
458
*self.values.get_unchecked_mut(*g as usize) = val.clone();
459
*self.seqs.get_unchecked_mut(*g as usize) = *other.seqs.get_unchecked(si);
460
self.seen.set_unchecked(*g as usize, true);
461
}
462
}
463
}
464
Ok(())
465
}
466
467
fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
468
Box::new(Self {
469
in_dtype: self.in_dtype.clone(),
470
policy: self.policy,
471
values: core::mem::take(&mut self.evicted_values),
472
seqs: core::mem::take(&mut self.evicted_seqs),
473
seen: core::mem::take(&mut self.evicted_seen).into_mut(),
474
evicted_values: Vec::new(),
475
evicted_seqs: Vec::new(),
476
evicted_seen: BitmapBuilder::new(),
477
})
478
}
479
480
fn finalize(&mut self) -> PolarsResult<Series> {
481
self.seqs.clear();
482
let phys_type = self.in_dtype.to_physical();
483
let mut buf = AnyValueBufferTrusted::new(&phys_type, self.values.len());
484
for v in core::mem::take(&mut self.values) {
485
// SAFETY: v is cast to physical.
486
unsafe { buf.add_unchecked_owned_physical(&v.to_physical()) };
487
}
488
// SAFETY: dtype is valid for series.
489
unsafe { buf.into_series().from_physical_unchecked(&self.in_dtype) }
490
}
491
492
fn as_any(&self) -> &dyn Any {
493
self
494
}
495
}
496
497