Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
CloudPak-Outcomes
GitHub Repository: CloudPak-Outcomes/Outcomes-Projects
Path: blob/main/Data-Product-Hub-L3/client.py
1928 views
1
import time
2
import requests
3
from dotenv import load_dotenv
4
import os
5
import json
6
import urllib3
7
import cowsay
8
from datetime import datetime, timedelta
9
10
# Many requests in this module are sent with verify=False, so silence urllib3's
# InsecureRequestWarning first, then every urllib3 warning through the copy
# reachable from requests.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
requests.urllib3.disable_warnings()

# ANSI escape sequences used to highlight console output.
bold_blue_start, bold_green_start, bold_red_start = "\033[1;34m", "\033[1;32m", "\033[1;31m"
reset = "\033[0m"  # restores default terminal styling
17
18
class ImportClient:
19
"""
20
A class to handle import operations for governance artifacts,
21
define connections to data sources, and manage catalog operations in IBM Cloud Pak for Data (CPD).
22
"""
23
24
def __init__(self):
    """
    Initialize the client from ``.env`` / environment variables and resolve remote IDs.

    Configuration is loaded with ``load_dotenv`` and then read with ``os.getenv``.
    Values defined in ``.env`` take precedence over variables already present in
    the process environment.

    The constructor also makes network calls. It resolves ``self.catalog_id``
    from ``CATALOG_NAME``, looks up existing connection asset IDs, and fetches
    governance categories for ``self.mde_objective``.
    """
    # override=True lets .env values win over pre-set variables (such as an
    # OS-provided USERNAME). This avoids clearing os.environ, which would also
    # drop PATH, proxy and CA-bundle settings used by requests and subprocesses.
    load_dotenv(override=True)

    # CPD cluster and project settings
    self.cpd_cluster_host = os.getenv("CPD_CLUSTER_HOST")
    self.username = os.getenv("USERNAME")
    self.password = os.getenv("PASSWORD")
    self.catalog_name = os.getenv("CATALOG_NAME")
    self.project_id = os.getenv("PROJECT_ID")
    self.base_url = f"https://{self.cpd_cluster_host}"

    # Db2 Warehouse specific variables
    self.db_username = os.getenv("DB_USERNAME")
    self.db_password = os.getenv("DB_PASSWORD")
    self.database = os.getenv("DB_DATABASE")
    self.host = os.getenv("DB_HOST")
    # Optional DB_PORT override; defaults to the port this client has always used.
    self.db_port = os.getenv("DB_PORT", "50001")
    self.db2_name = "Data Warehouse"
    self.db2_description = "Database that contains warehouse data needed by the business for analytics and AI."
    # Shared by every connection definition (COS, Db2 and PostgreSQL).
    self.origin_country = os.getenv("ORIGIN_COUNTRY")
    self.db2_datasource_type = os.getenv("DB2_DATASOURCE_TYPE")

    # PostgreSQL specific variables
    self.psql_username = os.getenv("PSQL_DB_USERNAME")
    self.psql_password = os.getenv("PSQL_DB_PASSWORD")
    self.psql_database = os.getenv("PSQL_DB_DATABASE")
    self.psql_host = os.getenv("PSQL_DB_HOST")
    self.psql_port = os.getenv("PSQL_DB_PORT")
    self.psql_name = "Customer Data - PostgreSQL"
    self.psql_description = "Database that contains warehouse data needed by the business for analytics and AI."
    self.psql_datasource_type = os.getenv("PSQL_DATASOURCE_TYPE")

    # Cloud Object Storage specific variables
    self.cos_bucket = os.getenv("COS_BUCKET")
    self.cos_secret_key = os.getenv("COS_SECRET_KEY")
    self.cos_api_key = os.getenv("COS_API_KEY")
    self.cos_access_key = os.getenv("COS_ACCESS_KEY")
    self.cos_resource_instance_id = os.getenv("COS_RESOURCE_INSTANCE_ID")
    self.cos_url = "https://s3.us-south.cloud-object-storage.appdomain.cloud"
    self.cos_name = "Cloud Object Storage"
    self.cos_description = "IBM Cloud Object Storage bucket that contains data files used for analytics and AI."
    self.bearer_token = None

    # Resolve remote identifiers (each of these performs HTTP requests).
    self.catalog_id = self.get_catalog_id_by_name()
    self.db2_id, self.cos_id, self.psql_id = self.fetch_resource_ids()

    # Metadata enrichment parameters sent as the "objective" of enrichment areas.
    self.mde_objective = {
        "enrichment_options": {
            "structured": {
                "profile": True,
                "assign_terms": True,
                "analyze_quality": True
            }
        },
        "governance_scope": self.get_category_ids(),
        "sampling": {
            "structured": {
                "method": "TOP",
                "analysis_method": "FIXED",
                "sample_size": {
                    "name": "BASIC",
                    "options": {
                        "row_number": 1000,
                        "classify_value_number": 100
                    }
                }
            }
        },
        "datascope_of_reruns": "DELTA"
    }
