Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/io/test_ipc.py
8424 views
1
from __future__ import annotations
2
3
import io
4
import typing
5
from decimal import Decimal
6
from typing import TYPE_CHECKING, Any, no_type_check
7
8
import pandas as pd
9
import pyarrow.feather as paf
10
import pytest
11
from hypothesis import given
12
13
import polars as pl
14
from polars.interchange.protocol import CompatLevel
15
from polars.testing import assert_frame_equal, assert_series_equal
16
from polars.testing.parametric.strategies import dataframes
17
18
if TYPE_CHECKING:
19
from pathlib import Path
20
21
from polars._typing import IpcCompression
22
from tests.unit.conftest import MemoryUsage
23
24
# All IPC compression codecs exercised by the parametrized round-trip tests.
COMPRESSIONS = ["uncompressed", "lz4", "zstd"]
25
26
27
def read_ipc(is_stream: bool, *args: Any, **kwargs: Any) -> pl.DataFrame:
    """Read IPC data, dispatching on format: stream (``read_ipc_stream``) vs file (``read_ipc``)."""
    reader = pl.read_ipc_stream if is_stream else pl.read_ipc
    return reader(*args, **kwargs)
32
33
34
def write_ipc(df: pl.DataFrame, is_stream: bool, *args: Any, **kwargs: Any) -> Any:
    """Write *df* as IPC, dispatching on format: stream vs file."""
    writer = df.write_ipc_stream if is_stream else df.write_ipc
    return writer(*args, **kwargs)
39
40
41
@pytest.mark.parametrize("compression", COMPRESSIONS)
@given(
    df=dataframes(
        min_size=1,
        max_size=1000,
    )
)
@pytest.mark.slow
def test_ipc_roundtrip_stream_parametric(
    df: pl.DataFrame, compression: IpcCompression
) -> None:
    """Hypothesis-generated frames survive an IPC *stream* round-trip."""
    buf = io.BytesIO()
    df.write_ipc_stream(buf, compression=compression)
    buf.seek(0)
    roundtripped = pl.read_ipc_stream(buf, use_pyarrow=False)
    assert_frame_equal(df, roundtripped, categorical_as_str=True)
57
58
59
@pytest.mark.parametrize("compression", COMPRESSIONS)
@given(
    df=dataframes(
        max_cols=1,
        min_size=1,
        max_size=1000,
    )
)
@pytest.mark.slow
def test_ipc_roundtrip_nostream_parametric(
    df: pl.DataFrame, compression: IpcCompression
) -> None:
    """Hypothesis-generated frames survive an IPC *file* round-trip."""
    buf = io.BytesIO()
    df.write_ipc(buf, compression=compression)
    buf.seek(0)
    roundtripped = pl.read_ipc(buf, use_pyarrow=False)
    assert_frame_equal(df, roundtripped, categorical_as_str=True)
76
77
78
@pytest.mark.parametrize("compression", COMPRESSIONS)
@given(
    df=dataframes(
        allowed_dtypes=[
            pl.Float16,
            pl.Float32,
            pl.Float64,
            pl.Int8,
            pl.Int16,
            pl.Int32,
            pl.Int64,
            pl.UInt8,
            pl.UInt16,
            pl.UInt32,
            pl.UInt64,
            pl.Boolean,
            pl.Datetime,
        ],
        allow_null=False,
        allow_nan=False,  # NaN values come back as nulls
        max_size=1000,
    )
)
@pytest.mark.slow
def test_ipc_roundtrip_pandas_parametric(
    df: pl.DataFrame, compression: IpcCompression
) -> None:
    """IPC interop with pandas/feather works in both directions."""
    pd_df = df.to_pandas()

    # pandas writes feather -> polars reads it back.
    buf = io.BytesIO()
    pd_df.to_feather(buf, compression=compression)
    buf.seek(0)
    from_feather = pl.read_ipc(buf, use_pyarrow=False)
    assert_frame_equal(df, from_feather, categorical_as_str=True)

    # polars writes IPC -> pandas reads it back.
    buf = io.BytesIO()
    df.write_ipc(buf, compression=compression)
    buf.seek(0)
    pd_roundtrip = pd.read_feather(buf)
    assert pd_df.equals(pd_roundtrip)
