Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sources/parquet/init.rs
6939 views
1
use std::sync::Arc;
2
3
use polars_core::frame::DataFrame;
4
use polars_error::{PolarsResult, polars_ensure};
5
use polars_io::prelude::_internal::PrefilterMaskSetting;
6
use polars_io::prelude::ParallelStrategy;
7
use polars_utils::IdxSize;
8
9
use super::row_group_data_fetch::RowGroupDataFetcher;
10
use super::row_group_decode::RowGroupDecoder;
11
use super::{AsyncTaskData, ParquetReadImpl};
12
use crate::async_executor;
13
use crate::morsel::{Morsel, SourceToken, get_ideal_morsel_size};
14
use crate::nodes::io_sources::multi_scan::reader_interface::output::FileReaderOutputSend;
15
use crate::nodes::io_sources::parquet::projection::ArrowFieldProjection;
16
use crate::nodes::io_sources::parquet::statistics::calculate_row_group_pred_pushdown_skip_mask;
17
use crate::nodes::{MorselSeq, TaskPriority};
18
use crate::utils::task_handles_ext::{self, AbortOnDropHandle};
19
20
impl ParquetReadImpl {
    /// Constructs the task that distributes morsels across the engine pipelines.
    ///
    /// Wires up a three-stage background pipeline:
    /// 1. a prefetch task on the tokio I/O runtime that walks the row groups
    ///    (after slice/predicate pushdown) and pushes prefetches into a bounded
    ///    channel,
    /// 2. a decode task that spawns per-row-group decodes on the computational
    ///    executor,
    /// 3. a distribute task that splits decoded DataFrames into morsels and
    ///    forwards them to the serial morsel sender.
    ///
    /// Returns the morsel receiver together with an abort-on-drop handle that
    /// joins all three tasks and surfaces the first error.
    #[allow(clippy::type_complexity)]
    pub(super) fn init_morsel_distributor(&mut self) -> AsyncTaskData {
        let verbose = self.verbose;
        let io_runtime = polars_io::pl_async::get_runtime();

        let use_statistics = self.options.use_statistics;

        let (mut morsel_sender, morsel_rx) = FileReaderOutputSend::new_serial();

        // Fast path: a pre-slice of length 0 selects no rows at all, so return
        // the (empty) receiver with an already-completed task.
        if let Some((_, 0)) = self.normalized_pre_slice {
            return (
                morsel_rx,
                task_handles_ext::AbortOnDropHandle(io_runtime.spawn(std::future::ready(Ok(())))),
            );
        }

        let projected_arrow_fields = self.projected_arrow_fields.clone();
        let is_full_projection = self.is_full_projection;

        let row_group_prefetch_size = self.config.row_group_prefetch_size;
        let predicate = self.predicate.clone();
        let memory_prefetch_func = self.memory_prefetch_func;

        let row_group_decoder = self.init_row_group_decoder();
        let row_group_decoder = Arc::new(row_group_decoder);

        let ideal_morsel_size = get_ideal_morsel_size();

        if verbose {
            eprintln!("[ParquetFileReader]: ideal_morsel_size: {ideal_morsel_size}");
        }

        let metadata = self.metadata.clone();
        let normalized_pre_slice = self.normalized_pre_slice;
        let byte_source = self.byte_source.clone();

        // Prefetch loop (spawns prefetches on the tokio scheduler).
        // The channel capacity bounds how many row-group prefetches may be
        // in flight at once.
        let (prefetch_send, mut prefetch_recv) =
            tokio::sync::mpsc::channel(row_group_prefetch_size);

        let row_index = self.row_index.clone();

        let prefetch_task = AbortOnDropHandle(io_runtime.spawn(async move {
            // Row positions are addressed with IdxSize downstream; reject files
            // whose row count would overflow it.
            polars_ensure!(
                metadata.num_rows < IdxSize::MAX as usize,
                bigidx,
                ctx = "parquet file",
                size = metadata.num_rows
            );

            // Calculate the row groups that need to be read and the slice range relative to those
            // row groups.
            let mut row_offset = 0;
            let mut slice_range =
                normalized_pre_slice.map(|(offset, length)| offset..offset + length);
            let mut row_group_slice = 0..metadata.row_groups.len();
            if let Some(pre_slice) = normalized_pre_slice {
                let mut start = 0;
                let mut start_offset = 0;

                let mut num_offset_remaining = pre_slice.0;
                let mut num_length_remaining = pre_slice.1;

                // Skip whole row groups that lie entirely before the slice
                // offset; `start`/`row_offset` track the first group touched
                // and the number of rows preceding it.
                for rg in &metadata.row_groups {
                    if rg.num_rows() > num_offset_remaining {
                        start_offset = num_offset_remaining;
                        num_length_remaining = num_length_remaining
                            .saturating_sub(rg.num_rows() - num_offset_remaining);
                        break;
                    }

                    row_offset += rg.num_rows();
                    num_offset_remaining -= rg.num_rows();
                    start += 1;
                }

                let mut end = start + 1;

                // Extend `end` until the remaining slice length is covered.
                // NOTE(review): indexes `metadata.row_groups[end]` directly —
                // assumes the normalized pre-slice never extends past the
                // file's total row count; confirm with the normalization site.
                while num_length_remaining > 0 {
                    num_length_remaining =
                        num_length_remaining.saturating_sub(metadata.row_groups[end].num_rows());
                    end += 1;
                }

                // Re-express the slice relative to the first selected row group.
                slice_range = Some(start_offset..start_offset + pre_slice.1);
                row_group_slice = start..end;

                if verbose {
                    eprintln!(
                        "[ParquetFileReader]: Slice pushdown: \
                        reading {} / {} row groups",
                        row_group_slice.len(),
                        metadata.row_groups.len()
                    );
                }
            }

            // Statistics-based predicate pushdown: compute a mask of row groups
            // that can be skipped without being read.
            let row_group_mask = calculate_row_group_pred_pushdown_skip_mask(
                row_group_slice.clone(),
                use_statistics,
                predicate.as_ref(),
                &metadata,
                projected_arrow_fields.clone(),
                row_index,
                verbose,
            )
            .await?;

            let mut row_group_data_fetcher = RowGroupDataFetcher {
                projection: projected_arrow_fields.clone(),
                is_full_projection,
                predicate,
                slice_range,
                memory_prefetch_func,
                metadata,
                byte_source,
                row_group_slice,
                row_group_mask,
                row_offset,
            };

            // A send error means the receiver (decode task) is gone; stop
            // quietly rather than erroring.
            while let Some(prefetch) = row_group_data_fetcher.next().await {
                if prefetch_send.send(prefetch?).await.is_err() {
                    break;
                }
            }
            PolarsResult::Ok(())
        }));

        // Decode loop (spawns decodes on the computational executor).
        let (decode_send, mut decode_recv) = tokio::sync::mpsc::channel(self.config.num_pipelines);
        let decode_task = AbortOnDropHandle(io_runtime.spawn(async move {
            while let Some(prefetch) = prefetch_recv.recv().await {
                // `unwrap` on the join result: a panicked prefetch task is a
                // bug; the inner `?` propagates real I/O errors.
                let row_group_data = prefetch.await.unwrap()?;
                let row_group_decoder = row_group_decoder.clone();
                let decode_fut = async_executor::spawn(TaskPriority::High, async move {
                    row_group_decoder.row_group_data_to_df(row_group_data).await
                });
                // Send the future (not the result) so decodes overlap; closed
                // channel means the distributor stopped — exit quietly.
                if decode_send.send(decode_fut).await.is_err() {
                    break;
                }
            }
            PolarsResult::Ok(())
        }));

        // Distributes morsels across pipelines. This does not perform any CPU or I/O bound work -
        // it is purely a dispatch loop. Run on the computational executor to reduce context switches.
        let last_morsel_min_split = self.config.num_pipelines;
        let distribute_task = async_executor::spawn(TaskPriority::High, async move {
            let mut morsel_seq = MorselSeq::default();
            // Note: We don't use this (it is handled by the bridge). But morsels require a source token.
            let source_token = SourceToken::new();

            // Decode first non-empty morsel.
            let mut next = None;
            loop {
                let Some(decode_fut) = decode_recv.recv().await else {
                    break;
                };
                let df = decode_fut.await?;
                if df.height() == 0 {
                    continue;
                }
                next = Some(df);
                break;
            }

            // One-element lookahead: `next` always holds the upcoming
            // non-empty frame (if any), so we can tell when `df` is the last.
            while let Some(df) = next.take() {
                // Try to decode the next non-empty morsel first, so we know
                // whether the df is the last morsel.
                loop {
                    let Some(decode_fut) = decode_recv.recv().await else {
                        break;
                    };
                    let next_df = decode_fut.await?;
                    if next_df.height() == 0 {
                        continue;
                    }
                    next = Some(next_df);
                    break;
                }

                // The last frame is split at least `last_morsel_min_split`
                // ways so every pipeline receives work at the tail.
                for df in split_to_morsels(
                    &df,
                    ideal_morsel_size,
                    next.is_none(),
                    last_morsel_min_split,
                ) {
                    if morsel_sender
                        .send_morsel(Morsel::new(df, morsel_seq, source_token.clone()))
                        .await
                        .is_err()
                    {
                        // Receiver dropped: downstream no longer wants data.
                        return Ok(());
                    }
                    morsel_seq = morsel_seq.successor();
                }
            }
            PolarsResult::Ok(())
        });

        // Join all three stages; the first error (or panic, via `unwrap`)
        // surfaces through this handle.
        let join_task = io_runtime.spawn(async move {
            prefetch_task.await.unwrap()?;
            decode_task.await.unwrap()?;
            distribute_task.await?;
            Ok(())
        });

        (morsel_rx, AbortOnDropHandle(join_task))
    }

