Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/csv/read/read_impl.rs
8420 views
1
use std::fmt;
2
use std::sync::Mutex;
3
4
use polars_buffer::{Buffer, SharedStorage};
5
use polars_core::POOL;
6
use polars_core::prelude::*;
7
use polars_core::utils::{accumulate_dataframes_vertical, handle_casting_failures};
8
#[cfg(feature = "polars-time")]
9
use polars_time::prelude::*;
10
use polars_utils::relaxed_cell::RelaxedCell;
11
use rayon::prelude::*;
12
13
use super::CsvParseOptions;
14
use super::builder::init_builders;
15
use super::options::{CsvEncoding, NullValuesCompiled};
16
use super::parser::{CountLines, is_comment_line, parse_lines};
17
use super::reader::prepare_csv_schema;
18
#[cfg(feature = "decompress")]
19
use super::utils::decompress;
20
use crate::RowIndex;
21
use crate::csv::read::{CsvReadOptions, read_until_start_and_infer_schema};
22
use crate::mmap::ReaderBytes;
23
use crate::predicates::PhysicalIoExpr;
24
use crate::utils::compression::{CompressedReader, SupportedCompression};
25
use crate::utils::update_row_counts2;
26
27
pub fn cast_columns(
28
df: &mut DataFrame,
29
to_cast: &[Field],
30
parallel: bool,
31
ignore_errors: bool,
32
) -> PolarsResult<()> {
33
let cast_fn = |c: &Column, fld: &Field| {
34
let out = match (c.dtype(), fld.dtype()) {
35
#[cfg(feature = "temporal")]
36
(DataType::String, DataType::Date) => c
37
.str()
38
.unwrap()
39
.as_date(None, false)
40
.map(|ca| ca.into_column()),
41
#[cfg(feature = "temporal")]
42
(DataType::String, DataType::Time) => c
43
.str()
44
.unwrap()
45
.as_time(None, false)
46
.map(|ca| ca.into_column()),
47
#[cfg(feature = "temporal")]
48
(DataType::String, DataType::Datetime(tu, _)) => c
49
.str()
50
.unwrap()
51
.as_datetime(
52
None,
53
*tu,
54
false,
55
false,
56
None,
57
&StringChunked::from_iter(std::iter::once("raise")),
58
)
59
.map(|ca| ca.into_column()),
60
(_, dt) => c.cast(dt),
61
}?;
62
if !ignore_errors && c.null_count() != out.null_count() {
63
handle_casting_failures(c.as_materialized_series(), out.as_materialized_series())?;
64
}
65
Ok(out)
66
};
67
68
if parallel {
69
let cols = POOL.install(|| {
70
df.columns()
71
.into_par_iter()
72
.map(|s| {
73
if let Some(fld) = to_cast.iter().find(|fld| fld.name() == s.name()) {
74
cast_fn(s, fld)
75
} else {
76
Ok(s.clone())
77
}
78
})
79
.collect::<PolarsResult<Vec<_>>>()
80
})?;
81
*df = unsafe { DataFrame::new_unchecked(df.height(), cols) }
82
} else {
83
// cast to the original dtypes in the schema
84
for fld in to_cast {
85
// field may not be projected
86
if let Some(idx) = df.get_column_index(fld.name()) {
87
df.try_apply_at_idx(idx, |s| cast_fn(s, fld))?;
88
}
89
}
90
}
91
Ok(())
92
}
93
94
struct ReaderBytesAndDependents<'a> {
95
// Ensure lifetime dependents are dropped before `reader_bytes`, since their drop impls
96
// could access themselves, this is achieved by placing them before `reader_bytes`.
97
// SAFETY: This is lifetime bound to `reader_bytes`
98
compressed_reader: CompressedReader,
99
// SAFETY: This is lifetime bound to `reader_bytes`
100
leftover: Buffer<u8>,
101
_reader_bytes: ReaderBytes<'a>,
102
}
103
104
/// CSV file reader
105
pub(crate) struct CoreReader<'a> {
106
reader_bytes: Option<ReaderBytesAndDependents<'a>>,
107
108
/// Explicit schema for the CSV file
109
schema: SchemaRef,
110
parse_options: CsvParseOptions,
111
/// Optional projection for which columns to load (zero-based column indices)
112
projection: Option<Vec<usize>>,
113
/// Current line number, used in error reporting
114
current_line: usize,
115
ignore_errors: bool,
116
n_rows: Option<usize>,
117
n_threads: Option<usize>,
118
null_values: Option<NullValuesCompiled>,
119
predicate: Option<Arc<dyn PhysicalIoExpr>>,
120
to_cast: Vec<Field>,
121
row_index: Option<RowIndex>,
122
}
123
124
impl fmt::Debug for CoreReader<'_> {
125
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126
f.debug_struct("Reader")
127
.field("schema", &self.schema)
128
.field("projection", &self.projection)
129
.field("current_line", &self.current_line)
130
.finish()
131
}
132
}
133
134
impl<'a> CoreReader<'a> {
135
#[allow(clippy::too_many_arguments)]
136
pub(crate) fn new(
137
reader_bytes: ReaderBytes<'a>,
138
parse_options: Arc<CsvParseOptions>,
139
n_rows: Option<usize>,
140
skip_rows: usize,
141
skip_lines: usize,
142
mut projection: Option<Vec<usize>>,
143
max_records: Option<usize>,
144
has_header: bool,
145
ignore_errors: bool,
146
schema: Option<SchemaRef>,
147
columns: Option<Arc<[PlSmallStr]>>,
148
n_threads: Option<usize>,
149
schema_overwrite: Option<SchemaRef>,
150
dtype_overwrite: Option<Arc<Vec<DataType>>>,
151
predicate: Option<Arc<dyn PhysicalIoExpr>>,
152
mut to_cast: Vec<Field>,
153
skip_rows_after_header: usize,
154
row_index: Option<RowIndex>,
155
raise_if_empty: bool,
156
) -> PolarsResult<CoreReader<'a>> {
157
let separator = parse_options.separator;
158
159
#[cfg(feature = "decompress")]
160
let mut reader_bytes = reader_bytes;
161
162
if !cfg!(feature = "decompress") && SupportedCompression::check(&reader_bytes).is_some() {
163
polars_bail!(
164
ComputeError: "cannot read compressed CSV file; \
165
compile with feature 'decompress'"
166
);
167
}
168
// We keep track of the inferred schema bool
169
// In case the file is compressed this schema inference is wrong and has to be done
170
// again after decompression.
171
#[cfg(feature = "decompress")]
172
{
173
let total_n_rows =
174
n_rows.map(|n| skip_rows + (has_header as usize) + skip_rows_after_header + n);
175
if let Some(b) = decompress(
176
&reader_bytes,
177
total_n_rows,
178
separator,
179
parse_options.quote_char,
180
parse_options.eol_char,
181
) {
182
reader_bytes = ReaderBytes::Owned(b.into());
183
}
184
}
185
186
let reader_slice = match &reader_bytes {
187
ReaderBytes::Borrowed(slice) => {
188
// SAFETY: The produced slice and derived slices MUST not live longer than
189
// `reader_bytes`. TODO use `scan_csv` to implement `read_csv`.
190
let ss = unsafe { SharedStorage::from_slice_unchecked(slice) };
191
Buffer::from_storage(ss)
192
},
193
ReaderBytes::Owned(slice) => slice.clone(),
194
};
195
let mut compressed_reader = CompressedReader::try_new(reader_slice)?;
196
197
let read_options = CsvReadOptions {
198
parse_options: parse_options.clone(),
199
n_rows,
200
skip_rows,
201
skip_lines,
202
projection: projection.clone().map(Arc::new),
203
has_header,
204
ignore_errors,
205
schema: schema.clone(),
206
columns: columns.clone(),
207
n_threads,
208
schema_overwrite,
209
dtype_overwrite: dtype_overwrite.clone(),
210
fields_to_cast: to_cast.clone(),
211
skip_rows_after_header,
212
row_index: row_index.clone(),
213
raise_if_empty,
214
infer_schema_length: max_records,
215
..Default::default()
216
};
217
218
// Since this is also used to skip to the start, always call it.
219
let (inferred_schema, leftover) =
220
read_until_start_and_infer_schema(&read_options, None, None, &mut compressed_reader)?;
221
222
let mut schema = match schema {
223
Some(schema) => schema,
224
None => Arc::new(inferred_schema),
225
};
226
if let Some(dtypes) = dtype_overwrite {
227
polars_ensure!(
228
dtypes.len() <= schema.len(),
229
InvalidOperation: "The number of schema overrides must be less than or equal to the number of fields"
230
);
231
let s = Arc::make_mut(&mut schema);
232
for (index, dt) in dtypes.iter().enumerate() {
233
s.set_dtype_at_index(index, dt.clone()).unwrap();
234
}
235
}
236
237
prepare_csv_schema(&mut schema, &mut to_cast)?;
238
239
// Create a null value for every column
240
let null_values = parse_options
241
.null_values
242
.as_ref()
243
.map(|nv| nv.clone().compile(&schema))
244
.transpose()?;
245
246
if let Some(cols) = columns {
247
let mut prj = Vec::with_capacity(cols.len());
248
for col in cols.as_ref() {
249
let i = schema.try_index_of(col)?;
250
prj.push(i);
251
}
252
projection = Some(prj);
253
}
254
255
Ok(CoreReader {
256
reader_bytes: Some(ReaderBytesAndDependents {
257
compressed_reader,
258
leftover,
259
_reader_bytes: reader_bytes,
260
}),
261
parse_options: (*parse_options).clone(),
262
schema,
263
projection,
264
current_line: usize::from(has_header),
265
ignore_errors,
266
n_rows,
267
n_threads,
268
null_values,
269
predicate,
270
to_cast,
271
row_index,
272
})
273
}
274
275
fn get_projection(&mut self) -> PolarsResult<Vec<usize>> {
276
// we also need to sort the projection to have predictable output.
277
// the `parse_lines` function expects this.
278
self.projection
279
.take()
280
.map(|mut v| {
281
v.sort_unstable();
282
if let Some(idx) = v.last() {
283
polars_ensure!(*idx < self.schema.len(), OutOfBounds: "projection index: {} is out of bounds for csv schema with length: {}", idx, self.schema.len())
284
}
285
Ok(v)
286
})
287
.unwrap_or_else(|| Ok((0..self.schema.len()).collect()))
288
}
289
290
fn read_chunk(
291
&self,
292
bytes: &[u8],
293
projection: &[usize],
294
bytes_offset: usize,
295
capacity: usize,
296
starting_point_offset: Option<usize>,
297
stop_at_nbytes: usize,
298
) -> PolarsResult<DataFrame> {
299
let mut df = read_chunk(
300
bytes,
301
&self.parse_options,
302
self.schema.as_ref(),
303
self.ignore_errors,
304
projection,
305
bytes_offset,
306
capacity,
307
self.null_values.as_ref(),
308
usize::MAX,
309
stop_at_nbytes,
310
starting_point_offset,
311
)?;
312
313
cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;
314
Ok(df)
315
}
316
317
// The code adheres to RFC 4180 in a strict sense, unless explicitly documented otherwise.
318
// Malformed CSV is common, see e.g. the use of lazy_quotes, whitespace and comments.
319
// In case malformed CSV is detected, a warning or an error will be issued.
320
// Not all malformed CSV will be detected, as that would impact performance.
321
fn parse_csv(&mut self, bytes: &[u8]) -> PolarsResult<DataFrame> {
322
let projection = self.get_projection()?;
323
324
// An empty file with a schema should return an empty DataFrame with that schema
325
if bytes.is_empty() {
326
let mut df = if projection.len() == self.schema.len() {
327
DataFrame::empty_with_schema(self.schema.as_ref())
328
} else {
329
DataFrame::empty_with_schema(
330
&projection
331
.iter()
332
.map(|&i| self.schema.get_at_index(i).unwrap())
333
.map(|(name, dtype)| Field {
334
name: name.clone(),
335
dtype: dtype.clone(),
336
})
337
.collect::<Schema>(),
338
)
339
};
340
341
cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;
342
343
if let Some(ref row_index) = self.row_index {
344
df.insert_column(0, Column::new_empty(row_index.name.clone(), &IDX_DTYPE))?;
345
}
346
return Ok(df);
347
}
348
349
let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());
350
351
// This is chosen by benchmarking on ny city trip csv dataset.
352
// We want small enough chunks such that threads start working as soon as possible
353
// But we also want them large enough, so that we have less chunks related overhead.
354
// We minimize chunks to 16 MB to still fit L3 cache.
355
//
356
// Width-aware adjustment: For wide data (many columns), per-chunk overhead
357
// (allocating column buffers) becomes significant. Each chunk must allocate
358
// O(n_cols) buffers, so total allocation overhead is O(n_chunks * n_cols).
359
// To keep this bounded, we limit n_chunks such that n_chunks * n_cols <= threshold.
360
// With threshold ~500K, this gives:
361
// - 100 cols: up to 5000 chunks (no practical limit)
362
// - 1000 cols: up to 500 chunks
363
// - 10000 cols: up to 50 chunks
364
// - 30000 cols: up to 16 chunks
365
let n_cols = projection.len();
366
// Empirically determined to balance allocation overhead and parallelism.
367
const ALLOCATION_BUDGET: usize = 500_000;
368
let max_chunks_for_width = ALLOCATION_BUDGET / n_cols.max(1);
369
let n_parts_hint = std::cmp::min(n_threads * 16, max_chunks_for_width.max(n_threads));
370
let chunk_size = std::cmp::min(bytes.len() / n_parts_hint.max(1), 16 * 1024 * 1024);
371
372
// Use a small min chunk size to catch failures in tests.
373
#[cfg(debug_assertions)]
374
let min_chunk_size = 64;
375
#[cfg(not(debug_assertions))]
376
let min_chunk_size = 1024 * 4;
377
378
let mut chunk_size = std::cmp::max(chunk_size, min_chunk_size);
379
let mut total_bytes_offset = 0;
380
381
let results = Arc::new(Mutex::new(vec![]));
382
// We have to do this after parsing as there can be comments.
383
let total_line_count = &RelaxedCell::new_usize(0);
384
385
let counter = CountLines::new(
386
self.parse_options.quote_char,
387
self.parse_options.eol_char,
388
None,
389
);
390
let mut total_offset = 0;
391
let mut previous_total_offset = 0;
392
let check_utf8 = matches!(self.parse_options.encoding, CsvEncoding::Utf8)
393
&& self.schema.iter_fields().any(|f| f.dtype().is_string());
394
395
POOL.scope(|s| {
396
// Pass 1: identify chunks for parallel processing (line parsing).
397
loop {
398
let b = unsafe { bytes.get_unchecked(total_offset..) };
399
if b.is_empty() {
400
break;
401
}
402
debug_assert!(
403
total_offset == 0 || bytes[total_offset - 1] == self.parse_options.eol_char
404
);
405
406
// Count is the number of rows for the next chunk. In case of malformed CSV data,
407
// count may not be as expected.
408
let (count, position) = counter.find_next(b, &mut chunk_size);
409
debug_assert!(count == 0 || b[position] == self.parse_options.eol_char);
410
411
let (b, count) = if count == 0
412
&& unsafe {
413
std::ptr::eq(b.as_ptr().add(b.len()), bytes.as_ptr().add(bytes.len()))
414
} {
415
total_offset = bytes.len();
416
let c = if is_comment_line(bytes, self.parse_options.comment_prefix.as_ref()) {
417
0
418
} else {
419
1
420
};
421
(b, c)
422
} else {
423
let end = total_offset + position + 1;
424
let b = unsafe { bytes.get_unchecked(total_offset..end) };
425
426
previous_total_offset = total_offset;
427
total_offset = end;
428
(b, count)
429
};
430
431
// Pass 2: process each individual chunk in parallel (field parsing)
432
if !b.is_empty() {
433
let results = results.clone();
434
let projection = projection.as_ref();
435
let slf = &(*self);
436
s.spawn(move |_| {
437
if check_utf8 && !super::builder::validate_utf8(b) {
438
let mut results = results.lock().unwrap();
439
results.push((
440
b.as_ptr() as usize,
441
Err(polars_err!(ComputeError: "invalid utf-8 sequence")),
442
));
443
return;
444
}
445
446
let result = slf
447
.read_chunk(b, projection, 0, count, Some(0), b.len())
448
.and_then(|mut df| {
449
// Check malformed
450
if df.height() > count
451
|| (df.height() < count
452
&& slf.parse_options.comment_prefix.is_none())
453
{
454
// Note: in case data is malformed, df.height() is more likely to be correct than count.
455
let msg = format!(
456
"CSV malformed: expected {} rows, \
457
actual {} rows, in chunk starting at \
458
byte offset {}, length {}",
459
count,
460
df.height(),
461
previous_total_offset,
462
b.len()
463
);
464
if slf.ignore_errors {
465
polars_warn!("{}", msg);
466
} else {
467
polars_bail!(ComputeError: msg);
468
}
469
}
470
471
if slf.n_rows.is_some() {
472
total_line_count.fetch_add(df.height());
473
}
474
475
// We cannot use the line count as there can be comments in the lines so we must correct line counts later.
476
if let Some(rc) = &slf.row_index {
477
// is first chunk
478
let offset = if std::ptr::eq(b.as_ptr(), bytes.as_ptr()) {
479
Some(rc.offset)
480
} else {
481
None
482
};
483
484
unsafe { df.with_row_index_mut(rc.name.clone(), offset) };
485
};
486
487
if let Some(predicate) = slf.predicate.as_ref() {
488
let s = predicate.evaluate_io(&df)?;
489
let mask = s.bool()?;
490
df = df.filter(mask)?;
491
}
492
Ok(df)
493
});
494
495
results.lock().unwrap().push((b.as_ptr() as usize, result));
496
});
497
498
// Check just after we spawned a chunk. That mean we processed all data up until
499
// row count.
500
if self.n_rows.is_some() && total_line_count.load() > self.n_rows.unwrap() {
501
break;
502
}
503
}
504
total_bytes_offset += b.len();
505
}
506
});
507
508
let mut results = std::mem::take(&mut *results.lock().unwrap());
509
results.sort_unstable_by_key(|k| k.0);
510
let mut dfs = results
511
.into_iter()
512
.map(|k| k.1)
513
.collect::<PolarsResult<Vec<_>>>()?;
514
515
if let Some(rc) = &self.row_index {
516
update_row_counts2(&mut dfs, rc.offset)
517
};
518
accumulate_dataframes_vertical(dfs)
519
}
520
521
/// Read the csv into a DataFrame. The predicate can come from a lazy physical plan.
522
pub fn finish(mut self) -> PolarsResult<DataFrame> {
523
let mut reader_bytes = self.reader_bytes.take().unwrap();
524
let (body_bytes, _) = reader_bytes
525
.compressed_reader
526
.read_next_slice(&reader_bytes.leftover, usize::MAX)?;
527
528
let mut df = self.parse_csv(&body_bytes)?;
529
530
// if multi-threaded the n_rows was probabilistically determined.
531
// Let's slice to correct number of rows if possible.
532
if let Some(n_rows) = self.n_rows {
533
if n_rows < df.height() {
534
df = df.slice(0, n_rows)
535
}
536
}
537
Ok(df)
538
}
539
}
540
541
#[allow(clippy::too_many_arguments)]
542
pub fn read_chunk(
543
bytes: &[u8],
544
parse_options: &CsvParseOptions,
545
schema: &Schema,
546
ignore_errors: bool,
547
projection: &[usize],
548
bytes_offset_thread: usize,
549
capacity: usize,
550
null_values: Option<&NullValuesCompiled>,
551
chunk_size: usize,
552
stop_at_nbytes: usize,
553
starting_point_offset: Option<usize>,
554
) -> PolarsResult<DataFrame> {
555
let mut read = bytes_offset_thread;
556
// There's an off-by-one error somewhere in the reading code, where it reads
557
// one more item than the requested capacity. Given the batch sizes are
558
// approximate (sometimes they're smaller), this isn't broken, but it does
559
// mean a bunch of extra allocation and copying. So we allocate a
560
// larger-by-one buffer so the size is more likely to be accurate.
561
let mut buffers = init_builders(
562
projection,
563
capacity + 1,
564
schema,
565
parse_options.quote_char,
566
parse_options.encoding,
567
parse_options.decimal_comma,
568
)?;
569
570
debug_assert!(projection.is_sorted());
571
572
let mut last_read = usize::MAX;
573
loop {
574
if read >= stop_at_nbytes || read == last_read {
575
break;
576
}
577
let local_bytes = &bytes[read..stop_at_nbytes];
578
579
last_read = read;
580
let offset = read + starting_point_offset.unwrap();
581
read += parse_lines(
582
local_bytes,
583
parse_options,
584
offset,
585
ignore_errors,
586
null_values,
587
projection,
588
&mut buffers,
589
chunk_size,
590
schema.len(),
591
schema,
592
)?;
593
}
594
595
let columns = buffers
596
.into_iter()
597
.map(|buf| buf.into_series().map(Column::from))
598
.collect::<PolarsResult<Vec<_>>>()?;
599
Ok(unsafe { DataFrame::new_unchecked_infer_height(columns) })
600
}
601
602