// Path: blob/main/crates/polars-io/src/cloud/credential_provider.rs
// 8427 views
use std::fmt::Debug;1use std::future::Future;2use std::hash::Hash;3use std::pin::Pin;4use std::sync::Arc;5use std::time::{SystemTime, UNIX_EPOCH};67use async_trait::async_trait;8#[cfg(feature = "aws")]9pub use object_store::aws::AwsCredential;10#[cfg(feature = "azure")]11pub use object_store::azure::AzureCredential;12#[cfg(feature = "gcp")]13pub use object_store::gcp::GcpCredential;14use polars_core::config;15use polars_error::{PolarsResult, polars_bail};16use polars_utils::pl_str::PlSmallStr;17#[cfg(feature = "python")]18use polars_utils::python_function::PythonObject;19#[cfg(feature = "python")]20use python_impl::PythonCredentialProvider;2122#[derive(Clone, Debug, PartialEq, Hash, Eq)]23pub enum PlCredentialProvider {24/// Prefer using [`PlCredentialProvider::from_func`] instead of constructing this directly25Function(CredentialProviderFunction),26#[cfg(feature = "python")]27Python(PythonCredentialProvider),28}2930impl PlCredentialProvider {31/// Accepts a function that returns (credential, expiry time as seconds since UNIX_EPOCH)32///33/// This functionality is unstable.34pub fn from_func(35// Internal notes36// * This function is exposed as the Rust API for `PlCredentialProvider`37func: impl Fn() -> Pin<38Box<dyn Future<Output = PolarsResult<(ObjectStoreCredential, u64)>> + Send + Sync>,39> + Send40+ Sync41+ 'static,42) -> Self {43Self::Function(CredentialProviderFunction(Arc::new(func)))44}4546/// Intended to be called with an internal `CredentialProviderBuilder` from47/// py-polars.48#[cfg(feature = "python")]49pub fn from_python_builder(func: pyo3::Py<pyo3::PyAny>) -> Self {50Self::Python(python_impl::PythonCredentialProvider::Builder(Arc::new(51PythonObject(func),52)))53}5455pub(super) fn func_addr(&self) -> usize {56match self {57Self::Function(CredentialProviderFunction(v)) => Arc::as_ptr(v) as *const () as usize,58#[cfg(feature = "python")]59Self::Python(v) => v.func_addr(),60}61}6263/// Python passes a `CredentialProviderBuilder`, this calls the builder 
to build the final64/// credential provider.65///66/// This returns `Option` as the auto-initialization case is fallible and falls back to None.67pub(crate) fn try_into_initialized(68self,69clear_cached_credentials: bool,70) -> PolarsResult<Option<Self>> {71match self {72Self::Function(_) => Ok(Some(self)),73#[cfg(feature = "python")]74Self::Python(v) => Ok(v75.try_into_initialized(clear_cached_credentials)?76.map(Self::Python)),77}78}79}8081pub enum ObjectStoreCredential {82#[cfg(feature = "aws")]83Aws(Arc<object_store::aws::AwsCredential>),84#[cfg(feature = "azure")]85Azure(Arc<object_store::azure::AzureCredential>),86#[cfg(feature = "gcp")]87Gcp(Arc<object_store::gcp::GcpCredential>),88/// For testing purposes89None,90}9192impl ObjectStoreCredential {93fn variant_name(&self) -> &'static str {94match self {95#[cfg(feature = "aws")]96Self::Aws(_) => "Aws",97#[cfg(feature = "azure")]98Self::Azure(_) => "Azure",99#[cfg(feature = "gcp")]100Self::Gcp(_) => "Gcp",101Self::None => "None",102}103}104105fn panic_type_mismatch(&self, expected: &str) {106panic!(107"impl error: credential type mismatch: expected {}, got {} instead",108expected,109self.variant_name()110)111}112113#[cfg(feature = "aws")]114fn unwrap_aws(self) -> Arc<object_store::aws::AwsCredential> {115let Self::Aws(v) = self else {116self.panic_type_mismatch("aws");117unreachable!()118};119v120}121122#[cfg(feature = "azure")]123fn unwrap_azure(self) -> Arc<object_store::azure::AzureCredential> {124let Self::Azure(v) = self else {125self.panic_type_mismatch("azure");126unreachable!()127};128v129}130131#[cfg(feature = "gcp")]132fn unwrap_gcp(self) -> Arc<object_store::gcp::GcpCredential> {133let Self::Gcp(v) = self else {134self.panic_type_mismatch("gcp");135unreachable!()136};137v138}139}140141pub trait IntoCredentialProvider: Sized {142#[cfg(feature = "aws")]143fn into_aws_provider(self) -> object_store::aws::AwsCredentialProvider {144unimplemented!()145}146147#[cfg(feature = "azure")]148fn 
into_azure_provider(self) -> object_store::azure::AzureCredentialProvider {149unimplemented!()150}151152#[cfg(feature = "gcp")]153fn into_gcp_provider(self) -> object_store::gcp::GcpCredentialProvider {154unimplemented!()155}156157/// Note, technically shouldn't be under the `IntoCredentialProvider` trait, but it's here158/// for convenience.159fn storage_update_options(&self) -> PolarsResult<Vec<(PlSmallStr, PlSmallStr)>>;160}161162impl IntoCredentialProvider for PlCredentialProvider {163#[cfg(feature = "aws")]164fn into_aws_provider(self) -> object_store::aws::AwsCredentialProvider {165match self {166Self::Function(v) => v.into_aws_provider(),167#[cfg(feature = "python")]168Self::Python(v) => v.into_aws_provider(),169}170}171172#[cfg(feature = "azure")]173fn into_azure_provider(self) -> object_store::azure::AzureCredentialProvider {174match self {175Self::Function(v) => v.into_azure_provider(),176#[cfg(feature = "python")]177Self::Python(v) => v.into_azure_provider(),178}179}180181#[cfg(feature = "gcp")]182fn into_gcp_provider(self) -> object_store::gcp::GcpCredentialProvider {183match self {184Self::Function(v) => v.into_gcp_provider(),185#[cfg(feature = "python")]186Self::Python(v) => v.into_gcp_provider(),187}188}189190fn storage_update_options(&self) -> PolarsResult<Vec<(PlSmallStr, PlSmallStr)>> {191match self {192Self::Function(v) => v.storage_update_options(),193#[cfg(feature = "python")]194Self::Python(v) => v.storage_update_options(),195}196}197}198199type CredentialProviderFunctionImpl = Arc<200dyn Fn() -> Pin<201Box<dyn Future<Output = PolarsResult<(ObjectStoreCredential, u64)>> + Send + Sync>,202> + Send203+ Sync,204>;205206/// Wrapper that implements [`IntoCredentialProvider`], [`Debug`], [`PartialEq`], [`Hash`] etc.207#[derive(Clone)]208pub struct CredentialProviderFunction(CredentialProviderFunctionImpl);209210macro_rules! 
build_to_object_store_err {211($s:expr) => {{212fn to_object_store_err(213e: impl std::error::Error + Send + Sync + 'static,214) -> object_store::Error {215object_store::Error::Generic {216store: $s,217source: Box::new(e),218}219}220221to_object_store_err222}};223}224225impl IntoCredentialProvider for CredentialProviderFunction {226#[cfg(feature = "aws")]227fn into_aws_provider(self) -> object_store::aws::AwsCredentialProvider {228#[derive(Debug)]229struct S(230CredentialProviderFunction,231FetchedCredentialsCache<Arc<object_store::aws::AwsCredential>>,232);233234#[async_trait]235impl object_store::CredentialProvider for S {236type Credential = object_store::aws::AwsCredential;237238async fn get_credential(&self) -> object_store::Result<Arc<Self::Credential>> {239self.1240.get_maybe_update(async {241let (creds, expiry) = self.0.0().await?;242PolarsResult::Ok((creds.unwrap_aws(), expiry))243})244.await245.map_err(build_to_object_store_err!("credential-provider-aws"))246}247}248249Arc::new(S(250self,251FetchedCredentialsCache::new(Arc::new(AwsCredential {252key_id: String::new(),253secret_key: String::new(),254token: None,255})),256))257}258259#[cfg(feature = "azure")]260fn into_azure_provider(self) -> object_store::azure::AzureCredentialProvider {261#[derive(Debug)]262struct S(263CredentialProviderFunction,264FetchedCredentialsCache<Arc<object_store::azure::AzureCredential>>,265);266267#[async_trait]268impl object_store::CredentialProvider for S {269type Credential = object_store::azure::AzureCredential;270271async fn get_credential(&self) -> object_store::Result<Arc<Self::Credential>> {272self.1273.get_maybe_update(async {274let (creds, expiry) = self.0.0().await?;275PolarsResult::Ok((creds.unwrap_azure(), expiry))276})277.await278.map_err(build_to_object_store_err!("credential-provider-azure"))279}280}281282Arc::new(S(283self,284FetchedCredentialsCache::new(Arc::new(AzureCredential::BearerToken(String::new()))),285))286}287288#[cfg(feature = "gcp")]289fn 
into_gcp_provider(self) -> object_store::gcp::GcpCredentialProvider {290#[derive(Debug)]291struct S(292CredentialProviderFunction,293FetchedCredentialsCache<Arc<object_store::gcp::GcpCredential>>,294);295296#[async_trait]297impl object_store::CredentialProvider for S {298type Credential = object_store::gcp::GcpCredential;299300async fn get_credential(&self) -> object_store::Result<Arc<Self::Credential>> {301self.1302.get_maybe_update(async {303let (creds, expiry) = self.0.0().await?;304PolarsResult::Ok((creds.unwrap_gcp(), expiry))305})306.await307.map_err(build_to_object_store_err!("credential-provider-gcp"))308}309}310311Arc::new(S(312self,313FetchedCredentialsCache::new(Arc::new(GcpCredential {314bearer: String::new(),315})),316))317}318319fn storage_update_options(&self) -> PolarsResult<Vec<(PlSmallStr, PlSmallStr)>> {320Ok(vec![])321}322}323324impl Debug for CredentialProviderFunction {325fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {326write!(327f,328"credential provider function at 0x{:016x}",329self.0.as_ref() as *const _ as *const () as usize330)331}332}333334impl Eq for CredentialProviderFunction {}335336impl PartialEq for CredentialProviderFunction {337fn eq(&self, other: &Self) -> bool {338Arc::ptr_eq(&self.0, &other.0)339}340}341342impl Hash for CredentialProviderFunction {343fn hash<H: std::hash::Hasher>(&self, state: &mut H) {344state.write_usize(Arc::as_ptr(&self.0) as *const () as usize)345}346}347348#[cfg(feature = "serde")]349impl<'de> serde::Deserialize<'de> for PlCredentialProvider {350fn deserialize<D>(_deserializer: D) -> Result<Self, D::Error>351where352D: serde::Deserializer<'de>,353{354#[cfg(feature = "python")]355{356Ok(Self::Python(PythonCredentialProvider::deserialize(357_deserializer,358)?))359}360#[cfg(not(feature = "python"))]361{362use serde::de::Error;363Err(D::Error::custom("cannot deserialize PlCredentialProvider"))364}365}366}367368#[cfg(feature = "serde")]369impl serde::Serialize for PlCredentialProvider 
{370fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>371where372S: serde::Serializer,373{374use serde::ser::Error;375376#[cfg(feature = "python")]377if let PlCredentialProvider::Python(v) = self {378return v.serialize(_serializer);379}380381Err(S::Error::custom(format!("cannot serialize {self:?}")))382}383}384385#[cfg(feature = "dsl-schema")]386impl schemars::JsonSchema for PlCredentialProvider {387fn schema_name() -> std::borrow::Cow<'static, str> {388"PlCredentialProvider".into()389}390391fn schema_id() -> std::borrow::Cow<'static, str> {392std::borrow::Cow::Borrowed(concat!(module_path!(), "::", "PlCredentialProvider"))393}394395fn json_schema(generator: &mut schemars::SchemaGenerator) -> schemars::Schema {396Vec::<u8>::json_schema(generator)397}398}399400/// Avoids calling the credential provider function if we have not yet passed the expiry time.401#[derive(Debug)]402struct FetchedCredentialsCache<C>(tokio::sync::Mutex<(C, u64, bool)>);403404impl<C: Clone> FetchedCredentialsCache<C> {405fn new(init_creds: C) -> Self {406Self(tokio::sync::Mutex::new((init_creds, 0, true)))407}408409async fn get_maybe_update(410&self,411// Taking an `impl Future` here allows us to potentially avoid a `Box::pin` allocation from412// a `Fn() -> Pin<Box<dyn Future>>` by having it wrapped in an `async { f() }` block. 
We413// will not poll that block if the credentials have not yet expired.414update_func: impl Future<Output = PolarsResult<(C, u64)>>,415) -> PolarsResult<C> {416let verbose = config::verbose();417418fn expiry_msg(last_fetched_expiry: u64, now: u64) -> String {419if last_fetched_expiry == u64::MAX {420"expiry = (never expires)".into()421} else {422format!(423"expiry = {} (in {} seconds)",424last_fetched_expiry,425last_fetched_expiry.saturating_sub(now)426)427}428}429430let mut inner = self.0.lock().await;431let (last_fetched_credentials, last_fetched_expiry, log_use_cached) = &mut *inner;432433let current_time = SystemTime::now()434.duration_since(UNIX_EPOCH)435.unwrap()436.as_secs();437438if *last_fetched_expiry <= current_time {439if verbose {440eprintln!(441"[FetchedCredentialsCache]: \442Call update_func: current_time = {}, \443last_fetched_expiry = {}",444current_time, *last_fetched_expiry445)446}447448let (credentials, expiry) = update_func.await?;449450*last_fetched_credentials = credentials;451*last_fetched_expiry = expiry;452*log_use_cached = true;453454if expiry < current_time && expiry != 0 {455polars_bail!(456ComputeError:457"credential expiry time {} is older than system time {} \458by {} seconds",459expiry,460current_time,461current_time - expiry462)463}464465if verbose {466eprintln!(467"[FetchedCredentialsCache]: Finish update_func: new {}",468expiry_msg(469*last_fetched_expiry,470SystemTime::now()471.duration_since(UNIX_EPOCH)472.unwrap()473.as_secs()474)475)476}477} else if verbose && *log_use_cached {478*log_use_cached = false;479let now = SystemTime::now()480.duration_since(UNIX_EPOCH)481.unwrap()482.as_secs();483eprintln!(484"[FetchedCredentialsCache]: Using cached credentials: \485current_time = {}, {}",486now,487expiry_msg(*last_fetched_expiry, now)488)489}490491Ok(last_fetched_credentials.clone())492}493}494495#[cfg(feature = "python")]496mod python_impl {497use std::hash::Hash;498use std::sync::Arc;499500use polars_error::{PolarsError, 
PolarsResult};501use polars_utils::pl_str::PlSmallStr;502use polars_utils::python_function::PythonObject;503use pyo3::exceptions::PyValueError;504use pyo3::pybacked::PyBackedStr;505use pyo3::types::{PyAnyMethods, PyDict, PyDictMethods};506use pyo3::{Python, intern};507508use super::IntoCredentialProvider;509510#[derive(Clone, Debug)]511#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]512pub enum PythonCredentialProvider {513#[cfg_attr(514feature = "serde",515serde(516serialize_with = "PythonObject::serialize_with_pyversion",517deserialize_with = "PythonObject::deserialize_with_pyversion"518)519)]520/// Indicates `py_object` is a `CredentialProviderBuilder`.521Builder(Arc<PythonObject>),522#[cfg_attr(523feature = "serde",524serde(525serialize_with = "PythonObject::serialize_with_pyversion",526deserialize_with = "PythonObject::deserialize_with_pyversion"527)528)]529/// Indicates `py_object` is an instantiated credential provider530Provider(Arc<PythonObject>),531}532533impl PythonCredentialProvider {534/// Performs initialization if necessary.535///536/// This exists as a separate step that must be called beforehand. 
This approach is easier537/// as the alternative is to refactor the `IntoCredentialProvider` trait to return538/// `PolarsResult<Option<T>>` for every single function.539pub(super) fn try_into_initialized(540self,541clear_cached_credentials: bool,542) -> PolarsResult<Option<Self>> {543match self {544Self::Builder(py_object) => {545let opt_initialized_py_object = Python::attach(|py| {546let build_fn =547py_object.getattr(py, intern!(py, "build_credential_provider"))?;548549let v = build_fn.call1(py, (clear_cached_credentials,))?;550let v = (!v.is_none(py)).then_some(v);551552pyo3::PyResult::Ok(v)553})?;554555Ok(opt_initialized_py_object556.map(PythonObject)557.map(Arc::new)558.map(Self::Provider))559},560Self::Provider(_) => {561// Note: We don't expect to hit here.562Ok(Some(self))563},564}565}566567fn unwrap_as_provider(self) -> Arc<PythonObject> {568match self {569Self::Builder(_) => panic!(),570Self::Provider(v) => v,571}572}573574pub(crate) fn unwrap_as_provider_ref(&self) -> &Arc<PythonObject> {575match self {576Self::Builder(_) => panic!(),577Self::Provider(v) => v,578}579}580581pub(super) fn func_addr(&self) -> usize {582(match self {583Self::Builder(v) => Arc::as_ptr(v),584Self::Provider(v) => Arc::as_ptr(v),585}) as *const () as usize586}587}588589impl IntoCredentialProvider for PythonCredentialProvider {590#[cfg(feature = "aws")]591fn into_aws_provider(self) -> object_store::aws::AwsCredentialProvider {592use polars_error::PolarsResult;593594use crate::cloud::credential_provider::{595CredentialProviderFunction, ObjectStoreCredential,596};597598let func = self.unwrap_as_provider();599600CredentialProviderFunction(Arc::new(move || {601let func = func.clone();602Box::pin(async move {603let mut credentials = object_store::aws::AwsCredential {604key_id: String::new(),605secret_key: String::new(),606token: None,607};608609let expiry = Python::attach(|py| {610let v = func.0.call0(py)?.into_bound(py);611let (storage_options, expiry) 
=612v.extract::<(pyo3::Bound<'_, PyDict>, Option<u64>)>()?;613614for (k, v) in storage_options.iter() {615let k = k.extract::<PyBackedStr>()?;616let v = v.extract::<Option<String>>()?;617618match k.as_ref() {619"aws_access_key_id" => {620credentials.key_id = v.ok_or_else(|| {621PyValueError::new_err("aws_access_key_id was None")622})?;623},624"aws_secret_access_key" => {625credentials.secret_key = v.ok_or_else(|| {626PyValueError::new_err("aws_secret_access_key was None")627})?628},629"aws_session_token" => credentials.token = v,630v => {631return pyo3::PyResult::Err(PyValueError::new_err(format!(632"unknown configuration key for aws: {}, \633valid configuration keys are: \634{}, {}, {}",635v,636"aws_access_key_id",637"aws_secret_access_key",638"aws_session_token"639)));640},641}642}643644pyo3::PyResult::Ok(expiry.unwrap_or(u64::MAX))645})?;646647if credentials.key_id.is_empty() {648return Err(PolarsError::ComputeError(649"aws_access_key_id was empty or not given".into(),650));651}652653if credentials.secret_key.is_empty() {654return Err(PolarsError::ComputeError(655"aws_secret_access_key was empty or not given".into(),656));657}658659PolarsResult::Ok((ObjectStoreCredential::Aws(Arc::new(credentials)), expiry))660})661}))662.into_aws_provider()663}664665#[cfg(feature = "azure")]666fn into_azure_provider(self) -> object_store::azure::AzureCredentialProvider {667use object_store::azure::AzureAccessKey;668use percent_encoding::percent_decode_str;669use polars_core::config::verbose_print_sensitive;670use polars_error::PolarsResult;671672use crate::cloud::credential_provider::{673CredentialProviderFunction, ObjectStoreCredential,674};675676let func = self.unwrap_as_provider();677678return CredentialProviderFunction(Arc::new(move || {679let func = func.clone();680Box::pin(async move {681let mut credentials = None;682683static VALID_KEYS_MSG: &str =684"valid configuration keys are: ('account_key', 'bearer_token', 'sas_token')";685686let expiry = Python::attach(|py| 
{687let v = func.0.call0(py)?.into_bound(py);688let (storage_options, expiry) =689v.extract::<(pyo3::Bound<'_, PyDict>, Option<u64>)>()?;690691for (k, v) in storage_options.iter() {692let k = k.extract::<PyBackedStr>()?;693let v = v.extract::<String>()?;694695match k.as_ref() {696"account_key" => {697credentials =698Some(object_store::azure::AzureCredential::AccessKey(699AzureAccessKey::try_new(v.as_str()).map_err(|e| {700PyValueError::new_err(e.to_string())701})?,702))703},704"bearer_token" => {705credentials =706Some(object_store::azure::AzureCredential::BearerToken(v))707},708"sas_token" => {709credentials =710Some(object_store::azure::AzureCredential::SASToken(711split_sas(&v).map_err(|err_msg| {712verbose_print_sensitive(|| {713format!("error decoding SAS token: {err_msg} (token: {v})")714});715716PyValueError::new_err(format!(717"error decoding SAS token: {err_msg}. \718Set POLARS_VERBOSE_SENSITIVE=1 to print the value"719))720})?,721))722},723v => {724return pyo3::PyResult::Err(PyValueError::new_err(format!(725"unknown configuration key for azure: {v}, {VALID_KEYS_MSG}"726)));727},728}729}730731pyo3::PyResult::Ok(expiry.unwrap_or(u64::MAX))732})?;733734let Some(credentials) = credentials else {735return Err(PolarsError::ComputeError(736format!(737"did not find a valid configuration key for azure, {VALID_KEYS_MSG}"738)739.into(),740));741};742743PolarsResult::Ok((ObjectStoreCredential::Azure(Arc::new(credentials)), expiry))744})745}))746.into_azure_provider();747748/// Copied and adjusted from object-store.749///750/// https://github.com/apache/arrow-rs-object-store/blob/7a0504b4924fcecee17d768fd7190b8f71b0877f/src/azure/builder.rs#L1072-L1089751fn split_sas(sas: &str) -> Result<Vec<(String, String)>, &'static str> {752let sas = percent_decode_str(sas)753.decode_utf8()754.map_err(|_| "UTF-8 decode error")?;755756let kv_str_pairs = sas757.trim_start_matches('?')758.split('&')759.filter(|s| !s.chars().all(char::is_whitespace));760761let mut pairs = 
Vec::new();762763for kv_pair_str in kv_str_pairs {764let (k, v) = kv_pair_str765.trim()766.split_once('=')767.ok_or("missing SAS component")?;768pairs.push((k.into(), v.into()))769}770771Ok(pairs)772}773}774775#[cfg(feature = "gcp")]776fn into_gcp_provider(self) -> object_store::gcp::GcpCredentialProvider {777use polars_error::PolarsResult;778779use crate::cloud::credential_provider::{780CredentialProviderFunction, ObjectStoreCredential,781};782783let func = self.unwrap_as_provider();784785CredentialProviderFunction(Arc::new(move || {786let func = func.clone();787Box::pin(async move {788let mut credentials = object_store::gcp::GcpCredential {789bearer: String::new(),790};791792let expiry = Python::attach(|py| {793let v = func.0.call0(py)?.into_bound(py);794let (storage_options, expiry) =795v.extract::<(pyo3::Bound<'_, PyDict>, Option<u64>)>()?;796797for (k, v) in storage_options.iter() {798let k = k.extract::<PyBackedStr>()?;799let v = v.extract::<String>()?;800801match k.as_ref() {802"bearer_token" => credentials.bearer = v,803v => {804return pyo3::PyResult::Err(PyValueError::new_err(format!(805"unknown configuration key for gcp: {}, \806valid configuration keys are: {}",807v, "bearer_token",808)));809},810}811}812813pyo3::PyResult::Ok(expiry.unwrap_or(u64::MAX))814})?;815816if credentials.bearer.is_empty() {817return Err(PolarsError::ComputeError(818"bearer was empty or not given".into(),819));820}821822PolarsResult::Ok((ObjectStoreCredential::Gcp(Arc::new(credentials)), expiry))823})824}))825.into_gcp_provider()826}827828/// # Panics829/// Panics if `self` is not an initialized provider.830fn storage_update_options(&self) -> PolarsResult<Vec<(PlSmallStr, PlSmallStr)>> {831let py_object = self.unwrap_as_provider_ref();832833Python::attach(|py| {834py_object835.getattr(py, "_storage_update_options")836.map_or(Ok(vec![]), |f| {837let v = f838.call0(py)?839.extract::<pyo3::Bound<'_, PyDict>>(py)840.map_err(pyo3::PyErr::from)?;841842let mut out = 
Vec::with_capacity(v.len());843844for dict_item in v.call_method0("items")?.try_iter()? {845let (key, value) =846dict_item?.extract::<(PyBackedStr, PyBackedStr)>()?;847848out.push(((&*key).into(), (&*value).into()))849}850851Ok(out)852})853})854}855}856857// Note: We don't consider `is_builder` for hash/eq - we don't expect the same Arc<PythonObject>858// to be referenced as both true and false from the `is_builder` field.859860impl Eq for PythonCredentialProvider {}861862impl PartialEq for PythonCredentialProvider {863fn eq(&self, other: &Self) -> bool {864self.func_addr() == other.func_addr()865}866}867868impl Hash for PythonCredentialProvider {869fn hash<H: std::hash::Hasher>(&self, state: &mut H) {870// # Safety871// * Inner is an `Arc`872// * Visibility is limited to super873// * No code in `mod python_impl` or `super` mutates the Arc inner.874state.write_usize(self.func_addr())875}876}877}878879#[cfg(test)]880mod tests {881#[cfg(feature = "serde")]882#[allow(clippy::redundant_pattern_matching)]883#[test]884fn test_serde() {885use super::*;886887assert!(matches!(888serde_json::to_string(&Some(PlCredentialProvider::from_func(|| {889Box::pin(core::future::ready(PolarsResult::Ok((890ObjectStoreCredential::None,8910,892))))893}))),894Err(_)895));896897assert!(matches!(898serde_json::to_string(&Option::<PlCredentialProvider>::None),899Ok(String { .. })900));901902assert!(matches!(903serde_json::from_str::<Option<PlCredentialProvider>>(904serde_json::to_string(&Option::<PlCredentialProvider>::None)905.unwrap()906.as_str()907),908Ok(None)909));910}911}912913914