Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/docs/source/src/python/user-guide/io/hive.py
7890 views
1
# --8<-- [start:init_paths]
import polars as pl
from pathlib import Path

# Example frames, one per hive partition. Row counts differ on purpose so the
# partitioned scan output is easy to tell apart per partition.
dfs = [
    pl.DataFrame({"x": [1, 2]}),
    pl.DataFrame({"x": [3, 4, 5]}),
    pl.DataFrame({"x": [6, 7]}),
    pl.DataFrame({"x": [8, 9, 10, 11]}),
]

# Hive-style partition directories: `key=value` path segments.
parts = [
    "year=2023/month=11",
    "year=2023/month=12",
    "year=2024/month=01",
    "year=2024/month=02",
]

for df, part in zip(dfs, parts):
    # `path` is already a Path (built with the `/` operator), so use its
    # `.parent` directly instead of re-wrapping it in Path().
    path = Path("docs/assets/data/hive/") / part / "data.parquet"
    path.parent.mkdir(exist_ok=True, parents=True)
    df.write_parquet(path)

    # A second copy of the dataset that will also contain a non-parquet file.
    path = Path("docs/assets/data/hive_mixed/") / part / "data.parquet"
    path.parent.mkdir(exist_ok=True, parents=True)
    df.write_parquet(path)

# Make sure the file is not empty because path expansion ignores empty files.
Path("docs/assets/data/hive_mixed/description.txt").write_text("A")
32
def print_paths(path: str) -> None:
    """Print every file found under *path* as a sorted, minimally formatted table."""

    def iter_files(p: Path):
        # Yield plain files; recurse into directories.
        if not p.is_dir():
            yield p
            return
        for child in p.iterdir():
            yield from iter_files(child)

    file_paths = pl.Series(
        "File path",
        (str(f) for f in iter_files(Path(path))),
        dtype=pl.String,
    )
    df = file_paths.sort().to_frame()

    # Suppress dtype/shape decorations and avoid truncating long paths.
    with pl.Config(
        tbl_hide_column_data_types=True,
        tbl_hide_dataframe_shape=True,
        fmt_str_lengths=999,
    ):
        print(df)
print_paths("docs/assets/data/hive/")
59
# --8<-- [end:init_paths]
60
61
# --8<-- [start:show_mixed_paths]
62
print_paths("docs/assets/data/hive_mixed/")
63
# --8<-- [end:show_mixed_paths]
64
65
# --8<-- [start:scan_dir]
import polars as pl

lf = pl.scan_parquet("docs/assets/data/hive/")
df = lf.collect()

with pl.Config(tbl_rows=99):
    print(df)
# --8<-- [end:scan_dir]
# --8<-- [start:scan_dir_err]
from pathlib import Path

# Scanning the mixed directory raises; show the error instead of crashing.
try:
    pl.scan_parquet("docs/assets/data/hive_mixed/").collect()
except Exception as exc:
    print(exc)

# --8<-- [end:scan_dir_err]
# --8<-- [start:scan_glob]
df = pl.scan_parquet(
    # Glob to match all files ending in `.parquet`
    "docs/assets/data/hive_mixed/**/*.parquet",
    # Request hive-partition handling for the matched paths.
    hive_partitioning=True,
).collect()

with pl.Config(tbl_rows=99):
    print(df)

# --8<-- [end:scan_glob]
# --8<-- [start:scan_file_no_hive]
# Scan explicit file paths without enabling hive_partitioning
# (contrast with the next snippet, which passes hive_partitioning=True).
df = pl.scan_parquet(
    [
        "docs/assets/data/hive/year=2024/month=01/data.parquet",
        "docs/assets/data/hive/year=2024/month=02/data.parquet",
    ],
).collect()

print(df)

# --8<-- [end:scan_file_no_hive]
# --8<-- [start:scan_file_hive]
# Same explicit file list as above, but with hive_partitioning enabled.
df = pl.scan_parquet(
    [
        "docs/assets/data/hive/year=2024/month=01/data.parquet",
        "docs/assets/data/hive/year=2024/month=02/data.parquet",
    ],
    hive_partitioning=True,
).collect()

print(df)

# --8<-- [end:scan_file_hive]
# --8<-- [start:write_parquet_partitioned_show_data]
122
df = pl.DataFrame({"a": [1, 1, 2, 2, 3], "b": [1, 1, 1, 2, 2], "c": 1})
123
print(df)
124
# --8<-- [end:write_parquet_partitioned_show_data]
125
126
# --8<-- [start:write_parquet_partitioned]
127
df.write_parquet("docs/assets/data/hive_write/", partition_by=["a", "b"])
128
# --8<-- [end:write_parquet_partitioned]
129
130
# --8<-- [start:write_parquet_partitioned_show_paths]
131
print_paths("docs/assets/data/hive_write/")
132
# --8<-- [end:write_parquet_partitioned_show_paths]
133
134