Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-mem-engine/src/executors/cache.rs
8416 views
1
#[cfg(feature = "async")]
2
use polars_io::pl_async;
3
use polars_utils::unique_id::UniqueId;
4
5
use super::*;
6
7
/// Fills a cache slot by executing its input once, so that the matching
/// [`CacheExec`] readers can later fetch the materialized `DataFrame`.
pub struct CachePrefill {
    /// Executor whose output gets stored into the cache.
    input: Box<dyn Executor>,
    /// Identifier of the cache slot this node fills.
    id: UniqueId,
    /// Number of readers created via `make_exec`; forwarded to
    /// `ExecutionState::set_df_cache` alongside the materialized frame.
    hit_count: u32,
    /// Signals that this is a scan executed async in the streaming engine and needs extra handling
    is_new_streaming_scan: bool,
}
14
15
impl CachePrefill {
16
pub fn new_cache(input: Box<dyn Executor>, id: UniqueId) -> Self {
17
Self {
18
input,
19
id,
20
hit_count: 0,
21
is_new_streaming_scan: false,
22
}
23
}
24
25
pub fn id(&self) -> UniqueId {
26
self.id
27
}
28
29
/// Returns an executor that will read the prefilled cache.
30
/// Increments the cache hit count.
31
pub fn make_exec(&mut self) -> CacheExec {
32
self.hit_count += 1;
33
CacheExec { id: self.id }
34
}
35
}
36
37
impl Executor for CachePrefill {
38
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
39
let df = self.input.execute(state)?;
40
state.set_df_cache(&self.id, df, self.hit_count);
41
Ok(DataFrame::empty())
42
}
43
}
44
45
/// Reads a `DataFrame` that a [`CachePrefill`] node stored earlier.
pub struct CacheExec {
    /// Identifier of the cache slot to read from.
    id: UniqueId,
}
48
49
impl Executor for CacheExec {
50
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
51
Ok(state.get_df_cache(&self.id))
52
}
53
}
54
55
/// Top-level executor that populates every cache ahead of the main physical
/// plan, then runs the plan itself.
pub struct CachePrefiller {
    /// Fill nodes keyed by cache id; iteration follows insertion (discovery)
    /// order, which the prefill loop relies on.
    pub caches: PlIndexMap<UniqueId, CachePrefill>,
    /// The physical plan to execute once all caches are populated.
    pub phys_plan: Box<dyn Executor>,
}
59
60
impl Executor for CachePrefiller {
    // Marks this node so callers can special-case the prefiller.
    fn is_cache_prefiller(&self) -> bool {
        true
    }

    /// Executes every cache fill node (in discovery order), then the main
    /// physical plan. With the `async` feature, new-streaming scan prefills
    /// are spawned concurrently on the tokio runtime, bounded by a semaphore;
    /// non-scan prefills first wait for all in-flight scans to finish.
    fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
        if state.verbose() {
            eprintln!("PREFILL CACHES")
        }

        // Semaphore bounding how many scan prefills run concurrently.
        #[cfg(feature = "async")]
        let parallel_scan_exec_limit = {
            // Note, this needs to be less than the size of the tokio blocking threadpool (which
            // defaults to 512).
            let parallel_scan_exec_limit = POOL.current_num_threads().min(128);

            if state.verbose() {
                eprintln!(
                    "CachePrefiller: concurrent streaming scan exec limit: {parallel_scan_exec_limit}"
                )
            }

            Arc::new(tokio::sync::Semaphore::new(parallel_scan_exec_limit))
        };

        // Join handles of scan prefills currently running on the tokio runtime.
        #[cfg(feature = "async")]
        let mut scan_handles: Vec<tokio::task::JoinHandle<PolarsResult<()>>> = vec![];

        // Ensure we traverse in discovery order. This will ensure that caches aren't dependent on each
        // other.
        for (_, mut prefill) in self.caches.drain(..) {
            // A cache with no readers indicates a planner bug.
            assert_ne!(
                prefill.hit_count,
                0,
                "cache without execs: {}",
                prefill.id()
            );

            // Each prefill runs on its own state branch.
            let mut state = state.split();
            state.branch_idx += 1;

            #[cfg(feature = "async")]
            if prefill.is_new_streaming_scan {
                let parallel_scan_exec_limit = parallel_scan_exec_limit.clone();

                // Spawn the scan prefill concurrently; the semaphore permit
                // caps how many run at once, and the blocking-threadpool task
                // does the actual (synchronous) execution.
                scan_handles.push(pl_async::get_runtime().spawn(async move {
                    let _permit = parallel_scan_exec_limit.acquire().await.unwrap();

                    tokio::task::spawn_blocking(move || {
                        prefill.execute(&mut state)?;

                        Ok(())
                    })
                    .await
                    .unwrap()
                }));

                continue;
            }

            // This cache node may have dependency on the in-progress scan nodes,
            // ensure all of them complete here.

            #[cfg(feature = "async")]
            if state.verbose() && !scan_handles.is_empty() {
                eprintln!(
                    "CachePrefiller: wait for {} scan executors",
                    scan_handles.len()
                )
            }

            // Drain (rather than consume) so later iterations can keep
            // pushing new scan handles onto the same Vec.
            #[cfg(feature = "async")]
            for handle in scan_handles.drain(..) {
                pl_async::get_runtime().block_on(handle).unwrap()?;
            }

            // Synchronous prefill; result is discarded (it fills the cache
            // as a side effect and returns an empty frame).
            let _df = prefill.execute(&mut state)?;
        }

        // Any scans spawned after the last synchronous prefill are still
        // running — wait for them before executing the main plan.
        #[cfg(feature = "async")]
        if state.verbose() && !scan_handles.is_empty() {
            eprintln!(
                "CachePrefiller: wait for {} scan executors",
                scan_handles.len()
            )
        }

        #[cfg(feature = "async")]
        for handle in scan_handles {
            pl_async::get_runtime().block_on(handle).unwrap()?;
        }

        if state.verbose() {
            eprintln!("EXECUTE PHYS PLAN")
        }

        // All caches are populated; run the actual plan.
        self.phys_plan.execute(state)
    }
}
159
160