"""
# --8<-- [start:read_parquet]
import polars as pl

source = "s3://bucket/*.parquet"

df = pl.read_parquet(source)
# --8<-- [end:read_parquet]

# --8<-- [start:scan_parquet_query]
import polars as pl

source = "s3://bucket/*.parquet"

df = (
    pl.scan_parquet(source)
    .filter(pl.col("id") < 100)
    .select("id", "value")
    .collect()
)
# --8<-- [end:scan_parquet_query]

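# Editor's sketch (not one of the doc snippets): the lazy query above pushes the
# filter and the column selection into the scan, so Polars can skip
# non-matching row groups and download only the two requested columns. One way
# to check is to print the optimized plan with `LazyFrame.explain()`; the
# bucket path is a placeholder.
import polars as pl

query = (
    pl.scan_parquet("s3://bucket/*.parquet")
    .filter(pl.col("id") < 100)
    .select("id", "value")
)
print(query.explain())  # the filter/projection should appear at the scan node
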
# --8<-- [start:scan_parquet_storage_options_aws]
import polars as pl

source = "s3://bucket/*.parquet"

storage_options = {
    "aws_access_key_id": "<secret>",
    "aws_secret_access_key": "<secret>",
    "aws_region": "us-east-1",
}
df = pl.scan_parquet(source, storage_options=storage_options).collect()
# --8<-- [end:scan_parquet_storage_options_aws]

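# Editor's sketch: instead of hard-coding secrets, the same dict can be built
# from environment variables. The variable names below are the conventional
# AWS ones; adjust to your environment.
import os

import polars as pl

storage_options = {
    "aws_access_key_id": os.environ["AWS_ACCESS_KEY_ID"],
    "aws_secret_access_key": os.environ["AWS_SECRET_ACCESS_KEY"],
    "aws_region": os.environ.get("AWS_REGION", "us-east-1"),
}
df = pl.scan_parquet(
    "s3://bucket/*.parquet", storage_options=storage_options
).collect()
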
# --8<-- [start:credential_provider_class]
lf = pl.scan_parquet(
    "s3://.../...",
    credential_provider=pl.CredentialProviderAWS(
        profile_name="...",
        assume_role={
            "RoleArn": "...",
            "RoleSessionName": "...",
        },
    ),
)

df = lf.collect()
# --8<-- [end:credential_provider_class]

# --8<-- [start:credential_provider_class_global_default]
pl.Config.set_default_credential_provider(
    pl.CredentialProviderAWS(
        profile_name="...",
        assume_role={
            "RoleArn": "...",
            "RoleSessionName": "...",
        },
    )
)
# --8<-- [end:credential_provider_class_global_default]

# --8<-- [start:credential_provider_custom_func]
def get_credentials() -> pl.CredentialProviderFunctionReturn:
    expiry = None  # `None` means the credentials do not expire

    return {
        "aws_access_key_id": "...",
        "aws_secret_access_key": "...",
        "aws_session_token": "...",
    }, expiry


lf = pl.scan_parquet(
    "s3://.../...",
    credential_provider=get_credentials,
)

df = lf.collect()
# --8<-- [end:credential_provider_custom_func]

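# Editor's sketch of a provider that actually fetches temporary credentials,
# here via boto3's STS client (boto3 is an assumption; any credential source
# works). Returning the expiry as a Unix timestamp lets Polars re-invoke the
# function once the credentials lapse.
import boto3


def get_sts_credentials() -> pl.CredentialProviderFunctionReturn:
    creds = boto3.client("sts").get_session_token()["Credentials"]

    return {
        "aws_access_key_id": creds["AccessKeyId"],
        "aws_secret_access_key": creds["SecretAccessKey"],
        "aws_session_token": creds["SessionToken"],
    }, int(creds["Expiration"].timestamp())


df = pl.scan_parquet(
    "s3://.../...",
    credential_provider=get_sts_credentials,
).collect()
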
# --8<-- [start:credential_provider_custom_func_azure]
from azure.identity import DefaultAzureCredential


def credential_provider():
    credential = DefaultAzureCredential(exclude_managed_identity_credential=True)
    token = credential.get_token("https://storage.azure.com/.default")

    return {"bearer_token": token.token}, token.expires_on


pl.scan_parquet(
    "abfss://...@.../...",
    credential_provider=credential_provider,
)

# Note that for the above case, this shortcut is also available:

pl.scan_parquet(
    "abfss://...@.../...",
    credential_provider=pl.CredentialProviderAzure(
        credential=DefaultAzureCredential(exclude_managed_identity_credential=True)
    ),
)
# --8<-- [end:credential_provider_custom_func_azure]

# --8<-- [start:scan_pyarrow_dataset]
import polars as pl
import pyarrow.dataset as ds

dset = ds.dataset("s3://my-partitioned-folder/", format="parquet")
(
    pl.scan_pyarrow_dataset(dset)
    .filter(pl.col("foo") == "a")
    .select(["foo", "bar"])
    .collect()
)
# --8<-- [end:scan_pyarrow_dataset]

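# Editor's note: for hive-partitioned folders, recent Polars versions can also
# scan natively, without going through PyArrow, using the `hive_partitioning`
# parameter of `scan_parquet`. A sketch with a placeholder path:
import polars as pl

df = (
    pl.scan_parquet(
        "s3://my-partitioned-folder/**/*.parquet", hive_partitioning=True
    )
    .filter(pl.col("foo") == "a")
    .select(["foo", "bar"])
    .collect()
)
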
# --8<-- [start:write_parquet]
import polars as pl

df = pl.DataFrame(
    {
        "foo": ["a", "b", "c", "d", "d"],
        "bar": [1, 2, 3, 4, 5],
    }
)

destination = "s3://bucket/my_file.parquet"

df.write_parquet(destination)
# --8<-- [end:write_parquet]

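# Editor's note: cloud writes resolve credentials the same way reads do, and
# `write_parquet` also accepts `storage_options` (assuming a recent Polars
# version). A minimal sketch:
import polars as pl

df = pl.DataFrame({"foo": ["a", "b"], "bar": [1, 2]})
df.write_parquet(
    "s3://bucket/my_file.parquet",
    storage_options={"aws_region": "us-east-1"},
)
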
# --8<-- [start:write_file_object]
import gzip

import polars as pl
import s3fs

df = pl.DataFrame(
    {
        "foo": ["a", "b", "c", "d", "d"],
        "bar": [1, 2, 3, 4, 5],
    }
)

destination = "s3://bucket/my_file.csv.gz"

fs = s3fs.S3FileSystem()

with fs.open(destination, "wb") as cloud_f:
    with gzip.open(cloud_f, "w") as f:
        df.write_csv(f)
# --8<-- [end:write_file_object]
"""