# GitHub repository: pola-rs/polars
# Path: py-polars/tests/unit/io/test_csv.py
from __future__ import annotations

import gzip
import io
import os
import sys
import textwrap
import zlib
from datetime import date, datetime, time, timedelta, timezone
from decimal import Decimal as D
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING, TypedDict

import numpy as np
import pyarrow as pa
import pytest
import zstandard

import polars as pl
from polars._utils.various import normalize_filepath
from polars.exceptions import ComputeError, InvalidOperationError, NoDataError
from polars.io.csv import BatchedCsvReader
from polars.testing import assert_frame_equal, assert_series_equal

if TYPE_CHECKING:
    from pathlib import Path

    from polars._typing import CsvQuoteStyle, TimeUnit
    from tests.unit.conftest import MemoryUsage


@pytest.fixture
def foods_file_path(io_files_path: Path) -> Path:
    return io_files_path / "foods1.csv"


def test_quoted_date() -> None:
    csv = textwrap.dedent(
        """\
        a,b
        "2022-01-01",1
        "2022-01-02",2
        """
    )
    result = pl.read_csv(csv.encode(), try_parse_dates=True)
    expected = pl.DataFrame({"a": [date(2022, 1, 1), date(2022, 1, 2)], "b": [1, 2]})
    assert_frame_equal(result, expected)


# Issue: https://github.com/pola-rs/polars/issues/10826
def test_date_pattern_with_datetime_override_10826() -> None:
    result = pl.read_csv(
        source=io.StringIO("col\n2023-01-01\n2023-02-01\n2023-03-01"),
        schema_overrides={"col": pl.Datetime},
    )
    expected = pl.Series(
        "col", [datetime(2023, 1, 1), datetime(2023, 2, 1), datetime(2023, 3, 1)]
    ).to_frame()
    assert_frame_equal(result, expected)

    result = pl.read_csv(
        source=io.StringIO("col\n2023-01-01T01:02:03\n2023-02-01\n2023-03-01"),
        schema_overrides={"col": pl.Datetime},
    )
    expected = pl.Series(
        "col",
        [datetime(2023, 1, 1, 1, 2, 3), datetime(2023, 2, 1), datetime(2023, 3, 1)],
    ).to_frame()
    assert_frame_equal(result, expected)


def test_to_from_buffer(df_no_lists: pl.DataFrame) -> None:
    df = df_no_lists
    buf = io.BytesIO()
    df.write_csv(buf)
    buf.seek(0)

    read_df = pl.read_csv(buf, try_parse_dates=True)
    read_df = read_df.with_columns(
        pl.col("cat").cast(pl.Categorical),
        pl.col("enum").cast(pl.Enum(["foo", "ham", "bar"])),
        pl.col("time").cast(pl.Time),
    )
    assert_frame_equal(df, read_df, categorical_as_str=True)
    with pytest.raises(AssertionError):
        assert_frame_equal(df.select("time", "cat"), read_df, categorical_as_str=True)


@pytest.mark.write_disk
def test_to_from_file(df_no_lists: pl.DataFrame, tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    df = df_no_lists.drop("strings_nulls")

    file_path = tmp_path / "small.csv"
    df.write_csv(file_path)
    read_df = pl.read_csv(file_path, try_parse_dates=True)

    read_df = read_df.with_columns(
        pl.col("cat").cast(pl.Categorical),
        pl.col("enum").cast(pl.Enum(["foo", "ham", "bar"])),
        pl.col("time").cast(pl.Time),
    )
    assert_frame_equal(df, read_df, categorical_as_str=True)


def test_normalize_filepath(io_files_path: Path) -> None:
    with pytest.raises(IsADirectoryError):
        normalize_filepath(io_files_path)

    assert normalize_filepath(str(io_files_path), check_not_directory=False) == str(
        io_files_path
    )


def test_infer_schema_false() -> None:
    csv = textwrap.dedent(
        """\
        a,b,c
        1,2,3
        1,2,3
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(f, infer_schema=False)
    assert df.dtypes == [pl.String, pl.String, pl.String]
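

# Added illustrative example (not part of the original suite): infer_schema_length=0
# is an equivalent spelling, used elsewhere in this file, for "read everything as String".
def test_infer_schema_length_zero_example() -> None:
    f = io.StringIO("a,b,c\n1,2,3\n")
    df = pl.read_csv(f, infer_schema_length=0)
    assert df.dtypes == [pl.String, pl.String, pl.String]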


@pytest.mark.may_fail_auto_streaming  # read->scan_csv dispatch
def test_csv_null_values() -> None:
    csv = textwrap.dedent(
        """\
        a,b,c
        na,b,c
        a,na,c
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(f, null_values="na")
    assert df.rows() == [(None, "b", "c"), ("a", None, "c")]

    # note: after reading, the buffer position in StringIO will have been
    # advanced; reading again will raise NoDataError, so we provide a hint
    # in the error string about this, suggesting "seek(0)" as a possible fix...
    with pytest.raises(NoDataError, match=r"empty"):
        pl.read_csv(f)

    # ... unless we explicitly tell read_csv not to raise an
    # exception, in which case we expect an empty dataframe
    assert_frame_equal(pl.read_csv(f, raise_if_empty=False), pl.DataFrame())

    out = io.BytesIO()
    df.write_csv(out, null_value="na")
    assert csv == out.getvalue().decode("ascii")

    csv = textwrap.dedent(
        """\
        a,b,c
        na,b,c
        a,n/a,c
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(f, null_values=["na", "n/a"])
    assert df.rows() == [(None, "b", "c"), ("a", None, "c")]

    csv = textwrap.dedent(
        r"""
        a,b,c
        na,b,c
        a,\N,c
        ,b,
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(f, null_values={"a": "na", "b": r"\N"})
    assert df.rows() == [(None, "b", "c"), ("a", None, "c"), (None, "b", None)]
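

# Added illustrative example (not part of the original suite): a minimal sketch of
# the buffer-position behaviour noted above -- rewinding with seek(0) lets the same
# StringIO be read twice.
def test_csv_reread_after_seek_example() -> None:
    f = io.StringIO("a,b\n1,2\n")
    first = pl.read_csv(f)
    f.seek(0)  # rewind; without this a second read would raise NoDataError
    second = pl.read_csv(f)
    assert_frame_equal(first, second)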


def test_csv_missing_utf8_is_empty_string() -> None:
    # validate 'missing_utf8_is_empty_string' for missing fields that are...
    # >> ...leading
    # >> ...trailing (both EOL & EOF)
    # >> ...in lines that have missing fields
    # >> ...in cols containing no other strings
    # >> ...interacting with other user-supplied null values

    csv = textwrap.dedent(
        r"""
        a,b,c
        na,b,c
        a,\N,c
        ,b,
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(
        f,
        null_values={"a": "na", "b": r"\N"},
        missing_utf8_is_empty_string=True,
    )
    # ┌──────┬──────┬─────┐
    # │ a    ┆ b    ┆ c   │
    # ╞══════╪══════╪═════╡
    # │ null ┆ b    ┆ c   │
    # │ a    ┆ null ┆ c   │
    # │      ┆ b    ┆     │
    # └──────┴──────┴─────┘
    assert df.rows() == [(None, "b", "c"), ("a", None, "c"), ("", "b", "")]

    csv = textwrap.dedent(
        r"""
        a,b,c,d,e,f,g
        na,,,,\N,,
        a,\N,c,,,,g
        ,,,
        ,,,na,,,
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(f, null_values=["na", r"\N"])
    # ┌──────┬──────┬──────┬──────┬──────┬──────┬──────┐
    # │ a    ┆ b    ┆ c    ┆ d    ┆ e    ┆ f    ┆ g    │
    # ╞══════╪══════╪══════╪══════╪══════╪══════╪══════╡
    # │ null ┆ null ┆ null ┆ null ┆ null ┆ null ┆ null │
    # │ a    ┆ null ┆ c    ┆ null ┆ null ┆ null ┆ g    │
    # │ null ┆ null ┆ null ┆ null ┆ null ┆ null ┆ null │
    # │ null ┆ null ┆ null ┆ null ┆ null ┆ null ┆ null │
    # └──────┴──────┴──────┴──────┴──────┴──────┴──────┘
    assert df.rows() == [
        (None, None, None, None, None, None, None),
        ("a", None, "c", None, None, None, "g"),
        (None, None, None, None, None, None, None),
        (None, None, None, None, None, None, None),
    ]

    f.seek(0)
    df = pl.read_csv(
        f,
        null_values=["na", r"\N"],
        missing_utf8_is_empty_string=True,
    )
    # ┌──────┬──────┬─────┬──────┬──────┬──────┬─────┐
    # │ a    ┆ b    ┆ c   ┆ d    ┆ e    ┆ f    ┆ g   │
    # ╞══════╪══════╪═════╪══════╪══════╪══════╪═════╡
    # │ null ┆      ┆     ┆      ┆ null ┆      ┆     │
    # │ a    ┆ null ┆ c   ┆      ┆      ┆      ┆ g   │
    # │      ┆      ┆     ┆      ┆      ┆      ┆     │
    # │      ┆      ┆     ┆ null ┆      ┆      ┆     │
    # └──────┴──────┴─────┴──────┴──────┴──────┴─────┘
    assert df.rows() == [
        (None, "", "", "", None, "", ""),
        ("a", None, "c", "", "", "", "g"),
        ("", "", "", "", "", "", ""),
        ("", "", "", None, "", "", ""),
    ]


def test_csv_int_types() -> None:
    f = io.StringIO(
        "u8,i8,u16,i16,u32,i32,u64,i64,i128\n"
        "0,0,0,0,0,0,0,0,0\n"
        "0,-128,0,-32768,0,-2147483648,0,-9223372036854775808,-170141183460469231731687303715884105728\n"
        "255,127,65535,32767,4294967295,2147483647,18446744073709551615,9223372036854775807,170141183460469231731687303715884105727\n"
        "01,01,01,01,01,01,01,01,01\n"
        "01,-01,01,-01,01,-01,01,-01,01\n"
    )
    df = pl.read_csv(
        f,
        schema={
            "u8": pl.UInt8,
            "i8": pl.Int8,
            "u16": pl.UInt16,
            "i16": pl.Int16,
            "u32": pl.UInt32,
            "i32": pl.Int32,
            "u64": pl.UInt64,
            "i64": pl.Int64,
            "i128": pl.Int128,
        },
    )

    assert_frame_equal(
        df,
        pl.DataFrame(
            {
                "u8": pl.Series([0, 0, 255, 1, 1], dtype=pl.UInt8),
                "i8": pl.Series([0, -128, 127, 1, -1], dtype=pl.Int8),
                "u16": pl.Series([0, 0, 65535, 1, 1], dtype=pl.UInt16),
                "i16": pl.Series([0, -32768, 32767, 1, -1], dtype=pl.Int16),
                "u32": pl.Series([0, 0, 4294967295, 1, 1], dtype=pl.UInt32),
                "i32": pl.Series([0, -2147483648, 2147483647, 1, -1], dtype=pl.Int32),
                "u64": pl.Series([0, 0, 18446744073709551615, 1, 1], dtype=pl.UInt64),
                "i64": pl.Series(
                    [0, -9223372036854775808, 9223372036854775807, 1, -1],
                    dtype=pl.Int64,
                ),
                "i128": pl.Series(
                    [
                        0,
                        -170141183460469231731687303715884105728,
                        170141183460469231731687303715884105727,
                        1,
                        1,
                    ],
                    dtype=pl.Int128,
                ),
            }
        ),
    )
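

# Added illustrative example (not part of the original suite): without an explicit
# schema, integer columns are inferred as Int64 by default.
def test_csv_int_default_inference_example() -> None:
    df = pl.read_csv(io.StringIO("x\n1\n2\n"))
    assert df.dtypes == [pl.Int64]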


def test_csv_float_parsing() -> None:
    lines_with_floats = [
        "123.86,+123.86,-123.86\n",
        ".987,+.987,-.987\n",
        "5.,+5.,-5.\n",
        "inf,+inf,-inf\n",
        "NaN,+NaN,-NaN\n",
    ]

    for line_with_floats in lines_with_floats:
        f = io.StringIO(line_with_floats)
        df = pl.read_csv(f, has_header=False, new_columns=["a", "b", "c"])
        assert df.dtypes == [pl.Float64, pl.Float64, pl.Float64]

    lines_with_scientific_numbers = [
        "1e27,1E65,1e-28,1E-9\n",
        "+1e27,+1E65,+1e-28,+1E-9\n",
        "1e+27,1E+65,1e-28,1E-9\n",
        "+1e+27,+1E+65,+1e-28,+1E-9\n",
        "-1e+27,-1E+65,-1e-28,-1E-9\n",
        # "e27,E65,e-28,E-9\n",
        # "+e27,+E65,+e-28,+E-9\n",
        # "-e27,-E65,-e-28,-E-9\n",
    ]

    for line_with_scientific_numbers in lines_with_scientific_numbers:
        f = io.StringIO(line_with_scientific_numbers)
        df = pl.read_csv(f, has_header=False, new_columns=["a", "b", "c", "d"])
        assert df.dtypes == [pl.Float64, pl.Float64, pl.Float64, pl.Float64]
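

# Added illustrative example (not part of the original suite): the special float
# tokens above parse to the expected IEEE values, not just to the Float64 dtype.
def test_csv_float_special_values_example() -> None:
    df = pl.read_csv(io.StringIO("x\ninf\n-inf\nNaN\n"))
    vals = df["x"].to_list()
    assert vals[0] == float("inf")
    assert vals[1] == float("-inf")
    assert vals[2] != vals[2]  # NaN compares unequal to itself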


def test_datetime_parsing() -> None:
    csv = textwrap.dedent(
        """\
        timestamp,open,high
        2021-01-01 00:00:00,0.00305500,0.00306000
        2021-01-01 00:15:00,0.00298800,0.00300400
        2021-01-01 00:30:00,0.00298300,0.00300100
        2021-01-01 00:45:00,0.00299400,0.00304000
        """
    )

    f = io.StringIO(csv)
    df = pl.read_csv(f, try_parse_dates=True)
    assert df.dtypes == [pl.Datetime, pl.Float64, pl.Float64]


def test_datetime_parsing_default_formats() -> None:
    csv = textwrap.dedent(
        """\
        ts_dmy,ts_dmy_f,ts_dmy_p
        01/01/2021 00:00:00,31-01-2021T00:00:00.123,31-01-2021 11:00
        01/01/2021 00:15:00,31-01-2021T00:15:00.123,31-01-2021 01:00
        01/01/2021 00:30:00,31-01-2021T00:30:00.123,31-01-2021 01:15
        01/01/2021 00:45:00,31-01-2021T00:45:00.123,31-01-2021 01:30
        """
    )

    f = io.StringIO(csv)
    df = pl.read_csv(f, try_parse_dates=True)
    assert df.dtypes == [pl.Datetime, pl.Datetime, pl.Datetime]


@pytest.mark.may_fail_auto_streaming  # read->scan_csv dispatch
def test_partial_schema_overrides() -> None:
    csv = textwrap.dedent(
        """\
        a,b,c
        1,2,3
        1,2,3
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(f, schema_overrides=[pl.String])
    assert df.dtypes == [pl.String, pl.Int64, pl.Int64]
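

# Added illustrative example (not part of the original suite): overrides can also be
# keyed by column name rather than given positionally.
def test_partial_schema_overrides_by_name_example() -> None:
    f = io.StringIO("a,b,c\n1,2,3\n")
    df = pl.read_csv(f, schema_overrides={"b": pl.Float64})
    assert df.dtypes == [pl.Int64, pl.Float64, pl.Int64]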


@pytest.mark.may_fail_auto_streaming  # read->scan_csv dispatch
def test_schema_overrides_with_column_name_selection() -> None:
    csv = textwrap.dedent(
        """\
        a,b,c,d
        1,2,3,4
        1,2,3,4
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(f, columns=["c", "b", "d"], schema_overrides=[pl.Int32, pl.String])
    assert df.dtypes == [pl.String, pl.Int32, pl.Int64]


@pytest.mark.may_fail_auto_streaming  # read->scan_csv dispatch
def test_schema_overrides_with_column_idx_selection() -> None:
    csv = textwrap.dedent(
        """\
        a,b,c,d
        1,2,3,4
        1,2,3,4
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(f, columns=[2, 1, 3], schema_overrides=[pl.Int32, pl.String])
    # When columns are selected by index (rather than by name) and the overrides are
    # given as a list, columns without an explicit dtype fall back to pl.String.
    assert df.dtypes == [pl.String, pl.Int32, pl.String]
    # Projections are sorted.
    assert df.columns == ["b", "c", "d"]


def test_partial_column_rename() -> None:
    csv = textwrap.dedent(
        """\
        a,b,c
        1,2,3
        1,2,3
        """
    )
    f = io.StringIO(csv)
    for use in [True, False]:
        f.seek(0)
        df = pl.read_csv(f, new_columns=["foo"], use_pyarrow=use)
        assert df.columns == ["foo", "b", "c"]


@pytest.mark.parametrize(
    ("col_input", "col_out"),
    [([0, 1], ["a", "b"]), ([0, 2], ["a", "c"]), (["b"], ["b"])],
)
def test_read_csv_columns_argument(
    col_input: list[int] | list[str], col_out: list[str]
) -> None:
    csv = textwrap.dedent(
        """\
        a,b,c
        1,2,3
        1,2,3
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(f, columns=col_input)
    assert df.shape[0] == 2
    assert df.columns == col_out


@pytest.mark.may_fail_cloud  # read->scan_csv dispatch
@pytest.mark.may_fail_auto_streaming  # read->scan_csv dispatch
def test_read_csv_buffer_ownership() -> None:
    bts = b"\xf0\x9f\x98\x80,5.55,333\n\xf0\x9f\x98\x86,-5.0,666"
    buf = io.BytesIO(bts)
    df = pl.read_csv(
        buf,
        has_header=False,
        new_columns=["emoji", "flt", "int"],
    )
    # confirm that read_csv succeeded, and didn't close the input buffer (#2696)
    assert df.shape == (2, 3)
    assert df.rows() == [("😀", 5.55, 333), ("😆", -5.0, 666)]
    assert not buf.closed
    assert buf.read() == bts


@pytest.mark.may_fail_auto_streaming  # read->scan_csv dispatch
@pytest.mark.write_disk
def test_read_csv_encoding(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    bts = (
        b"Value1,Value2,Value3,Value4,Region\n"
        b"-30,7.5,2578,1,\xa5x\xa5_\n-32,7.97,3006,1,\xa5x\xa4\xa4\n"
        b"-31,8,3242,2,\xb7s\xa6\xcb\n-33,7.97,3300,3,\xb0\xaa\xb6\xaf\n"
        b"-20,7.91,3384,4,\xac\xfc\xb0\xea\n"
    )

    file_path = tmp_path / "encoding.csv"
    file_path.write_bytes(bts)

    file_str = str(file_path)
    bytesio = io.BytesIO(bts)

    for use_pyarrow in (False, True):
        bytesio.seek(0)
        for file in [file_path, file_str, bts, bytesio]:
            assert_series_equal(
                pl.read_csv(
                    file,  # type: ignore[arg-type]
                    encoding="big5",
                    use_pyarrow=use_pyarrow,
                ).get_column("Region"),
                pl.Series("Region", ["台北", "台中", "新竹", "高雄", "美國"]),
            )


@pytest.mark.may_fail_auto_streaming  # read->scan_csv dispatch
@pytest.mark.write_disk
def test_read_csv_encoding_lossy(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    bts = (
        b"\xc8\xec\xff,\xc2\xee\xe7\xf0\xe0\xf1\xf2,\xc3\xee\xf0\xee\xe4\n"
        b"\xc8\xe2\xe0\xed,25,\xcc\xee\xf1\xea\xe2\xe0\n"
        # \x98 is not supported in "windows-1251".
        b"\xce\xeb\xfc\xe3\xe0,30,\xd1\xe0\xed\xea\xf2-\x98\xcf\xe5\xf2\xe5\xf0\xe1\xf3\xf0\xe3\n"
    )

    file_path = tmp_path / "encoding_lossy.csv"
    file_path.write_bytes(bts)

    file_str = str(file_path)
    bytesio = io.BytesIO(bts)
    bytesio.seek(0)

    for file in [file_path, file_str, bts, bytesio]:
        assert_series_equal(
            pl.read_csv(
                file,  # type: ignore[arg-type]
                encoding="windows-1251-lossy",
                use_pyarrow=False,
            ).get_column("Город"),
            pl.Series("Город", ["Москва", "Санкт-�Петербург"]),
        )


@pytest.mark.may_fail_auto_streaming  # read->scan_csv dispatch
def test_column_rename_and_schema_overrides() -> None:
    csv = textwrap.dedent(
        """\
        a,b,c
        1,2,3
        1,2,3
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(
        f,
        new_columns=["A", "B", "C"],
        schema_overrides={"A": pl.String, "B": pl.Int64, "C": pl.Float32},
    )
    assert df.dtypes == [pl.String, pl.Int64, pl.Float32]

    f = io.StringIO(csv)
    df = pl.read_csv(
        f,
        columns=["a", "c"],
        new_columns=["A", "C"],
        schema_overrides={"A": pl.String, "C": pl.Float32},
    )
    assert df.dtypes == [pl.String, pl.Float32]

    csv = textwrap.dedent(
        """\
        1,2,3
        1,2,3
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(
        f,
        new_columns=["A", "B", "C"],
        schema_overrides={"A": pl.String, "C": pl.Float32},
        has_header=False,
    )
    assert df.dtypes == [pl.String, pl.Int64, pl.Float32]


def test_compressed_csv(io_files_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    monkeypatch.setenv("POLARS_FORCE_ASYNC", "0")

    # gzip compression
    csv = textwrap.dedent(
        """\
        a,b,c
        1,a,1.0
        2,b,2.0
        3,c,3.0
        """
    )
    fout = io.BytesIO()
    with gzip.GzipFile(fileobj=fout, mode="w") as f:
        f.write(csv.encode())

    csv_bytes = fout.getvalue()
    out = pl.read_csv(csv_bytes)
    expected = pl.DataFrame(
        {"a": [1, 2, 3], "b": ["a", "b", "c"], "c": [1.0, 2.0, 3.0]}
    )
    assert_frame_equal(out, expected)

    # now from disk
    csv_file = io_files_path / "gzipped.csv.gz"
    out = pl.read_csv(str(csv_file), truncate_ragged_lines=True)
    assert_frame_equal(out, expected)

    # now with schema defined
    schema = {"a": pl.Int64, "b": pl.Utf8, "c": pl.Float64}
    out = pl.read_csv(str(csv_file), schema=schema, truncate_ragged_lines=True)
    assert_frame_equal(out, expected)

    # now with column projection
    out = pl.read_csv(csv_bytes, columns=["a", "b"])
    expected = pl.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]})
    assert_frame_equal(out, expected)

    # zlib compression
    csv_bytes = zlib.compress(csv.encode())
    out = pl.read_csv(csv_bytes)
    expected = pl.DataFrame(
        {"a": [1, 2, 3], "b": ["a", "b", "c"], "c": [1.0, 2.0, 3.0]}
    )
    assert_frame_equal(out, expected)

    # different zlib compression levels produce different header bytes,
    # so try to cover them all
    for level in range(10):
        csv_bytes = zlib.compress(csv.encode(), level=level)
        out = pl.read_csv(csv_bytes)
        expected = pl.DataFrame(
            {"a": [1, 2, 3], "b": ["a", "b", "c"], "c": [1.0, 2.0, 3.0]}
        )
        assert_frame_equal(out, expected)

    # zstd compression
    csv_bytes = zstandard.compress(csv.encode())
    out = pl.read_csv(csv_bytes)
    assert_frame_equal(out, expected)

    # zstd compressed file
    csv_file = io_files_path / "zstd_compressed.csv.zst"
    out = pl.scan_csv(csv_file, truncate_ragged_lines=True).collect()
    assert_frame_equal(out, expected)
    out = pl.read_csv(str(csv_file), truncate_ragged_lines=True)
    assert_frame_equal(out, expected)

    # no compression
    f2 = io.BytesIO(b"a,b\n1,2\n")
    out2 = pl.read_csv(f2)
    expected = pl.DataFrame({"a": [1], "b": [2]})
    assert_frame_equal(out2, expected)
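

# Added illustrative sketch (not polars internals): the compressed inputs exercised
# above are distinguishable by their leading magic bytes, which is how a reader can
# dispatch on format without being told.
def test_compression_magic_bytes_example() -> None:
    assert gzip.compress(b"x")[:2] == b"\x1f\x8b"  # gzip member header
    assert zstandard.compress(b"x")[:4] == b"\x28\xb5\x2f\xfd"  # zstd frame magic
    assert zlib.compress(b"x")[0] == 0x78  # zlib CMF byte (deflate, 32K window)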


def test_partial_decompression(foods_file_path: Path) -> None:
    f_out = io.BytesIO()
    with gzip.GzipFile(fileobj=f_out, mode="w") as f:
        f.write(foods_file_path.read_bytes())

    csv_bytes = f_out.getvalue()
    for n_rows in [1, 5, 26]:
        out = pl.read_csv(csv_bytes, n_rows=n_rows)
        assert out.shape == (n_rows, 4)

    # zstd compression
    csv_bytes = zstandard.compress(foods_file_path.read_bytes())
    for n_rows in [1, 5, 26]:
        out = pl.read_csv(csv_bytes, n_rows=n_rows)
        assert out.shape == (n_rows, 4)


def test_empty_bytes() -> None:
    b = b""
    with pytest.raises(NoDataError):
        pl.read_csv(b)

    df = pl.read_csv(b, raise_if_empty=False)
    assert_frame_equal(df, pl.DataFrame())


def test_empty_line_with_single_column() -> None:
    df = pl.read_csv(
        b"a\n\nb\n",
        new_columns=["A"],
        has_header=False,
        comment_prefix="#",
        use_pyarrow=False,
    )
    expected = pl.DataFrame({"A": ["a", None, "b"]})
    assert_frame_equal(df, expected)


def test_empty_line_with_multiple_columns() -> None:
    df = pl.read_csv(
        b"a,b\n\nc,d\n",
        new_columns=["A", "B"],
        has_header=False,
        comment_prefix="#",
        use_pyarrow=False,
    )
    expected = pl.DataFrame({"A": ["a", None, "c"], "B": ["b", None, "d"]})
    assert_frame_equal(df, expected)


def test_preserve_whitespace_at_line_start() -> None:
    df = pl.read_csv(
        b" a\n b \n c\nd",
        new_columns=["A"],
        has_header=False,
        use_pyarrow=False,
    )
    expected = pl.DataFrame({"A": [" a", " b ", " c", "d"]})
    assert_frame_equal(df, expected)


def test_csv_multi_char_comment() -> None:
    csv = textwrap.dedent(
        """\
        #a,b
        ##c,d
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(
        f,
        new_columns=["A", "B"],
        has_header=False,
        comment_prefix="##",
        use_pyarrow=False,
    )
    expected = pl.DataFrame({"A": ["#a"], "B": ["b"]})
    assert_frame_equal(df, expected)

    # check comment interaction with headers/skip_rows
    for skip_rows, b in (
        (1, io.BytesIO(b"<filemeta>\n#!skip\n#!skip\nCol1\tCol2\n")),
        (0, io.BytesIO(b"\n#!skip\n#!skip\nCol1\tCol2")),
        (0, io.BytesIO(b"#!skip\nCol1\tCol2\n#!skip\n")),
        (0, io.BytesIO(b"#!skip\nCol1\tCol2")),
    ):
        df = pl.read_csv(b, separator="\t", comment_prefix="#!", skip_rows=skip_rows)
        assert_frame_equal(df, pl.DataFrame(schema=["Col1", "Col2"]).cast(pl.Utf8))


def test_csv_quote_char() -> None:
    expected = pl.DataFrame(
        [
            pl.Series("linenum", [1, 2, 3, 4, 5, 6, 7, 8, 9]),
            pl.Series(
                "last_name",
                [
                    "Jagger",
                    'O"Brian',
                    "Richards",
                    'L"Etoile',
                    "Watts",
                    "Smith",
                    '"Wyman"',
                    "Woods",
                    'J"o"ne"s',
                ],
            ),
            pl.Series(
                "first_name",
                [
                    "Mick",
                    '"Mary"',
                    "Keith",
                    "Bennet",
                    "Charlie",
                    'D"Shawn',
                    "Bill",
                    "Ron",
                    "Brian",
                ],
            ),
        ]
    )
    rolling_stones = textwrap.dedent(
        """\
        linenum,last_name,first_name
        1,Jagger,Mick
        2,O"Brian,"Mary"
        3,Richards,Keith
        4,L"Etoile,Bennet
        5,Watts,Charlie
        6,Smith,D"Shawn
        7,"Wyman",Bill
        8,Woods,Ron
        9,J"o"ne"s,Brian
        """
    )
    for use_pyarrow in (False, True):
        out = pl.read_csv(
            rolling_stones.encode(), quote_char=None, use_pyarrow=use_pyarrow
        )
        assert out.shape == (9, 3)
        assert_frame_equal(out, expected)

    # non-standard quote char
    df = pl.DataFrame({"x": ["", "0*0", "xyz"]})
    csv_data = df.write_csv(quote_char="*")

    assert csv_data == "x\n**\n*0**0*\nxyz\n"
    assert_frame_equal(df, pl.read_csv(io.StringIO(csv_data), quote_char="*"))


def test_csv_empty_quotes_char_1622() -> None:
    pl.read_csv(b"a,b,c,d\nA1,B1,C1,1\nA2,B2,C2,2\n", quote_char="")


def test_ignore_try_parse_dates() -> None:
    csv = textwrap.dedent(
        """\
        a,b,c
        1,i,16200126
        2,j,16250130
        3,k,17220012
        4,l,17290009
        """
    ).encode()

    headers = ["a", "b", "c"]
    dtypes: dict[str, type[pl.DataType]] = dict.fromkeys(
        headers, pl.String
    )  # forces String dtype for every column
    df = pl.read_csv(csv, columns=headers, schema_overrides=dtypes)
    assert df.dtypes == [pl.String, pl.String, pl.String]


def test_csv_date_handling() -> None:
    csv = textwrap.dedent(
        """\
        date
        1745-04-02
        1742-03-21
        1743-06-16
        1730-07-22

        1739-03-16
        """
    )
    expected = pl.DataFrame(
        {
            "date": [
                date(1745, 4, 2),
                date(1742, 3, 21),
                date(1743, 6, 16),
                date(1730, 7, 22),
                None,
                date(1739, 3, 16),
            ]
        }
    )
    out = pl.read_csv(csv.encode(), try_parse_dates=True)
    assert_frame_equal(out, expected)
    dtypes = {"date": pl.Date}
    out = pl.read_csv(csv.encode(), schema_overrides=dtypes)
    assert_frame_equal(out, expected)


def test_csv_no_date_dtype_because_string() -> None:
    csv = textwrap.dedent(
        """\
        date
        2024-01-01
        2024-01-02
        hello
        """
    )
    out = pl.read_csv(csv.encode(), try_parse_dates=True)
    assert out.dtypes == [pl.String]


def test_csv_infer_date_dtype() -> None:
    csv = textwrap.dedent(
        """\
        date
        2024-01-01
        "2024-01-02"

        2024-01-04
        """
    )
    out = pl.read_csv(csv.encode(), try_parse_dates=True)
    expected = pl.DataFrame(
        {
            "date": [
                date(2024, 1, 1),
                date(2024, 1, 2),
                None,
                date(2024, 1, 4),
            ]
        }
    )
    assert_frame_equal(out, expected)


def test_csv_date_dtype_ignore_errors() -> None:
    csv = textwrap.dedent(
        """\
        date
        hello
        2024-01-02
        world
        !!
        """
    )
    out = pl.read_csv(
        csv.encode(), ignore_errors=True, schema_overrides={"date": pl.Date}
    )
    expected = pl.DataFrame(
        {
            "date": [
                None,
                date(2024, 1, 2),
                None,
                None,
            ]
        }
    )
    assert_frame_equal(out, expected)


@pytest.mark.may_fail_auto_streaming  # read->scan_csv dispatch
def test_csv_globbing(io_files_path: Path) -> None:
    path = io_files_path / "foods*.csv"
    df = pl.read_csv(path)
    assert df.shape == (135, 4)

    with pytest.MonkeyPatch.context() as mp:
        mp.setenv("POLARS_FORCE_ASYNC", "0")

        with pytest.raises(ValueError):
            _ = pl.read_csv(path, columns=[0, 1])

    df = pl.read_csv(path, columns=["category", "sugars_g"])
    assert df.shape == (135, 2)
    assert df.row(-1) == ("seafood", 1)
    assert df.row(0) == ("vegetables", 2)

    with pytest.MonkeyPatch.context() as mp:
        mp.setenv("POLARS_FORCE_ASYNC", "0")

        with pytest.raises(ValueError):
            _ = pl.read_csv(
                path, schema_overrides=[pl.String, pl.Int64, pl.Int64, pl.Int64]
            )

    dtypes = {
        "category": pl.String,
        "calories": pl.Int32,
        "fats_g": pl.Float32,
        "sugars_g": pl.Int32,
    }

    df = pl.read_csv(path, schema_overrides=dtypes)
    assert df.dtypes == list(dtypes.values())


def test_csv_schema_offset(foods_file_path: Path) -> None:
    csv = textwrap.dedent(
        """\
        metadata
        line
        col1,col2,col3
        alpha,beta,gamma
        1,2.0,"A"
        3,4.0,"B"
        5,6.0,"C"
        """
    ).encode()

    df = pl.read_csv(csv, skip_rows=3)
    assert df.columns == ["alpha", "beta", "gamma"]
    assert df.shape == (3, 3)
    assert df.dtypes == [pl.Int64, pl.Float64, pl.String]

    df = pl.read_csv(csv, skip_rows=2, skip_rows_after_header=1)
    assert df.columns == ["col1", "col2", "col3"]
    assert df.shape == (3, 3)
    assert df.dtypes == [pl.Int64, pl.Float64, pl.String]

    df = pl.scan_csv(foods_file_path, skip_rows=4).collect()
    assert df.columns == ["fruit", "60", "0", "11"]
    assert df.shape == (23, 4)
    assert df.dtypes == [pl.String, pl.Int64, pl.Float64, pl.Int64]

    df = pl.scan_csv(foods_file_path, skip_rows_after_header=24).collect()
    assert df.columns == ["category", "calories", "fats_g", "sugars_g"]
    assert df.shape == (3, 4)
    assert df.dtypes == [pl.String, pl.Int64, pl.Int64, pl.Int64]

    df = pl.scan_csv(
        foods_file_path, skip_rows_after_header=24, infer_schema_length=1
    ).collect()
    assert df.columns == ["category", "calories", "fats_g", "sugars_g"]
    assert df.shape == (3, 4)
    assert df.dtypes == [pl.String, pl.Int64, pl.Int64, pl.Int64]
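

# Added illustrative example (not part of the original suite): a minimal sketch of
# the distinction exercised above -- skip_rows drops lines before the header is
# read, while skip_rows_after_header drops data rows once the header is fixed.
def test_skip_rows_semantics_example() -> None:
    data = "junk\na,b\n1,2\n3,4\n"
    df = pl.read_csv(io.StringIO(data), skip_rows=1, skip_rows_after_header=1)
    assert df.columns == ["a", "b"]
    assert df.rows() == [(3, 4)]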


def test_empty_string_missing_round_trip() -> None:
    df = pl.DataFrame({"varA": ["A", "", None], "varB": ["B", "", None]})
    for null in (None, "NA", "NULL", r"\N"):
        f = io.BytesIO()
        df.write_csv(f, null_value=null)
        f.seek(0)
        df_read = pl.read_csv(f, null_values=null)
        assert_frame_equal(df, df_read)


def test_write_csv_separator() -> None:
    df = pl.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3]})
    f = io.BytesIO()
    df.write_csv(f, separator="\t")
    f.seek(0)
    assert f.read() == b"a\tb\n1\t1\n2\t2\n3\t3\n"
    f.seek(0)
    assert_frame_equal(df, pl.read_csv(f, separator="\t"))


def test_write_csv_line_terminator() -> None:
    df = pl.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3]})
    f = io.BytesIO()
    df.write_csv(f, line_terminator="\r\n")
    f.seek(0)
    assert f.read() == b"a,b\r\n1,1\r\n2,2\r\n3,3\r\n"
    f.seek(0)
    assert_frame_equal(df, pl.read_csv(f, eol_char="\n"))


def test_escaped_null_values() -> None:
    csv = textwrap.dedent(
        """\
        "a","b","c"
        "a","n/a","NA"
        "None","2","3.0"
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(
        f,
        null_values={"a": "None", "b": "n/a", "c": "NA"},
        schema_overrides={"a": pl.String, "b": pl.Int64, "c": pl.Float64},
    )
    assert df[1, "a"] is None
    assert df[0, "b"] is None
    assert df[0, "c"] is None


def test_quoting_round_trip() -> None:
    f = io.BytesIO()
    df = pl.DataFrame(
        {
            "a": [
                "tab,separated,field",
                "newline\nseparated\nfield",
                'quote"separated"field',
            ]
        }
    )
    df.write_csv(f)
    f.seek(0)
    read_df = pl.read_csv(f)
    assert_frame_equal(read_df, df)


def test_csv_field_schema_inference_with_whitespace() -> None:
    csv = """\
bool,bool-,-bool,float,float-,-float,int,int-,-int
true,true , true,1.2,1.2 , 1.2,1,1 , 1
"""
    df = pl.read_csv(io.StringIO(csv), has_header=True)
    expected = pl.DataFrame(
        {
            "bool": [True],
            "bool-": ["true "],
            "-bool": [" true"],
            "float": [1.2],
            "float-": ["1.2 "],
            "-float": [" 1.2"],
            "int": [1],
            "int-": ["1 "],
            "-int": [" 1"],
        }
    )
    assert_frame_equal(df, expected)


def test_fallback_chrono_parser() -> None:
    data = textwrap.dedent(
        """\
        date_1,date_2
        2021-01-01,2021-1-1
        2021-02-02,2021-2-2
        2021-10-10,2021-10-10
        """
    )
    df = pl.read_csv(data.encode(), try_parse_dates=True)
    assert df.null_count().row(0) == (0, 0)


def test_tz_aware_try_parse_dates() -> None:
    data = (
        "a,b,c,d\n"
        "2020-01-01T02:00:00+01:00,2021-04-28T00:00:00+02:00,2021-03-28T00:00:00+01:00,2\n"
        "2020-01-01T03:00:00+01:00,2021-04-29T00:00:00+02:00,2021-03-29T00:00:00+02:00,3\n"
    )
    result = pl.read_csv(io.StringIO(data), try_parse_dates=True)
    expected = pl.DataFrame(
        {
            "a": [
                datetime(2020, 1, 1, 1, tzinfo=timezone.utc),
                datetime(2020, 1, 1, 2, tzinfo=timezone.utc),
            ],
            "b": [
                datetime(2021, 4, 27, 22, tzinfo=timezone.utc),
                datetime(2021, 4, 28, 22, tzinfo=timezone.utc),
            ],
            "c": [
                datetime(2021, 3, 27, 23, tzinfo=timezone.utc),
                datetime(2021, 3, 28, 22, tzinfo=timezone.utc),
            ],
            "d": [2, 3],
        }
    )
    assert_frame_equal(result, expected)


@pytest.mark.parametrize("try_parse_dates", [True, False])
@pytest.mark.parametrize("time_unit", ["ms", "us", "ns"])
def test_csv_overwrite_datetime_dtype(
    try_parse_dates: bool, time_unit: TimeUnit
) -> None:
    data = """\
a
2020-1-1T00:00:00.123456789
2020-1-2T00:00:00.987654321
2020-1-3T00:00:00.132547698
"""
    result = pl.read_csv(
        io.StringIO(data),
        try_parse_dates=try_parse_dates,
        schema_overrides={"a": pl.Datetime(time_unit)},
    )
    expected = pl.DataFrame(
        {
            "a": pl.Series(
                [
                    "2020-01-01T00:00:00.123456789",
                    "2020-01-02T00:00:00.987654321",
                    "2020-01-03T00:00:00.132547698",
                ]
            ).str.to_datetime(time_unit=time_unit)
        }
    )
    assert_frame_equal(result, expected)


def test_csv_string_escaping() -> None:
    df = pl.DataFrame({"a": ["Free trip to A,B", '''Special rate "1.79"''']})
    f = io.BytesIO()
    df.write_csv(f)
    f.seek(0)
    df_read = pl.read_csv(f)
    assert_frame_equal(df_read, df)


@pytest.mark.write_disk
def test_glob_csv(df_no_lists: pl.DataFrame, tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    df = df_no_lists.drop("strings_nulls")
    file_path = tmp_path / "small.csv"
    df.write_csv(file_path)

    path_glob = tmp_path / "small*.csv"
    assert pl.scan_csv(path_glob).collect().shape == (3, 12)
    assert pl.read_csv(path_glob).shape == (3, 12)


def test_csv_whitespace_separator_at_start_do_not_skip() -> None:
    csv = "\t\t\t\t0\t1"
    result = pl.read_csv(csv.encode(), separator="\t", has_header=False)
    expected = {
        "column_1": [None],
        "column_2": [None],
        "column_3": [None],
        "column_4": [None],
        "column_5": [0],
        "column_6": [1],
    }
    assert result.to_dict(as_series=False) == expected


def test_csv_whitespace_separator_at_end_do_not_skip() -> None:
    csv = "0\t1\t\t\t\t"
    result = pl.read_csv(csv.encode(), separator="\t", has_header=False)
    expected = {
        "column_1": [0],
        "column_2": [1],
        "column_3": [None],
        "column_4": [None],
        "column_5": [None],
        "column_6": [None],
    }
    assert result.to_dict(as_series=False) == expected


def test_csv_multiple_null_values() -> None:
    df = pl.DataFrame(
        {
            "a": [1, 2, None, 4],
            "b": ["2022-01-01", "__NA__", "", "NA"],
        }
    )
    f = io.BytesIO()
    df.write_csv(f)
    f.seek(0)

    df2 = pl.read_csv(f, null_values=["__NA__", "NA"])
    expected = pl.DataFrame(
        {
            "a": [1, 2, None, 4],
            "b": ["2022-01-01", None, "", None],
        }
    )
    assert_frame_equal(df2, expected)


def test_different_eol_char() -> None:
    csv = "a,1,10;b,2,20;c,3,30"
    expected = pl.DataFrame(
        {"column_1": ["a", "b", "c"], "column_2": [1, 2, 3], "column_3": [10, 20, 30]}
    )
    assert_frame_equal(
        pl.read_csv(csv.encode(), eol_char=";", has_header=False), expected
    )


def test_csv_write_escape_headers() -> None:
    df0 = pl.DataFrame({"col,1": ["data,1"], 'col"2': ['data"2'], "col:3": ["data:3"]})
    out = io.BytesIO()
    df0.write_csv(out)
    assert out.getvalue() == b'"col,1","col""2",col:3\n"data,1","data""2",data:3\n'

    df1 = pl.DataFrame({"c,o,l,u,m,n": [123]})
    out = io.BytesIO()
    df1.write_csv(out)

    out.seek(0)
    df2 = pl.read_csv(out)
    assert_frame_equal(df1, df2)
    assert df2.schema == {"c,o,l,u,m,n": pl.Int64}


def test_csv_write_escape_newlines() -> None:
    df = pl.DataFrame({"escape": ["n\nn"]})
    f = io.BytesIO()
    df.write_csv(f)
    f.seek(0)
    read_df = pl.read_csv(f)
    assert_frame_equal(df, read_df)
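

# Added illustrative example (not part of the original suite): with the default
# "necessary" quote style, a field containing a newline is written quoted, which
# is what makes the round-trip above work.
def test_embedded_newline_quoting_example() -> None:
    out = pl.DataFrame({"x": ["a\nb"]}).write_csv()
    assert out == 'x\n"a\nb"\n'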


def test_skip_new_line_embedded_lines() -> None:
    csv = r"""a,b,c,d,e\n
1,2,3,"\n Test",\n
4,5,6,"Test A",\n
7,8,,"Test B \n",\n"""

    for empty_string, missing_value in ((True, ""), (False, None)):
        df = pl.read_csv(
            csv.encode(),
            skip_rows_after_header=1,
            infer_schema_length=0,
            missing_utf8_is_empty_string=empty_string,
        )
        assert df.to_dict(as_series=False) == {
            "a": ["4", "7"],
            "b": ["5", "8"],
            "c": ["6", missing_value],
            "d": ["Test A", "Test B \\n"],
            "e\\n": ["\\n", "\\n"],
        }


def test_csv_schema_overrides_bool() -> None:
    csv = "a, b\n" + ",false\n" + ",false\n" + ",false"
    df = pl.read_csv(
        csv.encode(),
        schema_overrides={"a": pl.Boolean, "b": pl.Boolean},
    )
    assert df.dtypes == [pl.Boolean, pl.Boolean]


@pytest.mark.parametrize(
    ("fmt", "expected"),
    [
        (None, "dt\n2022-01-02T00:00:00.000000\n"),
        ("%F %T%.3f", "dt\n2022-01-02 00:00:00.000\n"),
        ("%Y", "dt\n2022\n"),
        ("%m", "dt\n01\n"),
        ("%m$%d", "dt\n01$02\n"),
        ("%R", "dt\n00:00\n"),
    ],
)
def test_datetime_format(fmt: str, expected: str) -> None:
    df = pl.DataFrame({"dt": [datetime(2022, 1, 2)]})
    csv = df.write_csv(datetime_format=fmt)
    assert csv == expected


@pytest.mark.parametrize(
    ("fmt", "expected"),
    [
        (None, "dt\n2022-01-02T00:00:00.000000+0000\n"),
        ("%F %T%.3f%z", "dt\n2022-01-02 00:00:00.000+0000\n"),
        ("%Y%z", "dt\n2022+0000\n"),
        ("%m%z", "dt\n01+0000\n"),
        ("%m$%d%z", "dt\n01$02+0000\n"),
        ("%R%z", "dt\n00:00+0000\n"),
    ],
)
@pytest.mark.parametrize("tzinfo", [timezone.utc, timezone(timedelta(hours=0))])
def test_datetime_format_tz_aware(fmt: str, expected: str, tzinfo: timezone) -> None:
    df = pl.DataFrame({"dt": [datetime(2022, 1, 2, tzinfo=tzinfo)]})
    csv = df.write_csv(datetime_format=fmt)
    assert csv == expected


@pytest.mark.parametrize(
    ("tu1", "tu2", "expected"),
    [
        (
            "ns",
            "ns",
            "x,y\n2022-09-04T10:30:45.123000000,2022-09-04T10:30:45.123000000\n",
        ),
        (
            "ns",
            "us",
            "x,y\n2022-09-04T10:30:45.123000000,2022-09-04T10:30:45.123000\n",
        ),
        (
            "ns",
            "ms",
            "x,y\n2022-09-04T10:30:45.123000000,2022-09-04T10:30:45.123\n",
        ),
        ("us", "us", "x,y\n2022-09-04T10:30:45.123000,2022-09-04T10:30:45.123000\n"),
        ("us", "ms", "x,y\n2022-09-04T10:30:45.123000,2022-09-04T10:30:45.123\n"),
        ("ms", "us", "x,y\n2022-09-04T10:30:45.123,2022-09-04T10:30:45.123000\n"),
        ("ms", "ms", "x,y\n2022-09-04T10:30:45.123,2022-09-04T10:30:45.123\n"),
    ],
)
def test_datetime_format_inferred_precision(
    tu1: TimeUnit, tu2: TimeUnit, expected: str
) -> None:
    df = pl.DataFrame(
        data={
            "x": [datetime(2022, 9, 4, 10, 30, 45, 123000)],
            "y": [datetime(2022, 9, 4, 10, 30, 45, 123000)],
        },
        schema=[
            ("x", pl.Datetime(tu1)),
            ("y", pl.Datetime(tu2)),
        ],
    )
    assert expected == df.write_csv()


def test_inferred_datetime_format_mixed() -> None:
    ts = pl.datetime_range(datetime(2000, 1, 1), datetime(2000, 1, 2), eager=True)
    df = pl.DataFrame({"naive": ts, "aware": ts.dt.replace_time_zone("UTC")})
    result = df.write_csv()
    expected = (
        "naive,aware\n"
        "2000-01-01T00:00:00.000000,2000-01-01T00:00:00.000000+0000\n"
        "2000-01-02T00:00:00.000000,2000-01-02T00:00:00.000000+0000\n"
    )
    assert result == expected


@pytest.mark.parametrize(
    ("fmt", "expected"),
    [
        (None, "dt\n2022-01-02\n"),
        ("%Y", "dt\n2022\n"),
        ("%m", "dt\n01\n"),
        ("%m$%d", "dt\n01$02\n"),
    ],
)
def test_date_format(fmt: str, expected: str) -> None:
    df = pl.DataFrame({"dt": [date(2022, 1, 2)]})
    csv = df.write_csv(date_format=fmt)
    assert csv == expected


@pytest.mark.parametrize(
    ("fmt", "expected"),
    [
        (None, "dt\n16:15:30.000000000\n"),
        ("%R", "dt\n16:15\n"),
    ],
)
def test_time_format(fmt: str, expected: str) -> None:
    df = pl.DataFrame({"dt": [time(16, 15, 30)]})
    csv = df.write_csv(time_format=fmt)
    assert csv == expected


@pytest.mark.parametrize("dtype", [pl.Float32, pl.Float64])
def test_float_precision(dtype: pl.Float32 | pl.Float64) -> None:
    df = pl.Series("col", [1.0, 2.2, 3.33], dtype=dtype).to_frame()

    assert df.write_csv(float_precision=None) == "col\n1.0\n2.2\n3.33\n"
    assert df.write_csv(float_precision=0) == "col\n1\n2\n3\n"
    assert df.write_csv(float_precision=1) == "col\n1.0\n2.2\n3.3\n"
    assert df.write_csv(float_precision=2) == "col\n1.00\n2.20\n3.33\n"
    assert df.write_csv(float_precision=3) == "col\n1.000\n2.200\n3.330\n"


def test_float_scientific() -> None:
    df = (
        pl.Series(
            "colf64",
            [3.141592653589793 * mult for mult in (1e-8, 1e-3, 1e3, 1e17)],
            dtype=pl.Float64,
        )
        .to_frame()
        .with_columns(pl.col("colf64").cast(pl.Float32).alias("colf32"))
    )

    assert (
        df.write_csv(float_precision=None, float_scientific=False)
        == "colf64,colf32\n0.00000003141592653589793,0.00000003141592586075603\n0.0031415926535897933,0.0031415927223861217\n3141.592653589793,3141.5927734375\n314159265358979300,314159265516355600\n"
    )
    assert (
        df.write_csv(float_precision=0, float_scientific=False)
        == "colf64,colf32\n0,0\n0,0\n3142,3142\n314159265358979328,314159265516355584\n"
    )
    assert (
        df.write_csv(float_precision=1, float_scientific=False)
        == "colf64,colf32\n0.0,0.0\n0.0,0.0\n3141.6,3141.6\n314159265358979328.0,314159265516355584.0\n"
    )
    assert (
        df.write_csv(float_precision=3, float_scientific=False)
        == "colf64,colf32\n0.000,0.000\n0.003,0.003\n3141.593,3141.593\n314159265358979328.000,314159265516355584.000\n"
    )

    assert (
        df.write_csv(float_precision=None, float_scientific=True)
        == "colf64,colf32\n3.141592653589793e-8,3.1415926e-8\n3.1415926535897933e-3,3.1415927e-3\n3.141592653589793e3,3.1415928e3\n3.141592653589793e17,3.1415927e17\n"
    )
    assert (
        df.write_csv(float_precision=0, float_scientific=True)
        == "colf64,colf32\n3e-8,3e-8\n3e-3,3e-3\n3e3,3e3\n3e17,3e17\n"
    )
    assert (
        df.write_csv(float_precision=1, float_scientific=True)
        == "colf64,colf32\n3.1e-8,3.1e-8\n3.1e-3,3.1e-3\n3.1e3,3.1e3\n3.1e17,3.1e17\n"
    )
    assert (
        df.write_csv(float_precision=3, float_scientific=True)
        == "colf64,colf32\n3.142e-8,3.142e-8\n3.142e-3,3.142e-3\n3.142e3,3.142e3\n3.142e17,3.142e17\n"
    )


def test_skip_rows_different_field_len() -> None:
    csv = io.StringIO(
        textwrap.dedent(
            """\
            a,b
            1,A
            2,
            3,B
            4,
            """
        )
    )
    for empty_string, missing_value in ((True, ""), (False, None)):
        csv.seek(0)
        assert pl.read_csv(
            csv, skip_rows_after_header=2, missing_utf8_is_empty_string=empty_string
        ).to_dict(as_series=False) == {
            "a": [3, 4],
            "b": ["B", missing_value],
        }


def test_duplicated_columns() -> None:
    csv = textwrap.dedent(
        """\
        a,a
        1,2
        """
    )
    assert pl.read_csv(csv.encode()).columns == ["a", "a_duplicated_0"]
    new = ["c", "d"]
    assert pl.read_csv(csv.encode(), new_columns=new).columns == new


def test_error_message() -> None:
    data = io.StringIO("target,wind,energy,miso\n1,2,3,4\n1,2,1e5,1\n")
    with pytest.raises(
        ComputeError,
        match=r"could not parse `1e5` as dtype `i64` at column 'energy' \(column number 3\)",
    ):
        pl.read_csv(data, infer_schema_length=1)
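

# Added illustrative counterpart (not part of the original suite): widening the
# inference window past the first row lets the same data load, since `1e5` then
# promotes the column to Float64 during schema inference.
def test_error_message_infer_schema_example() -> None:
    data = io.StringIO("target,wind,energy,miso\n1,2,3,4\n1,2,1e5,1\n")
    df = pl.read_csv(data, infer_schema_length=None)  # scan the whole input
    assert df["energy"].dtype == pl.Float64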


def test_csv_categorical_lifetime() -> None:
    # escaped strings do some heap allocations in the builder;
    # this tests that the lifetimes remain valid
    csv = textwrap.dedent(
        r"""
        a,b
        "needs_escape",b
        "" ""needs" escape" foo"",b
        "" ""needs" escape" foo"",
        """
    )

    df = pl.read_csv(
        csv.encode(), schema_overrides={"a": pl.Categorical, "b": pl.Categorical}
    )
    assert df.dtypes == [pl.Categorical, pl.Categorical]
    assert df.to_dict(as_series=False) == {
        "a": ["needs_escape", ' "needs escape foo', ' "needs escape foo'],
        "b": ["b", "b", None],
    }

    assert (df["a"] == df["b"]).to_list() == [False, False, None]


def test_csv_categorical_categorical_merge() -> None:
    N = 50
    f = io.BytesIO()
    pl.DataFrame({"x": ["A"] * N + ["B"] * N}).write_csv(f)
    f.seek(0)
    assert pl.read_csv(
        f, schema_overrides={"x": pl.Categorical}, sample_size=10
    ).unique(maintain_order=True)["x"].to_list() == ["A", "B"]


@pytest.mark.write_disk
def test_batched_csv_reader(foods_file_path: Path) -> None:
    reader = pl.read_csv_batched(foods_file_path, batch_size=4)
    assert isinstance(reader, BatchedCsvReader)

    batches = reader.next_batches(5)
    assert batches is not None
    out = pl.concat(batches)
    assert_frame_equal(out, pl.read_csv(foods_file_path).head(out.height))

    # the final batch of the low-memory variant is different
    reader = pl.read_csv_batched(foods_file_path, batch_size=4, low_memory=True)
    batches = reader.next_batches(10)
    assert batches is not None

    assert_frame_equal(pl.concat(batches), pl.read_csv(foods_file_path))

    # ragged lines
    with NamedTemporaryFile() as tmp:
        data = b"A\nB,ragged\nC"
        tmp.write(data)
        tmp.seek(0)

        expected = pl.DataFrame({"A": ["B", "C"]})
        batches = pl.read_csv_batched(
            tmp.name,
            has_header=True,
            truncate_ragged_lines=True,
        ).next_batches(1)

        assert batches is not None
        assert_frame_equal(pl.concat(batches), expected)


def test_batched_csv_reader_empty(io_files_path: Path) -> None:
    empty_csv = io_files_path / "empty.csv"
    with pytest.raises(NoDataError, match="empty CSV"):
        pl.read_csv_batched(source=empty_csv)

    reader = pl.read_csv_batched(source=empty_csv, raise_if_empty=False)
    assert reader.next_batches(1) is None


def test_batched_csv_reader_all_batches(foods_file_path: Path) -> None:
    for new_columns in [None, ["Category", "Calories", "Fats_g", "Sugars_g"]]:
        out = pl.read_csv(foods_file_path, new_columns=new_columns)
        reader = pl.read_csv_batched(
            foods_file_path, new_columns=new_columns, batch_size=4
        )
        batches = reader.next_batches(5)
        batched_dfs = []

        while batches:
            batched_dfs.extend(batches)
            batches = reader.next_batches(5)

        assert all(x.height > 0 for x in batched_dfs)

        batched_concat_df = pl.concat(batched_dfs, rechunk=True)
        assert_frame_equal(out, batched_concat_df)


def test_batched_csv_reader_no_batches(foods_file_path: Path) -> None:
    reader = pl.read_csv_batched(foods_file_path, batch_size=4)
    batches = reader.next_batches(0)

    assert batches is None


def test_read_csv_batched_invalid_source() -> None:
    with pytest.raises(TypeError):
        pl.read_csv_batched(source=5)  # type: ignore[arg-type]


def test_csv_single_categorical_null() -> None:
    f = io.BytesIO()
    pl.DataFrame(
        {
            "x": ["A"],
            "y": [None],
            "z": ["A"],
        }
    ).write_csv(f)
    f.seek(0)

    df = pl.read_csv(
        f,
        schema_overrides={"y": pl.Categorical},
    )

    assert df.dtypes == [pl.String, pl.Categorical, pl.String]
    assert df.to_dict(as_series=False) == {"x": ["A"], "y": [None], "z": ["A"]}


def test_csv_quoted_missing() -> None:
    csv = (
        '"col1"|"col2"|"col3"|"col4"\n'
        '"0"|"Free text with a line\nbreak"|"123"|"456"\n'
        '"1"|"Free text without a linebreak"|""|"789"\n'
        '"0"|"Free text with \ntwo \nlinebreaks"|"101112"|"131415"'
    )
    result = pl.read_csv(
        csv.encode(), separator="|", schema_overrides={"col3": pl.Int32}
    )
    expected = pl.DataFrame(
        {
            "col1": [0, 1, 0],
            "col2": [
                "Free text with a line\nbreak",
                "Free text without a linebreak",
                "Free text with \ntwo \nlinebreaks",
            ],
            "col3": [123, None, 101112],
            "col4": [456, 789, 131415],
        },
        schema_overrides={"col3": pl.Int32},
    )
    assert_frame_equal(result, expected)


def test_csv_write_tz_aware() -> None:
    df = pl.DataFrame({"times": datetime(2021, 1, 1)}).with_columns(
        pl.col("times")
        .dt.replace_time_zone("UTC")
        .dt.convert_time_zone("Europe/Zurich")
    )
    assert df.write_csv() == "times\n2021-01-01T01:00:00.000000+0100\n"


def test_csv_statistics_offset() -> None:
    # this would fail if the statistics sample did not also sample from the
    # end of the file; the lines near the end grow longer as the numbers increase
    N = 5_000
    csv = "\n".join(str(x) for x in range(N))
    assert pl.read_csv(io.StringIO(csv), n_rows=N).height == 4999


@pytest.mark.write_disk
def test_csv_scan_categorical(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    N = 5_000
    df = pl.DataFrame({"x": ["A"] * N})

    file_path = tmp_path / "test_csv_scan_categorical.csv"
    df.write_csv(file_path)
    result = pl.scan_csv(file_path, schema_overrides={"x": pl.Categorical}).collect()

    assert result["x"].dtype == pl.Categorical


@pytest.mark.write_disk
def test_csv_scan_new_columns_less_than_original_columns(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    df = pl.DataFrame({"x": ["A"], "y": ["A"], "z": "A"})

    file_path = tmp_path / "test_csv_scan_new_columns.csv"
    df.write_csv(file_path)
    result = pl.scan_csv(file_path, new_columns=["x_new", "y_new"]).collect()

    assert result.columns == ["x_new", "y_new", "z"]


def test_read_csv_chunked() -> None:
    """Check that the row index is correctly computed across chunks."""
    N = 10_000
    csv = "1\n" * N
    df = pl.read_csv(io.StringIO(csv), row_index_name="count")

    # The next value should always be higher if monotonically increasing.
    assert df.filter(pl.col("count") < pl.col("count").shift(1)).is_empty()
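

# Added illustrative example (not part of the original suite): the same invariant
# expressed with diff() -- a monotonically increasing index never decreases.
def test_row_index_monotonic_diff_example() -> None:
    df = pl.read_csv(io.StringIO("1\n" * 100), row_index_name="idx")
    diffs = df["idx"].cast(pl.Int64).diff().drop_nulls()  # cast for a signed diff
    assert (diffs >= 0).all()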


def test_read_empty_csv(io_files_path: Path) -> None:
    with pytest.raises(NoDataError) as err:
        pl.read_csv(io_files_path / "empty.csv")
    assert "empty CSV" in str(err.value)

    df = pl.read_csv(io_files_path / "empty.csv", raise_if_empty=False)
    assert_frame_equal(df, pl.DataFrame())

    with pytest.raises(pa.ArrowInvalid) as err:
        pl.read_csv(io_files_path / "empty.csv", use_pyarrow=True)
    assert "Empty CSV" in str(err.value)

    df = pl.read_csv(
        io_files_path / "empty.csv", raise_if_empty=False, use_pyarrow=True
    )
    assert_frame_equal(df, pl.DataFrame())


@pytest.mark.slow
def test_read_web_file() -> None:
    url = "https://raw.githubusercontent.com/pola-rs/polars/main/examples/datasets/foods1.csv"
    df = pl.read_csv(url)
    assert df.shape == (27, 4)


@pytest.mark.slow
def test_csv_multiline_splits() -> None:
    # create a very unlikely csv file with many multiline fields in a
    # single record (e.g. 5000). polars must reject multi-threading here,
    # as it cannot find proper file chunks without parsing sequentially.

    np.random.seed(0)
    f = io.BytesIO()

    def some_multiline_str(n: int) -> str:
        strs = []
        strs.append('"')
        # sample between 0 and 5 so the multiline field is also likely
        # to contain 3 separators
        strs.extend(f"{'xx,' * length}" for length in np.random.randint(0, 5, n))

        strs.append('"')
        return "\n".join(strs)

    for _ in range(4):
        f.write(f"field1,field2,{some_multiline_str(5000)}\n".encode())

    f.seek(0)
    assert pl.read_csv(f, has_header=False).shape == (4, 3)


def test_read_csv_n_rows_outside_heuristic() -> None:
    # create a fringe-case csv file that breaks the heuristic determining how much
    # of the file to read, and ensure n_rows is still adhered to

    f = io.StringIO()

    f.write(",,,?????????\n" * 1000)
    f.write("?????????????????????????????????????????????????,,,\n")
    f.write(",,,?????????\n" * 1048)

    f.seek(0)
    assert pl.read_csv(f, n_rows=2048, has_header=False).shape == (2048, 4)


def test_read_csv_comments_on_top_with_schema_11667() -> None:
    csv = """
# This is a comment
A,B
1,Hello
2,World
""".strip()

    schema = {
        "A": pl.Int32(),
        "B": pl.Utf8(),
    }

    df = pl.read_csv(io.StringIO(csv), comment_prefix="#", schema=schema)
    assert df.height == 2
    assert df.schema == schema


def test_write_csv_stdout_stderr(capsys: pytest.CaptureFixture[str]) -> None:
    df = pl.DataFrame(
        {
            "numbers": [1, 2, 3],
            "strings": ["test", "csv", "stdout"],
            "dates": [date(2023, 1, 1), date(2023, 1, 2), date(2023, 1, 3)],
        }
    )
    df.write_csv(sys.stdout)
    captured = capsys.readouterr()
    assert captured.out == (
        "numbers,strings,dates\n"
        "1,test,2023-01-01\n"
        "2,csv,2023-01-02\n"
        "3,stdout,2023-01-03\n"
    )

    df.write_csv(sys.stderr)
    captured = capsys.readouterr()
    assert captured.err == (
        "numbers,strings,dates\n"
        "1,test,2023-01-01\n"
        "2,csv,2023-01-02\n"
        "3,stdout,2023-01-03\n"
    )


def test_csv_9929() -> None:
    df = pl.DataFrame({"nrs": [1, 2, 3]})
    f = io.BytesIO()
    df.write_csv(f)
    f.seek(0)
    with pytest.raises(NoDataError):
        pl.read_csv(f, skip_rows=10**6)


def test_csv_quote_styles() -> None:
    class TemporalFormats(TypedDict):
        datetime_format: str
        time_format: str

    temporal_formats: TemporalFormats = {
        "datetime_format": "%Y-%m-%dT%H:%M:%S",
        "time_format": "%H:%M:%S",
    }

    dtm = datetime(2077, 7, 5, 3, 1, 0)
    dt = dtm.date()
    tm = dtm.time()

    df = pl.DataFrame(
        {
            "float": [1.0, 2.0, None],
            "string": ["a", "a,bc", '"hello'],
            "int": [1, 2, 3],
            "bool": [True, False, None],
            "date": [dt, None, dt],
            "datetime": [None, dtm, dtm],
            "time": [tm, tm, None],
            "decimal": [D("1.0"), D("2.0"), None],
        }
    )

    assert df.write_csv(quote_style="always", **temporal_formats) == (
        '"float","string","int","bool","date","datetime","time","decimal"\n'
        '"1.0","a","1","true","2077-07-05","","03:01:00","1.0"\n'
        '"2.0","a,bc","2","false","","2077-07-05T03:01:00","03:01:00","2.0"\n'
        '"","""hello","3","","2077-07-05","2077-07-05T03:01:00","",""\n'
    )
    assert df.write_csv(quote_style="necessary", **temporal_formats) == (
        "float,string,int,bool,date,datetime,time,decimal\n"
        "1.0,a,1,true,2077-07-05,,03:01:00,1.0\n"
        '2.0,"a,bc",2,false,,2077-07-05T03:01:00,03:01:00,2.0\n'
        ',"""hello",3,,2077-07-05,2077-07-05T03:01:00,,\n'
    )
    assert df.write_csv(quote_style="never", **temporal_formats) == (
        "float,string,int,bool,date,datetime,time,decimal\n"
        "1.0,a,1,true,2077-07-05,,03:01:00,1.0\n"
        "2.0,a,bc,2,false,,2077-07-05T03:01:00,03:01:00,2.0\n"
        ',"hello,3,,2077-07-05,2077-07-05T03:01:00,,\n'
    )
    assert df.write_csv(
        quote_style="non_numeric", quote_char="8", **temporal_formats
    ) == (
        "8float8,8string8,8int8,8bool8,8date8,8datetime8,8time8,8decimal8\n"
        "1.0,8a8,1,8true8,82077-07-058,,803:01:008,1.0\n"
        "2.0,8a,bc8,2,8false8,,82077-07-05T03:01:008,803:01:008,2.0\n"
        ',8"hello8,3,,82077-07-058,82077-07-05T03:01:008,,\n'
    )


def test_ignore_errors_casting_dtypes() -> None:
    csv = """inventory
10

400
90
"""

    assert pl.read_csv(
        source=io.StringIO(csv),
        schema_overrides={"inventory": pl.Int8},
        ignore_errors=True,
    ).to_dict(as_series=False) == {"inventory": [10, None, None, 90]}

    with pytest.raises(ComputeError):
        pl.read_csv(
            source=io.StringIO(csv),
            schema_overrides={"inventory": pl.Int8},
            ignore_errors=False,
        )


def test_ignore_errors_date_parser() -> None:
    data_invalid_date = "int,float,date\n3,3.4,X"
    with pytest.raises(ComputeError):
        pl.read_csv(
            source=io.StringIO(data_invalid_date),
            schema_overrides={"date": pl.Date},
            ignore_errors=False,
        )


def test_csv_ragged_lines() -> None:
    expected = {"A": ["B", "C"]}
    assert (
        pl.read_csv(
            io.StringIO("A\nB,ragged\nC"), has_header=True, truncate_ragged_lines=True
        ).to_dict(as_series=False)
        == expected
    )
    assert (
        pl.read_csv(
            io.StringIO("A\nB\nC,ragged"), has_header=True, truncate_ragged_lines=True
        ).to_dict(as_series=False)
        == expected
    )

    for s in ["A\nB,ragged\nC", "A\nB\nC,ragged"]:
        with pytest.raises(ComputeError, match=r"found more fields than defined"):
            pl.read_csv(io.StringIO(s), has_header=True, truncate_ragged_lines=False)
1947
1948


@pytest.mark.may_fail_auto_streaming  # missing_columns parameter for CSV
def test_provide_schema() -> None:
    # can be used to override the schema of ragged csv files
    assert pl.read_csv(
        io.StringIO("A\nB,ragged\nC"),
        has_header=False,
        schema={"A": pl.String, "B": pl.String, "C": pl.String},
    ).to_dict(as_series=False) == {
        "A": ["A", "B", "C"],
        "B": [None, "ragged", None],
        "C": [None, None, None],
    }


def test_custom_writable_object() -> None:
    df = pl.DataFrame({"a": [10, 20, 30], "b": ["x", "y", "z"]})

    class CustomBuffer:
        writes: list[bytes]

        def __init__(self) -> None:
            self.writes = []

        def write(self, data: bytes) -> int:
            self.writes.append(data)
            return len(data)

    buf = CustomBuffer()
    df.write_csv(buf)  # type: ignore[call-overload]

    assert b"".join(buf.writes) == b"a,b\n10,x\n20,y\n30,z\n"
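

# Editor's sketch (not part of the original suite): with no target argument,
# `write_csv` returns the CSV as a string, which several tests below rely on.
def test_write_csv_returns_string_sketch() -> None:
    df = pl.DataFrame({"a": [10], "b": ["x"]})
    assert df.write_csv() == "a,b\n10,x\n"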


@pytest.mark.parametrize(
    ("csv", "expected"),
    [
        (b"a,b\n1,2\n1,2\n", pl.DataFrame({"a": [1, 1], "b": [2, 2]})),
        (b"a,b\n1,2\n1,2", pl.DataFrame({"a": [1, 1], "b": [2, 2]})),
        (b"a\n1\n1\n", pl.DataFrame({"a": [1, 1]})),
        (b"a\n1\n1", pl.DataFrame({"a": [1, 1]})),
    ],
    ids=[
        "multiple columns, ends with LF",
        "multiple columns, ends with non-LF",
        "single column, ends with LF",
        "single column, ends with non-LF",
    ],
)
def test_read_filelike_object_12266(csv: bytes, expected: pl.DataFrame) -> None:
    buf = io.BufferedReader(io.BytesIO(csv))  # type: ignore[arg-type]
    df = pl.read_csv(buf)
    assert_frame_equal(df, expected)


def test_read_filelike_object_12404() -> None:
    expected = pl.DataFrame({"a": [1, 1], "b": [2, 2]})
    csv = expected.write_csv(line_terminator=";").encode()
    buf = io.BufferedReader(io.BytesIO(csv))  # type: ignore[arg-type]
    df = pl.read_csv(buf, eol_char=";")
    assert_frame_equal(df, expected)


def test_write_csv_bom() -> None:
    df = pl.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3]})
    f = io.BytesIO()
    df.write_csv(f, include_bom=True)
    f.seek(0)
    assert f.read() == b"\xef\xbb\xbfa,b\n1,1\n2,2\n3,3\n"


def test_write_csv_batch_size_zero() -> None:
    df = pl.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3]})
    f = io.BytesIO()
    with pytest.raises(ValueError, match="invalid zero value"):
        df.write_csv(f, batch_size=0)


def test_empty_csv_no_raise() -> None:
    assert pl.read_csv(io.StringIO(), raise_if_empty=False, has_header=False).shape == (
        0,
        0,
    )
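

# Editor's sketch (not part of the original suite): without
# `raise_if_empty=False`, an empty source raises NoDataError (imported at the
# top of this module).
def test_empty_csv_raise_sketch() -> None:
    with pytest.raises(NoDataError):
        pl.read_csv(io.StringIO())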


def test_csv_no_new_line_last() -> None:
    csv = io.StringIO("a b\n1 1\n2 2\n3 2.1")
    assert pl.read_csv(csv, separator=" ").to_dict(as_series=False) == {
        "a": [1, 2, 3],
        "b": [1.0, 2.0, 2.1],
    }


def test_invalid_csv_raise() -> None:
    with pytest.raises(ComputeError):
        pl.read_csv(
            b"""
"WellCompletionCWI","FacilityID","ProductionMonth","ReportedHoursProdInj","ProdAccountingProductType","ReportedVolume","VolumetricActivityType"
"SK0000608V001","SK BT B1H3780","202001","","GAS","1.700","PROD"
"SK0127960V000","SK BT 0018977","202001","","GAS","45.500","PROD"
"SK0127960V000","SK BT 0018977","
""".strip()
        )


@pytest.mark.write_disk
def test_partial_read_compressed_file(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    monkeypatch.setenv("POLARS_FORCE_ASYNC", "0")

    df = pl.DataFrame(
        {"idx": range(1_000), "dt": date(2025, 12, 31), "txt": "hello world"}
    )
    tmp_path.mkdir(exist_ok=True)
    file_path = tmp_path / "large.csv.gz"
    bytes_io = io.BytesIO()
    df.write_csv(bytes_io)
    bytes_io.seek(0)
    with gzip.open(file_path, mode="wb") as f:
        f.write(bytes_io.getvalue())
    df = pl.read_csv(
        file_path, skip_rows=40, has_header=False, skip_rows_after_header=20, n_rows=30
    )
    assert df.shape == (30, 3)
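

# Editor's sketch (not part of the original suite; assumes the gzip
# decompression exercised through a file above also covers in-memory bytes).
def test_read_gzip_compressed_bytes_sketch() -> None:
    df = pl.DataFrame({"a": [1, 2], "b": ["x", "y"]})
    compressed = gzip.compress(df.write_csv().encode())
    assert_frame_equal(pl.read_csv(compressed), df)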


def test_read_csv_invalid_schema_overrides() -> None:
    csv = textwrap.dedent(
        """\
        a,b
        1,foo
        2,bar
        3,baz
        """
    )
    f = io.StringIO(csv)
    with pytest.raises(
        TypeError, match="`schema_overrides` should be of type list or dict"
    ):
        pl.read_csv(f, schema_overrides={pl.Int64, pl.String})  # type: ignore[arg-type]


def test_read_csv_invalid_schema_overrides_length() -> None:
    csv = textwrap.dedent(
        """\
        a,b
        1,foo
        2,bar
        3,baz
        """
    )
    f = io.StringIO(csv)

    # streaming dispatches read_csv -> _scan_csv_impl which does not accept a list
    if (
        os.getenv("POLARS_AUTO_NEW_STREAMING", os.getenv("POLARS_FORCE_NEW_STREAMING"))
        == "1"
    ):
        err = TypeError
        match = "expected 'schema_overrides' dict, found 'list'"
    else:
        err = InvalidOperationError  # type: ignore[assignment]
        match = "The number of schema overrides must be less than or equal to the number of fields"

    with pytest.raises(err, match=match):
        pl.read_csv(f, schema_overrides=[pl.Int64, pl.String, pl.Boolean])


@pytest.mark.parametrize("columns", [["b"], "b"])
def test_read_csv_single_column(columns: list[str] | str) -> None:
    csv = textwrap.dedent(
        """\
        a,b,c
        1,2,3
        4,5,6
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(f, columns=columns)
    expected = pl.DataFrame({"b": [2, 5]})
    assert_frame_equal(df, expected)
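

# Editor's sketch (not part of the original suite): `columns` accepts integer
# indices as well as names for projection.
def test_read_csv_column_index_sketch() -> None:
    f = io.StringIO("a,b,c\n1,2,3\n4,5,6\n")
    assert_frame_equal(pl.read_csv(f, columns=[1]), pl.DataFrame({"b": [2, 5]}))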


def test_csv_invalid_escape_utf8_14960() -> None:
    with pytest.raises(ComputeError, match=r"Field .* is not properly escaped"):
        pl.read_csv('col1\n""•'.encode())


def test_csv_invalid_escape() -> None:
    with pytest.raises(ComputeError):
        pl.read_csv(b'col1,col2\n"a,b')


@pytest.mark.slow
@pytest.mark.write_disk
def test_read_csv_only_loads_selected_columns(
    memory_usage_without_pyarrow: MemoryUsage,
    tmp_path: Path,
) -> None:
    """Only requested columns are loaded by ``read_csv()``."""
    tmp_path.mkdir(exist_ok=True)

    # Each column will be about 8MB of RAM
    series = pl.arange(0, 1_000_000, dtype=pl.Int64, eager=True)

    file_path = tmp_path / "multicolumn.csv"
    df = pl.DataFrame(
        {
            "a": series,
            "b": series,
        }
    )
    df.write_csv(file_path)
    del df, series

    memory_usage_without_pyarrow.reset_tracking()

    # Only load one column:
    df = pl.read_csv(str(file_path), columns=["b"], rechunk=False)
    del df
    # Only one column's worth of memory should be used; 2 columns would be
    # 16_000_000 at least, but there's some overhead.
    # assert 8_000_000 < memory_usage_without_pyarrow.get_peak() < 13_000_000

    # Globs use a different code path for reading
    memory_usage_without_pyarrow.reset_tracking()
    df = pl.read_csv(str(tmp_path / "*.csv"), columns=["b"], rechunk=False)
    del df
    # Only one column's worth of memory should be used; 2 columns would be
    # 16_000_000 at least, but there's some overhead.
    # assert 8_000_000 < memory_usage_without_pyarrow.get_peak() < 13_000_000

    # read_csv_batched() test:
    memory_usage_without_pyarrow.reset_tracking()
    result: list[pl.DataFrame] = []
    batched = pl.read_csv_batched(
        str(file_path),
        columns=["b"],
        rechunk=False,
        n_threads=1,
        low_memory=True,
        batch_size=10_000,
    )
    while sum(df.height for df in result) < 1_000_000:
        next_batch = batched.next_batches(1)
        if next_batch is None:
            break
        result += next_batch
    del result
    # assert 8_000_000 < memory_usage_without_pyarrow.get_peak() < 20_000_000


def test_csv_escape_cf_15349() -> None:
    f = io.BytesIO()
    df = pl.DataFrame({"test": ["normal", "with\rcr"]})
    df.write_csv(f)
    f.seek(0)
    assert f.read() == b'test\nnormal\n"with\rcr"\n'


@pytest.mark.write_disk
@pytest.mark.parametrize("streaming", [True, False])
def test_skip_rows_after_header(tmp_path: Path, streaming: bool) -> None:
    tmp_path.mkdir(exist_ok=True)
    path = tmp_path / "data.csv"

    df = pl.Series("a", [1, 2, 3, 4, 5], dtype=pl.Int64).to_frame()
    df.write_csv(path)

    skip = 2
    expect = df.slice(skip)
    out = pl.scan_csv(path, skip_rows_after_header=skip).collect(
        engine="streaming" if streaming else "in-memory"
    )

    assert_frame_equal(out, expect)


@pytest.mark.parametrize("use_pyarrow", [True, False])
def test_skip_rows_after_header_pyarrow(use_pyarrow: bool) -> None:
    csv = textwrap.dedent(
        """\
        foo,bar
        1,2
        3,4
        5,6
        """
    )
    f = io.StringIO(csv)
    df = pl.read_csv(f, skip_rows_after_header=1, use_pyarrow=use_pyarrow)
    expected = pl.DataFrame({"foo": [3, 5], "bar": [4, 6]})
    assert_frame_equal(df, expected)


def test_csv_float_decimal() -> None:
    floats = b"a;b\n12,239;1,233\n13,908;87,32"
    read = pl.read_csv(floats, decimal_comma=True, separator=";")
    assert read.dtypes == [pl.Float64] * 2
    assert read.to_dict(as_series=False) == {"a": [12.239, 13.908], "b": [1.233, 87.32]}


@pytest.mark.may_fail_auto_streaming  # read->scan_csv dispatch
def test_fsspec_not_available() -> None:
    with pytest.MonkeyPatch.context() as mp:
        mp.setenv("POLARS_FORCE_ASYNC", "0")
        mp.setattr("polars.io._utils._FSSPEC_AVAILABLE", False)

        with pytest.raises(
            ImportError, match=r"`fsspec` is required for `storage_options` argument"
        ):
            pl.read_csv(
                "s3://foods/cabbage.csv",
                storage_options={"key": "key", "secret": "secret"},
            )


@pytest.mark.may_fail_auto_streaming  # read->scan_csv dispatch
def test_read_csv_dtypes_deprecated() -> None:
    csv = textwrap.dedent(
        """\
        a,b,c
        1,2,3
        4,5,6
        """
    )
    f = io.StringIO(csv)

    with pytest.deprecated_call():
        df = pl.read_csv(f, dtypes=[pl.Int8, pl.Int8, pl.Int8])  # type: ignore[call-arg]

    expected = pl.DataFrame(
        {"a": [1, 4], "b": [2, 5], "c": [3, 6]},
        schema={"a": pl.Int8, "b": pl.Int8, "c": pl.Int8},
    )
    assert_frame_equal(df, expected)


def test_projection_applied_on_file_with_no_rows_16606(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    path = tmp_path / "data.csv"

    data = """\
a,b,c,d
"""

    with path.open("w") as f:
        f.write(data)

    columns = ["a", "b"]

    out = pl.read_csv(path, columns=columns).columns
    assert out == columns

    out = pl.scan_csv(path).select(columns).collect().columns
    assert out == columns


@pytest.mark.write_disk
def test_write_csv_to_dangling_file_17328(
    df_no_lists: pl.DataFrame, tmp_path: Path
) -> None:
    tmp_path.mkdir(exist_ok=True)
    df_no_lists.write_csv((tmp_path / "dangling.csv").open("w"))


@pytest.mark.may_fail_cloud  # really hard to mimic this error
@pytest.mark.write_disk
def test_write_csv_raise_on_non_utf8_17328(
    df_no_lists: pl.DataFrame, tmp_path: Path
) -> None:
    tmp_path.mkdir(exist_ok=True)
    with pytest.raises(InvalidOperationError, match="file encoding is not UTF-8"):
        df_no_lists.write_csv((tmp_path / "dangling.csv").open("w", encoding="gbk"))


@pytest.mark.may_fail_auto_streaming  # read->scan_csv dispatch
@pytest.mark.write_disk
def test_write_csv_appending_17543(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)
    df = pl.DataFrame({"col": ["value"]})
    with (tmp_path / "append.csv").open("w") as f:
        f.write("# test\n")
        df.write_csv(f)
    with (tmp_path / "append.csv").open("r") as f:
        assert f.readline() == "# test\n"
        assert pl.read_csv(f).equals(df)


def test_write_csv_passing_params_18825() -> None:
    df = pl.DataFrame({"c1": [1, 2], "c2": [3, 4]})
    buffer = io.StringIO()
    df.write_csv(buffer, separator="\t", include_header=False)

    result_str = buffer.getvalue()
    expected_str = "1\t3\n2\t4\n"

    assert result_str == expected_str


@pytest.mark.parametrize(
    ("dtype", "df"),
    [
        (pl.Decimal(scale=2), pl.DataFrame({"x": ["0.1"]}).cast(pl.Decimal(scale=2))),
        (pl.Categorical, pl.DataFrame({"x": ["A"]})),
        (
            pl.Time,
            pl.DataFrame({"x": ["12:15:00"]}).with_columns(
                pl.col("x").str.strptime(pl.Time)
            ),
        ),
    ],
)
def test_read_csv_cast_unparsable_later(
    dtype: pl.Decimal | pl.Categorical | pl.Time, df: pl.DataFrame
) -> None:
    f = io.BytesIO()
    df.write_csv(f)
    f.seek(0)
    assert df.equals(pl.read_csv(f, schema={"x": dtype}))


def test_csv_double_new_line() -> None:
    assert pl.read_csv(b"a,b,c\n\n", has_header=False).to_dict(as_series=False) == {
        "column_1": ["a", None],
        "column_2": ["b", None],
        "column_3": ["c", None],
    }


def test_csv_quoted_newlines_skip_rows_19535() -> None:
    assert_frame_equal(
        pl.read_csv(
            b"""\
"a\nb"
0
""",
            has_header=False,
            skip_rows=1,
            new_columns=["x"],
        ),
        pl.DataFrame({"x": 0}),
    )


@pytest.mark.write_disk
def test_csv_read_time_dtype(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)
    path = tmp_path / "1"
    path.write_bytes(b"""\
time
00:00:00.000000000
""")

    df = pl.Series("time", [0]).cast(pl.Time()).to_frame()

    assert_frame_equal(pl.read_csv(path, try_parse_dates=True), df)
    assert_frame_equal(pl.read_csv(path, schema_overrides={"time": pl.Time}), df)
    assert_frame_equal(pl.scan_csv(path, try_parse_dates=True).collect(), df)
    assert_frame_equal(pl.scan_csv(path, schema={"time": pl.Time}).collect(), df)
    assert_frame_equal(
        pl.scan_csv(path, schema={"time": pl.Time}).collect(engine="streaming"), df
    )


def test_csv_try_parse_dates_leading_zero_8_digits_22167() -> None:
    result = pl.read_csv(
        io.StringIO(
            "a\n2025-04-06T18:56:42.617736974Z\n2025-04-06T18:57:42.77756192Z\n2025-04-06T18:58:44.56928733Z"
        ),
        try_parse_dates=True,
    )
    expected = pl.DataFrame(
        {
            "a": [
                datetime(2025, 4, 6, 18, 56, 42, 617736, tzinfo=timezone.utc),
                datetime(2025, 4, 6, 18, 57, 42, 777561, tzinfo=timezone.utc),
                datetime(2025, 4, 6, 18, 58, 44, 569287, tzinfo=timezone.utc),
            ]
        }
    )
    assert_frame_equal(result, expected)


@pytest.mark.may_fail_auto_streaming  # read->scan_csv dispatch
def test_csv_read_time_schema_overrides() -> None:
    df = pl.Series("time", [0]).cast(pl.Time()).to_frame()

    assert_frame_equal(
        pl.read_csv(
            b"""\
time
00:00:00.000000000
""",
            schema_overrides=[pl.Time],
        ),
        df,
    )


def test_batched_csv_schema_overrides(io_files_path: Path) -> None:
    foods = io_files_path / "foods1.csv"
    batched = pl.read_csv_batched(foods, schema_overrides={"calories": pl.String})
    res = batched.next_batches(1)
    assert res is not None
    b = res[0]
    assert b["calories"].dtype == pl.String
    assert b.width == 4
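

# Editor's sketch (not part of the original suite): draining a batched reader;
# `next_batches` returns None once all rows have been delivered.
def test_batched_csv_drain_sketch(io_files_path: Path) -> None:
    path = io_files_path / "foods1.csv"
    expected_height = pl.read_csv(path).height
    batched = pl.read_csv_batched(path, batch_size=5)
    total = 0
    while (batches := batched.next_batches(1)) is not None:
        total += sum(b.height for b in batches)
    assert total == expected_height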


def test_csv_ragged_lines_20062() -> None:
    buf = io.StringIO("""A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q,R,S,T,U,V
,"B",,,,,,,,,A,,,,,,,,
a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,0.0,1.0,2.0,3.0
""")
    assert pl.read_csv(buf, truncate_ragged_lines=True).to_dict(as_series=False) == {
        "A": [None, "a"],
        "B": ["B", "a"],
        "C": [None, "a"],
        "D": [None, "a"],
        "E": [None, "a"],
        "F": [None, "a"],
        "G": [None, "a"],
        "H": [None, "a"],
        "I": [None, "a"],
        "J": [None, "a"],
        "K": ["A", "a"],
        "L": [None, "a"],
        "M": [None, "a"],
        "N": [None, "a"],
        "O": [None, "a"],
        "P": [None, "a"],
        "Q": [None, "a"],
        "R": [None, "a"],
        "S": [None, "a"],
        "T": [None, 0.0],
        "U": [None, 1.0],
        "V": [None, 2.0],
    }


def test_csv_skip_lines() -> None:
    fh = io.BytesIO()
    fh.write(b'Header line "1" -> quote count 2\n')
    fh.write(b'Header line "2"" -> quote count 3\n')
    fh.write(b'Header line "3" -> quote count 2 => Total 7 quotes ERROR\n')
    fh.write(b"column_01, column_02, column_03\n")
    fh.write(b"123.12, 21, 99.9\n")
    fh.write(b"65.84, 75, 64.7\n")
    fh.seek(0)

    df = pl.read_csv(fh, has_header=True, skip_lines=3)
    assert df.to_dict(as_series=False) == {
        "column_01": [123.12, 65.84],
        " column_02": [" 21", " 75"],
        " column_03": [" 99.9", " 64.7"],
    }

    fh.seek(0)
    assert_frame_equal(pl.scan_csv(fh, has_header=True, skip_lines=3).collect(), df)
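

# Editor's sketch (not part of the original suite): `skip_lines` counts raw
# physical lines, while `skip_rows` counts parsed rows, so a quoted field
# containing a newline is one row but two lines (compare
# test_csv_quoted_newlines_skip_rows_19535 above).
def test_skip_lines_vs_skip_rows_sketch() -> None:
    data = b'"a\nb"\n0\n'
    # skipping one parsed row removes the whole quoted two-line field
    assert pl.read_csv(
        data, has_header=False, skip_rows=1, new_columns=["x"]
    ).to_dict(as_series=False) == {"x": [0]}
    # skipping two raw lines reaches the same remaining data
    assert pl.read_csv(
        data, has_header=False, skip_lines=2, new_columns=["x"]
    ).to_dict(as_series=False) == {"x": [0]}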


def test_csv_invalid_quoted_comment_line() -> None:
    # Comment quotes should be ignored.
    assert pl.read_csv(
        b'#"Comment\nColA\tColB\n1\t2', separator="\t", comment_prefix="#"
    ).to_dict(as_series=False) == {"ColA": [1], "ColB": [2]}


@pytest.mark.may_fail_auto_streaming  # missing_columns parameter for CSV
def test_csv_compressed_new_columns_19916() -> None:
    n_rows = 100

    df = pl.DataFrame(
        {
            "a": range(n_rows),
            "b": range(n_rows),
            "c": range(n_rows),
            "d": range(n_rows),
            "e": range(n_rows),
            "f": range(n_rows),
        }
    )

    b = zstandard.compress(df.write_csv(include_header=False).encode())

    q = pl.scan_csv(b, has_header=False, new_columns=["a", "b", "c", "d", "e", "f"])
    assert_frame_equal(q.collect(), df)


def test_trailing_separator_8240() -> None:
    csv = "A|B|"

    expected = pl.DataFrame(
        {"column_1": ["A"], "column_2": ["B"], "column_3": [None]},
        schema={"column_1": pl.String, "column_2": pl.String, "column_3": pl.String},
    )

    result = pl.read_csv(io.StringIO(csv), separator="|", has_header=False)
    assert_frame_equal(result, expected)

    result = pl.scan_csv(io.StringIO(csv), separator="|", has_header=False).collect()
    assert_frame_equal(result, expected)


def test_header_only_column_selection_17173() -> None:
    csv = "A,B"
    result = pl.read_csv(io.StringIO(csv), columns=["B"])
    expected = pl.Series("B", [], pl.String()).to_frame()
    assert_frame_equal(result, expected)


def test_csv_enum_raise() -> None:
    ENUM_DTYPE = pl.Enum(["foo", "bar"])
    with (
        io.StringIO("col\nfoo\nbaz\n") as csv,
        pytest.raises(pl.exceptions.ComputeError, match="could not parse `baz`"),
    ):
        pl.read_csv(
            csv,
            schema={"col": ENUM_DTYPE},
        )
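

# Editor's sketch (not part of the original suite): the happy path for the
# Enum schema used above; values within the enum's categories parse cleanly.
def test_csv_enum_parse_sketch() -> None:
    dtype = pl.Enum(["foo", "bar"])
    df = pl.read_csv(io.StringIO("col\nfoo\nbar\n"), schema={"col": dtype})
    assert df["col"].dtype == dtype
    assert df["col"].to_list() == ["foo", "bar"]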


def test_csv_no_header_ragged_lines_1505() -> None:
    # Test that the header schema will grow dynamically.
    csv = io.StringIO("""a,b,c
a,b,c,d,e,f
g,h,i,j,k""")

    assert pl.read_csv(csv, has_header=False).to_dict(as_series=False) == {
        "column_1": ["a", "a", "g"],
        "column_2": ["b", "b", "h"],
        "column_3": ["c", "c", "i"],
        "column_4": [None, "d", "j"],
        "column_5": [None, "e", "k"],
        "column_6": [None, "f", None],
    }


@pytest.mark.parametrize(
    ("filter_value", "expected"),
    [
        (10, "a,b,c\n10,20,99\n"),
        (11, "a,b,c\n11,21,99\n"),
        (12, "a,b,c\n12,22,99\n12,23,99\n"),
    ],
)
def test_csv_write_scalar_empty_chunk_20273(filter_value: int, expected: str) -> None:
    # df and filter expression are designed to test different
    # Column variants (Series, Scalar) and different number of chunks:
    # 10 > single row, ScalarColumn, multiple chunks, first is non-empty
    # 11 > single row, ScalarColumn, multiple chunks, first is empty
    # 12 > multiple rows, SeriesColumn, multiple chunks, some empty
    df1 = pl.DataFrame(
        {
            "a": [10, 11, 12, 12],  # (12, 12 is intentional)
            "b": [20, 21, 22, 23],
        },
    )
    df2 = pl.DataFrame({"c": [99]})
    df3 = df1.join(df2, how="cross").filter(pl.col("a").eq(filter_value))
    assert df3.write_csv() == expected


def test_csv_malformed_quote_in_unenclosed_field_22395() -> None:
    # Note - the malformed detection logic is very basic, and fails to detect many
    # types at this point (for example: 'a,b"c,x"y' will not be detected).
    # Below is one pattern that will be flagged (an odd number of quotes in a row).
    malformed = b"""\
a,b,x"y
a,x"y,c
x"y,b,c
"""
    # short: non-SIMD code path
    with pytest.raises(pl.exceptions.ComputeError):
        pl.read_csv(malformed, has_header=False)
    with pytest.raises(pl.exceptions.ComputeError):
        pl.scan_csv(malformed, has_header=False).collect()
    with pytest.warns(UserWarning):
        pl.read_csv(malformed, has_header=False, ignore_errors=True)

    # long: trigger the SIMD code path (> 64 bytes)
    malformed_long = malformed + ("k,l,m\n" * 10).encode()
    with pytest.raises(pl.exceptions.ComputeError):
        pl.read_csv(malformed_long, has_header=False)
    with pytest.raises(pl.exceptions.ComputeError):
        pl.scan_csv(malformed_long, has_header=False).collect()


# Note: in some cases, marked "(excessive quoting)", the expected value has
# quoted fields even when that is not strictly necessary.
# It is okay to relax these tests in the future when the code is refactored.
@pytest.mark.parametrize(
    (
        "separator",
        "quote_style",
        "scientific",
        "precision",
        "decimal_comma",
        "expected",
    ),
    [
        (",", None, None, None, False, b"123.75,60.0,9\n"),
        (",", None, None, None, True, b'"123,75","60,0",9\n'),
        (";", None, None, None, True, b"123,75;60,0;9\n"),
        (",", None, None, 0, True, b"124,60,9\n"),
        (",", None, None, 3, True, b'"123,750","60,000",9\n'),
        (";", None, None, 0, True, b"124;60;9\n"),
        (";", None, None, 3, True, b"123,750;60,000;9\n"),
        (",", None, True, None, False, b"1.2375e2,6e1,9\n"),
        (",", None, True, None, True, b'"1,2375e2","6e1",9\n'),  # (excessive quoting)
        (",", None, False, None, False, b"123.75,60,9\n"),
        (",", None, False, None, True, b'"123,75","60",9\n'),  # (excessive quoting)
        (";", None, True, None, True, b"1,2375e2;6e1;9\n"),
        (";", None, False, None, True, b"123,75;60;9\n"),
        (",", None, True, 0, True, b"1e2,6e1,9\n"),
        (",", None, True, 3, True, b'"1,238e2","6,000e1",9\n'),
        (",", None, True, 4, True, b'"1,2375e2","6,0000e1",9\n'),
        (",", None, True, 5, True, b'"1,23750e2","6,00000e1",9\n'),
        (",", None, False, 0, True, b"124,60,9\n"),
        (",", None, False, 3, True, b'"123,750","60,000",9\n'),
        (",", "always", None, None, True, b'"123,75","60,0","9"\n'),
        (",", "necessary", None, None, True, b'"123,75","60,0",9\n'),
        (",", "non_numeric", None, None, True, b'"123,75","60,0",9\n'),
        (",", "never", None, None, True, b"123,75,60,0,9\n"),
        (";", "always", None, None, True, b'"123,75";"60,0";"9"\n'),
        (";", "necessary", None, None, True, b"123,75;60,0;9\n"),
        (";", "non_numeric", None, None, True, b"123,75;60,0;9\n"),
        (";", "never", None, None, True, b"123,75;60,0;9\n"),
    ],
)
def test_write_csv_decimal_comma(
    separator: str,
    quote_style: CsvQuoteStyle | None,
    scientific: bool | None,
    precision: int | None,
    decimal_comma: bool,
    expected: bytes,
) -> None:
    # as Float64 (implicit)
    df = pl.DataFrame({"a": [123.75], "b": [60.0], "c": [9]})
    buf = io.BytesIO()
    df.write_csv(
        buf,
        separator=separator,
        quote_style=quote_style,
        float_precision=precision,
        float_scientific=scientific,
        decimal_comma=decimal_comma,
        include_header=False,
    )
    buf.seek(0)
    assert buf.read() == expected

    # as Float32 (explicit)
    df32 = df.with_columns(pl.col("a", "b").cast(pl.Float32))
    buf.seek(0)
    df32.write_csv(
        buf,
        separator=separator,
        quote_style=quote_style,
        float_precision=precision,
        float_scientific=scientific,
        decimal_comma=decimal_comma,
        include_header=False,
    )
    buf.seek(0)
    assert buf.read() == expected

    # Round-trip testing: assert df == read_csv(write_csv(df)), unless:
    # - precision affects the value, or
    # - quote_style = 'never' generates malformed csv
    round_trip = not (
        (not scientific and precision is not None and precision <= 2)
        or (scientific and precision is not None and precision != 4)
        or (quote_style == "never" and decimal_comma and separator == ",")
    )
    if round_trip:
        # eager
        buf.seek(0)
        df.write_csv(
            buf,
            separator=separator,
            quote_style=quote_style,
            float_precision=precision,
            float_scientific=scientific,
            decimal_comma=decimal_comma,
            include_header=True,
        )
        buf.seek(0)
        out = pl.read_csv(
            buf, decimal_comma=decimal_comma, separator=separator, schema=df.schema
        )
        assert_frame_equal(df, out)

        # lazy
        buf.seek(0)
        df.lazy().sink_csv(
            buf,
            separator=separator,
            quote_style=quote_style,
            float_precision=precision,
            float_scientific=scientific,
            decimal_comma=decimal_comma,
            include_header=True,
        )
        buf.seek(0)
        out = pl.scan_csv(
            buf, decimal_comma=decimal_comma, separator=separator, schema=df.schema
        ).collect()
        assert_frame_equal(df, out)


def test_write_csv_large_number_autoformat_decimal_comma() -> None:
    df = pl.DataFrame(
        {
            "a": [12345678901234567890.123457890],
            "b": [1_000_000_000_000_000_000_000_000.0],
        }
    )

    buf = io.BytesIO()
    df.write_csv(
        buf,
        decimal_comma=True,
        include_header=False,
    )
    buf.seek(0)
    # note: excessive quoting when the fractional part is all-zero; ok to relax
    expected = b'"1,2345678901234567e19","1e24"\n'
    assert buf.read() == expected


def test_stop_split_fields_simd_23651() -> None:
    csv = """C,NEMP.WORLD,DAILY,AEMO,PUBLIC,2025/05/29,04:05:04,0000000465336084,,0000000465336084
I,DISPATCH,CASESOLUTION,1,SETTLEMENTDATE,RUNNO,INTERVENTION,CASESUBTYPE,SOLUTIONSTATUS,SPDVERSION,NONPHYSICALLOSSES,TOTALOBJECTIVE,TOTALAREAGENVIOLATION,TOTALINTERCONNECTORVIOLATION,TOTALGENERICVIOLATION,TOTALRAMPRATEVIOLATION,TOTALUNITMWCAPACITYVIOLATION,TOTAL5MINVIOLATION,TOTALREGVIOLATION,TOTAL6SECVIOLATION,TOTAL60SECVIOLATION,TOTALASPROFILEVIOLATION,TOTALFASTSTARTVIOLATION,TOTALENERGYOFFERVIOLATION,LASTCHANGED
D,DISPATCH,CASESOLUTION,1,"2025/05/28 04:05:00",1,0,,0,,0,-60421745.3380,0,0,0,0,0,,,,,0,0,0,"2025/05/28 04:00:04"
D,DISPATCH,CASESOLUTION,1,"2025/05/28 04:10:00",1,0,,0,,0,-60871813.2780,0,0,0,0,0,,,,,0,0,0,"2025/05/28 04:05:04"
D,DISPATCH,CASESOLUTION,1,"2025/05/28 04:15:00",1,0,,1,,0,-61228162.2270,0,0,0,0,0,,,,,0,0,0,"2025/05/28 04:10:03"
D,DISPATCH,CASESOLUTION,1,"2025/05/28 04:20:00",1,0,,1,,0,-60901926.5760,0,0,0,0,0,,,,,0,0,0,"2025/05/28 04:15:03"
D,DISPATCH,CASESOLUTION,1,"""

    schema = {f"column_{i + 1}": pl.String for i in range(27)}

    buf = io.StringIO(csv)
    df = pl.read_csv(buf, truncate_ragged_lines=True, has_header=False, schema=schema)
    assert df.shape == (7, 27)
    assert df["column_26"].null_count() == 7


def test_read_csv_decimal_header_only_200008() -> None:
    csv = "a,b"

    df = pl.read_csv(csv.encode(), schema={"a": pl.Decimal(scale=2), "b": pl.String})
    assert df.dtypes == [pl.Decimal(scale=2), pl.String]


@pytest.mark.parametrize(
    "dt",
    [
        pl.Enum(["a"]),
        pl.Categorical(),
    ],
)
def test_write_csv_categorical_23939(dt: pl.DataType) -> None:
    n_rows = pl.thread_pool_size() * 1024 + 1
    df = pl.DataFrame(
        {
            "b": pl.Series(["a"] * n_rows, dtype=dt),
        }
    )
    expected = "b\n" + "a\n" * n_rows
    assert df.write_csv() == expected