Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/catalog/unity/client.rs
6939 views
1
use polars_core::prelude::PlHashMap;
2
use polars_core::schema::Schema;
3
use polars_error::{PolarsResult, polars_bail, to_compute_err};
4
5
use super::models::{CatalogInfo, NamespaceInfo, TableCredentials, TableInfo};
6
use super::schema::schema_to_column_info_list;
7
use super::utils::{PageWalker, do_request};
8
use crate::catalog::unity::models::{ColumnInfo, DataSourceFormat, TableType};
9
use crate::impl_page_walk;
10
use crate::utils::decode_json_response;
11
12
/// Unity catalog client.
13
pub struct CatalogClient {
14
workspace_url: String,
15
http_client: reqwest::Client,
16
}
17
18
impl CatalogClient {
19
pub async fn list_catalogs(&self) -> PolarsResult<Vec<CatalogInfo>> {
20
ListCatalogs(PageWalker::new(self.http_client.get(format!(
21
"{}{}",
22
&self.workspace_url, "/api/2.1/unity-catalog/catalogs"
23
))))
24
.read_all_pages()
25
.await
26
}
27
28
pub async fn list_namespaces(&self, catalog_name: &str) -> PolarsResult<Vec<NamespaceInfo>> {
29
ListSchemas(PageWalker::new(
30
self.http_client
31
.get(format!(
32
"{}{}",
33
&self.workspace_url, "/api/2.1/unity-catalog/schemas"
34
))
35
.query(&[("catalog_name", catalog_name)]),
36
))
37
.read_all_pages()
38
.await
39
}
40
41
pub async fn list_tables(
42
&self,
43
catalog_name: &str,
44
namespace: &str,
45
) -> PolarsResult<Vec<TableInfo>> {
46
ListTables(PageWalker::new(
47
self.http_client
48
.get(format!(
49
"{}{}",
50
&self.workspace_url, "/api/2.1/unity-catalog/tables"
51
))
52
.query(&[("catalog_name", catalog_name), ("schema_name", namespace)]),
53
))
54
.read_all_pages()
55
.await
56
}
57
58
pub async fn get_table_info(
59
&self,
60
catalog_name: &str,
61
namespace: &str,
62
table_name: &str,
63
) -> PolarsResult<TableInfo> {
64
let full_table_name = format!(
65
"{}.{}.{}",
66
catalog_name.replace('/', "%2F"),
67
namespace.replace('/', "%2F"),
68
table_name.replace('/', "%2F")
69
);
70
71
let bytes = do_request(
72
self.http_client
73
.get(format!(
74
"{}{}{}",
75
&self.workspace_url, "/api/2.1/unity-catalog/tables/", full_table_name
76
))
77
.query(&[("full_name", full_table_name)]),
78
)
79
.await?;
80
81
let out: TableInfo = decode_json_response(&bytes)?;
82
83
Ok(out)
84
}
85
86
pub async fn get_table_credentials(
87
&self,
88
table_id: &str,
89
write: bool,
90
) -> PolarsResult<TableCredentials> {
91
let bytes = do_request(
92
self.http_client
93
.post(format!(
94
"{}{}",
95
&self.workspace_url, "/api/2.1/unity-catalog/temporary-table-credentials"
96
))
97
.query(&[
98
("table_id", table_id),
99
("operation", if write { "READ_WRITE" } else { "READ" }),
100
]),
101
)
102
.await?;
103
104
let out: TableCredentials = decode_json_response(&bytes)?;
105
106
Ok(out)
107
}
108
109
pub async fn create_catalog(
110
&self,
111
catalog_name: &str,
112
comment: Option<&str>,
113
storage_root: Option<&str>,
114
) -> PolarsResult<CatalogInfo> {
115
let resp = do_request(
116
self.http_client
117
.post(format!(
118
"{}{}",
119
&self.workspace_url, "/api/2.1/unity-catalog/catalogs"
120
))
121
.json(&Body {
122
name: catalog_name,
123
comment,
124
storage_root,
125
}),
126
)
127
.await?;
128
129
return decode_json_response(&resp);
130
131
#[derive(serde::Serialize)]
132
struct Body<'a> {
133
name: &'a str,
134
comment: Option<&'a str>,
135
storage_root: Option<&'a str>,
136
}
137
}
138
139
pub async fn delete_catalog(&self, catalog_name: &str, force: bool) -> PolarsResult<()> {
140
let catalog_name = catalog_name.replace('/', "%2F");
141
142
do_request(
143
self.http_client
144
.delete(format!(
145
"{}{}{}",
146
&self.workspace_url, "/api/2.1/unity-catalog/catalogs/", catalog_name
147
))
148
.query(&[("force", force)]),
149
)
150
.await?;
151
152
Ok(())
153
}
154
155
pub async fn create_namespace(
156
&self,
157
catalog_name: &str,
158
namespace: &str,
159
comment: Option<&str>,
160
storage_root: Option<&str>,
161
) -> PolarsResult<NamespaceInfo> {
162
let resp = do_request(
163
self.http_client
164
.post(format!(
165
"{}{}",
166
&self.workspace_url, "/api/2.1/unity-catalog/schemas"
167
))
168
.json(&Body {
169
name: namespace,
170
catalog_name,
171
comment,
172
storage_root,
173
}),
174
)
175
.await?;
176
177
return decode_json_response(&resp);
178
179
#[derive(serde::Serialize)]
180
struct Body<'a> {
181
name: &'a str,
182
catalog_name: &'a str,
183
comment: Option<&'a str>,
184
storage_root: Option<&'a str>,
185
}
186
}
187
188
pub async fn delete_namespace(
189
&self,
190
catalog_name: &str,
191
namespace: &str,
192
force: bool,
193
) -> PolarsResult<()> {
194
let full_name = format!(
195
"{}.{}",
196
catalog_name.replace('/', "%2F"),
197
namespace.replace('/', "%2F"),
198
);
199
200
do_request(
201
self.http_client
202
.delete(format!(
203
"{}{}{}",
204
&self.workspace_url, "/api/2.1/unity-catalog/schemas/", full_name
205
))
206
.query(&[("force", force)]),
207
)
208
.await?;
209
210
Ok(())
211
}
212
213
/// Note, `data_source_format` can be None for some `table_type`s.
214
#[allow(clippy::too_many_arguments)]
215
pub async fn create_table(
216
&self,
217
catalog_name: &str,
218
namespace: &str,
219
table_name: &str,
220
schema: Option<&Schema>,
221
table_type: &TableType,
222
data_source_format: Option<&DataSourceFormat>,
223
comment: Option<&str>,
224
storage_location: Option<&str>,
225
properties: &mut (dyn Iterator<Item = (&str, &str)> + Send + Sync),
226
) -> PolarsResult<TableInfo> {
227
let columns = schema.map(schema_to_column_info_list).transpose()?;
228
let columns = columns.as_deref();
229
230
let resp = do_request(
231
self.http_client
232
.post(format!(
233
"{}{}",
234
&self.workspace_url, "/api/2.1/unity-catalog/tables"
235
))
236
.json(&Body {
237
name: table_name,
238
catalog_name,
239
schema_name: namespace,
240
table_type,
241
data_source_format,
242
comment,
243
columns,
244
storage_location,
245
properties: properties.collect(),
246
}),
247
)
248
.await?;
249
250
return decode_json_response(&resp);
251
252
#[derive(serde::Serialize)]
253
struct Body<'a> {
254
name: &'a str,
255
catalog_name: &'a str,
256
schema_name: &'a str,
257
comment: Option<&'a str>,
258
table_type: &'a TableType,
259
#[serde(skip_serializing_if = "Option::is_none")]
260
data_source_format: Option<&'a DataSourceFormat>,
261
columns: Option<&'a [ColumnInfo]>,
262
storage_location: Option<&'a str>,
263
properties: PlHashMap<&'a str, &'a str>,
264
}
265
}
266
267
pub async fn delete_table(
268
&self,
269
catalog_name: &str,
270
namespace: &str,
271
table_name: &str,
272
) -> PolarsResult<()> {
273
let full_name = format!(
274
"{}.{}.{}",
275
catalog_name.replace('/', "%2F"),
276
namespace.replace('/', "%2F"),
277
table_name.replace('/', "%2F"),
278
);
279
280
do_request(self.http_client.delete(format!(
281
"{}{}{}",
282
&self.workspace_url, "/api/2.1/unity-catalog/tables/", full_name
283
)))
284
.await?;
285
286
Ok(())
287
}
288
}
289
290
/// Builder for a Unity catalog client.
///
/// Both settings start out unset (`None`); `Default` is derived, which
/// yields exactly that state.
#[derive(Default)]
pub struct CatalogClientBuilder {
    /// Base URL of the workspace the client will talk to.
    workspace_url: Option<String>,
    /// Optional token used for bearer authentication.
    bearer_token: Option<String>,
}
304
305
impl CatalogClientBuilder {
306
pub fn new() -> Self {
307
Self::default()
308
}
309
310
pub fn with_workspace_url(mut self, workspace_url: impl Into<String>) -> Self {
311
self.workspace_url = Some(workspace_url.into());
312
self
313
}
314
315
pub fn with_bearer_token(mut self, bearer_token: impl Into<String>) -> Self {
316
self.bearer_token = Some(bearer_token.into());
317
self
318
}
319
320
pub fn build(self) -> PolarsResult<CatalogClient> {
321
let Some(workspace_url) = self.workspace_url else {
322
polars_bail!(ComputeError: "expected Some(_) for workspace_url")
323
};
324
325
Ok(CatalogClient {
326
workspace_url,
327
http_client: {
328
let builder = reqwest::ClientBuilder::new().user_agent("polars");
329
330
let builder = if let Some(bearer_token) = self.bearer_token {
331
use reqwest::header::{AUTHORIZATION, HeaderMap, HeaderValue, USER_AGENT};
332
333
let mut headers = HeaderMap::new();
334
335
let mut auth_value =
336
HeaderValue::from_str(format!("Bearer {bearer_token}").as_str()).unwrap();
337
auth_value.set_sensitive(true);
338
339
headers.insert(AUTHORIZATION, auth_value);
340
headers.insert(USER_AGENT, "polars".try_into().unwrap());
341
342
builder.default_headers(headers)
343
} else {
344
builder
345
};
346
347
builder.build().map_err(to_compute_err)?
348
},
349
})
350
}
351
}
352
353
pub struct ListCatalogs(pub(crate) PageWalker);
354
impl_page_walk!(ListCatalogs, CatalogInfo, key_name = catalogs);
355
356
pub struct ListSchemas(pub(crate) PageWalker);
357
impl_page_walk!(ListSchemas, NamespaceInfo, key_name = schemas);
358
359
pub struct ListTables(pub(crate) PageWalker);
360
impl_page_walk!(ListTables, TableInfo, key_name = tables);
361
362