Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/dsl/expr/serde_expr.rs
6940 views
1
use polars_utils::pl_serialize::deserialize_map_bytes;
2
use serde::{Deserialize, Deserializer, Serialize, Serializer};
3
4
use super::named_serde::ExprRegistry;
5
use super::*;
6
7
impl Serialize for SpecialEq<Arc<dyn AnonymousColumnsUdf>> {
8
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
9
where
10
S: Serializer,
11
{
12
use serde::ser::Error;
13
let mut buf = vec![];
14
self.as_ref()
15
.try_serialize(&mut buf)
16
.map_err(|e| S::Error::custom(format!("{e}")))?;
17
serializer.serialize_bytes(&buf)
18
}
19
}
20
21
/// Prefix written at the start of every named-serde byte string.
const NAMED_SERDE_MAGIC_BYTE_MARK: &[u8] = b"PLNAMEDFN";
/// Byte written directly after the name, separating it from the payload.
const NAMED_SERDE_MAGIC_BYTE_END: u8 = b'!';
23
24
/// Writes a named function reference as one serde byte string.
///
/// The emitted bytes are, in order: `NAMED_SERDE_MAGIC_BYTE_MARK`, the UTF-8
/// bytes of `name`, `NAMED_SERDE_MAGIC_BYTE_END`, and then the bytes of
/// `payload` when one is given (nothing otherwise).
fn serialize_named<S: Serializer>(
    serializer: S,
    name: &str,
    payload: Option<&[u8]>,
) -> Result<S::Ok, S::Error> {
    // A missing payload contributes zero bytes, same as an empty one.
    let tail = payload.unwrap_or_default();

    let mut framed =
        Vec::with_capacity(NAMED_SERDE_MAGIC_BYTE_MARK.len() + name.len() + 1 + tail.len());
    framed.extend_from_slice(NAMED_SERDE_MAGIC_BYTE_MARK);
    framed.extend_from_slice(name.as_bytes());
    framed.push(NAMED_SERDE_MAGIC_BYTE_END);
    framed.extend_from_slice(tail);

    serializer.serialize_bytes(&framed)
}
38
39
fn deserialize_named_registry(buf: &[u8]) -> PolarsResult<(Arc<dyn ExprRegistry>, &str, &[u8])> {
40
let bytes = &buf[NAMED_SERDE_MAGIC_BYTE_MARK.len()..];
41
let Some(pos) = bytes.iter().position(|b| *b == NAMED_SERDE_MAGIC_BYTE_END) else {
42
polars_bail!(ComputeError: "named-serde expected magic byte end")
43
};
44
45
let Ok(name) = std::str::from_utf8(&bytes[..pos]) else {
46
polars_bail!(ComputeError: "named-serde name should be valid utf8")
47
};
48
let payload = &bytes[pos + 1..];
49
50
let registry = named_serde::NAMED_SERDE_REGISTRY_EXPR.read().unwrap();
51
match &*registry {
52
Some(reg) => Ok((reg.clone(), name, payload)),
53
None => polars_bail!(ComputeError: "named serde registry not set"),
54
}
55
}
56
57
impl<T: Serialize + Clone> Serialize for LazySerde<T> {
58
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
59
where
60
S: Serializer,
61
{
62
match self {
63
Self::Named {
64
name,
65
payload,
66
value: _,
67
} => serialize_named(serializer, name, payload.as_deref()),
68
Self::Deserialized(t) => t.serialize(serializer),
69
Self::Bytes(b) => b.serialize(serializer),
70
}
71
}
72
}
73
74
impl<'a, T: Deserialize<'a> + Clone> Deserialize<'a> for LazySerde<T> {
75
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
76
where
77
D: Deserializer<'a>,
78
{
79
let buf = bytes::Bytes::deserialize(deserializer)?;
80
Ok(Self::Bytes(buf))
81
}
82
}
83
84
#[cfg(feature = "dsl-schema")]
impl<T: schemars::JsonSchema + Clone> schemars::JsonSchema for LazySerde<T> {
    /// Reuses the schema name of `T`.
    fn schema_name() -> String {
        <T as schemars::JsonSchema>::schema_name()
    }

