Path: blob/main/crates/polars-io/src/catalog/unity/client.rs
6939 views
use polars_core::prelude::PlHashMap;1use polars_core::schema::Schema;2use polars_error::{PolarsResult, polars_bail, to_compute_err};34use super::models::{CatalogInfo, NamespaceInfo, TableCredentials, TableInfo};5use super::schema::schema_to_column_info_list;6use super::utils::{PageWalker, do_request};7use crate::catalog::unity::models::{ColumnInfo, DataSourceFormat, TableType};8use crate::impl_page_walk;9use crate::utils::decode_json_response;1011/// Unity catalog client.12pub struct CatalogClient {13workspace_url: String,14http_client: reqwest::Client,15}1617impl CatalogClient {18pub async fn list_catalogs(&self) -> PolarsResult<Vec<CatalogInfo>> {19ListCatalogs(PageWalker::new(self.http_client.get(format!(20"{}{}",21&self.workspace_url, "/api/2.1/unity-catalog/catalogs"22))))23.read_all_pages()24.await25}2627pub async fn list_namespaces(&self, catalog_name: &str) -> PolarsResult<Vec<NamespaceInfo>> {28ListSchemas(PageWalker::new(29self.http_client30.get(format!(31"{}{}",32&self.workspace_url, "/api/2.1/unity-catalog/schemas"33))34.query(&[("catalog_name", catalog_name)]),35))36.read_all_pages()37.await38}3940pub async fn list_tables(41&self,42catalog_name: &str,43namespace: &str,44) -> PolarsResult<Vec<TableInfo>> {45ListTables(PageWalker::new(46self.http_client47.get(format!(48"{}{}",49&self.workspace_url, "/api/2.1/unity-catalog/tables"50))51.query(&[("catalog_name", catalog_name), ("schema_name", namespace)]),52))53.read_all_pages()54.await55}5657pub async fn get_table_info(58&self,59catalog_name: &str,60namespace: &str,61table_name: &str,62) -> PolarsResult<TableInfo> {63let full_table_name = format!(64"{}.{}.{}",65catalog_name.replace('/', "%2F"),66namespace.replace('/', "%2F"),67table_name.replace('/', "%2F")68);6970let bytes = do_request(71self.http_client72.get(format!(73"{}{}{}",74&self.workspace_url, "/api/2.1/unity-catalog/tables/", full_table_name75))76.query(&[("full_name", full_table_name)]),77)78.await?;7980let out: TableInfo = decode_json_response(&bytes)?;8182Ok(out)83}8485pub async fn get_table_credentials(86&self,87table_id: &str,88write: bool,89) -> PolarsResult<TableCredentials> {90let bytes = do_request(91self.http_client92.post(format!(93"{}{}",94&self.workspace_url, "/api/2.1/unity-catalog/temporary-table-credentials"95))96.query(&[97("table_id", table_id),98("operation", if write { "READ_WRITE" } else { "READ" }),99]),100)101.await?;102103let out: TableCredentials = decode_json_response(&bytes)?;104105Ok(out)106}107108pub async fn create_catalog(109&self,110catalog_name: &str,111comment: Option<&str>,112storage_root: Option<&str>,113) -> PolarsResult<CatalogInfo> {114let resp = do_request(115self.http_client116.post(format!(117"{}{}",118&self.workspace_url, "/api/2.1/unity-catalog/catalogs"119))120.json(&Body {121name: catalog_name,122comment,123storage_root,124}),125)126.await?;127128return decode_json_response(&resp);129130#[derive(serde::Serialize)]131struct Body<'a> {132name: &'a str,133comment: Option<&'a str>,134storage_root: Option<&'a str>,135}136}137138pub async fn delete_catalog(&self, catalog_name: &str, force: bool) -> PolarsResult<()> {139let catalog_name = catalog_name.replace('/', "%2F");140141do_request(142self.http_client143.delete(format!(144"{}{}{}",145&self.workspace_url, "/api/2.1/unity-catalog/catalogs/", catalog_name146))147.query(&[("force", force)]),148)149.await?;150151Ok(())152}153154pub async fn create_namespace(155&self,156catalog_name: &str,157namespace: &str,158comment: Option<&str>,159storage_root: Option<&str>,160) -> PolarsResult<NamespaceInfo> {161let resp = do_request(162self.http_client163.post(format!(164"{}{}",165&self.workspace_url, "/api/2.1/unity-catalog/schemas"166))167.json(&Body {168name: namespace,169catalog_name,170comment,171storage_root,172}),173)174.await?;175176return decode_json_response(&resp);177178#[derive(serde::Serialize)]179struct Body<'a> {180name: &'a str,181catalog_name: &'a str,182comment: Option<&'a str>,183storage_root: Option<&'a str>,184}185}186187pub async fn delete_namespace(188&self,189catalog_name: &str,190namespace: &str,191force: bool,192) -> PolarsResult<()> {193let full_name = format!(194"{}.{}",195catalog_name.replace('/', "%2F"),196namespace.replace('/', "%2F"),197);198199do_request(200self.http_client201.delete(format!(202"{}{}{}",203&self.workspace_url, "/api/2.1/unity-catalog/schemas/", full_name204))205.query(&[("force", force)]),206)207.await?;208209Ok(())210}211212/// Note, `data_source_format` can be None for some `table_type`s.213#[allow(clippy::too_many_arguments)]214pub async fn create_table(215&self,216catalog_name: &str,217namespace: &str,218table_name: &str,219schema: Option<&Schema>,220table_type: &TableType,221data_source_format: Option<&DataSourceFormat>,222comment: Option<&str>,223storage_location: Option<&str>,224properties: &mut (dyn Iterator<Item = (&str, &str)> + Send + Sync),225) -> PolarsResult<TableInfo> {226let columns = schema.map(schema_to_column_info_list).transpose()?;227let columns = columns.as_deref();228229let resp = do_request(230self.http_client231.post(format!(232"{}{}",233&self.workspace_url, "/api/2.1/unity-catalog/tables"234))235.json(&Body {236name: table_name,237catalog_name,238schema_name: namespace,239table_type,240data_source_format,241comment,242columns,243storage_location,244properties: properties.collect(),245}),246)247.await?;248249return decode_json_response(&resp);250251#[derive(serde::Serialize)]252struct Body<'a> {253name: &'a str,254catalog_name: &'a str,255schema_name: &'a str,256comment: Option<&'a str>,257table_type: &'a TableType,258#[serde(skip_serializing_if = "Option::is_none")]259data_source_format: Option<&'a DataSourceFormat>,260columns: Option<&'a [ColumnInfo]>,261storage_location: Option<&'a str>,262properties: PlHashMap<&'a str, &'a str>,263}264}265266pub async fn delete_table(267&self,268catalog_name: &str,269namespace: &str,270table_name: &str,271) -> PolarsResult<()> {272let full_name = format!(273"{}.{}.{}",274catalog_name.replace('/', "%2F"),275namespace.replace('/', "%2F"),276table_name.replace('/', "%2F"),277);278279do_request(self.http_client.delete(format!(280"{}{}{}",281&self.workspace_url, "/api/2.1/unity-catalog/tables/", full_name282)))283.await?;284285Ok(())286}287}288289pub struct CatalogClientBuilder {290workspace_url: Option<String>,291bearer_token: Option<String>,292}293294#[allow(clippy::derivable_impls)]295impl Default for CatalogClientBuilder {296fn default() -> Self {297Self {298workspace_url: None,299bearer_token: None,300}301}302}303304impl CatalogClientBuilder {305pub fn new() -> Self {306Self::default()307}308309pub fn with_workspace_url(mut self, workspace_url: impl Into<String>) -> Self {310self.workspace_url = Some(workspace_url.into());311self312}313314pub fn with_bearer_token(mut self, bearer_token: impl Into<String>) -> Self {315self.bearer_token = Some(bearer_token.into());316self317}318319pub fn build(self) -> PolarsResult<CatalogClient> {320let Some(workspace_url) = self.workspace_url else {321polars_bail!(ComputeError: "expected Some(_) for workspace_url")322};323324Ok(CatalogClient {325workspace_url,326http_client: {327let builder = reqwest::ClientBuilder::new().user_agent("polars");328329let builder = if let Some(bearer_token) = self.bearer_token {330use reqwest::header::{AUTHORIZATION, HeaderMap, HeaderValue, USER_AGENT};331332let mut headers = HeaderMap::new();333334let mut auth_value =335HeaderValue::from_str(format!("Bearer {bearer_token}").as_str()).unwrap();336auth_value.set_sensitive(true);337338headers.insert(AUTHORIZATION, auth_value);339headers.insert(USER_AGENT, "polars".try_into().unwrap());340341builder.default_headers(headers)342} else {343builder344};345346builder.build().map_err(to_compute_err)?347},348})349}350}351352pub struct ListCatalogs(pub(crate) PageWalker);353impl_page_walk!(ListCatalogs, CatalogInfo, key_name = catalogs);354355pub struct ListSchemas(pub(crate) PageWalker);356impl_page_walk!(ListSchemas, NamespaceInfo, key_name = schemas);357358pub struct ListTables(pub(crate) PageWalker);359impl_page_walk!(ListTables, TableInfo, key_name = tables);360361362