Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sources/parquet/init.rs
8503 views
1
use std::sync::Arc;
2
3
use polars_core::frame::DataFrame;
4
use polars_error::{PolarsResult, polars_ensure};
5
use polars_io::prelude::_internal::PrefilterMaskSetting;
6
use polars_io::prelude::ParallelStrategy;
7
use polars_utils::IdxSize;
8
9
use super::row_group_data_fetch::RowGroupDataFetcher;
10
use super::row_group_decode::RowGroupDecoder;
11
use super::{AsyncTaskData, ParquetReadImpl};
12
use crate::async_executor;
13
use crate::morsel::{Morsel, SourceToken, get_ideal_morsel_size};
14
use crate::nodes::io_sources::multi_scan::reader_interface::output::FileReaderOutputSend;
15
use crate::nodes::io_sources::parquet::projection::ArrowFieldProjection;
16
use crate::nodes::io_sources::parquet::statistics::calculate_row_group_pred_pushdown_skip_mask;
17
use crate::nodes::{MorselSeq, TaskPriority};
18
use crate::utils::tokio_handle_ext::{self, AbortOnDropHandle};
19
20
impl ParquetReadImpl {
    /// Constructs the task that distributes morsels across the engine pipelines.
    ///
    /// Wires up three cooperating async tasks:
    /// * a prefetch loop on the tokio I/O runtime that fetches row-group data,
    ///   bounded by `rg_prefetch_semaphore` and the prefetch channel,
    /// * a decode loop that spawns per-row-group decodes on the computational
    ///   executor,
    /// * a distribute loop that splits decoded DataFrames into morsels and
    ///   sends them through the serial morsel sender.
    ///
    /// Returns the morsel receiver together with a handle that joins all three
    /// tasks (surfacing the first error) and aborts them when dropped.
    #[allow(clippy::type_complexity)]
    pub(super) fn init_morsel_distributor(&mut self) -> AsyncTaskData {
        let verbose = self.verbose;
        let io_runtime = polars_io::pl_async::get_runtime();

        let use_statistics = self.options.use_statistics;

        let (mut morsel_sender, morsel_rx) = FileReaderOutputSend::new_serial();

        // A normalized pre-slice of length 0 selects no rows at all: return the
        // receiver alongside an already-completed task.
        if let Some((_, 0)) = self.normalized_pre_slice {
            return (
                morsel_rx,
                tokio_handle_ext::AbortOnDropHandle(io_runtime.spawn(std::future::ready(Ok(())))),
            );
        }

        let projected_arrow_fields = self.projected_arrow_fields.clone();
        let is_full_projection = self.is_full_projection;

        let row_group_prefetch_size = self.config.row_group_prefetch_size;
        let predicate = self.predicate.clone();
        let memory_prefetch_func = self.memory_prefetch_func;

        let row_group_decoder = self.init_row_group_decoder();
        let row_group_decoder = Arc::new(row_group_decoder);

        let ideal_morsel_size = get_ideal_morsel_size();

        if verbose {
            eprintln!("[ParquetFileReader]: ideal_morsel_size: {ideal_morsel_size}");
        }

        let metadata = self.metadata.clone();
        let normalized_pre_slice = self.normalized_pre_slice;
        let byte_source = self.byte_source.clone();

        // Prefetch loop (spawns prefetches on the tokio scheduler).
        // The channel capacity bounds how many prefetched row groups may queue
        // up ahead of the decode loop.
        let (prefetch_send, mut prefetch_recv) =
            tokio::sync::mpsc::channel(row_group_prefetch_size);

        let row_index = self.row_index.clone();

        let rg_prefetch_semaphore = Arc::clone(&self.rg_prefetch_semaphore);
        // NOTE(review): these two appear to sequence prefetch spawning across
        // consecutive readers (wait on the previous reader's signal below,
        // signal-by-drop for the current one at the end of the prefetch task)
        // — confirm against the multi_scan caller.
        let rg_prefetch_prev_all_spawned = Option::take(&mut self.rg_prefetch_prev_all_spawned);
        let rg_prefetch_current_all_spawned =
            Option::take(&mut self.rg_prefetch_current_all_spawned);

        let prefetch_task = AbortOnDropHandle(io_runtime.spawn(async move {
            // Row positions must be representable as IdxSize; otherwise raise
            // the `bigidx` error.
            polars_ensure!(
                metadata.num_rows < IdxSize::MAX as usize,
                bigidx,
                ctx = "parquet file",
                size = metadata.num_rows
            );

            // Calculate the row groups that need to be read and the slice range relative to those
            // row groups.
            let mut row_offset = 0;
            let mut slice_range =
                normalized_pre_slice.map(|(offset, length)| offset..offset + length);
            let mut row_group_slice = 0..metadata.row_groups.len();
            if let Some(pre_slice) = normalized_pre_slice {
                let mut start = 0;
                let mut start_offset = 0;

                let mut num_offset_remaining = pre_slice.0;
                let mut num_length_remaining = pre_slice.1;

                // Skip whole row groups that lie entirely before the slice
                // offset; `start` ends at the first overlapping row group.
                for rg in &metadata.row_groups {
                    if rg.num_rows() > num_offset_remaining {
                        start_offset = num_offset_remaining;
                        num_length_remaining = num_length_remaining
                            .saturating_sub(rg.num_rows() - num_offset_remaining);
                        break;
                    }

                    row_offset += rg.num_rows();
                    num_offset_remaining -= rg.num_rows();
                    start += 1;
                }

                // Extend `end` until the remaining slice length is covered.
                // The row group at `start` was already accounted for above.
                let mut end = start + 1;

                while num_length_remaining > 0 {
                    num_length_remaining =
                        num_length_remaining.saturating_sub(metadata.row_groups[end].num_rows());
                    end += 1;
                }

                // Slice range is rebased relative to the first row group read.
                slice_range = Some(start_offset..start_offset + pre_slice.1);
                row_group_slice = start..end;

                if verbose {
                    eprintln!(
                        "[ParquetFileReader]: Slice pushdown: \
                        reading {} / {} row groups",
                        row_group_slice.len(),
                        metadata.row_groups.len()
                    );
                }
            }

            // Determine which of the selected row groups can be skipped
            // entirely based on statistics and the pushed-down predicate.
            let row_group_mask = calculate_row_group_pred_pushdown_skip_mask(
                row_group_slice.clone(),
                use_statistics,
                predicate.as_ref(),
                &metadata,
                projected_arrow_fields.clone(),
                row_index,
                verbose,
            )
            .await?;

            let mut row_group_data_fetcher = RowGroupDataFetcher {
                projection: projected_arrow_fields.clone(),
                is_full_projection,
                predicate,
                slice_range,
                memory_prefetch_func,
                metadata,
                byte_source,
                row_group_slice,
                row_group_mask,
                row_offset,
            };

            if let Some(rg_prefetch_prev_all_spawned) = rg_prefetch_prev_all_spawned {
                rg_prefetch_prev_all_spawned.wait().await;
            }

            loop {
                // Acquire a permit before fetching; the permit travels with the
                // prefetched data and is only released once dropped downstream,
                // bounding the number of in-flight row groups.
                let fetch_permit = rg_prefetch_semaphore.clone().acquire_owned().await.unwrap();

                let Some(prefetch) = row_group_data_fetcher.next().await else {
                    break;
                };

                // A send error means the decode loop hung up; stop fetching.
                if prefetch_send.send((prefetch?, fetch_permit)).await.is_err() {
                    break;
                }
            }

            // Signal (by drop) that all prefetches of this reader were spawned.
            drop(rg_prefetch_current_all_spawned);

            PolarsResult::Ok(())
        }));

        // Decode loop (spawns decodes on the computational executor).
        let (decode_send, mut decode_recv) = tokio::sync::mpsc::channel(self.config.num_pipelines);
        let decode_task = AbortOnDropHandle(io_runtime.spawn(async move {
            while let Some((prefetch_task, permit)) = prefetch_recv.recv().await {
                let row_group_data = prefetch_task.await.unwrap()?;
                let row_group_decoder = row_group_decoder.clone();
                let decode_fut = async_executor::spawn(TaskPriority::High, async move {
                    row_group_decoder.row_group_data_to_df(row_group_data).await
                });
                // Forward the decode future together with its prefetch permit;
                // stop if the distributor hung up.
                if decode_send.send((decode_fut, permit)).await.is_err() {
                    break;
                }
            }
            PolarsResult::Ok(())
        }));

        // Distributes morsels across pipelines. This does not perform any CPU or I/O bound work -
        // it is purely a dispatch loop. Run on the computational executor to reduce context switches.
        let last_morsel_min_split = self.config.num_pipelines;
        let disable_morsel_split = self.disable_morsel_split;
        let distribute_task = async_executor::spawn(TaskPriority::High, async move {
            let mut morsel_seq = MorselSeq::default();
            // Note: We don't use this (it is handled by the bridge). But morsels require a source token.
            let source_token = SourceToken::new();

            // Decode first non-empty morsel.
            let mut next = None;
            loop {
                let Some((decode_fut, permit)) = decode_recv.recv().await else {
                    break;
                };
                let df = decode_fut.await?;
                if df.height() == 0 {
                    continue;
                }

                // With splitting disabled, each decoded frame is forwarded
                // unchanged as a single morsel.
                if disable_morsel_split {
                    if morsel_sender
                        .send_morsel(Morsel::new(df, morsel_seq, source_token.clone()))
                        .await
                        .is_err()
                    {
                        return Ok(());
                    }
                    drop(permit);
                    morsel_seq = morsel_seq.successor();
                    continue;
                }

                next = Some((df, permit));
                break;
            }

            while let Some((df, permit)) = next.take() {
                // Try to decode the next non-empty morsel first, so we know
                // whether the df is the last morsel.

                // Important: Drop this before awaiting the next one, or could
                // deadlock if the permit limit is 1.
                drop(permit);

                loop {
                    let Some((decode_fut, permit)) = decode_recv.recv().await else {
                        break;
                    };
                    let next_df = decode_fut.await?;
                    if next_df.height() == 0 {
                        continue;
                    }
                    next = Some((next_df, permit));
                    break;
                }

                // `next.is_none()` marks this as the last morsel, which may be
                // split further so that every pipeline receives work.
                for df in split_to_morsels(
                    &df,
                    ideal_morsel_size,
                    next.is_none(),
                    last_morsel_min_split,
                ) {
                    if morsel_sender
                        .send_morsel(Morsel::new(df, morsel_seq, source_token.clone()))
                        .await
                        .is_err()
                    {
                        return Ok(());
                    }
                    morsel_seq = morsel_seq.successor();
                }
            }

            PolarsResult::Ok(())
        });

        // Join all three tasks, surfacing the first error.
        let join_task = io_runtime.spawn(async move {
            prefetch_task.await.unwrap()?;
            decode_task.await.unwrap()?;
            distribute_task.await?;
            Ok(())
        });

        (morsel_rx, AbortOnDropHandle(join_task))
    }

