GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/csv/read/streaming.rs
use std::cmp;
use std::iter::Iterator;
use std::num::NonZeroUsize;
use std::sync::Arc;

use polars_buffer::Buffer;
use polars_core::prelude::Schema;
use polars_core::schema::SchemaRef;
use polars_error::{PolarsResult, polars_bail, polars_ensure};

use crate::csv::read::schema_inference::infer_file_schema_impl;
use crate::prelude::_csv_read_internal::{SplitLines, is_comment_line};
use crate::prelude::{CsvParseOptions, CsvReadOptions};
use crate::utils::compression::CompressedReader;

pub type InspectContentFn<'a> = Box<dyn FnMut(&[u8]) + 'a>;

/// Reads bytes from `reader` until the CSV starting point is reached, depending on the options.
///
/// Returns the inferred schema and the leftover bytes not yet consumed, which may be empty. The
/// leftover bytes + `reader.read_next_slice` are guaranteed to start at the first real content row.
///
/// `inspect_first_content_row_fn` allows looking at the first content row, which is where parsing
/// will start. Beware that even if the function is provided it is *not* guaranteed to be called,
/// since the CSV may be incomplete.
///
/// The reading is done in an iterative streaming fashion.
///
/// This function isn't perf-critical but would increase binary size, so don't inline it.
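///
/// A minimal usage sketch (not compiled here; assumes a `CompressedReader` has already been
/// constructed from some byte source, which is elided):
///
/// ```ignore
/// let options = CsvReadOptions::default();
/// let (schema, leftover) =
///     read_until_start_and_infer_schema(&options, None, None, &mut reader)?;
/// // `leftover` plus the reader's subsequent slices begin at the first content row.
/// ```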
#[inline(never)]
pub fn read_until_start_and_infer_schema(
    options: &CsvReadOptions,
    projected_schema: Option<SchemaRef>,
    mut inspect_first_content_row_fn: Option<InspectContentFn<'_>>,
    reader: &mut CompressedReader,
) -> PolarsResult<(Schema, Buffer<u8>)> {
    // Overestimating is better than underestimating here.
    const ESTIMATED_BYTES_PER_ROW: usize = 200;

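    // The states below drive a line-by-line state machine: skip leading empty lines, skip
    // `skip_rows` rows before the header, consume the header, skip `skip_rows_after_header`
    // rows, hand the first content row to the inspect callback, then collect rows for schema
    // inference until done.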
    #[derive(Copy, Clone)]
    enum State {
        // Ordered so that each state only happens after the ones before it.
        SkipEmpty,
        SkipRowsBeforeHeader(usize),
        SkipHeader(bool),
        SkipRowsAfterHeader(usize),
        ContentInspect,
        InferCollect,
        Done,
    }

    polars_ensure!(
        !(options.skip_lines != 0 && options.skip_rows != 0),
        InvalidOperation: "only one of 'skip_rows'/'skip_lines' may be set"
    );

    // We have to treat skip_lines differently since the lines it skips may not follow regular CSV
    // quote-escape rules.
    let prev_leftover = skip_lines_naive(
        options.parse_options.eol_char,
        options.skip_lines,
        options.raise_if_empty,
        reader,
    )?;

    let mut state = if options.has_header {
        State::SkipEmpty
    } else if options.skip_lines != 0 {
        // skip_lines shouldn't skip extra comments before the header, so go directly to the
        // SkipHeader state.
        State::SkipHeader(false)
    } else {
        State::SkipRowsBeforeHeader(options.skip_rows)
    };

    let comment_prefix = options.parse_options.comment_prefix.as_ref();
    let infer_schema_length = options.infer_schema_length.unwrap_or(usize::MAX);

    let mut header_line = None;
    let mut content_lines = Vec::with_capacity(options.infer_schema_length.unwrap_or_else(|| {
        reader
            .total_len_estimate()
            .saturating_div(ESTIMATED_BYTES_PER_ROW)
    }));

    // In the compressed case `reader.read_next_slice` has to copy the previous leftover into a new
    // `Vec`, which would lead to quadratic copying if we don't factor `infer_schema_length` into
    // the initial read size. We have to retain the row memory for schema inference and also for
    // actual morsel generation. If `infer_schema_length` is set to `None` we will have to read the
    // full input anyway, so we can do so once and avoid re-copying.
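    // For example, with `infer_schema_length = Some(1000)` this requests at least
    // 1000 * 200 = 200_000 bytes on the first read.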
    let initial_read_size = options
        .infer_schema_length
        .map(|isl| {
            cmp::max(
                CompressedReader::initial_read_size(),
                isl.saturating_mul(ESTIMATED_BYTES_PER_ROW),
            )
        })
        .unwrap_or(usize::MAX);

    let leftover = for_each_line_from_reader(
        &options.parse_options,
        true,
        prev_leftover,
        initial_read_size,
        reader,
        |mem_slice_line| {
            let line = &*mem_slice_line;

            let done = loop {
                match &mut state {
                    State::SkipEmpty => {
                        if line.is_empty() || line == b"\r" {
                            break LineUse::ConsumeDiscard;
                        }

                        state = State::SkipRowsBeforeHeader(options.skip_rows);
                    },
                    State::SkipRowsBeforeHeader(remaining) => {
                        let is_comment = is_comment_line(line, comment_prefix);

                        if *remaining == 0 && !is_comment {
                            state = State::SkipHeader(false);
                            continue;
                        }

                        *remaining -= !is_comment as usize;
                        break LineUse::ConsumeDiscard;
                    },
                    State::SkipHeader(did_skip) => {
                        if !options.has_header || *did_skip {
                            state = State::SkipRowsAfterHeader(options.skip_rows_after_header);
                            continue;
                        }

                        header_line = Some(mem_slice_line.clone());
                        *did_skip = true;
                        break LineUse::ConsumeDiscard;
                    },
                    State::SkipRowsAfterHeader(remaining) => {
                        let is_comment = is_comment_line(line, comment_prefix);

                        if *remaining == 0 && !is_comment {
                            state = State::ContentInspect;
                            continue;
                        }

                        *remaining -= !is_comment as usize;
                        break LineUse::ConsumeDiscard;
                    },
                    State::ContentInspect => {
                        if let Some(func) = &mut inspect_first_content_row_fn {
                            func(line);
                        }

                        state = State::InferCollect;
                    },
                    State::InferCollect => {
                        if !is_comment_line(line, comment_prefix) {
                            content_lines.push(mem_slice_line.clone());
                            if content_lines.len() >= infer_schema_length {
                                state = State::Done;
                                continue;
                            }
                        }

                        break LineUse::ConsumeKeep;
                    },
                    State::Done => {
                        break LineUse::Done;
                    },
                }
            };

            Ok(done)
        },
    )?;

    let infer_all_as_str = infer_schema_length == 0;

    let inferred_schema = infer_schema(
        &header_line,
        &content_lines,
        infer_all_as_str,
        options,
        projected_schema,
    )?;

    Ok((inferred_schema, leftover))
}

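/// Verdict returned by the per-line callback in `for_each_line_from_reader`:
/// `ConsumeDiscard` drops the line's bytes, `ConsumeKeep` retains them in the returned
/// leftover buffer, and `Done` stops iteration.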
enum LineUse {
    ConsumeDiscard,
    ConsumeKeep,
    Done,
}

/// Iterate over valid CSV lines produced by `reader`.
///
/// Returning `ConsumeDiscard` after `ConsumeKeep` is a logic error, since a segmented `Buffer`
/// can't be constructed.
fn for_each_line_from_reader(
    parse_options: &CsvParseOptions,
    is_file_start: bool,
    mut prev_leftover: Buffer<u8>,
    initial_read_size: usize,
    reader: &mut CompressedReader,
    mut line_fn: impl FnMut(Buffer<u8>) -> PolarsResult<LineUse>,
) -> PolarsResult<Buffer<u8>> {
    let mut is_first_line = is_file_start;

    let fixed_read_size = std::env::var("POLARS_FORCE_CSV_INFER_CHUNK_SIZE")
        .map(|x| {
            x.parse::<NonZeroUsize>()
                .unwrap_or_else(|_| {
                    panic!("invalid value for POLARS_FORCE_CSV_INFER_CHUNK_SIZE: {x}")
                })
                .get()
        })
        .ok();

    let mut read_size = fixed_read_size.unwrap_or(initial_read_size);
    let mut retain_offset = None;

    loop {
        let (mut slice, bytes_read) = reader.read_next_slice(&prev_leftover, read_size)?;
        if slice.is_empty() {
            return Ok(Buffer::new());
        }

        if is_first_line {
            is_first_line = false;
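            // Strip the UTF-8 byte-order mark if the file starts with one.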
            const UTF8_BOM_MARKER: Option<&[u8]> = Some(b"\xef\xbb\xbf");
            if slice.get(0..3) == UTF8_BOM_MARKER {
                slice = slice.sliced(3..);
            }
        }

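        // Convert a borrowed `&[u8]` line back into a sub-`Buffer` of `slice` via pointer
        // arithmetic, so the returned line shares the underlying allocation.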
        let line_to_sub_slice = |line: &[u8]| {
            let start = line.as_ptr() as usize - slice.as_ptr() as usize;
            slice.clone().sliced(start..(start + line.len()))
        };

        // When reading a CSV with `has_header=false` we need to read up to `infer_schema_length`
        // lines, but we only want to decompress the input once, so we grow a `Buffer` that will
        // be returned as leftover.
        let effective_slice = if let Some(offset) = retain_offset {
            slice.clone().sliced(offset..)
        } else {
            slice.clone()
        };

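        // Split the slice into CSV lines, honoring the quote character and comment prefix.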
        let mut lines = SplitLines::new(
            &effective_slice,
            parse_options.quote_char,
            parse_options.eol_char,
            parse_options.comment_prefix.as_ref(),
        );
        let Some(mut prev_line) = lines.next() else {
            read_size = read_size.saturating_mul(2);
            prev_leftover = slice;
            continue;
        };

        let mut should_ret = false;

        // The last line in `SplitLines` may be incomplete if `slice` ends before the file does, so
        // we iterate everything except the last line.
        for next_line in lines {
            match line_fn(line_to_sub_slice(prev_line))? {
                LineUse::ConsumeDiscard => debug_assert!(retain_offset.is_none()),
                LineUse::ConsumeKeep => {
                    if retain_offset.is_none() {
                        let retain_start_offset =
                            prev_line.as_ptr() as usize - slice.as_ptr() as usize;
                        prev_leftover = slice.clone().sliced(retain_start_offset..);
                        retain_offset = Some(0);
                    }
                },
                LineUse::Done => {
                    should_ret = true;
                    break;
                },
            }
            prev_line = next_line;
        }

        let mut unconsumed_offset = prev_line.as_ptr() as usize - effective_slice.as_ptr() as usize;

        // EOF reached: the last line will have no continuation on the next call to
        // `read_next_slice`.
        if bytes_read < read_size {
            match line_fn(line_to_sub_slice(prev_line))? {
                LineUse::ConsumeDiscard => {
                    debug_assert!(retain_offset.is_none());
                    unconsumed_offset += prev_line.len();
                    if effective_slice.get(unconsumed_offset) == Some(&parse_options.eol_char) {
                        unconsumed_offset += 1;
                    }
                },
                LineUse::ConsumeKeep | LineUse::Done => (),
            }
            should_ret = true;
        }

        if let Some(offset) = &mut retain_offset {
            if *offset == 0 {
                // `unconsumed_offset` was computed with the full `slice` as the base reference;
                // compensate for the retained offset.
                *offset = unconsumed_offset - (slice.len() - prev_leftover.len());
            } else {
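                // Retention is already in progress: keep the whole slice (the retained rows sit
                // at its front) and advance the offset past what was consumed this round.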
                prev_leftover = slice;
                *offset += unconsumed_offset;
            }
        } else {
            // Since `read_next_slice` has to copy the leftover bytes in the decompression case,
            // it's more efficient to hand in as little as possible.
            prev_leftover = slice.sliced(unconsumed_offset..);
        }

        if should_ret {
            return Ok(prev_leftover);
        }

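        // Grow the read size geometrically until the ideal size is reached, unless a fixed size
        // was forced via POLARS_FORCE_CSV_INFER_CHUNK_SIZE.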
        if read_size < CompressedReader::ideal_read_size() && fixed_read_size.is_none() {
            read_size *= 4;
        }
    }
}

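/// Skip `skip_lines` lines by scanning for raw `eol_char` bytes only, without applying CSV
/// quote-escape rules (see the comment at the call site in `read_until_start_and_infer_schema`).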
fn skip_lines_naive(
    eol_char: u8,
    skip_lines: usize,
    raise_if_empty: bool,
    reader: &mut CompressedReader,
) -> PolarsResult<Buffer<u8>> {
    let mut prev_leftover = Buffer::new();

    if skip_lines == 0 {
        return Ok(prev_leftover);
    }

    let mut remaining = skip_lines;
    let mut read_size = CompressedReader::initial_read_size();

    loop {
        let (slice, bytes_read) = reader.read_next_slice(&prev_leftover, read_size)?;
        let mut bytes: &[u8] = &slice;

        'inner: loop {
            let Some(mut pos) = memchr::memchr(eol_char, bytes) else {
                read_size = read_size.saturating_mul(2);
                break 'inner;
            };
            pos = cmp::min(pos + 1, bytes.len());

            bytes = &bytes[pos..];
            remaining -= 1;

            if remaining == 0 {
                let unconsumed_offset = bytes.as_ptr() as usize - slice.as_ptr() as usize;
                prev_leftover = slice.sliced(unconsumed_offset..);
                return Ok(prev_leftover);
            }
        }

        if bytes_read == 0 {
            if raise_if_empty {
                polars_bail!(NoData: "specified skip_lines is larger than total number of lines.");
            } else {
                return Ok(Buffer::new());
            }
        }

        // No need to search the leftover for a naive EOL twice.
        prev_leftover = Buffer::new();

        if read_size < CompressedReader::ideal_read_size() {
            read_size *= 4;
        }
    }
}

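/// Build the final schema from the collected header/content lines, then apply the user-provided
/// `schema`, `dtype_overwrite`, and projected-schema overrides, in that order.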
fn infer_schema(
    header_line: &Option<Buffer<u8>>,
    content_lines: &[Buffer<u8>],
    infer_all_as_str: bool,
    options: &CsvReadOptions,
    projected_schema: Option<SchemaRef>,
) -> PolarsResult<Schema> {
    let has_no_inference_data = if options.has_header {
        header_line.is_none()
    } else {
        content_lines.is_empty()
    };

    if options.raise_if_empty && has_no_inference_data {
        polars_bail!(NoData: "empty CSV");
    }

    let mut inferred_schema = if has_no_inference_data {
        Schema::default()
    } else {
        infer_file_schema_impl(
            header_line,
            content_lines,
            infer_all_as_str,
            &options.parse_options,
            options.schema_overwrite.as_deref(),
        )
    };

    if let Some(schema) = &options.schema {
        // Note: the user can provide a schema with more columns; those will simply be projected
        // as NULL.
        // TODO: Should maybe expose a missing_columns parameter to the API for this.
        if schema.len() < inferred_schema.len() && !options.parse_options.truncate_ragged_lines {
            polars_bail!(
                SchemaMismatch:
                "provided schema does not match number of columns in file ({} != {} in file)",
                schema.len(),
                inferred_schema.len(),
            );
        }

        if options.parse_options.truncate_ragged_lines {
            inferred_schema = Arc::unwrap_or_clone(schema.clone());
        } else {
            inferred_schema = schema
                .iter_names()
                .zip(inferred_schema.into_iter().map(|(_, dtype)| dtype))
                .map(|(name, dtype)| (name.clone(), dtype))
                .collect();
        }
    }

    if let Some(dtypes) = options.dtype_overwrite.as_deref() {
        for (i, dtype) in dtypes.iter().enumerate() {
            inferred_schema.set_dtype_at_index(i, dtype.clone());
        }
    }

    // TODO: We currently always override with the projected dtype, but this may cause issues e.g.
    // with temporal types. This can be improved to better choose between the two dtypes.
    if let Some(projected_schema) = projected_schema {
        for (name, inferred_dtype) in inferred_schema.iter_mut() {
            if let Some(projected_dtype) = projected_schema.get(name) {
                *inferred_dtype = projected_dtype.clone();
            }
        }
    }

    Ok(inferred_schema)
}