Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/catalog/unity/client.rs
8426 views
1
use polars_core::prelude::PlHashMap;
2
use polars_core::schema::Schema;
3
use polars_error::{PolarsResult, polars_bail, to_compute_err};
4
5
use super::models::{CatalogInfo, NamespaceInfo, TableCredentials, TableInfo};
6
use super::schema::schema_to_column_info_list;
7
use super::utils::{PageWalker, do_request};
8
use crate::catalog::unity::models::{ColumnInfo, DataSourceFormat, TableType};
9
use crate::cloud::USER_AGENT;
10
use crate::impl_page_walk;
11
use crate::utils::decode_json_response;
12
13
/// Unity catalog client.
14
pub struct CatalogClient {
15
workspace_url: String,
16
http_client: reqwest::Client,
17
}
18
19
impl CatalogClient {
20
pub async fn list_catalogs(&self) -> PolarsResult<Vec<CatalogInfo>> {
21
ListCatalogs(PageWalker::new(self.http_client.get(format!(
22
"{}{}",
23
&self.workspace_url, "/api/2.1/unity-catalog/catalogs"
24
))))
25
.read_all_pages()
26
.await
27
}
28
29
pub async fn list_namespaces(&self, catalog_name: &str) -> PolarsResult<Vec<NamespaceInfo>> {
30
ListSchemas(PageWalker::new(
31
self.http_client
32
.get(format!(
33
"{}{}",
34
&self.workspace_url, "/api/2.1/unity-catalog/schemas"
35
))
36
.query(&[("catalog_name", catalog_name)]),
37
))
38
.read_all_pages()
39
.await
40
}
41
42
pub async fn list_tables(
43
&self,
44
catalog_name: &str,
45
namespace: &str,
46
) -> PolarsResult<Vec<TableInfo>> {
47
ListTables(PageWalker::new(
48
self.http_client
49
.get(format!(
50
"{}{}",
51
&self.workspace_url, "/api/2.1/unity-catalog/tables"
52
))
53
.query(&[("catalog_name", catalog_name), ("schema_name", namespace)]),
54
))
55
.read_all_pages()
56
.await
57
}
58
59
pub async fn get_table_info(
60
&self,
61
catalog_name: &str,
62
namespace: &str,
63
table_name: &str,
64
) -> PolarsResult<TableInfo> {
65
let full_table_name = format!(
66
"{}.{}.{}",
67
catalog_name.replace('/', "%2F"),
68
namespace.replace('/', "%2F"),
69
table_name.replace('/', "%2F")
70
);
71
72
let bytes = do_request(
73
self.http_client
74
.get(format!(
75
"{}{}{}",
76
&self.workspace_url, "/api/2.1/unity-catalog/tables/", full_table_name
77
))
78
.query(&[("full_name", full_table_name)]),
79
)
80
.await?;
81
82
let out: TableInfo = decode_json_response(&bytes)?;
83
84
Ok(out)
85
}
86
87
pub async fn get_table_credentials(
88
&self,
89
table_id: &str,
90
write: bool,
91
) -> PolarsResult<TableCredentials> {
92
let bytes = do_request(
93
self.http_client
94
.post(format!(
95
"{}{}",
96
&self.workspace_url, "/api/2.1/unity-catalog/temporary-table-credentials"
97
))
98
.json(&Body {
99
table_id,
100
operation: if write { "READ_WRITE" } else { "READ" },
101
}),
102
)
103
.await?;
104
105
let out: TableCredentials = decode_json_response(&bytes)?;
106
107
return Ok(out);
108
109
#[derive(serde::Serialize)]
110
struct Body<'a> {
111
table_id: &'a str,
112
operation: &'a str,
113
}
114
}
115
116
pub async fn create_catalog(
117
&self,
118
catalog_name: &str,
119
comment: Option<&str>,
120
storage_root: Option<&str>,
121
) -> PolarsResult<CatalogInfo> {
122
let resp = do_request(
123
self.http_client
124
.post(format!(
125
"{}{}",
126
&self.workspace_url, "/api/2.1/unity-catalog/catalogs"
127
))
128
.json(&Body {
129
name: catalog_name,
130
comment,
131
storage_root,
132
}),
133
)
134
.await?;
135
136
return decode_json_response(&resp);
137
138
#[derive(serde::Serialize)]
139
struct Body<'a> {
140
name: &'a str,
141
comment: Option<&'a str>,
142
storage_root: Option<&'a str>,
143
}
144
}
145
146
pub async fn delete_catalog(&self, catalog_name: &str, force: bool) -> PolarsResult<()> {
147
let catalog_name = catalog_name.replace('/', "%2F");
148
149
do_request(
150
self.http_client
151
.delete(format!(
152
"{}{}{}",
153
&self.workspace_url, "/api/2.1/unity-catalog/catalogs/", catalog_name
154
))
155
.query(&[("force", force)]),
156
)
157
.await?;
158
159
Ok(())
160
}
161
162
pub async fn create_namespace(
163
&self,
164
catalog_name: &str,
165
namespace: &str,
166
comment: Option<&str>,
167
storage_root: Option<&str>,
168
) -> PolarsResult<NamespaceInfo> {
169
let resp = do_request(
170
self.http_client
171
.post(format!(
172
"{}{}",
173
&self.workspace_url, "/api/2.1/unity-catalog/schemas"
174
))
175
.json(&Body {
176
name: namespace,
177
catalog_name,
178
comment,
179
storage_root,
180
}),
181
)
182
.await?;
183
184
return decode_json_response(&resp);
185
186
#[derive(serde::Serialize)]
187
struct Body<'a> {
188
name: &'a str,
189
catalog_name: &'a str,
190
comment: Option<&'a str>,
191
storage_root: Option<&'a str>,
192
}
193
}
194
195
pub async fn delete_namespace(
196
&self,
197
catalog_name: &str,
198
namespace: &str,
199
force: bool,
200
) -> PolarsResult<()> {
201
let full_name = format!(
202
"{}.{}",
203
catalog_name.replace('/', "%2F"),
204
namespace.replace('/', "%2F"),
205
);
206
207
do_request(
208
self.http_client
209
.delete(format!(
210
"{}{}{}",
211
&self.workspace_url, "/api/2.1/unity-catalog/schemas/", full_name
212
))
213
.query(&[("force", force)]),
214
)
215
.await?;
216
217
Ok(())
218
}
219
220
/// Note, `data_source_format` can be None for some `table_type`s.
221
#[allow(clippy::too_many_arguments)]
222
pub async fn create_table(
223
&self,
224
catalog_name: &str,
225
namespace: &str,
226
table_name: &str,
227
schema: Option<&Schema>,
228
table_type: &TableType,
229
data_source_format: Option<&DataSourceFormat>,
230
comment: Option<&str>,
231
storage_location: Option<&str>,
232
properties: &mut (dyn Iterator<Item = (&str, &str)> + Send + Sync),
233
) -> PolarsResult<TableInfo> {
234
let columns = schema.map(schema_to_column_info_list).transpose()?;
235
let columns = columns.as_deref();
236
237
let resp = do_request(
238
self.http_client
239
.post(format!(
240
"{}{}",
241
&self.workspace_url, "/api/2.1/unity-catalog/tables"
242
))
243
.json(&Body {
244
name: table_name,
245
catalog_name,
246
schema_name: namespace,
247
table_type,
248
data_source_format,
249
comment,
250
columns,
251
storage_location,
252
properties: properties.collect(),
253
}),
254
)
255
.await?;
256
257
return decode_json_response(&resp);
258
259
#[derive(serde::Serialize)]
260
struct Body<'a> {
261
name: &'a str,
262
catalog_name: &'a str,
263
schema_name: &'a str,
264
comment: Option<&'a str>,
265
table_type: &'a TableType,
266
#[serde(skip_serializing_if = "Option::is_none")]
267
data_source_format: Option<&'a DataSourceFormat>,
268
columns: Option<&'a [ColumnInfo]>,
269
storage_location: Option<&'a str>,
270
properties: PlHashMap<&'a str, &'a str>,
271
}
272
}
273
274
pub async fn delete_table(
275
&self,
276
catalog_name: &str,
277
namespace: &str,
278
table_name: &str,
279
) -> PolarsResult<()> {
280
let full_name = format!(
281
"{}.{}.{}",
282
catalog_name.replace('/', "%2F"),
283
namespace.replace('/', "%2F"),
284
table_name.replace('/', "%2F"),
285
);
286
287
do_request(self.http_client.delete(format!(
288
"{}{}{}",
289
&self.workspace_url, "/api/2.1/unity-catalog/tables/", full_name
290
)))
291
.await?;
292
293
Ok(())
294
}
295
}
296
297
/// Builder for [`CatalogClient`]; both settings start out unset.
pub struct CatalogClientBuilder {
    /// Optional token; when present, `build` installs it as an `Authorization: Bearer ...`
    /// default header.
    bearer_token: Option<String>,
    /// Base URL of the workspace; `build` fails if this is still `None`.
    workspace_url: Option<String>,
}
301
302
#[allow(clippy::derivable_impls)]
303
impl Default for CatalogClientBuilder {
304
fn default() -> Self {
305
Self {
306
workspace_url: None,
307
bearer_token: None,
308
}
309
}
310
}
311
312
impl CatalogClientBuilder {
313
pub fn new() -> Self {
314
Self::default()
315
}
316
317
pub fn with_workspace_url(mut self, workspace_url: impl Into<String>) -> Self {
318
self.workspace_url = Some(workspace_url.into());
319
self
320
}
321
322
pub fn with_bearer_token(mut self, bearer_token: impl Into<String>) -> Self {
323
self.bearer_token = Some(bearer_token.into());
324
self
325
}
326
327
pub fn build(self) -> PolarsResult<CatalogClient> {
328
let Some(workspace_url) = self.workspace_url else {
329
polars_bail!(ComputeError: "expected Some(_) for workspace_url")
330
};
331
332
Ok(CatalogClient {
333
workspace_url,
334
http_client: {
335
let builder = reqwest::ClientBuilder::new().user_agent(USER_AGENT);
336
337
let builder = if let Some(bearer_token) = self.bearer_token {
338
use reqwest::header::{AUTHORIZATION, HeaderMap, HeaderValue};
339
340
let mut headers = HeaderMap::new();
341
342
let mut auth_value =
343
HeaderValue::from_str(format!("Bearer {bearer_token}").as_str()).unwrap();
344
auth_value.set_sensitive(true);
345
346
headers.insert(AUTHORIZATION, auth_value);
347
headers.insert(reqwest::header::USER_AGENT, USER_AGENT.try_into().unwrap());
348
349
builder.default_headers(headers)
350
} else {
351
builder
352
};
353
354
builder.build().map_err(to_compute_err)?
355
},
356
})
357
}
358
}
359
360
pub struct ListCatalogs(pub(crate) PageWalker);
361
impl_page_walk!(ListCatalogs, CatalogInfo, key_name = catalogs);
362
363
pub struct ListSchemas(pub(crate) PageWalker);
364
impl_page_walk!(ListSchemas, NamespaceInfo, key_name = schemas);
365
366
pub struct ListTables(pub(crate) PageWalker);
367
impl_page_walk!(ListTables, TableInfo, key_name = tables);
368
369