Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Azure
GitHub Repository: Azure/Azure-Sentinel-Notebooks
Path: blob/master/scenario-notebooks/Automated-Notebooks/AutomationGallery-CredentialScanOnAzureBlobStorage.ipynb
3253 views
Kernel: Synapse PySpark

Automation Gallery - Credential Scan on Azure Blob Storage

Notebook Version: 1.0
Python Version: Python 3.8
Apache Spark Version: 3.1
Required Packages: No
Platforms Supported: Azure Synapse Analytics

Data Source Required: No

Description

This notebook provides step-by-step instructions and sample code to detect credential leaks in Azure Blob Storage using the Azure SDK for Python.
*** No need to download and install any other Python modules. ***
*** Please run the cells sequentially to avoid errors. Please do not use "run all cells". ***

Table of Contents

  1. Warm-up

  2. Authentication to Azure Storage

  3. Scan Azure Blob for Leaking Credentials

  4. Save result to Microsoft Sentinel Dynamic Summaries

1. Warm-up

# Load Python libraries that will be used in this notebook from azure.mgmt.storage import StorageManagementClient from azure.identity import DefaultAzureCredential from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, __version__ from azure.identity import ClientSecretCredential from datetime import datetime, timedelta, timezone import json from json import JSONEncoder from IPython.display import display, HTML, Markdown import re import pandas as pd
# Functions and classes used by the credential scan in this notebook.

def get_file_content(blob):
    """Decode the content of a downloaded blob to text.

    Tries UTF-8 first and falls back to UTF-16 on a decode error; any other
    failure is printed and an empty string is returned so the scan can
    continue with the remaining blobs.
    """
    try:
        content = blob.content_as_text(max_concurrency=1, encoding='UTF-8')
    except UnicodeDecodeError:
        content = blob.content_as_text(max_concurrency=1, encoding='UTF-16')
    except Exception as ex:
        # Best effort: report the problem but keep scanning other blobs.
        print(ex)
        content = ""
    return content


