GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/lazyframe/test_lazyframe.py
from __future__ import annotations

import re
from datetime import date, datetime
from functools import reduce
from inspect import signature
from operator import add
from string import ascii_letters
from typing import TYPE_CHECKING, Any, Callable, NoReturn, cast

import numpy as np
import pytest

import polars as pl
import polars.selectors as cs
from polars import lit, when
from polars.exceptions import (
    InvalidOperationError,
    PerformanceWarning,
    PolarsInefficientMapWarning,
)
from polars.testing import assert_frame_equal, assert_series_equal
from tests.unit.conftest import FLOAT_DTYPES, NUMERIC_DTYPES

if TYPE_CHECKING:
    from _pytest.capture import CaptureFixture

    from polars._typing import PolarsDataType


def test_init_signature_match() -> None:
    # eager/lazy init signatures are expected to match; if this test fails, it
    # means a parameter was added to one but not the other, and that should be
    # fixed (or an explicit exemption should be made here, with an explanation)
    assert signature(pl.DataFrame.__init__) == signature(pl.LazyFrame.__init__)


def test_lazy_misc() -> None:
    ldf = pl.LazyFrame({"a": [1, 2, 3], "b": [1.0, 2.0, 3.0]})
    _ = ldf.with_columns(pl.lit(1).alias("foo")).select([pl.col("a"), pl.col("foo")])

    # test if it executes
    _ = ldf.with_columns(
        when(pl.col("a") > pl.lit(2)).then(pl.lit(10)).otherwise(pl.lit(1)).alias("new")
    ).collect()


def test_implode() -> None:
    ldf = pl.LazyFrame({"a": [1, 2, 3], "b": [1.0, 2.0, 3.0]})
    eager = (
        ldf.group_by(pl.col("a").alias("grp"), maintain_order=True)
        .agg(pl.implode("a", "b").name.suffix("_imp"))
        .collect()
    )
    assert_frame_equal(
        eager,
        pl.DataFrame(
            {
                "grp": [1, 2, 3],
                "a_imp": [[1], [2], [3]],
                "b_imp": [[1.0], [2.0], [3.0]],
            }
        ),
    )


def test_lazyframe_membership_operator() -> None:
    ldf = pl.LazyFrame({"name": ["Jane", "John"], "age": [20, 30]})
    assert "name" in ldf
    assert "phone" not in ldf

    # note: cannot use lazyframe in boolean context
    with pytest.raises(TypeError, match="ambiguous"):
        not ldf


def test_apply() -> None:
    ldf = pl.LazyFrame({"a": [1, 2, 3], "b": [1.0, 2.0, 3.0]})
    new = ldf.with_columns_seq(
        pl.col("a").map_batches(lambda s: s * 2, return_dtype=pl.Int64).alias("foo")
    )
    expected = ldf.clone().with_columns((pl.col("a") * 2).alias("foo"))
    assert_frame_equal(new, expected)
    assert_frame_equal(new.collect(), expected.collect())

    with pytest.warns(PolarsInefficientMapWarning, match="with this one instead"):
        for strategy in ["thread_local", "threading"]:
            ldf = pl.LazyFrame({"a": [1, 2, 3] * 20, "b": [1.0, 2.0, 3.0] * 20})
            new = ldf.with_columns(
                pl.col("a")
                .map_elements(lambda s: s * 2, strategy=strategy, return_dtype=pl.Int64)  # type: ignore[arg-type]
                .alias("foo")
            )
            expected = ldf.clone().with_columns((pl.col("a") * 2).alias("foo"))
            assert_frame_equal(new.collect(), expected.collect())


def test_add_eager_column() -> None:
    lf = pl.LazyFrame({"a": [1, 2, 3], "b": [1.0, 2.0, 3.0]})
    assert lf.collect_schema().len() == 2

    out = lf.with_columns(pl.lit(pl.Series("c", [1, 2, 3]))).collect()
    assert out["c"].sum() == 6
    assert out.collect_schema().len() == 3


def test_set_null() -> None:
    ldf = pl.LazyFrame({"a": [1, 2, 3], "b": [1.0, 2.0, 3.0]})
    out = ldf.with_columns(
        when(pl.col("a") > 1).then(lit(None)).otherwise(100).alias("foo")
    ).collect()
    s = out["foo"]
    assert s[0] == 100
    assert s[1] is None
    assert s[2] is None


def test_gather_every() -> None:
    ldf = pl.LazyFrame({"a": [1, 2, 3, 4], "b": ["w", "x", "y", "z"]})
    expected_df = pl.DataFrame({"a": [1, 3], "b": ["w", "y"]})
    assert_frame_equal(expected_df, ldf.gather_every(2).collect())
    expected_df = pl.DataFrame({"a": [2, 4], "b": ["x", "z"]})
    assert_frame_equal(expected_df, ldf.gather_every(2, offset=1).collect())


def test_agg() -> None:
    df = pl.DataFrame({"a": [1, 2, 3], "b": [1.0, 2.0, 3.0]})
    ldf = df.lazy().min()
    res = ldf.collect()
    assert res.shape == (1, 2)
    assert res.row(0) == (1, 1.0)


def test_count_suffix_10783() -> None:
    df = pl.DataFrame(
        {
            "a": [["a", "c", "b"], ["a", "b", "c"], ["a", "d", "c"], ["c", "a", "b"]],
            "b": [["a", "c", "b"], ["a", "b", "c"], ["a", "d", "c"], ["c", "a", "b"]],
        }
    )
    df_with_cnt = df.with_columns(
        pl.len()
        .over(pl.col("a").list.sort().list.join("").hash())
        .name.suffix("_suffix")
    )
    df_expect = df.with_columns(pl.Series("len_suffix", [3, 3, 1, 3]))
    assert_frame_equal(df_with_cnt, df_expect, check_dtypes=False)


def test_or() -> None:
    ldf = pl.LazyFrame({"a": [1, 2, 3], "b": [1.0, 2.0, 3.0]})
    out = ldf.filter((pl.col("a") == 1) | (pl.col("b") > 2)).collect()
    assert out.rows() == [(1, 1.0), (3, 3.0)]


def test_filter_str() -> None:
    # use a str instead of a column expr
    ldf = pl.LazyFrame(
        {
            "time": ["11:11:00", "11:12:00", "11:13:00", "11:14:00"],
            "bools": [True, False, True, False],
        }
    )

    # last row based on a filter
    result = ldf.filter(pl.col("bools")).select_seq(pl.last("*")).collect()
    expected = pl.DataFrame({"time": ["11:13:00"], "bools": [True]})
    assert_frame_equal(result, expected)

    # last row based on a filter
    result = ldf.filter("bools").select(pl.last("*")).collect()
    assert_frame_equal(result, expected)


def test_filter_multiple_predicates() -> None:
    ldf = pl.LazyFrame(
        {
            "a": [1, 1, 1, 2, 2],
            "b": [1, 1, 2, 2, 2],
            "c": [1, 1, 2, 3, 4],
        }
    )

    # multiple predicates
    expected = pl.DataFrame({"a": [1, 1, 1], "b": [1, 1, 2], "c": [1, 1, 2]})
    for out in (
        ldf.filter(pl.col("a") == 1, pl.col("b") <= 2),  # positional/splat
        ldf.filter([pl.col("a") == 1, pl.col("b") <= 2]),  # as list
    ):
        assert_frame_equal(out.collect(), expected)

    # multiple kwargs
    assert_frame_equal(
        ldf.filter(a=1, b=2).collect(),
        pl.DataFrame({"a": [1], "b": [2], "c": [2]}),
    )

    # both positional and keyword args
    assert_frame_equal(
        ldf.filter(pl.col("c") < 4, a=2, b=2).collect(),
        pl.DataFrame({"a": [2], "b": [2], "c": [3]}),
    )

    ldf = pl.LazyFrame(
        {
            "description": ["eq", "gt", "ge"],
            "predicate": ["==", ">", ">="],
        },
    )
    assert ldf.filter(predicate="==").select("description").collect().item() == "eq"


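# `filter` also accepts a sequence or iterator of predicates; with all-true
# predicates (as parametrized below) the frame should come back unchanged.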
@pytest.mark.parametrize(
    "predicate",
    [
        [pl.lit(True)],
        iter([pl.lit(True)]),
        [True, True, True],
        iter([True, True, True]),
        (p for p in (pl.col("c") < 9,)),
        (p for p in (pl.col("a") > 0, pl.col("b") > 0)),
    ],
)
def test_filter_seq_iterable_all_true(predicate: Any) -> None:
    ldf = pl.LazyFrame(
        {
            "a": [1, 1, 1],
            "b": [1, 1, 2],
            "c": [3, 1, 2],
        }
    )
    assert_frame_equal(ldf, ldf.filter(predicate))


def test_apply_custom_function() -> None:
    ldf = pl.LazyFrame(
        {
            "A": [1, 2, 3, 4, 5],
            "fruits": ["banana", "banana", "apple", "apple", "banana"],
            "B": [5, 4, 3, 2, 1],
            "cars": ["beetle", "audi", "beetle", "beetle", "beetle"],
        }
    )

    # two ways to determine the length of the groups
    df = (
        ldf.group_by("fruits")
        .agg(
            [
                pl.col("cars")
                .implode()
                .map_elements(lambda groups: groups.len(), return_dtype=pl.Int64)
                .alias("custom_1"),
                pl.col("cars")
                .implode()
                .map_elements(lambda groups: groups.len(), return_dtype=pl.Int64)
                .alias("custom_2"),
                pl.count("cars").alias("cars_count"),
            ]
        )
        .sort("custom_1", descending=True)
    ).collect()

    expected = pl.DataFrame(
        {
            "fruits": ["banana", "apple"],
            "custom_1": [3, 2],
            "custom_2": [3, 2],
            "cars_count": [3, 2],
        }
    )
    expected = expected.with_columns(pl.col("cars_count").cast(pl.UInt32))
    assert_frame_equal(df, expected)


def test_group_by() -> None:
    ldf = pl.LazyFrame(
        {
            "a": [1.0, None, 3.0, 4.0],
            "b": [5.0, 2.5, -3.0, 2.0],
            "grp": ["a", "a", "b", "b"],
        }
    )
    expected_a = pl.DataFrame({"grp": ["a", "b"], "a": [1.0, 3.5]})
    expected_a_b = pl.DataFrame({"grp": ["a", "b"], "a": [1.0, 3.5], "b": [3.75, -0.5]})

    for out in (
        ldf.group_by("grp").agg(pl.mean("a")).collect(),
        ldf.group_by(pl.col("grp")).agg(pl.mean("a")).collect(),
    ):
        assert_frame_equal(out.sort(by="grp"), expected_a)

    out = ldf.group_by("grp").agg(pl.mean("a", "b")).collect()
    assert_frame_equal(out.sort(by="grp"), expected_a_b)


def test_arg_unique() -> None:
    ldf = pl.LazyFrame({"a": [4, 1, 4]})
    col_a_unique = ldf.select(pl.col("a").arg_unique()).collect()["a"]
    assert_series_equal(col_a_unique, pl.Series("a", [0, 1]).cast(pl.UInt32))


def test_arg_sort() -> None:
    ldf = pl.LazyFrame({"a": [4, 1, 3]}).select(pl.col("a").arg_sort())
    assert ldf.collect()["a"].to_list() == [1, 2, 0]


def test_window_function() -> None:
    lf = pl.LazyFrame(
        {
            "A": [1, 2, 3, 4, 5],
            "fruits": ["banana", "banana", "apple", "apple", "banana"],
            "B": [5, 4, 3, 2, 1],
            "cars": ["beetle", "audi", "beetle", "beetle", "beetle"],
        }
    )
    assert lf.collect_schema().len() == 4

    q = lf.with_columns(
        pl.sum("A").over("fruits").alias("fruit_sum_A"),
        pl.first("B").over("fruits").alias("fruit_first_B"),
        pl.max("B").over("cars").alias("cars_max_B"),
    )
    assert q.collect_schema().len() == 7

    assert q.collect()["cars_max_B"].to_list() == [5, 4, 5, 5, 5]

    out = lf.select([pl.first("B").over(["fruits", "cars"]).alias("B_first")])
    assert out.collect()["B_first"].to_list() == [5, 4, 3, 3, 5]


def test_when_then_flatten() -> None:
    ldf = pl.LazyFrame({"foo": [1, 2, 3], "bar": [3, 4, 5]})

    assert ldf.select(
        when(pl.col("foo") > 1)
        .then(pl.col("bar"))
        .when(pl.col("bar") < 3)
        .then(10)
        .otherwise(30)
    ).collect()["bar"].to_list() == [30, 4, 5]


def test_describe_plan() -> None:
    assert isinstance(pl.LazyFrame({"a": [1]}).explain(optimized=True), str)
    assert isinstance(pl.LazyFrame({"a": [1]}).explain(optimized=False), str)


@pytest.mark.may_fail_cloud  # reason: inspects logs
def test_inspect(capsys: CaptureFixture[str]) -> None:
    ldf = pl.LazyFrame({"a": [1]})
    ldf.inspect().collect()
    captured = capsys.readouterr()
    assert len(captured.out) > 0

    ldf.select(pl.col("a").cum_sum().inspect().alias("bar")).collect()
    res = capsys.readouterr()
    assert len(res.out) > 0


@pytest.mark.may_fail_auto_streaming
def test_fetch(fruits_cars: pl.DataFrame) -> None:
    with pytest.warns(DeprecationWarning):
        res = fruits_cars.lazy().select("*").fetch(2)
    assert_frame_equal(res, res[:2])


def test_fold_filter() -> None:
    lf = pl.LazyFrame({"a": [1, 2, 3], "b": [0, 1, 2]})

    out = lf.filter(
        pl.fold(
            acc=pl.lit(True),
            function=lambda a, b: a & b,
            exprs=[pl.col(c) > 1 for c in lf.collect_schema()],
        )
    ).collect()

    assert out.shape == (1, 2)
    assert out.rows() == [(3, 2)]

    out = lf.filter(
        pl.fold(
            acc=pl.lit(True),
            function=lambda a, b: a | b,
            exprs=[pl.col(c) > 1 for c in lf.collect_schema()],
        )
    ).collect()

    assert out.rows() == [(1, 0), (2, 1), (3, 2)]


def test_head_group_by() -> None:
    commodity_prices = {
        "commodity": [
            "Wheat",
            "Wheat",
            "Wheat",
            "Wheat",
            "Corn",
            "Corn",
            "Corn",
            "Corn",
            "Corn",
        ],
        "location": [
            "StPaul",
            "StPaul",
            "StPaul",
            "Chicago",
            "Chicago",
            "Chicago",
            "Chicago",
            "Chicago",
            "Chicago",
        ],
        "seller": [
            "Bob",
            "Charlie",
            "Susan",
            "Paul",
            "Ed",
            "Mary",
            "Paul",
            "Charlie",
            "Norman",
        ],
        "price": [1.0, 0.7, 0.8, 0.55, 2.0, 3.0, 2.4, 1.8, 2.1],
    }
    ldf = pl.LazyFrame(commodity_prices)

    # this query flexes the wildcard exclusion quite a bit.
    keys = ["commodity", "location"]
    out = (
        ldf.sort(by="price", descending=True)
        .group_by(keys, maintain_order=True)
        .agg([pl.col("*").exclude(keys).head(2).name.keep()])
        .explode(cs.all().exclude(keys))
    )

    assert out.collect().rows() == [
        ("Corn", "Chicago", "Mary", 3.0),
        ("Corn", "Chicago", "Paul", 2.4),
        ("Wheat", "StPaul", "Bob", 1.0),
        ("Wheat", "StPaul", "Susan", 0.8),
        ("Wheat", "Chicago", "Paul", 0.55),
    ]

    ldf = pl.LazyFrame(
        {"letters": ["c", "c", "a", "c", "a", "b"], "nrs": [1, 2, 3, 4, 5, 6]}
    )
    out = ldf.group_by("letters").tail(2).sort("letters")
    assert_frame_equal(
        out.collect(),
        pl.DataFrame({"letters": ["a", "a", "b", "c", "c"], "nrs": [3, 5, 6, 2, 4]}),
    )
    out = ldf.group_by("letters").head(2).sort("letters")
    assert_frame_equal(
        out.collect(),
        pl.DataFrame({"letters": ["a", "a", "b", "c", "c"], "nrs": [3, 5, 6, 1, 2]}),
    )


def test_is_null_is_not_null() -> None:
    ldf = pl.LazyFrame({"nrs": [1, 2, None]}).select(
        pl.col("nrs").is_null().alias("is_null"),
        pl.col("nrs").is_not_null().alias("not_null"),
    )
    assert ldf.collect()["is_null"].to_list() == [False, False, True]
    assert ldf.collect()["not_null"].to_list() == [True, True, False]


def test_is_nan_is_not_nan() -> None:
    ldf = pl.LazyFrame({"nrs": np.array([1, 2, np.nan])}).select(
        pl.col("nrs").is_nan().alias("is_nan"),
        pl.col("nrs").is_not_nan().alias("not_nan"),
    )
    assert ldf.collect()["is_nan"].to_list() == [False, False, True]
    assert ldf.collect()["not_nan"].to_list() == [True, True, False]


def test_is_finite_is_infinite() -> None:
    ldf = pl.LazyFrame({"nrs": np.array([1, 2, np.inf])}).select(
        pl.col("nrs").is_infinite().alias("is_inf"),
        pl.col("nrs").is_finite().alias("not_inf"),
    )
    assert ldf.collect()["is_inf"].to_list() == [False, False, True]
    assert ldf.collect()["not_inf"].to_list() == [True, True, False]


def test_len() -> None:
    ldf = pl.LazyFrame({"nrs": [1, 2, 3]})
    assert cast(int, ldf.select(pl.col("nrs").len()).collect().item()) == 3


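# cum_min/cum_max keep the input dtype, while cum_sum/cum_prod are expected to
# promote small integer dtypes to Int64 (as asserted via `expected_dtype` below).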
@pytest.mark.parametrize("dtype", NUMERIC_DTYPES)
def test_cum_agg(dtype: PolarsDataType) -> None:
    ldf = pl.LazyFrame({"a": [1, 2, 3, 2]}, schema={"a": dtype})
    assert_series_equal(
        ldf.select(pl.col("a").cum_min()).collect()["a"],
        pl.Series("a", [1, 1, 1, 1], dtype=dtype),
    )
    assert_series_equal(
        ldf.select(pl.col("a").cum_max()).collect()["a"],
        pl.Series("a", [1, 2, 3, 3], dtype=dtype),
    )

    expected_dtype = (
        pl.Int64 if dtype in [pl.Int8, pl.Int16, pl.UInt8, pl.UInt16] else dtype
    )
    assert_series_equal(
        ldf.select(pl.col("a").cum_sum()).collect()["a"],
        pl.Series("a", [1, 3, 6, 8], dtype=expected_dtype),
    )

    expected_dtype = (
        pl.Int64
        if dtype in [pl.Int8, pl.Int16, pl.Int32, pl.UInt8, pl.UInt16, pl.UInt32]
        else dtype
    )
    assert_series_equal(
        ldf.select(pl.col("a").cum_prod()).collect()["a"],
        pl.Series("a", [1, 2, 6, 12], dtype=expected_dtype),
    )


def test_ceil() -> None:
    ldf = pl.LazyFrame({"a": [1.8, 1.2, 3.0]})
    result = ldf.select(pl.col("a").ceil()).collect()
    assert_frame_equal(result, pl.DataFrame({"a": [2.0, 2.0, 3.0]}))

    ldf = pl.LazyFrame({"a": [1, 2, 3]})
    result = ldf.select(pl.col("a").ceil()).collect()
    assert_frame_equal(ldf.collect(), result)


def test_floor() -> None:
    ldf = pl.LazyFrame({"a": [1.8, 1.2, 3.0]})
    result = ldf.select(pl.col("a").floor()).collect()
    assert_frame_equal(result, pl.DataFrame({"a": [1.0, 1.0, 3.0]}))

    ldf = pl.LazyFrame({"a": [1, 2, 3]})
    result = ldf.select(pl.col("a").floor()).collect()
    assert_frame_equal(ldf.collect(), result)


@pytest.mark.parametrize(
    ("n", "ndigits", "expected"),
    [
        (1.005, 2, 1.0),
        (1234.00000254495, 10, 1234.000002545),
        (1835.665, 2, 1835.67),
        (-1835.665, 2, -1835.67),
        (1.27499, 2, 1.27),
        (123.45678, 2, 123.46),
        (1254, 2, 1254.0),
        (1254, 0, 1254.0),
        (123.55, 0, 124.0),
        (123.55, 1, 123.6),
        (-1.23456789, 6, -1.234568),
        (1.0e-5, 5, 0.00001),
        (1.0e-20, 20, 1e-20),
        (1.0e20, 2, 100000000000000000000.0),
    ],
)
@pytest.mark.parametrize("dtype", FLOAT_DTYPES)
def test_round(n: float, ndigits: int, expected: float, dtype: pl.DataType) -> None:
    ldf = pl.LazyFrame({"value": [n]}, schema_overrides={"value": dtype})
    assert_series_equal(
        ldf.select(pl.col("value").round(decimals=ndigits)).collect().to_series(),
        pl.Series("value", [expected], dtype=dtype),
    )


def test_dot() -> None:
    ldf = pl.LazyFrame({"a": [1.8, 1.2, 3.0], "b": [3.2, 1, 2]}).select(
        pl.col("a").dot(pl.col("b"))
    )
    assert cast(float, ldf.collect().item()) == 12.96


def test_sort() -> None:
    ldf = pl.LazyFrame({"a": [1, 2, 3, 2]}).select(pl.col("a").sort())
    assert_series_equal(ldf.collect()["a"], pl.Series("a", [1, 2, 2, 3]))


def test_custom_group_by() -> None:
    ldf = pl.LazyFrame({"a": [1, 2, 1, 1], "b": ["a", "b", "c", "c"]})
    out = (
        ldf.group_by("b", maintain_order=True)
        .agg(
            [
                pl.col("a")
                .implode()
                .map_elements(lambda x: x.sum(), return_dtype=pl.Int64)
            ]
        )
        .collect()
    )
    assert out.rows() == [("a", 1), ("b", 2), ("c", 2)]


def test_lazy_columns() -> None:
    lf = pl.LazyFrame(
        {
            "a": [1],
            "b": [1],
            "c": [1],
        }
    )
    assert lf.select("a", "c").collect_schema().names() == ["a", "c"]


def test_cast_frame() -> None:
    lf = pl.LazyFrame(
        {
            "a": [1.0, 2.5, 3.0],
            "b": [4, 5, None],
            "c": [True, False, True],
            "d": [date(2020, 1, 2), date(2021, 3, 4), date(2022, 5, 6)],
        }
    )

    # cast via col:dtype map
    assert lf.cast(
        dtypes={"b": pl.Float32, "c": pl.String, "d": pl.Datetime("ms")}
    ).collect_schema() == {
        "a": pl.Float64,
        "b": pl.Float32,
        "c": pl.String,
        "d": pl.Datetime("ms"),
    }

    # cast via selector:dtype map
    lfc = lf.cast(
        {
            cs.float(): pl.UInt8,
            cs.integer(): pl.Int32,
            cs.temporal(): pl.String,
        }
    )
    assert lfc.collect_schema() == {
        "a": pl.UInt8,
        "b": pl.Int32,
        "c": pl.Boolean,
        "d": pl.String,
    }
    assert lfc.collect().rows() == [
        (1, 4, True, "2020-01-02"),
        (2, 5, False, "2021-03-04"),
        (3, None, True, "2022-05-06"),
    ]

    # cast all fields to a single type
    result = lf.cast(pl.String)
    expected = pl.LazyFrame(
        {
            "a": ["1.0", "2.5", "3.0"],
            "b": ["4", "5", None],
            "c": ["true", "false", "true"],
            "d": ["2020-01-02", "2021-03-04", "2022-05-06"],
        }
    )
    assert_frame_equal(result, expected)

    # test 'strict' mode
    lf = pl.LazyFrame({"a": [1000, 2000, 3000]})

    with pytest.raises(InvalidOperationError, match="conversion .* failed"):
        lf.cast(pl.UInt8).collect()

    assert lf.cast(pl.UInt8, strict=False).collect().rows() == [
        (None,),
        (None,),
        (None,),
    ]


def test_interpolate() -> None:
    df = pl.DataFrame({"a": [1, None, 3]})
    assert df.select(pl.col("a").interpolate())["a"].to_list() == [1, 2, 3]
    assert df["a"].interpolate().to_list() == [1, 2, 3]
    assert df.interpolate()["a"].to_list() == [1, 2, 3]
    assert df.lazy().interpolate().collect()["a"].to_list() == [1, 2, 3]


def test_fill_nan() -> None:
    df = pl.DataFrame({"a": [1.0, np.nan, 3.0]})
    assert_series_equal(df.fill_nan(2.0)["a"], pl.Series("a", [1.0, 2.0, 3.0]))
    assert_series_equal(
        df.lazy().fill_nan(2.0).collect()["a"], pl.Series("a", [1.0, 2.0, 3.0])
    )
    assert_series_equal(
        df.lazy().fill_nan(None).collect()["a"], pl.Series("a", [1.0, None, 3.0])
    )
    assert_series_equal(
        df.select(pl.col("a").fill_nan(2))["a"], pl.Series("a", [1.0, 2.0, 3.0])
    )
    # nearest
    assert pl.Series([None, 1, None, None, None, -8, None, None, 10]).interpolate(
        method="nearest"
    ).to_list() == [None, 1, 1, -8, -8, -8, -8, 10, 10]


def test_fill_null() -> None:
    df = pl.DataFrame({"a": [1.0, None, 3.0]})

    assert df.select([pl.col("a").fill_null(strategy="min")])["a"][1] == 1.0
    assert df.lazy().fill_null(2).collect()["a"].to_list() == [1.0, 2.0, 3.0]

    with pytest.raises(ValueError, match="must specify either"):
        df.fill_null()
    with pytest.raises(ValueError, match="cannot specify both"):
        df.fill_null(value=3.0, strategy="max")
    with pytest.raises(ValueError, match="can only specify `limit`"):
        df.fill_null(strategy="max", limit=2)


def test_backward_fill() -> None:
    ldf = pl.LazyFrame({"a": [1.0, None, 3.0]})
    col_a_backward_fill = ldf.select(
        [pl.col("a").fill_null(strategy="backward")]
    ).collect()["a"]
    assert_series_equal(col_a_backward_fill, pl.Series("a", [1, 3, 3]).cast(pl.Float64))


def test_rolling(fruits_cars: pl.DataFrame) -> None:
    ldf = fruits_cars.lazy()
    out = ldf.select(
        pl.col("A").rolling_min(3, min_samples=1).alias("1"),
        pl.col("A").rolling_min(3).alias("1b"),
        pl.col("A").rolling_mean(3, min_samples=1).alias("2"),
        pl.col("A").rolling_mean(3).alias("2b"),
        pl.col("A").rolling_max(3, min_samples=1).alias("3"),
        pl.col("A").rolling_max(3).alias("3b"),
        pl.col("A").rolling_sum(3, min_samples=1).alias("4"),
        pl.col("A").rolling_sum(3).alias("4b"),
        # .round is used below purely so that we can assert frame equality
        pl.col("A").rolling_std(3).round(1).alias("std"),
        pl.col("A").rolling_var(3).round(1).alias("var"),
    )

    assert_frame_equal(
        out.collect(),
        pl.DataFrame(
            {
                "1": [1, 1, 1, 2, 3],
                "1b": [None, None, 1, 2, 3],
                "2": [1.0, 1.5, 2.0, 3.0, 4.0],
                "2b": [None, None, 2.0, 3.0, 4.0],
                "3": [1, 2, 3, 4, 5],
                "3b": [None, None, 3, 4, 5],
                "4": [1, 3, 6, 9, 12],
                "4b": [None, None, 6, 9, 12],
                "std": [None, None, 1.0, 1.0, 1.0],
                "var": [None, None, 1.0, 1.0, 1.0],
            }
        ),
    )

    out_single_val_variance = ldf.select(
        pl.col("A").rolling_std(3, min_samples=1).round(decimals=4).alias("std"),
        pl.col("A").rolling_var(3, min_samples=1).round(decimals=1).alias("var"),
    ).collect()

    assert cast(float, out_single_val_variance[0, "std"]) is None
    assert cast(float, out_single_val_variance[0, "var"]) is None


def test_arr_namespace(fruits_cars: pl.DataFrame) -> None:
    ldf = fruits_cars.lazy()
    out = ldf.select(
        "fruits",
        pl.col("B")
        .over("fruits", mapping_strategy="join")
        .list.min()
        .alias("B_by_fruits_min1"),
        pl.col("B")
        .min()
        .over("fruits", mapping_strategy="join")
        .alias("B_by_fruits_min2"),
        pl.col("B")
        .over("fruits", mapping_strategy="join")
        .list.max()
        .alias("B_by_fruits_max1"),
        pl.col("B")
        .max()
        .over("fruits", mapping_strategy="join")
        .alias("B_by_fruits_max2"),
        pl.col("B")
        .over("fruits", mapping_strategy="join")
        .list.sum()
        .alias("B_by_fruits_sum1"),
        pl.col("B")
        .sum()
        .over("fruits", mapping_strategy="join")
        .alias("B_by_fruits_sum2"),
        pl.col("B")
        .over("fruits", mapping_strategy="join")
        .list.mean()
        .alias("B_by_fruits_mean1"),
        pl.col("B")
        .mean()
        .over("fruits", mapping_strategy="join")
        .alias("B_by_fruits_mean2"),
    )
    expected = pl.DataFrame(
        {
            "fruits": ["banana", "banana", "apple", "apple", "banana"],
            "B_by_fruits_min1": [1, 1, 2, 2, 1],
            "B_by_fruits_min2": [1, 1, 2, 2, 1],
            "B_by_fruits_max1": [5, 5, 3, 3, 5],
            "B_by_fruits_max2": [5, 5, 3, 3, 5],
            "B_by_fruits_sum1": [10, 10, 5, 5, 10],
            "B_by_fruits_sum2": [10, 10, 5, 5, 10],
            "B_by_fruits_mean1": [
                3.3333333333333335,
                3.3333333333333335,
                2.5,
                2.5,
                3.3333333333333335,
            ],
            "B_by_fruits_mean2": [
                3.3333333333333335,
                3.3333333333333335,
                2.5,
                2.5,
                3.3333333333333335,
            ],
        }
    )
    assert_frame_equal(out.collect(), expected)


def test_arithmetic() -> None:
    ldf = pl.LazyFrame({"a": [1, 2, 3]})

    out = ldf.select(
        (pl.col("a") % 2).alias("1"),
        (2 % pl.col("a")).alias("2"),
        (1 // pl.col("a")).alias("3"),
        (1 * pl.col("a")).alias("4"),
        (1 + pl.col("a")).alias("5"),
        (1 - pl.col("a")).alias("6"),
        (pl.col("a") // 2).alias("7"),
        (pl.col("a") * 2).alias("8"),
        (pl.col("a") + 2).alias("9"),
        (pl.col("a") - 2).alias("10"),
        (-pl.col("a")).alias("11"),
    )
    expected = pl.DataFrame(
        {
            "1": [1, 0, 1],
            "2": [0, 0, 2],
            "3": [1, 0, 0],
            "4": [1, 2, 3],
            "5": [2, 3, 4],
            "6": [0, -1, -2],
            "7": [0, 1, 1],
            "8": [2, 4, 6],
            "9": [3, 4, 5],
            "10": [-1, 0, 1],
            "11": [-1, -2, -3],
        }
    )
    assert_frame_equal(out.collect(), expected)


def test_float_floor_divide() -> None:
    x = 10.4
    step = 0.5
    ldf = pl.LazyFrame({"x": [x]})
    ldf_res = ldf.with_columns(pl.col("x") // step).collect().item()
    assert ldf_res == x // step


def test_argminmax() -> None:
    ldf = pl.LazyFrame({"a": [1, 2, 3, 4, 5], "b": [1, 1, 2, 2, 2]})
    out = ldf.select(
        pl.col("a").arg_min().alias("min"),
        pl.col("a").arg_max().alias("max"),
    ).collect()
    assert out["max"][0] == 4
    assert out["min"][0] == 0

    out = (
        ldf.group_by("b", maintain_order=True)
        .agg([pl.col("a").arg_min().alias("min"), pl.col("a").arg_max().alias("max")])
        .collect()
    )
    assert out["max"][0] == 1
    assert out["min"][0] == 0


def test_limit(fruits_cars: pl.DataFrame) -> None:
    assert_frame_equal(fruits_cars.lazy().limit(1).collect(), fruits_cars[0, :])


def test_head(fruits_cars: pl.DataFrame) -> None:
    assert_frame_equal(fruits_cars.lazy().head(2).collect(), fruits_cars[:2, :])


def test_tail(fruits_cars: pl.DataFrame) -> None:
    assert_frame_equal(fruits_cars.lazy().tail(2).collect(), fruits_cars[3:, :])


def test_last(fruits_cars: pl.DataFrame) -> None:
    result = fruits_cars.lazy().last().collect()
    expected = fruits_cars[(len(fruits_cars) - 1) :, :]
    assert_frame_equal(result, expected)


def test_first(fruits_cars: pl.DataFrame) -> None:
    assert_frame_equal(fruits_cars.lazy().first().collect(), fruits_cars[0, :])


def test_join_suffix() -> None:
    df_left = pl.DataFrame(
        {
            "a": ["a", "b", "a", "z"],
            "b": [1, 2, 3, 4],
            "c": [6, 5, 4, 3],
        }
    )
    df_right = pl.DataFrame(
        {
            "a": ["b", "c", "b", "a"],
            "b": [0, 3, 9, 6],
            "c": [1, 0, 2, 1],
        }
    )
    out = df_left.join(df_right, on="a", suffix="_bar")
    assert out.columns == ["a", "b", "c", "b_bar", "c_bar"]
    out = df_left.lazy().join(df_right.lazy(), on="a", suffix="_bar").collect()
    assert out.columns == ["a", "b", "c", "b_bar", "c_bar"]


@pytest.mark.may_fail_cloud  # reason: no
def test_collect_unexpected_kwargs(df: pl.DataFrame) -> None:
    with pytest.raises(TypeError, match="unexpected keyword argument"):
        df.lazy().collect(common_subexpr_elim=False)  # type: ignore[call-overload]


def test_spearman_corr() -> None:
    ldf = pl.LazyFrame(
        {
            "era": [1, 1, 1, 2, 2, 2],
            "prediction": [2, 4, 5, 190, 1, 4],
            "target": [1, 3, 2, 1, 43, 3],
        }
    )

    out = (
        ldf.group_by("era", maintain_order=True).agg(
            pl.corr(pl.col("prediction"), pl.col("target"), method="spearman").alias(
                "c"
            ),
        )
    ).collect()["c"]
    assert np.isclose(out[0], 0.5)
    assert np.isclose(out[1], -1.0)

    # we can also pass in column names directly
    out = (
        ldf.group_by("era", maintain_order=True).agg(
            pl.corr("prediction", "target", method="spearman").alias("c"),
        )
    ).collect()["c"]
    assert np.isclose(out[0], 0.5)
    assert np.isclose(out[1], -1.0)


def test_spearman_corr_ties() -> None:
    """In Spearman correlation, ranks are computed using the average method."""
    df = pl.DataFrame({"a": [1, 1, 1, 2, 3, 7, 4], "b": [4, 3, 2, 2, 4, 3, 1]})

    result = df.select(
        pl.corr("a", "b", method="spearman").alias("a1"),
        pl.corr(pl.col("a").rank("min"), pl.col("b").rank("min")).alias("a2"),
        pl.corr(pl.col("a").rank(), pl.col("b").rank()).alias("a3"),
    )
    expected = pl.DataFrame(
        [
            pl.Series("a1", [-0.19048482943986483], dtype=pl.Float64),
            pl.Series("a2", [-0.17223653586587362], dtype=pl.Float64),
            pl.Series("a3", [-0.19048482943986483], dtype=pl.Float64),
        ]
    )
    assert_frame_equal(result, expected)


def test_pearson_corr() -> None:
    ldf = pl.LazyFrame(
        {
            "era": [1, 1, 1, 2, 2, 2],
            "prediction": [2, 4, 5, 190, 1, 4],
            "target": [1, 3, 2, 1, 43, 3],
        }
    )

    out = (
        ldf.group_by("era", maintain_order=True).agg(
            pl.corr(
                pl.col("prediction"),
                pl.col("target"),
                method="pearson",
            ).alias("c"),
        )
    ).collect()["c"]
    assert out.to_list() == pytest.approx([0.6546536707079772, -5.477514993831792e-1])

    # we can also pass in column names directly
    out = (
        ldf.group_by("era", maintain_order=True).agg(
            pl.corr("prediction", "target", method="pearson").alias("c"),
        )
    ).collect()["c"]
    assert out.to_list() == pytest.approx([0.6546536707079772, -5.477514993831792e-1])


def test_null_count() -> None:
    lf = pl.LazyFrame({"a": [1, 2, None, 2], "b": [None, 3, None, 3]})
    assert lf.null_count().collect().rows() == [(1, 2)]


def test_lazy_concat(df: pl.DataFrame) -> None:
    shape = df.shape
    shape = (shape[0] * 2, shape[1])

    out = pl.concat([df.lazy(), df.lazy()]).collect()
    assert out.shape == shape
    assert_frame_equal(out, df.vstack(df))


def test_self_join() -> None:
    # 2720
    ldf = pl.from_dict(
        data={
            "employee_id": [100, 101, 102],
            "employee_name": ["James", "Alice", "Bob"],
            "manager_id": [None, 100, 101],
        }
    ).lazy()

    out = (
        ldf.join(other=ldf, left_on="manager_id", right_on="employee_id", how="left")
        .select(
            pl.col("employee_id"),
            pl.col("employee_name"),
            pl.col("employee_name_right").alias("manager_name"),
        )
        .collect()
    )
    assert set(out.rows()) == {
        (100, "James", None),
        (101, "Alice", "James"),
        (102, "Bob", "Alice"),
    }


def test_group_lengths() -> None:
    ldf = pl.LazyFrame(
        {
            "group": ["A", "A", "A", "B", "B", "B", "B"],
            "id": ["1", "1", "2", "3", "4", "3", "5"],
        }
    )

    result = ldf.group_by(["group"], maintain_order=True).agg(
        [
            (pl.col("id").unique_counts() / pl.col("id").len())
            .sum()
            .alias("unique_counts_sum"),
            pl.col("id").unique().len().alias("unique_len"),
        ]
    )
    expected = pl.DataFrame(
        {
            "group": ["A", "B"],
            "unique_counts_sum": [1.0, 1.0],
            "unique_len": [2, 3],
        },
        schema_overrides={"unique_len": pl.UInt32},
    )
    assert_frame_equal(result.collect(), expected)


def test_quantile_filtered_agg() -> None:
    assert (
        pl.LazyFrame(
            {
                "group": [0, 0, 0, 0, 1, 1, 1, 1],
                "value": [1, 2, 3, 4, 1, 2, 3, 4],
            }
        )
        .group_by("group")
        .agg(pl.col("value").filter(pl.col("value") < 2).quantile(0.5))
        .collect()["value"]
        .to_list()
    ) == [1.0, 1.0]


def test_predicate_count_vstack() -> None:
    l1 = pl.LazyFrame(
        {
            "k": ["x", "y"],
            "v": [3, 2],
        }
    )
    l2 = pl.LazyFrame(
        {
            "k": ["x", "y"],
            "v": [5, 7],
        }
    )
    assert pl.concat([l1, l2]).filter(pl.len().over("k") == 2).collect()[
        "v"
    ].to_list() == [3, 2, 5, 7]


def test_lazy_method() -> None:
    # We want to support `.lazy()` on a LazyFrame to allow more generic user code.
    df = pl.DataFrame({"a": [1, 1, 2, 2, 3, 3], "b": [1, 2, 3, 4, 5, 6]})
    assert_frame_equal(df.lazy(), df.lazy().lazy())


def test_update_schema_after_projection_pd_t4157() -> None:
    ldf = pl.LazyFrame({"c0": [], "c1": [], "c2": []}).rename({"c2": "c2_"})
    assert ldf.drop("c2_").select(pl.col("c0")).collect().columns == ["c0"]


def test_type_coercion_unknown_4190() -> None:
    df = (
        pl.LazyFrame({"a": [1, 2, 3], "b": [1, 2, 3]}).with_columns(
            pl.col("a") & pl.col("a").fill_null(True)
        )
    ).collect()
    assert df.shape == (3, 2)
    assert df.rows() == [(1, 1), (2, 2), (3, 3)]


def test_lazy_cache_same_key() -> None:
    ldf = pl.LazyFrame({"a": [1, 2, 3], "b": [3, 4, 5], "c": ["x", "y", "z"]})

    # these have the same schema, but should not share a cache node as the queries differ
    add_node = ldf.select([(pl.col("a") + pl.col("b")).alias("a"), pl.col("c")]).cache()
    mult_node = ldf.select((pl.col("a") * pl.col("b")).alias("a"), pl.col("c")).cache()

    result = mult_node.join(add_node, on="c", suffix="_mult").select(
        (pl.col("a") - pl.col("a_mult")).alias("a"), pl.col("c")
    )
    expected = pl.LazyFrame({"a": [-1, 2, 7], "c": ["x", "y", "z"]})
    assert_frame_equal(result, expected, check_row_order=False)


@pytest.mark.may_fail_cloud  # reason: inspects logs
@pytest.mark.may_fail_auto_streaming
def test_lazy_cache_hit(monkeypatch: Any, capfd: Any) -> None:
    monkeypatch.setenv("POLARS_VERBOSE", "1")

    ldf = pl.LazyFrame({"a": [1, 2, 3], "b": [3, 4, 5], "c": ["x", "y", "z"]})
    add_node = ldf.select([(pl.col("a") + pl.col("b")).alias("a"), pl.col("c")]).cache()

    result = add_node.join(add_node, on="c", suffix="_mult").select(
        (pl.col("a") - pl.col("a_mult")).alias("a"), pl.col("c")
    )
    expected = pl.LazyFrame({"a": [0, 0, 0], "c": ["x", "y", "z"]})
    assert_frame_equal(result, expected, check_row_order=False)

    (_, err) = capfd.readouterr()
    assert "CACHE HIT" in err


@pytest.mark.may_fail_cloud  # reason: impure udf
def test_lazy_cache_parallel() -> None:
    df_evaluated = 0

    def map_df(df: pl.DataFrame) -> pl.DataFrame:
        nonlocal df_evaluated
        df_evaluated += 1
        return df

    df = pl.LazyFrame({"a": [1]}).map_batches(map_df).cache()

    df = pl.concat(
        [
            df.select(pl.col("a") + 1),
            df.select(pl.col("a") + 2),
            df.select(pl.col("a") + 3),
        ],
        parallel=True,
    )

    assert df_evaluated == 0

    df.collect()
    assert df_evaluated == 1


@pytest.mark.may_fail_cloud  # reason: impure udf
def test_lazy_cache_nested_parallel() -> None:
    df_inner_evaluated = 0
    df_outer_evaluated = 0

    def map_df_inner(df: pl.DataFrame) -> pl.DataFrame:
        nonlocal df_inner_evaluated
        df_inner_evaluated += 1
        return df

    def map_df_outer(df: pl.DataFrame) -> pl.DataFrame:
        nonlocal df_outer_evaluated
        df_outer_evaluated += 1
        return df

    df_inner = pl.LazyFrame({"a": [1]}).map_batches(map_df_inner).cache()
    df_outer = df_inner.select(pl.col("a") + 1).map_batches(map_df_outer).cache()

    df = pl.concat(
        [
            df_outer.select(pl.col("a") + 2),
            df_outer.select(pl.col("a") + 3),
        ],
        parallel=True,
    )

    assert df_inner_evaluated == 0
    assert df_outer_evaluated == 0

    df.collect()
    assert df_inner_evaluated == 1
    assert df_outer_evaluated == 1


def test_quadratic_behavior_4736() -> None:
    # no assert; if this function does not stall our tests it has passed!
    lf = pl.LazyFrame(schema=list(ascii_letters))
    lf.select(reduce(add, (pl.col(c) for c in lf.collect_schema())))


@pytest.mark.parametrize("input_dtype", [pl.Int64, pl.Float64])
def test_from_epoch(input_dtype: PolarsDataType) -> None:
    ldf = pl.LazyFrame(
        [
            pl.Series("timestamp_d", [13285]).cast(input_dtype),
            pl.Series("timestamp_s", [1147880044]).cast(input_dtype),
            pl.Series("timestamp_ms", [1147880044 * 1_000]).cast(input_dtype),
            pl.Series("timestamp_us", [1147880044 * 1_000_000]).cast(input_dtype),
            pl.Series("timestamp_ns", [1147880044 * 1_000_000_000]).cast(input_dtype),
        ]
    )

    exp_dt = datetime(2006, 5, 17, 15, 34, 4)
    expected = pl.DataFrame(
        [
            pl.Series("timestamp_d", [date(2006, 5, 17)]),
            pl.Series("timestamp_s", [exp_dt]),  # "s" is not a Polars time unit; defaults to "us"
            pl.Series("timestamp_ms", [exp_dt]).cast(pl.Datetime("ms")),
            pl.Series("timestamp_us", [exp_dt]),  # "us" is the Polars Datetime default
            pl.Series("timestamp_ns", [exp_dt]).cast(pl.Datetime("ns")),
        ]
    )

    ldf_result = ldf.select(
        pl.from_epoch(pl.col("timestamp_d"), time_unit="d"),
        pl.from_epoch(pl.col("timestamp_s"), time_unit="s"),
        pl.from_epoch(pl.col("timestamp_ms"), time_unit="ms"),
        pl.from_epoch(pl.col("timestamp_us"), time_unit="us"),
        pl.from_epoch(pl.col("timestamp_ns"), time_unit="ns"),
    ).collect()

    assert_frame_equal(ldf_result, expected)

    ts_col = pl.col("timestamp_s")
    with pytest.raises(ValueError):
        _ = ldf.select(pl.from_epoch(ts_col, time_unit="s2"))  # type: ignore[call-overload]


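# from_epoch expects numeric input; applying it to String columns should raise
# an InvalidOperationError.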
def test_from_epoch_str() -> None:
    ldf = pl.LazyFrame(
        [
            pl.Series("timestamp_ms", [1147880044 * 1_000]).cast(pl.String),
            pl.Series("timestamp_us", [1147880044 * 1_000_000]).cast(pl.String),
        ]
    )

    with pytest.raises(InvalidOperationError):
        ldf.select(
            pl.from_epoch(pl.col("timestamp_ms"), time_unit="ms"),
            pl.from_epoch(pl.col("timestamp_us"), time_unit="us"),
        ).collect()


def test_cum_agg_types() -> None:
    ldf = pl.LazyFrame({"a": [1, 2], "b": [True, False], "c": [1.3, 2.4]})
    cum_sum_lf = ldf.select(
        pl.col("a").cum_sum(),
        pl.col("b").cum_sum(),
        pl.col("c").cum_sum(),
    )
    assert cum_sum_lf.collect_schema()["a"] == pl.Int64
    assert cum_sum_lf.collect_schema()["b"] == pl.UInt32
    assert cum_sum_lf.collect_schema()["c"] == pl.Float64
    collected_cumsum_lf = cum_sum_lf.collect()
    assert collected_cumsum_lf.schema == cum_sum_lf.collect_schema()

    cum_prod_lf = ldf.select(
        pl.col("a").cast(pl.UInt64).cum_prod(),
        pl.col("b").cum_prod(),
        pl.col("c").cum_prod(),
    )
    assert cum_prod_lf.collect_schema()["a"] == pl.UInt64
    assert cum_prod_lf.collect_schema()["b"] == pl.Int64
    assert cum_prod_lf.collect_schema()["c"] == pl.Float64
    collected_cum_prod_lf = cum_prod_lf.collect()
    assert collected_cum_prod_lf.schema == cum_prod_lf.collect_schema()


def test_compare_schema_between_lazy_and_eager_6904() -> None:
    float32_df = pl.DataFrame({"x": pl.Series(values=[], dtype=pl.Float32)})
    eager_result = float32_df.select(pl.col("x").sqrt()).select(pl.col(pl.Float32))
    lazy_result = (
        float32_df.lazy()
        .select(pl.col("x").sqrt())
        .select(pl.col(pl.Float32))
        .collect()
    )
    assert eager_result.shape == lazy_result.shape

    eager_result = float32_df.select(pl.col("x").pow(2)).select(pl.col(pl.Float32))
    lazy_result = (
        float32_df.lazy()
        .select(pl.col("x").pow(2))
        .select(pl.col(pl.Float32))
        .collect()
    )
    assert eager_result.shape == lazy_result.shape

    int32_df = pl.DataFrame({"x": pl.Series(values=[], dtype=pl.Int32)})
    eager_result = int32_df.select(pl.col("x").pow(2)).select(pl.col(pl.Float64))
    lazy_result = (
        int32_df.lazy().select(pl.col("x").pow(2)).select(pl.col(pl.Float64)).collect()
    )
    assert eager_result.shape == lazy_result.shape

    int8_df = pl.DataFrame({"x": pl.Series(values=[], dtype=pl.Int8)})
    eager_result = int8_df.select(pl.col("x").diff()).select(pl.col(pl.Int16))
    lazy_result = (
        int8_df.lazy().select(pl.col("x").diff()).select(pl.col(pl.Int16)).collect()
    )
    assert eager_result.shape == lazy_result.shape


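# For each dtype/aggregation combination below, the lazy engine should return
# the same values and resolved dtype as the eager engine (issue 6904).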
@pytest.mark.slow
@pytest.mark.parametrize(
    "dtype",
    [
        pl.UInt8,
        pl.UInt16,
        pl.UInt32,
        pl.UInt64,
        pl.Int8,
        pl.Int16,
        pl.Int32,
        pl.Int64,
        pl.Float32,
        pl.Float64,
    ],
)
@pytest.mark.parametrize(
    "func",
    [
        pl.col("x").arg_max(),
        pl.col("x").arg_min(),
        pl.col("x").max(),
        pl.col("x").mean(),
        pl.col("x").median(),
        pl.col("x").min(),
        pl.col("x").nan_max(),
        pl.col("x").nan_min(),
        pl.col("x").product(),
        pl.col("x").quantile(0.5),
        pl.col("x").std(),
        pl.col("x").sum(),
        pl.col("x").var(),
    ],
)
def test_compare_aggregation_between_lazy_and_eager_6904(
    dtype: PolarsDataType, func: pl.Expr
) -> None:
    df = pl.DataFrame(
        {
            "x": pl.Series(values=[1, 2, 3] * 2, dtype=dtype),
            "y": pl.Series(values=["a"] * 3 + ["b"] * 3),
        }
    )
    result_eager = df.select(func.over("y")).select("x")
    dtype_eager = result_eager["x"].dtype
    result_lazy = df.lazy().select(func.over("y")).select(pl.col(dtype_eager)).collect()
    assert_frame_equal(result_eager, result_lazy)


@pytest.mark.parametrize(
    "comparators",
    [
        ("==", pl.LazyFrame.__eq__),
        ("!=", pl.LazyFrame.__ne__),
        (">", pl.LazyFrame.__gt__),
        ("<", pl.LazyFrame.__lt__),
        (">=", pl.LazyFrame.__ge__),
        ("<=", pl.LazyFrame.__le__),
    ],
)
def test_lazy_comparison_operators(
    comparators: tuple[str, Callable[[pl.LazyFrame, Any], NoReturn]],
) -> None:
    # we cannot compare lazy frames, so all should raise a TypeError
    with pytest.raises(
        TypeError,
        match=f'"{comparators[0]!r}" comparison not supported for LazyFrame objects',
    ):
        comparators[1](pl.LazyFrame(), pl.LazyFrame())


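# Schema-resolving properties on a LazyFrame (schema, columns, dtypes, width)
# are expected to emit a PerformanceWarning, since they resolve the full schema.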
def test_lf_properties() -> None:
    lf = pl.LazyFrame(
        {
            "foo": [1, 2, 3],
            "bar": [6.0, 7.0, 8.0],
            "ham": ["a", "b", "c"],
        }
    )
    with pytest.warns(PerformanceWarning):
        assert lf.schema == {"foo": pl.Int64, "bar": pl.Float64, "ham": pl.String}
    with pytest.warns(PerformanceWarning):
        assert lf.columns == ["foo", "bar", "ham"]
    with pytest.warns(PerformanceWarning):
        assert lf.dtypes == [pl.Int64, pl.Float64, pl.String]
    with pytest.warns(PerformanceWarning):
        assert lf.width == 3


def test_lf_unnest() -> None:
    lf = pl.DataFrame(
        [
            pl.Series(
                "a",
                [{"ab": [1, 2, 3], "ac": [3, 4, 5]}],
                dtype=pl.Struct({"ab": pl.List(pl.Int64), "ac": pl.List(pl.Int64)}),
            ),
            pl.Series(
                "b",
                [{"ba": [5, 6, 7], "bb": [7, 8, 9]}],
                dtype=pl.Struct({"ba": pl.List(pl.Int64), "bb": pl.List(pl.Int64)}),
            ),
        ]
    ).lazy()

    expected = pl.DataFrame(
        [
            pl.Series("ab", [[1, 2, 3]], dtype=pl.List(pl.Int64)),
            pl.Series("ac", [[3, 4, 5]], dtype=pl.List(pl.Int64)),
            pl.Series("ba", [[5, 6, 7]], dtype=pl.List(pl.Int64)),
            pl.Series("bb", [[7, 8, 9]], dtype=pl.List(pl.Int64)),
        ]
    )
    assert_frame_equal(lf.unnest("a", "b").collect(), expected)


def test_type_coercion_cast_boolean_after_comparison() -> None:
    import operator

    lf = pl.LazyFrame({"a": 1, "b": 2})

    for op in [
        operator.eq,
        operator.ne,
        operator.lt,
        operator.le,
        operator.gt,
        operator.ge,
        pl.Expr.eq_missing,
        pl.Expr.ne_missing,
    ]:
        e = op(pl.col("a"), pl.col("b")).cast(pl.Boolean).alias("o")
        assert "cast" not in lf.with_columns(e).explain()

        e = op(pl.col("a"), pl.col("b")).cast(pl.Boolean).cast(pl.Boolean).alias("o")
        assert "cast" not in lf.with_columns(e).explain()

    for op in [operator.and_, operator.or_, operator.xor]:
        e = op(pl.col("a"), pl.col("b")).cast(pl.Boolean)
        assert "cast" in lf.with_columns(e).explain()


def test_unique_length_multiple_columns() -> None:
    lf = pl.LazyFrame(
        {
            "a": [1, 1, 1, 2, 3],
            "b": [100, 100, 200, 100, 300],
        }
    )
    assert lf.unique().select(pl.len()).collect().item() == 4


def test_asof_cross_join() -> None:
    left = pl.LazyFrame({"a": [-10, 5, 10], "left_val": ["a", "b", "c"]}).with_columns(
        pl.col("a").set_sorted()
    )
    right = pl.LazyFrame(
        {"a": [1, 2, 3, 6, 7], "right_val": [1, 2, 3, 6, 7]}
    ).with_columns(pl.col("a").set_sorted())

    out = left.join_asof(right, on="a").collect()
    assert out.shape == (3, 3)


def test_join_bad_input_type() -> None:
    left = pl.LazyFrame({"a": [1, 2, 3]})
    right = pl.LazyFrame({"a": [1, 2, 3]})

    with pytest.raises(
        TypeError,
        match="expected `other` .*to be a 'LazyFrame'.* not 'DataFrame'",
    ):
        left.join(right.collect(), on="a")  # type: ignore[arg-type]

    with pytest.raises(
        TypeError,
        match="expected `other` .*to be a 'LazyFrame'.* not 'Series'",
    ):
        left.join(pl.Series([1, 2, 3]), on="a")  # type: ignore[arg-type]

    class DummyLazyFrameSubclass(pl.LazyFrame):
        pass

    a = DummyLazyFrameSubclass(left.collect())
    b = DummyLazyFrameSubclass(right.collect())

    a.join(b, on="a").collect()


def test_join_where() -> None:
    east = pl.LazyFrame(
        {
            "id": [100, 101, 102],
            "dur": [120, 140, 160],
            "rev": [12, 14, 16],
            "cores": [2, 8, 4],
        }
    )
    west = pl.LazyFrame(
        {
            "t_id": [404, 498, 676, 742],
            "time": [90, 130, 150, 170],
            "cost": [9, 13, 15, 16],
            "cores": [4, 2, 1, 4],
        }
    )
    out = east.join_where(
        west,
        pl.col("dur") < pl.col("time"),
        pl.col("rev") < pl.col("cost"),
    ).collect()

    expected = pl.DataFrame(
        {
            "id": [100, 100, 100, 101, 101],
            "dur": [120, 120, 120, 140, 140],
            "rev": [12, 12, 12, 14, 14],
            "cores": [2, 2, 2, 8, 8],
            "t_id": [498, 676, 742, 676, 742],
            "time": [130, 150, 170, 150, 170],
            "cost": [13, 15, 16, 15, 16],
            "cores_right": [2, 1, 4, 1, 4],
        }
    )

    assert_frame_equal(out, expected)


def test_join_where_bad_input_type() -> None:
    east = pl.LazyFrame(
        {
            "id": [100, 101, 102],
            "dur": [120, 140, 160],
            "rev": [12, 14, 16],
            "cores": [2, 8, 4],
        }
    )
    west = pl.LazyFrame(
        {
            "t_id": [404, 498, 676, 742],
            "time": [90, 130, 150, 170],
            "cost": [9, 13, 15, 16],
            "cores": [4, 2, 1, 4],
        }
    )
    with pytest.raises(
        TypeError,
        match="expected `other` .*to be a 'LazyFrame'.* not 'DataFrame'",
    ):
        east.join_where(
            west.collect(),  # type: ignore[arg-type]
            pl.col("dur") < pl.col("time"),
            pl.col("rev") < pl.col("cost"),
        )

    with pytest.raises(
        TypeError,
        match="expected `other` .*to be a 'LazyFrame'.* not 'Series'",
    ):
        east.join_where(
            pl.Series(west.collect()),  # type: ignore[arg-type]
            pl.col("dur") < pl.col("time"),
            pl.col("rev") < pl.col("cost"),
        )

    class DummyLazyFrameSubclass(pl.LazyFrame):
        pass

    a = DummyLazyFrameSubclass(east.collect())
    b = DummyLazyFrameSubclass(west.collect())

    a.join_where(
        b,
        pl.col("dur") < pl.col("time"),
        pl.col("rev") < pl.col("cost"),
    ).collect()


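# Projection and predicate pushdown must not split a shared cache: the optimized
# plan should still contain exactly two CACHE nodes carrying the same id.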
def test_cache_hit_with_proj_and_pred_pushdown() -> None:
    rgx = re.compile(r"CACHE\[id: (.*)\]")

    lf = pl.LazyFrame({"a": [1, 2, 3], "b": [3, 4, 5], "c": ["x", "y", "z"]}).cache()

    q = pl.concat([lf, lf]).select("a", "b")
    assert_frame_equal(
        q.collect(), pl.DataFrame({"a": [1, 2, 3] * 2, "b": [3, 4, 5] * 2})
    )
    e = rgx.findall(q.explain())

    assert len(e) == 2  # there are only 2 caches
    assert e[0] == e[1]  # all caches are the same

    q = pl.concat([lf, lf]).filter(pl.col.a != 0)
    assert_frame_equal(
        q.collect(),
        pl.DataFrame(
            {"a": [1, 2, 3] * 2, "b": [3, 4, 5] * 2, "c": ["x", "y", "z"] * 2}
        ),
    )
    e = rgx.findall(q.explain())

    assert len(e) == 2  # there are only 2 caches
    assert e[0] == e[1]  # all caches are the same


def test_cache_hit_child_removal() -> None:
    df = pl.DataFrame(
        {
            "a": [1, 2, 3],
        }
    )

    q = df.lazy().sort("a").cache()

    q1 = pl.concat([q.unique(), q.unique()])
    q2 = pl.concat([q.unique(), q.unique(keep="none")])

    e1 = q1.explain()
    e2 = q2.explain()

    assert "SORT" not in e1
    assert "SORT" not in e2

    rgx = re.compile(r"CACHE\[id: (.*)\]")

    e1m = rgx.findall(e1)
    e2m = rgx.findall(e2)

    assert len(e1m) == 2  # there are only 2 caches
    assert len(e2m) == 2  # there are only 2 caches
    assert e1m[0] == e1m[1]  # all caches are the same
    assert e2m[0] == e2m[1]  # all caches are the same

    df1 = q1.collect()
    df2 = q2.collect()

    assert_frame_equal(df1.head(3), df, check_row_order=False)
    assert_frame_equal(df1.tail(3), df, check_row_order=False)
    assert_frame_equal(df2.head(3), df, check_row_order=False)
    assert_frame_equal(df2.tail(3), df, check_row_order=False)