GitHub Repository: Azure/Azure-Sentinel-Notebooks
Path: blob/master/Guided Investigation - Azure WAF SQLI.ipynb
Kernel: Python 3.10 - SDK v2

Azure WAF SQLI Incident Triage Notebook

  • Version: 1.0

  • Data Sources Required: AzureDiagnostics, SecurityAlert

This notebook is designed to help you triage incidents generated from Azure Front Door Web Application Firewall (WAF) SQL injection (SQLI) events.
You can use it to help determine whether these incidents are True Positive, Benign Positive, or False Positive and, if False Positive, to add exclusions to your WAF policy to prevent further occurrences.

To use this notebook you need analytics rules generating incidents related to Azure Front Door WAF SQLI events in your Sentinel workspace, as well as permissions to access and update WAF rules in Front Door.

More details about Azure Front Door WAF can be found here: https://learn.microsoft.com/en-us/azure/web-application-firewall/afds/afds-overview


Notebook initialization

Before running this notebook, ensure you have MSTICPy installed with the Azure extras.

The next cell:

  • Imports the required packages into the notebook

  • Sets a number of configuration options.
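As a quick sanity check before running the initialization cell, you can confirm that MSTICPy is present in the active kernel. This is a minimal sketch using only the standard library; `msticpy` is the package name on PyPI, and the install command in the comment is an example, not part of this notebook:

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(package):
    """Return the installed version of *package*, or None if it is missing."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


# If this prints None, install the package first, e.g. `pip install msticpy[azure]`
print(installed_version("msticpy"))
```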


This should complete without errors. If you encounter errors or warnings, look at the following two notebooks:

  • TroubleShootingNotebooks

  • ConfiguringNotebookEnvironment

If you are running in the Microsoft Sentinel Notebooks environment (Azure Notebooks or Azure ML) you can run live versions of these notebooks:

  • Run TroubleShootingNotebooks

  • Run ConfiguringNotebookEnvironment

You may also need to do some additional configuration to successfully use functions such as Threat Intelligence service lookup and Geo IP lookup. There are more details about this in the ConfiguringNotebookEnvironment notebook and in these documents:

  • msticpy configuration

  • Threat intelligence provider configuration

import msticpy as mp
import httpx
import json
import ipywidgets as widgets
from IPython.display import HTML
from datetime import datetime, timezone, timedelta
from msticpy.common.exceptions import MsticpyException
from msticpy.nbwidgets import Progress, SelectAlert
from msticpy.vis.entity_graph_tools import EntityGraph

mp.init_notebook()
# Default Parameters
ws_name = "Default"
incident_id = None
end = datetime.now(timezone.utc) + timedelta(hours=1)
start = end - timedelta(days=30)

Authenticate to Microsoft Sentinel APIs and Select Subscriptions

The notebook expects your Microsoft Sentinel Tenant ID, Subscription ID, Resource Group name, Workspace name, and Workspace ID to be configured in msticpyconfig.yaml, in the current folder or in the location specified by the MSTICPYCONFIG environment variable.
For help with setting up your msticpyconfig.yaml file, see the Setup section at the end of this notebook, the ConfigureNotebookEnvironment notebook, or https://msticpy.readthedocs.io/en/latest/getting_started/msticpyconfig.html
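For reference, a workspace entry in msticpyconfig.yaml has roughly this shape (the GUIDs and names below are placeholders; see the msticpy configuration documentation linked above for the full schema):

```yaml
AzureSentinel:
  Workspaces:
    Default:
      WorkspaceId: "11111111-2222-3333-4444-555555555555"
      TenantId: "11111111-2222-3333-4444-555555555555"
      SubscriptionId: "11111111-2222-3333-4444-555555555555"
      ResourceGroup: "MyResourceGroup"
      WorkspaceName: "MyWorkspace"
```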

These cells connect to the Microsoft Sentinel APIs and the Log Analytics data store behind it.
In order to use this the user must have at least read permissions on the Microsoft Sentinel workspace.
Select the Workspace you want to connect to from the list of workspaces configured in your msticpyconfig.yaml file and then authenticate to this workspace.

Note: you may be asked to authenticate twice, once for the APIs and once for the Log Analytics workspace.

print(
    "Configured workspaces: ",
    ", ".join(mp.settings.get_config("AzureSentinel.Workspaces").keys()),
)
ws_param = widgets.Combobox(
    description="Workspace Name",
    value=ws_name,
    options=list(mp.settings.get_config("AzureSentinel.Workspaces").keys()),
)
ws_param
ws_name = ws_param.value
sent_prov = mp.MicrosoftSentinel(workspace=ws_name)
sent_prov.connect()
qry_prov = mp.QueryProvider("MSSentinel")
qry_prov.connect(mp.WorkspaceConfig(ws_name))

Get SQLI Incidents

The first step of the investigation is to find the Azure Front Door WAF SQLI incidents to triage. To do this, we look for incidents generated by analytics rules that examine SQLI events from WAF logs.

Review the details of incidents below and select one to triage further.

# Format and display incident details
def display_incident(incident):
    details = f"""
    <h3>Selected Incident: {incident['title']}</h3>
    <b>Incident time: </b> {incident['createdTimeUtc']} -
    <b>Severity: </b> {incident['severity']} -
    <b>Assigned to: </b>{incident['properties.owner.userPrincipalName']} -
    <b>Status: </b> {incident['status']}
    """
    new_idx = [idx.split(".")[-1] for idx in incident.index]
    incident.set_axis(new_idx, copy=False)
    return (HTML(details), pd.DataFrame(incident))


# Find WAF SQLI analytics deployed in the workspace
analytics = sent_prov.list_analytic_rules()
if analytics.empty:
    raise MsticpyException("No Analytics found in this workspace")
else:
    sqli_analytics = analytics[
        (
            analytics["properties.query"].str.contains(
                "FrontDoorWebApplicationFirewallLog"
            )
            | analytics["properties.query"].str.contains(
                "ApplicationGatewayFirewallLog"
            )
        )
        & (
            analytics["properties.query"].str.contains("SQLI")
            | analytics["properties.query"].str.contains("SQL Injection")
        )
    ]
    sqli_analytics_ids = sqli_analytics["id"].unique()

# Find incidents triggered by these analytics
incidents = sent_prov.list_incidents()
if incidents.empty:
    raise MsticpyException("No Incidents found in this workspace")
else:
    sqli_mask = incidents["properties.relatedAnalyticRuleIds"].apply(
        lambda x: any(
            item
            for item in sqli_analytics_ids
            if item.lower() in [analytic.lower() for analytic in x]
        )
    )
    sqli_incidents = incidents[sqli_mask]
    sqli_incidents.rename(
        columns={
            "properties.title": "title",
            "properties.status": "status",
            "properties.severity": "severity",
            "properties.createdTimeUtc": "createdTimeUtc",
        },
        inplace=True,
    )
    sqli_incidents.mp_plot.timeline(
        title="SQLI Incidents",
        group_by="severity",
        source_columns=["title", "status", "severity"],
        time_column="createdTimeUtc",
    )

# Allow the user to select the incident to focus on and display its details
md("Select an incident to triage:", "bold")
alert_sel = SelectAlert(
    alerts=sqli_incidents,
    default_alert=incident_id,
    columns=["title", "severity", "status", "name"],
    time_col="createdTimeUtc",
    id_col="id",
    action=display_incident,
)
alert_sel.display()

Review details of the incident

Review the information below to understand the core details of the selected incident.

incident_details = sent_prov.get_incident(
    alert_sel.selected_alert.id.split("/")[-1], entities=True, alerts=True
)
ent_dfs = []
for ent in incident_details["Entities"][0]:
    ent_df = pd.json_normalize(ent[1])
    ent_df["Type"] = ent[0]
    ent_dfs.append(ent_df)
if ent_dfs:
    md("Incident Entities:", "bold")
    new_df = pd.concat(ent_dfs, axis=0, ignore_index=True)
    grp_df = new_df.groupby("Type")
    for grp in grp_df:
        md(grp[0], "bold")
        display(grp[1].dropna(axis=1))
alert_out = []
if "Alerts" in incident_details.columns:
    md("Related Alerts:", "bold")
    for alert in incident_details.iloc[0]["Alerts"]:
        qry = f"SecurityAlert | where TimeGenerated between((datetime({start})-7d)..datetime({end})) | where SystemAlertId == '{alert['ID']}'"
        df = qry_prov.exec_query(qry)
        display(df)
        if df.empty or not df["Entities"].iloc[0]:
            alert_full = {"ID": alert["ID"], "Name": alert["Name"], "Entities": None}
        else:
            alert_full = {
                "ID": alert["ID"],
                "Name": alert["Name"],
                "Entities": json.loads(df["Entities"].iloc[0]),
            }
        alert_out.append(alert_full)
    incident_details["Alerts"] = [alert_out]
md("Graph of incident entities:", "bold")
graph = EntityGraph(incident_details.iloc[0])
graph.plot(timeline=True)
incident_id = alert_sel.value["id"]
rule_ids = incidents[incidents["id"] == incident_id].iloc[0][
    "properties.relatedAnalyticRuleIds"
]
rule_mask = analytics["id"].apply(
    lambda x: any(item for item in rule_ids if item.lower() in x.lower())
)
incident_rules = analytics[rule_mask]
if len(incident_rules.index) > 1:
    incident_query = ""
    for rule in incident_rules.iterrows():
        incident_query += rule[1]["properties.query"]
else:
    incident_query = incident_rules.iloc[0]["properties.query"]
front_door = False
app_gateway = False
if "FrontDoorWebApplicationFirewallLog" in incident_query:
    front_door = True
if "ApplicationGatewayFirewallLog" in incident_query:
    app_gateway = True
if not front_door:
    raise MsticpyException(
        "This notebook is designed to process Azure Front Door WAF events. "
        "Incidents that contain Application Gateway WAF events are not currently supported."
    )

Review TI results

The following cell takes the entities associated with the selected incident and checks whether they appear in Threat Intelligence feeds to provide further context.
Documentation on Incident entities can be found here: https://learn.microsoft.com/azure/sentinel/incident-investigation
This cell uses MSTICPy's threat intelligence features and will use the providers configured in the msticpyconfig.yaml file. More details on this feature can be found here: https://msticpy.readthedocs.io/en/latest/data_acquisition/TIProviders.html

ti = mp.TILookup()
sev = []
resps = pd.DataFrame()

# Look up each entity in Threat Intelligence data
md("Looking up entities in TI feeds...")
prog = Progress(completed_len=len(incident_details["Entities"].iloc[0]))
i = 0
result_dfs = []
for ent in incident_details["Entities"].iloc[0]:
    i += 1
    prog.update_progress(i)
    if ent[0] == "Ip":
        resp = ti.lookup_ioc(ent[1]["address"], ioc_type="ipv4")
        result_dfs.append(ti.result_to_df(resp))
        sev += resp["Severity"].unique().tolist()
    if ent[0] == "Url" or ent[0] == "DnsResolution":
        if "url" in ent[1]:
            lkup_dom = ent[1]["url"]
        else:
            lkup_dom = ent[1]["domainName"]
        resp = ti.lookup_ioc(lkup_dom, ioc_type="url")
        result_dfs.append(ti.result_to_df(resp))
        sev += resp["Severity"].unique().tolist()
    if ent[0] == "FileHash":
        resp = ti.lookup_ioc(ent[1]["hashValue"])
        result_dfs.append(ti.result_to_df(resp))
        sev += resp["Severity"].unique().tolist()
if result_dfs:
    resps = pd.concat(result_dfs)
else:
    resps = pd.DataFrame()

# Take the overall severity of the entities based on the highest score
if "high" in sev:
    severity = "High"
elif "warning" in sev:
    severity = "Warning"
elif "information" in sev:
    severity = "Information"
else:
    severity = "None"
md("Checking to see if incident entities appear in TI data...")
incident_details["TI Severity"] = severity

# Output TI hits of High, Warning, or Information severity
display(incident_details)
if incident_details["TI Severity"].iloc[0] in ("High", "Warning", "Information"):
    print("Incident:")
    display(
        incident_details[
            [
                "properties.createdTimeUtc",
                "properties.incidentNumber",
                "properties.title",
                "properties.status",
                "properties.severity",
                "TI Severity",
            ]
        ]
    )
    md("TI Results:", "bold")
    display(
        resps[["Ioc", "IocType", "Provider", "Severity", "Details"]].sort_values(
            by="Severity"
        )
    )
else:
    md("None of the Entities appeared in TI data", "bold")

Get raw events in incident time frame

Now that we have selected an incident to triage we can look at the WAF log events that relate to the incident, along with details of the WAF rule that triggered the incident.

Review the details in the cells below and select a specific event to see further information about it.

def parse_rule_id(row):
    return row["ruleName_s"].split("-")[-1]


# Format display of WAF rule details
def display_event_details(rule_detail):
    details = f"""
    <h3>Event Type: {rule_detail['details_msg_s']}</h3>
    <b>Time Generated: </b> {rule_detail['TimeGenerated']}<br>
    <b>Rule: </b> {rule_detail['ruleName_s']} <br>
    <b>Details: </b>{rule_detail['details_data_s']} <br>
    <b>Client IP: </b> {rule_detail['clientIP_s']} <br>
    <b>Client Port: </b> {rule_detail['clientPort_s']} <br>
    <b>Socket IP: </b> {rule_detail['socketIP_s']} <br>
    <b>Host: </b> {rule_detail['host_s']}<br>
    """
    # Default in case no matching rule is found in the downloaded OWASP rule set
    owasp_rule = "No matching OWASP rule found"
    if rule_detail["ruleName_s"].startswith("Microsoft_DefaultRuleSet"):
        for rule in owasp_sqli_rule_set:
            if "id" in rule and rule["id"] == rule_detail["RuleID"]:
                owasp_rule = rule
    else:
        owasp_rule = "Custom Rule, this is not supported by this notebook"
    return (HTML(details), "OWASP Rule Details:", owasp_rule)


# Get raw events in the incident timeframe and parse out the rule ID
start_time = incidents[incidents["id"] == incident_id].iloc[0][
    "properties.firstActivityTimeUtc"
]
end_time = incidents[incidents["id"] == incident_id].iloc[0][
    "properties.lastActivityTimeUtc"
]
rule_query = f"""AzureDiagnostics
| where TimeGenerated between(datetime('{start_time}')..datetime('{end_time}'))
| where ruleName_s contains 'SQLI'
"""
raw_events_df = qry_prov.exec_query(rule_query)
if raw_events_df.empty:
    md("Unable to find any events related to this incident.")
else:
    rule_details_df = raw_events_df[
        [
            "TimeGenerated",
            "ResourceGroup",
            "SubscriptionId",
            "policy_s",
            "details_msg_s",
            "requestUri_s",
            "httpStatusCode_d",
            "ruleName_s",
            "action_s",
            "details_data_s",
            "clientIP_s",
            "host_s",
            "socketIP_s",
            "clientPort_s",
        ]
    ].drop_duplicates()
    rule_details_df["RuleID"] = rule_details_df.apply(parse_rule_id, axis=1)
    md("WAF rule firing events occurring in the incident timeframe:", "bold")
    display(rule_details_df)
    rule_details_df.mp_plot.timeline(
        title="WAF Rule Firing Events", group_by="ruleName_s"
    )

if isinstance(rule_details_df, pd.DataFrame) and not rule_details_df.empty:
    # Download the OWASP CRS SQLI rules to give context on the triggered rule
    owasp_sqi_rules_response = httpx.get(
        "https://raw.githubusercontent.com/SpiderLabs/owasp-modsecurity-crs/v3.2/master/rules/REQUEST-942-APPLICATION-ATTACK-SQLI.conf"
    )
    owasp_sqi_rules = [
        x for x in owasp_sqi_rules_response.text.split("\n") if not x.startswith("#")
    ]
    owasp_sqi_rules_text = "".join([str(item) for item in owasp_sqi_rules])
    owasp_sqli_rule_set = []
    for rule in owasp_sqi_rules_text.split("'\""):
        rule_details = {}
        tags = []
        for row in rule.split("\\ "):
            if row.startswith("SecRule "):
                rule_details["rulelogic"] = row.split("SecRule ")[-1]
            elif ":" in row:
                split_row = row.split(":")
                if split_row[0].strip('"') == "tag":
                    tags.append(split_row[1].strip('"'))
                else:
                    rule_details[split_row[0].strip('"')] = (
                        split_row[1].strip('"').strip(",")
                    )
        rule_details["tags"] = tags
        owasp_sqli_rule_set.append(rule_details)
    md("Select a WAF Event to triage:", "bold")
    rule_details_df["full_id"] = rule_details_df["RuleID"] + rule_details_df[
        "TimeGenerated"
    ].astype(str)
    event_sel = SelectAlert(
        alerts=rule_details_df,
        columns=[
            "TimeGenerated",
            "ResourceGroup",
            "SubscriptionId",
            "policy_s",
            "details_msg_s",
            "requestUri_s",
            "httpStatusCode_d",
            "ruleName_s",
            "action_s",
            "details_data_s",
            "clientIP_s",
            "host_s",
            "socketIP_s",
            "clientPort_s",
            "RuleID",
        ],
        time_col="TimeGenerated",
        id_col="full_id",
        action=display_event_details,
    )
    event_sel.display()

Look at other events associated with the event above to understand the context of this WAF rule and its historical activity.

rule_events_query = f"""AzureDiagnostics
| where TimeGenerated between(datetime('{start_time}')..datetime('{end_time}'))
| where ruleName_s =~ "{event_sel.value['ruleName_s']}"
    or clientIP_s =~ "{event_sel.value['clientIP_s']}"
    or host_s =~ "{event_sel.value['host_s']}"
"""
rule_events_df = qry_prov.exec_query(rule_events_query)
md(f"Summary of {event_sel.value['ruleName_s']} rule events:", "bold")
rule_events_df.mp_plot.timeline(
    title="Rule Events by Request URI",
    group_by="requestUri_s",
    source_columns=["TimeGenerated", "ruleName_s", "clientIP_s", "host_s"],
)
rule_events_df.mp_plot.timeline(
    title="Rule Events by Client IP",
    group_by="clientIP_s",
    source_columns=["TimeGenerated", "host_s", "clientIP_s", "requestUri_s"],
)
rule_events_df.mp_plot.timeline(
    title="Rule Events by Host",
    group_by="host_s",
    source_columns=["TimeGenerated", "ruleName_s", "clientIP_s", "requestUri_s"],
)
rule_events_df.mp_plot.timeline(
    title="Events by Rule Triggered",
    group_by="ruleName_s",
    source_columns=["TimeGenerated", "host_s", "clientIP_s", "requestUri_s"],
)
md(f"{event_sel.value['ruleName_s']} events:", "bold")
display(rule_events_df)

Determine the incident status

Based on the details above, determine whether the incident is a False Positive, True Positive, or Benign Positive.
This status will be reflected in the incident within the Sentinel portal.

Rule_set_name = event_sel.value["ruleName_s"].split("-")[0]
Rule_set_version = event_sel.value["ruleName_s"].split("-")[1]
Rule_set_type = event_sel.value["ruleName_s"].split("-")[2]
Rule_set_id = event_sel.value["ruleName_s"].split("-")[3]
sub_id = event_sel.value["SubscriptionId"]
policy_name = event_sel.value["policy_s"]
rg_name = event_sel.value["ResourceGroup"]
incident_status = widgets.Dropdown(
    options=["True Positive", "False Positive", "Benign Positive"],
    description="Status:",
    disabled=False,
)
print("What is the determined status of this incident?")
incident_status
if incident_status.value in ["True Positive", "Benign Positive"]:
    sent_prov.update_incident(
        alert_sel.selected_alert.id.split("/")[-1],
        update_items={"severity": "High", "status": "Active"},
    )
    sent_prov.post_comment(
        alert_sel.selected_alert.id.split("/")[-1],
        comment=f"Incident triaged in notebook, determined to be a {incident_status.value} event.",
    )
elif incident_status.value == "False Positive" and not Rule_set_name.startswith(
    "Microsoft_"
):
    md("Updating non-Default rule-sets is not supported in this notebook currently")
else:
    md(
        "If this is a False Positive use the cells below to add additional exclusions to your WAF policy"
    )

If the incident above is determined to be a False Positive, you can add exclusions to the WAF rule set to prevent further alerts.

These exclusions are applied at the WAF level and can prevent future WAF blocks based on set parameters.
More details of WAF exclusions can be found here: https://learn.microsoft.com/en-us/azure/web-application-firewall/afds/waf-front-door-exclusion

Use the cells below to review the currently deployed policy and define and deploy the exclusions.
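For orientation, each exclusion added by the cells below is a small JSON object with three fields, matching the exclusion entries this notebook sends to the Front Door WAF policy API (the selector value here is purely illustrative):

```python
import json

# One exclusion entry as used in the WAF policy update later in this notebook.
# matchVariable: which part of the request to exclude from inspection
# selectorMatchOperator: how the selector is matched against that variable
# selector: the name to exclude (e.g. a query-string argument) -- illustrative only
exclusion = {
    "matchVariable": "QueryStringArgNames",
    "selectorMatchOperator": "Equals",
    "selector": "search_text",
}
print(json.dumps(exclusion, indent=2))
```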

if incident_status.value == "False Positive":
    if Rule_set_name != "Microsoft_DefaultRuleSet":
        raise MsticpyException(
            "Custom rule exclusions are not supported in this notebook"
        )
    api_url = f"https://management.azure.com/subscriptions/{sub_id}/resourceGroups/{rg_name}/providers/Microsoft.Network/FrontDoorWebApplicationFirewallPolicies/{policy_name}?api-version=2020-11-01"
    headers = {
        "Authorization": f"Bearer {sent_prov.token}",
        "Content-Type": "application/json",
    }
    api_response = httpx.get(api_url, headers=headers)
    policy_props = dict(
        (k, api_response.json()[k])
        for k in ("tags", "sku", "properties", "etag", "location")
        if k in api_response.json()
    )
    prop_props = policy_props["properties"]
    policy_props["properties"] = dict(
        (k, prop_props[k])
        for k in ("customRules", "managedRules", "policySettings")
        if k in prop_props
    )
    md("Current policy configuration: ", "bold")
    print(json.dumps(policy_props, indent=4))
else:
    md("No policy updates required for True Positive or Benign Positive events")

Select the number of exclusions that you want to add to the WAF rule:

if incident_status.value == "False Positive":
    number_exclusions = widgets.Dropdown(
        options=[1, 2, 3, 4, 5], description="Number of exclusions", disabled=False
    )
    display(number_exclusions)
else:
    md("No policy updates required for True Positive or Benign Positive events")
if incident_status.value == "False Positive":
    exclusion_widgets = {}
    for i in range(number_exclusions.value):
        variable_sel = widgets.Dropdown(
            options=[
                "QueryStringArgNames",
                "RequestBodyJsonArgNames",
                "RequestBodyPostArgNames",
                "RequestCookieNames",
                "RequestHeaderNames",
            ],
            description="Match Variable:",
            disabled=False,
        )
        operator_sel = widgets.Dropdown(
            options=["Contains", "EndsWith", "Equals", "EqualsAny", "StartsWith"],
            description="Operator:",
            disabled=False,
        )
        value_sel = widgets.Text(description="Selector:", disabled=False)
        exclusion_widgets[i] = {
            "variable_sel": variable_sel,
            "operator_sel": operator_sel,
            "value_sel": value_sel,
        }
    for widg in exclusion_widgets:
        md(f"Exclusion {widg+1}:", "bold")
        display(exclusion_widgets[widg]["variable_sel"])
        display(exclusion_widgets[widg]["operator_sel"])
        display(exclusion_widgets[widg]["value_sel"])
else:
    md("No policy updates required for True Positive or Benign Positive events")

The cell below takes the new exclusions defined above and adds them to the currently set exclusions.

# Remove un-needed None values from the policy
def clean_nones(value):
    if isinstance(value, list):
        return [clean_nones(x) for x in value if x is not None]
    elif isinstance(value, dict):
        return {key: clean_nones(val) for key, val in value.items() if val is not None}
    else:
        return value


def bool_to_string(value):
    if isinstance(value, bool):
        return str(value).lower()


if incident_status.value == "False Positive":
    policy_props_backup = policy_props
    # Build new exclusions from the widget values
    new_exclusions = []
    for widg in exclusion_widgets:
        new_exclusions.append(
            {
                "matchVariable": f'{exclusion_widgets[widg]["variable_sel"].value}',
                "selectorMatchOperator": f'{exclusion_widgets[widg]["operator_sel"].value}',
                "selector": f'{exclusion_widgets[widg]["value_sel"].value}',
            }
        )
    modified_rule_set = None
    override_set = []
    new_rules = []
    # Get the existing rule sets, keeping any that we are not modifying
    new_rule_set = []
    for rule_set in policy_props["properties"]["managedRules"]["managedRuleSets"]:
        if rule_set["ruleSetType"] == Rule_set_name:
            modified_rule_set = rule_set
        else:
            new_rule_set.append(rule_set)
    exclusion_exists = False
    for override in modified_rule_set["ruleGroupOverrides"]:
        if override["ruleGroupName"] == "SQLI":
            exclusion_exists = True
            rule_ids = [rule["ruleId"] for rule in override["rules"]]
            if Rule_set_id in rule_ids:
                for rule in override["rules"]:
                    if rule["ruleId"] == f"{Rule_set_id}":
                        rule["exclusions"] += new_exclusions
                    new_rules.append(rule)
            else:
                new_rules = override["rules"] + [
                    {
                        "ruleId": f"{Rule_set_id}",
                        "enabledState": "Enabled",
                        "action": "AnomalyScoring",
                        "exclusions": new_exclusions,
                    }
                ]
            override["rules"] = new_rules
        override_set.append(override)
    if not exclusion_exists:
        modified_rule_set["ruleGroupOverrides"] = [
            {
                "ruleGroupName": "SQLI",
                "rules": [
                    {
                        "ruleId": f"{Rule_set_id}",
                        "enabledState": "Enabled",
                        "action": "AnomalyScoring",
                        "exclusions": new_exclusions,
                    }
                ],
            }
        ]
    if modified_rule_set:
        if override_set:
            # Remove the existing SQLI rules and replace them with our modified set
            modified_rule_set["ruleGroupOverrides"] = override_set
        new_rule_set.append(modified_rule_set)
    new_props = policy_props
    new_props["properties"]["managedRules"]["managedRuleSets"] = new_rule_set
    new_props = clean_nones(new_props)
    # Check that all pre-existing rule sets are still present before applying the update
    if set(
        ruleset["ruleSetType"]
        for ruleset in new_props["properties"]["managedRules"]["managedRuleSets"]
    ) != set(
        existing_ruleset["ruleSetType"]
        for existing_ruleset in policy_props_backup["properties"]["managedRules"][
            "managedRuleSets"
        ]
    ):
        raise Exception(
            "An issue has occurred and one of the existing rulesets has been removed. "
            "To prevent accidental deletion of a ruleset the update has been prevented. "
            "Please re-run this notebook and try again."
        )
    new_props = json.dumps(new_props, default=bool_to_string)
    # Apply the updated policy via the API
    api_update_response = httpx.put(api_url, headers=headers, content=new_props)
    if api_update_response.status_code in (200, 201, 202):
        md("Exclusions applied")
    else:
        md(
            f"There was a problem updating the exclusions, status code: {api_update_response.status_code}. "
            "Please try adding the exclusions via the Azure Portal."
        )
else:
    md("No policy updates required for True Positive or Benign Positive events")

Review Updated Exclusion Rules

Review the newly applied exclusion rules below to validate that they are as expected.

if incident_status.value == "False Positive":
    updated_rule_api_response = httpx.get(api_url, headers=headers)
    print(updated_rule_api_response.json())
else:
    md("No policy updates required for True Positive or Benign Positive events")

Update Incident

Now that the exclusions are in place, we can update the incident in Microsoft Sentinel to reflect this.

if incident_status.value == "False Positive":
    sent_prov.update_incident(
        alert_sel.selected_alert.id.split("/")[-1], update_items={"severity": "Low"}
    )
    sent_prov.post_comment(
        alert_sel.selected_alert.id.split("/")[-1],
        comment="Incident triaged in notebook, WAF policy updated with exclusions.",
    )
else:
    md("No policy updates required for True Positive or Benign Positive events")

Appendix

Configuration

msticpyconfig.yaml configuration file

You can configure primary and secondary TI providers and any required parameters in the msticpyconfig.yaml file. This is read from the current directory or you can set an environment variable (MSTICPYCONFIG) pointing to its location.

To configure this file, see the ConfigureNotebookEnvironment notebook.