Path: blob/main/crates/polars-stream/src/nodes/io_sources/parquet/init.rs
use std::sync::Arc;

use polars_core::frame::DataFrame;
use polars_error::{PolarsResult, polars_ensure};
use polars_io::prelude::_internal::PrefilterMaskSetting;
use polars_io::prelude::ParallelStrategy;
use polars_utils::IdxSize;

use super::row_group_data_fetch::RowGroupDataFetcher;
use super::row_group_decode::RowGroupDecoder;
use super::{AsyncTaskData, ParquetReadImpl};
use crate::async_executor;
use crate::morsel::{Morsel, SourceToken, get_ideal_morsel_size};
use crate::nodes::io_sources::multi_scan::reader_interface::output::FileReaderOutputSend;
use crate::nodes::io_sources::parquet::projection::ArrowFieldProjection;
use crate::nodes::io_sources::parquet::statistics::calculate_row_group_pred_pushdown_skip_mask;
use crate::nodes::{MorselSeq, TaskPriority};
use crate::utils::task_handles_ext::{self, AbortOnDropHandle};

impl ParquetReadImpl {
    /// Constructs the task that distributes morsels across the engine pipelines.
    #[allow(clippy::type_complexity)]
    pub(super) fn init_morsel_distributor(&mut self) -> AsyncTaskData {
        let verbose = self.verbose;
        let io_runtime = polars_io::pl_async::get_runtime();

        let use_statistics = self.options.use_statistics;

        let (mut morsel_sender, morsel_rx) = FileReaderOutputSend::new_serial();

        if let Some((_, 0)) = self.normalized_pre_slice {
            return (
                morsel_rx,
                task_handles_ext::AbortOnDropHandle(io_runtime.spawn(std::future::ready(Ok(())))),
            );
        }

        let projected_arrow_fields = self.projected_arrow_fields.clone();
        let is_full_projection = self.is_full_projection;

        let row_group_prefetch_size = self.config.row_group_prefetch_size;
        let predicate = self.predicate.clone();
        let memory_prefetch_func = self.memory_prefetch_func;

        let row_group_decoder = self.init_row_group_decoder();
        let row_group_decoder = Arc::new(row_group_decoder);

        let ideal_morsel_size = get_ideal_morsel_size();

        if verbose {
            eprintln!("[ParquetFileReader]: ideal_morsel_size: {ideal_morsel_size}");
        }

        let metadata = self.metadata.clone();
        let normalized_pre_slice = self.normalized_pre_slice;
        let byte_source = self.byte_source.clone();

        // Prefetch loop (spawns prefetches on the tokio scheduler).
        let (prefetch_send, mut prefetch_recv) =
            tokio::sync::mpsc::channel(row_group_prefetch_size);

        let row_index = self.row_index.clone();

        let prefetch_task = AbortOnDropHandle(io_runtime.spawn(async move {
            polars_ensure!(
                metadata.num_rows < IdxSize::MAX as usize,
                bigidx,
                ctx = "parquet file",
                size = metadata.num_rows
            );

            // Calculate the row groups that need to be read and the slice range relative to those
            // row groups.
            let mut row_offset = 0;
            let mut slice_range =
                normalized_pre_slice.map(|(offset, length)| offset..offset + length);
            let mut row_group_slice = 0..metadata.row_groups.len();
            if let Some(pre_slice) = normalized_pre_slice {
                let mut start = 0;
                let mut start_offset = 0;

                let mut num_offset_remaining = pre_slice.0;
                let mut num_length_remaining = pre_slice.1;

                for rg in &metadata.row_groups {
                    if rg.num_rows() > num_offset_remaining {
                        start_offset = num_offset_remaining;
                        num_length_remaining = num_length_remaining
                            .saturating_sub(rg.num_rows() - num_offset_remaining);
                        break;
                    }

                    row_offset += rg.num_rows();
                    num_offset_remaining -= rg.num_rows();
                    start += 1;
                }

                let mut end = start + 1;

                while num_length_remaining > 0 {
                    num_length_remaining =
                        num_length_remaining.saturating_sub(metadata.row_groups[end].num_rows());
                    end += 1;
                }

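                // `start..end` now covers exactly the row groups overlapped by the slice;
                // re-express the slice offset relative to the first retained row group.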
                slice_range = Some(start_offset..start_offset + pre_slice.1);
                row_group_slice = start..end;

                if verbose {
                    eprintln!(
                        "[ParquetFileReader]: Slice pushdown: \
                        reading {} / {} row groups",
                        row_group_slice.len(),
                        metadata.row_groups.len()
                    );
                }
            }

            let row_group_mask = calculate_row_group_pred_pushdown_skip_mask(
                row_group_slice.clone(),
                use_statistics,
                predicate.as_ref(),
                &metadata,
                projected_arrow_fields.clone(),
                row_index,
                verbose,
            )
            .await?;

            let mut row_group_data_fetcher = RowGroupDataFetcher {
                projection: projected_arrow_fields.clone(),
                is_full_projection,
                predicate,
                slice_range,
                memory_prefetch_func,
                metadata,
                byte_source,
                row_group_slice,
                row_group_mask,
                row_offset,
            };

            while let Some(prefetch) = row_group_data_fetcher.next().await {
                if prefetch_send.send(prefetch?).await.is_err() {
                    break;
                }
            }
            PolarsResult::Ok(())
        }));

        // Decode loop (spawns decodes on the computational executor).
        let (decode_send, mut decode_recv) = tokio::sync::mpsc::channel(self.config.num_pipelines);
        let decode_task = AbortOnDropHandle(io_runtime.spawn(async move {
            while let Some(prefetch) = prefetch_recv.recv().await {
                let row_group_data = prefetch.await.unwrap()?;
                let row_group_decoder = row_group_decoder.clone();
                let decode_fut = async_executor::spawn(TaskPriority::High, async move {
                    row_group_decoder.row_group_data_to_df(row_group_data).await
                });
                if decode_send.send(decode_fut).await.is_err() {
                    break;
                }
            }
            PolarsResult::Ok(())
        }));

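        // The bounded channels above limit how many row-group prefetches and decode
        // futures can be queued at a time, providing backpressure. Decode futures are
        // forwarded in row-group order, so the distributor below emits morsels in the
        // original row order even though row groups decode concurrently.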
        // Distributes morsels across pipelines. This does not perform any CPU or I/O bound work -
        // it is purely a dispatch loop. Run on the computational executor to reduce context switches.
        let last_morsel_min_split = self.config.num_pipelines;
        let distribute_task = async_executor::spawn(TaskPriority::High, async move {
            let mut morsel_seq = MorselSeq::default();
            // Note: We don't use this (it is handled by the bridge). But morsels require a source token.
            let source_token = SourceToken::new();

            // Decode first non-empty morsel.
            let mut next = None;
            loop {
                let Some(decode_fut) = decode_recv.recv().await else {
                    break;
                };
                let df = decode_fut.await?;
                if df.height() == 0 {
                    continue;
                }
                next = Some(df);
                break;
            }

            while let Some(df) = next.take() {
                // Try to decode the next non-empty morsel first, so we know
                // whether the df is the last morsel.
                loop {
                    let Some(decode_fut) = decode_recv.recv().await else {
                        break;
                    };
                    let next_df = decode_fut.await?;
                    if next_df.height() == 0 {
                        continue;
                    }
                    next = Some(next_df);
                    break;
                }

                for df in split_to_morsels(
                    &df,
                    ideal_morsel_size,
                    next.is_none(),
                    last_morsel_min_split,
                ) {
                    if morsel_sender
                        .send_morsel(Morsel::new(df, morsel_seq, source_token.clone()))
                        .await
                        .is_err()
                    {
                        return Ok(());
                    }
                    morsel_seq = morsel_seq.successor();
                }
            }
            PolarsResult::Ok(())
        });

        let join_task = io_runtime.spawn(async move {
            prefetch_task.await.unwrap()?;
            decode_task.await.unwrap()?;
            distribute_task.await?;
            Ok(())
        });

        (morsel_rx, AbortOnDropHandle(join_task))
    }

    /// Creates a `RowGroupDecoder` that turns `RowGroupData` into DataFrames.
    /// This must be called AFTER the following have been initialized:
    /// * `self.projected_arrow_fields`
    /// * `self.physical_predicate`
    pub(super) fn init_row_group_decoder(&mut self) -> RowGroupDecoder {
        let projected_arrow_fields = self.projected_arrow_fields.clone();
        let row_index = self.row_index.clone();
        let target_values_per_thread = self.config.target_values_per_thread;
        let predicate = self.predicate.clone();

        let mut use_prefiltered = matches!(self.options.parallel, ParallelStrategy::Prefiltered);
        use_prefiltered |=
            predicate.is_some() && matches!(self.options.parallel, ParallelStrategy::Auto);

        let predicate_field_indices: Arc<[usize]> =
            if use_prefiltered && let Some(predicate) = predicate.as_ref() {
                projected_arrow_fields
                    .iter()
                    .enumerate()
                    .filter_map(|(i, projected_field)| {
                        predicate
                            .live_columns
                            .contains(projected_field.output_name())
                            .then_some(i)
                    })
                    .collect()
            } else {
                Default::default()
            };

        let use_prefiltered = use_prefiltered.then(PrefilterMaskSetting::init_from_env);

        let non_predicate_field_indices: Arc<[usize]> = if use_prefiltered.is_some() {
            filtered_range(
                predicate_field_indices.as_ref(),
                projected_arrow_fields.len(),
            )
            .collect()
        } else {
            Default::default()
        };

        if use_prefiltered.is_some() && self.verbose {
            eprintln!(
                "[ParquetFileReader]: Pre-filtered decode enabled ({} live, {} non-live)",
                predicate_field_indices.len(),
                non_predicate_field_indices.len()
            )
        }

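        // Column predicates are only pushed into the decoder when the predicate is fully
        // expressible as per-column predicates (`is_sumwise_complete`), no row index is
        // attached, and no projected field is nested or mapped.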
        let allow_column_predicates = predicate
            .as_ref()
            .is_some_and(|x| x.column_predicates.is_sumwise_complete)
            && row_index.is_none()
            && !projected_arrow_fields.iter().any(|x| {
                x.arrow_field().dtype().is_nested()
                    || matches!(x, ArrowFieldProjection::Mapped { .. })
            });

        RowGroupDecoder {
            num_pipelines: self.config.num_pipelines,
            projected_arrow_fields,
            row_index,
            predicate,
            allow_column_predicates,
            use_prefiltered,
            predicate_field_indices,
            non_predicate_field_indices,
            target_values_per_thread,
        }
    }
}

/// Returns an iterator over `0..len`, excluding the indices in `exclude`.
/// `exclude` needs to be a sorted list of unique values.
fn filtered_range(exclude: &[usize], len: usize) -> impl Iterator<Item = usize> {
    if cfg!(debug_assertions) {
        assert!(exclude.windows(2).all(|x| x[1] > x[0]));
    }

    let mut j = 0;

    (0..len).filter(move |&i| {
        if j == exclude.len() || i != exclude[j] {
            true
        } else {
            j += 1;
            false
        }
    })
}

fn split_to_morsels(
    df: &DataFrame,
    ideal_morsel_size: usize,
    last_morsel: bool,
    last_morsel_min_split: usize,
) -> impl Iterator<Item = DataFrame> + '_ {
    let mut n_morsels = if df.height() > 3 * ideal_morsel_size / 2 {
        // num_rows > (1.5 * ideal_morsel_size)
        (df.height() / ideal_morsel_size).max(2)
    } else {
        1
    };

    if last_morsel {
        n_morsels = n_morsels.max(last_morsel_min_split);
    }

    let rows_per_morsel = df.height().div_ceil(n_morsels).max(1);

    (0..i64::try_from(df.height()).unwrap())
        .step_by(rows_per_morsel)
        .map(move |offset| df.slice(offset, rows_per_morsel))
        .filter(|df| df.height() > 0)
}

mod tests {

    #[test]
    fn test_filtered_range() {
        use super::filtered_range;
        assert_eq!(
            filtered_range(&[1, 3], 7).collect::<Vec<_>>().as_slice(),
            &[0, 2, 4, 5, 6]
        );
        assert_eq!(
            filtered_range(&[1, 6], 7).collect::<Vec<_>>().as_slice(),
            &[0, 2, 3, 4, 5]
        );
    }
}