// Path: blob/main/crates/polars-plan/src/dsl/expr/serde_expr.rs
// 6940 views
use polars_utils::pl_serialize::deserialize_map_bytes;1use serde::{Deserialize, Deserializer, Serialize, Serializer};23use super::named_serde::ExprRegistry;4use super::*;56impl Serialize for SpecialEq<Arc<dyn AnonymousColumnsUdf>> {7fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>8where9S: Serializer,10{11use serde::ser::Error;12let mut buf = vec![];13self.as_ref()14.try_serialize(&mut buf)15.map_err(|e| S::Error::custom(format!("{e}")))?;16serializer.serialize_bytes(&buf)17}18}1920const NAMED_SERDE_MAGIC_BYTE_MARK: &[u8] = "PLNAMEDFN".as_bytes();21const NAMED_SERDE_MAGIC_BYTE_END: u8 = b'!';2223fn serialize_named<S: Serializer>(24serializer: S,25name: &str,26payload: Option<&[u8]>,27) -> Result<S::Ok, S::Error> {28let mut buf = vec![];29buf.extend_from_slice(NAMED_SERDE_MAGIC_BYTE_MARK);30buf.extend_from_slice(name.as_bytes());31buf.push(NAMED_SERDE_MAGIC_BYTE_END);32if let Some(payload) = payload {33buf.extend_from_slice(payload);34}35serializer.serialize_bytes(&buf)36}3738fn deserialize_named_registry(buf: &[u8]) -> PolarsResult<(Arc<dyn ExprRegistry>, &str, &[u8])> {39let bytes = &buf[NAMED_SERDE_MAGIC_BYTE_MARK.len()..];40let Some(pos) = bytes.iter().position(|b| *b == NAMED_SERDE_MAGIC_BYTE_END) else {41polars_bail!(ComputeError: "named-serde expected magic byte end")42};4344let Ok(name) = std::str::from_utf8(&bytes[..pos]) else {45polars_bail!(ComputeError: "named-serde name should be valid utf8")46};47let payload = &bytes[pos + 1..];4849let registry = named_serde::NAMED_SERDE_REGISTRY_EXPR.read().unwrap();50match &*registry {51Some(reg) => Ok((reg.clone(), name, payload)),52None => polars_bail!(ComputeError: "named serde registry not set"),53}54}5556impl<T: Serialize + Clone> Serialize for LazySerde<T> {57fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>58where59S: Serializer,60{61match self {62Self::Named {63name,64payload,65value: _,66} => serialize_named(serializer, name, 
payload.as_deref()),67Self::Deserialized(t) => t.serialize(serializer),68Self::Bytes(b) => b.serialize(serializer),69}70}71}7273impl<'a, T: Deserialize<'a> + Clone> Deserialize<'a> for LazySerde<T> {74fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>75where76D: Deserializer<'a>,77{78let buf = bytes::Bytes::deserialize(deserializer)?;79Ok(Self::Bytes(buf))80}81}8283#[cfg(feature = "dsl-schema")]84impl<T: schemars::JsonSchema + Clone> schemars::JsonSchema for LazySerde<T> {85fn schema_name() -> String {86T::schema_name()87}8889fn schema_id() -> std::borrow::Cow<'static, str> {90T::schema_id()91}9293fn json_schema(generator: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {94Vec::<u8>::json_schema(generator)95}96}9798pub(super) fn deserialize_column_udf(buf: &[u8]) -> PolarsResult<Arc<dyn AnonymousColumnsUdf>> {99#[cfg(feature = "python")]100if buf.starts_with(crate::dsl::python_dsl::PYTHON_SERDE_MAGIC_BYTE_MARK) {101return crate::dsl::python_dsl::PythonUdfExpression::try_deserialize(buf);102};103104if buf.starts_with(NAMED_SERDE_MAGIC_BYTE_MARK) {105let (reg, name, payload) = deserialize_named_registry(buf)?;106107if let Some(func) = reg.get_function(name, payload) {108Ok(func)109} else {110let msg = "name not found in named serde registry";111polars_bail!(ComputeError: msg)112}113} else {114polars_bail!(ComputeError: "deserialization not supported for this 'opaque' function")115}116}117// impl<T: Deserialize> Deserialize for crate::dsl::expr::LazySerde<T> {118impl<'a> Deserialize<'a> for SpecialEq<Arc<dyn AnonymousColumnsUdf>> {119fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>120where121D: Deserializer<'a>,122{123use serde::de::Error;124deserialize_map_bytes(deserializer, |buf| {125deserialize_column_udf(&buf)126.map_err(|e| D::Error::custom(format!("{e}")))127.map(SpecialEq::new)128})?129}130}131132#[cfg(feature = "dsl-schema")]133impl schemars::JsonSchema for SpecialEq<Arc<dyn AnonymousColumnsUdf>> {134fn schema_name() 
-> String {135"AnonymousColumnsUdf".to_owned()136}137138fn schema_id() -> std::borrow::Cow<'static, str> {139std::borrow::Cow::Borrowed(concat!(module_path!(), "::", "AnonymousColumnsUdf"))140}141142fn json_schema(generator: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {143Vec::<u8>::json_schema(generator)144}145}146147impl Serialize for SpecialEq<Series> {148fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>149where150S: Serializer,151{152let s: &Series = self;153s.serialize(serializer)154}155}156157impl<'a> Deserialize<'a> for SpecialEq<Series> {158fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>159where160D: Deserializer<'a>,161{162let t = Series::deserialize(deserializer)?;163Ok(SpecialEq::new(t))164}165}166167#[cfg(feature = "dsl-schema")]168impl schemars::JsonSchema for SpecialEq<Series> {169fn schema_name() -> String {170Series::schema_name()171}172173fn schema_id() -> std::borrow::Cow<'static, str> {174Series::schema_id()175}176177fn json_schema(generator: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {178Series::json_schema(generator)179}180}181182impl<T: Serialize> Serialize for SpecialEq<Arc<T>> {183fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>184where185S: Serializer,186{187self.as_ref().serialize(serializer)188}189}190191#[cfg(feature = "serde")]192impl<'a, T: Deserialize<'a>> Deserialize<'a> for SpecialEq<Arc<T>> {193fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>194where195D: Deserializer<'a>,196{197let t = T::deserialize(deserializer)?;198Ok(SpecialEq::new(Arc::new(t)))199}200}201202#[cfg(feature = "dsl-schema")]203impl<T: schemars::JsonSchema> schemars::JsonSchema for SpecialEq<Arc<T>> {204fn schema_name() -> String {205T::schema_name()206}207208fn schema_id() -> std::borrow::Cow<'static, str> {209T::schema_id()210}211212fn json_schema(generator: &mut schemars::r#gen::SchemaGenerator) -> 
schemars::schema::Schema {213T::json_schema(generator)214}215}216217218