116
117
118
@pytest.mark.parametrize("compression", COMPRESSIONS)
@given(
    df=dataframes(
        excluded_dtypes=[
            pl.Int128,
            pl.UInt128,
            pl.Categorical,
            pl.Struct,
            pl.Enum,
        ],
        max_size=1000,
    )
)
@pytest.mark.slow
def test_ipc_roundtrip_pyarrow_parametric(
    df: pl.DataFrame, compression: IpcCompression
) -> None:
    """IPC interop with pyarrow/feather works in both directions."""
    # polars writes -> pyarrow reads.
    buf = io.BytesIO()
    df.write_ipc(buf, compression=compression)
    buf.seek(0)
    table = paf.read_table(buf)
    assert_frame_equal(df, typing.cast("pl.DataFrame", pl.from_arrow(table)))

    # pyarrow writes -> polars reads.
    buf = io.BytesIO()
    paf.write_feather(df.to_arrow(), buf, compression=compression)
    buf.seek(0)
    assert_frame_equal(df, pl.read_ipc(buf, use_pyarrow=False))
146
147
148
@pytest.mark.parametrize("compression", COMPRESSIONS)
@pytest.mark.parametrize("stream", [True, False])
def test_from_to_buffer(
    df: pl.DataFrame, compression: IpcCompression, stream: bool
) -> None:
    """Round-trip through both an implicit and an explicit in-memory buffer."""
    # Implicit: write_ipc(file=None) returns a fresh buffer.
    implicit_buf = write_ipc(df, stream, None, compression=compression)
    implicit_buf.seek(0)
    result = read_ipc(stream, implicit_buf, use_pyarrow=False)
    assert_frame_equal(df, result, categorical_as_str=True)

    # Explicit: caller supplies an existing BytesIO.
    explicit_buf = io.BytesIO()
    explicit_buf.seek(0)
    write_ipc(df, stream, explicit_buf, compression=compression)
    explicit_buf.seek(0)
    result = read_ipc(stream, explicit_buf, use_pyarrow=False)
    assert_frame_equal(df, result, categorical_as_str=True)
166
167
168
@pytest.mark.parametrize("compression", COMPRESSIONS)
@pytest.mark.parametrize("path_as_string", [True, False])
@pytest.mark.parametrize("stream", [True, False])
@pytest.mark.write_disk
def test_from_to_file(
    df: pl.DataFrame,
    compression: IpcCompression,
    path_as_string: bool,
    tmp_path: Path,
    stream: bool,
) -> None:
    """Round-trip through an on-disk IPC file, addressed by Path or str."""
    tmp_path.mkdir(exist_ok=True)
    target = tmp_path / "small.ipc"
    if path_as_string:
        target = str(target)  # type: ignore[assignment]

    write_ipc(df, stream, target, compression=compression)
    result = read_ipc(stream, target, use_pyarrow=False)

    assert_frame_equal(df, result, categorical_as_str=True)
187
188
189
@pytest.mark.parametrize("stream", [True, False])
@pytest.mark.write_disk
def test_select_columns_from_file(
    df: pl.DataFrame, tmp_path: Path, stream: bool
) -> None:
    """``columns=`` on a file read returns only the requested column."""
    tmp_path.mkdir(exist_ok=True)
    target = tmp_path / "small.ipc"
    write_ipc(df, stream, target)

    selected = read_ipc(stream, target, columns=["bools"])
    assert selected.columns == ["bools"]
200
201
202
@pytest.mark.parametrize("stream", [True, False])
def test_select_columns_from_buffer(stream: bool) -> None:
    """Selecting columns by name from a buffer respects the requested order."""
    df = pl.DataFrame(
        {
            "a": [1],
            "b": [2],
            "c": [3],
        },
        schema={"a": pl.Int64(), "b": pl.Int128(), "c": pl.UInt8()},
    )

    buf = io.BytesIO()
    write_ipc(df, stream, buf)
    buf.seek(0)

    actual = read_ipc(stream, buf, columns=["b", "c", "a"], use_pyarrow=False)

    # Columns come back in the selection order, with their dtypes intact.
    expected = pl.DataFrame(
        {
            "b": [2],
            "c": [3],
            "a": [1],
        },
        schema={"b": pl.Int128(), "c": pl.UInt8(), "a": pl.Int64()},
    )
    assert_frame_equal(expected, actual)
228
229
230
@pytest.mark.parametrize("stream", [True, False])
def test_select_columns_projection(stream: bool) -> None:
    """Selecting columns by integer index projects the right columns."""
    df = pl.DataFrame({"a": [1, 2, 3], "b": [True, False, True], "c": ["a", "b", "c"]})
    expected = pl.DataFrame({"b": [True, False, True], "c": ["a", "b", "c"]})

    buf = io.BytesIO()
    write_ipc(df, stream, buf)
    buf.seek(0)

    projected = read_ipc(stream, buf, columns=[1, 2], use_pyarrow=False)
    assert_frame_equal(expected, projected)
