Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/csv/read/parser.rs
6939 views
1
use memchr::memchr2_iter;
2
use num_traits::Pow;
3
use polars_core::prelude::*;
4
use polars_core::{POOL, config};
5
use polars_error::feature_gated;
6
use polars_utils::mmap::MMapSemaphore;
7
use polars_utils::plpath::PlPathRef;
8
use polars_utils::select::select_unpredictable;
9
use rayon::prelude::*;
10
11
use super::CsvParseOptions;
12
use super::buffer::Buffer;
13
use super::options::{CommentPrefix, NullValuesCompiled};
14
use super::splitfields::SplitFields;
15
use super::utils::get_file_chunks;
16
use crate::prelude::_csv_read_internal::find_starting_point;
17
use crate::utils::compression::maybe_decompress_bytes;
18
19
/// Read the number of rows without parsing columns
20
/// useful for count(*) queries
21
#[allow(clippy::too_many_arguments)]
22
pub fn count_rows(
23
addr: PlPathRef<'_>,
24
separator: u8,
25
quote_char: Option<u8>,
26
comment_prefix: Option<&CommentPrefix>,
27
eol_char: u8,
28
has_header: bool,
29
skip_lines: usize,
30
skip_rows_before_header: usize,
31
skip_rows_after_header: usize,
32
) -> PolarsResult<usize> {
33
let file = match addr
34
.as_local_path()
35
.and_then(|v| (!config::force_async()).then_some(v))
36
{
37
None => feature_gated!("cloud", {
38
crate::file_cache::FILE_CACHE
39
.get_entry(addr)
40
// Safety: This was initialized by schema inference.
41
.unwrap()
42
.try_open_assume_latest()?
43
}),
44
Some(path) => polars_utils::open_file(path)?,
45
};
46
47
let mmap = MMapSemaphore::new_from_file(&file).unwrap();
48
let owned = &mut vec![];
49
let reader_bytes = maybe_decompress_bytes(mmap.as_ref(), owned)?;
50
51
count_rows_from_slice_par(
52
reader_bytes,
53
separator,
54
quote_char,
55
comment_prefix,
56
eol_char,
57
has_header,
58
skip_lines,
59
skip_rows_before_header,
60
skip_rows_after_header,
61
)
62
}
63
64
/// Read the number of rows without parsing columns
65
/// useful for count(*) queries
66
#[allow(clippy::too_many_arguments)]
67
pub fn count_rows_from_slice_par(
68
mut bytes: &[u8],
69
separator: u8,
70
quote_char: Option<u8>,
71
comment_prefix: Option<&CommentPrefix>,
72
eol_char: u8,
73
has_header: bool,
74
skip_lines: usize,
75
skip_rows_before_header: usize,
76
skip_rows_after_header: usize,
77
) -> PolarsResult<usize> {
78
for _ in 0..bytes.len() {
79
if bytes[0] != eol_char {
80
break;
81
}
82
83
bytes = &bytes[1..];
84
}
85
86
// Skip lines and jump past header
87
88
let start_offset = find_starting_point(
89
bytes,
90
quote_char,
91
eol_char,
92
// schema_len
93
// NOTE: schema_len is normally required to differentiate handling a leading blank line
94
// between case (a) when schema_len == 1 (as an empty string) vs case (b) when
95
// schema_len > 1 (as a blank line to be ignored).
96
// We skip blank lines, even when UFT8-BOM is present and schema_len == 1.
97
usize::MAX,
98
skip_lines,
99
skip_rows_before_header,
100
skip_rows_after_header,
101
comment_prefix,
102
has_header,
103
)?;
104
bytes = &bytes[start_offset..];
105
106
const MIN_ROWS_PER_THREAD: usize = 1024;
107
let max_threads = POOL.current_num_threads();
108
109
// Determine if parallelism is beneficial and how many threads
110
let n_threads = get_line_stats(
111
bytes,
112
MIN_ROWS_PER_THREAD,
113
eol_char,
114
None,
115
separator,
116
quote_char,
117
)
118
.map(|(mean, std)| {
119
let n_rows = (bytes.len() as f32 / (mean - 0.01 * std)) as usize;
120
(n_rows / MIN_ROWS_PER_THREAD).clamp(1, max_threads)
121
})
122
.unwrap_or(1);
123
124
if n_threads == 1 {
125
return count_rows_from_slice(bytes, quote_char, comment_prefix, eol_char, false);
126
}
127
128
let file_chunks: Vec<(usize, usize)> =
129
get_file_chunks(bytes, n_threads, None, separator, quote_char, eol_char);
130
131
let iter = file_chunks.into_par_iter().map(|(start, stop)| {
132
let bytes = &bytes[start..stop];
133
134
if comment_prefix.is_some() {
135
SplitLines::new(bytes, quote_char, eol_char, comment_prefix)
136
.filter(|line| !is_comment_line(line, comment_prefix))
137
.count()
138
} else {
139
CountLines::new(quote_char, eol_char).count(bytes).0
140
+ bytes.last().is_some_and(|x| *x != b'\n') as usize
141
}
142
});
143
144
let n: usize = POOL.install(|| iter.sum());
145
146
Ok(n)
147
}
148
149
/// Read the number of rows without parsing columns
150
pub fn count_rows_from_slice(
151
mut bytes: &[u8],
152
quote_char: Option<u8>,
153
comment_prefix: Option<&CommentPrefix>,
154
eol_char: u8,
155
has_header: bool,
156
) -> PolarsResult<usize> {
157
for _ in 0..bytes.len() {
158
if bytes[0] != eol_char {
159
break;
160
}
161
162
bytes = &bytes[1..];
163
}
164
165
let n = if comment_prefix.is_some() {
166
SplitLines::new(bytes, quote_char, eol_char, comment_prefix)
167
.filter(|line| !is_comment_line(line, comment_prefix))
168
.count()
169
} else {
170
CountLines::new(quote_char, eol_char).count(bytes).0
171
+ bytes.last().is_some_and(|x| *x != b'\n') as usize
172
};
173
174
Ok(n - (has_header as usize))
175
}
176
177
/// Skip the utf-8 Byte Order Mark.
178
/// credits to csv-core
179
pub(super) fn skip_bom(input: &[u8]) -> &[u8] {
180
if input.len() >= 3 && &input[0..3] == b"\xef\xbb\xbf" {
181
&input[3..]
182
} else {
183
input
184
}
185
}
186
187
/// Checks if a line in a CSV file is a comment based on the given comment prefix configuration.
188
///
189
/// This function is used during CSV parsing to determine whether a line should be ignored based on its starting characters.
190
#[inline]
191
pub(super) fn is_comment_line(line: &[u8], comment_prefix: Option<&CommentPrefix>) -> bool {
192
match comment_prefix {
193
Some(CommentPrefix::Single(c)) => line.first() == Some(c),
194
Some(CommentPrefix::Multi(s)) => line.starts_with(s.as_bytes()),
195
None => false,
196
}
197
}
198
199
/// Find the nearest next line position.
200
/// Does not check for new line characters embedded in String fields.
201
pub(super) fn next_line_position_naive(input: &[u8], eol_char: u8) -> Option<usize> {
202
let pos = memchr::memchr(eol_char, input)? + 1;
203
if input.len() - pos == 0 {
204
return None;
205
}
206
Some(pos)
207
}
208
209
pub(super) fn skip_lines_naive(mut input: &[u8], eol_char: u8, skip: usize) -> &[u8] {
210
for _ in 0..skip {
211
if let Some(pos) = next_line_position_naive(input, eol_char) {
212
input = &input[pos..];
213
} else {
214
return input;
215
}
216
}
217
input
218
}
219
220
/// Find the nearest next line position that is not embedded in a String field.
221
pub(super) fn next_line_position(
222
mut input: &[u8],
223
mut expected_fields: Option<usize>,
224
separator: u8,
225
quote_char: Option<u8>,
226
eol_char: u8,
227
) -> Option<usize> {
228
fn accept_line(
229
line: &[u8],
230
expected_fields: usize,
231
separator: u8,
232
eol_char: u8,
233
quote_char: Option<u8>,
234
) -> bool {
235
let mut count = 0usize;
236
for (field, _) in SplitFields::new(line, separator, quote_char, eol_char) {
237
if memchr2_iter(separator, eol_char, field).count() >= expected_fields {
238
return false;
239
}
240
count += 1;
241
}
242
243
// if the latest field is missing
244
// e.g.:
245
// a,b,c
246
// vala,valb,
247
// SplitFields returns a count that is 1 less
248
// There fore we accept:
249
// expected == count
250
// and
251
// expected == count - 1
252
expected_fields.wrapping_sub(count) <= 1
253
}
254
255
// we check 3 subsequent lines for `accept_line` before we accept
256
// if 3 groups are rejected we reject completely
257
let mut rejected_line_groups = 0u8;
258
259
let mut total_pos = 0;
260
if input.is_empty() {
261
return None;
262
}
263
let mut lines_checked = 0u8;
264
loop {
265
if rejected_line_groups >= 3 {
266
return None;
267
}
268
lines_checked = lines_checked.wrapping_add(1);
269
// headers might have an extra value
270
// So if we have churned through enough lines
271
// we try one field less.
272
if lines_checked == u8::MAX {
273
if let Some(ef) = expected_fields {
274
expected_fields = Some(ef.saturating_sub(1))
275
}
276
};
277
let pos = memchr::memchr(eol_char, input)? + 1;
278
if input.len() - pos == 0 {
279
return None;
280
}
281
debug_assert!(pos <= input.len());
282
let new_input = unsafe { input.get_unchecked(pos..) };
283
let mut lines = SplitLines::new(new_input, quote_char, eol_char, None);
284
let line = lines.next();
285
286
match (line, expected_fields) {
287
// count the fields, and determine if they are equal to what we expect from the schema
288
(Some(line), Some(expected_fields)) => {
289
if accept_line(line, expected_fields, separator, eol_char, quote_char) {
290
let mut valid = true;
291
for line in lines.take(2) {
292
if !accept_line(line, expected_fields, separator, eol_char, quote_char) {
293
valid = false;
294
break;
295
}
296
}
297
if valid {
298
return Some(total_pos + pos);
299
} else {
300
rejected_line_groups += 1;
301
}
302
} else {
303
debug_assert!(pos < input.len());
304
unsafe {
305
input = input.get_unchecked(pos + 1..);
306
}
307
total_pos += pos + 1;
308
}
309
},
310
// don't count the fields
311
(Some(_), None) => return Some(total_pos + pos),
312
// // no new line found, check latest line (without eol) for number of fields
313
_ => return None,
314
}
315
}
316
}
317
318
pub(super) fn is_line_ending(b: u8, eol_char: u8) -> bool {
319
b == eol_char || b == b'\r'
320
}
321
322
pub(super) fn is_whitespace(b: u8) -> bool {
323
b == b' ' || b == b'\t'
324
}
325
326
/// Drops the longest prefix of `input` whose bytes all satisfy `f`.
///
/// Returns an empty slice when every byte satisfies `f`, including when `input` is empty.
#[inline]
fn skip_condition<F>(input: &[u8], f: F) -> &[u8]
where
    F: Fn(u8) -> bool,
{
    let skipped = input.iter().take_while(|&&b| f(b)).count();
    &input[skipped..]
}
/// Remove whitespace from the start of buffer.
340
/// Makes sure that the bytes stream starts with
341
/// 'field_1,field_2'
342
/// and not with
343
/// '\nfield_1,field_1'
344
#[inline]
345
pub(super) fn skip_whitespace(input: &[u8]) -> &[u8] {
346
skip_condition(input, is_whitespace)
347
}
348
349
#[inline]
350
pub(super) fn skip_line_ending(input: &[u8], eol_char: u8) -> &[u8] {
351
skip_condition(input, |b| is_line_ending(b, eol_char))
352
}
353
354
/// Get the mean and standard deviation of length of lines in bytes
355
pub(super) fn get_line_stats(
356
bytes: &[u8],
357
n_lines: usize,
358
eol_char: u8,
359
expected_fields: Option<usize>,
360
separator: u8,
361
quote_char: Option<u8>,
362
) -> Option<(f32, f32)> {
363
let mut lengths = Vec::with_capacity(n_lines);
364
365
let mut bytes_trunc;
366
let n_lines_per_iter = n_lines / 2;
367
368
let mut n_read = 0;
369
370
// sample from start and 75% in the file
371
for offset in [0, (bytes.len() as f32 * 0.75) as usize] {
372
bytes_trunc = &bytes[offset..];
373
let pos = next_line_position(
374
bytes_trunc,
375
expected_fields,
376
separator,
377
quote_char,
378
eol_char,
379
)?;
380
bytes_trunc = &bytes_trunc[pos + 1..];
381
382
for _ in offset..(offset + n_lines_per_iter) {
383
let pos = next_line_position_naive(bytes_trunc, eol_char)? + 1;
384
n_read += pos;
385
lengths.push(pos);
386
bytes_trunc = &bytes_trunc[pos..];
387
}
388
}
389
390
let n_samples = lengths.len();
391
392
let mean = (n_read as f32) / (n_samples as f32);
393
let mut std = 0.0;
394
for &len in lengths.iter() {
395
std += (len as f32 - mean).pow(2.0)
396
}
397
std = (std / n_samples as f32).sqrt();
398
Some((mean, std))
399
}
400
401
/// An adapted version of std::iter::Split.
402
/// This exists solely because we cannot split the file in lines naively as
403
///
404
/// ```text
405
/// for line in bytes.split(b'\n') {
406
/// ```
407
///
408
/// This will fail when strings fields are have embedded end line characters.
409
/// For instance: "This is a valid field\nI have multiples lines" is a valid string field, that contains multiple lines.
410
pub(super) struct SplitLines<'a> {
411
v: &'a [u8],
412
quote_char: u8,
413
eol_char: u8,
414
#[cfg(feature = "simd")]
415
simd_eol_char: SimdVec,
416
#[cfg(feature = "simd")]
417
simd_quote_char: SimdVec,
418
#[cfg(feature = "simd")]
419
previous_valid_eols: u64,
420
total_index: usize,
421
quoting: bool,
422
comment_prefix: Option<&'a CommentPrefix>,
423
}
424
425
#[cfg(feature = "simd")]
use std::simd::prelude::*;

