Path: crates/polars-io/src/csv/read/read_impl/batched.rs
use std::collections::VecDeque;
use std::ops::Deref;

use polars_core::POOL;
use polars_core::datatypes::Field;
use polars_core::frame::DataFrame;
use polars_core::schema::SchemaRef;
use polars_error::PolarsResult;
use polars_utils::IdxSize;
use rayon::iter::{IntoParallelIterator, ParallelIterator};

use super::{CoreReader, CountLines, cast_columns, read_chunk};
use crate::RowIndex;
use crate::csv::read::CsvReader;
use crate::csv::read::options::NullValuesCompiled;
use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::prelude::{CsvParseOptions, update_row_counts2};

#[allow(clippy::too_many_arguments)]
pub(crate) fn get_file_chunks_iterator(
    offsets: &mut VecDeque<(usize, usize)>,
    last_pos: &mut usize,
    n_chunks: usize,
    chunk_size: &mut usize,
    bytes: &[u8],
    quote_char: Option<u8>,
    eol_char: u8,
) {
    let cl = CountLines::new(quote_char, eol_char);

    for _ in 0..n_chunks {
        let bytes = &bytes[*last_pos..];

        if bytes.is_empty() {
            break;
        }

        let position;

        loop {
            let b = &bytes[..(*chunk_size).min(bytes.len())];
            let (count, position_) = cl.count(b);

            let (count, position_) = if b.len() == bytes.len() {
                (if count != 0 { count } else { 1 }, b.len())
            } else {
                (
                    count,
                    if position_ < b.len() {
                        // 1+ for the '\n'
                        1 + position_
                    } else {
                        position_
                    },
                )
            };

            if count == 0 {
                *chunk_size *= 2;
                continue;
            }

            position = position_;
            break;
        }

        offsets.push_back((*last_pos, *last_pos + position));
        *last_pos += position;
    }
}

struct ChunkOffsetIter<'a> {
    bytes: &'a [u8],
    offsets: VecDeque<(usize, usize)>,
    last_offset: usize,
    n_chunks: usize,
    chunk_size: usize,
    // not a promise, but something we want
    #[allow(unused)]
    rows_per_batch: usize,
    quote_char: Option<u8>,
    eol_char: u8,
}

impl Iterator for ChunkOffsetIter<'_> {
    type Item = (usize, usize);

    fn next(&mut self) -> Option<Self::Item> {
        match self.offsets.pop_front() {
            Some(offsets) => Some(offsets),
            None => {
                if self.last_offset == self.bytes.len() {
                    return None;
                }
                get_file_chunks_iterator(
                    &mut self.offsets,
                    &mut self.last_offset,
                    self.n_chunks,
                    &mut self.chunk_size,
                    self.bytes,
                    self.quote_char,
                    self.eol_char,
                );
                match self.offsets.pop_front() {
                    Some(offsets) => Some(offsets),
                    // We depleted the iterator. Ensure we deplete the slice as well
                    None => {
                        let out = Some((self.last_offset, self.bytes.len()));
                        self.last_offset = self.bytes.len();
                        out
                    },
                }
            },
        }
    }
}

impl<'a> CoreReader<'a> {
    /// Create a batched csv reader that uses mmap to load data.
    pub fn batched(mut self) -> PolarsResult<BatchedCsvReader<'a>> {
        let reader_bytes = self.reader_bytes.take().unwrap();
        let bytes = reader_bytes.as_ref();
        let (bytes, starting_point_offset) = self.find_starting_point(
            bytes,
            self.parse_options.quote_char,
            self.parse_options.eol_char,
        )?;

        let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());

        // Copied from [`Self::parse_csv`]
        let n_parts_hint = n_threads * 16;
        let chunk_size = std::cmp::min(bytes.len() / n_parts_hint, 16 * 1024 * 1024);

        // Use a small min chunk size to catch failures in tests.
        #[cfg(debug_assertions)]
        let min_chunk_size = 64;
        #[cfg(not(debug_assertions))]
        let min_chunk_size = 1024 * 4;

        let chunk_size = std::cmp::max(chunk_size, min_chunk_size);

        // this is arbitrarily chosen.
        // we don't want this to depend on the thread pool size
        // otherwise the chunks are not deterministic
        let offset_batch_size = 16;
        // extend lifetime. It is bound to `readerbytes` and we keep track of that
        // lifetime so this is sound.
        let bytes = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(bytes) };
        let file_chunks = ChunkOffsetIter {
            bytes,
            offsets: VecDeque::with_capacity(offset_batch_size),
            last_offset: 0,
            n_chunks: offset_batch_size,
            chunk_size,
            rows_per_batch: self.chunk_size,
            quote_char: self.parse_options.quote_char,
            eol_char: self.parse_options.eol_char,
        };

        let projection = self.get_projection()?;

        Ok(BatchedCsvReader {
            reader_bytes,
            parse_options: self.parse_options,
            chunk_size: self.chunk_size,
            file_chunks_iter: file_chunks,
            file_chunks: vec![],
            projection,
            starting_point_offset,
            row_index: self.row_index,
            null_values: self.null_values,
            to_cast: self.to_cast,
            ignore_errors: self.ignore_errors,
            remaining: self.n_rows.unwrap_or(usize::MAX),
            schema: self.schema,
            rows_read: 0,
        })
    }
}

pub struct BatchedCsvReader<'a> {
    reader_bytes: ReaderBytes<'a>,
    parse_options: CsvParseOptions,
    chunk_size: usize,
    file_chunks_iter: ChunkOffsetIter<'a>,
    file_chunks: Vec<(usize, usize)>,
    projection: Vec<usize>,
    starting_point_offset: Option<usize>,
    row_index: Option<RowIndex>,
    null_values: Option<NullValuesCompiled>,
    to_cast: Vec<Field>,
    ignore_errors: bool,
    remaining: usize,
    schema: SchemaRef,
    rows_read: IdxSize,
}

impl BatchedCsvReader<'_> {
    pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
        if n == 0 || self.remaining == 0 {
            return Ok(None);
        }

        // get next `n` offset positions.
        let file_chunks_iter = (&mut self.file_chunks_iter).take(n);
        self.file_chunks.extend(file_chunks_iter);
        // depleted the offsets iterator, we are done as well.
        if self.file_chunks.is_empty() {
            return Ok(None);
        }
        let chunks = &self.file_chunks;

        let mut bytes = self.reader_bytes.deref();
        if let Some(pos) = self.starting_point_offset {
            bytes = &bytes[pos..];
        }

        let mut chunks = POOL.install(|| {
            chunks
                .into_par_iter()
                .copied()
                .map(|(bytes_offset_thread, stop_at_nbytes)| {
                    let mut df = read_chunk(
                        bytes,
                        &self.parse_options,
                        self.schema.as_ref(),
                        self.ignore_errors,
                        &self.projection,
                        bytes_offset_thread,
                        self.chunk_size,
                        self.null_values.as_ref(),
                        usize::MAX,
                        stop_at_nbytes,
                        self.starting_point_offset,
                    )?;

                    cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;

                    if let Some(rc) = &self.row_index {
                        unsafe { df.with_row_index_mut(rc.name.clone(), Some(rc.offset)) };
                    }
                    Ok(df)
                })
                .collect::<PolarsResult<Vec<_>>>()
        })?;
        self.file_chunks.clear();

        if self.row_index.is_some() {
            update_row_counts2(&mut chunks, self.rows_read)
        }
        for df in &mut chunks {
            let h = df.height();

            if self.remaining < h {
                *df = df.slice(0, self.remaining)
            };
            self.remaining = self.remaining.saturating_sub(h);

            self.rows_read += h as IdxSize;
        }
        Ok(Some(chunks))
    }
}

pub struct OwnedBatchedCsvReader {
    #[allow(dead_code)]
    // this exist because we need to keep ownership
    schema: SchemaRef,
    batched_reader: BatchedCsvReader<'static>,
    // keep ownership
    _reader: CsvReader<Box<dyn MmapBytesReader>>,
}

impl OwnedBatchedCsvReader {
    pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
        self.batched_reader.next_batches(n)
    }
}

pub fn to_batched_owned(
    mut reader: CsvReader<Box<dyn MmapBytesReader>>,
) -> PolarsResult<OwnedBatchedCsvReader> {
    let batched_reader = reader.batched_borrowed()?;
    let schema = batched_reader.schema.clone();
    // If you put a drop(reader) here, rust will complain that reader is borrowed,
    // so we presumably have to keep ownership of it to maintain the safety of the
    // 'static transmute.
    let batched_reader: BatchedCsvReader<'static> = unsafe { std::mem::transmute(batched_reader) };

    Ok(OwnedBatchedCsvReader {
        schema,
        batched_reader,
        _reader: reader,
    })
}
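
Below is a minimal usage sketch for the owned batched API defined in this file. It is illustrative caller code, not part of batched.rs: the `consume_in_batches` helper is hypothetical, and it assumes a `CsvReader<Box<dyn MmapBytesReader>>` has already been constructed elsewhere (this file does not show how to build one). Only `to_batched_owned`, `next_batches`, and `DataFrame::height` are relied on.

// Hypothetical caller-side helper; `to_batched_owned` and `next_batches` come
// from this module, the reader itself is assumed to be built elsewhere.
fn consume_in_batches(reader: CsvReader<Box<dyn MmapBytesReader>>) -> PolarsResult<usize> {
    let mut batched = to_batched_owned(reader)?;
    let mut total_rows = 0;
    // Request up to 4 chunk DataFrames per call; `Ok(None)` signals the input is exhausted.
    while let Some(dfs) = batched.next_batches(4)? {
        for df in dfs {
            total_rows += df.height();
        }
    }
    Ok(total_rows)
}

Because `OwnedBatchedCsvReader` keeps the `CsvReader` alive in its `_reader` field, the caller can move the reader into `to_batched_owned` and let everything drop together once the batches are consumed.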