// GitHub Repository: pola-rs/polars
// Path: blob/main/crates/polars-mem-engine/src/executors/cache.rs
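
//! Cache prefilling for the in-memory engine.
//!
//! How the pieces in this file fit together:
//!
//! * [`CachePrefill`] wraps the executor of a cached subplan. Executing it runs that
//!   subplan once and stores the resulting `DataFrame` in the `ExecutionState`
//!   DataFrame cache under a [`UniqueId`].
//! * [`CacheExec`] is the reader side: each one handed out by `CachePrefill::make_exec`
//!   fetches the prefilled frame from that cache.
//! * [`CachePrefiller`] drives the process: it fills every cache in discovery order
//!   (spawning new-streaming scans concurrently on the async runtime when the `async`
//!   feature is enabled) and only then executes the physical plan.
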
#[cfg(feature = "async")]
use polars_io::pl_async;
use polars_utils::unique_id::UniqueId;

use super::*;

/// Executes a cached subplan once and publishes the result to the `ExecutionState`
/// DataFrame cache, where `CacheExec` readers can pick it up.
pub struct CachePrefill {
    input: Box<dyn Executor>,
    id: UniqueId,
    /// Number of `CacheExec` readers handed out via `make_exec`.
    hit_count: u32,
    /// Signals that this is a scan executed async in the streaming engine and needs extra handling.
    is_new_streaming_scan: bool,
}

impl CachePrefill {
    /// Prefill for an existing cache node, reusing the cache's `id`.
    pub fn new_cache(input: Box<dyn Executor>, id: UniqueId) -> Self {
        Self {
            input,
            id,
            hit_count: 0,
            is_new_streaming_scan: false,
        }
    }

    /// Prefill for a scan executed by the new streaming engine; a fresh id is generated.
    pub fn new_scan(input: Box<dyn Executor>) -> Self {
        Self {
            input,
            id: UniqueId::new(),
            hit_count: 0,
            is_new_streaming_scan: true,
        }
    }

    /// Prefill for a sink; a fresh id is generated.
    pub fn new_sink(input: Box<dyn Executor>) -> Self {
        Self {
            input,
            id: UniqueId::new(),
            hit_count: 0,
            is_new_streaming_scan: false,
        }
    }

    pub fn id(&self) -> UniqueId {
        self.id
    }

    /// Returns an executor that will read the prefilled cache.
    /// Increments the cache hit count.
    pub fn make_exec(&mut self) -> CacheExec {
        self.hit_count += 1;
        CacheExec { id: self.id }
    }
}
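
// Illustrative sketch of the intended call order (hypothetical `cached_subplan` and `id`;
// the real wiring happens during physical-plan construction elsewhere in the crate):
//
//     let mut prefill = CachePrefill::new_cache(cached_subplan, id);
//     // One `CacheExec` per consumer of the cached subplan; each call bumps `hit_count`.
//     let reader_a = prefill.make_exec();
//     let reader_b = prefill.make_exec();
//     // `CachePrefiller` later executes `prefill` exactly once; afterwards `reader_a`
//     // and `reader_b` simply fetch the cached DataFrame from the `ExecutionState`.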

impl Executor for CachePrefill {
    fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
        let df = self.input.execute(state)?;
        state.set_df_cache(&self.id, df, self.hit_count);
        // Consumers read through `CacheExec`; the prefill itself produces no output.
        Ok(DataFrame::empty())
    }
}

/// Reads a DataFrame that was prefilled under `id` by a `CachePrefill`.
pub struct CacheExec {
    id: UniqueId,
}

impl Executor for CacheExec {
    fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
        Ok(state.get_df_cache(&self.id))
    }
}
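
// A sketch of the hit-count-aware cache these two executors assume. This is *not* the
// actual `ExecutionState` implementation (hypothetical `DfCache` type below); the
// assumption is that the hit count passed to `set_df_cache` tells the state how many
// `get_df_cache` reads to expect, so the entry can be released after the last one:
//
//     use std::collections::HashMap;
//
//     struct DfCache<V> {
//         entries: HashMap<u128, (u32, V)>,
//     }
//
//     impl<V: Clone> DfCache<V> {
//         fn set(&mut self, id: u128, value: V, expected_reads: u32) {
//             self.entries.insert(id, (expected_reads, value));
//         }
//
//         fn get(&mut self, id: u128) -> V {
//             let (remaining, value) =
//                 self.entries.get_mut(&id).expect("cache must be prefilled first");
//             *remaining -= 1;
//             let out = value.clone();
//             if *remaining == 0 {
//                 // Last expected read: free the slot.
//                 self.entries.remove(&id);
//             }
//             out
//         }
//     }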

/// Fills every cache before the physical plan runs.
pub struct CachePrefiller {
    /// Prefills keyed by cache id, kept in discovery order.
    pub caches: PlIndexMap<UniqueId, CachePrefill>,
    pub phys_plan: Box<dyn Executor>,
}
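
// `PlIndexMap` preserves insertion order, so inserting prefills as they are discovered
// gives `execute` below the traversal order it relies on. A hedged construction sketch
// (hypothetical `discovered` iterator of `(UniqueId, Box<dyn Executor>)` pairs and
// `phys_plan`; the real construction lives elsewhere in the crate):
//
//     let mut caches: PlIndexMap<UniqueId, CachePrefill> = Default::default();
//     for (id, input) in discovered {
//         caches
//             .entry(id)
//             .or_insert_with(|| CachePrefill::new_cache(input, id));
//     }
//     let prefiller = CachePrefiller { caches, phys_plan };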

impl Executor for CachePrefiller {
    fn is_cache_prefiller(&self) -> bool {
        true
    }

    fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
        if state.verbose() {
            eprintln!("PREFILL CACHES")
        }

        #[cfg(feature = "async")]
        let parallel_scan_exec_limit = {
            // Note: this needs to be less than the size of the tokio blocking threadpool (which
            // defaults to 512).
            let parallel_scan_exec_limit = POOL.current_num_threads().min(128);

            if state.verbose() {
                eprintln!(
                    "CachePrefiller: concurrent streaming scan exec limit: {parallel_scan_exec_limit}"
                )
            }

            Arc::new(tokio::sync::Semaphore::new(parallel_scan_exec_limit))
        };

        #[cfg(feature = "async")]
        let mut scan_handles: Vec<tokio::task::JoinHandle<PolarsResult<()>>> = vec![];

        // Traverse in discovery order. This ensures that every cache is filled before any
        // cache that depends on it.
        for (_, mut prefill) in self.caches.drain(..) {
            assert_ne!(
                prefill.hit_count,
                0,
                "cache without execs: {}",
                prefill.id()
            );

            let mut state = state.split();
            state.branch_idx += 1;

            // New-streaming scans are spawned onto the async runtime and run concurrently,
            // bounded by the semaphore above.
            #[cfg(feature = "async")]
            if prefill.is_new_streaming_scan {
                let parallel_scan_exec_limit = parallel_scan_exec_limit.clone();

                scan_handles.push(pl_async::get_runtime().spawn(async move {
                    let _permit = parallel_scan_exec_limit.acquire().await.unwrap();

                    tokio::task::spawn_blocking(move || {
                        prefill.execute(&mut state)?;

                        Ok(())
                    })
                    .await
                    .unwrap()
                }));

                continue;
            }

            // This cache node may depend on the in-progress scan nodes;
            // ensure all of them have completed before executing it.

            #[cfg(feature = "async")]
            if state.verbose() && !scan_handles.is_empty() {
                eprintln!(
                    "CachePrefiller: wait for {} scan executors",
                    scan_handles.len()
                )
            }

            #[cfg(feature = "async")]
            for handle in scan_handles.drain(..) {
                pl_async::get_runtime().block_on(handle).unwrap()?;
            }

            let _df = prefill.execute(&mut state)?;
        }

        // Wait for any scan executors that are still running before starting the main plan.
        #[cfg(feature = "async")]
        if state.verbose() && !scan_handles.is_empty() {
            eprintln!(
                "CachePrefiller: wait for {} scan executors",
                scan_handles.len()
            )
        }

        #[cfg(feature = "async")]
        for handle in scan_handles {
            pl_async::get_runtime().block_on(handle).unwrap()?;
        }

        if state.verbose() {
            eprintln!("EXECUTE PHYS PLAN")
        }

        self.phys_plan.execute(state)
    }
}
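
// The concurrency pattern used above for new-streaming scans, reduced to a self-contained
// sketch with plain tokio and no polars types (`i` stands in for a scan, the closure body
// for `CachePrefill::execute`): a semaphore bounds how many blocking scan tasks run at
// once, and each task holds a permit for the duration of its blocking work.
//
//     use std::sync::Arc;
//
//     #[tokio::main]
//     async fn main() {
//         let limit = Arc::new(tokio::sync::Semaphore::new(4));
//         let mut handles = Vec::new();
//
//         for i in 0..16 {
//             let limit = Arc::clone(&limit);
//             handles.push(tokio::spawn(async move {
//                 // Acquire a permit before spawning the blocking work.
//                 let _permit = limit.acquire().await.unwrap();
//                 tokio::task::spawn_blocking(move || {
//                     // Stand-in for the blocking scan execution.
//                     println!("scan {i} running");
//                 })
//                 .await
//                 .unwrap();
//             }));
//         }
//
//         for handle in handles {
//             handle.await.unwrap();
//         }
//     }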