106
107
108
def fetch_resource_ids(self):
    """
    Look up the asset IDs of the Db2 Warehouse, COS and PostgreSQL connections.

    Connections are listed through ``get_connections`` and matched by name
    against ``self.db2_name``, ``self.cos_name`` and ``self.psql_name``.

    Returns:
        tuple: ``(db2_id, cos_id, psql_id)``. An entry is None when no
        connection with that name is listed. All three are None when the bearer
        token or the connection listing could not be obtained.
    """
    bearer_token = self.get_bearer_token()
    if not bearer_token:
        print("Failed to fetch resource IDs. Bearer token not obtained.")
        # Always return three values: callers unpack the result into three names.
        return None, None, None

    data = self.get_connections(bearer_token)
    if data is None:
        # get_connections returns None when the listing request fails.
        print("Failed to fetch resource IDs. Connections could not be listed.")
        return None, None, None

    db2_id = cos_id = psql_id = None
    for resource in data.get('resources', []):
        resource_name = resource['entity']['name']
        if resource_name == self.db2_name:
            db2_id = resource['metadata']['asset_id']
        elif resource_name == self.cos_name:
            cos_id = resource['metadata']['asset_id']
        elif resource_name == self.psql_name:
            psql_id = resource['metadata']['asset_id']

    return db2_id, cos_id, psql_id
135
136
137
138
def get_categories(self):
    """
    Search the CPD glossary for artifacts whose type is ``category``.

    Returns:
        dict: Parsed JSON body of the ``/v3/search`` response on HTTP 200.
        Otherwise None, after the status code and body are printed.
    """
    search_url = f"{self.base_url}/v3/search?query=metadata.artifact_type:category"
    auth_headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {self.get_bearer_token()}',
    }

    resp = requests.get(search_url, headers=auth_headers, verify=False)

    if resp.status_code != 200:
        print('Failed to retrieve category hierarchy paths:', resp.status_code, resp.text)
        return None
    return resp.json()
163
164
def get_category_ids(self, max_categories=5):
    """
    Build the enrichment governance scope from the category search results.

    Args:
        max_categories (int): Only the first ``max_categories`` rows returned by
            ``get_categories`` are considered. Defaults to 5.

    Returns:
        list: ``{"type": "CATEGORY", "id": <artifact_id>}`` dicts for rows that
        carry an ``artifact_id``. Empty if the categories could not be retrieved.
    """
    categories_json = self.get_categories()
    if not categories_json:
        # get_categories prints the failure and returns None.
        return []

    rows = categories_json.get('rows') or []
    return [
        {"type": "CATEGORY", "id": row['artifact_id']}
        for row in rows[:max_categories]
        if row and row.get('artifact_id')
    ]
174
175
176
177
def publish_metadata_enrichment_assets(self, metadata_enrichment_area_id, publish_scope="all_assets", asset_ids=None, filter_criteria=None, duplicate_action="update"):
    """
    Publish assets of a Metadata Enrichment Area to ``self.catalog_id``.

    Args:
        metadata_enrichment_area_id (str): ID of the metadata enrichment area asset.
        publish_scope (str): Publish scope, 'all_assets' or 'selected_assets'.
        asset_ids (list): Asset IDs to publish. Only sent when publish_scope is
            'selected_assets'.
        filter_criteria (dict): Search criteria wrapped as
            ``{"search_criteria": ...}``. Only sent when publish_scope is
            'selected_assets'.
        duplicate_action (str): Sent as-is as the payload's ``duplicate_action``
            (what to do when an asset already exists). Defaults to "update".

    Returns:
        dict: Parsed JSON response (whatever the status code), or None if the
        response body is not JSON.
    """
    headers = {
        'Authorization': f'Bearer {self.get_bearer_token()}',
        'Content-Type': 'application/json'
    }

    payload = {
        "catalog": self.catalog_id,
        "duplicate_action": duplicate_action
    }

    if publish_scope == "selected_assets":
        if asset_ids:
            payload["asset_ids"] = asset_ids
        if filter_criteria:
            payload["filter"] = {"search_criteria": filter_criteria}

    url = f'https://{self.cpd_cluster_host}/v2/metadata_enrichment/metadata_enrichment_area/{metadata_enrichment_area_id}/publish_assets?project_id={self.project_id}&publishScope={publish_scope}'

    response = requests.post(url, json=payload, headers=headers, verify=False)
    try:
        response_json = response.json()
    except requests.JSONDecodeError:
        print(f'Error in Publish Assets Response: {response.status_code}, {response.text}')
        return None
    print(f'Publish Assets Response: {response.status_code}, {response_json}')
    return response_json
