Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-compute/src/moment.rs
8468 views
1
// Some formulae:
2
// mean_x = sum(weight[i] * x[i]) / sum(weight)
3
// dp_xy = weighted sum of deviation products of variables x, y, written in
4
// the paper as simply XY.
5
// dp_xy = sum(weight[i] * (x[i] - mean_x) * (y[i] - mean_y))
6
//
7
// cov(x, y) = dp_xy / sum(weight)
8
// var(x) = cov(x, x)
9
//
10
// Algorithms from:
11
// Numerically stable parallel computation of (co-)variance.
12
// Schubert, E. & Gertz, M. (2018).
13
//
14
// Key equations from the paper:
15
// (17) for mean update, (23) for dp update (and also Table 1).
16
//
17
//
18
// For higher moments we refer to:
19
// Numerically Stable, Scalable Formulas for Parallel and Online Computation of
20
// Higher-Order Multivariate Central Moments with Arbitrary Weights.
21
// Pébay, P. & Terriberry, T. B. & Kolla, H. & Bennett J. (2016)
22
//
23
// Key equations from paper:
24
// (3.26) mean update, (3.27) moment update.
25
//
26
// Here we use mk to mean the weighted kth central moment:
27
// mk = sum(weight[i] * (x[i] - mean_x)**k)
28
// Note that we'll use the terms m2 = dp = dp_xx if unambiguous.
29
30
#![allow(clippy::collapsible_else_if)]
31
32
use arrow::array::{Array, PrimitiveArray};
33
use arrow::types::NativeType;
34
use num_traits::AsPrimitive;
35
use polars_utils::algebraic_ops::*;
36
37
const CHUNK_SIZE: usize = 128;
38
39
/// Streaming state for a numerically stable (co-)variance computation of a
/// single variable; see the formulae at the top of this file.
#[derive(Default, Clone)]
#[repr(C)] // For serialization, don't change struct member order.
pub struct VarState {
    /// Total weight, sum(weight[i]); equals the observation count for
    /// unweighted input.
    weight: f64,
    /// Weighted mean: sum(weight[i] * x[i]) / sum(weight).
    mean: f64,
    /// Weighted sum of squared deviations: sum(weight[i] * (x[i] - mean)^2).
    dp: f64,
}
46
47
/// Streaming state for a numerically stable covariance computation of two
/// variables; see the formulae at the top of this file.
#[derive(Default, Clone)]
#[repr(C)] // For serialization, don't change struct member order.
pub struct CovState {
    /// Total weight, sum(weight[i]).
    weight: f64,
    /// Weighted mean of x.
    mean_x: f64,
    /// Weighted mean of y.
    mean_y: f64,
    /// Weighted sum of deviation products:
    /// sum(weight[i] * (x[i] - mean_x) * (y[i] - mean_y)).
    dp_xy: f64,
}
55
56
/// Streaming state for a Pearson correlation computation. Tracks all three
/// deviation-product sums so the correlation dp_xy / sqrt(dp_xx * dp_yy)
/// can be formed in finalize.
#[derive(Default, Clone)]
#[repr(C)] // For serialization, don't change struct member order.
pub struct PearsonState {
    /// Total weight, sum(weight[i]).
    weight: f64,
    /// Weighted mean of x.
    mean_x: f64,
    /// Weighted mean of y.
    mean_y: f64,
    /// Weighted sum of squared x-deviations.
    dp_xx: f64,
    /// Weighted sum of x/y deviation products.
    dp_xy: f64,
    /// Weighted sum of squared y-deviations.
    dp_yy: f64,
}
66
67
impl VarState {
    /// Computes the state of a slice in two passes: first the mean, then the
    /// sum of squared deviations around that mean.
    fn new(x: &[f64]) -> Self {
        if x.is_empty() {
            return Self::default();
        }

        let weight = x.len() as f64;
        let mean = alg_sum_f64(x.iter().copied()) / weight;
        Self {
            weight,
            mean,
            dp: alg_sum_f64(x.iter().map(|&xi| (xi - mean) * (xi - mean))),
        }
    }

