Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/operations/test_join_asof.py
6939 views
1
from __future__ import annotations
2
3
import warnings
4
from datetime import date, datetime, timedelta
5
from typing import TYPE_CHECKING, Any
6
7
import numpy as np
8
import pytest
9
10
import polars as pl
11
from polars.exceptions import DuplicateError, InvalidOperationError
12
from polars.testing import assert_frame_equal
13
14
if TYPE_CHECKING:
15
from polars._typing import AsofJoinStrategy, PolarsIntegerType
16
17
18
def test_asof_join_singular_right_11966() -> None:
    """Nearest asof join against a right frame that holds a single row."""
    left = pl.DataFrame({"id": [1, 2, 3], "time": [0.9, 2.1, 2.8]}).sort("time")
    right = pl.DataFrame({"time": [2.0], "value": [100]}).sort("time")

    result = left.join_asof(right, on="time", strategy="nearest")

    # Every left row is expected to pick up the only available right value.
    assert_frame_equal(
        result,
        pl.DataFrame(
            {"id": [1, 2, 3], "time": [0.9, 2.1, 2.8], "value": [100, 100, 100]}
        ),
    )
26
27
28
def test_asof_join_inline_cast_6438() -> None:
    """The `on` key may be an expression that casts the time unit inline."""
    trades = pl.DataFrame(
        {
            "time": [
                datetime(2020, 1, 1, 9, 1, 0),
                datetime(2020, 1, 1, 9, 1, 0),
                datetime(2020, 1, 1, 9, 3, 0),
                datetime(2020, 1, 1, 9, 6, 0),
            ],
            "stock": ["A", "B", "B", "C"],
            "trade": [101, 299, 301, 500],
        }
    )

    # The quotes frame is converted to nanoseconds up front; the join key
    # expression below applies the matching cast inline.
    quotes = pl.DataFrame(
        {
            "time": [
                datetime(2020, 1, 1, 9, 0, 0),
                datetime(2020, 1, 1, 9, 2, 0),
                datetime(2020, 1, 1, 9, 3, 0),
                datetime(2020, 1, 1, 9, 6, 0),
            ],
            "stock": ["A", "B", "C", "A"],
            "quote": [100, 300, 501, 102],
        }
    ).with_columns([pl.col("time").dt.cast_time_unit("ns")])

    key = pl.col("time").cast(pl.Datetime("ns")).set_sorted()
    result = trades.join_asof(quotes, on=key, by="stock").to_dict(as_series=False)

    expected = {
        "time": [
            datetime(2020, 1, 1, 9, 1),
            datetime(2020, 1, 1, 9, 1),
            datetime(2020, 1, 1, 9, 3),
            datetime(2020, 1, 1, 9, 6),
        ],
        "time_right": [
            datetime(2020, 1, 1, 9, 0),
            None,
            datetime(2020, 1, 1, 9, 2),
            datetime(2020, 1, 1, 9, 3),
        ],
        "stock": ["A", "B", "B", "C"],
        "trade": [101, 299, 301, 500],
        "quote": [100, None, 300, 501],
    }
    assert result == expected
74
75
76
def test_asof_join_projection_resolution_4606() -> None:
    """Aggregating after a lazy asof join yields only the requested columns."""
    lhs = pl.DataFrame({"a": [1], "b": [2], "c": [3]}).lazy()
    rhs = pl.DataFrame({"a": [1], "b": [2], "d": [4]}).lazy()

    joined = lhs.join_asof(rhs, on=pl.col("a").set_sorted(), by="b")
    aggregated = joined.group_by("a").agg([pl.col("c").sum().alias("c")]).collect()

    assert aggregated.columns == ["a", "c"]
83
84
85
def test_asof_join_schema_5211() -> None:
    """Lazy schema of a forward asof join using differently named key columns."""
    left = pl.DataFrame({"today": [1, 2]}).lazy()
    right = pl.DataFrame({"next_friday": [1, 2]}).lazy()

    schema = left.join_asof(
        right, left_on="today", right_on="next_friday", strategy="forward"
    ).collect_schema()

    assert schema == {"today": pl.Int64, "next_friday": pl.Int64}
97
98
99
def test_asof_join_schema_5684() -> None:
    """Chained asof joins agree on schema and data with and without a projection."""
    left = pl.DataFrame({"id": [1], "a": [1], "b": [1]}).lazy().set_sorted("a")
    right = pl.DataFrame({"id": [1, 1, 2], "b": [-3, -3, 6]}).lazy().set_sorted("b")

    def join_then_drop(frame: pl.LazyFrame) -> pl.LazyFrame:
        # One asof-join step followed by dropping the left-hand "b" column.
        return frame.join_asof(right, by="id", left_on="a", right_on="b").drop("b")

    q = join_then_drop(join_then_drop(left))

    projected_result = q.select(pl.all()).collect()
    result = q.collect()
    assert_frame_equal(projected_result, result)

    materialized_schema = projected_result.schema
    assert q.collect_schema() == materialized_schema
    assert materialized_schema == {
        "id": pl.Int64,
        "a": pl.Int64,
        "b_right": pl.Int64,
    }
