Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Azure
GitHub Repository: Azure/Azure-Sentinel-Notebooks
Path: blob/master/Credential Scan on Azure Blob Storage.ipynb
3249 views
Kernel: Python 3.10 - SDK v2

Credential Scan on Azure Blob Storage

Notebook Version: 1.1
Python Version: Python 3.10 - SDK v2
Required Packages: No
Platforms Supported: Azure Machine Learning Notebooks

Data Source Required: No

Description

This notebook provides step-by-step instructions and sample code for detecting credentials leaked into Azure Blob Storage, using the Azure SDK for Python.
*** No need to download and install any other Python modules. ***
*** Please run the cells sequentially to avoid errors. Please do not use "run all cells". ***

Table of Contents

  1. Warm-up

  2. Authentication to Azure Storage

  3. Scan Azure Blob for Leaking Credentials

1. Warm-up

# If you need to know what Python modules are available, you may run this: # help("modules") %pip install ipywidgets
# Load Python libraries that will be used in this notebook from azure.mgmt.storage import StorageManagementClient from azure.identity import DefaultAzureCredential from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, __version__ from azure.mgmt.resource import ResourceManagementClient from azure.identity import AzureCliCredential import time import json import os import csv import ipywidgets from IPython.display import display, HTML, Markdown import re
# Functions will be used in this notebook
def read_config_values(file_path):
    """Load the pre-generated parameters for the Microsoft Sentinel workspace.

    Args:
        file_path: Path of a JSON file holding the keys read below.

    Returns:
        A 7-tuple ``(tenant_id, subscription_id, resource_group, workspace_id,
        workspace_name, user_alias, user_object_id)``.

    Raises:
        OSError: If the file cannot be opened.
        KeyError: If one of the expected keys is missing from the file.
    """
    with open(file_path, encoding="utf-8") as json_file:
        json_config = json.load(json_file)
    return (
        json_config["tenant_id"],
        json_config["subscription_id"],
        json_config["resource_group"],
        json_config["workspace_id"],
        json_config["workspace_name"],
        json_config["user_alias"],
        json_config["user_object_id"],
    )


def has_valid_token():
    """Check whether the Azure CLI can currently produce an access token.

    Returns:
        ``None`` when the output of ``az account get-access-token`` contains
        none of the known failure markers; otherwise a message describing what
        the user should do next.
    """
    error = "Please run 'az login'"
    expired = "AADSTS70043: The refresh token has expired or is invalid"
    failed = "failed"
    try:
        # Plain-Python spelling of the IPython shell assignment
        # `validator = !az account get-access-token`.
        validator = get_ipython().getoutput("az account get-access-token")
        output_lines = validator.get_list()
    except Exception:
        # Any failure to run the command is reported as "not logged in".
        return "Please login"

    if any(expired in item for item in output_lines):
        return (
            '**The refresh token has expired. <br> Please continue your login process. Then: <br> '
            '1. If you plan to run multiple notebooks on the same compute instance today, '
            'you may restart the compute instance by clicking "Compute" on left menu, '
            'then select the instance, clicking "Restart"; <br> '
            '2. Otherwise, you may just restart the kernel from top menu. <br> '
            'Finally, close and re-load the notebook, then re-run cells one by one from the top.**'
        )
    if any(error in item or failed in item for item in output_lines):
        return "Please run 'az login' to setup account"
    return None


def get_file_content(blob):
    """Decode the content of a downloaded blob to text.

    UTF-8 is tried first; if that raises ``UnicodeDecodeError`` the content is
    decoded as UTF-16 instead.

    Args:
        blob: Object exposing ``content_as_text(max_concurrency=..., encoding=...)``.

    Returns:
        The decoded text.
    """
    try:
        content = blob.content_as_text(max_concurrency=1, encoding='UTF-8')
    except UnicodeDecodeError:
        content = blob.content_as_text(max_concurrency=1, encoding='UTF-16')
    return content