def get_regex_list():
    """Return the list of CredScan regular expressions used to detect leaked credentials."""
    regex_list = [
        "(?i)(ida:password|IssuerSecret|(api|client|app(lication)?)[_\\- ]?(key|secret)[^,a-z]|\\.azuredatabricks\\.net).{0,10}(dapi)?[a-z0-9/+]{22}",
        "(?i)(x-api-(key|token).{0,10}[a-z0-9/+]{40}|v1\\.[a-z0-9/+]{40}[^a-z0-9/+])",
        "(?-i:)\\WAIza(?i)[a-z0-9_\\\\\\-]{35}\\W",
        "(?i)(\\Wsig\\W|Secret(Value)?|IssuerSecret|(\\Wsas|primary|secondary|management|Shared(Access(Policy)?)?).?Key|\\.azure\\-devices\\.net|\\.(core|servicebus|redis\\.cache|accesscontrol|mediaservices)\\.(windows\\.net|chinacloudapi\\.cn|cloudapi\\.de|usgovcloudapi\\.net)|New\\-AzureRedisCache).{0,100}([a-z0-9/+]{43}=)",
        "(?i)visualstudio\\.com.{1,100}\\W(?-i:)[a-z2-7]{52}\\W",
        "(?i)se=2021.+sig=[a-z0-9%]{43,63}%3d",
        "(?i)(x-functions-key|ApiKey|Code=|\\.azurewebsites\\.net/api/).{0,100}[a-z0-9/\\+]{54}={2}",
        "(?i)code=[a-z0-9%]{54,74}(%3d){2}",
        "(?i)(userpwd|publishingpassword).{0,100}[a-z0-9/\\+]{60}\\W",
        "(?i)[^a-z0-9/\\+][a-z0-9/\\+]{86}==",
        "(?-i:)\\-{5}BEGIN( ([DR]SA|EC|OPENSSH|PGP))? PRIVATE KEY( BLOCK)?\\-{5}",
        "(?i)(app(lication)?|client)[_\\- ]?(key(url)?|secret)([\\s=:>]{1,10}|[\\s\"':=|>\\]]{3,15}|[\"'=:\\(]{2})[^\\-]",
        "(?i)refresh[_\\-]?token([\\s=:>]{1,10}|[\\s\"':=|>\\]]{3,15}|[\"'=:\\(]{2})(\"data:text/plain,.+\"|[a-z0-9/+=_.-]{20,200})",
        "(?i)AccessToken(Secret)?([\\s\"':=|>\\]]{3,15}|[\"'=:\\(]{2}|[\\s=:>]{1,10})[a-z0-9/+=_.-]{20,200}",
        "(?i)[a-z0-9]{3,5}://[^%:\\s\"'/][^:\\s\"'/\\$]+[^:\\s\"'/\\$%]:([^%\\s\"'/][^@\\s\"'/]{0,100}[^%\\s\"'/])@[\\$a-z0-9:\\.\\-_%\\?=/]+",
        "(?i)snmp(\\-server)?\\.exe.{0,100}(priv|community)",
        "(?i)(ConvertTo\\-?SecureString\\s*((\\(|\\Wstring)\\s*)?['\"]+)",
        "(?i)(Consumer|api)[_\\- ]?(Secret|Key)([\\s=:>]{1,10}|[\\s\"':=|>,\\]]{3,15}|[\"'=:\\(]{2})[^\\s]{5,}",
        "(?i)authorization[,\\[:= \"']+([dbaohmnsv])",
        "(?i)-u\\s+.{2,100}-p\\s+[^\\-/]",
        "(?i)(amqp|ssh|(ht|f)tps?)://[^%:\\s\"'/][^:\\s\"'/\\$]+[^:\\s\"'/\\$%]:([^%\\s\"'/][^@\\s\"'/]{0,100}[^%\\s\"'/])@[\\$a-z0-9:\\.\\-_%\\?=/]+",
        "(?i)(\\Waws|amazon)?.{0,5}(secret|access.?key).{0,10}\\W[a-z0-9/\\+]{40}",
        "(?-i:)(eyJ0eXAiOiJKV1Qi|eyJhbGci)",
        "(?i)@(\\.(on)?)?microsoft\\.com[ -~\\s]{1,100}?(\\w?pass\\w?)",
        "(?i)net(\\.exe)?.{1,5}(user\\s+|share\\s+/user:|user-?secrets? set)\\s+[a-z0-9]",
        "(?i)xox[pbar]\\-[a-z0-9]",
        "(?i)[\":\\s=]((x?corp|extranet(test)?|ntdev)(\\.microsoft\\.com)?|corp|redmond|europe|middleeast|northamerica|southpacific|southamerica|fareast|africa|exchange|extranet(test)?|partners|parttest|ntdev|ntwksta)\\W.{0,100}(password|\\Wpwd|\\Wpass|\\Wpw\\W|userpass)",
        "(?i)(sign_in|SharePointOnlineAuthenticatedContext|(User|Exchange)Credentials?|password)[ -~\\s]{0,100}?@([a-z0-9.]+\\.(on)?)?microsoft\\.com['\"]?",
        "(?i)(\\.database\\.azure\\.com|\\.database(\\.secure)?\\.windows\\.net|\\.cloudapp\\.net|\\.database\\.usgovcloudapi\\.net|\\.database\\.chinacloudapi\\.cn|\\.database.cloudapi.de).{0,100}(DB_PASS|(sql|service)?password|\\Wpwd\\W)",
        "(?i)(secret(.?key)?|password)[\"']?\\s*[:=]\\s*[\"'][^\\s]+?[\"']",
        "(?i)[^a-z\\$](DB_USER|user id|uid|(sql)?user(name)?|service\\s?account)\\s*[^\\w\\s,]([ -~\\s]{2,120}?|[ -~]{2,30}?)([^a-z\\s\\$]|\\s)\\s*(DB_PASS|(sql|service)?password|pwd)",
        "(?i)(password|secret(key)?)[ \\t]*[=:]+[ \\t]*([^:\\s\"';,<]{2,200})",
    ]
    return regex_list


def convert_result_to_string(result_row):
    """Flatten one re.findall result to a single string.

    re.findall yields plain strings (no groups) or tuples of group values;
    tuples are joined on ',' with empty groups dropped. Anything else is
    stringified as a defensive fallback (the original silently returned None).
    """
    if isinstance(result_row, str):
        return result_row
    elif isinstance(result_row, tuple):
        return ','.join([m for m in result_row if len(m) > 0])
    return str(result_row)