    /// Resets mean/dp when all weight has been removed again, so the state
    /// behaves like the empty default instead of carrying NaNs forward.
    fn clear_zero_weight_nan(&mut self) {
        // Clear NaNs due to division by zero.
        if self.weight == 0.0 {
            self.mean = 0.0;
            self.dp = 0.0;
        }
    }

    /// Adds a single observation with weight 1 (eqs. (17)/(23) from the paper
    /// referenced at the top of the file, specialized to one element).
    pub fn insert_one(&mut self, x: f64) {
        // Just a specialized version of
        // self.combine(&Self { weight: 1.0, mean: x, dp: 0.0 })
        let new_weight = self.weight + 1.0;
        let delta_mean = x - self.mean;
        let new_mean = self.mean + delta_mean / new_weight;
        self.dp += (x - new_mean) * delta_mean;
        self.weight = new_weight;
        self.mean = new_mean;
        self.clear_zero_weight_nan();
    }

    /// Removes a single previously inserted observation (insert_one with
    /// weight -1); used for rolling windows.
    pub fn remove_one(&mut self, x: f64) {
        // Just a specialized version of
        // self.combine(&Self { weight: -1.0, mean: x, dp: 0.0 })
        let new_weight = self.weight - 1.0;
        let delta_mean = x - self.mean;
        let new_mean = self.mean - delta_mean / new_weight;
        self.dp -= (x - new_mean) * delta_mean;
        self.weight = new_weight;
        self.mean = new_mean;
        self.clear_zero_weight_nan();
    }

    /// Merges another partial state into this one, following eq. (17) for the
    /// mean and eq. (23) for dp from Schubert & Gertz (2018).
    pub fn combine(&mut self, other: &Self) {
        if other.weight == 0.0 {
            return;
        }

        let new_weight = self.weight + other.weight;
        let other_weight_frac = other.weight / new_weight;
        let delta_mean = other.mean - self.mean;
        let new_mean = self.mean + delta_mean * other_weight_frac;
        self.dp += other.dp + other.weight * (other.mean - new_mean) * delta_mean;
        self.weight = new_weight;
        self.mean = new_mean;
        self.clear_zero_weight_nan();
    }

    /// Returns the variance with `ddof` delta degrees of freedom, or `None`
    /// when there is not enough weight (weight <= ddof).
    pub fn finalize(&self, ddof: u8) -> Option<f64> {
        if self.weight <= ddof as f64 {
            None
        } else {
            let var = self.dp / (self.weight - ddof as f64);
            Some(if var < 0.0 {
                // Variance can't be negative, except through numerical instability.
                // We don't use f64::max here so we propagate nans.
                0.0
            } else {
                var
            })
        }
    }
}
144
145
impl CovState {
    /// Computes the state of two equal-length slices in two passes: the means
    /// first, then the deviation-product sum around them.
    fn new(x: &[f64], y: &[f64]) -> Self {
        assert!(x.len() == y.len());
        if x.is_empty() {
            return Self::default();
        }

        let weight = x.len() as f64;
        let inv_weight = 1.0 / weight;
        let mean_x = alg_sum_f64(x.iter().copied()) * inv_weight;
        let mean_y = alg_sum_f64(y.iter().copied()) * inv_weight;
        Self {
            weight,
            mean_x,
            mean_y,
            dp_xy: alg_sum_f64(
                x.iter()
                    .zip(y)
                    .map(|(&xi, &yi)| (xi - mean_x) * (yi - mean_y)),
            ),
        }
    }

