GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/streaming/test_streaming_join.py
from __future__ import annotations

import typing
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, Literal

import hypothesis.strategies as st
import numpy as np
import pandas as pd
import pytest
from hypothesis import given, settings

import polars as pl
from polars._typing import AsofJoinStrategy
from polars.datatypes.group import (
    FLOAT_DTYPES,
    INTEGER_DTYPES,
)
from polars.testing import assert_frame_equal, assert_series_equal
from polars.testing.parametric.strategies.core import dataframes

if TYPE_CHECKING:
    from pathlib import Path

    from polars._typing import AsofJoinStrategy, JoinStrategy, MaintainOrderJoin

pytestmark = pytest.mark.xdist_group("streaming")


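# Full joins on random integer keys: the streaming engine must produce the same
# result as the in-memory engine, with and without key coalescing.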
def test_streaming_full_outer_joins() -> None:
    n = 100
    dfa = pl.DataFrame(
        {
            "a": np.random.randint(0, 40, n),
            "idx": np.arange(0, n),
        }
    )

    n = 100
    dfb = pl.DataFrame(
        {
            "a": np.random.randint(0, 40, n),
            "idx": np.arange(0, n),
        }
    )

    join_strategies: list[tuple[JoinStrategy, bool]] = [
        ("full", False),
        ("full", True),
    ]
    for how, coalesce in join_strategies:
        q = (
            dfa.lazy()
            .join(dfb.lazy(), on="a", how=how, coalesce=coalesce)
            .sort(["idx"])
        )
        a = q.collect(engine="streaming")
        b = q.collect(engine="in-memory")
        assert_frame_equal(a, b, check_row_order=False)


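# Inner and left joins on single and composite keys, checked against pandas'
# merge as an independent reference.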
def test_streaming_joins() -> None:
    n = 100
    dfa = pd.DataFrame(
        {
            "a": np.random.randint(0, 40, n),
            "b": np.arange(0, n),
        }
    )

    n = 100
    dfb = pd.DataFrame(
        {
            "a": np.random.randint(0, 40, n),
            "b": np.arange(0, n),
        }
    )
    dfa_pl = pl.from_pandas(dfa).sort("a")
    dfb_pl = pl.from_pandas(dfb)

    join_strategies: list[Literal["inner", "left"]] = ["inner", "left"]
    for how in join_strategies:
        pd_result = dfa.merge(dfb, on="a", how=how)
        pd_result.columns = pd.Index(["a", "b", "b_right"])

        pl_result = (
            dfa_pl.lazy()
            .join(dfb_pl.lazy(), on="a", how=how)
            .sort(["a", "b", "b_right"])
            .collect(engine="streaming")
        )

        a = (
            pl.from_pandas(pd_result)
            .with_columns(pl.all().cast(int))
            .sort(["a", "b", "b_right"])
        )
        assert_frame_equal(a, pl_result, check_dtypes=False)

        pd_result = dfa.merge(dfb, on=["a", "b"], how=how)

        pl_result = (
            dfa_pl.lazy()
            .join(dfb_pl.lazy(), on=["a", "b"], how=how)
            .sort(["a", "b"])
            .collect(engine="streaming")
        )

        # we cast to integer because pandas joins create floats
        a = pl.from_pandas(pd_result).with_columns(pl.all().cast(int)).sort(["a", "b"])
        assert_frame_equal(a, pl_result, check_dtypes=False)


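# A cross join with an empty right side yields zero rows but keeps both
# (suffixed) columns in the schema.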
def test_streaming_cross_join_empty() -> None:
    df1 = pl.LazyFrame(data={"col1": ["a"]})

    df2 = pl.LazyFrame(
        data={"col1": []},
        schema={"col1": str},
    )

    out = df1.join(df2, how="cross").collect(engine="streaming")
    assert out.shape == (0, 2)
    assert out.columns == ["col1", "col1_right"]


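# Regression test for issue #12498: a 2x2 streaming cross join must return all
# four row combinations.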
def test_streaming_join_rechunk_12498() -> None:
    rows = pl.int_range(0, 2)

    a = pl.select(A=rows).lazy()
    b = pl.select(B=rows).lazy()

    q = a.join(b, how="cross")
    assert q.collect(engine="streaming").sort(["B", "A"]).to_dict(as_series=False) == {
        "A": [0, 1, 0, 1],
        "B": [0, 0, 1, 1],
    }


@pytest.mark.parametrize("maintain_order", [False, True])
def test_join_null_matches(maintain_order: bool) -> None:
    # By default (nulls_equal=False), null key values never find a match;
    # with nulls_equal=True they are treated as equal.
    df_a = pl.LazyFrame(
        {
            "idx_a": [0, 1, 2],
            "a": [None, 1, 2],
        }
    )

    df_b = pl.LazyFrame(
        {
            "idx_b": [0, 1, 2, 3],
            "a": [None, 2, 1, None],
        }
    )
    # Semi
    assert_series_equal(
        df_a.join(
            df_b,
            on="a",
            how="semi",
            nulls_equal=True,
            maintain_order="left" if maintain_order else "none",
        ).collect()["idx_a"],
        pl.Series("idx_a", [0, 1, 2]),
        check_order=maintain_order,
    )
    assert_series_equal(
        df_a.join(
            df_b,
            on="a",
            how="semi",
            nulls_equal=False,
            maintain_order="left" if maintain_order else "none",
        ).collect()["idx_a"],
        pl.Series("idx_a", [1, 2]),
        check_order=maintain_order,
    )

    # Inner
    expected = pl.DataFrame({"idx_a": [2, 1], "a": [2, 1], "idx_b": [1, 2]})
    assert_frame_equal(
        df_a.join(
            df_b,
            on="a",
            how="inner",
            maintain_order="right" if maintain_order else "none",
        ).collect(),
        expected,
        check_row_order=maintain_order,
    )

    # Left outer
    expected = pl.DataFrame(
        {"idx_a": [0, 1, 2], "a": [None, 1, 2], "idx_b": [None, 2, 1]}
    )
    assert_frame_equal(
        df_a.join(
            df_b,
            on="a",
            how="left",
            maintain_order="left" if maintain_order else "none",
        ).collect(),
        expected,
        check_row_order=maintain_order,
    )
    # Full outer
    expected = pl.DataFrame(
        {
            "idx_a": [None, 2, 1, None, 0],
            "a": [None, 2, 1, None, None],
            "idx_b": [0, 1, 2, 3, None],
            "a_right": [None, 2, 1, None, None],
        }
    )
    assert_frame_equal(
        df_a.join(
            df_b,
            on="a",
            how="full",
            maintain_order="right" if maintain_order else "none",
        ).collect(),
        expected,
        check_row_order=maintain_order,
    )


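# With composite join keys, rows containing a null in any key column should not
# match under the default null-matching behaviour.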
@pytest.mark.parametrize("streaming", [False, True])
def test_join_null_matches_multiple_keys(streaming: bool) -> None:
    df_a = pl.LazyFrame(
        {
            "a": [None, 1, 2],
            "idx": [0, 1, 2],
        }
    )

    df_b = pl.LazyFrame(
        {
            "a": [None, 2, 1, None, 1],
            "idx": [0, 1, 2, 3, 1],
            "c": [10, 20, 30, 40, 50],
        }
    )

    expected = pl.DataFrame({"a": [1], "idx": [1], "c": [50]})
    assert_frame_equal(
        df_a.join(df_b, on=["a", "idx"], how="inner").collect(
            engine="streaming" if streaming else "in-memory"
        ),
        expected,
        check_row_order=False,
    )
    expected = pl.DataFrame(
        {"a": [None, 1, 2], "idx": [0, 1, 2], "c": [None, 50, None]}
    )
    assert_frame_equal(
        df_a.join(df_b, on=["a", "idx"], how="left").collect(
            engine="streaming" if streaming else "in-memory"
        ),
        expected,
        check_row_order=False,
    )

    expected = pl.DataFrame(
        {
            "a": [None, None, None, None, None, 1, 2],
            "idx": [None, None, None, None, 0, 1, 2],
            "a_right": [None, 2, 1, None, None, 1, None],
            "idx_right": [0, 1, 2, 3, None, 1, None],
            "c": [10, 20, 30, 40, None, 50, None],
        }
    )
    assert_frame_equal(
        df_a.join(df_b, on=["a", "idx"], how="full").sort("a").collect(),
        expected,
        check_row_order=False,
    )


def test_streaming_join_and_union() -> None:
281
a = pl.LazyFrame({"a": [1, 2]})
282
283
b = pl.LazyFrame({"a": [1, 2, 4, 8]})
284
285
c = a.join(b, on="a", maintain_order="left_right")
286
# The join node latest ensures that the dispatcher
287
# needs to replace placeholders in unions.
288
q = pl.concat([a, b, c])
289
290
out = q.collect(engine="streaming")
291
assert_frame_equal(out, q.collect(engine="in-memory"))
292
assert out.to_series().to_list() == [1, 2, 1, 2, 4, 8, 1, 2]
293
294
295
def test_non_coalescing_streaming_left_join() -> None:
296
df1 = pl.LazyFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]})
297
298
df2 = pl.LazyFrame({"a": [1, 2], "c": ["j", "i"]})
299
300
q = df1.join(df2, on="a", how="left", coalesce=False)
301
assert_frame_equal(
302
q.collect(engine="streaming"),
303
pl.DataFrame(
304
{
305
"a": [1, 2, 3],
306
"b": ["a", "b", "c"],
307
"a_right": [1, 2, None],
308
"c": ["j", "i", None],
309
}
310
),
311
check_row_order=False,
312
)
313
314
315
@pytest.mark.write_disk
316
def test_streaming_outer_join_partial_flush(tmp_path: Path) -> None:
317
data = {
318
"value_at": [datetime(2024, i + 1, 1) for i in range(6)],
319
"value": list(range(6)),
320
}
321
322
parquet_path = tmp_path / "data.parquet"
323
pl.DataFrame(data=data).write_parquet(parquet_path)
324
325
other_parquet_path = tmp_path / "data2.parquet"
326
pl.DataFrame(data=data).write_parquet(other_parquet_path)
327
328
lf1 = pl.scan_parquet(other_parquet_path)
329
lf2 = pl.scan_parquet(parquet_path)
330
331
join_cols = set(lf1.collect_schema()).intersection(set(lf2.collect_schema()))
332
final_lf = lf1.join(lf2, on=list(join_cols), how="full", coalesce=True)
333
334
assert_frame_equal(
335
final_lf.collect(engine="streaming"),
336
pl.DataFrame(
337
{
338
"value_at": [
339
datetime(2024, 1, 1, 0, 0),
340
datetime(2024, 2, 1, 0, 0),
341
datetime(2024, 3, 1, 0, 0),
342
datetime(2024, 4, 1, 0, 0),
343
datetime(2024, 5, 1, 0, 0),
344
datetime(2024, 6, 1, 0, 0),
345
],
346
"value": [0, 1, 2, 3, 4, 5],
347
}
348
),
349
check_row_order=False,
350
)
351
352
353
def test_flush_join_and_operation_19040() -> None:
354
df_A = pl.LazyFrame({"K": [True, False], "A": [1, 1]})
355
356
df_B = pl.LazyFrame({"K": [True], "B": [1]})
357
358
df_C = pl.LazyFrame({"K": [True], "C": [1]})
359
360
q = (
361
df_A.join(df_B, how="full", on=["K"], coalesce=True)
362
.join(df_C, how="full", on=["K"], coalesce=True)
363
.with_columns(B=pl.col("B"))
364
.sort("K")
365
)
366
assert q.collect(engine="streaming").to_dict(as_series=False) == {
367
"K": [False, True],
368
"A": [1, 1],
369
"B": [None, 1],
370
"C": [None, 1],
371
}
372
373
374
def test_full_coalesce_join_and_rename_15583() -> None:
375
df1 = pl.LazyFrame({"a": [1, 2, 3]})
376
df2 = pl.LazyFrame({"a": [3, 4, 5]})
377
378
result = (
379
df1.join(df2, on="a", how="full", coalesce=True)
380
.select(pl.all().name.map(lambda c: c.upper()))
381
.sort("A")
382
.collect(engine="streaming")
383
)
384
assert result["A"].to_list() == [1, 2, 3, 4, 5]
385
386
387
def test_invert_order_full_join_22295() -> None:
388
lf = pl.LazyFrame(
389
{
390
"value_at": [datetime(2024, i + 1, 1) for i in range(6)],
391
"value": list(range(6)),
392
}
393
)
394
lf.join(lf, on=["value", "value_at"], how="full", coalesce=True).collect(
395
engine="streaming"
396
)
397
398
399
def test_cross_join_with_literal_column_25544() -> None:
400
df0 = pl.LazyFrame({"c0": [0]})
401
df1 = pl.LazyFrame({"c0": [1]})
402
403
result = df0.join(
404
df1.select(pl.col("c0")).with_columns(pl.lit(1)),
405
on=True, # type: ignore[arg-type]
406
).select("c0")
407
408
in_memory_result = result.collect(engine="in-memory")
409
streaming_result = result.collect(engine="streaming")
410
411
assert_frame_equal(streaming_result, in_memory_result)
412
assert streaming_result.item() == 0
413
414
415
@pytest.mark.parametrize("on", [["key"], ["key", "key_ext"]])
416
@pytest.mark.parametrize("how", ["inner", "left", "right", "full"])
417
@pytest.mark.parametrize("descending", [False, True])
418
@pytest.mark.parametrize("nulls_last", [False, True])
419
@pytest.mark.parametrize("nulls_equal", [False, True])
420
@pytest.mark.parametrize("coalesce", [None, True, False])
421
@pytest.mark.parametrize("maintain_order", ["none", "left_right", "right_left"])
422
@given(data=st.data())
423
@settings(max_examples=10)
424
def test_merge_join(
425
on: list[str],
426
how: JoinStrategy,
427
descending: bool,
428
nulls_last: bool,
429
nulls_equal: bool,
430
coalesce: bool | None,
431
maintain_order: MaintainOrderJoin,
432
data: st.DataObject,
433
) -> None:
434
check_row_order = maintain_order in {"left_right", "right_left"}
435
436
df_st = dataframes(min_cols=len(on), max_cols=len(on), allowed_dtypes=[pl.Int16])
437
left_df = data.draw(df_st)
438
right_df = data.draw(df_st)
439
440
left = left_df.rename(dict(zip(left_df.columns, ["key", "key_ext"], strict=False)))
441
right = right_df.rename(
442
dict(zip(right_df.columns, ["key", "key_ext"], strict=False))
443
)
444
445
def df_sorted(df: pl.DataFrame) -> pl.LazyFrame:
446
return (
447
df.lazy()
448
.sort(
449
*on,
450
descending=descending,
451
nulls_last=nulls_last,
452
maintain_order=True,
453
multithreaded=False,
454
)
455
.set_sorted(on, descending=descending, nulls_last=nulls_last)
456
)
457
458
q = df_sorted(left).join(
459
df_sorted(right),
460
on=on,
461
how=how,
462
nulls_equal=nulls_equal,
463
coalesce=coalesce,
464
maintain_order=maintain_order,
465
)
466
dot = q.show_graph(engine="streaming", plan_stage="physical", raw_output=True)
467
expected = q.collect(engine="in-memory")
468
actual = q.collect(engine="streaming")
469
470
assert "merge-join" in typing.cast("str", dot), "merge-join not used in plan"
471
assert_frame_equal(actual, expected, check_row_order=check_row_order)
472
473
474
@pytest.mark.parametrize(
475
("keys", "dtype"),
476
[
477
([False, True, False], pl.Boolean),
478
([1, 3, 2], pl.Int8),
479
([1, 3, 2], pl.Int16),
480
([1, 3, 2], pl.Int32),
481
([1, 3, 2], pl.Int64),
482
([1, 3, 2], pl.Int128),
483
([1, 3, 2], pl.UInt8),
484
([1, 3, 2], pl.UInt16),
485
([1, 3, 2], pl.UInt32),
486
([1, 3, 2], pl.UInt64),
487
([1, 3, 2], pl.UInt128),
488
([1.0, 3.0, 2.0], pl.Float16),
489
([1.0, 3.0, 2.0], pl.Float32),
490
([1.0, 3.0, 2.0], pl.Float64),
491
(["a", "b", "c"], pl.String),
492
([b"a", b"b", b"c"], pl.Binary),
493
([datetime(2024, 1, x) for x in [1, 3, 2]], pl.Date),
494
([datetime(2024, 1, x, 12, 0) for x in [1, 3, 2]], pl.Time),
495
([datetime(2024, 1, x, 12, 0) for x in [1, 3, 2]], pl.Datetime),
496
([timedelta(days=x) for x in [1, 3, 2]], pl.Duration),
497
([1, 3, 2], pl.Decimal),
498
([pl.Null, pl.Null, pl.Null], pl.Null),
499
(["a", "c", "b"], pl.Enum(["a", "b", "c"])),
500
(["a", "c", "b"], pl.Categorical),
501
],
502
)
503
@pytest.mark.parametrize("how", ["inner", "left", "right", "full"])
504
@pytest.mark.parametrize("nulls_equal", [False, True])
505
def test_join_dtypes(
506
keys: list[Any], dtype: pl.DataType, how: JoinStrategy, nulls_equal: bool
507
) -> None:
508
df_left = pl.DataFrame({"key": pl.Series("key", keys[:2], dtype=dtype)})
509
df_right = pl.DataFrame({"key": pl.Series("key", keys[2:], dtype=dtype)})
510
511
def df_sorted(df: pl.DataFrame) -> pl.LazyFrame:
512
return (
513
df.lazy()
514
.sort(
515
"key",
516
maintain_order=True,
517
multithreaded=False,
518
)
519
.set_sorted("key")
520
)
521
522
q_hashjoin = df_left.lazy().join(
523
df_right.lazy(),
524
on="key",
525
how=how,
526
nulls_equal=nulls_equal,
527
maintain_order="none",
528
)
529
dot = q_hashjoin.show_graph(
530
engine="streaming", plan_stage="physical", raw_output=True
531
)
532
expected = q_hashjoin.collect(engine="in-memory")
533
actual = q_hashjoin.collect(engine="streaming")
534
assert "equi-join" in typing.cast("str", dot), "hash-join not used in plan"
535
assert_frame_equal(actual, expected, check_row_order=False)
536
537
q_mergejoin = df_sorted(df_left).join(
538
df_sorted(df_right),
539
on="key",
540
how=how,
541
nulls_equal=nulls_equal,
542
maintain_order="none",
543
)
544
dot = q_mergejoin.show_graph(
545
engine="streaming", plan_stage="physical", raw_output=True
546
)
547
expected = q_mergejoin.collect(engine="in-memory")
548
actual = q_mergejoin.collect(engine="streaming")
549
assert "merge-join" in typing.cast("str", dot), "merge-join not used in plan"
550
assert_frame_equal(actual, expected, check_row_order=False)
551
552
553
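# A merge-join should also be selected when one side joins on an expression over
# a sorted key column.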
def test_merge_join_exprs() -> None:
554
left = pl.LazyFrame(
555
{
556
"key": ["", "a", "c"],
557
"key_ext": [1, 2, 3],
558
"value": [1, 2, 3],
559
}
560
).set_sorted("key", "key_ext")
561
right = pl.LazyFrame(
562
{
563
"key": ["", "a", "b"],
564
"key_ext": [3, 2, 3],
565
"value": [4, 5, 6],
566
}
567
).set_sorted("key", "key_ext")
568
569
q = left.join(
570
right,
571
left_on="key",
572
right_on=pl.concat_str(pl.col("key"), ignore_nulls=False),
573
how="full",
574
maintain_order="none",
575
)
576
dot = q.show_graph(engine="streaming", plan_stage="physical", raw_output=True)
577
assert "merge-join" in typing.cast("str", dot), "merge-join not used in plan"
578
assert_frame_equal(q.collect(engine="streaming"), q.collect(engine="in-memory"))
579
580
581
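# A merge-join is only applicable when both sides are sorted with the same
# direction and null placement; otherwise the physical plan must not use it.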
@pytest.mark.parametrize("left_descending", [False, True])
582
@pytest.mark.parametrize("right_descending", [False, True])
583
@pytest.mark.parametrize("left_nulls_last", [False, True])
584
@pytest.mark.parametrize("right_nulls_last", [False, True])
585
def test_merge_join_applicable(
586
left_descending: bool,
587
right_descending: bool,
588
left_nulls_last: bool,
589
right_nulls_last: bool,
590
) -> None:
591
left = pl.LazyFrame({"key": [1]}).set_sorted(
592
"key", descending=left_descending, nulls_last=left_nulls_last
593
)
594
right = pl.LazyFrame({"key": [2]}).set_sorted(
595
"key", descending=right_descending, nulls_last=right_nulls_last
596
)
597
q = left.join(right, on="key", how="full", maintain_order="left_right")
598
dot = q.show_graph(engine="streaming", plan_stage="physical", raw_output=True)
599
if (left_descending, left_nulls_last) == (right_descending, right_nulls_last):
600
assert "merge-join" in typing.cast("str", dot)
601
else:
602
assert "merge-join" not in typing.cast("str", dot)
603
assert_frame_equal(q.collect(engine="streaming"), q.collect(engine="in-memory"))
604
605
606
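# Property test: streaming as-of joins across strategies, dtypes, and coalescing
# options must match the in-memory engine.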
@pytest.mark.parametrize("strategy", ["backward", "forward", "nearest"])
607
@pytest.mark.parametrize("allow_exact_matches", [False, True])
608
@pytest.mark.parametrize("coalesce", [False, True])
609
@pytest.mark.parametrize(
610
"dtypes",
611
[
612
FLOAT_DTYPES,
613
INTEGER_DTYPES,
614
{pl.String, pl.Binary},
615
{pl.Date},
616
{
617
pl.Datetime("ms"),
618
pl.Datetime("us"),
619
pl.Datetime("ns"),
620
},
621
{
622
pl.Datetime("ms", time_zone="Europe/Amsterdam"),
623
pl.Datetime("us", time_zone="Europe/Amsterdam"),
624
pl.Datetime("ns", time_zone="Europe/Amsterdam"),
625
},
626
{pl.Time},
627
{pl.Duration("ms"), pl.Duration("us"), pl.Duration("ns")},
628
],
629
)
630
@given(data=st.data())
631
def test_streaming_asof_join(
632
data: st.DataObject,
633
strategy: AsofJoinStrategy,
634
allow_exact_matches: bool,
635
coalesce: bool,
636
dtypes: set[pl.DataType],
637
) -> None:
638
if dtypes & {pl.String, pl.Binary} and strategy == "nearest":
639
pytest.skip("asof join with string/binary does not support 'nearest' strategy")
640
641
dtype = data.draw(st.sampled_from(list(dtypes)))
642
df_st = dataframes(
643
min_cols=1, max_cols=1, allowed_dtypes=[dtype], allow_time_zones=False
644
)
645
left_df = data.draw(df_st)
646
right_df = data.draw(df_st)
647
648
left = left_df.rename(lambda _: "key").sort("key").with_row_index().lazy()
649
right = right_df.rename(lambda _: "key").sort("key").with_row_index().lazy()
650
651
q = left.join_asof(
652
right,
653
on="key",
654
strategy=strategy,
655
allow_exact_matches=allow_exact_matches,
656
coalesce=coalesce,
657
)
658
expected = q.collect(engine="in-memory")
659
actual = q.collect(engine="streaming")
660
assert_frame_equal(actual, expected)
661
662