GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/csv/read/read_impl.rs
pub(super) mod batched;

use std::fmt;
use std::sync::Mutex;

use polars_core::POOL;
use polars_core::prelude::*;
use polars_core::utils::{accumulate_dataframes_vertical, handle_casting_failures};
#[cfg(feature = "polars-time")]
use polars_time::prelude::*;
use polars_utils::relaxed_cell::RelaxedCell;
use rayon::prelude::*;

use super::CsvParseOptions;
use super::buffer::init_buffers;
use super::options::{CommentPrefix, CsvEncoding, NullValuesCompiled};
use super::parser::{
    CountLines, SplitLines, is_comment_line, parse_lines, skip_bom, skip_line_ending,
    skip_lines_naive, skip_this_line,
};
use super::reader::prepare_csv_schema;
use super::schema_inference::infer_file_schema;
#[cfg(feature = "decompress")]
use super::utils::decompress;
use crate::RowIndex;
use crate::csv::read::parser::skip_this_line_naive;
use crate::mmap::ReaderBytes;
use crate::predicates::PhysicalIoExpr;
use crate::utils::compression::SupportedCompression;
use crate::utils::update_row_counts2;
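/// Casts the columns of `df` that appear in `to_cast` to the requested dtypes, using the
/// dedicated string-to-temporal parsers where applicable. When `ignore_errors` is false,
/// a cast that introduces additional nulls is reported as a casting failure.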
pub fn cast_columns(
    df: &mut DataFrame,
    to_cast: &[Field],
    parallel: bool,
    ignore_errors: bool,
) -> PolarsResult<()> {
    let cast_fn = |c: &Column, fld: &Field| {
        let out = match (c.dtype(), fld.dtype()) {
            #[cfg(feature = "temporal")]
            (DataType::String, DataType::Date) => c
                .str()
                .unwrap()
                .as_date(None, false)
                .map(|ca| ca.into_column()),
            #[cfg(feature = "temporal")]
            (DataType::String, DataType::Time) => c
                .str()
                .unwrap()
                .as_time(None, false)
                .map(|ca| ca.into_column()),
            #[cfg(feature = "temporal")]
            (DataType::String, DataType::Datetime(tu, _)) => c
                .str()
                .unwrap()
                .as_datetime(
                    None,
                    *tu,
                    false,
                    false,
                    None,
                    &StringChunked::from_iter(std::iter::once("raise")),
                )
                .map(|ca| ca.into_column()),
            (_, dt) => c.cast(dt),
        }?;
        if !ignore_errors && c.null_count() != out.null_count() {
            handle_casting_failures(c.as_materialized_series(), out.as_materialized_series())?;
        }
        Ok(out)
    };

    if parallel {
        let cols = POOL.install(|| {
            df.get_columns()
                .into_par_iter()
                .map(|s| {
                    if let Some(fld) = to_cast.iter().find(|fld| fld.name() == s.name()) {
                        cast_fn(s, fld)
                    } else {
                        Ok(s.clone())
                    }
                })
                .collect::<PolarsResult<Vec<_>>>()
        })?;
        *df = unsafe { DataFrame::new_no_checks(df.height(), cols) }
    } else {
        // cast to the original dtypes in the schema
        for fld in to_cast {
            // field may not be projected
            if let Some(idx) = df.get_column_index(fld.name()) {
                df.try_apply_at_idx(idx, |s| cast_fn(s, fld))?;
            }
        }

        df.clear_schema();
    }
    Ok(())
}
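// Illustrative usage sketch (hypothetical column name and values), assuming the "temporal"
// feature is enabled:
//
//     let mut df = df!("day" => ["2024-01-01", "2024-01-02"])?;
//     let to_cast = vec![Field::new("day".into(), DataType::Date)];
//     cast_columns(&mut df, &to_cast, false, false)?;
//     assert_eq!(df.column("day")?.dtype(), &DataType::Date);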
/// CSV file reader
pub(crate) struct CoreReader<'a> {
    reader_bytes: Option<ReaderBytes<'a>>,
    /// Explicit schema for the CSV file
    schema: SchemaRef,
    parse_options: CsvParseOptions,
    /// Optional projection for which columns to load (zero-based column indices)
    projection: Option<Vec<usize>>,
    /// Current line number, used in error reporting
    current_line: usize,
    ignore_errors: bool,
    skip_lines: usize,
    skip_rows_before_header: usize,
    // after the header, we need to take embedded lines into account
    skip_rows_after_header: usize,
    n_rows: Option<usize>,
    n_threads: Option<usize>,
    has_header: bool,
    chunk_size: usize,
    null_values: Option<NullValuesCompiled>,
    predicate: Option<Arc<dyn PhysicalIoExpr>>,
    to_cast: Vec<Field>,
    row_index: Option<RowIndex>,
}

impl fmt::Debug for CoreReader<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Reader")
            .field("schema", &self.schema)
            .field("projection", &self.projection)
            .field("current_line", &self.current_line)
            .finish()
    }
}
impl<'a> CoreReader<'a> {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        reader_bytes: ReaderBytes<'a>,
        parse_options: Arc<CsvParseOptions>,
        n_rows: Option<usize>,
        skip_rows: usize,
        skip_lines: usize,
        mut projection: Option<Vec<usize>>,
        max_records: Option<usize>,
        has_header: bool,
        ignore_errors: bool,
        schema: Option<SchemaRef>,
        columns: Option<Arc<[PlSmallStr]>>,
        n_threads: Option<usize>,
        schema_overwrite: Option<SchemaRef>,
        dtype_overwrite: Option<Arc<Vec<DataType>>>,
        chunk_size: usize,
        predicate: Option<Arc<dyn PhysicalIoExpr>>,
        mut to_cast: Vec<Field>,
        skip_rows_after_header: usize,
        row_index: Option<RowIndex>,
        raise_if_empty: bool,
    ) -> PolarsResult<CoreReader<'a>> {
        let separator = parse_options.separator;

        #[cfg(feature = "decompress")]
        let mut reader_bytes = reader_bytes;

        if !cfg!(feature = "decompress") && SupportedCompression::check(&reader_bytes).is_some() {
            polars_bail!(
                ComputeError: "cannot read compressed CSV file; \
                compile with feature 'decompress'"
            );
        }
        // If the file is compressed, schema inference on the compressed bytes would be wrong,
        // so decompress first; the schema is then inferred from the decompressed bytes below.
        #[cfg(feature = "decompress")]
        {
            let total_n_rows =
                n_rows.map(|n| skip_rows + (has_header as usize) + skip_rows_after_header + n);
            if let Some(b) = decompress(
                &reader_bytes,
                total_n_rows,
                separator,
                parse_options.quote_char,
                parse_options.eol_char,
            ) {
                reader_bytes = ReaderBytes::Owned(b.into());
            }
        }
        let mut schema = match schema {
            Some(schema) => schema,
            None => {
                let (inferred_schema, _, _) = infer_file_schema(
                    &reader_bytes,
                    &parse_options,
                    max_records,
                    has_header,
                    schema_overwrite.as_deref(),
                    skip_rows,
                    skip_lines,
                    skip_rows_after_header,
                    raise_if_empty,
                )?;
                Arc::new(inferred_schema)
            },
        };
        if let Some(dtypes) = dtype_overwrite {
            polars_ensure!(
                dtypes.len() <= schema.len(),
                InvalidOperation: "The number of schema overrides must be less than or equal to the number of fields"
            );
            let s = Arc::make_mut(&mut schema);
            for (index, dt) in dtypes.iter().enumerate() {
                s.set_dtype_at_index(index, dt.clone()).unwrap();
            }
        }

        prepare_csv_schema(&mut schema, &mut to_cast)?;

        // Create a null value for every column
        let null_values = parse_options
            .null_values
            .as_ref()
            .map(|nv| nv.clone().compile(&schema))
            .transpose()?;

        if let Some(cols) = columns {
            let mut prj = Vec::with_capacity(cols.len());
            for col in cols.as_ref() {
                let i = schema.try_index_of(col)?;
                prj.push(i);
            }
            projection = Some(prj);
        }

        Ok(CoreReader {
            reader_bytes: Some(reader_bytes),
            parse_options: (*parse_options).clone(),
            schema,
            projection,
            current_line: usize::from(has_header),
            ignore_errors,
            skip_lines,
            skip_rows_before_header: skip_rows,
            skip_rows_after_header,
            n_rows,
            n_threads,
            has_header,
            chunk_size,
            null_values,
            predicate,
            to_cast,
            row_index,
        })
    }
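    /// Skips the BOM, the configured rows/lines before the header, leading comment lines and
    /// the header itself, returning the remaining bytes together with the byte offset of that
    /// starting point.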
    fn find_starting_point<'b>(
        &self,
        bytes: &'b [u8],
        quote_char: Option<u8>,
        eol_char: u8,
    ) -> PolarsResult<(&'b [u8], Option<usize>)> {
        let i = find_starting_point(
            bytes,
            quote_char,
            eol_char,
            self.schema.len(),
            self.skip_lines,
            self.skip_rows_before_header,
            self.skip_rows_after_header,
            self.parse_options.comment_prefix.as_ref(),
            self.has_header,
        )?;

        Ok((&bytes[i..], (i <= bytes.len()).then_some(i)))
    }
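    /// Returns the configured projection (or all column indices if none was set), sorted and
    /// validated against the schema length.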
    fn get_projection(&mut self) -> PolarsResult<Vec<usize>> {
        // we also need to sort the projection to have predictable output.
        // the `parse_lines` function expects this.
        self.projection
            .take()
            .map(|mut v| {
                v.sort_unstable();
                if let Some(idx) = v.last() {
                    polars_ensure!(*idx < self.schema.len(), OutOfBounds: "projection index: {} is out of bounds for csv schema with length: {}", idx, self.schema.len())
                }
                Ok(v)
            })
            .unwrap_or_else(|| Ok((0..self.schema.len()).collect()))
    }
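    /// Parses a single chunk of bytes into a `DataFrame` and applies the configured dtype
    /// casts (`to_cast`) with `parallel = false`.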
    fn read_chunk(
        &self,
        bytes: &[u8],
        projection: &[usize],
        bytes_offset: usize,
        capacity: usize,
        starting_point_offset: Option<usize>,
        stop_at_nbytes: usize,
    ) -> PolarsResult<DataFrame> {
        let mut df = read_chunk(
            bytes,
            &self.parse_options,
            self.schema.as_ref(),
            self.ignore_errors,
            projection,
            bytes_offset,
            capacity,
            self.null_values.as_ref(),
            usize::MAX,
            stop_at_nbytes,
            starting_point_offset,
        )?;

        cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;
        Ok(df)
    }
    // The code adheres to RFC 4180 in a strict sense, unless explicitly documented otherwise.
    // Malformed CSV is common, see e.g. the use of lazy_quotes, whitespace and comments.
    // In case malformed CSV is detected, a warning or an error will be issued.
    // Not all malformed CSV will be detected, as that would impact performance.
    fn parse_csv(&mut self, bytes: &[u8]) -> PolarsResult<DataFrame> {
        let (bytes, _) = self.find_starting_point(
            bytes,
            self.parse_options.quote_char,
            self.parse_options.eol_char,
        )?;

        let projection = self.get_projection()?;

        // An empty file with a schema should return an empty DataFrame with that schema
        if bytes.is_empty() {
            let mut df = if projection.len() == self.schema.len() {
                DataFrame::empty_with_schema(self.schema.as_ref())
            } else {
                DataFrame::empty_with_schema(
                    &projection
                        .iter()
                        .map(|&i| self.schema.get_at_index(i).unwrap())
                        .map(|(name, dtype)| Field {
                            name: name.clone(),
                            dtype: dtype.clone(),
                        })
                        .collect::<Schema>(),
                )
            };

            cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;

            if let Some(ref row_index) = self.row_index {
                df.insert_column(0, Series::new_empty(row_index.name.clone(), &IDX_DTYPE))?;
            }
            return Ok(df);
        }
        let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());

        // Chosen by benchmarking on the NY City trip CSV dataset.
        // We want chunks small enough that threads start working as soon as possible,
        // but large enough to keep per-chunk overhead low; chunks are capped at 16 MB
        // so they still fit the L3 cache.
        let n_parts_hint = n_threads * 16;
        let chunk_size = std::cmp::min(bytes.len() / n_parts_hint, 16 * 1024 * 1024);

        // Use a small min chunk size to catch failures in tests.
        #[cfg(debug_assertions)]
        let min_chunk_size = 64;
        #[cfg(not(debug_assertions))]
        let min_chunk_size = 1024 * 4;

        let mut chunk_size = std::cmp::max(chunk_size, min_chunk_size);
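        // Worked example (illustrative numbers): with 8 threads and a 512 MiB file,
        // n_parts_hint = 128 and 512 MiB / 128 = 4 MiB, which is below the 16 MB cap,
        // so chunk_size starts at 4 MiB and is then floored at `min_chunk_size`.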
        let mut total_bytes_offset = 0;

        let results = Arc::new(Mutex::new(vec![]));
        // We have to do this after parsing as there can be comments.
        let total_line_count = &RelaxedCell::new_usize(0);

        #[cfg(not(target_family = "wasm"))]
        let pool;
        #[cfg(not(target_family = "wasm"))]
        let pool = if n_threads == POOL.current_num_threads() {
            &POOL
        } else {
            pool = rayon::ThreadPoolBuilder::new()
                .num_threads(n_threads)
                .build()
                .map_err(|_| polars_err!(ComputeError: "could not spawn threads"))?;
            &pool
        };
        #[cfg(target_family = "wasm")]
        let pool = &POOL;

        let counter = CountLines::new(self.parse_options.quote_char, self.parse_options.eol_char);
        let mut total_offset = 0;
        let mut previous_total_offset = 0;
        let check_utf8 = matches!(self.parse_options.encoding, CsvEncoding::Utf8)
            && self.schema.iter_fields().any(|f| f.dtype().is_string());
pool.scope(|s| {
401
// Pass 1: identify chunks for parallel processing (line parsing).
402
loop {
403
let b = unsafe { bytes.get_unchecked(total_offset..) };
404
if b.is_empty() {
405
break;
406
}
407
debug_assert!(
408
total_offset == 0 || bytes[total_offset - 1] == self.parse_options.eol_char
409
);
410
411
// Count is the number of rows for the next chunk. In case of malformed CSV data,
412
// count may not be as expected.
413
let (count, position) = counter.find_next(b, &mut chunk_size);
414
debug_assert!(count == 0 || b[position] == self.parse_options.eol_char);
415
416
let (b, count) = if count == 0
417
&& unsafe {
418
std::ptr::eq(b.as_ptr().add(b.len()), bytes.as_ptr().add(bytes.len()))
419
} {
420
total_offset = bytes.len();
421
(b, 1)
422
} else {
423
if count == 0 {
424
chunk_size *= 2;
425
continue;
426
}
427
428
let end = total_offset + position + 1;
429
let b = unsafe { bytes.get_unchecked(total_offset..end) };
430
431
previous_total_offset = total_offset;
432
total_offset = end;
433
(b, count)
434
};
435
436
// Pass 2: process each individual chunk in parallel (field parsing)
437
if !b.is_empty() {
438
let results = results.clone();
439
let projection = projection.as_ref();
440
let slf = &(*self);
441
s.spawn(move |_| {
442
if check_utf8 && !super::buffer::validate_utf8(b) {
443
let mut results = results.lock().unwrap();
444
results.push((
445
b.as_ptr() as usize,
446
Err(polars_err!(ComputeError: "invalid utf-8 sequence")),
447
));
448
return;
449
}
450
451
let result = slf
452
.read_chunk(b, projection, 0, count, Some(0), b.len())
453
.and_then(|mut df| {
454
455
// Check malformed
456
if df.height() > count || (df.height() < count && slf.parse_options.comment_prefix.is_none()) {
457
// Note: in case data is malformed, df.height() is more likely to be correct than count.
458
let msg = format!("CSV malformed: expected {} rows, actual {} rows, in chunk starting at byte offset {}, length {}",
459
count, df.height(), previous_total_offset, b.len());
460
if slf.ignore_errors {
461
polars_warn!(msg);
462
} else {
463
polars_bail!(ComputeError: msg);
464
}
465
}
466
467
if slf.n_rows.is_some() {
468
total_line_count.fetch_add(df.height());
469
}
470
471
// We cannot use the line count as there can be comments in the lines so we must correct line counts later.
472
if let Some(rc) = &slf.row_index {
473
// is first chunk
474
let offset = if std::ptr::eq(b.as_ptr(), bytes.as_ptr()) {
475
Some(rc.offset)
476
} else {
477
None
478
};
479
480
unsafe { df.with_row_index_mut(rc.name.clone(), offset) };
481
};
482
483
if let Some(predicate) = slf.predicate.as_ref() {
484
let s = predicate.evaluate_io(&df)?;
485
let mask = s.bool()?;
486
df = df.filter(mask)?;
487
}
488
Ok(df)
489
});
490
491
results.lock().unwrap().push((b.as_ptr() as usize, result));
492
});
493
494
// Check just after we spawned a chunk. That mean we processed all data up until
495
// row count.
496
if self.n_rows.is_some()
497
&& total_line_count.load() > self.n_rows.unwrap()
498
{
499
break;
500
}
501
}
502
total_bytes_offset += b.len();
503
}
504
});
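        // Each chunk was pushed keyed by the address of its first byte, so sorting by that
        // key restores the original file order before concatenating the partial DataFrames.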
        let mut results = std::mem::take(&mut *results.lock().unwrap());
        results.sort_unstable_by_key(|k| k.0);
        let mut dfs = results
            .into_iter()
            .map(|k| k.1)
            .collect::<PolarsResult<Vec<_>>>()?;

        if let Some(rc) = &self.row_index {
            update_row_counts2(&mut dfs, rc.offset)
        };
        accumulate_dataframes_vertical(dfs)
    }
    /// Read the csv into a DataFrame. The predicate can come from a lazy physical plan.
    pub fn finish(mut self) -> PolarsResult<DataFrame> {
        let reader_bytes = self.reader_bytes.take().unwrap();
        let mut df = self.parse_csv(&reader_bytes)?;

        // If multi-threaded, the `n_rows` limit was applied probabilistically.
        // Slice to the correct number of rows if needed.
        if let Some(n_rows) = self.n_rows {
            if n_rows < df.height() {
                df = df.slice(0, n_rows)
            }
        }
        Ok(df)
    }
}
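/// Parses the byte range starting at `bytes_offset_thread` (up to `stop_at_nbytes`) into a
/// `DataFrame` containing only the projected columns, using freshly initialized buffers
/// sized for `capacity` rows.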
#[allow(clippy::too_many_arguments)]
pub fn read_chunk(
    bytes: &[u8],
    parse_options: &CsvParseOptions,
    schema: &Schema,
    ignore_errors: bool,
    projection: &[usize],
    bytes_offset_thread: usize,
    capacity: usize,
    null_values: Option<&NullValuesCompiled>,
    chunk_size: usize,
    stop_at_nbytes: usize,
    starting_point_offset: Option<usize>,
) -> PolarsResult<DataFrame> {
    let mut read = bytes_offset_thread;
    // There's an off-by-one error somewhere in the reading code, where it reads
    // one more item than the requested capacity. Given the batch sizes are
    // approximate (sometimes they're smaller), this isn't broken, but it does
    // mean a bunch of extra allocation and copying. So we allocate a
    // larger-by-one buffer so the size is more likely to be accurate.
    let mut buffers = init_buffers(
        projection,
        capacity + 1,
        schema,
        parse_options.quote_char,
        parse_options.encoding,
        parse_options.decimal_comma,
    )?;

    debug_assert!(projection.is_sorted());

    let mut last_read = usize::MAX;
    loop {
        if read >= stop_at_nbytes || read == last_read {
            break;
        }
        let local_bytes = &bytes[read..stop_at_nbytes];

        last_read = read;
        let offset = read + starting_point_offset.unwrap();
        read += parse_lines(
            local_bytes,
            parse_options,
            offset,
            ignore_errors,
            null_values,
            projection,
            &mut buffers,
            chunk_size,
            schema.len(),
            schema,
        )?;
    }

    let columns = buffers
        .into_iter()
        .map(|buf| buf.into_series().map(Column::from))
        .collect::<PolarsResult<Vec<_>>>()?;
    Ok(unsafe { DataFrame::new_no_checks_height_from_first(columns) })
}
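/// Computes the byte offset at which the data rows start, after skipping the UTF-8 BOM,
/// the configured rows/lines before the header, leading comment lines, the header row
/// itself and the configured rows after the header.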
#[allow(clippy::too_many_arguments)]
pub fn find_starting_point(
    mut bytes: &[u8],
    quote_char: Option<u8>,
    eol_char: u8,
    schema_len: usize,
    skip_lines: usize,
    skip_rows_before_header: usize,
    skip_rows_after_header: usize,
    comment_prefix: Option<&CommentPrefix>,
    has_header: bool,
) -> PolarsResult<usize> {
    let full_len = bytes.len();
    let starting_point_offset = bytes.as_ptr() as usize;

    bytes = if skip_lines > 0 {
        polars_ensure!(skip_rows_before_header == 0, InvalidOperation: "only one of 'skip_rows'/'skip_lines' may be set");
        skip_lines_naive(bytes, eol_char, skip_lines)
    } else {
        // Skip utf8 byte-order-mark (BOM)
        bytes = skip_bom(bytes);

        // A "\n\n" can be an empty string row for a single-column file;
        // in all other cases we skip it.
        if schema_len > 1 {
            bytes = skip_line_ending(bytes, eol_char)
        }
        bytes
    };

    // skip 'n' leading rows
    if skip_rows_before_header > 0 {
        let mut split_lines = SplitLines::new(bytes, quote_char, eol_char, comment_prefix);
        let mut current_line = &bytes[..0];

        for _ in 0..skip_rows_before_header {
            current_line = split_lines
                .next()
                .ok_or_else(|| polars_err!(NoData: "not enough lines to skip"))?;
        }

        current_line = split_lines
            .next()
            .unwrap_or(&current_line[current_line.len()..]);
        bytes = &bytes[current_line.as_ptr() as usize - bytes.as_ptr() as usize..];
    }

    // skip lines that are comments
    while is_comment_line(bytes, comment_prefix) {
        bytes = skip_this_line_naive(bytes, eol_char);
    }

    // skip header row
    if has_header {
        bytes = skip_this_line(bytes, quote_char, eol_char);
    }
    // skip 'n' rows following the header
    if skip_rows_after_header > 0 {
        let mut split_lines = SplitLines::new(bytes, quote_char, eol_char, comment_prefix);
        let mut current_line = &bytes[..0];

        for _ in 0..skip_rows_after_header {
            current_line = split_lines
                .next()
                .ok_or_else(|| polars_err!(NoData: "not enough lines to skip"))?;
        }

        current_line = split_lines
            .next()
            .unwrap_or(&current_line[current_line.len()..]);
        bytes = &bytes[current_line.as_ptr() as usize - bytes.as_ptr() as usize..];
    }

    Ok(
        // Some of the functions we call may return `&'static []` instead of
        // slices of `&bytes[..]`.
        if bytes.is_empty() {
            full_len
        } else {
            bytes.as_ptr() as usize - starting_point_offset
        },
    )
}
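// Illustrative example (hypothetical input): for bytes b"# note\nh1,h2\n1,2\n" with a
// comment prefix of "#", `has_header = true` and no skipped rows, the comment line
// (7 bytes) and the header line (6 bytes) are skipped, so the returned offset is 13,
// pointing at the first data row "1,2\n".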