Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-lazy/src/scan/csv.rs
6939 views
1
use polars_core::prelude::*;
2
use polars_io::cloud::CloudOptions;
3
use polars_io::csv::read::{
4
CommentPrefix, CsvEncoding, CsvParseOptions, CsvReadOptions, NullValues, infer_file_schema,
5
};
6
use polars_io::path_utils::expand_paths;
7
use polars_io::utils::compression::maybe_decompress_bytes;
8
use polars_io::utils::get_reader_bytes;
9
use polars_io::{HiveOptions, RowIndex};
10
use polars_utils::mmap::MemSlice;
11
use polars_utils::plpath::PlPath;
12
use polars_utils::slice_enum::Slice;
13
14
use crate::prelude::*;
15
16
/// Builder that collects the settings for a lazily evaluated CSV scan.
///
/// Configure it through the `with_*` methods and turn it into a [`LazyFrame`]
/// with `finish`.
#[cfg(feature = "csv")]
#[derive(Clone)]
pub struct LazyCsvReader {
    /// The inputs to scan (paths, open files or in-memory buffers).
    sources: ScanSources,
    /// Whether paths are expanded via globbing rules.
    glob: bool,
    /// Whether the `DataFrame` is cached after reading.
    cache: bool,
    /// CSV reading and parsing options.
    read_options: CsvReadOptions,
    /// Optional cloud options, used when listing files.
    cloud_options: Option<CloudOptions>,
    /// Optional setting forwarded to the scan arguments as `include_file_paths`.
    include_file_paths: Option<PlSmallStr>,
}
26
27
#[cfg(feature = "csv")]
impl LazyCsvReader {
    /// Re-export to shorten code.
    ///
    /// Applies `map_func` to the parse options held by the read options and
    /// stores the result.
    pub fn map_parse_options<F: Fn(CsvParseOptions) -> CsvParseOptions>(
        mut self,
        map_func: F,
    ) -> Self {
        self.read_options = self.read_options.map_parse_options(map_func);
        self
    }

    /// Create a reader over multiple paths.
    pub fn new_paths(paths: Arc<[PlPath]>) -> Self {
        Self::new_with_sources(ScanSources::Paths(paths))
    }

    /// Create a reader over the given sources.
    ///
    /// Globbing and caching are enabled; every other option starts at its default.
    pub fn new_with_sources(sources: ScanSources) -> Self {
        LazyCsvReader {
            sources,
            glob: true,
            cache: true,
            read_options: Default::default(),
            cloud_options: Default::default(),
            include_file_paths: None,
        }
    }

    /// Create a reader over a single path.
    pub fn new(path: PlPath) -> Self {
        Self::new_with_sources(ScanSources::Paths([path].into()))
    }

    /// Skip this number of rows after the header location.
    #[must_use]
    pub fn with_skip_rows_after_header(mut self, offset: usize) -> Self {
        self.read_options.skip_rows_after_header = offset;
        self
    }

