Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/io/test_plugins.py
6939 views
1
from __future__ import annotations
2
3
from typing import TYPE_CHECKING
4
5
import polars as pl
6
from polars.io.plugins import register_io_source
7
from polars.testing import assert_frame_equal
8
9
if TYPE_CHECKING:
10
from collections.abc import Iterator
11
12
13
# A simple python source. But this can dispatch into a rust IO source as well.
def my_source(
    with_columns: list[str] | None,
    predicate: pl.Expr | None,
    _n_rows: int | None,
    _batch_size: int | None,
) -> Iterator[pl.DataFrame]:
    """Yield three single-row batches, honouring predicate and projection pushdown.

    Each batch has columns ``a`` and ``b`` holding the same value (1, 2, then 3).

    Parameters
    ----------
    with_columns
        Columns to keep in every batch, or ``None`` to keep all of them.
    predicate
        Row filter applied to every batch, or ``None`` for no filtering.
    _n_rows, _batch_size
        Accepted to satisfy the IO-source callback signature; unused here.
    """
    for value in range(1, 4):
        batch = pl.DataFrame({"a": [value], "b": [value]})
        # Filter before projecting, so the predicate can still reference
        # columns that the projection would drop.
        batch = batch if predicate is None else batch.filter(predicate)
        batch = batch if with_columns is None else batch.select(with_columns)
        yield batch
30
31
32
def scan_my_source() -> pl.LazyFrame:
    """Register ``my_source`` as an IO plugin and return it as a LazyFrame.

    The schema is fixed: two ``Int64`` columns, ``a`` and ``b``.
    """
    # schema inference logic
    # TODO: make lazy via callable
    return register_io_source(
        my_source,
        schema=pl.Schema({"a": pl.Int64(), "b": pl.Int64()}),
    )
38
39
40
def test_my_source() -> None:
    """Exercise the ``my_source`` IO plugin through ``scan_my_source``.

    Covers a plain scan, a predicate alone, a predicate combined with a
    projection, and a projection alone.
    """
    # Plain scan: all three batches with both columns.
    full_scan = scan_my_source()
    assert_frame_equal(
        full_scan.collect(),
        pl.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3]}),
    )

    # Predicate only: the row where b == 1 is removed.
    filtered = scan_my_source().filter(pl.col("b") > 1)
    assert_frame_equal(
        filtered.collect(),
        pl.DataFrame({"a": [2, 3], "b": [2, 3]}),
    )

    # Predicate on "b" combined with a projection down to "a".
    filtered_projected = scan_my_source().filter(pl.col("b") > 1).select("a")
    assert_frame_equal(
        filtered_projected.collect(),
        pl.DataFrame({"a": [2, 3]}),
    )

    # Projection only.
    projected = scan_my_source().select("a")
    assert_frame_equal(projected.collect(), pl.DataFrame({"a": [1, 2, 3]}))
55
56