Path: blob/main/crates/polars-expr/src/state/execution_state.rs
6940 views
use std::borrow::Cow;1use std::sync::atomic::{AtomicI64, Ordering};2use std::sync::{Mutex, RwLock};3use std::time::Duration;45use bitflags::bitflags;6use polars_core::config::verbose;7use polars_core::prelude::*;8use polars_ops::prelude::ChunkJoinOptIds;9use polars_utils::relaxed_cell::RelaxedCell;10use polars_utils::unique_id::UniqueId;1112use super::NodeTimer;1314pub type JoinTuplesCache = Arc<Mutex<PlHashMap<String, ChunkJoinOptIds>>>;1516#[derive(Default)]17pub struct WindowCache {18groups: RwLock<PlHashMap<String, GroupPositions>>,19join_tuples: RwLock<PlHashMap<String, Arc<ChunkJoinOptIds>>>,20map_idx: RwLock<PlHashMap<String, Arc<IdxCa>>>,21}2223impl WindowCache {24pub(crate) fn clear(&self) {25let mut g = self.groups.write().unwrap();26g.clear();27let mut g = self.join_tuples.write().unwrap();28g.clear();29}3031pub fn get_groups(&self, key: &str) -> Option<GroupPositions> {32let g = self.groups.read().unwrap();33g.get(key).cloned()34}3536pub fn insert_groups(&self, key: String, groups: GroupPositions) {37let mut g = self.groups.write().unwrap();38g.insert(key, groups);39}4041pub fn get_join(&self, key: &str) -> Option<Arc<ChunkJoinOptIds>> {42let g = self.join_tuples.read().unwrap();43g.get(key).cloned()44}4546pub fn insert_join(&self, key: String, join_tuples: Arc<ChunkJoinOptIds>) {47let mut g = self.join_tuples.write().unwrap();48g.insert(key, join_tuples);49}5051pub fn get_map(&self, key: &str) -> Option<Arc<IdxCa>> {52let g = self.map_idx.read().unwrap();53g.get(key).cloned()54}5556pub fn insert_map(&self, key: String, idx: Arc<IdxCa>) {57let mut g = self.map_idx.write().unwrap();58g.insert(key, idx);59}60}6162bitflags! {63#[repr(transparent)]64#[derive(Copy, Clone)]65pub(super) struct StateFlags: u8 {66/// More verbose logging67const VERBOSE = 0x01;68/// Indicates that window expression's [`GroupTuples`] may be cached.69const CACHE_WINDOW_EXPR = 0x02;70/// Indicates the expression has a window function71const HAS_WINDOW = 0x04;72}73}7475impl Default for StateFlags {76fn default() -> Self {77StateFlags::CACHE_WINDOW_EXPR78}79}8081impl StateFlags {82fn init() -> Self {83let verbose = verbose();84let mut flags: StateFlags = Default::default();85if verbose {86flags |= StateFlags::VERBOSE;87}88flags89}90fn as_u8(self) -> u8 {91unsafe { std::mem::transmute(self) }92}93}9495impl From<u8> for StateFlags {96fn from(value: u8) -> Self {97unsafe { std::mem::transmute(value) }98}99}100101struct CachedValue {102/// The number of times the cache will still be read.103/// Zero means that there will be no more reads and the cache can be dropped.104remaining_hits: AtomicI64,105df: DataFrame,106}107108/// State/ cache that is maintained during the Execution of the physical plan.109#[derive(Clone)]110pub struct ExecutionState {111// cached by a `.cache` call and kept in memory for the duration of the plan.112df_cache: Arc<RwLock<PlHashMap<UniqueId, Arc<CachedValue>>>>,113pub schema_cache: Arc<RwLock<Option<SchemaRef>>>,114/// Used by Window Expressions to cache intermediate state115pub window_cache: Arc<WindowCache>,116// every join/union split gets an increment to distinguish between schema state117pub branch_idx: usize,118pub flags: RelaxedCell<u8>,119pub ext_contexts: Arc<Vec<DataFrame>>,120node_timer: Option<NodeTimer>,121stop: Arc<RelaxedCell<bool>>,122}123124impl ExecutionState {125pub fn new() -> Self {126let mut flags: StateFlags = Default::default();127if verbose() {128flags |= StateFlags::VERBOSE;129}130Self {131df_cache: Default::default(),132schema_cache: Default::default(),133window_cache: Default::default(),134branch_idx: 0,135flags: RelaxedCell::from(StateFlags::init().as_u8()),136ext_contexts: Default::default(),137node_timer: None,138stop: Arc::new(RelaxedCell::from(false)),139}140}141142/// Toggle this to measure execution times.143pub fn time_nodes(&mut self, start: std::time::Instant) {144self.node_timer = Some(NodeTimer::new(start))145}146pub fn has_node_timer(&self) -> bool {147self.node_timer.is_some()148}149150pub fn finish_timer(self) -> PolarsResult<DataFrame> {151self.node_timer.unwrap().finish()152}153154// Timings should be a list of (start, end, name) where the start155// and end are raw durations since the query start as nanoseconds.156pub fn record_raw_timings(&self, timings: &[(u64, u64, String)]) {157for &(start, end, ref name) in timings {158self.node_timer.as_ref().unwrap().store_duration(159Duration::from_nanos(start),160Duration::from_nanos(end),161name.to_string(),162);163}164}165166// This is wrong when the U64 overflows which will never happen.167pub fn should_stop(&self) -> PolarsResult<()> {168try_raise_keyboard_interrupt();169polars_ensure!(!self.stop.load(), ComputeError: "query interrupted");170Ok(())171}172173pub fn cancel_token(&self) -> Arc<RelaxedCell<bool>> {174self.stop.clone()175}176177pub fn record<T, F: FnOnce() -> T>(&self, func: F, name: Cow<'static, str>) -> T {178match &self.node_timer {179None => func(),180Some(timer) => {181let start = std::time::Instant::now();182let out = func();183let end = std::time::Instant::now();184185timer.store(start, end, name.as_ref().to_string());186out187},188}189}190191/// Partially clones and partially clears state192/// This should be used when splitting a node, like a join or union193pub fn split(&self) -> Self {194Self {195df_cache: self.df_cache.clone(),196schema_cache: Default::default(),197window_cache: Default::default(),198branch_idx: self.branch_idx,199flags: self.flags.clone(),200ext_contexts: self.ext_contexts.clone(),201node_timer: self.node_timer.clone(),202stop: self.stop.clone(),203}204}205206pub fn set_schema(&self, schema: SchemaRef) {207let mut lock = self.schema_cache.write().unwrap();208*lock = Some(schema);209}210211/// Clear the schema. Typically at the end of a projection.212pub fn clear_schema_cache(&self) {213let mut lock = self.schema_cache.write().unwrap();214*lock = None;215}216217/// Get the schema.218pub fn get_schema(&self) -> Option<SchemaRef> {219let lock = self.schema_cache.read().unwrap();220lock.clone()221}222223pub fn set_df_cache(&self, id: &UniqueId, df: DataFrame, cache_hits: u32) {224if self.verbose() {225eprintln!("CACHE SET: cache id: {id}");226}227228let value = Arc::new(CachedValue {229remaining_hits: AtomicI64::new(cache_hits as i64),230df,231});232233let prev = self.df_cache.write().unwrap().insert(*id, value);234assert!(prev.is_none(), "duplicate set cache: {id}");235}236237pub fn get_df_cache(&self, id: &UniqueId) -> DataFrame {238let guard = self.df_cache.read().unwrap();239let value = guard.get(id).expect("cache not prefilled");240let remaining = value.remaining_hits.fetch_sub(1, Ordering::Relaxed);241if remaining < 0 {242panic!("cache used more times than expected: {id}");243}244if self.verbose() {245eprintln!("CACHE HIT: cache id: {id}");246}247if remaining == 1 {248drop(guard);249let value = self.df_cache.write().unwrap().remove(id).unwrap();250if self.verbose() {251eprintln!("CACHE DROP: cache id: {id}");252}253Arc::into_inner(value).unwrap().df254} else {255value.df.clone()256}257}258259/// Clear the cache used by the Window expressions260pub fn clear_window_expr_cache(&self) {261self.window_cache.clear();262}263264fn set_flags(&self, f: &dyn Fn(StateFlags) -> StateFlags) {265let flags: StateFlags = self.flags.load().into();266let flags = f(flags);267self.flags.store(flags.as_u8());268}269270/// Indicates that window expression's [`GroupTuples`] may be cached.271pub fn cache_window(&self) -> bool {272let flags: StateFlags = self.flags.load().into();273flags.contains(StateFlags::CACHE_WINDOW_EXPR)274}275276/// Indicates that window expression's [`GroupTuples`] may be cached.277pub fn has_window(&self) -> bool {278let flags: StateFlags = self.flags.load().into();279flags.contains(StateFlags::HAS_WINDOW)280}281282/// More verbose logging283pub fn verbose(&self) -> bool {284let flags: StateFlags = self.flags.load().into();285flags.contains(StateFlags::VERBOSE)286}287288pub fn remove_cache_window_flag(&mut self) {289self.set_flags(&|mut flags| {290flags.remove(StateFlags::CACHE_WINDOW_EXPR);291flags292});293}294295pub fn insert_cache_window_flag(&mut self) {296self.set_flags(&|mut flags| {297flags.insert(StateFlags::CACHE_WINDOW_EXPR);298flags299});300}301// this will trigger some conservative302pub fn insert_has_window_function_flag(&mut self) {303self.set_flags(&|mut flags| {304flags.insert(StateFlags::HAS_WINDOW);305flags306});307}308}309310impl Default for ExecutionState {311fn default() -> Self {312ExecutionState::new()313}314}315316317