Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/streaming/test_streaming_join.py
6939 views
1
from __future__ import annotations
2
3
from datetime import datetime
4
from typing import TYPE_CHECKING, Literal
5
6
import numpy as np
7
import pandas as pd
8
import pytest
9
10
import polars as pl
11
from polars.testing import assert_frame_equal, assert_series_equal
12
13
if TYPE_CHECKING:
14
from pathlib import Path
15
16
from polars._typing import JoinStrategy
17
18
# Tag every test in this module with the pytest-xdist group "streaming";
# under `--dist loadgroup`, tests sharing a group are run on the same worker.
pytestmark = pytest.mark.xdist_group("streaming")
19
20
21
def test_streaming_full_outer_joins() -> None:
    """Full joins on the streaming engine must match the in-memory engine.

    Both the non-coalescing and coalescing variants are checked. Keys are drawn
    from a small range (0..39) so that they repeat on both sides of the join.
    """
    # A fixed seed keeps the generated keys (and therefore any failure)
    # reproducible across runs.
    rng = np.random.default_rng(0)
    n = 100
    dfa = pl.DataFrame(
        {
            "a": rng.integers(0, 40, n),
            "idx": np.arange(0, n),
        }
    )
    dfb = pl.DataFrame(
        {
            "a": rng.integers(0, 40, n),
            "idx": np.arange(0, n),
        }
    )

    join_strategies: list[tuple[JoinStrategy, bool]] = [
        ("full", False),
        ("full", True),
    ]
    for how, coalesce in join_strategies:
        q = (
            dfa.lazy()
            .join(dfb.lazy(), on="a", how=how, coalesce=coalesce)
            .sort(["idx"])
        )
        a = q.collect(engine="streaming")
        b = q.collect(engine="in-memory")
        assert_frame_equal(a, b, check_row_order=False)
51
52
53
def test_streaming_joins() -> None:
    """Streaming inner/left joins must agree with the equivalent pandas merges.

    Each strategy is checked once with a single key column ("a") and once with
    a composite key ("a", "b").
    """
    # Seeded so that a failing run can be reproduced with the exact same data.
    rng = np.random.default_rng(0)
    n = 100
    dfa = pd.DataFrame(
        {
            "a": rng.integers(0, 40, n),
            "b": np.arange(0, n),
        }
    )
    dfb = pd.DataFrame(
        {
            "a": rng.integers(0, 40, n),
            "b": np.arange(0, n),
        }
    )
    dfa_pl = pl.from_pandas(dfa).sort("a")
    dfb_pl = pl.from_pandas(dfb)

    join_strategies: list[Literal["inner", "left"]] = ["inner", "left"]
    for how in join_strategies:
        # Single key: rename pandas' suffixed "b" columns to the polars
        # naming ("b" for the left side, "b_right" for the right side).
        pd_result = dfa.merge(dfb, on="a", how=how)
        pd_result.columns = pd.Index(["a", "b", "b_right"])

        pl_result = (
            dfa_pl.lazy()
            .join(dfb_pl.lazy(), on="a", how=how)
            .sort(["a", "b", "b_right"])
            .collect(engine="streaming")
        )

        a = (
            pl.from_pandas(pd_result)
            .with_columns(pl.all().cast(int))
            .sort(["a", "b", "b_right"])
        )
        assert_frame_equal(a, pl_result, check_dtypes=False)

        # Composite key.
        pd_result = dfa.merge(dfb, on=["a", "b"], how=how)

        pl_result = (
            dfa_pl.lazy()
            .join(dfb_pl.lazy(), on=["a", "b"], how=how)
            .sort(["a", "b"])
            .collect(engine="streaming")
        )

        # Cast to integer because pandas joins can produce float columns.
        a = pl.from_pandas(pd_result).with_columns(pl.all().cast(int)).sort(["a", "b"])
        assert_frame_equal(a, pl_result, check_dtypes=False)
