Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-python/src/batched_csv.rs
7884 views
1
use std::path::PathBuf;
2
use std::sync::Mutex;
3
4
use polars::io::RowIndex;
5
use polars::io::csv::read::OwnedBatchedCsvReader;
6
use polars::io::mmap::MmapBytesReader;
7
use polars::prelude::*;
8
use polars_utils::open_file;
9
use pyo3::prelude::*;
10
use pyo3::pybacked::PyBackedStr;
11
12
use crate::error::PyPolarsErr;
13
use crate::utils::EnterPolarsExt;
14
use crate::{PyDataFrame, Wrap};
15
16
#[pyclass(frozen)]
17
#[repr(transparent)]
18
pub struct PyBatchedCsv {
19
reader: Mutex<OwnedBatchedCsvReader>,
20
}
21
22
#[pymethods]
23
#[allow(clippy::wrong_self_convention, clippy::should_implement_trait)]
24
impl PyBatchedCsv {
25
#[staticmethod]
26
#[pyo3(signature = (
27
infer_schema_length, chunk_size, has_header, ignore_errors, n_rows, skip_rows, skip_lines,
28
projection, separator, rechunk, columns, encoding, n_threads, path, schema_overrides,
29
overwrite_dtype_slice, low_memory, comment_prefix, quote_char, null_values,
30
missing_utf8_is_empty_string, try_parse_dates, skip_rows_after_header, row_index,
31
eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma)
32
)]
33
fn new(
34
infer_schema_length: Option<usize>,
35
chunk_size: usize,
36
has_header: bool,
37
ignore_errors: bool,
38
n_rows: Option<usize>,
39
skip_rows: usize,
40
skip_lines: usize,
41
projection: Option<Vec<usize>>,
42
separator: &str,
43
rechunk: bool,
44
columns: Option<Vec<String>>,
45
encoding: Wrap<CsvEncoding>,
46
n_threads: Option<usize>,
47
path: PathBuf,
48
schema_overrides: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
49
overwrite_dtype_slice: Option<Vec<Wrap<DataType>>>,
50
low_memory: bool,
51
comment_prefix: Option<&str>,
52
quote_char: Option<&str>,
53
null_values: Option<Wrap<NullValues>>,
54
missing_utf8_is_empty_string: bool,
55
try_parse_dates: bool,
56
skip_rows_after_header: usize,
57
row_index: Option<(String, IdxSize)>,
58
eol_char: &str,
59
raise_if_empty: bool,
60
truncate_ragged_lines: bool,
61
decimal_comma: bool,
62
) -> PyResult<PyBatchedCsv> {
63
let null_values = null_values.map(|w| w.0);
64
let eol_char = eol_char.as_bytes()[0];
65
let row_index = row_index.map(|(name, offset)| RowIndex {
66
name: name.into(),
67
offset,
68
});
69
let quote_char = if let Some(s) = quote_char {
70
if s.is_empty() {
71
None
72
} else {
73
Some(s.as_bytes()[0])
74
}
75
} else {
76
None
77
};
78
79
let schema_overrides = schema_overrides.map(|overwrite_dtype| {
80
overwrite_dtype
81
.iter()
82
.map(|(name, dtype)| {
83
let dtype = dtype.0.clone();
84
Field::new((&**name).into(), dtype)
85
})
86
.collect::<Schema>()
87
});
88
89
let overwrite_dtype_slice = overwrite_dtype_slice.map(|overwrite_dtype| {
90
overwrite_dtype
91
.iter()
92
.map(|dt| dt.0.clone())
93
.collect::<Vec<_>>()
94
});
95
96
let file = open_file(&path).map_err(PyPolarsErr::from)?;
97
let reader = Box::new(file) as Box<dyn MmapBytesReader>;
98
let reader = CsvReadOptions::default()
99
.with_infer_schema_length(infer_schema_length)
100
.with_has_header(has_header)
101
.with_n_rows(n_rows)
102
.with_skip_rows(skip_rows)
103
.with_skip_lines(skip_lines)
104
.with_ignore_errors(ignore_errors)
105
.with_projection(projection.map(Arc::new))
106
.with_rechunk(rechunk)
107
.with_chunk_size(chunk_size)
108
.with_columns(columns.map(|x| x.into_iter().map(PlSmallStr::from_string).collect()))
109
.with_n_threads(n_threads)
110
.with_dtype_overwrite(overwrite_dtype_slice.map(Arc::new))
111
.with_low_memory(low_memory)
112
.with_schema_overwrite(schema_overrides.map(Arc::new))
113
.with_skip_rows_after_header(skip_rows_after_header)
114
.with_row_index(row_index)
115
.with_raise_if_empty(raise_if_empty)
116
.with_parse_options(
117
CsvParseOptions::default()
118
.with_separator(separator.as_bytes()[0])
119
.with_encoding(encoding.0)
120
.with_missing_is_null(!missing_utf8_is_empty_string)
121
.with_comment_prefix(comment_prefix)
122
.with_null_values(null_values)
123
.with_try_parse_dates(try_parse_dates)
124
.with_quote_char(quote_char)
125
.with_eol_char(eol_char)
126
.with_truncate_ragged_lines(truncate_ragged_lines)
127
.with_decimal_comma(decimal_comma),
128
)
129
.into_reader_with_file_handle(reader);
130
131
let reader = reader.batched(None).map_err(PyPolarsErr::from)?;
132
133
Ok(PyBatchedCsv {
134
reader: Mutex::new(reader),
135
})
136
}
137
138
fn next_batches(&self, py: Python<'_>, n: usize) -> PyResult<Option<Vec<PyDataFrame>>> {
139
let reader = &self.reader;
140
let batches = py.enter_polars(move || reader.lock().unwrap().next_batches(n))?;
141
Ok(batches.map(|b| b.into_iter().map(PyDataFrame::from).collect()))
142
}
143
}
144
145