Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/io/test_avro.py
6939 views
1
from __future__ import annotations
2
3
import io
4
from typing import TYPE_CHECKING
5
6
import pytest
7
8
import polars as pl
9
from polars.testing import assert_frame_equal
10
11
if TYPE_CHECKING:
12
from pathlib import Path
13
14
from polars._typing import AvroCompression
15
16
17
# Compression settings that the parametrized round-trip tests in this module
# pass as the `compression` argument of `write_avro`.
COMPRESSIONS = ["uncompressed", "snappy", "deflate"]
18
19
20
@pytest.fixture
def example_df() -> pl.DataFrame:
    """Provide a two-row frame with an integer, a float and a string column."""
    columns = {
        "i64": [1, 2],
        "f64": [0.1, 0.2],
        "str": ["a", "b"],
    }
    return pl.DataFrame(columns)
23
24
25
@pytest.mark.parametrize("compression", COMPRESSIONS)
def test_from_to_buffer(
    example_df: pl.DataFrame,
    compression: AvroCompression,
) -> None:
    """Write the frame to an in-memory buffer and check it reads back equal."""
    stream = io.BytesIO()
    example_df.write_avro(stream, compression=compression)

    # Rewind so the reader starts at the beginning of what was just written.
    stream.seek(0)

    assert_frame_equal(example_df, pl.read_avro(stream))
33
34
35
@pytest.mark.write_disk
@pytest.mark.parametrize("compression", COMPRESSIONS)
def test_from_to_file(
    example_df: pl.DataFrame, compression: AvroCompression, tmp_path: Path
) -> None:
    """Write the frame to a file under `tmp_path` and check it reads back equal."""
    tmp_path.mkdir(exist_ok=True)
    target = tmp_path / "small.avro"

    example_df.write_avro(target, compression=compression)

    assert_frame_equal(example_df, pl.read_avro(target))
47
48
49
def test_select_columns() -> None:
    """Reading with `columns` given as names yields only those columns."""
    source = pl.DataFrame(
        {"a": [1, 2, 3], "b": [True, False, True], "c": ["a", "b", "c"]}
    )

    stream = io.BytesIO()
    source.write_avro(stream)
    stream.seek(0)
    selected = pl.read_avro(stream, columns=["b", "c"])

    expected = pl.DataFrame({"b": [True, False, True], "c": ["a", "b", "c"]})
    assert_frame_equal(expected, selected)
59
60
61
def test_select_projection() -> None:
    """Reading with `columns` given as integer positions yields those columns."""
    source = pl.DataFrame(
        {"a": [1, 2, 3], "b": [True, False, True], "c": ["a", "b", "c"]}
    )

    stream = io.BytesIO()
    source.write_avro(stream)
    stream.seek(0)
    selected = pl.read_avro(stream, columns=[1, 2])

    expected = pl.DataFrame({"b": [True, False, True], "c": ["a", "b", "c"]})
    assert_frame_equal(expected, selected)
71
72
73
def test_with_name() -> None:
    """The `name` given to `write_avro` shows up in the JSON found in the output."""
    stream = io.BytesIO()
    pl.DataFrame({"a": [1]}).write_avro(stream, name="my_schema_name")
    payload = stream.getvalue()

    # Cut out the text between the first "{" and the last "}" in the raw bytes
    # and parse it as JSON; the expected frame below describes its contents.
    start = payload.find(b"{")
    stop = payload.rfind(b"}") + 1
    parsed = pl.read_json(payload[start:stop])

    expected = pl.DataFrame(
        {
            "type": ["record"],
            "name": ["my_schema_name"],
            "fields": [[{"name": "a", "type": ["null", "long"]}]],
        }
    )
    assert_frame_equal(expected, parsed)
92
93