# Source: pola-rs/polars, py-polars/tests/unit/io/test_lazy_ipc.py

from __future__ import annotations

import io
import typing
from typing import IO, TYPE_CHECKING, Any

import pyarrow.ipc
import pytest

import polars as pl
from polars.interchange.protocol import CompatLevel
from polars.testing.asserts.frame import assert_frame_equal

if TYPE_CHECKING:
    from pathlib import Path

    from polars._typing import IpcCompression
    from tests.conftest import PlMonkeyPatch

COMPRESSIONS = ["uncompressed", "lz4", "zstd"]


@pytest.fixture
def foods_ipc_path(io_files_path: Path) -> Path:
    return io_files_path / "foods1.ipc"


def test_row_index(foods_ipc_path: Path) -> None:
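    """Row indexes added by `read_ipc`/`scan_ipc` should survive filters and offsets."""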
    df = pl.read_ipc(foods_ipc_path, row_index_name="row_index", use_pyarrow=False)
    assert df["row_index"].to_list() == list(range(27))

    df = (
        pl.scan_ipc(foods_ipc_path, row_index_name="row_index")
        .filter(pl.col("category") == pl.lit("vegetables"))
        .collect()
    )

    assert df["row_index"].to_list() == [0, 6, 11, 13, 14, 20, 25]

    df = (
        pl.scan_ipc(foods_ipc_path, row_index_name="row_index")
        .with_row_index("foo", 10)
        .filter(pl.col("category") == pl.lit("vegetables"))
        .collect()
    )

    assert df["foo"].to_list() == [10, 16, 21, 23, 24, 30, 35]


def test_is_in_type_coercion(foods_ipc_path: Path) -> None:
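    """`is_in` predicates on a scanned IPC file should coerce the container type."""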
    out = (
        pl.scan_ipc(foods_ipc_path)
        .filter(pl.col("category").is_in(("vegetables", "ice cream")))
        .collect()
    )
    assert out.shape == (7, 4)
    out = (
        pl.scan_ipc(foods_ipc_path)
        .select(pl.col("category").alias("cat"))
        .filter(pl.col("cat").is_in(["vegetables"]))
        .collect()
    )
    assert out.shape == (7, 1)


def test_row_index_schema(foods_ipc_path: Path) -> None:
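    """The generated row index column should use the native index dtype."""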
    assert (
        pl.scan_ipc(foods_ipc_path, row_index_name="id")
        .select(["id", "category"])
        .collect()
    ).dtypes == [pl.get_index_type(), pl.String]


def test_glob_n_rows(io_files_path: Path) -> None:
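    """`n_rows` should cap the total row count across a glob of IPC files."""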
    file_path = io_files_path / "foods*.ipc"
    df = pl.scan_ipc(file_path, n_rows=40).collect()

    # 27 rows from foods1.ipc and 13 from foods2.ipc
    assert df.shape == (40, 4)

    # take first and last rows
    assert df[[0, 39]].to_dict(as_series=False) == {
        "category": ["vegetables", "seafood"],
        "calories": [45, 146],
        "fats_g": [0.5, 6.0],
        "sugars_g": [2, 2],
    }


def test_ipc_list_arg(io_files_path: Path) -> None:
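    """`scan_ipc` should accept a list of sources and concatenate them in order."""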
    first = io_files_path / "foods1.ipc"
    second = io_files_path / "foods2.ipc"

    df = pl.scan_ipc(source=[first, second]).collect()
    assert df.shape == (54, 4)
    assert df.row(-1) == ("seafood", 194, 12.0, 1)
    assert df.row(0) == ("vegetables", 45, 0.5, 2)


def test_scan_ipc_local_with_async(
    plmonkeypatch: PlMonkeyPatch,
    io_files_path: Path,
) -> None:
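    """Scanning a local IPC file should work with the async reader forced on."""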
    plmonkeypatch.setenv("POLARS_VERBOSE", "1")
    plmonkeypatch.setenv("POLARS_FORCE_ASYNC", "1")

    assert_frame_equal(
        pl.scan_ipc(io_files_path / "foods1.ipc").head(1).collect(),
        pl.DataFrame(
            {
                "category": ["vegetables"],
                "calories": [45],
                "fats_g": [0.5],
                "sugars_g": [2],
            }
        ),
    )


def test_sink_ipc_compat_level_22930() -> None:
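    """Regression test for issue #22930: sinking with `CompatLevel.oldest()` should
    emit `large_string` on both the in-memory and streaming engines.
    """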
    df = pl.DataFrame({"a": ["foo"]})

    f1 = io.BytesIO()
    f2 = io.BytesIO()

    df.lazy().sink_ipc(f1, compat_level=CompatLevel.oldest(), engine="in-memory")
    df.lazy().sink_ipc(f2, compat_level=CompatLevel.oldest(), engine="streaming")

    f1.seek(0)
    f2.seek(0)

    t1 = pyarrow.ipc.open_file(f1)
    assert "large_string" in str(t1.schema)
    assert_frame_equal(pl.DataFrame(t1.read_all()), df)

    t2 = pyarrow.ipc.open_file(f2)
    assert "large_string" in str(t2.schema)
    assert_frame_equal(pl.DataFrame(t2.read_all()), df)


def test_scan_file_info_cache(
    capfd: Any, plmonkeypatch: PlMonkeyPatch, foods_ipc_path: Path
) -> None:
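    """Scanning the same file twice in one query should hit the file-info cache."""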
    plmonkeypatch.setenv("POLARS_VERBOSE", "1")
    a = pl.scan_ipc(foods_ipc_path)
    b = pl.scan_ipc(foods_ipc_path)

    a.join(b, how="cross").explain()

    captured = capfd.readouterr().err
    assert "FILE_INFO CACHE HIT" in captured


def test_scan_ipc_file_async(
    plmonkeypatch: PlMonkeyPatch,
    io_files_path: Path,
) -> None:
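    """Exercise slices, projections, and aggregations against the async IPC reader."""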
    plmonkeypatch.setenv("POLARS_FORCE_ASYNC", "1")

    foods1 = io_files_path / "foods1.ipc"

    df = pl.scan_ipc(foods1).collect()

    assert_frame_equal(
        pl.scan_ipc(foods1).select(pl.len()).collect(), df.select(pl.len())
    )

    assert_frame_equal(
        pl.scan_ipc(foods1).head(1).collect(),
        df.head(1),
    )

    assert_frame_equal(
        pl.scan_ipc(foods1).tail(1).collect(),
        df.tail(1),
    )

    assert_frame_equal(
        pl.scan_ipc(foods1).slice(-1, 1).collect(),
        df.slice(-1, 1),
    )

    assert_frame_equal(
        pl.scan_ipc(foods1).slice(7, 10).collect(),
        df.slice(7, 10),
    )

    assert_frame_equal(
        pl.scan_ipc(foods1).select(pl.col.calories).collect(),
        df.select(pl.col.calories),
    )

    assert_frame_equal(
        pl.scan_ipc(foods1).select([pl.col.calories, pl.col.category]).collect(),
        df.select([pl.col.calories, pl.col.category]),
    )

    assert_frame_equal(
        pl.scan_ipc([foods1, foods1]).collect(),
        pl.concat([df, df]),
    )

    assert_frame_equal(
        pl.scan_ipc(foods1).select(pl.col.calories.sum()).collect(),
        df.select(pl.col.calories.sum()),
    )

    assert_frame_equal(
        pl.scan_ipc(foods1, row_index_name="ri", row_index_offset=42)
        .slice(0, 1)
        .select(pl.col.ri)
        .collect(),
        df.with_row_index(name="ri", offset=42).slice(0, 1).select(pl.col.ri),
    )


def test_scan_ipc_file_async_dict(
    plmonkeypatch: PlMonkeyPatch,
) -> None:
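    """Categorical (dictionary) data should round-trip through the async reader."""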
    plmonkeypatch.setenv("POLARS_FORCE_ASYNC", "1")

    buf = io.BytesIO()
    lf = pl.LazyFrame(
        {"cat": ["A", "B", "C", "A", "C", "B"]}, schema={"cat": pl.Categorical}
    ).with_row_index()
    lf.sink_ipc(buf)

    out = pl.scan_ipc(buf).collect()
    expected = lf.collect()
    assert_frame_equal(out, expected)


# TODO: create multiple record batches through API instead of env variable
def test_scan_ipc_file_async_multiple_record_batches(
    plmonkeypatch: PlMonkeyPatch,
) -> None:
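    """The async reader should handle IPC files with multiple record batches."""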
    plmonkeypatch.setenv("POLARS_FORCE_ASYNC", "1")
    plmonkeypatch.setenv("POLARS_IDEAL_SINK_MORSEL_SIZE_ROWS", "10")

    buf = io.BytesIO()
    lf = pl.LazyFrame({"a": list(range(100))})
    lf.sink_ipc(buf)
    df = lf.collect()

    buffers = typing.cast("list[IO[bytes]]", [buf, buf])

    assert_frame_equal(
        pl.scan_ipc(buf).collect(),
        df,
    )

    assert_frame_equal(
        pl.scan_ipc(buf).head(15).collect(),
        df.head(15),
    )

    assert_frame_equal(
        pl.scan_ipc(buf).tail(15).collect(),
        df.tail(15),
    )

    assert_frame_equal(
        pl.scan_ipc(buf).slice(45, 20).collect(),
        df.slice(45, 20),
    )

    assert_frame_equal(
        pl.scan_ipc(buffers).slice(85, 30).collect(),
        pl.concat([df.slice(85, 15), df.slice(0, 15)]),
    )

    assert_frame_equal(
        pl.scan_ipc(buf).select(pl.col.a.sum()).collect(),
        df.select(pl.col.a.sum()),
    )

    assert_frame_equal(
        pl.scan_ipc(buffers, row_index_name="ri").tail(15).select(pl.col.ri).collect(),
        pl.concat([df, df]).with_row_index("ri").tail(15).select(pl.col.ri),
    )
@pytest.mark.parametrize("n_a", [1, 999])
284
@pytest.mark.parametrize("n_b", [1, 12, 13, 999]) # problem starts 13
285
@pytest.mark.parametrize("compression", COMPRESSIONS)
286
def test_scan_ipc_varying_block_metadata_len_c4812(
287
n_a: int, n_b: int, compression: IpcCompression, plmonkeypatch: PlMonkeyPatch
288
) -> None:
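    """Regression test (c4812) for IPC blocks whose metadata length varies."""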
    plmonkeypatch.setenv("POLARS_FORCE_ASYNC", "1")

    buf = io.BytesIO()
    df = pl.DataFrame({"a": [n_a * "A", n_b * "B"]})
    df.lazy().sink_ipc(buf, compression=compression, record_batch_size=1)

    with pyarrow.ipc.open_file(buf) as reader:
        assert [
            reader.get_batch(i).num_rows for i in range(reader.num_record_batches)
        ] == [1, 1]

    assert_frame_equal(pl.scan_ipc(buf).collect(), df)


@pytest.mark.parametrize(
    "record_batch_size", [1, 2, 5, 7, 50, 99, 100, 101, 299, 300, 100_000]
)
@pytest.mark.parametrize("n_chunks", [1, 2, 3])
def test_sink_ipc_record_batch_size(record_batch_size: int, n_chunks: int) -> None:
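    """Each record batch should hold exactly `record_batch_size` rows; only the
    final batch may be smaller.
    """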
    n_rows = 100
    buf = io.BytesIO()

    df0 = pl.DataFrame({"a": list(range(n_rows))})
    df = df0
    while n_chunks > 1:
        df = pl.concat([df, df0])
        n_chunks -= 1

    df.lazy().sink_ipc(buf, record_batch_size=record_batch_size)

    buf.seek(0)
    out = pl.scan_ipc(buf).collect()
    assert_frame_equal(out, df)

    buf.seek(0)
    reader = pyarrow.ipc.open_file(buf)
    n_batches = reader.num_record_batches
    for i in range(n_batches):
        # every batch must have exactly `record_batch_size` rows, except that
        # the final batch may be smaller
        batch_rows = reader.get_batch(i).num_rows
        assert batch_rows == record_batch_size or (
            i + 1 == n_batches and batch_rows <= record_batch_size
        )
@pytest.mark.parametrize("record_batch_size", [None, 3])
334
@pytest.mark.parametrize("slice", [(0, 0), (0, 1), (0, 5), (4, 7), (-1, 1), (-5, 4)])
335
@pytest.mark.parametrize("compression", COMPRESSIONS)
336
def test_scan_ipc_compression_with_slice_26063(
337
record_batch_size: int, slice: tuple[int, int], compression: IpcCompression
338
) -> None:
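    """Regression test for issue #26063: slicing a compressed IPC scan should
    match slicing the eager frame.
    """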
    n_rows = 15
    df = pl.DataFrame({"a": range(n_rows)}).with_columns(
        pl.col.a.pow(3).cast(pl.String).alias("b")
    )
    buf = io.BytesIO()

    df.lazy().sink_ipc(
        buf, compression=compression, record_batch_size=record_batch_size
    )
    out = pl.scan_ipc(buf).slice(slice[0], slice[1]).collect()
    expected = df.slice(slice[0], slice[1])
    assert_frame_equal(out, expected)


def test_sink_scan_ipc_round_trip_statistics() -> None:
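    """Sortedness statistics should round-trip and be read only when requested."""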
    n_rows = 4_000  # must be higher than (n_vCPU)^2 to avoid sortedness inference
    buf = io.BytesIO()

    df = (
        pl.DataFrame({"a": range(n_rows)})
        .with_columns(pl.col.a.reverse().alias("b"))
        .with_columns(pl.col.a.shuffle().alias("c"))
        .with_columns(pl.col.a.shuffle().sort().alias("d"))
    )
    df.lazy().sink_ipc(buf, _record_batch_statistics=True)

    metadata = df._to_metadata()

    # baseline
    assert metadata.select(pl.col("sorted_asc").sum()).item() == 2
    assert metadata.select(pl.col("sorted_dsc").sum()).item() == 1

    # round-trip
    out = pl.scan_ipc(buf, _record_batch_statistics=True).collect()
    assert_frame_equal(metadata, out._to_metadata())

    # do not read statistics unless requested
    out = pl.scan_ipc(buf).collect()
    assert out._to_metadata().select(pl.col("sorted_asc").sum()).item() == 0
    assert out._to_metadata().select(pl.col("sorted_dsc").sum()).item() == 0

    # remain pyarrow compatible
    out = pl.read_ipc(buf, use_pyarrow=True)
    assert_frame_equal(df, out)


@pytest.mark.parametrize(
    "selection",
    [["b"], ["a", "b", "c", "d"], ["d", "c", "a", "b"], ["d", "a", "b"]],
)
@pytest.mark.parametrize("record_batch_size", [None, 100])
def test_sink_scan_ipc_round_trip_statistics_projection(
    selection: list[str], record_batch_size: int | None
) -> None:
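    """Per-column statistics should survive the round trip under projection."""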
    n_rows = 4_000  # must be higher than (n_vCPU)^2 to avoid sortedness inference
    buf = io.BytesIO()

    df = (
        pl.DataFrame({"a": range(n_rows)})
        .with_columns(pl.col.a.reverse().alias("b"))
        .with_columns(pl.col.a.shuffle().alias("c"))
        .with_columns(pl.col.a.shuffle().sort().alias("d"))
    )
    df.lazy().sink_ipc(
        buf, record_batch_size=record_batch_size, _record_batch_statistics=True
    )

    # round-trip with projection
    df = df.select(selection)
    out = pl.scan_ipc(buf, _record_batch_statistics=True).select(selection).collect()
    assert_frame_equal(df, out)
    assert_frame_equal(df._to_metadata(), out._to_metadata())