    /// Merges another partial state into this one (eqs. (17)/(23) of
    /// Schubert & Gertz (2018), applied to the cross term dp_xy).
    pub fn combine(&mut self, other: &Self) {
        if other.weight == 0.0 {
            return;
        } else if self.weight == 0.0 {
            // Avoids dividing by a zero new_weight below when self is empty.
            *self = other.clone();
            return;
        }

        // Both weights are nonzero here, so new_weight > 0 and the divisions
        // below are safe.
        let new_weight = self.weight + other.weight;
        let other_weight_frac = other.weight / new_weight;
        let delta_mean_x = other.mean_x - self.mean_x;
        let delta_mean_y = other.mean_y - self.mean_y;
        let new_mean_x = self.mean_x + delta_mean_x * other_weight_frac;
        let new_mean_y = self.mean_y + delta_mean_y * other_weight_frac;
        self.dp_xy += other.dp_xy + other.weight * (other.mean_x - new_mean_x) * delta_mean_y;
        self.weight = new_weight;
        self.mean_x = new_mean_x;
        self.mean_y = new_mean_y;
    }

    /// Returns the covariance with `ddof` delta degrees of freedom, or `None`
    /// when there is not enough weight (weight <= ddof).
    pub fn finalize(&self, ddof: u8) -> Option<f64> {
        if self.weight <= ddof as f64 {
            None
        } else {
            Some(self.dp_xy / (self.weight - ddof as f64))
        }
    }
}
196
197
impl PearsonState {
    /// Computes the state of two equal-length slices in two passes: the means
    /// first, then all three deviation-product sums in a single loop.
    fn new(x: &[f64], y: &[f64]) -> Self {
        assert!(x.len() == y.len());
        if x.is_empty() {
            return Self::default();
        }

        let weight = x.len() as f64;
        let inv_weight = 1.0 / weight;
        let mean_x = alg_sum_f64(x.iter().copied()) * inv_weight;
        let mean_y = alg_sum_f64(y.iter().copied()) * inv_weight;
        let mut dp_xx = 0.0;
        let mut dp_xy = 0.0;
        let mut dp_yy = 0.0;
        for (xi, yi) in x.iter().zip(y.iter()) {
            dp_xx = alg_add_f64(dp_xx, (xi - mean_x) * (xi - mean_x));
            dp_xy = alg_add_f64(dp_xy, (xi - mean_x) * (yi - mean_y));
            dp_yy = alg_add_f64(dp_yy, (yi - mean_y) * (yi - mean_y));
        }
        Self {
            weight,
            mean_x,
            mean_y,
            dp_xx,
            dp_xy,
            dp_yy,
        }
    }

    /// Merges another partial state into this one; the same eq. (23) update is
    /// applied to each of dp_xx, dp_xy and dp_yy.
    pub fn combine(&mut self, other: &Self) {
        if other.weight == 0.0 {
            return;
        } else if self.weight == 0.0 {
            // Avoids dividing by a zero new_weight below when self is empty.
            *self = other.clone();
            return;
        }

        let new_weight = self.weight + other.weight;
        let other_weight_frac = other.weight / new_weight;
        let delta_mean_x = other.mean_x - self.mean_x;
        let delta_mean_y = other.mean_y - self.mean_y;
        let new_mean_x = self.mean_x + delta_mean_x * other_weight_frac;
        let new_mean_y = self.mean_y + delta_mean_y * other_weight_frac;
        self.dp_xx += other.dp_xx + other.weight * (other.mean_x - new_mean_x) * delta_mean_x;
        self.dp_xy += other.dp_xy + other.weight * (other.mean_x - new_mean_x) * delta_mean_y;
        self.dp_yy += other.dp_yy + other.weight * (other.mean_y - new_mean_y) * delta_mean_y;
        self.weight = new_weight;
        self.mean_x = new_mean_x;
        self.mean_y = new_mean_y;
    }

    /// Returns the Pearson correlation dp_xy / sqrt(dp_xx * dp_yy), or NaN
    /// when either variable has no spread (zero or non-positive denominator).
    pub fn finalize(&self) -> f64 {
        let denom_sq = self.dp_xx * self.dp_yy;
        if denom_sq > 0.0 {
            self.dp_xy / denom_sq.sqrt()
        } else {
            f64::NAN
        }
    }
}
257
258
/// Streaming state for skewness: tracks the weighted second and third central
/// moments (mk notation defined at the top of this file).
#[derive(Default, Clone)]
#[repr(C)] // For serialization, don't change struct member order.
pub struct SkewState {
    /// Total weight, sum(weight[i]).
    weight: f64,
    /// Weighted mean.
    mean: f64,
    /// Weighted second central moment: sum(weight[i] * (x[i] - mean)^2).
    m2: f64,
    /// Weighted third central moment: sum(weight[i] * (x[i] - mean)^3).
    m3: f64,
}
266
267
impl SkewState {
    /// Computes the state of a slice; see `from_iter`.
    fn new(x: &[f64]) -> Self {
        Self::from_iter(x.iter().copied(), x.len())
    }

