Path: blob/main/Data-Product-Hub-L3/client.py
1928 views
import time
import requests
from dotenv import load_dotenv
import os
import json
import urllib3
import cowsay
from datetime import datetime, timedelta

# The client talks to clusters that commonly use self-signed certificates, so
# every request is sent with verify=False; silence the resulting warnings.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
requests.urllib3.disable_warnings()

# ANSI escape sequences used to colour console output.
bold_blue_start = "\033[1;34m"
bold_green_start = "\033[1;32m"
bold_red_start = "\033[1;31m"
reset = "\033[0m"


class ImportClient:
    """
    A class to handle import operations for governance artifacts,
    define connections to data sources, and manage catalog operations in IBM Cloud Pak for Data (CPD).
    """

    def __init__(self):
        """
        Initialize the ImportClient instance by loading environment variables
        and setting up necessary attributes.

        Note that construction performs network calls: it resolves the catalog
        ID, the IDs of already-defined connections and the governance
        categories used as the enrichment scope.
        """
        # Values from the .env file take precedence over variables that already
        # exist in the process environment (USERNAME, for example, is commonly
        # predefined by the OS). The rest of the environment is left intact.
        load_dotenv(override=True)

        # Access the CPD_CLUSTER_HOST variable and other environment variables
        self.cpd_cluster_host = os.getenv("CPD_CLUSTER_HOST")
        self.username = os.getenv("USERNAME")
        self.password = os.getenv("PASSWORD")
        self.catalog_name = os.getenv("CATALOG_NAME")
        self.project_id = os.getenv("PROJECT_ID")
        self.base_url = f"https://{self.cpd_cluster_host}"

        # Db2 Warehouse specific variables
        self.db_username = os.getenv("DB_USERNAME")
        self.db_password = os.getenv("DB_PASSWORD")
        self.database = os.getenv("DB_DATABASE")
        self.host = os.getenv("DB_HOST")
        self.db_port = "50001"
        self.db2_name = "Data Warehouse"
        self.db2_description = "Database that contains warehouse data needed by the business for analytics and AI."
        self.origin_country = os.getenv("ORIGIN_COUNTRY")
        self.db2_datasource_type = os.getenv("DB2_DATASOURCE_TYPE")

        # PSQL specific variables
        self.psql_username = os.getenv("PSQL_DB_USERNAME")
        self.psql_password = os.getenv("PSQL_DB_PASSWORD")
        self.psql_database = os.getenv("PSQL_DB_DATABASE")
        self.psql_host = os.getenv("PSQL_DB_HOST")
        self.psql_port = os.getenv("PSQL_DB_PORT")
        self.psql_name = "Customer Data - PostgreSQL"
        self.psql_description = "Database that contains warehouse data needed by the business for analytics and AI."
        self.psql_datasource_type = os.getenv("PSQL_DATASOURCE_TYPE")

        # Cloud Object Storage specific variables
        self.cos_bucket = os.getenv("COS_BUCKET")
        self.cos_secret_key = os.getenv("COS_SECRET_KEY")
        self.cos_api_key = os.getenv("COS_API_KEY")
        self.cos_access_key = os.getenv("COS_ACCESS_KEY")
        self.cos_resource_instance_id = os.getenv("COS_RESOURCE_INSTANCE_ID")
        self.cos_url = "https://s3.us-south.cloud-object-storage.appdomain.cloud"
        self.cos_name = "Cloud Object Storage"
        self.cos_description = "IBM Cloud Object Storage bucket that contains data files used for analytics and AI."
        self.bearer_token = None

        # Get the catalog ID based on the catalog name
        self.catalog_id = self.get_catalog_id_by_name()
        self.db2_id, self.cos_id, self.psql_id = self.fetch_resource_ids()

        # Metadata enrichment Parameters
        self.mde_objective = {
            "enrichment_options": {
                "structured": {
                    "profile": True,
                    "assign_terms": True,
                    "analyze_quality": True
                }
            },
            "governance_scope": self.get_category_ids(),
            "sampling": {
                "structured": {
                    "method": "TOP",
                    "analysis_method": "FIXED",
                    "sample_size": {
                        "name": "BASIC",
                        "options": {
                            "row_number": 1000,
                            "classify_value_number": 100
                        }
                    }
                }
            },
            "datascope_of_reruns": "DELTA"
        }

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _auth_headers(self, bearer=None):
        """Return JSON request headers with a bearer token (fetched when not supplied)."""
        if bearer is None:
            bearer = self.get_bearer_token()
        return {
            'Authorization': f'Bearer {bearer}',
            'Content-Type': 'application/json'
        }

    @staticmethod
    def _parse_json(response, label):
        """Print the status of `response` under `label`; return its JSON body, or None if it is not JSON."""
        try:
            body = response.json()
        except requests.JSONDecodeError:
            print(f'Error in {label} Response: {response.status_code}, {response.text}')
            return None
        print(f'{label} Response: {response.status_code}')
        return body

    def _post_connection(self, bearer, catalog, payload):
        """POST a connection definition to the catalog or the project; return the response JSON, or None on failure."""
        if catalog:
            url = f"{self.base_url}/v2/connections?catalog_id={self.catalog_id}"
        else:
            url = f"{self.base_url}/v2/connections?project_id={self.project_id}"

        headers = {
            "Authorization": f"Bearer {bearer}",
            "cache-control": "no-cache",
            "content-type": "application/json",
            "Skip-Enforcement": "false"
        }

        response = requests.post(url, headers=headers, data=json.dumps(payload), verify=False)
        if response.status_code in (200, 201):
            return response.json()

        print(f"Failed to define connection with status code {response.status_code}")
        print(response.text)
        return None

    def _build_mde_payload(self, name, mdi_id, enrichment_assets, description, enrichImmediate,
                           job_name, job_schedule, publish_job_id, publish_job_name,
                           publish_job_schedule, tags):
        """Assemble the request body for creating a metadata enrichment area."""
        payload = {
            "name": name,
            "objective": self.mde_objective,
            "target_catalog_id": self.catalog_id,
            "data_scope": {
                "enrichment_assets": enrichment_assets,
                "container_assets": {
                    "metadata_import": [mdi_id]
                }
            },
            "enrichImmediate": enrichImmediate
        }

        if description:
            payload["description"] = description

        # Optional sub-objects are only sent when at least one field is set.
        job = {key: value for key, value in (("name", job_name), ("schedule", job_schedule)) if value}
        if job:
            payload["job"] = job

        publish_job = {
            key: value
            for key, value in (
                ("id", publish_job_id),
                ("name", publish_job_name),
                ("schedule", publish_job_schedule),
            )
            if value
        }
        if publish_job:
            payload["publish_job"] = publish_job

        if tags:
            payload["data_scope"]["tags"] = tags

        return payload

    def _submit_mde(self, name, payload, debug=False):
        """POST a metadata enrichment area; return the response JSON, or None if the body is not JSON."""
        headers = self._auth_headers()
        url = f'https://{self.cpd_cluster_host}/v2/metadata_enrichment/metadata_enrichment_area?project_id={self.project_id}'

        if debug:
            # The bearer token is a credential, so it is never echoed.
            printable_headers = {**headers, 'Authorization': 'Bearer <redacted>'}
            print("---- Debugging Information ----")
            print(f"URL: {url}")
            print(f"Headers: {json.dumps(printable_headers, indent=4)}")
            print(f"Payload: {json.dumps(payload, indent=4)}")
            print("-------------------------------")

        response = requests.post(url, json=payload, headers=headers, verify=False)
        try:
            response_json = response.json()
        except requests.JSONDecodeError:
            print(f'Error in Metadata Enrichment Creation Response: {response.status_code}, {response.text}')
            return None
        print(f'{name}: Metadata Enrichment Creation Response: {response.status_code}')
        return response_json

    # ------------------------------------------------------------------
    # Connections and categories
    # ------------------------------------------------------------------

    def fetch_resource_ids(self):
        """
        Fetch the Db2 Warehouse, Cloud Object Storage and PostgreSQL connection
        asset IDs from the project's connections, matched by connection name.

        Returns:
            tuple: (db2_id, cos_id, psql_id); an entry is None when the
            connection was not found or the lookup failed.
        """
        bearer_token = self.get_bearer_token()
        if not bearer_token:
            print("Failed to fetch resource IDs. Bearer token not obtained.")
            return None, None, None

        data = self.get_connections(bearer_token) or {}
        found = {self.db2_name: None, self.cos_name: None, self.psql_name: None}

        # When several connections share a name, the last one listed wins.
        for resource in data.get('resources') or []:
            resource_name = resource['entity']['name']
            if resource_name in found:
                found[resource_name] = resource['metadata']['asset_id']

        return found[self.db2_name], found[self.cos_name], found[self.psql_name]

    def get_categories(self):
        """
        Search for governance artifacts of type category.

        Returns:
            dict: The search response if the request is successful, otherwise None.
        """
        url = f"{self.base_url}/v3/search?query=metadata.artifact_type:category"

        headers = {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {self.get_bearer_token()}'
        }

        response = requests.get(url, headers=headers, verify=False)

        if response.status_code == 200:
            return response.json()

        print('Failed to retrieve category hierarchy paths:', response.status_code, response.text)
        return None

    def get_category_ids(self, limit=5):
        """
        Build the governance scope from the first categories returned by the search.

        Args:
            limit (int): Number of leading search rows to consider. Defaults to 5.

        Returns:
            list: Dicts of the form {"type": "CATEGORY", "id": <artifact_id>};
            empty when the categories could not be retrieved.
        """
        categories_json = self.get_categories()
        if not categories_json:
            return []

        rows = categories_json.get('rows') or []
        return [
            {"type": "CATEGORY", "id": row['artifact_id']}
            for row in rows[:limit]
            if row and row.get('artifact_id')
        ]

    # ------------------------------------------------------------------
    # Metadata enrichment
    # ------------------------------------------------------------------

    def publish_metadata_enrichment_assets(self, metadata_enrichment_area_id, publish_scope="all_assets", asset_ids=None, filter_criteria=None):
        """
        Publish assets of a Metadata Enrichment Area to the client's catalog.

        Existing assets in the catalog are updated (duplicate_action "update").

        Args:
            metadata_enrichment_area_id (str): ID of the metadata enrichment area asset.
            publish_scope (str): Publish scope, allowable values: 'all_assets', 'selected_assets'.
            asset_ids (list): List of asset IDs to publish. Only used if publish_scope is 'selected_assets'.
            filter_criteria (dict): Filter criteria for assets. Only used if publish_scope is 'selected_assets'.

        Returns:
            dict: The JSON body of the publish response, otherwise None when the body is not JSON.
        """
        headers = self._auth_headers()

        payload = {
            "catalog": self.catalog_id,
            "duplicate_action": "update"
        }

        if publish_scope == "selected_assets":
            if asset_ids:
                payload["asset_ids"] = asset_ids
            if filter_criteria:
                payload["filter"] = {"search_criteria": filter_criteria}

        url = f'https://{self.cpd_cluster_host}/v2/metadata_enrichment/metadata_enrichment_area/{metadata_enrichment_area_id}/publish_assets?project_id={self.project_id}&publishScope={publish_scope}'

        response = requests.post(url, json=payload, headers=headers, verify=False)
        try:
            response_json = response.json()
        except requests.JSONDecodeError:
            print(f'Error in Publish Assets Response: {response.status_code}, {response.text}')
            return None
        print(f'Publish Assets Response: {response.status_code}, {response_json}')
        return response_json

    def create_and_run_MDE(self, name, mdi_id, enrichment_assets=None, description=None, enrichImmediate=True, job_name=None, job_schedule=None, publish_job_id=None, publish_job_name=None, publish_job_schedule=None, tags=None):
        """
        Create and run a metadata enrichment job, printing the request for debugging.

        Same request as create_and_run_metadata_enrichment, but the URL, headers
        (with the bearer token redacted) and payload are printed first, and any
        exception is reported and turned into a None result.

        Args:
            name (str): Name of the metadata enrichment asset.
            mdi_id (str): ID of the metadata import.
            enrichment_assets (list): IDs of assets to enrich metadata.
            description (str): Description of the metadata enrichment area asset. Default is None.
            enrichImmediate (bool): Whether to run enrichment immediately after area creation. Default is True.
            job_name (str): Name of the metadata enrichment job. Default is None.
            job_schedule (str): Schedule for the metadata enrichment job. Default is None.
            publish_job_id (str): ID of the metadata publish job. Default is None.
            publish_job_name (str): Name of the metadata publish job. Default is None.
            publish_job_schedule (str): Schedule for the metadata publish job. Default is None.
            tags (list): List of tags. Default is None.

        Returns:
            dict: The JSON body of the creation response, otherwise None.
        """
        try:
            payload = self._build_mde_payload(
                name, mdi_id, enrichment_assets, description, enrichImmediate,
                job_name, job_schedule, publish_job_id, publish_job_name,
                publish_job_schedule, tags,
            )
            return self._submit_mde(name, payload, debug=True)
        except Exception as e:
            print(f'An error occurred: {str(e)}')
            return None

    def create_and_run_metadata_enrichment(self, name, mdi_id, enrichment_assets=None, description=None, enrichImmediate=True, job_name=None, job_schedule=None, publish_job_id=None, publish_job_name=None, publish_job_schedule=None, tags=None):
        """
        Create and run a metadata enrichment job.

        The enrichment objective and the target catalog are taken from the
        client (self.mde_objective and self.catalog_id).

        Args:
            name (str): Name of the metadata enrichment asset.
            mdi_id (str): ID of the metadata import.
            enrichment_assets (list): IDs of assets to enrich metadata.
            description (str): Description of the metadata enrichment area asset. Default is None.
            enrichImmediate (bool): Whether to run enrichment immediately after area creation. Default is True.
            job_name (str): Name of the metadata enrichment job. Default is None.
            job_schedule (str): Schedule for the metadata enrichment job. Default is None.
            publish_job_id (str): ID of the metadata publish job. Default is None.
            publish_job_name (str): Name of the metadata publish job. Default is None.
            publish_job_schedule (str): Schedule for the metadata publish job. Default is None.
            tags (list): List of tags. Default is None.

        Returns:
            dict: The JSON body of the creation response, otherwise None when the body is not JSON.
        """
        payload = self._build_mde_payload(
            name, mdi_id, enrichment_assets, description, enrichImmediate,
            job_name, job_schedule, publish_job_id, publish_job_name,
            publish_job_schedule, tags,
        )
        return self._submit_mde(name, payload)

    def update_mde_settings(self):
        """
        Update the project's metadata enrichment settings with the defaults defined below.

        Returns:
            dict: The JSON body of the response if the update succeeded, otherwise None.
        """
        # Metadata enrichment settings
        mde_settings = {
            'advanced_profiling': {'unique_value_table': {'count': 1000}},
            'semantic_expansion': {'name_expansion': True,
                                   'name_expansion_configuration': {'assignment_threshold': 0.9,
                                                                    'suggestion_threshold': 0.75},
                                   'description_generation': True,
                                   'description_generation_configuration': {'assignment_threshold': 0.9,
                                                                            'suggestion_threshold': 0.75}},
            'term_assignment': {'class_based_assignments': False,
                                'term_assignment_threshold': 0.9,
                                'term_suggestion_threshold': 0.75,
                                'name_matching': True,
                                'ml_based_assignments_default': False,
                                'ml_based_assignments_custom': False,
                                'evaluate_negative_assignments': True,
                                'default_ml_configuration': {'catalog_id': self.catalog_id,
                                                             'scope': 'catalog'},
                                'llm_based_assignments': False},
            'structured_profiling': {'null_threshold': 0.05,
                                     'uniqueness_threshold': 0.95,
                                     'constant_threshold': 0.99,
                                     'quality_score_threshold': 0.8,
                                     'data_class_assignment_threshold': 0.75,
                                     'data_class_suggestion_threshold': 0.25,
                                     'dq_exceptions_database': {'count': 100}},
            'key_analysis': {'pk_shallow_analysis_config': {'min_confidence': 0.8},
                             'fk_shallow_analysis_config': {'min_confidence': 0.8,
                                                            'auto_selection': False,
                                                            'auto_selection_threshold': 0.9}}
        }

        endpoint = f"https://{self.cpd_cluster_host}/v2/metadata_enrichment/metadata_enrichment_area/settings"
        params = {
            'project_id': self.project_id
        }

        # verify=False keeps this call consistent with every other request of the client.
        response = requests.put(endpoint, headers=self._auth_headers(), params=params, json=mde_settings, verify=False)

        if response.status_code == 200:
            print("Metadata enrichment settings updated successfully.")
            try:
                return response.json()
            except requests.JSONDecodeError:
                return None

        print(f"Failed to update metadata enrichment settings: {response.status_code}")
        print(response.text)
        return None

    # ------------------------------------------------------------------
    # Metadata import
    # ------------------------------------------------------------------

    def get_metadata_import_details(self, metadata_import_id):
        """
        Retrieve details of a metadata import in the client's project.

        Args:
            metadata_import_id (str): Id of the metadata import asset.

        Returns:
            dict: Details of the metadata import if the request is successful, otherwise None.
        """
        url = f"{self.base_url}/v2/metadata_imports/{metadata_import_id}"

        # Query parameters
        params = {
            'project_id': self.project_id
        }

        # Headers
        headers = {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {self.get_bearer_token()}'
        }

        response = requests.get(url, headers=headers, params=params, verify=False)

        if response.status_code == 200:
            return response.json()

        print(f"Failed to retrieve metadata import details: {response.status_code}")
        print(response.text)
        return None

    def create_and_run_metadata_import(self, connection_id, paths_scope, name="Metadata Import"):
        """
        Create a metadata import, create a job, patch the metadata import, and run the job.

        Args:
            connection_id (str): The connection ID for the import.
            paths_scope (list): List of paths for the metadata import scope.
            name (str): Name of the metadata import. Defaults to "Metadata Import".

        Returns:
            tuple: (mdi_id, job_run_response_json) if every step succeeded, otherwise None.
        """
        cpd_url = self.cpd_cluster_host
        project_id = self.project_id
        headers = self._auth_headers()

        # 1. Create an MDI
        mdi_url = f'https://{cpd_url}/v2/metadata_imports?project_id={project_id}'
        mdi_payload = {
            "connection_id": connection_id,
            "name": name,
            "project_id": project_id,
            "target_project_id": project_id,
            "import_type": "metadata",
            "scope": {"paths": paths_scope},
            "extra_properties": {
                "include_primary_key": "true",
                "exclude_tables": "false",
                "metadata_from_catalog_table_only": "true",
                "include_foreign_key": "false",
                "exclude_views": "false"
            }
        }
        mdi_response = requests.post(mdi_url, json=mdi_payload, headers=headers, verify=False)
        mdi_json = self._parse_json(mdi_response, 'MDI Creation') or {}
        mdi_id = mdi_json.get('metadata', {}).get('asset_id')
        if not mdi_id:
            print('MDI creation failed, terminating the process.')
            return None

        # 2. Create a job
        job_url = f'https://{cpd_url}/v2/jobs?project_id={project_id}'
        job_payload = {
            "job": {
                "asset_ref": mdi_id,
                "name": name + " Job",
                "description": " ",
                "configuration": {}
            }
        }
        job_response = requests.post(job_url, json=job_payload, headers=headers, verify=False)
        job_json = self._parse_json(job_response, 'Job Creation') or {}
        job_id = job_json.get('metadata', {}).get('asset_id')
        if not job_id:
            print('Job creation failed, terminating the process.')
            return None

        # 3. Patch metadata Import with the job id
        patch_url = f'https://{cpd_url}/v2/metadata_imports/{mdi_id}?project_id={project_id}'
        patch_payload = {
            "job_id": job_id
        }
        patch_response = requests.patch(patch_url, json=patch_payload, headers=headers, verify=False)
        self._parse_json(patch_response, 'Patch MDI')
        if patch_response.status_code != 200:
            print('Failed to patch MDI with job ID, terminating the process.')
            return None

        # 4. Run the Job (create a job-run)
        run_url = f'https://{cpd_url}/v2/jobs/{job_id}/runs?project_id={project_id}&job_id={job_id}'
        run_payload = {
            "job_run": {}
        }
        run_response = requests.post(run_url, json=run_payload, headers=headers, verify=False)
        run_json = self._parse_json(run_response, 'Run Job')
        if run_json is None:
            print('Detailed error information:', run_response.text)
            return None

        print()
        return mdi_id, run_json

    # ------------------------------------------------------------------
    # Diagnostics and authentication
    # ------------------------------------------------------------------

    def verify_vars(self):
        """
        Verify and print the environment variables and instance attributes.
        """
        # NOTE(review): this prints passwords, keys and the bearer token in
        # clear text; avoid calling it where console output is logged or shared.
        for key, value in self.__dict__.items():
            if not key.startswith('__') and not callable(value):
                print(f"{bold_blue_start}{key}{reset}: {value}")

    def get_bearer_token(self):
        """
        Obtain a bearer token for authentication.

        Returns:
            str: The bearer token if the request is successful, otherwise None.
        """
        url = f"https://{self.cpd_cluster_host}/icp4d-api/v1/authorize"
        headers = {
            "cache-control": "no-cache",
            "content-type": "application/json"
        }
        payload = {
            "username": self.username,
            "password": self.password
        }

        response = requests.post(url, headers=headers, data=json.dumps(payload), verify=False)

        if response.status_code == 200:
            return response.json().get('token')

        print(f"Request failed with status code {response.status_code}")
        print(response.text)
        return None

    def get_connections(self, bearer):
        """
        Return the connections defined in the client's project, newest first.

        Args:
            bearer (str): Bearer token for authentication.

        Returns:
            dict: The connections listing if the request is successful, otherwise None.
        """
        url = f"https://{self.cpd_cluster_host}/v2/connections"
        headers = {
            "Inject-Token": "false",
            "Authorization": f"Bearer {bearer}",
            "cache-control": "no-cache",
            "content-type": "application/json"
        }

        # Query parameters
        params = {
            "project_id": self.project_id,
            "sort": "-metadata.create_time",
            "limit": 50
        }

        response = requests.get(url, headers=headers, params=params, verify=False)

        if response.status_code == 200:
            return response.json()

        print(f"Authenticated request failed with status code {response.status_code}")
        print(response.text)
        return None

    # ------------------------------------------------------------------
    # Connection definitions
    # ------------------------------------------------------------------

    def define_cos_connection(self, bearer, catalog=True):
        """
        Define a connection to Cloud Object Storage (COS) and store its ID in self.cos_id.

        Args:
            bearer (str): Bearer token for authentication.
            catalog (bool): Create the connection in the catalog when true, otherwise in the project.
        """
        payload = {
            "datasource_type": "193a97c1-4475-4a19-b90c-295c4fdc6517",
            "name": self.cos_name,
            "description": self.cos_description,
            "properties": {
                "bucket": self.cos_bucket,
                "secret_key": self.cos_secret_key,
                "trust_all_ssl_cert": "false",
                "auth_method": "instanceid_apikey_accesskey_secretkey",
                "iam_url": "https://iam.cloud.ibm.com/identity/token",
                "api_key": self.cos_api_key,
                "resource_instance_id": self.cos_resource_instance_id,
                "access_key": self.cos_access_key,
                "url": self.cos_url
            },
            "origin_country": self.origin_country,
            "data_source_definition_searchable": f"{self.cos_url}|{self.cos_bucket}"
        }

        data = self._post_connection(bearer, catalog, payload)
        if data is not None:
            print(f"Connection to {bold_blue_start}Cloud Object Storage{reset} defined successfully!")
            self.cos_id = data.get('metadata', {}).get('asset_id')

    def define_db2_connection(self, bearer, catalog=True):
        """
        Define a connection to a DB2 Warehouse and store its ID in self.db2_id.

        Args:
            bearer (str): Bearer token for authentication.
            catalog (bool): Create the connection in the catalog when true, otherwise in the project.
        """
        payload = {
            "datasource_type": self.db2_datasource_type,
            "name": self.db2_name,
            "description": self.db2_description,
            "properties": {
                "database": self.database,
                "auth_method": "username_password",
                "password": self.db_password,
                "port": self.db_port,
                "host": self.host,
                "ssl": "true",
                "username": self.db_username
            },
            "origin_country": self.origin_country,
            "data_source_definition_searchable": f"{self.host}|{self.db_port}|{self.database}"
        }

        data = self._post_connection(bearer, catalog, payload)
        if data is not None:
            print(f"Connection to {bold_blue_start}DB2 Warehouse{reset} defined successfully!")
            self.db2_id = data.get('metadata', {}).get('asset_id')

    def define_psql_connection(self, bearer, catalog=True):
        """
        Define a connection to a PostgreSQL Database and store its ID in self.psql_id.

        Args:
            bearer (str): Bearer token for authentication.
            catalog (bool): Create the connection in the catalog when true, otherwise in the project.
        """
        payload = {
            "datasource_type": self.psql_datasource_type,
            "name": self.psql_name,
            "description": self.psql_description,
            "properties": {
                "database": self.psql_database,
                "password": self.psql_password,
                "port": self.psql_port,
                "host": self.psql_host,
                "username": self.psql_username
            },
            "origin_country": self.origin_country,
            # Describe the PostgreSQL endpoint itself, not the Db2 one.
            "data_source_definition_searchable": f"{self.psql_host}|{self.psql_port}|{self.psql_database}"
        }

        data = self._post_connection(bearer, catalog, payload)
        if data is not None:
            print(f"Connection to {bold_blue_start} PostgreSQL{reset} defined successfully!")
            self.psql_id = data.get('metadata', {}).get('asset_id')

    # ------------------------------------------------------------------
    # Governance artifact import
    # ------------------------------------------------------------------

    def import_governance_artifacts(self, zip_file_path):
        """
        Import governance artifacts from a ZIP file.

        Args:
            zip_file_path (str): Path to the ZIP file containing governance artifacts.

        Returns:
            str: Process ID if the import is in progress (HTTP 202), otherwise None.
        """
        bearer = self.get_bearer_token()
        if not bearer:
            print("Failed to obtain bearer token.")
            return None

        url = f"{self.base_url}/v3/governance_artifact_types/import"
        headers = {
            "Authorization": f"Bearer {bearer}"
        }

        with open(zip_file_path, 'rb') as file:
            files = {'file': file}
            response = requests.post(url, headers=headers, files=files, params={'merge_option': 'specified'}, verify=False)

        if response.status_code in [200, 201]:
            print("Imported governance artifacts successfully.")
            return None

        if response.status_code == 202:
            process_id = response.json().get("process_id")
            print(f"Process ID: {process_id}")
            return process_id

        print(f"Failed to import governance artifacts with status code {response.status_code}")
        print(response.text)
        return None

    def check_import_status(self, process_id):
        """
        Check the status of the import process.

        Args:
            process_id (str): Process ID of the import process.

        Returns:
            dict: Status data if the request is successful, otherwise None.
        """
        bearer = self.get_bearer_token()
        if not bearer:
            print("Failed to obtain bearer token.")
            return None

        url = f"{self.base_url}/v3/governance_artifact_types/import/status/{process_id}"
        headers = {
            "Authorization": f"Bearer {bearer}"
        }

        response = requests.get(url, headers=headers, verify=False)

        if response.status_code == 200:
            return response.json()

        print(f"Failed to check import status with status code {response.status_code}")
        print(response.text)
        return None

    def main_import_process(self, zip_file_path, process_id=None):
        """
        Main script for importing governance artifacts and checking status.

        Starts the import unless a process ID is supplied, then polls the
        status every 60 seconds until the import succeeds, fails, or the
        status can no longer be retrieved.

        Args:
            zip_file_path (str): Path to the ZIP file containing governance artifacts.
            process_id (str, optional): Process ID of the ongoing import process.
        """
        start_time = datetime.now()
        if not process_id:
            process_id = self.import_governance_artifacts(zip_file_path)

        if not process_id:
            return

        while True:
            status_data = self.check_import_status(process_id)
            if not status_data:
                break

            status = status_data.get("status")
            step_number = status_data.get("step_number")
            total_steps = status_data.get("total_steps")

            elapsed_time = datetime.now() - start_time
            elapsed_time_str = str(timedelta(seconds=int(elapsed_time.total_seconds())))

            # Progress can only be computed once the service reports both counters.
            steps_known = step_number is not None and bool(total_steps)
            if steps_known:
                percent_complete = (step_number / total_steps) * 100
                print(f"Governance Artifacts Import Progress: {percent_complete:.2f}%")
            print(f"Time elapsed: {elapsed_time_str}")

            if status == "SUCCEEDED" or (steps_known and step_number == total_steps):
                success_message = f"✅✅✅ {bold_green_start}Governance Artifacts import process completed successfully.{reset} ✅✅✅"
                print(success_message)
                break

            if status == "FAILED":
                fail_message = f"❌😢❌😢❌ {bold_red_start}Import process failed.{reset} ❌😢❌😢❌"
                cowsay.beavis(fail_message)
                break

            wip_message = "🚧🚧 Governance Artifacts import process is still in progress. Checking again in 60 seconds... 🚧🚧"
            print(wip_message)
            print("")
            time.sleep(60)

    # ------------------------------------------------------------------
    # Catalogs
    # ------------------------------------------------------------------

    def get_catalogs(self, bearer):
        """
        Get the list of catalogs.

        Args:
            bearer (str): Bearer token for authentication.

        Returns:
            list: Dicts with the "guid" and "name" of each catalog, otherwise None if the request fails.
        """
        url = f"{self.base_url}/v2/catalogs"
        headers = {
            "Authorization": f"Bearer {bearer}",
            "cache-control": "no-cache",
            "content-type": "application/json"
        }

        response = requests.get(url, headers=headers, verify=False)

        if response.status_code == 200:
            catalogs = response.json().get("catalogs", [])
            return [
                {"guid": catalog["metadata"]["guid"], "name": catalog["entity"]["name"]}
                for catalog in catalogs
            ]

        print(f"Failed to get catalogs with status code {response.status_code}")
        print(response.text)
        return None

    def get_catalog_id_by_name(self):
        """
        Get the catalog ID based on the catalog name.

        Returns:
            str: Catalog ID if found, otherwise None.
        """
        bearer_token = self.get_bearer_token()
        if not bearer_token:
            return None

        catalogs = self.get_catalogs(bearer_token)
        if not catalogs:
            return None

        for catalog in catalogs:
            if catalog["name"] == self.catalog_name:
                return catalog["guid"]

        print(f"Catalog with name {self.catalog_name} not found.")
        return None

    # ------------------------------------------------------------------
    # End-to-end lab flow
    # ------------------------------------------------------------------

    def run_main_import_process(self):
        """
        Run the whole lab setup: define the connections, import the governance
        artifacts, then import, enrich and publish metadata for each data source.

        A data source whose metadata import or enrichment fails is skipped in
        the following stages and reported at the end.
        """
        self.verify_vars()

        # Check Bearer Token is valid
        self.bearer_token = self.get_bearer_token()
        print("")

        # Define Cloud Object Storage Connection
        self.define_cos_connection(self.bearer_token, catalog=False)
        print("")

        # Define DB2 Warehouse Connection
        self.define_db2_connection(self.bearer_token, catalog=False)
        print("")

        # Define PostgreSQL Connection
        self.define_psql_connection(self.bearer_token, catalog=False)
        print("")

        # Import Governance Artifacts: Categories, Business Terms, Policies, and Rules
        self.main_import_process("governance_artifacts.zip", process_id=None)
        print("Governance artifacts import started...")
        print("")
        print("Beginning metadata import!")

        # Built after the connections are defined so the freshest IDs are used.
        sources = [
            {
                "label": "DB2 Warehouse",
                "connection_id": self.db2_id,
                "paths": ["/EMPLOYEE/EMPLOYEE_HISTORY", "/EMPLOYEE/EMPLOYEE_RECORDS", "/EMPLOYEE/EMPLOYEE_SUMMARY", "/EMPLOYEE/EMPLOYEE"],
                "mdi_name": "DB2 Metadata Import",
                "mde_name": "Db2 Warehouse MDE",
                "job_prefix": self.db2_name,
            },
            {
                "label": "Cloud Object Storage",
                "connection_id": self.cos_id,
                "paths": ["/cpd-outcomes/Warehouse/WAREHOUSE_ASSIGNED_SHIFTS.csv", "/cpd-outcomes/Warehouse/WAREHOUSE_SHIFTS.csv", "/cpd-outcomes/Warehouse/WAREHOUSE_STAFF.csv", "/cpd-outcomes/Warehouse/WAREHOUSE_STAFFING.csv"],
                "mdi_name": "Cloud Object Storage Metadata Import",
                "mde_name": "Cloud Object Storage Enrichment",
                "job_prefix": self.cos_name,
            },
            {
                "label": "PostgreSQL",
                "connection_id": self.psql_id,
                "paths": ["/CUSTOMER/CUSTOMER_LOYALTY"],
                "mdi_name": "PostgreSQL Metadata Import",
                "mde_name": "PostgreSQLMDE",
                "job_prefix": self.psql_name,
            },
        ]

        # Create and run the metadata imports
        for source in sources:
            source["mdi_id"] = None
            source["mde_id"] = None
            import_result = self.create_and_run_metadata_import(
                source["connection_id"], source["paths"], name=source["mdi_name"]
            )
            if import_result is None:
                print(f"{source['label']} metadata import failed; skipping its enrichment and publishing.")
                continue
            source["mdi_id"] = import_result[0]
            print(f"{source['label']} metadata import completed!")
        print("")

        # Give the import jobs time to finish before enriching their assets.
        time.sleep(90)
        print("Beginning metadata enrichment...")

        # Run metadata enrichment
        for source in sources:
            if source["mdi_id"] is None:
                continue
            enrichment_result = self.create_and_run_metadata_enrichment(
                name=source["mde_name"],
                mdi_id=source["mdi_id"],
                job_name=source["job_prefix"] + " Enrichment Job",
                publish_job_name=source["job_prefix"] + " publish Job",
            )
            source["mde_id"] = (enrichment_result or {}).get("metadata", {}).get("asset_id")
            if source["mde_id"] is None:
                print(f"{source['label']} metadata enrichment failed; skipping its publishing.")
                continue
            print(f"{source['label']} metadata enrichment completed!")

        # Give the enrichment jobs time to finish before publishing.
        time.sleep(300)
        print("")
        print("Beginning metadata enrichment asset publishing")

        # Publish metadata enrichment assets
        for source in sources:
            if source["mde_id"] is None:
                continue
            self.publish_metadata_enrichment_assets(source["mde_id"])
            print(f"{source['label']} metadata enrichment assets published!")
        print("")

        failed_labels = [source["label"] for source in sources if source["mde_id"] is None]
        if failed_labels:
            print(f"{bold_red_start}Setup did not complete for: {', '.join(failed_labels)}.{reset} Review the errors above and run the process again.")
            return

        cowsay.stegosaurus("Congratulations! You have successfully imported governance artifacts, add data connections and enriched metadata! You can now begin the Data Product Hub portion of the lab.")


# Example usage:
# client = ImportClient()
# client.verify_vars()
# client.main_import_process('/path/to/your/artifacts.zip')