Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/io/cloud/test_cloud.py
8415 views
1
from __future__ import annotations
2
3
import contextlib
4
import subprocess
5
import sys
6
from functools import partial
7
from typing import TYPE_CHECKING
8
9
import pytest
10
11
import polars as pl
12
from polars.io.cloud._utils import _is_aws_cloud
13
14
if TYPE_CHECKING:
15
from tests.conftest import PlMonkeyPatch
16
17
18
@pytest.mark.slow
@pytest.mark.parametrize("format", ["parquet", "csv", "ndjson", "ipc"])
def test_scan_nonexistent_cloud_path_17444(format: str) -> None:
    """Scanning a nonexistent cloud path is lazy: it only fails on collect.

    Regression test for https://github.com/pola-rs/polars/issues/17444
    """
    # Prevent automatic credential provider instantiation, otherwise CI may
    # fail with
    # * pytest.PytestUnraisableExceptionWarning:
    # * Exception ignored:
    # * ResourceWarning: unclosed socket
    scan = partial(getattr(pl, f"scan_{format}"), credential_provider=None)
    missing_path = f"s3://my-nonexistent-bucket/data.{format}"

    # Building the lazy query must not raise by itself.
    if format != "ndjson":
        lf = scan(missing_path, storage_options={"max_retries": 0})
    else:
        # NDJSON does not have a `retries` parameter yet - so use the default
        lf = scan(missing_path)
    assert isinstance(lf, pl.LazyFrame)

    # The failure only surfaces once the query is actually executed.
    with pytest.raises(IOError):
        lf.collect()
42
43
44
def test_scan_err_rebuild_store_19933() -> None:
    """Count how often an always-failing credential provider gets invoked."""
    expected_calls = 4
    call_count = 0

    def failing_provider() -> None:
        # Record the invocation, then fail unconditionally.
        nonlocal call_count
        call_count += 1
        raise AssertionError

    lf = pl.scan_parquet(
        "s3://.../...",
        credential_provider=failing_provider,  # type: ignore[arg-type]
        storage_options={"aws_region": "eu-west-1"},
    )

    # Whatever error the collect surfaces is irrelevant here; only the number
    # of provider invocations is checked below.
    with contextlib.suppress(Exception):
        lf.collect()

    # Note: We get called 2 times per attempt
    if call_count != expected_calls:
        raise AssertionError(call_count)
64
65
66
def test_is_aws_cloud() -> None:
    """Check `_is_aws_cloud` against accepted and rejected HTTPS URLs."""
    # Regional virtual-hosted style endpoint: the only accepted case here.
    assert _is_aws_cloud(
        scheme="https",
        first_scan_path="https://bucket.s3.eu-west-1.amazonaws.com/key",
    )

    # Each of these must be rejected; they are checked in this order.
    rejected_paths = [
        # Slash in front of amazonaws.com
        "https://bucket/.s3.eu-west-1.amazonaws.com/key",
        # Question mark in front of amazonaws.com
        "https://bucket?.s3.eu-west-1.amazonaws.com/key",
        # Legacy global endpoint
        "https://bucket.s3.amazonaws.com/key",
        # Has query parameters (e.g. presigned URL).
        "https://bucket.s3.eu-west-1.amazonaws.com/key?",
    ]
    for path in rejected_paths:
        assert not _is_aws_cloud(scheme="https", first_scan_path=path), path
93
94
95
def test_storage_options_retry_config(
    plmonkeypatch: PlMonkeyPatch,
    capfd: pytest.CaptureFixture[str],
) -> None:
    """Check that the retry configuration appears in the verbose output.

    The configuration is supplied in two ways: through `POLARS_CLOUD_*`
    environment variables set inside a child interpreter, and through
    `storage_options` in this process. In both cases the scan is expected to
    fail, and the emitted output must contain the configured values.
    """
    plmonkeypatch.setenv("POLARS_VERBOSE", "1")

    # --- Configuration via environment variables -------------------------
    # The script runs in a fresh interpreter and sets the variables itself
    # (presumably so they are in place before polars reads them - TODO
    # confirm). Its body must stay at column 0: it is passed verbatim to
    # `python -c`.
    env_script = """\
import contextlib
import os

import polars as pl

os.environ["POLARS_VERBOSE"] = "1"
os.environ["POLARS_CLOUD_MAX_RETRIES"] = "1"
os.environ["POLARS_CLOUD_RETRY_TIMEOUT_MS"] = "1"
os.environ["POLARS_CLOUD_RETRY_INIT_BACKOFF_MS"] = "2"
os.environ["POLARS_CLOUD_RETRY_MAX_BACKOFF_MS"] = "10373"
os.environ["POLARS_CLOUD_RETRY_BASE_MULTIPLIER"] = "6.28"

q = pl.scan_parquet(
    "s3://.../...",
    storage_options={"aws_endpoint_url": "https://localhost:333"},
    credential_provider=None,
)

with contextlib.suppress(OSError):
    q.collect()

"""
    # stderr is merged into stdout so the verbose log ends up in the capture.
    child_output = subprocess.check_output(
        [sys.executable, "-c", env_script],
        stderr=subprocess.STDOUT,
    ).decode()

    expected_from_env = (
        "init_backoff: 2ms, "
        "max_backoff: 10.373s, "
        "base: 6.28 }, "
        "max_retries: 1, "
        "retry_timeout: 1ms"
    )
    assert expected_from_env in child_output

    # --- Configuration via storage_options -------------------------------
    lf = pl.scan_parquet(
        "s3://.../...",
        storage_options={
            "file_cache_ttl": 7,
            "max_retries": 0,
            "retry_timeout_ms": 23,
            "retry_init_backoff_ms": 24,
            "retry_max_backoff_ms": 9875,
            "retry_base_multiplier": 3.14159,
            "aws_endpoint_url": "https://localhost:333",
        },
        credential_provider=None,
    )

    # Drop anything captured so far so only the collect below is inspected.
    capfd.readouterr()

    with pytest.raises(OSError):
        lf.collect()

    stderr_output = capfd.readouterr().err

    assert "file_cache_ttl: 7" in stderr_output

    expected_from_options = (
        "init_backoff: 24ms, "
        "max_backoff: 9.875s, "
        "base: 3.14159 }, "
        "max_retries: 0, "
        "retry_timeout: 23ms"
    )
    assert expected_from_options in stderr_output
174
175