    /// Computes the state in two passes over a cloneable iterator: the mean
    /// first, then m2 and m3 around that mean. `length` must equal the number
    /// of items the iterator yields.
    fn from_iter(iter: impl Iterator<Item = f64> + Clone, length: usize) -> Self {
        if length == 0 {
            return Self::default();
        }

        let weight = length as f64;
        let mean = alg_sum_f64(iter.clone()) / weight;
        let mut m2 = 0.0;
        let mut m3 = 0.0;
        for xi in iter {
            let d = xi - mean;
            let d2 = d * d;
            let d3 = d * d2;
            m2 = alg_add_f64(m2, d2);
            m3 = alg_add_f64(m3, d3);
        }
        Self {
            weight,
            mean,
            m2,
            m3,
        }
    }

    /// Resets the moments when all weight has been removed again, so the
    /// state behaves like the empty default instead of carrying NaNs forward.
    fn clear_zero_weight_nan(&mut self) {
        // Clear NaNs due to division by zero.
        if self.weight == 0.0 {
            self.mean = 0.0;
            self.m2 = 0.0;
            self.m3 = 0.0;
        }
    }

    /// Computes the state of `arr[start..start + length]`, skipping nulls.
    pub fn from_array(arr: &PrimitiveArray<f64>, start: usize, length: usize) -> Self {
        let validity = arr.validity().cloned();
        // Only keep the validity mask if the slice actually contains nulls;
        // otherwise take the fast contiguous-slice path below.
        let validity = validity
            .map(|v| v.sliced(start, length))
            .filter(|v| v.unset_bits() > 0);

        match validity {
            None => Self::new(&arr.values().as_slice()[start..][..length]),
            Some(validity) => {
                let iter = arr.values()[start..][..length].iter().copied();
                let iter = iter
                    .zip(validity.iter())
                    .filter_map(|(x, v)| v.then_some(x));
                Self::from_iter(iter, validity.set_bits())
            },
        }
    }

    /// Adds a single observation with weight 1 (eqs. (3.26)/(3.27) from
    /// Pébay et al. (2016), specialized to one element).
    pub fn insert_one(&mut self, x: f64) {
        // Specialization of self.combine(&SkewState { weight: 1.0, mean: x, m2: 0.0, m3: 0.0 });
        let new_weight = self.weight + 1.0;
        let delta_mean = x - self.mean;
        let delta_mean_weight = delta_mean / new_weight;
        let new_mean = self.mean + delta_mean_weight;

        let weight_diff = self.weight - 1.0;
        let m2_update = (x - new_mean) * delta_mean;
        let new_m2 = self.m2 + m2_update;
        let new_m3 = self.m3 + delta_mean_weight * (m2_update * weight_diff - 3.0 * self.m2);

        self.weight = new_weight;
        self.mean = new_mean;
        self.m2 = new_m2;
        self.m3 = new_m3;
        self.clear_zero_weight_nan();
    }

    /// Removes a single previously inserted observation (insert_one with
    /// weight -1); used for rolling windows.
    pub fn remove_one(&mut self, x: f64) {
        // Specialization of self.combine(&SkewState { weight: -1.0, mean: x, m2: 0.0, m3: 0.0 });
        let new_weight = self.weight - 1.0;
        let delta_mean = x - self.mean;
        let delta_mean_weight = delta_mean / new_weight;
        let new_mean = self.mean - delta_mean_weight;

        let weight_diff = self.weight + 1.0;
        let m2_update = (new_mean - x) * delta_mean;
        let new_m2 = self.m2 + m2_update;
        let new_m3 = self.m3 + delta_mean_weight * (m2_update * weight_diff + 3.0 * self.m2);

        self.weight = new_weight;
        self.mean = new_mean;
        self.m2 = new_m2;
        self.m3 = new_m3;
        self.clear_zero_weight_nan();
    }

