# Source: pola-rs/polars, py-polars/tests/unit/io/test_ipc.py
from __future__ import annotations

import io
from decimal import Decimal
from typing import TYPE_CHECKING, Any, no_type_check

import pandas as pd
import pytest

import polars as pl
from polars.interchange.protocol import CompatLevel
from polars.testing import assert_frame_equal, assert_series_equal

if TYPE_CHECKING:
    from pathlib import Path

    from polars._typing import IpcCompression
    from tests.unit.conftest import MemoryUsage

COMPRESSIONS = ["uncompressed", "lz4", "zstd"]


def read_ipc(is_stream: bool, *args: Any, **kwargs: Any) -> pl.DataFrame:
    if is_stream:
        return pl.read_ipc_stream(*args, **kwargs)
    else:
        return pl.read_ipc(*args, **kwargs)


def write_ipc(df: pl.DataFrame, is_stream: bool, *args: Any, **kwargs: Any) -> Any:
    if is_stream:
        return df.write_ipc_stream(*args, **kwargs)
    else:
        return df.write_ipc(*args, **kwargs)
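

# A minimal sketch (not part of the original suite) showing how the two
# helpers above dispatch between the Arrow IPC file format and the IPC
# stream format from a single call site.
def test_helpers_roundtrip_sketch() -> None:
    df = pl.DataFrame({"x": [1, 2, 3]})
    for is_stream in (True, False):
        buf = io.BytesIO()
        write_ipc(df, is_stream, buf)
        buf.seek(0)
        assert_frame_equal(read_ipc(is_stream, buf), df)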


@pytest.mark.parametrize("compression", COMPRESSIONS)
@pytest.mark.parametrize("stream", [True, False])
def test_from_to_buffer(
    df: pl.DataFrame, compression: IpcCompression, stream: bool
) -> None:
    # use an ad-hoc buffer (file=None)
    buf1 = write_ipc(df, stream, None, compression=compression)
    buf1.seek(0)
    read_df = read_ipc(stream, buf1, use_pyarrow=False)
    assert_frame_equal(df, read_df, categorical_as_str=True)

    # explicitly supply an existing buffer
    buf2 = io.BytesIO()
    buf2.seek(0)
    write_ipc(df, stream, buf2, compression=compression)
    buf2.seek(0)
    read_df = read_ipc(stream, buf2, use_pyarrow=False)
    assert_frame_equal(df, read_df, categorical_as_str=True)


@pytest.mark.parametrize("compression", COMPRESSIONS)
@pytest.mark.parametrize("path_as_string", [True, False])
@pytest.mark.parametrize("stream", [True, False])
@pytest.mark.write_disk
def test_from_to_file(
    df: pl.DataFrame,
    compression: IpcCompression,
    path_as_string: bool,
    tmp_path: Path,
    stream: bool,
) -> None:
    tmp_path.mkdir(exist_ok=True)
    file_path = tmp_path / "small.ipc"
    if path_as_string:
        file_path = str(file_path)  # type: ignore[assignment]
    write_ipc(df, stream, file_path, compression=compression)
    df_read = read_ipc(stream, file_path, use_pyarrow=False)

    assert_frame_equal(df, df_read, categorical_as_str=True)


@pytest.mark.parametrize("stream", [True, False])
@pytest.mark.write_disk
def test_select_columns_from_file(
    df: pl.DataFrame, tmp_path: Path, stream: bool
) -> None:
    tmp_path.mkdir(exist_ok=True)
    file_path = tmp_path / "small.ipc"
    write_ipc(df, stream, file_path)
    df_read = read_ipc(stream, file_path, columns=["bools"])

    assert df_read.columns == ["bools"]


@pytest.mark.parametrize("stream", [True, False])
def test_select_columns_from_buffer(stream: bool) -> None:
    df = pl.DataFrame(
        {
            "a": [1],
            "b": [2],
            "c": [3],
        },
        schema={"a": pl.Int64(), "b": pl.Int128(), "c": pl.UInt8()},
    )

    f = io.BytesIO()
    write_ipc(df, stream, f)
    f.seek(0)

    actual = read_ipc(stream, f, columns=["b", "c", "a"], use_pyarrow=False)

    expected = pl.DataFrame(
        {
            "b": [2],
            "c": [3],
            "a": [1],
        },
        schema={"b": pl.Int128(), "c": pl.UInt8(), "a": pl.Int64()},
    )
    assert_frame_equal(expected, actual)


@pytest.mark.parametrize("stream", [True, False])
def test_select_columns_projection(stream: bool) -> None:
    df = pl.DataFrame({"a": [1, 2, 3], "b": [True, False, True], "c": ["a", "b", "c"]})
    expected = pl.DataFrame({"b": [True, False, True], "c": ["a", "b", "c"]})

    f = io.BytesIO()
    write_ipc(df, stream, f)
    f.seek(0)

    read_df = read_ipc(stream, f, columns=[1, 2], use_pyarrow=False)
    assert_frame_equal(expected, read_df)
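

# A hedged sketch (not in the original suite): selecting by positional index
# and by name should yield the same projection for a frame like the one above.
def test_select_columns_index_name_equivalence_sketch() -> None:
    df = pl.DataFrame({"a": [1], "b": [True], "c": ["x"]})
    f = io.BytesIO()
    df.write_ipc(f)
    data = f.getvalue()
    by_index = pl.read_ipc(io.BytesIO(data), columns=[1, 2], use_pyarrow=False)
    by_name = pl.read_ipc(io.BytesIO(data), columns=["b", "c"], use_pyarrow=False)
    assert_frame_equal(by_index, by_name)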


@pytest.mark.parametrize("compression", COMPRESSIONS)
@pytest.mark.parametrize("stream", [True, False])
def test_compressed_simple(compression: IpcCompression, stream: bool) -> None:
    df = pl.DataFrame({"a": [1, 2, 3], "b": [True, False, True], "c": ["a", "b", "c"]})

    f = io.BytesIO()
    write_ipc(df, stream, f, compression=compression)
    f.seek(0)

    df_read = read_ipc(stream, f, use_pyarrow=False)
    assert_frame_equal(df_read, df)
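

# A hedged sketch (not in the original suite): for a highly repetitive column,
# zstd-compressed IPC output should come out smaller than uncompressed output.
def test_compression_reduces_size_sketch() -> None:
    df = pl.DataFrame({"a": ["repeat"] * 10_000})
    raw, packed = io.BytesIO(), io.BytesIO()
    df.write_ipc(raw, compression="uncompressed")
    df.write_ipc(packed, compression="zstd")
    assert len(packed.getvalue()) < len(raw.getvalue())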


@pytest.mark.parametrize("compression", COMPRESSIONS)
def test_ipc_schema(compression: IpcCompression) -> None:
    schema = {
        "i64": pl.Int64(),
        "i128": pl.Int128(),
        "u8": pl.UInt8(),
        "f32": pl.Float32(),
        "f64": pl.Float64(),
        "str": pl.String(),
        "bool": pl.Boolean(),
    }
    df = pl.DataFrame(
        {
            "i64": [1, 2],
            "i128": [1, 2],
            "u8": [1, 2],
            "f32": [1, 2],
            "f64": [1, 2],
            "str": ["a", None],
            "bool": [True, False],
        },
        schema=schema,
    )

    f = io.BytesIO()
    df.write_ipc(f, compression=compression)
    f.seek(0)

    assert pl.read_ipc_schema(f) == schema


@pytest.mark.write_disk
@pytest.mark.parametrize("compression", COMPRESSIONS)
@pytest.mark.parametrize("path_as_string", [True, False])
def test_ipc_schema_from_file(
    df_no_lists: pl.DataFrame,
    compression: IpcCompression,
    path_as_string: bool,
    tmp_path: Path,
) -> None:
    tmp_path.mkdir(exist_ok=True)

    file_path = tmp_path / "small.ipc"
    if path_as_string:
        file_path = str(file_path)  # type: ignore[assignment]
    df_no_lists.write_ipc(file_path, compression=compression)
    schema = pl.read_ipc_schema(file_path)

    expected = {
        "bools": pl.Boolean(),
        "bools_nulls": pl.Boolean(),
        "int": pl.Int64(),
        "int_nulls": pl.Int64(),
        "floats": pl.Float64(),
        "floats_nulls": pl.Float64(),
        "strings": pl.String(),
        "strings_nulls": pl.String(),
        "date": pl.Date(),
        "datetime": pl.Datetime(),
        "time": pl.Time(),
        "cat": pl.Categorical(),
        "enum": pl.Enum(["foo", "ham", "bar"]),
    }
    assert schema == expected


@pytest.mark.parametrize("stream", [True, False])
def test_ipc_column_order(stream: bool) -> None:
    df = pl.DataFrame(
        {
            "cola": ["x", "y", "z"],
            "colb": [1, 2, 3],
            "colc": [4.5, 5.6, 6.7],
        }
    )
    f = io.BytesIO()
    write_ipc(df, stream, f)
    f.seek(0)

    columns = ["colc", "colb", "cola"]
    # read back with an explicit column selection; the requested order takes
    # precedence over the order stored in the file
    assert read_ipc(stream, f, columns=columns).columns == columns


@pytest.mark.write_disk
def test_glob_ipc(df: pl.DataFrame, tmp_path: Path) -> None:
    file_path = tmp_path / "small.ipc"
    df.write_ipc(file_path)

    file_path_glob = tmp_path / "small*.ipc"

    result_scan = pl.scan_ipc(file_path_glob).collect()
    result_read = pl.read_ipc(file_path_glob, use_pyarrow=False)

    for result in [result_scan, result_read]:
        assert_frame_equal(result, df, categorical_as_str=True)
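

# A hedged sketch (not in the original suite): scan_ipc should also honor a
# column selection when collecting lazily from a single file.
@pytest.mark.write_disk
def test_scan_ipc_select_sketch(tmp_path: Path) -> None:
    df = pl.DataFrame({"a": [1, 2], "b": ["x", "y"]})
    file_path = tmp_path / "select.ipc"
    df.write_ipc(file_path)
    result = pl.scan_ipc(file_path).select("b").collect()
    assert_frame_equal(result, df.select("b"))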


def test_from_float16() -> None:
    # Create a feather file with a 16-bit floating point column
    pandas_df = pd.DataFrame({"column": [1.0]}, dtype="float16")
    f = io.BytesIO()
    pandas_df.to_feather(f)
    f.seek(0)
    # Polars has no Float16 dtype, so the column is upcast to Float32 on read
    assert pl.read_ipc(f, use_pyarrow=False).dtypes == [pl.Float32]


@pytest.mark.write_disk
def test_binview_ipc_mmap(tmp_path: Path) -> None:
    df = pl.DataFrame({"foo": ["aa" * 10, "bb", None, "small", "big" * 20]})
    file_path = tmp_path / "dump.ipc"
    df.write_ipc(file_path, compat_level=CompatLevel.newest())
    read = pl.read_ipc(file_path, memory_map=True)
    assert_frame_equal(df, read)


def test_list_nested_enum() -> None:
    dtype = pl.List(pl.Enum(["a", "b", "c"]))
    df = pl.DataFrame(pl.Series("list_cat", [["a", "b", "c", None]], dtype=dtype))
    buffer = io.BytesIO()
    df.write_ipc(buffer, compat_level=CompatLevel.newest())
    buffer.seek(0)
    df = pl.read_ipc(buffer)
    assert df.get_column("list_cat").dtype == dtype


def test_struct_nested_enum() -> None:
    dtype = pl.Struct({"enum": pl.Enum(["a", "b", "c"])})
    df = pl.DataFrame(
        pl.Series(
            "struct_cat", [{"enum": "a"}, {"enum": "b"}, {"enum": None}], dtype=dtype
        )
    )
    buffer = io.BytesIO()
    df.write_ipc(buffer, compat_level=CompatLevel.newest())
    buffer.seek(0)
    df = pl.read_ipc(buffer)
    assert df.get_column("struct_cat").dtype == dtype


@pytest.mark.slow
def test_ipc_view_gc_14448() -> None:
    f = io.BytesIO()
    # This size was required to trigger the bug
    df = pl.DataFrame(
        pl.Series(["small"] * 10 + ["looooooong string......."] * 750).slice(20, 20)
    )
    df.write_ipc(f, compat_level=CompatLevel.newest())
    f.seek(0)
    assert_frame_equal(pl.read_ipc(f), df)


@pytest.mark.slow
@pytest.mark.write_disk
@pytest.mark.parametrize("stream", [True, False])
def test_read_ipc_only_loads_selected_columns(
    memory_usage_without_pyarrow: MemoryUsage,
    tmp_path: Path,
    stream: bool,
) -> None:
    """Only requested columns are loaded by ``read_ipc()``/``read_ipc_stream()``."""
    tmp_path.mkdir(exist_ok=True)

    # Each column will be about 16MB of RAM. There's a fixed overhead tied to
    # block size, so smaller file sizes can be misleading in terms of memory
    # usage.
    series = pl.arange(0, 2_000_000, dtype=pl.Int64, eager=True)

    file_path = tmp_path / "multicolumn.ipc"
    df = pl.DataFrame(
        {
            "a": series,
            "b": series,
        }
    )
    write_ipc(df, stream, file_path)
    del df, series

    memory_usage_without_pyarrow.reset_tracking()

    # Only load one column:
    kwargs = {}
    if not stream:
        kwargs["memory_map"] = False
    df = read_ipc(stream, str(file_path), columns=["b"], rechunk=False, **kwargs)
    del df
    # Only one column's worth of memory should be used; 2 columns would be
    # 32_000_000 at least, but there's some overhead.
    assert 16_000_000 < memory_usage_without_pyarrow.get_peak() < 23_000_000


@pytest.mark.write_disk
def test_ipc_decimal_15920(
    tmp_path: Path,
) -> None:
    tmp_path.mkdir(exist_ok=True)

    base_df = pl.Series(
        "x",
        [
            *[
                Decimal(x)
                for x in [
                    "10.1", "11.2", "12.3", "13.4", "14.5", "15.6", "16.7", "17.8", "18.9", "19.0",
                    "20.1", "21.2", "22.3", "23.4", "24.5", "25.6", "26.7", "27.8", "28.9", "29.0",
                    "30.1", "31.2", "32.3", "33.4", "34.5", "35.6", "36.7", "37.8", "38.9", "39.0"
                ]
            ],
            *(50 * [None])
        ],
        dtype=pl.Decimal(18, 2),
    ).to_frame()  # fmt: skip

    for df in [base_df, base_df.drop_nulls()]:
        path = f"{tmp_path}/data.ipc"
        df.write_ipc(path)
        assert_frame_equal(pl.read_ipc(path), df)
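

# A hedged sketch (not in the original suite): the declared precision and
# scale should also survive an IPC roundtrip through an in-memory buffer.
def test_decimal_dtype_roundtrip_sketch() -> None:
    df = pl.DataFrame({"x": [Decimal("1.23")]}, schema={"x": pl.Decimal(18, 2)})
    f = io.BytesIO()
    df.write_ipc(f)
    f.seek(0)
    assert pl.read_ipc(f).schema["x"] == pl.Decimal(18, 2)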


def test_ipc_variadic_buffers_categorical_binview_18636() -> None:
    df = pl.DataFrame(
        {
            "Test": pl.Series(["Value012"], dtype=pl.Categorical),
            "Test2": pl.Series(["Value Two 20032"], dtype=pl.String),
        }
    )

    b = io.BytesIO()
    df.write_ipc(b)
    b.seek(0)
    assert_frame_equal(pl.read_ipc(b), df)


@pytest.mark.parametrize("size", [0, 1, 2, 13])
def test_ipc_chunked_roundtrip(size: int) -> None:
    a = pl.Series("a", [{"x": 1}] * size, pl.Struct({"x": pl.Int8})).to_frame()

    c = pl.concat([a] * 2, how="vertical")

    f = io.BytesIO()
    c.write_ipc(f)

    f.seek(0)
    assert_frame_equal(c, pl.read_ipc(f))
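

# A hedged sketch (not in the original suite): the vertical concat used above
# is what produces a multi-chunk frame, which is the situation these chunked
# roundtrip tests exercise.
def test_concat_produces_chunks_sketch() -> None:
    a = pl.DataFrame({"x": [1, 2]})
    c = pl.concat([a, a], how="vertical", rechunk=False)
    assert c.n_chunks() == 2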


@pytest.mark.parametrize("size", [0, 1, 2, 13])
def test_zfs_ipc_roundtrip(size: int) -> None:
    # "zfs" = zero-field struct, i.e. a struct dtype with no fields
    a = pl.Series("a", [{}] * size, pl.Struct([])).to_frame()

    f = io.BytesIO()
    a.write_ipc(f)

    f.seek(0)
    assert_frame_equal(a, pl.read_ipc(f))


@pytest.mark.parametrize("size", [0, 1, 2, 13])
def test_zfs_ipc_chunked_roundtrip(size: int) -> None:
    a = pl.Series("a", [{}] * size, pl.Struct([])).to_frame()

    c = pl.concat([a] * 2, how="vertical")

    f = io.BytesIO()
    c.write_ipc(f)

    f.seek(0)
    assert_frame_equal(c, pl.read_ipc(f))


@pytest.mark.parametrize("size", [0, 1, 2, 13])
@pytest.mark.parametrize("value", [{}, {"x": 1}])
@pytest.mark.write_disk
def test_memmap_ipc_chunked_structs(
    size: int, value: dict[str, int], tmp_path: Path
) -> None:
    a = pl.Series("a", [value] * size, pl.Struct).to_frame()

    c = pl.concat([a] * 2, how="vertical")

    f = tmp_path / "f.ipc"
    c.write_ipc(f)
    assert_frame_equal(c, pl.read_ipc(f))


def test_categorical_lexical_sort_2732() -> None:
    df = pl.DataFrame(
        {
            "a": ["foo", "bar", "baz"],
            "b": [1, 3, 2],
        },
        schema_overrides={"a": pl.Categorical("lexical")},
    )
    f = io.BytesIO()
    df.write_ipc(f)
    f.seek(0)
    assert_frame_equal(df, pl.read_ipc(f))


def test_enum_scan_21564() -> None:
    s = pl.Series("a", ["A"], pl.Enum(["A"]))

    # DataFrame with an enum field
    f = io.BytesIO()
    s.to_frame().write_ipc(f)

    f.seek(0)
    assert_series_equal(
        pl.scan_ipc(f).collect().to_series(),
        s,
    )


@no_type_check
def test_roundtrip_empty_str_list_21163() -> None:
    schema = {
        "s": pl.Utf8,
        "list": pl.List(pl.Utf8),
    }
    row1 = pl.DataFrame({"s": ["A"], "list": [[]]}, schema=schema)
    row2 = pl.DataFrame({"s": ["B"], "list": [[]]}, schema=schema)
    df = pl.concat([row1, row2])
    serialized = df.serialize()
    deserialized = pl.DataFrame.deserialize(io.BytesIO(serialized))
    assert_frame_equal(df, deserialized)