GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/csv/read/read_impl/batched.rs
use std::collections::VecDeque;
use std::ops::Deref;

use polars_core::POOL;
use polars_core::datatypes::Field;
use polars_core::frame::DataFrame;
use polars_core::schema::SchemaRef;
use polars_error::PolarsResult;
use polars_utils::IdxSize;
use rayon::iter::{IntoParallelIterator, ParallelIterator};

use super::{CoreReader, CountLines, cast_columns, read_chunk};
use crate::RowIndex;
use crate::csv::read::CsvReader;
use crate::csv::read::options::NullValuesCompiled;
use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::prelude::{CsvParseOptions, update_row_counts2};

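/// Pushes up to `n_chunks` (start, stop) byte ranges onto `offsets`, each
/// ending on a line boundary. `chunk_size` is the probe window and is grown
/// in place whenever no complete line fits inside it.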
#[allow(clippy::too_many_arguments)]
pub(crate) fn get_file_chunks_iterator(
    offsets: &mut VecDeque<(usize, usize)>,
    last_pos: &mut usize,
    n_chunks: usize,
    chunk_size: &mut usize,
    bytes: &[u8],
    quote_char: Option<u8>,
    eol_char: u8,
) {
    let cl = CountLines::new(quote_char, eol_char);

    for _ in 0..n_chunks {
        let bytes = &bytes[*last_pos..];

        if bytes.is_empty() {
            break;
        }

        let position;

        loop {
            let b = &bytes[..(*chunk_size).min(bytes.len())];
            let (count, position_) = cl.count(b);

            let (count, position_) = if b.len() == bytes.len() {
                (if count != 0 { count } else { 1 }, b.len())
            } else {
                (
                    count,
                    if position_ < b.len() {
                        // 1+ for the '\n'
                        1 + position_
                    } else {
                        position_
                    },
                )
            };

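            // No full line fit in the probe window: double it and rescan.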
            if count == 0 {
                *chunk_size *= 2;
                continue;
            }

            position = position_;
            break;
        }

        offsets.push_back((*last_pos, *last_pos + position));
        *last_pos += position;
    }
}

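/// Lazily yields (start, stop) byte ranges over `bytes`, computing line
/// boundaries in small batches so the file never has to be scanned up front.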
struct ChunkOffsetIter<'a> {
    bytes: &'a [u8],
    offsets: VecDeque<(usize, usize)>,
    last_offset: usize,
    n_chunks: usize,
    chunk_size: usize,
    // not a promise, but something we want
    #[allow(unused)]
    rows_per_batch: usize,
    quote_char: Option<u8>,
    eol_char: u8,
}

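// `next` serves boundaries from the buffered `offsets`, refilling the buffer
// in batches of `n_chunks` via `get_file_chunks_iterator` when it runs dry.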
impl Iterator for ChunkOffsetIter<'_> {
    type Item = (usize, usize);

    fn next(&mut self) -> Option<Self::Item> {
        match self.offsets.pop_front() {
            Some(offsets) => Some(offsets),
            None => {
                if self.last_offset == self.bytes.len() {
                    return None;
                }
                get_file_chunks_iterator(
                    &mut self.offsets,
                    &mut self.last_offset,
                    self.n_chunks,
                    &mut self.chunk_size,
                    self.bytes,
                    self.quote_char,
                    self.eol_char,
                );
                match self.offsets.pop_front() {
                    Some(offsets) => Some(offsets),
                    // We depleted the iterator. Ensure we deplete the slice as well.
                    None => {
                        let out = Some((self.last_offset, self.bytes.len()));
                        self.last_offset = self.bytes.len();
                        out
                    },
                }
            },
        }
    }
}

impl<'a> CoreReader<'a> {
    /// Create a batched CSV reader that uses mmap to load data.
    pub fn batched(mut self) -> PolarsResult<BatchedCsvReader<'a>> {
        let reader_bytes = self.reader_bytes.take().unwrap();
        let bytes = reader_bytes.as_ref();
        let (bytes, starting_point_offset) = self.find_starting_point(
            bytes,
            self.parse_options.quote_char,
            self.parse_options.eol_char,
        )?;
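        // The returned offset is remembered so that `next_batches` can
        // re-slice the mmapped bytes at the same starting point later.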

        let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());

        // Copied from [`Self::parse_csv`].
        let n_parts_hint = n_threads * 16;
        let chunk_size = std::cmp::min(bytes.len() / n_parts_hint, 16 * 1024 * 1024);

        // Use a small min chunk size to catch failures in tests.
        #[cfg(debug_assertions)]
        let min_chunk_size = 64;
        #[cfg(not(debug_assertions))]
        let min_chunk_size = 1024 * 4;

        let chunk_size = std::cmp::max(chunk_size, min_chunk_size);

        // This is arbitrarily chosen. We don't want it to depend on the thread
        // pool size, otherwise the chunks are not deterministic.
        let offset_batch_size = 16;
        // Extend the lifetime. It is bound to `reader_bytes` and we keep track
        // of that lifetime, so this is sound.
        let bytes = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(bytes) };
        let file_chunks = ChunkOffsetIter {
            bytes,
            offsets: VecDeque::with_capacity(offset_batch_size),
            last_offset: 0,
            n_chunks: offset_batch_size,
            chunk_size,
            rows_per_batch: self.chunk_size,
            quote_char: self.parse_options.quote_char,
            eol_char: self.parse_options.eol_char,
        };

        let projection = self.get_projection()?;

        Ok(BatchedCsvReader {
            reader_bytes,
            parse_options: self.parse_options,
            chunk_size: self.chunk_size,
            file_chunks_iter: file_chunks,
            file_chunks: vec![],
            projection,
            starting_point_offset,
            row_index: self.row_index,
            null_values: self.null_values,
            to_cast: self.to_cast,
            ignore_errors: self.ignore_errors,
            remaining: self.n_rows.unwrap_or(usize::MAX),
            schema: self.schema,
            rows_read: 0,
        })
    }
}

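/// A CSV reader that parses the file in batches: each call to `next_batches(n)`
/// pulls the next `n` byte ranges from `file_chunks_iter` and decodes them in
/// parallel.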
pub struct BatchedCsvReader<'a> {
    reader_bytes: ReaderBytes<'a>,
    parse_options: CsvParseOptions,
    chunk_size: usize,
    file_chunks_iter: ChunkOffsetIter<'a>,
    file_chunks: Vec<(usize, usize)>,
    projection: Vec<usize>,
    starting_point_offset: Option<usize>,
    row_index: Option<RowIndex>,
    null_values: Option<NullValuesCompiled>,
    to_cast: Vec<Field>,
    ignore_errors: bool,
    remaining: usize,
    schema: SchemaRef,
    rows_read: IdxSize,
}

impl BatchedCsvReader<'_> {
    pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
        if n == 0 || self.remaining == 0 {
            return Ok(None);
        }

        // Get the next `n` offset positions.
        let file_chunks_iter = (&mut self.file_chunks_iter).take(n);
        self.file_chunks.extend(file_chunks_iter);
        // If we depleted the offsets iterator, we are done as well.
        if self.file_chunks.is_empty() {
            return Ok(None);
        }
        let chunks = &self.file_chunks;

        let mut bytes = self.reader_bytes.deref();
        if let Some(pos) = self.starting_point_offset {
            bytes = &bytes[pos..];
        }

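        // Decode every selected byte range in parallel on the shared rayon
        // pool; each range becomes its own DataFrame.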
        let mut chunks = POOL.install(|| {
            chunks
                .into_par_iter()
                .copied()
                .map(|(bytes_offset_thread, stop_at_nbytes)| {
                    let mut df = read_chunk(
                        bytes,
                        &self.parse_options,
                        self.schema.as_ref(),
                        self.ignore_errors,
                        &self.projection,
                        bytes_offset_thread,
                        self.chunk_size,
                        self.null_values.as_ref(),
                        usize::MAX,
                        stop_at_nbytes,
                        self.starting_point_offset,
                    )?;

                    cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;

                    if let Some(rc) = &self.row_index {
                        unsafe { df.with_row_index_mut(rc.name.clone(), Some(rc.offset)) };
                    }
                    Ok(df)
                })
                .collect::<PolarsResult<Vec<_>>>()
        })?;
        self.file_chunks.clear();

        if self.row_index.is_some() {
            update_row_counts2(&mut chunks, self.rows_read)
        }
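        // Enforce the remaining row limit: truncate the chunk that crosses it
        // and keep a running count of the rows handed out.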
        for df in &mut chunks {
            let h = df.height();

            if self.remaining < h {
                *df = df.slice(0, self.remaining)
            };
            self.remaining = self.remaining.saturating_sub(h);

            self.rows_read += h as IdxSize;
        }
        Ok(Some(chunks))
    }
}

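/// A `BatchedCsvReader` that owns its source `CsvReader`, so callers can hold
/// and drive it without managing a separate borrow.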
pub struct OwnedBatchedCsvReader {
    #[allow(dead_code)]
    // This exists because we need to keep ownership.
    schema: SchemaRef,
    batched_reader: BatchedCsvReader<'static>,
    // Keep ownership.
    _reader: CsvReader<Box<dyn MmapBytesReader>>,
}

impl OwnedBatchedCsvReader {
    pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
        self.batched_reader.next_batches(n)
    }
}

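/// Builds an `OwnedBatchedCsvReader` by borrowing from `reader`, then erasing
/// the borrow's lifetime; `reader` is moved into the result so the borrowed
/// bytes stay alive for as long as the batched reader does.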
pub fn to_batched_owned(
    mut reader: CsvReader<Box<dyn MmapBytesReader>>,
) -> PolarsResult<OwnedBatchedCsvReader> {
    let batched_reader = reader.batched_borrowed()?;
    let schema = batched_reader.schema.clone();
    // If you put a drop(reader) here, Rust will complain that reader is
    // borrowed, so we presumably have to keep ownership of it to maintain the
    // safety of the 'static transmute.
    let batched_reader: BatchedCsvReader<'static> = unsafe { std::mem::transmute(batched_reader) };

    Ok(OwnedBatchedCsvReader {
        schema,
        batched_reader,
        _reader: reader,
    })
}
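
// Usage sketch (not part of the original file): one way a caller might drain
// the owned batched reader. `build_reader()` is a hypothetical helper that
// yields a `CsvReader<Box<dyn MmapBytesReader>>`.
//
// let reader = build_reader()?;
// let mut batched = to_batched_owned(reader)?;
// while let Some(dfs) = batched.next_batches(4)? {
//     for df in dfs {
//         // consume each parsed DataFrame chunk here
//     }
// }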