    /// Merges another partial state into this one, following eq. (3.26) for
    /// the mean and eq. (3.27) for the moment updates.
    pub fn combine(&mut self, other: &Self) {
        if other.weight == 0.0 {
            return;
        } else if self.weight == 0.0 {
            // Avoids dividing by a zero new_weight below when self is empty.
            *self = other.clone();
            return;
        }

        let new_weight = self.weight + other.weight;
        let delta_mean = other.mean - self.mean;
        let delta_mean_weight = delta_mean / new_weight;
        let new_mean = self.mean + other.weight * delta_mean_weight;

        let weight_diff = self.weight - other.weight;
        let self_weight_other_m2 = self.weight * other.m2;
        let other_weight_self_m2 = other.weight * self.m2;
        let m2_update = other.weight * (other.mean - new_mean) * delta_mean;
        let new_m2 = self.m2 + other.m2 + m2_update;
        let new_m3 = self.m3
            + other.m3
            + delta_mean_weight
                * (m2_update * weight_diff + 3.0 * (self_weight_other_m2 - other_weight_self_m2));

        self.weight = new_weight;
        self.mean = new_mean;
        self.m2 = new_m2;
        self.m3 = new_m3;
        self.clear_zero_weight_nan();
    }

    /// Returns the skewness m3 / m2^(3/2) (biased, or with the sample
    /// correction factor when `bias` is false). Returns `None` when there is
    /// not enough weight for the requested estimator, and NaN when the data
    /// has (numerically) zero spread.
    pub fn finalize(&self, bias: bool) -> Option<f64> {
        let m2 = self.m2 / self.weight;
        let m3 = self.m3 / self.weight;
        // Treat m2 below the squared rounding error of the mean as zero
        // spread, in which case skewness is undefined.
        let is_zero = m2 <= (f64::EPSILON * self.mean).powi(2);
        let biased_est = if is_zero { f64::NAN } else { m3 / m2.powf(1.5) };
        if bias {
            if self.weight == 0.0 {
                None
            } else {
                Some(biased_est)
            }
        } else {
            if self.weight <= 2.0 {
                None
            } else {
                // Sample-skewness correction sqrt(n * (n - 1)) / (n - 2).
                let correction = (self.weight * (self.weight - 1.0)).sqrt() / (self.weight - 2.0);
                Some(correction * biased_est)
            }
        }
    }
}
412
413
/// Streaming state for kurtosis: tracks the weighted second, third and fourth
/// central moments (mk notation defined at the top of this file).
#[derive(Default, Clone)]
#[repr(C)] // For serialization, don't change struct member order.
pub struct KurtosisState {
    /// Total weight, sum(weight[i]).
    weight: f64,
    /// Weighted mean.
    mean: f64,
    /// Weighted second central moment.
    m2: f64,
    /// Weighted third central moment (needed for the m4 update formulas).
    m3: f64,
    /// Weighted fourth central moment.
    m4: f64,
}
422
423
impl KurtosisState {
    /// Computes the state of a slice; see `from_iter`.
    pub fn new(x: &[f64]) -> Self {
        Self::from_iter(x.iter().copied(), x.len())
    }

    /// Computes the state in two passes over a cloneable iterator: the mean
    /// first, then m2..m4 around that mean. `length` must equal the number of
    /// items the iterator yields.
    pub fn from_iter(iter: impl Iterator<Item = f64> + Clone, length: usize) -> Self {
        if length == 0 {
            return Self::default();
        }

        let weight = length as f64;
        let mean = alg_sum_f64(iter.clone()) / weight;
        let mut m2 = 0.0;
        let mut m3 = 0.0;
        let mut m4 = 0.0;
        for xi in iter {
            let d = xi - mean;
            let d2 = d * d;
            let d3 = d * d2;
            let d4 = d2 * d2;
            m2 = alg_add_f64(m2, d2);
            m3 = alg_add_f64(m3, d3);
            m4 = alg_add_f64(m4, d4);
        }
        Self {
            weight,
            mean,
            m2,
            m3,
            m4,
        }
    }