def file_modified_date_check(days_back, modified_date):
    """Return True when modified_date falls within the last days_back days.

    modified_date must be timezone-aware (blob.last_modified is), since it
    is subtracted from an aware 'now'.
    """
    aware_local_now = datetime.now(timezone.utc).astimezone()
    time_between_modified = aware_local_now - modified_date
    return time_between_modified.days < days_back


class file_scan_result:
    """Scan result for a single file (blob).

    results maps a "blob<i>-regex<j>" label to the list of matched contents
    found by that regex in that blob.
    """

    def __init__(self, file_name, file_last_modified):
        self.file_name = file_name
        # Bug fix: the last-modified timestamp was accepted but never stored.
        self.file_last_modified = file_last_modified
        self.results = {}

    def add_result(self, key, value):
        """Add a result to the dictionary; key is the regex label, value is a list of matches."""
        self.results[key] = value


class result_encoder(JSONEncoder):
    """JSON encoder that serializes arbitrary objects via their __dict__."""

    def default(self, o):
        return o.__dict__


from msrest.authentication import BasicTokenAuthentication
from azure.core.pipeline.policies import BearerTokenCredentialPolicy
from azure.core.pipeline import PipelineRequest, PipelineContext
from azure.core.pipeline.transport import HttpRequest
from azure.identity import DefaultAzureCredential


class AzureIdentityCredentialAdapter(BasicTokenAuthentication):
    """Adapt any azure-identity credential to SDKs that expect
    azure.common.credentials / msrestazure style credentials."""

    def __init__(self, credential=None, resource_id="https://management.azure.com/.default", **kwargs):
        """Adapt any azure-identity credential to work with SDKs that need
        azure.common.credentials or msrestazure.

        Default resource is ARM (syntax of endpoint v2).

        :param credential: Any azure-identity credential (DefaultAzureCredential by default)
        :param str resource_id: The scope to use to get the token (default ARM)
        """
        super(AzureIdentityCredentialAdapter, self).__init__(None)
        if credential is None:
            credential = DefaultAzureCredential()
        self._policy = BearerTokenCredentialPolicy(credential, resource_id, **kwargs)

    def _make_request(self):
        # A throwaway request object: it only exists so the bearer-token
        # policy has something to attach an Authorization header to; it is
        # never sent over the wire.
        return PipelineRequest(
            HttpRequest(
                "AzureIdentityCredentialAdapter",
                "https://fakeurl"
            ),
            PipelineContext(None)
        )

    def set_token(self):
        """Ask the azure-core BearerTokenCredentialPolicy to get a token.

        Using the policy gives us the caching system of azure-core for free.
        We could make this code simpler by using private methods, but by
        definition there is no guarantee they will be there forever, so we
        mock a fake call to the policy to extract the token, using 100%
        public API.
        """
        request = self._make_request()
        self._policy.on_request(request)
        # Read Authorization, and get the second part after "Bearer "
        token = request.http_request.headers["Authorization"].split(" ", 1)[1]
        self.token = {"access_token": token}

    def get_token(self):
        """Get the cached access token dictionary."""
        return self.token

    def signed_session(self, session=None):
        # Refresh the token, then delegate to BasicTokenAuthentication.
        self.set_token()
        return super(AzureIdentityCredentialAdapter, self).signed_session(session)
import uuid


