Path: blob/main/crates/polars-io/src/csv/read/streaming.rs
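//! Streaming helpers for locating the first CSV content row and inferring a schema from a
//! (possibly compressed) reader in an iterative, chunk-by-chunk fashion.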
use std::cmp;
use std::iter::Iterator;
use std::num::NonZeroUsize;
use std::sync::Arc;

use polars_buffer::Buffer;
use polars_core::prelude::Schema;
use polars_core::schema::SchemaRef;
use polars_error::{PolarsResult, polars_bail, polars_ensure};

use crate::csv::read::schema_inference::infer_file_schema_impl;
use crate::prelude::_csv_read_internal::{SplitLines, is_comment_line};
use crate::prelude::{CsvParseOptions, CsvReadOptions};
use crate::utils::compression::CompressedReader;

pub type InspectContentFn<'a> = Box<dyn FnMut(&[u8]) + 'a>;

/// Reads bytes from `reader` until the CSV starting point is reached, depending on the options.
///
/// Returns the inferred schema and leftover bytes not yet consumed, which may be empty. The
/// leftover bytes + `reader.read_next_slice` are guaranteed to start at the first real content
/// row.
///
/// `inspect_first_content_row_fn` allows looking at the first content row; this is where parsing
/// will start. Beware that even if the function is provided, it's *not* guaranteed that the
/// returned value will be `Some`, since the CSV may be incomplete.
///
/// The reading is done in an iterative streaming fashion.
///
/// This function isn't perf-critical but would increase binary size, so don't inline it.
#[inline(never)]
pub fn read_until_start_and_infer_schema(
    options: &CsvReadOptions,
    projected_schema: Option<SchemaRef>,
    mut inspect_first_content_row_fn: Option<InspectContentFn<'_>>,
    reader: &mut CompressedReader,
) -> PolarsResult<(Schema, Buffer<u8>)> {
    // It's better to overestimate than to underestimate here.
    const ESTIMATED_BYTES_PER_ROW: usize = 200;

    #[derive(Copy, Clone)]
    enum State {
        // Ordered so that all states only happen after the ones before it.
        SkipEmpty,
        SkipRowsBeforeHeader(usize),
        SkipHeader(bool),
        SkipRowsAfterHeader(usize),
        ContentInspect,
        InferCollect,
        Done,
    }

    polars_ensure!(
        !(options.skip_lines != 0 && options.skip_rows != 0),
        InvalidOperation: "only one of 'skip_rows'/'skip_lines' may be set"
    );

    // We have to treat skip_lines differently since the lines it skips may not follow regular CSV
    // quote escape rules.
    let prev_leftover = skip_lines_naive(
        options.parse_options.eol_char,
        options.skip_lines,
        options.raise_if_empty,
        reader,
    )?;

    let mut state = if options.has_header {
        State::SkipEmpty
    } else if options.skip_lines != 0 {
        // skip_lines shouldn't skip extra comments before the header, so directly go to the
        // SkipHeader state.
        State::SkipHeader(false)
    } else {
        State::SkipRowsBeforeHeader(options.skip_rows)
    };

    let comment_prefix = options.parse_options.comment_prefix.as_ref();
    let infer_schema_length = options.infer_schema_length.unwrap_or(usize::MAX);

    let mut header_line = None;
    let mut content_lines = Vec::with_capacity(options.infer_schema_length.unwrap_or_else(|| {
        reader
            .total_len_estimate()
            .saturating_div(ESTIMATED_BYTES_PER_ROW)
    }));

    // In the compressed case `reader.read_next_slice` has to copy the previous leftover into a new
    // `Vec`, which would lead to quadratic copying if we don't factor `infer_schema_length` into
    // the initial read size. We have to retain the row memory for schema inference and also for
    // actual morsel generation. If `infer_schema_length` is set to `None` we will have to read the
    // full input anyway, so we can do so once and avoid re-copying.
    let initial_read_size = options
        .infer_schema_length
        .map(|isl| {
            cmp::max(
                CompressedReader::initial_read_size(),
                isl.saturating_mul(ESTIMATED_BYTES_PER_ROW),
            )
        })
        .unwrap_or(usize::MAX);
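
    // For example (illustrative input, not taken from the options above): with `has_header =
    // true`, `skip_rows = 1`, `skip_rows_after_header = 0` and a `#` comment prefix, the state
    // machine in the closure below handles each line roughly as follows:
    //
    //   ""            -> SkipEmpty (discarded)
    //   "# generated" -> SkipRowsBeforeHeader (comment, not counted towards skip_rows, discarded)
    //   "preamble"    -> SkipRowsBeforeHeader (counted, discarded)
    //   "a,b,c"       -> SkipHeader (captured as `header_line`)
    //   "1,2,3"       -> ContentInspect, then InferCollect (kept for schema inference)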
    let leftover = for_each_line_from_reader(
        &options.parse_options,
        true,
        prev_leftover,
        initial_read_size,
        reader,
        |mem_slice_line| {
            let line = &*mem_slice_line;

            let done = loop {
                match &mut state {
                    State::SkipEmpty => {
                        if line.is_empty() || line == b"\r" {
                            break LineUse::ConsumeDiscard;
                        }

                        state = State::SkipRowsBeforeHeader(options.skip_rows);
                    },
                    State::SkipRowsBeforeHeader(remaining) => {
                        let is_comment = is_comment_line(line, comment_prefix);

                        if *remaining == 0 && !is_comment {
                            state = State::SkipHeader(false);
                            continue;
                        }

                        *remaining -= !is_comment as usize;
                        break LineUse::ConsumeDiscard;
                    },
                    State::SkipHeader(did_skip) => {
                        if !options.has_header || *did_skip {
                            state = State::SkipRowsAfterHeader(options.skip_rows_after_header);
                            continue;
                        }

                        header_line = Some(mem_slice_line.clone());
                        *did_skip = true;
                        break LineUse::ConsumeDiscard;
                    },
                    State::SkipRowsAfterHeader(remaining) => {
                        let is_comment = is_comment_line(line, comment_prefix);

                        if *remaining == 0 && !is_comment {
                            state = State::ContentInspect;
                            continue;
                        }

                        *remaining -= !is_comment as usize;
                        break LineUse::ConsumeDiscard;
                    },
                    State::ContentInspect => {
                        if let Some(func) = &mut inspect_first_content_row_fn {
                            func(line);
                        }

                        state = State::InferCollect;
                    },
                    State::InferCollect => {
                        if !is_comment_line(line, comment_prefix) {
                            content_lines.push(mem_slice_line.clone());
                            if content_lines.len() >= infer_schema_length {
                                state = State::Done;
                                continue;
                            }
                        }

                        break LineUse::ConsumeKeep;
                    },
                    State::Done => {
                        break LineUse::Done;
                    },
                }
            };

            Ok(done)
        },
    )?;

    let infer_all_as_str = infer_schema_length == 0;

    let inferred_schema = infer_schema(
        &header_line,
        &content_lines,
        infer_all_as_str,
        options,
        projected_schema,
    )?;

    Ok((inferred_schema, leftover))
}

enum LineUse {
    ConsumeDiscard,
    ConsumeKeep,
    Done,
}

/// Iterate over valid CSV lines produced by `reader`.
///
/// Returning `ConsumeDiscard` after `ConsumeKeep` is a logic error, since a segmented `Buffer`
/// can't be constructed.
fn for_each_line_from_reader(
    parse_options: &CsvParseOptions,
    is_file_start: bool,
    mut prev_leftover: Buffer<u8>,
    initial_read_size: usize,
    reader: &mut CompressedReader,
    mut line_fn: impl FnMut(Buffer<u8>) -> PolarsResult<LineUse>,
) -> PolarsResult<Buffer<u8>> {
    let mut is_first_line = is_file_start;

    let fixed_read_size = std::env::var("POLARS_FORCE_CSV_INFER_CHUNK_SIZE")
        .map(|x| {
            x.parse::<NonZeroUsize>()
                .unwrap_or_else(|_| {
                    panic!("invalid value for POLARS_FORCE_CSV_INFER_CHUNK_SIZE: {x}")
                })
                .get()
        })
        .ok();

    let mut read_size = fixed_read_size.unwrap_or(initial_read_size);
    let mut retain_offset = None;

    loop {
        let (mut slice, bytes_read) = reader.read_next_slice(&prev_leftover, read_size)?;
        if slice.is_empty() {
            return Ok(Buffer::new());
        }

        if is_first_line {
            is_first_line = false;
            const UTF8_BOM_MARKER: Option<&[u8]> = Some(b"\xef\xbb\xbf");
            if slice.get(0..3) == UTF8_BOM_MARKER {
                slice = slice.sliced(3..);
            }
        }
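
        // `SplitLines` yields `&[u8]` views into `slice`; this closure maps such a view back to an
        // owned sub-`Buffer` of `slice` via its byte offset, so `line_fn` can hold on to the line
        // without copying the bytes.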
        let line_to_sub_slice = |line: &[u8]| {
            let start = line.as_ptr() as usize - slice.as_ptr() as usize;
            slice.clone().sliced(start..(start + line.len()))
        };

        // When reading a CSV with `has_header=False` we need to read up to `infer_schema_length`
        // lines, but we only want to decompress the input once, so we grow a `Buffer` that will be
        // returned as leftover.
        let effective_slice = if let Some(offset) = retain_offset {
            slice.clone().sliced(offset..)
        } else {
            slice.clone()
        };

        let mut lines = SplitLines::new(
            &effective_slice,
            parse_options.quote_char,
            parse_options.eol_char,
            parse_options.comment_prefix.as_ref(),
        );
        let Some(mut prev_line) = lines.next() else {
            read_size = read_size.saturating_mul(2);
            prev_leftover = slice;
            continue;
        };

        let mut should_ret = false;

        // The last line in `SplitLines` may be incomplete if `slice` ends before the file does, so
        // we iterate everything except the last line.
        for next_line in lines {
            match line_fn(line_to_sub_slice(prev_line))? {
                LineUse::ConsumeDiscard => debug_assert!(retain_offset.is_none()),
                LineUse::ConsumeKeep => {
                    if retain_offset.is_none() {
                        let retain_start_offset =
                            prev_line.as_ptr() as usize - slice.as_ptr() as usize;
                        prev_leftover = slice.clone().sliced(retain_start_offset..);
                        retain_offset = Some(0);
                    }
                },
                LineUse::Done => {
                    should_ret = true;
                    break;
                },
            }
            prev_line = next_line;
        }

        let mut unconsumed_offset = prev_line.as_ptr() as usize - effective_slice.as_ptr() as usize;

        // EOF reached, the last line will have no continuation on the next call to
        // `read_next_slice`.
        if bytes_read < read_size {
            match line_fn(line_to_sub_slice(prev_line))? {
                LineUse::ConsumeDiscard => {
                    debug_assert!(retain_offset.is_none());
                    unconsumed_offset += prev_line.len();
                    if effective_slice.get(unconsumed_offset) == Some(&parse_options.eol_char) {
                        unconsumed_offset += 1;
                    }
                },
                LineUse::ConsumeKeep | LineUse::Done => (),
            }
            should_ret = true;
        }

        if let Some(offset) = &mut retain_offset {
            if *offset == 0 {
                // `unconsumed_offset` was computed with the full `slice` as base reference,
                // compensate the retained offset.
                *offset = unconsumed_offset - (slice.len() - prev_leftover.len());
            } else {
                prev_leftover = slice;
                *offset += unconsumed_offset;
            }
        } else {
            // Since `read_next_slice` has to copy the leftover bytes in the decompression case,
            // it's more efficient to hand in as little as possible.
            prev_leftover = slice.sliced(unconsumed_offset..);
        }

        if should_ret {
            return Ok(prev_leftover);
        }

        if read_size < CompressedReader::ideal_read_size() && fixed_read_size.is_none() {
            read_size *= 4;
        }
    }
}
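
/// Skip `skip_lines` lines by scanning for `eol_char` only, without applying CSV quoting rules.
///
/// Returns the leftover bytes starting right after the last skipped line; an empty buffer is
/// returned if the input runs out first (or an error if `raise_if_empty` is set).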
fn skip_lines_naive(
    eol_char: u8,
    skip_lines: usize,
    raise_if_empty: bool,
    reader: &mut CompressedReader,
) -> PolarsResult<Buffer<u8>> {
    let mut prev_leftover = Buffer::new();

    if skip_lines == 0 {
        return Ok(prev_leftover);
    }

    let mut remaining = skip_lines;
    let mut read_size = CompressedReader::initial_read_size();

    loop {
        let (slice, bytes_read) = reader.read_next_slice(&prev_leftover, read_size)?;
        let mut bytes: &[u8] = &slice;

        'inner: loop {
            let Some(mut pos) = memchr::memchr(eol_char, bytes) else {
                read_size = read_size.saturating_mul(2);
                break 'inner;
            };
            pos = cmp::min(pos + 1, bytes.len());

            bytes = &bytes[pos..];
            remaining -= 1;

            if remaining == 0 {
                let unconsumed_offset = bytes.as_ptr() as usize - slice.as_ptr() as usize;
                prev_leftover = slice.sliced(unconsumed_offset..);
                return Ok(prev_leftover);
            }
        }

        if bytes_read == 0 {
            if raise_if_empty {
                polars_bail!(NoData: "specified skip_lines is larger than total number of lines.");
            } else {
                return Ok(Buffer::new());
            }
        }

        // No need to search for naive eol twice in the leftover.
        prev_leftover = Buffer::new();

        if read_size < CompressedReader::ideal_read_size() {
            read_size *= 4;
        }
    }
}

fn infer_schema(
    header_line: &Option<Buffer<u8>>,
    content_lines: &[Buffer<u8>],
    infer_all_as_str: bool,
    options: &CsvReadOptions,
    projected_schema: Option<SchemaRef>,
) -> PolarsResult<Schema> {
    let has_no_inference_data = if options.has_header {
        header_line.is_none()
    } else {
        content_lines.is_empty()
    };

    if options.raise_if_empty && has_no_inference_data {
        polars_bail!(NoData: "empty CSV");
    }

    let mut inferred_schema = if has_no_inference_data {
        Schema::default()
    } else {
        infer_file_schema_impl(
            header_line,
            content_lines,
            infer_all_as_str,
            &options.parse_options,
            options.schema_overwrite.as_deref(),
        )
    };

    if let Some(schema) = &options.schema {
        // Note: User can provide schema with more columns, they will simply
        // be projected as NULL.
        // TODO: Should maybe expose a missing_columns parameter to the API for this.
        if schema.len() < inferred_schema.len() && !options.parse_options.truncate_ragged_lines {
            polars_bail!(
                SchemaMismatch:
                "provided schema does not match number of columns in file ({} != {} in file)",
                schema.len(),
                inferred_schema.len(),
            );
        }

        if options.parse_options.truncate_ragged_lines {
            inferred_schema = Arc::unwrap_or_clone(schema.clone());
        } else {
            inferred_schema = schema
                .iter_names()
                .zip(inferred_schema.into_iter().map(|(_, dtype)| dtype))
                .map(|(name, dtype)| (name.clone(), dtype))
                .collect();
        }
    }

    if let Some(dtypes) = options.dtype_overwrite.as_deref() {
        for (i, dtype) in dtypes.iter().enumerate() {
            inferred_schema.set_dtype_at_index(i, dtype.clone());
        }
    }

    // TODO: We currently always override with the projected dtype, but this may cause issues e.g.
    // with temporal types. This can be improved to better choose between the 2 dtypes.
    if let Some(projected_schema) = projected_schema {
        for (name, inferred_dtype) in inferred_schema.iter_mut() {
            if let Some(projected_dtype) = projected_schema.get(name) {
                *inferred_dtype = projected_dtype.clone();
            }
        }
    }

    Ok(inferred_schema)
}
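
// A minimal sketch (illustrative only; the helper below is not used by the reader code above) of
// the `memchr`-based line skipping that `skip_lines_naive` performs, exercised on an in-memory
// byte slice instead of a `CompressedReader`.
#[cfg(test)]
mod naive_skip_sketch {
    /// Return the byte offset just past the `skip_lines`-th `eol_char`, or `None` if the input
    /// contains fewer line terminators than requested.
    fn offset_after_skipped_lines(bytes: &[u8], eol_char: u8, skip_lines: usize) -> Option<usize> {
        let mut offset = 0;
        let mut remaining = skip_lines;
        while remaining > 0 {
            // Like `skip_lines_naive`, only `eol_char` is considered; quoting rules are ignored.
            let pos = memchr::memchr(eol_char, &bytes[offset..])?;
            offset += pos + 1;
            remaining -= 1;
        }
        Some(offset)
    }

    #[test]
    fn skips_requested_number_of_lines() {
        let data = b"junk preamble\nmore junk\na,b,c\n1,2,3\n";

        // Skipping two lines leaves the header row as the first unconsumed content.
        let offset = offset_after_skipped_lines(data, b'\n', 2).unwrap();
        assert_eq!(&data[offset..], b"a,b,c\n1,2,3\n");

        // Asking to skip more lines than exist yields `None`, mirroring the
        // "skip_lines is larger than total number of lines" error path above.
        assert!(offset_after_skipped_lines(data, b'\n', 10).is_none());
    }
}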