    /// Computes the state of `arr[start..start + length]`, skipping nulls.
    pub fn from_array(arr: &PrimitiveArray<f64>, start: usize, length: usize) -> Self {
        let validity = arr.validity().cloned();
        // Only keep the validity mask if the slice actually contains nulls;
        // otherwise take the fast contiguous-slice path below.
        let validity = validity
            .map(|v| v.sliced(start, length))
            .filter(|v| v.unset_bits() > 0);

        match validity {
            None => Self::new(&arr.values().as_slice()[start..][..length]),
            Some(validity) => {
                let iter = arr.values()[start..][..length].iter().copied();
                let iter = iter
                    .zip(validity.iter())
                    .filter_map(|(x, v)| v.then_some(x));
                Self::from_iter(iter, validity.set_bits())
            },
        }
    }

    /// Resets the moments when all weight has been removed again, so the
    /// state behaves like the empty default instead of carrying NaNs forward.
    fn clear_zero_weight_nan(&mut self) {
        // Clear NaNs due to division by zero.
        if self.weight == 0.0 {
            self.mean = 0.0;
            self.m2 = 0.0;
            self.m3 = 0.0;
            self.m4 = 0.0;
        }
    }

    /// Adds a single observation with weight 1 (eqs. (3.26)/(3.27) from
    /// Pébay et al. (2016), specialized to one element).
    pub fn insert_one(&mut self, x: f64) {
        // Specialization of self.combine(&KurtosisState { weight: 1.0, mean: x, m2: 0.0, m3: 0.0, m4: 0.0 });
        let new_weight = self.weight + 1.0;
        let delta_mean = x - self.mean;
        let delta_mean_weight = delta_mean / new_weight;
        let new_mean = self.mean + delta_mean_weight;

        let weight_diff = self.weight - 1.0;
        let m2_update = (x - new_mean) * delta_mean;
        let new_m2 = self.m2 + m2_update;
        let new_m3 = self.m3 + delta_mean_weight * (m2_update * weight_diff - 3.0 * self.m2);
        let new_m4 = self.m4
            + delta_mean_weight
                * (delta_mean_weight
                    * (m2_update * (self.weight * weight_diff + 1.0) + 6.0 * self.m2)
                    - 4.0 * self.m3);

        self.weight = new_weight;
        self.mean = new_mean;
        self.m2 = new_m2;
        self.m3 = new_m3;
        self.m4 = new_m4;
        self.clear_zero_weight_nan();
    }

    /// Removes a single previously inserted observation (insert_one with
    /// weight -1); used for rolling windows.
    pub fn remove_one(&mut self, x: f64) {
        // Specialization of self.combine(&KurtosisState { weight: -1.0, mean: x, m2: 0.0, m3: 0.0, m4: 0.0 });
        let new_weight = self.weight - 1.0;
        let delta_mean = x - self.mean;
        let delta_mean_weight = delta_mean / new_weight;
        let new_mean = self.mean - delta_mean_weight;

        let weight_diff = self.weight + 1.0;
        let m2_update = (new_mean - x) * delta_mean;
        let new_m2 = self.m2 + m2_update;
        let new_m3 = self.m3 + delta_mean_weight * (m2_update * weight_diff + 3.0 * self.m2);
        let new_m4 = self.m4
            + delta_mean_weight
                * (delta_mean_weight
                    * (m2_update * (self.weight * weight_diff + 1.0) + 6.0 * self.m2)
                    + 4.0 * self.m3);

        self.weight = new_weight;
        self.mean = new_mean;
        self.m2 = new_m2;
        self.m3 = new_m3;
        self.m4 = new_m4;
        self.clear_zero_weight_nan();
    }