139
140
141
def test_join_asof_mismatched_dtypes() -> None:
    """Mismatching dtypes in the `on` key or the `by` key must raise."""
    # 'on' key: Int64 on the left versus Float64 on the right.
    int_keyed = pl.DataFrame(
        {"a": pl.Series([1, 2, 3], dtype=pl.Int64), "b": ["a", "b", "c"]}
    )
    float_keyed = pl.DataFrame(
        {"a": pl.Series([1.0, 2.0, 3.0], dtype=pl.Float64), "c": ["d", "e", "f"]}
    )
    with pytest.raises(
        pl.exceptions.SchemaError, match="datatypes of join keys don't match"
    ):
        int_keyed.join_asof(float_keyed, on="a", strategy="forward")

    # 'by' key: the two frames differ only in the dtype of "group".
    def grouped(group_dtype: Any) -> pl.DataFrame:
        return pl.DataFrame(
            {
                "time": pl.date_range(date(2018, 1, 1), date(2018, 1, 8), eager=True),
                "group": pl.Series([1, 1, 1, 1, 2, 2, 2, 2], dtype=group_dtype),
                "value": [0, 0, None, None, 2, None, 1, None],
            }
        )

    narrow = grouped(pl.Int32)
    wide = grouped(pl.Int64)
    with pytest.raises(
        pl.exceptions.ComputeError, match="mismatching dtypes in 'by' parameter"
    ):
        narrow.join_asof(wide, on="time", by="group", strategy="forward")
175
176
177
def test_join_asof_floats() -> None:
    """Asof join on float keys, first without and then with a `by` column."""
    left = pl.DataFrame({"a": [1.0, 2.0, 3.0], "b": ["lrow1", "lrow2", "lrow3"]})
    right = pl.DataFrame({"a": [0.59, 1.49, 2.89], "b": ["rrow1", "rrow2", "rrow3"]})

    joined = left.join_asof(right, on=pl.col("a").set_sorted(), strategy="backward")
    assert joined.to_dict(as_series=False) == {
        "a": [1.0, 2.0, 3.0],
        "b": ["lrow1", "lrow2", "lrow3"],
        "a_right": [0.59, 1.49, 2.89],
        "b_right": ["rrow1", "rrow2", "rrow3"],
    }

    # Same kind of join with a `by` argument (issue 5740).
    left = pl.DataFrame({"b": np.linspace(0, 5, 7), "c": ["x"] * 4 + ["y"] * 3})
    right = pl.DataFrame(
        {
            "val": [0.0, 2.5, 2.6, 2.7, 3.4, 4.0, 5.0],
            "c": ["x", "x", "x", "y", "y", "y", "y"],
        }
    ).with_columns(pl.col("val").alias("b").set_sorted())

    grouped = left.set_sorted("b").join_asof(right, on=pl.col("b"), by="c")
    assert grouped.to_dict(as_series=False) == {
        "b": [
            0.0,
            0.8333333333333334,
            1.6666666666666667,
            2.5,
            3.3333333333333335,
            4.166666666666667,
            5.0,
        ],
        "c": ["x", "x", "x", "x", "y", "y", "y"],
        "val": [0.0, 0.0, 0.0, 2.5, 2.7, 4.0, 5.0],
    }
216
217
218
def test_join_asof_tolerance() -> None:
    """Asof join by stock with string tolerances, plus rejected tolerance values."""
    trade_times = [
        datetime(2020, 1, 1, 9, 0, 1),
        datetime(2020, 1, 1, 9, 0, 1),
        datetime(2020, 1, 1, 9, 0, 3),
        datetime(2020, 1, 1, 9, 0, 6),
    ]
    trade_stocks = ["A", "B", "B", "C"]
    trade_values = [101, 299, 301, 500]

    trades = pl.DataFrame(
        {"time": trade_times, "stock": trade_stocks, "trade": trade_values}
    ).set_sorted("time")

    quotes = pl.DataFrame(
        {
            "time": [
                datetime(2020, 1, 1, 9, 0, 0),
                datetime(2020, 1, 1, 9, 0, 2),
                datetime(2020, 1, 1, 9, 0, 4),
                datetime(2020, 1, 1, 9, 0, 6),
            ],
            "stock": ["A", "B", "C", "A"],
            "quote": [100, 300, 501, 102],
        }
    ).set_sorted("time")

    def joined(tolerance: str) -> dict[str, list[Any]]:
        frame = trades.join_asof(quotes, on="time", by="stock", tolerance=tolerance)
        return frame.to_dict(as_series=False)

    def expected(quote_column: list[Any]) -> dict[str, list[Any]]:
        # The left-hand columns pass through unchanged; only "quote" varies.
        return {
            "time": trade_times,
            "stock": trade_stocks,
            "trade": trade_values,
            "quote": quote_column,
        }

    assert joined("2s") == expected([100, None, 300, 501])
    assert joined("1s") == expected([100, None, 300, None])

    invalid_cases = [
        ("foo", "expected leading integer"),
        ([None], "could not extract number"),
    ]
    for bad_tolerance, message in invalid_cases:
        with pytest.raises(pl.exceptions.PolarsError, match=message):
            trades.join_asof(
                quotes,
                on="time",
                by="stock",
                tolerance=bad_tolerance,  # type: ignore[arg-type]
            )