    /// Reuses the schema id of `T`.
    fn schema_id() -> std::borrow::Cow<'static, str> {
        <T as schemars::JsonSchema>::schema_id()
    }

    /// Describes the value with the schema of `Vec<u8>`.
    fn json_schema(schema_gen: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {
        <Vec<u8> as schemars::JsonSchema>::json_schema(schema_gen)
    }
}
98
99
pub(super) fn deserialize_column_udf(buf: &[u8]) -> PolarsResult<Arc<dyn AnonymousColumnsUdf>> {
100
#[cfg(feature = "python")]
101
if buf.starts_with(crate::dsl::python_dsl::PYTHON_SERDE_MAGIC_BYTE_MARK) {
102
return crate::dsl::python_dsl::PythonUdfExpression::try_deserialize(buf);
103
};
104
105
if buf.starts_with(NAMED_SERDE_MAGIC_BYTE_MARK) {
106
let (reg, name, payload) = deserialize_named_registry(buf)?;
107
108
if let Some(func) = reg.get_function(name, payload) {
109
Ok(func)
110
} else {
111
let msg = "name not found in named serde registry";
112
polars_bail!(ComputeError: msg)
113
}
114
} else {
115
polars_bail!(ComputeError: "deserialization not supported for this 'opaque' function")
116
}
117
}
118
// impl<T: Deserialize> Deserialize for crate::dsl::expr::LazySerde<T> {
119
impl<'a> Deserialize<'a> for SpecialEq<Arc<dyn AnonymousColumnsUdf>> {
120
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
121
where
122
D: Deserializer<'a>,
123
{
124
use serde::de::Error;
125
deserialize_map_bytes(deserializer, |buf| {
126
deserialize_column_udf(&buf)
127
.map_err(|e| D::Error::custom(format!("{e}")))
128
.map(SpecialEq::new)
129
})?
130
}
131
}
132
133
#[cfg(feature = "dsl-schema")]
impl schemars::JsonSchema for SpecialEq<Arc<dyn AnonymousColumnsUdf>> {
    /// Fixed schema name: `AnonymousColumnsUdf`.
    fn schema_name() -> String {
        String::from("AnonymousColumnsUdf")
    }

    /// Schema id: this module's path followed by `::AnonymousColumnsUdf`.
    fn schema_id() -> std::borrow::Cow<'static, str> {
        const ID: &str = concat!(module_path!(), "::", "AnonymousColumnsUdf");
        std::borrow::Cow::Borrowed(ID)
    }

    /// Describes the value with the schema of `Vec<u8>`.
    fn json_schema(schema_gen: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {
        <Vec<u8> as schemars::JsonSchema>::json_schema(schema_gen)
    }
}
147
148
impl Serialize for SpecialEq<Series> {
149
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
150
where
151
S: Serializer,
152
{
153
let s: &Series = self;
154
s.serialize(serializer)
155
}
156
}
157
158
impl<'a> Deserialize<'a> for SpecialEq<Series> {
159
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
160
where
161
D: Deserializer<'a>,
162
{
163
let t = Series::deserialize(deserializer)?;
164
Ok(SpecialEq::new(t))
165
}
166
}
167
168
#[cfg(feature = "dsl-schema")]
impl schemars::JsonSchema for SpecialEq<Series> {
    /// Reuses the schema name of `Series`.
    fn schema_name() -> String {
        <Series as schemars::JsonSchema>::schema_name()
    }

    /// Reuses the schema id of `Series`.
    fn schema_id() -> std::borrow::Cow<'static, str> {
        <Series as schemars::JsonSchema>::schema_id()
    }

    /// Reuses the JSON schema of `Series`.
    fn json_schema(schema_gen: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {
        <Series as schemars::JsonSchema>::json_schema(schema_gen)
    }
}
182
183
impl<T: Serialize> Serialize for SpecialEq<Arc<T>> {
184
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
185
where
186
S: Serializer,
187
{
188
self.as_ref().serialize(serializer)
189
}
190
}
191
192
#[cfg(feature = "serde")]
impl<'a, T: Deserialize<'a>> Deserialize<'a> for SpecialEq<Arc<T>> {
    /// Deserializes a `T`, places it in a new `Arc`, and wraps it in `SpecialEq`.
    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
    where
        D: Deserializer<'a>,
    {
        T::deserialize(deserializer).map(|inner| SpecialEq::new(Arc::new(inner)))
    }
}
202
203
#[cfg(feature = "dsl-schema")]
impl<T: schemars::JsonSchema> schemars::JsonSchema for SpecialEq<Arc<T>> {
    /// Reuses the schema name of `T`.
    fn schema_name() -> String {
        <T as schemars::JsonSchema>::schema_name()
    }

    /// Reuses the schema id of `T`.
    fn schema_id() -> std::borrow::Cow<'static, str> {
        <T as schemars::JsonSchema>::schema_id()
    }

    /// Reuses the JSON schema of `T`.
    fn json_schema(schema_gen: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {
        <T as schemars::JsonSchema>::json_schema(schema_gen)
    }
}
217
218