    /// Merges another partial state into this one, following eq. (3.26) for
    /// the mean and eq. (3.27) for the moment updates.
    pub fn combine(&mut self, other: &Self) {
        if other.weight == 0.0 {
            return;
        } else if self.weight == 0.0 {
            // Avoids dividing by a zero new_weight below when self is empty.
            *self = other.clone();
            return;
        }

        let new_weight = self.weight + other.weight;
        let delta_mean = other.mean - self.mean;
        let delta_mean_weight = delta_mean / new_weight;
        let new_mean = self.mean + other.weight * delta_mean_weight;

        let weight_diff = self.weight - other.weight;
        let self_weight_other_m2 = self.weight * other.m2;
        let other_weight_self_m2 = other.weight * self.m2;
        let m2_update = other.weight * (other.mean - new_mean) * delta_mean;
        let new_m2 = self.m2 + other.m2 + m2_update;
        let new_m3 = self.m3
            + other.m3
            + delta_mean_weight
                * (m2_update * weight_diff + 3.0 * (self_weight_other_m2 - other_weight_self_m2));
        let new_m4 = self.m4
            + other.m4
            + delta_mean_weight
                * (delta_mean_weight
                    * (m2_update * (self.weight * weight_diff + other.weight * other.weight)
                        + 6.0
                            * (self.weight * self_weight_other_m2
                                + other.weight * other_weight_self_m2))
                    + 4.0 * (self.weight * other.m3 - other.weight * self.m3));

        self.weight = new_weight;
        self.mean = new_mean;
        self.m2 = new_m2;
        self.m3 = new_m3;
        self.m4 = new_m4;
        self.clear_zero_weight_nan();
    }