284
285
286
def test_join_asof_tolerance_forward() -> None:
    """Forward asof join by stock under tolerances of 2s, 1s and 0s."""
    quote_times = [
        datetime(2020, 1, 1, 9, 0, 0),
        datetime(2020, 1, 1, 9, 0, 2),
        datetime(2020, 1, 1, 9, 0, 4),
        datetime(2020, 1, 1, 9, 0, 6),
        datetime(2020, 1, 1, 9, 0, 7),
    ]
    quote_stocks = ["A", "B", "C", "A", "D"]
    quote_values = [100, 300, 501, 102, 10]

    quotes = pl.DataFrame(
        {"time": quote_times, "stock": quote_stocks, "quote": quote_values}
    ).set_sorted("time")

    trades = pl.DataFrame(
        {
            "time": [
                datetime(2020, 1, 1, 9, 0, 2),
                datetime(2020, 1, 1, 9, 0, 1),
                datetime(2020, 1, 1, 9, 0, 3),
                datetime(2020, 1, 1, 9, 0, 6),
                datetime(2020, 1, 1, 9, 0, 7),
            ],
            "stock": ["A", "B", "B", "C", "D"],
            "trade": [101, 299, 301, 500, 10],
        }
    ).set_sorted("time")

    def joined(tolerance: str) -> dict[str, list[Any]]:
        frame = quotes.join_asof(
            trades, on="time", by="stock", tolerance=tolerance, strategy="forward"
        )
        return frame.to_dict(as_series=False)

    def expected(trade_column: list[Any]) -> dict[str, list[Any]]:
        # The quote-side columns pass through unchanged; only "trade" varies.
        return {
            "time": quote_times,
            "stock": quote_stocks,
            "quote": quote_values,
            "trade": trade_column,
        }

    assert joined("2s") == expected([101, 301, 500, None, 10])
    assert joined("1s") == expected([None, 301, None, None, 10])

    # Sanity check that this gives us equi-join
    assert joined("0s") == expected([None, None, None, None, 10])
360
361
362
def test_join_asof_projection() -> None:
    """Selecting key columns after a lazy asof join, without and with `by`."""
    left_dates = [20221011, 20221012, 20221013, 20221014, 20221016]
    left = (
        pl.DataFrame(
            {
                "df1_date": left_dates,
                "df1_col1": ["foo", "bar", "foo", "bar", "foo"],
                "key": ["a", "b", "b", "a", "b"],
            }
        )
        .set_sorted("df1_date")
        .lazy()
    )
    right = (
        pl.DataFrame(
            {
                "df2_date": [20221012, 20221015, 20221018],
                "df2_col1": ["1", "2", "3"],
                "key": ["a", "b", "b"],
            }
        )
        .set_sorted("df2_date")
        .lazy()
    )

    # No `by`: select using a mix of an expression and a column name.
    plain = (
        left.join_asof(right, left_on="df1_date", right_on="df2_date")
        .select([pl.col("df2_date"), "df1_date"])
        .collect()
    )
    assert plain.to_dict(as_series=False) == {
        "df2_date": [None, 20221012, 20221012, 20221012, 20221015],
        "df1_date": left_dates,
    }

    # With `by`: select using column names only.
    grouped = (
        left.join_asof(right, by="key", left_on="df1_date", right_on="df2_date")
        .select(["df2_date", "df1_date"])
        .collect()
    )
    assert grouped.to_dict(as_series=False) == {
        "df2_date": [None, None, None, 20221012, 20221015],
        "df1_date": left_dates,
    }
395
396
397
def test_asof_join_by_logical_types() -> None:
    """Self asof join grouped by a string column and a nanosecond datetime column."""
    dates = (
        pl.datetime_range(
            datetime(2022, 1, 1), datetime(2022, 1, 2), interval="2h", eager=True
        )
        .cast(pl.Datetime("ns"))
        .head(9)
    )
    frame = pl.DataFrame(
        {"a": dates, "b": map(float, range(9)), "c": ["1", "2", "3"] * 3}
    )

    joined = frame.join_asof(frame, on=pl.col("b").set_sorted(), by=["c", "a"])

    floats = [float(i) for i in range(9)]
    assert joined.to_dict(as_series=False) == {
        # Nine timestamps, two hours apart, starting at midnight.
        "a": [datetime(2022, 1, 1, hour, 0) for hour in range(0, 18, 2)],
        "b": floats,
        "c": ["1", "2", "3", "1", "2", "3", "1", "2", "3"],
        "b_right": floats,
    }
426
427
428
def test_join_asof_projection_7481() -> None:
    """Selecting "a" and "b" after the join returns the left-hand columns."""
    left = pl.DataFrame({"a": [1, 2, 2], "b": "bleft"}).lazy().set_sorted("a")
    right = pl.DataFrame({"a": 2, "b": [1, 2, 2]}).lazy().set_sorted("b")

    selected = (
        left.join_asof(right, left_on="a", right_on="b").select("a", "b").collect()
    )

    assert selected.to_dict(as_series=False) == {
        "a": [1, 2, 2],
        "b": ["bleft", "bleft", "bleft"],
    }
438
439
440
def test_asof_join_sorted_by_group(capsys: Any) -> None:
    """Asof join with `by` on frames sorted by (key, asof_key)."""
    sort_columns = ["key", "asof_key"]
    keys = ["a", "a", "a", "b", "b", "b"]

    left = pl.DataFrame(
        {
            "key": keys,
            "asof_key": [2.0, 1.0, 3.0, 1.0, 2.0, 3.0],
            "a": [102, 101, 103, 104, 105, 106],
        }
    ).sort(by=sort_columns)
    right = pl.DataFrame(
        {
            "key": keys,
            "asof_key": [0.9, 1.9, 2.9, 0.9, 1.9, 2.9],
            "b": [201, 202, 203, 204, 205, 206],
        }
    ).sort(by=sort_columns)

    expected = pl.DataFrame(
        [
            pl.Series("key", ["a", "a", "a", "b", "b", "b"], dtype=pl.String),
            pl.Series("asof_key", [1.0, 2.0, 3.0, 1.0, 2.0, 3.0], dtype=pl.Float64),
            pl.Series("a", [101, 102, 103, 104, 105, 106], dtype=pl.Int64),
            pl.Series("b", [201, 202, 203, 204, 205, 206], dtype=pl.Int64),
        ]
    )

    result = left.join_asof(right, on="asof_key", by="key")
    assert_frame_equal(result, expected)

    # Nothing about missing explicit sortedness may have been written to stderr.
    captured_err = capsys.readouterr()[1]
    assert "is not explicitly sorted" not in captured_err
