Path: blob/main/crates/polars-io/src/catalog/unity/client.rs
8426 views
use polars_core::prelude::PlHashMap;1use polars_core::schema::Schema;2use polars_error::{PolarsResult, polars_bail, to_compute_err};34use super::models::{CatalogInfo, NamespaceInfo, TableCredentials, TableInfo};5use super::schema::schema_to_column_info_list;6use super::utils::{PageWalker, do_request};7use crate::catalog::unity::models::{ColumnInfo, DataSourceFormat, TableType};8use crate::cloud::USER_AGENT;9use crate::impl_page_walk;10use crate::utils::decode_json_response;1112/// Unity catalog client.13pub struct CatalogClient {14workspace_url: String,15http_client: reqwest::Client,16}1718impl CatalogClient {19pub async fn list_catalogs(&self) -> PolarsResult<Vec<CatalogInfo>> {20ListCatalogs(PageWalker::new(self.http_client.get(format!(21"{}{}",22&self.workspace_url, "/api/2.1/unity-catalog/catalogs"23))))24.read_all_pages()25.await26}2728pub async fn list_namespaces(&self, catalog_name: &str) -> PolarsResult<Vec<NamespaceInfo>> {29ListSchemas(PageWalker::new(30self.http_client31.get(format!(32"{}{}",33&self.workspace_url, "/api/2.1/unity-catalog/schemas"34))35.query(&[("catalog_name", catalog_name)]),36))37.read_all_pages()38.await39}4041pub async fn list_tables(42&self,43catalog_name: &str,44namespace: &str,45) -> PolarsResult<Vec<TableInfo>> {46ListTables(PageWalker::new(47self.http_client48.get(format!(49"{}{}",50&self.workspace_url, "/api/2.1/unity-catalog/tables"51))52.query(&[("catalog_name", catalog_name), ("schema_name", namespace)]),53))54.read_all_pages()55.await56}5758pub async fn get_table_info(59&self,60catalog_name: &str,61namespace: &str,62table_name: &str,63) -> PolarsResult<TableInfo> {64let full_table_name = format!(65"{}.{}.{}",66catalog_name.replace('/', "%2F"),67namespace.replace('/', "%2F"),68table_name.replace('/', "%2F")69);7071let bytes = do_request(72self.http_client73.get(format!(74"{}{}{}",75&self.workspace_url, "/api/2.1/unity-catalog/tables/", full_table_name76))77.query(&[("full_name", full_table_name)]),78)79.await?;8081let out: TableInfo = decode_json_response(&bytes)?;8283Ok(out)84}8586pub async fn get_table_credentials(87&self,88table_id: &str,89write: bool,90) -> PolarsResult<TableCredentials> {91let bytes = do_request(92self.http_client93.post(format!(94"{}{}",95&self.workspace_url, "/api/2.1/unity-catalog/temporary-table-credentials"96))97.json(&Body {98table_id,99operation: if write { "READ_WRITE" } else { "READ" },100}),101)102.await?;103104let out: TableCredentials = decode_json_response(&bytes)?;105106return Ok(out);107108#[derive(serde::Serialize)]109struct Body<'a> {110table_id: &'a str,111operation: &'a str,112}113}114115pub async fn create_catalog(116&self,117catalog_name: &str,118comment: Option<&str>,119storage_root: Option<&str>,120) -> PolarsResult<CatalogInfo> {121let resp = do_request(122self.http_client123.post(format!(124"{}{}",125&self.workspace_url, "/api/2.1/unity-catalog/catalogs"126))127.json(&Body {128name: catalog_name,129comment,130storage_root,131}),132)133.await?;134135return decode_json_response(&resp);136137#[derive(serde::Serialize)]138struct Body<'a> {139name: &'a str,140comment: Option<&'a str>,141storage_root: Option<&'a str>,142}143}144145pub async fn delete_catalog(&self, catalog_name: &str, force: bool) -> PolarsResult<()> {146let catalog_name = catalog_name.replace('/', "%2F");147148do_request(149self.http_client150.delete(format!(151"{}{}{}",152&self.workspace_url, "/api/2.1/unity-catalog/catalogs/", catalog_name153))154.query(&[("force", force)]),155)156.await?;157158Ok(())159}160161pub async fn create_namespace(162&self,163catalog_name: &str,164namespace: &str,165comment: Option<&str>,166storage_root: Option<&str>,167) -> PolarsResult<NamespaceInfo> {168let resp = do_request(169self.http_client170.post(format!(171"{}{}",172&self.workspace_url, "/api/2.1/unity-catalog/schemas"173))174.json(&Body {175name: namespace,176catalog_name,177comment,178storage_root,179}),180)181.await?;182183return decode_json_response(&resp);184185#[derive(serde::Serialize)]186struct Body<'a> {187name: &'a str,188catalog_name: &'a str,189comment: Option<&'a str>,190storage_root: Option<&'a str>,191}192}193194pub async fn delete_namespace(195&self,196catalog_name: &str,197namespace: &str,198force: bool,199) -> PolarsResult<()> {200let full_name = format!(201"{}.{}",202catalog_name.replace('/', "%2F"),203namespace.replace('/', "%2F"),204);205206do_request(207self.http_client208.delete(format!(209"{}{}{}",210&self.workspace_url, "/api/2.1/unity-catalog/schemas/", full_name211))212.query(&[("force", force)]),213)214.await?;215216Ok(())217}218219/// Note, `data_source_format` can be None for some `table_type`s.220#[allow(clippy::too_many_arguments)]221pub async fn create_table(222&self,223catalog_name: &str,224namespace: &str,225table_name: &str,226schema: Option<&Schema>,227table_type: &TableType,228data_source_format: Option<&DataSourceFormat>,229comment: Option<&str>,230storage_location: Option<&str>,231properties: &mut (dyn Iterator<Item = (&str, &str)> + Send + Sync),232) -> PolarsResult<TableInfo> {233let columns = schema.map(schema_to_column_info_list).transpose()?;234let columns = columns.as_deref();235236let resp = do_request(237self.http_client238.post(format!(239"{}{}",240&self.workspace_url, "/api/2.1/unity-catalog/tables"241))242.json(&Body {243name: table_name,244catalog_name,245schema_name: namespace,246table_type,247data_source_format,248comment,249columns,250storage_location,251properties: properties.collect(),252}),253)254.await?;255256return decode_json_response(&resp);257258#[derive(serde::Serialize)]259struct Body<'a> {260name: &'a str,261catalog_name: &'a str,262schema_name: &'a str,263comment: Option<&'a str>,264table_type: &'a TableType,265#[serde(skip_serializing_if = "Option::is_none")]266data_source_format: Option<&'a DataSourceFormat>,267columns: Option<&'a [ColumnInfo]>,268storage_location: Option<&'a str>,269properties: PlHashMap<&'a str, &'a str>,270}271}272273pub async fn delete_table(274&self,275catalog_name: &str,276namespace: &str,277table_name: &str,278) -> PolarsResult<()> {279let full_name = format!(280"{}.{}.{}",281catalog_name.replace('/', "%2F"),282namespace.replace('/', "%2F"),283table_name.replace('/', "%2F"),284);285286do_request(self.http_client.delete(format!(287"{}{}{}",288&self.workspace_url, "/api/2.1/unity-catalog/tables/", full_name289)))290.await?;291292Ok(())293}294}295296pub struct CatalogClientBuilder {297workspace_url: Option<String>,298bearer_token: Option<String>,299}300301#[allow(clippy::derivable_impls)]302impl Default for CatalogClientBuilder {303fn default() -> Self {304Self {305workspace_url: None,306bearer_token: None,307}308}309}310311impl CatalogClientBuilder {312pub fn new() -> Self {313Self::default()314}315316pub fn with_workspace_url(mut self, workspace_url: impl Into<String>) -> Self {317self.workspace_url = Some(workspace_url.into());318self319}320321pub fn with_bearer_token(mut self, bearer_token: impl Into<String>) -> Self {322self.bearer_token = Some(bearer_token.into());323self324}325326pub fn build(self) -> PolarsResult<CatalogClient> {327let Some(workspace_url) = self.workspace_url else {328polars_bail!(ComputeError: "expected Some(_) for workspace_url")329};330331Ok(CatalogClient {332workspace_url,333http_client: {334let builder = reqwest::ClientBuilder::new().user_agent(USER_AGENT);335336let builder = if let Some(bearer_token) = self.bearer_token {337use reqwest::header::{AUTHORIZATION, HeaderMap, HeaderValue};338339let mut headers = HeaderMap::new();340341let mut auth_value =342HeaderValue::from_str(format!("Bearer {bearer_token}").as_str()).unwrap();343auth_value.set_sensitive(true);344345headers.insert(AUTHORIZATION, auth_value);346headers.insert(reqwest::header::USER_AGENT, USER_AGENT.try_into().unwrap());347348builder.default_headers(headers)349} else {350builder351};352353builder.build().map_err(to_compute_err)?354},355})356}357}358359pub struct ListCatalogs(pub(crate) PageWalker);360impl_page_walk!(ListCatalogs, CatalogInfo, key_name = catalogs);361362pub struct ListSchemas(pub(crate) PageWalker);363impl_page_walk!(ListSchemas, NamespaceInfo, key_name = schemas);364365pub struct ListTables(pub(crate) PageWalker);366impl_page_walk!(ListTables, TableInfo, key_name = tables);367368369