Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-python/src/catalog/unity.rs
7889 views
1
use std::str::FromStr;

use polars::prelude::{CloudScheme, LazyFrame, PlHashMap, PlSmallStr, Schema};
use polars_io::catalog::unity::client::{CatalogClient, CatalogClientBuilder};
use polars_io::catalog::unity::models::{
    CatalogInfo, ColumnInfo, DataSourceFormat, NamespaceInfo, TableInfo, TableType,
};
use polars_io::catalog::unity::schema::parse_type_json_str;
use polars_io::cloud::credential_provider::PlCredentialProvider;
use polars_io::pl_async;
use pyo3::exceptions::{PyRuntimeError, PyValueError};
use pyo3::sync::PyOnceLock;
use pyo3::types::{PyAnyMethods, PyDict, PyList, PyNone, PyTuple};
use pyo3::{Bound, IntoPyObject, Py, PyAny, PyResult, Python, pyclass, pymethods};

use crate::lazyframe::PyLazyFrame;
use crate::prelude::{Wrap, parse_cloud_options};
use crate::utils::{EnterPolarsExt, to_py_err};
19
20
/// Insert each listed local variable into a dict, keyed by the variable's own name.
///
/// `pydict_insert_keys!(dict, { a, b })` expands to
/// `dict.set_item("a", a)?; dict.set_item("b", b)?;`, so it must be used inside a function
/// whose error type can absorb the `set_item` error through `?`. The insertion stops at the
/// first failing `set_item`. A trailing comma after the last key is accepted (including
/// when only a single key is listed).
///
/// Because the key is produced with `stringify!`, renaming a variable passed here changes
/// the key that ends up in the dict.
macro_rules! pydict_insert_keys {
    ($dict:expr, { $($key:expr),+ $(,)? }) => {
        $(
            $dict.set_item(stringify!($key), $key)?;
        )+
    };
}
34
35
// Result dataclasses. These are initialized from Python by calling [`PyCatalogClient::init_classes`].
36
37
static CATALOG_INFO_CLS: PyOnceLock<Py<PyAny>> = PyOnceLock::new();
38
static NAMESPACE_INFO_CLS: PyOnceLock<Py<PyAny>> = PyOnceLock::new();
39
static TABLE_INFO_CLS: PyOnceLock<Py<PyAny>> = PyOnceLock::new();
40
static COLUMN_INFO_CLS: PyOnceLock<Py<PyAny>> = PyOnceLock::new();
41
42
#[pyclass(frozen)]
43
pub struct PyCatalogClient(CatalogClient);
44
45
#[pymethods]
46
impl PyCatalogClient {
47
#[pyo3(signature = (workspace_url, bearer_token))]
48
#[staticmethod]
49
pub fn new(workspace_url: String, bearer_token: Option<String>) -> PyResult<Self> {
50
let builder = CatalogClientBuilder::new().with_workspace_url(workspace_url);
51
52
let builder = if let Some(bearer_token) = bearer_token {
53
builder.with_bearer_token(bearer_token)
54
} else {
55
builder
56
};
57
58
builder.build().map(PyCatalogClient).map_err(to_py_err)
59
}
60
61
pub fn list_catalogs(&self, py: Python) -> PyResult<Py<PyAny>> {
62
let v = py.enter_polars(|| {
63
pl_async::get_runtime().block_in_place_on(self.client().list_catalogs())
64
})?;
65
66
let mut opt_err = None;
67
68
let out = PyList::new(
69
py,
70
v.into_iter().map(|x| {
71
let v = catalog_info_to_pyobject(py, x);
72
if let Ok(v) = v {
73
Some(v)
74
} else {
75
opt_err.replace(v);
76
None
77
}
78
}),
79
)?;
80
81
opt_err.transpose()?;
82
83
Ok(out.into())
84
}
85
86
#[pyo3(signature = (catalog_name))]
87
pub fn list_namespaces(&self, py: Python<'_>, catalog_name: &str) -> PyResult<Py<PyAny>> {
88
let v = py.enter_polars(|| {
89
pl_async::get_runtime().block_in_place_on(self.client().list_namespaces(catalog_name))
90
})?;
91
92
let mut opt_err = None;
93
94
let out = PyList::new(
95
py,
96
v.into_iter().map(|x| {
97
let v = namespace_info_to_pyobject(py, x);
98
match v {
99
Ok(v) => Some(v),
100
Err(_) => {
101
opt_err.replace(v);
102
None
103
},
104
}
105
}),
106
)?;
107
108
opt_err.transpose()?;
109
110
Ok(out.into())
111
}
112
113
#[pyo3(signature = (catalog_name, namespace))]
114
pub fn list_tables(
115
&self,
116
py: Python<'_>,
117
catalog_name: &str,
118
namespace: &str,
119
) -> PyResult<Py<PyAny>> {
120
let v = py.enter_polars(|| {
121
pl_async::get_runtime()
122
.block_in_place_on(self.client().list_tables(catalog_name, namespace))
123
})?;
124
125
let mut opt_err = None;
126
127
let out = PyList::new(
128
py,
129
v.into_iter().map(|table_info| {
130
let v = table_info_to_pyobject(py, table_info);
131
132
if let Ok(v) = v {
133
Some(v)
134
} else {
135
opt_err.replace(v);
136
None
137
}
138
}),
139
)?
140
.into();
141
142
opt_err.transpose()?;
143
144
Ok(out)
145
}
146
147
#[pyo3(signature = (table_name, catalog_name, namespace))]
148
pub fn get_table_info(
149
&self,
150
py: Python<'_>,
151
table_name: &str,
152
catalog_name: &str,
153
namespace: &str,
154
) -> PyResult<Py<PyAny>> {
155
let table_info = py
156
.enter_polars(|| {
157
pl_async::get_runtime().block_in_place_on(self.client().get_table_info(
158
table_name,
159
catalog_name,
160
namespace,
161
))
162
})
163
.map_err(to_py_err)?;
164
165
table_info_to_pyobject(py, table_info).map(|x| x.into())
166
}
167
168
#[pyo3(signature = (table_id, write))]
169
pub fn get_table_credentials(
170
&self,
171
py: Python<'_>,
172
table_id: &str,
173
write: bool,
174
) -> PyResult<Py<PyAny>> {
175
let table_credentials = py
176
.enter_polars(|| {
177
pl_async::get_runtime()
178
.block_in_place_on(self.client().get_table_credentials(table_id, write))
179
})
180
.map_err(to_py_err)?;
181
182
let expiry = table_credentials.expiration_time;
183
184
let credentials = PyDict::new(py);
185
// Keys in here are intended to be injected into `storage_options` from the Python side.
186
// Note this currently really only exists for `aws_endpoint_url`.
187
let storage_update_options = PyDict::new(py);
188
189
{
190
use TableCredentialsVariants::*;
191
use polars_io::catalog::unity::models::{
192
TableCredentialsAws, TableCredentialsAzure, TableCredentialsGcp,
193
TableCredentialsVariants,
194
};
195
196
match table_credentials.into_enum() {
197
Some(Aws(TableCredentialsAws {
198
access_key_id,
199
secret_access_key,
200
session_token,
201
access_point,
202
})) => {
203
credentials.set_item("aws_access_key_id", access_key_id)?;
204
credentials.set_item("aws_secret_access_key", secret_access_key)?;
205
206
if let Some(session_token) = session_token {
207
credentials.set_item("aws_session_token", session_token)?;
208
}
209
210
if let Some(access_point) = access_point {
211
storage_update_options.set_item("aws_endpoint_url", access_point)?;
212
}
213
},
214
Some(Azure(TableCredentialsAzure { sas_token })) => {
215
credentials.set_item("sas_token", sas_token)?;
216
},
217
Some(Gcp(TableCredentialsGcp { oauth_token })) => {
218
credentials.set_item("bearer_token", oauth_token)?;
219
},
220
None => {},
221
}
222
}
223
224
let credentials = if credentials.len()? > 0 {
225
credentials.into_any()
226
} else {
227
PyNone::get(py).as_any().clone()
228
};
229
let storage_update_options = storage_update_options.into_any();
230
let expiry = expiry.into_pyobject(py)?.into_any();
231
232
Ok(PyTuple::new(py, [credentials, storage_update_options, expiry])?.into())
233
}
234
235
#[pyo3(signature = (catalog_name, namespace, table_name, cloud_options, credential_provider, retries))]
236
pub fn scan_table(
237
&self,
238
py: Python<'_>,
239
catalog_name: &str,
240
namespace: &str,
241
table_name: &str,
242
cloud_options: Option<Vec<(String, String)>>,
243
credential_provider: Option<Py<PyAny>>,
244
retries: usize,
245
) -> PyResult<PyLazyFrame> {
246
let table_info = py.enter_polars(|| {
247
pl_async::get_runtime().block_in_place_on(self.client().get_table_info(
248
catalog_name,
249
namespace,
250
table_name,
251
))
252
})?;
253
254
let Some(storage_location) = table_info.storage_location.as_deref() else {
255
return Err(PyValueError::new_err(
256
"cannot scan catalog table: no storage_location found",
257
));
258
};
259
260
let cloud_options = parse_cloud_options(
261
CloudScheme::from_uri(storage_location),
262
cloud_options.unwrap_or_default(),
263
)?
264
.with_max_retries(retries)
265
.with_credential_provider(
266
credential_provider.map(PlCredentialProvider::from_python_builder),
267
);
268
269
Ok(
270
LazyFrame::scan_catalog_table(&table_info, Some(cloud_options))
271
.map_err(to_py_err)?
272
.into(),
273
)
274
}
275
276
#[pyo3(signature = (catalog_name, comment, storage_root))]
277
pub fn create_catalog(
278
&self,
279
py: Python<'_>,
280
catalog_name: &str,
281
comment: Option<&str>,
282
storage_root: Option<&str>,
283
) -> PyResult<Py<PyAny>> {
284
let catalog_info = py
285
.detach(|| {
286
pl_async::get_runtime().block_in_place_on(self.client().create_catalog(
287
catalog_name,
288
comment,
289
storage_root,
290
))
291
})
292
.map_err(to_py_err)?;
293
294
catalog_info_to_pyobject(py, catalog_info).map(|x| x.into())
295
}
296
297
#[pyo3(signature = (catalog_name, force))]
298
pub fn delete_catalog(&self, py: Python<'_>, catalog_name: &str, force: bool) -> PyResult<()> {
299
py.detach(|| {
300
pl_async::get_runtime()
301
.block_in_place_on(self.client().delete_catalog(catalog_name, force))
302
})
303
.map_err(to_py_err)
304
}
305
306
#[pyo3(signature = (catalog_name, namespace, comment, storage_root))]
307
pub fn create_namespace(
308
&self,
309
py: Python<'_>,
310
catalog_name: &str,
311
namespace: &str,
312
comment: Option<&str>,
313
storage_root: Option<&str>,
314
) -> PyResult<Py<PyAny>> {
315
let namespace_info = py
316
.detach(|| {
317
pl_async::get_runtime().block_in_place_on(self.client().create_namespace(
318
catalog_name,
319
namespace,
320
comment,
321
storage_root,
322
))
323
})
324
.map_err(to_py_err)?;
325
326
namespace_info_to_pyobject(py, namespace_info).map(|x| x.into())
327
}
328
329
#[pyo3(signature = (catalog_name, namespace, force))]
330
pub fn delete_namespace(
331
&self,
332
py: Python<'_>,
333
catalog_name: &str,
334
namespace: &str,
335
force: bool,
336
) -> PyResult<()> {
337
py.detach(|| {
338
pl_async::get_runtime().block_in_place_on(self.client().delete_namespace(
339
catalog_name,
340
namespace,
341
force,
342
))
343
})
344
.map_err(to_py_err)
345
}
346
347
#[pyo3(signature = (
348
catalog_name, namespace, table_name, schema, table_type, data_source_format, comment,
349
storage_root, properties
350
))]
351
pub fn create_table(
352
&self,
353
py: Python<'_>,
354
catalog_name: &str,
355
namespace: &str,
356
table_name: &str,
357
schema: Option<Wrap<Schema>>,
358
table_type: &str,
359
data_source_format: Option<&str>,
360
comment: Option<&str>,
361
storage_root: Option<&str>,
362
properties: Vec<(String, String)>,
363
) -> PyResult<Py<PyAny>> {
364
let table_info = py.detach(|| {
365
pl_async::get_runtime()
366
.block_in_place_on(
367
self.client().create_table(
368
catalog_name,
369
namespace,
370
table_name,
371
schema.as_ref().map(|x| &x.0),
372
&TableType::from_str(table_type)
373
.map_err(|e| PyValueError::new_err(e.to_string()))?,
374
data_source_format
375
.map(DataSourceFormat::from_str)
376
.transpose()
377
.map_err(|e| PyValueError::new_err(e.to_string()))?
378
.as_ref(),
379
comment,
380
storage_root,
381
&mut properties.iter().map(|(a, b)| (a.as_str(), b.as_str())),
382
),
383
)
384
.map_err(to_py_err)
385
})?;
386
387
table_info_to_pyobject(py, table_info).map(|x| x.into())
388
}
389
390
#[pyo3(signature = (catalog_name, namespace, table_name))]
391
pub fn delete_table(
392
&self,
393
py: Python<'_>,
394
catalog_name: &str,
395
namespace: &str,
396
table_name: &str,
397
) -> PyResult<()> {
398
py.detach(|| {
399
pl_async::get_runtime().block_in_place_on(self.client().delete_table(
400
catalog_name,
401
namespace,
402
table_name,
403
))
404
})
405
.map_err(to_py_err)
406
}
407
408
#[pyo3(signature = (type_json))]
409
#[staticmethod]
410
pub fn type_json_to_polars_type(py: Python<'_>, type_json: &str) -> PyResult<Py<PyAny>> {
411
Ok(Wrap(parse_type_json_str(type_json).map_err(to_py_err)?)
412
.into_pyobject(py)?
413
.unbind())
414
}
415
416
#[pyo3(signature = (catalog_info_cls, namespace_info_cls, table_info_cls, column_info_cls))]
417
#[staticmethod]
418
pub fn init_classes(
419
py: Python<'_>,
420
catalog_info_cls: Py<PyAny>,
421
namespace_info_cls: Py<PyAny>,
422
table_info_cls: Py<PyAny>,
423
column_info_cls: Py<PyAny>,
424
) {
425
CATALOG_INFO_CLS.get_or_init(py, || catalog_info_cls);
426
NAMESPACE_INFO_CLS.get_or_init(py, || namespace_info_cls);
427
TABLE_INFO_CLS.get_or_init(py, || table_info_cls);
428
COLUMN_INFO_CLS.get_or_init(py, || column_info_cls);
429
}
430
}
431
432
impl PyCatalogClient {
433
fn client(&self) -> &CatalogClient {
434
&self.0
435
}
436
}
437
438
fn catalog_info_to_pyobject(
439
py: Python<'_>,
440
CatalogInfo {
441
name,
442
comment,
443
storage_location,
444
properties,
445
options,
446
created_at,
447
created_by,
448
updated_at,
449
updated_by,
450
}: CatalogInfo,
451
) -> PyResult<Bound<'_, PyAny>> {
452
let dict = PyDict::new(py);
453
454
let properties = properties_to_pyobject(py, properties);
455
let options = properties_to_pyobject(py, options);
456
457
pydict_insert_keys!(dict, {
458
name,
459
comment,
460
storage_location,
461
properties,
462
options,
463
created_at,
464
created_by,
465
updated_at,
466
updated_by
467
});
468
469
CATALOG_INFO_CLS
470
.get(py)
471
.unwrap()
472
.bind(py)
473
.call((), Some(&dict))
474
}
475
476
fn namespace_info_to_pyobject(
477
py: Python<'_>,
478
NamespaceInfo {
479
name,
480
comment,
481
properties,
482
storage_location,
483
created_at,
484
created_by,
485
updated_at,
486
updated_by,
487
}: NamespaceInfo,
488
) -> PyResult<Bound<'_, PyAny>> {
489
let dict = PyDict::new(py);
490
491
let properties = properties_to_pyobject(py, properties);
492
493
pydict_insert_keys!(dict, {
494
name,
495
comment,
496
properties,
497
storage_location,
498
created_at,
499
created_by,
500
updated_at,
501
updated_by
502
});
503
504
NAMESPACE_INFO_CLS
505
.get(py)
506
.unwrap()
507
.bind(py)
508
.call((), Some(&dict))
509
}
510
511
fn table_info_to_pyobject(py: Python<'_>, table_info: TableInfo) -> PyResult<Bound<'_, PyAny>> {
512
let TableInfo {
513
name,
514
table_id,
515
table_type,
516
comment,
517
storage_location,
518
data_source_format,
519
columns,
520
properties,
521
created_at,
522
created_by,
523
updated_at,
524
updated_by,
525
} = table_info;
526
527
let column_info_cls = COLUMN_INFO_CLS.get(py).unwrap().bind(py);
528
529
let columns = columns
530
.map(|columns| {
531
columns
532
.into_iter()
533
.map(
534
|ColumnInfo {
535
name,
536
type_name,
537
type_text,
538
type_json,
539
position,
540
comment,
541
partition_index,
542
}| {
543
let dict = PyDict::new(py);
544
545
let name = name.as_str();
546
let type_name = type_name.as_str();
547
let type_text = type_text.as_str();
548
549
pydict_insert_keys!(dict, {
550
name,
551
type_name,
552
type_text,
553
type_json,
554
position,
555
comment,
556
partition_index,
557
});
558
559
column_info_cls.call((), Some(&dict))
560
},
561
)
562
.collect::<PyResult<Vec<_>>>()
563
})
564
.transpose()?;
565
566
let dict = PyDict::new(py);
567
568
let data_source_format = data_source_format.map(|x| x.to_string());
569
let table_type = table_type.to_string();
570
let properties = properties_to_pyobject(py, properties);
571
572
pydict_insert_keys!(dict, {
573
name,
574
comment,
575
table_id,
576
table_type,
577
storage_location,
578
data_source_format,
579
columns,
580
properties,
581
created_at,
582
created_by,
583
updated_at,
584
updated_by,
585
});
586
587
TABLE_INFO_CLS
588
.get(py)
589
.unwrap()
590
.bind(py)
591
.call((), Some(&dict))
592
}
593
594
fn properties_to_pyobject(
595
py: Python<'_>,
596
properties: PlHashMap<PlSmallStr, String>,
597
) -> Bound<'_, PyDict> {
598
let dict = PyDict::new(py);
599
600
for (key, value) in properties.into_iter() {
601
dict.set_item(key.as_str(), value).unwrap();
602
}
603
604
dict
605
}
606
607