Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Azure
GitHub Repository: Azure/Azure-Sentinel-Notebooks
Path: blob/master/scenario-notebooks/AutomatedNotebooks-IncidentTriage.ipynb
3250 views
Kernel: papermill
# Imports and environment setup for the automated incident-triage notebook.
import json
import os
from collections import Counter
from datetime import datetime, timedelta

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from bokeh.plotting import show
from whois import whois

import msticnb as nb
from msticpy.common.azure_auth import az_connect
from msticpy.common.timespan import TimeSpan
from msticpy.data.data_providers import QueryProvider
from msticpy.nbtools.utility import md
from msticpy.sectools.tilookup import TILookup

# Drop any stale client secret so az_connect falls back to CLI/MSI auth.
# Passing a default to pop() replaces the previous bare try/except, which
# could also have hidden unrelated errors.
os.environ.pop("AZURE_CLIENT_SECRET", None)

# Silence pandas SettingWithCopyWarning for the slicing done later on.
pd.options.mode.chained_assignment = None
def check_ent(items, entity):
    """Return True if `entity` occurs in any item of `items` (dict items view)."""
    for item in items:
        if entity in item:
            return True
    return False


def are_equal(ent1, ent2):
    """Return True if `ent2` has the same value as `ent1` for every property of `ent1`."""
    result = True
    for prop in ent1.properties:
        result &= getattr(ent1, prop, None) == getattr(ent2, prop, None)
    return result


def color_cells(val):
    """Return a CSS background color for a styled DataFrame cell.

    Merges the two conflicting `color_cells` definitions from the original
    notebook: the second definition silently shadowed the first, so the
    severity strings ("High"/"Warning"/...) used by the TI-results table were
    never colored. Strings are colored by TI severity; ints and floats by the
    Page Rank / entropy thresholds used in the domain-enrichment cell.
    """
    color = "none"
    if isinstance(val, str):
        if val.casefold() == "high":
            color = "Red"
        elif val.casefold() in ("warning", "medium"):
            color = "Orange"
        elif val.casefold() in ("information", "low"):
            color = "Green"
    elif isinstance(val, float):
        # Entropy outside the typical range observed for legitimate domains.
        color = "yellow" if val > 4.30891 or val < 2.72120 else "none"
    elif isinstance(val, int):
        # Open Page Rank below 3 is notable for a supposedly popular site.
        color = "yellow" if val < 3 else "none"
    return f"background-color: {color}"


def Entropy(data):
    """Return the Shannon entropy (in bits) of the characters of `data`."""
    # float() replaces np.float, which was deprecated in NumPy 1.20 and
    # removed in NumPy 1.24.
    counts, length = Counter(data), float(len(data))
    return -sum(
        count / length * np.log2(count / length) for count in counts.values()
    )
# Connect to Microsoft Sentinel %env KQLMAGIC_LOAD_MODE=silent qry_prov = QueryProvider("LogAnalytics") creds = az_connect(["cli", "msi"]) token = creds.modern.get_token("https://api.loganalytics.io/.default") token_dict = { "access_token": token.token, "token_type": "Bearer", "resource": "https://api.loganalytics.io/", } qry_prov.connect( connection_str=f"loganalytics://code().tenant('{ten_id}').workspace('{ws_id}')", kqlmagic_args=f"-try_token=token_dict", )
# Establish threat intelligence provider ti = TILookup()
# Initialize MSTIC notebooklets against the Sentinel query provider and
# define the default seven-day lookback window used by the enrichment cells.
nb.init(qry_prov)
lookback_start = datetime.now() - timedelta(days=7)
timespan = TimeSpan(start=lookback_start)
# Get details of the incident from the Securityincident table incidents = qry_prov.exec_query( f"SecurityIncident | where IncidentName =~ '{incident_id}'" ) incident = incidents.iloc[0]
# Extract individual alerts from the incident.
# Deduplicate the alert IDs, quote each one, and build a KQL `in` clause.
quoted_ids = {f'"{alert_id}"' for alert_id in incident.AlertIds}
alert_id_list = ", ".join(list(quoted_ids))
query = f"SecurityAlert | where SystemAlertId in ({alert_id_list})"
alert_df = qry_prov.exec_query(query)
# For each alert in the incident extract the entities and build a unique list
from collections import defaultdict

# BUG FIX: SecurityAlert was never imported, so this cell raised NameError
# (silently swallowed by the old bare `except`). Import it explicitly.
from msticpy.nbtools.security_alert import SecurityAlert

