Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/csv/read/reader.rs
8416 views
1
use std::fs::File;
2
use std::path::PathBuf;
3
4
use polars_core::prelude::*;
5
6
use super::options::CsvReadOptions;
7
use super::read_impl::CoreReader;
8
use crate::mmap::MmapBytesReader;
9
use crate::path_utils::resolve_homedir;
10
use crate::predicates::PhysicalIoExpr;
11
use crate::shared::SerReader;
12
use crate::utils::get_reader_bytes;
13
14
/// Create a new DataFrame by reading a csv file.
15
///
16
/// # Example
17
///
18
/// ```
19
/// use polars_core::prelude::*;
20
/// use polars_io::prelude::*;
21
/// use std::fs::File;
22
///
23
/// fn example() -> PolarsResult<DataFrame> {
24
/// CsvReadOptions::default()
25
/// .with_has_header(true)
26
/// .try_into_reader_with_file_path(Some("iris.csv".into()))?
27
/// .finish()
28
/// }
29
/// ```
30
#[must_use]
31
pub struct CsvReader<R>
32
where
33
R: MmapBytesReader,
34
{
35
/// File or Stream object.
36
reader: R,
37
/// Options for the CSV reader.
38
options: CsvReadOptions,
39
predicate: Option<Arc<dyn PhysicalIoExpr>>,
40
}
41
42
impl<R> CsvReader<R>
43
where
44
R: MmapBytesReader,
45
{
46
pub fn _with_predicate(mut self, predicate: Option<Arc<dyn PhysicalIoExpr>>) -> Self {
47
self.predicate = predicate;
48
self
49
}
50
}
51
52
impl CsvReadOptions {
53
/// Creates a CSV reader using a file path.
54
///
55
/// # Panics
56
/// If both self.path and the path parameter are non-null. Only one of them is
57
/// to be non-null.
58
pub fn try_into_reader_with_file_path(
59
mut self,
60
path: Option<PathBuf>,
61
) -> PolarsResult<CsvReader<File>> {
62
if self.path.is_some() {
63
assert!(
64
path.is_none(),
65
"impl error: only 1 of self.path or the path parameter is to be non-null"
66
);
67
} else {
68
self.path = path;
69
};
70
71
assert!(
72
self.path.is_some(),
73
"impl error: either one of self.path or the path parameter is to be non-null"
74
);
75
76
let path = resolve_homedir(self.path.as_ref().unwrap());
77
let reader = polars_utils::open_file(&path)?;
78
let options = self;
79
80
Ok(CsvReader {
81
reader,
82
options,
83
predicate: None,
84
})
85
}
86
87
/// Creates a CSV reader using a file handle.
88
pub fn into_reader_with_file_handle<R: MmapBytesReader>(self, reader: R) -> CsvReader<R> {
89
let options = self;
90
91
CsvReader {
92
reader,
93
options,
94
predicate: Default::default(),
95
}
96
}
97
}
98
99
impl<R: MmapBytesReader> CsvReader<R> {
100
fn core_reader(&mut self) -> PolarsResult<CoreReader<'_>> {
101
let reader_bytes = get_reader_bytes(&mut self.reader)?;
102
103
let parse_options = self.options.get_parse_options();
104
105
CoreReader::new(
106
reader_bytes,
107
parse_options,
108
self.options.n_rows,
109
self.options.skip_rows,
110
self.options.skip_lines,
111
self.options.projection.clone().map(|x| x.as_ref().clone()),
112
self.options.infer_schema_length,
113
self.options.has_header,
114
self.options.ignore_errors,
115
self.options.schema.clone(),
116
self.options.columns.clone(),
117
self.options.n_threads,
118
self.options.schema_overwrite.clone(),
119
self.options.dtype_overwrite.clone(),
120
self.predicate.clone(),
121
self.options.fields_to_cast.clone(),
122
self.options.skip_rows_after_header,
123
self.options.row_index.clone(),
124
self.options.raise_if_empty,
125
)
126
}
127
}
128
129
impl<R> SerReader<R> for CsvReader<R>
130
where
131
R: MmapBytesReader,
132
{
133
/// Create a new CsvReader from a file/stream using default read options. To
134
/// use non-default read options, first construct [CsvReadOptions] and then use
135
/// any of the `(try)_into_` methods.
136
fn new(reader: R) -> Self {
137
CsvReader {
138
reader,
139
options: Default::default(),
140
predicate: None,
141
}
142
}
143
144
/// Read the file and create the DataFrame.
145
fn finish(mut self) -> PolarsResult<DataFrame> {
146
let rechunk = self.options.rechunk;
147
let low_memory = self.options.low_memory;
148
149
let csv_reader = self.core_reader()?;
150
let mut df = csv_reader.finish()?;
151
152
// Important that this rechunk is never done in parallel.
153
// As that leads to great memory overhead.
154
if rechunk && df.first_col_n_chunks() > 1 {
155
if low_memory {
156
df.rechunk_mut();
157
} else {
158
df.rechunk_mut_par();
159
}
160
}
161
162
Ok(df)
163
}
164
}
165
166
impl<R: MmapBytesReader> CsvReader<R> {
167
/// Sets custom CSV read options.
168
pub fn with_options(mut self, options: CsvReadOptions) -> Self {
169
self.options = options;
170
self
171
}
172
}
173
174
/// Splits datatypes that cannot be natively read into a `fields_to_cast` for
175
/// post-read casting.
176
pub fn prepare_csv_schema(
177
schema: &mut SchemaRef,
178
fields_to_cast: &mut Vec<Field>,
179
) -> PolarsResult<()> {
180
// This branch we check if there are dtypes we cannot parse.
181
// We only support a few dtypes in the parser and later cast to the required dtype.
182
let mut changed = false;
183
184
let new_schema = schema
185
.iter_fields()
186
.map(|mut fld| {
187
use DataType::*;
188
189
let mut matched = true;
190
191
let out = match fld.dtype() {
192
Time => {
193
fields_to_cast.push(fld.clone());
194
fld.coerce(String);
195
PolarsResult::Ok(fld)
196
},
197
_ => {
198
matched = false;
199
PolarsResult::Ok(fld)
200
},
201
}?;
202
203
changed |= matched;
204
205
PolarsResult::Ok(out)
206
})
207
.collect::<PolarsResult<Schema>>()?;
208
209
if changed {
210
*schema = Arc::new(new_schema);
211
}
212
213
Ok(())
214
}
215
216