class DynamicSummary():
    """Microsoft Sentinel Dynamic Summary object model.

    Builds the summary / summary-item payload and serializes it to the JSON
    fragment expected by the Sentinel dynamicSummaries REST API. Optional
    fields are stored as attributes only when provided, and serialize()
    emits only the attributes that exist and are not None.
    """

    @staticmethod
    def get_new_guid():
        """Generate a new GUID for a summary or summary item."""
        return uuid.uuid4()

    def __init__(self, summary_id):
        self.summary_id = summary_id

    def serialize(self):
        """Serialize summary-level fields to a JSON fragment (no surrounding braces)."""
        serialized_str = '"summaryId": "' + self.summary_id + '", "summaryName": "' + self.summary_name + '", "azureTenantId": "' + self.azure_tenant_id + '", "summaryDescription": "' + self.summary_description + '"'
        if hasattr(self, 'relation_name') and self.relation_name != None:
            serialized_str += ', "relationName": "' + self.relation_name + '"'
        if hasattr(self, 'relation_id') and self.relation_id != None:
            serialized_str += ', "relationId": "' + self.relation_id + '"'
        if hasattr(self, 'search_key') and self.search_key != None:
            serialized_str += ', "searchKey": "' + self.search_key + '"'
        if hasattr(self, 'tactics') and self.tactics != None:
            serialized_str += ', "tactics": "' + self.tactics + '"'
        if hasattr(self, 'techniques') and self.techniques != None:
            serialized_str += ', "techniques": "' + self.techniques + '"'
        if hasattr(self, 'source_info') and self.source_info != None:
            serialized_str += ', "sourceInfo": "' + self.source_info + '"'
        if hasattr(self, 'summary_items') and self.summary_items != None:
            serialized_str += ', "rawContent": "[' + DynamicSummary.serializeItems(self.summary_items) + ']"'
        return serialized_str

    @staticmethod
    def serializeItems(items):
        """Serialize summary items to a comma separated, quote-escaped list body."""
        raw_content = ''
        isFirst = True
        for item in items:
            if isFirst == True:
                isFirst = False
            else:
                raw_content += ','
            # json.dumps escapes the embedded quotes; strip the outer quotes
            # it adds so the item embeds inside the rawContent string.
            raw_content += json.dumps(DynamicSummary.serializeItem(item)).strip('"')
        return raw_content

    @staticmethod
    def serializeItem(item):
        """Serialize one summary item to a JSON object string."""
        serialized_item_tsr = '{'
        # uuid.urn is "urn:uuid:<hex>"; [9:] drops the "urn:uuid:" prefix.
        serialized_item_tsr += '"summaryItemId": "' + item.summary_item_id.urn[9:] + '"'
        if hasattr(item, 'relation_name') and item.relation_name != None:
            serialized_item_tsr += ', "relationName": "' + item.relation_name + '"'
        if hasattr(item, 'relation_id') and item.relation_id != None:
            # Bug fix: this branch assigned to a garbled variable name
            # ("seriserialized_item_tsralized_str") and raised NameError
            # whenever relation_id was set.
            serialized_item_tsr += ', "relationId": "' + item.relation_id + '"'
        if hasattr(item, 'search_key') and item.search_key != None:
            serialized_item_tsr += ', "searchKey": "' + item.search_key + '"'
        if hasattr(item, 'tactics') and item.tactics != None:
            serialized_item_tsr += ', "tactics": "' + item.tactics + '"'
        if hasattr(item, 'techniques') and item.techniques != None:
            serialized_item_tsr += ', "techniques": "' + item.techniques + '"'
        if hasattr(item, 'event_time_utc') and item.event_time_utc != None:
            serialized_item_tsr += ', "eventTimeUTC" :"' + item.event_time_utc.isoformat() + 'Z"'
        if hasattr(item, 'observable_type') and item.observable_type != None:
            serialized_item_tsr += ', "observableType": "' + item.observable_type + '"'
        if hasattr(item, 'observable_value') and item.observable_value != None:
            serialized_item_tsr += ', "observableValue": "' + item.observable_value + '"'
        if hasattr(item, 'packed_content') and item.packed_content != None:
            # packed_content is already a JSON string; embed it unquoted.
            serialized_item_tsr += ', "packedContent": ' + item.packed_content
        serialized_item_tsr += '}'
        return serialized_item_tsr

    def construct_summary(self, tenant_id, summary_name, summary_description, items, \
        relation_name=None, relation_id=None, search_key=None, tactics=None, techniques=None, source_info=None, **kwargs):
        """Build the summary-level data object; optional fields are set only when provided."""
        self.summary_name = summary_name
        self.azure_tenant_id = tenant_id
        self.summary_description = summary_description
        if relation_name != None:
            self.relation_name = relation_name
        if relation_id != None:
            self.relation_id = relation_id
        if search_key != None:
            self.search_key = search_key
        if tactics != None:
            self.tactics = tactics
        if techniques != None:
            self.techniques = techniques
        if source_info != None:
            self.source_info = source_info
        # Bug fix: this checked the undefined name "summary_items" instead of
        # the "items" parameter (it only worked via a leaked notebook global).
        if items != None:
            self.summary_items = items

    def construct_summary_item(self, summary_item_id, \
        relation_name=None, relation_id=None, search_key=None, tactics=None, techniques=None, event_time_utc=None,
        observable_type=None, observable_value=None, packed_content=None, **kwargs):
        """Build a summary-item data object; optional fields are set only when provided."""
        item = DynamicSummary(self.summary_id)
        item.summary_item_id = summary_item_id
        if relation_name != None:
            item.relation_name = relation_name
        if relation_id != None:
            item.relation_id = relation_id
        if search_key != None:
            item.search_key = search_key
        if tactics != None:
            item.tactics = tactics
        if techniques != None:
            item.techniques = techniques
        if event_time_utc != None:
            item.event_time_utc = event_time_utc
        if observable_type != None:
            item.observable_type = observable_type
        if observable_value != None:
            item.observable_value = observable_value
        if packed_content != None:
            item.packed_content = packed_content
        return item

    @staticmethod
    def construct_arm_rest_url(subscription_id, resource_group, workspace_name, summary_guid):
        """Build the URL for the Sentinel Dynamic Summaries REST API."""
        api_version = "2023-03-01-preview"
        provider_name = "Microsoft.OperationalInsights"
        workspace_provider_name = "Microsoft.SecurityInsights/dynamicSummaries"
        root_url = "https://management.azure.com"
        arm_rest_url_template = "{0}/subscriptions/{1}/resourceGroups/{2}/providers/{3}/workspaces/{4}/providers/{5}/{6}?api-version={7}"
        return arm_rest_url_template.format(root_url, subscription_id, resource_group, provider_name, workspace_name, workspace_provider_name, summary_guid, api_version)

    @staticmethod
    def call_azure_rest_api_for_creating_dynamic_summary(token, arm_rest_url, summary):
        """PUT the serialized summary to the Microsoft Sentinel REST API; returns the response."""
        # Imported here so the rest of the class works without requests installed.
        import requests
        bearer_token = "Bearer " + token
        headers = {"Authorization": bearer_token, "content-type": "application/json"}
        response = requests.put(arm_rest_url, headers=headers, data=summary, verify=True)
        return response

    @staticmethod
    def display_result(response):
        """Display the response's "value" collection as a pandas.DataFrame (notebook only)."""
        if response != None:
            df = pd.DataFrame(response.json()["value"])
            display(df)

