Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/operations/test_pivot.py
8422 views
1
from __future__ import annotations
2
3
from datetime import date, timedelta
4
from typing import TYPE_CHECKING, Any
5
6
import pytest
7
8
import polars as pl
9
import polars.selectors as cs
10
from polars.exceptions import ComputeError, DuplicateError
11
from polars.testing import assert_frame_equal
12
13
if TYPE_CHECKING:
14
from polars._typing import PivotAgg, PolarsIntegerType, PolarsTemporalType
15
16
17
def test_pivot() -> None:
18
df = pl.DataFrame(
19
{
20
"foo": ["A", "A", "B", "B", "C"],
21
"bar": ["k", "l", "m", "n", "o"],
22
"N": [1, 2, 2, 4, 2],
23
}
24
)
25
result = df.pivot("bar", values="N", aggregate_function=None)
26
27
expected = pl.DataFrame(
28
[
29
("A", 1, 2, None, None, None),
30
("B", None, None, 2, 4, None),
31
("C", None, None, None, None, 2),
32
],
33
schema=["foo", "k", "l", "m", "n", "o"],
34
orient="row",
35
)
36
assert_frame_equal(result, expected)
37
38
39
def test_pivot_no_values() -> None:
40
df = pl.DataFrame(
41
{
42
"foo": ["A", "A", "B", "B", "C"],
43
"bar": ["k", "l", "m", "n", "o"],
44
"N1": [1, 2, 2, 4, 2],
45
"N2": [1, 2, 2, 4, 2],
46
}
47
)
48
result = df.pivot(on="bar", index="foo", aggregate_function=None)
49
expected = pl.DataFrame(
50
{
51
"foo": ["A", "B", "C"],
52
"N1_k": [1, None, None],
53
"N1_l": [2, None, None],
54
"N1_m": [None, 2, None],
55
"N1_n": [None, 4, None],
56
"N1_o": [None, None, 2],
57
"N2_k": [1, None, None],
58
"N2_l": [2, None, None],
59
"N2_m": [None, 2, None],
60
"N2_n": [None, 4, None],
61
"N2_o": [None, None, 2],
62
}
63
)
64
65
assert_frame_equal(result, expected)
66
67
68
def test_pivot_list() -> None:
69
df = pl.DataFrame({"a": [1, 2, 3], "b": [[1, 1], [2, 2], [3, 3]]})
70
71
expected = pl.DataFrame(
72
{
73
"a": [1, 2, 3],
74
"1": [[1, 1], None, None],
75
"2": [None, [2, 2], None],
76
"3": [None, None, [3, 3]],
77
}
78
)
79
out = df.pivot(
80
index="a",
81
on="a",
82
values="b",
83
aggregate_function="first",
84
sort_columns=True,
85
)
86
assert_frame_equal(out, expected)
87
88
89
@pytest.mark.parametrize(
90
("agg_fn", "expected_rows"),
91
[
92
("first", [("a", 2, None, None), ("b", None, None, 10)]),
93
("len", [("a", 2, 0, 0), ("b", 0, 2, 1)]),
94
("min", [("a", 2, None, None), ("b", None, 8, 10)]),
95
("max", [("a", 4, None, None), ("b", None, 8, 10)]),
96
("sum", [("a", 6, 0, 0), ("b", 0, 8, 10)]),
97
("mean", [("a", 3.0, None, None), ("b", None, 8.0, 10.0)]),
98
("median", [("a", 3.0, None, None), ("b", None, 8.0, 10.0)]),
99
],
100
)
101
def test_pivot_aggregate(agg_fn: PivotAgg, expected_rows: list[tuple[Any]]) -> None:
102
df = pl.DataFrame(
103
{
104
"a": [1, 1, 2, 2, 3],
105
"b": ["a", "a", "b", "b", "b"],
106
"c": [2, 4, None, 8, 10],
107
}
108
)
109
result = df.pivot(
110
index="b", on="a", values="c", aggregate_function=agg_fn, sort_columns=True
111
)
112
assert result.rows() == expected_rows
113
114
115
def test_pivot_categorical_3968() -> None:
116
df = pl.DataFrame(
117
{
118
"foo": ["one", "one", "one", "two", "two", "two"],
119
"bar": ["A", "B", "C", "A", "B", "C"],
120
"baz": [1, 2, 3, 4, 5, 6],
121
}
122
)
123
124
result = df.with_columns(pl.col("baz").cast(str).cast(pl.Categorical))
125
126
expected = pl.DataFrame(
127
{
128
"foo": ["one", "one", "one", "two", "two", "two"],
129
"bar": ["A", "B", "C", "A", "B", "C"],
130
"baz": ["1", "2", "3", "4", "5", "6"],
131
},
132
schema_overrides={"baz": pl.Categorical},
133
)
134
assert_frame_equal(result, expected, categorical_as_str=True)
135
136
137
@pytest.mark.parametrize("maintain_order", [False, True])
138
def test_pivot_categorical_index(maintain_order: bool) -> None:
139
df = pl.DataFrame(
140
{"A": ["Fire", "Water", "Water", "Fire"], "B": ["Car", "Car", "Car", "Ship"]},
141
schema=[("A", pl.Categorical), ("B", pl.Categorical)],
142
)
143
144
result = df.pivot(
145
index=["A"],
146
on="B",
147
values="B",
148
aggregate_function="len",
149
maintain_order=maintain_order,
150
)
151
expected = pl.DataFrame(
152
{"A": ["Fire", "Water"], "Car": [1, 2], "Ship": [1, 0]},
153
schema={
154
"A": pl.Categorical,
155
"Car": pl.get_index_type(),
156
"Ship": pl.get_index_type(),
157
},
158
)
159
assert_frame_equal(result, expected, check_row_order=maintain_order)
160
161
# test expression dispatch
162
result = df.pivot(
163
index=["A"],
164
on="B",
165
values="B",
166
aggregate_function=pl.len(),
167
maintain_order=maintain_order,
168
)
169
assert_frame_equal(result, expected, check_row_order=maintain_order)
170
171
df = pl.DataFrame(
172
{
173
"A": ["Fire", "Water", "Water", "Fire"],
174
"B": ["Car", "Car", "Car", "Ship"],
175
"C": ["Paper", "Paper", "Paper", "Paper"],
176
},
177
schema=[("A", pl.Categorical), ("B", pl.Categorical), ("C", pl.Categorical)],
178
)
179
result = df.pivot(
180
index=["A", "C"],
181
on="B",
182
values="B",
183
aggregate_function="len",
184
maintain_order=maintain_order,
185
)
186
expected = pl.DataFrame(
187
{
188
"A": ["Fire", "Water"],
189
"C": ["Paper", "Paper"],
190
"Car": [1, 2],
191
"Ship": [1, 0],
192
},
193
schema={
194
"A": pl.Categorical,
195
"C": pl.Categorical,
196
"Car": pl.get_index_type(),
197
"Ship": pl.get_index_type(),
198
},
199
)
200
assert_frame_equal(result, expected, check_row_order=maintain_order)
201
202
203
def test_pivot_multiple_values_column_names_5116() -> None:
204
df = pl.DataFrame(
205
{
206
"x1": [1, 2, 3, 4, 5, 6, 7, 8],
207
"x2": [8, 7, 6, 5, 4, 3, 2, 1],
208
"c1": ["A", "B"] * 4,
209
"c2": ["C", "C", "D", "D"] * 2,
210
}
211
)
212
213
with pytest.raises(
214
ComputeError,
215
match="aggregation 'item' expected no or a single value, got 2 values",
216
):
217
df.pivot(
218
index="c1",
219
on="c2",
220
values=["x1", "x2"],
221
separator="|",
222
aggregate_function=None,
223
)
224
225
result = df.pivot(
226
index="c1",
227
on="c2",
228
values=["x1", "x2"],
229
separator="|",
230
aggregate_function="first",
231
)
232
expected = {
233
"c1": ["A", "B"],
234
"x1|C": [1, 2],
235
"x1|D": [3, 4],
236
"x2|C": [8, 7],
237
"x2|D": [6, 5],
238
}
239
assert result.to_dict(as_series=False) == expected
240
241
242
@pytest.mark.parametrize("maintain_order", [False, True])
243
def test_pivot_duplicate_names_7731(maintain_order: bool) -> None:
244
df = pl.DataFrame(
245
{
246
"a": [1, 4],
247
"b": [1.5, 2.5],
248
"c": ["x", "x"],
249
"d": [7, 8],
250
"e": ["x", "y"],
251
}
252
)
253
result = df.pivot(
254
index=cs.float(),
255
on=cs.string(),
256
values=cs.integer(),
257
aggregate_function="first",
258
maintain_order=maintain_order,
259
)
260
expected = pl.DataFrame(
261
{
262
"b": [1.5, 2.5],
263
'a_{"x","x"}': [1, None],
264
'a_{"x","y"}': [None, 4],
265
'd_{"x","x"}': [7, None],
266
'd_{"x","y"}': [None, 8],
267
}
268
)
269
assert_frame_equal(result, expected, check_row_order=maintain_order)
270
271
272
def test_pivot_duplicate_names_11663() -> None:
273
df = pl.DataFrame({"a": [1, 2], "b": [1, 2], "c": ["x", "x"], "d": ["x", "y"]})
274
result = df.pivot(index="b", on=["c", "d"], values="a").to_dict(as_series=False)
275
expected = {"b": [1, 2], '{"x","x"}': [1, None], '{"x","y"}': [None, 2]}
276
assert result == expected
277
278
279
def test_pivot_multiple_columns_12407() -> None:
280
df = pl.DataFrame(
281
{
282
"a": ["beep", "bop"],
283
"b": ["a", "b"],
284
"c": ["s", "f"],
285
"d": [7, 8],
286
"e": ["x", "y"],
287
}
288
)
289
result = df.pivot(
290
index="b", on=["c", "e"], values=["a"], aggregate_function="len"
291
).to_dict(as_series=False)
292
expected = {"b": ["a", "b"], '{"s","x"}': [1, 0], '{"f","y"}': [0, 1]}
293
assert result == expected
294
295
296
def test_pivot_struct_13120() -> None:
297
df = pl.DataFrame(
298
{
299
"index": [1, 2, 3, 1, 2, 3],
300
"item_type": ["a", "a", "a", "b", "b", "b"],
301
"item_id": [123, 123, 123, 456, 456, 456],
302
"values": [4, 5, 6, 7, 8, 9],
303
}
304
)
305
df = df.with_columns(pl.struct(["item_type", "item_id"]).alias("columns")).drop(
306
"item_type", "item_id"
307
)
308
result = df.pivot(index="index", on="columns", values="values").to_dict(
309
as_series=False
310
)
311
expected = {"index": [1, 2, 3], '{"a",123}': [4, 5, 6], '{"b",456}': [7, 8, 9]}
312
assert result == expected
313
314
315
def test_pivot_index_struct_14101() -> None:
316
df = pl.DataFrame(
317
{
318
"a": [1, 2, 1],
319
"b": [{"a": 1}, {"a": 1}, {"a": 2}],
320
"c": ["x", "y", "y"],
321
"d": [1, 1, 3],
322
}
323
)
324
result = df.pivot(index="b", on="c", values="a")
325
expected = pl.DataFrame({"b": [{"a": 1}, {"a": 2}], "x": [1, None], "y": [2, 1]})
326
assert_frame_equal(result, expected)
327
328
329
def test_pivot_nested_struct_17065() -> None:
330
df = pl.DataFrame(
331
{
332
"foo": ["one", "two", "one", "two"],
333
"bar": ["x", "x", "y", "y"],
334
"baz": [
335
{"a": 1, "b": {"c": 2}},
336
{"a": 3, "b": {"c": 4}},
337
{"a": 5, "b": {"c": 6}},
338
{"a": 7, "b": {"c": 8}},
339
],
340
}
341
)
342
result = df.pivot(on="bar", index="foo", values="baz")
343
expected = pl.DataFrame(
344
{
345
"foo": ["one", "two"],
346
"x": [
347
{"a": 1, "b": {"c": 2}},
348
{"a": 3, "b": {"c": 4}},
349
],
350
"y": [
351
{"a": 5, "b": {"c": 6}},
352
{"a": 7, "b": {"c": 8}},
353
],
354
}
355
)
356
assert_frame_equal(result, expected)
357
358
359
def test_pivot_name_already_exists() -> None:
360
# This should be extremely rare...but still, good to check it
361
df = pl.DataFrame(
362
{
363
"a": ["a", "b"],
364
"b": ["b", "a"],
365
'{"a","b"}': [1, 2],
366
}
367
)
368
with pytest.raises(DuplicateError, match="has more than one occurrence"):
369
df.pivot(
370
["a", "b"],
371
index='{"a","b"}',
372
values="a",
373
aggregate_function="first",
374
)
375
376
377
def test_pivot_floats() -> None:
378
df = pl.DataFrame(
379
{
380
"article": ["a", "a", "a", "b", "b", "b"],
381
"weight": [1.0, 1.0, 4.4, 1.0, 8.8, 1.0],
382
"quantity": [1.0, 5.0, 1.0, 1.0, 1.0, 7.5],
383
"price": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
384
}
385
)
386
387
with pytest.raises(
388
ComputeError,
389
match="aggregation 'item' expected no or a single value, got 2 values",
390
):
391
result = df.pivot(
392
index="weight", on="quantity", values="price", aggregate_function=None
393
)
394
395
result = df.pivot(
396
index="weight", on="quantity", values="price", aggregate_function="first"
397
)
398
expected = {
399
"weight": [1.0, 4.4, 8.8],
400
"1.0": [1.0, 3.0, 5.0],
401
"5.0": [2.0, None, None],
402
"7.5": [6.0, None, None],
403
}
404
assert result.to_dict(as_series=False) == expected
405
406
result = df.pivot(
407
index=["article", "weight"],
408
on="quantity",
409
values="price",
410
aggregate_function=None,
411
)
412
expected = {
413
"article": ["a", "a", "b", "b"],
414
"weight": [1.0, 4.4, 1.0, 8.8],
415
"1.0": [1.0, 3.0, 4.0, 5.0],
416
"5.0": [2.0, None, None, None],
417
"7.5": [None, None, 6.0, None],
418
}
419
assert result.to_dict(as_series=False) == expected
420
421
422
def test_pivot_reinterpret_5907() -> None:
423
df = pl.DataFrame(
424
{
425
"A": pl.Series([3, -2, 3, -2], dtype=pl.Int32),
426
"B": ["x", "x", "y", "y"],
427
"C": [100, 50, 500, -80],
428
}
429
)
430
431
result = df.pivot(
432
index=["A"], on=["B"], values=["C"], aggregate_function=pl.element().sum()
433
)
434
expected = {"A": [3, -2], "x": [100, 50], "y": [500, -80]}
435
assert result.to_dict(as_series=False) == expected
436
437
438
@pytest.mark.parametrize(
439
"dtype",
440
[
441
pl.Date,
442
pl.Datetime("ms"),
443
pl.Datetime("ns"),
444
pl.Datetime("us"),
445
pl.Datetime("ms", time_zone="Asia/Shanghai"),
446
pl.Duration("ms"),
447
pl.Duration("us"),
448
pl.Duration("ns"),
449
pl.Time,
450
],
451
)
452
def test_pivot_temporal_logical_types(dtype: PolarsTemporalType) -> None:
453
idx = pl.Series([7, 8, 9, 0, 1, 2, 3, 4]).cast(dtype)
454
df = pl.DataFrame(
455
{
456
"idx": idx,
457
"foo": ["a"] * 3 + ["b"] * 5,
458
"value": [0] * 8,
459
}
460
)
461
assert df.pivot(
462
index="idx", on="foo", values="value", aggregate_function=None
463
).to_dict(as_series=False) == {
464
"idx": idx.to_list(),
465
"a": [0, 0, 0, None, None, None, None, None],
466
"b": [None, None, None, 0, 0, 0, 0, 0],
467
}
468
469
470
def test_pivot_negative_duration() -> None:
471
df1 = pl.DataFrame({"root": [date(2020, i, 15) for i in (1, 2)]})
472
df2 = pl.DataFrame({"delta": [timedelta(days=i) for i in (-2, -1, 0, 1)]})
473
474
df = df1.join(df2, how="cross", maintain_order="left_right").with_columns(
475
pl.Series(name="value", values=range(len(df1) * len(df2)))
476
)
477
assert df.pivot(
478
index="delta", on="root", values="value", aggregate_function=None
479
).to_dict(as_series=False) == {
480
"delta": [
481
timedelta(days=-2),
482
timedelta(days=-1),
483
timedelta(0),
484
timedelta(days=1),
485
],
486
"2020-01-15": [0, 1, 2, 3],
487
"2020-02-15": [4, 5, 6, 7],
488
}
489
490
491
def test_aggregate_function_default() -> None:
492
df = pl.DataFrame({"a": [1, 2], "b": ["foo", "foo"], "c": ["x", "x"]})
493
with pytest.raises(
494
ComputeError,
495
match="aggregation 'item' expected no or a single value, got 2 values",
496
):
497
df.pivot(index="b", on="c", values="a")
498
499
500
def test_pivot_aggregate_function_count_deprecated() -> None:
501
df = pl.DataFrame(
502
{
503
"foo": ["A", "A", "B", "B", "C"],
504
"N": [1, 2, 2, 4, 2],
505
"bar": ["k", "l", "m", "n", "o"],
506
}
507
)
508
with pytest.deprecated_call():
509
df.pivot(index="foo", on="bar", values="N", aggregate_function="count") # type: ignore[arg-type]
510
511
512
def test_pivot_struct() -> None:
513
data = {
514
"id": ["a", "a", "b", "c", "c", "c"],
515
"week": ["1", "2", "3", "4", "3", "1"],
516
"num1": [1, 3, 5, 4, 3, 6],
517
"num2": [4, 5, 3, 4, 6, 6],
518
}
519
df = pl.DataFrame(data).with_columns(nums=pl.struct(["num1", "num2"]))
520
521
assert df.pivot(
522
values="nums", index="id", on="week", aggregate_function="first"
523
).to_dict(as_series=False) == {
524
"id": ["a", "b", "c"],
525
"1": [
526
{"num1": 1, "num2": 4},
527
None,
528
{"num1": 6, "num2": 6},
529
],
530
"2": [
531
{"num1": 3, "num2": 5},
532
None,
533
None,
534
],
535
"3": [
536
None,
537
{"num1": 5, "num2": 3},
538
{"num1": 3, "num2": 6},
539
],
540
"4": [
541
None,
542
None,
543
{"num1": 4, "num2": 4},
544
],
545
}
546
547
548
def test_duplicate_column_names_which_should_raise_14305() -> None:
549
df = pl.DataFrame({"a": [1, 3, 2], "c": ["a", "a", "a"], "d": [7, 8, 9]})
550
with pytest.raises(DuplicateError, match="has more than one occurrence"):
551
df.pivot(index="a", on="c", values="d")
552
553
554
def test_multi_index_containing_struct() -> None:
555
df = pl.DataFrame(
556
{
557
"a": [1, 2, 1],
558
"b": [{"a": 1}, {"a": 1}, {"a": 2}],
559
"c": ["x", "y", "y"],
560
"d": [1, 1, 3],
561
}
562
)
563
result = df.pivot(index=("b", "d"), on="c", values="a")
564
expected = pl.DataFrame(
565
{"b": [{"a": 1}, {"a": 2}], "d": [1, 3], "x": [1, None], "y": [2, 1]}
566
)
567
assert_frame_equal(result, expected)
568
569
570
def test_list_pivot() -> None:
571
df = pl.DataFrame(
572
{
573
"a": [1, 2, 3, 1],
574
"b": [[1, 2], [3, 4], [5, 6], [1, 2]],
575
"c": ["x", "x", "y", "y"],
576
"d": [1, 2, 3, 4],
577
}
578
)
579
assert df.pivot(
580
index=["a", "b"],
581
on="c",
582
values="d",
583
).to_dict(as_series=False) == {
584
"a": [1, 2, 3],
585
"b": [[1, 2], [3, 4], [5, 6]],
586
"x": [1, 2, None],
587
"y": [4, None, 3],
588
}
589
590
591
def test_pivot_string_17081() -> None:
592
df = pl.DataFrame(
593
{
594
"a": ["1", "2", "3"],
595
"b": ["4", "5", "6"],
596
"c": ["7", "8", "9"],
597
}
598
)
599
assert df.pivot(index="a", on="b", values="c", aggregate_function="min").to_dict(
600
as_series=False
601
) == {
602
"a": ["1", "2", "3"],
603
"4": ["7", None, None],
604
"5": [None, "8", None],
605
"6": [None, None, "9"],
606
}
607
608
609
def test_pivot_invalid() -> None:
610
with pytest.raises(
611
pl.exceptions.InvalidOperationError,
612
match="`pivot` needs either `index or `values` needs to be specified",
613
):
614
pl.DataFrame({"a": [1, 2], "b": [2, 3], "c": [3, 4]}).pivot("a")
615
616
617
@pytest.mark.parametrize(
618
"dtype",
619
[pl.Int8, pl.Int16, pl.Int32, pl.Int64, pl.UInt8, pl.UInt16, pl.UInt32, pl.UInt64],
620
)
621
def test_pivot_empty_index_dtypes(dtype: PolarsIntegerType) -> None:
622
index = pl.Series([], dtype=dtype)
623
df = pl.DataFrame({"index": index, "on": [], "values": []})
624
result = df.pivot(index="index", on="on", values="values")
625
expected = pl.DataFrame({"index": index})
626
assert_frame_equal(result, expected)
627
628
629
def test_pivot_agg_column_ref_invalid_22479() -> None:
630
df = pl.DataFrame(
631
{"a": ["x", "x", "x"], "b": [1, 1, 1], "c": [7, 8, 9], "d": [0, 2, 1]}
632
)
633
with pytest.raises(
634
pl.exceptions.InvalidOperationError,
635
match="explicit column references are not allowed in the `aggregate_function` of `pivot`",
636
):
637
df.pivot(
638
on="a", index="b", values="c", aggregate_function=pl.element().sort_by("d")
639
)
640
641
642
def test_pivot_agg_null_methods_23408() -> None:
643
df = pl.DataFrame(
644
{
645
"idx": [0, 0, 1, 1],
646
"on": ["a", "b", "a", "c"],
647
"val": ["aa", "bb", "aa", "cc"],
648
}
649
)
650
out = df.pivot(
651
on="on",
652
index="idx",
653
values="val",
654
aggregate_function=pl.element().first().is_null(),
655
)
656
expected = pl.DataFrame(
657
{"idx": [0, 1], "a": [False, False], "b": [False, True], "c": [True, False]}
658
)
659
assert_frame_equal(out, expected)
660
661
out = df.pivot(
662
on="on",
663
index="idx",
664
values="val",
665
aggregate_function=pl.element().first().fill_null("xx"),
666
)
667
expected = pl.DataFrame(
668
{"idx": [0, 1], "a": ["aa", "aa"], "b": ["bb", "xx"], "c": ["xx", "cc"]}
669
)
670
assert_frame_equal(out, expected)
671
672
673
def test_pivot_obj_25527() -> None:
674
df = pl.DataFrame(
675
{
676
"idx": [0, 0, 1, 1],
677
"key": ["foo", "bar", "foo", "bar"],
678
"value": ["obj 0 foo", "obj 0 bar", "obj 1 foo", "obj 1 bar"],
679
},
680
schema={
681
"idx": pl.Int64,
682
"key": pl.String,
683
"value": pl.Object,
684
},
685
)
686
687
out = df.pivot(on="key", index="idx")
688
assert out["foo"].to_list() == ["obj 0 foo", "obj 1 foo"]
689
assert out["foo"].dtype == pl.Object
690
assert out["bar"].to_list() == ["obj 0 bar", "obj 1 bar"]
691
assert out["bar"].dtype == pl.Object
692
693