Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/csv/read/reader.rs
6939 views
1
use std::fs::File;
2
use std::path::PathBuf;
3
4
use polars_core::prelude::*;
5
6
use super::options::CsvReadOptions;
7
use super::read_impl::CoreReader;
8
use super::read_impl::batched::to_batched_owned;
9
use super::{BatchedCsvReader, OwnedBatchedCsvReader};
10
use crate::mmap::MmapBytesReader;
11
use crate::path_utils::resolve_homedir;
12
use crate::predicates::PhysicalIoExpr;
13
use crate::shared::SerReader;
14
use crate::utils::get_reader_bytes;
15
16
/// Create a new DataFrame by reading a csv file.
17
///
18
/// # Example
19
///
20
/// ```
21
/// use polars_core::prelude::*;
22
/// use polars_io::prelude::*;
23
/// use std::fs::File;
24
///
25
/// fn example() -> PolarsResult<DataFrame> {
26
/// CsvReadOptions::default()
27
/// .with_has_header(true)
28
/// .try_into_reader_with_file_path(Some("iris.csv".into()))?
29
/// .finish()
30
/// }
31
/// ```
32
#[must_use]
33
pub struct CsvReader<R>
34
where
35
R: MmapBytesReader,
36
{
37
/// File or Stream object.
38
reader: R,
39
/// Options for the CSV reader.
40
options: CsvReadOptions,
41
predicate: Option<Arc<dyn PhysicalIoExpr>>,
42
}
43
44
impl<R> CsvReader<R>
45
where
46
R: MmapBytesReader,
47
{
48
pub fn _with_predicate(mut self, predicate: Option<Arc<dyn PhysicalIoExpr>>) -> Self {
49
self.predicate = predicate;
50
self
51
}
52
53
// TODO: Investigate if we can remove this
54
pub(crate) fn with_schema(mut self, schema: SchemaRef) -> Self {
55
self.options.schema = Some(schema);
56
self
57
}
58
}
59
60
impl CsvReadOptions {
61
/// Creates a CSV reader using a file path.
62
///
63
/// # Panics
64
/// If both self.path and the path parameter are non-null. Only one of them is
65
/// to be non-null.
66
pub fn try_into_reader_with_file_path(
67
mut self,
68
path: Option<PathBuf>,
69
) -> PolarsResult<CsvReader<File>> {
70
if self.path.is_some() {
71
assert!(
72
path.is_none(),
73
"impl error: only 1 of self.path or the path parameter is to be non-null"
74
);
75
} else {
76
self.path = path;
77
};
78
79
assert!(
80
self.path.is_some(),
81
"impl error: either one of self.path or the path parameter is to be non-null"
82
);
83
84
let path = resolve_homedir(self.path.as_ref().unwrap());
85
let reader = polars_utils::open_file(&path)?;
86
let options = self;
87
88
Ok(CsvReader {
89
reader,
90
options,
91
predicate: None,
92
})
93
}
94
95
/// Creates a CSV reader using a file handle.
96
pub fn into_reader_with_file_handle<R: MmapBytesReader>(self, reader: R) -> CsvReader<R> {
97
let options = self;
98
99
CsvReader {
100
reader,
101
options,
102
predicate: Default::default(),
103
}
104
}
105
}
106
107
impl<R: MmapBytesReader> CsvReader<R> {
108
fn core_reader(&mut self) -> PolarsResult<CoreReader<'_>> {
109
let reader_bytes = get_reader_bytes(&mut self.reader)?;
110
111
let parse_options = self.options.get_parse_options();
112
113
CoreReader::new(
114
reader_bytes,
115
parse_options,
116
self.options.n_rows,
117
self.options.skip_rows,
118
self.options.skip_lines,
119
self.options.projection.clone().map(|x| x.as_ref().clone()),
120
self.options.infer_schema_length,
121
self.options.has_header,
122
self.options.ignore_errors,
123
self.options.schema.clone(),
124
self.options.columns.clone(),
125
self.options.n_threads,
126
self.options.schema_overwrite.clone(),
127
self.options.dtype_overwrite.clone(),
128
self.options.chunk_size,
129
self.predicate.clone(),
130
self.options.fields_to_cast.clone(),
131
self.options.skip_rows_after_header,
132
self.options.row_index.clone(),
133
self.options.raise_if_empty,
134
)
135
}
136
137
pub fn batched_borrowed(&mut self) -> PolarsResult<BatchedCsvReader<'_>> {
138
let csv_reader = self.core_reader()?;
139
csv_reader.batched()
140
}
141
}
142
143
impl CsvReader<Box<dyn MmapBytesReader>> {
144
pub fn batched(mut self, schema: Option<SchemaRef>) -> PolarsResult<OwnedBatchedCsvReader> {
145
if let Some(schema) = schema {
146
self = self.with_schema(schema);
147
}
148
149
to_batched_owned(self)
150
}
151
}
152
153
impl<R> SerReader<R> for CsvReader<R>
154
where
155
R: MmapBytesReader,
156
{
157
/// Create a new CsvReader from a file/stream using default read options. To
158
/// use non-default read options, first construct [CsvReadOptions] and then use
159
/// any of the `(try)_into_` methods.
160
fn new(reader: R) -> Self {
161
CsvReader {
162
reader,
163
options: Default::default(),
164
predicate: None,
165
}
166
}
167
168
/// Read the file and create the DataFrame.
169
fn finish(mut self) -> PolarsResult<DataFrame> {
170
let rechunk = self.options.rechunk;
171
let low_memory = self.options.low_memory;
172
173
let csv_reader = self.core_reader()?;
174
let mut df = csv_reader.finish()?;
175
176
// Important that this rechunk is never done in parallel.
177
// As that leads to great memory overhead.
178
if rechunk && df.first_col_n_chunks() > 1 {
179
if low_memory {
180
df.as_single_chunk();
181
} else {
182
df.as_single_chunk_par();
183
}
184
}
185
186
Ok(df)
187
}
188
}
189
190
impl<R: MmapBytesReader> CsvReader<R> {
191
/// Sets custom CSV read options.
192
pub fn with_options(mut self, options: CsvReadOptions) -> Self {
193
self.options = options;
194
self
195
}
196
}
197
198
/// Splits datatypes that cannot be natively read into a `fields_to_cast` for
199
/// post-read casting.
200
pub fn prepare_csv_schema(
201
schema: &mut SchemaRef,
202
fields_to_cast: &mut Vec<Field>,
203
) -> PolarsResult<()> {
204
// This branch we check if there are dtypes we cannot parse.
205
// We only support a few dtypes in the parser and later cast to the required dtype.
206
let mut changed = false;
207
208
let new_schema = schema
209
.iter_fields()
210
.map(|mut fld| {
211
use DataType::*;
212
213
let mut matched = true;
214
215
let out = match fld.dtype() {
216
Time => {
217
fields_to_cast.push(fld.clone());
218
fld.coerce(String);
219
PolarsResult::Ok(fld)
220
},
221
#[cfg(feature = "dtype-decimal")]
222
Decimal(precision, scale) => match (precision, scale) {
223
(_, Some(_)) => {
224
fields_to_cast.push(fld.clone());
225
fld.coerce(String);
226
PolarsResult::Ok(fld)
227
},
228
_ => Err(PolarsError::ComputeError(
229
"'scale' must be set when reading csv column as Decimal".into(),
230
)),
231
},
232
_ => {
233
matched = false;
234
PolarsResult::Ok(fld)
235
},
236
}?;
237
238
changed |= matched;
239
240
PolarsResult::Ok(out)
241
})
242
.collect::<PolarsResult<Schema>>()?;
243
244
if changed {
245
*schema = Arc::new(new_schema);
246
}
247
248
Ok(())
249
}
250
251