// crates/polars-compute/src/moment.rs (pola-rs/polars)
// Some formulae:
// mean_x = sum(weight[i] * x[i]) / sum(weight)
// dp_xy = weighted sum of deviation products of variables x, y, written in
// the paper as simply XY.
// dp_xy = sum(weight[i] * (x[i] - mean_x) * (y[i] - mean_y))
//
// cov(x, y) = dp_xy / sum(weight)
// var(x) = cov(x, x)
//
// Algorithms from:
// Numerically stable parallel computation of (co-)variance.
// Schubert, E. & Gertz, M. (2018).
//
// Key equations from the paper:
// (17) for mean update, (23) for dp update (and also Table 1).
//
// For higher moments we refer to:
// Numerically Stable, Scalable Formulas for Parallel and Online Computation of
// Higher-Order Multivariate Central Moments with Arbitrary Weights.
// Pébay, P. & Terriberry, T. B. & Kolla, H. & Bennett J. (2016)
//
// Key equations from paper:
// (3.26) mean update, (3.27) moment update.
//
// Here we use mk to mean the weighted kth central moment:
// mk = sum(weight[i] * (x[i] - mean_x)**k)
// Note that we'll use the terms m2 = dp = dp_xx if unambiguous.
#![allow(clippy::collapsible_else_if)]

use arrow::array::{Array, PrimitiveArray};
use arrow::types::NativeType;
use num_traits::AsPrimitive;
use polars_utils::algebraic_ops::*;