471
472
473
def test_asof_join_nearest() -> None:
    """Nearest-strategy asof join: a general case and a repeated-key edge case."""

    def nearest(left: pl.DataFrame, right: pl.DataFrame) -> pl.DataFrame:
        return left.join_asof(right, on="asof_key", strategy="nearest")

    # Generic join_asof
    left = pl.DataFrame({"asof_key": [-1, 1, 2, 4, 6], "a": [1, 2, 3, 4, 5]}).sort(
        by="asof_key"
    )
    right = pl.DataFrame({"asof_key": [-1, 2, 4, 5], "b": [1, 2, 3, 4]}).sort(
        by="asof_key"
    )
    assert_frame_equal(
        nearest(left, right),
        pl.DataFrame(
            {"asof_key": [-1, 1, 2, 4, 6], "a": [1, 2, 3, 4, 5], "b": [1, 2, 2, 3, 4]}
        ),
    )

    # Edge case: the last right row is the match for several left rows.
    left = pl.DataFrame(
        {"asof_key": [9, 9, 10, 10, 10], "a": [1, 2, 3, 4, 5]}
    ).set_sorted("asof_key")
    right = pl.DataFrame({"asof_key": [1, 2, 3, 10], "b": [1, 2, 3, 4]}).set_sorted(
        "asof_key"
    )
    assert_frame_equal(
        nearest(left, right),
        pl.DataFrame(
            {
                "asof_key": [9, 9, 10, 10, 10],
                "a": [1, 2, 3, 4, 5],
                "b": [4, 4, 4, 4, 4],
            }
        ),
    )
519
520
521
def test_asof_join_nearest_with_tolerance() -> None:
    """Nearest-strategy asof join combined with numeric and temporal tolerances."""
    a = b = [1, 2, 3, 4, 5]
    nones = pl.Series([None, None, None, None, None], dtype=pl.Int64)

    def keyed(keys: Any, column: str, values: list[int]) -> pl.DataFrame:
        # Two-column frame: "asof_key" (flagged as sorted) plus one payload column.
        return pl.DataFrame({"asof_key": keys, column: values}).set_sorted("asof_key")

    def nearest(left: pl.DataFrame, right: pl.DataFrame, tolerance: Any) -> pl.DataFrame:
        return left.join_asof(
            right, on="asof_key", strategy="nearest", tolerance=tolerance
        )

    # Case 1: complete miss
    left = keyed([1, 2, 3, 4, 5], "a", a)
    right = keyed([7, 8, 9, 10, 11], "b", b)
    assert_frame_equal(nearest(left, right, 1), left.with_columns(nones.alias("b")))

    # Case 2: complete miss in other direction
    left = keyed([7, 8, 9, 10, 11], "a", a)
    right = keyed([1, 2, 3, 4, 5], "b", b)
    assert_frame_equal(nearest(left, right, 1), left.with_columns(nones.alias("b")))

    # Case 3: match first item
    left = keyed([1, 2, 3, 4, 5], "a", a)
    right = keyed([6, 7, 8, 9, 10], "b", b)
    assert_frame_equal(
        nearest(left, right, 1),
        left.with_columns(pl.Series([None, None, None, None, 1]).alias("b")),
    )

    # Case 4: match last item
    left = keyed([1, 2, 3, 4, 5], "a", a)
    right = keyed([-4, -3, -2, -1, 0], "b", b)
    assert_frame_equal(
        nearest(left, right, 1),
        left.with_columns(pl.Series([5, None, None, None, None]).alias("b")),
    )

    # Case 5: match multiples, pick closer
    left = keyed(pl.Series([1, 2, 3, 4, 5], dtype=pl.Float64), "a", a)
    right = keyed([0.0, 2.0, 2.4, 3.4, 10.0], "b", b)
    assert_frame_equal(
        nearest(left, right, 1),
        left.with_columns(pl.Series([2, 2, 4, 4, None]).alias("b")),
    )

    # Case 6: use 0 tolerance
    left = keyed(pl.Series([1, 2, 3, 4, 5], dtype=pl.Float64), "a", a)
    right = keyed([0.0, 2.0, 2.4, 3.4, 10.0], "b", b)
    assert_frame_equal(
        nearest(left, right, 0),
        left.with_columns(pl.Series([None, 2, None, None, None]).alias("b")),
    )

    # Case 7: test with datetime
    left = keyed(
        pl.Series(
            [
                datetime(2023, 1, 1),
                datetime(2023, 1, 2),
                datetime(2023, 1, 3),
                datetime(2023, 1, 4),
                datetime(2023, 1, 6),
            ]
        ),
        "a",
        a,
    )
    right = keyed(
        pl.Series(
            [
                datetime(2022, 1, 1),
                datetime(2022, 1, 2),
                datetime(2022, 1, 3),
                # should match with 2023-01-02, 2023-01-03, and 2023-01-04
                datetime(2023, 1, 2, 21, 30, 0),
                datetime(2023, 1, 7),
            ]
        ),
        "b",
        b,
    )
    expected = left.with_columns(pl.Series([None, 4, 4, 4, 5]).alias("b"))
    assert_frame_equal(nearest(left, right, "1d4h"), expected)

    # Case 8: test using timedelta tolerance
    assert_frame_equal(nearest(left, right, timedelta(days=1, hours=4)), expected)

    # Case 9: last item is closest match
    left = pl.DataFrame({"asof_key_left": [10.00001, 20.0, 30.0]}).set_sorted(
        "asof_key_left"
    )
    right = pl.DataFrame({"asof_key_right": [10.00001, 20.0001, 29.0]}).set_sorted(
        "asof_key_right"
    )
    out = left.join_asof(
        right,
        left_on="asof_key_left",
        right_on="asof_key_right",
        strategy="nearest",
        tolerance=0.5,
    )
    assert_frame_equal(
        out,
        pl.DataFrame(
            {
                "asof_key_left": [10.00001, 20.0, 30.0],
                "asof_key_right": [10.00001, 20.0001, None],
            }
        ),
    )