2. Authentication to Azure Storage

# ------------------------------------------------------------------
# Notebook parameters — fill these in before running the later cells.
# ------------------------------------------------------------------

# Azure tenant / subscription scope.
tenant_id = ''
subscription_id = ''

# Key Vault used to fetch the service principal secrets (via the
# Synapse linked service name).
akv_name = ''
akv_link_name = ''
client_id_name = ''
client_secret_name = ''

# Storage account to scan and the container within it.
resource_group_name = ''
storage_account_name = ''
container_name = 'azureml'

# Microsoft Sentinel Dynamic Summaries target.
resource_group_name_for_dynamic_summaries = ''
sentinel_workspace_name_for_dynamic_summaries = ''
dynamic_summary_name = ''
dynamic_summary_guid = ''
# Fetch the service principal's client id and secret from Azure Key Vault
# through the Synapse linked service, then build the two credential objects
# used by later cells.
client_id = mssparkutils.credentials.getSecret(akv_name, client_id_name, akv_link_name)
client_secret = mssparkutils.credentials.getSecret(akv_name, client_secret_name, akv_link_name)
credential = ClientSecretCredential(
    tenant_id=tenant_id,
    client_id=client_id,
    client_secret=client_secret)
# Adapter wraps the azure-identity credential for the msrest-based
# StorageManagementClient used in the scan cell.
cred = AzureIdentityCredentialAdapter(credential)
# get_token returns an AccessToken tuple; element 0 is the bearer token
# string used for the raw REST call in the final cell.
access_token = credential.get_token("https://management.azure.com/.default")
token = access_token[0]

3. Scan Azure Blob for Leaking Credentials