// Number of elements buffered per batch when folding arrays into moment
// states; bounds the stack arrays used in chunk_as_float{,_binary}.
const CHUNK_SIZE: usize = 128;
/// Running state for numerically stable, mergeable variance computation
/// (Schubert & Gertz, 2018).
#[derive(Default, Clone)]
pub struct VarState {
    // Total weight (the observation count for unweighted input).
    weight: f64,
    // Weighted mean of all observations seen so far.
    mean: f64,
    // Weighted sum of squared deviations from the mean (m2 / dp_xx).
    dp: f64,
}
/// Running state for numerically stable, mergeable covariance computation
/// (Schubert & Gertz, 2018).
#[derive(Default, Clone)]
pub struct CovState {
    // Total weight (the pair count for unweighted input).
    weight: f64,
    // Weighted mean of the x observations.
    mean_x: f64,
    // Weighted mean of the y observations.
    mean_y: f64,
    // Weighted sum of deviation products: sum(w[i] * (x[i]-mean_x) * (y[i]-mean_y)).
    dp_xy: f64,
}
/// Running state for the Pearson correlation coefficient; tracks both
/// marginal deviation sums and the cross deviation sum so the final
/// ratio dp_xy / sqrt(dp_xx * dp_yy) can be formed.
#[derive(Default, Clone)]
pub struct PearsonState {
    // Total weight (the pair count for unweighted input).
    weight: f64,
    // Weighted mean of the x observations.
    mean_x: f64,
    // Weighted mean of the y observations.
    mean_y: f64,
    // Weighted sum of squared x-deviations.
    dp_xx: f64,
    // Weighted sum of x/y deviation products.
    dp_xy: f64,
    // Weighted sum of squared y-deviations.
    dp_yy: f64,
}
impl VarState {
65
fn new(x: &[f64]) -> Self {
66
if x.is_empty() {
67
return Self::default();
68
}
69
70
let weight = x.len() as f64;
71
let mean = alg_sum_f64(x.iter().copied()) / weight;
72
Self {
73
weight,
74
mean,
75
dp: alg_sum_f64(x.iter().map(|&xi| (xi - mean) * (xi - mean))),
76
}
77
}
78
79
fn clear_zero_weight_nan(&mut self) {
80
// Clear NaNs due to division by zero.
81
if self.weight == 0.0 {
82
self.mean = 0.0;
83
self.dp = 0.0;
84
}
85
}
86
87
pub fn insert_one(&mut self, x: f64) {
88
// Just a specialized version of
89
// self.combine(&Self { weight: 1.0, mean: x, dp: 0.0 })
90
let new_weight = self.weight + 1.0;
91
let delta_mean = x - self.mean;
92
let new_mean = self.mean + delta_mean / new_weight;
93
self.dp += (x - new_mean) * delta_mean;
94
self.weight = new_weight;
95
self.mean = new_mean;
96
self.clear_zero_weight_nan();
97
}
98
99
pub fn remove_one(&mut self, x: f64) {
100
// Just a specialized version of
101
// self.combine(&Self { weight: -1.0, mean: x, dp: 0.0 })
102
let new_weight = self.weight - 1.0;
103
let delta_mean = x - self.mean;
104
let new_mean = self.mean - delta_mean / new_weight;
105
self.dp -= (x - new_mean) * delta_mean;
106
self.weight = new_weight;
107
self.mean = new_mean;
108
self.clear_zero_weight_nan();
109
}
110
111
pub fn combine(&mut self, other: &Self) {
112
if other.weight == 0.0 {
113
return;
114
}
115
116
let new_weight = self.weight + other.weight;
117
let other_weight_frac = other.weight / new_weight;
118
let delta_mean = other.mean - self.mean;
119
let new_mean = self.mean + delta_mean * other_weight_frac;
120
self.dp += other.dp + other.weight * (other.mean - new_mean) * delta_mean;
121
self.weight = new_weight;
122
self.mean = new_mean;
123
self.clear_zero_weight_nan();
124
}
125
126
pub fn finalize(&self, ddof: u8) -> Option<f64> {
127
if self.weight <= ddof as f64 {
128
None
129
} else {
130
let var = self.dp / (self.weight - ddof as f64);
131
Some(if var < 0.0 {
132
// Variance can't be negative, except through numerical instability.
133
// We don't use f64::max here so we propagate nans.
134
0.0
135
} else {
136
var
137
})
138
}
139
}
140
}
141
142
impl CovState {
143
fn new(x: &[f64], y: &[f64]) -> Self {
144
assert!(x.len() == y.len());
145
if x.is_empty() {
146
return Self::default();
147
}
148
149
let weight = x.len() as f64;
150
let inv_weight = 1.0 / weight;
151
let mean_x = alg_sum_f64(x.iter().copied()) * inv_weight;
152
let mean_y = alg_sum_f64(y.iter().copied()) * inv_weight;
153
Self {
154
weight,
155
mean_x,
156
mean_y,
157
dp_xy: alg_sum_f64(
158
x.iter()
159
.zip(y)
160
.map(|(&xi, &yi)| (xi - mean_x) * (yi - mean_y)),
161
),
162
}
163
}
164
165
pub fn combine(&mut self, other: &Self) {
166
if other.weight == 0.0 {
167
return;
168
} else if self.weight == 0.0 {
169
*self = other.clone();
170
return;
171
}
172
173
let new_weight = self.weight + other.weight;
174
let other_weight_frac = other.weight / new_weight;
175
let delta_mean_x = other.mean_x - self.mean_x;
176
let delta_mean_y = other.mean_y - self.mean_y;
177
let new_mean_x = self.mean_x + delta_mean_x * other_weight_frac;
178
let new_mean_y = self.mean_y + delta_mean_y * other_weight_frac;
179
self.dp_xy += other.dp_xy + other.weight * (other.mean_x - new_mean_x) * delta_mean_y;
180
self.weight = new_weight;
181
self.mean_x = new_mean_x;
182
self.mean_y = new_mean_y;
183
}
184
185
pub fn finalize(&self, ddof: u8) -> Option<f64> {
186
if self.weight <= ddof as f64 {
187
None
188
} else {
189
Some(self.dp_xy / (self.weight - ddof as f64))
190
}
191
}
192
}
193
194
impl PearsonState {
195
fn new(x: &[f64], y: &[f64]) -> Self {
196
assert!(x.len() == y.len());
197
if x.is_empty() {
198
return Self::default();
199
}
200
201
let weight = x.len() as f64;
202
let inv_weight = 1.0 / weight;
203
let mean_x = alg_sum_f64(x.iter().copied()) * inv_weight;
204
let mean_y = alg_sum_f64(y.iter().copied()) * inv_weight;
205
let mut dp_xx = 0.0;
206
let mut dp_xy = 0.0;
207
let mut dp_yy = 0.0;
208
for (xi, yi) in x.iter().zip(y.iter()) {
209
dp_xx = alg_add_f64(dp_xx, (xi - mean_x) * (xi - mean_x));
210
dp_xy = alg_add_f64(dp_xy, (xi - mean_x) * (yi - mean_y));
211
dp_yy = alg_add_f64(dp_yy, (yi - mean_y) * (yi - mean_y));
212
}
213
Self {
214
weight,
215
mean_x,
216
mean_y,
217
dp_xx,
218
dp_xy,
219
dp_yy,
220
}
221
}
222
223
pub fn combine(&mut self, other: &Self) {
224
if other.weight == 0.0 {
225
return;
226
} else if self.weight == 0.0 {
227
*self = other.clone();
228
return;
229
}
230
231
let new_weight = self.weight + other.weight;
232
let other_weight_frac = other.weight / new_weight;
233
let delta_mean_x = other.mean_x - self.mean_x;
234
let delta_mean_y = other.mean_y - self.mean_y;
235
let new_mean_x = self.mean_x + delta_mean_x * other_weight_frac;
236
let new_mean_y = self.mean_y + delta_mean_y * other_weight_frac;
237
self.dp_xx += other.dp_xx + other.weight * (other.mean_x - new_mean_x) * delta_mean_x;
238
self.dp_xy += other.dp_xy + other.weight * (other.mean_x - new_mean_x) * delta_mean_y;
239
self.dp_yy += other.dp_yy + other.weight * (other.mean_y - new_mean_y) * delta_mean_y;
240
self.weight = new_weight;
241
self.mean_x = new_mean_x;
242
self.mean_y = new_mean_y;
243
}
244
245
pub fn finalize(&self) -> f64 {
246
let denom_sq = self.dp_xx * self.dp_yy;
247
if denom_sq > 0.0 {
248
self.dp_xy / denom_sq.sqrt()
249
} else {
250
f64::NAN
251
}
252
}
253
}
254
255
/// Running state for mergeable skewness computation; tracks the second
/// and third weighted central moments (Pébay et al., 2016).
#[derive(Default, Clone)]
pub struct SkewState {
    // Total weight (the observation count for unweighted input).
    weight: f64,
    // Weighted mean of all observations seen so far.
    mean: f64,
    // Weighted second central moment: sum(w[i] * (x[i]-mean)^2).
    m2: f64,
    // Weighted third central moment: sum(w[i] * (x[i]-mean)^3).
    m3: f64,
}
impl SkewState {
    /// Builds the state for a dense slice of unit-weight observations.
    fn new(x: &[f64]) -> Self {
        if x.is_empty() {
            return Self::default();
        }

        let weight = x.len() as f64;
        let mean = alg_sum_f64(x.iter().copied()) / weight;
        let mut m2 = 0.0;
        let mut m3 = 0.0;
        for xi in x.iter() {
            let d = xi - mean;
            let d2 = d * d;
            let d3 = d * d2;
            m2 = alg_add_f64(m2, d2);
            m3 = alg_add_f64(m3, d3);
        }
        Self {
            weight,
            mean,
            m2,
            m3,
        }
    }