668
669
670
def test_asof_join_nearest_by() -> None:
    """Exercise `strategy="nearest"` together with `by` groups.

    Four scenarios are covered: a generic grouped join, a right frame whose
    last row matches several left rows, a case with a null payload value, and
    a case where the last right row is the closest match.
    """
    # Generic join_asof
    df1 = pl.DataFrame(
        {
            "asof_key": [-1, 1, 2, 6, 1],
            "group": [1, 1, 1, 2, 2],
            "a": [1, 2, 3, 2, 5],
        }
    ).sort(by=["group", "asof_key"])

    df2 = pl.DataFrame(
        {
            "asof_key": [-1, 2, 5, 1],
            "group": [1, 1, 2, 2],
            "b": [1, 2, 3, 4],
        }
    ).sort(by=["group", "asof_key"])

    # Rows are written in the same order as the `df1` literal ("a" lines up
    # with `df1["a"]`) and then sorted the same way as `df1`.
    expected = pl.DataFrame(
        {
            "asof_key": [-1, 1, 2, 6, 1],
            "group": [1, 1, 1, 2, 2],
            "a": [1, 2, 3, 2, 5],
            "b": [1, 2, 2, 3, 4],
        }
    ).sort(by=["group", "asof_key"])

    # This scenario was previously set up but never executed or asserted.
    out = df1.join_asof(df2, on="asof_key", by="group", strategy="nearest")
    assert_frame_equal(out, expected)

    # Edge case: last item of right matches multiples on left
    df1 = pl.DataFrame(
        {
            "asof_key": [9, 9, 10, 10, 10],
            "group": [1, 1, 1, 2, 2],
            "a": [1, 2, 3, 2, 5],
        }
    ).sort(by=["group", "asof_key"])

    df2 = pl.DataFrame(
        {
            "asof_key": [-1, 1, 1, 10],
            "group": [1, 1, 2, 2],
            "b": [1, 2, 3, 4],
        }
    ).sort(by=["group", "asof_key"])

    expected = pl.DataFrame(
        {
            "asof_key": [9, 9, 10, 10, 10],
            "group": [1, 1, 1, 2, 2],
            "a": [1, 2, 3, 2, 5],
            "b": [2, 2, 2, 4, 4],
        }
    )

    out = df1.join_asof(df2, on="asof_key", by="group", strategy="nearest")
    assert_frame_equal(out, expected)

    # The nearest right row may carry a null payload value.
    a = pl.DataFrame(
        {
            "code": [676, 35, 676, 676, 676],
            "time": [364360, 364370, 364380, 365400, 367440],
        }
    )
    b = pl.DataFrame(
        {
            "code": [676, 676, 35, 676, 676],
            "time": [364000, 365000, 365000, 366000, 367000],
            "price": [1.0, 2.0, 50, 3.0, None],
        }
    )

    expected = pl.DataFrame(
        {
            "code": [676, 35, 676, 676, 676],
            "time": [364360, 364370, 364380, 365400, 367440],
            "price": [1.0, 50.0, 1.0, 2.0, None],
        }
    )

    out = a.join_asof(b, by="code", on="time", strategy="nearest")
    assert_frame_equal(out, expected)

    # last item is closest match
    df1 = pl.DataFrame(
        {
            "a": [1, 1, 1],
            "asof_key_left": [10.00001, 20.0, 30.0],
        }
    ).set_sorted("asof_key_left")
    df2 = pl.DataFrame(
        {
            "a": [1, 1, 1],
            "asof_key_right": [10.00001, 20.0001, 29.0],
        }
    ).set_sorted("asof_key_right")
    out = df1.join_asof(
        df2,
        left_on="asof_key_left",
        right_on="asof_key_right",
        by="a",
        strategy="nearest",
    )
    expected = pl.DataFrame(
        {
            "a": [1, 1, 1],
            "asof_key_left": [10.00001, 20.0, 30.0],
            "asof_key_right": [10.00001, 20.0001, 29.0],
        }
    )
    assert_frame_equal(out, expected)
