Path: blob/main/crates/polars-io/src/parquet/write/key_value_metadata.rs
6940 views
use std::fmt::Debug;1use std::hash::Hash;2use std::sync::Arc;34use polars_error::PolarsResult;5use polars_parquet::write::KeyValue;6#[cfg(feature = "python")]7use polars_utils::python_function::PythonObject;8#[cfg(feature = "python")]9use pyo3::PyObject;10#[cfg(feature = "serde")]11use serde::{Deserialize, Serialize, de, ser};1213/// Context that can be used to construct custom file-level key value metadata for a Parquet file.14pub struct ParquetMetadataContext<'a> {15pub arrow_schema: &'a str,16}1718/// Key/value pairs that can be attached to a Parquet file as file-level metadtaa.19#[derive(Clone, Debug, PartialEq, Eq, Hash)]20#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]21#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]22pub enum KeyValueMetadata {23/// Static key value metadata.24Static(25#[cfg_attr(26feature = "serde",27serde(28serialize_with = "serialize_vec_key_value",29deserialize_with = "deserialize_vec_key_value"30)31)]32#[cfg_attr(33feature = "dsl-schema",34schemars(with = "Vec<(String, Option<String>)>")35)]36Vec<KeyValue>,37),38/// Rust function to dynamically compute key value metadata.39#[cfg_attr(feature = "dsl-schema", schemars(skip))]40DynamicRust(RustKeyValueMetadataFunction),41/// Python function to dynamically compute key value metadata.42#[cfg(feature = "python")]43DynamicPython(python_impl::PythonKeyValueMetadataFunction),44}4546#[cfg(feature = "serde")]47fn serialize_vec_key_value<S>(kv: &[KeyValue], serializer: S) -> Result<S::Ok, S::Error>48where49S: ser::Serializer,50{51kv.iter()52.map(|item| (&item.key, item.value.as_ref()))53.collect::<Vec<_>>()54.serialize(serializer)55}5657#[cfg(feature = "serde")]58fn deserialize_vec_key_value<'de, D>(deserializer: D) -> Result<Vec<KeyValue>, D::Error>59where60D: de::Deserializer<'de>,61{62let data = Vec::<(String, Option<String>)>::deserialize(deserializer)?;63let result = data64.into_iter()65.map(|(key, value)| KeyValue { key, value })66.collect::<Vec<_>>();67Ok(result)68}6970impl KeyValueMetadata {71/// Create a key value metadata object from a static key value mapping.72pub fn from_static(kv: Vec<(String, String)>) -> Self {73Self::Static(74kv.into_iter()75.map(|(key, value)| KeyValue {76key,77value: Some(value),78})79.collect(),80)81}8283/// Create a key value metadata object from a Python function.84#[cfg(feature = "python")]85pub fn from_py_function(py_object: PyObject) -> Self {86Self::DynamicPython(python_impl::PythonKeyValueMetadataFunction(Arc::new(87PythonObject(py_object),88)))89}9091/// Turn the metadata into the key/value pairs to write to the Parquet file.92/// The context is used to dynamically construct key/value pairs.93pub fn collect(&self, ctx: ParquetMetadataContext) -> PolarsResult<Vec<KeyValue>> {94match self {95Self::Static(kv) => Ok(kv.clone()),96Self::DynamicRust(func) => Ok(func.0(ctx)),97#[cfg(feature = "python")]98Self::DynamicPython(py_func) => py_func.call(ctx),99}100}101}102103#[derive(Clone)]104pub struct RustKeyValueMetadataFunction(105Arc<dyn Fn(ParquetMetadataContext) -> Vec<KeyValue> + Send + Sync>,106);107108impl Debug for RustKeyValueMetadataFunction {109fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {110write!(111f,112"key value metadata function at 0x{:016x}",113self.0.as_ref() as *const _ as *const () as usize114)115}116}117118impl Eq for RustKeyValueMetadataFunction {}119120impl PartialEq for RustKeyValueMetadataFunction {121fn eq(&self, other: &Self) -> bool {122Arc::ptr_eq(&self.0, &other.0)123}124}125126impl Hash for RustKeyValueMetadataFunction {127fn hash<H: std::hash::Hasher>(&self, state: &mut H) {128state.write_usize(Arc::as_ptr(&self.0) as *const () as usize);129}130}131132#[cfg(feature = "serde")]133impl Serialize for RustKeyValueMetadataFunction {134fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>135where136S: serde::Serializer,137{138use serde::ser::Error;139Err(S::Error::custom(format!("cannot serialize {self:?}")))140}141}142143#[cfg(feature = "serde")]144impl<'de> Deserialize<'de> for RustKeyValueMetadataFunction {145fn deserialize<D>(_deserializer: D) -> Result<Self, D::Error>146where147D: serde::Deserializer<'de>,148{149use serde::de::Error;150Err(D::Error::custom(151"cannot deserialize RustKeyValueMetadataFn",152))153}154}155156#[cfg(feature = "python")]157mod python_impl {158use std::hash::Hash;159use std::sync::Arc;160161use polars_error::{PolarsResult, to_compute_err};162use polars_parquet::write::KeyValue;163use polars_utils::python_function::PythonObject;164use pyo3::types::PyAnyMethods;165use pyo3::{PyResult, Python, pyclass};166use serde::{Deserialize, Serialize};167168use super::ParquetMetadataContext;169170#[derive(Clone, Debug, PartialEq, Eq)]171#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]172#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]173pub struct PythonKeyValueMetadataFunction(174#[cfg(feature = "python")]175#[cfg_attr(176feature = "serde",177serde(178serialize_with = "PythonObject::serialize_with_pyversion",179deserialize_with = "PythonObject::deserialize_with_pyversion"180)181)]182#[cfg_attr(feature = "dsl-schema", schemars(with = "Vec<u8>"))]183pub Arc<polars_utils::python_function::PythonFunction>,184);185186impl PythonKeyValueMetadataFunction {187pub fn call(&self, ctx: ParquetMetadataContext) -> PolarsResult<Vec<KeyValue>> {188let ctx = PythonParquetMetadataContext::from_key_value_metadata_context(ctx);189Python::with_gil(|py| {190let args = (ctx,);191let out: Vec<(String, String)> =192self.0.call1(py, args)?.into_bound(py).extract()?;193let result = out194.into_iter()195.map(|item| KeyValue {196key: item.0,197value: Some(item.1),198})199.collect::<Vec<_>>();200PyResult::Ok(result)201})202.map_err(to_compute_err)203}204}205206impl Hash for PythonKeyValueMetadataFunction {207fn hash<H: std::hash::Hasher>(&self, state: &mut H) {208state.write_usize(Arc::as_ptr(&self.0) as *const () as usize);209}210}211212#[pyclass]213pub struct PythonParquetMetadataContext {214#[pyo3(get)]215arrow_schema: String,216}217218impl PythonParquetMetadataContext {219pub fn from_key_value_metadata_context(ctx: ParquetMetadataContext) -> Self {220Self {221arrow_schema: ctx.arrow_schema.to_string(),222}223}224}225}226227228