Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/expression.rs
6939 views
1
use std::sync::Arc;
2
3
use polars_core::frame::DataFrame;
4
use polars_core::prelude::Column;
5
use polars_error::PolarsResult;
6
use polars_expr::prelude::{ExecutionState, PhysicalExpr};
7
8
#[derive(Clone)]
9
pub struct StreamExpr {
10
inner: Arc<dyn PhysicalExpr>,
11
// Whether the expression can be re-entering the engine (e.g. a function use the lazy api
12
// within that function)
13
reentrant: bool,
14
}
15
16
impl StreamExpr {
17
pub fn new(phys_expr: Arc<dyn PhysicalExpr>, reentrant: bool) -> Self {
18
Self {
19
inner: phys_expr,
20
reentrant,
21
}
22
}
23
24
pub async fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
25
if self.reentrant {
26
let state = state.clone();
27
let phys_expr = self.inner.clone();
28
let df = df.clone();
29
polars_io::pl_async::get_runtime()
30
.spawn_blocking(move || phys_expr.evaluate(&df, &state))
31
.await
32
.unwrap()
33
} else {
34
self.inner.evaluate(df, state)
35
}
36
}
37
38
pub fn evaluate_blocking(
39
&self,
40
df: &DataFrame,
41
state: &ExecutionState,
42
) -> PolarsResult<Column> {
43
self.inner.evaluate(df, state)
44
}
45
}
46
47