779
780
781
def test_asof_join_nearest_by_with_tolerance() -> None:
    """Nearest-strategy asof join with `by` groups and a numeric tolerance."""
    # Six groups of five rows each; payload columns count 1..5 inside each group.
    groups = [g for g in range(1, 7) for _ in range(5)]
    payload = [1, 2, 3, 4, 5] * 6

    left = pl.DataFrame(
        {
            "group": groups,
            "asof_key": pl.Series(
                [1, 2, 3, 4, 5, 7, 8, 9, 10, 11] + [1, 2, 3, 4, 5] * 4,
                dtype=pl.Float32,
            ),
            "a": payload,
        }
    )

    right = pl.DataFrame(
        {
            "group": groups,
            "asof_key": pl.Series(
                # One row of literals per group.
                [7, 8, 9, 10, 11]
                + [1, 2, 3, 4, 5]
                + [6, 7, 8, 9, 10]
                + [5, -3, -2, -1, 0]
                + [0, 2, 2.4, 3.4, 10]
                + [-3, 3, 8, 9, 10],
                dtype=pl.Float32,
            ),
            "b": payload,
        }
    )

    # The expectation is attached to the left frame before either side is sorted.
    expected = left.with_columns(
        pl.Series(
            # One row of literals per group.
            [None, None, None, None, None]
            + [None, None, None, None, None]
            + [None, None, None, None, 1]
            + [5, None, None, 1, 1]
            + [2, 2, 4, 4, None]
            + [None, 2, 2, 2, None]
        ).alias("b")
    )

    sort_keys = ["group", "asof_key"]
    left = left.sort(by=sort_keys)
    right = right.sort(by=sort_keys)
    expected = expected.sort(by=["group", "a"])

    result = left.join_asof(
        right, by="group", on="asof_key", strategy="nearest", tolerance=1.0
    ).sort(by=["group", "a"])
    assert_frame_equal(result, expected)

    # last item is closest match
    left = pl.DataFrame(
        {"a": [1, 1, 1], "asof_key_left": [10.00001, 20.0, 30.0]}
    ).set_sorted("asof_key_left")
    right = pl.DataFrame(
        {"a": [1, 1, 1], "asof_key_right": [10.00001, 20.0001, 29.0]}
    ).set_sorted("asof_key_right")

    result = left.join_asof(
        right,
        left_on="asof_key_left",
        right_on="asof_key_right",
        by="a",
        strategy="nearest",
        tolerance=0.5,
    )
    assert_frame_equal(
        result,
        pl.DataFrame(
            {
                "a": [1, 1, 1],
                "asof_key_left": [10.00001, 20.0, 30.0],
                "asof_key_right": [10.00001, 20.0001, None],
            }
        ),
    )
1064
1065
1066
def test_asof_join_nearest_by_date() -> None:
    """Nearest-strategy asof join with `by` groups on `date` keys."""
    order = ["group", "asof_key"]
    left_keys = [
        date(2019, 12, 30),
        date(2020, 1, 1),
        date(2020, 1, 2),
        date(2020, 1, 6),
        date(2020, 1, 1),
    ]
    left_groups = [1, 1, 1, 2, 2]
    left_values = [1, 2, 3, 2, 5]

    left = pl.DataFrame(
        {"asof_key": left_keys, "group": left_groups, "a": left_values}
    ).sort(by=order)

    right = pl.DataFrame(
        {
            "asof_key": [
                date(2020, 1, 1),
                date(2020, 1, 2),
                date(2020, 1, 5),
                date(2020, 1, 1),
            ],
            "group": [1, 1, 2, 2],
            "b": [1, 2, 3, 4],
        }
    ).sort(by=order)

    # Written in the left frame's literal order, then sorted the same way.
    expected = pl.DataFrame(
        {
            "asof_key": left_keys,
            "group": left_groups,
            "a": left_values,
            "b": [1, 1, 2, 3, 4],
        }
    ).sort(by=order)

    result = left.join_asof(right, on="asof_key", by="group", strategy="nearest")
    assert_frame_equal(result, expected)
1111
1112
1113
@pytest.mark.may_fail_auto_streaming  # See #18927.
def test_asof_join_string() -> None:
    """Forward and backward asof joins on string keys that include nulls."""
    left_keys = [None, "a", "b", "c", None, "d", None]
    left = pl.DataFrame({"x": left_keys}).set_sorted("x")
    right = pl.DataFrame({"x": ["apple", None, "chutney"], "y": [0, 1, 2]}).set_sorted(
        "x"
    )

    forward = left.join_asof(right, on="x", strategy="forward")
    backward = left.join_asof(right, on="x", strategy="backward")

    assert_frame_equal(
        forward,
        pl.DataFrame({"x": left_keys, "y": [None, 0, 2, 2, None, None, None]}),
    )
    assert_frame_equal(
        backward,
        pl.DataFrame({"x": left_keys, "y": [None, None, 0, 0, None, 2, None]}),
    )
1135
1136
1137
def test_join_asof_by_argument_parsing() -> None:
    """`by`, `by_left` and `by_right` give equal results for lists and tuples."""
    id1 = [0, 0, 3, 3, 5, 5]
    id2 = [1, 2, 1, 2, 1, 2]

    left = pl.DataFrame(
        {
            "n": [10, 20, 30, 40, 50, 60],
            "id1": id1,
            "id2": id2,
            "x": ["a", "b", "c", "d", "e", "f"],
        }
    ).sort(by="n")
    right = pl.DataFrame(
        {
            "n": [25, 8, 5, 23, 15, 35],
            "id1": id1,
            "id2": id2,
            "y": ["A", "B", "C", "D", "E", "F"],
        }
    ).sort(by="n")

    # Any sequence type is allowed for `by`, so list and tuple must agree.
    from_list = left.join_asof(right, on="n", by=["id1", "id2"])
    from_tuple = left.join_asof(right, on="n", by=("id1", "id2"))
    assert_frame_equal(from_list, from_tuple)

    # The same holds when the keys are passed via `by_left` / `by_right`.
    split_list = left.join_asof(
        right, on="n", by_left=["id1", "id2"], by_right=["id1", "id2"]
    )
    split_tuple = left.join_asof(
        right, on="n", by_left=("id1", "id2"), by_right=("id1", "id2")
    )
    assert_frame_equal(split_list, from_list)
    assert_frame_equal(split_tuple, from_list)
