GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sources/csv.rs
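
//! Streaming CSV source node.
//!
//! Data flow in this file: a single `LineBatchSource` task splits the
//! (possibly decompressed) byte stream into line-aligned `LineBatch`es and
//! distributes them over a `distributor_channel` to `num_pipelines` decode
//! workers. Each worker parses its batches into `DataFrame` morsels and
//! forwards them through `FileReaderOutputSend`; a final task joins all
//! handles to recover the total row count.
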
use std::iter::Iterator;
use std::ops::Range;
use std::sync::Arc;

use async_trait::async_trait;
use polars_buffer::Buffer;
use polars_core::prelude::Field;
use polars_core::schema::{SchemaExt, SchemaRef};
use polars_error::{PolarsResult, polars_bail, polars_err, polars_warn};
use polars_io::cloud::CloudOptions;
use polars_io::csv::read::streaming::read_until_start_and_infer_schema;
use polars_io::prelude::_csv_read_internal::{
    CountLines, NullValuesCompiled, cast_columns, prepare_csv_schema, read_chunk,
};
use polars_io::prelude::builder::validate_utf8;
use polars_io::prelude::{CsvEncoding, CsvParseOptions, CsvReadOptions};
use polars_io::utils::compression::CompressedReader;
use polars_io::utils::slice::SplitSlicePosition;
use polars_plan::dsl::ScanSource;
use polars_utils::IdxSize;
use polars_utils::mem::prefetch::prefetch_l2;
use polars_utils::slice_enum::Slice;

use super::multi_scan::reader_interface::output::FileReaderOutputRecv;
use super::multi_scan::reader_interface::{BeginReadArgs, FileReader, FileReaderCallbacks};
use crate::DEFAULT_DISTRIBUTOR_BUFFER_SIZE;
use crate::async_executor::{AbortOnDropHandle, spawn};
use crate::async_primitives::distributor_channel::{self, distributor_channel};
use crate::morsel::SourceToken;
use crate::nodes::compute_node_prelude::*;
use crate::nodes::io_sources::multi_scan::reader_interface::Projection;
use crate::nodes::io_sources::multi_scan::reader_interface::output::FileReaderOutputSend;
use crate::nodes::{MorselSeq, TaskPriority};

pub mod builder {
    use std::sync::Arc;

    use polars_core::config;
    use polars_io::cloud::CloudOptions;
    use polars_io::prelude::CsvReadOptions;
    use polars_plan::dsl::ScanSource;

    use super::CsvFileReader;
    use crate::nodes::io_sources::multi_scan::reader_interface::FileReader;
    use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;
    use crate::nodes::io_sources::multi_scan::reader_interface::capabilities::ReaderCapabilities;

    impl FileReaderBuilder for Arc<CsvReadOptions> {
        fn reader_name(&self) -> &str {
            "csv"
        }

        fn reader_capabilities(&self) -> ReaderCapabilities {
            use ReaderCapabilities as RC;

            RC::NEEDS_FILE_CACHE_INIT
                | if self.parse_options.comment_prefix.is_some() {
                    RC::empty()
                } else {
                    RC::PRE_SLICE
                }
        }

        fn build_file_reader(
            &self,
            source: ScanSource,
            cloud_options: Option<Arc<CloudOptions>>,
            _scan_source_idx: usize,
        ) -> Box<dyn FileReader> {
            let scan_source = source;
            let verbose = config::verbose();
            let options = self.clone();

            let reader = CsvFileReader {
                scan_source,
                cloud_options,
                options,
                verbose,
                cached_bytes: None,
            };

            Box::new(reader) as Box<dyn FileReader>
        }
    }
}
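
// A minimal sketch of how this builder is driven (hypothetical caller; the
// real call sites live in the multi-scan node, not in this file):
//
//     let builder: Arc<CsvReadOptions> = Arc::new(CsvReadOptions::default());
//     let mut reader = builder.build_file_reader(scan_source, None, 0);
//     reader.initialize().await?;
//     let (morsels, handle) = reader.begin_read(args)?;
//
// `initialize()` caches the raw (still compressed) bytes; `begin_read` then
// spawns the line splitter and decode workers defined below.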

/// Read all rows in the chunk
const NO_SLICE: (usize, usize) = (0, usize::MAX);
/// This is used if we finish the slice but still need a row count. It signals to the workers to
/// go into line-counting mode where they can skip parsing the chunks.
const SLICE_ENDED: (usize, usize) = (usize::MAX, 0);
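
// The `(offset, len)` tuple carried by each `LineBatch` doubles as a control
// signal for the decode workers:
//   NO_SLICE    == (0, usize::MAX) -> parse and emit every row in the batch.
//   SLICE_ENDED == (usize::MAX, 0) -> the requested slice is exhausted; only
//                                     count lines from here on.
// Any other value is an ordinary (offset, len) into the rows of the batch.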

struct LineBatch {
    // Safety: All receivers (LineBatchProcessors) hold a Buffer ref to this.
    mem_slice: Buffer<u8>,
    n_lines: usize,
    slice: (usize, usize),
    /// Position of this chunk relative to the start of the file according to CountLines.
    row_offset: usize,
    morsel_seq: MorselSeq,
}
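
// Batches are always split on line boundaries: the source only forwards bytes
// up to the `unconsumed_offset` reported by `CountLines`, so no CSV record
// ever straddles two `LineBatch`es.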

struct CsvFileReader {
    scan_source: ScanSource,
    #[expect(unused)] // Will be used when implementing cloud streaming.
    cloud_options: Option<Arc<CloudOptions>>,
    options: Arc<CsvReadOptions>,
    // Cached on first access - we may be called multiple times e.g. on negative slice.
    cached_bytes: Option<Buffer<u8>>,
    verbose: bool,
}

#[async_trait]
impl FileReader for CsvFileReader {
    async fn initialize(&mut self) -> PolarsResult<()> {
        let buffer = self
            .scan_source
            .as_scan_source_ref()
            .to_buffer_async_assume_latest(self.scan_source.run_async())?;

        // Note: We do not decompress in `initialize()`.
        self.cached_bytes = Some(buffer);

        Ok(())
    }

    fn begin_read(
        &mut self,
        args: BeginReadArgs,
    ) -> PolarsResult<(FileReaderOutputRecv, JoinHandle<PolarsResult<()>>)> {
        let verbose = self.verbose;

        let BeginReadArgs {
            projection: Projection::Plain(projected_schema),
            // Because we currently only support PRE_SLICE we don't need to handle row index here.
            row_index,
            pre_slice,
            predicate: None,
            cast_columns_policy: _,
            num_pipelines,
            disable_morsel_split: _,
            callbacks:
                FileReaderCallbacks {
                    file_schema_tx,
                    n_rows_in_file_tx,
                    row_position_on_end_tx,
                },
        } = args
        else {
            panic!("unsupported args: {:?}", &args)
        };

        assert!(row_index.is_none()); // Handled outside the reader for now.

        match &pre_slice {
            Some(Slice::Negative { .. }) => unimplemented!(),

            // We don't account for comments when slicing lines. We should never hit this panic -
            // the FileReaderBuilder does not indicate PRE_SLICE support when we have a comment
            // prefix.
            Some(pre_slice)
                if self.options.parse_options.comment_prefix.is_some() && pre_slice.len() > 0 =>
            {
                panic!("{pre_slice:?}")
            },

            _ => {},
        }

        let mut reader = CompressedReader::try_new(self.cached_bytes.clone().unwrap())?;

        let (inferred_schema, base_leftover) = read_until_start_and_infer_schema(
            &self.options,
            Some(projected_schema.clone()),
            None,
            &mut reader,
        )?;
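
        // Assumption from the call signature: this consumes everything up to
        // the start of the data (skipped lines / header) from `reader`,
        // returning the schema plus any bytes already read past that point
        // (`base_leftover`), which the line splitter prepends to its first
        // chunk.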

        let used_schema = Arc::new(inferred_schema);

        if let Some(tx) = file_schema_tx {
            _ = tx.send(used_schema.clone())
        }

        let projection: Vec<usize> = projected_schema
            .iter_names()
            .filter_map(|name| used_schema.index_of(name))
            .collect();

        if verbose {
            eprintln!(
                "[CsvFileReader]: project: {} / {}, slice: {:?}",
                projection.len(),
                used_schema.len(),
                &pre_slice,
            )
        }

        let quote_char = self.options.parse_options.quote_char;
        let eol_char = self.options.parse_options.eol_char;
        let comment_prefix = self.options.parse_options.comment_prefix.clone();

        let line_counter = CountLines::new(quote_char, eol_char, comment_prefix.clone());

        let chunk_reader = Arc::new(ChunkReader::try_new(
            self.options.clone(),
            used_schema.clone(),
            projection,
        )?);

        let needs_full_row_count = n_rows_in_file_tx.is_some();

        let (line_batch_tx, line_batch_receivers) =
            distributor_channel(num_pipelines, *DEFAULT_DISTRIBUTOR_BUFFER_SIZE);
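
        // Fan-out stage: one producer (the line splitter) feeds `num_pipelines`
        // consumers through a channel bounded at *DEFAULT_DISTRIBUTOR_BUFFER_SIZE,
        // so a slow decode worker backpressures the splitter instead of letting
        // batches pile up unboundedly.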

        let line_batch_source_handle = AbortOnDropHandle::new(spawn(
            TaskPriority::Low,
            LineBatchSource {
                base_leftover,
                reader,
                line_counter,
                line_batch_tx,
                pre_slice,
                needs_full_row_count,
                verbose,
            }
            .run(),
        ));

        let n_workers = line_batch_receivers.len();

        let (morsel_senders, rx) = FileReaderOutputSend::new_parallel(num_pipelines);

        let line_batch_decode_handles = line_batch_receivers
            .into_iter()
            .zip(morsel_senders)
            .enumerate()
            .map(|(worker_idx, (mut line_batch_rx, mut morsel_tx))| {
                // Only verbose log from the last worker to avoid flooding output.
                let verbose = verbose && worker_idx == n_workers - 1;
                let mut n_rows_processed: usize = 0;
                let chunk_reader = chunk_reader.clone();
                // Note: We don't use this (it is handled by the bridge). But morsels
                // require a source token.
                let source_token = SourceToken::new();

                AbortOnDropHandle::new(spawn(TaskPriority::Low, async move {
                    while let Ok(LineBatch {
                        mem_slice,
                        n_lines,
                        slice,
                        row_offset,
                        morsel_seq,
                    }) = line_batch_rx.recv().await
                    {
                        // SLICE_ENDED is remapped to a dummy (0, 1) so `read_chunk` can
                        // still run and report this batch's row count.
                        let (offset, len) = match slice {
                            SLICE_ENDED => (0, 1),
                            v => v,
                        };

                        let (df, n_rows_in_chunk) = chunk_reader.read_chunk(
                            &mem_slice,
                            n_lines,
                            (offset, len),
                            row_offset,
                        )?;

                        n_rows_processed = n_rows_processed.saturating_add(n_rows_in_chunk);

                        // Compare the original `slice` here: `(offset, len)` was remapped
                        // above and can never equal SLICE_ENDED.
                        if slice == SLICE_ENDED {
                            break;
                        }

                        let morsel = Morsel::new(df, morsel_seq, source_token.clone());

                        if morsel_tx.send_morsel(morsel).await.is_err() {
                            break;
                        }
                    }

                    drop(morsel_tx);

                    if needs_full_row_count {
                        if verbose {
                            eprintln!(
                                "[CSV LineBatchProcessor {worker_idx}]: entering row count mode"
                            );
                        }

                        while let Ok(LineBatch {
                            mem_slice: _,
                            n_lines,
                            slice,
                            row_offset: _,
                            morsel_seq: _,
                        }) = line_batch_rx.recv().await
                        {
                            assert_eq!(slice, SLICE_ENDED);

                            n_rows_processed = n_rows_processed.saturating_add(n_lines);
                        }
                    }

                    PolarsResult::Ok(n_rows_processed)
                }))
            })
            .collect::<Vec<_>>();
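
        // The returned task joins every worker plus the splitter: workers report
        // the rows they parsed (or merely counted after SLICE_ENDED), the
        // splitter reports rows it skipped before the slice began, and the sum
        // is the file's total row position.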

        Ok((
            rx,
            spawn(TaskPriority::Low, async move {
                let mut row_position: usize = 0;

                for handle in line_batch_decode_handles {
                    let rows_processed = handle.await?;
                    row_position = row_position.saturating_add(rows_processed);
                }

                row_position = {
                    let rows_skipped = line_batch_source_handle.await?;
                    row_position.saturating_add(rows_skipped)
                };

                let row_position = IdxSize::try_from(row_position)
                    .map_err(|_| polars_err!(bigidx, ctx = "csv file", size = row_position))?;

                if let Some(n_rows_in_file_tx) = n_rows_in_file_tx {
                    assert!(needs_full_row_count);
                    _ = n_rows_in_file_tx.send(row_position);
                }

                if let Some(row_position_on_end_tx) = row_position_on_end_tx {
                    _ = row_position_on_end_tx.send(row_position);
                }

                Ok(())
            }),
        ))
    }
}

struct LineBatchSource {
    base_leftover: Buffer<u8>,
    reader: CompressedReader,
    line_counter: CountLines,
    line_batch_tx: distributor_channel::Sender<LineBatch>,
    pre_slice: Option<Slice>,
    needs_full_row_count: bool,
    verbose: bool,
}

impl LineBatchSource {
    /// Returns the number of rows skipped from the start of the file according to CountLines.
    async fn run(self) -> PolarsResult<usize> {
        let LineBatchSource {
            base_leftover,
            mut reader,
            line_counter,
            mut line_batch_tx,
            pre_slice,
            needs_full_row_count,
            verbose,
        } = self;

        let global_slice = if let Some(pre_slice) = pre_slice {
            match pre_slice {
                Slice::Positive { .. } => Some(Range::<usize>::from(pre_slice)),
                // IR lowering puts negative slice in separate node.
                // TODO: Native line buffering for negative slice
                Slice::Negative { .. } => unreachable!(),
            }
        } else {
            None
        };

        if verbose {
            eprintln!("[CsvSource]: Start line splitting");
        }

        let mut prev_leftover = base_leftover;
        let mut row_offset = 0usize;
        let mut morsel_seq = MorselSeq::default();
        let mut n_rows_skipped: usize = 0;
        let mut read_size = CompressedReader::initial_read_size();

        loop {
            let (mem_slice, bytes_read) = reader.read_next_slice(&prev_leftover, read_size)?;
            if mem_slice.is_empty() {
                break;
            }

            prefetch_l2(&mem_slice);

            let is_eof = bytes_read == 0;
            let (n_lines, unconsumed_offset) = line_counter.count_rows(&mem_slice, is_eof);

            let batch_slice = mem_slice.clone().sliced(0..unconsumed_offset);
            prev_leftover = mem_slice.sliced(unconsumed_offset..);

            if batch_slice.is_empty() && !is_eof {
                // This allows the slice to grow until at least a single row is included.
                // To avoid a quadratic run-time for large row sizes, we double the read size.
                read_size = read_size.saturating_mul(2);
                continue;
            }
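
            // Example with hypothetical sizes: at a 64 KiB read size and a
            // single 1 MiB row, the read size doubles 64 -> 128 -> ... -> 1024
            // KiB until one complete line fits, so the total bytes re-scanned
            // stay linear rather than quadratic in the row size.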

            // Has to happen here before slicing, since there are slice operations that skip
            // morsel sending.
            let prev_row_offset = row_offset;
            row_offset += n_lines;

            let slice = if let Some(global_slice) = &global_slice {
                match SplitSlicePosition::split_slice_at_file(
                    prev_row_offset,
                    n_lines,
                    global_slice.clone(),
                ) {
                    // Note that we don't check that the skipped line batches actually contain
                    // this many lines.
                    SplitSlicePosition::Before => {
                        n_rows_skipped = n_rows_skipped.saturating_add(n_lines);
                        continue;
                    },
                    SplitSlicePosition::Overlapping(offset, len) => (offset, len),
                    SplitSlicePosition::After => {
                        if needs_full_row_count {
                            // If we need to know the unrestricted row count, we need
                            // to go until the end.
                            SLICE_ENDED
                        } else {
                            break;
                        }
                    },
                }
            } else {
                NO_SLICE
            };

            morsel_seq = morsel_seq.successor();

            let batch = LineBatch {
                mem_slice: batch_slice,
                n_lines,
                slice,
                // The offset at the *start* of this chunk (`row_offset` has already
                // been advanced past it).
                row_offset: prev_row_offset,
                morsel_seq,
            };

            if line_batch_tx.send(batch).await.is_err() {
                break;
            }

            if is_eof {
                break;
            }

            if read_size < CompressedReader::ideal_read_size() {
                read_size *= 4;
            }
        }

        Ok(n_rows_skipped)
    }
}

#[derive(Default)]
struct ChunkReader {
    reader_schema: SchemaRef,
    parse_options: Arc<CsvParseOptions>,
    fields_to_cast: Vec<Field>,
    ignore_errors: bool,
    projection: Vec<usize>,
    null_values: Option<NullValuesCompiled>,
    validate_utf8: bool,
}

impl ChunkReader {
    fn try_new(
        options: Arc<CsvReadOptions>,
        mut reader_schema: SchemaRef,
        projection: Vec<usize>,
    ) -> PolarsResult<Self> {
        let mut fields_to_cast: Vec<Field> = options.fields_to_cast.clone();
        prepare_csv_schema(&mut reader_schema, &mut fields_to_cast)?;

        let parse_options = options.parse_options.clone();

        // Logic from `CoreReader::new()`.

        let null_values = parse_options
            .null_values
            .clone()
            .map(|nv| nv.compile(&reader_schema))
            .transpose()?;

        let validate_utf8 = matches!(parse_options.encoding, CsvEncoding::Utf8)
            && reader_schema.iter_fields().any(|f| f.dtype().is_string());

        Ok(Self {
            reader_schema,
            parse_options,
            fields_to_cast,
            ignore_errors: options.ignore_errors,
            projection,
            null_values,
            validate_utf8,
        })
    }

    /// The 2nd return value indicates how many rows exist in the chunk.
    fn read_chunk(
        &self,
        chunk: &[u8],
        // Number of lines according to CountLines
        n_lines: usize,
        slice: (usize, usize),
        chunk_row_offset: usize,
    ) -> PolarsResult<(DataFrame, usize)> {
        if self.validate_utf8 && !validate_utf8(chunk) {
            polars_bail!(ComputeError: "invalid utf-8 sequence")
        }

        // If projection is empty create a DataFrame with the correct height by counting the lines.
        let mut df = if self.projection.is_empty() {
            DataFrame::empty_with_height(n_lines)
        } else {
            read_chunk(
                chunk,
                &self.parse_options,
                &self.reader_schema,
                self.ignore_errors,
                &self.projection,
                0,           // bytes_offset_thread
                n_lines,     // capacity
                self.null_values.as_ref(),
                usize::MAX,  // chunk_size
                chunk.len(), // stop_at_nbytes
                Some(0),     // starting_point_offset
            )?
        };

        let height = df.height();

        if height != n_lines {
            // Note: in case data is malformed, height is more likely to be correct than n_lines.
            let msg = format!(
                "CSV malformed: expected {} rows, actual {} rows, in chunk starting at row_offset {}, length {}",
                n_lines,
                height,
                chunk_row_offset,
                chunk.len()
            );
            if self.ignore_errors {
                polars_warn!("{msg}");
            } else {
                polars_bail!(ComputeError: msg)
            }
        }

        if slice != NO_SLICE {
            assert!(slice != SLICE_ENDED);

            df = df.slice(i64::try_from(slice.0).unwrap(), slice.1);
        }
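
        // Example: a chunk parsed to 10 rows with slice == (3, 4) keeps rows
        // 3..7, while `height` still reports the pre-slice count so the row
        // accounting in `begin_read` stays consistent.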

        cast_columns(&mut df, &self.fields_to_cast, false, self.ignore_errors)?;

        Ok((df, height))
    }
}