221
222
223
224
def create_and_run_MDE(self, name, mdi_id, enrichment_assets=None, description=None, enrichImmediate=True, job_name=None, job_schedule=None, publish_job_id=None, publish_job_name=None, publish_job_schedule=None, tags=None):
    """
    Create (and optionally run) a metadata enrichment area, printing debug output.

    The objective comes from ``self.mde_objective`` and the target catalog from
    ``self.catalog_id``. The request URL, headers (bearer token redacted) and
    payload are printed before sending.

    Args:
        name (str): Name of the metadata enrichment asset.
        mdi_id (str): ID of the metadata import used as the container asset.
        enrichment_assets (list): IDs of assets to enrich. Default is None.
        description (str): Description of the enrichment area. Default is None.
        enrichImmediate (bool): Whether to run enrichment immediately after area
            creation. Default is True.
        job_name (str): Name of the enrichment job. Default is None.
        job_schedule (str): Schedule of the enrichment job. Default is None.
        publish_job_id (str): ID of the publish job. Default is None.
        publish_job_name (str): Name of the publish job. Default is None.
        publish_job_schedule (str): Schedule of the publish job. Default is None.
        tags (list): Tags added to the payload's ``data_scope``. Default is None.

    Returns:
        dict: Parsed JSON response (whatever the status code). None if the body
        is not JSON or any exception is raised.
    """
    try:
        bearer_token = self.get_bearer_token()

        headers = {
            'Authorization': f'Bearer {bearer_token}',
            'Content-Type': 'application/json'
        }

        payload = {
            "name": name,
            "objective": self.mde_objective,
            "target_catalog_id": self.catalog_id,
            "data_scope": {
                "enrichment_assets": enrichment_assets,
                "container_assets": {
                    "metadata_import": [mdi_id]
                }
            },
            "enrichImmediate": enrichImmediate
        }

        if description:
            payload["description"] = description

        if job_name or job_schedule:
            payload["job"] = {}
            if job_name:
                payload["job"]["name"] = job_name
            if job_schedule:
                payload["job"]["schedule"] = job_schedule

        if publish_job_id or publish_job_name or publish_job_schedule:
            payload["publish_job"] = {}
            if publish_job_id:
                payload["publish_job"]["id"] = publish_job_id
            if publish_job_name:
                payload["publish_job"]["name"] = publish_job_name
            if publish_job_schedule:
                payload["publish_job"]["schedule"] = publish_job_schedule

        if tags:
            payload["data_scope"]["tags"] = tags

        url = f'https://{self.cpd_cluster_host}/v2/metadata_enrichment/metadata_enrichment_area?project_id={self.project_id}'

        # Debugging information. Never echo the bearer token: it is a live
        # credential and printed output is often saved or shared.
        printable_headers = {**headers, 'Authorization': 'Bearer ***'}
        print("---- Debugging Information ----")
        print(f"URL: {url}")
        print(f"Headers: {json.dumps(printable_headers, indent=4)}")
        print(f"Payload: {json.dumps(payload, indent=4)}")
        print("-------------------------------")

        response = requests.post(url, json=payload, headers=headers, verify=False)

        try:
            response_json = response.json()
            print(f'{name}: Metadata Enrichment Creation Response: {response.status_code}')
            return response_json
        except requests.JSONDecodeError:
            print(f'Error in Metadata Enrichment Creation Response: {response.status_code}, {response.text}')
            return None

    except Exception as e:
        # Deliberate best-effort boundary: report and return None.
        print(f'An error occurred: {str(e)}')
        return None
312
313
314
def create_and_run_metadata_enrichment(self, name, mdi_id, enrichment_assets=None, description=None, enrichImmediate=True, job_name=None, job_schedule=None, publish_job_id=None, publish_job_name=None, publish_job_schedule=None, tags=None):
    """
    Create (and optionally run) a metadata enrichment area in the current project.

    The objective is ``self.mde_objective`` and the target catalog is
    ``self.catalog_id``. Optional job/publish-job sections are sent only for
    the arguments that are truthy.

    Args:
        name (str): Name of the metadata enrichment asset.
        mdi_id (str): ID of the metadata import used as the container asset.
        enrichment_assets (list): IDs of assets to enrich. Default is None.
        description (str): Description of the enrichment area. Default is None.
        enrichImmediate (bool): Whether to run enrichment right after creation.
            Default is True.
        job_name (str): Name of the enrichment job. Default is None.
        job_schedule (str): Schedule of the enrichment job. Default is None.
        publish_job_id (str): ID of the publish job. Default is None.
        publish_job_name (str): Name of the publish job. Default is None.
        publish_job_schedule (str): Schedule of the publish job. Default is None.
        tags (list): Tags added to the payload's ``data_scope``. Default is None.

    Returns:
        dict: Parsed JSON response (whatever the status code), or None when the
        body is not JSON.
    """
    auth = {
        'Authorization': f'Bearer {self.get_bearer_token()}',
        'Content-Type': 'application/json'
    }

    scope = {
        "enrichment_assets": enrichment_assets,
        "container_assets": {"metadata_import": [mdi_id]},
    }
    body = {
        "name": name,
        "objective": self.mde_objective,
        "target_catalog_id": self.catalog_id,
        "data_scope": scope,
        "enrichImmediate": enrichImmediate,
    }

    if description:
        body["description"] = description

    # Only include the keys whose values were provided.
    job_spec = {key: val for key, val in (("name", job_name), ("schedule", job_schedule)) if val}
    if job_spec:
        body["job"] = job_spec

    publish_spec = {
        key: val
        for key, val in (("id", publish_job_id), ("name", publish_job_name), ("schedule", publish_job_schedule))
        if val
    }
    if publish_spec:
        body["publish_job"] = publish_spec

    if tags:
        scope["tags"] = tags

    endpoint = f'https://{self.cpd_cluster_host}/v2/metadata_enrichment/metadata_enrichment_area?project_id={self.project_id}'

    resp = requests.post(endpoint, json=body, headers=auth, verify=False)
    try:
        parsed = resp.json()
    except requests.JSONDecodeError:
        print(f'Error in Metadata Enrichment Creation Response: {resp.status_code}, {resp.text}')
        return None
    print(f'{name}: Metadata Enrichment Creation Response: {resp.status_code}')
    return parsed
