Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-python/src/catalog/unity.rs
8353 views
1
use std::str::FromStr;
2
3
use polars::prelude::{CloudScheme, LazyFrame, PlHashMap, PlSmallStr, Schema};
4
use polars_io::catalog::unity::client::{CatalogClient, CatalogClientBuilder};
5
use polars_io::catalog::unity::models::{
6
CatalogInfo, ColumnInfo, DataSourceFormat, NamespaceInfo, TableInfo, TableType,
7
};
8
use polars_io::catalog::unity::schema::parse_type_json_str;
9
use polars_io::pl_async;
10
use pyo3::exceptions::PyValueError;
11
use pyo3::sync::PyOnceLock;
12
use pyo3::types::{PyAnyMethods, PyDict, PyList, PyNone, PyTuple};
13
use pyo3::{Bound, IntoPyObject, Py, PyAny, PyResult, Python, pyclass, pymethods};
14
15
use crate::io::cloud_options::OptPyCloudOptions;
16
use crate::lazyframe::PyLazyFrame;
17
use crate::prelude::Wrap;
18
use crate::utils::{EnterPolarsExt, to_py_err};
19
20
/// Inserts each listed expression into `$dict`, keyed by the expression's own source text.
///
/// `pydict_insert_keys!(dict, { name, comment })` expands to
/// `dict.set_item("name", name)?; dict.set_item("comment", comment)?;`, so it must be used inside
/// a function returning a `Result` that the `set_item` error can be propagated into. Entries are
/// inserted in the order written, insertion stops at the first error, and a trailing comma after
/// the last key is accepted.
macro_rules! pydict_insert_keys {
    ($dict:expr, { $($key:expr),+ $(,)? }) => {
        $( $dict.set_item(stringify!($key), $key)?; )+
    };
}
34
35
// Result dataclasses. These are initialized from Python by calling [`PyCatalogClient::init_classes`].
36
37
static CATALOG_INFO_CLS: PyOnceLock<Py<PyAny>> = PyOnceLock::new();
38
static NAMESPACE_INFO_CLS: PyOnceLock<Py<PyAny>> = PyOnceLock::new();
39
static TABLE_INFO_CLS: PyOnceLock<Py<PyAny>> = PyOnceLock::new();
40
static COLUMN_INFO_CLS: PyOnceLock<Py<PyAny>> = PyOnceLock::new();
41
42
#[pyclass(frozen)]
43
pub struct PyCatalogClient(CatalogClient);
44
45
#[pymethods]
46
impl PyCatalogClient {
47
#[pyo3(signature = (workspace_url, bearer_token))]
48
#[staticmethod]
49
pub fn new(workspace_url: String, bearer_token: Option<String>) -> PyResult<Self> {
50
let builder = CatalogClientBuilder::new().with_workspace_url(workspace_url);
51
52
let builder = if let Some(bearer_token) = bearer_token {
53
builder.with_bearer_token(bearer_token)
54
} else {
55
builder
56
};
57
58
builder.build().map(PyCatalogClient).map_err(to_py_err)
59
}
60
61
pub fn list_catalogs(&self, py: Python) -> PyResult<Py<PyAny>> {
62
let v = py.enter_polars(|| {
63
pl_async::get_runtime().block_in_place_on(self.client().list_catalogs())
64
})?;
65
66
let mut opt_err = None;
67
68
let out = PyList::new(
69
py,
70
v.into_iter().map(|x| {
71
let v = catalog_info_to_pyobject(py, x);
72
if let Ok(v) = v {
73
Some(v)
74
} else {
75
opt_err.replace(v);
76
None
77
}
78
}),
79
)?;
80
81
opt_err.transpose()?;
82
83
Ok(out.into())
84
}
85
86
#[pyo3(signature = (catalog_name))]
87
pub fn list_namespaces(&self, py: Python<'_>, catalog_name: &str) -> PyResult<Py<PyAny>> {
88
let v = py.enter_polars(|| {
89
pl_async::get_runtime().block_in_place_on(self.client().list_namespaces(catalog_name))
90
})?;
91
92
let mut opt_err = None;
93
94
let out = PyList::new(
95
py,
96
v.into_iter().map(|x| {
97
let v = namespace_info_to_pyobject(py, x);
98
match v {
99
Ok(v) => Some(v),
100
Err(_) => {
101
opt_err.replace(v);
102
None
103
},
104
}
105
}),
106
)?;
107
108
opt_err.transpose()?;
109
110
Ok(out.into())
111
}
112
113
#[pyo3(signature = (catalog_name, namespace))]
114
pub fn list_tables(
115
&self,
116
py: Python<'_>,
117
catalog_name: &str,
118
namespace: &str,
119
) -> PyResult<Py<PyAny>> {
120
let v = py.enter_polars(|| {
121
pl_async::get_runtime()
122
.block_in_place_on(self.client().list_tables(catalog_name, namespace))
123
})?;
124
125
let mut opt_err = None;
126
127
let out = PyList::new(
128
py,
129
v.into_iter().map(|table_info| {
130
let v = table_info_to_pyobject(py, table_info);
131
132
if let Ok(v) = v {
133
Some(v)
134
} else {
135
opt_err.replace(v);
136
None
137
}
138
}),
139
)?
140
.into();
141
142
opt_err.transpose()?;
143
144
Ok(out)
145
}
146
147
#[pyo3(signature = (table_name, catalog_name, namespace))]
148
pub fn get_table_info(
149
&self,
150
py: Python<'_>,
151
table_name: &str,
152
catalog_name: &str,
153
namespace: &str,
154
) -> PyResult<Py<PyAny>> {
155
let table_info = py
156
.enter_polars(|| {
157
pl_async::get_runtime().block_in_place_on(self.client().get_table_info(
158
table_name,
159
catalog_name,
160
namespace,
161
))
162
})
163
.map_err(to_py_err)?;
164
165
table_info_to_pyobject(py, table_info).map(|x| x.into())
166
}
167
168
#[pyo3(signature = (table_id, write))]
169
pub fn get_table_credentials(
170
&self,
171
py: Python<'_>,
172
table_id: &str,
173
write: bool,
174
) -> PyResult<Py<PyAny>> {
175
let table_credentials = py
176
.enter_polars(|| {
177
pl_async::get_runtime()
178
.block_in_place_on(self.client().get_table_credentials(table_id, write))
179
})
180
.map_err(to_py_err)?;
181
182
let expiry = table_credentials.expiration_time;
183
184
let credentials = PyDict::new(py);
185
// Keys in here are intended to be injected into `storage_options` from the Python side.
186
// Note this currently really only exists for `aws_endpoint_url`.
187
let storage_update_options = PyDict::new(py);
188
189
{
190
use TableCredentialsVariants::*;
191
use polars_io::catalog::unity::models::{
192
TableCredentialsAws, TableCredentialsAzure, TableCredentialsGcp,
193
TableCredentialsVariants,
194
};
195
196
match table_credentials.into_enum() {
197
Some(Aws(TableCredentialsAws {
198
access_key_id,
199
secret_access_key,
200
session_token,
201
access_point,
202
})) => {
203
credentials.set_item("aws_access_key_id", access_key_id)?;
204
credentials.set_item("aws_secret_access_key", secret_access_key)?;
205
206
if let Some(session_token) = session_token {
207
credentials.set_item("aws_session_token", session_token)?;
208
}
209
210
if let Some(access_point) = access_point {
211
storage_update_options.set_item("aws_endpoint_url", access_point)?;
212
}
213
},
214
Some(Azure(TableCredentialsAzure { sas_token })) => {
215
credentials.set_item("sas_token", sas_token)?;
216
},
217
Some(Gcp(TableCredentialsGcp { oauth_token })) => {
218
credentials.set_item("bearer_token", oauth_token)?;
219
},
220
None => {},
221
}
222
}
223
224
let credentials = if credentials.len()? > 0 {
225
credentials.into_any()
226
} else {
227
PyNone::get(py).as_any().clone()
228
};
229
let storage_update_options = storage_update_options.into_any();
230
let expiry = expiry.into_pyobject(py)?.into_any();
231
232
Ok(PyTuple::new(py, [credentials, storage_update_options, expiry])?.into())
233
}
234
235
#[pyo3(signature = (catalog_name, namespace, table_name, cloud_options, credential_provider))]
236
pub fn scan_table(
237
&self,
238
py: Python<'_>,
239
catalog_name: &str,
240
namespace: &str,
241
table_name: &str,
242
cloud_options: OptPyCloudOptions,
243
credential_provider: Option<Py<PyAny>>,
244
) -> PyResult<PyLazyFrame> {
245
let table_info = py.enter_polars(|| {
246
pl_async::get_runtime().block_in_place_on(self.client().get_table_info(
247
catalog_name,
248
namespace,
249
table_name,
250
))
251
})?;
252
253
let Some(storage_location) = table_info.storage_location.as_deref() else {
254
return Err(PyValueError::new_err(
255
"cannot scan catalog table: no storage_location found",
256
));
257
};
258
259
let cloud_options = cloud_options.extract_opt_cloud_options(
260
CloudScheme::from_path(storage_location),
261
credential_provider,
262
)?;
263
264
Ok(LazyFrame::scan_catalog_table(&table_info, cloud_options)
265
.map_err(to_py_err)?
266
.into())
267
}
268
269
#[pyo3(signature = (catalog_name, comment, storage_root))]
270
pub fn create_catalog(
271
&self,
272
py: Python<'_>,
273
catalog_name: &str,
274
comment: Option<&str>,
275
storage_root: Option<&str>,
276
) -> PyResult<Py<PyAny>> {
277
let catalog_info = py
278
.detach(|| {
279
pl_async::get_runtime().block_in_place_on(self.client().create_catalog(
280
catalog_name,
281
comment,
282
storage_root,
283
))
284
})
285
.map_err(to_py_err)?;
286
287
catalog_info_to_pyobject(py, catalog_info).map(|x| x.into())
288
}
289
290
#[pyo3(signature = (catalog_name, force))]
291
pub fn delete_catalog(&self, py: Python<'_>, catalog_name: &str, force: bool) -> PyResult<()> {
292
py.detach(|| {
293
pl_async::get_runtime()
294
.block_in_place_on(self.client().delete_catalog(catalog_name, force))
295
})
296
.map_err(to_py_err)
297
}
298
299
#[pyo3(signature = (catalog_name, namespace, comment, storage_root))]
300
pub fn create_namespace(
301
&self,
302
py: Python<'_>,
303
catalog_name: &str,
304
namespace: &str,
305
comment: Option<&str>,
306
storage_root: Option<&str>,
307
) -> PyResult<Py<PyAny>> {
308
let namespace_info = py
309
.detach(|| {
310
pl_async::get_runtime().block_in_place_on(self.client().create_namespace(
311
catalog_name,
312
namespace,
313
comment,
314
storage_root,
315
))
316
})
317
.map_err(to_py_err)?;
318
319
namespace_info_to_pyobject(py, namespace_info).map(|x| x.into())
320
}
321
322
#[pyo3(signature = (catalog_name, namespace, force))]
323
pub fn delete_namespace(
324
&self,
325
py: Python<'_>,
326
catalog_name: &str,
327
namespace: &str,
328
force: bool,
329
) -> PyResult<()> {
330
py.detach(|| {
331
pl_async::get_runtime().block_in_place_on(self.client().delete_namespace(
332
catalog_name,
333
namespace,
334
force,
335
))
336
})
337
.map_err(to_py_err)
338
}
339
340
#[pyo3(signature = (
341
catalog_name, namespace, table_name, schema, table_type, data_source_format, comment,
342
storage_root, properties
343
))]
344
pub fn create_table(
345
&self,
346
py: Python<'_>,
347
catalog_name: &str,
348
namespace: &str,
349
table_name: &str,
350
schema: Option<Wrap<Schema>>,
351
table_type: &str,
352
data_source_format: Option<&str>,
353
comment: Option<&str>,
354
storage_root: Option<&str>,
355
properties: Vec<(String, String)>,
356
) -> PyResult<Py<PyAny>> {
357
let table_info = py.detach(|| {
358
pl_async::get_runtime()
359
.block_in_place_on(
360
self.client().create_table(
361
catalog_name,
362
namespace,
363
table_name,
364
schema.as_ref().map(|x| &x.0),
365
&TableType::from_str(table_type)
366
.map_err(|e| PyValueError::new_err(e.to_string()))?,
367
data_source_format
368
.map(DataSourceFormat::from_str)
369
.transpose()
370
.map_err(|e| PyValueError::new_err(e.to_string()))?
371
.as_ref(),
372
comment,
373
storage_root,
374
&mut properties.iter().map(|(a, b)| (a.as_str(), b.as_str())),
375
),
376
)
377
.map_err(to_py_err)
378
})?;
379
380
table_info_to_pyobject(py, table_info).map(|x| x.into())
381
}
382
383
#[pyo3(signature = (catalog_name, namespace, table_name))]
384
pub fn delete_table(
385
&self,
386
py: Python<'_>,
387
catalog_name: &str,
388
namespace: &str,
389
table_name: &str,
390
) -> PyResult<()> {
391
py.detach(|| {
392
pl_async::get_runtime().block_in_place_on(self.client().delete_table(
393
catalog_name,
394
namespace,
395
table_name,
396
))
397
})
398
.map_err(to_py_err)
399
}
400
401
#[pyo3(signature = (type_json))]
402
#[staticmethod]
403
pub fn type_json_to_polars_type(py: Python<'_>, type_json: &str) -> PyResult<Py<PyAny>> {
404
Ok(Wrap(parse_type_json_str(type_json).map_err(to_py_err)?)
405
.into_pyobject(py)?
406
.unbind())
407
}
408
409
#[pyo3(signature = (catalog_info_cls, namespace_info_cls, table_info_cls, column_info_cls))]
410
#[staticmethod]
411
pub fn init_classes(
412
py: Python<'_>,
413
catalog_info_cls: Py<PyAny>,
414
namespace_info_cls: Py<PyAny>,
415
table_info_cls: Py<PyAny>,
416
column_info_cls: Py<PyAny>,
417
) {
418
CATALOG_INFO_CLS.get_or_init(py, || catalog_info_cls);
419
NAMESPACE_INFO_CLS.get_or_init(py, || namespace_info_cls);
420
TABLE_INFO_CLS.get_or_init(py, || table_info_cls);
421
COLUMN_INFO_CLS.get_or_init(py, || column_info_cls);
422
}
423
}
424
425
impl PyCatalogClient {
426
fn client(&self) -> &CatalogClient {
427
&self.0
428
}
429
}
430
431
fn catalog_info_to_pyobject(
432
py: Python<'_>,
433
CatalogInfo {
434
name,
435
comment,
436
storage_location,
437
properties,
438
options,
439
created_at,
440
created_by,
441
updated_at,
442
updated_by,
443
}: CatalogInfo,
444
) -> PyResult<Bound<'_, PyAny>> {
445
let dict = PyDict::new(py);
446
447
let properties = properties_to_pyobject(py, properties);
448
let options = properties_to_pyobject(py, options);
449
450
pydict_insert_keys!(dict, {
451
name,
452
comment,
453
storage_location,
454
properties,
455
options,
456
created_at,
457
created_by,
458
updated_at,
459
updated_by
460
});
461
462
CATALOG_INFO_CLS
463
.get(py)
464
.unwrap()
465
.bind(py)
466
.call((), Some(&dict))
467
}
468
469
fn namespace_info_to_pyobject(
470
py: Python<'_>,
471
NamespaceInfo {
472
name,
473
comment,
474
properties,
475
storage_location,
476
created_at,
477
created_by,
478
updated_at,
479
updated_by,
480
}: NamespaceInfo,
481
) -> PyResult<Bound<'_, PyAny>> {
482
let dict = PyDict::new(py);
483
484
let properties = properties_to_pyobject(py, properties);
485
486
pydict_insert_keys!(dict, {
487
name,
488
comment,
489
properties,
490
storage_location,
491
created_at,
492
created_by,
493
updated_at,
494
updated_by
495
});
496
497
NAMESPACE_INFO_CLS
498
.get(py)
499
.unwrap()
500
.bind(py)
501
.call((), Some(&dict))
502
}
503
504
fn table_info_to_pyobject(py: Python<'_>, table_info: TableInfo) -> PyResult<Bound<'_, PyAny>> {
505
let TableInfo {
506
name,
507
table_id,
508
table_type,
509
comment,
510
storage_location,
511
data_source_format,
512
columns,
513
properties,
514
created_at,
515
created_by,
516
updated_at,
517
updated_by,
518
} = table_info;
519
520
let column_info_cls = COLUMN_INFO_CLS.get(py).unwrap().bind(py);
521
522
let columns = columns
523
.map(|columns| {
524
columns
525
.into_iter()
526
.map(
527
|ColumnInfo {
528
name,
529
type_name,
530
type_text,
531
type_json,
532
position,
533
comment,
534
partition_index,
535
}| {
536
let dict = PyDict::new(py);
537
538
let name = name.as_str();
539
let type_name = type_name.as_str();
540
let type_text = type_text.as_str();
541
542
pydict_insert_keys!(dict, {
543
name,
544
type_name,
545
type_text,
546
type_json,
547
position,
548
comment,
549
partition_index,
550
});
551
552
column_info_cls.call((), Some(&dict))
553
},
554
)
555
.collect::<PyResult<Vec<_>>>()
556
})
557
.transpose()?;
558
559
let dict = PyDict::new(py);
560
561
let data_source_format = data_source_format.map(|x| x.to_string());
562
let table_type = table_type.to_string();
563
let properties = properties_to_pyobject(py, properties);
564
565
pydict_insert_keys!(dict, {
566
name,
567
comment,
568
table_id,
569
table_type,
570
storage_location,
571
data_source_format,
572
columns,
573
properties,
574
created_at,
575
created_by,
576
updated_at,
577
updated_by,
578
});
579
580
TABLE_INFO_CLS
581
.get(py)
582
.unwrap()
583
.bind(py)
584
.call((), Some(&dict))
585
}
586
587
fn properties_to_pyobject(
588
py: Python<'_>,
589
properties: PlHashMap<PlSmallStr, String>,
590
) -> Bound<'_, PyDict> {
591
let dict = PyDict::new(py);
592
593
for (key, value) in properties.into_iter() {
594
dict.set_item(key.as_str(), value).unwrap();
595
}
596
597
dict
598
}
599
600