    /// Resets derived fields once the total weight reaches zero so that
    /// 0/0 NaNs from the running updates don't linger in the state.
    fn clear_zero_weight_nan(&mut self) {
        // Clear NaNs due to division by zero.
        if self.weight == 0.0 {
            self.mean = 0.0;
            self.m2 = 0.0;
            self.m3 = 0.0;
        }
    }

    /// Adds one unit-weight observation; single-element specialization of
    /// the update formulas (3.26)/(3.27) of Pébay et al. (2016).
    pub fn insert_one(&mut self, x: f64) {
        // Specialization of self.combine(&SkewState { weight: 1.0, mean: x, m2: 0.0, m3: 0.0 });
        let new_weight = self.weight + 1.0;
        let delta_mean = x - self.mean;
        let delta_mean_weight = delta_mean / new_weight;
        let new_mean = self.mean + delta_mean_weight;

        let weight_diff = self.weight - 1.0;
        let m2_update = (x - new_mean) * delta_mean;
        let new_m2 = self.m2 + m2_update;
        let new_m3 = self.m3 + delta_mean_weight * (m2_update * weight_diff - 3.0 * self.m2);

        self.weight = new_weight;
        self.mean = new_mean;
        self.m2 = new_m2;
        self.m3 = new_m3;
        self.clear_zero_weight_nan();
    }

