Azure
GitHub Repository: Azure/Azure-Sentinel-Notebooks
Path: blob/master/scenario-notebooks/AffectedKeyCredentials-CVE-2021-42306.ipynb
Kernel: Python 3.8 - AzureML

Detecting vulnerable service principals and applications in AAD

This notebook contains logic to detect applications and service principals in Azure Active Directory that are vulnerable to the attack described in CVE-2021-42306.

It contains the following sections:

  • Establish a connection to Microsoft Graph

  • Download and check for vulnerable Application and Service Principal objects

  • View and filter the vulnerable items

  • Create a Microsoft Sentinel Watchlist containing the items

Creating a Watchlist lets you build detections and alerts based on the IDs of the affected Applications and Service Principals in your Azure Active Directory, so that you can spot potential abuse of these objects.

Please run the code cells in sequence. Skipping cells will result in errors.

Initialize MSTICPy

# %pip install --upgrade msticpy
from msticpy import init_notebook
init_notebook(globals());

Using Azure Key Vault to store the client secret for MS Graph

This is an optional section.

To use the MSTICPy Key Vault library, you must have your Key Vault details configured in msticpyconfig.yaml. Please see the following documents for more details.

Key Vault configuration format

Key Vault settings in the settings editor

To store your client secret in Key Vault, you can use the following code snippet.

kv_client.set_secret(
    secret_name="[[PLACEHOLDER]]",
    value="[[VALUE]]"
)

You can also configure a secret using the Azure management portal.

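from msticpy.common.keyvault_client import BHKeyVaultClient

try:
    kv_client = BHKeyVaultClient()
except Exception:
    # Any initialization failure leaves Key Vault lookups disabled
    kv_client = None
    print("Key Vault did not initialize correctly.")


def get_kv_secret(secret_name):
    """Return the named secret, or an empty string if Key Vault is unavailable."""
    return kv_client.get_secret(secret_name) if kv_client else ""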

Specify Parameters for Connecting to MS Graph

Run this cell to display a form for entering details required to connect to Microsoft Graph.

If you are not storing your client secret in Key Vault, uncheck the Get secret from keyvault box and type the client secret into the Client secret box.

from msticpy.common.azure_auth_core import AzureCloudConfig
try:
    from msticpy.common.cloud_mappings import _CLOUD_MAPPING as CLOUD_MAPPING
except ImportError:
    from msticpy.common.cloud_mappings import CLOUD_MAPPING
import ipywidgets as widgets

# WIDGET_DEFAULTS, tenant_id and client_id are not defined in the cells shown
# here; this cell assumes they already exist in the notebook session.
WIDGET_DEFAULTS["style"]["description_width"] = "200px"
WIDGET_DEFAULTS["layout"].width = "80%"


def text_box(desc, val="", **kwargs):
    return widgets.Text(description=desc, value=val, **WIDGET_DEFAULTS, **kwargs)


azure_cloud = AzureCloudConfig()
heading = widgets.HTML("<h3>Enter your AAD Tenant ID, Azure Cloud and client ID/Secret.</h3>")
txt_tenant_id = text_box("TenantId", tenant_id)
txt_client_id = text_box("Client App ID", client_id)
txt_client_sec_name = text_box("KV secret name", client_id)
select_cloud = widgets.Select(
    description="Cloud",
    options=list(CLOUD_MAPPING.keys()),
    value=azure_cloud.cloud,
    **WIDGET_DEFAULTS,
)
cb_use_keyvault = widgets.Checkbox(description="Get secret from keyvault", value=True)


def switch_secret_desc(change):
    # Relabel the secret box to match the checkbox state
    del change
    txt_client_sec_name.description = (
        "KV secret name" if cb_use_keyvault.value else "Client secret"
    )


cb_use_keyvault.observe(switch_secret_desc, names="value")
display(widgets.VBox([
    heading,
    txt_tenant_id,
    select_cloud,
    txt_client_id,
    cb_use_keyvault,
    txt_client_sec_name
]))
VBox(children=(HTML(value='<h3>Enter your AAD Tenant ID, Azure Cloud and client ID/Secret.</h3>'), Text(value=…
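
The checkbox behaviour described above (treat the text box either as a Key Vault secret name or as the secret itself) can be sketched as a small helper. This is only an illustration and not part of the notebook: `resolve_client_secret`, `fake_vault` and `fake_lookup` are hypothetical names, and `kv_lookup` stands in for a Key Vault lookup such as the `get_kv_secret` function defined earlier.

```python
from typing import Callable


def resolve_client_secret(
    use_keyvault: bool,
    field_value: str,
    kv_lookup: Callable[[str], str],
) -> str:
    """Return the client secret according to the form settings.

    Hypothetical helper: when use_keyvault is True, field_value is treated
    as a Key Vault secret name and resolved through kv_lookup; otherwise
    field_value is taken to be the secret itself.
    """
    if use_keyvault:
        return kv_lookup(field_value)
    return field_value


# Stand-in for a Key Vault (assumption: a missing name resolves to "")
fake_vault = {"graph-client-secret": "s3cr3t"}


def fake_lookup(name: str) -> str:
    return fake_vault.get(name, "")


print(resolve_client_secret(True, "graph-client-secret", fake_lookup))
print(resolve_client_secret(False, "typed-in-secret", fake_lookup))
```

Passing the lookup in as a callable keeps the decision logic testable without a real Key Vault connection.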

Create the Microsoft Graph data provider

Note: the current version of the Graph provider only supports the global Azure cloud. The code below adapts it if you have a different sovereign cloud configured in your msticpyconfig.yaml.

The access token obtained for the Microsoft Graph expires after 1 hour. Please re-run this cell if you see this error:

CloudError: Azure Error: ExpiredAuthenticationToken Message: The access token expiry UTC time '10/28/2021 12:31:15 AM' is earlier than current UTC time '10/28/2021 4:56:43 PM'.
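graph_prov = mp.QueryProvider("SecurityGraph")
if select_cloud.value != "global":
    azure_cloud = AzureCloudConfig(select_cloud.value)
if azure_cloud.cloud != "global":
    # The scope must be a plain string; a trailing comma here would make it a tuple
    graph_prov.req_body["scope"] = f"{azure_cloud.endpoints.microsoft_graph_resource_id}/.default"
    # The second literal is not an f-string, so single braces leave a
    # {tenantId} placeholder (assumed to be filled in by the provider)
    graph_prov.oauth_url = (
        f"https://{azure_cloud.endpoints.active_directory}/"
        "{tenantId}/oauth2/v2.0/token"
    )
    graph_prov.api_root = azure_cloud.endpoints.microsoft_graph_resource_id

# Take the secret from Key Vault or directly from the form,
# depending on the "Get secret from keyvault" checkbox
client_secret = (
    get_kv_secret(txt_client_sec_name.value)
    if cb_use_keyvault.value
    else txt_client_sec_name.value
)
# Create connection string from the values entered in the form
connection_str = (
    f"tenant_id={txt_tenant_id.value};"
    f"client_id={txt_client_id.value};"
    f"client_secret={client_secret};"
)
graph_prov.connect(connection_str)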
Attempting to sign-in with Azure CLI credentials... Connected.
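
To decide whether the cell needs re-running, it can help to compare the token expiry time with the current time. The sketch below is an illustration only: the function names are hypothetical, the timestamp layout is inferred from the error message quoted above, and the one-hour lifetime is the figure stated in this notebook.

```python
from datetime import datetime, timedelta

# Timestamp layout as it appears in the ExpiredAuthenticationToken message
ERROR_TIME_FORMAT = "%m/%d/%Y %I:%M:%S %p"


def is_token_expired(expiry_utc: str, now_utc: str) -> bool:
    """Compare two timestamps written in the error-message layout."""
    expiry = datetime.strptime(expiry_utc, ERROR_TIME_FORMAT)
    now = datetime.strptime(now_utc, ERROR_TIME_FORMAT)
    return expiry <= now


def token_needs_refresh(
    acquired_at: datetime,
    now: datetime,
    lifetime: timedelta = timedelta(hours=1),
    margin: timedelta = timedelta(minutes=5),
) -> bool:
    """Return True once the token is within `margin` of its assumed lifetime."""
    return now >= acquired_at + lifetime - margin


# The two timestamps from the error message quoted above
print(is_token_expired("10/28/2021 12:31:15 AM", "10/28/2021 4:56:43 PM"))
```

Recording the time at which `graph_prov.connect` was called and checking it with `token_needs_refresh` before a long query is one way to avoid hitting the error mid-run.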

Query MS Graph for Vulnerable AAD objects

This cell connects to Microsoft Graph, queries the available Application and ServicePrincipal objects, and checks them for vulnerable configurations.

The following cell allows you to browse the results.

NEXT_LINK = "@odata.nextLink"


def _filter_usage_and_type(od_results):
    # Keep X.509 certificate credentials whose usage is not "Sign"
    usage_crit = od_results["usage"] != "Sign"
    type_crit = od_results["type"] == "AsymmetricX509Cert"
    return od_results[usage_crit & type_crit]


def _filter_ext_value(od_results):
    # Keep rows flagged with hasExtendedValue; keep all rows if the column is absent
    if "hasExtendedValue" in od_results.columns:
        extval_crit = od_results["hasExtendedValue"] == True
    else:
        extval_crit = ~od_results.index.isna()
    return od_results[extval_crit]


def _check_continuation_query(raw_results_df, api_root):
    # Return the relative query for the next page, or None on the last page
    if NEXT_LINK in raw_results_df.columns:
        query = raw_results_df.iloc[0][NEXT_LINK]
        return query.replace(api_root, "")
    return None


def get_graph_records(graph_prov, object_class, max_query_records=10000):
    OD_QUERY = f"/{object_class}s?$select=displayName, appId, id, keyCredentials&$top=200"
    results_df_list = []
    results_count = 0
    query = OD_QUERY
    api_root = graph_prov._query_provider.api_root
    print(f"Running query for {object_class}...", end="")
    while query and results_count < max_query_records:
        # run the query
        raw_results_df = graph_prov.exec_query(query)
        print(".", end="")
        # pull "value" property out and normalize
        od_results = pd.json_normalize(raw_results_df.iloc[0]["value"])
        # update the records count
        results_count += len(od_results)
        # filter to only items that have items in the keyCredentials field
        od_results = od_results[od_results["keyCredentials"].apply(lambda x: len(x) > 0)]
        if od_results.empty:
            query = _check_continuation_query(raw_results_df, api_root)
            continue
        # extract key_cred columns
        key_cred_cols_df = (
            od_results
            .explode("keyCredentials")
            .apply(lambda x: pd.Series(x.keyCredentials), result_type="expand", axis=1)
            .rename(columns={"displayName": "key_displayName"})
            .drop(columns=["0"], errors="ignore")
        )
        # remove the keys themselves since we don't want to leave these lying around
        od_results = pd.concat([od_results.drop(columns="keyCredentials"), key_cred_cols_df], axis=1)
        # check for vulnerable entries
        od_results = _filter_usage_and_type(od_results)
        od_results = _filter_ext_value(od_results)
        results_df_list.append(od_results)
        query = _check_continuation_query(raw_results_df, api_root)
    print("query complete.", end=" ")
    # Any non-empty list counts, including a single page of results
    if results_df_list:
        all_results = pd.concat(results_df_list, ignore_index=True).assign(credType=object_class)
        print(len(all_results), "records found.")
        return all_results
    print("0 records found.")
    return pd.DataFrame(columns=od_results.columns)


# ignore_index keeps row labels unique across the two object types
results_df = pd.concat(
    [
        get_graph_records(graph_prov, cred_type, max_query_records=500000)
        for cred_type in ("Application", "ServicePrincipal")
    ],
    ignore_index=True,
)
print(len(results_df), "records found.")
Running query for Application....query complete. 0 records found. Running query for ServicePrincipal........query complete. 190 records found. 190 records found.
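
The per-credential check applied by the cell above can be illustrated on plain dictionaries, without pandas or a Graph connection. This is a sketch under stated assumptions: the function names and the sample objects are made up for illustration, and a missing `hasExtendedValue` field is treated as passing, which only approximates the DataFrame logic (there the check is skipped only when the whole column is absent).

```python
from typing import Any, Dict, List, Tuple


def is_flagged_credential(key_cred: Dict[str, Any]) -> bool:
    """Apply the notebook's three criteria to one keyCredentials entry."""
    if key_cred.get("usage") == "Sign":
        return False
    if key_cred.get("type") != "AsymmetricX509Cert":
        return False
    # Assumption: an absent hasExtendedValue field does not exclude the entry
    return key_cred.get("hasExtendedValue", True) is True


def flagged_key_ids(objects: List[Dict[str, Any]]) -> List[Tuple[str, str]]:
    """Return (appId, keyId) pairs for every credential that passes the check."""
    flagged = []
    for obj in objects:
        for key_cred in obj.get("keyCredentials", []):
            if is_flagged_credential(key_cred):
                flagged.append((obj.get("appId"), key_cred.get("keyId")))
    return flagged


# Made-up sample data shaped like the fields selected in the Graph query
sample_objects = [
    {"displayName": "app-one", "appId": "a1", "keyCredentials": [
        {"keyId": "k1", "usage": "Verify", "type": "AsymmetricX509Cert", "hasExtendedValue": True},
        {"keyId": "k2", "usage": "Sign", "type": "AsymmetricX509Cert", "hasExtendedValue": True},
    ]},
    {"displayName": "app-two", "appId": "a2", "keyCredentials": []},
    {"displayName": "app-three", "appId": "a3", "keyCredentials": [
        {"keyId": "k3", "usage": "Verify", "type": "AsymmetricX509Cert", "hasExtendedValue": False},
    ]},
]

print(flagged_key_ids(sample_objects))
```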

View and filter vulnerable objects to create a watch list

You can view vulnerable objects in the data browser.

Using the Choose columns and Filter data drop-downs you can change the displayed columns and filter the data.

Warning: the filtered results will be used to create the watchlist content, so please click the Clear all filters button if you want to include all items in the watchlist.
Note: The rows displayed do not update correctly in VS Code. Row filters will be honored for the watchlist upload. To view the records to be uploaded run:
results_viewer.filtered_data
WATCHLIST_COLS = ["displayName", "appId", "credType", "keyId", "key_displayName"]

from msticpy.vis.data_viewer import DataViewer

results_viewer = DataViewer(
    results_df.drop(columns=["key"], errors="ignore"),
    selected_cols=WATCHLIST_COLS
)


def get_watchlist_new_data():
    # CSV of the watchlist columns, restricted to the rows left after filtering
    return (
        results_df[WATCHLIST_COLS]
        .loc[results_viewer.filtered_data.index]
        .to_csv(index=False)
    )


display(results_viewer)
Accordion(children=(VBox(children=(VBox(children=(Text(value='', description='Filter:', style=DescriptionStyle…
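
The way the filtered rows become watchlist content can be sketched with the standard library alone. The helper below is hypothetical and uses made-up rows; it mirrors the idea of `get_watchlist_new_data` (keep only the watchlist columns, keep only the rows that survived filtering, emit CSV text) rather than reproducing the pandas implementation.

```python
import csv
import io
from typing import Dict, Iterable, List, Sequence


def rows_to_watchlist_csv(
    rows: Sequence[Dict[str, str]],
    keep_indexes: Iterable[int],
    columns: List[str],
) -> str:
    """Write the selected rows and columns as CSV text with a header row."""
    buffer = io.StringIO()
    # extrasaction="ignore" silently drops fields not listed in `columns`,
    # for example the raw key material
    writer = csv.DictWriter(
        buffer, fieldnames=columns, extrasaction="ignore", lineterminator="\n"
    )
    writer.writeheader()
    for idx in keep_indexes:
        writer.writerow(rows[idx])
    return buffer.getvalue()


# Made-up rows; "key" stands in for a column that must not be uploaded
sample_rows = [
    {"displayName": "app-one", "appId": "a1", "key": "SECRET"},
    {"displayName": "app-two", "appId": "a2", "key": "SECRET"},
]

print(rows_to_watchlist_csv(sample_rows, [1], ["displayName", "appId"]))
```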

Create watchlist in Azure Sentinel

Setup code

Please run the next cell before continuing; it contains code definitions required later.

import json
from msticpy.data.azure_sentinel import AzureSentinel, _get_api_headers, _azs_api_result_to_df
import requests
from azure.common.exceptions import CloudError

# Defined here as well as in the workspace form cell, so that this cell
# runs in sequence without a NameError
CVE_NAME = "CVE_2021_42306"

WATCHLIST_PATH = "/providers/Microsoft.SecurityInsights/watchlists"
REQ_HEADERS = {
    "Content-Type": "application/json",
    "Accept": "application/json",
    "Authorization": None,
}
WATCHLIST_BODY = {
    "properties": {
        "displayName": "Apps/ServicePrincipals with exposed credentials",
        "description": f"Applications/Service principals impacted by {CVE_NAME}",
        "source": "Local file",
        "provider": "Microsoft",
        "numberOfLinesToSkip": 0,
        "rawContent": "",
        "contentType": "text/csv",
        "itemsSearchKey": "appId",
    }
}


class AzureSentinelExt(AzureSentinel):

    def _parse_res_id_params(
        self,
        watchlist_id: str = None,
        res_id: str = None,
        sub_id: str = None,
        res_grp: str = None,
        ws_name: str = None,
    ):
        res_id = res_id or self._get_default_workspace()
        if not res_id:
            res_id = self._build_res_id(sub_id, res_grp, ws_name)
        url = self._build_paths(res_id, self.base_url)
        watchlist_url = (
            f"{url}{WATCHLIST_PATH}/{watchlist_id}"
            if watchlist_id
            else f"{url}{WATCHLIST_PATH}"
        )
        params = {"api-version": "2021-04-01"}
        return watchlist_url, params

    def list_watchlists(self, **kwargs):
        watchlist_url, params = self._parse_res_id_params(**kwargs)
        response = requests.get(
            watchlist_url, headers=_get_api_headers(self.token), params=params
        )
        if response.status_code == 200:
            return _azs_api_result_to_df(response)
        else:
            raise CloudError(response=response)

    def get_watchlist_items(self, watchlist_id: str, **kwargs):
        watchlist_url, params = self._parse_res_id_params(watchlist_id, **kwargs)
        watchlist_url = f"{watchlist_url}/watchlistItems"
        data_parts = []
        # The items list is returned in chunks of 100 items, so we need to
        # loop through the items if the response contains a "nextLink" property
        while watchlist_url:
            response = requests.get(
                watchlist_url, headers=_get_api_headers(self.token), params=params
            )
            if response.status_code == 200:
                data_parts.append(_azs_api_result_to_df(response))
            else:
                raise CloudError(response=response)
            watchlist_url = response.json().get("nextLink")
            if watchlist_url:
                # The nextLink URL already carries its own query parameters
                params = {}
        return pd.concat(data_parts)

    def create_watchlist(self, watchlist_id: str, watchlist_data: str, **kwargs):
        watchlist_url, params = self._parse_res_id_params(watchlist_id, **kwargs)
        wl_data = WATCHLIST_BODY
        wl_data["properties"]["rawContent"] = watchlist_data
        response = requests.put(
            watchlist_url,
            headers=_get_api_headers(self.token),
            params=params,
            data=json.dumps(wl_data),
        )
        if response.status_code == 200:
            print("Watchlist created/updated.")
            return
        if response.status_code == 400:
            raise ValueError("Watchlist data was malformed.")
        if response.status_code == 409:  # 409 Conflict
            raise ValueError(f"Watchlist {watchlist_id} already exists.")
        raise CloudError(response=response)

    def delete_watchlist(self, watchlist_id: str, **kwargs):
        watchlist_url, params = self._parse_res_id_params(watchlist_id, **kwargs)
        response = requests.delete(
            watchlist_url, headers=_get_api_headers(self.token), params=params
        )
        if response.status_code == 200:
            print("Watchlist deleted.")
            return  # do not fall through to the error handling below
        if response.status_code == 400:
            raise ValueError("Watchlist data was malformed.")
        if response.status_code == 409:  # 409 Conflict
            raise ValueError(f"Watchlist {watchlist_id} already exists.")
        raise CloudError(response=response)
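
The paging pattern used in `get_watchlist_items` (keep requesting pages while the response carries a `nextLink`) can be shown in isolation. This is a minimal sketch: `collect_paged_items` is a hypothetical helper, `fetch_page` stands in for the HTTP request, and the page contents and URLs are made up.

```python
from typing import Any, Callable, Dict, List, Optional


def collect_paged_items(
    first_url: str,
    fetch_page: Callable[[str], Dict[str, Any]],
) -> List[Any]:
    """Follow "nextLink" from page to page and gather every "value" item."""
    items: List[Any] = []
    url: Optional[str] = first_url
    while url:
        page = fetch_page(url)
        items.extend(page.get("value", []))
        # The last page has no nextLink, which ends the loop
        url = page.get("nextLink")
    return items


# Made-up pages keyed by URL, standing in for API responses
fake_pages = {
    "page-1": {"value": ["item-a", "item-b"], "nextLink": "page-2"},
    "page-2": {"value": ["item-c"], "nextLink": "page-3"},
    "page-3": {"value": []},
}

print(collect_paged_items("page-1", fake_pages.__getitem__))
```

Injecting the fetch function keeps the loop independent of `requests`, so it can be exercised without network access.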

Enter or confirm the details for the Microsoft Sentinel Workspace

Ensure that the details for the watchlist and the target workspace are correct.

All fields in the form below must be completed.
from msticpy.common.pkg_config import settings

CVE_NAME = "CVE_2021_42306"
WL_NAME = f"Vulnerable_{CVE_NAME}"

# def_az_sentinel = settings.get("AzureSentinel", {}).get("Workspaces", {}).get("Default")
workspaces = list(settings.get("AzureSentinel", {}).get("Workspaces", {}).keys())
# Avoid a ValueError when no "Default" workspace is configured
if "Default" in workspaces:
    workspaces.remove("Default")

heading = widgets.HTML("<h3>Create/update the watchlist</h3>")
heading_ws = widgets.HTML(
    "<h4>Enter target Workspace details</h4>"
    "All fields must have a value - you can find the correct values"
    " in the Microsoft Sentinel Portal under Settings->Workspace settings"
)
ws_select = widgets.Combobox(
    description="Enter/Select Workspace",
    options=workspaces,
    **WIDGET_DEFAULTS,
)
txt_subscription_id = text_box("SubscriptionID")
txt_resource_group = text_box("ResourceGroup")
txt_workspace_name = text_box("Workspace")
txt_watchlist_alias = text_box("Watchlist alias", WL_NAME)
txt_watchlist_name = text_box(
    "Watchlist display name", f"Applications/Service principals impacted by {CVE_NAME}"
)
cb_append = widgets.Checkbox(
    description="Append to current watchlist",
    value=True,
)
html_append = widgets.HTML(
    "If append is checked, the current items will be appended "
    "to an existing watchlist with the specified name. If the watchlist "
    "does not exist, it will be created.<br>"
    "If append is unchecked, the existing watchlist (if any) will be "
    "deleted and replaced."
)


def update_workspace(change):
    # Fill in the workspace fields from the selected msticpyconfig entry
    ws_name = change.get("new")
    ws_settings = settings.get("AzureSentinel", {}).get("Workspaces", {}).get(ws_name)
    if not ws_settings:
        return
    txt_subscription_id.value = ws_settings.get("SubscriptionId", "")
    txt_resource_group.value = ws_settings.get("ResourceGroup", "")
    txt_workspace_name.value = ws_settings.get("WorkspaceName", ws_name)


ws_select.observe(update_workspace, names="value")

BOX_LAYOUT = {
    "layout": widgets.Layout(**{
        "width": "90%",
        "border": "solid gray 1px",
        "margin": "1pt",
        "padding": "5pt",
    }),
    "style": WIDGET_DEFAULTS["style"]
}
heading_wl = widgets.HTML(
    "<h4>Watchlist details - "
    f"{len(results_viewer.filtered_data)} records to be uploaded.</h4>"
)
wkspc_hbox = widgets.VBox(
    [
        heading_ws,
        ws_select,
        txt_subscription_id,
        txt_resource_group,
        txt_workspace_name,
    ],
    **BOX_LAYOUT
)
watchlist_hbox = widgets.VBox(
    [
        heading_wl,
        txt_watchlist_alias,
        txt_watchlist_name,
        cb_append,
        html_append,
    ],
    **BOX_LAYOUT
)


def get_ws_params():
    return {
        "sub_id": txt_subscription_id.value,
        "res_grp": txt_resource_group.value,
        "ws_name": txt_workspace_name.value
    }


def get_watchlist_name():
    return {"watchlist_id": txt_watchlist_alias.value}


def get_watchlist_displayname():
    return {"displayName": txt_watchlist_name.value}


display(widgets.VBox([heading, wkspc_hbox, watchlist_hbox]))
VBox(children=(HTML(value='<h3>Create/update the watchlist</h3>'), VBox(children=(HTML(value='<h4>Enter target…
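
The subscription, resource group and workspace values entered in the form together identify the workspace that owns the watchlist. As a rough sketch of how they combine into a watchlist URL: the helper name and the placeholder values below are hypothetical, and the base URL assumes the global Azure Resource Manager endpoint. The notebook itself delegates this step to MSTICPy helper methods whose exact output is not shown here.

```python
from typing import Optional

WATCHLIST_PATH = "/providers/Microsoft.SecurityInsights/watchlists"


def build_watchlist_url(
    sub_id: str,
    res_grp: str,
    ws_name: str,
    watchlist_id: Optional[str] = None,
    base_url: str = "https://management.azure.com",  # assumption: global cloud
) -> str:
    """Compose the management URL for a workspace's watchlists."""
    res_id = (
        f"/subscriptions/{sub_id}/resourceGroups/{res_grp}"
        f"/providers/Microsoft.OperationalInsights/workspaces/{ws_name}"
    )
    url = f"{base_url}{res_id}{WATCHLIST_PATH}"
    # With an alias the URL addresses one watchlist, otherwise the collection
    return f"{url}/{watchlist_id}" if watchlist_id else url


# Placeholder values for illustration only
print(build_watchlist_url("my-sub-id", "my-rg", "my-workspace", "Vulnerable_CVE_2021_42306"))
```

An empty field would leave a gap in the resource path, which is why every field in the form needs a value.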

Connect to Microsoft Sentinel Workspace API

az_sent_api = AzureSentinelExt()
az_sent_api.connect()

Upload the watchlist

Warning: Potentially destructive operation!
Executing the next cell will update an existing watchlist. If you deselected the Append to current watchlist option, existing watchlist content will be deleted.
def verify_fields():
    # Report every empty field rather than stopping at the first one
    fields_completed = True
    for field in (
        ws_select,
        txt_subscription_id,
        txt_resource_group,
        txt_workspace_name,
        txt_watchlist_alias,
        txt_watchlist_name,
    ):
        if not field.value:
            print(f"Field {field.description} is empty.")
            fields_completed = False
    return fields_completed


def upload_watchlist(append=True):
    if not verify_fields():
        print("Please check the upload settings fields and retry.")
        return
    if not append:
        # Replace mode: remove any existing watchlist first, ignoring failures
        try:
            az_sent_api.delete_watchlist(**get_watchlist_name(), **get_ws_params())
        except Exception:
            pass
    try:
        az_sent_api.create_watchlist(
            **get_watchlist_name(),
            watchlist_data=get_watchlist_new_data(),
            **get_ws_params()
        )
    except CloudError as err:
        if err.args[0] and err.args[0].endswith("does not exist"):
            print(f"Watchlist {get_watchlist_name()} not found")
        else:
            raise


upload_watchlist(append=cb_append.value)
Watchlist created/updated. Watchlist created/updated.

Search for vulnerable ServicePrincipals/Apps in Microsoft Sentinel

You can optionally search for suspicious activity involving the affected accounts.

To use a Sentinel workspace other than your default, set the workspace_name variable to the name of the entry in your msticpyconfig.yaml. E.g.

workspace_name = "MyMainWorkspace"
qry_prov = mp.QueryProvider("AzureSentinel")
# Set this to a valid entry in your msticpyconfig.yaml
workspace_name = None
qry_prov.connect(mp.WorkspaceConfig(workspace=workspace_name))
Connecting...
connected
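
Once connected, one possible starting point for such a search is a query that joins the watchlist against service principal sign-in logs. The sketch below only builds the query text. The helper name is hypothetical, and the choice of the `AADServicePrincipalSignInLogs` table with its `AppId` column is an assumption about which logs are collected in your workspace; adjust it to the tables you actually have. `_GetWatchlist` is the Sentinel function for reading a watchlist by alias, and `SearchKey` holds the `appId` values because the watchlist was created with `itemsSearchKey` set to `appId`.

```python
import re


def build_signin_query(watchlist_alias: str, lookback_days: int = 7) -> str:
    """Build a KQL query for sign-ins by apps listed in the watchlist."""
    # Restrict the alias to safe characters before embedding it in the query
    if not re.fullmatch(r"[A-Za-z0-9_\-]+", watchlist_alias):
        raise ValueError(f"Unexpected characters in alias: {watchlist_alias!r}")
    if lookback_days < 1:
        raise ValueError("lookback_days must be at least 1")
    return "\n".join([
        f"let vulnerable_apps = _GetWatchlist('{watchlist_alias}') | project SearchKey;",
        "AADServicePrincipalSignInLogs",  # assumed table, see note above
        f"| where TimeGenerated > ago({lookback_days}d)",
        "| where AppId in (vulnerable_apps)",
    ])


print(build_signin_query("Vulnerable_CVE_2021_42306"))
```

The resulting string could then be passed to the connected provider, for example `qry_prov.exec_query(build_signin_query("Vulnerable_CVE_2021_42306"))`.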

Supplementary functions

Download/Browse current watchlists

wl_lists = az_sent_api.list_watchlists(**get_ws_params())
wl_cols = [
    "name",
    "properties.displayName",
    "properties.provider",
    "properties.createdBy.email",
    "properties.created",
    "properties.updated",
    "properties.watchlistAlias"
]
wl_viewer = DataViewer(data=wl_lists, selected_cols=wl_cols)
wl_viewer
Accordion(children=(VBox(children=(VBox(children=(Text(value='', description='Filter:', style=DescriptionStyle…

Download and view the named watchlist

By default, this is "Vulnerable_CVE_2021_42306".

watchlist_df = None
try:
    watchlist_df = az_sent_api.get_watchlist_items(**get_watchlist_name(), **get_ws_params())
    print(len(watchlist_df), "items in watchlist")
except CloudError as err:
    if err.args[0] and err.args[0].endswith("does not exist"):
        print(f"Watchlist {get_watchlist_name()} not found")
    else:
        raise

if watchlist_df is not None:
    # Strip the "properties.itemsKeyValue." prefix to get readable column names
    wlview_cols = {
        col: col.replace("properties.itemsKeyValue.", "")
        for col in watchlist_df.columns
        if col.startswith("properties.itemsKeyValue")
    }
    view_cols = ["name", *wlview_cols.values()]
    watchlist_viewer = DataViewer(watchlist_df.rename(columns=wlview_cols), selected_cols=view_cols)
    display(watchlist_viewer)
194 items in watchlist
Accordion(children=(VBox(children=(VBox(children=(Text(value='', description='Filter:', style=DescriptionStyle…
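
The column renaming in the cell above can be illustrated on a plain list of names. This is a small sketch with a hypothetical helper and made-up column names; it maps each flattened `properties.itemsKeyValue.` column to its short form and leaves other columns out of the mapping.

```python
from typing import Dict, Iterable

ITEM_PREFIX = "properties.itemsKeyValue."


def short_column_names(columns: Iterable[str]) -> Dict[str, str]:
    """Map prefixed watchlist item columns to their unprefixed names."""
    return {
        col: col[len(ITEM_PREFIX):]
        for col in columns
        if col.startswith(ITEM_PREFIX)
    }


# Made-up column names shaped like a flattened watchlist items response
sample_columns = [
    "name",
    "properties.itemsKeyValue.appId",
    "properties.itemsKeyValue.displayName",
    "properties.created",
]

print(short_column_names(sample_columns))
```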