def get_regex_list():
    """Return the list of regular expressions used by the credential scan.

    Every pattern carries its flags inline. Global flags such as ``(?i)`` are
    kept at the very start of each pattern because Python 3.11+ rejects them
    anywhere else.
    """
    regex_list = [
        "(?i)(ida:password|IssuerSecret|(api|client|app(lication)?)[_\\- ]?(key|secret)[^,a-z]|\\.azuredatabricks\\.net).{0,10}(dapi)?[a-z0-9/+]{22}",
        "(?i)(x-api-(key|token).{0,10}[a-z0-9/+]{40}|v1\\.[a-z0-9/+]{40}[^a-z0-9/+])",
        # The global (?i) used to sit in the middle of this pattern. Up to
        # Python 3.10 it still applied to the whole expression (with a
        # DeprecationWarning); moving it to the front keeps that matching
        # behaviour and lets the pattern compile on 3.11+.
        "(?i)(?-i:)\\WAIza[a-z0-9_\\\\\\-]{35}\\W",
        "(?i)(\\Wsig\\W|Secret(Value)?|IssuerSecret|(\\Wsas|primary|secondary|management|Shared(Access(Policy)?)?).?Key|\\.azure\\-devices\\.net|\\.(core|servicebus|redis\\.cache|accesscontrol|mediaservices)\\.(windows\\.net|chinacloudapi\\.cn|cloudapi\\.de|usgovcloudapi\\.net)|New\\-AzureRedisCache).{0,100}([a-z0-9/+]{43}=)",
        "(?i)visualstudio\\.com.{1,100}\\W(?-i:)[a-z2-7]{52}\\W",
        "(?i)se=2021.+sig=[a-z0-9%]{43,63}%3d",
        "(?i)(x-functions-key|ApiKey|Code=|\\.azurewebsites\\.net/api/).{0,100}[a-z0-9/\\+]{54}={2}",
        "(?i)code=[a-z0-9%]{54,74}(%3d){2}",
        "(?i)(userpwd|publishingpassword).{0,100}[a-z0-9/\\+]{60}\\W",
        "(?i)[^a-z0-9/\\+][a-z0-9/\\+]{86}==",
        "(?-i:)\\-{5}BEGIN( ([DR]SA|EC|OPENSSH|PGP))? PRIVATE KEY( BLOCK)?\\-{5}",
        "(?i)(app(lication)?|client)[_\\- ]?(key(url)?|secret)([\\s=:>]{1,10}|[\\s\"':=|>\\]]{3,15}|[\"'=:\\(]{2})[^\\-]",
        "(?i)refresh[_\\-]?token([\\s=:>]{1,10}|[\\s\"':=|>\\]]{3,15}|[\"'=:\\(]{2})(\"data:text/plain,.+\"|[a-z0-9/+=_.-]{20,200})",
        "(?i)AccessToken(Secret)?([\\s\"':=|>\\]]{3,15}|[\"'=:\\(]{2}|[\\s=:>]{1,10})[a-z0-9/+=_.-]{20,200}",
        "(?i)[a-z0-9]{3,5}://[^%:\\s\"'/][^:\\s\"'/\\$]+[^:\\s\"'/\\$%]:([^%\\s\"'/][^@\\s\"'/]{0,100}[^%\\s\"'/])@[\\$a-z0-9:\\.\\-_%\\?=/]+",
        "(?i)snmp(\\-server)?\\.exe.{0,100}(priv|community)",
        "(?i)(ConvertTo\\-?SecureString\\s*((\\(|\\Wstring)\\s*)?['\"]+)",
        "(?i)(Consumer|api)[_\\- ]?(Secret|Key)([\\s=:>]{1,10}|[\\s\"':=|>,\\]]{3,15}|[\"'=:\\(]{2})[^\\s]{5,}",
        "(?i)authorization[,\\[:= \"']+([dbaohmnsv])",
        "(?i)-u\\s+.{2,100}-p\\s+[^\\-/]",
        "(?i)(amqp|ssh|(ht|f)tps?)://[^%:\\s\"'/][^:\\s\"'/\\$]+[^:\\s\"'/\\$%]:([^%\\s\"'/][^@\\s\"'/]{0,100}[^%\\s\"'/])@[\\$a-z0-9:\\.\\-_%\\?=/]+",
        "(?i)(\\Waws|amazon)?.{0,5}(secret|access.?key).{0,10}\\W[a-z0-9/\\+]{40}",
        "(?-i:)(eyJ0eXAiOiJKV1Qi|eyJhbGci)",
        "(?i)@(\\.(on)?)?microsoft\\.com[ -~\\s]{1,100}?(\\w?pass\\w?)",
        "(?i)net(\\.exe)?.{1,5}(user\\s+|share\\s+/user:|user-?secrets? set)\\s+[a-z0-9]",
        "(?i)xox[pbar]\\-[a-z0-9]",
        "(?i)[\":\\s=]((x?corp|extranet(test)?|ntdev)(\\.microsoft\\.com)?|corp|redmond|europe|middleeast|northamerica|southpacific|southamerica|fareast|africa|exchange|extranet(test)?|partners|parttest|ntdev|ntwksta)\\W.{0,100}(password|\\Wpwd|\\Wpass|\\Wpw\\W|userpass)",
        "(?i)(sign_in|SharePointOnlineAuthenticatedContext|(User|Exchange)Credentials?|password)[ -~\\s]{0,100}?@([a-z0-9.]+\\.(on)?)?microsoft\\.com['\"]?",
        "(?i)(\\.database\\.azure\\.com|\\.database(\\.secure)?\\.windows\\.net|\\.cloudapp\\.net|\\.database\\.usgovcloudapi\\.net|\\.database\\.chinacloudapi\\.cn|\\.database.cloudapi.de).{0,100}(DB_PASS|(sql|service)?password|\\Wpwd\\W)",
        "(?i)(secret(.?key)?|password)[\"']?\\s*[:=]\\s*[\"'][^\\s]+?[\"']",
        "(?i)[^a-z\\$](DB_USER|user id|uid|(sql)?user(name)?|service\\s?account)\\s*[^\\w\\s,]([ -~\\s]{2,120}?|[ -~]{2,30}?)([^a-z\\s\\$]|\\s)\\s*(DB_PASS|(sql|service)?password|pwd)",
        "(?i)(password|secret(key)?)[ \\t]*[=:]+[ \\t]*([^:\\s\"';,<]{2,200})",
    ]
    return regex_list


