from __future__ import annotations
import gzip
import io
import os
import sys
import textwrap
import zlib
from datetime import date, datetime, time, timedelta, timezone
from decimal import Decimal as D
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING, TypedDict
import numpy as np
import pyarrow as pa
import pytest
import zstandard
import polars as pl
from polars._utils.various import normalize_filepath
from polars.exceptions import ComputeError, InvalidOperationError, NoDataError
from polars.io.csv import BatchedCsvReader
from polars.testing import assert_frame_equal, assert_series_equal
from tests.conftest import PlMonkeyPatch
if TYPE_CHECKING:
from pathlib import Path
from typing import Any
from polars._typing import CsvQuoteStyle, TimeUnit
@pytest.fixture
def foods_file_path(io_files_path: Path) -> Path:
return io_files_path / "foods1.csv"
@pytest.fixture(params=["chunk-size-default", "chunk-size-7"])
def chunk_override(request: Any, plmonkeypatch: PlMonkeyPatch) -> None:
env_var_name = "POLARS_FORCE_CSV_INFER_CHUNK_SIZE"
if request.param == "chunk-size-default":
plmonkeypatch.delenv(env_var_name, raising=False)
elif request.param == "chunk-size-7":
plmonkeypatch.setenv(env_var_name, "7")
else:
pytest.fail("unreachable")
def test_quoted_date(chunk_override: None) -> None:
csv = textwrap.dedent(
"""\
a,b
"2022-01-01",1
"2022-01-02",2
"""
)
result = pl.read_csv(csv.encode(), try_parse_dates=True)
expected = pl.DataFrame({"a": [date(2022, 1, 1), date(2022, 1, 2)], "b": [1, 2]})
assert_frame_equal(result, expected)
def test_date_pattern_with_datetime_override_10826(chunk_override: None) -> None:
result = pl.read_csv(
source=io.StringIO("col\n2023-01-01\n2023-02-01\n2023-03-01"),
schema_overrides={"col": pl.Datetime},
)
expected = pl.Series(
"col", [datetime(2023, 1, 1), datetime(2023, 2, 1), datetime(2023, 3, 1)]
).to_frame()
assert_frame_equal(result, expected)
result = pl.read_csv(
source=io.StringIO("col\n2023-01-01T01:02:03\n2023-02-01\n2023-03-01"),
schema_overrides={"col": pl.Datetime},
)
expected = pl.Series(
"col",
[datetime(2023, 1, 1, 1, 2, 3), datetime(2023, 2, 1), datetime(2023, 3, 1)],
).to_frame()
assert_frame_equal(result, expected)
def test_to_from_buffer(chunk_override: None, df_no_lists: pl.DataFrame) -> None:
df = df_no_lists
buf = io.BytesIO()
df.write_csv(buf)
buf.seek(0)
read_df = pl.read_csv(buf, try_parse_dates=True)
read_df = read_df.with_columns(
pl.col("cat").cast(pl.Categorical),
pl.col("enum").cast(pl.Enum(["foo", "ham", "bar"])),
pl.col("time").cast(pl.Time),
)
assert_frame_equal(df, read_df, categorical_as_str=True)
with pytest.raises(AssertionError):
assert_frame_equal(df.select("time", "cat"), read_df, categorical_as_str=True)
@pytest.mark.write_disk
def test_to_from_file(
chunk_override: None, df_no_lists: pl.DataFrame, tmp_path: Path
) -> None:
tmp_path.mkdir(exist_ok=True)
df = df_no_lists.drop("strings_nulls")
file_path = tmp_path / "small.csv"
df.write_csv(file_path)
read_df = pl.read_csv(file_path, try_parse_dates=True)
read_df = read_df.with_columns(
pl.col("cat").cast(pl.Categorical),
pl.col("enum").cast(pl.Enum(["foo", "ham", "bar"])),
pl.col("time").cast(pl.Time),
)
assert_frame_equal(df, read_df, categorical_as_str=True)
def test_normalize_filepath(chunk_override: None, io_files_path: Path) -> None:
with pytest.raises(IsADirectoryError):
normalize_filepath(io_files_path)
assert normalize_filepath(str(io_files_path), check_not_directory=False) == str(
io_files_path
)
@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
def test_infer_schema_false(chunk_override: None, read_fn: str) -> None:
csv = textwrap.dedent(
"""\
a,b,c
1,2,3
1,2,3
"""
)
df = getattr(pl, read_fn)(io.StringIO(csv), infer_schema=False).lazy().collect()
assert df.dtypes == [pl.String, pl.String, pl.String]
@pytest.mark.may_fail_auto_streaming
def test_csv_null_values(chunk_override: None) -> None:
csv = textwrap.dedent(
"""\
a,b,c
na,b,c
a,na,c
"""
)
f = io.StringIO(csv)
df = pl.read_csv(f, null_values="na")
assert df.rows() == [(None, "b", "c"), ("a", None, "c")]
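    # the StringIO buffer was fully consumed by the first read, so reading it
    # again finds no data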
with pytest.raises(NoDataError, match=r"empty"):
pl.read_csv(f)
assert_frame_equal(pl.read_csv(f, raise_if_empty=False), pl.DataFrame())
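    # round-trip: writing the nulls back out with null_value="na" reproduces
    # the original CSV text exactly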
out = io.BytesIO()
df.write_csv(out, null_value="na")
assert csv == out.getvalue().decode("ascii")
csv = textwrap.dedent(
"""\
a,b,c
na,b,c
a,n/a,c
"""
)
f = io.StringIO(csv)
df = pl.read_csv(f, null_values=["na", "n/a"])
assert df.rows() == [(None, "b", "c"), ("a", None, "c")]
csv = textwrap.dedent(
r"""
a,b,c
na,b,c
a,\N,c
,b,
"""
)
f = io.StringIO(csv)
df = pl.read_csv(f, null_values={"a": "na", "b": r"\N"})
assert df.rows() == [(None, "b", "c"), ("a", None, "c"), (None, "b", None)]
def test_csv_missing_utf8_is_empty_string(chunk_override: None) -> None:
csv = textwrap.dedent(
r"""
a,b,c
na,b,c
a,\N,c
,b,
"""
)
f = io.StringIO(csv)
df = pl.read_csv(
f,
null_values={"a": "na", "b": r"\N"},
missing_utf8_is_empty_string=True,
)
assert df.rows() == [(None, "b", "c"), ("a", None, "c"), ("", "b", "")]
csv = textwrap.dedent(
r"""
a,b,c,d,e,f,g
na,,,,\N,,
a,\N,c,,,,g
,,,
,,,na,,,
"""
)
f = io.StringIO(csv)
df = pl.read_csv(f, null_values=["na", r"\N"])
assert df.rows() == [
(None, None, None, None, None, None, None),
("a", None, "c", None, None, None, "g"),
(None, None, None, None, None, None, None),
(None, None, None, None, None, None, None),
]
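    # rewind and re-read: absent (missing) fields now become empty strings,
    # while the explicit null markers ("na", "\N") still map to null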
f.seek(0)
df = pl.read_csv(
f,
null_values=["na", r"\N"],
missing_utf8_is_empty_string=True,
)
assert df.rows() == [
(None, "", "", "", None, "", ""),
("a", None, "c", "", "", "", "g"),
("", "", "", "", "", "", ""),
("", "", "", None, "", "", ""),
]
def test_csv_int_types(chunk_override: None) -> None:
f = io.StringIO(
"u8,i8,u16,i16,u32,i32,u64,i64,u128,i128\n"
"0,0,0,0,0,0,0,0,0,0\n"
"0,-128,0,-32768,0,-2147483648,0,-9223372036854775808,0,-170141183460469231731687303715884105728\n"
"255,127,65535,32767,4294967295,2147483647,18446744073709551615,9223372036854775807,340282366920938463463374607431768211455,170141183460469231731687303715884105727\n"
"01,01,01,01,01,01,01,01,01,01\n"
"01,-01,01,-01,01,-01,01,-01,01,-01\n"
)
df = pl.read_csv(
f,
schema={
"u8": pl.UInt8,
"i8": pl.Int8,
"u16": pl.UInt16,
"i16": pl.Int16,
"u32": pl.UInt32,
"i32": pl.Int32,
"u64": pl.UInt64,
"i64": pl.Int64,
"u128": pl.UInt128,
"i128": pl.Int128,
},
)
assert_frame_equal(
df,
pl.DataFrame(
{
"u8": pl.Series([0, 0, 255, 1, 1], dtype=pl.UInt8),
"i8": pl.Series([0, -128, 127, 1, -1], dtype=pl.Int8),
"u16": pl.Series([0, 0, 65535, 1, 1], dtype=pl.UInt16),
"i16": pl.Series([0, -32768, 32767, 1, -1], dtype=pl.Int16),
"u32": pl.Series([0, 0, 4294967295, 1, 1], dtype=pl.UInt32),
"i32": pl.Series([0, -2147483648, 2147483647, 1, -1], dtype=pl.Int32),
"u64": pl.Series([0, 0, 18446744073709551615, 1, 1], dtype=pl.UInt64),
"i64": pl.Series(
[0, -9223372036854775808, 9223372036854775807, 1, -1],
dtype=pl.Int64,
),
"u128": pl.Series(
[
0,
0,
340282366920938463463374607431768211455,
1,
1,
],
dtype=pl.UInt128,
),
"i128": pl.Series(
[
0,
-170141183460469231731687303715884105728,
170141183460469231731687303715884105727,
1,
-1,
],
dtype=pl.Int128,
),
}
),
)
def test_csv_float_parsing(chunk_override: None) -> None:
lines_with_floats = [
"123.86,+123.86,-123.86\n",
".987,+.987,-.987\n",
"5.,+5.,-5.\n",
"inf,+inf,-inf\n",
"NaN,+NaN,-NaN\n",
]
for line_with_floats in lines_with_floats:
f = io.StringIO(line_with_floats)
df = pl.read_csv(f, has_header=False, new_columns=["a", "b", "c"])
assert df.dtypes == [pl.Float64, pl.Float64, pl.Float64]
lines_with_scientific_numbers = [
"1e27,1E65,1e-28,1E-9\n",
"+1e27,+1E65,+1e-28,+1E-9\n",
"1e+27,1E+65,1e-28,1E-9\n",
"+1e+27,+1E+65,+1e-28,+1E-9\n",
"-1e+27,-1E+65,-1e-28,-1E-9\n",
]
for line_with_scientific_numbers in lines_with_scientific_numbers:
f = io.StringIO(line_with_scientific_numbers)
df = pl.read_csv(f, has_header=False, new_columns=["a", "b", "c", "d"])
assert df.dtypes == [pl.Float64, pl.Float64, pl.Float64, pl.Float64]
def test_datetime_parsing(chunk_override: None) -> None:
csv = textwrap.dedent(
"""\
timestamp,open,high
2021-01-01 00:00:00,0.00305500,0.00306000
2021-01-01 00:15:00,0.00298800,0.00300400
2021-01-01 00:30:00,0.00298300,0.00300100
2021-01-01 00:45:00,0.00299400,0.00304000
"""
)
f = io.StringIO(csv)
df = pl.read_csv(f, try_parse_dates=True)
assert df.dtypes == [pl.Datetime, pl.Float64, pl.Float64]
def test_datetime_parsing_default_formats(chunk_override: None) -> None:
csv = textwrap.dedent(
"""\
ts_dmy,ts_dmy_f,ts_dmy_p
01/01/2021 00:00:00,31-01-2021T00:00:00.123,31-01-2021 11:00
01/01/2021 00:15:00,31-01-2021T00:15:00.123,31-01-2021 01:00
01/01/2021 00:30:00,31-01-2021T00:30:00.123,31-01-2021 01:15
01/01/2021 00:45:00,31-01-2021T00:45:00.123,31-01-2021 01:30
"""
)
f = io.StringIO(csv)
df = pl.read_csv(f, try_parse_dates=True)
assert df.dtypes == [pl.Datetime, pl.Datetime, pl.Datetime]
@pytest.mark.may_fail_auto_streaming
def test_partial_schema_overrides(chunk_override: None) -> None:
csv = textwrap.dedent(
"""\
a,b,c
1,2,3
1,2,3
"""
)
f = io.StringIO(csv)
df = pl.read_csv(f, schema_overrides=[pl.String])
assert df.dtypes == [pl.String, pl.Int64, pl.Int64]
@pytest.mark.may_fail_auto_streaming
def test_schema_overrides_with_column_name_selection(chunk_override: None) -> None:
csv = textwrap.dedent(
"""\
a,b,c,d
1,2,3,4
1,2,3,4
"""
)
f = io.StringIO(csv)
df = pl.read_csv(f, columns=["c", "b", "d"], schema_overrides=[pl.Int32, pl.String])
assert df.dtypes == [pl.String, pl.Int32, pl.Int64]
@pytest.mark.may_fail_auto_streaming
def test_schema_overrides_with_column_idx_selection(chunk_override: None) -> None:
csv = textwrap.dedent(
"""\
a,b,c,d
1,2,3,4
1,2,3,4
"""
)
f = io.StringIO(csv)
df = pl.read_csv(f, columns=[2, 1, 3], schema_overrides=[pl.Int32, pl.String])
assert df.dtypes == [pl.String, pl.Int32, pl.String]
assert df.columns == ["b", "c", "d"]
def test_partial_column_rename(chunk_override: None) -> None:
csv = textwrap.dedent(
"""\
a,b,c
1,2,3
1,2,3
"""
)
f = io.StringIO(csv)
for use in [True, False]:
f.seek(0)
df = pl.read_csv(f, new_columns=["foo"], use_pyarrow=use)
assert df.columns == ["foo", "b", "c"]
@pytest.mark.parametrize(
("col_input", "col_out"),
[([0, 1], ["a", "b"]), ([0, 2], ["a", "c"]), (["b"], ["b"])],
)
def test_read_csv_columns_argument(
chunk_override: None, col_input: list[int] | list[str], col_out: list[str]
) -> None:
csv = textwrap.dedent(
"""\
a,b,c
1,2,3
1,2,3
"""
)
f = io.StringIO(csv)
df = pl.read_csv(f, columns=col_input)
assert df.shape[0] == 2
assert df.columns == col_out
@pytest.mark.may_fail_cloud
@pytest.mark.may_fail_auto_streaming
def test_read_csv_buffer_ownership(chunk_override: None) -> None:
bts = b"\xf0\x9f\x98\x80,5.55,333\n\xf0\x9f\x98\x86,-5.0,666"
buf = io.BytesIO(bts)
df = pl.read_csv(
buf,
has_header=False,
new_columns=["emoji", "flt", "int"],
)
assert df.shape == (2, 3)
assert df.rows() == [("😀", 5.55, 333), ("😆", -5.0, 666)]
assert not buf.closed
assert buf.read() == bts
@pytest.mark.may_fail_auto_streaming
@pytest.mark.write_disk
def test_read_csv_encoding(chunk_override: None, tmp_path: Path) -> None:
tmp_path.mkdir(exist_ok=True)
bts = (
b"Value1,Value2,Value3,Value4,Region\n"
b"-30,7.5,2578,1,\xa5x\xa5_\n-32,7.97,3006,1,\xa5x\xa4\xa4\n"
b"-31,8,3242,2,\xb7s\xa6\xcb\n-33,7.97,3300,3,\xb0\xaa\xb6\xaf\n"
b"-20,7.91,3384,4,\xac\xfc\xb0\xea\n"
)
file_path = tmp_path / "encoding.csv"
file_path.write_bytes(bts)
file_str = str(file_path)
bytesio = io.BytesIO(bts)
for use_pyarrow in (False, True):
bytesio.seek(0)
for file in [file_path, file_str, bts, bytesio]:
assert_series_equal(
pl.read_csv(
file,
encoding="big5",
use_pyarrow=use_pyarrow,
).get_column("Region"),
pl.Series("Region", ["台北", "台中", "新竹", "高雄", "美國"]),
)
@pytest.mark.may_fail_auto_streaming
@pytest.mark.write_disk
def test_read_csv_encoding_lossy(chunk_override: None, tmp_path: Path) -> None:
tmp_path.mkdir(exist_ok=True)
bts = (
b"\xc8\xec\xff,\xc2\xee\xe7\xf0\xe0\xf1\xf2,\xc3\xee\xf0\xee\xe4\n"
b"\xc8\xe2\xe0\xed,25,\xcc\xee\xf1\xea\xe2\xe0\n"
b"\xce\xeb\xfc\xe3\xe0,30,\xd1\xe0\xed\xea\xf2-\x98\xcf\xe5\xf2\xe5\xf0\xe1\xf3\xf0\xe3\n"
)
file_path = tmp_path / "encoding_lossy.csv"
file_path.write_bytes(bts)
file_str = str(file_path)
bytesio = io.BytesIO(bts)
bytesio.seek(0)
for file in [file_path, file_str, bts, bytesio]:
assert_series_equal(
pl.read_csv(
file,
encoding="windows-1251-lossy",
use_pyarrow=False,
).get_column("Город"),
pl.Series("Город", ["Москва", "Санкт-�Петербург"]),
)
@pytest.mark.may_fail_auto_streaming
def test_column_rename_and_schema_overrides(chunk_override: None) -> None:
csv = textwrap.dedent(
"""\
a,b,c
1,2,3
1,2,3
"""
)
f = io.StringIO(csv)
df = pl.read_csv(
f,
new_columns=["A", "B", "C"],
schema_overrides={"A": pl.String, "B": pl.Int64, "C": pl.Float32},
)
assert df.dtypes == [pl.String, pl.Int64, pl.Float32]
f = io.StringIO(csv)
df = pl.read_csv(
f,
columns=["a", "c"],
new_columns=["A", "C"],
schema_overrides={"A": pl.String, "C": pl.Float32},
)
assert df.dtypes == [pl.String, pl.Float32]
csv = textwrap.dedent(
"""\
1,2,3
1,2,3
"""
)
f = io.StringIO(csv)
df = pl.read_csv(
f,
new_columns=["A", "B", "C"],
schema_overrides={"A": pl.String, "C": pl.Float32},
has_header=False,
)
assert df.dtypes == [pl.String, pl.Int64, pl.Float32]
def test_compressed_csv(
chunk_override: None, io_files_path: Path, plmonkeypatch: PlMonkeyPatch
) -> None:
plmonkeypatch.setenv("POLARS_FORCE_ASYNC", "0")
csv = textwrap.dedent(
"""\
a,b,c
1,a,1.0
2,b,2.0
3,c,3.0
"""
)
fout = io.BytesIO()
with gzip.GzipFile(fileobj=fout, mode="w") as f:
f.write(csv.encode())
csv_bytes = fout.getvalue()
out = pl.read_csv(csv_bytes)
expected = pl.DataFrame(
{"a": [1, 2, 3], "b": ["a", "b", "c"], "c": [1.0, 2.0, 3.0]}
)
assert_frame_equal(out, expected)
csv_file = io_files_path / "gzipped.csv.gz"
out = pl.read_csv(str(csv_file), truncate_ragged_lines=True)
assert_frame_equal(out, expected)
schema = {"a": pl.Int64, "b": pl.Utf8, "c": pl.Float64}
out = pl.read_csv(str(csv_file), schema=schema, truncate_ragged_lines=True)
assert_frame_equal(out, expected)
out = pl.read_csv(csv_bytes, columns=["a", "b"])
expected = pl.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]})
assert_frame_equal(out, expected)
csv_bytes = zlib.compress(csv.encode())
out = pl.read_csv(csv_bytes)
expected = pl.DataFrame(
{"a": [1, 2, 3], "b": ["a", "b", "c"], "c": [1.0, 2.0, 3.0]}
)
assert_frame_equal(out, expected)
for level in range(10):
csv_bytes = zlib.compress(csv.encode(), level=level)
out = pl.read_csv(csv_bytes)
expected = pl.DataFrame(
{"a": [1, 2, 3], "b": ["a", "b", "c"], "c": [1.0, 2.0, 3.0]}
)
assert_frame_equal(out, expected)
csv_bytes = zstandard.compress(csv.encode())
out = pl.read_csv(csv_bytes)
assert_frame_equal(out, expected)
csv_file = io_files_path / "zstd_compressed.csv.zst"
out = pl.scan_csv(csv_file, truncate_ragged_lines=True).collect()
assert_frame_equal(out, expected)
out = pl.read_csv(str(csv_file), truncate_ragged_lines=True)
assert_frame_equal(out, expected)
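    # plain, uncompressed bytes must still be handled by the same code path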
f2 = io.BytesIO(b"a,b\n1,2\n")
out2 = pl.read_csv(f2)
expected = pl.DataFrame({"a": [1], "b": [2]})
assert_frame_equal(out2, expected)
def test_partial_decompression(chunk_override: None, foods_file_path: Path) -> None:
f_out = io.BytesIO()
with gzip.GzipFile(fileobj=f_out, mode="w") as f:
f.write(foods_file_path.read_bytes())
csv_bytes = f_out.getvalue()
for n_rows in [1, 5, 26]:
out = pl.read_csv(csv_bytes, n_rows=n_rows)
assert out.shape == (n_rows, 4)
csv_bytes = zstandard.compress(foods_file_path.read_bytes())
for n_rows in [1, 5, 26]:
out = pl.read_csv(csv_bytes, n_rows=n_rows)
assert out.shape == (n_rows, 4)
def test_empty_bytes(chunk_override: None) -> None:
b = b""
with pytest.raises(NoDataError):
pl.read_csv(b)
df = pl.read_csv(b, raise_if_empty=False)
assert_frame_equal(df, pl.DataFrame())
def test_empty_line_with_single_column(chunk_override: None) -> None:
df = pl.read_csv(
b"a\n\nb\n",
new_columns=["A"],
has_header=False,
comment_prefix="#",
use_pyarrow=False,
)
expected = pl.DataFrame({"A": ["a", None, "b"]})
assert_frame_equal(df, expected)
def test_empty_line_with_multiple_columns(chunk_override: None) -> None:
df = pl.read_csv(
b"a,b\n\nc,d\n",
new_columns=["A", "B"],
has_header=False,
comment_prefix="#",
use_pyarrow=False,
)
expected = pl.DataFrame({"A": ["a", None, "c"], "B": ["b", None, "d"]})
assert_frame_equal(df, expected)
def test_preserve_whitespace_at_line_start(chunk_override: None) -> None:
df = pl.read_csv(
b" a\n b \n c\nd",
new_columns=["A"],
has_header=False,
use_pyarrow=False,
)
expected = pl.DataFrame({"A": [" a", " b ", " c", "d"]})
assert_frame_equal(df, expected)
def test_csv_multi_char_comment(chunk_override: None) -> None:
csv = textwrap.dedent(
"""\
#a,b
##c,d
"""
)
f = io.StringIO(csv)
df = pl.read_csv(
f,
new_columns=["A", "B"],
has_header=False,
comment_prefix="##",
use_pyarrow=False,
)
expected = pl.DataFrame({"A": ["#a"], "B": ["b"]})
assert_frame_equal(df, expected)
for skip_rows, b in (
(1, io.BytesIO(b"<filemeta>\n#!skip\n#!skip\nCol1\tCol2\n")),
(0, io.BytesIO(b"\n#!skip\n#!skip\nCol1\tCol2")),
(0, io.BytesIO(b"#!skip\nCol1\tCol2\n#!skip\n")),
(0, io.BytesIO(b"#!skip\nCol1\tCol2")),
):
df = pl.read_csv(b, separator="\t", comment_prefix="#!", skip_rows=skip_rows)
assert_frame_equal(df, pl.DataFrame(schema=["Col1", "Col2"]).cast(pl.Utf8))
def test_csv_quote_char(chunk_override: None) -> None:
expected = pl.DataFrame(
[
pl.Series("linenum", [1, 2, 3, 4, 5, 6, 7, 8, 9]),
pl.Series(
"last_name",
[
"Jagger",
'O"Brian',
"Richards",
'L"Etoile',
"Watts",
"Smith",
'"Wyman"',
"Woods",
'J"o"ne"s',
],
),
pl.Series(
"first_name",
[
"Mick",
'"Mary"',
"Keith",
"Bennet",
"Charlie",
'D"Shawn',
"Bill",
"Ron",
"Brian",
],
),
]
)
rolling_stones = textwrap.dedent(
"""\
linenum,last_name,first_name
1,Jagger,Mick
2,O"Brian,"Mary"
3,Richards,Keith
4,L"Etoile,Bennet
5,Watts,Charlie
6,Smith,D"Shawn
7,"Wyman",Bill
8,Woods,Ron
9,J"o"ne"s,Brian
"""
)
for use_pyarrow in (False, True):
out = pl.read_csv(
rolling_stones.encode(), quote_char=None, use_pyarrow=use_pyarrow
)
assert out.shape == (9, 3)
assert_frame_equal(out, expected)
df = pl.DataFrame({"x": ["", "0*0", "xyz"]})
csv_data = df.write_csv(quote_char="*")
assert csv_data == "x\n**\n*0**0*\nxyz\n"
assert_frame_equal(df, pl.read_csv(io.StringIO(csv_data), quote_char="*"))
def test_csv_empty_quotes_char_1622(chunk_override: None) -> None:
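    # regression test for #1622: an empty quote_char must be accepted
    # (it disables quoting) instead of erroring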
pl.read_csv(b"a,b,c,d\nA1,B1,C1,1\nA2,B2,C2,2\n", quote_char="")
def test_ignore_try_parse_dates(chunk_override: None) -> None:
csv = textwrap.dedent(
"""\
a,b,c
1,i,16200126
2,j,16250130
3,k,17220012
4,l,17290009
"""
).encode()
headers = ["a", "b", "c"]
    dtypes: dict[str, type[pl.DataType]] = dict.fromkeys(headers, pl.String)
df = pl.read_csv(csv, columns=headers, schema_overrides=dtypes)
assert df.dtypes == [pl.String, pl.String, pl.String]
def test_csv_date_handling(chunk_override: None) -> None:
csv = textwrap.dedent(
"""\
date
1745-04-02
1742-03-21
1743-06-16
1730-07-22

        1739-03-16
"""
)
expected = pl.DataFrame(
{
"date": [
date(1745, 4, 2),
date(1742, 3, 21),
date(1743, 6, 16),
date(1730, 7, 22),
None,
date(1739, 3, 16),
]
}
)
out = pl.read_csv(csv.encode(), try_parse_dates=True)
assert_frame_equal(out, expected)
dtypes = {"date": pl.Date}
out = pl.read_csv(csv.encode(), schema_overrides=dtypes)
assert_frame_equal(out, expected)
def test_csv_no_date_dtype_because_string(chunk_override: None) -> None:
csv = textwrap.dedent(
"""\
date
2024-01-01
2024-01-02
hello
"""
)
out = pl.read_csv(csv.encode(), try_parse_dates=True)
assert out.dtypes == [pl.String]
def test_csv_infer_date_dtype(chunk_override: None) -> None:
csv = textwrap.dedent(
"""\
date
2024-01-01
"2024-01-02"

        2024-01-04
"""
)
out = pl.read_csv(csv.encode(), try_parse_dates=True)
expected = pl.DataFrame(
{
"date": [
date(2024, 1, 1),
date(2024, 1, 2),
None,
date(2024, 1, 4),
]
}
)
assert_frame_equal(out, expected)
def test_csv_date_dtype_ignore_errors(chunk_override: None) -> None:
csv = textwrap.dedent(
"""\
date
hello
2024-01-02
world
!!
"""
)
out = pl.read_csv(
csv.encode(), ignore_errors=True, schema_overrides={"date": pl.Date}
)
expected = pl.DataFrame(
{
"date": [
None,
date(2024, 1, 2),
None,
None,
]
}
)
assert_frame_equal(out, expected)
@pytest.mark.may_fail_auto_streaming
def test_csv_globbing(chunk_override: None, io_files_path: Path) -> None:
path = io_files_path / "foods*.csv"
df = pl.read_csv(path)
assert df.shape == (135, 4)
with PlMonkeyPatch.context() as mp:
mp.setenv("POLARS_FORCE_ASYNC", "0")
with pytest.raises(ValueError):
_ = pl.read_csv(path, columns=[0, 1])
df = pl.read_csv(path, columns=["category", "sugars_g"])
assert df.shape == (135, 2)
assert df.row(-1) == ("seafood", 1)
assert df.row(0) == ("vegetables", 2)
with PlMonkeyPatch.context() as mp:
mp.setenv("POLARS_FORCE_ASYNC", "0")
with pytest.raises(ValueError):
_ = pl.read_csv(
path, schema_overrides=[pl.String, pl.Int64, pl.Int64, pl.Int64]
)
dtypes = {
"category": pl.String,
"calories": pl.Int32,
"fats_g": pl.Float32,
"sugars_g": pl.Int32,
}
df = pl.read_csv(path, schema_overrides=dtypes)
assert df.dtypes == list(dtypes.values())
def test_csv_schema_offset(chunk_override: None, foods_file_path: Path) -> None:
csv = textwrap.dedent(
"""\
metadata
line
col1,col2,col3
alpha,beta,gamma
1,2.0,"A"
3,4.0,"B"
5,6.0,"C"
"""
).encode()
df = pl.read_csv(csv, skip_rows=3)
assert df.columns == ["alpha", "beta", "gamma"]
assert df.shape == (3, 3)
assert df.dtypes == [pl.Int64, pl.Float64, pl.String]
df = pl.read_csv(csv, skip_rows=2, skip_rows_after_header=1)
assert df.columns == ["col1", "col2", "col3"]
assert df.shape == (3, 3)
assert df.dtypes == [pl.Int64, pl.Float64, pl.String]
df = pl.scan_csv(foods_file_path, skip_rows=4).collect()
assert df.columns == ["fruit", "60", "0", "11"]
assert df.shape == (23, 4)
assert df.dtypes == [pl.String, pl.Int64, pl.Float64, pl.Int64]
df = pl.scan_csv(foods_file_path, skip_rows_after_header=24).collect()
assert df.columns == ["category", "calories", "fats_g", "sugars_g"]
assert df.shape == (3, 4)
assert df.dtypes == [pl.String, pl.Int64, pl.Int64, pl.Int64]
df = pl.scan_csv(
foods_file_path, skip_rows_after_header=24, infer_schema_length=1
).collect()
assert df.columns == ["category", "calories", "fats_g", "sugars_g"]
assert df.shape == (3, 4)
assert df.dtypes == [pl.String, pl.Int64, pl.Int64, pl.Int64]
def test_empty_string_missing_round_trip(chunk_override: None) -> None:
df = pl.DataFrame({"varA": ["A", "", None], "varB": ["B", "", None]})
for null in (None, "NA", "NULL", r"\N"):
f = io.BytesIO()
df.write_csv(f, null_value=null)
f.seek(0)
df_read = pl.read_csv(f, null_values=null)
assert_frame_equal(df, df_read)
def test_write_csv_separator(chunk_override: None) -> None:
df = pl.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3]})
f = io.BytesIO()
df.write_csv(f, separator="\t")
f.seek(0)
assert f.read() == b"a\tb\n1\t1\n2\t2\n3\t3\n"
f.seek(0)
assert_frame_equal(df, pl.read_csv(f, separator="\t"))
def test_write_csv_line_terminator(chunk_override: None) -> None:
df = pl.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3]})
f = io.BytesIO()
df.write_csv(f, line_terminator="\r\n")
f.seek(0)
assert f.read() == b"a,b\r\n1,1\r\n2,2\r\n3,3\r\n"
f.seek(0)
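    # reading back succeeds because the parser treats "\r\n" as a line ending,
    # so the "\r" is not included in the parsed values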
assert_frame_equal(df, pl.read_csv(f, eol_char="\n"))
def test_escaped_null_values(chunk_override: None) -> None:
csv = textwrap.dedent(
"""\
"a","b","c"
"a","n/a","NA"
"None","2","3.0"
"""
)
f = io.StringIO(csv)
df = pl.read_csv(
f,
null_values={"a": "None", "b": "n/a", "c": "NA"},
schema_overrides={"a": pl.String, "b": pl.Int64, "c": pl.Float64},
)
assert df[1, "a"] is None
assert df[0, "b"] is None
assert df[0, "c"] is None
def test_quoting_round_trip(chunk_override: None) -> None:
f = io.BytesIO()
df = pl.DataFrame(
{
"a": [
"tab,separated,field",
"newline\nseparated\nfield",
'quote"separated"field',
]
}
)
df.write_csv(f)
f.seek(0)
read_df = pl.read_csv(f)
assert_frame_equal(read_df, df)
def test_csv_field_schema_inference_with_whitespace(chunk_override: None) -> None:
csv = """\
bool,bool-,-bool,float,float-,-float,int,int-,-int
true,true , true,1.2,1.2 , 1.2,1,1 , 1
"""
df = pl.read_csv(io.StringIO(csv), has_header=True)
expected = pl.DataFrame(
{
"bool": [True],
"bool-": ["true "],
"-bool": [" true"],
"float": [1.2],
"float-": ["1.2 "],
"-float": [" 1.2"],
"int": [1],
"int-": ["1 "],
"-int": [" 1"],
}
)
assert_frame_equal(df, expected)
def test_fallback_chrono_parser(chunk_override: None) -> None:
data = textwrap.dedent(
"""\
date_1,date_2
2021-01-01,2021-1-1
2021-02-02,2021-2-2
2021-10-10,2021-10-10
"""
)
df = pl.read_csv(data.encode(), try_parse_dates=True)
assert df.null_count().row(0) == (0, 0)
def test_tz_aware_try_parse_dates(chunk_override: None) -> None:
data = (
"a,b,c,d\n"
"2020-01-01T02:00:00+01:00,2021-04-28T00:00:00+02:00,2021-03-28T00:00:00+01:00,2\n"
"2020-01-01T03:00:00+01:00,2021-04-29T00:00:00+02:00,2021-03-29T00:00:00+02:00,3\n"
)
result = pl.read_csv(io.StringIO(data), try_parse_dates=True)
expected = pl.DataFrame(
{
"a": [
datetime(2020, 1, 1, 1, tzinfo=timezone.utc),
datetime(2020, 1, 1, 2, tzinfo=timezone.utc),
],
"b": [
datetime(2021, 4, 27, 22, tzinfo=timezone.utc),
datetime(2021, 4, 28, 22, tzinfo=timezone.utc),
],
"c": [
datetime(2021, 3, 27, 23, tzinfo=timezone.utc),
datetime(2021, 3, 28, 22, tzinfo=timezone.utc),
],
"d": [2, 3],
}
)
assert_frame_equal(result, expected)
@pytest.mark.parametrize("try_parse_dates", [True, False])
@pytest.mark.parametrize("time_unit", ["ms", "us", "ns"])
def test_csv_overwrite_datetime_dtype(
chunk_override: None, try_parse_dates: bool, time_unit: TimeUnit
) -> None:
data = """\
a
2020-1-1T00:00:00.123456789
2020-1-2T00:00:00.987654321
2020-1-3T00:00:00.132547698
"""
result = pl.read_csv(
io.StringIO(data),
try_parse_dates=try_parse_dates,
schema_overrides={"a": pl.Datetime(time_unit)},
)
expected = pl.DataFrame(
{
"a": pl.Series(
[
"2020-01-01T00:00:00.123456789",
"2020-01-02T00:00:00.987654321",
"2020-01-03T00:00:00.132547698",
]
).str.to_datetime(time_unit=time_unit)
}
)
assert_frame_equal(result, expected)
def test_csv_string_escaping(chunk_override: None) -> None:
df = pl.DataFrame({"a": ["Free trip to A,B", '''Special rate "1.79"''']})
f = io.BytesIO()
df.write_csv(f)
f.seek(0)
df_read = pl.read_csv(f)
assert_frame_equal(df_read, df)
@pytest.mark.write_disk
def test_glob_csv(
chunk_override: None, df_no_lists: pl.DataFrame, tmp_path: Path
) -> None:
tmp_path.mkdir(exist_ok=True)
df = df_no_lists.drop("strings_nulls")
file_path = tmp_path / "small.csv"
df.write_csv(file_path)
path_glob = tmp_path / "small*.csv"
assert pl.scan_csv(path_glob).collect().shape == (3, 12)
assert pl.read_csv(path_glob).shape == (3, 12)
def test_csv_whitespace_separator_at_start_do_not_skip(chunk_override: None) -> None:
csv = "\t\t\t\t0\t1"
result = pl.read_csv(csv.encode(), separator="\t", has_header=False)
expected = {
"column_1": [None],
"column_2": [None],
"column_3": [None],
"column_4": [None],
"column_5": [0],
"column_6": [1],
}
assert result.to_dict(as_series=False) == expected
def test_csv_whitespace_separator_at_end_do_not_skip(chunk_override: None) -> None:
csv = "0\t1\t\t\t\t"
result = pl.read_csv(csv.encode(), separator="\t", has_header=False)
expected = {
"column_1": [0],
"column_2": [1],
"column_3": [None],
"column_4": [None],
"column_5": [None],
"column_6": [None],
}
assert result.to_dict(as_series=False) == expected
def test_csv_multiple_null_values(chunk_override: None) -> None:
df = pl.DataFrame(
{
"a": [1, 2, None, 4],
"b": ["2022-01-01", "__NA__", "", "NA"],
}
)
f = io.BytesIO()
df.write_csv(f)
f.seek(0)
df2 = pl.read_csv(f, null_values=["__NA__", "NA"])
expected = pl.DataFrame(
{
"a": [1, 2, None, 4],
"b": ["2022-01-01", None, "", None],
}
)
assert_frame_equal(df2, expected)
def test_different_eol_char(chunk_override: None) -> None:
csv = "a,1,10;b,2,20;c,3,30"
expected = pl.DataFrame(
{"column_1": ["a", "b", "c"], "column_2": [1, 2, 3], "column_3": [10, 20, 30]}
)
assert_frame_equal(
pl.read_csv(csv.encode(), eol_char=";", has_header=False), expected
)
def test_csv_write_escape_headers(chunk_override: None) -> None:
df0 = pl.DataFrame({"col,1": ["data,1"], 'col"2': ['data"2'], "col:3": ["data:3"]})
out = io.BytesIO()
df0.write_csv(out)
assert out.getvalue() == b'"col,1","col""2",col:3\n"data,1","data""2",data:3\n'
df1 = pl.DataFrame({"c,o,l,u,m,n": [123]})
out = io.BytesIO()
df1.write_csv(out)
out.seek(0)
df2 = pl.read_csv(out)
assert_frame_equal(df1, df2)
assert df2.schema == {"c,o,l,u,m,n": pl.Int64}
def test_csv_write_escape_newlines(chunk_override: None) -> None:
df = pl.DataFrame({"escape": ["n\nn"]})
f = io.BytesIO()
df.write_csv(f)
f.seek(0)
read_df = pl.read_csv(f)
assert_frame_equal(df, read_df)
def test_skip_new_line_embedded_lines(chunk_override: None) -> None:
csv = r"""a,b,c,d,e\n
1,2,3,"\n Test",\n
4,5,6,"Test A",\n
7,8,,"Test B \n",\n"""
for empty_string, missing_value in ((True, ""), (False, None)):
df = pl.read_csv(
csv.encode(),
skip_rows_after_header=1,
infer_schema_length=0,
missing_utf8_is_empty_string=empty_string,
)
assert df.to_dict(as_series=False) == {
"a": ["4", "7"],
"b": ["5", "8"],
"c": ["6", missing_value],
"d": ["Test A", "Test B \\n"],
"e\\n": ["\\n", "\\n"],
}
def test_csv_schema_overrides_bool(chunk_override: None) -> None:
csv = "a, b\n" + ",false\n" + ",false\n" + ",false"
df = pl.read_csv(
csv.encode(),
schema_overrides={"a": pl.Boolean, "b": pl.Boolean},
)
assert df.dtypes == [pl.Boolean, pl.Boolean]
@pytest.mark.parametrize(
("fmt", "expected"),
[
(None, "dt\n2022-01-02T00:00:00.000000\n"),
("%F %T%.3f", "dt\n2022-01-02 00:00:00.000\n"),
("%Y", "dt\n2022\n"),
("%m", "dt\n01\n"),
("%m$%d", "dt\n01$02\n"),
("%R", "dt\n00:00\n"),
],
)
def test_datetime_format(chunk_override: None, fmt: str | None, expected: str) -> None:
df = pl.DataFrame({"dt": [datetime(2022, 1, 2)]})
csv = df.write_csv(datetime_format=fmt)
assert csv == expected
@pytest.mark.parametrize(
("fmt", "expected"),
[
(None, "dt\n2022-01-02T00:00:00.000000+0000\n"),
("%F %T%.3f%z", "dt\n2022-01-02 00:00:00.000+0000\n"),
("%Y%z", "dt\n2022+0000\n"),
("%m%z", "dt\n01+0000\n"),
("%m$%d%z", "dt\n01$02+0000\n"),
("%R%z", "dt\n00:00+0000\n"),
],
)
@pytest.mark.parametrize("tzinfo", [timezone.utc, timezone(timedelta(hours=0))])
def test_datetime_format_tz_aware(
    chunk_override: None, fmt: str | None, expected: str, tzinfo: timezone
) -> None:
df = pl.DataFrame({"dt": [datetime(2022, 1, 2, tzinfo=tzinfo)]})
csv = df.write_csv(datetime_format=fmt)
assert csv == expected
@pytest.mark.parametrize(
("tu1", "tu2", "expected"),
[
(
"ns",
"ns",
"x,y\n2022-09-04T10:30:45.123000000,2022-09-04T10:30:45.123000000\n",
),
(
"ns",
"us",
"x,y\n2022-09-04T10:30:45.123000000,2022-09-04T10:30:45.123000\n",
),
(
"ns",
"ms",
"x,y\n2022-09-04T10:30:45.123000000,2022-09-04T10:30:45.123\n",
),
("us", "us", "x,y\n2022-09-04T10:30:45.123000,2022-09-04T10:30:45.123000\n"),
("us", "ms", "x,y\n2022-09-04T10:30:45.123000,2022-09-04T10:30:45.123\n"),
("ms", "us", "x,y\n2022-09-04T10:30:45.123,2022-09-04T10:30:45.123000\n"),
("ms", "ms", "x,y\n2022-09-04T10:30:45.123,2022-09-04T10:30:45.123\n"),
],
)
def test_datetime_format_inferred_precision(
chunk_override: None, tu1: TimeUnit, tu2: TimeUnit, expected: str
) -> None:
df = pl.DataFrame(
data={
"x": [datetime(2022, 9, 4, 10, 30, 45, 123000)],
"y": [datetime(2022, 9, 4, 10, 30, 45, 123000)],
},
schema=[
("x", pl.Datetime(tu1)),
("y", pl.Datetime(tu2)),
],
)
assert expected == df.write_csv()
def test_inferred_datetime_format_mixed(chunk_override: None) -> None:
ts = pl.datetime_range(datetime(2000, 1, 1), datetime(2000, 1, 2), eager=True)
df = pl.DataFrame({"naive": ts, "aware": ts.dt.replace_time_zone("UTC")})
result = df.write_csv()
expected = (
"naive,aware\n"
"2000-01-01T00:00:00.000000,2000-01-01T00:00:00.000000+0000\n"
"2000-01-02T00:00:00.000000,2000-01-02T00:00:00.000000+0000\n"
)
assert result == expected
@pytest.mark.parametrize(
("fmt", "expected"),
[
(None, "dt\n2022-01-02\n"),
("%Y", "dt\n2022\n"),
("%m", "dt\n01\n"),
("%m$%d", "dt\n01$02\n"),
],
)
def test_date_format(chunk_override: None, fmt: str | None, expected: str) -> None:
df = pl.DataFrame({"dt": [date(2022, 1, 2)]})
csv = df.write_csv(date_format=fmt)
assert csv == expected
@pytest.mark.parametrize(
("fmt", "expected"),
[
(None, "dt\n16:15:30.000000000\n"),
("%R", "dt\n16:15\n"),
],
)
def test_time_format(chunk_override: None, fmt: str | None, expected: str) -> None:
df = pl.DataFrame({"dt": [time(16, 15, 30)]})
csv = df.write_csv(time_format=fmt)
assert csv == expected
@pytest.mark.parametrize("dtype", [pl.Float32, pl.Float64])
def test_float_precision(chunk_override: None, dtype: pl.Float32 | pl.Float64) -> None:
df = pl.Series("col", [1.0, 2.2, 3.33], dtype=dtype).to_frame()
assert df.write_csv(float_precision=None) == "col\n1.0\n2.2\n3.33\n"
assert df.write_csv(float_precision=0) == "col\n1\n2\n3\n"
assert df.write_csv(float_precision=1) == "col\n1.0\n2.2\n3.3\n"
assert df.write_csv(float_precision=2) == "col\n1.00\n2.20\n3.33\n"
assert df.write_csv(float_precision=3) == "col\n1.000\n2.200\n3.330\n"
def test_float_scientific(chunk_override: None) -> None:
df = (
pl.Series(
"colf64",
[3.141592653589793 * mult for mult in (1e-8, 1e-3, 1e3, 1e17)],
dtype=pl.Float64,
)
.to_frame()
.with_columns(pl.col("colf64").cast(pl.Float32).alias("colf32"))
)
assert (
df.write_csv(float_precision=None, float_scientific=False)
== "colf64,colf32\n0.00000003141592653589793,0.00000003141592586075603\n0.0031415926535897933,0.0031415927223861217\n3141.592653589793,3141.5927734375\n314159265358979300,314159265516355600\n"
)
assert (
df.write_csv(float_precision=0, float_scientific=False)
== "colf64,colf32\n0,0\n0,0\n3142,3142\n314159265358979328,314159265516355584\n"
)
assert (
df.write_csv(float_precision=1, float_scientific=False)
== "colf64,colf32\n0.0,0.0\n0.0,0.0\n3141.6,3141.6\n314159265358979328.0,314159265516355584.0\n"
)
assert (
df.write_csv(float_precision=3, float_scientific=False)
== "colf64,colf32\n0.000,0.000\n0.003,0.003\n3141.593,3141.593\n314159265358979328.000,314159265516355584.000\n"
)
assert (
df.write_csv(float_precision=None, float_scientific=True)
== "colf64,colf32\n3.141592653589793e-8,3.1415926e-8\n3.1415926535897933e-3,3.1415927e-3\n3.141592653589793e3,3.1415928e3\n3.141592653589793e17,3.1415927e17\n"
)
assert (
df.write_csv(float_precision=0, float_scientific=True)
== "colf64,colf32\n3e-8,3e-8\n3e-3,3e-3\n3e3,3e3\n3e17,3e17\n"
)
assert (
df.write_csv(float_precision=1, float_scientific=True)
== "colf64,colf32\n3.1e-8,3.1e-8\n3.1e-3,3.1e-3\n3.1e3,3.1e3\n3.1e17,3.1e17\n"
)
assert (
df.write_csv(float_precision=3, float_scientific=True)
== "colf64,colf32\n3.142e-8,3.142e-8\n3.142e-3,3.142e-3\n3.142e3,3.142e3\n3.142e17,3.142e17\n"
)
def test_skip_rows_different_field_len(chunk_override: None) -> None:
csv = io.StringIO(
textwrap.dedent(
"""\
a,b
1,A
2,
3,B
4,
"""
)
)
for empty_string, missing_value in ((True, ""), (False, None)):
csv.seek(0)
assert pl.read_csv(
csv, skip_rows_after_header=2, missing_utf8_is_empty_string=empty_string
).to_dict(as_series=False) == {
"a": [3, 4],
"b": ["B", missing_value],
}
def test_duplicated_columns(chunk_override: None) -> None:
csv = textwrap.dedent(
"""a,a
1,2
"""
)
assert pl.read_csv(csv.encode()).columns == ["a", "a_duplicated_0"]
new = ["c", "d"]
assert pl.read_csv(csv.encode(), new_columns=new).columns == new
def test_error_message(chunk_override: None) -> None:
data = io.StringIO("target,wind,energy,miso\n1,2,3,4\n1,2,1e5,1\n")
with pytest.raises(
ComputeError,
match=r"could not parse `1e5` as dtype `i64` at column 'energy' \(column number 3\)",
):
pl.read_csv(data, infer_schema_length=1)
def test_csv_categorical_lifetime(chunk_override: None) -> None:
csv = textwrap.dedent(
r"""
a,b
"needs_escape",b
"" ""needs" escape" foo"",b
"" ""needs" escape" foo"",
"""
)
df = pl.read_csv(
csv.encode(), schema_overrides={"a": pl.Categorical, "b": pl.Categorical}
)
assert df.dtypes == [pl.Categorical, pl.Categorical]
assert df.to_dict(as_series=False) == {
"a": ["needs_escape", ' "needs escape foo', ' "needs escape foo'],
"b": ["b", "b", None],
}
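    # comparing the two categorical columns compares the string values, not the
    # physical indices, and the null in "b" propagates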
assert (df["a"] == df["b"]).to_list() == [False, False, None]
def test_csv_categorical_categorical_merge(chunk_override: None) -> None:
N = 50
f = io.BytesIO()
pl.DataFrame({"x": ["A"] * N + ["B"] * N}).write_csv(f)
f.seek(0)
assert pl.read_csv(
f, schema_overrides={"x": pl.Categorical}, sample_size=10
).unique(maintain_order=True)["x"].to_list() == ["A", "B"]
@pytest.mark.write_disk
def test_batched_csv_reader(chunk_override: None, foods_file_path: Path) -> None:
with pytest.deprecated_call():
reader = pl.read_csv_batched(foods_file_path, batch_size=4)
assert isinstance(reader, BatchedCsvReader)
batches = reader.next_batches(5)
assert batches is not None
out = pl.concat(batches)
assert_frame_equal(out, pl.read_csv(foods_file_path).head(out.height))
reader = pl.read_csv_batched(foods_file_path, batch_size=4, low_memory=True)
batches = reader.next_batches(10)
assert batches is not None
assert_frame_equal(pl.concat(batches), pl.read_csv(foods_file_path))
reader = pl.read_csv_batched(foods_file_path, batch_size=4, low_memory=True)
batches = reader.next_batches(10)
assert_frame_equal(pl.concat(batches), pl.read_csv(foods_file_path))
with NamedTemporaryFile() as tmp:
data = b"A\nB,ragged\nC"
tmp.write(data)
tmp.seek(0)
expected = pl.DataFrame({"A": ["B", "C"]})
batches = pl.read_csv_batched(
tmp.name,
has_header=True,
truncate_ragged_lines=True,
).next_batches(1)
assert batches is not None
assert_frame_equal(pl.concat(batches), expected)
def test_batched_csv_reader_empty(chunk_override: None, io_files_path: Path) -> None:
with pytest.deprecated_call():
empty_csv = io_files_path / "empty.csv"
with pytest.raises(NoDataError, match="empty CSV"):
pl.read_csv_batched(source=empty_csv)
reader = pl.read_csv_batched(source=empty_csv, raise_if_empty=False)
assert reader.next_batches(1) is None
def test_batched_csv_reader_all_batches(
chunk_override: None, foods_file_path: Path
) -> None:
with pytest.deprecated_call():
for new_columns in [None, ["Category", "Calories", "Fats_g", "Sugars_g"]]:
out = pl.read_csv(foods_file_path, new_columns=new_columns)
reader = pl.read_csv_batched(
foods_file_path, new_columns=new_columns, batch_size=4
)
batches = reader.next_batches(5)
batched_dfs = []
while batches:
batched_dfs.extend(batches)
batches = reader.next_batches(5)
assert all(x.height > 0 for x in batched_dfs)
batched_concat_df = pl.concat(batched_dfs, rechunk=True)
assert_frame_equal(out, batched_concat_df)
def test_batched_csv_reader_no_batches(
chunk_override: None, foods_file_path: Path
) -> None:
with pytest.deprecated_call():
reader = pl.read_csv_batched(foods_file_path, batch_size=4)
batches = reader.next_batches(0)
assert batches is None
def test_csv_single_categorical_null(chunk_override: None) -> None:
f = io.BytesIO()
pl.DataFrame(
{
"x": ["A"],
"y": [None],
"z": ["A"],
}
).write_csv(f)
f.seek(0)
df = pl.read_csv(
f,
schema_overrides={"y": pl.Categorical},
)
assert df.dtypes == [pl.String, pl.Categorical, pl.String]
assert df.to_dict(as_series=False) == {"x": ["A"], "y": [None], "z": ["A"]}
def test_csv_quoted_missing(chunk_override: None) -> None:
csv = (
'"col1"|"col2"|"col3"|"col4"\n'
'"0"|"Free text with a line\nbreak"|"123"|"456"\n'
'"1"|"Free text without a linebreak"|""|"789"\n'
'"0"|"Free text with \ntwo \nlinebreaks"|"101112"|"131415"'
)
result = pl.read_csv(
csv.encode(), separator="|", schema_overrides={"col3": pl.Int32}
)
expected = pl.DataFrame(
{
"col1": [0, 1, 0],
"col2": [
"Free text with a line\nbreak",
"Free text without a linebreak",
"Free text with \ntwo \nlinebreaks",
],
"col3": [123, None, 101112],
"col4": [456, 789, 131415],
},
schema_overrides={"col3": pl.Int32},
)
assert_frame_equal(result, expected)
def test_csv_write_tz_aware(chunk_override: None) -> None:
df = pl.DataFrame({"times": datetime(2021, 1, 1)}).with_columns(
pl.col("times")
.dt.replace_time_zone("UTC")
.dt.convert_time_zone("Europe/Zurich")
)
assert df.write_csv() == "times\n2021-01-01T01:00:00.000000+0100\n"
def test_csv_statistics_offset(chunk_override: None) -> None:
N = 5_000
csv = "\n".join(str(x) for x in range(N))
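    # has_header defaults to True, so the first value ("0") is consumed as the
    # column name, leaving N - 1 data rows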
assert pl.read_csv(io.StringIO(csv), n_rows=N).height == 4999
@pytest.mark.write_disk
def test_csv_scan_categorical(chunk_override: None, tmp_path: Path) -> None:
tmp_path.mkdir(exist_ok=True)
N = 5_000
df = pl.DataFrame({"x": ["A"] * N})
file_path = tmp_path / "test_csv_scan_categorical.csv"
df.write_csv(file_path)
result = pl.scan_csv(file_path, schema_overrides={"x": pl.Categorical}).collect()
assert result["x"].dtype == pl.Categorical
@pytest.mark.write_disk
def test_csv_scan_new_columns_less_than_original_columns(
chunk_override: None, tmp_path: Path
) -> None:
tmp_path.mkdir(exist_ok=True)
df = pl.DataFrame({"x": ["A"], "y": ["A"], "z": "A"})
file_path = tmp_path / "test_csv_scan_new_columns.csv"
df.write_csv(file_path)
result = pl.scan_csv(file_path, new_columns=["x_new", "y_new"]).collect()
assert result.columns == ["x_new", "y_new", "z"]
def test_read_csv_chunked(chunk_override: None) -> None:
"""Check that row count is properly functioning."""
N = 10_000
csv = "1\n" * N
df = pl.read_csv(io.StringIO(csv), row_index_name="count")
assert df.filter(pl.col("count") < pl.col("count").shift(1)).is_empty()
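# A minimal sketch, not from the original suite: read_csv also accepts a
# `row_index_offset`, so the generated row index can start at any value.
def test_read_csv_row_index_offset_sketch(chunk_override: None) -> None:
    df = pl.read_csv(
        io.StringIO("a\n1\n2\n"), row_index_name="idx", row_index_offset=10
    )
    assert df["idx"].to_list() == [10, 11]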
def test_read_empty_csv(chunk_override: None, io_files_path: Path) -> None:
with pytest.raises(NoDataError) as err:
pl.read_csv(io_files_path / "empty.csv")
assert "empty CSV" in str(err.value)
df = pl.read_csv(io_files_path / "empty.csv", raise_if_empty=False)
assert_frame_equal(df, pl.DataFrame())
with pytest.raises(pa.ArrowInvalid) as err:
pl.read_csv(io_files_path / "empty.csv", use_pyarrow=True)
assert "Empty CSV" in str(err.value)
df = pl.read_csv(
io_files_path / "empty.csv", raise_if_empty=False, use_pyarrow=True
)
assert_frame_equal(df, pl.DataFrame())
@pytest.mark.slow
def test_read_web_file(chunk_override: None) -> None:
url = "https://raw.githubusercontent.com/pola-rs/polars/main/examples/datasets/foods1.csv"
df = pl.read_csv(url)
assert df.shape == (27, 4)
@pytest.mark.slow
def test_csv_multiline_splits(chunk_override: None) -> None:
np.random.seed(0)
f = io.BytesIO()
def some_multiline_str(n: int) -> str:
strs = []
strs.append('"')
strs.extend(f"{'xx,' * length}" for length in np.random.randint(0, 5, n))
strs.append('"')
return "\n".join(strs)
for _ in range(4):
f.write(f"field1,field2,{some_multiline_str(5000)}\n".encode())
f.seek(0)
assert pl.read_csv(f, has_header=False).shape == (4, 3)
def test_read_csv_n_rows_outside_heuristic(chunk_override: None) -> None:
f = io.StringIO()
f.write(",,,?????????\n" * 1000)
f.write("?????????????????????????????????????????????????,,,\n")
f.write(",,,?????????\n" * 1048)
f.seek(0)
assert pl.read_csv(f, n_rows=2048, has_header=False).shape == (2048, 4)
def test_read_csv_comments_on_top_with_schema_11667(chunk_override: None) -> None:
csv = """
# This is a comment
A,B
1,Hello
2,World
""".strip()
schema = {
"A": pl.Int32(),
"B": pl.Utf8(),
}
df = pl.read_csv(io.StringIO(csv), comment_prefix="#", schema=schema)
assert df.height == 2
assert df.schema == schema
def test_write_csv_stdout_stderr(
chunk_override: None, capsys: pytest.CaptureFixture[str]
) -> None:
df = pl.DataFrame(
{
"numbers": [1, 2, 3],
"strings": ["test", "csv", "stdout"],
"dates": [date(2023, 1, 1), date(2023, 1, 2), date(2023, 1, 3)],
}
)
df.write_csv(sys.stdout)
captured = capsys.readouterr()
assert captured.out == (
"numbers,strings,dates\n"
"1,test,2023-01-01\n"
"2,csv,2023-01-02\n"
"3,stdout,2023-01-03\n"
)
df.write_csv(sys.stderr)
captured = capsys.readouterr()
assert captured.err == (
"numbers,strings,dates\n"
"1,test,2023-01-01\n"
"2,csv,2023-01-02\n"
"3,stdout,2023-01-03\n"
)
def test_csv_9929(chunk_override: None) -> None:
df = pl.DataFrame({"nrs": [1, 2, 3]})
f = io.BytesIO()
df.write_csv(f)
f.seek(0)
with pytest.raises(NoDataError):
pl.read_csv(f, skip_rows=10**6)
def test_csv_quote_styles(chunk_override: None) -> None:
class TemporalFormats(TypedDict):
datetime_format: str
time_format: str
temporal_formats: TemporalFormats = {
"datetime_format": "%Y-%m-%dT%H:%M:%S",
"time_format": "%H:%M:%S",
}
dtm = datetime(2077, 7, 5, 3, 1, 0)
dt = dtm.date()
tm = dtm.time()
df = pl.DataFrame(
{
"float": [1.0, 2.0, None],
"string": ["a", "a,bc", '"hello'],
"int": [1, 2, 3],
"bool": [True, False, None],
"date": [dt, None, dt],
"datetime": [None, dtm, dtm],
"time": [tm, tm, None],
"decimal": [D("1.0"), D("2.0"), None],
}
)
assert df.write_csv(quote_style="always", **temporal_formats) == (
'"float","string","int","bool","date","datetime","time","decimal"\n'
'"1.0","a","1","true","2077-07-05","","03:01:00","1.0"\n'
'"2.0","a,bc","2","false","","2077-07-05T03:01:00","03:01:00","2.0"\n'
'"","""hello","3","","2077-07-05","2077-07-05T03:01:00","",""\n'
)
assert df.write_csv(quote_style="necessary", **temporal_formats) == (
"float,string,int,bool,date,datetime,time,decimal\n"
"1.0,a,1,true,2077-07-05,,03:01:00,1.0\n"
'2.0,"a,bc",2,false,,2077-07-05T03:01:00,03:01:00,2.0\n'
',"""hello",3,,2077-07-05,2077-07-05T03:01:00,,\n'
)
assert df.write_csv(quote_style="never", **temporal_formats) == (
"float,string,int,bool,date,datetime,time,decimal\n"
"1.0,a,1,true,2077-07-05,,03:01:00,1.0\n"
"2.0,a,bc,2,false,,2077-07-05T03:01:00,03:01:00,2.0\n"
',"hello,3,,2077-07-05,2077-07-05T03:01:00,,\n'
)
assert df.write_csv(
quote_style="non_numeric", quote_char="8", **temporal_formats
) == (
"8float8,8string8,8int8,8bool8,8date8,8datetime8,8time8,8decimal8\n"
"1.0,8a8,1,8true8,82077-07-058,,803:01:008,1.0\n"
"2.0,8a,bc8,2,8false8,,82077-07-05T03:01:008,803:01:008,2.0\n"
',8"hello8,3,,82077-07-058,82077-07-05T03:01:008,,\n'
)
def test_ignore_errors_casting_dtypes(chunk_override: None) -> None:
csv = """inventory
10
400
90
"""
assert pl.read_csv(
source=io.StringIO(csv),
schema_overrides={"inventory": pl.Int8},
ignore_errors=True,
).to_dict(as_series=False) == {"inventory": [10, None, None, 90]}
with pytest.raises(ComputeError):
pl.read_csv(
source=io.StringIO(csv),
schema_overrides={"inventory": pl.Int8},
ignore_errors=False,
)
def test_ignore_errors_date_parser(chunk_override: None) -> None:
data_invalid_date = "int,float,date\n3,3.4,X"
with pytest.raises(ComputeError):
pl.read_csv(
source=io.StringIO(data_invalid_date),
schema_overrides={"date": pl.Date},
ignore_errors=False,
)
def test_csv_ragged_lines(chunk_override: None) -> None:
expected = {"A": ["B", "C"]}
assert (
pl.read_csv(
io.StringIO("A\nB,ragged\nC"), has_header=True, truncate_ragged_lines=True
).to_dict(as_series=False)
== expected
)
assert (
pl.read_csv(
io.StringIO("A\nB\nC,ragged"), has_header=True, truncate_ragged_lines=True
).to_dict(as_series=False)
== expected
)
    for s in ["A\nB,ragged\nC", "A\nB\nC,ragged"]:
        with pytest.raises(ComputeError, match=r"found more fields than defined"):
            pl.read_csv(io.StringIO(s), has_header=True)
        with pytest.raises(ComputeError, match=r"found more fields than defined"):
            pl.read_csv(io.StringIO(s), has_header=True, truncate_ragged_lines=False)
@pytest.mark.may_fail_auto_streaming
def test_provide_schema(chunk_override: None) -> None:
assert pl.read_csv(
io.StringIO("A\nB,ragged\nC"),
has_header=False,
schema={"A": pl.String, "B": pl.String, "C": pl.String},
).to_dict(as_series=False) == {
"A": ["A", "B", "C"],
"B": [None, "ragged", None],
"C": [None, None, None],
}
def test_custom_writable_object(chunk_override: None) -> None:
df = pl.DataFrame({"a": [10, 20, 30], "b": ["x", "y", "z"]})
class CustomBuffer:
writes: list[bytes]
def __init__(self) -> None:
self.writes = []
def write(self, data: bytes) -> int:
self.writes.append(data)
return len(data)
buf = CustomBuffer()
df.write_csv(buf)
assert b"".join(buf.writes) == b"a,b\n10,x\n20,y\n30,z\n"
@pytest.mark.parametrize(
("csv", "expected"),
[
(b"a,b\n1,2\n1,2\n", pl.DataFrame({"a": [1, 1], "b": [2, 2]})),
(b"a,b\n1,2\n1,2", pl.DataFrame({"a": [1, 1], "b": [2, 2]})),
(b"a\n1\n1\n", pl.DataFrame({"a": [1, 1]})),
(b"a\n1\n1", pl.DataFrame({"a": [1, 1]})),
],
ids=[
"multiple columns, ends with LF",
"multiple columns, ends with non-LF",
"single column, ends with LF",
"single column, ends with non-LF",
],
)
def test_read_filelike_object_12266(
chunk_override: None, csv: bytes, expected: pl.DataFrame
) -> None:
buf = io.BufferedReader(io.BytesIO(csv))
df = pl.read_csv(buf)
assert_frame_equal(df, expected)
def test_read_filelike_object_12404(chunk_override: None) -> None:
expected = pl.DataFrame({"a": [1, 1], "b": [2, 2]})
csv = expected.write_csv(line_terminator=";").encode()
buf = io.BufferedReader(io.BytesIO(csv))
df = pl.read_csv(buf, eol_char=";")
assert_frame_equal(df, expected)
def test_write_csv_bom(chunk_override: None) -> None:
df = pl.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3]})
f = io.BytesIO()
df.write_csv(f, include_bom=True)
f.seek(0)
assert f.read() == b"\xef\xbb\xbfa,b\n1,1\n2,2\n3,3\n"
def test_write_csv_batch_size_zero(chunk_override: None) -> None:
df = pl.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3]})
f = io.BytesIO()
with pytest.raises(ValueError, match="invalid zero value"):
df.write_csv(f, batch_size=0)
def test_empty_csv_no_raise(chunk_override: None) -> None:
assert pl.read_csv(io.StringIO(), raise_if_empty=False, has_header=False).shape == (
0,
0,
)
def test_csv_no_new_line_last(chunk_override: None) -> None:
csv = io.StringIO("a b\n1 1\n2 2\n3 2.1")
assert pl.read_csv(csv, separator=" ").to_dict(as_series=False) == {
"a": [1, 2, 3],
"b": [1.0, 2.0, 2.1],
}
def test_invalid_csv_raise(chunk_override: None) -> None:
with pytest.raises(ComputeError):
pl.read_csv(
b"""
"WellCompletionCWI","FacilityID","ProductionMonth","ReportedHoursProdInj","ProdAccountingProductType","ReportedVolume","VolumetricActivityType"
"SK0000608V001","SK BT B1H3780","202001","","GAS","1.700","PROD"
"SK0127960V000","SK BT 0018977","202001","","GAS","45.500","PROD"
"SK0127960V000","SK BT 0018977","
""".strip()
)
@pytest.mark.write_disk
def test_partial_read_compressed_file(
chunk_override: None, tmp_path: Path, plmonkeypatch: PlMonkeyPatch
) -> None:
plmonkeypatch.setenv("POLARS_FORCE_ASYNC", "0")
df = pl.DataFrame(
{"idx": range(1_000), "dt": date(2025, 12, 31), "txt": "hello world"}
)
tmp_path.mkdir(exist_ok=True)
file_path = tmp_path / "large.csv.gz"
bytes_io = io.BytesIO()
df.write_csv(bytes_io)
bytes_io.seek(0)
with gzip.open(file_path, mode="wb") as f:
f.write(bytes_io.getvalue())
df = pl.read_csv(
file_path, skip_rows=40, has_header=False, skip_rows_after_header=20, n_rows=30
)
assert df.shape == (30, 3)
def test_read_csv_invalid_schema_overrides(chunk_override: None) -> None:
csv = textwrap.dedent(
"""\
a,b
1,foo
2,bar
3,baz
"""
)
f = io.StringIO(csv)
with pytest.raises(
TypeError, match="`schema_overrides` should be of type list or dict"
):
pl.read_csv(f, schema_overrides={pl.Int64, pl.String})
def test_read_csv_invalid_schema_overrides_length(chunk_override: None) -> None:
csv = textwrap.dedent(
"""\
a,b
1,foo
2,bar
3,baz
"""
)
f = io.StringIO(csv)
if (
os.getenv("POLARS_AUTO_NEW_STREAMING", os.getenv("POLARS_FORCE_NEW_STREAMING"))
== "1"
):
err = TypeError
match = "expected 'schema_overrides' dict, found 'list'"
else:
err = InvalidOperationError
match = "The number of schema overrides must be less than or equal to the number of fields"
with pytest.raises(err, match=match):
pl.read_csv(f, schema_overrides=[pl.Int64, pl.String, pl.Boolean])
def test_schema_overrides_dict_with_nonexistent_columns(chunk_override: None) -> None:
"""Test for issue #20903: schema_overrides should work consistently.
When schema_overrides is a dict with the same length as the number of columns
but contains non-existent column names, it should still work by name (not position).
Currently this fails because Polars incorrectly applies the overrides positionally
when the dict length matches the column count.
"""
csv = textwrap.dedent(
"""\
a,b
1,hi
"""
)
f = io.StringIO(csv)
df = pl.read_csv(f, schema_overrides={"a": pl.Int64, "c": pl.Int64})
assert df.schema == {"a": pl.Int64, "b": pl.String}
assert df.to_dict(as_series=False) == {"a": [1], "b": ["hi"]}
f = io.StringIO(csv)
df2 = pl.read_csv(f, schema_overrides={"a": pl.Int64, "c": pl.Int64, "d": pl.Int64})
assert df2.schema == {"a": pl.Int64, "b": pl.String}
assert df2.to_dict(as_series=False) == {"a": [1], "b": ["hi"]}
@pytest.mark.parametrize("columns", [["b"], "b"])
def test_read_csv_single_column(chunk_override: None, columns: list[str] | str) -> None:
csv = textwrap.dedent(
"""\
a,b,c
1,2,3
4,5,6
"""
)
f = io.StringIO(csv)
df = pl.read_csv(f, columns=columns)
expected = pl.DataFrame({"b": [2, 5]})
assert_frame_equal(df, expected)
def test_csv_invalid_escape_utf8_14960(chunk_override: None) -> None:
with pytest.raises(ComputeError, match=r"Field .* is not properly escaped"):
pl.read_csv('col1\n""•'.encode())
def test_csv_invalid_escape(chunk_override: None) -> None:
with pytest.raises(ComputeError):
pl.read_csv(b'col1,col2\n"a,b')
def test_csv_escape_cf_15349(chunk_override: None) -> None:
f = io.BytesIO()
df = pl.DataFrame({"test": ["normal", "with\rcr"]})
df.write_csv(f)
f.seek(0)
assert f.read() == b'test\nnormal\n"with\rcr"\n'
@pytest.mark.write_disk
@pytest.mark.parametrize("streaming", [True, False])
def test_skip_rows_after_header(
chunk_override: None, tmp_path: Path, streaming: bool
) -> None:
tmp_path.mkdir(exist_ok=True)
path = tmp_path / "data.csv"
df = pl.Series("a", [1, 2, 3, 4, 5], dtype=pl.Int64).to_frame()
df.write_csv(path)
skip = 2
expect = df.slice(skip)
out = pl.scan_csv(path, skip_rows_after_header=skip).collect(
engine="streaming" if streaming else "in-memory"
)
assert_frame_equal(out, expect)
@pytest.mark.parametrize("use_pyarrow", [True, False])
def test_skip_rows_after_header_pyarrow(
chunk_override: None, use_pyarrow: bool
) -> None:
csv = textwrap.dedent(
"""\
foo,bar
1,2
3,4
5,6
"""
)
f = io.StringIO(csv)
df = pl.read_csv(f, skip_rows_after_header=1, use_pyarrow=use_pyarrow)
expected = pl.DataFrame({"foo": [3, 5], "bar": [4, 6]})
assert_frame_equal(df, expected)
def test_read_csv_float_type_decimal_comma(chunk_override: None) -> None:
floats = b"a;b\n12,239;1,233\n13,908;87,32"
read = pl.read_csv(floats, decimal_comma=True, separator=";")
assert read.dtypes == [pl.Float64] * 2
assert read.to_dict(as_series=False) == {"a": [12.239, 13.908], "b": [1.233, 87.32]}
def test_read_csv_decimal_type_decimal_comma_24414(chunk_override: None) -> None:
schema = pl.Schema({"a": pl.Decimal(scale=3), "b": pl.Decimal(scale=2)})
csv_dot = b"a,b\n12.239,1.233\n13.908,87.32"
out_dot = pl.read_csv(csv_dot, schema=schema)
csv = b"a;b\n12,239;1,233\n13,908;87,32"
out = pl.read_csv(csv, decimal_comma=True, separator=";", schema=schema)
assert_frame_equal(out_dot, out)
csv = b"a;b\n 12,239;1,233\n 13,908;87,32"
out = pl.read_csv(csv, decimal_comma=True, separator=";", schema=schema)
assert_frame_equal(out_dot, out)
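    # quoting the fields lets decimal_comma coexist with a comma separator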
csv = b'a,b\n"12,239","1,233"\n"13,908","87,32"'
out = pl.read_csv(csv, decimal_comma=True, schema=schema)
assert_frame_equal(out_dot, out)
@pytest.mark.may_fail_auto_streaming
def test_fsspec_not_available(chunk_override: None) -> None:
with PlMonkeyPatch.context() as mp:
mp.setenv("POLARS_FORCE_ASYNC", "0")
mp.setattr("polars.io._utils._FSSPEC_AVAILABLE", False)
with pytest.raises(
ImportError, match=r"`fsspec` is required for `storage_options` argument"
):
pl.read_csv(
"s3://foods/cabbage.csv",
storage_options={"key": "key", "secret": "secret"},
)
@pytest.mark.may_fail_auto_streaming
def test_read_csv_dtypes_deprecated(chunk_override: None) -> None:
csv = textwrap.dedent(
"""\
a,b,c
1,2,3
4,5,6
"""
)
f = io.StringIO(csv)
with pytest.deprecated_call():
df = pl.read_csv(f, dtypes=[pl.Int8, pl.Int8, pl.Int8])
expected = pl.DataFrame(
{"a": [1, 4], "b": [2, 5], "c": [3, 6]},
schema={"a": pl.Int8, "b": pl.Int8, "c": pl.Int8},
)
assert_frame_equal(df, expected)
def test_projection_applied_on_file_with_no_rows_16606(
chunk_override: None, tmp_path: Path
) -> None:
tmp_path.mkdir(exist_ok=True)
path = tmp_path / "data.csv"
data = """\
a,b,c,d
"""
with path.open("w") as f:
f.write(data)
columns = ["a", "b"]
out = pl.read_csv(path, columns=columns).columns
assert out == columns
out = pl.scan_csv(path).select(columns).collect().columns
assert out == columns
@pytest.mark.write_disk
def test_write_csv_to_dangling_file_17328(
chunk_override: None, df_no_lists: pl.DataFrame, tmp_path: Path
) -> None:
tmp_path.mkdir(exist_ok=True)
df_no_lists.write_csv((tmp_path / "dangling.csv").open("w"))
@pytest.mark.may_fail_cloud
@pytest.mark.write_disk
def test_write_csv_raise_on_non_utf8_17328(
chunk_override: None, df_no_lists: pl.DataFrame, tmp_path: Path
) -> None:
tmp_path.mkdir(exist_ok=True)
with pytest.raises(InvalidOperationError, match="file encoding is not UTF-8"):
df_no_lists.write_csv((tmp_path / "dangling.csv").open("w", encoding="gbk"))
@pytest.mark.may_fail_auto_streaming
@pytest.mark.write_disk
def test_write_csv_appending_17543(chunk_override: None, tmp_path: Path) -> None:
tmp_path.mkdir(exist_ok=True)
df = pl.DataFrame({"col": ["value"]})
with (tmp_path / "append.csv").open("w") as f:
f.write("# test\n")
df.write_csv(f)
with (tmp_path / "append.csv").open("r") as f:
assert f.readline() == "# test\n"
assert pl.read_csv(f).equals(df)
def test_write_csv_passing_params_18825(chunk_override: None) -> None:
df = pl.DataFrame({"c1": [1, 2], "c2": [3, 4]})
buffer = io.StringIO()
df.write_csv(buffer, separator="\t", include_header=False)
result_str = buffer.getvalue()
expected_str = "1\t3\n2\t4\n"
assert result_str == expected_str
@pytest.mark.parametrize(
("dtype", "df"),
[
(pl.Decimal(scale=2), pl.DataFrame({"x": ["0.1"]}).cast(pl.Decimal(scale=2))),
(pl.Categorical, pl.DataFrame({"x": ["A"]})),
(
pl.Time,
pl.DataFrame({"x": ["12:15:00"]}).with_columns(
pl.col("x").str.strptime(pl.Time)
),
),
],
)
def test_read_csv_cast_unparsable_later(
chunk_override: None, dtype: pl.Decimal | pl.Categorical | pl.Time, df: pl.DataFrame
) -> None:
f = io.BytesIO()
df.write_csv(f)
f.seek(0)
assert df.equals(pl.read_csv(f, schema={"x": dtype}))
def test_csv_double_new_line(chunk_override: None) -> None:
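    # With has_header=False, the trailing empty line is read as a record of
    # nulls rather than being dropped.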
assert pl.read_csv(b"a,b,c\n\n", has_header=False).to_dict(as_series=False) == {
"column_1": ["a", None],
"column_2": ["b", None],
"column_3": ["c", None],
}
def test_csv_quoted_newlines_skip_rows_19535(chunk_override: None) -> None:
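    # skip_rows counts parsed records, so the newline embedded in the quoted
    # field belongs to the first record and skipping one row consumes both
    # physical lines.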
assert_frame_equal(
pl.read_csv(
b"""\
"a\nb"
0
""",
has_header=False,
skip_rows=1,
new_columns=["x"],
),
pl.DataFrame({"x": 0}),
)
@pytest.mark.write_disk
def test_csv_read_time_dtype(chunk_override: None, tmp_path: Path) -> None:
tmp_path.mkdir(exist_ok=True)
path = tmp_path / "1"
path.write_bytes(b"""\
time
00:00:00.000000000
""")
df = pl.Series("time", [0]).cast(pl.Time()).to_frame()
assert_frame_equal(pl.read_csv(path, try_parse_dates=True), df)
assert_frame_equal(pl.read_csv(path, schema_overrides={"time": pl.Time}), df)
assert_frame_equal(pl.scan_csv(path, try_parse_dates=True).collect(), df)
assert_frame_equal(pl.scan_csv(path, schema={"time": pl.Time}).collect(), df)
assert_frame_equal(
pl.scan_csv(path, schema={"time": pl.Time}).collect(engine="streaming"), df
)
def test_csv_try_parse_dates_leading_zero_8_digits_22167(chunk_override: None) -> None:
result = pl.read_csv(
io.StringIO(
"a\n2025-04-06T18:56:42.617736974Z\n2025-04-06T18:57:42.77756192Z\n2025-04-06T18:58:44.56928733Z"
),
try_parse_dates=True,
)
expected = pl.DataFrame(
{
"a": [
datetime(2025, 4, 6, 18, 56, 42, 617736, tzinfo=timezone.utc),
datetime(2025, 4, 6, 18, 57, 42, 777561, tzinfo=timezone.utc),
datetime(2025, 4, 6, 18, 58, 44, 569287, tzinfo=timezone.utc),
]
}
)
assert_frame_equal(result, expected)
@pytest.mark.may_fail_auto_streaming
def test_csv_read_time_schema_overrides(chunk_override: None) -> None:
df = pl.Series("time", [0]).cast(pl.Time()).to_frame()
assert_frame_equal(
pl.read_csv(
b"""\
time
00:00:00.000000000
""",
schema_overrides=[pl.Time],
),
df,
)
def test_batched_csv_schema_overrides(
chunk_override: None, io_files_path: Path
) -> None:
with pytest.deprecated_call():
foods = io_files_path / "foods1.csv"
batched = pl.read_csv_batched(foods, schema_overrides={"calories": pl.String})
res = batched.next_batches(1)
assert res is not None
b = res[0]
assert b["calories"].dtype == pl.String
assert b.width == 4
def test_csv_ragged_lines_20062(chunk_override: None) -> None:
buf = io.StringIO("""A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q,R,S,T,U,V
,"B",,,,,,,,,A,,,,,,,,
a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,0.0,1.0,2.0,3.0
""")
assert pl.read_csv(buf, truncate_ragged_lines=True).to_dict(as_series=False) == {
"A": [None, "a"],
"B": ["B", "a"],
"C": [None, "a"],
"D": [None, "a"],
"E": [None, "a"],
"F": [None, "a"],
"G": [None, "a"],
"H": [None, "a"],
"I": [None, "a"],
"J": [None, "a"],
"K": ["A", "a"],
"L": [None, "a"],
"M": [None, "a"],
"N": [None, "a"],
"O": [None, "a"],
"P": [None, "a"],
"Q": [None, "a"],
"R": [None, "a"],
"S": [None, "a"],
"T": [None, 0.0],
"U": [None, 1.0],
"V": [None, 2.0],
}
def test_csv_skip_lines(chunk_override: None) -> None:
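    # skip_lines skips raw lines without tracking quotes, so the odd total
    # quote count across the skipped header lines must not derail the parser.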
fh = io.BytesIO()
fh.write(b'Header line "1" -> quote count 2\n')
fh.write(b'Header line "2"" -> quote count 3\n')
fh.write(b'Header line "3" -> quote count 2 => Total 7 quotes ERROR\n')
fh.write(b"column_01, column_02, column_03\n")
fh.write(b"123.12, 21, 99.9\n")
fh.write(b"65.84, 75, 64.7\n")
fh.seek(0)
df = pl.read_csv(fh, has_header=True, skip_lines=3)
assert df.to_dict(as_series=False) == {
"column_01": [123.12, 65.84],
" column_02": [" 21", " 75"],
" column_03": [" 99.9", " 64.7"],
}
fh.seek(0)
assert_frame_equal(pl.scan_csv(fh, has_header=True, skip_lines=3).collect(), df)
def test_csv_invalid_quoted_comment_line(chunk_override: None) -> None:
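    # An unmatched quote on a comment line must not confuse quote tracking
    # for the lines that follow.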
assert pl.read_csv(
b'#"Comment\nColA\tColB\n1\t2', separator="\t", comment_prefix="#"
).to_dict(as_series=False) == {"ColA": [1], "ColB": [2]}
@pytest.mark.may_fail_auto_streaming
def test_csv_compressed_new_columns_19916(chunk_override: None) -> None:
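    # scan_csv should transparently decompress zstd-compressed bytes, with
    # new_columns still applied to the headerless data.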
n_rows = 100
df = pl.DataFrame(
{
"a": range(n_rows),
"b": range(n_rows),
"c": range(n_rows),
"d": range(n_rows),
"e": range(n_rows),
"f": range(n_rows),
}
)
b = zstandard.compress(df.write_csv(include_header=False).encode())
q = pl.scan_csv(b, has_header=False, new_columns=["a", "b", "c", "d", "e", "f"])
assert_frame_equal(q.collect(), df)
def test_trailing_separator_8240(chunk_override: None) -> None:
csv = "A|B|"
expected = pl.DataFrame(
{"column_1": ["A"], "column_2": ["B"], "column_3": [None]},
schema={"column_1": pl.String, "column_2": pl.String, "column_3": pl.String},
)
result = pl.read_csv(io.StringIO(csv), separator="|", has_header=False)
assert_frame_equal(result, expected)
result = pl.scan_csv(io.StringIO(csv), separator="|", has_header=False).collect()
assert_frame_equal(result, expected)
def test_header_only_column_selection_17173(chunk_override: None) -> None:
csv = "A,B"
result = pl.read_csv(io.StringIO(csv), columns=["B"])
expected = pl.Series("B", [], pl.String()).to_frame()
assert_frame_equal(result, expected)
def test_csv_enum_raise(chunk_override: None) -> None:
ENUM_DTYPE = pl.Enum(["foo", "bar"])
with (
io.StringIO("col\nfoo\nbaz\n") as csv,
pytest.raises(pl.exceptions.ComputeError, match="could not parse `baz`"),
):
pl.read_csv(
csv,
schema={"col": ENUM_DTYPE},
)
def test_csv_no_header_ragged_lines_1505(chunk_override: None) -> None:
csv = io.StringIO("""a,b,c
a,b,c,d,e,f
g,h,i,j,k""")
assert pl.read_csv(csv, has_header=False).to_dict(as_series=False) == {
"column_1": ["a", "a", "g"],
"column_2": ["b", "b", "h"],
"column_3": ["c", "c", "i"],
"column_4": [None, "d", "j"],
"column_5": [None, "e", "k"],
"column_6": [None, "f", None],
}
@pytest.mark.parametrize(
("filter_value", "expected"),
[
(10, "a,b,c\n10,20,99\n"),
(11, "a,b,c\n11,21,99\n"),
(12, "a,b,c\n12,22,99\n12,23,99\n"),
],
)
def test_csv_write_scalar_empty_chunk_20273(
chunk_override: None, filter_value: int, expected: str
) -> None:
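    # The cross join followed by a filter can leave columns backed by empty
    # chunks; write_csv must still serialize only the surviving rows.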
df1 = pl.DataFrame(
{
"a": [10, 11, 12, 12],
"b": [20, 21, 22, 23],
},
)
df2 = pl.DataFrame({"c": [99]})
df3 = df1.join(df2, how="cross").filter(pl.col("a").eq(filter_value))
assert df3.write_csv() == expected
def test_csv_malformed_quote_in_unenclosed_field_22395(chunk_override: None) -> None:
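    # A quote character inside an unenclosed field is malformed input: both
    # the eager and lazy readers raise, ignore_errors downgrades the failure
    # to a warning, and the longer variant checks that the same error
    # surfaces on bigger inputs.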
malformed = b"""\
a,b,x"y
a,x"y,c
x"y,b,c
"""
with pytest.raises(pl.exceptions.ComputeError):
pl.read_csv(malformed, has_header=False)
with pytest.raises(pl.exceptions.ComputeError):
pl.scan_csv(malformed, has_header=False).collect()
with pytest.warns(UserWarning, match="CSV malformed:"):
pl.read_csv(malformed, has_header=False, ignore_errors=True)
malformed_long = malformed + ("k,l,m\n" * 10).encode()
with pytest.raises(pl.exceptions.ComputeError):
pl.read_csv(malformed_long, has_header=False)
with pytest.raises(pl.exceptions.ComputeError):
pl.scan_csv(malformed_long, has_header=False).collect()
@pytest.mark.parametrize(
(
"separator",
"quote_style",
"scientific",
"precision",
"decimal_comma",
"expected",
),
[
(",", None, None, None, False, b"123.75,60.0,9\n"),
(",", None, None, None, True, b'"123,75","60,0",9\n'),
(";", None, None, None, True, b"123,75;60,0;9\n"),
(",", None, None, 0, True, b"124,60,9\n"),
(",", None, None, 3, True, b'"123,750","60,000",9\n'),
(";", None, None, 0, True, b"124;60;9\n"),
(";", None, None, 3, True, b"123,750;60,000;9\n"),
(",", None, True, None, False, b"1.2375e2,6e1,9\n"),
(",", None, True, None, True, b'"1,2375e2","6e1",9\n'),
(",", None, False, None, False, b"123.75,60,9\n"),
(",", None, False, None, True, b'"123,75","60",9\n'),
(";", None, True, None, True, b"1,2375e2;6e1;9\n"),
(";", None, False, None, True, b"123,75;60;9\n"),
(",", None, True, 0, True, b"1e2,6e1,9\n"),
(",", None, True, 3, True, b'"1,238e2","6,000e1",9\n'),
(",", None, True, 4, True, b'"1,2375e2","6,0000e1",9\n'),
(",", None, True, 5, True, b'"1,23750e2","6,00000e1",9\n'),
(",", None, False, 0, True, b"124,60,9\n"),
(",", None, False, 3, True, b'"123,750","60,000",9\n'),
(",", "always", None, None, True, b'"123,75","60,0","9"\n'),
(",", "necessary", None, None, True, b'"123,75","60,0",9\n'),
(",", "non_numeric", None, None, True, b'"123,75","60,0",9\n'),
(",", "never", None, None, True, b"123,75,60,0,9\n"),
(";", "always", None, None, True, b'"123,75";"60,0";"9"\n'),
(";", "necessary", None, None, True, b"123,75;60,0;9\n"),
(";", "non_numeric", None, None, True, b"123,75;60,0;9\n"),
(";", "never", None, None, True, b"123,75;60,0;9\n"),
],
)
def test_write_csv_float_type_decimal_comma(
chunk_override: None,
separator: str,
quote_style: CsvQuoteStyle | None,
scientific: bool | None,
precision: int | None,
decimal_comma: bool,
expected: bytes,
) -> None:
df = pl.DataFrame({"a": [123.75], "b": [60.0], "c": [9]})
buf = io.BytesIO()
df.write_csv(
buf,
separator=separator,
quote_style=quote_style,
float_precision=precision,
float_scientific=scientific,
decimal_comma=decimal_comma,
include_header=False,
)
buf.seek(0)
assert buf.read() == expected
df32 = df.with_columns(pl.col("a", "b").cast(pl.Float32))
buf.seek(0)
df32.write_csv(
buf,
separator=separator,
quote_style=quote_style,
float_precision=precision,
float_scientific=scientific,
decimal_comma=decimal_comma,
include_header=False,
)
buf.seek(0)
assert buf.read() == expected
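    # Only round-trip the output when formatting is lossless; combinations
    # that truncate or reformat the values, or that produce ambiguous fields
    # (quote_style="never" with comma decimals and a comma separator), are
    # excluded.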
round_trip = not (
(not scientific and precision is not None and precision <= 2)
or (scientific and precision is not None and precision != 4)
or (quote_style == "never" and decimal_comma and separator == ",")
)
if round_trip:
buf.seek(0)
df.write_csv(
buf,
separator=separator,
quote_style=quote_style,
float_precision=precision,
float_scientific=scientific,
decimal_comma=decimal_comma,
include_header=True,
)
buf.seek(0)
out = pl.read_csv(
buf, decimal_comma=decimal_comma, separator=separator, schema=df.schema
)
assert_frame_equal(df, out)
buf.seek(0)
df.lazy().sink_csv(
buf,
separator=separator,
quote_style=quote_style,
float_precision=precision,
float_scientific=scientific,
decimal_comma=decimal_comma,
include_header=True,
)
buf.seek(0)
out = pl.scan_csv(
buf, decimal_comma=decimal_comma, separator=separator, schema=df.schema
).collect()
assert_frame_equal(df, out)
@pytest.mark.parametrize(
(
"separator",
"quote_style",
"decimal_comma",
"expected",
),
[
(",", None, False, b"123.75,60.0,9\n"),
(",", None, True, b'"123,75","60,0",9\n'),
(";", None, False, b"123.75;60.0;9\n"),
(";", None, True, b"123,75;60,0;9\n"),
(",", "always", True, b'"123,75","60,0","9"\n'),
(",", "necessary", True, b'"123,75","60,0",9\n'),
(",", "non_numeric", True, b'"123,75","60,0",9\n'),
(",", "never", True, b"123,75,60,0,9\n"),
(";", "always", True, b'"123,75";"60,0";"9"\n'),
(";", "necessary", True, b"123,75;60,0;9\n"),
],
)
def test_write_csv_decimal_type_decimal_comma(
chunk_override: None,
separator: str,
quote_style: CsvQuoteStyle | None,
decimal_comma: bool,
expected: bytes,
) -> None:
schema = {
"a": pl.Decimal(scale=2),
"b": pl.Decimal(scale=1),
"c": pl.Decimal(scale=0),
}
df = pl.DataFrame(
data={
"a": [123.75],
"b": [60.0],
"c": [9],
},
schema=schema,
)
buf = io.BytesIO()
df.write_csv(
buf,
separator=separator,
quote_style=quote_style,
decimal_comma=decimal_comma,
include_header=False,
)
buf.seek(0)
assert buf.read() == expected
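    # Unquoted comma decimals combined with a comma separator are ambiguous
    # on read, so that combination is excluded from the round-trip check.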
round_trip = not (quote_style == "never" and decimal_comma and separator == ",")
if round_trip:
print("BOO")
buf.seek(0)
df.write_csv(
buf,
separator=separator,
quote_style=quote_style,
decimal_comma=decimal_comma,
include_header=True,
)
buf.seek(0)
out = pl.read_csv(
buf, decimal_comma=decimal_comma, separator=separator, schema=df.schema
)
assert_frame_equal(df, out)
buf.seek(0)
df.lazy().sink_csv(
buf,
separator=separator,
quote_style=quote_style,
decimal_comma=decimal_comma,
include_header=True,
)
buf.seek(0)
out = pl.scan_csv(
buf, decimal_comma=decimal_comma, separator=separator, schema=df.schema
).collect()
assert_frame_equal(df, out)
def test_write_csv_large_number_autoformat_decimal_comma(chunk_override: None) -> None:
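    # Floats too large for positional notation are auto-formatted in
    # scientific notation; with decimal_comma enabled both fields are
    # written quoted.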
df = pl.DataFrame(
{
"a": [12345678901234567890.123457890],
"b": [1_000_000_000_000_000_000_000_000.0],
}
)
buf = io.BytesIO()
df.write_csv(
buf,
decimal_comma=True,
include_header=False,
)
buf.seek(0)
expected = b'"1,2345678901234567e+19","1e+24"\n'
assert buf.read() == expected
def test_stop_split_fields_simd_23651(chunk_override: None) -> None:
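    # Regression test for the SIMD field splitter: rows carry fewer fields
    # than the 27-column schema and the file ends mid-record, so the trailing
    # columns must be padded with nulls.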
csv = """C,NEMP.WORLD,DAILY,AEMO,PUBLIC,2025/05/29,04:05:04,0000000465336084,,0000000465336084
I,DISPATCH,CASESOLUTION,1,SETTLEMENTDATE,RUNNO,INTERVENTION,CASESUBTYPE,SOLUTIONSTATUS,SPDVERSION,NONPHYSICALLOSSES,TOTALOBJECTIVE,TOTALAREAGENVIOLATION,TOTALINTERCONNECTORVIOLATION,TOTALGENERICVIOLATION,TOTALRAMPRATEVIOLATION,TOTALUNITMWCAPACITYVIOLATION,TOTAL5MINVIOLATION,TOTALREGVIOLATION,TOTAL6SECVIOLATION,TOTAL60SECVIOLATION,TOTALASPROFILEVIOLATION,TOTALFASTSTARTVIOLATION,TOTALENERGYOFFERVIOLATION,LASTCHANGED
D,DISPATCH,CASESOLUTION,1,"2025/05/28 04:05:00",1,0,,0,,0,-60421745.3380,0,0,0,0,0,,,,,0,0,0,"2025/05/28 04:00:04"
D,DISPATCH,CASESOLUTION,1,"2025/05/28 04:10:00",1,0,,0,,0,-60871813.2780,0,0,0,0,0,,,,,0,0,0,"2025/05/28 04:05:04"
D,DISPATCH,CASESOLUTION,1,"2025/05/28 04:15:00",1,0,,1,,0,-61228162.2270,0,0,0,0,0,,,,,0,0,0,"2025/05/28 04:10:03"
D,DISPATCH,CASESOLUTION,1,"2025/05/28 04:20:00",1,0,,1,,0,-60901926.5760,0,0,0,0,0,,,,,0,0,0,"2025/05/28 04:15:03"
D,DISPATCH,CASESOLUTION,1,"""
    schema = {f"column_{i + 1}": pl.String for i in range(27)}
    buf = io.StringIO(csv)
df = pl.read_csv(buf, truncate_ragged_lines=True, has_header=False, schema=schema)
assert df.shape == (7, 27)
assert df["column_26"].null_count() == 7
def test_read_csv_decimal_header_only_200008(chunk_override: None) -> None:
csv = "a,b"
df = pl.read_csv(csv.encode(), schema={"a": pl.Decimal(scale=2), "b": pl.String})
assert df.dtypes == [pl.Decimal(scale=2), pl.String]
@pytest.mark.parametrize(
"dt",
[
pl.Enum(["a"]),
pl.Categorical(),
],
)
def test_write_csv_categorical_23939(chunk_override: None, dt: pl.DataType) -> None:
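    # The row count exceeds thread_pool_size() * 1024, large enough that the
    # write is expected to be split across threads/chunks, so categorical
    # values must serialize correctly across chunk boundaries.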
n_rows = pl.thread_pool_size() * 1024 + 1
df = pl.DataFrame(
{
"b": pl.Series(["a"] * n_rows, dtype=dt),
}
)
expected = "b\n" + "a\n" * n_rows
assert df.write_csv() == expected
@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
@pytest.mark.parametrize(
"csv_str", [b"A,B\n1,x\n2,y\n3,z", b"A,B\n1,x\n2,y\n3,z\n", b"\n\n\n\n2,u"]
)
def test_skip_more_lines_than_empty_25852(
chunk_override: None, read_fn: str, csv_str: bytes
) -> None:
with pytest.raises(pl.exceptions.NoDataError):
getattr(pl, read_fn)(csv_str, skip_lines=5).lazy().collect()
@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
def test_skip_more_lines_no_raise_25852(chunk_override: None, read_fn: str) -> None:
csv_str = b"A,B\n1,x\n2,y"
result = (
getattr(pl, read_fn)(
csv_str,
skip_lines=100,
schema={"col1": pl.String, "col2": pl.String},
has_header=False,
raise_if_empty=False,
)
.lazy()
.collect()
)
expected = pl.DataFrame(schema={"col1": pl.String, "col2": pl.String})
assert_frame_equal(result, expected)
@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
def test_skip_crlf(chunk_override: None, read_fn: str) -> None:
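    # Leading blank CRLF lines do not count towards skip_rows, so skipping a
    # single row drops only the junk line before the header.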
csv_str = b"\r\n\r\nline before <3a>\r\nA,B\r\n1,2"
df = getattr(pl, read_fn)(csv_str, skip_rows=1).lazy().collect()
expected = pl.DataFrame(
[
pl.Series("A", [1], pl.Int64),
pl.Series("B", [2], pl.Int64),
]
)
assert_frame_equal(df, expected)
@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
def test_only_empty_quote_string(chunk_override: None, read_fn: str) -> None:
csv_str = b'""'
df = getattr(pl, read_fn)(csv_str).lazy().collect()
expected = pl.DataFrame({"": []}, schema={"": pl.String})
assert_frame_equal(df, expected)
@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
def test_only_header_with_newline(chunk_override: None, read_fn: str) -> None:
csv_str = b"xx\n"
df = getattr(pl, read_fn)(csv_str).lazy().collect()
expected = pl.DataFrame([pl.Series("xx", [], pl.String)])
assert_frame_equal(df, expected)
@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
def test_single_char_input_25908(chunk_override: None, read_fn: str) -> None:
csv_str = b"x"
df = getattr(pl, read_fn)(csv_str).lazy().collect()
expected = pl.DataFrame([pl.Series("x", [], pl.String)])
assert_frame_equal(df, expected)
@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
def test_csv_skip_rows_with_interleaved_comments_25840(
chunk_override: None, read_fn: str
) -> None:
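    # Comment lines are excluded from the skip_rows count: skipping two rows
    # drops "a,b" and ",", leaving "u" as the header.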
csv_data = b"// x//\na,b\n//a, b\n,\nu\n2"
result = (
getattr(pl, read_fn)(csv_data, comment_prefix="//", skip_rows=2)
.lazy()
.collect()
)
expected = pl.DataFrame([pl.Series("u", [2], dtype=pl.Int64)])
assert_frame_equal(result, expected)
@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
def test_csv_comment_after_header_25841(chunk_override: None, read_fn: str) -> None:
csv_data = b"RowA,RowB,RowC\n// Comment line\na,b,c"
result = getattr(pl, read_fn)(csv_data, comment_prefix="//").lazy().collect()
expected = pl.DataFrame({"RowA": ["a"], "RowB": ["b"], "RowC": ["c"]})
assert_frame_equal(result, expected)
csv_data2 = b"A,B\n# Comment 1\n# Comment 2\n1,2\n3,4"
result2 = getattr(pl, read_fn)(csv_data2, comment_prefix="#").lazy().collect()
expected2 = pl.DataFrame({"A": [1, 3], "B": [2, 4]})
assert_frame_equal(result2, expected2)
@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
def test_empty_csv(chunk_override: None, read_fn: str) -> None:
csv_str = b""
df = getattr(pl, read_fn)(csv_str, raise_if_empty=False).lazy().collect()
expected = pl.DataFrame([])
assert_frame_equal(df, expected)
@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
def test_empty_csv_raise(chunk_override: None, read_fn: str) -> None:
csv_str = b""
with pytest.raises(pl.exceptions.NoDataError):
getattr(pl, read_fn)(csv_str, raise_if_empty=True).lazy().collect()
@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
def test_skip_lines_and_rows_raise(chunk_override: None, read_fn: str) -> None:
csv_str = b"A,1,2,3"
with pytest.raises(pl.exceptions.InvalidOperationError):
getattr(pl, read_fn)(csv_str, skip_lines=1, skip_rows=2).lazy().collect()
@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
@pytest.mark.parametrize(
("csv_str", "expected"),
[
(b"", []),
(b"A", [pl.Series("A", [], pl.String)]),
(b"A\n1\n2\n3", [pl.Series("A", [1, 2, 3])]),
],
)
def test_utf8_bom(
chunk_override: None, read_fn: str, csv_str: bytes, expected: list[pl.Series]
) -> None:
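    # A leading UTF-8 BOM is stripped before parsing, whether the input is
    # empty, header-only, or contains data.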
csv_str = b"\xef\xbb\xbf" + csv_str
df = getattr(pl, read_fn)(csv_str, raise_if_empty=False).lazy().collect()
assert_frame_equal(df, pl.DataFrame(expected))
@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
def test_invalid_utf8_bom(chunk_override: None, read_fn: str) -> None:
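    # The leading bytes are valid UTF-8 but not the BOM (\xef\xbb\xbf), so
    # they are kept and decoded as part of the first column name.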
csv_str = b"\xef\xaa\xbdA\n3"
df = getattr(pl, read_fn)(csv_str, raise_if_empty=False).lazy().collect()
expected = [pl.Series("諾A", [3])]
assert_frame_equal(df, pl.DataFrame(expected))
def test_invalid_utf8_in_schema(chunk_override: None) -> None:
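    # Invalid UTF-8 in the header is replaced with U+FFFD during schema
    # inference, while invalid bytes in the data cause collect() to fail.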
csv_str = b"\xef\xff\xbdA,B\n3,\xe0\x80\x80\n-6,x3"
lf = pl.scan_csv(csv_str)
assert lf.collect_schema() == {"���A": pl.Int64, "B": pl.String}
with pytest.raises(pl.exceptions.ComputeError):
lf.collect()
@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
def test_provided_schema_mismatch_raise(chunk_override: None, read_fn: str) -> None:
csv_str = b"A,B\n1,2"
schema = {"A": pl.Int64}
with pytest.raises(pl.exceptions.SchemaError):
getattr(pl, read_fn)(csv_str, schema=schema).lazy().collect()
@pytest.mark.parametrize("read_fn", ["read_csv", "scan_csv"])
def test_provided_schema_mismatch_truncate(chunk_override: None, read_fn: str) -> None:
csv_str = b"A,B\n1,2"
schema = {"A": pl.Int64}
df = (
getattr(pl, read_fn)(csv_str, schema=schema, truncate_ragged_lines=True)
.lazy()
.collect()
)
expected = [pl.Series("A", [1])]
assert_frame_equal(df, pl.DataFrame(expected))
def test_read_batch_csv_deprecations_26479(foods_file_path: Path) -> None:
with pytest.warns(DeprecationWarning, match=r"`read_csv_batched` is deprecated"):
pl.read_csv_batched(foods_file_path)