    /// Creates a `RowGroupDecoder` that turns `RowGroupData` into DataFrames.
    /// This must be called AFTER the following have been initialized:
    /// * `self.projected_arrow_fields`
    /// * `self.physical_predicate`
    pub(super) fn init_row_group_decoder(&mut self) -> RowGroupDecoder {
        let projected_arrow_fields = self.projected_arrow_fields.clone();
        let row_index = self.row_index.clone();
        let target_values_per_thread = self.config.target_values_per_thread;
        let predicate = self.predicate.clone();

        // Prefiltered decoding is either requested explicitly, or selected
        // under `ParallelStrategy::Auto` whenever a predicate is present.
        let mut use_prefiltered = matches!(self.options.parallel, ParallelStrategy::Prefiltered);
        use_prefiltered |=
            predicate.is_some() && matches!(self.options.parallel, ParallelStrategy::Auto);

        // Indices of projected fields referenced by the predicate ("live").
        let predicate_field_indices: Arc<[usize]> =
            if use_prefiltered && let Some(predicate) = predicate.as_ref() {
                projected_arrow_fields
                    .iter()
                    .enumerate()
                    .filter_map(|(i, projected_field)| {
                        predicate
                            .live_columns
                            .contains(projected_field.output_name())
                            .then_some(i)
                    })
                    .collect()
            } else {
                Default::default()
            };

        let use_prefiltered = use_prefiltered.then(PrefilterMaskSetting::init_from_env);

        // Complement of `predicate_field_indices` within the projection.
        let non_predicate_field_indices: Arc<[usize]> = if use_prefiltered.is_some() {
            filtered_range(
                predicate_field_indices.as_ref(),
                projected_arrow_fields.len(),
            )
            .collect()
        } else {
            Default::default()
        };

        if use_prefiltered.is_some() && self.verbose {
            eprintln!(
                "[ParquetFileReader]: Pre-filtered decode enabled ({} live, {} non-live)",
                predicate_field_indices.len(),
                non_predicate_field_indices.len()
            )
        }

        // Column predicates require a sumwise-complete predicate, no row
        // index, and a projection free of nested or mapped fields.
        let allow_column_predicates = predicate
            .as_ref()
            .is_some_and(|x| x.column_predicates.is_sumwise_complete)
            && row_index.is_none()
            && !projected_arrow_fields.iter().any(|x| {
                x.arrow_field().dtype().is_nested()
                    || matches!(x, ArrowFieldProjection::Mapped { .. })
            });

        RowGroupDecoder {
            num_pipelines: self.config.num_pipelines,
            projected_arrow_fields,
            row_index,
            predicate,
            allow_column_predicates,
            use_prefiltered,
            predicate_field_indices,
            non_predicate_field_indices,
            target_values_per_thread,
        }
    }
}
344
345
/// Returns an iterator over `0..len`, excluding the indices in `exclude`.
/// `exclude` needs to be a sorted list of unique values.
fn filtered_range(exclude: &[usize], len: usize) -> impl Iterator<Item = usize> + '_ {
    // Idiomatic form of `if cfg!(debug_assertions) { assert!(..) }`.
    debug_assert!(
        exclude.windows(2).all(|x| x[1] > x[0]),
        "`exclude` must be sorted and contain unique values"
    );

    // `j` is the position of the next exclusion candidate. Both `0..len` and
    // `exclude` are ascending, so a single forward scan matches them up.
    let mut j = 0;

    (0..len).filter(move |&i| {
        if j == exclude.len() || i != exclude[j] {
            true
        } else {
            j += 1;
            false
        }
    })
}
363
364
pub(crate) fn split_to_morsels(
365
df: &DataFrame,
366
ideal_morsel_size: usize,
367
last_morsel: bool,
368
last_morsel_min_split: usize,
369
) -> impl Iterator<Item = DataFrame> + '_ {
370
let mut n_morsels = if df.height() > 3 * ideal_morsel_size / 2 {
371
// num_rows > (1.5 * ideal_morsel_size)
372
(df.height() / ideal_morsel_size).max(2)
373
} else {
374
1
375
};
376
377
if last_morsel {
378
n_morsels = n_morsels.max(last_morsel_min_split);
379
}
380
381
let rows_per_morsel = df.height().div_ceil(n_morsels).max(1);
382
383
(0..i64::try_from(df.height()).unwrap())
384
.step_by(rows_per_morsel)
385
.map(move |offset| df.slice(offset, rows_per_morsel))
386
.filter(|df| df.height() > 0)
387
}
388
389
// Gate the test module out of non-test builds; without `#[cfg(test)]` the
// module itself is always compiled.
#[cfg(test)]
mod tests {
    use super::filtered_range;

    #[test]
    fn test_filtered_range() {
        assert_eq!(
            filtered_range(&[1, 3], 7).collect::<Vec<_>>().as_slice(),
            &[0, 2, 4, 5, 6]
        );
        assert_eq!(
            filtered_range(&[1, 6], 7).collect::<Vec<_>>().as_slice(),
            &[0, 2, 3, 4, 5]
        );
    }
}
404
405