// Path: crates/polars-io/src/csv/read/read_impl.rs
// (scraped header: 8418 views)
use std::fmt;1use std::sync::Mutex;23use polars_buffer::{Buffer, SharedStorage};4use polars_core::POOL;5use polars_core::prelude::*;6use polars_core::utils::{accumulate_dataframes_vertical, handle_casting_failures};7#[cfg(feature = "polars-time")]8use polars_time::prelude::*;9use polars_utils::relaxed_cell::RelaxedCell;10use rayon::prelude::*;1112use super::CsvParseOptions;13use super::builder::init_builders;14use super::options::{CsvEncoding, NullValuesCompiled};15use super::parser::{CountLines, is_comment_line, parse_lines};16use super::reader::prepare_csv_schema;17#[cfg(feature = "decompress")]18use super::utils::decompress;19use crate::RowIndex;20use crate::csv::read::{CsvReadOptions, read_until_start_and_infer_schema};21use crate::mmap::ReaderBytes;22use crate::predicates::PhysicalIoExpr;23use crate::utils::compression::{CompressedReader, SupportedCompression};24use crate::utils::update_row_counts2;2526pub fn cast_columns(27df: &mut DataFrame,28to_cast: &[Field],29parallel: bool,30ignore_errors: bool,31) -> PolarsResult<()> {32let cast_fn = |c: &Column, fld: &Field| {33let out = match (c.dtype(), fld.dtype()) {34#[cfg(feature = "temporal")]35(DataType::String, DataType::Date) => c36.str()37.unwrap()38.as_date(None, false)39.map(|ca| ca.into_column()),40#[cfg(feature = "temporal")]41(DataType::String, DataType::Time) => c42.str()43.unwrap()44.as_time(None, false)45.map(|ca| ca.into_column()),46#[cfg(feature = "temporal")]47(DataType::String, DataType::Datetime(tu, _)) => c48.str()49.unwrap()50.as_datetime(51None,52*tu,53false,54false,55None,56&StringChunked::from_iter(std::iter::once("raise")),57)58.map(|ca| ca.into_column()),59(_, dt) => c.cast(dt),60}?;61if !ignore_errors && c.null_count() != out.null_count() {62handle_casting_failures(c.as_materialized_series(), out.as_materialized_series())?;63}64Ok(out)65};6667if parallel {68let cols = POOL.install(|| {69df.columns()70.into_par_iter()71.map(|s| {72if let Some(fld) = to_cast.iter().find(|fld| fld.name() == 
s.name()) {73cast_fn(s, fld)74} else {75Ok(s.clone())76}77})78.collect::<PolarsResult<Vec<_>>>()79})?;80*df = unsafe { DataFrame::new_unchecked(df.height(), cols) }81} else {82// cast to the original dtypes in the schema83for fld in to_cast {84// field may not be projected85if let Some(idx) = df.get_column_index(fld.name()) {86df.try_apply_at_idx(idx, |s| cast_fn(s, fld))?;87}88}89}90Ok(())91}9293struct ReaderBytesAndDependents<'a> {94// Ensure lifetime dependents are dropped before `reader_bytes`, since their drop impls95// could access themselves, this is achieved by placing them before `reader_bytes`.96// SAFETY: This is lifetime bound to `reader_bytes`97compressed_reader: CompressedReader,98// SAFETY: This is lifetime bound to `reader_bytes`99leftover: Buffer<u8>,100_reader_bytes: ReaderBytes<'a>,101}102103/// CSV file reader104pub(crate) struct CoreReader<'a> {105reader_bytes: Option<ReaderBytesAndDependents<'a>>,106107/// Explicit schema for the CSV file108schema: SchemaRef,109parse_options: CsvParseOptions,110/// Optional projection for which columns to load (zero-based column indices)111projection: Option<Vec<usize>>,112/// Current line number, used in error reporting113current_line: usize,114ignore_errors: bool,115n_rows: Option<usize>,116n_threads: Option<usize>,117null_values: Option<NullValuesCompiled>,118predicate: Option<Arc<dyn PhysicalIoExpr>>,119to_cast: Vec<Field>,120row_index: Option<RowIndex>,121}122123impl fmt::Debug for CoreReader<'_> {124fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {125f.debug_struct("Reader")126.field("schema", &self.schema)127.field("projection", &self.projection)128.field("current_line", &self.current_line)129.finish()130}131}132133impl<'a> CoreReader<'a> {134#[allow(clippy::too_many_arguments)]135pub(crate) fn new(136reader_bytes: ReaderBytes<'a>,137parse_options: Arc<CsvParseOptions>,138n_rows: Option<usize>,139skip_rows: usize,140skip_lines: usize,141mut projection: Option<Vec<usize>>,142max_records: 
Option<usize>,143has_header: bool,144ignore_errors: bool,145schema: Option<SchemaRef>,146columns: Option<Arc<[PlSmallStr]>>,147n_threads: Option<usize>,148schema_overwrite: Option<SchemaRef>,149dtype_overwrite: Option<Arc<Vec<DataType>>>,150predicate: Option<Arc<dyn PhysicalIoExpr>>,151mut to_cast: Vec<Field>,152skip_rows_after_header: usize,153row_index: Option<RowIndex>,154raise_if_empty: bool,155) -> PolarsResult<CoreReader<'a>> {156let separator = parse_options.separator;157158#[cfg(feature = "decompress")]159let mut reader_bytes = reader_bytes;160161if !cfg!(feature = "decompress") && SupportedCompression::check(&reader_bytes).is_some() {162polars_bail!(163ComputeError: "cannot read compressed CSV file; \164compile with feature 'decompress'"165);166}167// We keep track of the inferred schema bool168// In case the file is compressed this schema inference is wrong and has to be done169// again after decompression.170#[cfg(feature = "decompress")]171{172let total_n_rows =173n_rows.map(|n| skip_rows + (has_header as usize) + skip_rows_after_header + n);174if let Some(b) = decompress(175&reader_bytes,176total_n_rows,177separator,178parse_options.quote_char,179parse_options.eol_char,180) {181reader_bytes = ReaderBytes::Owned(b.into());182}183}184185let reader_slice = match &reader_bytes {186ReaderBytes::Borrowed(slice) => {187// SAFETY: The produced slice and derived slices MUST not live longer than188// `reader_bytes`. 
TODO use `scan_csv` to implement `read_csv`.189let ss = unsafe { SharedStorage::from_slice_unchecked(slice) };190Buffer::from_storage(ss)191},192ReaderBytes::Owned(slice) => slice.clone(),193};194let mut compressed_reader = CompressedReader::try_new(reader_slice)?;195196let read_options = CsvReadOptions {197parse_options: parse_options.clone(),198n_rows,199skip_rows,200skip_lines,201projection: projection.clone().map(Arc::new),202has_header,203ignore_errors,204schema: schema.clone(),205columns: columns.clone(),206n_threads,207schema_overwrite,208dtype_overwrite: dtype_overwrite.clone(),209fields_to_cast: to_cast.clone(),210skip_rows_after_header,211row_index: row_index.clone(),212raise_if_empty,213infer_schema_length: max_records,214..Default::default()215};216217// Since this is also used to skip to the start, always call it.218let (inferred_schema, leftover) =219read_until_start_and_infer_schema(&read_options, None, None, &mut compressed_reader)?;220221let mut schema = match schema {222Some(schema) => schema,223None => Arc::new(inferred_schema),224};225if let Some(dtypes) = dtype_overwrite {226polars_ensure!(227dtypes.len() <= schema.len(),228InvalidOperation: "The number of schema overrides must be less than or equal to the number of fields"229);230let s = Arc::make_mut(&mut schema);231for (index, dt) in dtypes.iter().enumerate() {232s.set_dtype_at_index(index, dt.clone()).unwrap();233}234}235236prepare_csv_schema(&mut schema, &mut to_cast)?;237238// Create a null value for every column239let null_values = parse_options240.null_values241.as_ref()242.map(|nv| nv.clone().compile(&schema))243.transpose()?;244245if let Some(cols) = columns {246let mut prj = Vec::with_capacity(cols.len());247for col in cols.as_ref() {248let i = schema.try_index_of(col)?;249prj.push(i);250}251projection = Some(prj);252}253254Ok(CoreReader {255reader_bytes: Some(ReaderBytesAndDependents {256compressed_reader,257leftover,258_reader_bytes: reader_bytes,259}),260parse_options: 
(*parse_options).clone(),261schema,262projection,263current_line: usize::from(has_header),264ignore_errors,265n_rows,266n_threads,267null_values,268predicate,269to_cast,270row_index,271})272}273274fn get_projection(&mut self) -> PolarsResult<Vec<usize>> {275// we also need to sort the projection to have predictable output.276// the `parse_lines` function expects this.277self.projection278.take()279.map(|mut v| {280v.sort_unstable();281if let Some(idx) = v.last() {282polars_ensure!(*idx < self.schema.len(), OutOfBounds: "projection index: {} is out of bounds for csv schema with length: {}", idx, self.schema.len())283}284Ok(v)285})286.unwrap_or_else(|| Ok((0..self.schema.len()).collect()))287}288289fn read_chunk(290&self,291bytes: &[u8],292projection: &[usize],293bytes_offset: usize,294capacity: usize,295starting_point_offset: Option<usize>,296stop_at_nbytes: usize,297) -> PolarsResult<DataFrame> {298let mut df = read_chunk(299bytes,300&self.parse_options,301self.schema.as_ref(),302self.ignore_errors,303projection,304bytes_offset,305capacity,306self.null_values.as_ref(),307usize::MAX,308stop_at_nbytes,309starting_point_offset,310)?;311312cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;313Ok(df)314}315316// The code adheres to RFC 4180 in a strict sense, unless explicitly documented otherwise.317// Malformed CSV is common, see e.g. 
the use of lazy_quotes, whitespace and comments.318// In case malformed CSV is detected, a warning or an error will be issued.319// Not all malformed CSV will be detected, as that would impact performance.320fn parse_csv(&mut self, bytes: &[u8]) -> PolarsResult<DataFrame> {321let projection = self.get_projection()?;322323// An empty file with a schema should return an empty DataFrame with that schema324if bytes.is_empty() {325let mut df = if projection.len() == self.schema.len() {326DataFrame::empty_with_schema(self.schema.as_ref())327} else {328DataFrame::empty_with_schema(329&projection330.iter()331.map(|&i| self.schema.get_at_index(i).unwrap())332.map(|(name, dtype)| Field {333name: name.clone(),334dtype: dtype.clone(),335})336.collect::<Schema>(),337)338};339340cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;341342if let Some(ref row_index) = self.row_index {343df.insert_column(0, Column::new_empty(row_index.name.clone(), &IDX_DTYPE))?;344}345return Ok(df);346}347348let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());349350// This is chosen by benchmarking on ny city trip csv dataset.351// We want small enough chunks such that threads start working as soon as possible352// But we also want them large enough, so that we have less chunks related overhead.353// We minimize chunks to 16 MB to still fit L3 cache.354//355// Width-aware adjustment: For wide data (many columns), per-chunk overhead356// (allocating column buffers) becomes significant. 
Each chunk must allocate357// O(n_cols) buffers, so total allocation overhead is O(n_chunks * n_cols).358// To keep this bounded, we limit n_chunks such that n_chunks * n_cols <= threshold.359// With threshold ~500K, this gives:360// - 100 cols: up to 5000 chunks (no practical limit)361// - 1000 cols: up to 500 chunks362// - 10000 cols: up to 50 chunks363// - 30000 cols: up to 16 chunks364let n_cols = projection.len();365// Empirically determined to balance allocation overhead and parallelism.366const ALLOCATION_BUDGET: usize = 500_000;367let max_chunks_for_width = ALLOCATION_BUDGET / n_cols.max(1);368let n_parts_hint = std::cmp::min(n_threads * 16, max_chunks_for_width.max(n_threads));369let chunk_size = std::cmp::min(bytes.len() / n_parts_hint.max(1), 16 * 1024 * 1024);370371// Use a small min chunk size to catch failures in tests.372#[cfg(debug_assertions)]373let min_chunk_size = 64;374#[cfg(not(debug_assertions))]375let min_chunk_size = 1024 * 4;376377let mut chunk_size = std::cmp::max(chunk_size, min_chunk_size);378let mut total_bytes_offset = 0;379380let results = Arc::new(Mutex::new(vec![]));381// We have to do this after parsing as there can be comments.382let total_line_count = &RelaxedCell::new_usize(0);383384let counter = CountLines::new(385self.parse_options.quote_char,386self.parse_options.eol_char,387None,388);389let mut total_offset = 0;390let mut previous_total_offset = 0;391let check_utf8 = matches!(self.parse_options.encoding, CsvEncoding::Utf8)392&& self.schema.iter_fields().any(|f| f.dtype().is_string());393394POOL.scope(|s| {395// Pass 1: identify chunks for parallel processing (line parsing).396loop {397let b = unsafe { bytes.get_unchecked(total_offset..) };398if b.is_empty() {399break;400}401debug_assert!(402total_offset == 0 || bytes[total_offset - 1] == self.parse_options.eol_char403);404405// Count is the number of rows for the next chunk. 
In case of malformed CSV data,406// count may not be as expected.407let (count, position) = counter.find_next(b, &mut chunk_size);408debug_assert!(count == 0 || b[position] == self.parse_options.eol_char);409410let (b, count) = if count == 0411&& unsafe {412std::ptr::eq(b.as_ptr().add(b.len()), bytes.as_ptr().add(bytes.len()))413} {414total_offset = bytes.len();415let c = if is_comment_line(bytes, self.parse_options.comment_prefix.as_ref()) {4160417} else {4181419};420(b, c)421} else {422let end = total_offset + position + 1;423let b = unsafe { bytes.get_unchecked(total_offset..end) };424425previous_total_offset = total_offset;426total_offset = end;427(b, count)428};429430// Pass 2: process each individual chunk in parallel (field parsing)431if !b.is_empty() {432let results = results.clone();433let projection = projection.as_ref();434let slf = &(*self);435s.spawn(move |_| {436if check_utf8 && !super::builder::validate_utf8(b) {437let mut results = results.lock().unwrap();438results.push((439b.as_ptr() as usize,440Err(polars_err!(ComputeError: "invalid utf-8 sequence")),441));442return;443}444445let result = slf446.read_chunk(b, projection, 0, count, Some(0), b.len())447.and_then(|mut df| {448// Check malformed449if df.height() > count450|| (df.height() < count451&& slf.parse_options.comment_prefix.is_none())452{453// Note: in case data is malformed, df.height() is more likely to be correct than count.454let msg = format!(455"CSV malformed: expected {} rows, \456actual {} rows, in chunk starting at \457byte offset {}, length {}",458count,459df.height(),460previous_total_offset,461b.len()462);463if slf.ignore_errors {464polars_warn!("{}", msg);465} else {466polars_bail!(ComputeError: msg);467}468}469470if slf.n_rows.is_some() {471total_line_count.fetch_add(df.height());472}473474// We cannot use the line count as there can be comments in the lines so we must correct line counts later.475if let Some(rc) = &slf.row_index {476// is first chunk477let offset = if 
std::ptr::eq(b.as_ptr(), bytes.as_ptr()) {478Some(rc.offset)479} else {480None481};482483unsafe { df.with_row_index_mut(rc.name.clone(), offset) };484};485486if let Some(predicate) = slf.predicate.as_ref() {487let s = predicate.evaluate_io(&df)?;488let mask = s.bool()?;489df = df.filter(mask)?;490}491Ok(df)492});493494results.lock().unwrap().push((b.as_ptr() as usize, result));495});496497// Check just after we spawned a chunk. That mean we processed all data up until498// row count.499if self.n_rows.is_some() && total_line_count.load() > self.n_rows.unwrap() {500break;501}502}503total_bytes_offset += b.len();504}505});506507let mut results = std::mem::take(&mut *results.lock().unwrap());508results.sort_unstable_by_key(|k| k.0);509let mut dfs = results510.into_iter()511.map(|k| k.1)512.collect::<PolarsResult<Vec<_>>>()?;513514if let Some(rc) = &self.row_index {515update_row_counts2(&mut dfs, rc.offset)516};517accumulate_dataframes_vertical(dfs)518}519520/// Read the csv into a DataFrame. 
The predicate can come from a lazy physical plan.521pub fn finish(mut self) -> PolarsResult<DataFrame> {522let mut reader_bytes = self.reader_bytes.take().unwrap();523let (body_bytes, _) = reader_bytes524.compressed_reader525.read_next_slice(&reader_bytes.leftover, usize::MAX)?;526527let mut df = self.parse_csv(&body_bytes)?;528529// if multi-threaded the n_rows was probabilistically determined.530// Let's slice to correct number of rows if possible.531if let Some(n_rows) = self.n_rows {532if n_rows < df.height() {533df = df.slice(0, n_rows)534}535}536Ok(df)537}538}539540#[allow(clippy::too_many_arguments)]541pub fn read_chunk(542bytes: &[u8],543parse_options: &CsvParseOptions,544schema: &Schema,545ignore_errors: bool,546projection: &[usize],547bytes_offset_thread: usize,548capacity: usize,549null_values: Option<&NullValuesCompiled>,550chunk_size: usize,551stop_at_nbytes: usize,552starting_point_offset: Option<usize>,553) -> PolarsResult<DataFrame> {554let mut read = bytes_offset_thread;555// There's an off-by-one error somewhere in the reading code, where it reads556// one more item than the requested capacity. Given the batch sizes are557// approximate (sometimes they're smaller), this isn't broken, but it does558// mean a bunch of extra allocation and copying. 
So we allocate a559// larger-by-one buffer so the size is more likely to be accurate.560let mut buffers = init_builders(561projection,562capacity + 1,563schema,564parse_options.quote_char,565parse_options.encoding,566parse_options.decimal_comma,567)?;568569debug_assert!(projection.is_sorted());570571let mut last_read = usize::MAX;572loop {573if read >= stop_at_nbytes || read == last_read {574break;575}576let local_bytes = &bytes[read..stop_at_nbytes];577578last_read = read;579let offset = read + starting_point_offset.unwrap();580read += parse_lines(581local_bytes,582parse_options,583offset,584ignore_errors,585null_values,586projection,587&mut buffers,588chunk_size,589schema.len(),590schema,591)?;592}593594let columns = buffers595.into_iter()596.map(|buf| buf.into_series().map(Column::from))597.collect::<PolarsResult<Vec<_>>>()?;598Ok(unsafe { DataFrame::new_unchecked_infer_height(columns) })599}600601602