GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sources/csv.rs
use std::ops::Range;
use std::sync::Arc;

use async_trait::async_trait;
use polars_core::prelude::{Column, Field};
use polars_core::schema::{SchemaExt, SchemaRef};
use polars_error::{PolarsResult, polars_bail, polars_err, polars_warn};
use polars_io::RowIndex;
use polars_io::cloud::CloudOptions;
use polars_io::prelude::_csv_read_internal::{
    CountLines, NullValuesCompiled, cast_columns, find_starting_point, prepare_csv_schema,
    read_chunk,
};
use polars_io::prelude::buffer::validate_utf8;
use polars_io::prelude::{
    CommentPrefix, CsvEncoding, CsvParseOptions, CsvReadOptions, count_rows_from_slice,
};
use polars_io::utils::compression::maybe_decompress_bytes;
use polars_io::utils::slice::SplitSlicePosition;
use polars_plan::dsl::ScanSource;
use polars_utils::IdxSize;
use polars_utils::mmap::MemSlice;
use polars_utils::slice_enum::Slice;

use super::multi_scan::reader_interface::output::FileReaderOutputRecv;
use super::multi_scan::reader_interface::{BeginReadArgs, FileReader, FileReaderCallbacks};
use crate::DEFAULT_DISTRIBUTOR_BUFFER_SIZE;
use crate::async_executor::{AbortOnDropHandle, spawn};
use crate::async_primitives::distributor_channel::{self, distributor_channel};
use crate::morsel::SourceToken;
use crate::nodes::compute_node_prelude::*;
use crate::nodes::io_sources::multi_scan::reader_interface::Projection;
use crate::nodes::io_sources::multi_scan::reader_interface::output::FileReaderOutputSend;
use crate::nodes::{MorselSeq, TaskPriority};

pub mod builder {
    use std::sync::Arc;

    use polars_core::config;
    use polars_io::cloud::CloudOptions;
    use polars_io::prelude::CsvReadOptions;
    use polars_plan::dsl::ScanSource;

    use super::CsvFileReader;
    use crate::nodes::io_sources::multi_scan::reader_interface::FileReader;
    use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;
    use crate::nodes::io_sources::multi_scan::reader_interface::capabilities::ReaderCapabilities;

    impl FileReaderBuilder for Arc<CsvReadOptions> {
        fn reader_name(&self) -> &str {
            "csv"
        }

        fn reader_capabilities(&self) -> ReaderCapabilities {
            use ReaderCapabilities as RC;
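
            // PRE_SLICE is only advertised when there is no comment prefix: slicing is
            // line-count based, and comment lines would throw those counts off.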
            RC::NEEDS_FILE_CACHE_INIT
                | if self.parse_options.comment_prefix.is_some() {
                    RC::empty()
                } else {
                    RC::PRE_SLICE
                }
        }

        fn build_file_reader(
            &self,
            source: ScanSource,
            cloud_options: Option<Arc<CloudOptions>>,
            _scan_source_idx: usize,
        ) -> Box<dyn FileReader> {
            let scan_source = source;
            let verbose = config::verbose();
            let options = self.clone();

            let reader = CsvFileReader {
                scan_source,
                cloud_options,
                options,
                verbose,
                cached_bytes: None,
            };

            Box::new(reader) as Box<dyn FileReader>
        }
    }
}
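
// Pipeline overview: `begin_read` spawns one `LineBatchSource` task that splits the
// file into batches of complete lines and distributes them to `num_pipelines`
// "LineBatchProcessor" workers, which parse the batches into DataFrame morsels via
// `ChunkReader`.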

/// Read all rows in the chunk
const NO_SLICE: (usize, usize) = (0, usize::MAX);
/// This is used if we finish the slice but still need a row count. It signals to the workers to
/// go into line-counting mode where they can skip parsing the chunks.
const SLICE_ENDED: (usize, usize) = (usize::MAX, 0);

struct LineBatch {
    // Safety: All receivers (LineBatchProcessors) hold a MemSlice ref to this.
    bytes: &'static [u8],
    n_lines: usize,
    slice: (usize, usize),
    /// Position of this chunk relative to the start of the file according to CountLines.
    row_offset: usize,
    morsel_seq: MorselSeq,
}

struct CsvFileReader {
    scan_source: ScanSource,
    #[expect(unused)] // Will be used when implementing cloud streaming.
    cloud_options: Option<Arc<CloudOptions>>,
    options: Arc<CsvReadOptions>,
    // Cached on first access - we may be called multiple times e.g. on negative slice.
    cached_bytes: Option<MemSlice>,
    verbose: bool,
}

#[async_trait]
impl FileReader for CsvFileReader {
    async fn initialize(&mut self) -> PolarsResult<()> {
        let memslice = self
            .scan_source
            .as_scan_source_ref()
            .to_memslice_async_assume_latest(self.scan_source.run_async())?;

        // Note: We do not decompress in `initialize()`.
        self.cached_bytes = Some(memslice);

        Ok(())
    }

    fn begin_read(
        &mut self,
        args: BeginReadArgs,
    ) -> PolarsResult<(FileReaderOutputRecv, JoinHandle<PolarsResult<()>>)> {
        let verbose = self.verbose;

        let memslice = self.get_bytes_maybe_decompress()?;

        let BeginReadArgs {
            projection: Projection::Plain(projected_schema),
            // Because we currently only support PRE_SLICE we don't need to handle row index here.
            row_index,
            pre_slice,
            predicate: None,
            cast_columns_policy: _,
            num_pipelines,
            callbacks:
                FileReaderCallbacks {
                    file_schema_tx,
                    n_rows_in_file_tx,
                    row_position_on_end_tx,
                },
        } = args
        else {
            panic!("unsupported args: {:?}", &args)
        };

        match &pre_slice {
            Some(Slice::Negative { .. }) => unimplemented!(),

            // We don't account for comments when slicing lines. We should never hit this panic -
            // the FileReaderBuilder does not indicate PRE_SLICE support when we have a comment
            // prefix.
            Some(pre_slice)
                if self.options.parse_options.comment_prefix.is_some() && pre_slice.len() > 0 =>
            {
                panic!("{pre_slice:?}")
            },

            _ => {},
        }

        // We need to infer the schema to get the columns of this file.
        let infer_schema_length = if self.options.has_header {
            Some(1)
        } else {
            // If there is no header the line length may increase later in the
            // file (https://github.com/pola-rs/polars/pull/21979).
            self.options.infer_schema_length
        };

        let (mut inferred_schema, ..) = polars_io::csv::read::infer_file_schema(
            &polars_io::mmap::ReaderBytes::Owned(memslice.clone()),
            &self.options.parse_options,
            infer_schema_length,
            self.options.has_header,
            self.options.schema_overwrite.as_deref(),
            self.options.skip_rows,
            self.options.skip_lines,
            self.options.skip_rows_after_header,
            self.options.raise_if_empty,
        )?;

        if let Some(schema) = &self.options.schema {
            // Note: The user can provide a schema with more columns; the extras will
            // simply be projected as NULL.
            // TODO: Should maybe expose a missing_columns parameter to the API for this.
            if schema.len() < inferred_schema.len()
                && !self.options.parse_options.truncate_ragged_lines
            {
                polars_bail!(
                    SchemaMismatch:
                    "provided schema does not match number of columns in file ({} != {} in file)",
                    schema.len(),
                    inferred_schema.len(),
                );
            }

            if self.options.parse_options.truncate_ragged_lines {
                inferred_schema = Arc::unwrap_or_clone(schema.clone());
            } else {
                inferred_schema = schema
                    .iter_names()
                    .zip(inferred_schema.into_iter().map(|(_, dtype)| dtype))
                    .map(|(name, dtype)| (name.clone(), dtype))
                    .collect();
            }
        }

        if let Some(dtypes) = self.options.dtype_overwrite.as_deref() {
            for (i, dtype) in dtypes.iter().enumerate() {
                inferred_schema.set_dtype_at_index(i, dtype.clone());
            }
        }

        // TODO
        // We currently always override with the projected dtype, but this may cause
        // issues e.g. with temporal types. This can be improved to better choose
        // between the 2 dtypes.
        for (name, inferred_dtype) in inferred_schema.iter_mut() {
            if let Some(projected_dtype) = projected_schema.get(name) {
                *inferred_dtype = projected_dtype.clone();
            }
        }

        let inferred_schema = Arc::new(inferred_schema);

        if let Some(mut tx) = file_schema_tx {
            _ = tx.try_send(inferred_schema.clone())
        }
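
        // Map each projected column name to its index in the file schema; names that
        // are not present in this file are simply skipped.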
        let projection: Vec<usize> = projected_schema
            .iter_names()
            .filter_map(|name| inferred_schema.index_of(name))
            .collect();

        if verbose {
            eprintln!(
                "[CsvFileReader]: project: {} / {}, slice: {:?}, row_index: {:?}",
                projection.len(),
                inferred_schema.len(),
                &pre_slice,
                row_index,
            )
        }

        // Only used on empty projection, or if we need the exact row count.
        let alt_count_lines: Option<Arc<CountLinesWithComments>> =
            CountLinesWithComments::opt_new(&self.options.parse_options).map(Arc::new);
        let chunk_reader = Arc::new(ChunkReader::try_new(
            self.options.clone(),
            inferred_schema.clone(),
            projection,
            row_index,
            alt_count_lines.clone(),
        )?);
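
        // If the caller asked for the exact number of rows in the file we must keep
        // scanning to EOF even after the requested slice has been fully produced.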
        let needs_full_row_count = n_rows_in_file_tx.is_some();

        let (line_batch_tx, line_batch_receivers) =
            distributor_channel(num_pipelines, *DEFAULT_DISTRIBUTOR_BUFFER_SIZE);
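
        // Single line-splitting task feeding `LineBatch`es to the decode workers via
        // the distributor channel above.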
        let line_batch_source_handle = AbortOnDropHandle::new(spawn(
            TaskPriority::Low,
            LineBatchSource {
                memslice: memslice.clone(),
                line_counter: CountLines::new(
                    self.options.parse_options.quote_char,
                    self.options.parse_options.eol_char,
                ),
                line_batch_tx,
                options: self.options.clone(),
                file_schema_len: inferred_schema.len(),
                pre_slice,
                needs_full_row_count,
                num_pipelines,
                verbose,
            }
            .run(),
        ));

        let n_workers = line_batch_receivers.len();

        let (morsel_senders, rx) = FileReaderOutputSend::new_parallel(num_pipelines);

        let line_batch_decode_handles = line_batch_receivers
            .into_iter()
            .zip(morsel_senders)
            .enumerate()
            .map(|(worker_idx, (mut line_batch_rx, mut morsel_tx))| {
                // Hold a ref as we are receiving `&'static [u8]`s pointing to this.
                let global_memslice = memslice.clone();
                // Only verbose log from the last worker to avoid flooding output.
                let verbose = verbose && worker_idx == n_workers - 1;
                let mut n_rows_processed: usize = 0;
                let chunk_reader = chunk_reader.clone();
                // Note: We don't use this (it is handled by the bridge). But morsels require a source token.
                let source_token = SourceToken::new();
                let alt_count_lines = alt_count_lines.clone();

                AbortOnDropHandle::new(spawn(TaskPriority::Low, async move {
                    while let Ok(LineBatch {
                        bytes,
                        n_lines,
                        slice,
                        row_offset,
                        morsel_seq,
                    }) = line_batch_rx.recv().await
                    {
                        debug_assert!(bytes.as_ptr() as usize >= global_memslice.as_ptr() as usize);
                        debug_assert!(
                            bytes.as_ptr() as usize + bytes.len()
                                <= global_memslice.as_ptr() as usize + global_memslice.len()
                        );
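
                        // If the slice has already ended, parse this batch with a dummy
                        // (0, 1) slice: we only need its row count (the second value
                        // returned by `read_chunk`) before breaking into the counting
                        // loop below.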
                        let (offset, len) = match slice {
                            SLICE_ENDED => (0, 1),
                            v => v,
                        };

                        let (df, n_rows_in_chunk) =
                            chunk_reader.read_chunk(bytes, n_lines, (offset, len), row_offset)?;

                        n_rows_processed = n_rows_processed.saturating_add(n_rows_in_chunk);

                        if slice == SLICE_ENDED {
                            break;
                        }

                        let morsel = Morsel::new(df, morsel_seq, source_token.clone());

                        if morsel_tx.send_morsel(morsel).await.is_err() {
                            break;
                        }
                    }

                    drop(morsel_tx);

                    if needs_full_row_count {
                        if verbose {
                            eprintln!(
                                "[CSV LineBatchProcessor {worker_idx}]: entering row count mode"
                            );
                        }

                        while let Ok(LineBatch {
                            bytes,
                            n_lines,
                            slice,
                            row_offset: _,
                            morsel_seq: _,
                        }) = line_batch_rx.recv().await
                        {
                            assert_eq!(slice, SLICE_ENDED);

                            let n_lines = if let Some(v) = alt_count_lines.as_deref() {
                                v.count_lines(bytes)?
                            } else {
                                n_lines
                            };

                            n_rows_processed = n_rows_processed.saturating_add(n_lines);
                        }
                    }

                    PolarsResult::Ok(n_rows_processed)
                }))
            })
            .collect::<Vec<_>>();
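
        // Collect results: wait for every decode worker and the line-batch source,
        // sum the row counts they return, and report the total through the
        // row-count callbacks.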
        Ok((
            rx,
            spawn(TaskPriority::Low, async move {
                let mut row_position: usize = 0;

                for handle in line_batch_decode_handles {
                    let rows_processed = handle.await?;
                    row_position = row_position.saturating_add(rows_processed);
                }

                row_position = {
                    let rows_skipped = line_batch_source_handle.await?;
                    row_position.saturating_add(rows_skipped)
                };

                let row_position = IdxSize::try_from(row_position)
                    .map_err(|_| polars_err!(bigidx, ctx = "csv file", size = row_position))?;

                if let Some(mut n_rows_in_file_tx) = n_rows_in_file_tx {
                    assert!(needs_full_row_count);
                    _ = n_rows_in_file_tx.try_send(row_position);
                }

                if let Some(mut row_position_on_end_tx) = row_position_on_end_tx {
                    _ = row_position_on_end_tx.try_send(row_position);
                }

                Ok(())
            }),
        ))
    }
}

impl CsvFileReader {
    /// # Panics
    /// Panics if `self.cached_bytes` is None.
    fn get_bytes_maybe_decompress(&mut self) -> PolarsResult<MemSlice> {
        let mut out = vec![];
        maybe_decompress_bytes(self.cached_bytes.as_deref().unwrap(), &mut out)?;
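
        // `out` is only written when the input was actually compressed; if it stays
        // empty we keep serving the existing (possibly mmap-backed) bytes.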
        if !out.is_empty() {
            self.cached_bytes = Some(MemSlice::from_vec(out));
        }

        Ok(self.cached_bytes.clone().unwrap())
    }
}

struct LineBatchSource {
    memslice: MemSlice,
    line_counter: CountLines,
    line_batch_tx: distributor_channel::Sender<LineBatch>,
    options: Arc<CsvReadOptions>,
    file_schema_len: usize,
    pre_slice: Option<Slice>,
    needs_full_row_count: bool,
    num_pipelines: usize,
    verbose: bool,
}

impl LineBatchSource {
    /// Returns the number of rows skipped from the start of the file according to CountLines.
    async fn run(self) -> PolarsResult<usize> {
        let LineBatchSource {
            memslice,
            line_counter,
            mut line_batch_tx,
            options,
            file_schema_len,
            pre_slice,
            needs_full_row_count,
            num_pipelines,
            verbose,
        } = self;

        let mut n_rows_skipped: usize = 0;

        let global_slice = if let Some(pre_slice) = pre_slice {
            match pre_slice {
                Slice::Positive { .. } => Some(Range::<usize>::from(pre_slice)),
                // IR lowering puts negative slice in separate node.
                // TODO: Native line buffering for negative slice
                Slice::Negative { .. } => unreachable!(),
            }
        } else {
            None
        };

        let morsel_seq_ref = &mut MorselSeq::default();
        let current_row_offset_ref = &mut 0usize;

        if verbose {
            eprintln!("[CsvSource]: Start line splitting",);
        }

        let global_bytes: &[u8] = memslice.as_ref();
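        // SAFETY: the 'static lifetime is not real; this is sound only because every
        // receiver of a `LineBatch` holds a clone of `memslice` (see `LineBatch::bytes`),
        // keeping the backing allocation alive while these slices are in use.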
        let global_bytes: &'static [u8] = unsafe { std::mem::transmute(global_bytes) };

        let i = {
            let parse_options = options.parse_options.as_ref();

            let quote_char = parse_options.quote_char;
            let eol_char = parse_options.eol_char;

            let skip_lines = options.skip_lines;
            let skip_rows_before_header = options.skip_rows;
            let skip_rows_after_header = options.skip_rows_after_header;
            let comment_prefix = parse_options.comment_prefix.clone();
            let has_header = options.has_header;

            find_starting_point(
                global_bytes,
                quote_char,
                eol_char,
                file_schema_len,
                skip_lines,
                skip_rows_before_header,
                skip_rows_after_header,
                comment_prefix.as_ref(),
                has_header,
            )?
        };

        let mut bytes = &global_bytes[i..];

        let mut chunk_size = {
            let max_chunk_size = 16 * 1024 * 1024;
            let chunk_size = if global_slice.is_some() {
                max_chunk_size
            } else {
                std::cmp::min(bytes.len() / (16 * num_pipelines), max_chunk_size)
            };

            // Use a small min chunk size to catch failures in tests.
            #[cfg(debug_assertions)]
            let min_chunk_size = 64;
            #[cfg(not(debug_assertions))]
            let min_chunk_size = 1024 * 4;
            std::cmp::max(chunk_size, min_chunk_size)
        };

        loop {
            if bytes.is_empty() {
                break;
            }

            let (count, position) = line_counter.find_next(bytes, &mut chunk_size);
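            // `count == 0` means no complete line fit in the probed region: treat the
            // entire remainder as one final (possibly unterminated) line.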
            let (count, position) = if count == 0 {
                (1, bytes.len())
            } else {
                let pos = (position + 1).min(bytes.len()); // +1 for '\n'
                (count, pos)
            };

            let slice_start = bytes.as_ptr() as usize - global_bytes.as_ptr() as usize;

            bytes = &bytes[position..];

            let current_row_offset = *current_row_offset_ref;
            *current_row_offset_ref += count;

            let slice = if let Some(global_slice) = &global_slice {
                match SplitSlicePosition::split_slice_at_file(
                    current_row_offset,
                    count,
                    global_slice.clone(),
                ) {
                    // Note that we don't check that the skipped line batches actually contain this many
                    // lines.
                    SplitSlicePosition::Before => {
                        n_rows_skipped = n_rows_skipped.saturating_add(count);
                        continue;
                    },
                    SplitSlicePosition::Overlapping(offset, len) => (offset, len),
                    SplitSlicePosition::After => {
                        if needs_full_row_count {
                            // If we need to know the unrestricted row count, we need
                            // to go until the end.
                            SLICE_ENDED
                        } else {
                            break;
                        }
                    },
                }
            } else {
                NO_SLICE
            };

            let bytes_this_chunk = &global_bytes[slice_start..slice_start + position];

            let morsel_seq = *morsel_seq_ref;
            *morsel_seq_ref = morsel_seq.successor();

            let batch = LineBatch {
                bytes: bytes_this_chunk,
                n_lines: count,
                slice,
                row_offset: current_row_offset,
                morsel_seq,
            };

            if line_batch_tx.send(batch).await.is_err() {
                break;
            }
        }

        Ok(n_rows_skipped)
    }
}

#[derive(Default)]
struct ChunkReader {
    reader_schema: SchemaRef,
    parse_options: Arc<CsvParseOptions>,
    fields_to_cast: Vec<Field>,
    ignore_errors: bool,
    projection: Vec<usize>,
    null_values: Option<NullValuesCompiled>,
    validate_utf8: bool,
    row_index: Option<RowIndex>,
    // Alternate line counter when there are comments. This is used on empty projection.
    alt_count_lines: Option<Arc<CountLinesWithComments>>,
}

impl ChunkReader {
    fn try_new(
        options: Arc<CsvReadOptions>,
        mut reader_schema: SchemaRef,
        projection: Vec<usize>,
        row_index: Option<RowIndex>,
        alt_count_lines: Option<Arc<CountLinesWithComments>>,
    ) -> PolarsResult<Self> {
        let mut fields_to_cast: Vec<Field> = options.fields_to_cast.clone();
        prepare_csv_schema(&mut reader_schema, &mut fields_to_cast)?;

        let parse_options = options.parse_options.clone();

        // Logic from `CoreReader::new()`

        let null_values = parse_options
            .null_values
            .clone()
            .map(|nv| nv.compile(&reader_schema))
            .transpose()?;

        let validate_utf8 = matches!(parse_options.encoding, CsvEncoding::Utf8)
            && reader_schema.iter_fields().any(|f| f.dtype().is_string());

        Ok(Self {
            reader_schema,
            parse_options,
            fields_to_cast,
            ignore_errors: options.ignore_errors,
            projection,
            null_values,
            validate_utf8,
            row_index,
            alt_count_lines,
        })
    }

    /// The 2nd return value indicates how many rows exist in the chunk.
    fn read_chunk(
        &self,
        chunk: &[u8],
        // Number of lines according to CountLines
        n_lines: usize,
        slice: (usize, usize),
        chunk_row_offset: usize,
    ) -> PolarsResult<(DataFrame, usize)> {
        if self.validate_utf8 && !validate_utf8(chunk) {
            polars_bail!(ComputeError: "invalid utf-8 sequence")
        }

        // If projection is empty create a DataFrame with the correct height by counting the lines.
        let mut df = if self.projection.is_empty() {
            let h = if let Some(v) = &self.alt_count_lines {
                v.count_lines(chunk)?
            } else {
                n_lines
            };

            DataFrame::empty_with_height(h)
        } else {
            read_chunk(
                chunk,
                &self.parse_options,
                &self.reader_schema,
                self.ignore_errors,
                &self.projection,
                0, // bytes_offset_thread
                n_lines, // capacity
                self.null_values.as_ref(),
                usize::MAX, // chunk_size
                chunk.len(), // stop_at_nbytes
                Some(0), // starting_point_offset
            )?
        };
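
        // `height` is taken before slicing: the second return value must report how
        // many rows the chunk contained, not how many survived the slice.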
        let height = df.height();
        let n_lines_is_correct = df.height() == n_lines;

        // Check malformed
        if df.height() > n_lines
            || (df.height() < n_lines && self.parse_options.comment_prefix.is_none())
        {
            // Note: in case data is malformed, df.height() is more likely to be correct than n_lines.
            let msg = format!(
                "CSV malformed: expected {} rows, actual {} rows, in chunk starting at row_offset {}, length {}",
                n_lines,
                df.height(),
                chunk_row_offset,
                chunk.len()
            );
            if self.ignore_errors {
                polars_warn!(msg);
            } else {
                polars_bail!(ComputeError: msg);
            }
        }

        if slice != NO_SLICE {
            assert!(slice != SLICE_ENDED);
            assert!(n_lines_is_correct || slice.1 == 0);

            df = df.slice(i64::try_from(slice.0).unwrap(), slice.1);
        }

        cast_columns(&mut df, &self.fields_to_cast, false, self.ignore_errors)?;

        if let Some(ri) = &self.row_index {
            assert!(n_lines_is_correct);

            unsafe {
                df.with_column_unchecked(Column::new_row_index(
                    ri.name.clone(),
                    ri.offset
                        .saturating_add(chunk_row_offset.try_into().unwrap_or(IdxSize::MAX)),
                    df.height(),
                )?);
            }
        }

        Ok((df, height))
    }
}

struct CountLinesWithComments {
    quote_char: Option<u8>,
    eol_char: u8,
    comment_prefix: CommentPrefix,
}

impl CountLinesWithComments {
    fn opt_new(parse_options: &CsvParseOptions) -> Option<Self> {
        parse_options
            .comment_prefix
            .clone()
            .map(|comment_prefix| CountLinesWithComments {
                quote_char: parse_options.quote_char,
                eol_char: parse_options.eol_char,
                comment_prefix,
            })
    }

    fn count_lines(&self, bytes: &[u8]) -> PolarsResult<usize> {
        count_rows_from_slice(
            bytes,
            self.quote_char,
            Some(&self.comment_prefix),
            self.eol_char,
            false, // has_header
        )
    }
}