def set_continuation_flag(flag):
    """Return ``flag`` unchanged, printing a notice when it is false."""
    if not flag:
        print("continuation flag is false.")
    return flag


def convert_result_to_string(result_row):
    """Turn one ``re.findall`` result into a comma-separated string.

    Args:
        result_row: A plain string, or a tuple of group strings (which is what
            ``re.findall`` yields for patterns with several groups).

    Returns:
        The string itself, or the non-empty tuple members joined with ``,``.
        Any other value is converted with ``str()``.
    """
    if isinstance(result_row, str):
        return result_row
    if isinstance(result_row, tuple):
        return ','.join(m for m in result_row if len(m) > 0)
    # Not produced by re.findall; stringify rather than returning None.
    return str(result_row)


def export_csv(file_name, data_list):
    """Write the scan results to a CSV file.

    Each string in ``data_list`` becomes one row; it is split on ``,`` so that
    every comma-separated piece lands in its own column.

    Args:
        file_name: Path of the CSV file to create or overwrite.
        data_list: Iterable of comma-separated strings.
    """
    # newline='' is what the csv module requires so that it controls the line
    # endings itself.
    with open(file_name, 'w', newline='', encoding='utf-8') as f:
        w = csv.writer(f, delimiter=',')
        w.writerows(x.split(',') for x in data_list)
# Populate the Microsoft Sentinel workspace parameters by calling the loader above.
# config.json was generated by the system; you may edit its values, or set these
# variables manually instead.
(
    tenant_id,
    subscription_id,
    resource_group,
    workspace_id,
    workspace_name,
    user_alias,
    user_object_id,
) = read_config_values("config.json")

2. Authentication to Azure Storage

# The Azure CLI device-code flow is used to sign in to Azure: copy the code it
# prints and open the DeviceLogin site.
# The login command is scoped to the tenant with `--tenant $tenant_id`.
if has_valid_token() is not None:
    message = (
        '**The refresh token has expired. <br> Please continue your login process. Then: <br> '
        '1. If you plan to run multiple notebooks on the same compute instance today, '
        'you may restart the compute instance by clicking "Compute" on left menu, '
        'then select the instance, clicking "Restart"; <br> '
        '2. Otherwise, you may just restart the kernel from top menu. <br> '
        'Finally, close and re-load the notebook, then re-run cells one by one from the top.**'
    )
    display(Markdown(message))
    # Plain-Python spelling of the IPython shell escapes
    #   !echo -e '\e[42m'
    #   !az login --tenant $tenant_id --use-device-code
    # ($tenant_id is expanded by IPython before the command runs.)
    get_ipython().system("echo -e '\\e[42m'")
    get_ipython().system("az login --tenant $tenant_id --use-device-code")

# Initializing Azure Storage and Azure Resource Python clients
storage_client = StorageManagementClient(
    AzureCliCredential(), subscription_id=subscription_id
)
resource_client = ResourceManagementClient(
    AzureCliCredential(), subscription_id=subscription_id
)

# Set continuation_flag
# NOTE(review): a client constructor is not expected to return None, so this
# check presumably always passes and does not prove the login worked - confirm.
if resource_client is not None:
    continuation_flag = set_continuation_flag(True)
    print('Successfully signed in.')
