Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/csv/read/parser.rs
8415 views
1
use std::cmp;
2
3
use memchr::memchr2_iter;
4
use polars_buffer::Buffer;
5
use polars_core::POOL;
6
use polars_core::prelude::*;
7
use polars_error::feature_gated;
8
use polars_utils::mmap::MMapSemaphore;
9
use polars_utils::pl_path::PlRefPath;
10
use polars_utils::select::select_unpredictable;
11
use rayon::prelude::*;
12
13
use super::CsvParseOptions;
14
use super::builder::Builder;
15
use super::options::{CommentPrefix, NullValuesCompiled};
16
use super::splitfields::SplitFields;
17
use crate::csv::read::read_until_start_and_infer_schema;
18
use crate::prelude::CsvReadOptions;
19
use crate::utils::compression::CompressedReader;
20
21
/// Read the number of rows without parsing columns
22
/// useful for count(*) queries
23
#[allow(clippy::too_many_arguments)]
24
pub fn count_rows(
25
path: PlRefPath,
26
quote_char: Option<u8>,
27
comment_prefix: Option<&CommentPrefix>,
28
eol_char: u8,
29
has_header: bool,
30
skip_lines: usize,
31
skip_rows_before_header: usize,
32
skip_rows_after_header: usize,
33
) -> PolarsResult<usize> {
34
let file = if path.has_scheme() || polars_config::config().force_async() {
35
feature_gated!("cloud", {
36
crate::file_cache::FILE_CACHE
37
.get_entry(path)
38
// Safety: This was initialized by schema inference.
39
.unwrap()
40
.try_open_assume_latest()?
41
})
42
} else {
43
polars_utils::open_file(path.as_std_path())?
44
};
45
46
let mmap = MMapSemaphore::new_from_file(&file).unwrap();
47
48
count_rows_from_slice_par(
49
Buffer::from_owner(mmap),
50
quote_char,
51
comment_prefix,
52
eol_char,
53
has_header,
54
skip_lines,
55
skip_rows_before_header,
56
skip_rows_after_header,
57
)
58
}
59
60
/// Read the number of rows without parsing columns
61
/// useful for count(*) queries
62
#[allow(clippy::too_many_arguments)]
63
pub fn count_rows_from_slice_par(
64
buffer: Buffer<u8>,
65
quote_char: Option<u8>,
66
comment_prefix: Option<&CommentPrefix>,
67
eol_char: u8,
68
has_header: bool,
69
skip_lines: usize,
70
skip_rows_before_header: usize,
71
skip_rows_after_header: usize,
72
) -> PolarsResult<usize> {
73
let mut reader = CompressedReader::try_new(buffer)?;
74
75
let reader_options = CsvReadOptions {
76
parse_options: Arc::new(CsvParseOptions {
77
quote_char,
78
comment_prefix: comment_prefix.cloned(),
79
eol_char,
80
..Default::default()
81
}),
82
has_header,
83
skip_lines,
84
skip_rows: skip_rows_before_header,
85
skip_rows_after_header,
86
..Default::default()
87
};
88
89
let (_, mut leftover) =
90
read_until_start_and_infer_schema(&reader_options, None, None, &mut reader)?;
91
92
const BYTES_PER_CHUNK: usize = if cfg!(debug_assertions) {
93
128
94
} else {
95
512 * 1024
96
};
97
98
let count = CountLines::new(quote_char, eol_char, comment_prefix.cloned());
99
POOL.install(|| {
100
let mut states = Vec::new();
101
let eof_unterminated_row;
102
103
if comment_prefix.is_none() {
104
let mut last_slice = Buffer::new();
105
let mut err = None;
106
107
let streaming_iter = std::iter::from_fn(|| {
108
let (slice, read_n) = match reader.read_next_slice(&leftover, BYTES_PER_CHUNK) {
109
Ok(tup) => tup,
110
Err(e) => {
111
err = Some(e);
112
return None;
113
},
114
};
115
116
leftover = Buffer::new();
117
if slice.is_empty() && read_n == 0 {
118
return None;
119
}
120
121
last_slice = slice.clone();
122
Some(slice)
123
});
124
125
states = streaming_iter
126
.enumerate()
127
.par_bridge()
128
.map(|(id, slice)| (count.analyze_chunk(&slice), id))
129
.collect::<Vec<_>>();
130
131
if let Some(e) = err {
132
return Err(e.into());
133
}
134
135
// par_bridge does not guarantee order, but is mostly sorted so `slice::sort` is a
136
// decent fit.
137
states.sort_by_key(|(_, id)| *id);
138
139
// Technically this is broken if the input has a comment line at the end that is longer
140
// than `BYTES_PER_CHUNK`, but in practice this ought to be fine.
141
eof_unterminated_row = ends_in_unterminated_row(&last_slice, eol_char, comment_prefix);
142
} else {
143
// For the non-compressed case this is a zero-copy op.
144
// TODO: Implement streaming chunk logic.
145
let (bytes, _) = reader.read_next_slice(&leftover, usize::MAX)?;
146
147
let num_chunks = bytes.len().div_ceil(BYTES_PER_CHUNK);
148
(0..num_chunks)
149
.into_par_iter()
150
.map(|chunk_idx| {
151
let mut start_offset = chunk_idx * BYTES_PER_CHUNK;
152
let next_start_offset = (start_offset + BYTES_PER_CHUNK).min(bytes.len());
153
154
if start_offset != 0 {
155
// Ensure we start at the start of a line.
156
if let Some(nl_off) = bytes[start_offset..next_start_offset]
157
.iter()
158
.position(|b| *b == eol_char)
159
{
160
start_offset += nl_off + 1;
161
} else {
162
return (count.analyze_chunk(&[]), 0);
163
}
164
}
165
166
let stop_offset = if let Some(nl_off) = bytes[next_start_offset..]
167
.iter()
168
.position(|b| *b == eol_char)
169
{
170
next_start_offset + nl_off + 1
171
} else {
172
bytes.len()
173
};
174
175
(count.analyze_chunk(&bytes[start_offset..stop_offset]), 0)
176
})
177
.collect_into_vec(&mut states);
178
179
eof_unterminated_row = ends_in_unterminated_row(&bytes, eol_char, comment_prefix);
180
}
181
182
let mut n = 0;
183
let mut in_string = false;
184
for (pair, _) in states {
185
n += pair[in_string as usize].newline_count;
186
in_string = pair[in_string as usize].end_inside_string;
187
}
188
n += eof_unterminated_row as usize;
189
190
Ok(n)
191
})
192
}
193
194
/// Checks if a line in a CSV file is a comment based on the given comment prefix configuration.
195
///
196
/// This function is used during CSV parsing to determine whether a line should be ignored based on its starting characters.
197
#[inline]
198
pub fn is_comment_line(line: &[u8], comment_prefix: Option<&CommentPrefix>) -> bool {
199
match comment_prefix {
200
Some(CommentPrefix::Single(c)) => line.first() == Some(c),
201
Some(CommentPrefix::Multi(s)) => line.starts_with(s.as_bytes()),
202
None => false,
203
}
204
}
205
206
/// Find the nearest next line position.
207
/// Does not check for new line characters embedded in String fields.
208
pub(super) fn next_line_position_naive(input: &[u8], eol_char: u8) -> Option<usize> {
209
let pos = memchr::memchr(eol_char, input)? + 1;
210
if input.len() - pos == 0 {
211
return None;
212
}
213
Some(pos)
214
}
215
216
/// Find the nearest next line position that is not embedded in a String field.
217
pub(super) fn next_line_position(
218
mut input: &[u8],
219
mut expected_fields: Option<usize>,
220
separator: u8,
221
quote_char: Option<u8>,
222
eol_char: u8,
223
) -> Option<usize> {
224
fn accept_line(
225
line: &[u8],
226
expected_fields: usize,
227
separator: u8,
228
eol_char: u8,
229
quote_char: Option<u8>,
230
) -> bool {
231
let mut count = 0usize;
232
for (field, _) in SplitFields::new(line, separator, quote_char, eol_char) {
233
if memchr2_iter(separator, eol_char, field).count() >= expected_fields {
234
return false;
235
}
236
count += 1;
237
}
238
239
// if the latest field is missing
240
// e.g.:
241
// a,b,c
242
// vala,valb,
243
// SplitFields returns a count that is 1 less
244
// There fore we accept:
245
// expected == count
246
// and
247
// expected == count - 1
248
expected_fields.wrapping_sub(count) <= 1
249
}
250
251
// we check 3 subsequent lines for `accept_line` before we accept
252
// if 3 groups are rejected we reject completely
253
let mut rejected_line_groups = 0u8;
254
255
let mut total_pos = 0;
256
if input.is_empty() {
257
return None;
258
}
259
let mut lines_checked = 0u8;
260
loop {
261
if rejected_line_groups >= 3 {
262
return None;
263
}
264
lines_checked = lines_checked.wrapping_add(1);
265
// headers might have an extra value
266
// So if we have churned through enough lines
267
// we try one field less.
268
if lines_checked == u8::MAX {
269
if let Some(ef) = expected_fields {
270
expected_fields = Some(ef.saturating_sub(1))
271
}
272
};
273
let pos = memchr::memchr(eol_char, input)? + 1;
274
if input.len() - pos == 0 {
275
return None;
276
}
277
debug_assert!(pos <= input.len());
278
let new_input = unsafe { input.get_unchecked(pos..) };
279
let mut lines = SplitLines::new(new_input, quote_char, eol_char, None);
280
let line = lines.next();
281
282
match (line, expected_fields) {
283
// count the fields, and determine if they are equal to what we expect from the schema
284
(Some(line), Some(expected_fields)) => {
285
if accept_line(line, expected_fields, separator, eol_char, quote_char) {
286
let mut valid = true;
287
for line in lines.take(2) {
288
if !accept_line(line, expected_fields, separator, eol_char, quote_char) {
289
valid = false;
290
break;
291
}
292
}
293
if valid {
294
return Some(total_pos + pos);
295
} else {
296
rejected_line_groups += 1;
297
}
298
} else {
299
debug_assert!(pos < input.len());
300
unsafe {
301
input = input.get_unchecked(pos + 1..);
302
}
303
total_pos += pos + 1;
304
}
305
},
306
// don't count the fields
307
(Some(_), None) => return Some(total_pos + pos),
308
// // no new line found, check latest line (without eol) for number of fields
309
_ => return None,
310
}
311
}
312
}
313
314
#[inline(always)]
315
pub(super) fn is_whitespace(b: u8) -> bool {
316
b == b' ' || b == b'\t'
317
}
318
319
/// May have false-positives, but not false negatives.
320
#[inline(always)]
321
pub(super) fn could_be_whitespace_fast(b: u8) -> bool {
322
// We're interested in \t (ASCII 9) and " " (ASCII 32), both of which are
323
// <= 32. In that range there aren't a lot of other common symbols (besides
324
// newline), so this is a quick test which can be worth doing to avoid the
325
// exact test.
326
b <= 32
327
}
328
329
/// Drops the longest prefix of `input` whose bytes all satisfy `f`.
///
/// Returns the remaining suffix, which is empty when every byte satisfies `f`
/// (or when `input` is empty).
#[inline]
fn skip_condition<F>(input: &[u8], f: F) -> &[u8]
where
    F: Fn(u8) -> bool,
{
    let first_kept = input
        .iter()
        .position(|&byte| !f(byte))
        .unwrap_or(input.len());
    &input[first_kept..]
}
341
342
/// Remove whitespace from the start of buffer.
343
/// Makes sure that the bytes stream starts with
344
/// 'field_1,field_2'
345
/// and not with
346
/// '\nfield_1,field_1'
347
#[inline]
348
pub(super) fn skip_whitespace(input: &[u8]) -> &[u8] {
349
skip_condition(input, is_whitespace)
350
}
351
352
/// An adapted version of std::iter::Split.
353
/// This exists solely because we cannot split the file in lines naively as
354
///
355
/// ```text
356
/// for line in bytes.split(b'\n') {
357
/// ```
358
///
359
/// This will fail when strings fields are have embedded end line characters.
360
/// For instance: "This is a valid field\nI have multiples lines" is a valid string field, that contains multiple lines.
361
pub struct SplitLines<'a> {
362
v: &'a [u8],
363
quote_char: u8,
364
eol_char: u8,
365
#[cfg(feature = "simd")]
366
simd_eol_char: SimdVec,
367
#[cfg(feature = "simd")]
368
simd_quote_char: SimdVec,
369
#[cfg(feature = "simd")]
370
previous_valid_eols: u64,
371
total_index: usize,
372
quoting: bool,
373
comment_prefix: Option<&'a CommentPrefix>,
374
}
375
376
#[cfg(feature = "simd")]
use std::simd::prelude::*;