103
104
105
def test_streaming_cross_join_empty() -> None:
    """A streaming cross join with an empty right side yields no rows.

    Both columns must still appear, with the clashing right-hand name suffixed.
    """
    left = pl.LazyFrame(data={"col1": ["a"]})
    right = pl.LazyFrame(data={"col1": []}, schema={"col1": str})

    result = left.join(right, how="cross").collect(engine="streaming")

    assert result.shape == (0, 2)
    assert result.columns == ["col1", "col1_right"]
116
117
118
def test_streaming_join_rechunk_12498() -> None:
    """Streaming cross join of two two-row frames produces all four pairs.

    The numeric suffix presumably references the originating issue.
    """
    values = pl.int_range(0, 2)
    lhs = pl.select(A=values).lazy()
    rhs = pl.select(B=values).lazy()

    result = lhs.join(rhs, how="cross").collect(engine="streaming")
    expected = {
        "A": [0, 1, 0, 1],
        "B": [0, 0, 1, 1],
    }
    assert result.sort(["B", "A"]).to_dict(as_series=False) == expected
129
130
131
@pytest.mark.parametrize("maintain_order", [False, True])
def test_join_null_matches(maintain_order: bool) -> None:
    """Null join keys only match each other when ``nulls_equal=True``.

    With ``nulls_equal=False`` (explicitly, or the default for the inner, left
    and full joins below) a null key never finds a partner. When
    ``maintain_order`` is True the exact row order is asserted too; otherwise
    rows are compared order-insensitively.
    """
    left_order: Literal["left", "none"] = "left" if maintain_order else "none"
    right_order: Literal["right", "none"] = "right" if maintain_order else "none"

    df_a = pl.LazyFrame(
        {
            "idx_a": [0, 1, 2],
            "a": [None, 1, 2],
        }
    )
    df_b = pl.LazyFrame(
        {
            "idx_b": [0, 1, 2, 3],
            "a": [None, 2, 1, None],
        }
    )

    # Semi, nulls_equal=True: the null key in df_a matches df_b's null keys.
    assert_series_equal(
        df_a.join(
            df_b,
            on="a",
            how="semi",
            nulls_equal=True,
            maintain_order=left_order,
        ).collect()["idx_a"],
        pl.Series("idx_a", [0, 1, 2]),
        check_order=maintain_order,
    )
    # Semi, nulls_equal=False: the null-keyed row (idx_a=0) is dropped.
    assert_series_equal(
        df_a.join(
            df_b,
            on="a",
            how="semi",
            nulls_equal=False,
            maintain_order=left_order,
        ).collect()["idx_a"],
        pl.Series("idx_a", [1, 2]),
        check_order=maintain_order,
    )

    # Inner: only the non-null keys 1 and 2 match (in right-side order).
    expected = pl.DataFrame({"idx_a": [2, 1], "a": [2, 1], "idx_b": [1, 2]})
    assert_frame_equal(
        df_a.join(
            df_b,
            on="a",
            how="inner",
            maintain_order=right_order,
        ).collect(),
        expected,
        check_row_order=maintain_order,
    )

    # Left outer: the null-keyed left row is kept without a partner.
    expected = pl.DataFrame(
        {"idx_a": [0, 1, 2], "a": [None, 1, 2], "idx_b": [None, 2, 1]}
    )
    assert_frame_equal(
        df_a.join(
            df_b,
            on="a",
            how="left",
            maintain_order=left_order,
        ).collect(),
        expected,
        check_row_order=maintain_order,
    )

    # Full outer: every null-keyed row from either side appears unmatched.
    expected = pl.DataFrame(
        {
            "idx_a": [None, 2, 1, None, 0],
            "a": [None, 2, 1, None, None],
            "idx_b": [0, 1, 2, 3, None],
            "a_right": [None, 2, 1, None, None],
        }
    )
    assert_frame_equal(
        df_a.join(
            df_b,
            on="a",
            how="full",
            maintain_order=right_order,
        ).collect(),
        expected,
        check_row_order=maintain_order,
    )