1170
1171
1172
def test_join_asof_invalid_args() -> None:
    """A list passed as `on`, `left_on` or `right_on` raises `TypeError`."""
    left = pl.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3]}).set_sorted("a")
    right = pl.DataFrame({"a": [1, 2, 3], "c": [1, 2, 3]}).set_sorted("a")

    # Each case pairs the offending keyword arguments with the expected message.
    cases: list[tuple[dict[str, Any], str]] = [
        ({"on": ["a"]}, "expected `on` to be str or Expr, got 'list'"),
        (
            {"left_on": ["a"], "right_on": "a"},
            "expected `left_on` to be str or Expr, got 'list'",
        ),
        (
            {"left_on": "a", "right_on": ["a"]},
            "expected `right_on` to be str or Expr, got 'list'",
        ),
    ]
    for kwargs, message in cases:
        with pytest.raises(TypeError, match=message):
            left.join_asof(right, **kwargs)
1196
1197
1198
def test_join_as_of_by_schema() -> None:
    """Lazy schema column names match the collected frame when `by` is used."""
    lhs = pl.DataFrame({"a": [1], "b": [2], "c": [3]}).lazy()
    rhs = pl.DataFrame({"a": [1], "b": [2], "d": [4]}).lazy()

    query = lhs.join_asof(rhs, on=pl.col("a").set_sorted(), by="b")

    planned_names = query.collect_schema().names()
    assert planned_names == query.collect().columns
1203
1204
1205
def test_asof_join_by_schema() -> None:
    """Lazy schema matches the collected schema when the `by` names differ."""
    left = pl.DataFrame({"on1": 0, "by1": 0}).lazy()
    right = pl.DataFrame({"on1": 0, "by2": 0}).lazy()

    # The grouping column is called "by1" on the left and "by2" on the right.
    query = left.join_asof(right, on="on1", by_left="by1", by_right="by2")

    planned = query.collect_schema()
    assert planned == query.collect().schema
1218
1219
1220
def test_raise_invalid_by_arg_13020() -> None:
    """Passing only one of `by_left` / `by_right` must raise.

    The fixture is otherwise a valid asof-join input: the `on` columns are both
    dates and the column named in `by_right` exists. The unbalanced `by`
    arguments are therefore the only thing that can trigger the error.
    """
    df1 = pl.DataFrame({"asOfDate": [date(2020, 1, 1)]})
    df2 = pl.DataFrame(
        {
            # Previously misspelt "endityId", with the values of the two
            # columns swapped relative to their names.
            "entityId": ["A"],
            "eventDate": [date(2020, 1, 1)],
        }
    )
    with pytest.raises(pl.exceptions.InvalidOperationError, match="expected both"):
        df1.sort("asOfDate").join_asof(
            df2.sort("eventDate"),
            left_on="asOfDate",
            right_on="eventDate",
            by_left=None,
            by_right=["entityId"],
        )
1236
1237
1238
def test_join_asof_no_exact_matches() -> None:
    """Asof join by ticker with a 10ms tolerance and exact matches disallowed."""
    tickers = ["MSFT", "MSFT", "GOOG", "GOOG", "AAPL"]
    prices = [51.95, 51.95, 720.77, 720.92, 98.0]
    quantities = [75, 155, 100, 100, 100]

    trades = pl.DataFrame(
        {
            "time": [
                "2016-05-25 13:30:00.023",
                "2016-05-25 13:30:00.038",
                "2016-05-25 13:30:00.048",
                "2016-05-25 13:30:00.048",
                "2016-05-25 13:30:00.048",
            ],
            "ticker": tickers,
            "price": prices,
            "quantity": quantities,
        }
    ).with_columns(pl.col("time").str.to_datetime())

    quotes = pl.DataFrame(
        {
            "time": [
                "2016-05-25 13:30:00.023",
                "2016-05-25 13:30:00.023",
                "2016-05-25 13:30:00.030",
                "2016-05-25 13:30:00.041",
                "2016-05-25 13:30:00.048",
                "2016-05-25 13:30:00.049",
                "2016-05-25 13:30:00.072",
                "2016-05-25 13:30:00.075",
            ],
            "ticker": ["GOOG", "MSFT", "MSFT", "MSFT", "GOOG", "AAPL", "GOOG", "MSFT"],
            "bid": [720.50, 51.95, 51.97, 51.99, 720.50, 97.99, 720.50, 52.01],
            "ask": [720.93, 51.96, 51.98, 52.00, 720.93, 98.01, 720.88, 52.03],
        }
    ).with_columns(pl.col("time").str.to_datetime())

    joined = trades.join_asof(
        quotes, on="time", by="ticker", tolerance="10ms", allow_exact_matches=False
    )

    # Only the second trade is expected to receive a quote.
    assert joined.to_dict(as_series=False) == {
        "time": [
            datetime(2016, 5, 25, 13, 30, 0, millis * 1000)
            for millis in (23, 38, 48, 48, 48)
        ],
        "ticker": tickers,
        "price": prices,
        "quantity": quantities,
        "bid": [None, 51.97, None, None, None],
        "ask": [None, 51.98, None, None, None],
    }
