Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-expr/src/reduce/mean.rs
6940 views
1
use std::marker::PhantomData;
2
3
use arrow::temporal_conversions::MICROSECONDS_IN_DAY;
4
use num_traits::{AsPrimitive, Zero};
5
use polars_core::with_match_physical_numeric_polars_type;
6
7
use super::*;
8
9
pub fn new_mean_reduction(dtype: DataType) -> Box<dyn GroupedReduction> {
10
use DataType::*;
11
use VecGroupedReduction as VGR;
12
match dtype {
13
Boolean => Box::new(VGR::new(dtype, BoolMeanReducer)),
14
_ if dtype.is_primitive_numeric() || dtype.is_temporal() => {
15
with_match_physical_numeric_polars_type!(dtype.to_physical(), |$T| {
16
Box::new(VGR::new(dtype, NumMeanReducer::<$T>(PhantomData)))
17
})
18
},
19
#[cfg(feature = "dtype-decimal")]
20
Decimal(_, _) => Box::new(VGR::new(dtype, NumMeanReducer::<Int128Type>(PhantomData))),
21
22
// For compatibility with the current engine, should probably be an error.
23
String | Binary => Box::new(super::NullGroupedReduction::new(dtype)),
24
25
_ => unimplemented!("{dtype:?} is not supported by mean reduction"),
26
}
27
}
28
29
fn finish_output(values: Vec<(f64, usize)>, dtype: &DataType) -> Series {
30
match dtype {
31
DataType::Float32 => {
32
let ca: Float32Chunked = values
33
.into_iter()
34
.map(|(s, c)| (c != 0).then(|| (s / c as f64) as f32))
35
.collect_ca(PlSmallStr::EMPTY);
36
ca.into_series()
37
},
38
dt if dt.is_primitive_numeric() => {
39
let ca: Float64Chunked = values
40
.into_iter()
41
.map(|(s, c)| (c != 0).then(|| s / c as f64))
42
.collect_ca(PlSmallStr::EMPTY);
43
ca.into_series()
44
},
45
#[cfg(feature = "dtype-decimal")]
46
DataType::Decimal(_prec, scale) => {
47
let inv_scale_factor = 1.0 / 10u128.pow(scale.unwrap() as u32) as f64;
48
let ca: Float64Chunked = values
49
.into_iter()
50
.map(|(s, c)| (c != 0).then(|| s / c as f64 * inv_scale_factor))
51
.collect_ca(PlSmallStr::EMPTY);
52
ca.into_series()
53
},
54
#[cfg(feature = "dtype-datetime")]
55
DataType::Date => {
56
const US_IN_DAY: f64 = MICROSECONDS_IN_DAY as f64;
57
let ca: Int64Chunked = values
58
.into_iter()
59
.map(|(s, c)| (c != 0).then(|| (s * US_IN_DAY / c as f64) as i64))
60
.collect_ca(PlSmallStr::EMPTY);
61
ca.into_datetime(TimeUnit::Microseconds, None).into_series()
62
},
63
DataType::Datetime(_, _) | DataType::Duration(_) | DataType::Time => {
64
let ca: Int64Chunked = values
65
.into_iter()
66
.map(|(s, c)| (c != 0).then(|| (s / c as f64) as i64))
67
.collect_ca(PlSmallStr::EMPTY);
68
ca.into_series().cast(dtype).unwrap()
69
},
70
_ => unimplemented!(),
71
}
72
}
73
74
/// Zero-sized marker selecting the mean reduction for the numeric type `T`.
///
/// The type parameter is carried only through `PhantomData`; no value of
/// type `T` is stored.
struct NumMeanReducer<T>(PhantomData<T>);

// `Clone` is written by hand rather than derived so that it is available for
// every `T`: a derive would add a `T: Clone` bound.
impl<T> Clone for NumMeanReducer<T> {
    fn clone(&self) -> Self {
        NumMeanReducer(PhantomData)
    }
}
80
81
impl<T> Reducer for NumMeanReducer<T>
82
where
83
T: PolarsNumericType,
84
ChunkedArray<T>: ChunkAgg<T::Native>,
85
{
86
type Dtype = T;
87
type Value = (f64, usize);
88
89
#[inline(always)]
90
fn init(&self) -> Self::Value {
91
(0.0, 0)
92
}
93
94
fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
95
s.to_physical_repr()
96
}
97
98
#[inline(always)]
99
fn combine(&self, a: &mut Self::Value, b: &Self::Value) {
100
a.0 += b.0;
101
a.1 += b.1;
102
}
103
104
#[inline(always)]
105
fn reduce_one(&self, a: &mut Self::Value, b: Option<T::Native>, _seq_id: u64) {
106
a.0 += b.unwrap_or(T::Native::zero()).as_();
107
a.1 += b.is_some() as usize;
108
}
109
110
fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, _seq_id: u64) {
111
v.0 += ChunkAgg::_sum_as_f64(ca);
112
v.1 += ca.len() - ca.null_count();
113
}
114
115
fn finish(
116
&self,
117
v: Vec<Self::Value>,
118
m: Option<Bitmap>,
119
dtype: &DataType,
120
) -> PolarsResult<Series> {
121
assert!(m.is_none());
122
Ok(finish_output(v, dtype))
123
}
124
}
125
126
#[derive(Clone)]
127
struct BoolMeanReducer;
128
129
impl Reducer for BoolMeanReducer {
130
type Dtype = BooleanType;
131
type Value = (usize, usize);
132
133
#[inline(always)]
134
fn init(&self) -> Self::Value {
135
(0, 0)
136
}
137
138
#[inline(always)]
139
fn combine(&self, a: &mut Self::Value, b: &Self::Value) {
140
a.0 += b.0;
141
a.1 += b.1;
142
}
143
144
#[inline(always)]
145
fn reduce_one(&self, a: &mut Self::Value, b: Option<bool>, _seq_id: u64) {
146
a.0 += b.unwrap_or(false) as usize;
147
a.1 += b.is_some() as usize;
148
}
149
150
fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, _seq_id: u64) {
151
v.0 += ca.sum().unwrap_or(0) as usize;
152
v.1 += ca.len() - ca.null_count();
153
}
154
155
fn finish(
156
&self,
157
v: Vec<Self::Value>,
158
m: Option<Bitmap>,
159
dtype: &DataType,
160
) -> PolarsResult<Series> {
161
assert!(m.is_none());
162
assert!(dtype == &DataType::Boolean);
163
let ca: Float64Chunked = v
164
.into_iter()
165
.map(|(s, c)| (c != 0).then(|| s as f64 / c as f64))
166
.collect_ca(PlSmallStr::EMPTY);
167
Ok(ca.into_series())
168
}
169
}
170
171