217
218
219
@pytest.mark.parametrize("streaming", [False, True])
def test_join_null_matches_multiple_keys(streaming: bool) -> None:
    """Null keys never match in multi-key inner, left and full joins.

    Every join, including the full join, is collected on the engine selected
    by ``streaming``.
    """
    engine: Literal["streaming", "in-memory"] = (
        "streaming" if streaming else "in-memory"
    )

    df_a = pl.LazyFrame(
        {
            "a": [None, 1, 2],
            "idx": [0, 1, 2],
        }
    )
    df_b = pl.LazyFrame(
        {
            "a": [None, 2, 1, None, 1],
            "idx": [0, 1, 2, 3, 1],
            "c": [10, 20, 30, 40, 50],
        }
    )

    # Only the key (a=1, idx=1) exists on both sides.
    expected = pl.DataFrame({"a": [1], "idx": [1], "c": [50]})
    assert_frame_equal(
        df_a.join(df_b, on=["a", "idx"], how="inner").collect(engine=engine),
        expected,
        check_row_order=False,
    )

    expected = pl.DataFrame(
        {"a": [None, 1, 2], "idx": [0, 1, 2], "c": [None, 50, None]}
    )
    assert_frame_equal(
        df_a.join(df_b, on=["a", "idx"], how="left").collect(engine=engine),
        expected,
        check_row_order=False,
    )

    # Honour the selected engine here too, so the streaming variant covers
    # the full join as well.
    expected = pl.DataFrame(
        {
            "a": [None, None, None, None, None, 1, 2],
            "idx": [None, None, None, None, 0, 1, 2],
            "a_right": [None, 2, 1, None, None, 1, None],
            "idx_right": [0, 1, 2, 3, None, 1, None],
            "c": [10, 20, 30, 40, None, 50, None],
        }
    )
    assert_frame_equal(
        df_a.join(df_b, on=["a", "idx"], how="full").sort("a").collect(engine=engine),
        expected,
        check_row_order=False,
    )
269
270
271
def test_streaming_join_and_union() -> None:
    """A union whose last input is a join gives identical results on both engines."""
    a = pl.LazyFrame({"a": [1, 2]})
    b = pl.LazyFrame({"a": [1, 2, 4, 8]})
    joined = a.join(b, on="a", maintain_order="left_right")

    # Placing the join node last in the union ensures that the dispatcher
    # has to replace placeholders inside the union.
    q = pl.concat([a, b, joined])

    streaming_out = q.collect(engine="streaming")
    assert_frame_equal(streaming_out, q.collect(engine="in-memory"))
    assert streaming_out.to_series().to_list() == [1, 2, 1, 2, 4, 8, 1, 2]
284
285
286
def test_non_coalescing_streaming_left_join() -> None:
    """With ``coalesce=False`` a streaming left join keeps the right key as ``a_right``.

    Unmatched left rows get nulls in every right-hand column.
    """
    lhs = pl.LazyFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]})
    rhs = pl.LazyFrame({"a": [1, 2], "c": ["j", "i"]})

    result = lhs.join(rhs, on="a", how="left", coalesce=False).collect(
        engine="streaming"
    )
    expected = pl.DataFrame(
        {
            "a": [1, 2, 3],
            "b": ["a", "b", "c"],
            "a_right": [1, 2, None],
            "c": ["j", "i", None],
        }
    )
    assert_frame_equal(result, expected, check_row_order=False)
