Path: blob/main/crates/polars-mem-engine/src/executors/cache.rs
#[cfg(feature = "async")]1use polars_io::pl_async;2use polars_utils::unique_id::UniqueId;34use super::*;56pub struct CachePrefill {7input: Box<dyn Executor>,8id: UniqueId,9hit_count: u32,10/// Signals that this is a scan executed async in the streaming engine and needs extra handling11is_new_streaming_scan: bool,12}1314impl CachePrefill {15pub fn new_cache(input: Box<dyn Executor>, id: UniqueId) -> Self {16Self {17input,18id,19hit_count: 0,20is_new_streaming_scan: false,21}22}2324pub fn id(&self) -> UniqueId {25self.id26}2728/// Returns an executor that will read the prefilled cache.29/// Increments the cache hit count.30pub fn make_exec(&mut self) -> CacheExec {31self.hit_count += 1;32CacheExec { id: self.id }33}34}3536impl Executor for CachePrefill {37fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {38let df = self.input.execute(state)?;39state.set_df_cache(&self.id, df, self.hit_count);40Ok(DataFrame::empty())41}42}4344pub struct CacheExec {45id: UniqueId,46}4748impl Executor for CacheExec {49fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {50Ok(state.get_df_cache(&self.id))51}52}5354pub struct CachePrefiller {55pub caches: PlIndexMap<UniqueId, CachePrefill>,56pub phys_plan: Box<dyn Executor>,57}5859impl Executor for CachePrefiller {60fn is_cache_prefiller(&self) -> bool {61true62}6364fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {65if state.verbose() {66eprintln!("PREFILL CACHES")67}6869#[cfg(feature = "async")]70let parallel_scan_exec_limit = {71// Note, this needs to be less than the size of the tokio blocking threadpool (which72// defaults to 512).73let parallel_scan_exec_limit = POOL.current_num_threads().min(128);7475if state.verbose() {76eprintln!(77"CachePrefiller: concurrent streaming scan exec limit: {parallel_scan_exec_limit}"78)79}8081Arc::new(tokio::sync::Semaphore::new(parallel_scan_exec_limit))82};8384#[cfg(feature = "async")]85let mut scan_handles: Vec<tokio::task::JoinHandle<PolarsResult<()>>> = vec![];8687// Ensure we traverse in discovery order. This will ensure that caches aren't dependent on each88// other.89for (_, mut prefill) in self.caches.drain(..) {90assert_ne!(91prefill.hit_count,920,93"cache without execs: {}",94prefill.id()95);9697let mut state = state.split();98state.branch_idx += 1;99100#[cfg(feature = "async")]101if prefill.is_new_streaming_scan {102let parallel_scan_exec_limit = parallel_scan_exec_limit.clone();103104scan_handles.push(pl_async::get_runtime().spawn(async move {105let _permit = parallel_scan_exec_limit.acquire().await.unwrap();106107tokio::task::spawn_blocking(move || {108prefill.execute(&mut state)?;109110Ok(())111})112.await113.unwrap()114}));115116continue;117}118119// This cache node may have dependency on the in-progress scan nodes,120// ensure all of them complete here.121122#[cfg(feature = "async")]123if state.verbose() && !scan_handles.is_empty() {124eprintln!(125"CachePrefiller: wait for {} scan executors",126scan_handles.len()127)128}129130#[cfg(feature = "async")]131for handle in scan_handles.drain(..) 
{132pl_async::get_runtime().block_on(handle).unwrap()?;133}134135let _df = prefill.execute(&mut state)?;136}137138#[cfg(feature = "async")]139if state.verbose() && !scan_handles.is_empty() {140eprintln!(141"CachePrefiller: wait for {} scan executors",142scan_handles.len()143)144}145146#[cfg(feature = "async")]147for handle in scan_handles {148pl_async::get_runtime().block_on(handle).unwrap()?;149}150151if state.verbose() {152eprintln!("EXECUTE PHYS PLAN")153}154155self.phys_plan.execute(state)156}157}158159160
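For readers unfamiliar with the prefill/read split above, the following is a minimal, self-contained sketch of the pattern using hypothetical stand-in types (`MiniState`, `MiniSlot`, a `Frame` alias) rather than the Polars ones. It assumes the hit count handed to the state tells it how many readers to expect, so the cached value can be released after the final read; the real `ExecutionState` may manage this differently.

use std::collections::HashMap;

type Frame = Vec<i64>; // stand-in for a DataFrame

struct MiniSlot {
    value: Frame,
    hits_left: u32,
}

#[derive(Default)]
struct MiniState {
    cache: HashMap<u64, MiniSlot>,
}

impl MiniState {
    // Analogue of `set_df_cache`: store the prefilled value together with the
    // number of readers that will request it.
    fn set_cache(&mut self, id: u64, value: Frame, hit_count: u32) {
        self.cache.insert(id, MiniSlot { value, hits_left: hit_count });
    }

    // Analogue of `get_df_cache` under the drop-after-last-read assumption:
    // hand out clones until the last reader, which takes the value by move so
    // the slot is freed promptly.
    fn get_cache(&mut self, id: u64) -> Frame {
        let slot = self.cache.get_mut(&id).expect("cache must be prefilled first");
        slot.hits_left -= 1;
        if slot.hits_left > 0 {
            return slot.value.clone();
        }
        self.cache.remove(&id).unwrap().value
    }
}

fn main() {
    let mut state = MiniState::default();

    // Prefill phase: run the expensive input once and register the result for
    // the two downstream readers (hit_count == 2).
    let expensive_result: Frame = (0..5).collect();
    state.set_cache(42, expensive_result, 2);

    // Exec phase: both readers observe the same data; after the second read
    // the slot has been released.
    let a = state.get_cache(42);
    let b = state.get_cache(42);
    assert_eq!(a, b);
    assert!(state.cache.is_empty());
}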