alert_entities = {}           # SystemAlertId -> list of entities in that alert
unique_entities = defaultdict(list)  # entity type -> de-duplicated entities
for _, row in alert_df.iterrows():
    try:
        alert = SecurityAlert(row)
        alert_entities[alert.SystemAlertId] = alert.entities
        for ent in alert.entities:
            # Add the entity only if an equal one is not already recorded
            # (for/else: the else runs when no match broke the loop).
            for current_ent in unique_entities.get(ent.Type, []):
                if are_equal(ent, current_ent):
                    break
            else:
                unique_entities[ent.Type].append(ent)
    except Exception:
        # Best effort: skip alerts whose entities cannot be parsed.
        pass
# For each IP and URL entity, look it up in Threat Intelligence data and
# collect the severity reported by each provider.
sev = []
resps = pd.DataFrame()
for ent_type, ents in unique_entities.items():
    if ent_type == "ip":
        for ip in ents:
            resp = ti.lookup_ioc(observable=ip["Address"], ioc_type="ipv4")
            # BUG FIX: DataFrame.append was removed in pandas 2.0 — use
            # pd.concat to accumulate the lookup results instead.
            resps = pd.concat([resps, ti.result_to_df(resp)], ignore_index=True)
            for response in resp[1]:
                sev.append(response[1].severity)
    if ent_type == "url":
        for url in ents:
            resp = ti.lookup_ioc(observable=url["Url"], ioc_type="url")
            resps = pd.concat([resps, ti.result_to_df(resp)], ignore_index=True)
            for response in resp[1]:
                sev.append(response[1].severity)

# Take overall severity of the entities based on the highest score
if "high" in sev:
    severity = "High"
elif "warning" in sev:
    severity = "Warning"
elif "information" in sev:
    severity = "Information"
else:
    severity = "None"
incident["TI Severity"] = severity

# Output TI hits of high or warning severity
if incident["TI Severity"] in ("High", "Warning"):
    print("Incident:")
    display(
        incident.to_frame()
        .T[
            [
                "TimeGenerated",
                "IncidentNumber",
                "Title",
                "Status",
                "Severity",
                "TI Severity",
            ]
        ]
        .style.applymap(color_cells)
        .hide_index()
    )
    print("TI Results:")
    display(
        resps[["Ioc", "IocType", "Provider", "Severity", "Details"]]
        .style.applymap(color_cells)
        .hide_index()
    )
# Enrich IP entities using the IP Summary notebooklet
ip_ent_nb = nb.nblts.azsent.network.IpAddressSummary()
if not resps.empty and "ipv4" in resps["IocType"].unique():
    for ip_addr in resps[resps["IocType"] == "ipv4"]["Ioc"].unique():
        try:
            ip_ent_nb_out = ip_ent_nb.run(value=ip_addr, timespan=timespan, silent=True)
            # Each section below is shown only when the notebooklet returned data.
            if (
                isinstance(ip_ent_nb_out.whois, pd.DataFrame)
                and not ip_ent_nb_out.whois.empty
            ):
                md(f"Whois information for {ip_addr}")
                display(ip_ent_nb_out.whois)
            if (
                isinstance(ip_ent_nb_out.geoip, pd.DataFrame)
                and not ip_ent_nb_out.geoip.empty
            ):
                md(f"Geo IP details for {ip_addr}")
                display(ip_ent_nb_out.geoip)
            if (
                isinstance(ip_ent_nb_out.related_alerts, pd.DataFrame)
                and not ip_ent_nb_out.related_alerts.empty
                and ip_ent_nb_out.alert_timeline
            ):
                md(f"Alerts for {ip_addr}")
                show(ip_ent_nb_out.alert_timeline)
            if (
                isinstance(ip_ent_nb_out.ti_results, pd.DataFrame)
                and not ip_ent_nb_out.ti_results.empty
            ):
                md(f"TI results for {ip_addr}")
                display(ip_ent_nb_out.ti_results)
            if (
                isinstance(ip_ent_nb_out.passive_dns, pd.DataFrame)
                and not ip_ent_nb_out.passive_dns.empty
            ):
                md(f"Passive DNS results for {ip_addr}")
                display(ip_ent_nb_out.passive_dns)
            if (
                isinstance(ip_ent_nb_out.vps_network, pd.DataFrame)
                and not ip_ent_nb_out.vps_network.empty
            ):
                md(f"{ip_addr} belongs to a known VPS provider")
                # BUG FIX: was `ip_ent_out.vps_network` — an undefined name
                # that made this branch raise (and be swallowed by `except`).
                display(ip_ent_nb_out.vps_network)
            if (
                isinstance(ip_ent_nb_out.host_entity, pd.DataFrame)
                and not ip_ent_nb_out.host_entity.empty
            ):
                md(f"{ip_addr} belongs to a known host")
                # BUG FIX: was `ip_ent_out.host_entity` — same undefined name.
                display(ip_ent_nb_out.host_entity)
        except Exception:
            print(f"Error processing {ip_addr}")