    /// Removes one unit-weight observation (used by rolling windows);
    /// the insert_one update run with weight -1.
    pub fn remove_one(&mut self, x: f64) {
        // Specialization of self.combine(&SkewState { weight: -1.0, mean: x, m2: 0.0, m3: 0.0 });
        let new_weight = self.weight - 1.0;
        let delta_mean = x - self.mean;
        let delta_mean_weight = delta_mean / new_weight;
        let new_mean = self.mean - delta_mean_weight;

        let weight_diff = self.weight + 1.0;
        let m2_update = (new_mean - x) * delta_mean;
        let new_m2 = self.m2 + m2_update;
        let new_m3 = self.m3 + delta_mean_weight * (m2_update * weight_diff + 3.0 * self.m2);

        self.weight = new_weight;
        self.mean = new_mean;
        self.m2 = new_m2;
        self.m3 = new_m3;
        self.clear_zero_weight_nan();
    }

    /// Merges another partial state into this one; eqs. (3.26) and (3.27)
    /// of Pébay et al. (2016).
    pub fn combine(&mut self, other: &Self) {
        if other.weight == 0.0 {
            return;
        } else if self.weight == 0.0 {
            *self = other.clone();
            return;
        }

        let new_weight = self.weight + other.weight;
        let delta_mean = other.mean - self.mean;
        let delta_mean_weight = delta_mean / new_weight;
        let new_mean = self.mean + other.weight * delta_mean_weight;

        let weight_diff = self.weight - other.weight;
        let self_weight_other_m2 = self.weight * other.m2;
        let other_weight_self_m2 = other.weight * self.m2;
        let m2_update = other.weight * (other.mean - new_mean) * delta_mean;
        let new_m2 = self.m2 + other.m2 + m2_update;
        let new_m3 = self.m3
            + other.m3
            + delta_mean_weight
                * (m2_update * weight_diff + 3.0 * (self_weight_other_m2 - other_weight_self_m2));

        self.weight = new_weight;
        self.mean = new_mean;
        self.m2 = new_m2;
        self.m3 = new_m3;
        self.clear_zero_weight_nan();
    }

    /// Returns the skewness estimate, optionally bias-corrected.
    /// None when there is not enough weight (0 for biased, <= 2 for the
    /// corrected estimate); NaN when the variance is numerically zero.
    pub fn finalize(&self, bias: bool) -> Option<f64> {
        let m2 = self.m2 / self.weight;
        let m3 = self.m3 / self.weight;
        // Variance at rounding-noise scale (relative to the mean) is
        // treated as zero, making the estimate undefined.
        let is_zero = m2 <= (f64::EPSILON * self.mean).powi(2);
        let biased_est = if is_zero { f64::NAN } else { m3 / m2.powf(1.5) };
        if bias {
            if self.weight == 0.0 {
                None
            } else {
                Some(biased_est)
            }
        } else {
            if self.weight <= 2.0 {
                None
            } else {
                // Sample-skewness bias correction: sqrt(n(n-1)) / (n-2).
                let correction = (self.weight * (self.weight - 1.0)).sqrt() / (self.weight - 2.0);
                Some(correction * biased_est)
            }
        }
    }
}
/// Running state for mergeable kurtosis computation; tracks the second,
/// third, and fourth weighted central moments (Pébay et al., 2016).
#[derive(Default, Clone)]
pub struct KurtosisState {
    // Total weight (the observation count for unweighted input).
    weight: f64,
    // Weighted mean of all observations seen so far.
    mean: f64,
    // Weighted second central moment: sum(w[i] * (x[i]-mean)^2).
    m2: f64,
    // Weighted third central moment: sum(w[i] * (x[i]-mean)^3).
    m3: f64,
    // Weighted fourth central moment: sum(w[i] * (x[i]-mean)^4).
    m4: f64,
}
impl KurtosisState {
    /// Builds the state for a dense slice of unit-weight observations.
    fn new(x: &[f64]) -> Self {
        if x.is_empty() {
            return Self::default();
        }

        let weight = x.len() as f64;
        let mean = alg_sum_f64(x.iter().copied()) / weight;
        let mut m2 = 0.0;
        let mut m3 = 0.0;
        let mut m4 = 0.0;
        for xi in x.iter() {
            let d = xi - mean;
            let d2 = d * d;
            let d3 = d * d2;
            let d4 = d2 * d2;
            m2 = alg_add_f64(m2, d2);
            m3 = alg_add_f64(m3, d3);
            m4 = alg_add_f64(m4, d4);
        }
        Self {
            weight,
            mean,
            m2,
            m3,
            m4,
        }
    }

