// Source: GitHub repository pola-rs/polars (branch: main)
// Path: crates/polars-plan/src/dsl/expr/anonymous/agg.rs
1
use std::any::Any;
2
use std::hash::{Hash, Hasher};
3
use std::sync::Arc;
4
5
use polars_core::prelude::Field;
6
use polars_core::schema::Schema;
7
use polars_error::{PolarsResult, feature_gated};
8
9
use super::SpecialEq;
10
use crate::dsl::LazySerde;
11
12
pub trait AnonymousAgg: Send + Sync {
13
fn as_any(&self) -> &dyn Any;
14
15
fn get_field(&self, input_schema: &Schema, fields: &[Field]) -> PolarsResult<Field>;
16
}
17
18
/// A lazily (de)serializable handle to a shared [`AnonymousAgg`] trait object.
///
/// This is `LazySerde` specialised to `SpecialEq<Arc<dyn AnonymousAgg>>`; the
/// variants matched in this file are `Deserialized`, `Named { name, payload, value }`
/// and `Bytes`.
pub type OpaqueStreamingAgg = LazySerde<SpecialEq<Arc<dyn AnonymousAgg>>>;
19
20
impl OpaqueStreamingAgg {
21
pub fn materialize(&self) -> PolarsResult<SpecialEq<Arc<dyn AnonymousAgg>>> {
22
match self {
23
Self::Deserialized(t) => Ok(t.clone()),
24
Self::Named {
25
name,
26
payload,
27
value,
28
} => feature_gated!("serde", {
29
use super::named_serde::NAMED_SERDE_REGISTRY_EXPR;
30
match value {
31
Some(v) => Ok(v.clone()),
32
None => Ok(SpecialEq::new(
33
NAMED_SERDE_REGISTRY_EXPR
34
.read()
35
.unwrap()
36
.as_ref()
37
.expect("NAMED EXPR REGISTRY NOT SET")
38
.get_agg(name, payload.as_ref().unwrap())?
39
.expect("NAMED AGG NOT FOUND"),
40
)),
41
}
42
}),
43
Self::Bytes(_b) => {
44
feature_gated!("serde", {
45
use crate::dsl::anonymous::serde_expr;
46
serde_expr::deserialize_anon_agg(_b.as_ref()).map(SpecialEq::new)
47
})
48
},
49
}
50
}
51
}
52
53
impl Hash for OpaqueStreamingAgg {
54
fn hash<H: Hasher>(&self, state: &mut H) {
55
core::mem::discriminant(self).hash(state);
56
match self {
57
Self::Deserialized(ptr) => ptr.hash(state),
58
Self::Bytes(b) => b.hash(state),
59
Self::Named {
60
name,
61
payload,
62
value: _,
63
} => {
64
name.hash(state);
65
payload.hash(state);
66
},
67
}
68
}
69
}
70
71