390
391
392
def update_mde_settings(self, mde_settings=None):
    """
    PUT metadata enrichment settings for the current project.

    Args:
        mde_settings (dict, optional): Settings document to send. When omitted,
            the defaults defined below are used. They set
            ``self.catalog_id`` as the ML term-assignment catalog.

    Returns:
        None. Success or failure (status code and body) is printed.
    """
    if mde_settings is None:
        # Default metadata enrichment settings
        mde_settings = {
            'advanced_profiling': {'unique_value_table': {'count': 1000}},
            'semantic_expansion': {'name_expansion': True,
                                   'name_expansion_configuration': {'assignment_threshold': 0.9,
                                                                    'suggestion_threshold': 0.75},
                                   'description_generation': True,
                                   'description_generation_configuration': {'assignment_threshold': 0.9,
                                                                            'suggestion_threshold': 0.75}},
            'term_assignment': {'class_based_assignments': False,
                                'term_assignment_threshold': 0.9,
                                'term_suggestion_threshold': 0.75,
                                'name_matching': True,
                                'ml_based_assignments_default': False,
                                'ml_based_assignments_custom': False,
                                'evaluate_negative_assignments': True,
                                'default_ml_configuration': {'catalog_id': self.catalog_id,
                                                             'scope': 'catalog'},
                                'llm_based_assignments': False},
            'structured_profiling': {'null_threshold': 0.05,
                                     'uniqueness_threshold': 0.95,
                                     'constant_threshold': 0.99,
                                     'quality_score_threshold': 0.8,
                                     'data_class_assignment_threshold': 0.75,
                                     'data_class_suggestion_threshold': 0.25,
                                     'dq_exceptions_database': {'count': 100}},
            'key_analysis': {'pk_shallow_analysis_config': {'min_confidence': 0.8},
                             'fk_shallow_analysis_config': {'min_confidence': 0.8,
                                                            'auto_selection': False,
                                                            'auto_selection_threshold': 0.9}}
        }

    headers = {
        'Authorization': f'Bearer {self.get_bearer_token()}',
        'Content-Type': 'application/json'
    }

    endpoint = f"https://{self.cpd_cluster_host}/v2/metadata_enrichment/metadata_enrichment_area/settings"
    params = {
        'project_id': self.project_id
    }

    # NOTE(review): unlike most calls in this client, this request does not pass
    # verify=False; confirm this is intended for the target cluster's certificate.
    response = requests.put(endpoint, headers=headers, params=params, json=mde_settings)

    if response.status_code == 200:
        print("Metadata enrichment settings updated successfully.")
    else:
        # Report failures like the other methods instead of failing silently.
        print(f"Failed to update metadata enrichment settings: {response.status_code}")
        print(response.text)
    return None
453
454
455
def get_metadata_import_details(self, metadata_import_id):
    """
    Fetch a metadata import asset from the current project (``self.project_id``).

    Args:
        metadata_import_id (str): ID of the metadata import asset.

    Returns:
        dict: Parsed JSON on HTTP 200. Otherwise None, after the status code and
        body are printed.
    """
    resp = requests.get(
        f"{self.base_url}/v2/metadata_imports/{metadata_import_id}",
        headers={
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {self.get_bearer_token()}'
        },
        params={'project_id': self.project_id},
        verify=False,
    )

    if resp.status_code != 200:
        print(f"Failed to retrieve metadata import details: {resp.status_code}")
        print(resp.text)
        return None
    return resp.json()