#[cfg(feature = "simd")]
use polars_utils::clmul::prefix_xorsum_inclusive;

/// Number of bytes inspected per SIMD lane; matches the width of [`SimdVec`].
#[cfg(feature = "simd")]
const SIMD_SIZE: usize = 64;

/// Vector of 64 bytes used by the SIMD line scanners.
#[cfg(feature = "simd")]
type SimdVec = u8x64;
impl<'a> SplitLines<'a> {
437
pub(super) fn new(
438
slice: &'a [u8],
439
quote_char: Option<u8>,
440
eol_char: u8,
441
comment_prefix: Option<&'a CommentPrefix>,
442
) -> Self {
443
let quoting = quote_char.is_some();
444
let quote_char = quote_char.unwrap_or(b'\"');
445
#[cfg(feature = "simd")]
446
let simd_eol_char = SimdVec::splat(eol_char);
447
#[cfg(feature = "simd")]
448
let simd_quote_char = SimdVec::splat(quote_char);
449
Self {
450
v: slice,
451
quote_char,
452
eol_char,
453
#[cfg(feature = "simd")]
454
simd_eol_char,
455
#[cfg(feature = "simd")]
456
simd_quote_char,
457
#[cfg(feature = "simd")]
458
previous_valid_eols: 0,
459
total_index: 0,
460
quoting,
461
comment_prefix,
462
}
463
}
464
}
465
466
impl<'a> SplitLines<'a> {
467
// scalar as in non-simd
468
fn next_scalar(&mut self) -> Option<&'a [u8]> {
469
if self.v.is_empty() {
470
return None;
471
}
472
if is_comment_line(self.v, self.comment_prefix) {
473
return self.next_comment_line();
474
}
475
{
476
let mut pos = 0u32;
477
let mut iter = self.v.iter();
478
let mut in_field = false;
479
loop {
480
match iter.next() {
481
Some(&c) => {
482
pos += 1;
483
484
if self.quoting && c == self.quote_char {
485
// toggle between string field enclosure
486
// if we encounter a starting '"' -> in_field = true;
487
// if we encounter a closing '"' -> in_field = false;
488
in_field = !in_field;
489
}
490
// if we are not in a string and we encounter '\n' we can stop at this position.
491
else if c == self.eol_char && !in_field {
492
break;
493
}
494
},
495
None => {
496
let remainder = self.v;
497
self.v = &[];
498
return Some(remainder);
499
},
500
}
501
}
502
503
unsafe {
504
debug_assert!((pos as usize) <= self.v.len());
505
506
// return line up to this position
507
let ret = Some(
508
self.v
509
.get_unchecked(..(self.total_index + pos as usize - 1)),
510
);
511
// skip the '\n' token and update slice.
512
self.v = self.v.get_unchecked(self.total_index + pos as usize..);
513
ret
514
}
515
}
516
}
517
fn next_comment_line(&mut self) -> Option<&'a [u8]> {
518
if let Some(pos) = next_line_position_naive(self.v, self.eol_char) {
519
unsafe {
520
// return line up to this position
521
let ret = Some(self.v.get_unchecked(..(pos - 1)));
522
// skip the '\n' token and update slice.
523
self.v = self.v.get_unchecked(pos..);
524
ret
525
}
526
} else {
527
let remainder = self.v;
528
self.v = &[];
529
Some(remainder)
530
}
531
}
532
}
533
534
impl<'a> Iterator for SplitLines<'a> {
535
type Item = &'a [u8];
536
537
#[inline]
538
#[cfg(not(feature = "simd"))]
539
fn next(&mut self) -> Option<&'a [u8]> {
540
self.next_scalar()
541
}
542
543
#[inline]
544
#[cfg(feature = "simd")]
545
fn next(&mut self) -> Option<&'a [u8]> {
546
// First check cached value
547
if self.previous_valid_eols != 0 {
548
let pos = self.previous_valid_eols.trailing_zeros() as usize;
549
self.previous_valid_eols >>= (pos + 1) as u64;
550
551
unsafe {
552
debug_assert!((pos) <= self.v.len());
553
554
// return line up to this position
555
let ret = Some(self.v.get_unchecked(..pos));
556
// skip the '\n' token and update slice.
557
self.v = self.v.get_unchecked(pos + 1..);
558
return ret;
559
}
560
}
561
if self.v.is_empty() {
562
return None;
563
}
564
if self.comment_prefix.is_some() {
565
return self.next_scalar();
566
}
567
568
self.total_index = 0;
569
let mut not_in_field_previous_iter = true;
570
571
loop {
572
let bytes = unsafe { self.v.get_unchecked(self.total_index..) };
573
if bytes.len() > SIMD_SIZE {
574
let lane: [u8; SIMD_SIZE] = unsafe {
575
bytes
576
.get_unchecked(0..SIMD_SIZE)
577
.try_into()
578
.unwrap_unchecked()
579
};
580
let simd_bytes = SimdVec::from(lane);
581
let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
582
583
let valid_eols = if self.quoting {
584
let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
585
let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);
586
587
if not_in_field_previous_iter {
588
not_in_quote_field = !not_in_quote_field;
589
}
590
not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
591
eol_mask & not_in_quote_field
592
} else {
593
eol_mask
594
};
595
596
if valid_eols != 0 {
597
let pos = valid_eols.trailing_zeros() as usize;
598
if pos == SIMD_SIZE - 1 {
599
self.previous_valid_eols = 0;
600
} else {
601
self.previous_valid_eols = valid_eols >> (pos + 1) as u64;
602
}
603
604
unsafe {
605
let pos = self.total_index + pos;
606
debug_assert!((pos) <= self.v.len());
607
608
// return line up to this position
609
let ret = Some(self.v.get_unchecked(..pos));
610
// skip the '\n' token and update slice.
611
self.v = self.v.get_unchecked(pos + 1..);
612
return ret;
613
}
614
} else {
615
self.total_index += SIMD_SIZE;
616
}
617
} else {
618
// Denotes if we are in a string field, started with a quote
619
let mut in_field = !not_in_field_previous_iter;
620
let mut pos = 0u32;
621
let mut iter = bytes.iter();
622
loop {
623
match iter.next() {
624
Some(&c) => {
625
pos += 1;
626
627
if self.quoting && c == self.quote_char {
628
// toggle between string field enclosure
629
// if we encounter a starting '"' -> in_field = true;
630
// if we encounter a closing '"' -> in_field = false;
631
in_field = !in_field;
632
}
633
// if we are not in a string and we encounter '\n' we can stop at this position.
634
else if c == self.eol_char && !in_field {
635
break;
636
}
637
},
638
None => {
639
let remainder = self.v;
640
self.v = &[];
641
return Some(remainder);
642
},
643
}
644
}
645
646
unsafe {
647
debug_assert!((pos as usize) <= self.v.len());
648
649
// return line up to this position
650
let ret = Some(
651
self.v
652
.get_unchecked(..(self.total_index + pos as usize - 1)),
653
);
654
// skip the '\n' token and update slice.
655
self.v = self.v.get_unchecked(self.total_index + pos as usize..);
656
return ret;
657
}
658
}
659
}
660
}
661
}
662
663
/// Counts line endings in CSV data, ignoring those inside quoted fields when quoting is on.
pub struct CountLines {
    /// Quote character; only meaningful when `quoting` is `true`.
    quote_char: u8,
    /// Byte that terminates a line.
    eol_char: u8,
    /// `eol_char` splatted across a SIMD vector.
    #[cfg(feature = "simd")]
    simd_eol_char: SimdVec,
    /// `quote_char` splatted across a SIMD vector.
    #[cfg(feature = "simd")]
    simd_quote_char: SimdVec,
    /// Whether quote characters toggle the "inside a field" state.
    quoting: bool,
}
/// Line statistics of a chunk, as produced by `CountLines::analyze_chunk` under one
/// assumption about whether the chunk starts inside a quoted string.
#[derive(Copy, Clone, Debug)]
pub struct LineStats {
    /// Number of line endings that fall outside quoted strings.
    newline_count: usize,
    /// Offset of the last such line ending (0 if none was found).
    last_newline_offset: usize,
    /// Whether the chunk ends inside a quoted string.
    end_inside_string: bool,
}
impl CountLines {
681
pub fn new(quote_char: Option<u8>, eol_char: u8) -> Self {
682
let quoting = quote_char.is_some();
683
let quote_char = quote_char.unwrap_or(b'\"');
684
#[cfg(feature = "simd")]
685
let simd_eol_char = SimdVec::splat(eol_char);
686
#[cfg(feature = "simd")]
687
let simd_quote_char = SimdVec::splat(quote_char);
688
Self {
689
quote_char,
690
eol_char,
691
#[cfg(feature = "simd")]
692
simd_eol_char,
693
#[cfg(feature = "simd")]
694
simd_quote_char,
695
quoting,
696
}
697
}
698
699
/// Analyzes a chunk of CSV data.
700
///
701
/// Returns (newline_count, last_newline_offset, end_inside_string) twice,
702
/// the first is assuming the start of the chunk is *not* inside a string,
703
/// the second assuming the start is inside a string.
704
pub fn analyze_chunk(&self, bytes: &[u8]) -> [LineStats; 2] {
705
let mut scan_offset = 0;
706
let mut states = [
707
LineStats {
708
newline_count: 0,
709
last_newline_offset: 0,
710
end_inside_string: false,
711
},
712
LineStats {
713
newline_count: 0,
714
last_newline_offset: 0,
715
end_inside_string: false,
716
},
717
];
718
719
// false if even number of quotes seen so far, true otherwise.
720
#[allow(unused_assignments)]
721
let mut global_quote_parity = false;
722
723
#[cfg(feature = "simd")]
724
{
725
// 0 if even number of quotes seen so far, u64::MAX otherwise.
726
let mut global_quote_parity_mask = 0;
727
while scan_offset + 64 <= bytes.len() {
728
let block: [u8; 64] = unsafe {
729
bytes
730
.get_unchecked(scan_offset..scan_offset + 64)
731
.try_into()
732
.unwrap_unchecked()
733
};
734
let simd_bytes = SimdVec::from(block);
735
let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
736
if self.quoting {
737
let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
738
let quote_parity =
739
prefix_xorsum_inclusive(quote_mask) ^ global_quote_parity_mask;
740
global_quote_parity_mask = ((quote_parity as i64) >> 63) as u64;
741
742
let start_outside_string_eol_mask = eol_mask & !quote_parity;
743
states[0].newline_count += start_outside_string_eol_mask.count_ones() as usize;
744
states[0].last_newline_offset = select_unpredictable(
745
start_outside_string_eol_mask != 0,
746
(scan_offset + 63)
747
.wrapping_sub(start_outside_string_eol_mask.leading_zeros() as usize),
748
states[0].last_newline_offset,
749
);
750
751
let start_inside_string_eol_mask = eol_mask & quote_parity;
752
states[1].newline_count += start_inside_string_eol_mask.count_ones() as usize;
753
states[1].last_newline_offset = select_unpredictable(
754
start_inside_string_eol_mask != 0,
755
(scan_offset + 63)
756
.wrapping_sub(start_inside_string_eol_mask.leading_zeros() as usize),
757
states[1].last_newline_offset,
758
);
759
} else {
760
states[0].newline_count += eol_mask.count_ones() as usize;
761
states[0].last_newline_offset = select_unpredictable(
762
eol_mask != 0,
763
(scan_offset + 63).wrapping_sub(eol_mask.leading_zeros() as usize),
764
states[0].last_newline_offset,
765
);
766
}
767
768
scan_offset += 64;
769
}
770
771
global_quote_parity = global_quote_parity_mask > 0;
772
}
773
774
while scan_offset < bytes.len() {
775
let c = unsafe { *bytes.get_unchecked(scan_offset) };
776
global_quote_parity ^= (c == self.quote_char) & self.quoting;
777
778
let state = &mut states[global_quote_parity as usize];
779
state.newline_count += (c == self.eol_char) as usize;
780
state.last_newline_offset =
781
select_unpredictable(c == self.eol_char, scan_offset, state.last_newline_offset);
782
783
scan_offset += 1;
784
}
785
786
states[0].end_inside_string = global_quote_parity;
787
states[1].end_inside_string = !global_quote_parity;
788
states
789
}
790
791
pub fn find_next(&self, bytes: &[u8], chunk_size: &mut usize) -> (usize, usize) {
792
loop {
793
let b = unsafe { bytes.get_unchecked(..(*chunk_size).min(bytes.len())) };
794
795
let (count, offset) = self.count(b);
796
797
if count > 0 || b.len() == bytes.len() {
798
return (count, offset);
799
}
800
801
*chunk_size *= 2;
802
}
803
}
804
805
/// Returns count and offset to split for remainder in slice.
806
#[cfg(feature = "simd")]
807
pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
808
let mut total_idx = 0;
809
let original_bytes = bytes;
810
let mut count = 0;
811
let mut position = 0;
812
let mut not_in_field_previous_iter = true;
813
814
loop {
815
let bytes = unsafe { original_bytes.get_unchecked(total_idx..) };
816
817
if bytes.len() > SIMD_SIZE {
818
let lane: [u8; SIMD_SIZE] = unsafe {
819
bytes
820
.get_unchecked(0..SIMD_SIZE)
821
.try_into()
822
.unwrap_unchecked()
823
};
824
let simd_bytes = SimdVec::from(lane);
825
let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
826
827
let valid_eols = if self.quoting {
828
let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
829
let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);
830
831
if not_in_field_previous_iter {
832
not_in_quote_field = !not_in_quote_field;
833
}
834
not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
835
eol_mask & not_in_quote_field
836
} else {
837
eol_mask
838
};
839
840
if valid_eols != 0 {
841
count += valid_eols.count_ones() as usize;
842
position = total_idx + 63 - valid_eols.leading_zeros() as usize;
843
debug_assert_eq!(original_bytes[position], self.eol_char)
844
}
845
total_idx += SIMD_SIZE;
846
} else if bytes.is_empty() {
847
debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
848
return (count, position);
849
} else {
850
let (c, o) = self.count_no_simd(bytes, !not_in_field_previous_iter);
851
852
let (count, position) = if c > 0 {
853
(count + c, total_idx + o)
854
} else {
855
(count, position)
856
};
857
debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
858
859
return (count, position);
860
}
861
}
862
}
863
864
#[cfg(not(feature = "simd"))]
865
pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
866
self.count_no_simd(bytes, false)
867
}
868
869
fn count_no_simd(&self, bytes: &[u8], in_field: bool) -> (usize, usize) {
870
let iter = bytes.iter();
871
let mut in_field = in_field;
872
let mut count = 0;
873
let mut position = 0;
874
875
for b in iter {
876
let c = *b;
877
if self.quoting && c == self.quote_char {
878
// toggle between string field enclosure
879
// if we encounter a starting '"' -> in_field = true;
880
// if we encounter a closing '"' -> in_field = false;
881
in_field = !in_field;
882
}
883
// If we are not in a string and we encounter '\n' we can stop at this position.
884
else if c == self.eol_char && !in_field {
885
position = (b as *const _ as usize) - (bytes.as_ptr() as usize);
886
count += 1;
887
}
888
}
889
debug_assert!(count == 0 || bytes[position] == self.eol_char);
890
891
(count, position)
892
}
893
}
894
895
/// Returns the index of the first `needle` byte in `bytes` that lies outside a quoted
/// region delimited by `quote_char`, or `None` when there is no such byte.
///
/// Every occurrence of `quote_char` toggles the quoted state before `needle` is tested.
/// The index is a `usize`; the previous `u32` counter overflowed on inputs over 4 GiB.
#[inline]
fn find_quoted(bytes: &[u8], quote_char: u8, needle: u8) -> Option<usize> {
    let mut in_field = false;
    bytes.iter().position(|&c| {
        if c == quote_char {
            // toggle between string field enclosure
            // if we encounter a starting '"' -> in_field = true;
            // if we encounter a closing '"' -> in_field = false;
            in_field = !in_field;
        }
        !in_field && c == needle
    })
}
#[inline]
919
pub(super) fn skip_this_line(bytes: &[u8], quote: Option<u8>, eol_char: u8) -> &[u8] {
920
let pos = match quote {
921
Some(quote) => find_quoted(bytes, quote, eol_char),
922
None => bytes.iter().position(|x| *x == eol_char),
923
};
924
match pos {
925
None => &[],
926
Some(pos) => &bytes[pos + 1..],
927
}
928
}
929
930
#[inline]
931
pub(super) fn skip_this_line_naive(input: &[u8], eol_char: u8) -> &[u8] {
932
if let Some(pos) = next_line_position_naive(input, eol_char) {
933
unsafe { input.get_unchecked(pos..) }
934
} else {
935
&[]
936
}
937
}
938
939
/// Parse CSV.
940
///
941
/// # Arguments
942
/// * `bytes` - input to parse
943
/// * `offset` - offset in bytes in total input. This is 0 if single threaded. If multi-threaded every
944
/// thread has a different offset.
945
/// * `projection` - Indices of the columns to project.
946
/// * `buffers` - Parsed output will be written to these buffers. Except for UTF8 data. The offsets of the
947
/// fields are written to the buffers. The UTF8 data will be parsed later.
948
///
949
/// Returns the number of bytes parsed successfully.
950
#[allow(clippy::too_many_arguments)]
951
pub(super) fn parse_lines(
952
mut bytes: &[u8],
953
parse_options: &CsvParseOptions,
954
offset: usize,
955
ignore_errors: bool,
956
null_values: Option<&NullValuesCompiled>,
957
projection: &[usize],
958
buffers: &mut [Buffer],
959
n_lines: usize,
960
// length of original schema
961
schema_len: usize,
962
schema: &Schema,
963
) -> PolarsResult<usize> {
964
assert!(
965
!projection.is_empty(),
966
"at least one column should be projected"
967
);
968
let mut truncate_ragged_lines = parse_options.truncate_ragged_lines;
969
// During projection pushdown we are not checking other csv fields.
970
// This would be very expensive and we don't care as we only want
971
// the projected columns.
972
if projection.len() != schema_len {
973
truncate_ragged_lines = true
974
}
975
976
// we use the pointers to track the no of bytes read.
977
let start = bytes.as_ptr() as usize;
978
let original_bytes_len = bytes.len();
979
let n_lines = n_lines as u32;
980
981
let mut line_count = 0u32;
982
loop {
983
if line_count > n_lines {
984
let end = bytes.as_ptr() as usize;
985
return Ok(end - start);
986
}
987
988
if bytes.is_empty() {
989
return Ok(original_bytes_len);
990
} else if is_comment_line(bytes, parse_options.comment_prefix.as_ref()) {
991
// deal with comments
992
let bytes_rem = skip_this_line_naive(bytes, parse_options.eol_char);
993
bytes = bytes_rem;
994
continue;
995
}
996
997
// Every line we only need to parse the columns that are projected.
998
// Therefore we check if the idx of the field is in our projected columns.
999
// If it is not, we skip the field.
1000
let mut projection_iter = projection.iter().copied();
1001
let mut next_projected = unsafe { projection_iter.next().unwrap_unchecked() };
1002
let mut processed_fields = 0;
1003
1004
let mut iter = SplitFields::new(
1005
bytes,
1006
parse_options.separator,
1007
parse_options.quote_char,
1008
parse_options.eol_char,
1009
);
1010
let mut idx = 0u32;
1011
let mut read_sol = 0;
1012
loop {
1013
match iter.next() {
1014
// end of line
1015
None => {
1016
bytes = unsafe { bytes.get_unchecked(std::cmp::min(read_sol, bytes.len())..) };
1017
break;
1018
},
1019
Some((mut field, needs_escaping)) => {
1020
let field_len = field.len();
1021
1022
// +1 is the split character that is consumed by the iterator.
1023
read_sol += field_len + 1;
1024
1025
if idx == next_projected as u32 {
1026
// the iterator is finished when it encounters a `\n`
1027
// this could be preceded by a '\r'
1028
unsafe {
1029
if field_len > 0 && *field.get_unchecked(field_len - 1) == b'\r' {
1030
field = field.get_unchecked(..field_len - 1);
1031
}
1032
}
1033
1034
debug_assert!(processed_fields < buffers.len());
1035
let buf = unsafe {
1036
// SAFETY: processed fields index can never exceed the projection indices.
1037
buffers.get_unchecked_mut(processed_fields)
1038
};
1039
let mut add_null = false;
1040
1041
// if we have null values argument, check if this field equal null value
1042
if let Some(null_values) = null_values {
1043
let field = if needs_escaping && !field.is_empty() {
1044
unsafe { field.get_unchecked(1..field.len() - 1) }
1045
} else {
1046
field
1047
};
1048
1049
// SAFETY:
1050
// process fields is in bounds
1051
add_null = unsafe { null_values.is_null(field, idx as usize) }
1052
}
1053
if add_null {
1054
buf.add_null(!parse_options.missing_is_null && field.is_empty())
1055
} else {
1056
buf.add(field, ignore_errors, needs_escaping, parse_options.missing_is_null)
1057
.map_err(|e| {
1058
let bytes_offset = offset + field.as_ptr() as usize - start;
1059
let unparsable = String::from_utf8_lossy(field);
1060
let column_name = schema.get_at_index(idx as usize).unwrap().0;
1061
polars_err!(
1062
ComputeError:
1063
"could not parse `{}` as dtype `{}` at column '{}' (column number {})\n\n\
1064
The current offset in the file is {} bytes.\n\
1065
\n\
1066
You might want to try:\n\
1067
- increasing `infer_schema_length` (e.g. `infer_schema_length=10000`),\n\
1068
- specifying correct dtype with the `schema_overrides` argument\n\
1069
- setting `ignore_errors` to `True`,\n\
1070
- adding `{}` to the `null_values` list.\n\n\
1071
Original error: ```{}```",
1072
&unparsable,
1073
buf.dtype(),
1074
column_name,
1075
idx + 1,
1076
bytes_offset,
1077
&unparsable,
1078
e
1079
)
1080
})?;
1081
}
1082
processed_fields += 1;
1083
1084
// if we have all projected columns we are done with this line
1085
match projection_iter.next() {
1086
Some(p) => next_projected = p,
1087
None => {
1088
if bytes.get(read_sol - 1) == Some(&parse_options.eol_char) {
1089
bytes = unsafe { bytes.get_unchecked(read_sol..) };
1090
} else {
1091
if !truncate_ragged_lines && read_sol < bytes.len() {
1092
polars_bail!(ComputeError: r#"found more fields than defined in 'Schema'
1093
1094
Consider setting 'truncate_ragged_lines={}'."#, polars_error::constants::TRUE)
1095
}
1096
let bytes_rem = skip_this_line(
1097
unsafe { bytes.get_unchecked(read_sol - 1..) },
1098
parse_options.quote_char,
1099
parse_options.eol_char,
1100
);
1101
bytes = bytes_rem;
1102
}
1103
break;
1104
},
1105
}
1106
}
1107
idx += 1;
1108
},
1109
}
1110
}
1111
1112
// there can be lines that miss fields (also the comma values)
1113
// this means the splitter won't process them.
1114
// We traverse them to read them as null values.
1115
while processed_fields < projection.len() {
1116
debug_assert!(processed_fields < buffers.len());
1117
let buf = unsafe {
1118
// SAFETY: processed fields index can never exceed the projection indices.
1119
buffers.get_unchecked_mut(processed_fields)
1120
};
1121
buf.add_null(!parse_options.missing_is_null);
1122
processed_fields += 1;
1123
}
1124
line_count += 1;
1125
}
1126
}
1127
1128
#[cfg(test)]
mod test {
    use super::SplitLines;

    /// End-of-line bytes inside quoted fields must not split a line, for both double and
    /// single quote characters.
    #[test]
    fn test_splitlines() {
        let cases: [(&str, u8, [&str; 2]); 2] = [
            (
                "1,\"foo\n\"\n2,\"foo\n\"\n",
                b'"',
                ["1,\"foo\n\"", "2,\"foo\n\""],
            ),
            ("1,'foo\n'\n2,'foo\n'\n", b'\'', ["1,'foo\n'", "2,'foo\n'"]),
        ];

        for (input, quote, expected) in cases {
            let mut lines = SplitLines::new(input.as_bytes(), Some(quote), b'\n', None);
            for line in expected {
                assert_eq!(lines.next(), Some(line.as_bytes()));
            }
            assert_eq!(lines.next(), None);
        }
    }
}