Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/expression.rs
8406 views
1
use std::sync::Arc;
2
3
use polars_core::frame::DataFrame;
4
use polars_core::prelude::{Column, GroupPositions};
5
use polars_error::{PolarsResult, polars_bail};
6
use polars_expr::prelude::{AggregationContext, ExecutionState, PhysicalExpr};
7
8
#[derive(Clone)]
9
pub struct StreamExpr {
10
inner: Arc<dyn PhysicalExpr>,
11
// Whether the expression can be re-entering the engine (e.g. a function use the lazy api
12
// within that function)
13
reentrant: bool,
14
}
15
16
impl StreamExpr {
17
pub fn new(phys_expr: Arc<dyn PhysicalExpr>, reentrant: bool) -> Self {
18
Self {
19
inner: phys_expr,
20
reentrant,
21
}
22
}
23
24
pub async fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
25
if self.reentrant {
26
let state = state.clone();
27
let phys_expr = self.inner.clone();
28
let df = df.clone();
29
polars_io::pl_async::get_runtime()
30
.spawn_blocking(move || phys_expr.evaluate(&df, &state))
31
.await
32
.unwrap()
33
} else {
34
self.inner.evaluate(df, state)
35
}
36
}
37
38
/// Broadcasts unit-length results to df height. Errors if length does not match df height otherwise.
39
pub async fn evaluate_preserve_len_broadcast(
40
&self,
41
df: &DataFrame,
42
state: &ExecutionState,
43
) -> PolarsResult<Column> {
44
let mut c = self.evaluate(df, state).await?;
45
46
if c.len() != df.height() {
47
if c.len() != 1 {
48
polars_bail!(
49
ShapeMismatch:
50
"expression result length {} does not match df height {}",
51
c.len(), df.height(),
52
)
53
}
54
55
c = c.new_from_index(0, df.height());
56
}
57
58
Ok(c)
59
}
60
61
pub fn evaluate_blocking(
62
&self,
63
df: &DataFrame,
64
state: &ExecutionState,
65
) -> PolarsResult<Column> {
66
self.inner.evaluate(df, state)
67
}
68
69
pub async fn evaluate_on_groups<'a>(
70
&self,
71
df: &DataFrame,
72
groups: &'a GroupPositions,
73
state: &ExecutionState,
74
) -> PolarsResult<AggregationContext<'a>> {
75
if self.reentrant {
76
let state = state.split();
77
// @NOTE: Clones only the Arc, relatively cheap.
78
let groups = <GroupPositions as Clone>::clone(groups);
79
let phys_expr = self.inner.clone();
80
let df = df.clone();
81
polars_io::pl_async::get_runtime()
82
.spawn_blocking(move || {
83
Ok(phys_expr
84
.evaluate_on_groups(&df, &groups, &state)?
85
.into_static())
86
})
87
.await
88
.unwrap()
89
} else {
90
self.inner.evaluate_on_groups(df, groups, state)
91
}
92
}
93
}
94
95