// Path: blob/main/crates/polars-stream/src/nodes/io_sources/parquet/init.rs
use std::sync::Arc;12use polars_core::frame::DataFrame;3use polars_error::{PolarsResult, polars_ensure};4use polars_io::prelude::_internal::PrefilterMaskSetting;5use polars_io::prelude::ParallelStrategy;6use polars_utils::IdxSize;78use super::row_group_data_fetch::RowGroupDataFetcher;9use super::row_group_decode::RowGroupDecoder;10use super::{AsyncTaskData, ParquetReadImpl};11use crate::async_executor;12use crate::morsel::{Morsel, SourceToken, get_ideal_morsel_size};13use crate::nodes::io_sources::multi_scan::reader_interface::output::FileReaderOutputSend;14use crate::nodes::io_sources::parquet::projection::ArrowFieldProjection;15use crate::nodes::io_sources::parquet::statistics::calculate_row_group_pred_pushdown_skip_mask;16use crate::nodes::{MorselSeq, TaskPriority};17use crate::utils::tokio_handle_ext::{self, AbortOnDropHandle};1819impl ParquetReadImpl {20/// Constructs the task that distributes morsels across the engine pipelines.21#[allow(clippy::type_complexity)]22pub(super) fn init_morsel_distributor(&mut self) -> AsyncTaskData {23let verbose = self.verbose;24let io_runtime = polars_io::pl_async::get_runtime();2526let use_statistics = self.options.use_statistics;2728let (mut morsel_sender, morsel_rx) = FileReaderOutputSend::new_serial();2930if let Some((_, 0)) = self.normalized_pre_slice {31return (32morsel_rx,33tokio_handle_ext::AbortOnDropHandle(io_runtime.spawn(std::future::ready(Ok(())))),34);35}3637let projected_arrow_fields = self.projected_arrow_fields.clone();38let is_full_projection = self.is_full_projection;3940let row_group_prefetch_size = self.config.row_group_prefetch_size;41let predicate = self.predicate.clone();42let memory_prefetch_func = self.memory_prefetch_func;4344let row_group_decoder = self.init_row_group_decoder();45let row_group_decoder = Arc::new(row_group_decoder);4647let ideal_morsel_size = get_ideal_morsel_size();4849if verbose {50eprintln!("[ParquetFileReader]: ideal_morsel_size: {ideal_morsel_size}");51}5253let metadata = 
self.metadata.clone();54let normalized_pre_slice = self.normalized_pre_slice;55let byte_source = self.byte_source.clone();5657// Prefetch loop (spawns prefetches on the tokio scheduler).58let (prefetch_send, mut prefetch_recv) =59tokio::sync::mpsc::channel(row_group_prefetch_size);6061let row_index = self.row_index.clone();6263let rg_prefetch_semaphore = Arc::clone(&self.rg_prefetch_semaphore);64let rg_prefetch_prev_all_spawned = Option::take(&mut self.rg_prefetch_prev_all_spawned);65let rg_prefetch_current_all_spawned =66Option::take(&mut self.rg_prefetch_current_all_spawned);6768let prefetch_task = AbortOnDropHandle(io_runtime.spawn(async move {69polars_ensure!(70metadata.num_rows < IdxSize::MAX as usize,71bigidx,72ctx = "parquet file",73size = metadata.num_rows74);7576// Calculate the row groups that need to be read and the slice range relative to those77// row groups.78let mut row_offset = 0;79let mut slice_range =80normalized_pre_slice.map(|(offset, length)| offset..offset + length);81let mut row_group_slice = 0..metadata.row_groups.len();82if let Some(pre_slice) = normalized_pre_slice {83let mut start = 0;84let mut start_offset = 0;8586let mut num_offset_remaining = pre_slice.0;87let mut num_length_remaining = pre_slice.1;8889for rg in &metadata.row_groups {90if rg.num_rows() > num_offset_remaining {91start_offset = num_offset_remaining;92num_length_remaining = num_length_remaining93.saturating_sub(rg.num_rows() - num_offset_remaining);94break;95}9697row_offset += rg.num_rows();98num_offset_remaining -= rg.num_rows();99start += 1;100}101102let mut end = start + 1;103104while num_length_remaining > 0 {105num_length_remaining =106num_length_remaining.saturating_sub(metadata.row_groups[end].num_rows());107end += 1;108}109110slice_range = Some(start_offset..start_offset + pre_slice.1);111row_group_slice = start..end;112113if verbose {114eprintln!(115"[ParquetFileReader]: Slice pushdown: \116reading {} / {} row 
groups",117row_group_slice.len(),118metadata.row_groups.len()119);120}121}122123let row_group_mask = calculate_row_group_pred_pushdown_skip_mask(124row_group_slice.clone(),125use_statistics,126predicate.as_ref(),127&metadata,128projected_arrow_fields.clone(),129row_index,130verbose,131)132.await?;133134let mut row_group_data_fetcher = RowGroupDataFetcher {135projection: projected_arrow_fields.clone(),136is_full_projection,137predicate,138slice_range,139memory_prefetch_func,140metadata,141byte_source,142row_group_slice,143row_group_mask,144row_offset,145};146147if let Some(rg_prefetch_prev_all_spawned) = rg_prefetch_prev_all_spawned {148rg_prefetch_prev_all_spawned.wait().await;149}150151loop {152let fetch_permit = rg_prefetch_semaphore.clone().acquire_owned().await.unwrap();153154let Some(prefetch) = row_group_data_fetcher.next().await else {155break;156};157158if prefetch_send.send((prefetch?, fetch_permit)).await.is_err() {159break;160}161}162163drop(rg_prefetch_current_all_spawned);164165PolarsResult::Ok(())166}));167168// Decode loop (spawns decodes on the computational executor).169let (decode_send, mut decode_recv) = tokio::sync::mpsc::channel(self.config.num_pipelines);170let decode_task = AbortOnDropHandle(io_runtime.spawn(async move {171while let Some((prefetch_task, permit)) = prefetch_recv.recv().await {172let row_group_data = prefetch_task.await.unwrap()?;173let row_group_decoder = row_group_decoder.clone();174let decode_fut = async_executor::spawn(TaskPriority::High, async move {175row_group_decoder.row_group_data_to_df(row_group_data).await176});177if decode_send.send((decode_fut, permit)).await.is_err() {178break;179}180}181PolarsResult::Ok(())182}));183184// Distributes morsels across pipelines. This does not perform any CPU or I/O bound work -185// it is purely a dispatch loop. 
Run on the computational executor to reduce context switches.186let last_morsel_min_split = self.config.num_pipelines;187let disable_morsel_split = self.disable_morsel_split;188let distribute_task = async_executor::spawn(TaskPriority::High, async move {189let mut morsel_seq = MorselSeq::default();190// Note: We don't use this (it is handled by the bridge). But morsels require a source token.191let source_token = SourceToken::new();192193// Decode first non-empty morsel.194let mut next = None;195loop {196let Some((decode_fut, permit)) = decode_recv.recv().await else {197break;198};199let df = decode_fut.await?;200if df.height() == 0 {201continue;202}203204if disable_morsel_split {205if morsel_sender206.send_morsel(Morsel::new(df, morsel_seq, source_token.clone()))207.await208.is_err()209{210return Ok(());211}212drop(permit);213morsel_seq = morsel_seq.successor();214continue;215}216217next = Some((df, permit));218break;219}220221while let Some((df, permit)) = next.take() {222// Try to decode the next non-empty morsel first, so we know223// whether the df is the last morsel.224225// Important: Drop this before awaiting the next one, or could226// deadlock if the permit limit is 1.227drop(permit);228229loop {230let Some((decode_fut, permit)) = decode_recv.recv().await else {231break;232};233let next_df = decode_fut.await?;234if next_df.height() == 0 {235continue;236}237next = Some((next_df, permit));238break;239}240241for df in split_to_morsels(242&df,243ideal_morsel_size,244next.is_none(),245last_morsel_min_split,246) {247if morsel_sender248.send_morsel(Morsel::new(df, morsel_seq, source_token.clone()))249.await250.is_err()251{252return Ok(());253}254morsel_seq = morsel_seq.successor();255}256}257258PolarsResult::Ok(())259});260261let join_task = io_runtime.spawn(async move {262prefetch_task.await.unwrap()?;263decode_task.await.unwrap()?;264distribute_task.await?;265Ok(())266});267268(morsel_rx, AbortOnDropHandle(join_task))269}270271/// Creates a `RowGroupDecoder` 
that turns `RowGroupData` into DataFrames.272/// This must be called AFTER the following have been initialized:273/// * `self.projected_arrow_fields`274/// * `self.physical_predicate`275pub(super) fn init_row_group_decoder(&mut self) -> RowGroupDecoder {276let projected_arrow_fields = self.projected_arrow_fields.clone();277let row_index = self.row_index.clone();278let target_values_per_thread = self.config.target_values_per_thread;279let predicate = self.predicate.clone();280281let mut use_prefiltered = matches!(self.options.parallel, ParallelStrategy::Prefiltered);282use_prefiltered |=283predicate.is_some() && matches!(self.options.parallel, ParallelStrategy::Auto);284285let predicate_field_indices: Arc<[usize]> =286if use_prefiltered && let Some(predicate) = predicate.as_ref() {287projected_arrow_fields288.iter()289.enumerate()290.filter_map(|(i, projected_field)| {291predicate292.live_columns293.contains(projected_field.output_name())294.then_some(i)295})296.collect()297} else {298Default::default()299};300301let use_prefiltered = use_prefiltered.then(PrefilterMaskSetting::init_from_env);302303let non_predicate_field_indices: Arc<[usize]> = if use_prefiltered.is_some() {304filtered_range(305predicate_field_indices.as_ref(),306projected_arrow_fields.len(),307)308.collect()309} else {310Default::default()311};312313if use_prefiltered.is_some() && self.verbose {314eprintln!(315"[ParquetFileReader]: Pre-filtered decode enabled ({} live, {} non-live)",316predicate_field_indices.len(),317non_predicate_field_indices.len()318)319}320321let allow_column_predicates = predicate322.as_ref()323.is_some_and(|x| x.column_predicates.is_sumwise_complete)324&& row_index.is_none()325&& !projected_arrow_fields.iter().any(|x| {326x.arrow_field().dtype().is_nested()327|| matches!(x, ArrowFieldProjection::Mapped { .. 
})328});329330RowGroupDecoder {331num_pipelines: self.config.num_pipelines,332projected_arrow_fields,333row_index,334predicate,335allow_column_predicates,336use_prefiltered,337predicate_field_indices,338non_predicate_field_indices,339target_values_per_thread,340}341}342}343344/// Returns 0..len in a Vec, excluding indices in `exclude`.345/// `exclude` needs to be a sorted list of unique values.346fn filtered_range(exclude: &[usize], len: usize) -> impl Iterator<Item = usize> {347if cfg!(debug_assertions) {348assert!(exclude.windows(2).all(|x| x[1] > x[0]));349}350351let mut j = 0;352353(0..len).filter(move |&i| {354if j == exclude.len() || i != exclude[j] {355true356} else {357j += 1;358false359}360})361}362363pub(crate) fn split_to_morsels(364df: &DataFrame,365ideal_morsel_size: usize,366last_morsel: bool,367last_morsel_min_split: usize,368) -> impl Iterator<Item = DataFrame> + '_ {369let mut n_morsels = if df.height() > 3 * ideal_morsel_size / 2 {370// num_rows > (1.5 * ideal_morsel_size)371(df.height() / ideal_morsel_size).max(2)372} else {3731374};375376if last_morsel {377n_morsels = n_morsels.max(last_morsel_min_split);378}379380let rows_per_morsel = df.height().div_ceil(n_morsels).max(1);381382(0..i64::try_from(df.height()).unwrap())383.step_by(rows_per_morsel)384.map(move |offset| df.slice(offset, rows_per_morsel))385.filter(|df| df.height() > 0)386}387388mod tests {389390#[test]391fn test_filtered_range() {392use super::filtered_range;393assert_eq!(394filtered_range(&[1, 3], 7).collect::<Vec<_>>().as_slice(),395&[0, 2, 4, 5, 6]396);397assert_eq!(398filtered_range(&[1, 6], 7).collect::<Vec<_>>().as_slice(),399&[0, 2, 3, 4, 5]400);401}402}403404405