    /// Creates a `RowGroupDecoder` that turns `RowGroupData` into DataFrames.
    /// This must be called AFTER the following have been initialized:
    /// * `self.projected_arrow_fields`
    /// * `self.physical_predicate`
    pub(super) fn init_row_group_decoder(&mut self) -> RowGroupDecoder {
        let projected_arrow_fields = self.projected_arrow_fields.clone();
        let row_index = self.row_index.clone();
        let target_values_per_thread = self.config.target_values_per_thread;
        let predicate = self.predicate.clone();

        // Pre-filtered decode is used when explicitly requested, or
        // automatically whenever there is a predicate under `Auto`.
        let mut use_prefiltered = matches!(self.options.parallel, ParallelStrategy::Prefiltered);
        use_prefiltered |=
            predicate.is_some() && matches!(self.options.parallel, ParallelStrategy::Auto);

        // Indices (into the projection) of columns the predicate reads
        // ("live" columns) — these are decoded first to build the filter mask.
        let predicate_field_indices: Arc<[usize]> =
            if use_prefiltered && let Some(predicate) = predicate.as_ref() {
                projected_arrow_fields
                    .iter()
                    .enumerate()
                    .filter_map(|(i, projected_field)| {
                        predicate
                            .live_columns
                            .contains(projected_field.output_name())
                            .then_some(i)
                    })
                    .collect()
            } else {
                Default::default()
            };

        let use_prefiltered = use_prefiltered.then(PrefilterMaskSetting::init_from_env);

        // Complement of the live set: columns decoded only after filtering.
        let non_predicate_field_indices: Arc<[usize]> = if use_prefiltered.is_some() {
            filtered_range(
                predicate_field_indices.as_ref(),
                projected_arrow_fields.len(),
            )
            .collect()
        } else {
            Default::default()
        };

        if use_prefiltered.is_some() && self.verbose {
            eprintln!(
                "[ParquetFileReader]: Pre-filtered decode enabled ({} live, {} non-live)",
                predicate_field_indices.len(),
                non_predicate_field_indices.len()
            )
        }

        // Column-level predicates are only safe when the predicate set is
        // sumwise-complete, no row index is added, and no projected field is
        // nested or remapped.
        let allow_column_predicates = predicate
            .as_ref()
            .is_some_and(|x| x.column_predicates.is_sumwise_complete)
            && row_index.is_none()
            && !projected_arrow_fields.iter().any(|x| {
                x.arrow_field().dtype().is_nested()
                    || matches!(x, ArrowFieldProjection::Mapped { .. })
            });

        RowGroupDecoder {
            num_pipelines: self.config.num_pipelines,
            projected_arrow_fields,
            row_index,
            predicate,
            allow_column_predicates,
            use_prefiltered,
            predicate_field_indices,
            non_predicate_field_indices,
            target_values_per_thread,
        }
    }
}
305
306
/// Returns 0..len in a Vec, excluding indices in `exclude`.
307
/// `exclude` needs to be a sorted list of unique values.
308
fn filtered_range(exclude: &[usize], len: usize) -> impl Iterator<Item = usize> {
309
if cfg!(debug_assertions) {
310
assert!(exclude.windows(2).all(|x| x[1] > x[0]));
311
}
312
313
let mut j = 0;
314
315
(0..len).filter(move |&i| {
316
if j == exclude.len() || i != exclude[j] {
317
true
318
} else {
319
j += 1;
320
false
321
}
322
})
323
}
324
325
fn split_to_morsels(
326
df: &DataFrame,
327
ideal_morsel_size: usize,
328
last_morsel: bool,
329
last_morsel_min_split: usize,
330
) -> impl Iterator<Item = DataFrame> + '_ {
331
let mut n_morsels = if df.height() > 3 * ideal_morsel_size / 2 {
332
// num_rows > (1.5 * ideal_morsel_size)
333
(df.height() / ideal_morsel_size).max(2)
334
} else {
335
1
336
};
337
338
if last_morsel {
339
n_morsels = n_morsels.max(last_morsel_min_split);
340
}
341
342
let rows_per_morsel = df.height().div_ceil(n_morsels).max(1);
343
344
(0..i64::try_from(df.height()).unwrap())
345
.step_by(rows_per_morsel)
346
.map(move |offset| df.slice(offset, rows_per_morsel))
347
.filter(|df| df.height() > 0)
348
}
349
350
// Fix: gate the test module behind `#[cfg(test)]` so it is only compiled for
// test builds; without it the `#[test]` functions are built into every
// (non-test) compilation of the crate.
#[cfg(test)]
mod tests {

    /// `filtered_range` yields the complement of `exclude` within `0..len`.
    #[test]
    fn test_filtered_range() {
        use super::filtered_range;
        assert_eq!(
            filtered_range(&[1, 3], 7).collect::<Vec<_>>().as_slice(),
            &[0, 2, 4, 5, 6]
        );
        assert_eq!(
            filtered_range(&[1, 6], 7).collect::<Vec<_>>().as_slice(),
            &[0, 2, 3, 4, 5]
        );
    }
}
365
366