Path: blob/main/crates/polars-stream/src/nodes/io_sources/csv.rs
use std::iter::Iterator;
use std::ops::Range;
use std::sync::Arc;

use async_trait::async_trait;
use polars_buffer::Buffer;
use polars_core::prelude::Field;
use polars_core::schema::{SchemaExt, SchemaRef};
use polars_error::{PolarsResult, polars_bail, polars_err, polars_warn};
use polars_io::cloud::CloudOptions;
use polars_io::csv::read::streaming::read_until_start_and_infer_schema;
use polars_io::prelude::_csv_read_internal::{
    CountLines, NullValuesCompiled, cast_columns, prepare_csv_schema, read_chunk,
};
use polars_io::prelude::builder::validate_utf8;
use polars_io::prelude::{CsvEncoding, CsvParseOptions, CsvReadOptions};
use polars_io::utils::compression::CompressedReader;
use polars_io::utils::slice::SplitSlicePosition;
use polars_plan::dsl::ScanSource;
use polars_utils::IdxSize;
use polars_utils::mem::prefetch::prefetch_l2;
use polars_utils::slice_enum::Slice;

use super::multi_scan::reader_interface::output::FileReaderOutputRecv;
use super::multi_scan::reader_interface::{BeginReadArgs, FileReader, FileReaderCallbacks};
use crate::DEFAULT_DISTRIBUTOR_BUFFER_SIZE;
use crate::async_executor::{AbortOnDropHandle, spawn};
use crate::async_primitives::distributor_channel::{self, distributor_channel};
use crate::morsel::SourceToken;
use crate::nodes::compute_node_prelude::*;
use crate::nodes::io_sources::multi_scan::reader_interface::Projection;
use crate::nodes::io_sources::multi_scan::reader_interface::output::FileReaderOutputSend;
use crate::nodes::{MorselSeq, TaskPriority};

pub mod builder {
    use std::sync::Arc;

    use polars_core::config;
    use polars_io::cloud::CloudOptions;
    use polars_io::prelude::CsvReadOptions;
    use polars_plan::dsl::ScanSource;

    use super::CsvFileReader;
    use crate::nodes::io_sources::multi_scan::reader_interface::FileReader;
    use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;
    use crate::nodes::io_sources::multi_scan::reader_interface::capabilities::ReaderCapabilities;

    impl FileReaderBuilder for Arc<CsvReadOptions> {
        fn reader_name(&self) -> &str {
            "csv"
        }

        fn reader_capabilities(&self) -> ReaderCapabilities {
            use ReaderCapabilities as RC;

            RC::NEEDS_FILE_CACHE_INIT
                | if self.parse_options.comment_prefix.is_some() {
                    RC::empty()
                } else {
                    RC::PRE_SLICE
                }
        }

        fn build_file_reader(
            &self,
            source: ScanSource,
            cloud_options: Option<Arc<CloudOptions>>,
            _scan_source_idx: usize,
        ) -> Box<dyn FileReader> {
            let scan_source = source;
            let verbose = config::verbose();
            let options = self.clone();

            let reader = CsvFileReader {
                scan_source,
                cloud_options,
                options,
                verbose,
                cached_bytes: None,
            };

            Box::new(reader) as Box<dyn FileReader>
        }
    }
}

/// Read all rows in the chunk.
const NO_SLICE: (usize, usize) = (0, usize::MAX);
/// This is used if we finish the slice but still need a row count. It signals to the workers to
/// go into line-counting mode where they can skip parsing the chunks.
const SLICE_ENDED: (usize, usize) = (usize::MAX, 0);
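// Illustrative note (added sketch, not part of the upstream file): every
// `LineBatch` carries an `(offset, len)` tuple, and the two constants above are
// sentinel values of that tuple. `NO_SLICE` keeps every row in the chunk, while
// `SLICE_ENDED` tells workers to stop parsing and only count lines. A minimal
// check of that encoding, assuming only the constants defined above:
#[cfg(test)]
mod slice_sentinel_sketch {
    use super::{NO_SLICE, SLICE_ENDED};

    #[test]
    fn sentinels_are_distinguishable() {
        // `NO_SLICE` means "offset 0, unbounded length": parse the whole chunk.
        assert_eq!(NO_SLICE, (0, usize::MAX));
        // `SLICE_ENDED` means "offset MAX, length 0": nothing left to emit.
        assert_eq!(SLICE_ENDED, (usize::MAX, 0));
        // The two sentinels can never be confused with each other.
        assert_ne!(NO_SLICE, SLICE_ENDED);
    }
}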
struct LineBatch {
    // Safety: All receivers (LineBatchProcessors) hold a Buffer ref to this.
    mem_slice: Buffer<u8>,
    n_lines: usize,
    slice: (usize, usize),
    /// Position of this chunk relative to the start of the file according to CountLines.
    row_offset: usize,
    morsel_seq: MorselSeq,
}

struct CsvFileReader {
    scan_source: ScanSource,
    #[expect(unused)] // Will be used when implementing cloud streaming.
    cloud_options: Option<Arc<CloudOptions>>,
    options: Arc<CsvReadOptions>,
    // Cached on first access - we may be called multiple times e.g. on negative slice.
    cached_bytes: Option<Buffer<u8>>,
    verbose: bool,
}
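// Added overview comment (not in the upstream file): `begin_read` below wires
// up a fan-out pipeline. A single `LineBatchSource` task splits the (possibly
// decompressed) byte stream into batches of complete lines and distributes
// them to `num_pipelines` decode workers, which parse batches into DataFrames
// and send them onward as morsels:
//
//   LineBatchSource --distributor_channel--> worker 0 --morsels--> output
//                                        \-> worker 1 --morsels--> output
//                                        \-> ...
//
// Each batch is stamped with a `MorselSeq` so downstream stages can restore
// global row order despite the parallel decode.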
#[async_trait]
impl FileReader for CsvFileReader {
    async fn initialize(&mut self) -> PolarsResult<()> {
        let buffer = self
            .scan_source
            .as_scan_source_ref()
            .to_buffer_async_assume_latest(self.scan_source.run_async())?;

        // Note: We do not decompress in `initialize()`.
        self.cached_bytes = Some(buffer);

        Ok(())
    }

    fn begin_read(
        &mut self,
        args: BeginReadArgs,
    ) -> PolarsResult<(FileReaderOutputRecv, JoinHandle<PolarsResult<()>>)> {
        let verbose = self.verbose;

        let BeginReadArgs {
            projection: Projection::Plain(projected_schema),
            // Because we currently only support PRE_SLICE we don't need to handle row index here.
            row_index,
            pre_slice,
            predicate: None,
            cast_columns_policy: _,
            num_pipelines,
            disable_morsel_split: _,
            callbacks:
                FileReaderCallbacks {
                    file_schema_tx,
                    n_rows_in_file_tx,
                    row_position_on_end_tx,
                },
        } = args
        else {
            panic!("unsupported args: {:?}", &args)
        };

        assert!(row_index.is_none()); // Handled outside the reader for now.

        match &pre_slice {
            Some(Slice::Negative { .. }) => unimplemented!(),

            // We don't account for comments when slicing lines. We should never hit this panic -
            // the FileReaderBuilder does not indicate PRE_SLICE support when we have a comment
            // prefix.
            Some(pre_slice)
                if self.options.parse_options.comment_prefix.is_some() && pre_slice.len() > 0 =>
            {
                panic!("{pre_slice:?}")
            },

            _ => {},
        }

        let mut reader = CompressedReader::try_new(self.cached_bytes.clone().unwrap())?;

        let (inferred_schema, base_leftover) = read_until_start_and_infer_schema(
            &self.options,
            Some(projected_schema.clone()),
            None,
            &mut reader,
        )?;

        let used_schema = Arc::new(inferred_schema);

        if let Some(tx) = file_schema_tx {
            _ = tx.send(used_schema.clone())
        }

        let projection: Vec<usize> = projected_schema
            .iter_names()
            .filter_map(|name| used_schema.index_of(name))
            .collect();

        if verbose {
            eprintln!(
                "[CsvFileReader]: project: {} / {}, slice: {:?}",
                projection.len(),
                used_schema.len(),
                &pre_slice,
            )
        }

        let quote_char = self.options.parse_options.quote_char;
        let eol_char = self.options.parse_options.eol_char;
        let comment_prefix = self.options.parse_options.comment_prefix.clone();

        let line_counter = CountLines::new(quote_char, eol_char, comment_prefix.clone());

        let chunk_reader = Arc::new(ChunkReader::try_new(
            self.options.clone(),
            used_schema.clone(),
            projection,
        )?);

        let needs_full_row_count = n_rows_in_file_tx.is_some();

        let (line_batch_tx, line_batch_receivers) =
            distributor_channel(num_pipelines, *DEFAULT_DISTRIBUTOR_BUFFER_SIZE);

        let line_batch_source_handle = AbortOnDropHandle::new(spawn(
            TaskPriority::Low,
            LineBatchSource {
                base_leftover,
                reader,
                line_counter,
                line_batch_tx,
                pre_slice,
                needs_full_row_count,
                verbose,
            }
            .run(),
        ));

        let n_workers = line_batch_receivers.len();

        let (morsel_senders, rx) = FileReaderOutputSend::new_parallel(num_pipelines);

        let line_batch_decode_handles = line_batch_receivers
            .into_iter()
            .zip(morsel_senders)
            .enumerate()
            .map(|(worker_idx, (mut line_batch_rx, mut morsel_tx))| {
                // Only verbose log from the last worker to avoid flooding output.
                let verbose = verbose && worker_idx == n_workers - 1;
                let mut n_rows_processed: usize = 0;
                let chunk_reader = chunk_reader.clone();
                // Note: We don't use this (it is handled by the bridge). But morsels require a
                // source token.
                let source_token = SourceToken::new();

                AbortOnDropHandle::new(spawn(TaskPriority::Low, async move {
                    while let Ok(LineBatch {
                        mem_slice,
                        n_lines,
                        slice,
                        row_offset,
                        morsel_seq,
                    }) = line_batch_rx.recv().await
                    {
                        let (offset, len) = match slice {
                            SLICE_ENDED => (0, 1),
                            v => v,
                        };

                        let (df, n_rows_in_chunk) = chunk_reader.read_chunk(
                            &mem_slice,
                            n_lines,
                            (offset, len),
                            row_offset,
                        )?;

                        n_rows_processed = n_rows_processed.saturating_add(n_rows_in_chunk);

                        if (offset, len) == SLICE_ENDED {
                            break;
                        }

                        let morsel = Morsel::new(df, morsel_seq, source_token.clone());

                        if morsel_tx.send_morsel(morsel).await.is_err() {
                            break;
                        }
                    }

                    drop(morsel_tx);

                    if needs_full_row_count {
                        if verbose {
                            eprintln!(
                                "[CSV LineBatchProcessor {worker_idx}]: entering row count mode"
                            );
                        }

                        while let Ok(LineBatch {
                            mem_slice: _,
                            n_lines,
                            slice,
                            row_offset: _,
                            morsel_seq: _,
                        }) = line_batch_rx.recv().await
                        {
                            assert_eq!(slice, SLICE_ENDED);

                            n_rows_processed = n_rows_processed.saturating_add(n_lines);
                        }
                    }

                    PolarsResult::Ok(n_rows_processed)
                }))
            })
            .collect::<Vec<_>>();

        Ok((
            rx,
            spawn(TaskPriority::Low, async move {
                let mut row_position: usize = 0;

                for handle in line_batch_decode_handles {
                    let rows_processed = handle.await?;
                    row_position = row_position.saturating_add(rows_processed);
                }

                row_position = {
                    let rows_skipped = line_batch_source_handle.await?;
                    row_position.saturating_add(rows_skipped)
                };

                let row_position = IdxSize::try_from(row_position)
                    .map_err(|_| polars_err!(bigidx, ctx = "csv file", size = row_position))?;

                if let Some(n_rows_in_file_tx) = n_rows_in_file_tx {
                    assert!(needs_full_row_count);
                    _ = n_rows_in_file_tx.send(row_position);
                }

                if let Some(row_position_on_end_tx) = row_position_on_end_tx {
                    _ = row_position_on_end_tx.send(row_position);
                }

                Ok(())
            }),
        ))
    }
}
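// Added sketch: the source task below classifies each line batch against the
// global pre-slice as Before / Overlapping / After. The helper here is a
// hypothetical, self-contained mirror of that three-way split (it is NOT the
// actual `SplitSlicePosition::split_slice_at_file` implementation), written
// only to illustrate the arithmetic with plain `usize` ranges.
#[cfg(test)]
mod slice_split_sketch {
    use std::ops::Range;

    enum Position {
        Before,
        // (offset within the chunk, number of rows to keep)
        Overlapping(usize, usize),
        After,
    }

    // Classify a chunk of `len` rows starting at absolute row `offset` against
    // a global row range. Hypothetical helper for illustration only.
    fn split_at(offset: usize, len: usize, slice: Range<usize>) -> Position {
        let end = offset + len;
        if end <= slice.start {
            Position::Before
        } else if offset >= slice.end {
            Position::After
        } else {
            let local_start = slice.start.saturating_sub(offset);
            let local_end = slice.end.min(end) - offset;
            Position::Overlapping(local_start, local_end - local_start)
        }
    }

    #[test]
    fn chunk_overlapping_the_slice_is_trimmed() {
        // A chunk of 10 rows at absolute offset 95, with a global slice of rows
        // 100..105: rows 5..10 of the chunk survive.
        match split_at(95, 10, 100..105) {
            Position::Overlapping(offset, len) => assert_eq!((offset, len), (5, 5)),
            _ => panic!("expected overlap"),
        }
    }
}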
struct LineBatchSource {
    base_leftover: Buffer<u8>,
    reader: CompressedReader,
    line_counter: CountLines,
    line_batch_tx: distributor_channel::Sender<LineBatch>,
    pre_slice: Option<Slice>,
    needs_full_row_count: bool,
    verbose: bool,
}

impl LineBatchSource {
    /// Returns the number of rows skipped from the start of the file according to CountLines.
    async fn run(self) -> PolarsResult<usize> {
        let LineBatchSource {
            base_leftover,
            mut reader,
            line_counter,
            mut line_batch_tx,
            pre_slice,
            needs_full_row_count,
            verbose,
        } = self;

        let global_slice = if let Some(pre_slice) = pre_slice {
            match pre_slice {
                Slice::Positive { .. } => Some(Range::<usize>::from(pre_slice)),
                // IR lowering puts a negative slice in a separate node.
                // TODO: Native line buffering for negative slice
                Slice::Negative { .. } => unreachable!(),
            }
        } else {
            None
        };

        if verbose {
            eprintln!("[CsvSource]: Start line splitting");
        }

        let mut prev_leftover = base_leftover;
        let mut row_offset = 0usize;
        let mut morsel_seq = MorselSeq::default();
        let mut n_rows_skipped: usize = 0;
        let mut read_size = CompressedReader::initial_read_size();

        loop {
            let (mem_slice, bytes_read) = reader.read_next_slice(&prev_leftover, read_size)?;
            if mem_slice.is_empty() {
                break;
            }

            prefetch_l2(&mem_slice);

            let is_eof = bytes_read == 0;
            let (n_lines, unconsumed_offset) = line_counter.count_rows(&mem_slice, is_eof);

            let batch_slice = mem_slice.clone().sliced(0..unconsumed_offset);
            prev_leftover = mem_slice.sliced(unconsumed_offset..);

            if batch_slice.is_empty() && !is_eof {
                // This allows the slice to grow until at least a single row is included.
                // To avoid a quadratic run-time for large row sizes, we double the read size.
                read_size = read_size.saturating_mul(2);
                continue;
            }

            // Has to happen here before slicing, since there are slice operations that skip morsel
            // sending.
            let prev_row_offset = row_offset;
            row_offset += n_lines;

            let slice = if let Some(global_slice) = &global_slice {
                match SplitSlicePosition::split_slice_at_file(
                    prev_row_offset,
                    n_lines,
                    global_slice.clone(),
                ) {
                    // Note that we don't check that the skipped line batches actually contain this
                    // many lines.
                    SplitSlicePosition::Before => {
                        n_rows_skipped = n_rows_skipped.saturating_add(n_lines);
                        continue;
                    },
                    SplitSlicePosition::Overlapping(offset, len) => (offset, len),
                    SplitSlicePosition::After => {
                        if needs_full_row_count {
                            // If we need to know the unrestricted row count, we need
                            // to go until the end.
                            SLICE_ENDED
                        } else {
                            break;
                        }
                    },
                }
            } else {
                NO_SLICE
            };

            morsel_seq = morsel_seq.successor();

            let batch = LineBatch {
                mem_slice: batch_slice,
                n_lines,
                slice,
                // The chunk starts at the offset *before* this chunk's lines were added.
                row_offset: prev_row_offset,
                morsel_seq,
            };

            if line_batch_tx.send(batch).await.is_err() {
                break;
            }

            if is_eof {
                break;
            }

            if read_size < CompressedReader::ideal_read_size() {
                read_size *= 4;
            }
        }

        Ok(n_rows_skipped)
    }
}
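// Added sketch: the read loop above grows its read size adaptively. It starts
// at `CompressedReader::initial_read_size()`, doubles whenever a read yields no
// complete row (so a single huge row is covered in logarithmically many reads
// rather than linearly many), and otherwise grows 4x per iteration until it
// reaches `CompressedReader::ideal_read_size()`. The constants below are
// hypothetical stand-ins for illustration; the real values live in
// `CompressedReader`.
#[cfg(test)]
mod read_size_growth_sketch {
    #[test]
    fn growth_schedule_reaches_ideal_size() {
        // Assumed example values, NOT the actual CompressedReader constants.
        let initial: usize = 16 * 1024;
        let ideal: usize = 4 * 1024 * 1024;

        let mut read_size = initial;
        let mut steps = 0;
        while read_size < ideal {
            read_size *= 4; // the steady-state growth factor in the loop above
            steps += 1;
        }

        // 16 KiB -> 64 KiB -> 256 KiB -> 1 MiB -> 4 MiB: four steps.
        assert_eq!(steps, 4);
    }
}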
#[derive(Default)]
struct ChunkReader {
    reader_schema: SchemaRef,
    parse_options: Arc<CsvParseOptions>,
    fields_to_cast: Vec<Field>,
    ignore_errors: bool,
    projection: Vec<usize>,
    null_values: Option<NullValuesCompiled>,
    validate_utf8: bool,
}

impl ChunkReader {
    fn try_new(
        options: Arc<CsvReadOptions>,
        mut reader_schema: SchemaRef,
        projection: Vec<usize>,
    ) -> PolarsResult<Self> {
        let mut fields_to_cast: Vec<Field> = options.fields_to_cast.clone();
        prepare_csv_schema(&mut reader_schema, &mut fields_to_cast)?;

        let parse_options = options.parse_options.clone();

        // Logic from `CoreReader::new()`.

        let null_values = parse_options
            .null_values
            .clone()
            .map(|nv| nv.compile(&reader_schema))
            .transpose()?;

        let validate_utf8 = matches!(parse_options.encoding, CsvEncoding::Utf8)
            && reader_schema.iter_fields().any(|f| f.dtype().is_string());

        Ok(Self {
            reader_schema,
            parse_options,
            fields_to_cast,
            ignore_errors: options.ignore_errors,
            projection,
            null_values,
            validate_utf8,
        })
    }

    /// The 2nd return value indicates how many rows exist in the chunk.
    fn read_chunk(
        &self,
        chunk: &[u8],
        // Number of lines according to CountLines.
        n_lines: usize,
        slice: (usize, usize),
        chunk_row_offset: usize,
    ) -> PolarsResult<(DataFrame, usize)> {
        if self.validate_utf8 && !validate_utf8(chunk) {
            polars_bail!(ComputeError: "invalid utf-8 sequence")
        }

        // If the projection is empty, create a DataFrame with the correct height by counting the
        // lines.
        let mut df = if self.projection.is_empty() {
            DataFrame::empty_with_height(n_lines)
        } else {
            read_chunk(
                chunk,
                &self.parse_options,
                &self.reader_schema,
                self.ignore_errors,
                &self.projection,
                0,       // bytes_offset_thread
                n_lines, // capacity
                self.null_values.as_ref(),
                usize::MAX,  // chunk_size
                chunk.len(), // stop_at_nbytes
                Some(0),     // starting_point_offset
            )?
        };

        let height = df.height();

        if height != n_lines {
            // Note: in case data is malformed, height is more likely to be correct than n_lines.
            let msg = format!(
                "CSV malformed: expected {} rows, actual {} rows, in chunk starting at row_offset {}, length {}",
                n_lines,
                height,
                chunk_row_offset,
                chunk.len()
            );
            if self.ignore_errors {
                polars_warn!("{msg}");
            } else {
                polars_bail!(ComputeError: msg)
            }
        }

        if slice != NO_SLICE {
            assert!(slice != SLICE_ENDED);

            df = df.slice(i64::try_from(slice.0).unwrap(), slice.1);
        }

        cast_columns(&mut df, &self.fields_to_cast, false, self.ignore_errors)?;

        Ok((df, height))
    }
}
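// Added sketch: a minimal smoke test of `ChunkReader::read_chunk` in its
// degenerate configuration. With an empty projection the reader skips parsing
// entirely and just materializes a frame of the counted height, so an empty
// chunk with zero counted lines yields an empty DataFrame. This test was
// written for this sketch and is not taken from the upstream file.
#[cfg(test)]
mod chunk_reader_sketch {
    use super::*;

    #[test]
    fn empty_chunk_with_empty_projection() -> PolarsResult<()> {
        // `ChunkReader` derives `Default`: empty schema, empty projection,
        // no compiled null values, UTF-8 validation disabled.
        let reader = ChunkReader::default();
        let (df, n_rows) = reader.read_chunk(&[], 0, NO_SLICE, 0)?;
        assert_eq!(df.height(), 0);
        assert_eq!(n_rows, 0);
        Ok(())
    }
}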