Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-lazy/src/scan/csv.rs
8446 views
1
#[cfg(feature = "csv")]
2
use polars_buffer::Buffer;
3
use polars_core::prelude::*;
4
use polars_io::cloud::CloudOptions;
5
use polars_io::csv::read::{
6
CommentPrefix, CsvEncoding, CsvParseOptions, CsvReadOptions, NullValues,
7
read_until_start_and_infer_schema,
8
};
9
use polars_io::path_utils::expand_paths;
10
use polars_io::utils::compression::CompressedReader;
11
use polars_io::{HiveOptions, RowIndex};
12
use polars_utils::mmap::MMapSemaphore;
13
use polars_utils::pl_path::PlRefPath;
14
use polars_utils::slice_enum::Slice;
15
16
use crate::prelude::*;
17
18
#[derive(Clone)]
19
#[cfg(feature = "csv")]
20
pub struct LazyCsvReader {
21
sources: ScanSources,
22
glob: bool,
23
cache: bool,
24
read_options: CsvReadOptions,
25
cloud_options: Option<CloudOptions>,
26
include_file_paths: Option<PlSmallStr>,
27
}
28
29
#[cfg(feature = "csv")]
30
impl LazyCsvReader {
31
/// Re-export to shorten code.
32
pub fn map_parse_options<F: Fn(CsvParseOptions) -> CsvParseOptions>(
33
mut self,
34
map_func: F,
35
) -> Self {
36
self.read_options = self.read_options.map_parse_options(map_func);
37
self
38
}
39
40
pub fn new_paths(paths: Buffer<PlRefPath>) -> Self {
41
Self::new_with_sources(ScanSources::Paths(paths))
42
}
43
44
pub fn new_with_sources(sources: ScanSources) -> Self {
45
LazyCsvReader {
46
sources,
47
glob: true,
48
cache: true,
49
read_options: Default::default(),
50
cloud_options: Default::default(),
51
include_file_paths: None,
52
}
53
}
54
55
pub fn new(path: PlRefPath) -> Self {
56
Self::new_with_sources(ScanSources::Paths(Buffer::from_iter([path])))
57
}
58
59
/// Skip this number of rows after the header location.
60
#[must_use]
61
pub fn with_skip_rows_after_header(mut self, offset: usize) -> Self {
62
self.read_options.skip_rows_after_header = offset;
63
self
64
}
65
66
/// Add a row index column.
67
#[must_use]
68
pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
69
self.read_options.row_index = row_index;
70
self
71
}
72
73
/// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
74
/// be guaranteed.
75
#[must_use]
76
pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
77
self.read_options.n_rows = num_rows;
78
self
79
}
80
81
/// Sets the number of threads used for CSV parsing.
82
#[must_use]
83
pub fn with_n_threads(mut self, n_threads: Option<usize>) -> Self {
84
self.read_options.n_threads = n_threads;
85
self
86
}
87
88
/// Set the number of rows to use when inferring the csv schema.
89
/// The default is 100 rows.
90
/// Setting to [None] will do a full table scan, which is very slow.
91
#[must_use]
92
pub fn with_infer_schema_length(mut self, num_rows: Option<usize>) -> Self {
93
self.read_options.infer_schema_length = num_rows;
94
self
95
}
96
97
/// Continue with next batch when a ParserError is encountered.
98
#[must_use]
99
pub fn with_ignore_errors(mut self, ignore: bool) -> Self {
100
self.read_options.ignore_errors = ignore;
101
self
102
}
103
104
/// Set the CSV file's schema
105
#[must_use]
106
pub fn with_schema(mut self, schema: Option<SchemaRef>) -> Self {
107
self.read_options.schema = schema;
108
self
109
}
110
111
/// Skip the first `n` rows during parsing. The header will be parsed at row `n`.
112
/// Note that by row we mean valid CSV, encoding and comments are respected.
113
#[must_use]
114
pub fn with_skip_rows(mut self, skip_rows: usize) -> Self {
115
self.read_options.skip_rows = skip_rows;
116
self
117
}
118
119
/// Skip the first `n` lines during parsing. The header will be parsed at line `n`.
120
/// We don't respect CSV escaping when skipping lines.
121
#[must_use]
122
pub fn with_skip_lines(mut self, skip_lines: usize) -> Self {
123
self.read_options.skip_lines = skip_lines;
124
self
125
}
126
127
/// Overwrite the schema with the dtypes in this given Schema. The given schema may be a subset
128
/// of the total schema.
129
#[must_use]
130
pub fn with_dtype_overwrite(mut self, schema: Option<SchemaRef>) -> Self {
131
self.read_options.schema_overwrite = schema;
132
self
133
}
134
135
/// Set whether the CSV file has headers
136
#[must_use]
137
pub fn with_has_header(mut self, has_header: bool) -> Self {
138
self.read_options.has_header = has_header;
139
self
140
}
141
142
/// Sets the chunk size used by the parser. This influences performance.
143
/// This can be used as a way to reduce memory usage during the parsing at the cost of performance.
144
pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
145
self.read_options.chunk_size = chunk_size;
146
self
147
}
148
149
/// Set the CSV file's column separator as a byte character
150
#[must_use]
151
pub fn with_separator(self, separator: u8) -> Self {
152
self.map_parse_options(|opts| opts.with_separator(separator))
153
}
154
155
/// Set the comment prefix for this instance. Lines starting with this prefix will be ignored.
156
#[must_use]
157
pub fn with_comment_prefix(self, comment_prefix: Option<PlSmallStr>) -> Self {
158
self.map_parse_options(|opts| {
159
opts.with_comment_prefix(comment_prefix.clone().map(|s| {
160
if s.len() == 1 && s.chars().next().unwrap().is_ascii() {
161
CommentPrefix::Single(s.as_bytes()[0])
162
} else {
163
CommentPrefix::Multi(s)
164
}
165
}))
166
})
167
}
168
169
/// Set the `char` used as quote char. The default is `b'"'`. If set to [`None`] quoting is disabled.
170
#[must_use]
171
pub fn with_quote_char(self, quote_char: Option<u8>) -> Self {
172
self.map_parse_options(|opts| opts.with_quote_char(quote_char))
173
}
174
175
/// Set the `char` used as end of line. The default is `b'\n'`.
176
#[must_use]
177
pub fn with_eol_char(self, eol_char: u8) -> Self {
178
self.map_parse_options(|opts| opts.with_eol_char(eol_char))
179
}
180
181
/// Set values that will be interpreted as missing/ null.
182
#[must_use]
183
pub fn with_null_values(self, null_values: Option<NullValues>) -> Self {
184
self.map_parse_options(|opts| opts.with_null_values(null_values.clone()))
185
}
186
187
/// Treat missing fields as null.
188
pub fn with_missing_is_null(self, missing_is_null: bool) -> Self {
189
self.map_parse_options(|opts| opts.with_missing_is_null(missing_is_null))
190
}
191
192
/// Cache the DataFrame after reading.
193
#[must_use]
194
pub fn with_cache(mut self, cache: bool) -> Self {
195
self.cache = cache;
196
self
197
}
198
199
/// Reduce memory usage at the expense of performance
200
#[must_use]
201
pub fn with_low_memory(mut self, low_memory: bool) -> Self {
202
self.read_options.low_memory = low_memory;
203
self
204
}
205
206
/// Set [`CsvEncoding`]
207
#[must_use]
208
pub fn with_encoding(self, encoding: CsvEncoding) -> Self {
209
self.map_parse_options(|opts| opts.with_encoding(encoding))
210
}
211
212
/// Automatically try to parse dates/datetimes and time.
213
/// If parsing fails, columns remain of dtype [`DataType::String`].
214
#[cfg(feature = "temporal")]
215
pub fn with_try_parse_dates(self, try_parse_dates: bool) -> Self {
216
self.map_parse_options(|opts| opts.with_try_parse_dates(try_parse_dates))
217
}
218
219
/// Raise an error if CSV is empty (otherwise return an empty frame)
220
#[must_use]
221
pub fn with_raise_if_empty(mut self, raise_if_empty: bool) -> Self {
222
self.read_options.raise_if_empty = raise_if_empty;
223
self
224
}
225
226
/// Truncate lines that are longer than the schema.
227
#[must_use]
228
pub fn with_truncate_ragged_lines(self, truncate_ragged_lines: bool) -> Self {
229
self.map_parse_options(|opts| opts.with_truncate_ragged_lines(truncate_ragged_lines))
230
}
231
232
#[must_use]
233
pub fn with_decimal_comma(self, decimal_comma: bool) -> Self {
234
self.map_parse_options(|opts| opts.with_decimal_comma(decimal_comma))
235
}
236
237
#[must_use]
238
/// Expand path given via globbing rules.
239
pub fn with_glob(mut self, toggle: bool) -> Self {
240
self.glob = toggle;
241
self
242
}
243
244
pub fn with_cloud_options(mut self, cloud_options: Option<CloudOptions>) -> Self {
245
self.cloud_options = cloud_options;
246
self
247
}
248
249
/// Modify a schema before we run the lazy scanning.
250
///
251
/// Important! Run this function latest in the builder!
252
pub fn with_schema_modify<F>(mut self, f: F) -> PolarsResult<Self>
253
where
254
F: Fn(Schema) -> PolarsResult<Schema>,
255
{
256
let n_threads = self.read_options.n_threads;
257
258
let infer_schema = |bytes: Buffer<u8>| {
259
let mut reader = CompressedReader::try_new(bytes)?;
260
261
let (inferred_schema, _) =
262
read_until_start_and_infer_schema(&self.read_options, None, None, &mut reader)?;
263
264
PolarsResult::Ok(inferred_schema)
265
};
266
267
let schema = match self.sources.clone() {
268
ScanSources::Paths(paths) => {
269
// TODO: Path expansion should happen when converting to the IR
270
// https://github.com/pola-rs/polars/issues/17634
271
272
use polars_io::pl_async::get_runtime;
273
274
let paths = get_runtime().block_on(expand_paths(
275
&paths[..],
276
self.glob(),
277
&[], // hidden_file_prefix
278
&mut self.cloud_options,
279
))?;
280
281
let Some(path) = paths.first() else {
282
polars_bail!(ComputeError: "no paths specified for this reader");
283
};
284
285
let file = polars_utils::open_file(path.as_std_path())?;
286
let mmap = MMapSemaphore::new_from_file(&file)?;
287
infer_schema(Buffer::from_owner(mmap))?
288
},
289
ScanSources::Files(files) => {
290
let Some(file) = files.first() else {
291
polars_bail!(ComputeError: "no buffers specified for this reader");
292
};
293
294
let mmap = MMapSemaphore::new_from_file(file)?;
295
infer_schema(Buffer::from_owner(mmap))?
296
},
297
ScanSources::Buffers(buffers) => {
298
let Some(buffer) = buffers.first() else {
299
polars_bail!(ComputeError: "no buffers specified for this reader");
300
};
301
302
infer_schema(buffer.clone())?
303
},
304
};
305
306
self.read_options.n_threads = n_threads;
307
let mut schema = f(schema)?;
308
309
// the dtypes set may be for the new names, so update again
310
if let Some(overwrite_schema) = &self.read_options.schema_overwrite {
311
for (name, dtype) in overwrite_schema.iter() {
312
schema.with_column(name.clone(), dtype.clone());
313
}
314
}
315
316
Ok(self.with_schema(Some(Arc::new(schema))))
317
}
318
319
pub fn with_include_file_paths(mut self, include_file_paths: Option<PlSmallStr>) -> Self {
320
self.include_file_paths = include_file_paths;
321
self
322
}
323
}
324
325
impl LazyFileListReader for LazyCsvReader {
326
/// Get the final [LazyFrame].
327
fn finish(self) -> PolarsResult<LazyFrame> {
328
let rechunk = self.rechunk();
329
let row_index = self.row_index().cloned();
330
let pre_slice = self.n_rows().map(|len| Slice::Positive { offset: 0, len });
331
332
let lf: LazyFrame = DslBuilder::scan_csv(
333
self.sources,
334
self.read_options,
335
UnifiedScanArgs {
336
schema: None,
337
cloud_options: self.cloud_options,
338
hive_options: HiveOptions::new_disabled(),
339
rechunk,
340
cache: self.cache,
341
glob: self.glob,
342
hidden_file_prefix: None,
343
projection: None,
344
column_mapping: None,
345
default_values: None,
346
row_index,
347
pre_slice,
348
cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
349
missing_columns_policy: MissingColumnsPolicy::Raise,
350
extra_columns_policy: ExtraColumnsPolicy::Raise,
351
include_file_paths: self.include_file_paths,
352
deletion_files: None,
353
table_statistics: None,
354
row_count: None,
355
},
356
)?
357
.build()
358
.into();
359
Ok(lf)
360
}
361
362
fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
363
unreachable!();
364
}
365
366
fn glob(&self) -> bool {
367
self.glob
368
}
369
370
fn sources(&self) -> &ScanSources {
371
&self.sources
372
}
373
374
fn with_sources(mut self, sources: ScanSources) -> Self {
375
self.sources = sources;
376
self
377
}
378
379
fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
380
self.read_options.n_rows = n_rows.into();
381
self
382
}
383
384
fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
385
self.read_options.row_index = row_index.into();
386
self
387
}
388
389
fn rechunk(&self) -> bool {
390
self.read_options.rechunk
391
}
392
393
/// Rechunk the memory to contiguous chunks when parsing is done.
394
fn with_rechunk(mut self, rechunk: bool) -> Self {
395
self.read_options.rechunk = rechunk;
396
self
397
}
398
399
/// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
400
/// be guaranteed.
401
fn n_rows(&self) -> Option<usize> {
402
self.read_options.n_rows
403
}
404
405
/// Return the row index settings.
406
fn row_index(&self) -> Option<&RowIndex> {
407
self.read_options.row_index.as_ref()
408
}
409
410
fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
411
// set to false, as the csv parser has full thread utilization
412
let args = UnionArgs {
413
rechunk: self.rechunk(),
414
parallel: false,
415
to_supertypes: false,
416
from_partitioned_ds: true,
417
..Default::default()
418
};
419
concat_impl(&lfs, args)
420
}
421
422
/// [CloudOptions] used to list files.
423
fn cloud_options(&self) -> Option<&CloudOptions> {
424
self.cloud_options.as_ref()
425
}
426
}
427
428