# Scan every recently-modified blob in the configured container against the
# CredScan regex list, printing matches and collecting them in result_objects.
storage_client = StorageManagementClient(cred, subscription_id=subscription_id)
try:
    storage_keys = storage_client.storage_accounts.list_keys(resource_group_name, storage_account_name)
    # Only blobs modified within this many days are scanned.
    days_back = 500
    if storage_keys != None:
        # Authenticate to the data plane with the account's primary key.
        storage_key = {v.key_name: v.value for v in storage_keys.keys}['key1']
        blob_service_client = BlobServiceClient(
            account_url="https://{0}.blob.core.windows.net".format(storage_account_name),
            credential=storage_key
        )
        if blob_service_client != None:
            container_client = blob_service_client.get_container_client(container_name)
            if container_client != None:
                blob_list = container_client.list_blobs()
                # Each entry is a file_scan_result.results dict mapping
                # "blob<i>-regex<j>" to the list of matched contents.
                result_objects = []
                for indexblob, b in enumerate(blob_list):
                    print("Blob name: " + b.name)
                    try:
                        if (file_modified_date_check(days_back, b.last_modified)):
                            blob = container_client.download_blob(b)
                            content = get_file_content(blob)
                            # Run Regex strings on the file content
                            import warnings
                            warnings.filterwarnings('ignore')
                            if content != None:
                                has_leaking = False
                                regex_list = get_regex_list()
                                for indexregex, regex in enumerate(regex_list):
                                    result_object = file_scan_result(b.name, b.last_modified);
                                    re.compile(regex)
                                    results = re.findall(regex, content)
                                    matched_contents = []
                                    if results:
                                        print("================================================")
                                        print("MATCHED REGEX:\n" + regex)
                                        print("---------------MATCHED CONTENT -----------------")
                                        for result in results:
                                            print(str(result))
                                            matched_contents.append(convert_result_to_string(result))
                                        print("================================================")
                                        has_leaking = True
                                    # NOTE(review): a result entry is recorded per blob/regex
                                    # pair even when matched_contents is empty — confirm this
                                    # matches the original notebook's intent.
                                    result_object.add_result("blob" + str(indexblob) + "-regex" + str(indexregex), matched_contents)
                                    result_objects.append(result_object.results)
                                if has_leaking == False:
                                    print('No leaking data found')
                    except Exception as e:
                        # Best effort: keep scanning the remaining blobs.
                        print(e)
                print("Printing to check how it will look like")
                print(result_encoder().encode(result_objects))
                scan_data = json.dumps(result_objects, indent=4, cls=result_encoder)
                print(scan_data)
        else:
            print("failed on blob service client")
except Exception as ex:
    # Give a readable hint for the common RBAC failure, then re-raise.
    if str(ex).find("AuthorizationFailed") >= 0:
        print("========================================================================")
        print("Error: Service principal has no sufficient permission to perform tasks.")
        print("========================================================================")
    raise

4. Save result to Microsoft Sentinel Dynamic Summaries

# Build the Dynamic Summary payload from the scan results. Skipped entirely
# unless a summary name was configured in the parameter cell above.
if dynamic_summary_name not in (None, ''):
    summary = DynamicSummary(dynamic_summary_guid)
    summary_description = "This summary is generated from notebook - AutomationGallery-CredentialScanOnAzureBlobStorage."
    summary_items = []
    if result_objects:
        for scan_entry in result_objects:
            entry_frame = pd.DataFrame.from_dict(scan_entry)
            if entry_frame.empty:
                continue
            # One summary item per row; each item packs the row as JSON.
            for row_label, _row in entry_frame.iterrows():
                packed = entry_frame.iloc[row_label].to_json()
                summary_items.append(
                    summary.construct_summary_item(
                        DynamicSummary.get_new_guid(),
                        None, None, None, None, None,
                        datetime.utcnow(),
                        None, None,
                        packed))
    summary.construct_summary(tenant_id, dynamic_summary_name, summary_description, summary_items)
    summary_json = '{ "properties": {' + summary.serialize() + '}}'
    # print(summary_json)  # uncomment to inspect the payload
# Upload the serialized summary through the Sentinel Dynamic Summaries REST
# API — only when the scan produced results and a summary name is configured.
if result_objects and dynamic_summary_name not in (None, ''):
    api_url = DynamicSummary.construct_arm_rest_url(
        subscription_id,
        resource_group_name_for_dynamic_summaries,
        sentinel_workspace_name_for_dynamic_summaries,
        dynamic_summary_guid,
    )
    api_response = DynamicSummary.call_azure_rest_api_for_creating_dynamic_summary(
        token, api_url, summary_json)
    print(api_response.status_code)