GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/io/test_csv.py
from __future__ import annotations

import gzip
import io
import os
import sys
import textwrap
import zlib
from datetime import date, datetime, time, timedelta, timezone
from decimal import Decimal as D
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING, TypedDict

import numpy as np
import pyarrow as pa
import pytest
import zstandard

import polars as pl
from polars._utils.various import normalize_filepath
from polars.exceptions import ComputeError, InvalidOperationError, NoDataError
from polars.io.csv import BatchedCsvReader
from polars.testing import assert_frame_equal, assert_series_equal
from tests.conftest import PlMonkeyPatch

if TYPE_CHECKING:
    from pathlib import Path
    from typing import Any

    from polars._typing import CsvQuoteStyle, TimeUnit


@pytest.fixture
def foods_file_path(io_files_path: Path) -> Path:
    return io_files_path / "foods1.csv"


@pytest.fixture(params=["chunk-size-default", "chunk-size-7"])
def chunk_override(request: Any, plmonkeypatch: PlMonkeyPatch) -> None:
    env_var_name = "POLARS_FORCE_CSV_INFER_CHUNK_SIZE"

    if request.param == "chunk-size-default":
        plmonkeypatch.delenv(env_var_name, raising=False)
    elif request.param == "chunk-size-7":
        # 7 is a good chunk size because it can contain some test lines fully,
        # but not all, so it exercises chunk merging. The chunks in question
        # are only the ones used for schema inference and start-point finding.
        plmonkeypatch.setenv(env_var_name, "7")
    else:
        pytest.fail("unreachable")
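
# Every test below requests the `chunk_override` fixture, so each test runs
# twice: once with the default CSV inference chunk size and once with
# POLARS_FORCE_CSV_INFER_CHUNK_SIZE forced to 7 bytes (see the fixture
# parametrization above).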


def test_quoted_date(chunk_override: None) -> None:
    csv = textwrap.dedent(
        """\
        a,b
        "2022-01-01",1
        "2022-01-02",2
        """
    )
    result = pl.read_csv(csv.encode(), try_parse_dates=True)
    expected = pl.DataFrame({"a": [date(2022, 1, 1), date(2022, 1, 2)], "b": [1, 2]})
    assert_frame_equal(result, expected)


# Issue: https://github.com/pola-rs/polars/issues/10826
def test_date_pattern_with_datetime_override_10826(chunk_override: None) -> None:
    result = pl.read_csv(
        source=io.StringIO("col\n2023-01-01\n2023-02-01\n2023-03-01"),
        schema_overrides={"col": pl.Datetime},
    )
    expected = pl.Series(
        "col", [datetime(2023, 1, 1), datetime(2023, 2, 1), datetime(2023, 3, 1)]
    ).to_frame()
    assert_frame_equal(result, expected)

    result = pl.read_csv(
        source=io.StringIO("col\n2023-01-01T01:02:03\n2023-02-01\n2023-03-01"),
        schema_overrides={"col": pl.Datetime},
    )
    expected = pl.Series(
        "col",
        [datetime(2023, 1, 1, 1, 2, 3), datetime(2023, 2, 1), datetime(2023, 3, 1)],
    ).to_frame()
    assert_frame_equal(result, expected)


def test_to_from_buffer(chunk_override: None, df_no_lists: pl.DataFrame) -> None:
    df = df_no_lists
    buf = io.BytesIO()
    df.write_csv(buf)
    buf.seek(0)

    read_df = pl.read_csv(buf, try_parse_dates=True)
    read_df = read_df.with_columns(
        pl.col("cat").cast(pl.Categorical),
        pl.col("enum").cast(pl.Enum(["foo", "ham", "bar"])),
        pl.col("time").cast(pl.Time),
    )
    assert_frame_equal(df, read_df, categorical_as_str=True)
    with pytest.raises(AssertionError):
        assert_frame_equal(df.select("time", "cat"), read_df, categorical_as_str=True)


@pytest.mark.write_disk
def test_to_from_file(
    chunk_override: None, df_no_lists: pl.DataFrame, tmp_path: Path
) -> None:
    tmp_path.mkdir(exist_ok=True)

    df = df_no_lists.drop("strings_nulls")

    file_path = tmp_path / "small.csv"
    df.write_csv(file_path)
    read_df = pl.read_csv(file_path, try_parse_dates=True)

    read_df = read_df.with_columns(
        pl.col("cat").cast(pl.Categorical),
        pl.col("enum").cast(pl.Enum(["foo", "ham", "bar"])),
        pl.col("time").cast(pl.Time),
    )
    assert_frame_equal(df, read_df, categorical_as_str=True)


def test_normalize_filepath(chunk_override: None, io_files_path: Path) -> None:
    with pytest.raises(IsADirectoryError):
        normalize_filepath(io_files_path)

    assert normalize_filepath(str(io_files_path), check_not_directory=False) == str(
        io_files_path
    )


@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
def test_infer_schema_false(chunk_override: None, read_fn: str) -> None:
    csv = textwrap.dedent(
        """\
        a,b,c
        1,2,3
        1,2,3
        """
    )
    df = getattr(pl, read_fn)(io.StringIO(csv), infer_schema=False).lazy().collect()
    assert df.dtypes == [pl.String, pl.String, pl.String]


@pytest.mark.may_fail_auto_streaming  # read->scan_csv dispatch
def test_csv_null_values(chunk_override: None) -> None:
    csv = textwrap.dedent(
        """\
        a,b,c
        na,b,c
        a,na,c
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(f, null_values="na")
    assert df.rows() == [(None, "b", "c"), ("a", None, "c")]

    # note: after reading, the buffer position in StringIO will have been
    # advanced; reading again will raise NoDataError, so we provide a hint
    # in the error string about this, suggesting "seek(0)" as a possible fix...
    with pytest.raises(NoDataError, match=r"empty"):
        pl.read_csv(f)

    # ... unless we explicitly tell read_csv not to raise an
    # exception, in which case we expect an empty dataframe
    assert_frame_equal(pl.read_csv(f, raise_if_empty=False), pl.DataFrame())

    out = io.BytesIO()
    df.write_csv(out, null_value="na")
    assert csv == out.getvalue().decode("ascii")

    csv = textwrap.dedent(
        """\
        a,b,c
        na,b,c
        a,n/a,c
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(f, null_values=["na", "n/a"])
    assert df.rows() == [(None, "b", "c"), ("a", None, "c")]

    csv = textwrap.dedent(
        r"""
        a,b,c
        na,b,c
        a,\N,c
        ,b,
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(f, null_values={"a": "na", "b": r"\N"})
    assert df.rows() == [(None, "b", "c"), ("a", None, "c"), (None, "b", None)]


def test_csv_missing_utf8_is_empty_string(chunk_override: None) -> None:
    # validate 'missing_utf8_is_empty_string' for missing fields that are...
    # >> ...leading
    # >> ...trailing (both EOL & EOF)
    # >> ...in lines that have missing fields
    # >> ...in cols containing no other strings
    # >> ...interacting with other user-supplied null values

    csv = textwrap.dedent(
        r"""
        a,b,c
        na,b,c
        a,\N,c
        ,b,
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(
        f,
        null_values={"a": "na", "b": r"\N"},
        missing_utf8_is_empty_string=True,
    )
    # ┌──────┬──────┬─────┐
    # │ a    ┆ b    ┆ c   │
    # ╞══════╪══════╪═════╡
    # │ null ┆ b    ┆ c   │
    # │ a    ┆ null ┆ c   │
    # │      ┆ b    ┆     │
    # └──────┴──────┴─────┘
    assert df.rows() == [(None, "b", "c"), ("a", None, "c"), ("", "b", "")]

    csv = textwrap.dedent(
        r"""
        a,b,c,d,e,f,g
        na,,,,\N,,
        a,\N,c,,,,g
        ,,,
        ,,,na,,,
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(f, null_values=["na", r"\N"])
    # ┌──────┬──────┬──────┬──────┬──────┬──────┬──────┐
    # │ a    ┆ b    ┆ c    ┆ d    ┆ e    ┆ f    ┆ g    │
    # ╞══════╪══════╪══════╪══════╪══════╪══════╪══════╡
    # │ null ┆ null ┆ null ┆ null ┆ null ┆ null ┆ null │
    # │ a    ┆ null ┆ c    ┆ null ┆ null ┆ null ┆ g    │
    # │ null ┆ null ┆ null ┆ null ┆ null ┆ null ┆ null │
    # │ null ┆ null ┆ null ┆ null ┆ null ┆ null ┆ null │
    # └──────┴──────┴──────┴──────┴──────┴──────┴──────┘
    assert df.rows() == [
        (None, None, None, None, None, None, None),
        ("a", None, "c", None, None, None, "g"),
        (None, None, None, None, None, None, None),
        (None, None, None, None, None, None, None),
    ]

    f.seek(0)
    df = pl.read_csv(
        f,
        null_values=["na", r"\N"],
        missing_utf8_is_empty_string=True,
    )
    # ┌──────┬──────┬─────┬──────┬──────┬──────┬─────┐
    # │ a    ┆ b    ┆ c   ┆ d    ┆ e    ┆ f    ┆ g   │
    # ╞══════╪══════╪═════╪══════╪══════╪══════╪═════╡
    # │ null ┆      ┆     ┆      ┆ null ┆      ┆     │
    # │ a    ┆ null ┆ c   ┆      ┆      ┆      ┆ g   │
    # │      ┆      ┆     ┆      ┆      ┆      ┆     │
    # │      ┆      ┆     ┆ null ┆      ┆      ┆     │
    # └──────┴──────┴─────┴──────┴──────┴──────┴─────┘
    assert df.rows() == [
        (None, "", "", "", None, "", ""),
        ("a", None, "c", "", "", "", "g"),
        ("", "", "", "", "", "", ""),
        ("", "", "", None, "", "", ""),
    ]
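
# The two scenarios above pin down the null-handling contract: a field that
# matches a configured null value always parses as null, while a *missing*
# string field parses as null by default and as "" when
# missing_utf8_is_empty_string=True is passed.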


def test_csv_int_types(chunk_override: None) -> None:
    f = io.StringIO(
        "u8,i8,u16,i16,u32,i32,u64,i64,u128,i128\n"
        "0,0,0,0,0,0,0,0,0,0\n"
        "0,-128,0,-32768,0,-2147483648,0,-9223372036854775808,0,-170141183460469231731687303715884105728\n"
        "255,127,65535,32767,4294967295,2147483647,18446744073709551615,9223372036854775807,340282366920938463463374607431768211455,170141183460469231731687303715884105727\n"
        "01,01,01,01,01,01,01,01,01,01\n"
        "01,-01,01,-01,01,-01,01,-01,01,-01\n"
    )
    df = pl.read_csv(
        f,
        schema={
            "u8": pl.UInt8,
            "i8": pl.Int8,
            "u16": pl.UInt16,
            "i16": pl.Int16,
            "u32": pl.UInt32,
            "i32": pl.Int32,
            "u64": pl.UInt64,
            "i64": pl.Int64,
            "u128": pl.UInt128,
            "i128": pl.Int128,
        },
    )

    assert_frame_equal(
        df,
        pl.DataFrame(
            {
                "u8": pl.Series([0, 0, 255, 1, 1], dtype=pl.UInt8),
                "i8": pl.Series([0, -128, 127, 1, -1], dtype=pl.Int8),
                "u16": pl.Series([0, 0, 65535, 1, 1], dtype=pl.UInt16),
                "i16": pl.Series([0, -32768, 32767, 1, -1], dtype=pl.Int16),
                "u32": pl.Series([0, 0, 4294967295, 1, 1], dtype=pl.UInt32),
                "i32": pl.Series([0, -2147483648, 2147483647, 1, -1], dtype=pl.Int32),
                "u64": pl.Series([0, 0, 18446744073709551615, 1, 1], dtype=pl.UInt64),
                "i64": pl.Series(
                    [0, -9223372036854775808, 9223372036854775807, 1, -1],
                    dtype=pl.Int64,
                ),
                "u128": pl.Series(
                    [
                        0,
                        0,
                        340282366920938463463374607431768211455,
                        1,
                        1,
                    ],
                    dtype=pl.UInt128,
                ),
                "i128": pl.Series(
                    [
                        0,
                        -170141183460469231731687303715884105728,
                        170141183460469231731687303715884105727,
                        1,
                        -1,
                    ],
                    dtype=pl.Int128,
                ),
            }
        ),
    )


def test_csv_float_parsing(chunk_override: None) -> None:
    lines_with_floats = [
        "123.86,+123.86,-123.86\n",
        ".987,+.987,-.987\n",
        "5.,+5.,-5.\n",
        "inf,+inf,-inf\n",
        "NaN,+NaN,-NaN\n",
    ]

    for line_with_floats in lines_with_floats:
        f = io.StringIO(line_with_floats)
        df = pl.read_csv(f, has_header=False, new_columns=["a", "b", "c"])
        assert df.dtypes == [pl.Float64, pl.Float64, pl.Float64]

    lines_with_scientific_numbers = [
        "1e27,1E65,1e-28,1E-9\n",
        "+1e27,+1E65,+1e-28,+1E-9\n",
        "1e+27,1E+65,1e-28,1E-9\n",
        "+1e+27,+1E+65,+1e-28,+1E-9\n",
        "-1e+27,-1E+65,-1e-28,-1E-9\n",
        # "e27,E65,e-28,E-9\n",
        # "+e27,+E65,+e-28,+E-9\n",
        # "-e27,-E65,-e-28,-E-9\n",
    ]

    for line_with_scientific_numbers in lines_with_scientific_numbers:
        f = io.StringIO(line_with_scientific_numbers)
        df = pl.read_csv(f, has_header=False, new_columns=["a", "b", "c", "d"])
        assert df.dtypes == [pl.Float64, pl.Float64, pl.Float64, pl.Float64]


def test_datetime_parsing(chunk_override: None) -> None:
    csv = textwrap.dedent(
        """\
        timestamp,open,high
        2021-01-01 00:00:00,0.00305500,0.00306000
        2021-01-01 00:15:00,0.00298800,0.00300400
        2021-01-01 00:30:00,0.00298300,0.00300100
        2021-01-01 00:45:00,0.00299400,0.00304000
        """
    )

    f = io.StringIO(csv)
    df = pl.read_csv(f, try_parse_dates=True)
    assert df.dtypes == [pl.Datetime, pl.Float64, pl.Float64]


def test_datetime_parsing_default_formats(chunk_override: None) -> None:
    csv = textwrap.dedent(
        """\
        ts_dmy,ts_dmy_f,ts_dmy_p
        01/01/2021 00:00:00,31-01-2021T00:00:00.123,31-01-2021 11:00
        01/01/2021 00:15:00,31-01-2021T00:15:00.123,31-01-2021 01:00
        01/01/2021 00:30:00,31-01-2021T00:30:00.123,31-01-2021 01:15
        01/01/2021 00:45:00,31-01-2021T00:45:00.123,31-01-2021 01:30
        """
    )

    f = io.StringIO(csv)
    df = pl.read_csv(f, try_parse_dates=True)
    assert df.dtypes == [pl.Datetime, pl.Datetime, pl.Datetime]


@pytest.mark.may_fail_auto_streaming  # read->scan_csv dispatch
def test_partial_schema_overrides(chunk_override: None) -> None:
    csv = textwrap.dedent(
        """\
        a,b,c
        1,2,3
        1,2,3
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(f, schema_overrides=[pl.String])
    assert df.dtypes == [pl.String, pl.Int64, pl.Int64]


@pytest.mark.may_fail_auto_streaming  # read->scan_csv dispatch
def test_schema_overrides_with_column_name_selection(chunk_override: None) -> None:
    csv = textwrap.dedent(
        """\
        a,b,c,d
        1,2,3,4
        1,2,3,4
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(f, columns=["c", "b", "d"], schema_overrides=[pl.Int32, pl.String])
    assert df.dtypes == [pl.String, pl.Int32, pl.Int64]


@pytest.mark.may_fail_auto_streaming  # read->scan_csv dispatch
def test_schema_overrides_with_column_idx_selection(chunk_override: None) -> None:
    csv = textwrap.dedent(
        """\
        a,b,c,d
        1,2,3,4
        1,2,3,4
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(f, columns=[2, 1, 3], schema_overrides=[pl.Int32, pl.String])
    # Columns without an explicit dtype set will get pl.String if dtypes is a list
    # and the column selection is done with column indices instead of column names.
    assert df.dtypes == [pl.String, pl.Int32, pl.String]
    # Projections are sorted.
    assert df.columns == ["b", "c", "d"]


def test_partial_column_rename(chunk_override: None) -> None:
    csv = textwrap.dedent(
        """\
        a,b,c
        1,2,3
        1,2,3
        """
    )
    f = io.StringIO(csv)
    for use in [True, False]:
        f.seek(0)
        df = pl.read_csv(f, new_columns=["foo"], use_pyarrow=use)
        assert df.columns == ["foo", "b", "c"]


@pytest.mark.parametrize(
    ("col_input", "col_out"),
    [([0, 1], ["a", "b"]), ([0, 2], ["a", "c"]), (["b"], ["b"])],
)
def test_read_csv_columns_argument(
    chunk_override: None, col_input: list[int] | list[str], col_out: list[str]
) -> None:
    csv = textwrap.dedent(
        """\
        a,b,c
        1,2,3
        1,2,3
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(f, columns=col_input)
    assert df.shape[0] == 2
    assert df.columns == col_out


@pytest.mark.may_fail_cloud  # read->scan_csv dispatch
@pytest.mark.may_fail_auto_streaming  # read->scan_csv dispatch
def test_read_csv_buffer_ownership(chunk_override: None) -> None:
    bts = b"\xf0\x9f\x98\x80,5.55,333\n\xf0\x9f\x98\x86,-5.0,666"
    buf = io.BytesIO(bts)
    df = pl.read_csv(
        buf,
        has_header=False,
        new_columns=["emoji", "flt", "int"],
    )
    # confirm that read_csv succeeded, and didn't close the input buffer (#2696)
    assert df.shape == (2, 3)
    assert df.rows() == [("😀", 5.55, 333), ("😆", -5.0, 666)]
    assert not buf.closed
    assert buf.read() == bts


@pytest.mark.may_fail_auto_streaming  # read->scan_csv dispatch
@pytest.mark.write_disk
def test_read_csv_encoding(chunk_override: None, tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    bts = (
        b"Value1,Value2,Value3,Value4,Region\n"
        b"-30,7.5,2578,1,\xa5x\xa5_\n-32,7.97,3006,1,\xa5x\xa4\xa4\n"
        b"-31,8,3242,2,\xb7s\xa6\xcb\n-33,7.97,3300,3,\xb0\xaa\xb6\xaf\n"
        b"-20,7.91,3384,4,\xac\xfc\xb0\xea\n"
    )

    file_path = tmp_path / "encoding.csv"
    file_path.write_bytes(bts)

    file_str = str(file_path)
    bytesio = io.BytesIO(bts)

    for use_pyarrow in (False, True):
        bytesio.seek(0)
        for file in [file_path, file_str, bts, bytesio]:
            assert_series_equal(
                pl.read_csv(
                    file,  # type: ignore[arg-type]
                    encoding="big5",
                    use_pyarrow=use_pyarrow,
                ).get_column("Region"),
                pl.Series("Region", ["台北", "台中", "新竹", "高雄", "美國"]),
            )


@pytest.mark.may_fail_auto_streaming  # read->scan_csv dispatch
@pytest.mark.write_disk
def test_read_csv_encoding_lossy(chunk_override: None, tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    bts = (
        b"\xc8\xec\xff,\xc2\xee\xe7\xf0\xe0\xf1\xf2,\xc3\xee\xf0\xee\xe4\n"
        b"\xc8\xe2\xe0\xed,25,\xcc\xee\xf1\xea\xe2\xe0\n"
        # \x98 is not supported in "windows-1251".
        b"\xce\xeb\xfc\xe3\xe0,30,\xd1\xe0\xed\xea\xf2-\x98\xcf\xe5\xf2\xe5\xf0\xe1\xf3\xf0\xe3\n"
    )

    file_path = tmp_path / "encoding_lossy.csv"
    file_path.write_bytes(bts)

    file_str = str(file_path)
    bytesio = io.BytesIO(bts)
    bytesio.seek(0)

    for file in [file_path, file_str, bts, bytesio]:
        assert_series_equal(
            pl.read_csv(
                file,  # type: ignore[arg-type]
                encoding="windows-1251-lossy",
                use_pyarrow=False,
            ).get_column("Город"),
            pl.Series("Город", ["Москва", "Санкт-�Петербург"]),
        )
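
# For non-UTF-8 sources, the `encoding` parameter decodes the bytes before
# parsing; the "-lossy" variants substitute U+FFFD (�) for bytes the codec
# cannot decode instead of raising, as the "windows-1251-lossy" test above
# demonstrates with the unsupported \x98 byte.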


@pytest.mark.may_fail_auto_streaming  # read->scan_csv dispatch
def test_column_rename_and_schema_overrides(chunk_override: None) -> None:
    csv = textwrap.dedent(
        """\
        a,b,c
        1,2,3
        1,2,3
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(
        f,
        new_columns=["A", "B", "C"],
        schema_overrides={"A": pl.String, "B": pl.Int64, "C": pl.Float32},
    )
    assert df.dtypes == [pl.String, pl.Int64, pl.Float32]

    f = io.StringIO(csv)
    df = pl.read_csv(
        f,
        columns=["a", "c"],
        new_columns=["A", "C"],
        schema_overrides={"A": pl.String, "C": pl.Float32},
    )
    assert df.dtypes == [pl.String, pl.Float32]

    csv = textwrap.dedent(
        """\
        1,2,3
        1,2,3
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(
        f,
        new_columns=["A", "B", "C"],
        schema_overrides={"A": pl.String, "C": pl.Float32},
        has_header=False,
    )
    assert df.dtypes == [pl.String, pl.Int64, pl.Float32]


def test_compressed_csv(
    chunk_override: None, io_files_path: Path, plmonkeypatch: PlMonkeyPatch
) -> None:
    plmonkeypatch.setenv("POLARS_FORCE_ASYNC", "0")

    # gzip compression
    csv = textwrap.dedent(
        """\
        a,b,c
        1,a,1.0
        2,b,2.0
        3,c,3.0
        """
    )
    fout = io.BytesIO()
    with gzip.GzipFile(fileobj=fout, mode="w") as f:
        f.write(csv.encode())

    csv_bytes = fout.getvalue()
    out = pl.read_csv(csv_bytes)
    expected = pl.DataFrame(
        {"a": [1, 2, 3], "b": ["a", "b", "c"], "c": [1.0, 2.0, 3.0]}
    )
    assert_frame_equal(out, expected)

    # now from disk
    csv_file = io_files_path / "gzipped.csv.gz"
    out = pl.read_csv(str(csv_file), truncate_ragged_lines=True)
    assert_frame_equal(out, expected)

    # now with schema defined
    schema = {"a": pl.Int64, "b": pl.Utf8, "c": pl.Float64}
    out = pl.read_csv(str(csv_file), schema=schema, truncate_ragged_lines=True)
    assert_frame_equal(out, expected)

    # now with column projection
    out = pl.read_csv(csv_bytes, columns=["a", "b"])
    expected = pl.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]})
    assert_frame_equal(out, expected)

    # zlib compression
    csv_bytes = zlib.compress(csv.encode())
    out = pl.read_csv(csv_bytes)
    expected = pl.DataFrame(
        {"a": [1, 2, 3], "b": ["a", "b", "c"], "c": [1.0, 2.0, 3.0]}
    )
    assert_frame_equal(out, expected)

    # different levels of zlib create different magic strings,
    # try to cover them all.
    for level in range(10):
        csv_bytes = zlib.compress(csv.encode(), level=level)
        out = pl.read_csv(csv_bytes)
        expected = pl.DataFrame(
            {"a": [1, 2, 3], "b": ["a", "b", "c"], "c": [1.0, 2.0, 3.0]}
        )
        assert_frame_equal(out, expected)

    # zstd compression
    csv_bytes = zstandard.compress(csv.encode())
    out = pl.read_csv(csv_bytes)
    assert_frame_equal(out, expected)

    # zstd compressed file
    csv_file = io_files_path / "zstd_compressed.csv.zst"
    out = pl.scan_csv(csv_file, truncate_ragged_lines=True).collect()
    assert_frame_equal(out, expected)
    out = pl.read_csv(str(csv_file), truncate_ragged_lines=True)
    assert_frame_equal(out, expected)

    # no compression
    f2 = io.BytesIO(b"a,b\n1,2\n")
    out2 = pl.read_csv(f2)
    expected = pl.DataFrame({"a": [1], "b": [2]})
    assert_frame_equal(out2, expected)
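
# No decompression argument is passed anywhere above: polars detects gzip,
# zlib and zstd payloads from their leading magic bytes, which is also why
# the loop covers every zlib level (each can produce a different header).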


def test_partial_decompression(chunk_override: None, foods_file_path: Path) -> None:
    f_out = io.BytesIO()
    with gzip.GzipFile(fileobj=f_out, mode="w") as f:
        f.write(foods_file_path.read_bytes())

    csv_bytes = f_out.getvalue()
    for n_rows in [1, 5, 26]:
        out = pl.read_csv(csv_bytes, n_rows=n_rows)
        assert out.shape == (n_rows, 4)

    # zstd compression
    csv_bytes = zstandard.compress(foods_file_path.read_bytes())
    for n_rows in [1, 5, 26]:
        out = pl.read_csv(csv_bytes, n_rows=n_rows)
        assert out.shape == (n_rows, 4)


def test_empty_bytes(chunk_override: None) -> None:
    b = b""
    with pytest.raises(NoDataError):
        pl.read_csv(b)

    df = pl.read_csv(b, raise_if_empty=False)
    assert_frame_equal(df, pl.DataFrame())


def test_empty_line_with_single_column(chunk_override: None) -> None:
    df = pl.read_csv(
        b"a\n\nb\n",
        new_columns=["A"],
        has_header=False,
        comment_prefix="#",
        use_pyarrow=False,
    )
    expected = pl.DataFrame({"A": ["a", None, "b"]})
    assert_frame_equal(df, expected)


def test_empty_line_with_multiple_columns(chunk_override: None) -> None:
    df = pl.read_csv(
        b"a,b\n\nc,d\n",
        new_columns=["A", "B"],
        has_header=False,
        comment_prefix="#",
        use_pyarrow=False,
    )
    expected = pl.DataFrame({"A": ["a", None, "c"], "B": ["b", None, "d"]})
    assert_frame_equal(df, expected)


def test_preserve_whitespace_at_line_start(chunk_override: None) -> None:
    df = pl.read_csv(
        b" a\n b \n c\nd",
        new_columns=["A"],
        has_header=False,
        use_pyarrow=False,
    )
    expected = pl.DataFrame({"A": [" a", " b ", " c", "d"]})
    assert_frame_equal(df, expected)


def test_csv_multi_char_comment(chunk_override: None) -> None:
    csv = textwrap.dedent(
        """\
        #a,b
        ##c,d
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(
        f,
        new_columns=["A", "B"],
        has_header=False,
        comment_prefix="##",
        use_pyarrow=False,
    )
    expected = pl.DataFrame({"A": ["#a"], "B": ["b"]})
    assert_frame_equal(df, expected)

    # check comment interaction with headers/skip_rows
    for skip_rows, b in (
        (1, io.BytesIO(b"<filemeta>\n#!skip\n#!skip\nCol1\tCol2\n")),
        (0, io.BytesIO(b"\n#!skip\n#!skip\nCol1\tCol2")),
        (0, io.BytesIO(b"#!skip\nCol1\tCol2\n#!skip\n")),
        (0, io.BytesIO(b"#!skip\nCol1\tCol2")),
    ):
        df = pl.read_csv(b, separator="\t", comment_prefix="#!", skip_rows=skip_rows)
        assert_frame_equal(df, pl.DataFrame(schema=["Col1", "Col2"]).cast(pl.Utf8))


def test_csv_quote_char(chunk_override: None) -> None:
    expected = pl.DataFrame(
        [
            pl.Series("linenum", [1, 2, 3, 4, 5, 6, 7, 8, 9]),
            pl.Series(
                "last_name",
                [
                    "Jagger",
                    'O"Brian',
                    "Richards",
                    'L"Etoile',
                    "Watts",
                    "Smith",
                    '"Wyman"',
                    "Woods",
                    'J"o"ne"s',
                ],
            ),
            pl.Series(
                "first_name",
                [
                    "Mick",
                    '"Mary"',
                    "Keith",
                    "Bennet",
                    "Charlie",
                    'D"Shawn',
                    "Bill",
                    "Ron",
                    "Brian",
                ],
            ),
        ]
    )
    rolling_stones = textwrap.dedent(
        """\
        linenum,last_name,first_name
        1,Jagger,Mick
        2,O"Brian,"Mary"
        3,Richards,Keith
        4,L"Etoile,Bennet
        5,Watts,Charlie
        6,Smith,D"Shawn
        7,"Wyman",Bill
        8,Woods,Ron
        9,J"o"ne"s,Brian
        """
    )
    for use_pyarrow in (False, True):
        out = pl.read_csv(
            rolling_stones.encode(), quote_char=None, use_pyarrow=use_pyarrow
        )
        assert out.shape == (9, 3)
        assert_frame_equal(out, expected)

    # non-standard quote char
    df = pl.DataFrame({"x": ["", "0*0", "xyz"]})
    csv_data = df.write_csv(quote_char="*")

    assert csv_data == "x\n**\n*0**0*\nxyz\n"
    assert_frame_equal(df, pl.read_csv(io.StringIO(csv_data), quote_char="*"))
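
# quote_char=None switches quote handling off entirely, which is why the
# embedded '"' characters above survive verbatim; conversely, a non-default
# quote_char has to be supplied to both write_csv and read_csv for the
# round trip at the end of the test to work.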


def test_csv_empty_quotes_char_1622(chunk_override: None) -> None:
    pl.read_csv(b"a,b,c,d\nA1,B1,C1,1\nA2,B2,C2,2\n", quote_char="")


def test_ignore_try_parse_dates(chunk_override: None) -> None:
    csv = textwrap.dedent(
        """\
        a,b,c
        1,i,16200126
        2,j,16250130
        3,k,17220012
        4,l,17290009
        """
    ).encode()

    headers = ["a", "b", "c"]
    dtypes: dict[str, type[pl.DataType]] = dict.fromkeys(
        headers, pl.String
    )  # Forces String type for every column
    df = pl.read_csv(csv, columns=headers, schema_overrides=dtypes)
    assert df.dtypes == [pl.String, pl.String, pl.String]


def test_csv_date_handling(chunk_override: None) -> None:
    csv = textwrap.dedent(
        """\
        date
        1745-04-02
        1742-03-21
        1743-06-16
        1730-07-22

        1739-03-16
        """
    )
    expected = pl.DataFrame(
        {
            "date": [
                date(1745, 4, 2),
                date(1742, 3, 21),
                date(1743, 6, 16),
                date(1730, 7, 22),
                None,
                date(1739, 3, 16),
            ]
        }
    )
    out = pl.read_csv(csv.encode(), try_parse_dates=True)
    assert_frame_equal(out, expected)
    dtypes = {"date": pl.Date}
    out = pl.read_csv(csv.encode(), schema_overrides=dtypes)
    assert_frame_equal(out, expected)


def test_csv_no_date_dtype_because_string(chunk_override: None) -> None:
    csv = textwrap.dedent(
        """\
        date
        2024-01-01
        2024-01-02
        hello
        """
    )
    out = pl.read_csv(csv.encode(), try_parse_dates=True)
    assert out.dtypes == [pl.String]


def test_csv_infer_date_dtype(chunk_override: None) -> None:
    csv = textwrap.dedent(
        """\
        date
        2024-01-01
        "2024-01-02"

        2024-01-04
        """
    )
    out = pl.read_csv(csv.encode(), try_parse_dates=True)
    expected = pl.DataFrame(
        {
            "date": [
                date(2024, 1, 1),
                date(2024, 1, 2),
                None,
                date(2024, 1, 4),
            ]
        }
    )
    assert_frame_equal(out, expected)


def test_csv_date_dtype_ignore_errors(chunk_override: None) -> None:
    csv = textwrap.dedent(
        """\
        date
        hello
        2024-01-02
        world
        !!
        """
    )
    out = pl.read_csv(
        csv.encode(), ignore_errors=True, schema_overrides={"date": pl.Date}
    )
    expected = pl.DataFrame(
        {
            "date": [
                None,
                date(2024, 1, 2),
                None,
                None,
            ]
        }
    )
    assert_frame_equal(out, expected)


@pytest.mark.may_fail_auto_streaming  # read->scan_csv dispatch
def test_csv_globbing(chunk_override: None, io_files_path: Path) -> None:
    path = io_files_path / "foods*.csv"
    df = pl.read_csv(path)
    assert df.shape == (135, 4)

    with PlMonkeyPatch.context() as mp:
        mp.setenv("POLARS_FORCE_ASYNC", "0")

        with pytest.raises(ValueError):
            _ = pl.read_csv(path, columns=[0, 1])

    df = pl.read_csv(path, columns=["category", "sugars_g"])
    assert df.shape == (135, 2)
    assert df.row(-1) == ("seafood", 1)
    assert df.row(0) == ("vegetables", 2)

    with PlMonkeyPatch.context() as mp:
        mp.setenv("POLARS_FORCE_ASYNC", "0")

        with pytest.raises(ValueError):
            _ = pl.read_csv(
                path, schema_overrides=[pl.String, pl.Int64, pl.Int64, pl.Int64]
            )

    dtypes = {
        "category": pl.String,
        "calories": pl.Int32,
        "fats_g": pl.Float32,
        "sugars_g": pl.Int32,
    }

    df = pl.read_csv(path, schema_overrides=dtypes)
    assert df.dtypes == list(dtypes.values())


def test_csv_schema_offset(chunk_override: None, foods_file_path: Path) -> None:
    csv = textwrap.dedent(
        """\
        metadata
        line
        col1,col2,col3
        alpha,beta,gamma
        1,2.0,"A"
        3,4.0,"B"
        5,6.0,"C"
        """
    ).encode()

    df = pl.read_csv(csv, skip_rows=3)
    assert df.columns == ["alpha", "beta", "gamma"]
    assert df.shape == (3, 3)
    assert df.dtypes == [pl.Int64, pl.Float64, pl.String]

    df = pl.read_csv(csv, skip_rows=2, skip_rows_after_header=1)
    assert df.columns == ["col1", "col2", "col3"]
    assert df.shape == (3, 3)
    assert df.dtypes == [pl.Int64, pl.Float64, pl.String]

    df = pl.scan_csv(foods_file_path, skip_rows=4).collect()
    assert df.columns == ["fruit", "60", "0", "11"]
    assert df.shape == (23, 4)
    assert df.dtypes == [pl.String, pl.Int64, pl.Float64, pl.Int64]

    df = pl.scan_csv(foods_file_path, skip_rows_after_header=24).collect()
    assert df.columns == ["category", "calories", "fats_g", "sugars_g"]
    assert df.shape == (3, 4)
    assert df.dtypes == [pl.String, pl.Int64, pl.Int64, pl.Int64]

    df = pl.scan_csv(
        foods_file_path, skip_rows_after_header=24, infer_schema_length=1
    ).collect()
    assert df.columns == ["category", "calories", "fats_g", "sugars_g"]
    assert df.shape == (3, 4)
    assert df.dtypes == [pl.String, pl.Int64, pl.Int64, pl.Int64]


def test_empty_string_missing_round_trip(chunk_override: None) -> None:
    df = pl.DataFrame({"varA": ["A", "", None], "varB": ["B", "", None]})
    for null in (None, "NA", "NULL", r"\N"):
        f = io.BytesIO()
        df.write_csv(f, null_value=null)
        f.seek(0)
        df_read = pl.read_csv(f, null_values=null)
        assert_frame_equal(df, df_read)


def test_write_csv_separator(chunk_override: None) -> None:
    df = pl.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3]})
    f = io.BytesIO()
    df.write_csv(f, separator="\t")
    f.seek(0)
    assert f.read() == b"a\tb\n1\t1\n2\t2\n3\t3\n"
    f.seek(0)
    assert_frame_equal(df, pl.read_csv(f, separator="\t"))


def test_write_csv_line_terminator(chunk_override: None) -> None:
    df = pl.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3]})
    f = io.BytesIO()
    df.write_csv(f, line_terminator="\r\n")
    f.seek(0)
    assert f.read() == b"a,b\r\n1,1\r\n2,2\r\n3,3\r\n"
    f.seek(0)
    assert_frame_equal(df, pl.read_csv(f, eol_char="\n"))


def test_escaped_null_values(chunk_override: None) -> None:
    csv = textwrap.dedent(
        """\
        "a","b","c"
        "a","n/a","NA"
        "None","2","3.0"
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(
        f,
        null_values={"a": "None", "b": "n/a", "c": "NA"},
        schema_overrides={"a": pl.String, "b": pl.Int64, "c": pl.Float64},
    )
    assert df[1, "a"] is None
    assert df[0, "b"] is None
    assert df[0, "c"] is None


def test_quoting_round_trip(chunk_override: None) -> None:
    f = io.BytesIO()
    df = pl.DataFrame(
        {
            "a": [
                "tab,separated,field",
                "newline\nseparated\nfield",
                'quote"separated"field',
            ]
        }
    )
    df.write_csv(f)
    f.seek(0)
    read_df = pl.read_csv(f)
    assert_frame_equal(read_df, df)


def test_csv_field_schema_inference_with_whitespace(chunk_override: None) -> None:
    csv = """\
bool,bool-,-bool,float,float-,-float,int,int-,-int
true,true , true,1.2,1.2 , 1.2,1,1 , 1
"""
    df = pl.read_csv(io.StringIO(csv), has_header=True)
    expected = pl.DataFrame(
        {
            "bool": [True],
            "bool-": ["true "],
            "-bool": [" true"],
            "float": [1.2],
            "float-": ["1.2 "],
            "-float": [" 1.2"],
            "int": [1],
            "int-": ["1 "],
            "-int": [" 1"],
        }
    )
    assert_frame_equal(df, expected)


def test_fallback_chrono_parser(chunk_override: None) -> None:
    data = textwrap.dedent(
        """\
        date_1,date_2
        2021-01-01,2021-1-1
        2021-02-02,2021-2-2
        2021-10-10,2021-10-10
        """
    )
    df = pl.read_csv(data.encode(), try_parse_dates=True)
    assert df.null_count().row(0) == (0, 0)


def test_tz_aware_try_parse_dates(chunk_override: None) -> None:
    data = (
        "a,b,c,d\n"
        "2020-01-01T02:00:00+01:00,2021-04-28T00:00:00+02:00,2021-03-28T00:00:00+01:00,2\n"
        "2020-01-01T03:00:00+01:00,2021-04-29T00:00:00+02:00,2021-03-29T00:00:00+02:00,3\n"
    )
    result = pl.read_csv(io.StringIO(data), try_parse_dates=True)
    expected = pl.DataFrame(
        {
            "a": [
                datetime(2020, 1, 1, 1, tzinfo=timezone.utc),
                datetime(2020, 1, 1, 2, tzinfo=timezone.utc),
            ],
            "b": [
                datetime(2021, 4, 27, 22, tzinfo=timezone.utc),
                datetime(2021, 4, 28, 22, tzinfo=timezone.utc),
            ],
            "c": [
                datetime(2021, 3, 27, 23, tzinfo=timezone.utc),
                datetime(2021, 3, 28, 22, tzinfo=timezone.utc),
            ],
            "d": [2, 3],
        }
    )
    assert_frame_equal(result, expected)


@pytest.mark.parametrize("try_parse_dates", [True, False])
@pytest.mark.parametrize("time_unit", ["ms", "us", "ns"])
def test_csv_overwrite_datetime_dtype(
    chunk_override: None, try_parse_dates: bool, time_unit: TimeUnit
) -> None:
    data = """\
a
2020-1-1T00:00:00.123456789
2020-1-2T00:00:00.987654321
2020-1-3T00:00:00.132547698
"""
    result = pl.read_csv(
        io.StringIO(data),
        try_parse_dates=try_parse_dates,
        schema_overrides={"a": pl.Datetime(time_unit)},
    )
    expected = pl.DataFrame(
        {
            "a": pl.Series(
                [
                    "2020-01-01T00:00:00.123456789",
                    "2020-01-02T00:00:00.987654321",
                    "2020-01-03T00:00:00.132547698",
                ]
            ).str.to_datetime(time_unit=time_unit)
        }
    )
    assert_frame_equal(result, expected)


def test_csv_string_escaping(chunk_override: None) -> None:
    df = pl.DataFrame({"a": ["Free trip to A,B", '''Special rate "1.79"''']})
    f = io.BytesIO()
    df.write_csv(f)
    f.seek(0)
    df_read = pl.read_csv(f)
    assert_frame_equal(df_read, df)


@pytest.mark.write_disk
def test_glob_csv(
    chunk_override: None, df_no_lists: pl.DataFrame, tmp_path: Path
) -> None:
    tmp_path.mkdir(exist_ok=True)

    df = df_no_lists.drop("strings_nulls")
    file_path = tmp_path / "small.csv"
    df.write_csv(file_path)

    path_glob = tmp_path / "small*.csv"
    assert pl.scan_csv(path_glob).collect().shape == (3, 12)
    assert pl.read_csv(path_glob).shape == (3, 12)


def test_csv_whitespace_separator_at_start_do_not_skip(chunk_override: None) -> None:
    csv = "\t\t\t\t0\t1"
    result = pl.read_csv(csv.encode(), separator="\t", has_header=False)
    expected = {
        "column_1": [None],
        "column_2": [None],
        "column_3": [None],
        "column_4": [None],
        "column_5": [0],
        "column_6": [1],
    }
    assert result.to_dict(as_series=False) == expected


def test_csv_whitespace_separator_at_end_do_not_skip(chunk_override: None) -> None:
    csv = "0\t1\t\t\t\t"
    result = pl.read_csv(csv.encode(), separator="\t", has_header=False)
    expected = {
        "column_1": [0],
        "column_2": [1],
        "column_3": [None],
        "column_4": [None],
        "column_5": [None],
        "column_6": [None],
    }
    assert result.to_dict(as_series=False) == expected


def test_csv_multiple_null_values(chunk_override: None) -> None:
    df = pl.DataFrame(
        {
            "a": [1, 2, None, 4],
            "b": ["2022-01-01", "__NA__", "", "NA"],
        }
    )
    f = io.BytesIO()
    df.write_csv(f)
    f.seek(0)

    df2 = pl.read_csv(f, null_values=["__NA__", "NA"])
    expected = pl.DataFrame(
        {
            "a": [1, 2, None, 4],
            "b": ["2022-01-01", None, "", None],
        }
    )
    assert_frame_equal(df2, expected)


def test_different_eol_char(chunk_override: None) -> None:
    csv = "a,1,10;b,2,20;c,3,30"
    expected = pl.DataFrame(
        {"column_1": ["a", "b", "c"], "column_2": [1, 2, 3], "column_3": [10, 20, 30]}
    )
    assert_frame_equal(
        pl.read_csv(csv.encode(), eol_char=";", has_header=False), expected
    )


def test_csv_write_escape_headers(chunk_override: None) -> None:
    df0 = pl.DataFrame({"col,1": ["data,1"], 'col"2': ['data"2'], "col:3": ["data:3"]})
    out = io.BytesIO()
    df0.write_csv(out)
    assert out.getvalue() == b'"col,1","col""2",col:3\n"data,1","data""2",data:3\n'

    df1 = pl.DataFrame({"c,o,l,u,m,n": [123]})
    out = io.BytesIO()
    df1.write_csv(out)

    out.seek(0)
    df2 = pl.read_csv(out)
    assert_frame_equal(df1, df2)
    assert df2.schema == {"c,o,l,u,m,n": pl.Int64}


def test_csv_write_escape_newlines(chunk_override: None) -> None:
    df = pl.DataFrame({"escape": ["n\nn"]})
    f = io.BytesIO()
    df.write_csv(f)
    f.seek(0)
    read_df = pl.read_csv(f)
    assert_frame_equal(df, read_df)


def test_skip_new_line_embedded_lines(chunk_override: None) -> None:
    csv = r"""a,b,c,d,e\n
1,2,3,"\n Test",\n
4,5,6,"Test A",\n
7,8,,"Test B \n",\n"""

    for empty_string, missing_value in ((True, ""), (False, None)):
        df = pl.read_csv(
            csv.encode(),
            skip_rows_after_header=1,
            infer_schema_length=0,
            missing_utf8_is_empty_string=empty_string,
        )
        assert df.to_dict(as_series=False) == {
            "a": ["4", "7"],
            "b": ["5", "8"],
            "c": ["6", missing_value],
            "d": ["Test A", "Test B \\n"],
            "e\\n": ["\\n", "\\n"],
        }


def test_csv_schema_overrides_bool(chunk_override: None) -> None:
    csv = "a, b\n" + ",false\n" + ",false\n" + ",false"
    df = pl.read_csv(
        csv.encode(),
        schema_overrides={"a": pl.Boolean, "b": pl.Boolean},
    )
    assert df.dtypes == [pl.Boolean, pl.Boolean]


@pytest.mark.parametrize(
    ("fmt", "expected"),
    [
        (None, "dt\n2022-01-02T00:00:00.000000\n"),
        ("%F %T%.3f", "dt\n2022-01-02 00:00:00.000\n"),
        ("%Y", "dt\n2022\n"),
        ("%m", "dt\n01\n"),
        ("%m$%d", "dt\n01$02\n"),
        ("%R", "dt\n00:00\n"),
    ],
)
def test_datetime_format(chunk_override: None, fmt: str, expected: str) -> None:
    df = pl.DataFrame({"dt": [datetime(2022, 1, 2)]})
    csv = df.write_csv(datetime_format=fmt)
    assert csv == expected


@pytest.mark.parametrize(
    ("fmt", "expected"),
    [
        (None, "dt\n2022-01-02T00:00:00.000000+0000\n"),
        ("%F %T%.3f%z", "dt\n2022-01-02 00:00:00.000+0000\n"),
        ("%Y%z", "dt\n2022+0000\n"),
        ("%m%z", "dt\n01+0000\n"),
        ("%m$%d%z", "dt\n01$02+0000\n"),
        ("%R%z", "dt\n00:00+0000\n"),
    ],
)
@pytest.mark.parametrize("tzinfo", [timezone.utc, timezone(timedelta(hours=0))])
def test_datetime_format_tz_aware(
    chunk_override: None, fmt: str, expected: str, tzinfo: timezone
) -> None:
    df = pl.DataFrame({"dt": [datetime(2022, 1, 2, tzinfo=tzinfo)]})
    csv = df.write_csv(datetime_format=fmt)
    assert csv == expected


@pytest.mark.parametrize(
    ("tu1", "tu2", "expected"),
    [
        (
            "ns",
            "ns",
            "x,y\n2022-09-04T10:30:45.123000000,2022-09-04T10:30:45.123000000\n",
        ),
        (
            "ns",
            "us",
            "x,y\n2022-09-04T10:30:45.123000000,2022-09-04T10:30:45.123000\n",
        ),
        (
            "ns",
            "ms",
            "x,y\n2022-09-04T10:30:45.123000000,2022-09-04T10:30:45.123\n",
        ),
        ("us", "us", "x,y\n2022-09-04T10:30:45.123000,2022-09-04T10:30:45.123000\n"),
        ("us", "ms", "x,y\n2022-09-04T10:30:45.123000,2022-09-04T10:30:45.123\n"),
        ("ms", "us", "x,y\n2022-09-04T10:30:45.123,2022-09-04T10:30:45.123000\n"),
        ("ms", "ms", "x,y\n2022-09-04T10:30:45.123,2022-09-04T10:30:45.123\n"),
    ],
)
def test_datetime_format_inferred_precision(
    chunk_override: None, tu1: TimeUnit, tu2: TimeUnit, expected: str
) -> None:
    df = pl.DataFrame(
        data={
            "x": [datetime(2022, 9, 4, 10, 30, 45, 123000)],
            "y": [datetime(2022, 9, 4, 10, 30, 45, 123000)],
        },
        schema=[
            ("x", pl.Datetime(tu1)),
            ("y", pl.Datetime(tu2)),
        ],
    )
    assert expected == df.write_csv()
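
# When no datetime_format is given, the fractional-second precision written
# follows each column's time unit, per the parametrization above: nine digits
# for "ns", six for "us", and three for "ms".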


def test_inferred_datetime_format_mixed(chunk_override: None) -> None:
    ts = pl.datetime_range(datetime(2000, 1, 1), datetime(2000, 1, 2), eager=True)
    df = pl.DataFrame({"naive": ts, "aware": ts.dt.replace_time_zone("UTC")})
    result = df.write_csv()
    expected = (
        "naive,aware\n"
        "2000-01-01T00:00:00.000000,2000-01-01T00:00:00.000000+0000\n"
        "2000-01-02T00:00:00.000000,2000-01-02T00:00:00.000000+0000\n"
    )
    assert result == expected


@pytest.mark.parametrize(
    ("fmt", "expected"),
    [
        (None, "dt\n2022-01-02\n"),
        ("%Y", "dt\n2022\n"),
        ("%m", "dt\n01\n"),
        ("%m$%d", "dt\n01$02\n"),
    ],
)
def test_date_format(chunk_override: None, fmt: str, expected: str) -> None:
    df = pl.DataFrame({"dt": [date(2022, 1, 2)]})
    csv = df.write_csv(date_format=fmt)
    assert csv == expected


@pytest.mark.parametrize(
    ("fmt", "expected"),
    [
        (None, "dt\n16:15:30.000000000\n"),
        ("%R", "dt\n16:15\n"),
    ],
)
def test_time_format(chunk_override: None, fmt: str, expected: str) -> None:
    df = pl.DataFrame({"dt": [time(16, 15, 30)]})
    csv = df.write_csv(time_format=fmt)
    assert csv == expected


@pytest.mark.parametrize("dtype", [pl.Float32, pl.Float64])
def test_float_precision(chunk_override: None, dtype: pl.Float32 | pl.Float64) -> None:
    df = pl.Series("col", [1.0, 2.2, 3.33], dtype=dtype).to_frame()

    assert df.write_csv(float_precision=None) == "col\n1.0\n2.2\n3.33\n"
    assert df.write_csv(float_precision=0) == "col\n1\n2\n3\n"
    assert df.write_csv(float_precision=1) == "col\n1.0\n2.2\n3.3\n"
    assert df.write_csv(float_precision=2) == "col\n1.00\n2.20\n3.33\n"
    assert df.write_csv(float_precision=3) == "col\n1.000\n2.200\n3.330\n"


def test_float_scientific(chunk_override: None) -> None:
    df = (
        pl.Series(
            "colf64",
            [3.141592653589793 * mult for mult in (1e-8, 1e-3, 1e3, 1e17)],
            dtype=pl.Float64,
        )
        .to_frame()
        .with_columns(pl.col("colf64").cast(pl.Float32).alias("colf32"))
    )

    assert (
        df.write_csv(float_precision=None, float_scientific=False)
        == "colf64,colf32\n0.00000003141592653589793,0.00000003141592586075603\n0.0031415926535897933,0.0031415927223861217\n3141.592653589793,3141.5927734375\n314159265358979300,314159265516355600\n"
    )
    assert (
        df.write_csv(float_precision=0, float_scientific=False)
        == "colf64,colf32\n0,0\n0,0\n3142,3142\n314159265358979328,314159265516355584\n"
    )
    assert (
        df.write_csv(float_precision=1, float_scientific=False)
        == "colf64,colf32\n0.0,0.0\n0.0,0.0\n3141.6,3141.6\n314159265358979328.0,314159265516355584.0\n"
    )
    assert (
        df.write_csv(float_precision=3, float_scientific=False)
        == "colf64,colf32\n0.000,0.000\n0.003,0.003\n3141.593,3141.593\n314159265358979328.000,314159265516355584.000\n"
    )

    assert (
        df.write_csv(float_precision=None, float_scientific=True)
        == "colf64,colf32\n3.141592653589793e-8,3.1415926e-8\n3.1415926535897933e-3,3.1415927e-3\n3.141592653589793e3,3.1415928e3\n3.141592653589793e17,3.1415927e17\n"
    )
    assert (
        df.write_csv(float_precision=0, float_scientific=True)
        == "colf64,colf32\n3e-8,3e-8\n3e-3,3e-3\n3e3,3e3\n3e17,3e17\n"
    )
    assert (
        df.write_csv(float_precision=1, float_scientific=True)
        == "colf64,colf32\n3.1e-8,3.1e-8\n3.1e-3,3.1e-3\n3.1e3,3.1e3\n3.1e17,3.1e17\n"
    )
    assert (
        df.write_csv(float_precision=3, float_scientific=True)
        == "colf64,colf32\n3.142e-8,3.142e-8\n3.142e-3,3.142e-3\n3.142e3,3.142e3\n3.142e17,3.142e17\n"
    )


def test_skip_rows_different_field_len(chunk_override: None) -> None:
    csv = io.StringIO(
        textwrap.dedent(
            """\
            a,b
            1,A
            2,
            3,B
            4,
            """
        )
    )
    for empty_string, missing_value in ((True, ""), (False, None)):
        csv.seek(0)
        assert pl.read_csv(
            csv, skip_rows_after_header=2, missing_utf8_is_empty_string=empty_string
        ).to_dict(as_series=False) == {
            "a": [3, 4],
            "b": ["B", missing_value],
        }


def test_duplicated_columns(chunk_override: None) -> None:
    csv = textwrap.dedent(
        """a,a
        1,2
        """
    )
    assert pl.read_csv(csv.encode()).columns == ["a", "a_duplicated_0"]
    new = ["c", "d"]
    assert pl.read_csv(csv.encode(), new_columns=new).columns == new


def test_error_message(chunk_override: None) -> None:
    data = io.StringIO("target,wind,energy,miso\n1,2,3,4\n1,2,1e5,1\n")
    with pytest.raises(
        ComputeError,
        match=r"could not parse `1e5` as dtype `i64` at column 'energy' \(column number 3\)",
    ):
        pl.read_csv(data, infer_schema_length=1)
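
# The failure above comes from schema inference: with infer_schema_length=1
# only the first data row is sampled, so 'energy' is inferred as i64 and the
# later value `1e5` cannot be parsed; a larger infer_schema_length (or a
# float schema override for that column) would avoid the ComputeError.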


def test_csv_categorical_lifetime(chunk_override: None) -> None:
    # escaped strings do some heap allocations in the builder;
    # this tests that the lifetimes remain valid
    csv = textwrap.dedent(
        r"""
        a,b
        "needs_escape",b
        "" ""needs" escape" foo"",b
        "" ""needs" escape" foo"",
        """
    )

    df = pl.read_csv(
        csv.encode(), schema_overrides={"a": pl.Categorical, "b": pl.Categorical}
    )
    assert df.dtypes == [pl.Categorical, pl.Categorical]
    assert df.to_dict(as_series=False) == {
        "a": ["needs_escape", ' "needs escape foo', ' "needs escape foo'],
        "b": ["b", "b", None],
    }

    assert (df["a"] == df["b"]).to_list() == [False, False, None]


def test_csv_categorical_categorical_merge(chunk_override: None) -> None:
    N = 50
    f = io.BytesIO()
    pl.DataFrame({"x": ["A"] * N + ["B"] * N}).write_csv(f)
    f.seek(0)
    assert pl.read_csv(
        f, schema_overrides={"x": pl.Categorical}, sample_size=10
    ).unique(maintain_order=True)["x"].to_list() == ["A", "B"]


@pytest.mark.write_disk
def test_batched_csv_reader(chunk_override: None, foods_file_path: Path) -> None:
    with pytest.deprecated_call():
        reader = pl.read_csv_batched(foods_file_path, batch_size=4)
        assert isinstance(reader, BatchedCsvReader)

        batches = reader.next_batches(5)
        assert batches is not None
        out = pl.concat(batches)
        assert_frame_equal(out, pl.read_csv(foods_file_path).head(out.height))

        # the final batch of the low-memory variant is different
        reader = pl.read_csv_batched(foods_file_path, batch_size=4, low_memory=True)
        batches = reader.next_batches(10)
        assert batches is not None

        assert_frame_equal(pl.concat(batches), pl.read_csv(foods_file_path))

        reader = pl.read_csv_batched(foods_file_path, batch_size=4, low_memory=True)
        batches = reader.next_batches(10)
        assert_frame_equal(pl.concat(batches), pl.read_csv(foods_file_path))  # type: ignore[arg-type]

        # ragged lines
        with NamedTemporaryFile() as tmp:
            data = b"A\nB,ragged\nC"
            tmp.write(data)
            tmp.seek(0)

            expected = pl.DataFrame({"A": ["B", "C"]})
            batches = pl.read_csv_batched(
                tmp.name,
                has_header=True,
                truncate_ragged_lines=True,
            ).next_batches(1)

            assert batches is not None
            assert_frame_equal(pl.concat(batches), expected)


def test_batched_csv_reader_empty(chunk_override: None, io_files_path: Path) -> None:
    with pytest.deprecated_call():
        empty_csv = io_files_path / "empty.csv"
        with pytest.raises(NoDataError, match="empty CSV"):
            pl.read_csv_batched(source=empty_csv)

        reader = pl.read_csv_batched(source=empty_csv, raise_if_empty=False)
        assert reader.next_batches(1) is None


def test_batched_csv_reader_all_batches(
    chunk_override: None, foods_file_path: Path
) -> None:
    with pytest.deprecated_call():
        for new_columns in [None, ["Category", "Calories", "Fats_g", "Sugars_g"]]:
            out = pl.read_csv(foods_file_path, new_columns=new_columns)
            reader = pl.read_csv_batched(
                foods_file_path, new_columns=new_columns, batch_size=4
            )
            batches = reader.next_batches(5)
            batched_dfs = []

            while batches:
                batched_dfs.extend(batches)
                batches = reader.next_batches(5)

            assert all(x.height > 0 for x in batched_dfs)

            batched_concat_df = pl.concat(batched_dfs, rechunk=True)
            assert_frame_equal(out, batched_concat_df)


def test_batched_csv_reader_no_batches(
    chunk_override: None, foods_file_path: Path
) -> None:
    with pytest.deprecated_call():
        reader = pl.read_csv_batched(foods_file_path, batch_size=4)
        batches = reader.next_batches(0)

        assert batches is None
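
# Behaviour worth noting from the batched tests above: next_batches returns
# None whenever there is nothing to yield (empty source, exhausted reader, or
# a request for zero batches), so `while batches:` is the natural drain loop.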


def test_csv_single_categorical_null(chunk_override: None) -> None:
    f = io.BytesIO()
    pl.DataFrame(
        {
            "x": ["A"],
            "y": [None],
            "z": ["A"],
        }
    ).write_csv(f)
    f.seek(0)

    df = pl.read_csv(
        f,
        schema_overrides={"y": pl.Categorical},
    )

    assert df.dtypes == [pl.String, pl.Categorical, pl.String]
    assert df.to_dict(as_series=False) == {"x": ["A"], "y": [None], "z": ["A"]}


def test_csv_quoted_missing(chunk_override: None) -> None:
    csv = (
        '"col1"|"col2"|"col3"|"col4"\n'
        '"0"|"Free text with a line\nbreak"|"123"|"456"\n'
        '"1"|"Free text without a linebreak"|""|"789"\n'
        '"0"|"Free text with \ntwo \nlinebreaks"|"101112"|"131415"'
    )
    result = pl.read_csv(
        csv.encode(), separator="|", schema_overrides={"col3": pl.Int32}
    )
    expected = pl.DataFrame(
        {
            "col1": [0, 1, 0],
            "col2": [
                "Free text with a line\nbreak",
                "Free text without a linebreak",
                "Free text with \ntwo \nlinebreaks",
            ],
            "col3": [123, None, 101112],
            "col4": [456, 789, 131415],
        },
        schema_overrides={"col3": pl.Int32},
    )
    assert_frame_equal(result, expected)


def test_csv_write_tz_aware(chunk_override: None) -> None:
    df = pl.DataFrame({"times": datetime(2021, 1, 1)}).with_columns(
        pl.col("times")
        .dt.replace_time_zone("UTC")
        .dt.convert_time_zone("Europe/Zurich")
    )
    assert df.write_csv() == "times\n2021-01-01T01:00:00.000000+0100\n"


def test_csv_statistics_offset(chunk_override: None) -> None:
    # this would fail if the statistics sample did not also sample
    # from the end of the file; the lines at the end have larger
    # rows as the numbers increase
    N = 5_000
    csv = "\n".join(str(x) for x in range(N))
    assert pl.read_csv(io.StringIO(csv), n_rows=N).height == 4999


@pytest.mark.write_disk
def test_csv_scan_categorical(chunk_override: None, tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    N = 5_000
    df = pl.DataFrame({"x": ["A"] * N})

    file_path = tmp_path / "test_csv_scan_categorical.csv"
    df.write_csv(file_path)
    result = pl.scan_csv(file_path, schema_overrides={"x": pl.Categorical}).collect()

    assert result["x"].dtype == pl.Categorical


@pytest.mark.write_disk
def test_csv_scan_new_columns_less_than_original_columns(
    chunk_override: None, tmp_path: Path
) -> None:
    tmp_path.mkdir(exist_ok=True)

    df = pl.DataFrame({"x": ["A"], "y": ["A"], "z": "A"})

    file_path = tmp_path / "test_csv_scan_new_columns.csv"
    df.write_csv(file_path)
    result = pl.scan_csv(file_path, new_columns=["x_new", "y_new"]).collect()

    assert result.columns == ["x_new", "y_new", "z"]


def test_read_csv_chunked(chunk_override: None) -> None:
    """Check that the row index is set correctly across chunks."""
    N = 10_000
    csv = "1\n" * N
    df = pl.read_csv(io.StringIO(csv), row_index_name="count")

    # The next value should always be higher if monotonically increasing.
    assert df.filter(pl.col("count") < pl.col("count").shift(1)).is_empty()


def test_read_empty_csv(chunk_override: None, io_files_path: Path) -> None:
    with pytest.raises(NoDataError) as err:
        pl.read_csv(io_files_path / "empty.csv")
    assert "empty CSV" in str(err.value)

    df = pl.read_csv(io_files_path / "empty.csv", raise_if_empty=False)
    assert_frame_equal(df, pl.DataFrame())

    with pytest.raises(pa.ArrowInvalid) as err:
        pl.read_csv(io_files_path / "empty.csv", use_pyarrow=True)
    assert "Empty CSV" in str(err.value)

    df = pl.read_csv(
        io_files_path / "empty.csv", raise_if_empty=False, use_pyarrow=True
    )
    assert_frame_equal(df, pl.DataFrame())


@pytest.mark.slow
def test_read_web_file(chunk_override: None) -> None:
    url = "https://raw.githubusercontent.com/pola-rs/polars/main/examples/datasets/foods1.csv"
    df = pl.read_csv(url)
    assert df.shape == (27, 4)


@pytest.mark.slow
def test_csv_multiline_splits(chunk_override: None) -> None:
    # create a very unlikely csv file with many multilines in a
    # single field (e.g. 5000). polars must reject multi-threading here
    # as it cannot find proper file chunks without sequentially parsing.

    np.random.seed(0)
    f = io.BytesIO()

    def some_multiline_str(n: int) -> str:
        strs = []
        strs.append('"')
        # sample between 0-5 so it is likely the multiline field also gets 3 separators.
        strs.extend(f"{'xx,' * length}" for length in np.random.randint(0, 5, n))

        strs.append('"')
        return "\n".join(strs)

    for _ in range(4):
        f.write(f"field1,field2,{some_multiline_str(5000)}\n".encode())

    f.seek(0)
    assert pl.read_csv(f, has_header=False).shape == (4, 3)


def test_read_csv_n_rows_outside_heuristic(chunk_override: None) -> None:
    # create a fringe case csv file that breaks the heuristic determining how much of
    # the file to read, and ensure n_rows is still adhered to

    f = io.StringIO()

    f.write(",,,?????????\n" * 1000)
    f.write("?????????????????????????????????????????????????,,,\n")
    f.write(",,,?????????\n" * 1048)

    f.seek(0)
    assert pl.read_csv(f, n_rows=2048, has_header=False).shape == (2048, 4)


def test_read_csv_comments_on_top_with_schema_11667(chunk_override: None) -> None:
    csv = """
# This is a comment
A,B
1,Hello
2,World
""".strip()

    schema = {
        "A": pl.Int32(),
        "B": pl.Utf8(),
    }

    df = pl.read_csv(io.StringIO(csv), comment_prefix="#", schema=schema)
    assert df.height == 2
    assert df.schema == schema


def test_write_csv_stdout_stderr(
    chunk_override: None, capsys: pytest.CaptureFixture[str]
) -> None:
    df = pl.DataFrame(
        {
            "numbers": [1, 2, 3],
            "strings": ["test", "csv", "stdout"],
            "dates": [date(2023, 1, 1), date(2023, 1, 2), date(2023, 1, 3)],
        }
    )
    df.write_csv(sys.stdout)
    captured = capsys.readouterr()
    assert captured.out == (
        "numbers,strings,dates\n"
        "1,test,2023-01-01\n"
        "2,csv,2023-01-02\n"
        "3,stdout,2023-01-03\n"
    )

    df.write_csv(sys.stderr)
    captured = capsys.readouterr()
    assert captured.err == (
        "numbers,strings,dates\n"
        "1,test,2023-01-01\n"
        "2,csv,2023-01-02\n"
        "3,stdout,2023-01-03\n"
    )


def test_csv_9929(chunk_override: None) -> None:
    df = pl.DataFrame({"nrs": [1, 2, 3]})
    f = io.BytesIO()
    df.write_csv(f)
    f.seek(0)
    with pytest.raises(NoDataError):
        pl.read_csv(f, skip_rows=10**6)


def test_csv_quote_styles(chunk_override: None) -> None:
    class TemporalFormats(TypedDict):
        datetime_format: str
        time_format: str

    temporal_formats: TemporalFormats = {
        "datetime_format": "%Y-%m-%dT%H:%M:%S",
        "time_format": "%H:%M:%S",
    }

    dtm = datetime(2077, 7, 5, 3, 1, 0)
    dt = dtm.date()
    tm = dtm.time()

    df = pl.DataFrame(
        {
            "float": [1.0, 2.0, None],
            "string": ["a", "a,bc", '"hello'],
            "int": [1, 2, 3],
            "bool": [True, False, None],
            "date": [dt, None, dt],
            "datetime": [None, dtm, dtm],
            "time": [tm, tm, None],
            "decimal": [D("1.0"), D("2.0"), None],
        }
    )

    assert df.write_csv(quote_style="always", **temporal_formats) == (
        '"float","string","int","bool","date","datetime","time","decimal"\n'
        '"1.0","a","1","true","2077-07-05","","03:01:00","1.0"\n'
        '"2.0","a,bc","2","false","","2077-07-05T03:01:00","03:01:00","2.0"\n'
        '"","""hello","3","","2077-07-05","2077-07-05T03:01:00","",""\n'
    )
    assert df.write_csv(quote_style="necessary", **temporal_formats) == (
        "float,string,int,bool,date,datetime,time,decimal\n"
        "1.0,a,1,true,2077-07-05,,03:01:00,1.0\n"
        '2.0,"a,bc",2,false,,2077-07-05T03:01:00,03:01:00,2.0\n'
        ',"""hello",3,,2077-07-05,2077-07-05T03:01:00,,\n'
    )
    assert df.write_csv(quote_style="never", **temporal_formats) == (
        "float,string,int,bool,date,datetime,time,decimal\n"
        "1.0,a,1,true,2077-07-05,,03:01:00,1.0\n"
        "2.0,a,bc,2,false,,2077-07-05T03:01:00,03:01:00,2.0\n"
        ',"hello,3,,2077-07-05,2077-07-05T03:01:00,,\n'
    )
    assert df.write_csv(
        quote_style="non_numeric", quote_char="8", **temporal_formats
    ) == (
        "8float8,8string8,8int8,8bool8,8date8,8datetime8,8time8,8decimal8\n"
        "1.0,8a8,1,8true8,82077-07-058,,803:01:008,1.0\n"
        "2.0,8a,bc8,2,8false8,,82077-07-05T03:01:008,803:01:008,2.0\n"
        ',8"hello8,3,,82077-07-058,82077-07-05T03:01:008,,\n'
    )
1935
1936
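# Illustrative sketch (not collected by pytest): the quote styles differ only
# in when the quote character is emitted; the expected strings here follow
# from the assertions above.
def _example_quote_styles() -> None:
    df = pl.DataFrame({"s": ["a,b"], "n": [1]})
    assert df.write_csv(quote_style="always") == '"s","n"\n"a,b","1"\n'
    assert df.write_csv(quote_style="necessary") == 's,n\n"a,b",1\n'
    assert df.write_csv(quote_style="non_numeric") == '"s","n"\n"a,b",1\n'

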
def test_ignore_errors_casting_dtypes(chunk_override: None) -> None:
    csv = """inventory
10

400
90
"""

    assert pl.read_csv(
        source=io.StringIO(csv),
        schema_overrides={"inventory": pl.Int8},
        ignore_errors=True,
    ).to_dict(as_series=False) == {"inventory": [10, None, None, 90]}

    with pytest.raises(ComputeError):
        pl.read_csv(
            source=io.StringIO(csv),
            schema_overrides={"inventory": pl.Int8},
            ignore_errors=False,
        )


def test_ignore_errors_date_parser(chunk_override: None) -> None:
    data_invalid_date = "int,float,date\n3,3.4,X"
    with pytest.raises(ComputeError):
        pl.read_csv(
            source=io.StringIO(data_invalid_date),
            schema_overrides={"date": pl.Date},
            ignore_errors=False,
        )


def test_csv_ragged_lines(chunk_override: None) -> None:
    expected = {"A": ["B", "C"]}
    assert (
        pl.read_csv(
            io.StringIO("A\nB,ragged\nC"), has_header=True, truncate_ragged_lines=True
        ).to_dict(as_series=False)
        == expected
    )
    assert (
        pl.read_csv(
            io.StringIO("A\nB\nC,ragged"), has_header=True, truncate_ragged_lines=True
        ).to_dict(as_series=False)
        == expected
    )

    for s in ["A\nB,ragged\nC", "A\nB\nC,ragged"]:
        with pytest.raises(ComputeError, match=r"found more fields than defined"):
            pl.read_csv(io.StringIO(s), has_header=True, truncate_ragged_lines=False)


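# Illustrative sketch (not collected by pytest): with
# `truncate_ragged_lines=True` the extra fields are dropped instead of
# raising, as asserted above.
def _example_truncate_ragged_lines() -> None:
    out = pl.read_csv(io.StringIO("A\nB,ragged\nC"), truncate_ragged_lines=True)
    assert out.to_dict(as_series=False) == {"A": ["B", "C"]}

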
@pytest.mark.may_fail_auto_streaming  # missing_columns parameter for CSV
def test_provide_schema(chunk_override: None) -> None:
    # a provided schema can be used to override the inferred schema for ragged csv files
    assert pl.read_csv(
        io.StringIO("A\nB,ragged\nC"),
        has_header=False,
        schema={"A": pl.String, "B": pl.String, "C": pl.String},
    ).to_dict(as_series=False) == {
        "A": ["A", "B", "C"],
        "B": [None, "ragged", None],
        "C": [None, None, None],
    }


def test_custom_writable_object(chunk_override: None) -> None:
    df = pl.DataFrame({"a": [10, 20, 30], "b": ["x", "y", "z"]})

    class CustomBuffer:
        writes: list[bytes]

        def __init__(self) -> None:
            self.writes = []

        def write(self, data: bytes) -> int:
            self.writes.append(data)
            return len(data)

    buf = CustomBuffer()
    df.write_csv(buf)  # type: ignore[call-overload]

    assert b"".join(buf.writes) == b"a,b\n10,x\n20,y\n30,z\n"


@pytest.mark.parametrize(
    ("csv", "expected"),
    [
        (b"a,b\n1,2\n1,2\n", pl.DataFrame({"a": [1, 1], "b": [2, 2]})),
        (b"a,b\n1,2\n1,2", pl.DataFrame({"a": [1, 1], "b": [2, 2]})),
        (b"a\n1\n1\n", pl.DataFrame({"a": [1, 1]})),
        (b"a\n1\n1", pl.DataFrame({"a": [1, 1]})),
    ],
    ids=[
        "multiple columns, ends with LF",
        "multiple columns, ends with non-LF",
        "single column, ends with LF",
        "single column, ends with non-LF",
    ],
)
def test_read_filelike_object_12266(
    chunk_override: None, csv: bytes, expected: pl.DataFrame
) -> None:
    buf = io.BufferedReader(io.BytesIO(csv))  # type: ignore[arg-type]
    df = pl.read_csv(buf)
    assert_frame_equal(df, expected)


def test_read_filelike_object_12404(chunk_override: None) -> None:
    expected = pl.DataFrame({"a": [1, 1], "b": [2, 2]})
    csv = expected.write_csv(line_terminator=";").encode()
    buf = io.BufferedReader(io.BytesIO(csv))  # type: ignore[arg-type]
    df = pl.read_csv(buf, eol_char=";")
    assert_frame_equal(df, expected)


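# Illustrative sketch (not collected by pytest): a custom record terminator
# round-trips when the same character is passed to the writer
# (`line_terminator`) and the reader (`eol_char`), as exercised above.
def _example_custom_eol() -> None:
    df = pl.DataFrame({"a": [1, 2]})
    data = df.write_csv(line_terminator=";").encode()
    assert_frame_equal(pl.read_csv(data, eol_char=";"), df)

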
def test_write_csv_bom(chunk_override: None) -> None:
    df = pl.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3]})
    f = io.BytesIO()
    df.write_csv(f, include_bom=True)
    f.seek(0)
    assert f.read() == b"\xef\xbb\xbfa,b\n1,1\n2,2\n3,3\n"


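# Illustrative sketch (not collected by pytest): `include_bom=True` simply
# prepends the UTF-8 byte order mark (EF BB BF) to the output.
def _example_write_bom() -> None:
    f = io.BytesIO()
    pl.DataFrame({"a": [1]}).write_csv(f, include_bom=True)
    assert f.getvalue().startswith(b"\xef\xbb\xbf")

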
def test_write_csv_batch_size_zero(chunk_override: None) -> None:
    df = pl.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3]})
    f = io.BytesIO()
    with pytest.raises(ValueError, match="invalid zero value"):
        df.write_csv(f, batch_size=0)


def test_empty_csv_no_raise(chunk_override: None) -> None:
    assert pl.read_csv(io.StringIO(), raise_if_empty=False, has_header=False).shape == (
        0,
        0,
    )


def test_csv_no_new_line_last(chunk_override: None) -> None:
    csv = io.StringIO("a b\n1 1\n2 2\n3 2.1")
    assert pl.read_csv(csv, separator=" ").to_dict(as_series=False) == {
        "a": [1, 2, 3],
        "b": [1.0, 2.0, 2.1],
    }


def test_invalid_csv_raise(chunk_override: None) -> None:
    with pytest.raises(ComputeError):
        pl.read_csv(
            b"""
"WellCompletionCWI","FacilityID","ProductionMonth","ReportedHoursProdInj","ProdAccountingProductType","ReportedVolume","VolumetricActivityType"
"SK0000608V001","SK BT B1H3780","202001","","GAS","1.700","PROD"
"SK0127960V000","SK BT 0018977","202001","","GAS","45.500","PROD"
"SK0127960V000","SK BT 0018977","
""".strip()
        )


@pytest.mark.write_disk
def test_partial_read_compressed_file(
    chunk_override: None, tmp_path: Path, plmonkeypatch: PlMonkeyPatch
) -> None:
    plmonkeypatch.setenv("POLARS_FORCE_ASYNC", "0")

    df = pl.DataFrame(
        {"idx": range(1_000), "dt": date(2025, 12, 31), "txt": "hello world"}
    )
    tmp_path.mkdir(exist_ok=True)
    file_path = tmp_path / "large.csv.gz"
    bytes_io = io.BytesIO()
    df.write_csv(bytes_io)
    bytes_io.seek(0)
    with gzip.open(file_path, mode="wb") as f:
        f.write(bytes_io.getvalue())
    df = pl.read_csv(
        file_path, skip_rows=40, has_header=False, skip_rows_after_header=20, n_rows=30
    )
    assert df.shape == (30, 3)


def test_read_csv_invalid_schema_overrides(chunk_override: None) -> None:
    csv = textwrap.dedent(
        """\
        a,b
        1,foo
        2,bar
        3,baz
        """
    )
    f = io.StringIO(csv)
    with pytest.raises(
        TypeError, match="`schema_overrides` should be of type list or dict"
    ):
        pl.read_csv(f, schema_overrides={pl.Int64, pl.String})  # type: ignore[arg-type]


def test_read_csv_invalid_schema_overrides_length(chunk_override: None) -> None:
    csv = textwrap.dedent(
        """\
        a,b
        1,foo
        2,bar
        3,baz
        """
    )
    f = io.StringIO(csv)

    # streaming dispatches read_csv -> _scan_csv_impl which does not accept a list
    if (
        os.getenv("POLARS_AUTO_NEW_STREAMING", os.getenv("POLARS_FORCE_NEW_STREAMING"))
        == "1"
    ):
        err = TypeError
        match = "expected 'schema_overrides' dict, found 'list'"
    else:
        err = InvalidOperationError  # type: ignore[assignment]
        match = "The number of schema overrides must be less than or equal to the number of fields"

    with pytest.raises(err, match=match):
        pl.read_csv(f, schema_overrides=[pl.Int64, pl.String, pl.Boolean])


def test_schema_overrides_dict_with_nonexistent_columns(chunk_override: None) -> None:
    """Test for issue #20903: schema_overrides should work consistently.

    When schema_overrides is a dict with the same length as the number of columns
    but contains non-existent column names, it should still be applied by name
    (not by position). Previously, Polars incorrectly applied the overrides
    positionally when the dict length matched the column count.
    """
    csv = textwrap.dedent(
        """\
        a,b
        1,hi
        """
    )

    # This should work: override 'a' to Int64; 'c' doesn't exist, so it is ignored
    f = io.StringIO(csv)
    df = pl.read_csv(f, schema_overrides={"a": pl.Int64, "c": pl.Int64})

    # Expected: column 'a' is Int64, column 'b' is inferred as String
    assert df.schema == {"a": pl.Int64, "b": pl.String}
    assert df.to_dict(as_series=False) == {"a": [1], "b": ["hi"]}

    # Sanity check: this works when we have a different number of overrides
    f = io.StringIO(csv)
    df2 = pl.read_csv(f, schema_overrides={"a": pl.Int64, "c": pl.Int64, "d": pl.Int64})
    assert df2.schema == {"a": pl.Int64, "b": pl.String}
    assert df2.to_dict(as_series=False) == {"a": [1], "b": ["hi"]}


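# Illustrative sketch (not collected by pytest): dict overrides are matched by
# column name, while a list is assumed to apply positionally to the leading
# columns (see test_read_csv_invalid_schema_overrides_length above for the
# length constraint).
def _example_schema_overrides_forms() -> None:
    data = b"a,b\n1,2"
    assert pl.read_csv(data, schema_overrides={"b": pl.Float64}).dtypes == [
        pl.Int64,
        pl.Float64,
    ]
    assert pl.read_csv(data, schema_overrides=[pl.Float64]).dtypes == [
        pl.Float64,
        pl.Int64,
    ]

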
@pytest.mark.parametrize("columns", [["b"], "b"])
2191
def test_read_csv_single_column(chunk_override: None, columns: list[str] | str) -> None:
2192
csv = textwrap.dedent(
2193
"""\
2194
a,b,c
2195
1,2,3
2196
4,5,6
2197
"""
2198
)
2199
f = io.StringIO(csv)
2200
df = pl.read_csv(f, columns=columns)
2201
expected = pl.DataFrame({"b": [2, 5]})
2202
assert_frame_equal(df, expected)
2203
2204
2205
def test_csv_invalid_escape_utf8_14960(chunk_override: None) -> None:
2206
with pytest.raises(ComputeError, match=r"Field .* is not properly escaped"):
2207
pl.read_csv('col1\n""•'.encode())
2208
2209
2210
def test_csv_invalid_escape(chunk_override: None) -> None:
2211
with pytest.raises(ComputeError):
2212
pl.read_csv(b'col1,col2\n"a,b')
2213
2214
2215
def test_csv_escape_cf_15349(chunk_override: None) -> None:
2216
f = io.BytesIO()
2217
df = pl.DataFrame({"test": ["normal", "with\rcr"]})
2218
df.write_csv(f)
2219
f.seek(0)
2220
assert f.read() == b'test\nnormal\n"with\rcr"\n'
2221
2222
2223
@pytest.mark.write_disk
2224
@pytest.mark.parametrize("streaming", [True, False])
2225
def test_skip_rows_after_header(
2226
chunk_override: None, tmp_path: Path, streaming: bool
2227
) -> None:
2228
tmp_path.mkdir(exist_ok=True)
2229
path = tmp_path / "data.csv"
2230
2231
df = pl.Series("a", [1, 2, 3, 4, 5], dtype=pl.Int64).to_frame()
2232
df.write_csv(path)
2233
2234
skip = 2
2235
expect = df.slice(skip)
2236
out = pl.scan_csv(path, skip_rows_after_header=skip).collect(
2237
engine="streaming" if streaming else "in-memory"
2238
)
2239
2240
assert_frame_equal(out, expect)
2241
2242
2243
@pytest.mark.parametrize("use_pyarrow", [True, False])
2244
def test_skip_rows_after_header_pyarrow(
2245
chunk_override: None, use_pyarrow: bool
2246
) -> None:
2247
csv = textwrap.dedent(
2248
"""\
2249
foo,bar
2250
1,2
2251
3,4
2252
5,6
2253
"""
2254
)
2255
f = io.StringIO(csv)
2256
df = pl.read_csv(f, skip_rows_after_header=1, use_pyarrow=use_pyarrow)
2257
expected = pl.DataFrame({"foo": [3, 5], "bar": [4, 6]})
2258
assert_frame_equal(df, expected)
2259
2260
2261
def test_read_csv_float_type_decimal_comma(chunk_override: None) -> None:
    floats = b"a;b\n12,239;1,233\n13,908;87,32"
    read = pl.read_csv(floats, decimal_comma=True, separator=";")
    assert read.dtypes == [pl.Float64] * 2
    assert read.to_dict(as_series=False) == {"a": [12.239, 13.908], "b": [1.233, 87.32]}


def test_read_csv_decimal_type_decimal_comma_24414(chunk_override: None) -> None:
    schema = pl.Schema({"a": pl.Decimal(scale=3), "b": pl.Decimal(scale=2)})

    csv_dot = b"a,b\n12.239,1.233\n13.908,87.32"
    out_dot = pl.read_csv(csv_dot, schema=schema)

    csv = b"a;b\n12,239;1,233\n13,908;87,32"
    out = pl.read_csv(csv, decimal_comma=True, separator=";", schema=schema)
    assert_frame_equal(out_dot, out)

    csv = b"a;b\n 12,239;1,233\n 13,908;87,32"
    out = pl.read_csv(csv, decimal_comma=True, separator=";", schema=schema)
    assert_frame_equal(out_dot, out)

    csv = b'a,b\n"12,239","1,233"\n"13,908","87,32"'
    out = pl.read_csv(csv, decimal_comma=True, schema=schema)
    assert_frame_equal(out_dot, out)


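# Illustrative sketch (not collected by pytest): with `decimal_comma=True` the
# comma is the decimal mark, so a non-comma separator (or quoted fields, as in
# the last case above) keeps the fields unambiguous.
def _example_decimal_comma_read() -> None:
    out = pl.read_csv(b"a;b\n1,5;2,25", decimal_comma=True, separator=";")
    assert out.to_dict(as_series=False) == {"a": [1.5], "b": [2.25]}

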
@pytest.mark.may_fail_auto_streaming  # read->scan_csv dispatch
def test_fsspec_not_available(chunk_override: None) -> None:
    with PlMonkeyPatch.context() as mp:
        mp.setenv("POLARS_FORCE_ASYNC", "0")
        mp.setattr("polars.io._utils._FSSPEC_AVAILABLE", False)

        with pytest.raises(
            ImportError, match=r"`fsspec` is required for `storage_options` argument"
        ):
            pl.read_csv(
                "s3://foods/cabbage.csv",
                storage_options={"key": "key", "secret": "secret"},
            )


@pytest.mark.may_fail_auto_streaming  # read->scan_csv dispatch
def test_read_csv_dtypes_deprecated(chunk_override: None) -> None:
    csv = textwrap.dedent(
        """\
        a,b,c
        1,2,3
        4,5,6
        """
    )
    f = io.StringIO(csv)

    with pytest.deprecated_call():
        df = pl.read_csv(f, dtypes=[pl.Int8, pl.Int8, pl.Int8])  # type: ignore[call-arg]

    expected = pl.DataFrame(
        {"a": [1, 4], "b": [2, 5], "c": [3, 6]},
        schema={"a": pl.Int8, "b": pl.Int8, "c": pl.Int8},
    )
    assert_frame_equal(df, expected)


def test_projection_applied_on_file_with_no_rows_16606(
    chunk_override: None, tmp_path: Path
) -> None:
    tmp_path.mkdir(exist_ok=True)

    path = tmp_path / "data.csv"

    data = """\
a,b,c,d
"""

    with path.open("w") as f:
        f.write(data)

    columns = ["a", "b"]

    out = pl.read_csv(path, columns=columns).columns
    assert out == columns

    out = pl.scan_csv(path).select(columns).collect().columns
    assert out == columns


@pytest.mark.write_disk
def test_write_csv_to_dangling_file_17328(
    chunk_override: None, df_no_lists: pl.DataFrame, tmp_path: Path
) -> None:
    tmp_path.mkdir(exist_ok=True)
    df_no_lists.write_csv((tmp_path / "dangling.csv").open("w"))


@pytest.mark.may_fail_cloud  # really hard to mimic this error
@pytest.mark.write_disk
def test_write_csv_raise_on_non_utf8_17328(
    chunk_override: None, df_no_lists: pl.DataFrame, tmp_path: Path
) -> None:
    tmp_path.mkdir(exist_ok=True)
    with pytest.raises(InvalidOperationError, match="file encoding is not UTF-8"):
        df_no_lists.write_csv((tmp_path / "dangling.csv").open("w", encoding="gbk"))


@pytest.mark.may_fail_auto_streaming  # read->scan_csv dispatch
@pytest.mark.write_disk
def test_write_csv_appending_17543(chunk_override: None, tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)
    df = pl.DataFrame({"col": ["value"]})
    with (tmp_path / "append.csv").open("w") as f:
        f.write("# test\n")
        df.write_csv(f)
    with (tmp_path / "append.csv").open("r") as f:
        assert f.readline() == "# test\n"
        assert pl.read_csv(f).equals(df)


def test_write_csv_passing_params_18825(chunk_override: None) -> None:
    df = pl.DataFrame({"c1": [1, 2], "c2": [3, 4]})
    buffer = io.StringIO()
    df.write_csv(buffer, separator="\t", include_header=False)

    result_str = buffer.getvalue()
    expected_str = "1\t3\n2\t4\n"

    assert result_str == expected_str


@pytest.mark.parametrize(
    ("dtype", "df"),
    [
        (pl.Decimal(scale=2), pl.DataFrame({"x": ["0.1"]}).cast(pl.Decimal(scale=2))),
        (pl.Categorical, pl.DataFrame({"x": ["A"]})),
        (
            pl.Time,
            pl.DataFrame({"x": ["12:15:00"]}).with_columns(
                pl.col("x").str.strptime(pl.Time)
            ),
        ),
    ],
)
def test_read_csv_cast_unparsable_later(
    chunk_override: None, dtype: pl.Decimal | pl.Categorical | pl.Time, df: pl.DataFrame
) -> None:
    f = io.BytesIO()
    df.write_csv(f)
    f.seek(0)
    assert df.equals(pl.read_csv(f, schema={"x": dtype}))


def test_csv_double_new_line(chunk_override: None) -> None:
    assert pl.read_csv(b"a,b,c\n\n", has_header=False).to_dict(as_series=False) == {
        "column_1": ["a", None],
        "column_2": ["b", None],
        "column_3": ["c", None],
    }


def test_csv_quoted_newlines_skip_rows_19535(chunk_override: None) -> None:
    assert_frame_equal(
        pl.read_csv(
            b"""\
"a\nb"
0
""",
            has_header=False,
            skip_rows=1,
            new_columns=["x"],
        ),
        pl.DataFrame({"x": 0}),
    )


@pytest.mark.write_disk
def test_csv_read_time_dtype(chunk_override: None, tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)
    path = tmp_path / "1"
    path.write_bytes(b"""\
time
00:00:00.000000000
""")

    df = pl.Series("time", [0]).cast(pl.Time()).to_frame()

    assert_frame_equal(pl.read_csv(path, try_parse_dates=True), df)
    assert_frame_equal(pl.read_csv(path, schema_overrides={"time": pl.Time}), df)
    assert_frame_equal(pl.scan_csv(path, try_parse_dates=True).collect(), df)
    assert_frame_equal(pl.scan_csv(path, schema={"time": pl.Time}).collect(), df)
    assert_frame_equal(
        pl.scan_csv(path, schema={"time": pl.Time}).collect(engine="streaming"), df
    )


def test_csv_try_parse_dates_leading_zero_8_digits_22167(chunk_override: None) -> None:
    result = pl.read_csv(
        io.StringIO(
            "a\n2025-04-06T18:56:42.617736974Z\n2025-04-06T18:57:42.77756192Z\n2025-04-06T18:58:44.56928733Z"
        ),
        try_parse_dates=True,
    )
    expected = pl.DataFrame(
        {
            "a": [
                datetime(2025, 4, 6, 18, 56, 42, 617736, tzinfo=timezone.utc),
                datetime(2025, 4, 6, 18, 57, 42, 777561, tzinfo=timezone.utc),
                datetime(2025, 4, 6, 18, 58, 44, 569287, tzinfo=timezone.utc),
            ]
        }
    )
    assert_frame_equal(result, expected)


@pytest.mark.may_fail_auto_streaming  # read->scan_csv dispatch
def test_csv_read_time_schema_overrides(chunk_override: None) -> None:
    df = pl.Series("time", [0]).cast(pl.Time()).to_frame()

    assert_frame_equal(
        pl.read_csv(
            b"""\
time
00:00:00.000000000
""",
            schema_overrides=[pl.Time],
        ),
        df,
    )


def test_batched_csv_schema_overrides(
    chunk_override: None, io_files_path: Path
) -> None:
    with pytest.deprecated_call():
        foods = io_files_path / "foods1.csv"
        batched = pl.read_csv_batched(foods, schema_overrides={"calories": pl.String})
        res = batched.next_batches(1)
        assert res is not None
        b = res[0]
        assert b["calories"].dtype == pl.String
        assert b.width == 4


def test_csv_ragged_lines_20062(chunk_override: None) -> None:
    buf = io.StringIO("""A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q,R,S,T,U,V
,"B",,,,,,,,,A,,,,,,,,
a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,0.0,1.0,2.0,3.0
""")

    assert pl.read_csv(buf, truncate_ragged_lines=True).to_dict(as_series=False) == {
        "A": [None, "a"],
        "B": ["B", "a"],
        "C": [None, "a"],
        "D": [None, "a"],
        "E": [None, "a"],
        "F": [None, "a"],
        "G": [None, "a"],
        "H": [None, "a"],
        "I": [None, "a"],
        "J": [None, "a"],
        "K": ["A", "a"],
        "L": [None, "a"],
        "M": [None, "a"],
        "N": [None, "a"],
        "O": [None, "a"],
        "P": [None, "a"],
        "Q": [None, "a"],
        "R": [None, "a"],
        "S": [None, "a"],
        "T": [None, 0.0],
        "U": [None, 1.0],
        "V": [None, 2.0],
    }


def test_csv_skip_lines(chunk_override: None) -> None:
    fh = io.BytesIO()
    fh.write(b'Header line "1" -> quote count 2\n')
    fh.write(b'Header line "2"" -> quote count 3\n')
    fh.write(b'Header line "3" -> quote count 2 => Total 7 quotes ERROR\n')
    fh.write(b"column_01, column_02, column_03\n")
    fh.write(b"123.12, 21, 99.9\n")
    fh.write(b"65.84, 75, 64.7\n")
    fh.seek(0)

    df = pl.read_csv(fh, has_header=True, skip_lines=3)
    assert df.to_dict(as_series=False) == {
        "column_01": [123.12, 65.84],
        " column_02": [" 21", " 75"],
        " column_03": [" 99.9", " 64.7"],
    }

    fh.seek(0)
    assert_frame_equal(pl.scan_csv(fh, has_header=True, skip_lines=3).collect(), df)


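# Illustrative sketch (not collected by pytest): `skip_lines` counts raw
# newline-terminated lines without parsing quotes, while `skip_rows` counts
# parsed CSV records, so a quoted field that embeds a newline is skipped as a
# single row (see test_csv_quoted_newlines_skip_rows_19535 above). The two
# parameters are mutually exclusive (see test_skip_lines_and_rows_raise below).
def _example_skip_lines_vs_skip_rows() -> None:
    data = b'"a\nb"\n0\n'
    # one parsed record covers the quoted, multiline field...
    assert pl.read_csv(data, skip_rows=1, has_header=False).height == 1
    # ...but it spans two raw lines
    assert pl.read_csv(data, skip_lines=2, has_header=False).height == 1

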
def test_csv_invalid_quoted_comment_line(chunk_override: None) -> None:
    # Quotes inside comment lines should be ignored.
    assert pl.read_csv(
        b'#"Comment\nColA\tColB\n1\t2', separator="\t", comment_prefix="#"
    ).to_dict(as_series=False) == {"ColA": [1], "ColB": [2]}


@pytest.mark.may_fail_auto_streaming  # missing_columns parameter for CSV
def test_csv_compressed_new_columns_19916(chunk_override: None) -> None:
    n_rows = 100

    df = pl.DataFrame(
        {
            "a": range(n_rows),
            "b": range(n_rows),
            "c": range(n_rows),
            "d": range(n_rows),
            "e": range(n_rows),
            "f": range(n_rows),
        }
    )

    b = zstandard.compress(df.write_csv(include_header=False).encode())

    q = pl.scan_csv(b, has_header=False, new_columns=["a", "b", "c", "d", "e", "f"])
    assert_frame_equal(q.collect(), df)


def test_trailing_separator_8240(chunk_override: None) -> None:
    csv = "A|B|"

    expected = pl.DataFrame(
        {"column_1": ["A"], "column_2": ["B"], "column_3": [None]},
        schema={"column_1": pl.String, "column_2": pl.String, "column_3": pl.String},
    )

    result = pl.read_csv(io.StringIO(csv), separator="|", has_header=False)
    assert_frame_equal(result, expected)

    result = pl.scan_csv(io.StringIO(csv), separator="|", has_header=False).collect()
    assert_frame_equal(result, expected)


def test_header_only_column_selection_17173(chunk_override: None) -> None:
    csv = "A,B"
    result = pl.read_csv(io.StringIO(csv), columns=["B"])
    expected = pl.Series("B", [], pl.String()).to_frame()
    assert_frame_equal(result, expected)


def test_csv_enum_raise(chunk_override: None) -> None:
    ENUM_DTYPE = pl.Enum(["foo", "bar"])
    with (
        io.StringIO("col\nfoo\nbaz\n") as csv,
        pytest.raises(pl.exceptions.ComputeError, match="could not parse `baz`"),
    ):
        pl.read_csv(
            csv,
            schema={"col": ENUM_DTYPE},
        )


def test_csv_no_header_ragged_lines_1505(chunk_override: None) -> None:
    # Test that the header schema will grow dynamically.
    csv = io.StringIO("""a,b,c
a,b,c,d,e,f
g,h,i,j,k""")

    assert pl.read_csv(csv, has_header=False).to_dict(as_series=False) == {
        "column_1": ["a", "a", "g"],
        "column_2": ["b", "b", "h"],
        "column_3": ["c", "c", "i"],
        "column_4": [None, "d", "j"],
        "column_5": [None, "e", "k"],
        "column_6": [None, "f", None],
    }


@pytest.mark.parametrize(
    ("filter_value", "expected"),
    [
        (10, "a,b,c\n10,20,99\n"),
        (11, "a,b,c\n11,21,99\n"),
        (12, "a,b,c\n12,22,99\n12,23,99\n"),
    ],
)
def test_csv_write_scalar_empty_chunk_20273(
    chunk_override: None, filter_value: int, expected: str
) -> None:
    # df and filter expression are designed to test different
    # Column variants (Series, Scalar) and different numbers of chunks:
    # 10 > single row, ScalarColumn, multiple chunks, first is non-empty
    # 11 > single row, ScalarColumn, multiple chunks, first is empty
    # 12 > multiple rows, SeriesColumn, multiple chunks, some empty
    df1 = pl.DataFrame(
        {
            "a": [10, 11, 12, 12],  # (12, 12 is intentional)
            "b": [20, 21, 22, 23],
        },
    )
    df2 = pl.DataFrame({"c": [99]})
    df3 = df1.join(df2, how="cross").filter(pl.col("a").eq(filter_value))
    assert df3.write_csv() == expected


def test_csv_malformed_quote_in_unenclosed_field_22395(chunk_override: None) -> None:
    # Note - the malformed-quote detection logic is very basic and fails to detect
    # many cases at this point (for example: 'a,b"c,x"y' will not be detected).
    # Below is one pattern that will be flagged (an odd number of quotes in a row).
    malformed = b"""\
a,b,x"y
a,x"y,c
x"y,b,c
"""
    # short: non-SIMD code path
    with pytest.raises(pl.exceptions.ComputeError):
        pl.read_csv(malformed, has_header=False)
    with pytest.raises(pl.exceptions.ComputeError):
        pl.scan_csv(malformed, has_header=False).collect()
    with pytest.warns(UserWarning, match="CSV malformed:"):
        pl.read_csv(malformed, has_header=False, ignore_errors=True)

    # long: trigger SIMD code path (> 64 bytes)
    malformed_long = malformed + ("k,l,m\n" * 10).encode()
    with pytest.raises(pl.exceptions.ComputeError):
        pl.read_csv(malformed_long, has_header=False)
    with pytest.raises(pl.exceptions.ComputeError):
        pl.scan_csv(malformed_long, has_header=False).collect()


# Note: in some cases, marked "(excessive quoting)", the expected value has
# quoted fields even when that is not strictly necessary.
# It is okay to relax these tests in the future when the code is refactored.
@pytest.mark.parametrize(
    (
        "separator",
        "quote_style",
        "scientific",
        "precision",
        "decimal_comma",
        "expected",
    ),
    [
        (",", None, None, None, False, b"123.75,60.0,9\n"),
        (",", None, None, None, True, b'"123,75","60,0",9\n'),
        (";", None, None, None, True, b"123,75;60,0;9\n"),
        (",", None, None, 0, True, b"124,60,9\n"),
        (",", None, None, 3, True, b'"123,750","60,000",9\n'),
        (";", None, None, 0, True, b"124;60;9\n"),
        (";", None, None, 3, True, b"123,750;60,000;9\n"),
        (",", None, True, None, False, b"1.2375e2,6e1,9\n"),
        (",", None, True, None, True, b'"1,2375e2","6e1",9\n'),  # (excessive quoting)
        (",", None, False, None, False, b"123.75,60,9\n"),
        (",", None, False, None, True, b'"123,75","60",9\n'),  # (excessive quoting)
        (";", None, True, None, True, b"1,2375e2;6e1;9\n"),
        (";", None, False, None, True, b"123,75;60;9\n"),
        (",", None, True, 0, True, b"1e2,6e1,9\n"),
        (",", None, True, 3, True, b'"1,238e2","6,000e1",9\n'),
        (",", None, True, 4, True, b'"1,2375e2","6,0000e1",9\n'),
        (",", None, True, 5, True, b'"1,23750e2","6,00000e1",9\n'),
        (",", None, False, 0, True, b"124,60,9\n"),
        (",", None, False, 3, True, b'"123,750","60,000",9\n'),
        (",", "always", None, None, True, b'"123,75","60,0","9"\n'),
        (",", "necessary", None, None, True, b'"123,75","60,0",9\n'),
        (",", "non_numeric", None, None, True, b'"123,75","60,0",9\n'),
        (",", "never", None, None, True, b"123,75,60,0,9\n"),
        (";", "always", None, None, True, b'"123,75";"60,0";"9"\n'),
        (";", "necessary", None, None, True, b"123,75;60,0;9\n"),
        (";", "non_numeric", None, None, True, b"123,75;60,0;9\n"),
        (";", "never", None, None, True, b"123,75;60,0;9\n"),
    ],
)
def test_write_csv_float_type_decimal_comma(
    chunk_override: None,
    separator: str,
    quote_style: CsvQuoteStyle | None,
    scientific: bool | None,
    precision: int | None,
    decimal_comma: bool,
    expected: bytes,
) -> None:
    # as Float64 (implicit)
    df = pl.DataFrame({"a": [123.75], "b": [60.0], "c": [9]})
    buf = io.BytesIO()
    df.write_csv(
        buf,
        separator=separator,
        quote_style=quote_style,
        float_precision=precision,
        float_scientific=scientific,
        decimal_comma=decimal_comma,
        include_header=False,
    )
    buf.seek(0)
    assert buf.read() == expected

    # as Float32 (explicit)
    df32 = df.with_columns(pl.col("a", "b").cast(pl.Float32))
    buf.seek(0)
    df32.write_csv(
        buf,
        separator=separator,
        quote_style=quote_style,
        float_precision=precision,
        float_scientific=scientific,
        decimal_comma=decimal_comma,
        include_header=False,
    )
    buf.seek(0)
    assert buf.read() == expected

    # Round-trip testing: assert df == read_csv(write_csv(df)), unless:
    # - precision affects the value, or
    # - quote_style = 'never' generates malformed csv
    round_trip = not (
        (not scientific and precision is not None and precision <= 2)
        or (scientific and precision is not None and precision != 4)
        or (quote_style == "never" and decimal_comma and separator == ",")
    )
    if round_trip:
        # eager
        buf.seek(0)
        df.write_csv(
            buf,
            separator=separator,
            quote_style=quote_style,
            float_precision=precision,
            float_scientific=scientific,
            decimal_comma=decimal_comma,
            include_header=True,
        )
        buf.seek(0)
        out = pl.read_csv(
            buf, decimal_comma=decimal_comma, separator=separator, schema=df.schema
        )
        assert_frame_equal(df, out)

        # lazy
        buf.seek(0)
        df.lazy().sink_csv(
            buf,
            separator=separator,
            quote_style=quote_style,
            float_precision=precision,
            float_scientific=scientific,
            decimal_comma=decimal_comma,
            include_header=True,
        )
        buf.seek(0)
        out = pl.scan_csv(
            buf, decimal_comma=decimal_comma, separator=separator, schema=df.schema
        ).collect()
        assert_frame_equal(df, out)


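# Illustrative sketch (not collected by pytest): `float_precision` fixes the
# number of fractional digits and `float_scientific` forces (or forbids)
# exponent notation; the expected strings follow from the cases above.
def _example_float_formatting() -> None:
    df = pl.DataFrame({"x": [123.75]})
    assert df.write_csv(float_precision=3, include_header=False) == "123.750\n"
    assert df.write_csv(float_scientific=True, include_header=False) == "1.2375e2\n"

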
@pytest.mark.parametrize(
    (
        "separator",
        "quote_style",
        "decimal_comma",
        "expected",
    ),
    [
        (",", None, False, b"123.75,60.0,9\n"),
        (",", None, True, b'"123,75","60,0",9\n'),
        (";", None, False, b"123.75;60.0;9\n"),
        (";", None, True, b"123,75;60,0;9\n"),
        (",", "always", True, b'"123,75","60,0","9"\n'),
        (",", "necessary", True, b'"123,75","60,0",9\n'),
        (",", "non_numeric", True, b'"123,75","60,0",9\n'),
        (",", "never", True, b"123,75,60,0,9\n"),  # mal-formed
        (";", "always", True, b'"123,75";"60,0";"9"\n'),
        (";", "necessary", True, b"123,75;60,0;9\n"),
    ],
)
def test_write_csv_decimal_type_decimal_comma(
    chunk_override: None,
    separator: str,
    quote_style: CsvQuoteStyle | None,
    decimal_comma: bool,
    expected: bytes,
) -> None:
    schema = {
        "a": pl.Decimal(scale=2),
        "b": pl.Decimal(scale=1),
        "c": pl.Decimal(scale=0),
    }

    df = pl.DataFrame(
        data={
            "a": [123.75],
            "b": [60.0],
            "c": [9],
        },
        schema=schema,
    )

    buf = io.BytesIO()
    df.write_csv(
        buf,
        separator=separator,
        quote_style=quote_style,
        decimal_comma=decimal_comma,
        include_header=False,
    )
    buf.seek(0)
    assert buf.read() == expected

    # Round-trip testing: assert df == read_csv(write_csv(df)), unless
    # quote_style = 'never' generates malformed csv
    round_trip = not (quote_style == "never" and decimal_comma and separator == ",")
    if round_trip:
        # eager
        buf.seek(0)
        df.write_csv(
            buf,
            separator=separator,
            quote_style=quote_style,
            decimal_comma=decimal_comma,
            include_header=True,
        )
        buf.seek(0)
        out = pl.read_csv(
            buf, decimal_comma=decimal_comma, separator=separator, schema=df.schema
        )
        assert_frame_equal(df, out)

        # lazy
        buf.seek(0)
        df.lazy().sink_csv(
            buf,
            separator=separator,
            quote_style=quote_style,
            decimal_comma=decimal_comma,
            include_header=True,
        )
        buf.seek(0)
        out = pl.scan_csv(
            buf, decimal_comma=decimal_comma, separator=separator, schema=df.schema
        ).collect()
        assert_frame_equal(df, out)


def test_write_csv_large_number_autoformat_decimal_comma(chunk_override: None) -> None:
    df = pl.DataFrame(
        {
            "a": [12345678901234567890.123457890],
            "b": [1_000_000_000_000_000_000_000_000.0],
        }
    )

    buf = io.BytesIO()
    df.write_csv(
        buf,
        decimal_comma=True,
        include_header=False,
    )
    buf.seek(0)
    # note: excessive quoting when the fractional part is all-zero; ok to relax
    expected = b'"1,2345678901234567e+19","1e+24"\n'
    assert buf.read() == expected


def test_stop_split_fields_simd_23651(chunk_override: None) -> None:
    csv = """C,NEMP.WORLD,DAILY,AEMO,PUBLIC,2025/05/29,04:05:04,0000000465336084,,0000000465336084
I,DISPATCH,CASESOLUTION,1,SETTLEMENTDATE,RUNNO,INTERVENTION,CASESUBTYPE,SOLUTIONSTATUS,SPDVERSION,NONPHYSICALLOSSES,TOTALOBJECTIVE,TOTALAREAGENVIOLATION,TOTALINTERCONNECTORVIOLATION,TOTALGENERICVIOLATION,TOTALRAMPRATEVIOLATION,TOTALUNITMWCAPACITYVIOLATION,TOTAL5MINVIOLATION,TOTALREGVIOLATION,TOTAL6SECVIOLATION,TOTAL60SECVIOLATION,TOTALASPROFILEVIOLATION,TOTALFASTSTARTVIOLATION,TOTALENERGYOFFERVIOLATION,LASTCHANGED
D,DISPATCH,CASESOLUTION,1,"2025/05/28 04:05:00",1,0,,0,,0,-60421745.3380,0,0,0,0,0,,,,,0,0,0,"2025/05/28 04:00:04"
D,DISPATCH,CASESOLUTION,1,"2025/05/28 04:10:00",1,0,,0,,0,-60871813.2780,0,0,0,0,0,,,,,0,0,0,"2025/05/28 04:05:04"
D,DISPATCH,CASESOLUTION,1,"2025/05/28 04:15:00",1,0,,1,,0,-61228162.2270,0,0,0,0,0,,,,,0,0,0,"2025/05/28 04:10:03"
D,DISPATCH,CASESOLUTION,1,"2025/05/28 04:20:00",1,0,,1,,0,-60901926.5760,0,0,0,0,0,,,,,0,0,0,"2025/05/28 04:15:03"
D,DISPATCH,CASESOLUTION,1,"""

    schema = {f"column_{i + 1}": pl.String for i in range(27)}

    buf = io.StringIO(csv)
    df = pl.read_csv(buf, truncate_ragged_lines=True, has_header=False, schema=schema)
    assert df.shape == (7, 27)
    assert df["column_26"].null_count() == 7


def test_read_csv_decimal_header_only_200008(chunk_override: None) -> None:
    csv = "a,b"

    df = pl.read_csv(csv.encode(), schema={"a": pl.Decimal(scale=2), "b": pl.String})
    assert df.dtypes == [pl.Decimal(scale=2), pl.String]


@pytest.mark.parametrize(
    "dt",
    [
        pl.Enum(["a"]),
        pl.Categorical(),
    ],
)
def test_write_csv_categorical_23939(chunk_override: None, dt: pl.DataType) -> None:
    n_rows = pl.thread_pool_size() * 1024 + 1
    df = pl.DataFrame(
        {
            "b": pl.Series(["a"] * n_rows, dtype=dt),
        }
    )
    expected = "b\n" + "a\n" * n_rows
    assert df.write_csv() == expected


@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
@pytest.mark.parametrize(
    "csv_str", [b"A,B\n1,x\n2,y\n3,z", b"A,B\n1,x\n2,y\n3,z\n", b"\n\n\n\n2,u"]
)
def test_skip_more_lines_than_empty_25852(
    chunk_override: None, read_fn: str, csv_str: bytes
) -> None:
    with pytest.raises(pl.exceptions.NoDataError):
        getattr(pl, read_fn)(csv_str, skip_lines=5).lazy().collect()


@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
def test_skip_more_lines_no_raise_25852(chunk_override: None, read_fn: str) -> None:
    # When skip_lines exceeds the total number of lines and raise_if_empty=False,
    # an empty DataFrame with the provided schema should be returned.
    csv_str = b"A,B\n1,x\n2,y"
    result = (
        getattr(pl, read_fn)(
            csv_str,
            skip_lines=100,
            schema={"col1": pl.String, "col2": pl.String},
            has_header=False,
            raise_if_empty=False,
        )
        .lazy()
        .collect()
    )
    expected = pl.DataFrame(schema={"col1": pl.String, "col2": pl.String})
    assert_frame_equal(result, expected)


@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
def test_skip_crlf(chunk_override: None, read_fn: str) -> None:
    csv_str = b"\r\n\r\nline before <3a>\r\nA,B\r\n1,2"
    df = getattr(pl, read_fn)(csv_str, skip_rows=1).lazy().collect()
    expected = pl.DataFrame(
        [
            pl.Series("A", [1], pl.Int64),
            pl.Series("B", [2], pl.Int64),
        ]
    )
    assert_frame_equal(df, expected)


@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
def test_only_empty_quote_string(chunk_override: None, read_fn: str) -> None:
    csv_str = b'""'
    df = getattr(pl, read_fn)(csv_str).lazy().collect()
    expected = pl.DataFrame({"": []}, schema={"": pl.String})
    assert_frame_equal(df, expected)


@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
def test_only_header_with_newline(chunk_override: None, read_fn: str) -> None:
    csv_str = b"xx\n"
    df = getattr(pl, read_fn)(csv_str).lazy().collect()
    expected = pl.DataFrame([pl.Series("xx", [], pl.String)])
    assert_frame_equal(df, expected)


@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
def test_single_char_input_25908(chunk_override: None, read_fn: str) -> None:
    csv_str = b"x"
    df = getattr(pl, read_fn)(csv_str).lazy().collect()
    expected = pl.DataFrame([pl.Series("x", [], pl.String)])
    assert_frame_equal(df, expected)


@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
def test_csv_skip_rows_with_interleaved_comments_25840(
    chunk_override: None, read_fn: str
) -> None:
    # skip_rows should only count non-comment lines
    csv_data = b"// x//\na,b\n//a, b\n,\nu\n2"
    result = (
        getattr(pl, read_fn)(csv_data, comment_prefix="//", skip_rows=2)
        .lazy()
        .collect()
    )
    expected = pl.DataFrame([pl.Series("u", [2], dtype=pl.Int64)])
    assert_frame_equal(result, expected)


@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
def test_csv_comment_after_header_25841(chunk_override: None, read_fn: str) -> None:
    # Test that comment lines after the header are properly skipped
    csv_data = b"RowA,RowB,RowC\n// Comment line\na,b,c"
    result = getattr(pl, read_fn)(csv_data, comment_prefix="//").lazy().collect()
    expected = pl.DataFrame({"RowA": ["a"], "RowB": ["b"], "RowC": ["c"]})
    assert_frame_equal(result, expected)

    # Test with multiple comments after the header
    csv_data2 = b"A,B\n# Comment 1\n# Comment 2\n1,2\n3,4"
    result2 = getattr(pl, read_fn)(csv_data2, comment_prefix="#").lazy().collect()
    expected2 = pl.DataFrame({"A": [1, 3], "B": [2, 4]})
    assert_frame_equal(result2, expected2)


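# Illustrative sketch (not collected by pytest): `comment_prefix` drops a line
# wherever it appears, before or after the header, as the cases above exercise.
def _example_comment_prefix() -> None:
    data = b"# note\na,b\n# note\n1,2"
    assert pl.read_csv(data, comment_prefix="#").to_dict(as_series=False) == {
        "a": [1],
        "b": [2],
    }

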
@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
3058
def test_empty_csv(chunk_override: None, read_fn: str) -> None:
3059
csv_str = b""
3060
df = getattr(pl, read_fn)(csv_str, raise_if_empty=False).lazy().collect()
3061
expected = pl.DataFrame([])
3062
assert_frame_equal(df, expected)
3063
3064
3065
@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
3066
def test_empty_csv_raise(chunk_override: None, read_fn: str) -> None:
3067
csv_str = b""
3068
with pytest.raises(pl.exceptions.NoDataError):
3069
getattr(pl, read_fn)(csv_str, raise_if_empty=True).lazy().collect()
3070
3071
3072
@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
3073
def test_skip_lines_and_rows_raise(chunk_override: None, read_fn: str) -> None:
3074
csv_str = b"A,1,2,3"
3075
with pytest.raises(pl.exceptions.InvalidOperationError):
3076
getattr(pl, read_fn)(csv_str, skip_lines=1, skip_rows=2).lazy().collect()
3077
3078
3079
@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
3080
@pytest.mark.parametrize(
3081
("csv_str", "expected"),
3082
[
3083
(b"", []),
3084
(b"A", [pl.Series("A", [], pl.String)]),
3085
(b"A\n1\n2\n3", [pl.Series("A", [1, 2, 3])]),
3086
],
3087
)
3088
def test_utf8_bom(
3089
chunk_override: None, read_fn: str, csv_str: bytes, expected: list[pl.Series]
3090
) -> None:
3091
csv_str = b"\xef\xbb\xbf" + csv_str
3092
df = getattr(pl, read_fn)(csv_str, raise_if_empty=False).lazy().collect()
3093
assert_frame_equal(df, pl.DataFrame(expected))
3094
3095
3096
@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
3097
def test_invalid_utf8_bom(chunk_override: None, read_fn: str) -> None:
3098
csv_str = b"\xef\xaa\xbdA\n3"
3099
df = getattr(pl, read_fn)(csv_str, raise_if_empty=False).lazy().collect()
3100
expected = [pl.Series("諾A", [3])]
3101
assert_frame_equal(df, pl.DataFrame(expected))
3102
3103
3104
def test_invalid_utf8_in_schema(chunk_override: None) -> None:
3105
csv_str = b"\xef\xff\xbdA,B\n3,\xe0\x80\x80\n-6,x3"
3106
lf = pl.scan_csv(csv_str)
3107
3108
# Schema inference should not fail because of invalid utf-8.
3109
assert lf.collect_schema() == {"���A": pl.Int64, "B": pl.String}
3110
3111
# But actual execution should.
3112
with pytest.raises(pl.exceptions.ComputeError):
3113
lf.collect()
3114
3115
3116
@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
3117
def test_provided_schema_mismatch_raise(chunk_override: None, read_fn: str) -> None:
3118
csv_str = b"A,B\n1,2"
3119
schema = {"A": pl.Int64}
3120
with pytest.raises(pl.exceptions.SchemaError):
3121
getattr(pl, read_fn)(csv_str, schema=schema).lazy().collect()
3122
3123
3124
@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
3125
def test_provided_schema_mismatch_truncate(chunk_override: None, read_fn: str) -> None:
3126
csv_str = b"A,B\n1,2"
3127
schema = {"A": pl.Int64}
3128
df = (
3129
getattr(pl, read_fn)(csv_str, schema=schema, truncate_ragged_lines=True)
3130
.lazy()
3131
.collect()
3132
)
3133
expected = [pl.Series("A", [1])]
3134
assert_frame_equal(df, pl.DataFrame(expected))
3135
3136
3137
def test_read_batch_csv_deprecations_26479(foods_file_path: Path) -> None:
3138
with pytest.warns(DeprecationWarning, match=r"`read_csv_batched` is deprecated"):
3139
pl.read_csv_batched(foods_file_path)
3140
3141