Path: blob/main/crates/polars-io/src/csv/read/read_impl.rs
pub(super) mod batched;

use std::fmt;
use std::sync::Mutex;

use polars_core::POOL;
use polars_core::prelude::*;
use polars_core::utils::{accumulate_dataframes_vertical, handle_casting_failures};
#[cfg(feature = "polars-time")]
use polars_time::prelude::*;
use polars_utils::relaxed_cell::RelaxedCell;
use rayon::prelude::*;

use super::CsvParseOptions;
use super::buffer::init_buffers;
use super::options::{CommentPrefix, CsvEncoding, NullValuesCompiled};
use super::parser::{
    CountLines, SplitLines, is_comment_line, parse_lines, skip_bom, skip_line_ending,
    skip_lines_naive, skip_this_line,
};
use super::reader::prepare_csv_schema;
use super::schema_inference::infer_file_schema;
#[cfg(feature = "decompress")]
use super::utils::decompress;
use crate::RowIndex;
use crate::csv::read::parser::skip_this_line_naive;
use crate::mmap::ReaderBytes;
use crate::predicates::PhysicalIoExpr;
use crate::utils::compression::SupportedCompression;
use crate::utils::update_row_counts2;

pub fn cast_columns(
    df: &mut DataFrame,
    to_cast: &[Field],
    parallel: bool,
    ignore_errors: bool,
) -> PolarsResult<()> {
    let cast_fn = |c: &Column, fld: &Field| {
        let out = match (c.dtype(), fld.dtype()) {
            #[cfg(feature = "temporal")]
            (DataType::String, DataType::Date) => c
                .str()
                .unwrap()
                .as_date(None, false)
                .map(|ca| ca.into_column()),
            #[cfg(feature = "temporal")]
            (DataType::String, DataType::Time) => c
                .str()
                .unwrap()
                .as_time(None, false)
                .map(|ca| ca.into_column()),
            #[cfg(feature = "temporal")]
            (DataType::String, DataType::Datetime(tu, _)) => c
                .str()
                .unwrap()
                .as_datetime(
                    None,
                    *tu,
                    false,
                    false,
                    None,
                    &StringChunked::from_iter(std::iter::once("raise")),
                )
                .map(|ca| ca.into_column()),
            (_, dt) => c.cast(dt),
        }?;
        if !ignore_errors && c.null_count() != out.null_count() {
            handle_casting_failures(c.as_materialized_series(), out.as_materialized_series())?;
        }
        Ok(out)
    };

    if parallel {
        let cols = POOL.install(|| {
            df.get_columns()
                .into_par_iter()
                .map(|s| {
                    if let Some(fld) = to_cast.iter().find(|fld| fld.name() == s.name()) {
                        cast_fn(s, fld)
                    } else {
                        Ok(s.clone())
                    }
                })
                .collect::<PolarsResult<Vec<_>>>()
        })?;
        *df = unsafe { DataFrame::new_no_checks(df.height(), cols) }
    } else {
        // cast to the original dtypes in the schema
        for fld in to_cast {
            // field may not be projected
            if let Some(idx) = df.get_column_index(fld.name()) {
                df.try_apply_at_idx(idx, |s| cast_fn(s, fld))?;
            }
        }

        df.clear_schema();
    }
    Ok(())
}
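
// A rough usage sketch for `cast_columns` (a hedged example, not part of the module's
// API surface): a column parsed from the file as strings is cast after the fact to the
// dtype requested in `to_cast`. The `df!` macro and the `Date` dtype below assume the
// `temporal` feature; the column name "when" is purely illustrative.
//
//     let mut df = polars_core::df!("when" => &["2024-01-01", "2024-01-02"])?;
//     let to_cast = vec![Field::new("when".into(), DataType::Date)];
//     cast_columns(&mut df, &to_cast, /* parallel */ false, /* ignore_errors */ false)?;
//     assert_eq!(df.column("when")?.dtype(), &DataType::Date);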

/// CSV file reader
pub(crate) struct CoreReader<'a> {
    reader_bytes: Option<ReaderBytes<'a>>,
    /// Explicit schema for the CSV file
    schema: SchemaRef,
    parse_options: CsvParseOptions,
    /// Optional projection for which columns to load (zero-based column indices)
    projection: Option<Vec<usize>>,
    /// Current line number, used in error reporting
    current_line: usize,
    ignore_errors: bool,
    skip_lines: usize,
    skip_rows_before_header: usize,
    // after the header, we need to take embedded lines into account
    skip_rows_after_header: usize,
    n_rows: Option<usize>,
    n_threads: Option<usize>,
    has_header: bool,
    chunk_size: usize,
    null_values: Option<NullValuesCompiled>,
    predicate: Option<Arc<dyn PhysicalIoExpr>>,
    to_cast: Vec<Field>,
    row_index: Option<RowIndex>,
}

impl fmt::Debug for CoreReader<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Reader")
            .field("schema", &self.schema)
            .field("projection", &self.projection)
            .field("current_line", &self.current_line)
            .finish()
    }
}

impl<'a> CoreReader<'a> {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        reader_bytes: ReaderBytes<'a>,
        parse_options: Arc<CsvParseOptions>,
        n_rows: Option<usize>,
        skip_rows: usize,
        skip_lines: usize,
        mut projection: Option<Vec<usize>>,
        max_records: Option<usize>,
        has_header: bool,
        ignore_errors: bool,
        schema: Option<SchemaRef>,
        columns: Option<Arc<[PlSmallStr]>>,
        n_threads: Option<usize>,
        schema_overwrite: Option<SchemaRef>,
        dtype_overwrite: Option<Arc<Vec<DataType>>>,
        chunk_size: usize,
        predicate: Option<Arc<dyn PhysicalIoExpr>>,
        mut to_cast: Vec<Field>,
        skip_rows_after_header: usize,
        row_index: Option<RowIndex>,
        raise_if_empty: bool,
    ) -> PolarsResult<CoreReader<'a>> {
        let separator = parse_options.separator;

        #[cfg(feature = "decompress")]
        let mut reader_bytes = reader_bytes;

        if !cfg!(feature = "decompress") && SupportedCompression::check(&reader_bytes).is_some() {
            polars_bail!(
                ComputeError: "cannot read compressed CSV file; \
                compile with feature 'decompress'"
            );
        }
        // In case the file is compressed, this schema inference is wrong and has to be
        // done again after decompression.
        #[cfg(feature = "decompress")]
        {
            let total_n_rows =
                n_rows.map(|n| skip_rows + (has_header as usize) + skip_rows_after_header + n);
            if let Some(b) = decompress(
                &reader_bytes,
                total_n_rows,
                separator,
                parse_options.quote_char,
                parse_options.eol_char,
            ) {
                reader_bytes = ReaderBytes::Owned(b.into());
            }
        }

        let mut schema = match schema {
            Some(schema) => schema,
            None => {
                let (inferred_schema, _, _) = infer_file_schema(
                    &reader_bytes,
                    &parse_options,
                    max_records,
                    has_header,
                    schema_overwrite.as_deref(),
                    skip_rows,
                    skip_lines,
                    skip_rows_after_header,
                    raise_if_empty,
                )?;
                Arc::new(inferred_schema)
            },
        };
        if let Some(dtypes) = dtype_overwrite {
            polars_ensure!(
                dtypes.len() <= schema.len(),
                InvalidOperation: "The number of schema overrides must be less than or equal to the number of fields"
            );
            let s = Arc::make_mut(&mut schema);
            for (index, dt) in dtypes.iter().enumerate() {
                s.set_dtype_at_index(index, dt.clone()).unwrap();
            }
        }

        prepare_csv_schema(&mut schema, &mut to_cast)?;

        // Create a null value for every column
        let null_values = parse_options
            .null_values
            .as_ref()
            .map(|nv| nv.clone().compile(&schema))
            .transpose()?;

        if let Some(cols) = columns {
            let mut prj = Vec::with_capacity(cols.len());
            for col in cols.as_ref() {
                let i = schema.try_index_of(col)?;
                prj.push(i);
            }
            projection = Some(prj);
        }

        Ok(CoreReader {
            reader_bytes: Some(reader_bytes),
            parse_options: (*parse_options).clone(),
            schema,
            projection,
            current_line: usize::from(has_header),
            ignore_errors,
            skip_lines,
            skip_rows_before_header: skip_rows,
            skip_rows_after_header,
            n_rows,
            n_threads,
            has_header,
            chunk_size,
            null_values,
            predicate,
            to_cast,
            row_index,
        })
    }
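
    // A minimal sketch of constructing the reader directly (in practice this is driven by
    // the public CSV reader/options builders). `CsvParseOptions::default()` is assumed to
    // exist with reasonable defaults; all other values below are illustrative.
    //
    //     let df = CoreReader::new(
    //         reader_bytes,                 // ReaderBytes over the raw CSV buffer
    //         Arc::new(CsvParseOptions::default()),
    //         None,                         // n_rows: read everything
    //         0,                            // skip_rows
    //         0,                            // skip_lines
    //         None,                         // projection
    //         Some(100),                    // max_records for schema inference
    //         true,                         // has_header
    //         false,                        // ignore_errors
    //         None,                         // schema
    //         None,                         // columns
    //         None,                         // n_threads
    //         None,                         // schema_overwrite
    //         None,                         // dtype_overwrite
    //         1 << 18,                      // chunk_size
    //         None,                         // predicate
    //         vec![],                       // to_cast
    //         0,                            // skip_rows_after_header
    //         None,                         // row_index
    //         true,                         // raise_if_empty
    //     )?
    //     .finish()?;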

    fn find_starting_point<'b>(
        &self,
        bytes: &'b [u8],
        quote_char: Option<u8>,
        eol_char: u8,
    ) -> PolarsResult<(&'b [u8], Option<usize>)> {
        let i = find_starting_point(
            bytes,
            quote_char,
            eol_char,
            self.schema.len(),
            self.skip_lines,
            self.skip_rows_before_header,
            self.skip_rows_after_header,
            self.parse_options.comment_prefix.as_ref(),
            self.has_header,
        )?;

        Ok((&bytes[i..], (i <= bytes.len()).then_some(i)))
    }

    fn get_projection(&mut self) -> PolarsResult<Vec<usize>> {
        // We also need to sort the projection to have predictable output;
        // the `parse_lines` function expects this.
        self.projection
            .take()
            .map(|mut v| {
                v.sort_unstable();
                if let Some(idx) = v.last() {
                    polars_ensure!(*idx < self.schema.len(), OutOfBounds: "projection index: {} is out of bounds for csv schema with length: {}", idx, self.schema.len())
                }
                Ok(v)
            })
            .unwrap_or_else(|| Ok((0..self.schema.len()).collect()))
    }

    fn read_chunk(
        &self,
        bytes: &[u8],
        projection: &[usize],
        bytes_offset: usize,
        capacity: usize,
        starting_point_offset: Option<usize>,
        stop_at_nbytes: usize,
    ) -> PolarsResult<DataFrame> {
        let mut df = read_chunk(
            bytes,
            &self.parse_options,
            self.schema.as_ref(),
            self.ignore_errors,
            projection,
            bytes_offset,
            capacity,
            self.null_values.as_ref(),
            usize::MAX,
            stop_at_nbytes,
            starting_point_offset,
        )?;

        cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;
        Ok(df)
    }

    // The code adheres to RFC 4180 in a strict sense, unless explicitly documented otherwise.
    // Malformed CSV is common, see e.g. the use of lazy_quotes, whitespace and comments.
    // In case malformed CSV is detected, a warning or an error will be issued.
    // Not all malformed CSV will be detected, as that would impact performance.
    fn parse_csv(&mut self, bytes: &[u8]) -> PolarsResult<DataFrame> {
        let (bytes, _) = self.find_starting_point(
            bytes,
            self.parse_options.quote_char,
            self.parse_options.eol_char,
        )?;

        let projection = self.get_projection()?;

        // An empty file with a schema should return an empty DataFrame with that schema
        if bytes.is_empty() {
            let mut df = if projection.len() == self.schema.len() {
                DataFrame::empty_with_schema(self.schema.as_ref())
            } else {
                DataFrame::empty_with_schema(
                    &projection
                        .iter()
                        .map(|&i| self.schema.get_at_index(i).unwrap())
                        .map(|(name, dtype)| Field {
                            name: name.clone(),
                            dtype: dtype.clone(),
                        })
                        .collect::<Schema>(),
                )
            };

            cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;

            if let Some(ref row_index) = self.row_index {
                df.insert_column(0, Series::new_empty(row_index.name.clone(), &IDX_DTYPE))?;
            }
            return Ok(df);
        }

        let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());

        // The sizing below was chosen by benchmarking on the NY city trip CSV dataset.
        // We want chunks small enough that threads start working as soon as possible,
        // but also large enough that per-chunk overhead stays low. Chunks are capped at
        // 16 MB so they still fit in the L3 cache.
        let n_parts_hint = n_threads * 16;
        let chunk_size = std::cmp::min(bytes.len() / n_parts_hint, 16 * 1024 * 1024);

        // Use a small min chunk size to catch failures in tests.
        #[cfg(debug_assertions)]
        let min_chunk_size = 64;
        #[cfg(not(debug_assertions))]
        let min_chunk_size = 1024 * 4;

        let mut chunk_size = std::cmp::max(chunk_size, min_chunk_size);
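        // Worked example of the sizing above (release build): with 8 threads the file is
        // split into roughly 8 * 16 = 128 parts, so a 1 GiB file yields ~8 MiB chunks
        // (below the 16 MiB cap), while a 100 KiB file would yield 800-byte chunks and is
        // raised to the 4 KiB floor.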

        let mut total_bytes_offset = 0;

        let results = Arc::new(Mutex::new(vec![]));
        // We have to do this after parsing as there can be comments.
        let total_line_count = &RelaxedCell::new_usize(0);

        #[cfg(not(target_family = "wasm"))]
        let pool;
        #[cfg(not(target_family = "wasm"))]
        let pool = if n_threads == POOL.current_num_threads() {
            &POOL
        } else {
            pool = rayon::ThreadPoolBuilder::new()
                .num_threads(n_threads)
                .build()
                .map_err(|_| polars_err!(ComputeError: "could not spawn threads"))?;
            &pool
        };
        #[cfg(target_family = "wasm")]
        let pool = &POOL;

        let counter = CountLines::new(self.parse_options.quote_char, self.parse_options.eol_char);
        let mut total_offset = 0;
        let mut previous_total_offset = 0;
        let check_utf8 = matches!(self.parse_options.encoding, CsvEncoding::Utf8)
            && self.schema.iter_fields().any(|f| f.dtype().is_string());

        pool.scope(|s| {
            // Pass 1: identify chunks for parallel processing (line parsing).
            loop {
                let b = unsafe { bytes.get_unchecked(total_offset..) };
                if b.is_empty() {
                    break;
                }
                debug_assert!(
                    total_offset == 0 || bytes[total_offset - 1] == self.parse_options.eol_char
                );

                // Count is the number of rows for the next chunk. In case of malformed CSV data,
                // count may not be as expected.
                let (count, position) = counter.find_next(b, &mut chunk_size);
                debug_assert!(count == 0 || b[position] == self.parse_options.eol_char);

                let (b, count) = if count == 0
                    && unsafe {
                        std::ptr::eq(b.as_ptr().add(b.len()), bytes.as_ptr().add(bytes.len()))
                    } {
                    total_offset = bytes.len();
                    (b, 1)
                } else {
                    if count == 0 {
                        chunk_size *= 2;
                        continue;
                    }

                    let end = total_offset + position + 1;
                    let b = unsafe { bytes.get_unchecked(total_offset..end) };

                    previous_total_offset = total_offset;
                    total_offset = end;
                    (b, count)
                };

                // Pass 2: process each individual chunk in parallel (field parsing)
                if !b.is_empty() {
                    let results = results.clone();
                    let projection = projection.as_ref();
                    let slf = &(*self);
                    s.spawn(move |_| {
                        if check_utf8 && !super::buffer::validate_utf8(b) {
                            let mut results = results.lock().unwrap();
                            results.push((
                                b.as_ptr() as usize,
                                Err(polars_err!(ComputeError: "invalid utf-8 sequence")),
                            ));
                            return;
                        }

                        let result = slf
                            .read_chunk(b, projection, 0, count, Some(0), b.len())
                            .and_then(|mut df| {
                                // Check malformed
                                if df.height() > count
                                    || (df.height() < count
                                        && slf.parse_options.comment_prefix.is_none())
                                {
                                    // Note: in case data is malformed, df.height() is more likely to be correct than count.
                                    let msg = format!(
                                        "CSV malformed: expected {} rows, actual {} rows, in chunk starting at byte offset {}, length {}",
                                        count,
                                        df.height(),
                                        previous_total_offset,
                                        b.len()
                                    );
                                    if slf.ignore_errors {
                                        polars_warn!(msg);
                                    } else {
                                        polars_bail!(ComputeError: msg);
                                    }
                                }

                                if slf.n_rows.is_some() {
                                    total_line_count.fetch_add(df.height());
                                }

                                // We cannot use the line count directly, as there can be comment
                                // lines; we correct the line counts later.
                                if let Some(rc) = &slf.row_index {
                                    // is first chunk
                                    let offset = if std::ptr::eq(b.as_ptr(), bytes.as_ptr()) {
                                        Some(rc.offset)
                                    } else {
                                        None
                                    };

                                    unsafe { df.with_row_index_mut(rc.name.clone(), offset) };
                                };

                                if let Some(predicate) = slf.predicate.as_ref() {
                                    let s = predicate.evaluate_io(&df)?;
                                    let mask = s.bool()?;
                                    df = df.filter(mask)?;
                                }
                                Ok(df)
                            });

                        results.lock().unwrap().push((b.as_ptr() as usize, result));
                    });

                    // Check just after we spawned a chunk. That means we have processed all data
                    // up to the requested row count.
                    if self.n_rows.is_some()
                        && total_line_count.load() > self.n_rows.unwrap()
                    {
                        break;
                    }
                }
                total_bytes_offset += b.len();
            }
        });
        let mut results = std::mem::take(&mut *results.lock().unwrap());
        results.sort_unstable_by_key(|k| k.0);
        let mut dfs = results
            .into_iter()
            .map(|k| k.1)
            .collect::<PolarsResult<Vec<_>>>()?;

        if let Some(rc) = &self.row_index {
            update_row_counts2(&mut dfs, rc.offset)
        };
        accumulate_dataframes_vertical(dfs)
    }
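
    // Note on `parse_csv` above: chunks finish in arbitrary order, so each result is
    // pushed together with its chunk's starting address and sorted by that address to
    // restore file order before the chunks are vstacked. Row-index columns are written
    // per chunk with an unknown global offset (except for the first chunk) and are
    // corrected afterwards by `update_row_counts2`.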

    /// Read the CSV into a DataFrame. The predicate can come from a lazy physical plan.
    pub fn finish(mut self) -> PolarsResult<DataFrame> {
        let reader_bytes = self.reader_bytes.take().unwrap();
        let mut df = self.parse_csv(&reader_bytes)?;

        // If multi-threaded, n_rows was determined probabilistically.
        // Slice to the correct number of rows if possible.
        if let Some(n_rows) = self.n_rows {
            if n_rows < df.height() {
                df = df.slice(0, n_rows)
            }
        }
        Ok(df)
    }
}

#[allow(clippy::too_many_arguments)]
pub fn read_chunk(
    bytes: &[u8],
    parse_options: &CsvParseOptions,
    schema: &Schema,
    ignore_errors: bool,
    projection: &[usize],
    bytes_offset_thread: usize,
    capacity: usize,
    null_values: Option<&NullValuesCompiled>,
    chunk_size: usize,
    stop_at_nbytes: usize,
    starting_point_offset: Option<usize>,
) -> PolarsResult<DataFrame> {
    let mut read = bytes_offset_thread;
    // There's an off-by-one error somewhere in the reading code, where it reads
    // one more item than the requested capacity. Given the batch sizes are
    // approximate (sometimes they're smaller), this isn't broken, but it does
    // mean a bunch of extra allocation and copying. So we allocate a
    // larger-by-one buffer so the size is more likely to be accurate.
    let mut buffers = init_buffers(
        projection,
        capacity + 1,
        schema,
        parse_options.quote_char,
        parse_options.encoding,
        parse_options.decimal_comma,
    )?;

    debug_assert!(projection.is_sorted());

    let mut last_read = usize::MAX;
    loop {
        if read >= stop_at_nbytes || read == last_read {
            break;
        }
        let local_bytes = &bytes[read..stop_at_nbytes];

        last_read = read;
        let offset = read + starting_point_offset.unwrap();
        read += parse_lines(
            local_bytes,
            parse_options,
            offset,
            ignore_errors,
            null_values,
            projection,
            &mut buffers,
            chunk_size,
            schema.len(),
            schema,
        )?;
    }

    let columns = buffers
        .into_iter()
        .map(|buf| buf.into_series().map(Column::from))
        .collect::<PolarsResult<Vec<_>>>()?;
    Ok(unsafe { DataFrame::new_no_checks_height_from_first(columns) })
}
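
// Worked example for `find_starting_point` below (a sketch, not a doctest). Given the
// input
//
//     # generated by some tool      <- comment line
//     a,b,c                         <- header
//     1,2,3                         <- first data row
//
// with a '#' comment prefix, `has_header = true` and no skip options, the BOM (if any),
// the comment line and the header are skipped, and the returned byte offset points at
// the `1` of the first data row.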

#[allow(clippy::too_many_arguments)]
pub fn find_starting_point(
    mut bytes: &[u8],
    quote_char: Option<u8>,
    eol_char: u8,
    schema_len: usize,
    skip_lines: usize,
    skip_rows_before_header: usize,
    skip_rows_after_header: usize,
    comment_prefix: Option<&CommentPrefix>,
    has_header: bool,
) -> PolarsResult<usize> {
    let full_len = bytes.len();
    let starting_point_offset = bytes.as_ptr() as usize;

    bytes = if skip_lines > 0 {
        polars_ensure!(skip_rows_before_header == 0, InvalidOperation: "only one of 'skip_rows'/'skip_lines' may be set");
        skip_lines_naive(bytes, eol_char, skip_lines)
    } else {
        // Skip utf8 byte-order-mark (BOM)
        bytes = skip_bom(bytes);

        // \n\n can be an empty string row of a single column;
        // in other cases we skip it.
        if schema_len > 1 {
            bytes = skip_line_ending(bytes, eol_char)
        }
        bytes
    };

    // skip 'n' leading rows
    if skip_rows_before_header > 0 {
        let mut split_lines = SplitLines::new(bytes, quote_char, eol_char, comment_prefix);
        let mut current_line = &bytes[..0];

        for _ in 0..skip_rows_before_header {
            current_line = split_lines
                .next()
                .ok_or_else(|| polars_err!(NoData: "not enough lines to skip"))?;
        }

        current_line = split_lines
            .next()
            .unwrap_or(&current_line[current_line.len()..]);
        bytes = &bytes[current_line.as_ptr() as usize - bytes.as_ptr() as usize..];
    }

    // skip lines that are comments
    while is_comment_line(bytes, comment_prefix) {
        bytes = skip_this_line_naive(bytes, eol_char);
    }

    // skip header row
    if has_header {
        bytes = skip_this_line(bytes, quote_char, eol_char);
    }
    // skip 'n' rows following the header
    if skip_rows_after_header > 0 {
        let mut split_lines = SplitLines::new(bytes, quote_char, eol_char, comment_prefix);
        let mut current_line = &bytes[..0];

        for _ in 0..skip_rows_after_header {
            current_line = split_lines
                .next()
                .ok_or_else(|| polars_err!(NoData: "not enough lines to skip"))?;
        }

        current_line = split_lines
            .next()
            .unwrap_or(&current_line[current_line.len()..]);
        bytes = &bytes[current_line.as_ptr() as usize - bytes.as_ptr() as usize..];
    }

    Ok(
        // Some of the functions we call may return `&'static []` instead of
        // slices of `&bytes[..]`.
        if bytes.is_empty() {
            full_len
        } else {
            bytes.as_ptr() as usize - starting_point_offset
        },
    )
}
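
// A minimal, self-contained sketch (not part of the upstream test suite) of what
// `find_starting_point` computes: the byte offset of the first data row once the BOM
// and the header line have been skipped. All values below are illustrative.
#[cfg(test)]
mod find_starting_point_example {
    use super::*;

    #[test]
    fn skips_bom_and_header() -> PolarsResult<()> {
        // UTF-8 BOM, then a header row, then two data rows.
        let bytes = b"\xef\xbb\xbfa,b\n1,2\n3,4\n";
        let offset = find_starting_point(
            bytes,
            Some(b'"'), // quote_char
            b'\n',      // eol_char
            2,          // schema_len
            0,          // skip_lines
            0,          // skip_rows_before_header
            0,          // skip_rows_after_header
            None,       // comment_prefix
            true,       // has_header
        )?;
        // The offset is relative to the start of the original buffer.
        assert_eq!(&bytes[offset..], b"1,2\n3,4\n");
        Ok(())
    }
}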