Path: blob/main/crates/polars-stream/src/nodes/io_sources/csv.rs
use std::ops::Range;
use std::sync::Arc;

use async_trait::async_trait;
use polars_core::prelude::{Column, Field};
use polars_core::schema::{SchemaExt, SchemaRef};
use polars_error::{PolarsResult, polars_bail, polars_err, polars_warn};
use polars_io::RowIndex;
use polars_io::cloud::CloudOptions;
use polars_io::prelude::_csv_read_internal::{
    CountLines, NullValuesCompiled, cast_columns, find_starting_point, prepare_csv_schema,
    read_chunk,
};
use polars_io::prelude::buffer::validate_utf8;
use polars_io::prelude::{
    CommentPrefix, CsvEncoding, CsvParseOptions, CsvReadOptions, count_rows_from_slice,
};
use polars_io::utils::compression::maybe_decompress_bytes;
use polars_io::utils::slice::SplitSlicePosition;
use polars_plan::dsl::ScanSource;
use polars_utils::IdxSize;
use polars_utils::mmap::MemSlice;
use polars_utils::slice_enum::Slice;

use super::multi_scan::reader_interface::output::FileReaderOutputRecv;
use super::multi_scan::reader_interface::{BeginReadArgs, FileReader, FileReaderCallbacks};
use crate::DEFAULT_DISTRIBUTOR_BUFFER_SIZE;
use crate::async_executor::{AbortOnDropHandle, spawn};
use crate::async_primitives::distributor_channel::{self, distributor_channel};
use crate::morsel::SourceToken;
use crate::nodes::compute_node_prelude::*;
use crate::nodes::io_sources::multi_scan::reader_interface::Projection;
use crate::nodes::io_sources::multi_scan::reader_interface::output::FileReaderOutputSend;
use crate::nodes::{MorselSeq, TaskPriority};

pub mod builder {
    use std::sync::Arc;

    use polars_core::config;
    use polars_io::cloud::CloudOptions;
    use polars_io::prelude::CsvReadOptions;
    use polars_plan::dsl::ScanSource;

    use super::CsvFileReader;
    use crate::nodes::io_sources::multi_scan::reader_interface::FileReader;
    use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;
    use crate::nodes::io_sources::multi_scan::reader_interface::capabilities::ReaderCapabilities;

    impl FileReaderBuilder for Arc<CsvReadOptions> {
        fn reader_name(&self) -> &str {
            "csv"
        }

        fn reader_capabilities(&self) -> ReaderCapabilities {
            use ReaderCapabilities as RC;

            RC::NEEDS_FILE_CACHE_INIT
                | if self.parse_options.comment_prefix.is_some() {
                    RC::empty()
                } else {
                    RC::PRE_SLICE
                }
        }

        fn build_file_reader(
            &self,
            source: ScanSource,
            cloud_options: Option<Arc<CloudOptions>>,
            _scan_source_idx: usize,
        ) -> Box<dyn FileReader> {
            let scan_source = source;
            let verbose = config::verbose();
            let options = self.clone();

            let reader = CsvFileReader {
                scan_source,
                cloud_options,
                options,
                verbose,
                cached_bytes: None,
            };

            Box::new(reader) as Box<dyn FileReader>
        }
    }
}

/// Read all rows in the chunk
const NO_SLICE: (usize, usize) = (0, usize::MAX);
/// This is used if we finish the slice but still need a row count. It signals to the workers to
/// go into line-counting mode where they can skip parsing the chunks.
const SLICE_ENDED: (usize, usize) = (usize::MAX, 0);
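
// How a worker interprets `LineBatch::slice` - a minimal sketch, not part of
// the original source, based on the constants above and the worker loop below.
// The constants are plain tuple values, so they work as constant patterns:
//
//     match slice {
//         NO_SLICE => { /* parse and emit all rows in the chunk */ }
//         SLICE_ENDED => { /* stop emitting morsels; only count lines if needed */ }
//         (offset, len) => { /* emit `len` rows starting at row `offset` */ }
//     }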

struct LineBatch {
    // Safety: All receivers (LineBatchProcessors) hold a MemSlice ref to this.
    bytes: &'static [u8],
    n_lines: usize,
    slice: (usize, usize),
    /// Position of this chunk relative to the start of the file according to CountLines.
    row_offset: usize,
    morsel_seq: MorselSeq,
}

struct CsvFileReader {
    scan_source: ScanSource,
    #[expect(unused)] // Will be used when implementing cloud streaming.
    cloud_options: Option<Arc<CloudOptions>>,
    options: Arc<CsvReadOptions>,
    // Cached on first access - we may be called multiple times, e.g. on negative slice.
    cached_bytes: Option<MemSlice>,
    verbose: bool,
}

#[async_trait]
impl FileReader for CsvFileReader {
    async fn initialize(&mut self) -> PolarsResult<()> {
        let memslice = self
            .scan_source
            .as_scan_source_ref()
            .to_memslice_async_assume_latest(self.scan_source.run_async())?;

        // Note: We do not decompress in `initialize()`.
        self.cached_bytes = Some(memslice);

        Ok(())
    }

    fn begin_read(
        &mut self,
        args: BeginReadArgs,
    ) -> PolarsResult<(FileReaderOutputRecv, JoinHandle<PolarsResult<()>>)> {
        let verbose = self.verbose;

        let memslice = self.get_bytes_maybe_decompress()?;

        let BeginReadArgs {
            projection: Projection::Plain(projected_schema),
            // Because we currently only support PRE_SLICE we don't need to handle row index here.
            row_index,
            pre_slice,
            predicate: None,
            cast_columns_policy: _,
            num_pipelines,
            callbacks:
                FileReaderCallbacks {
                    file_schema_tx,
                    n_rows_in_file_tx,
                    row_position_on_end_tx,
                },
        } = args
        else {
            panic!("unsupported args: {:?}", &args)
        };

        match &pre_slice {
            Some(Slice::Negative { .. }) => unimplemented!(),

            // We don't account for comments when slicing lines. We should never hit this panic -
            // the FileReaderBuilder does not indicate PRE_SLICE support when we have a comment
            // prefix.
            Some(pre_slice)
                if self.options.parse_options.comment_prefix.is_some() && pre_slice.len() > 0 =>
            {
                panic!("{pre_slice:?}")
            },

            _ => {},
        }

        // We need to infer the schema to get the columns of this file.
        let infer_schema_length = if self.options.has_header {
            Some(1)
        } else {
            // If there is no header the line length may increase later in the
            // file (https://github.com/pola-rs/polars/pull/21979).
            self.options.infer_schema_length
        };

        let (mut inferred_schema, ..) = polars_io::csv::read::infer_file_schema(
            &polars_io::mmap::ReaderBytes::Owned(memslice.clone()),
            &self.options.parse_options,
            infer_schema_length,
            self.options.has_header,
            self.options.schema_overwrite.as_deref(),
            self.options.skip_rows,
            self.options.skip_lines,
            self.options.skip_rows_after_header,
            self.options.raise_if_empty,
        )?;

        if let Some(schema) = &self.options.schema {
            // Note: The user can provide a schema with more columns; the extras will
            // simply be projected as NULL.
            // TODO: Should maybe expose a missing_columns parameter to the API for this.
            if schema.len() < inferred_schema.len()
                && !self.options.parse_options.truncate_ragged_lines
            {
                polars_bail!(
                    SchemaMismatch:
                    "provided schema does not match number of columns in file ({} != {} in file)",
                    schema.len(),
                    inferred_schema.len(),
                );
            }

            if self.options.parse_options.truncate_ragged_lines {
                inferred_schema = Arc::unwrap_or_clone(schema.clone());
            } else {
                inferred_schema = schema
                    .iter_names()
                    .zip(inferred_schema.into_iter().map(|(_, dtype)| dtype))
                    .map(|(name, dtype)| (name.clone(), dtype))
                    .collect();
            }
        }

        if let Some(dtypes) = self.options.dtype_overwrite.as_deref() {
            for (i, dtype) in dtypes.iter().enumerate() {
                inferred_schema.set_dtype_at_index(i, dtype.clone());
            }
        }

        // TODO
        // We currently always override with the projected dtype, but this may cause
        // issues e.g. with temporal types. This can be improved to better choose
        // between the 2 dtypes.
        for (name, inferred_dtype) in inferred_schema.iter_mut() {
            if let Some(projected_dtype) = projected_schema.get(name) {
                *inferred_dtype = projected_dtype.clone();
            }
        }
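
        // Dtype resolution order at this point (later steps win): dtypes inferred
        // from the file, then the user-provided `schema` / `schema_overwrite`,
        // then positional `dtype_overwrite`, then the projected schema applied
        // just above. E.g. (hypothetical): a column inferred as Int64 but
        // projected as Float64 will be parsed directly as Float64.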

        let inferred_schema = Arc::new(inferred_schema);

        if let Some(mut tx) = file_schema_tx {
            _ = tx.try_send(inferred_schema.clone())
        }

        let projection: Vec<usize> = projected_schema
            .iter_names()
            .filter_map(|name| inferred_schema.index_of(name))
            .collect();

        if verbose {
            eprintln!(
                "[CsvFileReader]: project: {} / {}, slice: {:?}, row_index: {:?}",
                projection.len(),
                inferred_schema.len(),
                &pre_slice,
                row_index,
            )
        }

        // Only used on empty projection, or if we need the exact row count.
        let alt_count_lines: Option<Arc<CountLinesWithComments>> =
            CountLinesWithComments::opt_new(&self.options.parse_options).map(Arc::new);
        let chunk_reader = Arc::new(ChunkReader::try_new(
            self.options.clone(),
            inferred_schema.clone(),
            projection,
            row_index,
            alt_count_lines.clone(),
        )?);

        let needs_full_row_count = n_rows_in_file_tx.is_some();

        let (line_batch_tx, line_batch_receivers) =
            distributor_channel(num_pipelines, *DEFAULT_DISTRIBUTOR_BUFFER_SIZE);

        let line_batch_source_handle = AbortOnDropHandle::new(spawn(
            TaskPriority::Low,
            LineBatchSource {
                memslice: memslice.clone(),
                line_counter: CountLines::new(
                    self.options.parse_options.quote_char,
                    self.options.parse_options.eol_char,
                ),
                line_batch_tx,
                options: self.options.clone(),
                file_schema_len: inferred_schema.len(),
                pre_slice,
                needs_full_row_count,
                num_pipelines,
                verbose,
            }
            .run(),
        ));

        let n_workers = line_batch_receivers.len();

        let (morsel_senders, rx) = FileReaderOutputSend::new_parallel(num_pipelines);
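
        // Fan-out of the read (sketch, names as in this file): one LineBatchSource
        // task splits the raw bytes into line batches and distributes them to
        // `num_pipelines` decode workers, each of which parses its batches into
        // DataFrames and sends morsels downstream on its own sender:
        //
        //     LineBatchSource --distributor_channel--> worker 0   --> output 0
        //                                              ...
        //                                              worker N-1 --> output N-1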

        let line_batch_decode_handles = line_batch_receivers
            .into_iter()
            .zip(morsel_senders)
            .enumerate()
            .map(|(worker_idx, (mut line_batch_rx, mut morsel_tx))| {
                // Hold a ref as we are receiving `&'static [u8]`s pointing to this.
                let global_memslice = memslice.clone();
                // Only verbose log from the last worker to avoid flooding output.
                let verbose = verbose && worker_idx == n_workers - 1;
                let mut n_rows_processed: usize = 0;
                let chunk_reader = chunk_reader.clone();
                // Note: We don't use this (it is handled by the bridge). But morsels
                // require a source token.
                let source_token = SourceToken::new();
                let alt_count_lines = alt_count_lines.clone();

                AbortOnDropHandle::new(spawn(TaskPriority::Low, async move {
                    while let Ok(LineBatch {
                        bytes,
                        n_lines,
                        slice,
                        row_offset,
                        morsel_seq,
                    }) = line_batch_rx.recv().await
                    {
                        debug_assert!(bytes.as_ptr() as usize >= global_memslice.as_ptr() as usize);
                        debug_assert!(
                            bytes.as_ptr() as usize + bytes.len()
                                <= global_memslice.as_ptr() as usize + global_memslice.len()
                        );

                        let (offset, len) = match slice {
                            SLICE_ENDED => (0, 1),
                            v => v,
                        };

                        let (df, n_rows_in_chunk) =
                            chunk_reader.read_chunk(bytes, n_lines, (offset, len), row_offset)?;

                        n_rows_processed = n_rows_processed.saturating_add(n_rows_in_chunk);

                        if (offset, len) == SLICE_ENDED {
                            break;
                        }

                        let morsel = Morsel::new(df, morsel_seq, source_token.clone());

                        if morsel_tx.send_morsel(morsel).await.is_err() {
                            break;
                        }
                    }

                    drop(morsel_tx);

                    if needs_full_row_count {
                        if verbose {
                            eprintln!(
                                "[CSV LineBatchProcessor {worker_idx}]: entering row count mode"
                            );
                        }

                        while let Ok(LineBatch {
                            bytes,
                            n_lines,
                            slice,
                            row_offset: _,
                            morsel_seq: _,
                        }) = line_batch_rx.recv().await
                        {
                            assert_eq!(slice, SLICE_ENDED);

                            let n_lines = if let Some(v) = alt_count_lines.as_deref() {
                                v.count_lines(bytes)?
                            } else {
                                n_lines
                            };

                            n_rows_processed = n_rows_processed.saturating_add(n_lines);
                        }
                    }

                    PolarsResult::Ok(n_rows_processed)
                }))
            })
            .collect::<Vec<_>>();
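
        // Row-count accounting for the end-of-read callbacks (summarizing the
        // join task below): the final reported position is
        //
        //     row_position = sum(worker n_rows_processed) + source n_rows_skipped
        //
        // assembled by the task returned alongside `rx`, then converted to
        // IdxSize, failing with the `bigidx` error if it does not fit.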

        Ok((
            rx,
            spawn(TaskPriority::Low, async move {
                let mut row_position: usize = 0;

                for handle in line_batch_decode_handles {
                    let rows_processed = handle.await?;
                    row_position = row_position.saturating_add(rows_processed);
                }

                row_position = {
                    let rows_skipped = line_batch_source_handle.await?;
                    row_position.saturating_add(rows_skipped)
                };

                let row_position = IdxSize::try_from(row_position)
                    .map_err(|_| polars_err!(bigidx, ctx = "csv file", size = row_position))?;

                if let Some(mut n_rows_in_file_tx) = n_rows_in_file_tx {
                    assert!(needs_full_row_count);
                    _ = n_rows_in_file_tx.try_send(row_position);
                }

                if let Some(mut row_position_on_end_tx) = row_position_on_end_tx {
                    _ = row_position_on_end_tx.try_send(row_position);
                }

                Ok(())
            }),
        ))
    }
}

impl CsvFileReader {
    /// # Panics
    /// Panics if `self.cached_bytes` is None.
    fn get_bytes_maybe_decompress(&mut self) -> PolarsResult<MemSlice> {
        let mut out = vec![];
        maybe_decompress_bytes(self.cached_bytes.as_deref().unwrap(), &mut out)?;

        if !out.is_empty() {
            self.cached_bytes = Some(MemSlice::from_vec(out));
        }

        Ok(self.cached_bytes.clone().unwrap())
    }
}

struct LineBatchSource {
    memslice: MemSlice,
    line_counter: CountLines,
    line_batch_tx: distributor_channel::Sender<LineBatch>,
    options: Arc<CsvReadOptions>,
    file_schema_len: usize,
    pre_slice: Option<Slice>,
    needs_full_row_count: bool,
    num_pipelines: usize,
    verbose: bool,
}

impl LineBatchSource {
    /// Returns the number of rows skipped from the start of the file according to CountLines.
    async fn run(self) -> PolarsResult<usize> {
        let LineBatchSource {
            memslice,
            line_counter,
            mut line_batch_tx,
            options,
            file_schema_len,
            pre_slice,
            needs_full_row_count,
            num_pipelines,
            verbose,
        } = self;

        let mut n_rows_skipped: usize = 0;

        let global_slice = if let Some(pre_slice) = pre_slice {
            match pre_slice {
                Slice::Positive { .. } => Some(Range::<usize>::from(pre_slice)),
                // IR lowering puts a negative slice in a separate node.
                // TODO: Native line buffering for negative slice.
                Slice::Negative { .. } => unreachable!(),
            }
        } else {
            None
        };

        let morsel_seq_ref = &mut MorselSeq::default();
        let current_row_offset_ref = &mut 0usize;

        if verbose {
            eprintln!("[CsvSource]: Start line splitting");
        }

        let global_bytes: &[u8] = memslice.as_ref();
        let global_bytes: &'static [u8] = unsafe { std::mem::transmute(global_bytes) };

        let i = {
            let parse_options = options.parse_options.as_ref();

            let quote_char = parse_options.quote_char;
            let eol_char = parse_options.eol_char;

            let skip_lines = options.skip_lines;
            let skip_rows_before_header = options.skip_rows;
            let skip_rows_after_header = options.skip_rows_after_header;
            let comment_prefix = parse_options.comment_prefix.clone();
            let has_header = options.has_header;

            find_starting_point(
                global_bytes,
                quote_char,
                eol_char,
                file_schema_len,
                skip_lines,
                skip_rows_before_header,
                skip_rows_after_header,
                comment_prefix.as_ref(),
                has_header,
            )?
        };

        let mut bytes = &global_bytes[i..];
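
        // Chunk sizing used below (worked example of the arithmetic): with no
        // pre-slice the target is bytes.len() / (16 * num_pipelines), capped at
        // 16 MiB; e.g. a 1 GiB file with 8 pipelines yields 1024 MiB / 128 =
        // 8 MiB chunks, so every worker receives many chunks. With a pre-slice
        // the maximum chunk size is used, and a minimum (64 B in debug builds,
        // 4 KiB otherwise) is always enforced.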

        let mut chunk_size = {
            let max_chunk_size = 16 * 1024 * 1024;
            let chunk_size = if global_slice.is_some() {
                max_chunk_size
            } else {
                std::cmp::min(bytes.len() / (16 * num_pipelines), max_chunk_size)
            };

            // Use a small min chunk size to catch failures in tests.
            #[cfg(debug_assertions)]
            let min_chunk_size = 64;
            #[cfg(not(debug_assertions))]
            let min_chunk_size = 1024 * 4;
            std::cmp::max(chunk_size, min_chunk_size)
        };

        loop {
            if bytes.is_empty() {
                break;
            }

            let (count, position) = line_counter.find_next(bytes, &mut chunk_size);
            let (count, position) = if count == 0 {
                (1, bytes.len())
            } else {
                let pos = (position + 1).min(bytes.len()); // +1 for '\n'
                (count, pos)
            };

            let slice_start = bytes.as_ptr() as usize - global_bytes.as_ptr() as usize;

            bytes = &bytes[position..];

            let current_row_offset = *current_row_offset_ref;
            *current_row_offset_ref += count;

            let slice = if let Some(global_slice) = &global_slice {
                match SplitSlicePosition::split_slice_at_file(
                    current_row_offset,
                    count,
                    global_slice.clone(),
                ) {
                    // Note that we don't check that the skipped line batches actually
                    // contain this many lines.
                    SplitSlicePosition::Before => {
                        n_rows_skipped = n_rows_skipped.saturating_add(count);
                        continue;
                    },
                    SplitSlicePosition::Overlapping(offset, len) => (offset, len),
                    SplitSlicePosition::After => {
                        if needs_full_row_count {
                            // If we need to know the unrestricted row count, we need
                            // to go until the end.
                            SLICE_ENDED
                        } else {
                            break;
                        }
                    },
                }
            } else {
                NO_SLICE
            };

            let bytes_this_chunk = &global_bytes[slice_start..slice_start + position];

            let morsel_seq = *morsel_seq_ref;
            *morsel_seq_ref = morsel_seq.successor();

            let batch = LineBatch {
                bytes: bytes_this_chunk,
                n_lines: count,
                slice,
                row_offset: current_row_offset,
                morsel_seq,
            };

            if line_batch_tx.send(batch).await.is_err() {
                break;
            }
        }

        Ok(n_rows_skipped)
    }
}

#[derive(Default)]
struct ChunkReader {
    reader_schema: SchemaRef,
    parse_options: Arc<CsvParseOptions>,
    fields_to_cast: Vec<Field>,
    ignore_errors: bool,
    projection: Vec<usize>,
    null_values: Option<NullValuesCompiled>,
    validate_utf8: bool,
    row_index: Option<RowIndex>,
    // Alternate line counter when there are comments. This is used on empty projection.
    alt_count_lines: Option<Arc<CountLinesWithComments>>,
}

impl ChunkReader {
    fn try_new(
        options: Arc<CsvReadOptions>,
        mut reader_schema: SchemaRef,
        projection: Vec<usize>,
        row_index: Option<RowIndex>,
        alt_count_lines: Option<Arc<CountLinesWithComments>>,
    ) -> PolarsResult<Self> {
        let mut fields_to_cast: Vec<Field> = options.fields_to_cast.clone();
        prepare_csv_schema(&mut reader_schema, &mut fields_to_cast)?;

        let parse_options = options.parse_options.clone();

        // Logic from `CoreReader::new()`

        let null_values = parse_options
            .null_values
            .clone()
            .map(|nv| nv.compile(&reader_schema))
            .transpose()?;

        let validate_utf8 = matches!(parse_options.encoding, CsvEncoding::Utf8)
            && reader_schema.iter_fields().any(|f| f.dtype().is_string());

        Ok(Self {
            reader_schema,
            parse_options,
            fields_to_cast,
            ignore_errors: options.ignore_errors,
            projection,
            null_values,
            validate_utf8,
            row_index,
            alt_count_lines,
        })
    }

    /// The 2nd return value indicates how many rows exist in the chunk.
    fn read_chunk(
        &self,
        chunk: &[u8],
        // Number of lines according to CountLines
        n_lines: usize,
        slice: (usize, usize),
        chunk_row_offset: usize,
    ) -> PolarsResult<(DataFrame, usize)> {
        if self.validate_utf8 && !validate_utf8(chunk) {
            polars_bail!(ComputeError: "invalid utf-8 sequence")
        }

        // If projection is empty create a DataFrame with the correct height by
        // counting the lines.
        let mut df = if self.projection.is_empty() {
            let h = if let Some(v) = &self.alt_count_lines {
                v.count_lines(chunk)?
            } else {
                n_lines
            };

            DataFrame::empty_with_height(h)
        } else {
            read_chunk(
                chunk,
                &self.parse_options,
                &self.reader_schema,
                self.ignore_errors,
                &self.projection,
                0,           // bytes_offset_thread
                n_lines,     // capacity
                self.null_values.as_ref(),
                usize::MAX,  // chunk_size
                chunk.len(), // stop_at_nbytes
                Some(0),     // starting_point_offset
            )?
        };

        let height = df.height();
        let n_lines_is_correct = df.height() == n_lines;

        // Check malformed
        if df.height() > n_lines
            || (df.height() < n_lines && self.parse_options.comment_prefix.is_none())
        {
            // Note: in case data is malformed, df.height() is more likely to be
            // correct than n_lines.
            let msg = format!(
                "CSV malformed: expected {} rows, actual {} rows, in chunk starting at row_offset {}, length {}",
                n_lines,
                df.height(),
                chunk_row_offset,
                chunk.len()
            );
            if self.ignore_errors {
                polars_warn!(msg);
            } else {
                polars_bail!(ComputeError: msg);
            }
        }

        if slice != NO_SLICE {
            assert!(slice != SLICE_ENDED);
            assert!(n_lines_is_correct || slice.1 == 0);

            df = df.slice(i64::try_from(slice.0).unwrap(), slice.1);
        }

        cast_columns(&mut df, &self.fields_to_cast, false, self.ignore_errors)?;

        if let Some(ri) = &self.row_index {
            assert!(n_lines_is_correct);

            unsafe {
                df.with_column_unchecked(Column::new_row_index(
                    ri.name.clone(),
                    ri.offset
                        .saturating_add(chunk_row_offset.try_into().unwrap_or(IdxSize::MAX)),
                    df.height(),
                )?);
            }
        }

        Ok((df, height))
    }
}
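
// Why an alternate counter exists (summarizing the types below): `CountLines`
// counts newline-delimited records without regard for comments, but commented
// lines are not rows. `CountLinesWithComments` counts via
// `count_rows_from_slice` with the comment prefix instead, keeping heights
// correct for empty projections and exact row counts.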

struct CountLinesWithComments {
    quote_char: Option<u8>,
    eol_char: u8,
    comment_prefix: CommentPrefix,
}

impl CountLinesWithComments {
    fn opt_new(parse_options: &CsvParseOptions) -> Option<Self> {
        parse_options
            .comment_prefix
            .clone()
            .map(|comment_prefix| CountLinesWithComments {
                quote_char: parse_options.quote_char,
                eol_char: parse_options.eol_char,
                comment_prefix,
            })
    }

    fn count_lines(&self, bytes: &[u8]) -> PolarsResult<usize> {
        count_rows_from_slice(
            bytes,
            self.quote_char,
            Some(&self.comment_prefix),
            self.eol_char,
            false, // has_header
        )
    }
}