from __future__ import annotations
import sys
import urllib.parse
import warnings
from collections import OrderedDict
from datetime import date, datetime
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING, Any
import pyarrow.parquet as pq
import pytest
import polars as pl
from polars.exceptions import ComputeError, SchemaFieldNotFoundError
from polars.testing import assert_frame_equal, assert_series_equal
from tests.unit.io.conftest import format_file_uri
if TYPE_CHECKING:
from collections.abc import Callable
from tests.conftest import PlMonkeyPatch
def impl_test_hive_partitioned_predicate_pushdown(
io_files_path: Path,
tmp_path: Path,
plmonkeypatch: PlMonkeyPatch,
) -> None:
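    # Shared body: write a pyarrow hive-partitioned dataset (category / fats_g)
    # and check schema resolution plus predicate pushdown on the hive columns.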
plmonkeypatch.setenv("POLARS_VERBOSE", "1")
df = pl.read_ipc(io_files_path / "*.ipc")
root = tmp_path / "partitioned_data"
pq.write_to_dataset(
df.to_arrow(),
root_path=root,
partition_cols=["category", "fats_g"],
)
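    # Without hive partitioning, only the columns physically present in the files are exposed.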
q = pl.scan_parquet(root / "**/*.parquet", hive_partitioning=False)
assert q.collect_schema().names() == ["calories", "sugars_g"]
assert q.collect().columns == ["calories", "sugars_g"]
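    # With hive partitioning, the partition columns are appended after the file columns.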
q = pl.scan_parquet(root / "**/*.parquet", hive_partitioning=True)
assert q.collect_schema().names() == ["calories", "sugars_g", "category", "fats_g"]
sort_by = ["fats_g", "category", "calories", "sugars_g"]
df = df.select(["calories", "sugars_g", "category", "fats_g"])
for streaming in [True, False]:
for pred in [
pl.col("category") == "vegetables",
pl.col("category") != "vegetables",
pl.col("fats_g") > 0.5,
(pl.col("fats_g") == 0.5) & (pl.col("category") == "vegetables"),
]:
assert_frame_equal(
q.filter(pred)
.sort(sort_by)
.collect(engine="streaming" if streaming else "in-memory"),
df.filter(pred).sort(sort_by),
)
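    # A predicate on a non-hive column still returns the matching rows; filtering a hive
    # column on a value with no matching partition yields an empty frame.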
assert q.filter(pl.col("sugars_g") == 25).collect().shape == (1, 4)
assert q.filter(pl.col("fats_g") == 1225.0).select("category").collect().shape == (
0,
1,
)
@pytest.mark.xdist_group("streaming")
@pytest.mark.write_disk
def test_hive_partitioned_predicate_pushdown(
io_files_path: Path,
tmp_path: Path,
plmonkeypatch: PlMonkeyPatch,
) -> None:
impl_test_hive_partitioned_predicate_pushdown(
io_files_path,
tmp_path,
plmonkeypatch,
)
@pytest.mark.xdist_group("streaming")
@pytest.mark.write_disk
def test_hive_partitioned_predicate_pushdown_single_threaded_async_17155(
io_files_path: Path,
tmp_path: Path,
plmonkeypatch: PlMonkeyPatch,
) -> None:
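    # Regression test for #17155: force the async reader with a prefetch size of 1.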
plmonkeypatch.setenv("POLARS_FORCE_ASYNC", "1")
plmonkeypatch.setenv("POLARS_PREFETCH_SIZE", "1")
impl_test_hive_partitioned_predicate_pushdown(
io_files_path,
tmp_path,
plmonkeypatch,
)
@pytest.mark.write_disk
@pytest.mark.may_fail_auto_streaming
@pytest.mark.may_fail_cloud
def test_hive_partitioned_predicate_pushdown_skips_correct_number_of_files(
tmp_path: Path, plmonkeypatch: PlMonkeyPatch, capfd: Any
) -> None:
plmonkeypatch.setenv("POLARS_VERBOSE", "1")
df = pl.DataFrame({"d": pl.arange(0, 5, eager=True)}).with_columns(
a=pl.col("d") % 5
)
root = tmp_path / "test_int_partitions"
df.write_parquet(
root,
use_pyarrow=True,
pyarrow_options={"partition_cols": ["a"]},
)
q = pl.scan_parquet(root / "**/*.parquet", hive_partitioning=True)
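    # Only a=1 and a=4 match, so 3 of the 5 partition files should be skipped.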
assert q.filter(pl.col("a").is_in([1, 4])).collect().shape == (2, 2)
assert "allows skipping 3 / 5" in capfd.readouterr().err
q = q.filter(pl.col("a").gt(2))
result = q.join(q, on="a", how="left").collect(
optimizations=pl.QueryOptFlags(comm_subplan_elim=True)
)
expected = {
"a": [3, 4],
"d": [3, 4],
"d_right": [3, 4],
}
assert result.to_dict(as_series=False) == expected
@pytest.mark.write_disk
def test_hive_streaming_pushdown_is_in_22212(tmp_path: Path) -> None:
(
pl.DataFrame({"x": range(5)}).write_parquet(
tmp_path,
partition_by="x",
)
)
lf = pl.scan_parquet(tmp_path, hive_partitioning=True).filter(
pl.col("x").is_in([1, 4])
)
assert_frame_equal(
lf.collect(
engine="streaming", optimizations=pl.QueryOptFlags(predicate_pushdown=False)
),
lf.collect(
engine="streaming", optimizations=pl.QueryOptFlags(predicate_pushdown=True)
),
)
@pytest.mark.xdist_group("streaming")
@pytest.mark.write_disk
@pytest.mark.parametrize("streaming", [True, False])
def test_hive_partitioned_slice_pushdown(
io_files_path: Path, tmp_path: Path, streaming: bool
) -> None:
df = pl.read_ipc(io_files_path / "*.ipc")
root = tmp_path / "partitioned_data"
warnings.filterwarnings("ignore")
pq.write_to_dataset(
df.to_arrow(),
root_path=root,
partition_cols=["category", "fats_g"],
)
q = pl.scan_parquet(root / "**/*.parquet", hive_partitioning=True)
schema = q.collect_schema()
expect_count = pl.select(pl.lit(1, dtype=pl.UInt32).alias(x) for x in schema)
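    # head(1) should yield exactly one row for every column, including the hive columns.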
assert_frame_equal(
q.head(1)
.collect(engine="streaming" if streaming else "in-memory")
.select(pl.all().len()),
expect_count,
)
assert q.head(0).collect(
engine="streaming" if streaming else "in-memory"
).columns == [
"calories",
"sugars_g",
"category",
"fats_g",
]
@pytest.mark.xdist_group("streaming")
@pytest.mark.write_disk
def test_hive_partitioned_projection_pushdown(
io_files_path: Path, tmp_path: Path
) -> None:
df = pl.read_ipc(io_files_path / "*.ipc")
root = tmp_path / "partitioned_data"
warnings.filterwarnings("ignore")
pq.write_to_dataset(
df.to_arrow(),
root_path=root,
partition_cols=["category", "fats_g"],
)
q = pl.scan_parquet(root / "**/*.parquet", hive_partitioning=True)
columns = ["sugars_g", "category"]
for streaming in [True, False]:
assert (
q.select(columns)
.collect(engine="streaming" if streaming else "in-memory")
.columns
== columns
)
for parallel in ("row_groups", "columns"):
q = pl.scan_parquet(
root / "**/*.parquet",
hive_partitioning=True,
parallel=parallel,
)
expected = q.collect().select("category")
result = q.select("category").collect()
assert_frame_equal(result, expected)
@pytest.mark.write_disk
def test_hive_partitioned_projection_skips_files(tmp_path: Path) -> None:
df = pl.DataFrame(
{"sqlver": [10012.0, 10013.0], "namespace": ["eos", "fda"], "a": [1, 2]}
)
root = tmp_path / "partitioned_data"
for dir_tuple, sub_df in df.partition_by(
["sqlver", "namespace"], include_key=False, as_dict=True
).items():
new_path = root / f"sqlver={dir_tuple[0]}" / f"namespace={dir_tuple[1]}"
new_path.mkdir(parents=True, exist_ok=True)
sub_df.write_parquet(new_path / "file=8484.parquet")
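    # Select the hive columns explicitly first, then everything else; the result must
    # still round-trip to the original frame.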
test_df = (
pl.scan_parquet(str(root) + "/**/**/*.parquet", hive_partitioning=True)
.select("sqlver", "namespace", "a", pl.exclude("sqlver", "namespace", "a"))
.collect()
)
assert_frame_equal(df, test_df)
@pytest.fixture
def dataset_path(tmp_path: Path) -> Path:
tmp_path.mkdir(exist_ok=True)
root = tmp_path / "dataset"
part1 = root / "c=1"
part2 = root / "c=2"
root.mkdir()
part1.mkdir()
part2.mkdir()
df1 = pl.DataFrame({"a": [1, 2], "b": [11.0, 12.0]})
df2 = pl.DataFrame({"a": [3, 4], "b": [13.0, 14.0]})
df3 = pl.DataFrame({"a": [5, 6], "b": [15.0, 16.0]})
df1.write_parquet(part1 / "one.parquet")
df2.write_parquet(part1 / "two.parquet")
df3.write_parquet(part2 / "three.parquet")
return root
@pytest.mark.write_disk
def test_scan_parquet_hive_schema(dataset_path: Path) -> None:
result = pl.scan_parquet(dataset_path / "**/*.parquet", hive_partitioning=True)
assert result.collect_schema() == OrderedDict(
{"a": pl.Int64, "b": pl.Float64, "c": pl.Int64}
)
result = pl.scan_parquet(
dataset_path / "**/*.parquet",
hive_partitioning=True,
hive_schema={"c": pl.Int32},
)
expected_schema = OrderedDict({"a": pl.Int64, "b": pl.Float64, "c": pl.Int32})
assert result.collect_schema() == expected_schema
assert result.collect().schema == expected_schema
@pytest.mark.write_disk
def test_read_parquet_invalid_hive_schema(dataset_path: Path) -> None:
with pytest.raises(
SchemaFieldNotFoundError,
match='path contains column not present in the given Hive schema: "c"',
):
pl.read_parquet(
dataset_path / "**/*.parquet",
hive_partitioning=True,
hive_schema={"nonexistent": pl.Int32},
)
def test_read_parquet_hive_schema_with_pyarrow(tmp_path: Path) -> None:
with pytest.raises(
TypeError,
match="cannot use `hive_partitions` with `use_pyarrow=True`",
):
pl.read_parquet(
tmp_path / "test.parquet", hive_schema={"c": pl.Int32}, use_pyarrow=True
)
@pytest.mark.parametrize(
("scan_func", "write_func"),
[
(pl.scan_parquet, pl.DataFrame.write_parquet),
(pl.scan_ipc, pl.DataFrame.write_ipc),
],
)
@pytest.mark.parametrize(
"glob",
[True, False],
)
def test_hive_partition_directory_scan(
tmp_path: Path,
scan_func: Callable[..., pl.LazyFrame],
write_func: Callable[[pl.DataFrame, Path], None],
glob: bool,
) -> None:
tmp_path.mkdir(exist_ok=True)
dfs = [
        pl.DataFrame({"x": 5 * [1], "a": 1, "b": 1}),
        pl.DataFrame({"x": 5 * [2], "a": 1, "b": 2}),
        pl.DataFrame({"x": 5 * [3], "a": 22, "b": 1}),
        pl.DataFrame({"x": 5 * [4], "a": 22, "b": 2}),
]
for df in dfs:
a = df.item(0, "a")
b = df.item(0, "b")
path = tmp_path / f"a={a}/b={b}/data.bin"
path.parent.mkdir(exist_ok=True, parents=True)
write_func(df.drop("a", "b"), path)
df = pl.concat(dfs)
hive_schema = df.lazy().select("a", "b").collect_schema()
scan = scan_func
if scan_func is pl.scan_parquet:
scan = partial(scan, glob=glob)
    scan_with_hive_schema = partial(scan, hive_schema=hive_schema)
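    # Scanning the dataset root with hive partitioning should recover all columns.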
out = scan_with_hive_schema(
tmp_path,
hive_partitioning=True,
).collect()
assert_frame_equal(out, df)
out = scan(tmp_path, hive_partitioning=False).collect()
assert_frame_equal(out, df.drop("a", "b"))
out = scan_with_hive_schema(
tmp_path / "a=1",
hive_partitioning=True,
).collect()
assert_frame_equal(out, df.filter(a=1).drop("a"))
out = scan(tmp_path / "a=1", hive_partitioning=False).collect()
assert_frame_equal(out, df.filter(a=1).drop("a", "b"))
path = tmp_path / "a=1/b=1/data.bin"
out = scan_with_hive_schema(path, hive_partitioning=True).collect()
assert_frame_equal(out, dfs[0])
out = scan(path, hive_partitioning=False).collect()
assert_frame_equal(out, dfs[0].drop("a", "b"))
out = scan_with_hive_schema(tmp_path).collect()
assert_frame_equal(out, df)
out = scan(tmp_path / "a=1/b=1/data.bin").collect()
assert out.columns == ["x"]
out = scan([tmp_path / "a=1/", tmp_path / "a=22/"]).collect()
assert out.columns == ["x"]
out = scan([tmp_path / "a=1/", tmp_path / "a=22/b=1/data.bin"]).collect()
assert out.columns == ["x"]
if glob:
out = scan(tmp_path / "a=1/**/*.bin").collect()
assert out.columns == ["x"]
out = scan_with_hive_schema(tmp_path, hive_partitioning=True).collect()
assert_frame_equal(out, df)
out = scan_with_hive_schema(
[tmp_path / "a=1", tmp_path / "a=22"], hive_partitioning=True
).collect()
assert_frame_equal(out, df.drop("a"))
with pytest.raises(
pl.exceptions.InvalidOperationError,
match="attempted to read from different directory levels with hive partitioning enabled:",
):
scan_with_hive_schema(
[tmp_path / "a=1", tmp_path / "a=22/b=1"], hive_partitioning=True
).collect()
if glob:
out = scan_with_hive_schema(
tmp_path / "**/*.bin", hive_partitioning=True
).collect()
assert_frame_equal(out, df)
out = scan_with_hive_schema(
[tmp_path / "a=1/**/*.bin", tmp_path / "a=22/**/*.bin"],
hive_partitioning=True,
).collect()
assert_frame_equal(out, df)
out = scan_with_hive_schema(
tmp_path / "a=1/b=1/data.bin", hive_partitioning=True
).collect()
assert_frame_equal(out, df.filter(a=1, b=1))
out = scan_with_hive_schema(
[tmp_path / "a=1/b=1/data.bin", tmp_path / "a=22/b=1/data.bin"],
hive_partitioning=True,
).collect()
assert_frame_equal(
out,
df.filter(
((pl.col("a") == 1) & (pl.col("b") == 1))
| ((pl.col("a") == 22) & (pl.col("b") == 1))
),
)
out = scan(tmp_path, hive_partitioning=False).collect()
assert_frame_equal(out, df.drop("a", "b"))
if glob:
out = scan(tmp_path / "**/*.bin", hive_partitioning=False).collect()
assert_frame_equal(out, df.drop("a", "b"))
out = scan(tmp_path / "a=1/b=1/data.bin", hive_partitioning=False).collect()
assert_frame_equal(out, df.filter(a=1, b=1).drop("a", "b"))
def test_hive_partition_schema_inference(tmp_path: Path) -> None:
tmp_path.mkdir(exist_ok=True)
dfs = [
pl.DataFrame({"x": 1}),
pl.DataFrame({"x": 2}),
pl.DataFrame({"x": 3}),
]
paths = [
tmp_path / "a=1/data.bin",
tmp_path / "a=1.5/data.bin",
tmp_path / "a=polars/data.bin",
]
expected = [
pl.Series("a", [1], dtype=pl.Int64),
pl.Series("a", [1.0, 1.5], dtype=pl.Float64),
pl.Series("a", ["1", "1.5", "polars"], dtype=pl.String),
]
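    # Each new partition value widens the inferred hive dtype: Int64 -> Float64 -> String.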
for i in range(3):
paths[i].parent.mkdir(exist_ok=True, parents=True)
dfs[i].write_parquet(paths[i])
out = pl.scan_parquet(tmp_path).sort("x").collect()
assert_series_equal(out["a"], expected[i])
@pytest.mark.write_disk
def test_hive_partition_force_async_17155(
tmp_path: Path, plmonkeypatch: PlMonkeyPatch
) -> None:
plmonkeypatch.setenv("POLARS_FORCE_ASYNC", "1")
plmonkeypatch.setenv("POLARS_PREFETCH_SIZE", "1")
dfs = [
pl.DataFrame({"x": 1}),
pl.DataFrame({"x": 2}),
pl.DataFrame({"x": 3}),
]
paths = [
tmp_path / "a=1/b=1/data.bin",
tmp_path / "a=2/b=2/data.bin",
tmp_path / "a=3/b=3/data.bin",
]
for i in range(3):
paths[i].parent.mkdir(exist_ok=True, parents=True)
dfs[i].write_parquet(paths[i])
lf = pl.scan_parquet(tmp_path)
assert_frame_equal(
lf.collect(), pl.DataFrame({k: [1, 2, 3] for k in ["x", "a", "b"]})
)
@pytest.mark.parametrize(
("scan_func", "write_func"),
[
(partial(pl.scan_parquet, parallel="row_groups"), pl.DataFrame.write_parquet),
(partial(pl.scan_parquet, parallel="columns"), pl.DataFrame.write_parquet),
(partial(pl.scan_parquet, parallel="prefiltered"), pl.DataFrame.write_parquet),
(
lambda *a, **kw: pl.scan_parquet(*a, parallel="prefiltered", **kw).filter(
pl.col("b") == pl.col("b")
),
pl.DataFrame.write_parquet,
),
(pl.scan_ipc, pl.DataFrame.write_ipc),
],
)
@pytest.mark.write_disk
@pytest.mark.slow
@pytest.mark.parametrize("projection_pushdown", [True, False])
def test_hive_partition_columns_contained_in_file(
tmp_path: Path,
scan_func: Callable[[Any], pl.LazyFrame],
write_func: Callable[[pl.DataFrame, Path], None],
projection_pushdown: bool,
) -> None:
path = tmp_path / "a=1/b=2/data.bin"
path.parent.mkdir(exist_ok=True, parents=True)
df = pl.DataFrame(
{"x": 1, "a": 1, "b": 2, "y": 1},
schema={"x": pl.Int32, "a": pl.Int8, "b": pl.Int16, "y": pl.Int32},
)
write_func(df, path)
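    # Helper: check every prefix permutation of the projected columns.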
def assert_with_projections(
lf: pl.LazyFrame, df: pl.DataFrame, *, row_index: str | None = None
) -> None:
        row_index_cols: list[str] = [row_index] if row_index is not None else []
        from itertools import permutations
        cols = ["a", "b", "x", "y", *row_index_cols]
for projection in (
x for i in range(len(cols)) for x in permutations(cols[: 1 + i])
):
assert_frame_equal(
lf.select(projection).collect(
optimizations=pl.QueryOptFlags(
projection_pushdown=projection_pushdown
)
),
df.select(projection),
)
lf = scan_func(path, hive_partitioning=True)
rhs = df
assert_frame_equal(
lf.collect(
optimizations=pl.QueryOptFlags(projection_pushdown=projection_pushdown)
),
rhs,
)
assert_with_projections(lf, rhs)
lf = scan_func(
path,
hive_schema={"a": pl.String, "b": pl.String},
hive_partitioning=True,
)
rhs = df.with_columns(pl.col("a", "b").cast(pl.String))
assert_frame_equal(
lf.collect(
optimizations=pl.QueryOptFlags(projection_pushdown=projection_pushdown)
),
rhs,
)
assert_with_projections(lf, rhs)
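    # A second file omits "a"; the missing column must be filled from the hive path.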
partial_path = tmp_path / "a=1/b=2/partial_data.bin"
df = pl.DataFrame(
{"x": 1, "b": 2, "y": 1},
schema={"x": pl.Int32, "b": pl.Int16, "y": pl.Int32},
)
write_func(df, partial_path)
rhs = rhs.select(
pl.col("x").cast(pl.Int32),
pl.col("b").cast(pl.Int16),
pl.col("y").cast(pl.Int32),
pl.col("a").cast(pl.Int64),
)
lf = scan_func(partial_path, hive_partitioning=True)
assert_frame_equal(
lf.collect(
optimizations=pl.QueryOptFlags(projection_pushdown=projection_pushdown)
),
rhs,
)
assert_with_projections(lf, rhs)
assert_frame_equal(
lf.with_row_index().collect(
optimizations=pl.QueryOptFlags(projection_pushdown=projection_pushdown)
),
rhs.with_row_index(),
)
assert_with_projections(
lf.with_row_index(), rhs.with_row_index(), row_index="index"
)
assert_frame_equal(
lf.with_row_index()
.select(pl.exclude("index"), "index")
.collect(
optimizations=pl.QueryOptFlags(projection_pushdown=projection_pushdown)
),
rhs.with_row_index().select(pl.exclude("index"), "index"),
)
assert_with_projections(
lf.with_row_index().select(pl.exclude("index"), "index"),
rhs.with_row_index().select(pl.exclude("index"), "index"),
row_index="index",
)
lf = scan_func(
partial_path,
hive_schema={"a": pl.String, "b": pl.String},
hive_partitioning=True,
)
rhs = rhs.select(
pl.col("x").cast(pl.Int32),
pl.col("b").cast(pl.String),
pl.col("y").cast(pl.Int32),
pl.col("a").cast(pl.String),
)
assert_frame_equal(
lf.collect(
optimizations=pl.QueryOptFlags(projection_pushdown=projection_pushdown)
),
rhs,
)
assert_with_projections(lf, rhs)
@pytest.mark.write_disk
def test_hive_partition_dates(tmp_path: Path) -> None:
df = pl.DataFrame(
{
"date1": [
datetime(2024, 1, 1),
datetime(2024, 2, 1),
datetime(2024, 3, 1),
None,
],
"date2": [
datetime(2023, 1, 1),
datetime(2023, 2, 1),
None,
datetime(2023, 3, 1),
],
"x": [1, 2, 3, 4],
},
schema={"date1": pl.Date, "date2": pl.Datetime, "x": pl.Int32},
)
root = tmp_path / "pyarrow"
pq.write_to_dataset(
df.to_arrow(),
root_path=root,
partition_cols=["date1", "date2"],
)
lf = pl.scan_parquet(
root, hive_schema=df.clear().select("date1", "date2").collect_schema()
)
assert_frame_equal(lf.collect(), df.select("x", "date1", "date2"))
lf = pl.scan_parquet(root)
assert_frame_equal(lf.collect(), df.select("x", "date1", "date2"))
lf = pl.scan_parquet(root, try_parse_hive_dates=False)
assert_frame_equal(
lf.collect(),
df.select("x", "date1", "date2").with_columns(
pl.col("date1", "date2").cast(pl.String)
),
)
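    # Rebuild the dataset by hand so the hive columns are also stored inside the files,
    # optionally percent-escaping the datetime values in the directory names.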
for perc_escape in [True, False] if sys.platform != "win32" else [True]:
root = tmp_path / f"includes_hive_cols_in_file_{perc_escape}"
for (date1, date2), part_df in df.group_by(
pl.col("date1").cast(pl.String).fill_null("__HIVE_DEFAULT_PARTITION__"),
pl.col("date2").cast(pl.String).fill_null("__HIVE_DEFAULT_PARTITION__"),
):
if perc_escape:
date2 = urllib.parse.quote(date2)
path = root / f"date1={date1}/date2={date2}/data.bin"
path.parent.mkdir(exist_ok=True, parents=True)
part_df.write_parquet(path)
lf = pl.scan_parquet(root)
assert_frame_equal(lf.collect(), df)
lf = pl.scan_parquet(root, try_parse_hive_dates=False)
assert_frame_equal(
lf.collect(),
df.with_columns(pl.col("date1", "date2").cast(pl.String)),
)
@pytest.mark.write_disk
def test_hive_partition_filter_null_23005(tmp_path: Path) -> None:
root = tmp_path
df = pl.DataFrame(
{
"date1": [
datetime(2024, 1, 1),
datetime(2024, 2, 1),
datetime(2024, 3, 1),
None,
],
"date2": [
datetime(2023, 1, 1),
datetime(2023, 2, 1),
None,
datetime(2023, 3, 1),
],
"x": [1, 2, 3, 4],
},
schema={"date1": pl.Date, "date2": pl.Datetime, "x": pl.Int32},
)
df.write_parquet(root, partition_by=["date1", "date2"])
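    # Null partition values are written as __HIVE_DEFAULT_PARTITION__ and must stay filterable.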
q = pl.scan_parquet(root, include_file_paths="path")
full = q.collect()
assert (
full.select(
(
pl.any_horizontal(pl.col("date1", "date2").is_null())
& pl.col("path").str.contains("__HIVE_DEFAULT_PARTITION__")
).sum()
).item()
== 2
)
lf = pl.scan_parquet(root).filter(pl.col("date1") == datetime(2024, 1, 1))
assert_frame_equal(lf.collect(), df.head(1))
@pytest.mark.parametrize(
("scan_func", "write_func"),
[
(pl.scan_parquet, pl.DataFrame.write_parquet),
(pl.scan_ipc, pl.DataFrame.write_ipc),
],
)
@pytest.mark.write_disk
def test_projection_only_hive_parts_gives_correct_number_of_rows(
tmp_path: Path,
scan_func: Callable[[Any], pl.LazyFrame],
write_func: Callable[[pl.DataFrame, Path], None],
) -> None:
path = tmp_path / "a=3/data.bin"
path.parent.mkdir(exist_ok=True, parents=True)
write_func(pl.DataFrame({"x": [1, 1, 1]}), path)
assert_frame_equal(
scan_func(path, hive_partitioning=True).select("a").collect(),
pl.DataFrame({"a": [3, 3, 3]}),
)
@pytest.mark.parametrize(
"df",
[
pl.select(
pl.Series("a", [1, 2, 3, 4], dtype=pl.Int8),
pl.Series("b", [1, 2, 3, 4], dtype=pl.Int8),
pl.Series("x", [1, 2, 3, 4]),
),
pl.select(
pl.Series(
"a",
[1.2981275, 2.385974035, 3.1231892749185718397510, 4.129387128949156],
dtype=pl.Float64,
),
pl.Series("b", ["a", "b", " / c = : ", "d"]),
pl.Series("x", [1, 2, 3, 4]),
),
],
)
@pytest.mark.write_disk
def test_hive_write(tmp_path: Path, df: pl.DataFrame) -> None:
root = tmp_path
df.write_parquet(root, partition_by=["a", "b"])
lf = pl.scan_parquet(root)
assert_frame_equal(lf.collect(), df)
lf = pl.scan_parquet(root, hive_schema={"a": pl.String, "b": pl.String})
assert_frame_equal(lf.collect(), df.with_columns(pl.col("a", "b").cast(pl.String)))
@pytest.mark.slow
@pytest.mark.write_disk
def test_hive_write_multiple_files(tmp_path: Path) -> None:
chunk_size = 1
n_rows = 500_000
df = pl.select(a=pl.repeat(0, n_rows), b=pl.int_range(0, n_rows))
root = tmp_path
df.write_parquet(root, partition_by="a", partition_chunk_size_bytes=chunk_size)
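    # A 1-byte chunk size forces the single partition to be split across multiple files.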
n_out = sum(1 for _ in (root / "a=0").iterdir())
assert n_out == 5
assert_frame_equal(pl.scan_parquet(root).collect(), df)
@pytest.mark.write_disk
def test_hive_write_dates(tmp_path: Path) -> None:
df = pl.DataFrame(
{
"date1": [
datetime(2024, 1, 1),
datetime(2024, 2, 1),
datetime(2024, 3, 1),
None,
],
"date2": [
datetime(2023, 1, 1),
datetime(2023, 2, 1),
None,
datetime(2023, 3, 1, 1, 1, 1, 1),
],
"x": [1, 2, 3, 4],
},
schema={"date1": pl.Date, "date2": pl.Datetime, "x": pl.Int32},
)
root = tmp_path
df.write_parquet(root, partition_by=["date1", "date2"])
lf = pl.scan_parquet(root)
assert_frame_equal(lf.collect(), df)
lf = pl.scan_parquet(root, try_parse_hive_dates=False)
assert_frame_equal(
lf.collect(),
df.with_columns(pl.col("date1", "date2").cast(pl.String)),
)
@pytest.mark.write_disk
@pytest.mark.may_fail_auto_streaming
@pytest.mark.may_fail_cloud
def test_hive_predicate_dates_14712(
tmp_path: Path, plmonkeypatch: PlMonkeyPatch, capfd: Any
) -> None:
plmonkeypatch.setenv("POLARS_VERBOSE", "1")
pl.DataFrame({"a": [datetime(2024, 1, 1)]}).write_parquet(
tmp_path, partition_by="a"
)
pl.scan_parquet(tmp_path).filter(pl.col("a") != datetime(2024, 1, 1)).collect()
assert "allows skipping 1 / 1" in capfd.readouterr().err
@pytest.mark.skipif(sys.platform != "win32", reason="Test is only for Windows paths")
@pytest.mark.write_disk
@pytest.mark.parametrize("prefix", ["", "file:/", "file:///"])
def test_hive_windows_splits_on_forward_slashes(tmp_path: Path, prefix: str) -> None:
tmp_path = tmp_path.resolve()
d = str(tmp_path)[:2]
assert d[0].isalpha()
assert d[1] == ":"
path = f"{tmp_path}/a=1/b=1/c=1/d=1/e=1"
Path(path).mkdir(exist_ok=True, parents=True)
df = pl.DataFrame({"x": "x"})
df.write_parquet(f"{path}/data.parquet")
expect = pl.DataFrame(
[
s.new_from_index(0, 5)
for s in pl.DataFrame(
{
"x": "x",
"a": 1,
"b": 1,
"c": 1,
"d": 1,
"e": 1,
}
)
]
)
assert_frame_equal(
pl.scan_parquet(
[
f"{prefix}{tmp_path}/a=1/b=1/c=1/d=1/e=1/data.parquet",
f"{prefix}{tmp_path}\\a=1\\b=1\\c=1\\d=1\\e=1\\data.parquet",
f"{prefix}{tmp_path}\\a=1/b=1/c=1/d=1/**/*",
f"{prefix}{tmp_path}/a=1/b=1\\c=1/d=1/**/*",
f"{prefix}{tmp_path}/a=1/b=1/c=1/d=1\\e=1/*",
],
hive_partitioning=True,
).collect(),
expect,
)
q = pl.scan_parquet("file://C:/")
with pytest.raises(
ComputeError, match="unsupported: non-empty hostname for 'file:' URI: 'C:'"
):
q.collect()
@pytest.mark.write_disk
def test_passing_hive_schema_with_hive_partitioning_disabled_raises(
tmp_path: Path,
) -> None:
with pytest.raises(
ComputeError,
match="a hive schema was given but hive_partitioning was disabled",
):
pl.scan_parquet(
tmp_path,
schema={"x": pl.Int64},
hive_schema={"h": pl.String},
hive_partitioning=False,
).collect()
@pytest.mark.write_disk
def test_hive_auto_enables_when_unspecified_and_hive_schema_passed(
tmp_path: Path,
) -> None:
tmp_path.mkdir(exist_ok=True)
(tmp_path / "a=1").mkdir(exist_ok=True)
pl.DataFrame({"x": 1}).write_parquet(tmp_path / "a=1/1")
for path in [tmp_path / "a=1/1", tmp_path / "**/*"]:
lf = pl.scan_parquet(path, hive_schema={"a": pl.UInt8})
assert_frame_equal(
lf.collect(),
pl.select(
pl.Series("x", [1]),
pl.Series("a", [1], dtype=pl.UInt8),
),
)
@pytest.mark.write_disk
def test_hive_file_as_uri_with_hive_start_idx_23830(
tmp_path: Path,
) -> None:
tmp_path.mkdir(exist_ok=True)
(tmp_path / "a=1").mkdir(exist_ok=True)
pl.DataFrame({"x": 1}).write_parquet(tmp_path / "a=1/1")
uri = tmp_path.resolve().as_posix().rstrip("/") + "/"
lf = pl.scan_parquet(format_file_uri(uri), hive_schema={"a": pl.UInt8})
assert_frame_equal(
lf.collect(),
pl.select(
pl.Series("x", [1]),
pl.Series("a", [1], dtype=pl.UInt8),
),
)
lf = pl.scan_parquet(f"file:{uri}", hive_schema={"a": pl.UInt8})
assert_frame_equal(
lf.collect(),
pl.select(
pl.Series("x", [1]),
pl.Series("a", [1], dtype=pl.UInt8),
),
)
@pytest.mark.write_disk
@pytest.mark.parametrize("force_single_thread", [True, False])
def test_hive_parquet_prefiltered_20894_21327(
tmp_path: Path, force_single_thread: bool
) -> None:
n_threads = 1 if force_single_thread else pl.thread_pool_size()
file_path = tmp_path / "date=2025-01-01/00000000.parquet"
file_path.parent.mkdir(exist_ok=True, parents=True)
data = pl.DataFrame(
{
"date": [date(2025, 1, 1), date(2025, 1, 1)],
"value": ["1", "2"],
}
)
data.write_parquet(file_path)
import base64
import subprocess
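    # Run the scan in a child process so POLARS_MAX_THREADS takes effect before polars is imported.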
scan_path_b64 = base64.b64encode(str(file_path.absolute()).encode()).decode()
out = subprocess.check_output(
[
sys.executable,
"-c",
f"""\
import os
os.environ["POLARS_MAX_THREADS"] = "{n_threads}"
import polars as pl
import datetime
import base64
from polars.testing import assert_frame_equal
assert pl.thread_pool_size() == {n_threads}
tmp_path = base64.b64decode("{scan_path_b64}").decode()
df = pl.scan_parquet(tmp_path, hive_partitioning=True).filter(pl.col("value") == "1").collect()
# We need the str() to trigger panic on invalid state
str(df)
assert_frame_equal(df, pl.DataFrame(
[
pl.Series('date', [datetime.date(2025, 1, 1)], dtype=pl.Date),
pl.Series('value', ['1'], dtype=pl.String),
]
))
print("OK", end="")
""",
],
)
assert out == b"OK"
def test_hive_decode_reserved_ascii_23241(tmp_path: Path) -> None:
partitioned_tbl_uri = (tmp_path / "partitioned_data").resolve()
start, stop = 32, 127
df = pl.DataFrame(
{
"a": list(range(start, stop)),
"strings": [chr(i) for i in range(start, stop)],
}
)
df.write_delta(partitioned_tbl_uri, delta_write_options={"partition_by": "strings"})
out = pl.read_delta(str(partitioned_tbl_uri)).sort("a").select(pl.col("strings"))
assert_frame_equal(df.sort(by=pl.col("a")).select(pl.col("strings")), out)
def test_hive_decode_utf8_23241(tmp_path: Path) -> None:
df = pl.DataFrame(
{
"strings": [
"Türkiye And Egpyt",
"résumé père forêt Noël",
"😊",
"北极熊",
],
"a": [10, 20, 30, 40],
}
)
partitioned_tbl_uri = (tmp_path / "partitioned_data").resolve()
df.write_delta(partitioned_tbl_uri, delta_write_options={"partition_by": "strings"})
out = pl.read_delta(str(partitioned_tbl_uri)).sort("a").select(pl.col("strings"))
assert_frame_equal(df.sort(by=pl.col("a")).select(pl.col("strings")), out)
@pytest.mark.write_disk
def test_hive_filter_lit_true_24235(tmp_path: Path) -> None:
df = pl.DataFrame({"p": [1, 2, 3, 4, 5], "x": [1, 1, 2, 2, 3]})
    df.lazy().sink_parquet(pl.PartitionByKey(tmp_path, by="p"), mkdir=True)
assert_frame_equal(
pl.scan_parquet(tmp_path).filter(True).collect(),
df,
)
assert_frame_equal(
pl.scan_parquet(tmp_path).filter(pl.lit(True)).collect(),
df,
)
assert_frame_equal(
pl.scan_parquet(tmp_path).filter(False).collect(),
df.clear(),
)
assert_frame_equal(
pl.scan_parquet(tmp_path).filter(pl.lit(False)).collect(),
df.clear(),
)
def test_hive_filter_in_ir(
tmp_path: Path, plmonkeypatch: PlMonkeyPatch, capfd: pytest.CaptureFixture[str]
) -> None:
(tmp_path / "a=1").mkdir()
pl.DataFrame({"x": [0, 1, 2, 3, 4]}).write_parquet(tmp_path / "a=1/data.parquet")
(tmp_path / "a=2").mkdir()
pl.DataFrame({"x": [5, 6, 7, 8, 9]}).write_parquet(tmp_path / "a=2/data.parquet")
with plmonkeypatch.context() as cx:
cx.setenv("POLARS_VERBOSE", "1")
capfd.readouterr()
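        # Exactly one of the two hive partitions matches, so one file should be skipped.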
assert_frame_equal(
pl.scan_parquet(tmp_path).filter(pl.col("a") == 1).collect(),
pl.DataFrame({"x": [0, 1, 2, 3, 4], "a": [1, 1, 1, 1, 1]}),
)
capture = capfd.readouterr().err
assert (
capture.count(
"initialize_scan_predicate: Predicate pushdown allows skipping 1 / 2 files"
)
== 1
)
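    # A predicate that excludes every partition should prune the scan to an empty file list.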
plan = pl.scan_parquet(tmp_path).filter(pl.col("a") < 0).explain()
assert plan.startswith("Parquet SCAN []")
assert_frame_equal(
pl.scan_parquet(tmp_path).with_row_index().filter(pl.col("a") == 2).collect(),
pl.DataFrame(
{"index": [5, 6, 7, 8, 9], "x": [5, 6, 7, 8, 9], "a": [2, 2, 2, 2, 2]},
schema_overrides={"index": pl.get_index_type()},
),
)
assert_frame_equal(
pl.scan_parquet(tmp_path).tail(1).filter(pl.col("a") == 1).collect(),
pl.DataFrame(schema={"x": pl.Int64, "a": pl.Int64}),
)