    /// Resets derived fields once the total weight reaches zero so that
    /// 0/0 NaNs from the running updates don't linger in the state.
    fn clear_zero_weight_nan(&mut self) {
        // Clear NaNs due to division by zero.
        if self.weight == 0.0 {
            self.mean = 0.0;
            self.m2 = 0.0;
            self.m3 = 0.0;
            self.m4 = 0.0;
        }
    }

    /// Adds one unit-weight observation; single-element specialization of
    /// the update formulas (3.26)/(3.27) of Pébay et al. (2016).
    pub fn insert_one(&mut self, x: f64) {
        // Specialization of self.combine(&KurtosisState { weight: 1.0, mean: x, m2: 0.0, m3: 0.0, m4: 0.0 });
        let new_weight = self.weight + 1.0;
        let delta_mean = x - self.mean;
        let delta_mean_weight = delta_mean / new_weight;
        let new_mean = self.mean + delta_mean_weight;

        let weight_diff = self.weight - 1.0;
        let m2_update = (x - new_mean) * delta_mean;
        let new_m2 = self.m2 + m2_update;
        let new_m3 = self.m3 + delta_mean_weight * (m2_update * weight_diff - 3.0 * self.m2);
        let new_m4 = self.m4
            + delta_mean_weight
                * (delta_mean_weight
                    * (m2_update * (self.weight * weight_diff + 1.0) + 6.0 * self.m2)
                    - 4.0 * self.m3);

        self.weight = new_weight;
        self.mean = new_mean;
        self.m2 = new_m2;
        self.m3 = new_m3;
        self.m4 = new_m4;
        self.clear_zero_weight_nan();
    }

    /// Removes one unit-weight observation (used by rolling windows);
    /// the insert_one update run with weight -1.
    pub fn remove_one(&mut self, x: f64) {
        // Specialization of self.combine(&KurtosisState { weight: -1.0, mean: x, m2: 0.0, m3: 0.0, m4: 0.0 });
        let new_weight = self.weight - 1.0;
        let delta_mean = x - self.mean;
        let delta_mean_weight = delta_mean / new_weight;
        let new_mean = self.mean - delta_mean_weight;

        let weight_diff = self.weight + 1.0;
        let m2_update = (new_mean - x) * delta_mean;
        let new_m2 = self.m2 + m2_update;
        let new_m3 = self.m3 + delta_mean_weight * (m2_update * weight_diff + 3.0 * self.m2);
        let new_m4 = self.m4
            + delta_mean_weight
                * (delta_mean_weight
                    * (m2_update * (self.weight * weight_diff + 1.0) + 6.0 * self.m2)
                    + 4.0 * self.m3);

        self.weight = new_weight;
        self.mean = new_mean;
        self.m2 = new_m2;
        self.m3 = new_m3;
        self.m4 = new_m4;
        self.clear_zero_weight_nan();
    }