else:
    md("No IP entities present ")
# Enrich Domain entities domain_items = [ "name", "org", "city", "state", "country", "registrar", "status", "creation_date", "expiration_date", "updated_date", "name_servers", "dnssec", ] domain_records = pd.DataFrame() if not resps.empty and "url" in resps["IocType"].unique(): for url in resps[resps["IocType"] == "url"]["Ioc"].unique(): md(f"Summary for {url}", "bold") wis = whois(url) if wis.domain_name: if isinstance(wis["domain_name"], list): domain = wis["domain_name"][0] else: domain = wis["domain_name"] # Create domain record from whois data dom_rec = {} for key in wis.keys(): if key in domain_items: dom_rec[key] = [wis[key]] dom_rec["domain"] = domain dom_record = pd.DataFrame(dom_rec) page_rank = ti.result_to_df( ti.lookup_ioc(observable=domain, providers=["OPR"]) ) page_rank_score = page_rank["RawResult"][0]["response"][0][ "page_rank_integer" ] dom_record["Page Rank"] = [page_rank_score] dom_ent = Entropy(domain) dom_record["Entropy"] = [dom_ent] # Highlight page rank of entropy scores of note display( dom_record.T.style.applymap( color_cells, subset=pd.IndexSlice[["Page Rank", "Entropy"], 0] ) ) md( "If Page Rank or Domain Entropy are highlighted this indicates that their values are outside the expected values of a legitimate website" ) md(f"The average entropy for the 1M most popular domains is 3.2675") else: md("No Domain entities present ")
# Enrich Account entities using the AccountSummary notebooklet timespan = TimeSpan(start=datetime.now() - timedelta(days=2)) account_nb = nb.nblts.azsent.account.AccountSummary() user = None uent = None if check_ent(unique_entities.items(), "account"): for ent in unique_entities.items(): if ent[0] == "account": if "AadUserId" in str(ent[1][0]): try: uent = ent[1][1] except IndexError: pass else: uent = ent[1][0] if uent: try: user = uent["Name"] + "@" + uent["UPNSuffix"] except TypeError: user = uent["Name"] if user: try: ac_nb = account_nb.run( timespan=timespan, value=user.casefold(), silent=True ) ac_nb.get_additional_data() md(f"Account summary for {user}", "bold") if ( isinstance(ac_nb.account_activity, pd.DataFrame) and not ac_nb.account_activity.empty ): md("Recent activity") display(ac_nb.account_activity) if ( isinstance(ac_nb.related_alerts, pd.DataFrame) and not ac_nb.related_alerts.empty ): show(ac_nb.alert_timeline) if ( isinstance(ac_nb.host_logon_summary, pd.DataFrame) and not ac_nb.host_logon_summary.empty ): md(f"Host logons by {user}") display(ac_nb.host_logon_summary) if ( isinstance(ac_nb.azure_activity_summary, pd.DataFrame) and not ac_nb.azure_activity_summary.empty ): md(f"Azure activity by {user}") display(ac_nb.azure_activity_summary) show(ac_nb.azure_timeline_by_provider) except: print(f"Error processing {user}") else: md("No Account entities present ")
# Enrich Host entities using the HostSummary notebooklet
# Narrower two-day lookback for host activity queries.
timespan = TimeSpan(start=datetime.now() - timedelta(days=2))
host_nb = nb.nblts.azsent.host.HostSummary()
if check_ent(unique_entities.items(), "host"):
    for ent_type, hosts in unique_entities.items():
        if ent_type == "host":
            for host in hosts:
                if host["DnsDomain"]:
                    # BUG FIX: the original expression ended with `, ""`,
                    # which produced a (name, "") tuple instead of the FQDN
                    # string, so host_nb.run received a tuple value.
                    host_name = host["HostName"] + "." + host["DnsDomain"]
                else:
                    host_name = host["HostName"]
                md(f"Host summary for {host_name}", "bold")
                try:
                    host_sum_out = host_nb.run(value=host_name, timespan=timespan)
                except Exception:
                    print(f"Error processing {host_name}")
else:
    md("No Host entities present")