Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-expr/src/expressions/aggregation.rs
6940 views
1
use std::borrow::Cow;
2
3
use arrow::array::*;
4
use arrow::compute::concatenate::concatenate;
5
use arrow::legacy::utils::CustomIterTools;
6
use arrow::offset::Offsets;
7
use polars_compute::rolling::QuantileMethod;
8
use polars_core::POOL;
9
use polars_core::prelude::*;
10
use polars_core::series::IsSorted;
11
use polars_core::utils::{_split_offsets, NoNull};
12
#[cfg(feature = "propagate_nans")]
13
use polars_ops::prelude::nan_propagating_aggregate;
14
use rayon::prelude::*;
15
16
use super::*;
17
use crate::expressions::AggState::AggregatedScalar;
18
use crate::expressions::{
19
AggState, AggregationContext, PartitionedAggregation, PhysicalExpr, UpdateGroups,
20
};
21
22
/// Describes which group-by aggregation to run and whether the work may be
/// split over the rayon thread pool.
#[derive(Debug, Clone, Copy)]
pub struct AggregationType {
    /// The aggregation kind (min/max/sum/mean/count/...).
    pub(crate) groupby: GroupByMethod,
    /// If `true`, `evaluate` may parallelize via `parallel_op_columns`.
    pub(crate) allow_threading: bool,
}
27
28
/// Physical expression that applies a group-by aggregation to the column
/// produced by an inner expression.
pub(crate) struct AggregationExpr {
    /// Expression producing the column to aggregate.
    pub(crate) input: Arc<dyn PhysicalExpr>,
    /// Aggregation kind plus permission to use the thread pool.
    pub(crate) agg_type: AggregationType,
    /// Pre-computed output field; when `None`, `to_field` falls back to the
    /// input expression's field.
    field: Option<Field>,
}
33
34
impl AggregationExpr {
35
pub fn new(
36
expr: Arc<dyn PhysicalExpr>,
37
agg_type: AggregationType,
38
field: Option<Field>,
39
) -> Self {
40
Self {
41
input: expr,
42
agg_type,
43
field,
44
}
45
}
46
}
47
48
impl PhysicalExpr for AggregationExpr {
    fn as_expression(&self) -> Option<&Expr> {
        // This physical node has no single logical `Expr` counterpart.
        None
    }

    /// Eager (non-grouped) evaluation: reduce the whole input column to a
    /// single value (or to a one-element list for `Implode`).
    fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
        let s = self.input.evaluate(df, state)?;

        let AggregationType {
            groupby,
            allow_threading,
        } = self.agg_type;

        // NaN-aware min/max only differ from plain min/max on float dtypes;
        // rewrite to the cheaper variant otherwise.
        let is_float = s.dtype().is_float();
        let group_by = match groupby {
            GroupByMethod::NanMin if !is_float => GroupByMethod::Min,
            GroupByMethod::NanMax if !is_float => GroupByMethod::Max,
            gb => gb,
        };