487
488
def create_and_run_metadata_import(self, connection_id, paths_scope, name="Metadata Import"):
    """
    Create a metadata import (MDI), create a job for it, link them, and run the job.

    Steps, each aborting with a printed message on failure:
    1. POST the MDI; its ``metadata.asset_id`` must be present.
    2. POST a job referencing the MDI; its ``metadata.asset_id`` must be present.
    3. PATCH the MDI with the job ID; HTTP 200 is required.
    4. POST a job run; the response must be JSON.

    Args:
        connection_id (str): The connection ID for the import.
        paths_scope (list): Paths that make up the metadata import scope.
        name (str): Name of the metadata import. Defaults to "Metadata Import".
            The job is named ``name + " Job"``.

    Returns:
        tuple: ``(mdi_id, job_run_json)`` when every step succeeds, otherwise None.
    """
    host = self.cpd_cluster_host
    project = self.project_id
    headers = {
        'Authorization': f'Bearer {self.get_bearer_token()}',
        'Content-Type': 'application/json'
    }

    def decode(resp, label):
        # Parse a JSON body and log the status; fall back to {} if it isn't JSON.
        try:
            body = resp.json()
        except requests.JSONDecodeError:
            print(f'Error in {label}: {resp.status_code}, {resp.text}')
            return {}
        print(f'{label}: {resp.status_code}')
        return body

    # 1. Create the MDI
    mdi_payload = {
        "connection_id": connection_id,
        "name": name,
        "project_id": project,
        "target_project_id": project,
        "import_type": "metadata",
        "scope": {"paths": paths_scope},
        "extra_properties": {
            "include_primary_key": "true",
            "exclude_tables": "false",
            "metadata_from_catalog_table_only": "true",
            "include_foreign_key": "false",
            "exclude_views": "false"
        }
    }
    mdi_json = decode(
        requests.post(f'https://{host}/v2/metadata_imports?project_id={project}', json=mdi_payload, headers=headers),
        'MDI Creation Response',
    )
    mdi_id = mdi_json.get('metadata', {}).get('asset_id')
    if not mdi_id:
        print('MDI creation failed, terminating the process.')
        return None

    # 2. Create a job for the MDI
    job_payload = {
        "job": {
            "asset_ref": mdi_id,
            "name": name + " Job",
            "description": " ",
            "configuration": {}
        }
    }
    job_json = decode(
        requests.post(f'https://{host}/v2/jobs?project_id={project}', json=job_payload, headers=headers),
        'Job Creation Response',
    )
    job_id = job_json.get('metadata', {}).get('asset_id')
    if not job_id:
        print('Job creation failed, terminating the process.')
        return None

    # 3. Attach the job to the MDI; success is judged by status code only.
    patch_response = requests.patch(
        f'https://{host}/v2/metadata_imports/{mdi_id}?project_id={project}',
        json={"job_id": job_id},
        headers=headers,
    )
    decode(patch_response, 'Patch MDI Response')
    if patch_response.status_code != 200:
        print('Failed to patch MDI with job ID, terminating the process.')
        return None

    # 4. Start a job run
    run_response = requests.post(
        f'https://{host}/v2/jobs/{job_id}/runs?project_id={project}&job_id={job_id}',
        json={"job_run": {}},
        headers=headers,
    )
    try:
        run_json = run_response.json()
    except requests.JSONDecodeError:
        print(f'Error in Run Job Response: {run_response.status_code}, {run_response.text}')
        print('Detailed error information:', run_response.text)
        return None
    print(f'Run Job Response: {run_response.status_code}')
    print()
    return mdi_id, run_json
602
603
604
def verify_vars(self):
    """
    Print the instance's non-callable attributes for a quick configuration check.

    Non-empty values of attributes whose names contain 'password', 'secret',
    'api_key', 'access_key' or 'token' are masked. This keeps credentials out of
    terminal and notebook output while still showing that they are set.
    """
    sensitive_markers = ('password', 'secret', 'api_key', 'access_key', 'token')
    for key, value in vars(self).items():
        if key.startswith('__') or callable(value):
            continue
        if value and any(marker in key.lower() for marker in sensitive_markers):
            value = '********'
        print(f"{bold_blue_start}{key}{reset}: {value}")
612
613
def get_bearer_token(self):
    """
    Authenticate against the CPD ``/icp4d-api/v1/authorize`` endpoint.

    Returns:
        str: The ``token`` field of the JSON response on HTTP 200 (None if the
        field is absent). Otherwise None, after the status and body are printed.
    """
    credentials = json.dumps({
        "username": self.username,
        "password": self.password
    })
    resp = requests.post(
        f"https://{self.cpd_cluster_host}/icp4d-api/v1/authorize",
        headers={"cache-control": "no-cache", "content-type": "application/json"},
        data=credentials,
        verify=False,
    )

    if resp.status_code != 200:
        print(f"Request failed with status code {resp.status_code}")
        print(resp.text)
        return None
    return resp.json().get('token')
639
640
641
def get_connections(self, bearer, limit=50):
    """
    List the connections defined in the current project (``self.project_id``).

    Args:
        bearer (str): Bearer token for authentication.
        limit (int): Maximum number of connections requested, sorted newest
            first by creation time. Defaults to 50.

    Returns:
        dict: Parsed JSON response on HTTP 200 (callers read its ``resources``
        list). Otherwise None, after the status code and body are printed.
    """
    url = f"https://{self.cpd_cluster_host}/v2/connections"
    headers = {
        "Inject-Token": "false",  # NOTE(review): set to "true" only if token injection is needed
        "Authorization": f"Bearer {bearer}",
        "cache-control": "no-cache",
        "content-type": "application/json"
    }

    params = {
        "project_id": self.project_id,
        "sort": "-metadata.create_time",
        "limit": limit
    }

    response = requests.get(url, headers=headers, params=params, verify=False)

    if response.status_code == 200:
        return response.json()

    print(f"Authenticated request failed with status code {response.status_code}")
    print(response.text)
    return None