241
242
243
@pytest.mark.parametrize("compression", COMPRESSIONS)
@pytest.mark.parametrize("stream", [True, False])
def test_compressed_simple(compression: IpcCompression, stream: bool) -> None:
    """A small mixed-dtype frame round-trips under every compression codec."""
    df = pl.DataFrame({"a": [1, 2, 3], "b": [True, False, True], "c": ["a", "b", "c"]})

    buf = io.BytesIO()
    write_ipc(df, stream, buf, compression=compression)
    buf.seek(0)

    roundtripped = read_ipc(stream, buf, use_pyarrow=False)
    assert_frame_equal(roundtripped, df)
254
255
256
@pytest.mark.parametrize("compression", COMPRESSIONS)
def test_ipc_schema(compression: IpcCompression) -> None:
    """``read_ipc_schema`` recovers the exact schema from an in-memory buffer."""
    schema = {
        "i64": pl.Int64(),
        "i128": pl.Int128(),
        "u8": pl.UInt8(),
        "f32": pl.Float32(),
        "f64": pl.Float64(),
        "str": pl.String(),
        "bool": pl.Boolean(),
    }
    df = pl.DataFrame(
        {
            "i64": [1, 2],
            "i128": [1, 2],
            "u8": [1, 2],
            "f32": [1, 2],
            "f64": [1, 2],
            "str": ["a", None],
            "bool": [True, False],
        },
        schema=schema,
    )

    buf = io.BytesIO()
    df.write_ipc(buf, compression=compression)
    buf.seek(0)

    assert pl.read_ipc_schema(buf) == schema
285
286
287
@pytest.mark.write_disk
@pytest.mark.parametrize("compression", COMPRESSIONS)
@pytest.mark.parametrize("path_as_string", [True, False])
def test_ipc_schema_from_file(
    df_no_lists: pl.DataFrame,
    compression: IpcCompression,
    path_as_string: bool,
    tmp_path: Path,
) -> None:
    """``read_ipc_schema`` recovers the exact schema from an on-disk file."""
    tmp_path.mkdir(exist_ok=True)

    target = tmp_path / "small.ipc"
    if path_as_string:
        target = str(target)  # type: ignore[assignment]
    df_no_lists.write_ipc(target, compression=compression)

    expected = {
        "bools": pl.Boolean(),
        "bools_nulls": pl.Boolean(),
        "int": pl.Int64(),
        "int_nulls": pl.Int64(),
        "floats": pl.Float64(),
        "floats_nulls": pl.Float64(),
        "strings": pl.String(),
        "strings_nulls": pl.String(),
        "date": pl.Date(),
        "datetime": pl.Datetime(),
        "time": pl.Time(),
        "cat": pl.Categorical(),
        "enum": pl.Enum(["foo", "ham", "bar"]),
    }
    assert pl.read_ipc_schema(target) == expected
320
321
322
@pytest.mark.parametrize("stream", [True, False])
def test_ipc_column_order(stream: bool) -> None:
    """Columns are returned in the order they were requested."""
    df = pl.DataFrame(
        {
            "cola": ["x", "y", "z"],
            "colb": [1, 2, 3],
            "colc": [4.5, 5.6, 6.7],
        }
    )
    buf = io.BytesIO()
    write_ipc(df, stream, buf)
    buf.seek(0)

    # Request the columns in reverse of their on-disk order.
    requested = ["colc", "colb", "cola"]
    assert read_ipc(stream, buf, columns=requested).columns == requested
338
339
340
@pytest.mark.write_disk
def test_glob_ipc(df: pl.DataFrame, tmp_path: Path) -> None:
    """Glob patterns are resolved by both ``scan_ipc`` and ``read_ipc``."""
    target = tmp_path / "small.ipc"
    df.write_ipc(target)

    pattern = tmp_path / "small*.ipc"

    scanned = pl.scan_ipc(pattern).collect()
    read = pl.read_ipc(pattern, use_pyarrow=False)

    for result in [scanned, read]:
        assert_frame_equal(result, df, categorical_as_str=True)
352
353
354
@pytest.mark.write_disk
def test_binview_ipc_mmap(tmp_path: Path) -> None:
    """Binary-view string columns round-trip via a memory-mapped read."""
    df = pl.DataFrame({"foo": ["aa" * 10, "bb", None, "small", "big" * 20]})
    target = tmp_path / "dump.ipc"
    df.write_ipc(target, compat_level=CompatLevel.newest())
    mapped = pl.read_ipc(target, memory_map=True)
    assert_frame_equal(df, mapped)