        match group_by {
            // For sorted input the reduction is trivial, so skip the
            // parallel splitting and reduce directly.
            GroupByMethod::Min => match s.is_sorted_flag() {
                IsSorted::Ascending | IsSorted::Descending => {
                    s.min_reduce().map(|sc| sc.into_column(s.name().clone()))
                },
                IsSorted::Not => parallel_op_columns(
                    |s| s.min_reduce().map(|sc| sc.into_column(s.name().clone())),
                    s,
                    allow_threading,
                ),
            },
            #[cfg(feature = "propagate_nans")]
            GroupByMethod::NanMin => parallel_op_columns(
                |s| {
                    Ok(polars_ops::prelude::nan_propagating_aggregate::nan_min_s(
                        s.as_materialized_series(),
                        s.name().clone(),
                    )
                    .into_column())
                },
                s,
                allow_threading,
            ),
            #[cfg(not(feature = "propagate_nans"))]
            GroupByMethod::NanMin => {
                panic!("activate 'propagate_nans' feature")
            },
            GroupByMethod::Max => match s.is_sorted_flag() {
                IsSorted::Ascending | IsSorted::Descending => {
                    s.max_reduce().map(|sc| sc.into_column(s.name().clone()))
                },
                IsSorted::Not => parallel_op_columns(
                    |s| s.max_reduce().map(|sc| sc.into_column(s.name().clone())),
                    s,
                    allow_threading,
                ),
            },
            #[cfg(feature = "propagate_nans")]
            GroupByMethod::NanMax => parallel_op_columns(
                |s| {
                    Ok(polars_ops::prelude::nan_propagating_aggregate::nan_max_s(
                        s.as_materialized_series(),
                        s.name().clone(),
                    )
                    .into_column())
                },
                s,
                allow_threading,
            ),
            #[cfg(not(feature = "propagate_nans"))]
            GroupByMethod::NanMax => {
                panic!("activate 'propagate_nans' feature")
            },
            GroupByMethod::Median => s.median_reduce().map(|sc| sc.into_column(s.name().clone())),
            GroupByMethod::Mean => Ok(s.mean_reduce().into_column(s.name().clone())),
            // First/Last on an empty column yield a single null instead of
            // an empty column, keeping the scalar shape contract.
            GroupByMethod::First => Ok(if s.is_empty() {
                Column::full_null(s.name().clone(), 1, s.dtype())
            } else {
                s.head(Some(1))
            }),
            GroupByMethod::Last => Ok(if s.is_empty() {
                Column::full_null(s.name().clone(), 1, s.dtype())
            } else {
                s.tail(Some(1))
            }),
            GroupByMethod::Sum => parallel_op_columns(
                |s| s.sum_reduce().map(|sc| sc.into_column(s.name().clone())),
                s,
                allow_threading,
            ),
            // `Groups` only makes sense in a grouped context.
            GroupByMethod::Groups => unreachable!(),
            GroupByMethod::NUnique => s.n_unique().map(|count| {
                IdxCa::from_slice(s.name().clone(), &[count as IdxSize]).into_column()
            }),
            GroupByMethod::Count { include_nulls } => {
                // `(!include_nulls) as usize` is 1 only when nulls are
                // excluded, so the null count is subtracted just in that case
                // (unary `!` binds tighter than `as`).
                let count = s.len() - s.null_count() * !include_nulls as usize;

                Ok(IdxCa::from_slice(s.name().clone(), &[count as IdxSize]).into_column())
            },
            GroupByMethod::Implode => s.implode().map(|ca| ca.into_column()),
            GroupByMethod::Std(ddof) => s
                .std_reduce(ddof)
                .map(|sc| sc.into_column(s.name().clone())),
            GroupByMethod::Var(ddof) => s
                .var_reduce(ddof)
                .map(|sc| sc.into_column(s.name().clone())),
            // Quantile is handled by the dedicated `AggQuantileExpr`.
            GroupByMethod::Quantile(_, _) => unimplemented!(),
        }
    }
    /// Grouped evaluation: aggregate the input per group and return the
    /// result wrapped in an `AggregationContext` (usually `AggregatedScalar`,
    /// one value per group).
    #[allow(clippy::ptr_arg)]
    fn evaluate_on_groups<'a>(
        &self,
        df: &DataFrame,
        groups: &'a GroupPositions,
        state: &ExecutionState,
    ) -> PolarsResult<AggregationContext<'a>> {
        let mut ac = self.input.evaluate_on_groups(df, groups, state)?;
        // don't change names by aggregations as is done in polars-core
        let keep_name = ac.get_values().name().clone();

        // Literals cannot be aggregated except for implode.
        polars_ensure!(!matches!(ac.agg_state(), AggState::LiteralScalar(_)), ComputeError: "cannot aggregate a literal");

        // A column that is already reduced to one value per group can only be
        // imploded further; any other aggregation of it is an error.
        if let AggregatedScalar(_) = ac.agg_state() {
            match self.agg_type.groupby {
                GroupByMethod::Implode => {},
                _ => {
                    polars_bail!(ComputeError: "cannot aggregate as {}, the column is already aggregated", self.agg_type.groupby);
                },
            }
        }

        // SAFETY:
        // groups must always be in bounds.
        let out = unsafe {
            match self.agg_type.groupby {
                GroupByMethod::Min => {
                    let (c, groups) = ac.get_final_aggregation();
                    let agg_c = c.agg_min(&groups);
                    AggregatedScalar(agg_c.with_name(keep_name))
                },
                GroupByMethod::Max => {
                    let (c, groups) = ac.get_final_aggregation();
                    let agg_c = c.agg_max(&groups);
                    AggregatedScalar(agg_c.with_name(keep_name))
                },
                GroupByMethod::Median => {
                    let (c, groups) = ac.get_final_aggregation();
                    let agg_c = c.agg_median(&groups);
                    AggregatedScalar(agg_c.with_name(keep_name))
                },
                GroupByMethod::Mean => {
                    let (c, groups) = ac.get_final_aggregation();
                    let agg_c = c.agg_mean(&groups);
                    AggregatedScalar(agg_c.with_name(keep_name))
                },
                GroupByMethod::Sum => {
                    let (c, groups) = ac.get_final_aggregation();
                    let agg_c = c.agg_sum(&groups);
                    AggregatedScalar(agg_c.with_name(keep_name))
                },
                GroupByMethod::Count { include_nulls } => {
                    // When nulls count (or there are none), a group's count is
                    // just its length, so group sizes can be read directly.
                    if include_nulls || ac.get_values().null_count() == 0 {
                        // a few fast paths that prevent materializing new groups
                        match ac.update_groups {
                            UpdateGroups::WithSeriesLen => {
                                let list = ac
                                    .get_values()
                                    .list()
                                    .expect("impl error, should be a list at this point");

                                let mut s = match list.chunks().len() {
                                    1 => {
                                        // Single chunk: each sublist length is the
                                        // difference of consecutive offsets.
                                        let arr = list.downcast_iter().next().unwrap();
                                        let offsets = arr.offsets().as_slice();

                                        let mut previous = 0i64;
                                        let counts: NoNull<IdxCa> = offsets[1..]
                                            .iter()
                                            .map(|&o| {
                                                let len = (o - previous) as IdxSize;
                                                previous = o;
                                                len
                                            })
                                            .collect_trusted();
                                        counts.into_inner()
                                    },
                                    _ => {
                                        // Multi-chunk: fall back to iterating sublists.
                                        // NOTE(review): a null sublist is counted as 1
                                        // here — confirm this matches the intended
                                        // group-length semantics.
                                        let counts: NoNull<IdxCa> = list
                                            .amortized_iter()
                                            .map(|s| {
                                                if let Some(s) = s {
                                                    s.as_ref().len() as IdxSize
                                                } else {
                                                    1
                                                }
                                            })
                                            .collect_trusted();
                                        counts.into_inner()
                                    },
                                };
                                s.rename(keep_name);
                                AggregatedScalar(s.into_column())
                            },
                            UpdateGroups::WithGroupsLen => {
                                // no need to update the groups
                                // we can just get the attribute, because we only need the length,
                                // not the correct order
                                let mut ca = ac.groups.group_count();
                                ca.rename(keep_name);
                                AggregatedScalar(ca.into_column())
                            },
                            // materialize groups
                            _ => {
                                let mut ca = ac.groups().group_count();
                                ca.rename(keep_name);
                                AggregatedScalar(ca.into_column())
                            },
                        }
                    } else {
                        // Nulls must be excluded: count valid values per group.
                        // TODO: optimize this/and write somewhere else.
                        match ac.agg_state() {
                            AggState::LiteralScalar(s) | AggState::AggregatedScalar(s) => {
                                AggregatedScalar(Column::new(
                                    keep_name,
                                    [(s.len() as IdxSize - s.null_count() as IdxSize)],
                                ))
                            },
                            AggState::AggregatedList(s) => {
                                // Count non-null elements of each sublist.
                                let ca = s.list()?;
                                let out: IdxCa = ca
                                    .into_iter()
                                    .map(|opt_s| {
                                        opt_s
                                            .map(|s| s.len() as IdxSize - s.null_count() as IdxSize)
                                    })
                                    .collect();
                                AggregatedScalar(out.into_column().with_name(keep_name))
                            },
                            AggState::NotAggregated(s) => {
                                let s = s.clone();
                                let groups = ac.groups();
                                let out: IdxCa = if matches!(s.dtype(), &DataType::Null) {
                                    // All-null dtype: every group's valid count is 0.
                                    IdxCa::full(s.name().clone(), 0, groups.len())
                                } else {
                                    match groups.as_ref().as_ref() {
                                        GroupsType::Idx(idx) => {
                                            let s = s.rechunk();
                                            // @scalar-opt
                                            // @partition-opt
                                            // A validity mask must exist on this
                                            // branch since null_count() > 0 above.
                                            let array = &s.as_materialized_series().chunks()[0];
                                            let validity = array.validity().unwrap();
                                            idx.iter()
                                                .map(|(_, g)| {
                                                    let mut count = 0 as IdxSize;
                                                    // Count valid values
                                                    g.iter().for_each(|i| {
                                                        count += validity
                                                            .get_bit_unchecked(*i as usize)
                                                            as IdxSize;
                                                    });
                                                    count
                                                })
                                                .collect_ca_trusted_with_dtype(keep_name, IDX_DTYPE)
                                        },
                                        GroupsType::Slice { groups, .. } => {
                                            // Slice and use computed null count
                                            groups
                                                .iter()
                                                .map(|g| {
                                                    let start = g[0];
                                                    let len = g[1];
                                                    len - s
                                                        .slice(start as i64, len as usize)
                                                        .null_count()
                                                        as IdxSize
                                                })
                                                .collect_ca_trusted_with_dtype(keep_name, IDX_DTYPE)
                                        },
                                    }
                                };
                                AggregatedScalar(out.into_column())
                            },
                        }
                    }
                },
                GroupByMethod::First => {
                    let (s, groups) = ac.get_final_aggregation();
                    let agg_s = s.agg_first(&groups);
                    AggregatedScalar(agg_s.with_name(keep_name))
                },
                GroupByMethod::Last => {
                    let (s, groups) = ac.get_final_aggregation();
                    let agg_s = s.agg_last(&groups);
                    AggregatedScalar(agg_s.with_name(keep_name))
                },
                GroupByMethod::NUnique => {
                    let (s, groups) = ac.get_final_aggregation();
                    let agg_s = s.agg_n_unique(&groups);
                    AggregatedScalar(agg_s.with_name(keep_name))
                },
                GroupByMethod::Implode => {
                    // If the aggregation is already in an aggregate flat state (AggregatedScalar), for instance by
                    // a mean() aggregation, we simply wrap into a list and maintain the AggregatedScalar state
                    //
                    // If it is not, we traverse the groups and create a list per group.
                    let c = match ac.agg_state() {
                        // mean agg:
                        // -> f64 -> list<f64>
                        AggregatedScalar(c) => c
                            .cast(&DataType::List(Box::new(c.dtype().clone())))
                            .unwrap(),
                        // Auto-imploded
                        AggState::NotAggregated(_) | AggState::AggregatedList(_) => {
                            ac._implode_no_agg();
                            return Ok(ac);
                        },
                        _ => {
                            let agg = ac.aggregated();
                            agg.as_list().into_column()
                        },
                    };
                    match ac.agg_state() {
                        // An imploded scalar remains a scalar
                        AggregatedScalar(_) => AggregatedScalar(c.with_name(keep_name)),
                        _ => AggState::AggregatedList(c.with_name(keep_name)),
                    }
                },
                GroupByMethod::Groups => {
                    // Materialize the group indices themselves as a list column.
                    let mut column: ListChunked = ac.groups().as_list_chunked();
                    column.rename(keep_name);
                    AggregatedScalar(column.into_column())
                },
                GroupByMethod::Std(ddof) => {
                    let (c, groups) = ac.get_final_aggregation();
                    let agg_c = c.agg_std(&groups, ddof);
                    AggregatedScalar(agg_c.with_name(keep_name))
                },
                GroupByMethod::Var(ddof) => {
                    let (c, groups) = ac.get_final_aggregation();
                    let agg_c = c.agg_var(&groups, ddof);
                    AggregatedScalar(agg_c.with_name(keep_name))
                },
                GroupByMethod::Quantile(_, _) => {
                    // implemented explicitly in AggQuantile struct
                    unimplemented!()
                },
                GroupByMethod::NanMin => {
                    #[cfg(feature = "propagate_nans")]
                    {
                        let (c, groups) = ac.get_final_aggregation();
                        // Non-float dtypes have no NaNs; use the regular min.
                        let agg_c = if c.dtype().is_float() {
                            nan_propagating_aggregate::group_agg_nan_min_s(
                                c.as_materialized_series(),
                                &groups,
                            )
                            .into_column()
                        } else {
                            c.agg_min(&groups)
                        };
                        AggregatedScalar(agg_c.with_name(keep_name))
                    }
                    #[cfg(not(feature = "propagate_nans"))]
                    {
                        panic!("activate 'propagate_nans' feature")
                    }
                },
                GroupByMethod::NanMax => {
                    #[cfg(feature = "propagate_nans")]
                    {
                        let (c, groups) = ac.get_final_aggregation();
                        // Non-float dtypes have no NaNs; use the regular max.
                        let agg_c = if c.dtype().is_float() {
                            nan_propagating_aggregate::group_agg_nan_max_s(
                                c.as_materialized_series(),
                                &groups,
                            )
                            .into_column()
                        } else {
                            c.agg_max(&groups)
                        };
                        AggregatedScalar(agg_c.with_name(keep_name))
                    }
                    #[cfg(not(feature = "propagate_nans"))]
                    {
                        panic!("activate 'propagate_nans' feature")
                    }
                },
            }
        };

        Ok(AggregationContext::from_agg_state(
            out,
            Cow::Borrowed(groups),
        ))
    }

    fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
        // Prefer the pre-computed field; otherwise derive from the input.
        if let Some(field) = self.field.as_ref() {
            Ok(field.clone())
        } else {
            self.input.to_field(input_schema)
        }
    }

    fn is_scalar(&self) -> bool {
        // Aggregations reduce to a single value (per group).
        true
    }

    fn as_partitioned_aggregator(&self) -> Option<&dyn PartitionedAggregation> {
        Some(self)
    }
}
460
461
impl PartitionedAggregation for AggregationExpr {
    /// First phase of a partitioned group-by: compute a per-partition partial
    /// aggregate that `finalize` can later combine across partitions.
    fn evaluate_partitioned(
        &self,
        df: &DataFrame,
        groups: &GroupPositions,
        state: &ExecutionState,
    ) -> PolarsResult<Column> {
        let expr = self.input.as_partitioned_aggregator().unwrap();
        let column = expr.evaluate_partitioned(df, groups, state)?;

        // SAFETY:
        // groups are in bounds
        unsafe {
            match self.agg_type.groupby {
                #[cfg(feature = "dtype-struct")]
                GroupByMethod::Mean => {
                    // A partitioned mean is kept as (sum, valid count) pairs in a
                    // struct column so partitions can be merged exactly.
                    let new_name = column.name().clone();

                    // ensure we don't overflow
                    // the all 8 and 16 bits integers are already upcasted to int16 on `agg_sum`
                    let mut agg_s = if matches!(column.dtype(), DataType::Int32 | DataType::UInt32)
                    {
                        column.cast(&DataType::Int64).unwrap().agg_sum(groups)
                    } else {
                        column.agg_sum(groups)
                    };
                    agg_s.rename(new_name.clone());

                    if !agg_s.dtype().is_primitive_numeric() {
                        Ok(agg_s)
                    } else {
                        // Normalize the sum to f64 (f32 stays f32) and pair it
                        // with the per-group valid count.
                        let agg_s = match agg_s.dtype() {
                            DataType::Float32 => agg_s,
                            _ => agg_s.cast(&DataType::Float64).unwrap(),
                        };
                        let mut count_s = column.agg_valid_count(groups);
                        count_s.rename(PlSmallStr::from_static("__POLARS_COUNT"));
                        Ok(
                            StructChunked::from_columns(new_name, agg_s.len(), &[agg_s, count_s])
                                .unwrap()
                                .into_column(),
                        )
                    }
                },
                GroupByMethod::Implode => {
                    let new_name = column.name().clone();
                    let mut agg = column.agg_list(groups);
                    agg.rename(new_name);
                    Ok(agg)
                },
                GroupByMethod::First => {
                    let mut agg = column.agg_first(groups);
                    agg.rename(column.name().clone());
                    Ok(agg)
                },
                GroupByMethod::Last => {
                    let mut agg = column.agg_last(groups);
                    agg.rename(column.name().clone());
                    Ok(agg)
                },
                GroupByMethod::Max => {
                    let mut agg = column.agg_max(groups);
                    agg.rename(column.name().clone());
                    Ok(agg)
                },
                GroupByMethod::Min => {
                    let mut agg = column.agg_min(groups);
                    agg.rename(column.name().clone());
                    Ok(agg)
                },
                GroupByMethod::Sum => {
                    let mut agg = column.agg_sum(groups);
                    agg.rename(column.name().clone());
                    Ok(agg)
                },
                // Only the nulls-included count is supported partitioned: it is
                // just the group length.
                GroupByMethod::Count {
                    include_nulls: true,
                } => {
                    let mut ca = groups.group_count();
                    ca.rename(column.name().clone());
                    Ok(ca.into_column())
                },
                _ => {
                    unimplemented!()
                },
            }
        }
    }

