GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/io/test_scan_lines.py
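
"""Tests for `pl.scan_lines` and its eager counterpart `pl.read_lines`.

Covers empty inputs, custom column names, positive and negative slicing with
and without a row index, CRLF line endings, and forced single-byte read chunks
via `POLARS_FORCE_NDJSON_READ_SIZE`.
"""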

from __future__ import annotations

from typing import TYPE_CHECKING, Any

import pytest

import polars as pl
from polars.exceptions import ComputeError
from polars.testing.asserts.frame import assert_frame_equal

if TYPE_CHECKING:
    from tests.conftest import PlMonkeyPatch
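

# Eager equivalent of `pl.scan_lines`, used to patch it in the parametrized
# test below: read the lines eagerly, then wrap the result in a LazyFrame.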
def lazified_read_lines(*a: Any, **kw: Any) -> pl.LazyFrame:
    return pl.read_lines(*a, **kw).lazy()


@pytest.mark.parametrize("patch_scan_lines", [True, False])
@pytest.mark.parametrize("force_unit_chunk_size", [True, False])
@pytest.mark.parametrize("carriage_return", [True, False])
def test_scan_lines(
    patch_scan_lines: bool,
    force_unit_chunk_size: bool,
    carriage_return: bool,
    capfd: pytest.CaptureFixture[str],
    plmonkeypatch: PlMonkeyPatch,
) -> None:
    if patch_scan_lines:
        plmonkeypatch.setattr(pl, "scan_lines", lazified_read_lines)
        assert pl.scan_lines is lazified_read_lines
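
    # When exercising CRLF endings, wrap scan_lines so that every b"\n" in the
    # input is rewritten to b"\r\n" before it is scanned.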
    if carriage_return:
        inner = pl.scan_lines
        last_bytes = b""

        def wrapped(data: Any, *a: Any, **kw: Any) -> Any:
            nonlocal last_bytes
            last_bytes = bytes.replace(data, b"\n", b"\r\n")
            return inner(last_bytes, *a, **kw)

        plmonkeypatch.setattr(pl, "scan_lines", wrapped)

        pl.scan_lines(b"\n\n")
        assert last_bytes == b"\r\n\r\n"
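
    # POLARS_FORCE_NDJSON_READ_SIZE pins the reader to a fixed read size
    # (1 here); the verbose log is checked to confirm the setting took effect.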
    if force_unit_chunk_size:
        plmonkeypatch.setenv("POLARS_FORCE_NDJSON_READ_SIZE", "1")

        with plmonkeypatch.context() as cx:
            capfd.readouterr()
            cx.setenv("POLARS_VERBOSE", "1")
            pl.scan_lines(b"").collect()
            capture = capfd.readouterr().err
            assert "fixed_read_size: Some(1)" in capture
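
    # Empty input yields an empty frame with a single String column, named
    # "lines" by default or via the `name` parameter.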
    assert_frame_equal(
        pl.scan_lines(b"").collect(),
        pl.DataFrame(schema={"lines": pl.String}),
    )

    assert_frame_equal(
        pl.scan_lines(b"", name="A").collect(),
        pl.DataFrame(schema={"A": pl.String}),
    )

    assert_frame_equal(
        pl.scan_lines(b"").collect(),
        pl.DataFrame(schema={"lines": pl.String}),
    )

    lf = pl.scan_lines(b"""\
AAA
BBB
CCC
DDD
EEE
""")
    assert_frame_equal(
        lf.slice(2, 1).collect(),
        pl.DataFrame({"lines": ["CCC"]}),
    )

    assert_frame_equal(
        lf.with_row_index().slice(2, 1).collect(),
        pl.DataFrame(
            {"index": [2], "lines": ["CCC"]},
            schema_overrides={"index": pl.get_index_type()},
        ),
    )

    assert_frame_equal(
        lf.slice(-2, 1).collect(),
        pl.DataFrame({"lines": ["DDD"]}),
    )

    assert_frame_equal(
        lf.with_row_index().slice(-2, 1).collect(),
        pl.DataFrame(
            {"index": [3], "lines": ["DDD"]},
            schema_overrides={"index": pl.get_index_type()},
        ),
    )
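
    # Helper: build a 5-line input whose lines are `n_spaces` spaces each,
    # optionally without a trailing newline, and check slicing and pl.len().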
    def f(n_spaces: int, use_file_eol: bool) -> None:
        v = n_spaces * " "
        file_eol = "\n" if use_file_eol else ""

        lf = pl.scan_lines(f"{v}\n{v}\n{v}\n{v}\n{v}{file_eol}".encode())

        q = lf

        assert_frame_equal(
            q.collect(),
            pl.DataFrame({"lines": 5 * [v]}),
        )

        assert q.select(pl.len()).collect().item() == 5

        q = lf.slice(4)

        assert_frame_equal(
            q.collect(),
            pl.DataFrame({"lines": [v]}),
        )

        assert q.select(pl.len()).collect().item() == 1

        q = lf.with_row_index().slice(4)

        assert_frame_equal(
            q.collect(),
            pl.DataFrame(
                {"index": [4], "lines": [v]},
                schema_overrides={"index": pl.get_index_type()},
            ),
        )

        assert q.select(pl.len()).collect().item() == 1
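
        # Slicing past the end yields an empty frame; negative offsets count
        # back from the end of the input.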
        q = lf.slice(5)

        assert_frame_equal(
            q.collect(),
            pl.DataFrame(schema={"lines": pl.String}),
        )

        assert q.select(pl.len()).collect().item() == 0

        q = lf.slice(-1)

        assert_frame_equal(
            q.collect(),
            pl.DataFrame({"lines": [v]}),
        )

        assert q.select(pl.len()).collect().item() == 1

        q = lf.with_row_index().slice(-1)

        assert_frame_equal(
            q.collect(),
            pl.DataFrame(
                {"index": [4], "lines": [v]},
                schema_overrides={"index": pl.get_index_type()},
            ),
        )

        assert q.select(pl.len()).collect().item() == 1

        q = lf.slice(-4)

        assert_frame_equal(
            q.collect(),
            pl.DataFrame({"lines": 4 * [v]}),
        )

        assert q.select(pl.len()).collect().item() == 4

        q = lf.slice(-99)

        assert_frame_equal(
            q.collect(),
            pl.DataFrame({"lines": 5 * [v]}),
        )

        assert q.select(pl.len()).collect().item() == 5

    f(n_spaces=0, use_file_eol=True)

    for n_spaces in [1, 100]:
        for use_file_eol in [True, False]:
            f(n_spaces, use_file_eol)
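

# Exercises the reversed-read path: with a forced read size of 1, tail(1)
# succeeds even though the input begins with an invalid UTF-8 byte, while a
# full collect fails on that byte.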
def test_scan_lines_negative_slice_reversed_read(
    plmonkeypatch: PlMonkeyPatch,
) -> None:
    plmonkeypatch.setenv("POLARS_FORCE_NDJSON_READ_SIZE", "1")
    q = pl.scan_lines(b"\xff" + 5000 * b"abc\n")

    with pytest.raises(ComputeError, match="invalid utf8"):
        q.collect()

    assert q.tail(1).collect().item() == "abc"
    assert q.tail(1).select(pl.len()).collect().item() == 1

    # This succeeds because the line counter simply counts '\n' bytes without
    # parsing to string.
    assert q.select(pl.len()).collect().item() == 5000