673
674
675
def define_cos_connection(self, bearer, catalog=True):
    """
    Define a Cloud Object Storage (COS) connection in the catalog or the project.

    Args:
        bearer (str): Bearer token for authentication.
        catalog (bool): When truthy, create the connection in ``self.catalog_id``.
            Otherwise create it in ``self.project_id``. Defaults to True.

    On HTTP 200/201 the new connection's asset ID is stored in ``self.cos_id``.
    Otherwise the status code and response body are printed.
    """
    if catalog:
        url = f"{self.base_url}/v2/connections?catalog_id={self.catalog_id}"
    else:
        url = f"{self.base_url}/v2/connections?project_id={self.project_id}"

    headers = {
        "Authorization": f"Bearer {bearer}",
        "cache-control": "no-cache",
        "content-type": "application/json",
        "Skip-Enforcement": "false"
    }

    # Request body; the datasource_type value is the COS data source type ID used by this client.
    payload = {
        "datasource_type": "193a97c1-4475-4a19-b90c-295c4fdc6517",
        "name": self.cos_name,
        "description": self.cos_description,
        "properties": {
            "bucket": self.cos_bucket,
            "secret_key": self.cos_secret_key,
            "trust_all_ssl_cert": "false",
            "auth_method": "instanceid_apikey_accesskey_secretkey",
            "iam_url": "https://iam.cloud.ibm.com/identity/token",
            "api_key": self.cos_api_key,
            "resource_instance_id": self.cos_resource_instance_id,
            "access_key": self.cos_access_key,
            "url": self.cos_url
        },
        "origin_country": self.origin_country,
        "data_source_definition_searchable": f"{self.cos_url}|{self.cos_bucket}"
    }

    response = requests.post(url, headers=headers, data=json.dumps(payload), verify=False)

    if response.status_code in (200, 201):
        print(f"Connection to {bold_blue_start}Cloud Object Storage{reset} defined successfully!")
        data = response.json()
        self.cos_id = data.get('metadata', {}).get('asset_id')
    else:
        print(f"Failed to define connection with status code {response.status_code}")
        print(response.text)
723
724
def define_db2_connection(self, bearer, catalog=True):
    """
    Define a Db2 Warehouse connection in the catalog or the project.

    Args:
        bearer (str): Bearer token for authentication.
        catalog (bool): When truthy, create the connection in ``self.catalog_id``.
            Otherwise create it in ``self.project_id``. Defaults to True.

    On HTTP 200/201 the new connection's asset ID is stored in ``self.db2_id``.
    Otherwise the status code and response body are printed.
    """
    if catalog:
        url = f"{self.base_url}/v2/connections?catalog_id={self.catalog_id}"
    else:
        url = f"{self.base_url}/v2/connections?project_id={self.project_id}"

    headers = {
        "Authorization": f"Bearer {bearer}",
        "cache-control": "no-cache",
        "content-type": "application/json",
        "Skip-Enforcement": "false"
    }

    # Request body
    payload = {
        "datasource_type": self.db2_datasource_type,
        "name": self.db2_name,
        "description": self.db2_description,
        "properties": {
            "database": self.database,
            "auth_method": "username_password",
            "password": self.db_password,
            "port": self.db_port,
            "host": self.host,
            "ssl": "true",
            "username": self.db_username
        },
        "origin_country": self.origin_country,
        # Use the configured port so this stays in sync with properties["port"].
        "data_source_definition_searchable": f"{self.host}|{self.db_port}|{self.database}"
    }

    response = requests.post(url, headers=headers, data=json.dumps(payload), verify=False)

    if response.status_code in (200, 201):
        print(f"Connection to {bold_blue_start}DB2 Warehouse{reset} defined successfully!")
        data = response.json()
        self.db2_id = data.get('metadata', {}).get('asset_id')
    else:
        print(f"Failed to define connection with status code {response.status_code}")
        print(response.text)
770
771
def define_psql_connection(self, bearer, catalog=True):
    """
    Define a PostgreSQL connection in the catalog or the project.

    Args:
        bearer (str): Bearer token for authentication.
        catalog (bool): When truthy, create the connection in ``self.catalog_id``.
            Otherwise create it in ``self.project_id``. Defaults to True.

    On HTTP 200/201 the new connection's asset ID is stored in ``self.psql_id``.
    Otherwise the status code and response body are printed.
    """
    if catalog:
        url = f"{self.base_url}/v2/connections?catalog_id={self.catalog_id}"
    else:
        url = f"{self.base_url}/v2/connections?project_id={self.project_id}"

    headers = {
        "Authorization": f"Bearer {bearer}",
        "cache-control": "no-cache",
        "content-type": "application/json",
        "Skip-Enforcement": "false"
    }

    # Request body
    payload = {
        "datasource_type": self.psql_datasource_type,
        "name": self.psql_name,
        "description": self.psql_description,
        "properties": {
            "database": self.psql_database,
            "password": self.psql_password,
            "port": self.psql_port,
            "host": self.psql_host,
            "username": self.psql_username
        },
        "origin_country": self.origin_country,
        # Describe the PostgreSQL endpoint (not the Db2 host/port/database).
        "data_source_definition_searchable": f"{self.psql_host}|{self.psql_port}|{self.psql_database}"
    }

    response = requests.post(url, headers=headers, data=json.dumps(payload), verify=False)

    if response.status_code in (200, 201):
        print(f"Connection to {bold_blue_start} PostgreSQL{reset} defined successfully!")
        data = response.json()
        self.psql_id = data.get('metadata', {}).get('asset_id')
    else:
        print(f"Failed to define connection with status code {response.status_code}")
        print(response.text)