    /// Second phase: combine the per-partition partial aggregates produced by
    /// `evaluate_partitioned` into the final per-group result.
    fn finalize(
        &self,
        partitioned: Column,
        groups: &GroupPositions,
        _state: &ExecutionState,
    ) -> PolarsResult<Column> {
        match self.agg_type.groupby {
            // Partial counts and partial sums are both combined by summing.
            GroupByMethod::Count {
                include_nulls: true,
            }
            | GroupByMethod::Sum => {
                let mut agg = unsafe { partitioned.agg_sum(groups) };
                agg.rename(partitioned.name().clone());
                Ok(agg)
            },
            #[cfg(feature = "dtype-struct")]
            GroupByMethod::Mean => {
                let new_name = partitioned.name().clone();
                match partitioned.dtype() {
                    // Recombine the (sum, count) struct: sum both fields across
                    // partitions, then divide.
                    DataType::Struct(_) => {
                        let ca = partitioned.struct_().unwrap();
                        let fields = ca.fields_as_series();
                        let sum = &fields[0];
                        let count = &fields[1];
                        let (agg_count, agg_s) =
                            unsafe { POOL.join(|| count.agg_sum(groups), || sum.agg_sum(groups)) };

                        // Ensure that we don't divide by zero by masking out zeros.
                        let agg_count = agg_count.idx().unwrap();
                        let mask = agg_count.equal(0 as IdxSize);
                        let agg_count = agg_count.set(&mask, None).unwrap().into_series();

                        let agg_s = &agg_s / &agg_count.cast(agg_s.dtype()).unwrap();
                        Ok(agg_s?.with_name(new_name).into_column())
                    },
                    // Non-struct partial means (non-numeric inputs) have no
                    // defined mean: return all-null.
                    _ => Ok(Column::full_null(
                        new_name,
                        groups.len(),
                        partitioned.dtype(),
                    )),
                }
            },
            GroupByMethod::Implode => {
                // the groups are scattered over multiple groups/sub dataframes.
                // we now must collect them into a single group
                let ca = partitioned.list().unwrap();
                let new_name = partitioned.name().clone();

                let mut values = Vec::with_capacity(groups.len());
                let mut can_fast_explode = true;

                // Build list offsets manually while flattening each group's
                // sublists into one contiguous values buffer.
                let mut offsets = Vec::<i64>::with_capacity(groups.len() + 1);
                let mut length_so_far = 0i64;
                offsets.push(length_so_far);

                let mut process_group = |ca: ListChunked| -> PolarsResult<()> {
                    let s = ca.explode(false)?;
                    length_so_far += s.len() as i64;
                    offsets.push(length_so_far);
                    values.push(s.chunks()[0].clone());

                    // An empty sublist disables the fast-explode optimization.
                    if s.is_empty() {
                        can_fast_explode = false;
                    }
                    Ok(())
                };

                match groups.as_ref() {
                    GroupsType::Idx(groups) => {
                        for (_, idx) in groups {
                            let ca = unsafe {
                                // SAFETY:
                                // The indexes of the group_by operation are never out of bounds
                                ca.take_unchecked(idx)
                            };
                            process_group(ca)?;
                        }
                    },
                    GroupsType::Slice { groups, .. } => {
                        for [first, len] in groups {
                            let len = *len as usize;
                            let ca = ca.slice(*first as i64, len);
                            process_group(ca)?;
                        }
                    },
                }

                let vals = values.iter().map(|arr| &**arr).collect::<Vec<_>>();
                let values = concatenate(&vals).unwrap();

                let dtype = ListArray::<i64>::default_datatype(values.dtype().clone());
                // SAFETY: offsets are monotonically increasing.
                let arr = ListArray::<i64>::new(
                    dtype,
                    unsafe { Offsets::new_unchecked(offsets).into() },
                    values,
                    None,
                );
                let mut ca = ListChunked::with_chunk(new_name, arr);
                if can_fast_explode {
                    ca.set_fast_explode()
                }
                Ok(ca.into_series().as_list().into_column())
            },
            GroupByMethod::First => {
                let mut agg = unsafe { partitioned.agg_first(groups) };
                agg.rename(partitioned.name().clone());
                Ok(agg)
            },
            GroupByMethod::Last => {
                let mut agg = unsafe { partitioned.agg_last(groups) };
                agg.rename(partitioned.name().clone());
                Ok(agg)
            },
            GroupByMethod::Max => {
                let mut agg = unsafe { partitioned.agg_max(groups) };
                agg.rename(partitioned.name().clone());
                Ok(agg)
            },
            GroupByMethod::Min => {
                let mut agg = unsafe { partitioned.agg_min(groups) };
                agg.rename(partitioned.name().clone());
                Ok(agg)
            },
            _ => unimplemented!(),
        }
    }
}
678
679
/// Physical expression computing a single quantile of its input.
/// `GroupByMethod::Quantile` is routed here rather than through
/// `AggregationExpr` (which leaves it `unimplemented!`).
pub struct AggQuantileExpr {
    /// Expression producing the column to take the quantile of.
    pub(crate) input: Arc<dyn PhysicalExpr>,
    /// Expression that must evaluate to at most one value; extracted to `f64`
    /// by `get_quantile`.
    pub(crate) quantile: Arc<dyn PhysicalExpr>,
    /// Interpolation strategy for quantiles between observed values.
    pub(crate) method: QuantileMethod,
}
684
685
impl AggQuantileExpr {
    /// Construct from the input/quantile expressions and an interpolation
    /// method.
    pub fn new(
        input: Arc<dyn PhysicalExpr>,
        quantile: Arc<dyn PhysicalExpr>,
        method: QuantileMethod,
    ) -> Self {
        Self {
            input,
            quantile,
            method,
        }
    }

