// Path: blob/main/crates/polars-python/src/catalog/unity.rs
// 7889 views
use std::str::FromStr;12use polars::prelude::{CloudScheme, LazyFrame, PlHashMap, PlSmallStr, Schema};3use polars_io::catalog::unity::client::{CatalogClient, CatalogClientBuilder};4use polars_io::catalog::unity::models::{5CatalogInfo, ColumnInfo, DataSourceFormat, NamespaceInfo, TableInfo, TableType,6};7use polars_io::catalog::unity::schema::parse_type_json_str;8use polars_io::cloud::credential_provider::PlCredentialProvider;9use polars_io::pl_async;10use pyo3::exceptions::PyValueError;11use pyo3::sync::PyOnceLock;12use pyo3::types::{PyAnyMethods, PyDict, PyList, PyNone, PyTuple};13use pyo3::{Bound, IntoPyObject, Py, PyAny, PyResult, Python, pyclass, pymethods};1415use crate::lazyframe::PyLazyFrame;16use crate::prelude::{Wrap, parse_cloud_options};17use crate::utils::{EnterPolarsExt, to_py_err};1819macro_rules! pydict_insert_keys {20($dict:expr, {$a:expr}) => {21$dict.set_item(stringify!($a), $a)?;22};2324($dict:expr, {$a:expr, $($args:expr),+}) => {25pydict_insert_keys!($dict, { $a });26pydict_insert_keys!($dict, { $($args),+ });27};2829($dict:expr, {$a:expr, $($args:expr),+,}) => {30pydict_insert_keys!($dict, {$a, $($args),+});31};32}3334// Result dataclasses. 
These are initialized from Python by calling [`PyCatalogClient::init_classes`].3536static CATALOG_INFO_CLS: PyOnceLock<Py<PyAny>> = PyOnceLock::new();37static NAMESPACE_INFO_CLS: PyOnceLock<Py<PyAny>> = PyOnceLock::new();38static TABLE_INFO_CLS: PyOnceLock<Py<PyAny>> = PyOnceLock::new();39static COLUMN_INFO_CLS: PyOnceLock<Py<PyAny>> = PyOnceLock::new();4041#[pyclass(frozen)]42pub struct PyCatalogClient(CatalogClient);4344#[pymethods]45impl PyCatalogClient {46#[pyo3(signature = (workspace_url, bearer_token))]47#[staticmethod]48pub fn new(workspace_url: String, bearer_token: Option<String>) -> PyResult<Self> {49let builder = CatalogClientBuilder::new().with_workspace_url(workspace_url);5051let builder = if let Some(bearer_token) = bearer_token {52builder.with_bearer_token(bearer_token)53} else {54builder55};5657builder.build().map(PyCatalogClient).map_err(to_py_err)58}5960pub fn list_catalogs(&self, py: Python) -> PyResult<Py<PyAny>> {61let v = py.enter_polars(|| {62pl_async::get_runtime().block_in_place_on(self.client().list_catalogs())63})?;6465let mut opt_err = None;6667let out = PyList::new(68py,69v.into_iter().map(|x| {70let v = catalog_info_to_pyobject(py, x);71if let Ok(v) = v {72Some(v)73} else {74opt_err.replace(v);75None76}77}),78)?;7980opt_err.transpose()?;8182Ok(out.into())83}8485#[pyo3(signature = (catalog_name))]86pub fn list_namespaces(&self, py: Python<'_>, catalog_name: &str) -> PyResult<Py<PyAny>> {87let v = py.enter_polars(|| {88pl_async::get_runtime().block_in_place_on(self.client().list_namespaces(catalog_name))89})?;9091let mut opt_err = None;9293let out = PyList::new(94py,95v.into_iter().map(|x| {96let v = namespace_info_to_pyobject(py, x);97match v {98Ok(v) => Some(v),99Err(_) => {100opt_err.replace(v);101None102},103}104}),105)?;106107opt_err.transpose()?;108109Ok(out.into())110}111112#[pyo3(signature = (catalog_name, namespace))]113pub fn list_tables(114&self,115py: Python<'_>,116catalog_name: &str,117namespace: &str,118) -> 
PyResult<Py<PyAny>> {119let v = py.enter_polars(|| {120pl_async::get_runtime()121.block_in_place_on(self.client().list_tables(catalog_name, namespace))122})?;123124let mut opt_err = None;125126let out = PyList::new(127py,128v.into_iter().map(|table_info| {129let v = table_info_to_pyobject(py, table_info);130131if let Ok(v) = v {132Some(v)133} else {134opt_err.replace(v);135None136}137}),138)?139.into();140141opt_err.transpose()?;142143Ok(out)144}145146#[pyo3(signature = (table_name, catalog_name, namespace))]147pub fn get_table_info(148&self,149py: Python<'_>,150table_name: &str,151catalog_name: &str,152namespace: &str,153) -> PyResult<Py<PyAny>> {154let table_info = py155.enter_polars(|| {156pl_async::get_runtime().block_in_place_on(self.client().get_table_info(157table_name,158catalog_name,159namespace,160))161})162.map_err(to_py_err)?;163164table_info_to_pyobject(py, table_info).map(|x| x.into())165}166167#[pyo3(signature = (table_id, write))]168pub fn get_table_credentials(169&self,170py: Python<'_>,171table_id: &str,172write: bool,173) -> PyResult<Py<PyAny>> {174let table_credentials = py175.enter_polars(|| {176pl_async::get_runtime()177.block_in_place_on(self.client().get_table_credentials(table_id, write))178})179.map_err(to_py_err)?;180181let expiry = table_credentials.expiration_time;182183let credentials = PyDict::new(py);184// Keys in here are intended to be injected into `storage_options` from the Python side.185// Note this currently really only exists for `aws_endpoint_url`.186let storage_update_options = PyDict::new(py);187188{189use TableCredentialsVariants::*;190use polars_io::catalog::unity::models::{191TableCredentialsAws, TableCredentialsAzure, TableCredentialsGcp,192TableCredentialsVariants,193};194195match table_credentials.into_enum() {196Some(Aws(TableCredentialsAws {197access_key_id,198secret_access_key,199session_token,200access_point,201})) => {202credentials.set_item("aws_access_key_id", 
access_key_id)?;203credentials.set_item("aws_secret_access_key", secret_access_key)?;204205if let Some(session_token) = session_token {206credentials.set_item("aws_session_token", session_token)?;207}208209if let Some(access_point) = access_point {210storage_update_options.set_item("aws_endpoint_url", access_point)?;211}212},213Some(Azure(TableCredentialsAzure { sas_token })) => {214credentials.set_item("sas_token", sas_token)?;215},216Some(Gcp(TableCredentialsGcp { oauth_token })) => {217credentials.set_item("bearer_token", oauth_token)?;218},219None => {},220}221}222223let credentials = if credentials.len()? > 0 {224credentials.into_any()225} else {226PyNone::get(py).as_any().clone()227};228let storage_update_options = storage_update_options.into_any();229let expiry = expiry.into_pyobject(py)?.into_any();230231Ok(PyTuple::new(py, [credentials, storage_update_options, expiry])?.into())232}233234#[pyo3(signature = (catalog_name, namespace, table_name, cloud_options, credential_provider, retries))]235pub fn scan_table(236&self,237py: Python<'_>,238catalog_name: &str,239namespace: &str,240table_name: &str,241cloud_options: Option<Vec<(String, String)>>,242credential_provider: Option<Py<PyAny>>,243retries: usize,244) -> PyResult<PyLazyFrame> {245let table_info = py.enter_polars(|| {246pl_async::get_runtime().block_in_place_on(self.client().get_table_info(247catalog_name,248namespace,249table_name,250))251})?;252253let Some(storage_location) = table_info.storage_location.as_deref() else {254return Err(PyValueError::new_err(255"cannot scan catalog table: no storage_location found",256));257};258259let cloud_options = parse_cloud_options(260CloudScheme::from_uri(storage_location),261cloud_options.unwrap_or_default(),262)?263.with_max_retries(retries)264.with_credential_provider(265credential_provider.map(PlCredentialProvider::from_python_builder),266);267268Ok(269LazyFrame::scan_catalog_table(&table_info, 
Some(cloud_options))270.map_err(to_py_err)?271.into(),272)273}274275#[pyo3(signature = (catalog_name, comment, storage_root))]276pub fn create_catalog(277&self,278py: Python<'_>,279catalog_name: &str,280comment: Option<&str>,281storage_root: Option<&str>,282) -> PyResult<Py<PyAny>> {283let catalog_info = py284.detach(|| {285pl_async::get_runtime().block_in_place_on(self.client().create_catalog(286catalog_name,287comment,288storage_root,289))290})291.map_err(to_py_err)?;292293catalog_info_to_pyobject(py, catalog_info).map(|x| x.into())294}295296#[pyo3(signature = (catalog_name, force))]297pub fn delete_catalog(&self, py: Python<'_>, catalog_name: &str, force: bool) -> PyResult<()> {298py.detach(|| {299pl_async::get_runtime()300.block_in_place_on(self.client().delete_catalog(catalog_name, force))301})302.map_err(to_py_err)303}304305#[pyo3(signature = (catalog_name, namespace, comment, storage_root))]306pub fn create_namespace(307&self,308py: Python<'_>,309catalog_name: &str,310namespace: &str,311comment: Option<&str>,312storage_root: Option<&str>,313) -> PyResult<Py<PyAny>> {314let namespace_info = py315.detach(|| {316pl_async::get_runtime().block_in_place_on(self.client().create_namespace(317catalog_name,318namespace,319comment,320storage_root,321))322})323.map_err(to_py_err)?;324325namespace_info_to_pyobject(py, namespace_info).map(|x| x.into())326}327328#[pyo3(signature = (catalog_name, namespace, force))]329pub fn delete_namespace(330&self,331py: Python<'_>,332catalog_name: &str,333namespace: &str,334force: bool,335) -> PyResult<()> {336py.detach(|| {337pl_async::get_runtime().block_in_place_on(self.client().delete_namespace(338catalog_name,339namespace,340force,341))342})343.map_err(to_py_err)344}345346#[pyo3(signature = (347catalog_name, namespace, table_name, schema, table_type, data_source_format, comment,348storage_root, properties349))]350pub fn create_table(351&self,352py: Python<'_>,353catalog_name: &str,354namespace: &str,355table_name: &str,356schema: 
Option<Wrap<Schema>>,357table_type: &str,358data_source_format: Option<&str>,359comment: Option<&str>,360storage_root: Option<&str>,361properties: Vec<(String, String)>,362) -> PyResult<Py<PyAny>> {363let table_info = py.detach(|| {364pl_async::get_runtime()365.block_in_place_on(366self.client().create_table(367catalog_name,368namespace,369table_name,370schema.as_ref().map(|x| &x.0),371&TableType::from_str(table_type)372.map_err(|e| PyValueError::new_err(e.to_string()))?,373data_source_format374.map(DataSourceFormat::from_str)375.transpose()376.map_err(|e| PyValueError::new_err(e.to_string()))?377.as_ref(),378comment,379storage_root,380&mut properties.iter().map(|(a, b)| (a.as_str(), b.as_str())),381),382)383.map_err(to_py_err)384})?;385386table_info_to_pyobject(py, table_info).map(|x| x.into())387}388389#[pyo3(signature = (catalog_name, namespace, table_name))]390pub fn delete_table(391&self,392py: Python<'_>,393catalog_name: &str,394namespace: &str,395table_name: &str,396) -> PyResult<()> {397py.detach(|| {398pl_async::get_runtime().block_in_place_on(self.client().delete_table(399catalog_name,400namespace,401table_name,402))403})404.map_err(to_py_err)405}406407#[pyo3(signature = (type_json))]408#[staticmethod]409pub fn type_json_to_polars_type(py: Python<'_>, type_json: &str) -> PyResult<Py<PyAny>> {410Ok(Wrap(parse_type_json_str(type_json).map_err(to_py_err)?)411.into_pyobject(py)?412.unbind())413}414415#[pyo3(signature = (catalog_info_cls, namespace_info_cls, table_info_cls, column_info_cls))]416#[staticmethod]417pub fn init_classes(418py: Python<'_>,419catalog_info_cls: Py<PyAny>,420namespace_info_cls: Py<PyAny>,421table_info_cls: Py<PyAny>,422column_info_cls: Py<PyAny>,423) {424CATALOG_INFO_CLS.get_or_init(py, || catalog_info_cls);425NAMESPACE_INFO_CLS.get_or_init(py, || namespace_info_cls);426TABLE_INFO_CLS.get_or_init(py, || table_info_cls);427COLUMN_INFO_CLS.get_or_init(py, || column_info_cls);428}429}430431impl PyCatalogClient {432fn client(&self) -> 
&CatalogClient {433&self.0434}435}436437fn catalog_info_to_pyobject(438py: Python<'_>,439CatalogInfo {440name,441comment,442storage_location,443properties,444options,445created_at,446created_by,447updated_at,448updated_by,449}: CatalogInfo,450) -> PyResult<Bound<'_, PyAny>> {451let dict = PyDict::new(py);452453let properties = properties_to_pyobject(py, properties);454let options = properties_to_pyobject(py, options);455456pydict_insert_keys!(dict, {457name,458comment,459storage_location,460properties,461options,462created_at,463created_by,464updated_at,465updated_by466});467468CATALOG_INFO_CLS469.get(py)470.unwrap()471.bind(py)472.call((), Some(&dict))473}474475fn namespace_info_to_pyobject(476py: Python<'_>,477NamespaceInfo {478name,479comment,480properties,481storage_location,482created_at,483created_by,484updated_at,485updated_by,486}: NamespaceInfo,487) -> PyResult<Bound<'_, PyAny>> {488let dict = PyDict::new(py);489490let properties = properties_to_pyobject(py, properties);491492pydict_insert_keys!(dict, {493name,494comment,495properties,496storage_location,497created_at,498created_by,499updated_at,500updated_by501});502503NAMESPACE_INFO_CLS504.get(py)505.unwrap()506.bind(py)507.call((), Some(&dict))508}509510fn table_info_to_pyobject(py: Python<'_>, table_info: TableInfo) -> PyResult<Bound<'_, PyAny>> {511let TableInfo {512name,513table_id,514table_type,515comment,516storage_location,517data_source_format,518columns,519properties,520created_at,521created_by,522updated_at,523updated_by,524} = table_info;525526let column_info_cls = COLUMN_INFO_CLS.get(py).unwrap().bind(py);527528let columns = columns529.map(|columns| {530columns531.into_iter()532.map(533|ColumnInfo {534name,535type_name,536type_text,537type_json,538position,539comment,540partition_index,541}| {542let dict = PyDict::new(py);543544let name = name.as_str();545let type_name = type_name.as_str();546let type_text = type_text.as_str();547548pydict_insert_keys!(dict, 
{549name,550type_name,551type_text,552type_json,553position,554comment,555partition_index,556});557558column_info_cls.call((), Some(&dict))559},560)561.collect::<PyResult<Vec<_>>>()562})563.transpose()?;564565let dict = PyDict::new(py);566567let data_source_format = data_source_format.map(|x| x.to_string());568let table_type = table_type.to_string();569let properties = properties_to_pyobject(py, properties);570571pydict_insert_keys!(dict, {572name,573comment,574table_id,575table_type,576storage_location,577data_source_format,578columns,579properties,580created_at,581created_by,582updated_at,583updated_by,584});585586TABLE_INFO_CLS587.get(py)588.unwrap()589.bind(py)590.call((), Some(&dict))591}592593fn properties_to_pyobject(594py: Python<'_>,595properties: PlHashMap<PlSmallStr, String>,596) -> Bound<'_, PyDict> {597let dict = PyDict::new(py);598599for (key, value) in properties.into_iter() {600dict.set_item(key.as_str(), value).unwrap();601}602603dict604}605606607