else:
    continuation_flag = set_continuation_flag(False)
# Select Azure Resource Group
# Lists the resource groups through resource_client and offers their names,
# sorted, in a dropdown.
if continuation_flag:
    group_list = resource_client.resource_groups.list()
    group_names = [g.name for g in group_list]
    group_names.sort()
    group_dropdown = ipywidgets.Dropdown(options=group_names, description='Groups:')
    display(group_dropdown)
# Select Azure Storage Account
# Lists the storage accounts of the chosen resource group and offers their
# names, sorted, in a dropdown. Without a chosen group the notebook is halted.
if not continuation_flag or group_dropdown.value is None:
    continuation_flag = set_continuation_flag(False)
else:
    resource_list = resource_client.resources.list_by_resource_group(
        group_dropdown.value,
        filter="resourceType eq 'Microsoft.Storage/storageAccounts'",
    )
    account_names = [r.name for r in resource_list]
    account_names.sort()
    storage_account_dropdown = ipywidgets.Dropdown(
        options=account_names, description='Accounts:'
    )
    display(storage_account_dropdown)

3. Scan Azure Blob for Leaking Credentials

# Select a blob container for a specified Azure Storage account
# Each step that yields nothing clears continuation_flag so later cells skip
# their work.
if not continuation_flag or storage_account_dropdown.value is None:
    continuation_flag = set_continuation_flag(False)
else:
    storage_keys = storage_client.storage_accounts.list_keys(
        group_dropdown.value, storage_account_dropdown.value
    )
    if storage_keys is None:
        continuation_flag = set_continuation_flag(False)
    else:
        # The account key named 'key1' is used as the blob service credential.
        storage_key = {v.key_name: v.value for v in storage_keys.keys}['key1']
        blob_service_client = BlobServiceClient(
            account_url=f"https://{storage_account_dropdown.value}.blob.core.windows.net",
            credential=storage_key,
        )
        if blob_service_client is None:
            continuation_flag = set_continuation_flag(False)
        else:
            container_list = blob_service_client.list_containers()
            container_names = [r.name for r in container_list]
            container_names.sort()
            container_dropdown = ipywidgets.Dropdown(
                options=container_names, description='Containers:'
            )
            display(container_dropdown)
# Select a blob from a specified blob container
# Lists the blobs of the chosen container and offers their names, sorted, in a
# dropdown.
if not continuation_flag or container_dropdown.value is None:
    continuation_flag = set_continuation_flag(False)
else:
    container_client = blob_service_client.get_container_client(container_dropdown.value)
    if container_client is None:
        continuation_flag = set_continuation_flag(False)
    else:
        blob_list = container_client.list_blobs()
        blob_names = [r.name for r in blob_list]
        blob_names.sort()
        blob_dropdown = ipywidgets.Dropdown(options=blob_names, description='Blobs:')
        display(blob_dropdown)
# Get blob content
# Downloads the chosen blob and decodes it to text with get_file_content.
if not continuation_flag or blob_dropdown.value is None:
    continuation_flag = set_continuation_flag(False)
else:
    selected_blob = container_client.download_blob(blob_dropdown.value)
    if selected_blob is None:
        continuation_flag = set_continuation_flag(False)
    else:
        content = get_file_content(selected_blob)
# Run Regex strings on the file content
# Every match is printed and collected in result_list (one comma-separated
# string per match) for the CSV export cell.
import warnings

result_list = []
if continuation_flag and content is not None:
    has_leaking = False
    for regex in get_regex_list():
        try:
            # Some patterns place inline flags in a deprecated position; hide
            # the warnings they raise while compiling, without silencing
            # warnings for the rest of the session.
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                pattern = re.compile(regex)
        except re.error as err:
            # One pattern this interpreter rejects must not abort the scan.
            print(f"Skipping invalid regex ({err}):\n{regex}")
            continue

        results = pattern.findall(content)
        if not results:
            continue

        has_leaking = True
        print("================================================")
        print("MATCHED REGEX:\n" + regex)
        print("------------------------------------------------")
        print("FILE: " + blob_dropdown.value + "\n")
        print("---------------MATCHED CONTENT -----------------")
        for result in results:
            print(str(result))
            result_list.append(convert_result_to_string(result))
        print("================================================")

    if not has_leaking:
        print('No leaking data found')
else:
    continuation_flag = set_continuation_flag(False)
# Save results to a csv file in the current file system
# Nothing is written when the scan was skipped or produced no matches.
if not continuation_flag or len(result_list) == 0:
    print("No data")
else:
    export_csv("credscan_blob.csv", result_list)