    /// Evaluate the quantile expression and extract it as a single `f64`.
    ///
    /// # Errors
    /// Errors if the expression yields more than one value or the value
    /// cannot be extracted as `f64`.
    fn get_quantile(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<f64> {
        let quantile = self.quantile.evaluate(df, state)?;
        polars_ensure!(quantile.len() <= 1, ComputeError:
            "polars only supports computing a single quantile; \
            make sure the 'quantile' expression input produces a single quantile"
        );
        // NOTE(review): `get(0)` on a zero-length column is unwrapped here —
        // presumably upstream guarantees a non-empty result; confirm.
        quantile.get(0).unwrap().try_extract()
    }
}
707
708
impl PhysicalExpr for AggQuantileExpr {
    fn as_expression(&self) -> Option<&Expr> {
        // This physical node has no single logical `Expr` counterpart.
        None
    }

    /// Eager evaluation: reduce the whole input column to one quantile value.
    fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
        let input = self.input.evaluate(df, state)?;
        let quantile = self.get_quantile(df, state)?;
        input
            .quantile_reduce(quantile, self.method)
            .map(|sc| sc.into_column(input.name().clone()))
    }
    /// Grouped evaluation: one quantile per group, returned as an
    /// `AggregatedScalar` context.
    #[allow(clippy::ptr_arg)]
    fn evaluate_on_groups<'a>(
        &self,
        df: &DataFrame,
        groups: &'a GroupPositions,
        state: &ExecutionState,
    ) -> PolarsResult<AggregationContext<'a>> {
        let mut ac = self.input.evaluate_on_groups(df, groups, state)?;
        // don't change names by aggregations as is done in polars-core
        let keep_name = ac.get_values().name().clone();

        let quantile = self.get_quantile(df, state)?;

        // SAFETY:
        // groups are in bounds
        let mut agg = unsafe {
            ac.flat_naive()
                .into_owned()
                .agg_quantile(ac.groups(), quantile, self.method)
        };
        agg.rename(keep_name);
        Ok(AggregationContext::from_agg_state(
            AggregatedScalar(agg),
            Cow::Borrowed(groups),
        ))
    }

    fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
        self.input.to_field(input_schema)
    }

    fn is_scalar(&self) -> bool {
        // A quantile reduces to a single value (per group).
        true
    }
}
755
756
/// Simple wrapper to parallelize functions that can be divided over threads aggregated and
757
/// finally aggregated in the main thread. This can be done for sum, min, max, etc.
758
fn parallel_op_columns<F>(f: F, s: Column, allow_threading: bool) -> PolarsResult<Column>
759
where
760
F: Fn(Column) -> PolarsResult<Column> + Send + Sync,
761
{
762
// set during debug low so
763
// we mimic production size data behavior
764
#[cfg(debug_assertions)]
765
let thread_boundary = 0;
766
767
#[cfg(not(debug_assertions))]
768
let thread_boundary = 100_000;
769
770
// threading overhead/ splitting work stealing is costly..
771
772
if !allow_threading
773
|| s.len() < thread_boundary
774
|| POOL.current_thread_has_pending_tasks().unwrap_or(false)
775
{
776
return f(s);
777
}
778
let n_threads = POOL.current_num_threads();
779
let splits = _split_offsets(s.len(), n_threads);
780
781
let chunks = POOL.install(|| {
782
splits
783
.into_par_iter()
784
.map(|(offset, len)| {
785
let s = s.slice(offset as i64, len);
786
f(s)
787
})
788
.collect::<PolarsResult<Vec<_>>>()
789
})?;
790
791
let mut iter = chunks.into_iter();
792
let first = iter.next().unwrap();
793
let dtype = first.dtype();
794
let out = iter.fold(first.to_physical_repr(), |mut acc, s| {
795
acc.append(&s.to_physical_repr()).unwrap();
796
acc
797
});
798
799
unsafe { f(out.from_physical_unchecked(dtype).unwrap()) }
800
}
801
802