// Path: blob/main/crates/polars-python/src/catalog/unity.rs
// 8353 views
use std::str::FromStr;12use polars::prelude::{CloudScheme, LazyFrame, PlHashMap, PlSmallStr, Schema};3use polars_io::catalog::unity::client::{CatalogClient, CatalogClientBuilder};4use polars_io::catalog::unity::models::{5CatalogInfo, ColumnInfo, DataSourceFormat, NamespaceInfo, TableInfo, TableType,6};7use polars_io::catalog::unity::schema::parse_type_json_str;8use polars_io::pl_async;9use pyo3::exceptions::PyValueError;10use pyo3::sync::PyOnceLock;11use pyo3::types::{PyAnyMethods, PyDict, PyList, PyNone, PyTuple};12use pyo3::{Bound, IntoPyObject, Py, PyAny, PyResult, Python, pyclass, pymethods};1314use crate::io::cloud_options::OptPyCloudOptions;15use crate::lazyframe::PyLazyFrame;16use crate::prelude::Wrap;17use crate::utils::{EnterPolarsExt, to_py_err};1819macro_rules! pydict_insert_keys {20($dict:expr, {$a:expr}) => {21$dict.set_item(stringify!($a), $a)?;22};2324($dict:expr, {$a:expr, $($args:expr),+}) => {25pydict_insert_keys!($dict, { $a });26pydict_insert_keys!($dict, { $($args),+ });27};2829($dict:expr, {$a:expr, $($args:expr),+,}) => {30pydict_insert_keys!($dict, {$a, $($args),+});31};32}3334// Result dataclasses. 
These are initialized from Python by calling [`PyCatalogClient::init_classes`].3536static CATALOG_INFO_CLS: PyOnceLock<Py<PyAny>> = PyOnceLock::new();37static NAMESPACE_INFO_CLS: PyOnceLock<Py<PyAny>> = PyOnceLock::new();38static TABLE_INFO_CLS: PyOnceLock<Py<PyAny>> = PyOnceLock::new();39static COLUMN_INFO_CLS: PyOnceLock<Py<PyAny>> = PyOnceLock::new();4041#[pyclass(frozen)]42pub struct PyCatalogClient(CatalogClient);4344#[pymethods]45impl PyCatalogClient {46#[pyo3(signature = (workspace_url, bearer_token))]47#[staticmethod]48pub fn new(workspace_url: String, bearer_token: Option<String>) -> PyResult<Self> {49let builder = CatalogClientBuilder::new().with_workspace_url(workspace_url);5051let builder = if let Some(bearer_token) = bearer_token {52builder.with_bearer_token(bearer_token)53} else {54builder55};5657builder.build().map(PyCatalogClient).map_err(to_py_err)58}5960pub fn list_catalogs(&self, py: Python) -> PyResult<Py<PyAny>> {61let v = py.enter_polars(|| {62pl_async::get_runtime().block_in_place_on(self.client().list_catalogs())63})?;6465let mut opt_err = None;6667let out = PyList::new(68py,69v.into_iter().map(|x| {70let v = catalog_info_to_pyobject(py, x);71if let Ok(v) = v {72Some(v)73} else {74opt_err.replace(v);75None76}77}),78)?;7980opt_err.transpose()?;8182Ok(out.into())83}8485#[pyo3(signature = (catalog_name))]86pub fn list_namespaces(&self, py: Python<'_>, catalog_name: &str) -> PyResult<Py<PyAny>> {87let v = py.enter_polars(|| {88pl_async::get_runtime().block_in_place_on(self.client().list_namespaces(catalog_name))89})?;9091let mut opt_err = None;9293let out = PyList::new(94py,95v.into_iter().map(|x| {96let v = namespace_info_to_pyobject(py, x);97match v {98Ok(v) => Some(v),99Err(_) => {100opt_err.replace(v);101None102},103}104}),105)?;106107opt_err.transpose()?;108109Ok(out.into())110}111112#[pyo3(signature = (catalog_name, namespace))]113pub fn list_tables(114&self,115py: Python<'_>,116catalog_name: &str,117namespace: &str,118) -> 
PyResult<Py<PyAny>> {119let v = py.enter_polars(|| {120pl_async::get_runtime()121.block_in_place_on(self.client().list_tables(catalog_name, namespace))122})?;123124let mut opt_err = None;125126let out = PyList::new(127py,128v.into_iter().map(|table_info| {129let v = table_info_to_pyobject(py, table_info);130131if let Ok(v) = v {132Some(v)133} else {134opt_err.replace(v);135None136}137}),138)?139.into();140141opt_err.transpose()?;142143Ok(out)144}145146#[pyo3(signature = (table_name, catalog_name, namespace))]147pub fn get_table_info(148&self,149py: Python<'_>,150table_name: &str,151catalog_name: &str,152namespace: &str,153) -> PyResult<Py<PyAny>> {154let table_info = py155.enter_polars(|| {156pl_async::get_runtime().block_in_place_on(self.client().get_table_info(157table_name,158catalog_name,159namespace,160))161})162.map_err(to_py_err)?;163164table_info_to_pyobject(py, table_info).map(|x| x.into())165}166167#[pyo3(signature = (table_id, write))]168pub fn get_table_credentials(169&self,170py: Python<'_>,171table_id: &str,172write: bool,173) -> PyResult<Py<PyAny>> {174let table_credentials = py175.enter_polars(|| {176pl_async::get_runtime()177.block_in_place_on(self.client().get_table_credentials(table_id, write))178})179.map_err(to_py_err)?;180181let expiry = table_credentials.expiration_time;182183let credentials = PyDict::new(py);184// Keys in here are intended to be injected into `storage_options` from the Python side.185// Note this currently really only exists for `aws_endpoint_url`.186let storage_update_options = PyDict::new(py);187188{189use TableCredentialsVariants::*;190use polars_io::catalog::unity::models::{191TableCredentialsAws, TableCredentialsAzure, TableCredentialsGcp,192TableCredentialsVariants,193};194195match table_credentials.into_enum() {196Some(Aws(TableCredentialsAws {197access_key_id,198secret_access_key,199session_token,200access_point,201})) => {202credentials.set_item("aws_access_key_id", 
access_key_id)?;203credentials.set_item("aws_secret_access_key", secret_access_key)?;204205if let Some(session_token) = session_token {206credentials.set_item("aws_session_token", session_token)?;207}208209if let Some(access_point) = access_point {210storage_update_options.set_item("aws_endpoint_url", access_point)?;211}212},213Some(Azure(TableCredentialsAzure { sas_token })) => {214credentials.set_item("sas_token", sas_token)?;215},216Some(Gcp(TableCredentialsGcp { oauth_token })) => {217credentials.set_item("bearer_token", oauth_token)?;218},219None => {},220}221}222223let credentials = if credentials.len()? > 0 {224credentials.into_any()225} else {226PyNone::get(py).as_any().clone()227};228let storage_update_options = storage_update_options.into_any();229let expiry = expiry.into_pyobject(py)?.into_any();230231Ok(PyTuple::new(py, [credentials, storage_update_options, expiry])?.into())232}233234#[pyo3(signature = (catalog_name, namespace, table_name, cloud_options, credential_provider))]235pub fn scan_table(236&self,237py: Python<'_>,238catalog_name: &str,239namespace: &str,240table_name: &str,241cloud_options: OptPyCloudOptions,242credential_provider: Option<Py<PyAny>>,243) -> PyResult<PyLazyFrame> {244let table_info = py.enter_polars(|| {245pl_async::get_runtime().block_in_place_on(self.client().get_table_info(246catalog_name,247namespace,248table_name,249))250})?;251252let Some(storage_location) = table_info.storage_location.as_deref() else {253return Err(PyValueError::new_err(254"cannot scan catalog table: no storage_location found",255));256};257258let cloud_options = cloud_options.extract_opt_cloud_options(259CloudScheme::from_path(storage_location),260credential_provider,261)?;262263Ok(LazyFrame::scan_catalog_table(&table_info, cloud_options)264.map_err(to_py_err)?265.into())266}267268#[pyo3(signature = (catalog_name, comment, storage_root))]269pub fn create_catalog(270&self,271py: Python<'_>,272catalog_name: &str,273comment: Option<&str>,274storage_root: 
Option<&str>,275) -> PyResult<Py<PyAny>> {276let catalog_info = py277.detach(|| {278pl_async::get_runtime().block_in_place_on(self.client().create_catalog(279catalog_name,280comment,281storage_root,282))283})284.map_err(to_py_err)?;285286catalog_info_to_pyobject(py, catalog_info).map(|x| x.into())287}288289#[pyo3(signature = (catalog_name, force))]290pub fn delete_catalog(&self, py: Python<'_>, catalog_name: &str, force: bool) -> PyResult<()> {291py.detach(|| {292pl_async::get_runtime()293.block_in_place_on(self.client().delete_catalog(catalog_name, force))294})295.map_err(to_py_err)296}297298#[pyo3(signature = (catalog_name, namespace, comment, storage_root))]299pub fn create_namespace(300&self,301py: Python<'_>,302catalog_name: &str,303namespace: &str,304comment: Option<&str>,305storage_root: Option<&str>,306) -> PyResult<Py<PyAny>> {307let namespace_info = py308.detach(|| {309pl_async::get_runtime().block_in_place_on(self.client().create_namespace(310catalog_name,311namespace,312comment,313storage_root,314))315})316.map_err(to_py_err)?;317318namespace_info_to_pyobject(py, namespace_info).map(|x| x.into())319}320321#[pyo3(signature = (catalog_name, namespace, force))]322pub fn delete_namespace(323&self,324py: Python<'_>,325catalog_name: &str,326namespace: &str,327force: bool,328) -> PyResult<()> {329py.detach(|| {330pl_async::get_runtime().block_in_place_on(self.client().delete_namespace(331catalog_name,332namespace,333force,334))335})336.map_err(to_py_err)337}338339#[pyo3(signature = (340catalog_name, namespace, table_name, schema, table_type, data_source_format, comment,341storage_root, properties342))]343pub fn create_table(344&self,345py: Python<'_>,346catalog_name: &str,347namespace: &str,348table_name: &str,349schema: Option<Wrap<Schema>>,350table_type: &str,351data_source_format: Option<&str>,352comment: Option<&str>,353storage_root: Option<&str>,354properties: Vec<(String, String)>,355) -> PyResult<Py<PyAny>> {356let table_info = py.detach(|| 
{357pl_async::get_runtime()358.block_in_place_on(359self.client().create_table(360catalog_name,361namespace,362table_name,363schema.as_ref().map(|x| &x.0),364&TableType::from_str(table_type)365.map_err(|e| PyValueError::new_err(e.to_string()))?,366data_source_format367.map(DataSourceFormat::from_str)368.transpose()369.map_err(|e| PyValueError::new_err(e.to_string()))?370.as_ref(),371comment,372storage_root,373&mut properties.iter().map(|(a, b)| (a.as_str(), b.as_str())),374),375)376.map_err(to_py_err)377})?;378379table_info_to_pyobject(py, table_info).map(|x| x.into())380}381382#[pyo3(signature = (catalog_name, namespace, table_name))]383pub fn delete_table(384&self,385py: Python<'_>,386catalog_name: &str,387namespace: &str,388table_name: &str,389) -> PyResult<()> {390py.detach(|| {391pl_async::get_runtime().block_in_place_on(self.client().delete_table(392catalog_name,393namespace,394table_name,395))396})397.map_err(to_py_err)398}399400#[pyo3(signature = (type_json))]401#[staticmethod]402pub fn type_json_to_polars_type(py: Python<'_>, type_json: &str) -> PyResult<Py<PyAny>> {403Ok(Wrap(parse_type_json_str(type_json).map_err(to_py_err)?)404.into_pyobject(py)?405.unbind())406}407408#[pyo3(signature = (catalog_info_cls, namespace_info_cls, table_info_cls, column_info_cls))]409#[staticmethod]410pub fn init_classes(411py: Python<'_>,412catalog_info_cls: Py<PyAny>,413namespace_info_cls: Py<PyAny>,414table_info_cls: Py<PyAny>,415column_info_cls: Py<PyAny>,416) {417CATALOG_INFO_CLS.get_or_init(py, || catalog_info_cls);418NAMESPACE_INFO_CLS.get_or_init(py, || namespace_info_cls);419TABLE_INFO_CLS.get_or_init(py, || table_info_cls);420COLUMN_INFO_CLS.get_or_init(py, || column_info_cls);421}422}423424impl PyCatalogClient {425fn client(&self) -> &CatalogClient {426&self.0427}428}429430fn catalog_info_to_pyobject(431py: Python<'_>,432CatalogInfo {433name,434comment,435storage_location,436properties,437options,438created_at,439created_by,440updated_at,441updated_by,442}: 
CatalogInfo,443) -> PyResult<Bound<'_, PyAny>> {444let dict = PyDict::new(py);445446let properties = properties_to_pyobject(py, properties);447let options = properties_to_pyobject(py, options);448449pydict_insert_keys!(dict, {450name,451comment,452storage_location,453properties,454options,455created_at,456created_by,457updated_at,458updated_by459});460461CATALOG_INFO_CLS462.get(py)463.unwrap()464.bind(py)465.call((), Some(&dict))466}467468fn namespace_info_to_pyobject(469py: Python<'_>,470NamespaceInfo {471name,472comment,473properties,474storage_location,475created_at,476created_by,477updated_at,478updated_by,479}: NamespaceInfo,480) -> PyResult<Bound<'_, PyAny>> {481let dict = PyDict::new(py);482483let properties = properties_to_pyobject(py, properties);484485pydict_insert_keys!(dict, {486name,487comment,488properties,489storage_location,490created_at,491created_by,492updated_at,493updated_by494});495496NAMESPACE_INFO_CLS497.get(py)498.unwrap()499.bind(py)500.call((), Some(&dict))501}502503fn table_info_to_pyobject(py: Python<'_>, table_info: TableInfo) -> PyResult<Bound<'_, PyAny>> {504let TableInfo {505name,506table_id,507table_type,508comment,509storage_location,510data_source_format,511columns,512properties,513created_at,514created_by,515updated_at,516updated_by,517} = table_info;518519let column_info_cls = COLUMN_INFO_CLS.get(py).unwrap().bind(py);520521let columns = columns522.map(|columns| {523columns524.into_iter()525.map(526|ColumnInfo {527name,528type_name,529type_text,530type_json,531position,532comment,533partition_index,534}| {535let dict = PyDict::new(py);536537let name = name.as_str();538let type_name = type_name.as_str();539let type_text = type_text.as_str();540541pydict_insert_keys!(dict, {542name,543type_name,544type_text,545type_json,546position,547comment,548partition_index,549});550551column_info_cls.call((), Some(&dict))552},553)554.collect::<PyResult<Vec<_>>>()555})556.transpose()?;557558let dict = PyDict::new(py);559560let 
data_source_format = data_source_format.map(|x| x.to_string());561let table_type = table_type.to_string();562let properties = properties_to_pyobject(py, properties);563564pydict_insert_keys!(dict, {565name,566comment,567table_id,568table_type,569storage_location,570data_source_format,571columns,572properties,573created_at,574created_by,575updated_at,576updated_by,577});578579TABLE_INFO_CLS580.get(py)581.unwrap()582.bind(py)583.call((), Some(&dict))584}585586fn properties_to_pyobject(587py: Python<'_>,588properties: PlHashMap<PlSmallStr, String>,589) -> Bound<'_, PyDict> {590let dict = PyDict::new(py);591592for (key, value) in properties.into_iter() {593dict.set_item(key.as_str(), value).unwrap();594}595596dict597}598599600