Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/dsl/expr/anonymous/expr.rs
8415 views
1
use std::fmt::{Debug, Formatter};
2
use std::hash::{Hash, Hasher};
3
use std::ops::Deref;
4
use std::sync::Arc;
5
6
use polars_core::prelude::*;
7
use polars_error::{PolarsResult, feature_gated, polars_bail};
8
9
#[cfg(feature = "serde")]
10
use super::serde_expr;
11
use crate::dsl::LazySerde;
12
13
pub trait AnonymousColumnsUdf: ColumnsUdf {
14
fn as_column_udf(self: Arc<Self>) -> Arc<dyn ColumnsUdf>;
15
fn deep_clone(self: Arc<Self>) -> Arc<dyn AnonymousColumnsUdf>;
16
17
fn try_serialize(&self, _buf: &mut Vec<u8>) -> PolarsResult<()> {
18
polars_bail!(ComputeError: "serialization not supported for this 'opaque' function")
19
}
20
21
fn get_field(&self, input_schema: &Schema, fields: &[Field]) -> PolarsResult<Field>;
22
}
23
24
/// A wrapper trait for any closure `Fn(Vec<Series>) -> PolarsResult<Series>`
25
pub trait ColumnsUdf: Send + Sync {
26
fn as_any(&self) -> &dyn std::any::Any {
27
unimplemented!("as_any not implemented for this 'opaque' function")
28
}
29
30
fn call_udf(&self, s: &mut [Column]) -> PolarsResult<Column>;
31
}
32
33
impl<F> ColumnsUdf for F
34
where
35
F: Fn(&mut [Column]) -> PolarsResult<Column> + Send + Sync,
36
{
37
fn call_udf(&self, s: &mut [Column]) -> PolarsResult<Column> {
38
self(s)
39
}
40
}
41
42
impl Debug for dyn ColumnsUdf {
43
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
44
write!(f, "ColumnUdf")
45
}
46
}
47
48
/// Newtype wrapper whose equality semantics are chosen per wrapped type:
/// `Arc<T>` payloads compare (and hash) by pointer identity, while `Series`
/// payloads defer to `Series`'s own `PartialEq`.
#[derive(Clone)]
pub struct SpecialEq<T>(T);
52
53
impl<T> SpecialEq<T> {
54
pub fn new(val: T) -> Self {
55
SpecialEq(val)
56
}
57
58
pub fn into_inner(self) -> T {
59
self.0
60
}
61
}
62
63
impl SpecialEq<Arc<dyn AnonymousColumnsUdf>> {
64
pub fn deep_clone(self) -> Self {
65
SpecialEq(self.0.deep_clone())
66
}
67
}
68
69
impl<T: ?Sized> PartialEq for SpecialEq<Arc<T>> {
70
fn eq(&self, other: &Self) -> bool {
71
Arc::ptr_eq(&self.0, &other.0)
72
}
73
}
74
75
/// Pointer identity is reflexive, symmetric and transitive, so it is a full equivalence.
impl<T> Eq for SpecialEq<Arc<T>> where T: ?Sized {}
76
77
impl<T: ?Sized> Hash for SpecialEq<Arc<T>> {
78
fn hash<H: Hasher>(&self, state: &mut H) {
79
Arc::as_ptr(self).hash(state);
80
}
81
}
82
83
impl PartialEq for SpecialEq<Series> {
84
fn eq(&self, other: &Self) -> bool {
85
self.0 == other.0
86
}
87
}
88
89
impl<T> Debug for SpecialEq<T> {
90
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
91
write!(f, "no_eq")
92
}
93
}
94
95
impl<T> Deref for SpecialEq<T> {
96
type Target = T;
97
98
fn deref(&self) -> &Self::Target {
99
&self.0
100
}
101
}
102
103
/// Pairs a column function `f` with `dt`, the callback that resolves its output field.
pub struct BaseColumnUdf<F, DT> {
    /// Function applied to the input columns.
    f: F,
    /// Output-field resolver.
    dt: DT,
}
107
108
impl<F, DT> BaseColumnUdf<F, DT> {
109
pub fn new(f: F, dt: DT) -> Self {
110
Self { f, dt }
111
}
112
}
113
114
impl<F, DT> ColumnsUdf for BaseColumnUdf<F, DT>
115
where
116
F: Fn(&mut [Column]) -> PolarsResult<Column> + Send + Sync,
117
DT: Fn(&Schema, &[Field]) -> PolarsResult<Field> + Send + Sync,
118
{
119
fn call_udf(&self, s: &mut [Column]) -> PolarsResult<Column> {
120
(self.f)(s)
121
}
122
}
123
124
impl<F, DT> AnonymousColumnsUdf for BaseColumnUdf<F, DT>
125
where
126
F: Fn(&mut [Column]) -> PolarsResult<Column> + 'static + Send + Sync,
127
DT: Fn(&Schema, &[Field]) -> PolarsResult<Field> + 'static + Send + Sync,
128
{
129
fn as_column_udf(self: Arc<Self>) -> Arc<dyn ColumnsUdf> {
130
self as _
131
}
132
fn deep_clone(self: Arc<Self>) -> Arc<dyn AnonymousColumnsUdf> {
133
self
134
}
135
136
fn get_field(&self, input_schema: &Schema, fields: &[Field]) -> PolarsResult<Field> {
137
(self.dt)(input_schema, fields)
138
}
139
}
140
141
/// A type-erased column UDF held in one of the `LazySerde` forms matched by
/// `materialize` in this file (`Deserialized`, `Named`, or `Bytes`). Use
/// `OpaqueColumnUdf::materialize` to obtain the in-memory function.
pub type OpaqueColumnUdf = LazySerde<SpecialEq<Arc<dyn AnonymousColumnsUdf>>>;
142
143
impl Hash for OpaqueColumnUdf {
144
fn hash<H: Hasher>(&self, state: &mut H) {
145
core::mem::discriminant(self).hash(state);
146
match self {
147
Self::Deserialized(ptr) => ptr.hash(state),
148
Self::Bytes(b) => b.hash(state),
149
Self::Named {
150
name,
151
payload,
152
value: _,
153
} => {
154
name.hash(state);
155
payload.hash(state);
156
},
157
}
158
}
159
}
160
161
pub fn new_column_udf<F: AnonymousColumnsUdf + 'static>(func: F) -> OpaqueColumnUdf {
162
LazySerde::Deserialized(SpecialEq::new(Arc::new(func)))
163
}
164
165
impl OpaqueColumnUdf {
    /// Resolves this UDF into its in-memory function.
    ///
    /// - `Deserialized`: returned unchanged.
    /// - `Named`: returns the cached `value` if present; otherwise looks the
    ///   function up by `name` and `payload` in `NAMED_SERDE_REGISTRY_EXPR`.
    ///   This branch is gated on the `serde` feature.
    /// - `Bytes`: deserialized via `serde_expr::deserialize_column_udf`.
    ///   This branch is gated on the `serde` and `python` features.
    ///
    /// # Errors
    /// Propagates the error from `deserialize_column_udf` in the `Bytes` branch.
    ///
    /// # Panics
    /// In the `Named` branch with no cached value, this panics if:
    /// - `.read()` on the registry fails (e.g. a poisoned lock);
    /// - the registry is not set;
    /// - `payload` is absent;
    /// - no function is found for `name`.
    ///
    /// NOTE(review): these are recoverable conditions inside a `PolarsResult`
    /// function; consider returning errors instead. Behavior when the gating
    /// features are disabled is decided by `feature_gated!` (presumably a
    /// panic) — confirm.
    pub fn materialize(self) -> PolarsResult<SpecialEq<Arc<dyn AnonymousColumnsUdf>>> {
        match self {
            Self::Deserialized(t) => Ok(t),
            Self::Named {
                name,
                payload,
                value,
            } => feature_gated!("serde", {
                use super::named_serde::NAMED_SERDE_REGISTRY_EXPR;
                match value {
                    // Already resolved earlier: reuse the cached function.
                    Some(v) => Ok(v),
                    // Not resolved yet: look it up in the named-function registry.
                    None => Ok(SpecialEq(
                        NAMED_SERDE_REGISTRY_EXPR
                            .read()
                            .unwrap()
                            .as_ref()
                            .expect("NAMED EXPR REGISTRY NOT SET")
                            .get_function(&name, payload.unwrap().as_ref())
                            .expect("NAMED FUNCTION NOT FOUND"),
                    )),
                }
            }),
            Self::Bytes(_b) => {
                // `_b` is underscore-prefixed, presumably so the binding does not
                // warn when the feature gate compiles the body out.
                feature_gated!("serde";"python", {
                    serde_expr::deserialize_column_udf(_b.as_ref()).map(SpecialEq::new)
                })
            },
        }
    }
}
196
197