815
816
817
def import_governance_artifacts(self, zip_file_path):
    """
    Upload a ZIP of governance artifacts to ``/v3/governance_artifact_types/import``.

    The upload uses ``merge_option=specified``.

    Args:
        zip_file_path (str): Path to the ZIP file containing governance artifacts.

    Returns:
        str: The ``process_id`` from the response when the server answers 202
        (import running asynchronously). None on 200/201 (completed) or on any
        failure.
    """
    token = self.get_bearer_token()
    if not token:
        print("Failed to obtain bearer token.")
        return None

    endpoint = f"{self.base_url}/v3/governance_artifact_types/import"
    with open(zip_file_path, 'rb') as archive:
        resp = requests.post(
            endpoint,
            headers={"Authorization": f"Bearer {token}"},
            files={'file': archive},
            params={'merge_option': 'specified'},
            verify=False,
        )

    code = resp.status_code
    if code == 202:
        process_id = resp.json().get("process_id")
        print(f"Process ID: {process_id}")
        return process_id
    if code in (200, 201):
        print("Imported governance artifacts successfully.")
        return None

    print(f"Failed to import governance artifacts with status code {code}")
    print(resp.text)
    return None
852
853
def check_import_status(self, process_id):
    """
    Fetch the status of a governance-artifact import process.

    Args:
        process_id (str): Process ID returned by the import request.

    Returns:
        dict: Parsed JSON status on HTTP 200. Otherwise None, after the failure
        is printed.
    """
    token = self.get_bearer_token()
    if not token:
        print("Failed to obtain bearer token.")
        return None

    resp = requests.get(
        f"{self.base_url}/v3/governance_artifact_types/import/status/{process_id}",
        headers={"Authorization": f"Bearer {token}"},
        verify=False,
    )

    if resp.status_code != 200:
        print(f"Failed to check import status with status code {resp.status_code}")
        print(resp.text)
        return None
    return resp.json()
882
883
def main_import_process(self, zip_file_path, process_id=None, poll_interval=60):
    """
    Import governance artifacts (unless a process ID is given) and poll until done.

    Args:
        zip_file_path (str): Path to the ZIP file containing governance artifacts.
            Not used when ``process_id`` is given.
        process_id (str, optional): ID of an import already in progress to monitor.
        poll_interval (int): Seconds to wait between status checks. Defaults to 60.

    Returns:
        None. Progress and the final outcome are printed.
    """
    start_time = datetime.now()
    if not process_id:
        process_id = self.import_governance_artifacts(zip_file_path)
    if not process_id:
        # Synchronous completion, or a failure already reported by the import call.
        return None

    while True:
        status_data = self.check_import_status(process_id)
        if not status_data:
            break

        status = status_data.get("status")
        step_number = status_data.get("step_number")
        total_steps = status_data.get("total_steps")

        elapsed_time = datetime.now() - start_time
        elapsed_time_str = str(timedelta(seconds=int(elapsed_time.total_seconds())))

        # Guard against missing step counters or total_steps == 0. Without this the
        # division raises, and None == None would be mistaken for completion.
        has_steps = (
            isinstance(step_number, (int, float))
            and isinstance(total_steps, (int, float))
            and total_steps > 0
        )
        if has_steps:
            percent_complete = (step_number / total_steps) * 100
            print(f"Governance Artifacts Import Progress: {percent_complete:.2f}%")
        print(f"Time elapsed: {elapsed_time_str}")

        # Check for failure first so a failure on the final step is not reported as success.
        if status == "FAILED":
            fail_message = f"❌😢❌😢❌ {bold_red_start}Import process failed.{reset} ❌😢❌😢❌"
            cowsay.beavis(fail_message)
            break
        if status == "SUCCEEDED" or (has_steps and step_number == total_steps):
            success_message = f"✅✅✅ {bold_green_start}Governance Artifacts import process completed successfully.{reset} ✅✅✅"
            print(success_message)
            break

        wip_message = f"🚧🚧 Governance Artifacts import process is still in progress. Checking again in {poll_interval} seconds... 🚧🚧"
        print(wip_message)
        print("")
        time.sleep(poll_interval)
    return None
925
926
def get_catalogs(self, bearer):
    """
    Fetch the catalogs visible to the caller from the ``/v2/catalogs`` endpoint.

    Args:
        bearer (str): Bearer token for authentication.

    Returns:
        list | None: One ``{"guid": ..., "name": ...}`` dict per catalog when
        the request returns HTTP 200. Otherwise the status code and response
        body are printed and None is returned.
    """
    request_headers = {
        "Authorization": f"Bearer {bearer}",
        "cache-control": "no-cache",
        "content-type": "application/json",
    }
    endpoint = f"{self.base_url}/v2/catalogs"
    # TLS verification is disabled, as in the other requests made by this client.
    resp = requests.get(endpoint, headers=request_headers, verify=False)

    if resp.status_code != 200:
        print(f"Failed to get catalogs with status code {resp.status_code}")
        print(resp.text)
        return None

    summaries = []
    for item in resp.json().get("catalogs", []):
        summaries.append(
            {"guid": item["metadata"]["guid"], "name": item["entity"]["name"]}
        )
    return summaries
954
955
def get_catalog_id_by_name(self):
    """
    Look up the GUID of the catalog whose name equals ``self.catalog_name``.

    Returns:
        str | None: GUID of the first catalog with a matching name. Returns
        None, after printing a "not found" message, when there is no match.
        The same happens when no bearer token or catalog list could be
        obtained.
    """
    token = self.get_bearer_token()
    candidates = []
    if token:
        # get_catalogs returns None on failure; treat that like an empty list.
        candidates = self.get_catalogs(token) or []

    for entry in candidates:
        if entry["name"] == self.catalog_name:
            return entry["guid"]

    print(f"Catalog with name {self.catalog_name} not found.")
    return None
