GitHub Repository: Azure/Azure-Sentinel-Notebooks
Path: blob/master/tutorials-and-examples/deprecated-notebooks/Entity Explorer - Windows Host.ipynb
Kernel: Python 3.8 - AzureML

Windows Host Explorer

Details...

Notebook Version: 1.0
Python Version: Python 3.6 (including Python 3.6 - AzureML)
Required Packages: kqlmagic, msticpy, pandas, numpy, matplotlib, bokeh, networkx, ipywidgets, ipython, scikit_learn, dnspython, ipwhois, folium, maxminddb_geolite2

Data Sources Required:

  • Log Analytics - SecurityAlert, SecurityEvent (EventIDs 4688 and 4624/25), AzureNetworkAnalytics_CL, Heartbeat

  • (Optional) - VirusTotal, AlienVault OTX, IBM XForce, Open Page Rank, (all require accounts and API keys)

Brings together a series of queries and visualizations to help you determine the security state of the Windows host or virtual machine that you are investigating.

Contents


Notebook initialization

The next cell:

  • Checks for the correct Python version

  • Checks versions and optionally installs required packages

  • Imports the required packages into the notebook

  • Sets a number of configuration options.

This should complete without errors. If you encounter errors or warnings look at the following two notebooks:

If you are running in the Microsoft Sentinel Notebooks environment (Azure Notebooks or Azure ML) you can run live versions of these notebooks:

You may also need to do some additional configuration to successfully use functions such as Threat Intelligence service lookup and Geo IP lookup. There are more details about this in the ConfiguringNotebookEnvironment notebook and in these documents:
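A quick way to check which configuration file will be picked up is a minimal stdlib sketch based on the lookup order described in the Authentication and Configuration Problems section below (this assumes the MSTICPYCONFIG environment variable takes precedence over the current folder; it only checks that the file exists, not that its contents are valid):

```python
import os
from pathlib import Path

def find_msticpy_config():
    """Return the path of the first discoverable msticpyconfig.yaml, or None."""
    # Assumption: the MSTICPYCONFIG environment variable takes precedence,
    # then msticpyconfig.yaml in the current folder.
    candidates = [os.environ.get("MSTICPYCONFIG", ""), "msticpyconfig.yaml"]
    for cand in candidates:
        if cand and Path(cand).is_file():
            return Path(cand)
    return None

cfg = find_msticpy_config()
if cfg:
    print(f"Using config: {cfg}")
else:
    print("No msticpyconfig.yaml found - see the ConfiguringNotebookEnvironment notebook")
```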

from pathlib import Path
from IPython.display import display, HTML

REQ_PYTHON_VER = "3.6"
REQ_MSTICPY_VER = "1.0.0"
REQ_MP_EXTRAS = ["ml"]

display(HTML("<h3>Starting Notebook setup...</h3>"))

# If not using Azure Notebooks, install msticpy with
# %pip install msticpy
from msticpy.nbtools import nbinit

nbinit.init_notebook(
    namespace=globals(),
    extra_imports=["ipwhois, IPWhois"],
);

Get WorkspaceId and Authenticate to Microsoft Sentinel

Details...

If you are using user/device authentication, run the following cell:

  • Click the 'Copy code to clipboard and authenticate' button.

  • This will pop up an Azure Active Directory authentication dialog (in a new tab or browser window). The device code will have been copied to the clipboard.

  • Select the text box and paste (Ctrl-V/Cmd-V) the copied value.

  • You should then be redirected to a user authentication page where you should authenticate with a user account that has permission to query your Log Analytics workspace.

Use the following syntax if you are authenticating using an Azure Active Directory AppId and Secret:

%kql loganalytics://tenant(aad_tenant).workspace(WORKSPACE_ID).clientid(client_id).clientsecret(client_secret)

instead of

%kql loganalytics://code().workspace(WORKSPACE_ID)

Note: you may occasionally see a JavaScript error displayed at the end of the authentication - you can safely ignore this.

On successful authentication you should see a popup schema button. To find your Workspace Id go to Log Analytics. Look at the workspace properties to find the ID.

# See if we have a Microsoft Sentinel Workspace defined in our config file.
# If not, let the user specify Workspace and Tenant IDs
ws_config = WorkspaceConfig()
if not ws_config.config_loaded:
    ws_config.prompt_for_ws()

qry_prov = QueryProvider(data_environment="AzureSentinel")
# Authenticate to Microsoft Sentinel workspace
qry_prov.connect(ws_config)
table_index = qry_prov.schema_tables

Authentication and Configuration Problems


Click for details about configuring your authentication parameters

The notebook is expecting your Microsoft Sentinel Tenant ID and Workspace ID to be configured in one of the following places:

  • config.json in the current folder

  • msticpyconfig.yaml in the current folder or location specified by MSTICPYCONFIG environment variable.

For help with setting up your config.json file (if this hasn't been done automatically) see the ConfiguringNotebookEnvironment notebook in the root folder of your Azure-Sentinel-Notebooks project. This shows you how to obtain your Workspace and Subscription IDs from the Microsoft Sentinel Portal. You can use the Subscription ID to find your Tenant ID. To view the current config.json run the following in a code cell.

%pfile config.json

For help with setting up your msticpyconfig.yaml see the Setup section at the end of this notebook and the ConfigureNotebookEnvironment notebook

Contents

Search for a Host name and query host properties

host_text = widgets.Text(
    description="Enter the Host name to search for:", **WIDGET_DEFAULTS
)
display(host_text)
query_times = nbwidgets.QueryTime(units="day", max_before=20, before=5, max_after=1)
query_times.display()
# Get single event - try process creation
if "SecurityEvent" not in table_index:
    raise ValueError("No Windows event log data available in the workspace")

host_name = None
matching_hosts_df = qry_prov.WindowsSecurity.list_host_processes(
    query_times,
    host_name=host_text.value.strip(),
    add_query_items="| distinct Computer",
)
if len(matching_hosts_df) > 1:
    print(f"Multiple matches for '{host_text.value}'. Please select a host from the list.")
    choose_host = nbwidgets.SelectItem(
        item_list=list(matching_hosts_df["Computer"].values),
        description="Select the host.",
        auto_display=True,
    )
elif not matching_hosts_df.empty:
    host_name = matching_hosts_df["Computer"].iloc[0]
    print(f"Unique host found: {host_name}")
else:
    print(f"Host not found: {host_text.value}")
if not host_name:
    host_name = choose_host.value

host_entity = None
if not matching_hosts_df.empty:
    host_entity = entities.Host(
        src_event=matching_hosts_df[matching_hosts_df["Computer"] == host_name].iloc[0]
    )
if not host_entity:
    raise LookupError(f"Could not find Windows events for the name {host_name}")

def populate_heartbeat_details(host_hb_df, host_entity=None):
    if not host_hb_df.empty:
        host_hb = host_hb_df.iloc[0]
        if not host_entity:
            host_entity = entities.Host(host_hb["Computer"])
        host_entity.SourceComputerId = host_hb["SourceComputerId"]
        host_entity.OSType = host_hb["OSType"]
        host_entity.OSMajorVersion = host_hb["OSMajorVersion"]
        host_entity.OSMinorVersion = host_hb["OSMinorVersion"]
        host_entity.ComputerEnvironment = host_hb["ComputerEnvironment"]
        host_entity.ResourceId = host_hb["ResourceId"]
        host_entity.OmsSolutions = [
            sol.strip() for sol in host_hb["Solutions"].split(",")
        ]
        host_entity.VMUUID = host_hb["VMUUID"]

        ip_entity = entities.IpAddress()
        ip_entity.Address = host_hb["ComputerIP"]
        geoloc_entity = entities.GeoLocation()
        geoloc_entity.CountryName = host_hb["RemoteIPCountry"]
        geoloc_entity.Longitude = host_hb["RemoteIPLongitude"]
        geoloc_entity.Latitude = host_hb["RemoteIPLatitude"]
        ip_entity.Location = geoloc_entity
        host_entity.IPAddress = ip_entity  # TODO change to graph edge
    return host_entity

def convert_to_ip_entities(ip_str):
    ip_entities = []
    if ip_str:
        if "," in ip_str:
            addrs = ip_str.split(",")
        elif " " in ip_str:
            addrs = ip_str.split(" ")
        else:
            addrs = [ip_str]
        for addr in addrs:
            ip_entity = entities.IpAddress()
            ip_entity.Address = addr.strip()
            if iplocation:
                iplocation.lookup_ip(ip_entity=ip_entity)
            ip_entities.append(ip_entity)
    return ip_entities

# Add this information to our host_entity
def populate_host_aznet_ips(az_net_df, host_entity):
    retrieved_address = []
    if len(az_net_df) == 1:
        host_entity.private_ips = convert_to_ip_entities(
            az_net_df["PrivateIPAddresses"].iloc[0]
        )
        host_entity.public_ips = convert_to_ip_entities(
            az_net_df["PublicIPAddresses"].iloc[0]
        )
        retrieved_address = [ip.Address for ip in host_entity.public_ips]
    else:
        if "private_ips" not in host_entity:
            host_entity.private_ips = []
        if "public_ips" not in host_entity:
            host_entity.public_ips = []

iplocation = None
try:
    iplocation = GeoLiteLookup()
except Exception:
    md(
        "Warning - could not create GeoLiteLookup"
        " - location details will not be retrieved",
        "red, bold",
    )

# Try to get an OMS Heartbeat for this computer
if "Heartbeat" in table_index:
    print(f"Looking for {host_name} in OMS Heartbeat data...")
    host_hb_df = qry_prov.Network.get_heartbeat_for_host(host_name=host_name)
    host_entity = populate_heartbeat_details(host_hb_df, host_entity)

if "AzureNetworkAnalytics_CL" in table_index:
    print(f"Looking for {host_name} IP addresses in network flows...")
    az_net_df = qry_prov.Network.get_ips_for_host(host_name=host_name)
    populate_host_aznet_ips(az_net_df, host_entity)

md("Host Details", "bold")
display(host_entity)

Contents

Related Alerts

Look for any related alerts around this time.

ra_query_times = nbwidgets.QueryTime(
    units="day",
    origin_time=query_times.origin_time,
    max_before=28,
    max_after=5,
    before=5,
    auto_display=True,
)
related_alerts = qry_prov.SecurityAlert.list_related_alerts(
    ra_query_times, host_name=host_entity.HostName
)

def print_related_alerts(alertDict, entityType, entityName):
    if len(alertDict) > 0:
        md(
            f"<h3>Found {len(alertDict)} different alert types "
            f"related to this {entityType} (`{entityName}`)</h3>"
        )
        for (k, v) in alertDict.items():
            print(f"- {k}, # Alerts: {v}")
    else:
        print(f"No alerts for {entityType} entity `{entityName}`")

if isinstance(related_alerts, pd.DataFrame) and not related_alerts.empty:
    host_alert_items = (
        related_alerts[["AlertName", "TimeGenerated"]]
        .groupby("AlertName")
        .TimeGenerated.agg("count")
        .to_dict()
    )
    print_related_alerts(host_alert_items, "host", host_entity.HostName)
    if len(host_alert_items) > 1:
        nbdisplay.display_timeline(
            data=related_alerts, title="Alerts", source_columns=["AlertName"], height=200
        )
    elif len(host_alert_items) == 1:
        md("Only one alert found - view the details in the browser below.")
else:
    md("No related alerts found.")

Select an Alert to view details

def disp_full_alert(alert):
    global related_alert
    related_alert = SecurityAlert(alert)
    return nbdisplay.format_alert(related_alert, show_entities=True)

recenter_wgt = widgets.Checkbox(
    value=True,
    description='Center subsequent query times round selected Alert?',
    disabled=False,
    **WIDGET_DEFAULTS
)
if related_alerts is not None and not related_alerts.empty:
    related_alerts["CompromisedEntity"] = related_alerts["Computer"]
    md("<h3>Click on alert to view details.</h3>")
    display(recenter_wgt)
    rel_alert_select = nbwidgets.SelectAlert(
        alerts=related_alerts,
        action=disp_full_alert,
    )
    rel_alert_select.display()

Contents

Host Logons

This section looks at successful and failed logons on the host.

# set the origin time to the time of our alert
origin_time = (
    related_alert.TimeGenerated if recenter_wgt.value else query_times.origin_time
)
logon_query_times = nbwidgets.QueryTime(
    units="day",
    origin_time=origin_time,
    before=5,
    after=1,
    max_before=20,
    max_after=20,
)
logon_query_times.display()

Successful Logons - Timeline and LogonType breakdown

host_logons = qry_prov.WindowsSecurity.list_host_logons(
    logon_query_times, host_name=host_entity.HostName
)
if host_logons is not None and not host_logons.empty:
    md("<h3>Logon timeline.</h3>")
    tooltip_cols = [
        "TargetUserName",
        "TargetDomainName",
        "SubjectUserName",
        "SubjectDomainName",
        "LogonType",
        "IpAddress",
    ]
    nbdisplay.display_timeline(
        data=host_logons,
        group_by="TargetUserName",
        source_columns=tooltip_cols,
        legend="right",
        yaxis=True,
    )

    md("<h3>Counts of logon events by logon type.</h3>")
    md("Min counts for each logon type highlighted.")
    logon_by_type = (
        host_logons[["Account", "LogonType", "EventID"]]
        .astype({"LogonType": "int32"})
        .merge(
            right=pd.Series(data=nbdisplay._WIN_LOGON_TYPE_MAP, name="LogonTypeDesc"),
            left_on="LogonType",
            right_index=True,
        )
        .drop(columns="LogonType")
        .groupby(["Account", "LogonTypeDesc"])
        .count()
        .unstack()
        .rename(columns={"EventID": "LogonCount"})
        .fillna(0)
        .style.background_gradient(cmap="viridis", low=0.5, high=0)
        .format("{0:0>3.0f}")
    )
    display(logon_by_type)
else:
    md("No logon events found for host.")

Contents

Failed Logons

failedLogons = qry_prov.WindowsSecurity.list_host_logon_failures(
    logon_query_times, host_name=host_entity.HostName
)
if failedLogons.empty:
    print(
        "No logon failures recorded for this host between",
        f"{logon_query_times.start} and {logon_query_times.end}",
    )
else:
    nbdisplay.display_timeline(
        data=host_logons.query('TargetLogonId != "0x3e7"'),
        overlay_data=failedLogons,
        alert=related_alert,
        title="Logons (blue=user-success, green=failed)",
        source_columns=tooltip_cols,
        height=200,
    )
    display(
        failedLogons.astype({"LogonType": "int32"})
        .merge(
            right=pd.Series(data=nbdisplay._WIN_LOGON_TYPE_MAP, name="LogonTypeDesc"),
            left_on="LogonType",
            right_index=True,
        )[
            [
                "Account", "EventID", "TimeGenerated", "Computer",
                "SubjectUserName", "SubjectDomainName", "TargetUserName",
                "TargetDomainName", "LogonTypeDesc", "IpAddress", "WorkstationName",
            ]
        ]
    )

Accounts With Failed And Successful Logons

This query joins failed and successful logons for the same account name. Multiple logon failures followed by a successful logon might indicate attempts to guess or probe the user password.

if not failedLogons.empty:
    combined = pd.concat(
        [
            failedLogons,
            host_logons[
                host_logons["TargetUserName"].isin(
                    failedLogons["TargetUserName"].drop_duplicates()
                )
            ],
        ]
    )
    display(combined.head())
    combined["LogonStatus"] = combined.apply(
        lambda x: "Failed" if x.EventID == 4625 else "Success", axis=1
    )
    nbdisplay.display_timeline(
        data=combined,
        group_by="LogonStatus",
        source_columns=["TargetUserName", "LogonType", "SubjectUserName", "TargetLogonId"],
        legend="inline",
        yaxis=True,
        height=200,
    )
    display(combined.sort_values("TimeGenerated"))
else:
    md(
        f"No logon failures recorded for this host between "
        f"{logon_query_times.start} and {logon_query_times.end}"
    )

Contents

Other Security Events

It's often useful to look at what other events were being logged at the time of the attack.

We show events here grouped by Account. Things to look for are:

  • Unexpected events that change system security such as the addition of accounts or services

  • Event types that occur for only a single account - especially if there are a lot of event types only executed by a single account.

md(f"Collecting Windows Event Logs for {host_entity.HostName}, this may take a few minutes...")
all_events_df = qry_prov.WindowsSecurity.list_host_events(
    logon_query_times,
    host_name=host_entity.HostName,
    add_query_items="| where EventID != 4688 and EventID != 4624",
    split_query_by="1D",
)
if not isinstance(all_events_df, pd.DataFrame):
    raise ValueError("Event query failed. Please retry with a smaller time range.")

# Create a pivot of Event vs. Account
win_events_acc = all_events_df[["Account", "Activity", "TimeGenerated"]].copy()
win_events_acc = win_events_acc.replace("-\\-", "No Account").replace(
    {"Account": ""}, value="No Account"
)
win_events_acc["Account"] = win_events_acc.apply(
    lambda x: x.Account.split("\\")[-1], axis=1
)
event_pivot = (
    pd.pivot_table(
        win_events_acc,
        values="TimeGenerated",
        index=["Activity"],
        columns=["Account"],
        aggfunc="count",
    )
    .fillna(0)
    .reset_index()
)

md("Yellow highlights indicate account with highest event count")
with pd.option_context("display.precision", 0):
    display(
        event_pivot.style
        .applymap(lambda x: "color: white" if x == 0 else "")
        .applymap(
            lambda x: "background-color: lightblue"
            if not isinstance(x, str) and x > 0
            else ""
        )
        .set_properties(subset=["Activity"], **{"width": "400px", "text-align": "left"})
        .highlight_max(axis=1)
        .hide_index()
    )

Parse Event Data for Selected Events

For events that you want to look at in more detail you can parse out the full EventData field (containing all fields of the original event). The parse_event_data function below does that, transforming the EventData XML into a dictionary of property/value pairs. The expand_event_properties function takes this dictionary and transforms it into columns in the output DataFrame.


More details... You can do this for multiple event types in a single pass but, depending on the schema of each event, you may end up with a lot of sparsely populated columns. E.g. suppose EventID 1 has EventData fields A, B and C, and EventID 2 has fields A, D and E. If you parse both IDs you will end up with a DataFrame with columns A, B, C, D and E, with contents populated only for the rows with corresponding data.

We recommend that you process batches of related event types (e.g. all user account change events) that have similar sets of fields to keep the output DataFrame manageable.
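The sparse-column effect described above is easy to see in isolation: building a DataFrame from event-property dictionaries with different schemas yields the union of all keys, with NaN wherever a field is absent. A standalone illustration (field names are made up):

```python
import pandas as pd

# Hypothetical parsed EventData dicts from two different event IDs
event_props = [
    {"A": "x1", "B": "y1", "C": "z1"},  # e.g. EventID 1 fields
    {"A": "x2", "D": "q2", "E": "r2"},  # e.g. EventID 2 fields
]
expanded = pd.DataFrame(event_props)
print(expanded.columns.tolist())        # union of all keys: A, B, C, D, E
print(int(expanded.isna().sum().sum())) # 4 empty cells across the two rows
```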

# Function to convert EventData XML into dictionary and
# populate columns into DataFrame from previous query result
import xml.etree.ElementTree as ET
from xml.etree.ElementTree import ParseError

SCHEMA = "http://schemas.microsoft.com/win/2004/08/events/event"

def parse_event_data(row):
    try:
        xdoc = ET.fromstring(row.EventData)
        col_dict = {
            elem.attrib["Name"]: elem.text for elem in xdoc.findall(f"{{{SCHEMA}}}Data")
        }
        reassigned = set()
        for k, v in col_dict.items():
            if k in row and not row[k]:
                row[k] = v
                reassigned.add(k)
        if reassigned:
            # print('Reassigned: ', ', '.join(reassigned))
            for k in reassigned:
                col_dict.pop(k)
        return col_dict
    except (ParseError, TypeError):
        return None

# Parse event properties into a dictionary
all_events_df["EventProperties"] = all_events_df.apply(parse_event_data, axis=1)

# For a specific event ID you can explode the EventProperties values into their own
# columns using this function. You can do this for the whole data set but it will
# result in a lot of sparse columns in the output data frame.
def expand_event_properties(input_df):
    exp_df = input_df.apply(lambda x: pd.Series(x.EventProperties), axis=1)
    return (
        exp_df.drop(set(input_df.columns).intersection(exp_df.columns), axis=1)
        .merge(
            input_df.drop("EventProperties", axis=1),
            how="inner",
            left_index=True,
            right_index=True,
        )
        .replace("", np.nan)  # these 3 lines get rid of blank columns
        .dropna(axis=1, how="all")
        .fillna("")
    )

display(
    expand_event_properties(all_events_df[all_events_df["EventID"] == 4698]).head(2)
)

Account Change Events - Timeline

Here we want to focus on some specific subcategories of events. Attackers commonly try to add or change user accounts and group memberships. We also include events related to the addition or change of scheduled tasks and Windows services.

# Get a full list of Windows Security Events
import pkgutil
import os

w_evt = pkgutil.get_data("msticpy", f"resources{os.sep}WinSecurityEvent.json")
win_event_df = pd.read_json(w_evt)

# Create criteria for events that we're interested in
acct_sel = win_event_df["subcategory"] == "User Account Management"
group_sel = win_event_df["subcategory"] == "Security Group Management"
schtask_sel = (win_event_df["subcategory"] == "Other Object Access Events") & (
    win_event_df["description"].str.contains("scheduled task")
)
event_list = win_event_df[acct_sel | group_sel | schtask_sel]["event_id"].to_list()
# Add Service install event
event_list.append(7045)

# Plot events on a timeline
p = nbdisplay.display_timeline(
    data=all_events_df[all_events_df["EventID"].isin(event_list)],
    group_by="Activity",
    source_columns=["Account"],
    legend="right",
)

Show Details of Selected Events

From the above data - pick which event types you want to view (by default, all are selected). The second cell will display the event types selected.

# populate actual event IDs to select from
recorded_events = (
    all_events_df["EventID"][all_events_df["EventID"].isin(event_list)]
    .drop_duplicates()
    .values
)
event_subset = win_event_df[
    win_event_df["event_id"].isin(event_list)
    & win_event_df["event_id"].isin(recorded_events)
]
items = list(event_subset.apply(lambda x: (x.full_desc, x.event_id), axis=1).values)
ss = nbwidgets.SelectSubset(items, default_selected=items)
drop_cols = [
    "TenantId", "EventProperties", "SourceSystem", "EventSourceName",
    "Channel", "EventID", "Task", "Level", "EventData", "EventOriginId",
    "MG", "TimeCollected", "ManagementGroupName", "Type",
]
display(
    all_events_df[all_events_df["EventID"].isin(ss.selected_values)]
    .replace(to_replace="", value=np.NAN)
    .dropna(axis=1, how="all")
    .drop(columns=drop_cols)
)

Contents

Examine Logon Sessions

Looking at characteristics and activity of individual logon sessions is an effective way of spotting clusters of attacker activity.

The biggest problem is deciding which logon sessions are the ones to look at. We may already have some indicators of sessions that we want to examine from earlier sections:

  • Accounts that experienced a series of failed logons followed by successful logons (see Accounts With Failed And Successful Logons above)

  • Accounts that triggered unexpected events (see Other Security Events above)

In this section we use clustering to collapse repetitive logons and show details of the distinct logon patterns.

Browse logon account details

from msticpy.sectools.eventcluster import (
    dbcluster_events,
    add_process_features,
    char_ord_score,
)

if host_logons is None or host_logons.empty:
    md("No host logons recorded. This section cannot be run.")
    raise ValueError("aborted")

# Set up clustering features and run DBScan clustering
logon_features = host_logons.copy()
logon_features["AccountNum"] = host_logons.apply(
    lambda x: char_ord_score(x.Account), axis=1
)
logon_features["TargetUserNum"] = host_logons.apply(
    lambda x: char_ord_score(x.TargetUserName), axis=1
)
logon_features["LogonHour"] = host_logons.apply(lambda x: x.TimeGenerated.hour, axis=1)

# you might need to play around with the max_cluster_distance parameter.
# decreasing this gives more clusters.
(clus_logons, _, _) = dbcluster_events(
    data=logon_features,
    time_column="TimeGenerated",
    cluster_columns=["AccountNum", "LogonType", "TargetUserNum"],
    max_cluster_distance=0.0001,
)
md(f"Number of input events: {len(host_logons)}")
md(f"Number of clustered events: {len(clus_logons)}")

md("<h3>Relative frequencies by account pattern</h3>")
plt.rcParams["figure.figsize"] = (12, 4)
clus_logons.sort_values("Account").plot.barh(x="Account", y="ClusterSize");

View distinct host logon patterns

import re

# Build a list of distinct logon patterns from the clustered data
dist_logons = clus_logons.sort_values("TimeGenerated")[
    ["TargetUserName", "TimeGenerated", "LastEventTime", "LogonType", "ClusterSize"]
]
dist_logons = dist_logons.apply(
    lambda x: (
        f"{x.TargetUserName}: "
        f"(logontype {x.LogonType}) "
        f"timerange: {x.TimeGenerated} - {x.LastEventTime} "
        f"count: {x.ClusterSize}"
    ),
    axis=1,
)
# Convert to dict, flipping keys/values
dist_logons = {v: k for k, v in dist_logons.to_dict().items()}

def get_selected_logon_cluster(selected_item):
    acct = clus_logons.loc[selected_item]["TargetUserName"]
    logon_type = clus_logons.loc[selected_item]["LogonType"]
    return host_logons.query("TargetUserName == @acct and LogonType == @logon_type")

def show_logon(idx):
    return nbdisplay.format_logon(pd.DataFrame(clus_logons.loc[idx]).T)

logon_wgt = nbwidgets.SelectItem(
    description="Select logon cluster to examine",
    item_dict=dist_logons,
    action=show_logon,
    height="200px",
    width="100%",
    auto_display=True,
)

Analyze Processes Patterns for logon sessions

In this section we look at the types of processes run in each logon session. For each process (and process characteristics such as command line structure) we measure its rarity compared to other processes on the same host. We then calculate the mean rarity of all processes in a logon session and display the results ordered by rarity. One is the highest possible score and would indicate all processes in the session have a unique execution pattern.
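The scoring described above reduces to: rarity of a process pattern = 1 / its cluster size, and a session's score = the mean rarity of its processes. A toy sketch of the calculation (session IDs and process patterns are made up for illustration; the notebook's real implementation clusters on richer features):

```python
from collections import Counter
from statistics import mean

# (session_id, process_pattern) observations - illustrative only
observations = [
    ("0x1234", "svchost"), ("0x1234", "svchost"), ("0x1234", "svchost"),
    ("0x1234", "svchost"),
    ("0x9999", "powershell -enc"), ("0x9999", "svchost"),
]

# Cluster size = how often each pattern occurs on the host
cluster_size = Counter(pattern for _, pattern in observations)

# Rarity of each process = 1 / size of its cluster; session score = mean rarity
sessions = {}
for sess, pattern in observations:
    sessions.setdefault(sess, []).append(1 / cluster_size[pattern])
scores = {sess: mean(rarities) for sess, rarities in sessions.items()}

# The session containing the one-off encoded PowerShell scores higher
print(sorted(scores, key=scores.get, reverse=True)[0])  # → 0x9999
```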

Note: the next section retrieves processes for a time period around the logons for the user ID selected in the previous section. If you want to view a broader time boundary, adjust the query time boundaries below.

# set the origin time to start at the first logon in our set
# and end 2hrs after the last
start_time = host_logons["TimeGenerated"].min()
end_time = host_logons["TimeGenerated"].max()
time_diff = int((end_time - start_time).total_seconds() / (60 * 60) + 2)
proc_query_times = nbwidgets.QueryTime(
    units="hours",
    origin_time=start_time,
    before=1,
    after=time_diff + 1,
    max_before=20,
    max_after=time_diff + 20,
)
proc_query_times.display()

Compute the relative rarity of processes in each session

This should be a good guide to which sessions are the more interesting to look at.

Note: clustering lots (1000s) of events will take a little time to compute.

from msticpy.sectools.eventcluster import dbcluster_events, add_process_features
from collections import Counter

print("Getting process events...", end="")
processes_on_host = qry_prov.WindowsSecurity.list_host_processes(
    proc_query_times, host_name=host_entity.HostName
)
print(f"done. {len(processes_on_host)} events")

print("Clustering. Please wait...", end="")
feature_procs = add_process_features(input_frame=processes_on_host, path_separator="\\")
feature_procs["accountNum"] = feature_procs.apply(
    lambda x: char_ord_score(x.Account), axis=1
)
# you might need to play around with the max_cluster_distance parameter.
# decreasing this gives more clusters.
(clus_events, dbcluster, x_data) = dbcluster_events(
    data=feature_procs,
    cluster_columns=[
        "commandlineTokensFull",
        "pathScore",
        "accountNum",
        "isSystemSession",
    ],
    max_cluster_distance=0.0001,
)
print("done")
print("Number of input events:", len(feature_procs))
print("Number of clustered events:", len(clus_events))

# Join the clustered results back to the original process frame
procs_with_cluster = feature_procs.merge(
    clus_events[
        [
            "commandlineTokensFull",
            "accountNum",
            "pathScore",
            "isSystemSession",
            "ClusterSize",
        ]
    ],
    on=["commandlineTokensFull", "accountNum", "pathScore", "isSystemSession"],
)
# Rarity = inverse of cluster size
procs_with_cluster["Rarity"] = 1 / procs_with_cluster["ClusterSize"]

# count the number of processes for each logon ID
lgn_proc_count = (
    pd.concat(
        [
            processes_on_host.groupby("TargetLogonId")["TargetLogonId"].count(),
            processes_on_host.groupby("SubjectLogonId")["SubjectLogonId"].count(),
        ]
    ).sum(level=0)
).to_dict()

# Display the results
md("Sessions ordered by process rarity", "bold")
md("Higher score indicates higher number of unusual processes")
process_rarity = (
    procs_with_cluster.groupby(["SubjectUserName", "SubjectLogonId"])
    .agg({"Rarity": "mean", "TimeGenerated": "count"})
    .rename(columns={"TimeGenerated": "ProcessCount"})
    .reset_index()
)
display(
    process_rarity.sort_values("Rarity", ascending=False)
    .style.bar(subset=["Rarity"], color="#d65f5f")
)

Overview of session timelines for sessions with higher rarity score

# Display process timeline for 75% percentile rarest scores
rare_sess = process_rarity[
    process_rarity["Rarity"] > process_rarity["Rarity"].quantile(0.25)
]
rare_sessions = processes_on_host[
    (processes_on_host["SubjectLogonId"].isin(rare_sess["SubjectLogonId"]))
    & (processes_on_host["SubjectUserName"].isin(rare_sess["SubjectUserName"]))
]

md("Timeline of sessions with higher process rarity", "large")
md("Multiple sessions (y-axis) may be shown for each account.")
md("You will likely need to zoom in to see the individual session processes.")

nbdisplay.display_timeline(
    data=rare_sessions,
    group_by="SubjectLogonId",
    source_columns=["SubjectUserName", "SubjectLogonId", "NewProcessName", "CommandLine"],
    legend="right",
    yaxis=True,
);

View the processes for these Sessions

def view_logon_sess(logon_id=""):
    global selected_logon
    selected_logon = host_logons[host_logons["TargetLogonId"] == logon_id]
    if all_procs.value:
        sess_procs = processes_on_host.query(
            "TargetLogonId == @logon_id | SubjectLogonId == @logon_id"
        )
    else:
        sess_procs = procs_with_cluster.query("SubjectLogonId == @logon_id")[
            ["NewProcessName", "CommandLine", "SubjectLogonId", "ClusterSize"]
        ].drop_duplicates()
    return sess_procs

sessions = list(
    process_rarity.sort_values("Rarity", ascending=False).apply(
        lambda x: (
            f"{x.SubjectLogonId} {x.SubjectUserName} Rarity={x.Rarity}",
            x.SubjectLogonId,
        ),
        axis=1,
    )
)

all_procs = widgets.Checkbox(
    value=False,
    description="View All Processes (Show clustered only if not checked)",
    **WIDGET_DEFAULTS,
)
display(all_procs)
logon_wgt = nbwidgets.SelectItem(
    description="Select logon session to examine",
    item_dict={label: val for label, val in sessions},
    height="300px",
    width="100%",
    auto_display=True,
    action=view_logon_sess,
)

Browse All Sessions (Optional)

If the previous section did not reveal anything interesting you can opt to browse all logon sessions.

Otherwise, skip to the Check Commandline for IoCs section

To do this you need to first pick an account + logon type (in the following cell) then pick a particular session that you want to view in the subsequent cell. Use the rarity score from the previous graph to guide you.

Step 1 - Select a logon ID and Type

logon_wgt2 = nbwidgets.SelectItem(
    description="Select logon cluster to examine",
    item_dict=dist_logons,
    height="200px",
    width="100%",
    auto_display=True,
)
all_procs = widgets.Checkbox(
    value=False,
    description="View All Processes (Clustered only if not checked)",
    **WIDGET_DEFAULTS,
)
display(all_procs)

Step 2 - Pick a logon session to view its processes

selected_logon_cluster = get_selected_logon_cluster(logon_wgt2.value)
selected_tgt_logon = selected_logon_cluster["TargetUserName"].iat[0]
system_logon = selected_tgt_logon.lower() == "system" or selected_tgt_logon.endswith("$")

if system_logon:
    md(
        '<h3><p style="color:red">Warning: the selected '
        "account name appears to be a system account.</p></h3><br>"
        "<i>It is difficult to accurately associate processes "
        "with the specific logon sessions.<br>"
        "Showing clustered events for entire time selection.</i>"
    )
    display(
        clus_events.sort_values("TimeGenerated")[
            [
                "TimeGenerated",
                "LastEventTime",
                "NewProcessName",
                "CommandLine",
                "ClusterSize",
                "commandlineTokensFull",
                "pathScore",
                "isSystemSession",
            ]
        ]
    )

# Display a pick list for logon instances
sel_1 = host_logons["TargetLogonId"].isin(lgn_proc_count)
sel_2 = host_logons["TargetUserName"] == selected_tgt_logon
items = (
    host_logons[sel_1 & sel_2]
    .sort_values("TimeGenerated")
    .apply(
        lambda x: (
            f"{x.TargetUserName}: "
            f"(logontype={x.LogonType}) "
            f"(timestamp={x.TimeGenerated}) "
            f"logonid={x.TargetLogonId}"
        ),
        axis=1,
    )
    .values.tolist()
)
if not items:
    items = ["No processes for logon"]
sess_w = widgets.Select(
    options=items, description="Select logon instance to examine", **WIDGET_DEFAULTS
)

import re

logon_list_regex = r"""
(?P<acct>[^:]+):\s+
\(logontype=(?P<logon_type>[^)]+)\)\s+
\(timestamp=(?P<time>[^)]+)\)\s+
logonid=(?P<logonid>[0-9a-fx)]+)
"""

def get_selected_logon(selected_item):
    acct_match = re.search(logon_list_regex, selected_item, re.VERBOSE)
    if acct_match:
        acct = acct_match["acct"]
        logon_type = int(acct_match["logon_type"])
        time_stamp = pd.to_datetime(acct_match["time"])
        logon_id = acct_match["logonid"]
        return host_logons.query(
            "TargetUserName == @acct and LogonType == @logon_type"
            " and TargetLogonId == @logon_id"
        )

def view_logon_sess(x=""):
    global selected_logon
    selected_logon = get_selected_logon(x)
    if selected_logon is not None:
        logonId = selected_logon["TargetLogonId"].iloc[0]
        if all_procs.value:
            sess_procs = processes_on_host.query(
                "TargetLogonId == @logonId | SubjectLogonId == @logonId"
            )
        else:
            sess_procs = procs_with_cluster.query("SubjectLogonId == @logonId")[
                ["NewProcessName", "CommandLine", "SubjectLogonId", "ClusterSize"]
            ].drop_duplicates()
        display(sess_procs)

widgets.interactive(view_logon_sess, x=sess_w)

Contents

Check for IOCs in Commandline for selected session

This section looks for Indicators of Compromise (IoC) within the data sets passed to it.

The input data for this comes from the session you picked in the following sections:

To change which section is used - go back to the desired section and pick a session or re-run the entire cell.

Extract IoCs

# Use this to search all process commandlines
# ioc_df = ioc_extractor.extract(
#     data=procs_with_cluster,
#     columns=["CommandLine"],
#     os_family=os_family,
#     ioc_types=["ipv4", "ipv6", "dns", "url", "md5_hash", "sha1_hash", "sha256_hash"],
# )
# ioc_df[["IoCType", "Observable"]].drop_duplicates()
selected_tgt_logon = selected_logon["TargetUserName"].iat[0]
system_logon = selected_tgt_logon.lower() == "system" or selected_tgt_logon.endswith(
    "$"
)

if not system_logon:
    logonId = selected_logon["TargetLogonId"].iloc[0]
    sess_procs = processes_on_host.query(
        "TargetLogonId == @logonId | SubjectLogonId == @logonId"
    )
else:
    sess_procs = clus_events

ioc_extractor = IoCExtract()
os_family = host_entity.OSType if host_entity.OSType else "Windows"

ioc_df = ioc_extractor.extract(
    data=sess_procs,
    columns=["CommandLine"],
    ioc_types=["ipv4", "ipv6", "dns", "url", "md5_hash", "sha1_hash", "sha256_hash"],
)
if len(ioc_df):
    md("<h3>IoC patterns found in process set.</h3>")
    display(ioc_df[["IoCType", "Observable"]].drop_duplicates())
else:
    md("<h3>No IoC patterns found in process tree.</h3>")

If any Base64-encoded strings are found, decode them and search for IoCs in the results.

This section looks for base64-encoded strings within the data - a common way of hiding attacker intent. It attempts to decode any strings that look like base64. Additionally, if the decoded output itself looks like a further base64-encoded string or file, a gzipped binary sequence, or a zipped or tar archive, it will attempt to extract the contents before searching for potentially interesting IoC observables within the decoded data.

For simple strings the Base64 decoded output is straightforward. However for nested encodings this can get a little complex and difficult to represent in a tabular format.
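As an example of the simple case, PowerShell's -EncodedCommand argument is base64 over a UTF-16LE string, so a decoder has to try more than one text encoding. A minimal sketch with a hypothetical payload:

```python
import base64

# Hypothetical encoded command - base64 over UTF-16LE, as PowerShell uses
plain = "Write-Host 'hello'"
encoded = base64.b64encode(plain.encode("utf-16-le")).decode("ascii")

# Decode: base64 first, then try UTF-16LE before falling back to UTF-8
raw = base64.b64decode(encoded)
try:
    decoded = raw.decode("utf-16-le")
except UnicodeDecodeError:
    decoded = raw.decode("utf-8", errors="replace")
```

This is why the table below reports an encoding_type per item: arbitrary binary data will often decode "successfully" as UTF-16 even when the result is meaningless.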

Columns

  • reference - The index of the row item in dotted notation, as depth.sequence pairs (e.g. 1.2.2.3 would be the 3rd item at depth 3 that is a child of the 2nd item found at depth 1). This may not always be an accurate notation - it is mainly used to let you associate an individual row with the reference value contained in the full_decoded_string column of the topmost item.

  • original_string - the original string before decoding.

  • file_name - filename, if any (only if this is an item in zip or tar file).

  • file_type - a guess at the file type (this is currently elementary and only includes a few file types).

  • input_bytes - the decoded bytes as a Python bytes string.

  • decoded_string - the decoded string if it can be decoded as a UTF-8 or UTF-16 string. Note: binary sequences may often successfully decode as UTF-16 strings but, in these cases, the decodings are meaningless.

  • encoding_type - encoding type (UTF-8 or UTF-16) if a decoding was possible, otherwise 'binary'.

  • file_hashes - collection of file hashes for any decoded item.

  • md5 - md5 hash as a separate column.

  • sha1 - sha1 hash as a separate column.

  • sha256 - sha256 hash as a separate column.

  • printable_bytes - printable version of input_bytes as a string of \xNN values.

  • src_index - the index of the row in the input dataframe from which the data came.

  • full_decoded_string - the full decoded string with any decoded replacements. This is only really useful for top-level items, since nested items will only show the 'full' string representing the child fragment.
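The hash columns can be reproduced directly from the decoded bytes. A minimal sketch using Python's hashlib on a hypothetical payload (standing in for one row's input_bytes value):

```python
import hashlib

# Hypothetical decoded payload - stands in for one row's input_bytes value
payload = b"example decoded payload"

# Compute the same three digests that appear as the md5/sha1/sha256 columns
hashes = {
    algo: hashlib.new(algo, payload).hexdigest()
    for algo in ("md5", "sha1", "sha256")
}
```

These digests are what you would submit to file-reputation services if a decoded item looks like a dropped binary.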

dec_df = base64.unpack_items(data=sess_procs, column="CommandLine")
if dec_df is not None and len(dec_df) > 0:
    display(HTML("<h3>Decoded base 64 command lines</h3>"))
    display(HTML("Decoded values and hashes of decoded values shown below."))
    display(
        HTML(
            "Warning - some binary patterns may be decodable as unicode strings. "
            'In these cases you should ignore the "decoded_string" column '
            'and treat the encoded item as a binary - using the "printable_bytes" '
            "column or treat the decoded_string as a binary (bytes) value."
        )
    )
    display(
        dec_df[
            [
                "full_decoded_string",
                "decoded_string",
                "original_string",
                "printable_bytes",
                "file_hashes",
            ]
        ]
    )
    ioc_dec_df = ioc_extractor.extract(data=dec_df, columns=["full_decoded_string"])
    if len(ioc_dec_df):
        display(HTML("<h3>IoC patterns found in events with base64 decoded data</h3>"))
        display(ioc_dec_df)
        # DataFrame.append is deprecated/removed in recent pandas - use pd.concat
        ioc_df = pd.concat([ioc_df, ioc_dec_df], ignore_index=True)
else:
    print("No base64 encodings found.")

Contents

Threat Intel Lookup

This section takes the output from the IoC Extraction from commandlines and submits it to Threat Intelligence services to see if any of the IoC are known threats.

Please take a moment to review the Selected list below and remove any items that you obviously do not want to look up (e.g. myadmintool.ps is almost certainly a PowerShell script name but also happens to match the pattern of a legal DNS domain).
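To see why a file name can collide with the DNS pattern, consider a simplified domain-matching regex (the real extractor is more sophisticated, but faces the same ambiguity — this pattern is a hypothetical stand-in):

```python
import re

# Simplified DNS-like pattern: dotted labels of letters/digits/hyphens.
# ".ps" is a PowerShell script extension but also a real top-level domain.
dns_like = re.compile(r"^[a-z0-9-]+(\.[a-z0-9-]+)+$")

candidates = ["myadmintool.ps", "evil.example.com", "notadomain"]
matches = [c for c in candidates if dns_like.match(c)]
```

A pattern alone cannot distinguish the script name from a domain, which is why this manual review step exists.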

If you have not used msticpy threat intelligence lookups before you will need to supply API keys for the TI Providers that you want to use. See the documentation on Configuring TI Providers.

Then reload provider settings:

mylookup = TILookup()
mylookup.reload_provider_settings()
ti_lookup = TILookup()
if not ti_lookup.provider_status:
    md_warn(
        "No configured TI providers were found - "
        "you will need to add some to msticpyconfig.yaml"
    )
    md(
        "To configure TI providers please see the help on "
        "<a href='https://msticpy.readthedocs.io/en/latest/"
        "data_acquisition/TIProviders.html#configuration-file' "
        "target='_blank' rel='noopener noreferrer'>"
        "Configuring TI Providers</a>"
    )

items = list(ioc_df["Observable"].values)
ioc_ss = None
if items:
    ioc_ss = nbwidgets.SelectSubset(items, default_selected=items)
else:
    md_warn("No IoCs in input data.")
if ioc_ss is not None:
    iocs_to_check = (
        ioc_df[ioc_df["Observable"].isin(ioc_ss.selected_items)][
            ["IoCType", "Observable"]
        ].drop_duplicates()
    )
    ti_lookup.lookup_iocs(
        data=iocs_to_check, obs_col="Observable", ioc_type_col="IoCType"
    )
else:
    md("No IoCs to lookup")

Contents

Network: Check Communications with Other Hosts

ip_q_times = nbwidgets.QueryTime(
    label="Set time bounds for network queries",
    units="day",
    max_before=28,
    before=2,
    after=5,
    max_after=28,
    origin_time=proc_query_times.origin_time,
)
ip_q_times.display()

Query Flows by Host IP Addresses

if "AzureNetworkAnalytics_CL" not in table_index:
    md_warn("AzureNetworkAnalytics_CL table is not available.")
    md(
        "No network flow data available. "
        "Please skip the remainder of this section.",
        "blue, bold",
    )
    az_net_comms_df = None
else:
    all_host_ips = (
        host_entity.private_ips + host_entity.public_ips + [host_entity.IPAddress]
    )
    host_ips = [i.Address for i in all_host_ips]
    az_net_comms_df = qry_prov.Network.list_azure_network_flows_by_ip(
        ip_q_times, ip_address_list=host_ips
    )

    if isinstance(az_net_comms_df, pd.DataFrame) and not az_net_comms_df.empty:
        az_net_comms_df["TotalAllowedFlows"] = (
            az_net_comms_df["AllowedOutFlows"] + az_net_comms_df["AllowedInFlows"]
        )
        nbdisplay.display_timeline(
            data=az_net_comms_df,
            group_by="L7Protocol",
            title="Network Flows by Protocol",
            time_column="FlowStartTime",
            source_columns=["FlowType", "AllExtIPs", "L7Protocol", "FlowDirection"],
            height=300,
            legend="right",
            yaxis=True,
        )
        nbdisplay.display_timeline(
            data=az_net_comms_df,
            group_by="FlowDirection",
            title="Network Flows by Direction",
            time_column="FlowStartTime",
            source_columns=["FlowType", "AllExtIPs", "L7Protocol", "FlowDirection"],
            height=300,
            legend="right",
            yaxis=True,
        )
    else:
        print("No network data for specified time range.")
if az_net_comms_df is not None and not az_net_comms_df.empty:
    flow_plot = nbdisplay.display_timeline_values(
        data=az_net_comms_df,
        group_by="L7Protocol",
        source_columns=[
            "FlowType",
            "AllExtIPs",
            "L7Protocol",
            "FlowDirection",
            "TotalAllowedFlows",
        ],
        time_column="FlowStartTime",
        title="Network flows by Layer 7 Protocol",
        y="TotalAllowedFlows",
        ref_event=related_alert,
        legend="right",
        height=500,
        kind=["vbar", "circle"],
    )
else:
    md("No network data for specified time range.")

Flow Summary

if az_net_comms_df is not None and not az_net_comms_df.empty:
    cols = [
        "VMName",
        "VMIPAddress",
        "PublicIPs",
        "SrcIP",
        "DestIP",
        "L4Protocol",
        "L7Protocol",
        "DestPort",
        "FlowDirection",
        "AllExtIPs",
        "TotalAllowedFlows",
    ]
    flow_index = az_net_comms_df[cols].copy()

    def get_source_ip(row):
        if row.FlowDirection == "O":
            return row.VMIPAddress if row.VMIPAddress else row.SrcIP
        else:
            return row.AllExtIPs if row.AllExtIPs else row.DestIP

    def get_dest_ip(row):
        if row.FlowDirection == "O":
            return row.AllExtIPs if row.AllExtIPs else row.DestIP
        else:
            return row.VMIPAddress if row.VMIPAddress else row.SrcIP

    flow_index["source"] = flow_index.apply(get_source_ip, axis=1)
    flow_index["dest"] = flow_index.apply(get_dest_ip, axis=1)
    md(f"{len(flow_index)} events in flow index")

    # Uncomment to view flow_index results
    # with warnings.catch_warnings():
    #     warnings.simplefilter("ignore")
    #     display(
    #         flow_index[
    #             ["source", "dest", "L7Protocol", "FlowDirection", "TotalAllowedFlows"]
    #         ]
    #         .groupby(["source", "dest", "L7Protocol", "FlowDirection"])
    #         .sum()
    #         .reset_index()
    #         .style.bar(subset=["TotalAllowedFlows"], color="#d65f5f")
    #     )
else:
    md_warn("No network data for specified time range.")
    md(
        "No AzureNetworkAnalytics data could be retrieved - "
        "the remaining sections of this notebook are not runnable."
    )
# WHOIS lookup function
from ipaddress import ip_address
from msticpy.sectools.ip_utils import get_whois_info

# Add ASN information from Whois
flows_df = (
    flow_index[["source", "dest", "L7Protocol", "FlowDirection", "TotalAllowedFlows"]]
    .groupby(["source", "dest", "L7Protocol", "FlowDirection"])
    .sum()
    .reset_index()
)

num_ips = len(flows_df["source"].unique()) + len(flows_df["dest"].unique())
print(f"Performing WhoIs lookups for {num_ips} IPs ", end="")

# flows_df = flows_df.assign(DestASN="", DestASNFull="", SourceASN="", SourceASNFull="")
flows_df["DestASN"] = flows_df.apply(lambda x: get_whois_info(x.dest, True), axis=1)
flows_df["SourceASN"] = flows_df.apply(lambda x: get_whois_info(x.source, True), axis=1)
print("done")

# Split the tuple returned by get_whois_info into separate columns
flows_df["DestASNFull"] = flows_df.apply(lambda x: x.DestASN[1], axis=1)
flows_df["DestASN"] = flows_df.apply(lambda x: x.DestASN[0], axis=1)
flows_df["SourceASNFull"] = flows_df.apply(lambda x: x.SourceASN[1], axis=1)
flows_df["SourceASN"] = flows_df.apply(lambda x: x.SourceASN[0], axis=1)

our_host_asns = [get_whois_info(ip.Address)[0] for ip in host_entity.public_ips]
md(f"Host {host_entity.HostName} ASNs:", "bold")
md(str(our_host_asns))

flow_sum_df = flows_df.groupby(["DestASN", "SourceASN"]).agg(
    TotalAllowedFlows=pd.NamedAgg(column="TotalAllowedFlows", aggfunc="sum"),
    L7Protocols=pd.NamedAgg(column="L7Protocol", aggfunc=lambda x: x.unique().tolist()),
    source_ips=pd.NamedAgg(column="source", aggfunc=lambda x: x.unique().tolist()),
    dest_ips=pd.NamedAgg(column="dest", aggfunc=lambda x: x.unique().tolist()),
).reset_index()
display(flow_sum_df)

Choose ASNs/IPs to Check for Threat Intel Reports

Choose the ASNs whose IPs you wish to check. The Source list has been pre-populated with all ASNs found in the network flow summary.

As an example, we've populated the Selected list with the ASNs that have the lowest number of flows to and from the host. We also remove the ASN that matches the ASN of the host we are investigating.

Please edit this list, using the flow summary data above as a guide, leaving only ASNs that you are suspicious about. Typically these would be ones with relatively low TotalAllowedFlows and possibly with unusual L7Protocols.
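The pre-selection uses the 25th percentile of TotalAllowedFlows as its cut-off. A toy illustration of that cut with hypothetical flow counts:

```python
import pandas as pd

# Hypothetical flow summary - four ASNs with very different volumes
flow_sum = pd.DataFrame(
    {
        "ASN": ["AS100", "AS200", "AS300", "AS400"],
        "TotalAllowedFlows": [2, 5, 120, 900],
    }
)

# ASNs at or below the 25th percentile are treated as "low volume"
threshold = flow_sum["TotalAllowedFlows"].quantile(0.25)
low_volume = flow_sum[flow_sum["TotalAllowedFlows"] <= threshold]
```

The intuition is that a compromised host beaconing to attacker infrastructure often produces a small number of flows relative to legitimate high-volume services.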

If you have not used msticpy threat intelligence lookups before you will need to supply API keys for the TI Providers that you want to use. Please see the section on Configuring TI Providers.

Then reload provider settings:

mylookup = TILookup()
mylookup.reload_provider_settings()
all_asns = list(flow_sum_df["DestASN"].unique()) + list(flow_sum_df["SourceASN"].unique())
all_asns = set(all_asns) - set(["private address"])

# Select the ASNs in the 25th percentile (lowest number of flows)
quant_25pc = flow_sum_df["TotalAllowedFlows"].quantile(q=[0.25]).iat[0]
quant_25pc_df = flow_sum_df[flow_sum_df["TotalAllowedFlows"] <= quant_25pc]
other_asns = list(quant_25pc_df["DestASN"].unique()) + list(
    quant_25pc_df["SourceASN"].unique()
)
other_asns = set(other_asns) - set(our_host_asns)

md("Choose IPs from Selected ASNs to look up for Threat Intel.", "bold")
sel_asn = nbwidgets.SelectSubset(source_items=all_asns, default_selected=other_asns)
from itertools import chain
from msticpy.sectools.tiproviders.ti_provider_base import TISeverity

def ti_check_ser_sev(severity, threshold):
    threshold = TISeverity.parse(threshold)
    return severity.apply(lambda x: TISeverity.parse(x) >= threshold)

dest_ips = set(
    chain.from_iterable(
        flow_sum_df[flow_sum_df["DestASN"].isin(sel_asn.selected_items)]["dest_ips"]
    )
)
src_ips = set(
    chain.from_iterable(
        flow_sum_df[flow_sum_df["SourceASN"].isin(sel_asn.selected_items)]["source_ips"]
    )
)
selected_ips = dest_ips | src_ips
md(f"{len(selected_ips)} unique IPs in selected ASNs")

# Add the IoCType to save cost of inferring each item
md("Looking up TI...")
selected_ip_dict = {ip: "ipv4" for ip in selected_ips}
ti_results = ti_lookup.lookup_iocs(data=selected_ip_dict)
md(f"{len(ti_results)} results received.")

ti_results_pos = ti_results[ti_check_ser_sev(ti_results["Severity"], 1)]
print(f"{len(ti_results_pos)} positive results found.")

if not ti_results_pos.empty:
    src_pos = flows_df.merge(ti_results_pos, left_on="source", right_on="Ioc")
    dest_pos = flows_df.merge(ti_results_pos, left_on="dest", right_on="Ioc")
    ti_ip_results = pd.concat([src_pos, dest_pos])
    md_warn("Positive Threat Intel Results found for the following flows")
    md("Please examine these IP flows using the IP Explorer notebook.", "bold, large")
    display(ti_ip_results)

GeoIP Map of External IPs

def format_ip_entity(row, ip_col):
    ip_entity = entities.IpAddress(Address=row[ip_col])
    iplocation.lookup_ip(ip_entity=ip_entity)
    ip_entity.AdditionalData["protocol"] = row.L7Protocol
    if "severity" in row:
        ip_entity.AdditionalData["threat severity"] = row["severity"]
    if "Details" in row:
        ip_entity.AdditionalData["threat details"] = row["Details"]
    return ip_entity

# from msticpy.nbtools.foliummap import FoliumMap
folium_map = FoliumMap(zoom_start=4)

if az_net_comms_df is None or az_net_comms_df.empty:
    print("No network flow data available.")
else:
    # Get the flow records for all flows not in the TI results
    selected_out = flows_df[flows_df["DestASN"].isin(sel_asn.selected_items)]
    selected_out = selected_out[~selected_out["dest"].isin(ti_ip_results["Ioc"])]
    if selected_out.empty:
        ips_out = []
    else:
        ips_out = list(
            selected_out.apply(lambda x: format_ip_entity(x, "dest"), axis=1)
        )

    selected_in = flows_df[flows_df["SourceASN"].isin(sel_asn.selected_items)]
    selected_in = selected_in[~selected_in["source"].isin(ti_ip_results["Ioc"])]
    if selected_in.empty:
        ips_in = []
    else:
        ips_in = list(
            selected_in.apply(lambda x: format_ip_entity(x, "source"), axis=1)
        )

    ips_threats = list(
        ti_ip_results.apply(lambda x: format_ip_entity(x, "Ioc"), axis=1)
    )

    display(HTML("<h3>External IP Addresses communicating with host</h3>"))
    display(HTML("Numbered circles indicate multiple items - click to expand"))
    display(
        HTML(
            "Location markers: <br>Blue = outbound, Purple = inbound, "
            "Green = Host, Red = Threats"
        )
    )

    icon_props = {"color": "green"}
    for ips in host_entity.public_ips:
        ips.AdditionalData["host"] = host_entity.HostName
    folium_map.add_ip_cluster(ip_entities=host_entity.public_ips, **icon_props)
    icon_props = {"color": "blue"}
    folium_map.add_ip_cluster(ip_entities=ips_out, **icon_props)
    icon_props = {"color": "purple"}
    folium_map.add_ip_cluster(ip_entities=ips_in, **icon_props)
    icon_props = {"color": "red"}
    folium_map.add_ip_cluster(ip_entities=ips_threats, **icon_props)
    folium_map.center_map()
display(folium_map)

Contents

Appendices

Available DataFrames

print("List of current DataFrames in Notebook")
print("-" * 50)
current_vars = list(locals().keys())
for var_name in current_vars:
    if isinstance(locals()[var_name], pd.DataFrame) and not var_name.startswith("_"):
        print(var_name)

Saving Data to Excel

To save the contents of a pandas DataFrame to an Excel spreadsheet, use the following syntax:

with pd.ExcelWriter("myWorksheet.xlsx") as writer:
    my_data_frame.to_excel(writer, sheet_name="Sheet1")

Configuration

msticpyconfig.yaml configuration File

You can configure primary and secondary TI providers and any required parameters in the msticpyconfig.yaml file. This is read from the current directory or you can set an environment variable (MSTICPYCONFIG) pointing to its location.
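For example, to point msticpy at a config file outside the notebook folder, you could set the environment variable before initializing the notebook (the path below is a hypothetical example — adjust to your setup):

```python
import os
from pathlib import Path

# Hypothetical location for a shared msticpyconfig.yaml
config_path = Path.home() / ".msticpy" / "msticpyconfig.yaml"

# Must be set before msticpy reads its settings (e.g. before init_notebook)
os.environ["MSTICPYCONFIG"] = str(config_path)
```

Setting this once per kernel session lets several notebooks share one set of workspace and TI provider settings.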

To configure this file see the ConfiguringNotebookEnvironment notebook.