Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-mem-engine/src/executors/executor.rs
6940 views
1
use super::*;
2
3
// Executor are the executors of the physical plan and produce DataFrames. They
4
// combine physical expressions, which produce Series.
5
6
/// Executors will evaluate physical expressions and collect them in a DataFrame.
7
///
8
/// Executors have other executors as input. By having a tree of executors we can execute the
9
/// physical plan until the last executor is evaluated.
10
pub trait Executor: Send + Sync {
11
fn execute(&mut self, cache: &mut ExecutionState) -> PolarsResult<DataFrame>;
12
13
fn is_cache_prefiller(&self) -> bool {
14
false
15
}
16
}
17
18
type SinkFn =
19
Box<dyn FnMut(DataFrame, &mut ExecutionState) -> PolarsResult<Option<DataFrame>> + Send + Sync>;
20
pub struct SinkExecutor {
21
pub name: String,
22
pub input: Box<dyn Executor>,
23
pub f: SinkFn,
24
}
25
26
impl Executor for SinkExecutor {
27
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
28
state.should_stop()?;
29
#[cfg(debug_assertions)]
30
{
31
if state.verbose() {
32
eprintln!("run sink_{}", self.name)
33
}
34
}
35
let df = self.input.execute(state)?;
36
37
let profile_name = if state.has_node_timer() {
38
Cow::Owned(format!(".sink_{}()", &self.name))
39
} else {
40
Cow::Borrowed("")
41
};
42
43
state.clone().record(
44
|| (self.f)(df, state).map(|df| df.unwrap_or_else(DataFrame::empty)),
45
profile_name,
46
)
47
}
48
}
49
50
pub struct Dummy {}
51
impl Executor for Dummy {
52
fn execute(&mut self, _cache: &mut ExecutionState) -> PolarsResult<DataFrame> {
53
panic!("should not get here");
54
}
55
}
56
57
impl Default for Box<dyn Executor> {
58
fn default() -> Self {
59
Box::new(Dummy {})
60
}
61
}
62
63