    /// Returns the kurtosis m4 / m2^2 (with the sample correction when `bias`
    /// is false, and with 3 subtracted when `fisher` is true so a normal
    /// distribution maps to 0). Returns `None` when there is not enough
    /// weight for the requested estimator, and NaN when the data has
    /// (numerically) zero spread.
    pub fn finalize(&self, fisher: bool, bias: bool) -> Option<f64> {
        let m4 = self.m4 / self.weight;
        let m2 = self.m2 / self.weight;
        // Treat m2 below the squared rounding error of the mean as zero
        // spread, in which case kurtosis is undefined.
        let is_zero = m2 <= (f64::EPSILON * self.mean).powi(2);
        let biased_est = if is_zero { f64::NAN } else { m4 / (m2 * m2) };
        let out = if bias {
            if self.weight == 0.0 {
                return None;
            }

            biased_est
        } else {
            if self.weight <= 3.0 {
                return None;
            }

            // Sample-kurtosis correction:
            // (n-1)/((n-2)(n-3)) * ((n+1) * biased - 3 * (n-1)) + 3.
            let n = self.weight;
            let nm1_nm2 = (n - 1.0) / (n - 2.0);
            let np1_nm3 = (n + 1.0) / (n - 3.0);
            let nm1_nm3 = (n - 1.0) / (n - 3.0);
            nm1_nm2 * (np1_nm3 * biased_est - 3.0 * nm1_nm3) + 3.0
        };

        if fisher { Some(out - 3.0) } else { Some(out) }
    }
}
600
601
fn chunk_as_float<T, I, F>(it: I, mut f: F)
602
where
603
T: NativeType + AsPrimitive<f64>,
604
I: IntoIterator<Item = T>,
605
F: FnMut(&[f64]),
606
{
607
let mut chunk = [0.0; CHUNK_SIZE];
608
let mut i = 0;
609
for val in it {
610
if i >= CHUNK_SIZE {
611
f(&chunk);
612
i = 0;
613
}
614
chunk[i] = val.as_();
615
i += 1;
616
}
617
if i > 0 {
618
f(&chunk[..i]);
619
}
620
}
621
622
fn chunk_as_float_binary<T, U, I, F>(it: I, mut f: F)
623
where
624
T: NativeType + AsPrimitive<f64>,
625
U: NativeType + AsPrimitive<f64>,
626
I: IntoIterator<Item = (T, U)>,
627
F: FnMut(&[f64], &[f64]),
628
{
629
let mut left_chunk = [0.0; CHUNK_SIZE];
630
let mut right_chunk = [0.0; CHUNK_SIZE];
631
let mut i = 0;
632
for (l, r) in it {
633
if i >= CHUNK_SIZE {
634
f(&left_chunk, &right_chunk);
635
i = 0;
636
}
637
left_chunk[i] = l.as_();
638
right_chunk[i] = r.as_();
639
i += 1;
640
}
641
if i > 0 {
642
f(&left_chunk[..i], &right_chunk[..i]);
643
}
644
}
645
646
pub fn var<T>(arr: &PrimitiveArray<T>) -> VarState
647
where
648
T: NativeType + AsPrimitive<f64>,
649
{
650
let mut out = VarState::default();
651
if arr.has_nulls() {
652
chunk_as_float(arr.non_null_values_iter(), |chunk| {
653
out.combine(&VarState::new(chunk))
654
});
655
} else {
656
chunk_as_float(arr.values().iter().copied(), |chunk| {
657
out.combine(&VarState::new(chunk))
658
});
659
}
660
out
661
}
662
663
pub fn cov<T, U>(x: &PrimitiveArray<T>, y: &PrimitiveArray<U>) -> CovState
664
where
665
T: NativeType + AsPrimitive<f64>,
666
U: NativeType + AsPrimitive<f64>,
667
{
668
assert!(x.len() == y.len());
669
let mut out = CovState::default();
670
if x.has_nulls() || y.has_nulls() {
671
chunk_as_float_binary(
672
x.iter()
673
.zip(y.iter())
674
.filter_map(|(l, r)| l.copied().zip(r.copied())),
675
|l, r| out.combine(&CovState::new(l, r)),
676
);
677
} else {
678
chunk_as_float_binary(
679
x.values().iter().copied().zip(y.values().iter().copied()),
680
|l, r| out.combine(&CovState::new(l, r)),
681
);
682
}
683
out
684
}
685
686
pub fn pearson_corr<T, U>(x: &PrimitiveArray<T>, y: &PrimitiveArray<U>) -> PearsonState
687
where
688
T: NativeType + AsPrimitive<f64>,
689
U: NativeType + AsPrimitive<f64>,
690
{
691
assert!(x.len() == y.len());
692
let mut out = PearsonState::default();
693
if x.has_nulls() || y.has_nulls() {
694
chunk_as_float_binary(
695
x.iter()
696
.zip(y.iter())
697
.filter_map(|(l, r)| l.copied().zip(r.copied())),
698
|l, r| out.combine(&PearsonState::new(l, r)),
699
);
700
} else {
701
chunk_as_float_binary(
702
x.values().iter().copied().zip(y.values().iter().copied()),
703
|l, r| out.combine(&PearsonState::new(l, r)),
704
);
705
}
706
out
707
}
708
709
pub fn skew<T>(arr: &PrimitiveArray<T>) -> SkewState
710
where
711
T: NativeType + AsPrimitive<f64>,
712
{
713
let mut out = SkewState::default();
714
if arr.has_nulls() {
715
chunk_as_float(arr.non_null_values_iter(), |chunk| {
716
out.combine(&SkewState::new(chunk))
717
});
718
} else {
719
chunk_as_float(arr.values().iter().copied(), |chunk| {
720
out.combine(&SkewState::new(chunk))
721
});
722
}
723
out
724
}
725
726
pub fn kurtosis<T>(arr: &PrimitiveArray<T>) -> KurtosisState
727
where
728
T: NativeType + AsPrimitive<f64>,
729
{
730
let mut out = KurtosisState::default();
731
if arr.has_nulls() {
732
chunk_as_float(arr.non_null_values_iter(), |chunk| {
733
out.combine(&KurtosisState::new(chunk))
734
});
735
} else {
736
chunk_as_float(arr.values().iter().copied(), |chunk| {
737
out.combine(&KurtosisState::new(chunk))
738
});
739
}
740
out
741
}
742
743