Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-lazy/src/tests/aggregations.rs
8448 views
1
use polars_ops::prelude::ListNameSpaceImpl;
2
use polars_utils::unitvec;
3
4
use super::*;
5
6
// Aggregating a `Datetime` series into a list must keep `Datetime` as the list's inner dtype.
#[test]
#[cfg(feature = "dtype-datetime")]
fn test_agg_list_type() -> PolarsResult<()> {
    let ints = Series::new("foo".into(), &[1, 2, 3]);
    let datetimes = ints.cast(&DataType::Datetime(TimeUnit::Nanoseconds, None))?;

    // One group that references all three rows of the series.
    let groups = GroupsType::Idx(vec![(0, unitvec![0, 1, 2])].into());

    // SAFETY: the group only references rows 0, 1 and 2, which all exist in the
    // three-element series. NOTE(review): presumably in-bounds group indices are the
    // whole contract of `agg_list` - confirm against its `# Safety` docs.
    let aggregated = unsafe { datetimes.agg_list(&groups) };

    let inner_is_datetime = matches!(
        aggregated.dtype(),
        DataType::List(inner)
            if matches!(&**inner, DataType::Datetime(TimeUnit::Nanoseconds, None))
    );
    assert!(inner_is_datetime);

    Ok(())
}
24
25
// Smoke test: a binary expression, then a mapped function, evaluated inside a group-by aggregation.
#[test]
fn test_agg_exprs() -> PolarsResult<()> {
    // a binary expression followed by a function and an aggregation. See if it runs
    let doubled = (lit(1) - col("A"))
        .map(|s| Ok(&s * 2), |_, f| Ok(f.clone()))
        .alias("foo");

    let grouped = fruits_cars()
        .lazy()
        .group_by_stable([col("cars")])
        .agg([doubled])
        .collect()?;

    // The result column is read back as a list column; compare the per-group list lengths.
    let lengths = grouped.column("foo")?.list()?.lst_lengths();
    assert_eq!(Vec::from(&lengths), &[Some(4), Some(1)]);

    Ok(())
}
43
44
// `unique().first()` may pick any distinct value of a group, whereas
// `unique().sort().first()` always picks the smallest one.
#[test]
fn test_agg_unique_first() -> PolarsResult<()> {
    let df = df![
        "g"=> [1, 1, 2, 2, 3, 4, 1],
        "v"=> [1, 2, 2, 2, 3, 4, 1],
    ]?;

    let out = df
        .lazy()
        .group_by_stable([col("g")])
        .agg([
            col("v").unique().first().alias("v_first"),
            col("v")
                .unique()
                .sort(Default::default())
                .first()
                .alias("true_first"),
            col("v").unique().implode(),
        ])
        .collect()?;

    // Group 1 holds the distinct values {1, 2}; the other groups contribute 2 + 3 + 4.
    // Because `unique` does not guarantee order, the first value of group 1 may be
    // either 1 or 2, so both totals are accepted.
    let unordered_sum = out
        .column("v_first")?
        .as_materialized_series()
        .sum::<i32>()
        .unwrap();
    assert!(unordered_sum == 10 || unordered_sum == 11);

    // Sorting before `first` removes the ambiguity: group 1 always yields 1,
    // so the total is exactly 1 + 2 + 3 + 4.
    let sorted_sum = out
        .column("true_first")?
        .as_materialized_series()
        .sum::<i32>()
        .unwrap();
    assert_eq!(sorted_sum, 10);

    Ok(())
}
77
78
// A composite expression (compare with shifted self, then cumulative sum) must be usable as a group key.
#[test]
#[cfg(feature = "cum_agg")]
fn test_cum_sum_agg_as_key() -> PolarsResult<()> {
    let df = df![
        "depth" => &[0i32, 1, 2, 3, 4, 5, 6, 7, 8, 9],
        "soil" => &["peat", "peat", "peat", "silt", "silt", "silt", "sand", "sand", "peat", "peat"]
    ]?;

    // this checks if the grouper can work with the complex query as a key
    // True wherever `soil` differs from the previous row; the first row is compared with itself.
    let soil_changed = col("soil").neq(col("soil").shift_and_fill(lit(1), col("soil").first()));
    // Running count of changes, i.e. an id for every consecutive run of equal soil.
    let run_id = soil_changed.cum_sum(false).alias("key");

    let out = df
        .lazy()
        .group_by([run_id])
        .agg([col("depth").max().name().keep()])
        .sort(["depth"], Default::default())
        .collect()?;

    let keys = Vec::from(out.column("key")?.u32()?);
    assert_eq!(keys, &[Some(0), Some(1), Some(2), Some(3)]);

    let max_depths = Vec::from(out.column("depth")?.i32()?);
    assert_eq!(max_depths, &[Some(2), Some(5), Some(7), Some(9)]);

    Ok(())
}
108
109
// Skew and kurtosis computed inside a group-by aggregation must produce `Float64` columns.
#[test]
#[cfg(feature = "moment")]
fn test_auto_skew_kurtosis_agg() -> PolarsResult<()> {
    let moments = [
        col("B").skew(false).alias("bskew"),
        col("B").kurtosis(false, false).alias("bkurt"),
    ];

    let out = fruits_cars()
        .lazy()
        .group_by([col("fruits")])
        .agg(moments)
        .collect()?;

    for name in ["bskew", "bkurt"] {
        assert!(matches!(out.column(name)?.dtype(), DataType::Float64));
    }

    Ok(())
}
128
129
// `shift_and_fill` in aggregation context yields a list column; the same expression must
// also run in a plain `select`, with and without an alias.
#[test]
fn test_auto_list_agg() -> PolarsResult<()> {
    let df = fruits_cars();
    // Fresh lazy frame / fresh expression for each of the four queries below.
    let lazy = || df.clone().lazy();
    let shifted = || col("B").shift_and_fill(lit(-1), lit(-1));

    // test if alias executor adds a list after shift and fill
    let aliased = lazy()
        .group_by([col("fruits")])
        .agg([shifted().alias("foo")])
        .collect()?;
    assert!(matches!(aliased.column("foo")?.dtype(), DataType::List(_)));

    // test if it runs and group_by executor thus implements a list after shift_and_fill
    let _unaliased = lazy()
        .group_by([col("fruits")])
        .agg([shifted()])
        .collect()?;

    // test if window expr executor adds list
    let _selected_aliased = lazy().select([shifted().alias("foo")]).collect()?;

    let _selected = lazy().select([shifted()]).collect()?;

    Ok(())
}
164
// A flat `pow` applied on top of a rolling aggregation must keep the group tuples in sync.
#[test]
#[cfg(feature = "rolling_window")]
fn test_power_in_agg_list1() -> PolarsResult<()> {
    // Rolling minimum over a window of one element, built fresh for each use.
    let rolling_min_1 = || {
        col("A").rolling_min(RollingOptionsFixedWindow {
            window_size: 1,
            ..Default::default()
        })
    };

    // this test if the group tuples are correctly updated after
    // a flat apply on a final aggregation
    let descending = SortMultipleOptions::default().with_order_descending(true);
    let out = fruits_cars()
        .lazy()
        .group_by([col("fruits")])
        .agg([
            rolling_min_1().alias("input"),
            rolling_min_1().pow(2.0).alias("foo"),
        ])
        .sort(["fruits"], descending)
        .collect()?;

    // Inspect the list produced for the first group after the descending sort.
    let lists = out.column("foo")?.list()?;
    let first_group = lists.get_as_series(0).unwrap();
    let squared = first_group.f64()?;
    assert_eq!(Vec::from(squared), &[Some(1.0), Some(4.0), Some(25.0)]);

    Ok(())
}
202
203
// Same idea as the list variant, but the flat `pow` is followed by a `sum` reduction per group.
#[test]
#[cfg(feature = "rolling_window")]
fn test_power_in_agg_list2() -> PolarsResult<()> {
    let window = RollingOptionsFixedWindow {
        window_size: 2,
        min_periods: 2,
        ..Default::default()
    };
    let summed_squares = col("A").rolling_min(window).pow(2.0).sum().alias("foo");

    // this test if the group tuples are correctly updated after
    // a flat apply on evaluate_on_groups
    let descending = SortMultipleOptions::default().with_order_descending(true);
    let out = fruits_cars()
        .lazy()
        .group_by([col("fruits")])
        .agg([summed_squares])
        .sort(["fruits"], descending)
        .collect()?;

    let sums = out.column("foo")?.f64()?;
    assert_eq!(Vec::from(sums), &[Some(5.0), Some(9.0)]);

    Ok(())
}
233
// `when/then/otherwise` in aggregation context where the predicate is a per-group scalar
// (`first()`) and both branches are per-group repeated literals.
#[test]
fn test_binary_agg_context_0() -> PolarsResult<()> {
    let df = df![
        "groups" => [1, 1, 2, 2, 3, 3],
        "vals" => [1, 2, 3, 4, 5, 6]
    ]?;

    // Errors are propagated with `?` (the test returns `PolarsResult`) instead of panicking.
    let out = df
        .lazy()
        .group_by_stable([col("groups")])
        .agg([when(col("vals").first().neq(lit(1)))
            .then(repeat(lit("a"), len()))
            .otherwise(repeat(lit("b"), len()))
            .alias("foo")])
        .collect()?;

    let out = out.column("foo")?;
    let out = out.explode(ExplodeOptions {
        empty_as_null: true,
        keep_nulls: true,
    })?;
    let out = out.str()?;

    // Only group 1 starts with the value 1, so it alone takes the "b" branch.
    assert_eq!(
        Vec::from(out),
        &[
            Some("b"),
            Some("b"),
            Some("a"),
            Some("a"),
            Some("a"),
            Some("a")
        ]
    );
    Ok(())
}
270
271
// just like binary expression, this must be changed. This can work
272
// `when/then/otherwise` in aggregation context mixing an element-wise predicate, a
// per-group aggregate (`sum`) and a literal, with the aggregate on either branch.
#[test]
fn test_binary_agg_context_1() -> PolarsResult<()> {
    let df = df![
        "groups" => [1, 1, 2, 2, 3, 3],
        "vals" => [1, 13, 3, 87, 1, 6]
    ]?;

    // groups
    // 1 => [1, 13]  (sum = 14)
    // 2 => [3, 87]  (sum = 90)
    // 3 => [1, 6]   (sum = 7)

    let explode_options = || ExplodeOptions {
        empty_as_null: true,
        keep_nulls: true,
    };

    let out = df
        .clone()
        .lazy()
        .group_by_stable([col("groups")])
        .agg([when(col("vals").eq(lit(1)))
            .then(col("vals").sum())
            .otherwise(lit(90))
            .alias("vals")])
        .collect()?;

    // if vals == 1 then sum(vals) else 90
    // [14, 90]
    // [90, 90]
    // [7, 90]
    let exploded = out.column("vals")?.explode(explode_options())?;
    let vals = exploded.i32()?;
    assert_eq!(
        Vec::from(vals),
        &[Some(14), Some(90), Some(90), Some(90), Some(7), Some(90)]
    );

    let out = df
        .lazy()
        .group_by_stable([col("groups")])
        .agg([when(col("vals").eq(lit(1)))
            .then(lit(90))
            .otherwise(col("vals").sum())
            .alias("vals")])
        .collect()?;

    // if vals == 1 then 90 else sum(vals)
    // [90, 14]
    // [90, 90]
    // [90, 7]
    let exploded = out.column("vals")?.explode(explode_options())?;
    let vals = exploded.i32()?;
    assert_eq!(
        Vec::from(vals),
        &[Some(90), Some(14), Some(90), Some(90), Some(90), Some(7)]
    );

    Ok(())
}
335
336
// Binary arithmetic in aggregation context where exactly one operand is a per-group
// aggregate (`first()`) and the other is the full group, in both operand orders.
#[test]
fn test_binary_agg_context_2() -> PolarsResult<()> {
    let df = df![
        "groups" => [1, 1, 2, 2, 3, 3],
        "vals" => [1, 2, 3, 4, 5, 6]
    ]?;

    let explode_options = || ExplodeOptions {
        empty_as_null: true,
        keep_nulls: true,
    };

    // this is complex because we first aggregate one expression of the binary operation.

    let out = df
        .clone()
        .lazy()
        .group_by_stable([col("groups")])
        .agg([(col("vals").first() - col("vals")).alias("vals")])
        .collect()?;

    // 1 - [1, 2] = [0, -1]
    // 3 - [3, 4] = [0, -1]
    // 5 - [5, 6] = [0, -1]
    let exploded = out.column("vals")?.explode(explode_options())?;
    let vals = exploded.i32()?;
    assert_eq!(
        Vec::from(vals),
        &[Some(0), Some(-1), Some(0), Some(-1), Some(0), Some(-1)]
    );

    // Same, but now we reverse the lhs / rhs.
    let out = df
        .lazy()
        .group_by_stable([col("groups")])
        .agg([((col("vals")) - col("vals").first()).alias("vals")])
        .collect()?;

    // [1, 2] - 1 = [0, 1]
    // [3, 4] - 3 = [0, 1]
    // [5, 6] - 5 = [0, 1]
    let exploded = out.column("vals")?.explode(explode_options())?;
    let vals = exploded.i32()?;
    assert_eq!(
        Vec::from(vals),
        &[Some(0), Some(1), Some(0), Some(1), Some(0), Some(1)]
    );

    Ok(())
}
389
390
// A reduction (`last`) applied on top of a binary expression that mixes the full group
// with a per-group aggregate (`first`).
#[test]
fn test_binary_agg_context_3() -> PolarsResult<()> {
    let offset_from_first = col("A") - col("A").first();

    let grouped = fruits_cars()
        .lazy()
        .group_by_stable([col("cars")])
        .agg([offset_from_first.last().alias("last")])
        .collect()?;

    let last = grouped.column("last")?;
    let expected = [AnyValue::Int32(4), AnyValue::Int32(0)];
    for (row, want) in expected.into_iter().enumerate() {
        assert_eq!(last.get(row)?, want);
    }

    Ok(())
}
406
407
// Regression test for issue 2509: an element-wise operation on a shifted column inside an
// unordered group-by must pair each shifted value with the rows of its own group.
#[test]
fn test_shift_elementwise_issue_2509() -> PolarsResult<()> {
    let df = df![
        "x"=> [0, 0, 0, 1, 1, 1, 2, 2, 2],
        "y"=> [0, 10, 20, 0, 10, 20, 0, 10, 20]
    ]?;
    let out = df
        .lazy()
        // Don't use maintain order here! That hides the bug
        .group_by([col("x")])
        .agg(&[(col("y").shift(lit(-1)) + col("x")).alias("sum")])
        .sort(["x"], Default::default())
        .collect()?;

    let out = out.explode(
        ["sum"],
        ExplodeOptions {
            empty_as_null: true,
            keep_nulls: true,
        },
    )?;
    let out = out.column("sum")?;

    // Per group, `y` shifted by -1 is [10, 20, null]; adding the group's `x` gives the rows below.
    // x == 0
    assert_eq!(out.get(0)?, AnyValue::Int32(10));
    assert_eq!(out.get(1)?, AnyValue::Int32(20));
    assert_eq!(out.get(2)?, AnyValue::Null);
    // x == 1
    assert_eq!(out.get(3)?, AnyValue::Int32(11));
    assert_eq!(out.get(4)?, AnyValue::Int32(21));
    assert_eq!(out.get(5)?, AnyValue::Null);
    // x == 2: also checked so that a mix-up involving the last group is caught.
    assert_eq!(out.get(6)?, AnyValue::Int32(12));
    assert_eq!(out.get(7)?, AnyValue::Int32(22));
    assert_eq!(out.get(8)?, AnyValue::Null);

    Ok(())
}
438
439
// `get` / `gather` inside a group-by: index by an aggregated expression, by a sliced
// arg-sort, and by a literal.
#[test]
fn take_aggregations() -> PolarsResult<()> {
    let df = df![
        "user" => ["lucy", "bob", "bob", "lucy", "tim"],
        "book" => ["c", "b", "a", "a", "a"],
        "count" => [3, 1, 2, 1, 1]
    ]?;

    // 1) Per user, the book at the position of the highest count.
    let favourite = col("book")
        .get(col("count").arg_max(), false)
        .alias("fav_book");
    let by_arg_max = df
        .clone()
        .lazy()
        .group_by([col("user")])
        .agg([favourite])
        .sort(["user"], Default::default())
        .collect()?;

    let fav_book = by_arg_max.column("fav_book")?;
    assert_eq!(fav_book.get(0)?, AnyValue::String("a"));
    assert_eq!(fav_book.get(1)?, AnyValue::String("c"));
    assert_eq!(fav_book.get(2)?, AnyValue::String("a"));

    // 2) Per user, the books at the first two positions of the arg-sorted counts.
    let by_arg_sort = df
        .clone()
        .lazy()
        .group_by([col("user")])
        .agg([
            // keep the head as it test slice correctness
            col("book")
                .gather(col("count").arg_sort(true, false).head(Some(2)))
                .alias("ordered"),
        ])
        .sort(["user"], Default::default())
        .collect()?;

    let ordered = by_arg_sort.column("ordered")?;
    let exploded = ordered.explode(ExplodeOptions {
        empty_as_null: true,
        keep_nulls: true,
    })?;
    let books: Vec<_> = exploded.str()?.into_no_null_iter().collect();
    assert_eq!(books, ["a", "b", "c", "a", "a"]);

    // 3) Per user, the book at literal position 0.
    let by_literal = df
        .lazy()
        .group_by([col("user")])
        .agg([col("book").get(lit(0), false).alias("take_lit")])
        .sort(["user"], Default::default())
        .collect()?;

    let take_lit = by_literal.column("take_lit")?;
    let first_books: Vec<_> = take_lit.str()?.into_no_null_iter().collect();
    assert_eq!(first_books, ["b", "c", "a"]);

    Ok(())
}
497
// `arg_sort(..).get(0)` must behave consistently in `select`, in a group-by aggregation,
// and when it is nested as the index of another `get`.
#[test]
fn test_take_consistency() -> PolarsResult<()> {
    let df = fruits_cars();
    // Position of the first element of `A` under `arg_sort(true, false)`.
    let first_sorted_idx = || col("A").arg_sort(true, false).get(lit(0), false);

    // Whole-frame context.
    let selected = df.clone().lazy().select([first_sorted_idx()]).collect()?;
    let idx = selected.column("A")?.idx()?;
    assert_eq!(idx.get(0), Some(4));

    // Group-by context: the index is relative to each group.
    let grouped = df
        .clone()
        .lazy()
        .group_by_stable([col("cars")])
        .agg([first_sorted_idx()])
        .collect()?;
    let idx = grouped.column("A")?.idx()?;
    assert_eq!(Vec::from(idx), &[Some(3), Some(0)]);

    // Group-by context, with the index additionally used to look up the value itself.
    let out_df = df
        .lazy()
        .group_by_stable([col("cars")])
        .agg([
            col("A"),
            first_sorted_idx().alias("1"),
            col("A").get(first_sorted_idx(), false).alias("2"),
        ])
        .collect()?;

    let values = out_df.column("2")?.i32()?;
    assert_eq!(Vec::from(values), &[Some(5), Some(2)]);

    let indices = out_df.column("1")?.idx()?;
    assert_eq!(Vec::from(indices), &[Some(3), Some(0)]);

    Ok(())
}
543
544
// `get(0)` evaluated as a window expression broadcasts each group's first value to its rows.
#[test]
fn test_take_in_groups() -> PolarsResult<()> {
    let first_b_per_fruit = col("B")
        .get(lit(0u32), false)
        .over([col("fruits")])
        .alias("taken");

    let out = fruits_cars()
        .lazy()
        .sort(["fruits"], Default::default())
        .select([first_b_per_fruit])
        .collect()?;

    let taken = Vec::from(out.column("taken")?.i32()?);
    assert_eq!(taken, &[Some(3), Some(3), Some(5), Some(5), Some(5)]);

    Ok(())
}
563
564
// Regression test for issue 20679: an anonymous aggregation function that returns a scalar
// column for an all-null input must leave the aggregated column with the `Null` dtype.
#[test]
fn test_anonymous_function_returns_scalar_all_null_20679() {
    use std::sync::Arc;

    // Reduces a column to a length-1 scalar column holding its first value.
    fn first_value_as_scalar(column: Column) -> PolarsResult<Column> {
        let first = column.get(0)?.into_static();
        let scalar = Scalar::new(column.dtype().clone(), first);
        Ok(Column::new_scalar("".into(), scalar, 1))
    }

    // Group keys plus a three-row column of dtype `Null`.
    let keys = Column::new("a".into(), &[0, 0, 1]);
    let all_null = Column::new_scalar("b".into(), Scalar::new(DataType::Null, AnyValue::Null), 3);
    let df = DataFrame::new_infer_height(vec![keys, all_null]).unwrap();

    // The UDF consumes its first input column; its output field mirrors the first input field.
    let apply = move |cols: &mut [Column]| first_value_as_scalar(std::mem::take(&mut cols[0]));
    let output_field = |_: &Schema, fields: &[Field]| Ok(fields[0].clone());
    let udf = BaseColumnUdf::new(apply, output_field);

    let expr = Expr::AnonymousFunction {
        input: vec![col("b")],
        function: LazySerde::Deserialized(SpecialEq::new(Arc::new(udf))),
        options: FunctionOptions::aggregation(),
        fmt_str: Box::new(PlSmallStr::EMPTY),
    };

    let grouped_df = df
        .lazy()
        .group_by([col("a")])
        .agg([expr])
        .collect()
        .unwrap();

    // Column 0 is the group key; column 1 is the aggregated result.
    assert_eq!(grouped_df.columns()[1].dtype(), &DataType::Null);
}
600
601