    /// Merges another partial state into this one; eqs. (3.26) and (3.27)
    /// of Pébay et al. (2016) for moments up to order 4.
    pub fn combine(&mut self, other: &Self) {
        if other.weight == 0.0 {
            return;
        } else if self.weight == 0.0 {
            *self = other.clone();
            return;
        }

        let new_weight = self.weight + other.weight;
        let delta_mean = other.mean - self.mean;
        let delta_mean_weight = delta_mean / new_weight;
        let new_mean = self.mean + other.weight * delta_mean_weight;

        let weight_diff = self.weight - other.weight;
        let self_weight_other_m2 = self.weight * other.m2;
        let other_weight_self_m2 = other.weight * self.m2;
        let m2_update = other.weight * (other.mean - new_mean) * delta_mean;
        let new_m2 = self.m2 + other.m2 + m2_update;
        let new_m3 = self.m3
            + other.m3
            + delta_mean_weight
                * (m2_update * weight_diff + 3.0 * (self_weight_other_m2 - other_weight_self_m2));
        let new_m4 = self.m4
            + other.m4
            + delta_mean_weight
                * (delta_mean_weight
                    * (m2_update * (self.weight * weight_diff + other.weight * other.weight)
                        + 6.0
                            * (self.weight * self_weight_other_m2
                                + other.weight * other_weight_self_m2))
                    + 4.0 * (self.weight * other.m3 - other.weight * self.m3));

        self.weight = new_weight;
        self.mean = new_mean;
        self.m2 = new_m2;
        self.m3 = new_m3;
        self.m4 = new_m4;
        self.clear_zero_weight_nan();
    }