1288
1289
1290
def test_join_asof_not_sorted() -> None:
    """Check the sortedness validation of `join_asof` and its opt-out flag.

    Three behaviours are pinned: an unsorted `on` column raises, supplying `by`
    turns the check into a `UserWarning`, and `check_sortedness=False`
    silences both.
    """
    df = pl.DataFrame({"a": [1, 1, 1, 2, 2, 2], "b": [2, 1, 3, 1, 2, 3]})
    with pytest.raises(InvalidOperationError, match="is not sorted"):
        df.join_asof(df, on="b")

    # When 'by' is provided, we do not check sortedness, but a warning is received
    with pytest.warns(
        UserWarning,
        match="Sortedness of columns cannot be checked when 'by' groups provided",
    ):
        df.join_asof(df, on="b", by="a")

    # When check_sortedness is False, we should get no warning
    with warnings.catch_warnings(record=True) as w:
        # Record every warning regardless of the ambient filters; without this
        # an "ignore"/"once"-style filter could hide a warning and let the
        # assertion below pass vacuously.
        warnings.simplefilter("always")
        df.join_asof(df, on="b", check_sortedness=False)
        df.join_asof(df, on="b", by="a", check_sortedness=False)
        assert not w, [str(item.message) for item in w]  # no warnings caught
1307
1308
1309
@pytest.mark.parametrize("left_dtype", [pl.Int64, pl.UInt64, pl.Int128])
@pytest.mark.parametrize("right_dtype", [pl.Int64, pl.UInt64, pl.Int128])
@pytest.mark.parametrize("strategy", ["backward", "forward", "nearest"])
def test_join_asof_large_int_21276(
    left_dtype: PolarsIntegerType,
    right_dtype: PolarsIntegerType,
    strategy: AsofJoinStrategy,
) -> None:
    """Asof join on large integer keys across mixed integer dtypes."""
    # The original issue only shows up when the "on" values are this large.
    base = 1608129000134000123
    left = pl.DataFrame({"ts": pl.Series([base + 2], dtype=left_dtype)})
    right = pl.DataFrame(
        {
            "ts": pl.Series([base + 1, base + 3], dtype=right_dtype),
            "value": [111, 333],
        }
    )

    result = left.join_asof(right, on="ts", strategy=strategy)

    # "backward" expects the first right row; the other strategies the second.
    matched_row = {"backward": 0}.get(strategy, 1)
    expected = pl.DataFrame(
        {"ts": left["ts"], "value": right["value"].gather(matched_row)}
    )
    assert_frame_equal(result, expected)
1334
1335
1336
@pytest.mark.parametrize("by", ["constant", None])
def test_join_asof_slice_23583(by: str | None) -> None:
    """`head(1)` after an asof join gives the same frame with and without optimizations."""
    target_day = date(2025, 1, 1)

    left = pl.LazyFrame(
        {"index": [0], "constant": 0, "date": [target_day]},
    ).set_sorted("date")
    right = pl.LazyFrame(
        {"index": [0, 1], "constant": 0, "date": [date(1970, 1, 1), target_day]},
    ).set_sorted("date")

    joined = left.join_asof(right, on="date", by=by, check_sortedness=False)
    query = joined.head(1).select(pl.exclude("constant_right"))

    expected = pl.DataFrame(
        {
            "index": [0],
            "constant": 0,
            "date": [target_day],
            "index_right": [1],
        },
    )

    # Run once with every optimization disabled, then with the defaults.
    unoptimized = query.collect(optimizations=pl.QueryOptFlags.none())
    assert_frame_equal(unoptimized, expected)
    assert_frame_equal(query.collect(), expected)
1371
1372
1373
def test_join_asof_23751() -> None:
    """Asof join by "side" between a default-int key and a UInt64 key."""
    scale = int(1e10)

    left = pl.DataFrame(
        [
            pl.Series([1, 2, 3, 4, 5]).alias("index") * scale,
            pl.Series([1, -1, 1, 1, -1]).alias("side"),
        ]
    )
    right = pl.DataFrame(
        [
            pl.Series([0, 1, 1, 3, 3, 5]).alias("index_right").cast(pl.UInt64) * scale,
            pl.Series([-1, 1, -1, 1, 1, -1]).alias("side"),
            pl.Series([0, 10, 20, 30, 40, 50]).alias("value"),
        ]
    )

    joined = left.join_asof(right, left_on="index", right_on="index_right", by="side")

    assert joined.to_dict(as_series=False) == {
        "index": [step * scale for step in (1, 2, 3, 4, 5)],
        "side": [1, -1, 1, 1, -1],
        "index_right": [step * scale for step in (1, 1, 3, 3, 5)],
        "value": [10, 20, 40, 40, 50],
    }
1404
1405
1406
def test_join_asof_nosuffix_dup_col_23834() -> None:
    """An empty suffix with a clashing column name raises `DuplicateError`."""
    left = pl.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
    right = pl.DataFrame({"b": [1, 2, 3], "c": [9, 10, 11]})

    # Both frames have a "b" column and suffix="" leaves nothing to tell them apart.
    with pytest.raises(DuplicateError):
        left.join_asof(right, left_on="a", right_on="b", suffix="")
1411
1412
1413
def test_join_asof_planner_schema_24000() -> None:
    """Planned and collected schemas agree for both right-hand column orders."""
    left = pl.DataFrame([pl.Series("index", [1, 2, 3]) * 10])

    value = pl.Series("value", [10, 20, 30])
    index_right = pl.Series("index_right", [1, 2, 3]).cast(pl.UInt64) * 10

    # First with the key column last in the right frame, then with it first.
    for right_columns in ([value, index_right], [index_right, value]):
        right = pl.DataFrame(right_columns)
        query = left.lazy().join_asof(
            right.lazy(), left_on="index", right_on="index_right"
        )
        assert query.collect().schema == query.collect_schema()
1434
1435