Path: blob/main/crates/polars-mem-engine/src/executors/cache.rs
6940 views
#[cfg(feature = "async")]1use polars_io::pl_async;2use polars_utils::unique_id::UniqueId;34use super::*;56pub struct CachePrefill {7input: Box<dyn Executor>,8id: UniqueId,9hit_count: u32,10/// Signals that this is a scan executed async in the streaming engine and needs extra handling11is_new_streaming_scan: bool,12}1314impl CachePrefill {15pub fn new_cache(input: Box<dyn Executor>, id: UniqueId) -> Self {16Self {17input,18id,19hit_count: 0,20is_new_streaming_scan: false,21}22}2324pub fn new_scan(input: Box<dyn Executor>) -> Self {25Self {26input,27id: UniqueId::new(),28hit_count: 0,29is_new_streaming_scan: true,30}31}3233pub fn new_sink(input: Box<dyn Executor>) -> Self {34Self {35input,36id: UniqueId::new(),37hit_count: 0,38is_new_streaming_scan: false,39}40}4142pub fn id(&self) -> UniqueId {43self.id44}4546/// Returns an executor that will read the prefilled cache.47/// Increments the cache hit count.48pub fn make_exec(&mut self) -> CacheExec {49self.hit_count += 1;50CacheExec { id: self.id }51}52}5354impl Executor for CachePrefill {55fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {56let df = self.input.execute(state)?;57state.set_df_cache(&self.id, df, self.hit_count);58Ok(DataFrame::empty())59}60}6162pub struct CacheExec {63id: UniqueId,64}6566impl Executor for CacheExec {67fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {68Ok(state.get_df_cache(&self.id))69}70}7172pub struct CachePrefiller {73pub caches: PlIndexMap<UniqueId, CachePrefill>,74pub phys_plan: Box<dyn Executor>,75}7677impl Executor for CachePrefiller {78fn is_cache_prefiller(&self) -> bool {79true80}8182fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {83if state.verbose() {84eprintln!("PREFILL CACHES")85}8687#[cfg(feature = "async")]88let parallel_scan_exec_limit = {89// Note, this needs to be less than the size of the tokio blocking threadpool (which90// defaults to 512).91let parallel_scan_exec_limit = POOL.current_num_threads().min(128);9293if state.verbose() {94eprintln!(95"CachePrefiller: concurrent streaming scan exec limit: {parallel_scan_exec_limit}"96)97}9899Arc::new(tokio::sync::Semaphore::new(parallel_scan_exec_limit))100};101102#[cfg(feature = "async")]103let mut scan_handles: Vec<tokio::task::JoinHandle<PolarsResult<()>>> = vec![];104105// Ensure we traverse in discovery order. This will ensure that caches aren't dependent on each106// other.107for (_, mut prefill) in self.caches.drain(..) {108assert_ne!(109prefill.hit_count,1100,111"cache without execs: {}",112prefill.id()113);114115let mut state = state.split();116state.branch_idx += 1;117118#[cfg(feature = "async")]119if prefill.is_new_streaming_scan {120let parallel_scan_exec_limit = parallel_scan_exec_limit.clone();121122scan_handles.push(pl_async::get_runtime().spawn(async move {123let _permit = parallel_scan_exec_limit.acquire().await.unwrap();124125tokio::task::spawn_blocking(move || {126prefill.execute(&mut state)?;127128Ok(())129})130.await131.unwrap()132}));133134continue;135}136137// This cache node may have dependency on the in-progress scan nodes,138// ensure all of them complete here.139140#[cfg(feature = "async")]141if state.verbose() && !scan_handles.is_empty() {142eprintln!(143"CachePrefiller: wait for {} scan executors",144scan_handles.len()145)146}147148#[cfg(feature = "async")]149for handle in scan_handles.drain(..) {150pl_async::get_runtime().block_on(handle).unwrap()?;151}152153let _df = prefill.execute(&mut state)?;154}155156#[cfg(feature = "async")]157if state.verbose() && !scan_handles.is_empty() {158eprintln!(159"CachePrefiller: wait for {} scan executors",160scan_handles.len()161)162}163164#[cfg(feature = "async")]165for handle in scan_handles {166pl_async::get_runtime().block_on(handle).unwrap()?;167}168169if state.verbose() {170eprintln!("EXECUTE PHYS PLAN")171}172173self.phys_plan.execute(state)174}175}176177178