"""
# --8<-- [start:read_parquet]
import polars as pl

source = "s3://bucket/*.parquet"

df = pl.read_parquet(source)
# --8<-- [end:read_parquet]
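
# The same call works for the other supported object stores; a minimal
# sketch assuming an equivalent bucket on Google Cloud Storage (the
# "gs://" scheme) rather than S3:

df = pl.read_parquet("gs://bucket/*.parquet")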

# --8<-- [start:scan_parquet_query]
import polars as pl

source = "s3://bucket/*.parquet"

df = pl.scan_parquet(source).filter(pl.col("id") < 100).select("id", "value").collect()
# --8<-- [end:scan_parquet_query]
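
# For datasets larger than memory, the same lazy query can be run on the
# streaming engine instead of the default in-memory engine. A sketch,
# assuming a recent Polars version where `collect(engine="streaming")`
# is available:

df = (
    pl.scan_parquet(source)
    .filter(pl.col("id") < 100)
    .select("id", "value")
    .collect(engine="streaming")
)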

# --8<-- [start:storage_options_retry_configuration]
import polars as pl

# Retry behavior for cloud requests can be tuned through `storage_options`.
# The values below are arbitrary and purely illustrative, not recommendations.
pl.scan_parquet(
    "s3://bucket/*.parquet",
    storage_options={
        "max_retries": 3,
        "retry_timeout_ms": 9873,
        "retry_init_backoff_ms": 9874,
        "retry_max_backoff_ms": 9875,
        "retry_base_multiplier": 3.14159,
    },
)
# --8<-- [end:storage_options_retry_configuration]

# --8<-- [start:scan_parquet_storage_options_aws]
import polars as pl

source = "s3://bucket/*.parquet"

storage_options = {
    "aws_access_key_id": "<secret>",
    "aws_secret_access_key": "<secret>",
    "aws_region": "us-east-1",
}
df = pl.scan_parquet(source, storage_options=storage_options).collect()
# --8<-- [end:scan_parquet_storage_options_aws]
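
# Equivalent options exist for the other object stores. A sketch for Azure,
# assuming the `azure_storage_account_name` / `azure_storage_account_key`
# keys accepted by the underlying object store:

storage_options = {
    "azure_storage_account_name": "<account>",
    "azure_storage_account_key": "<secret>",
}
df = pl.scan_parquet("abfss://...@.../...", storage_options=storage_options).collect()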

# --8<-- [start:credential_provider_class]
lf = pl.scan_parquet(
    "s3://.../...",
    credential_provider=pl.CredentialProviderAWS(
        profile_name="...",
        assume_role={
            "RoleArn": "...",
            "RoleSessionName": "...",
        },
    ),
)

df = lf.collect()
# --8<-- [end:credential_provider_class]

# --8<-- [start:credential_provider_class_global_default]
pl.Config.set_default_credential_provider(
    pl.CredentialProviderAWS(
        profile_name="...",
        assume_role={
            "RoleArn": "...",
            "RoleSessionName": "...",
        },
    )
)
# --8<-- [end:credential_provider_class_global_default]

# --8<-- [start:credential_provider_custom_func]
def get_credentials() -> pl.CredentialProviderFunctionReturn:
    # A Unix timestamp after which the credentials must be re-fetched,
    # or None if they do not expire.
    expiry = None

    return {
        "aws_access_key_id": "...",
        "aws_secret_access_key": "...",
        "aws_session_token": "...",
    }, expiry


lf = pl.scan_parquet(
    "s3://.../...",
    credential_provider=get_credentials,
)

df = lf.collect()
# --8<-- [end:credential_provider_custom_func]
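
# A custom provider typically fetches short-lived credentials from somewhere
# concrete. A minimal sketch using boto3 (the profile name is hypothetical,
# and boto3 must be installed for this to run):

import boto3


def get_credentials_boto3() -> pl.CredentialProviderFunctionReturn:
    # Resolve credentials through boto3's normal provider chain.
    creds = boto3.Session(profile_name="...").get_credentials().get_frozen_credentials()

    return {
        "aws_access_key_id": creds.access_key,
        "aws_secret_access_key": creds.secret_key,
        "aws_session_token": creds.token,
    }, None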

# --8<-- [start:credential_provider_custom_func_azure]
from azure.identity import DefaultAzureCredential


def credential_provider():
    credential = DefaultAzureCredential(exclude_managed_identity_credential=True)
    token = credential.get_token("https://storage.azure.com/.default")

    return {"bearer_token": token.token}, token.expires_on


pl.scan_parquet(
    "abfss://...@.../...",
    credential_provider=credential_provider,
)

# Note that for the above case, this shortcut is also available:

pl.scan_parquet(
    "abfss://...@.../...",
    credential_provider=pl.CredentialProviderAzure(
        credential=DefaultAzureCredential(exclude_managed_identity_credential=True)
    ),
)

# --8<-- [end:credential_provider_custom_func_azure]

# --8<-- [start:scan_pyarrow_dataset]
import polars as pl
import pyarrow.dataset as ds

dset = ds.dataset("s3://my-partitioned-folder/", format="parquet")
(
    pl.scan_pyarrow_dataset(dset)
    .filter(pl.col("foo") == "a")
    .select(["foo", "bar"])
    .collect()
)
# --8<-- [end:scan_pyarrow_dataset]
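
# For hive-partitioned Parquet data, a native scan can often replace the
# PyArrow dataset. A sketch assuming the same bucket layout, using
# scan_parquet's `hive_partitioning` flag:

(
    pl.scan_parquet("s3://my-partitioned-folder/**/*.parquet", hive_partitioning=True)
    .filter(pl.col("foo") == "a")
    .select(["foo", "bar"])
    .collect()
)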

# --8<-- [start:write_parquet]
import polars as pl

df = pl.DataFrame(
    {
        "foo": ["a", "b", "c", "d", "d"],
        "bar": [1, 2, 3, 4, 5],
    }
)

destination = "s3://bucket/my_file.parquet"

df.write_parquet(destination)

# --8<-- [end:write_parquet]
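
# Writes accept the same authentication hooks as reads; a sketch assuming
# explicit credentials are needed (keys as in the scan example above):

df.write_parquet(
    destination,
    storage_options={
        "aws_access_key_id": "<secret>",
        "aws_secret_access_key": "<secret>",
        "aws_region": "us-east-1",
    },
)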

# --8<-- [start:write_file_object]
import gzip

import polars as pl
import s3fs

df = pl.DataFrame(
    {
        "foo": ["a", "b", "c", "d", "d"],
        "bar": [1, 2, 3, 4, 5],
    }
)

destination = "s3://bucket/my_file.csv.gz"

fs = s3fs.S3FileSystem()

with fs.open(destination, "wb") as cloud_f:
    with gzip.open(cloud_f, "w") as f:
        df.write_csv(f)
# --8<-- [end:write_file_object]
"""