971
972
def run_main_import_process(self, *, import_wait_seconds=90, enrichment_wait_seconds=300):
    """
    Run the full lab setup end to end.

    Steps, in order:
    1. Verify configuration and obtain a bearer token.
    2. Define the Cloud Object Storage, Db2 Warehouse and PostgreSQL
       connections.
    3. Import the governance artifacts.
    4. For each of the three data sources, run metadata import, then metadata
       enrichment, then publishing of the enrichment assets.

    Each phase in step 4 finishes for all three sources before the next phase
    starts.

    Args:
        import_wait_seconds (int | float, optional): Pause between the metadata
            imports and the start of enrichment. Defaults to 90.
        enrichment_wait_seconds (int | float, optional): Pause between
            enrichment and publishing. Defaults to 300.

    Returns:
        None. Progress is reported with ``print``. The run stops early if no
        bearer token could be obtained.
    """
    self.verify_vars()

    # Every later call needs the token, so stop here if it could not be obtained.
    self.bearer_token = self.get_bearer_token()
    if not self.bearer_token:
        print("Could not obtain a bearer token; aborting the import process.")
        return None

    print("")

    # Define the data-source connections.
    self.define_cos_connection(self.bearer_token, catalog=False)
    print("")
    self.define_db2_connection(self.bearer_token, catalog=False)
    print("")
    self.define_psql_connection(self.bearer_token, catalog=False)
    print("")

    # Import Governance Artifacts: Categories, Business Terms, Policies, and Rules.
    # main_import_process polls until the import ends, so it is finished here.
    self.main_import_process("governance_artifacts.zip", process_id=None)
    print("Governance artifacts import step finished.")
    print("")

    print("Beginning metadata import!")
    # Per-source settings. These are read after the connections are defined.
    # NOTE(review): presumably the define_*_connection calls set
    # db2_id / cos_id / psql_id — confirm.
    sources = [
        {
            "label": "DB2 Warehouse",
            "connection_id": self.db2_id,
            "paths": [
                "/EMPLOYEE/EMPLOYEE_HISTORY",
                "/EMPLOYEE/EMPLOYEE_RECORDS",
                "/EMPLOYEE/EMPLOYEE_SUMMARY",
                "/EMPLOYEE/EMPLOYEE",
            ],
            "mdi_name": "DB2 Metadata Import",
            "mde_name": "Db2 Warehouse MDE",
            "asset_name": self.db2_name,
        },
        {
            "label": "Cloud Object Storage",
            "connection_id": self.cos_id,
            "paths": [
                "/cpd-outcomes/Warehouse/WAREHOUSE_ASSIGNED_SHIFTS.csv",
                "/cpd-outcomes/Warehouse/WAREHOUSE_SHIFTS.csv",
                "/cpd-outcomes/Warehouse/WAREHOUSE_STAFF.csv",
                "/cpd-outcomes/Warehouse/WAREHOUSE_STAFFING.csv",
            ],
            "mdi_name": "Cloud Object Storage Metadata Import",
            "mde_name": "Cloud Object Storage Enrichment",
            "asset_name": self.cos_name,
        },
        {
            "label": "PostgreSQL",
            "connection_id": self.psql_id,
            "paths": ["/CUSTOMER/CUSTOMER_LOYALTY"],
            "mdi_name": "PostgreSQL Metadata Import",
            "mde_name": "PostgreSQLMDE",
            "asset_name": self.psql_name,
        },
    ]

    # Phase 1: metadata import. Only the import ID is needed later.
    for source in sources:
        source["mdi_id"], _ = self.create_and_run_metadata_import(
            source["connection_id"], source["paths"], name=source["mdi_name"]
        )
        print(f"{source['label']} metadata import completed!")

    print("")
    # Fixed pause before enrichment. NOTE(review): presumably this gives the
    # imported assets time to become available — confirm the required delay.
    time.sleep(import_wait_seconds)
    print("Beginning metadata enrichment...")

    # Phase 2: metadata enrichment.
    for source in sources:
        source["mde_result"] = self.create_and_run_metadata_enrichment(
            name=source["mde_name"],
            mdi_id=source["mdi_id"],
            job_name=source["asset_name"] + " Enrichment Job",
            publish_job_name=source["asset_name"] + " publish Job",
        )
        print(f"{source['label']} metadata enrichment completed!")

    time.sleep(enrichment_wait_seconds)
    print("")
    print("Beginning metadata enrichment asset publishing")

    # Phase 3: publish the enrichment assets. Skip a source whose enrichment
    # yielded no asset ID instead of publishing None and reporting success.
    for source in sources:
        mde_id = (source["mde_result"] or {}).get("metadata", {}).get("asset_id")
        if not mde_id:
            print(f"{source['label']} metadata enrichment returned no asset ID; skipping publish.")
            continue
        self.publish_metadata_enrichment_assets(mde_id)
        print(f"{source['label']} metadata enrichment assets published!")

    print("")
    cowsay.stegosaurus("Congratulations! You have successfully imported governance artifacts, add data connections and enriched metadata! You can now begin the Data Product Hub portion of the lab.")
    return None
1063
# Example usage:
1064
# client = ImportClient()
1065
# client.verify_vars()
1066
# client.main_import_process('/path/to/your/artifacts.zip')
1067
1068