361
362
363
def test_list_nested_enum() -> None:
    """An Enum nested inside a List keeps its dtype across a round-trip."""
    dtype = pl.List(pl.Enum(["a", "b", "c"]))
    df = pl.DataFrame(pl.Series("list_cat", [["a", "b", "c", None]], dtype=dtype))

    buffer = io.BytesIO()
    df.write_ipc(buffer, compat_level=CompatLevel.newest())
    buffer.seek(0)

    df = pl.read_ipc(buffer)
    assert df.get_column("list_cat").dtype == dtype
371
372
373
def test_struct_nested_enum() -> None:
    """An Enum nested inside a Struct keeps its dtype across a round-trip."""
    dtype = pl.Struct({"enum": pl.Enum(["a", "b", "c"])})
    df = pl.DataFrame(
        pl.Series(
            "struct_cat", [{"enum": "a"}, {"enum": "b"}, {"enum": None}], dtype=dtype
        )
    )

    buffer = io.BytesIO()
    df.write_ipc(buffer, compat_level=CompatLevel.newest())
    buffer.seek(0)

    df = pl.read_ipc(buffer)
    assert df.get_column("struct_cat").dtype == dtype
385
386
387
@pytest.mark.slow
def test_ipc_view_gc_14448() -> None:
    """Regression test for #14448: view buffers survive GC during IPC write."""
    buf = io.BytesIO()
    # This size was required to trigger the bug.
    df = pl.DataFrame(
        pl.Series(["small"] * 10 + ["looooooong string......."] * 750).slice(20, 20)
    )
    df.write_ipc(buf, compat_level=CompatLevel.newest())
    buf.seek(0)
    assert_frame_equal(pl.read_ipc(buf), df)
397
398
399
@pytest.mark.slow
@pytest.mark.write_disk
@pytest.mark.parametrize("stream", [True, False])
def test_read_ipc_only_loads_selected_columns(
    memory_usage_without_pyarrow: MemoryUsage,
    tmp_path: Path,
    stream: bool,
) -> None:
    """Only requested columns are loaded by ``read_ipc()``/``read_ipc_stream()``."""
    tmp_path.mkdir(exist_ok=True)

    # Each column is about 16MB of RAM. There's a fixed overhead tied to
    # block size, so smaller files can give misleading memory numbers.
    series = pl.arange(0, 2_000_000, dtype=pl.Int64, eager=True)

    target = tmp_path / "multicolumn.ipc"
    df = pl.DataFrame(
        {
            "a": series,
            "b": series,
        }
    )
    write_ipc(df, stream, target)
    del df, series

    memory_usage_without_pyarrow.reset_tracking()

    # Load a single column; memory mapping would hide the allocation, so
    # disable it for the file-based reader.
    kwargs = {}
    if not stream:
        kwargs["memory_map"] = False
    df = read_ipc(stream, str(target), columns=["b"], rechunk=False, **kwargs)
    del df
    # Only one column's worth of memory should be used; 2 columns would be
    # 32_000_000 at least, but there's some overhead.
    # NOTE(review): the peak-memory assertion below is disabled in SOURCE —
    # confirm whether it should be re-enabled.
    # assert 16_000_000 < memory_usage_without_pyarrow.get_peak() < 23_000_000
436
437
438
@pytest.mark.write_disk
def test_ipc_decimal_15920(
    tmp_path: Path,
) -> None:
    """Regression test for #15920: Decimal columns round-trip through IPC files.

    Exercises both a frame with trailing nulls and the same frame with the
    nulls dropped.
    """
    tmp_path.mkdir(exist_ok=True)

    base_df = pl.Series(
        "x",
        [
            *[
                Decimal(x)
                for x in [
                    "10.1", "11.2", "12.3", "13.4", "14.5", "15.6", "16.7", "17.8", "18.9", "19.0",
                    "20.1", "21.2", "22.3", "23.4", "24.5", "25.6", "26.7", "27.8", "28.9", "29.0",
                    "30.1", "31.2", "32.3", "33.4", "34.5", "35.6", "36.7", "37.8", "38.9", "39.0"
                ]
            ],
            *(50 * [None])
        ],
        dtype=pl.Decimal(18, 2),
    ).to_frame()  # fmt: skip

    for df in [base_df, base_df.drop_nulls()]:
        # Build the path with pathlib instead of an f-string.
        path = tmp_path / "data.ipc"
        df.write_ipc(path)
        assert_frame_equal(pl.read_ipc(path), df)