304
305
306
@pytest.mark.write_disk
def test_streaming_outer_join_partial_flush(tmp_path: Path) -> None:
    """Streaming full coalescing join of two identical parquet scans.

    Both files hold the same six unique rows and the join uses every shared
    column as a key. Each row therefore matches exactly one row on the other
    side, and the result must equal the original data.
    """
    data = {
        "value_at": [datetime(2024, i + 1, 1) for i in range(6)],
        "value": list(range(6)),
    }

    parquet_path = tmp_path / "data.parquet"
    pl.DataFrame(data=data).write_parquet(parquet_path)

    other_parquet_path = tmp_path / "data2.parquet"
    pl.DataFrame(data=data).write_parquet(other_parquet_path)

    lf1 = pl.scan_parquet(other_parquet_path)
    lf2 = pl.scan_parquet(parquet_path)

    # Build the key list in lf1's schema order instead of iterating a set.
    # The iteration order of a set of strings depends on the per-process hash
    # seed, which made the join key order differ between test runs.
    lf2_columns = set(lf2.collect_schema())
    join_cols = [c for c in lf1.collect_schema() if c in lf2_columns]
    final_lf = lf1.join(lf2, on=join_cols, how="full", coalesce=True)

    assert_frame_equal(
        final_lf.collect(engine="streaming"),
        pl.DataFrame(
            {
                "value_at": [
                    datetime(2024, 1, 1, 0, 0),
                    datetime(2024, 2, 1, 0, 0),
                    datetime(2024, 3, 1, 0, 0),
                    datetime(2024, 4, 1, 0, 0),
                    datetime(2024, 5, 1, 0, 0),
                    datetime(2024, 6, 1, 0, 0),
                ],
                "value": [0, 1, 2, 3, 4, 5],
            }
        ),
        check_row_order=False,
    )
342
343
344
def test_flush_join_and_operation_19040() -> None:
    """Two chained full coalescing joins followed by a projection, on the streaming engine.

    The key ``False`` exists only in the first frame, so its ``B`` and ``C``
    values must come out as null.
    """
    df_a = pl.LazyFrame({"K": [True, False], "A": [1, 1]})
    df_b = pl.LazyFrame({"K": [True], "B": [1]})
    df_c = pl.LazyFrame({"K": [True], "C": [1]})

    joined = df_a.join(df_b, how="full", on=["K"], coalesce=True).join(
        df_c, how="full", on=["K"], coalesce=True
    )
    q = joined.with_columns(B=pl.col("B")).sort("K")

    expected = {
        "K": [False, True],
        "A": [1, 1],
        "B": [None, 1],
        "C": [None, 1],
    }
    assert q.collect(engine="streaming").to_dict(as_series=False) == expected
363
364
365
def test_full_coalesce_join_and_rename_15583() -> None:
    """Renaming all columns after a full coalescing join keeps every key value."""
    lhs = pl.LazyFrame({"a": [1, 2, 3]})
    rhs = pl.LazyFrame({"a": [3, 4, 5]})

    joined = lhs.join(rhs, on="a", how="full", coalesce=True)
    # Upper-case every column name, so the coalesced key "a" becomes "A".
    renamed = joined.select(pl.all().name.map(str.upper))
    out = renamed.sort("A").collect(engine="streaming")

    assert out["A"].to_list() == [1, 2, 3, 4, 5]
376
377
378
def test_invert_order_full_join_22295() -> None:
    """Full coalescing self-join with keys listed in reverse schema order.

    The keys are passed as ``["value", "value_at"]`` while the frame's column
    order is ``value_at, value``. Every row is unique and every column is a
    key, so each row matches exactly itself and the join must reproduce the
    input.
    """
    lf = pl.LazyFrame(
        {
            "value_at": [datetime(2024, i + 1, 1) for i in range(6)],
            "value": list(range(6)),
        }
    )

    out = lf.join(lf, on=["value", "value_at"], how="full", coalesce=True).collect(
        engine="streaming"
    )
    # Check the result, not merely that collection succeeds. Row and column
    # order of the coalesced keys are not what this test is about.
    assert_frame_equal(
        out,
        lf.collect(),
        check_row_order=False,
        check_column_order=False,
    )
389
390