    /// Add a row index column.
    #[must_use]
    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.read_options.row_index = row_index;
        self
    }

    /// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
    /// be guaranteed.
    #[must_use]
    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
        self.read_options.n_rows = num_rows;
        self
    }

    /// Set the number of rows to use when inferring the csv schema.
    /// The default is 100 rows.
    /// Setting to [None] will do a full table scan, which is very slow.
    #[must_use]
    pub fn with_infer_schema_length(mut self, num_rows: Option<usize>) -> Self {
        self.read_options.infer_schema_length = num_rows;
        self
    }

    /// Continue with next batch when a ParserError is encountered.
    #[must_use]
    pub fn with_ignore_errors(mut self, ignore: bool) -> Self {
        self.read_options.ignore_errors = ignore;
        self
    }

    /// Set the CSV file's schema
    #[must_use]
    pub fn with_schema(mut self, schema: Option<SchemaRef>) -> Self {
        self.read_options.schema = schema;
        self
    }

    /// Skip the first `n` rows during parsing. The header will be parsed at row `n`.
    /// Note that by row we mean valid CSV, encoding and comments are respected.
    #[must_use]
    pub fn with_skip_rows(mut self, skip_rows: usize) -> Self {
        self.read_options.skip_rows = skip_rows;
        self
    }

    /// Skip the first `n` lines during parsing. The header will be parsed at line `n`.
    /// We don't respect CSV escaping when skipping lines.
    #[must_use]
    pub fn with_skip_lines(mut self, skip_lines: usize) -> Self {
        self.read_options.skip_lines = skip_lines;
        self
    }

    /// Overwrite the schema with the dtypes in this given Schema. The given schema may be a subset
    /// of the total schema.
    #[must_use]
    pub fn with_dtype_overwrite(mut self, schema: Option<SchemaRef>) -> Self {
        self.read_options.schema_overwrite = schema;
        self
    }

    /// Set whether the CSV file has headers
    #[must_use]
    pub fn with_has_header(mut self, has_header: bool) -> Self {
        self.read_options.has_header = has_header;
        self
    }

    /// Sets the chunk size used by the parser. This influences performance.
    /// This can be used as a way to reduce memory usage during the parsing at the cost of performance.
    #[must_use]
    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
        self.read_options.chunk_size = chunk_size;
        self
    }

    /// Set the CSV file's column separator as a byte character
    #[must_use]
    pub fn with_separator(self, separator: u8) -> Self {
        self.map_parse_options(|opts| opts.with_separator(separator))
    }

    /// Set the comment prefix for this instance. Lines starting with this prefix will be ignored.
    ///
    /// A prefix that is exactly one byte long is stored as [`CommentPrefix::Single`];
    /// anything else is stored as [`CommentPrefix::Multi`].
    #[must_use]
    pub fn with_comment_prefix(self, comment_prefix: Option<PlSmallStr>) -> Self {
        self.map_parse_options(|opts| {
            // The mapper must be `Fn`, so the captured prefix is cloned on each call.
            opts.with_comment_prefix(comment_prefix.clone().map(|s| match s.as_bytes() {
                // A one-byte UTF-8 string is always a single ASCII character.
                &[byte] => CommentPrefix::Single(byte),
                _ => CommentPrefix::Multi(s),
            }))
        })
    }

    /// Set the `char` used as quote char. The default is `b'"'`. If set to [`None`] quoting is disabled.
    #[must_use]
    pub fn with_quote_char(self, quote_char: Option<u8>) -> Self {
        self.map_parse_options(|opts| opts.with_quote_char(quote_char))
    }

    /// Set the `char` used as end of line. The default is `b'\n'`.
    #[must_use]
    pub fn with_eol_char(self, eol_char: u8) -> Self {
        self.map_parse_options(|opts| opts.with_eol_char(eol_char))
    }

    /// Set values that will be interpreted as missing/ null.
    #[must_use]
    pub fn with_null_values(self, null_values: Option<NullValues>) -> Self {
        // The mapper must be `Fn`, so the captured values are cloned on each call.
        self.map_parse_options(|opts| opts.with_null_values(null_values.clone()))
    }

    /// Treat missing fields as null.
    #[must_use]
    pub fn with_missing_is_null(self, missing_is_null: bool) -> Self {
        self.map_parse_options(|opts| opts.with_missing_is_null(missing_is_null))
    }

    /// Cache the DataFrame after reading.
    #[must_use]
    pub fn with_cache(mut self, cache: bool) -> Self {
        self.cache = cache;
        self
    }

    /// Reduce memory usage at the expense of performance
    #[must_use]
    pub fn with_low_memory(mut self, low_memory: bool) -> Self {
        self.read_options.low_memory = low_memory;
        self
    }

    /// Set [`CsvEncoding`]
    #[must_use]
    pub fn with_encoding(self, encoding: CsvEncoding) -> Self {
        self.map_parse_options(|opts| opts.with_encoding(encoding))
    }

    /// Automatically try to parse dates/datetimes and time.
    /// If parsing fails, columns remain of dtype [`DataType::String`].
    #[cfg(feature = "temporal")]
    #[must_use]
    pub fn with_try_parse_dates(self, try_parse_dates: bool) -> Self {
        self.map_parse_options(|opts| opts.with_try_parse_dates(try_parse_dates))
    }

    /// Raise an error if CSV is empty (otherwise return an empty frame)
    #[must_use]
    pub fn with_raise_if_empty(mut self, raise_if_empty: bool) -> Self {
        self.read_options.raise_if_empty = raise_if_empty;
        self
    }

    /// Truncate lines that are longer than the schema.
    #[must_use]
    pub fn with_truncate_ragged_lines(self, truncate_ragged_lines: bool) -> Self {
        self.map_parse_options(|opts| opts.with_truncate_ragged_lines(truncate_ragged_lines))
    }

    /// Set the `decimal_comma` parse option.
    // NOTE(review): presumably this makes the parser accept `,` as the decimal
    // separator; confirm against `CsvParseOptions::with_decimal_comma`.
    #[must_use]
    pub fn with_decimal_comma(self, decimal_comma: bool) -> Self {
        self.map_parse_options(|opts| opts.with_decimal_comma(decimal_comma))
    }

    /// Expand path given via globbing rules.
    #[must_use]
    pub fn with_glob(mut self, toggle: bool) -> Self {
        self.glob = toggle;
        self
    }

    /// Set the [`CloudOptions`] for this reader.
    #[must_use]
    pub fn with_cloud_options(mut self, cloud_options: Option<CloudOptions>) -> Self {
        self.cloud_options = cloud_options;
        self
    }

    /// Modify a schema before we run the lazy scanning.
    ///
    /// The schema is inferred from the first source using the options set on the
    /// builder at the time of this call, passed through `f`, updated with any
    /// dtypes given via [`Self::with_dtype_overwrite`], and then stored as the
    /// reader's schema.
    ///
    /// Important! Run this function last in the builder, because the inference
    /// only sees options that were set before it is called.
    ///
    /// # Errors
    ///
    /// Returns an error if there are no sources, if the first path is not a local
    /// path, if the source cannot be opened, read or decompressed, if schema
    /// inference fails, or if `f` returns an error.
    pub fn with_schema_modify<F>(mut self, f: F) -> PolarsResult<Self>
    where
        F: Fn(Schema) -> PolarsResult<Schema>,
    {
        let infer_schema = |bytes: MemSlice| {
            let skip_rows = self.read_options.skip_rows;
            let skip_lines = self.read_options.skip_lines;
            let parse_options = self.read_options.get_parse_options();

            // `owned` is the scratch buffer handed to `maybe_decompress_bytes`.
            let mut owned = vec![];
            let bytes = maybe_decompress_bytes(bytes.as_ref(), &mut owned)?;

            PolarsResult::Ok(
                infer_file_schema(
                    &get_reader_bytes(&mut std::io::Cursor::new(bytes))?,
                    &parse_options,
                    self.read_options.infer_schema_length,
                    self.read_options.has_header,
                    // we set it to None and modify them after the schema is updated
                    None,
                    skip_rows,
                    skip_lines,
                    self.read_options.skip_rows_after_header,
                    self.read_options.raise_if_empty,
                )?
                .0,
            )
        };

        // Only the first source is used for inference.
        let schema = match self.sources.clone() {
            ScanSources::Paths(paths) => {
                // TODO: Path expansion should happen when converting to the IR
                // https://github.com/pola-rs/polars/issues/17634
                let paths = expand_paths(&paths[..], self.glob(), self.cloud_options())?;

                let Some(path) = paths.first() else {
                    polars_bail!(ComputeError: "no paths specified for this reader");
                };

                // The file is opened from the local file system, so report a
                // non-local path as an error instead of panicking.
                let file = match path.as_ref().as_local_path() {
                    Some(local_path) => polars_utils::open_file(local_path)?,
                    None => polars_bail!(
                        ComputeError: "`with_schema_modify` requires a local path to infer the schema"
                    ),
                };

                infer_schema(MemSlice::from_file(&file)?)?
            },
            ScanSources::Files(files) => {
                let Some(file) = files.first() else {
                    polars_bail!(ComputeError: "no files specified for this reader");
                };

                infer_schema(MemSlice::from_file(file)?)?
            },
            ScanSources::Buffers(buffers) => {
                let Some(buffer) = buffers.first() else {
                    polars_bail!(ComputeError: "no buffers specified for this reader");
                };

                infer_schema(buffer.clone())?
            },
        };

        let mut schema = f(schema)?;

        // the dtypes set may be for the new names, so update again
        if let Some(overwrite_schema) = &self.read_options.schema_overwrite {
            for (name, dtype) in overwrite_schema.iter() {
                schema.with_column(name.clone(), dtype.clone());
            }
        }

        Ok(self.with_schema(Some(Arc::new(schema))))
    }

    /// Set the `include_file_paths` value that `finish` forwards to the scan arguments.
    // NOTE(review): presumably the name of an output column holding each row's
    // source file path; confirm against `UnifiedScanArgs::include_file_paths`.
    #[must_use]
    pub fn with_include_file_paths(mut self, include_file_paths: Option<PlSmallStr>) -> Self {
        self.include_file_paths = include_file_paths;
        self
    }
}
322
323
#[cfg(feature = "csv")]
impl LazyFileListReader for LazyCsvReader {
    /// Get the final [LazyFrame].
    ///
    /// Builds a CSV scan from the configured sources and read options. The row
    /// limit, if any, is passed on as a positive pre-slice starting at offset 0,
    /// and hive options are disabled.
    fn finish(self) -> PolarsResult<LazyFrame> {
        // Read everything that needs `&self` before the fields are moved out below.
        let rechunk = self.rechunk();
        let row_index = self.row_index().cloned();
        let pre_slice = self.n_rows().map(|len| Slice::Positive { offset: 0, len });

        let lf: LazyFrame = DslBuilder::scan_csv(
            self.sources,
            self.read_options,
            UnifiedScanArgs {
                schema: None,
                cloud_options: self.cloud_options,
                hive_options: HiveOptions::new_disabled(),
                rechunk,
                cache: self.cache,
                glob: self.glob,
                projection: None,
                column_mapping: None,
                default_values: None,
                row_index,
                pre_slice,
                cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
                missing_columns_policy: MissingColumnsPolicy::Raise,
                extra_columns_policy: ExtraColumnsPolicy::Raise,
                include_file_paths: self.include_file_paths,
                deletion_files: None,
            },
        )?
        .build()
        .into();
        Ok(lf)
    }