464
465
466
def test_ipc_variadic_buffers_categorical_binview_18636() -> None:
    """Regression test for #18636: categorical + binview columns together."""
    df = pl.DataFrame(
        {
            "Test": pl.Series(["Value012"], dtype=pl.Categorical),
            "Test2": pl.Series(["Value Two 20032"], dtype=pl.String),
        }
    )

    buf = io.BytesIO()
    df.write_ipc(buf)
    buf.seek(0)
    assert_frame_equal(pl.read_ipc(buf), df)
478
479
480
@pytest.mark.parametrize("size", [0, 1, 2, 13])
def test_ipc_chunked_roundtrip(size: int) -> None:
    """Chunked struct frames (two vertically concatenated chunks) round-trip."""
    base = pl.Series("a", [{"x": 1}] * size, pl.Struct({"x": pl.Int8})).to_frame()
    chunked = pl.concat([base] * 2, how="vertical")

    buf = io.BytesIO()
    chunked.write_ipc(buf)
    buf.seek(0)

    assert_frame_equal(chunked, pl.read_ipc(buf))
491
492
493
@pytest.mark.parametrize("size", [0, 1, 2, 13])
def test_zfs_ipc_roundtrip(size: int) -> None:
    """Zero-field structs round-trip through IPC."""
    frame = pl.Series("a", [{}] * size, pl.Struct([])).to_frame()

    buf = io.BytesIO()
    frame.write_ipc(buf)
    buf.seek(0)

    assert_frame_equal(frame, pl.read_ipc(buf))
502
503
504
@pytest.mark.parametrize("size", [0, 1, 2, 13])
def test_zfs_ipc_chunked_roundtrip(size: int) -> None:
    """Chunked zero-field structs round-trip through IPC."""
    base = pl.Series("a", [{}] * size, pl.Struct([])).to_frame()
    chunked = pl.concat([base] * 2, how="vertical")

    buf = io.BytesIO()
    chunked.write_ipc(buf)
    buf.seek(0)

    assert_frame_equal(chunked, pl.read_ipc(buf))
515
516
517
@pytest.mark.parametrize("size", [0, 1, 2, 13])
@pytest.mark.parametrize("value", [{}, {"x": 1}])
@pytest.mark.write_disk
def test_memmap_ipc_chunked_structs(
    size: int, value: dict[str, int], tmp_path: Path
) -> None:
    """Chunked struct columns read back correctly from an on-disk IPC file."""
    base = pl.Series("a", [value] * size, pl.Struct).to_frame()
    chunked = pl.concat([base] * 2, how="vertical")

    target = tmp_path / "f.ipc"
    chunked.write_ipc(target)
    assert_frame_equal(chunked, pl.read_ipc(target))
530
531
532
def test_categorical_lexical_sort_2732() -> None:
    """Regression test for #2732: categorical columns round-trip through IPC."""
    df = pl.DataFrame(
        {
            "a": ["foo", "bar", "baz"],
            "b": [1, 3, 2],
        },
        schema_overrides={"a": pl.Categorical()},
    )

    buf = io.BytesIO()
    df.write_ipc(buf)
    buf.seek(0)
    assert_frame_equal(df, pl.read_ipc(buf))
544
545
546
def test_enum_scan_21564() -> None:
    """Regression test for #21564: scanning a frame with an enum column."""
    s = pl.Series("a", ["A"], pl.Enum(["A"]))

    # Write a single-column frame containing an enum field.
    buf = io.BytesIO()
    s.to_frame().write_ipc(buf)
    buf.seek(0)

    assert_series_equal(
        pl.scan_ipc(buf).collect().to_series(),
        s,
    )
558
559
560
@no_type_check
def test_roundtrip_empty_str_list_21163() -> None:
    """Regression test for #21163: empty ``list[str]`` values survive serialize/deserialize."""
    schema = {
        "s": pl.Utf8,
        "list": pl.List(pl.Utf8),
    }
    row1 = pl.DataFrame({"s": ["A"], "list": [[]]}, schema=schema)
    row2 = pl.DataFrame({"s": ["B"], "list": [[]]}, schema=schema)
    df = pl.concat([row1, row2])
    # Renamed from `bytes` to avoid shadowing the builtin.
    serialized = df.serialize()
    deserialized = pl.DataFrame.deserialize(io.BytesIO(serialized))
    assert_frame_equal(df, deserialized)
572
573