    /// Returns the kurtosis estimate: Fisher's definition (normal == 0.0)
    /// when `fisher` is set, Pearson's otherwise, with optional bias
    /// correction. None when there is not enough weight (0 for biased,
    /// <= 3 for the corrected estimate); NaN when the variance is
    /// numerically zero.
    pub fn finalize(&self, fisher: bool, bias: bool) -> Option<f64> {
        let m4 = self.m4 / self.weight;
        let m2 = self.m2 / self.weight;
        // Variance at rounding-noise scale (relative to the mean) is
        // treated as zero, making the estimate undefined.
        let is_zero = m2 <= (f64::EPSILON * self.mean).powi(2);
        let biased_est = if is_zero { f64::NAN } else { m4 / (m2 * m2) };
        let out = if bias {
            if self.weight == 0.0 {
                return None;
            }

            biased_est
        } else {
            if self.weight <= 3.0 {
                return None;
            }

            // Sample-kurtosis bias correction:
            // (n-1)/(n-2) * ((n+1)/(n-3) * g2 - 3*(n-1)/(n-3)) + 3.
            let n = self.weight;
            let nm1_nm2 = (n - 1.0) / (n - 2.0);
            let np1_nm3 = (n + 1.0) / (n - 3.0);
            let nm1_nm3 = (n - 1.0) / (n - 3.0);
            nm1_nm2 * (np1_nm3 * biased_est - 3.0 * nm1_nm3) + 3.0
        };

        if fisher { Some(out - 3.0) } else { Some(out) }
    }
}
fn chunk_as_float<T, I, F>(it: I, mut f: F)
553
where
554
T: NativeType + AsPrimitive<f64>,
555
I: IntoIterator<Item = T>,
556
F: FnMut(&[f64]),
557
{
558
let mut chunk = [0.0; CHUNK_SIZE];
559
let mut i = 0;
560
for val in it {
561
if i >= CHUNK_SIZE {
562
f(&chunk);
563
i = 0;
564
}
565
chunk[i] = val.as_();
566
i += 1;
567
}
568
if i > 0 {
569
f(&chunk[..i]);
570
}
571
}
572
573
fn chunk_as_float_binary<T, U, I, F>(it: I, mut f: F)
574
where
575
T: NativeType + AsPrimitive<f64>,
576
U: NativeType + AsPrimitive<f64>,
577
I: IntoIterator<Item = (T, U)>,
578
F: FnMut(&[f64], &[f64]),
579
{
580
let mut left_chunk = [0.0; CHUNK_SIZE];
581
let mut right_chunk = [0.0; CHUNK_SIZE];
582
let mut i = 0;
583
for (l, r) in it {
584
if i >= CHUNK_SIZE {
585
f(&left_chunk, &right_chunk);
586
i = 0;
587
}
588
left_chunk[i] = l.as_();
589
right_chunk[i] = r.as_();
590
i += 1;
591
}
592
if i > 0 {
593
f(&left_chunk[..i], &right_chunk[..i]);
594
}
595
}
596
597
pub fn var<T>(arr: &PrimitiveArray<T>) -> VarState
598
where
599
T: NativeType + AsPrimitive<f64>,
600
{
601
let mut out = VarState::default();
602
if arr.has_nulls() {
603
chunk_as_float(arr.non_null_values_iter(), |chunk| {
604
out.combine(&VarState::new(chunk))
605
});
606
} else {
607
chunk_as_float(arr.values().iter().copied(), |chunk| {
608
out.combine(&VarState::new(chunk))
609
});
610
}
611
out
612
}
613
614
pub fn cov<T, U>(x: &PrimitiveArray<T>, y: &PrimitiveArray<U>) -> CovState
615
where
616
T: NativeType + AsPrimitive<f64>,
617
U: NativeType + AsPrimitive<f64>,
618
{
619
assert!(x.len() == y.len());
620
let mut out = CovState::default();
621
if x.has_nulls() || y.has_nulls() {
622
chunk_as_float_binary(
623
x.iter()
624
.zip(y.iter())
625
.filter_map(|(l, r)| l.copied().zip(r.copied())),
626
|l, r| out.combine(&CovState::new(l, r)),
627
);
628
} else {
629
chunk_as_float_binary(
630
x.values().iter().copied().zip(y.values().iter().copied()),
631
|l, r| out.combine(&CovState::new(l, r)),
632
);
633
}
634
out
635
}
636
637
pub fn pearson_corr<T, U>(x: &PrimitiveArray<T>, y: &PrimitiveArray<U>) -> PearsonState
638
where
639
T: NativeType + AsPrimitive<f64>,
640
U: NativeType + AsPrimitive<f64>,
641
{
642
assert!(x.len() == y.len());
643
let mut out = PearsonState::default();
644
if x.has_nulls() || y.has_nulls() {
645
chunk_as_float_binary(
646
x.iter()
647
.zip(y.iter())
648
.filter_map(|(l, r)| l.copied().zip(r.copied())),
649
|l, r| out.combine(&PearsonState::new(l, r)),
650
);
651
} else {
652
chunk_as_float_binary(
653
x.values().iter().copied().zip(y.values().iter().copied()),
654
|l, r| out.combine(&PearsonState::new(l, r)),
655
);
656
}
657
out
658
}
659
660
pub fn skew<T>(arr: &PrimitiveArray<T>) -> SkewState
661
where
662
T: NativeType + AsPrimitive<f64>,
663
{
664
let mut out = SkewState::default();
665
if arr.has_nulls() {
666
chunk_as_float(arr.non_null_values_iter(), |chunk| {
667
out.combine(&SkewState::new(chunk))
668
});
669
} else {
670
chunk_as_float(arr.values().iter().copied(), |chunk| {
671
out.combine(&SkewState::new(chunk))
672
});
673
}
674
out
675
}
676
677
pub fn kurtosis<T>(arr: &PrimitiveArray<T>) -> KurtosisState
678
where
679
T: NativeType + AsPrimitive<f64>,
680
{
681
let mut out = KurtosisState::default();
682
if arr.has_nulls() {
683
chunk_as_float(arr.non_null_values_iter(), |chunk| {
684
out.combine(&KurtosisState::new(chunk))
685
});
686
} else {
687
chunk_as_float(arr.values().iter().copied(), |chunk| {
688
out.combine(&KurtosisState::new(chunk))
689
});
690
}
691
out
692
}
693
694