    /// Not implemented for this reader: `finish` builds the scan directly and
    /// passes the `glob` flag along, so this method is not expected to be called.
    ///
    /// # Panics
    ///
    /// Always panics.
    fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
        unreachable!("LazyCsvReader::finish builds the scan directly; finish_no_glob is never used");
    }

    /// Whether paths are expanded via globbing rules.
    fn glob(&self) -> bool {
        self.glob
    }

    /// The sources this reader scans.
    fn sources(&self) -> &ScanSources {
        &self.sources
    }

    /// Replace the sources this reader scans.
    fn with_sources(mut self, sources: ScanSources) -> Self {
        self.sources = sources;
        self
    }

    /// Set the row limit; see `n_rows`.
    fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
        self.read_options.n_rows = n_rows.into();
        self
    }

    /// Set the row index settings.
    fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
        self.read_options.row_index = row_index.into();
        self
    }

    /// Whether memory is rechunked to contiguous chunks when parsing is done.
    fn rechunk(&self) -> bool {
        self.read_options.rechunk
    }

    /// Rechunk the memory to contiguous chunks when parsing is done.
    fn with_rechunk(mut self, rechunk: bool) -> Self {
        self.read_options.rechunk = rechunk;
        self
    }

    /// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
    /// be guaranteed.
    fn n_rows(&self) -> Option<usize> {
        self.read_options.n_rows
    }

    /// Return the row index settings.
    fn row_index(&self) -> Option<&RowIndex> {
        self.read_options.row_index.as_ref()
    }

    /// Concatenate the given frames using this reader's rechunk setting.
    fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
        let args = UnionArgs {
            rechunk: self.rechunk(),
            // set to false, as the csv parser has full thread utilization
            parallel: false,
            to_supertypes: false,
            from_partitioned_ds: true,
            ..Default::default()
        };
        // Resolves to the free `concat_impl` function, not to this method.
        concat_impl(&lfs, args)
    }

    /// [CloudOptions] used to list files.
    fn cloud_options(&self) -> Option<&CloudOptions> {
        self.cloud_options.as_ref()
    }
}
422
423