#[cfg(feature = "simd")]
use polars_utils::clmul::prefix_xorsum_inclusive;

/// Number of bytes handled per SIMD block.
#[cfg(feature = "simd")]
const SIMD_SIZE: usize = 64;

/// Byte vector with one lane per byte of a SIMD block.
#[cfg(feature = "simd")]
type SimdVec = u8x64;
386
387
impl<'a> SplitLines<'a> {
388
pub fn new(
389
slice: &'a [u8],
390
quote_char: Option<u8>,
391
eol_char: u8,
392
comment_prefix: Option<&'a CommentPrefix>,
393
) -> Self {
394
let quoting = quote_char.is_some();
395
let quote_char = quote_char.unwrap_or(b'\"');
396
#[cfg(feature = "simd")]
397
let simd_eol_char = SimdVec::splat(eol_char);
398
#[cfg(feature = "simd")]
399
let simd_quote_char = SimdVec::splat(quote_char);
400
Self {
401
v: slice,
402
quote_char,
403
eol_char,
404
#[cfg(feature = "simd")]
405
simd_eol_char,
406
#[cfg(feature = "simd")]
407
simd_quote_char,
408
#[cfg(feature = "simd")]
409
previous_valid_eols: 0,
410
total_index: 0,
411
quoting,
412
comment_prefix,
413
}
414
}
415
}
416
417
impl<'a> SplitLines<'a> {
418
// scalar as in non-simd
419
fn next_scalar(&mut self) -> Option<&'a [u8]> {
420
if self.v.is_empty() {
421
return None;
422
}
423
if is_comment_line(self.v, self.comment_prefix) {
424
return self.next_comment_line();
425
}
426
{
427
let mut pos = 0u32;
428
let mut iter = self.v.iter();
429
let mut in_field = false;
430
loop {
431
match iter.next() {
432
Some(&c) => {
433
pos += 1;
434
435
if self.quoting && c == self.quote_char {
436
// toggle between string field enclosure
437
// if we encounter a starting '"' -> in_field = true;
438
// if we encounter a closing '"' -> in_field = false;
439
in_field = !in_field;
440
}
441
// if we are not in a string and we encounter '\n' we can stop at this position.
442
else if c == self.eol_char && !in_field {
443
break;
444
}
445
},
446
None => {
447
let remainder = self.v;
448
self.v = &[];
449
return Some(remainder);
450
},
451
}
452
}
453
454
unsafe {
455
debug_assert!((pos as usize) <= self.v.len());
456
457
// return line up to this position
458
let ret = Some(
459
self.v
460
.get_unchecked(..(self.total_index + pos as usize - 1)),
461
);
462
// skip the '\n' token and update slice.
463
self.v = self.v.get_unchecked(self.total_index + pos as usize..);
464
ret
465
}
466
}
467
}
468
fn next_comment_line(&mut self) -> Option<&'a [u8]> {
469
if let Some(pos) = next_line_position_naive(self.v, self.eol_char) {
470
unsafe {
471
// return line up to this position
472
let ret = Some(self.v.get_unchecked(..(pos - 1)));
473
// skip the '\n' token and update slice.
474
self.v = self.v.get_unchecked(pos..);
475
ret
476
}
477
} else {
478
let remainder = self.v;
479
self.v = &[];
480
Some(remainder)
481
}
482
}
483
}
484
485
impl<'a> Iterator for SplitLines<'a> {
486
type Item = &'a [u8];
487
488
#[inline]
489
#[cfg(not(feature = "simd"))]
490
fn next(&mut self) -> Option<&'a [u8]> {
491
self.next_scalar()
492
}
493
494
#[inline]
495
#[cfg(feature = "simd")]
496
fn next(&mut self) -> Option<&'a [u8]> {
497
// First check cached value
498
if self.previous_valid_eols != 0 {
499
let pos = self.previous_valid_eols.trailing_zeros() as usize;
500
self.previous_valid_eols >>= (pos + 1) as u64;
501
502
unsafe {
503
debug_assert!((pos) <= self.v.len());
504
505
// return line up to this position
506
let ret = Some(self.v.get_unchecked(..pos));
507
// skip the '\n' token and update slice.
508
self.v = self.v.get_unchecked(pos + 1..);
509
return ret;
510
}
511
}
512
if self.v.is_empty() {
513
return None;
514
}
515
if self.comment_prefix.is_some() {
516
return self.next_scalar();
517
}
518
519
self.total_index = 0;
520
let mut not_in_field_previous_iter = true;
521
522
loop {
523
let bytes = unsafe { self.v.get_unchecked(self.total_index..) };
524
if bytes.len() > SIMD_SIZE {
525
let lane: [u8; SIMD_SIZE] = unsafe {
526
bytes
527
.get_unchecked(0..SIMD_SIZE)
528
.try_into()
529
.unwrap_unchecked()
530
};
531
let simd_bytes = SimdVec::from(lane);
532
let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
533
534
let valid_eols = if self.quoting {
535
let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
536
let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);
537
538
if not_in_field_previous_iter {
539
not_in_quote_field = !not_in_quote_field;
540
}
541
not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
542
eol_mask & not_in_quote_field
543
} else {
544
eol_mask
545
};
546
547
if valid_eols != 0 {
548
let pos = valid_eols.trailing_zeros() as usize;
549
if pos == SIMD_SIZE - 1 {
550
self.previous_valid_eols = 0;
551
} else {
552
self.previous_valid_eols = valid_eols >> (pos + 1) as u64;
553
}
554
555
unsafe {
556
let pos = self.total_index + pos;
557
debug_assert!((pos) <= self.v.len());
558
559
// return line up to this position
560
let ret = Some(self.v.get_unchecked(..pos));
561
// skip the '\n' token and update slice.
562
self.v = self.v.get_unchecked(pos + 1..);
563
return ret;
564
}
565
} else {
566
self.total_index += SIMD_SIZE;
567
}
568
} else {
569
// Denotes if we are in a string field, started with a quote
570
let mut in_field = !not_in_field_previous_iter;
571
let mut pos = 0u32;
572
let mut iter = bytes.iter();
573
loop {
574
match iter.next() {
575
Some(&c) => {
576
pos += 1;
577
578
if self.quoting && c == self.quote_char {
579
// toggle between string field enclosure
580
// if we encounter a starting '"' -> in_field = true;
581
// if we encounter a closing '"' -> in_field = false;
582
in_field = !in_field;
583
}
584
// if we are not in a string and we encounter '\n' we can stop at this position.
585
else if c == self.eol_char && !in_field {
586
break;
587
}
588
},
589
None => {
590
let remainder = self.v;
591
self.v = &[];
592
return Some(remainder);
593
},
594
}
595
}
596
597
unsafe {
598
debug_assert!((pos as usize) <= self.v.len());
599
600
// return line up to this position
601
let ret = Some(
602
self.v
603
.get_unchecked(..(self.total_index + pos as usize - 1)),
604
);
605
// skip the '\n' token and update slice.
606
self.v = self.v.get_unchecked(self.total_index + pos as usize..);
607
return ret;
608
}
609
}
610
}
611
}
612
}
613
614
pub struct CountLines {
615
quote_char: u8,
616
eol_char: u8,
617
#[cfg(feature = "simd")]
618
simd_eol_char: SimdVec,
619
#[cfg(feature = "simd")]
620
simd_quote_char: SimdVec,
621
quoting: bool,
622
comment_prefix: Option<CommentPrefix>,
623
}
624
625
/// Result of scanning one chunk of CSV bytes under a fixed assumption about
/// whether the chunk starts inside a quoted string.
#[derive(Copy, Clone, Debug, Default)]
pub struct LineStats {
    /// Number of line terminators counted in the chunk.
    pub newline_count: usize,
    /// Offset within the chunk of the last counted line terminator
    /// (stays 0 when none was counted).
    pub last_newline_offset: usize,
    /// Whether the chunk ends inside a quoted string.
    pub end_inside_string: bool,
}
631
632
impl CountLines {
633
pub fn new(
634
quote_char: Option<u8>,
635
eol_char: u8,
636
comment_prefix: Option<CommentPrefix>,
637
) -> Self {
638
let quoting = quote_char.is_some();
639
let quote_char = quote_char.unwrap_or(b'\"');
640
#[cfg(feature = "simd")]
641
let simd_eol_char = SimdVec::splat(eol_char);
642
#[cfg(feature = "simd")]
643
let simd_quote_char = SimdVec::splat(quote_char);
644
Self {
645
quote_char,
646
eol_char,
647
#[cfg(feature = "simd")]
648
simd_eol_char,
649
#[cfg(feature = "simd")]
650
simd_quote_char,
651
quoting,
652
comment_prefix,
653
}
654
}
655
656
/// Analyzes a chunk of CSV data.
657
///
658
/// Returns (newline_count, last_newline_offset, end_inside_string) twice,
659
/// the first is assuming the start of the chunk is *not* inside a string,
660
/// the second assuming the start is inside a string.
661
///
662
/// If comment_prefix is not None the start of bytes must be at the start of
663
/// a line (and thus not in the middle of a comment).
664
pub fn analyze_chunk(&self, bytes: &[u8]) -> [LineStats; 2] {
665
let mut states = [
666
LineStats {
667
newline_count: 0,
668
last_newline_offset: 0,
669
end_inside_string: false,
670
},
671
LineStats {
672
newline_count: 0,
673
last_newline_offset: 0,
674
end_inside_string: false,
675
},
676
];
677
678
// If we have to deal with comments we can't use SIMD and have to explicitly do two passes.
679
if self.comment_prefix.is_some() {
680
states[0] = self.analyze_chunk_with_comment(bytes, false);
681
states[1] = self.analyze_chunk_with_comment(bytes, true);
682
return states;
683
}
684
685
// False if even number of quotes seen so far, true otherwise.
686
#[allow(unused_assignments)]
687
let mut global_quote_parity = false;
688
let mut scan_offset = 0;
689
690
#[cfg(feature = "simd")]
691
{
692
// 0 if even number of quotes seen so far, u64::MAX otherwise.
693
let mut global_quote_parity_mask = 0;
694
while scan_offset + 64 <= bytes.len() {
695
let block: [u8; 64] = unsafe {
696
bytes
697
.get_unchecked(scan_offset..scan_offset + 64)
698
.try_into()
699
.unwrap_unchecked()
700
};
701
let simd_bytes = SimdVec::from(block);
702
let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
703
if self.quoting {
704
let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
705
let quote_parity =
706
prefix_xorsum_inclusive(quote_mask) ^ global_quote_parity_mask;
707
global_quote_parity_mask = ((quote_parity as i64) >> 63) as u64;
708
709
let start_outside_string_eol_mask = eol_mask & !quote_parity;
710
states[0].newline_count += start_outside_string_eol_mask.count_ones() as usize;
711
states[0].last_newline_offset = select_unpredictable(
712
start_outside_string_eol_mask != 0,
713
(scan_offset + 63)
714
.wrapping_sub(start_outside_string_eol_mask.leading_zeros() as usize),
715
states[0].last_newline_offset,
716
);
717
718
let start_inside_string_eol_mask = eol_mask & quote_parity;
719
states[1].newline_count += start_inside_string_eol_mask.count_ones() as usize;
720
states[1].last_newline_offset = select_unpredictable(
721
start_inside_string_eol_mask != 0,
722
(scan_offset + 63)
723
.wrapping_sub(start_inside_string_eol_mask.leading_zeros() as usize),
724
states[1].last_newline_offset,
725
);
726
} else {
727
states[0].newline_count += eol_mask.count_ones() as usize;
728
states[0].last_newline_offset = select_unpredictable(
729
eol_mask != 0,
730
(scan_offset + 63).wrapping_sub(eol_mask.leading_zeros() as usize),
731
states[0].last_newline_offset,
732
);
733
}
734
735
scan_offset += 64;
736
}
737
738
global_quote_parity = global_quote_parity_mask > 0;
739
}
740
741
while scan_offset < bytes.len() {
742
let c = unsafe { *bytes.get_unchecked(scan_offset) };
743
global_quote_parity ^= (c == self.quote_char) & self.quoting;
744
745
let state = &mut states[global_quote_parity as usize];
746
state.newline_count += (c == self.eol_char) as usize;
747
state.last_newline_offset =
748
select_unpredictable(c == self.eol_char, scan_offset, state.last_newline_offset);
749
750
scan_offset += 1;
751
}
752
753
states[0].end_inside_string = global_quote_parity;
754
states[1].end_inside_string = !global_quote_parity;
755
states
756
}
757
758
// bytes must begin at the start of a line.
759
fn analyze_chunk_with_comment(&self, bytes: &[u8], mut in_string: bool) -> LineStats {
760
let pre_s = match self.comment_prefix.as_ref().unwrap() {
761
CommentPrefix::Single(pc) => core::slice::from_ref(pc),
762
CommentPrefix::Multi(ps) => ps.as_bytes(),
763
};
764
765
let mut state = LineStats::default();
766
let mut scan_offset = 0;
767
while scan_offset < bytes.len() {
768
// Skip comment line if needed.
769
while bytes[scan_offset..].starts_with(pre_s) {
770
scan_offset += pre_s.len();
771
let Some(nl_off) = bytes[scan_offset..]
772
.iter()
773
.position(|c| *c == self.eol_char)
774
else {
775
break;
776
};
777
scan_offset += nl_off + 1;
778
}
779
780
while scan_offset < bytes.len() {
781
let c = unsafe { *bytes.get_unchecked(scan_offset) };
782
in_string ^= (c == self.quote_char) & self.quoting;
783
784
if c == self.eol_char && !in_string {
785
state.newline_count += 1;
786
state.last_newline_offset = scan_offset;
787
scan_offset += 1;
788
break;
789
} else {
790
scan_offset += 1;
791
}
792
}
793
}
794
795
state.end_inside_string = in_string;
796
state
797
}
798
799
pub fn find_next(&self, bytes: &[u8], chunk_size: &mut usize) -> (usize, usize) {
800
loop {
801
let b = unsafe { bytes.get_unchecked(..(*chunk_size).min(bytes.len())) };
802
803
let (count, offset) = if self.comment_prefix.is_some() {
804
let stats = self.analyze_chunk_with_comment(b, false);
805
(stats.newline_count, stats.last_newline_offset)
806
} else {
807
self.count(b)
808
};
809
810
if count > 0 || b.len() == bytes.len() {
811
return (count, offset);
812
}
813
814
*chunk_size = chunk_size.saturating_mul(2);
815
}
816
}
817
818
pub fn count_rows(&self, bytes: &[u8], is_eof: bool) -> (usize, usize) {
819
let stats = if self.comment_prefix.is_some() {
820
self.analyze_chunk_with_comment(bytes, false)
821
} else {
822
self.analyze_chunk(bytes)[0]
823
};
824
825
let mut count = stats.newline_count;
826
let mut offset = stats.last_newline_offset;
827
828
if count > 0 {
829
offset = cmp::min(offset + 1, bytes.len());
830
} else {
831
debug_assert!(offset == 0);
832
}
833
834
if is_eof {
835
count += ends_in_unterminated_row(bytes, self.eol_char, self.comment_prefix.as_ref())
836
as usize;
837
offset = bytes.len();
838
}
839
840
(count, offset)
841
}
842
843
/// Returns count and offset to split for remainder in slice.
844
#[cfg(feature = "simd")]
845
pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
846
let mut total_idx = 0;
847
let original_bytes = bytes;
848
let mut count = 0;
849
let mut position = 0;
850
let mut not_in_field_previous_iter = true;
851
852
loop {
853
let bytes = unsafe { original_bytes.get_unchecked(total_idx..) };
854
855
if bytes.len() > SIMD_SIZE {
856
let lane: [u8; SIMD_SIZE] = unsafe {
857
bytes
858
.get_unchecked(0..SIMD_SIZE)
859
.try_into()
860
.unwrap_unchecked()
861
};
862
let simd_bytes = SimdVec::from(lane);
863
let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
864
865
let valid_eols = if self.quoting {
866
let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
867
let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);
868
869
if not_in_field_previous_iter {
870
not_in_quote_field = !not_in_quote_field;
871
}
872
not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
873
eol_mask & not_in_quote_field
874
} else {
875
eol_mask
876
};
877
878
if valid_eols != 0 {
879
count += valid_eols.count_ones() as usize;
880
position = total_idx + 63 - valid_eols.leading_zeros() as usize;
881
debug_assert_eq!(original_bytes[position], self.eol_char)
882
}
883
total_idx += SIMD_SIZE;
884
} else if bytes.is_empty() {
885
debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
886
return (count, position);
887
} else {
888
let (c, o) = self.count_no_simd(bytes, !not_in_field_previous_iter);
889
890
let (count, position) = if c > 0 {
891
(count + c, total_idx + o)
892
} else {
893
(count, position)
894
};
895
debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
896
897
return (count, position);
898
}
899
}
900
}
901
902
#[cfg(not(feature = "simd"))]
903
pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
904
self.count_no_simd(bytes, false)
905
}
906
907
fn count_no_simd(&self, bytes: &[u8], in_field: bool) -> (usize, usize) {
908
let iter = bytes.iter();
909
let mut in_field = in_field;
910
let mut count = 0;
911
let mut position = 0;
912
913
for b in iter {
914
let c = *b;
915
if self.quoting && c == self.quote_char {
916
// toggle between string field enclosure
917
// if we encounter a starting '"' -> in_field = true;
918
// if we encounter a closing '"' -> in_field = false;
919
in_field = !in_field;
920
}
921
// If we are not in a string and we encounter '\n' we can stop at this position.
922
else if c == self.eol_char && !in_field {
923
position = (b as *const _ as usize) - (bytes.as_ptr() as usize);
924
count += 1;
925
}
926
}
927
debug_assert!(count == 0 || bytes[position] == self.eol_char);
928
929
(count, position)
930
}
931
}
932
933
fn ends_in_unterminated_row(
934
bytes: &[u8],
935
eol_char: u8,
936
comment_prefix: Option<&CommentPrefix>,
937
) -> bool {
938
if !bytes.is_empty() && bytes.last().copied().unwrap() != eol_char {
939
// We can do a simple backwards-scan to find the start of last line if it is a
940
// comment line, since comment lines can't escape new-lines.
941
let last_new_line_post = memchr::memrchr(eol_char, bytes).unwrap_or(0);
942
let last_line_is_comment_line = bytes
943
.get(last_new_line_post + 1..)
944
.map(|line| is_comment_line(line, comment_prefix))
945
.unwrap_or(false);
946
947
return !last_line_is_comment_line;
948
}
949
950
false
951
}
952
953
/// Finds the first `needle` in `bytes` that is outside a quoted field.
///
/// Every `quote_char` toggles the "inside a field" state before the byte is
/// tested, so an opening quote never matches while a closing quote may (when
/// `needle == quote_char`). Returns the byte offset, or `None` if there is no
/// such byte.
#[inline]
fn find_quoted(bytes: &[u8], quote_char: u8, needle: u8) -> Option<usize> {
    let mut in_field = false;

    // `position` counts in `usize`, so offsets beyond `u32::MAX` stay correct.
    bytes.iter().position(|&c| {
        if c == quote_char {
            // An opening quote enters a string field, a closing one leaves it.
            in_field = !in_field;
        }
        !in_field && c == needle
    })
}
975
976
#[inline]
977
pub(super) fn skip_this_line(bytes: &[u8], quote: Option<u8>, eol_char: u8) -> &[u8] {
978
let pos = match quote {
979
Some(quote) => find_quoted(bytes, quote, eol_char),
980
None => bytes.iter().position(|x| *x == eol_char),
981
};
982
match pos {
983
None => &[],
984
Some(pos) => &bytes[pos + 1..],
985
}
986
}
987
988
#[inline]
989
pub(super) fn skip_this_line_naive(input: &[u8], eol_char: u8) -> &[u8] {
990
if let Some(pos) = next_line_position_naive(input, eol_char) {
991
unsafe { input.get_unchecked(pos..) }
992
} else {
993
&[]
994
}
995
}
996
997
/// Parse CSV.
998
///
999
/// # Arguments
1000
/// * `bytes` - input to parse
1001
/// * `offset` - offset in bytes in total input. This is 0 if single threaded. If multi-threaded every
1002
/// thread has a different offset.
1003
/// * `projection` - Indices of the columns to project.
1004
/// * `buffers` - Parsed output will be written to these buffers. Except for UTF8 data. The offsets of the
1005
/// fields are written to the buffers. The UTF8 data will be parsed later.
1006
///
1007
/// Returns the number of bytes parsed successfully.
1008
#[allow(clippy::too_many_arguments)]
1009
pub(super) fn parse_lines(
1010
mut bytes: &[u8],
1011
parse_options: &CsvParseOptions,
1012
offset: usize,
1013
ignore_errors: bool,
1014
null_values: Option<&NullValuesCompiled>,
1015
projection: &[usize],
1016
buffers: &mut [Builder],
1017
n_lines: usize,
1018
// length of original schema
1019
schema_len: usize,
1020
schema: &Schema,
1021
) -> PolarsResult<usize> {
1022
assert!(
1023
!projection.is_empty(),
1024
"at least one column should be projected"
1025
);
1026
let mut truncate_ragged_lines = parse_options.truncate_ragged_lines;
1027
// During projection pushdown we are not checking other csv fields.
1028
// This would be very expensive and we don't care as we only want
1029
// the projected columns.
1030
if projection.len() != schema_len {
1031
truncate_ragged_lines = true
1032
}
1033
1034
// we use the pointers to track the no of bytes read.
1035
let start = bytes.as_ptr() as usize;
1036
let original_bytes_len = bytes.len();
1037
let n_lines = n_lines as u32;
1038
1039
let mut line_count = 0u32;
1040
loop {
1041
if line_count > n_lines {
1042
let end = bytes.as_ptr() as usize;
1043
return Ok(end - start);
1044
}
1045
1046
if bytes.is_empty() {
1047
return Ok(original_bytes_len);
1048
} else if is_comment_line(bytes, parse_options.comment_prefix.as_ref()) {
1049
// deal with comments
1050
let bytes_rem = skip_this_line_naive(bytes, parse_options.eol_char);
1051
bytes = bytes_rem;
1052
continue;
1053
}
1054
1055
// Every line we only need to parse the columns that are projected.
1056
// Therefore we check if the idx of the field is in our projected columns.
1057
// If it is not, we skip the field.
1058
let mut projection_iter = projection.iter().copied();
1059
let mut next_projected = unsafe { projection_iter.next().unwrap_unchecked() };
1060
let mut processed_fields = 0;
1061
1062
let mut iter = SplitFields::new(
1063
bytes,
1064
parse_options.separator,
1065
parse_options.quote_char,
1066
parse_options.eol_char,
1067
);
1068
let mut idx = 0u32;
1069
let mut read_sol = 0;
1070
loop {
1071
match iter.next() {
1072
// end of line
1073
None => {
1074
bytes = unsafe { bytes.get_unchecked(std::cmp::min(read_sol, bytes.len())..) };
1075
break;
1076
},
1077
Some((mut field, needs_escaping)) => {
1078
let field_len = field.len();
1079
1080
// +1 is the split character that is consumed by the iterator.
1081
read_sol += field_len + 1;
1082
1083
if idx == next_projected as u32 {
1084
// the iterator is finished when it encounters a `\n`
1085
// this could be preceded by a '\r'
1086
unsafe {
1087
if field_len > 0 && *field.get_unchecked(field_len - 1) == b'\r' {
1088
field = field.get_unchecked(..field_len - 1);
1089
}
1090
}
1091
1092
debug_assert!(processed_fields < buffers.len());
1093
let buf = unsafe {
1094
// SAFETY: processed fields index can never exceed the projection indices.
1095
buffers.get_unchecked_mut(processed_fields)
1096
};
1097
let mut add_null = false;
1098
1099
// if we have null values argument, check if this field equal null value
1100
if let Some(null_values) = null_values {
1101
let field = if needs_escaping && !field.is_empty() {
1102
unsafe { field.get_unchecked(1..field.len() - 1) }
1103
} else {
1104
field
1105
};
1106
1107
// SAFETY:
1108
// process fields is in bounds
1109
add_null = unsafe { null_values.is_null(field, idx as usize) }
1110
}
1111
if add_null {
1112
buf.add_null(!parse_options.missing_is_null && field.is_empty())
1113
} else {
1114
buf.add(field, ignore_errors, needs_escaping, parse_options.missing_is_null)
1115
.map_err(|e| {
1116
let bytes_offset = offset + field.as_ptr() as usize - start;
1117
let unparsable = String::from_utf8_lossy(field);
1118
let column_name = schema.get_at_index(idx as usize).unwrap().0;
1119
polars_err!(
1120
ComputeError:
1121
"could not parse `{}` as dtype `{}` at column '{}' (column number {})\n\n\
1122
The current offset in the file is {} bytes.\n\
1123
\n\
1124
You might want to try:\n\
1125
- increasing `infer_schema_length` (e.g. `infer_schema_length=10000`),\n\
1126
- specifying correct dtype with the `schema_overrides` argument\n\
1127
- setting `ignore_errors` to `True`,\n\
1128
- adding `{}` to the `null_values` list.\n\n\
1129
Original error: ```{}```",
1130
&unparsable,
1131
buf.dtype(),
1132
column_name,
1133
idx + 1,
1134
bytes_offset,
1135
&unparsable,
1136
e
1137
)
1138
})?;
1139
}
1140
processed_fields += 1;
1141
1142
// if we have all projected columns we are done with this line
1143
match projection_iter.next() {
1144
Some(p) => next_projected = p,
1145
None => {
1146
if bytes.get(read_sol - 1) == Some(&parse_options.eol_char) {
1147
bytes = unsafe { bytes.get_unchecked(read_sol..) };
1148
} else {
1149
if !truncate_ragged_lines && read_sol < bytes.len() {
1150
polars_bail!(ComputeError: r#"found more fields than defined in 'Schema'
1151
1152
Consider setting 'truncate_ragged_lines={}'."#, polars_error::constants::TRUE)
1153
}
1154
let bytes_rem = skip_this_line(
1155
unsafe { bytes.get_unchecked(read_sol - 1..) },
1156
parse_options.quote_char,
1157
parse_options.eol_char,
1158
);
1159
bytes = bytes_rem;
1160
}
1161
break;
1162
},
1163
}
1164
}
1165
idx += 1;
1166
},
1167
}
1168
}
1169
1170
// there can be lines that miss fields (also the comma values)
1171
// this means the splitter won't process them.
1172
// We traverse them to read them as null values.
1173
while processed_fields < projection.len() {
1174
debug_assert!(processed_fields < buffers.len());
1175
let buf = unsafe {
1176
// SAFETY: processed fields index can never exceed the projection indices.
1177
buffers.get_unchecked_mut(processed_fields)
1178
};
1179
buf.add_null(!parse_options.missing_is_null);
1180
processed_fields += 1;
1181
}
1182
line_count += 1;
1183
}
1184
}
1185
1186
#[cfg(test)]
mod test {
    use super::SplitLines;

    /// Collects every line `SplitLines` yields for `input`, splitting on `\n`
    /// with `quote` as the quote byte and no comment prefix.
    fn split_all(input: &str, quote: u8) -> Vec<&[u8]> {
        SplitLines::new(input.as_bytes(), Some(quote), b'\n', None).collect()
    }

    #[test]
    fn test_splitlines() {
        // Newlines inside double-quoted fields must not split the line.
        let double_quoted = split_all("1,\"foo\n\"\n2,\"foo\n\"\n", b'"');
        assert_eq!(
            double_quoted,
            vec!["1,\"foo\n\"".as_bytes(), "2,\"foo\n\"".as_bytes()]
        );

        // The same holds for a custom quote character.
        let single_quoted = split_all("1,'foo\n'\n2,'foo\n'\n", b'\'');
        assert_eq!(
            single_quoted,
            vec!["1,'foo\n'".as_bytes(), "2,'foo\n'".as_bytes()]
        );
    }
}
1205
1206