GitHub Repository: Azure/Azure-Sentinel-Notebooks
Path: blob/master/tutorials-and-examples/deprecated-notebooks/Entity Explorer - Linux Host.ipynb
Kernel: Python 3.8 - AzureML

Entity Explorer - Linux Host

Details...

Notebook Version: 1.1
Python Version: Python 3.6 (including Python 3.6 - AzureML)
Required Packages: kqlmagic, msticpy, pandas, pandas_bokeh, numpy, matplotlib, networkx, seaborn, datetime, ipywidgets, ipython, dnspython, ipwhois, folium, maxminddb_geolite2

Data Sources Required:

  • Log Analytics/Microsoft Sentinel - Syslog, Security Alerts, Auditd, Azure Network Analytics.

  • (Optional) - AlienVault OTX (requires account and API key)

This notebook brings together a series of tools and techniques to enable threat hunting within the context of a single Linux host. It draws on a range of data sources, but in order to support the widest possible range of scenarios it prioritizes common Syslog data. If detailed auditd data is available for a host you may wish to edit the notebook to rely primarily on that dataset; as it currently stands, auditd is used when available to provide insight not otherwise available via Syslog.
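For reference, below is a minimal sketch of how you could check whether a host has auditd data before pivoting the hunt onto it. It assumes the query provider (qry_prov), time widget (query_times) and hostname set up later in this notebook, and reuses the auditd table-name pattern from the process tree cells further down, so run it only after those prerequisites exist.

import re

# Sketch only - assumes qry_prov, query_times and hostname are defined by later cells.
# The table-name pattern mirrors the auditd table discovery used in the process tree cells.
audit_tables = [t for t in qry_prov.schema if re.match(r".*audit.*_cl", t, re.IGNORECASE)]
if audit_tables:
    count_df = qry_prov.exec_query(
        f"{audit_tables[0]} "
        f"| where TimeGenerated between (datetime({query_times.start}) .. datetime({query_times.end})) "
        f"| where Computer == '{hostname}' "
        "| summarize count()"
    )
    print(f"auditd rows for {hostname} in {audit_tables[0]}: {count_df['count_'].iloc[0]}")
else:
    print("No auditd custom log table found in this workspace - relying on Syslog only.")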

Hunting Hypothesis:

Our broad initial hunting hypothesis is that a particular Linux host in our environment has been compromised. We will need to hunt from a range of different positions to validate or disprove this hypothesis.


Notebook initialization

The next cell:

  • Checks for the correct Python version

  • Checks versions and optionally installs required packages

  • Imports the required packages into the notebook

  • Sets a number of configuration options.

This should complete without errors. If you encounter errors or warnings, refer to the troubleshooting and configuration notebooks in the Azure-Sentinel-Notebooks repository.

If you are running in the Microsoft Sentinel Notebooks environment (Azure Notebooks or Azure ML) you can run live versions of those notebooks there.

You may also need to do some additional configuration to successfully use functions such as Threat Intelligence service lookup and Geo IP lookup. There are more details about this in the ConfiguringNotebookEnvironment notebook and in the msticpy documentation.

from pathlib import Path
from IPython.display import display, HTML

REQ_PYTHON_VER = (3, 6)
REQ_MSTICPY_VER = (1, 0, 0)
REQ_MP_EXTRAS = ["ml"]

# If the installation fails try to manually install using
# %pip install --upgrade msticpy
from msticpy.nbtools import nbinit

additional_packages = ["oauthlib", "pyvis", "python-whois", "seaborn"]
nbinit.init_notebook(
    namespace=globals(),
    additional_packages=additional_packages,
);

from bokeh.models import ColumnDataSource, FactorRange
from bokeh.palettes import viridis
from bokeh.plotting import show, Row, figure
from bokeh.transform import factor_cmap, cumsum
from dns import reversename, resolver
from functools import lru_cache
from ipaddress import ip_address
from ipwhois import IPWhois
from math import pi
from msticpy.common.exceptions import MsticpyException
from msticpy.nbtools import observationlist
from msticpy.nbtools.foliummap import get_map_center
from msticpy.sectools import auditdextract
from msticpy.sectools.cmd_line import risky_cmd_line
from msticpy.sectools.ip_utils import convert_to_ip_entities
from msticpy.sectools.syslog_utils import create_host_record, cluster_syslog_logons_df, risky_sudo_sessions
from pyvis.network import Network
import datetime as dt
import re

Get WorkspaceId and Authenticate to Log Analytics

Details...

If you are using user/device authentication, run the following cell.

  • Click the 'Copy code to clipboard and authenticate' button.

  • This will pop up an Azure Active Directory authentication dialog (in a new tab or browser window). The device code will have been copied to the clipboard.

  • Select the text box and paste (Ctrl-V/Cmd-V) the copied value.

  • You should then be redirected to a user authentication page where you should authenticate with a user account that has permission to query your Log Analytics workspace.

Use the following syntax if you are authenticating using an Azure Active Directory AppId and Secret:

%kql loganalytics://tenant(aad_tenant).workspace(WORKSPACE_ID).clientid(client_id).clientsecret(client_secret)

instead of

%kql loganalytics://code().workspace(WORKSPACE_ID)

Note: you may occasionally see a JavaScript error displayed at the end of the authentication - you can safely ignore this.
On successful authentication you should see a pop-up schema button. To find your Workspace ID, go to your Log Analytics workspace in the Azure portal and look at the workspace properties.

# See if we have a Microsoft Sentinel Workspace defined in our config file.
# If not, let the user specify Workspace and Tenant IDs
ws_config = WorkspaceConfig()
if not ws_config.config_loaded:
    ws_config.prompt_for_ws()

qry_prov = QueryProvider(data_environment="AzureSentinel")
print("done")
# Authenticate to Microsoft Sentinel workspace
qry_prov.connect(ws_config)

Set Hunting Time Frame

To begin the hunt we need to set the time frame within which to test our compromised-host hunting hypothesis. Use the widget below to select the start and end time for the hunt.

query_times = nbwidgets.QueryTime(units='day', max_before=14, max_after=1, before=1)
query_times.display()

Select Host to Investigate

Select the host you want to test your hunting hypothesis against. Only hosts with Syslog data within the time frame you specified are available; if the host you wish to select is not present, try adjusting your time frame.

# Get a list of hosts with syslog data in our hunting time frame to provide easy selection
syslog_query = f"""Syslog
| where TimeGenerated between (datetime({query_times.start}) .. datetime({query_times.end}))
| summarize by Computer"""
md("Collecting available host details...")
hosts_list = qry_prov._query_provider.query(query=syslog_query)
if isinstance(hosts_list, pd.DataFrame) and not hosts_list.empty:
    hosts = hosts_list["Computer"].unique().tolist()
    host_text = nbwidgets.SelectItem(description='Select host to investigate: ',
                                     item_list=hosts, width='75%', auto_display=True)
else:
    display(md("There are no hosts with syslog data in this time period to investigate"))

Host Summary

Below is an overview of the selected host based on the available data sources.

hostname = host_text.value
az_net_df = None

# Collect data on the host
all_syslog_query = f"""Syslog
| where TimeGenerated between (datetime({query_times.start}) .. datetime({query_times.end}))
| where Computer =~ '{hostname}'"""
all_syslog_data = qry_prov.exec_query(all_syslog_query)
if isinstance(all_syslog_data, pd.DataFrame) and not all_syslog_data.empty:
    heartbeat_query = f"""Heartbeat
    | where TimeGenerated >= datetime({query_times.start})
    | where TimeGenerated <= datetime({query_times.end})
    | where Computer == '{hostname}'
    | top 1 by TimeGenerated desc nulls last"""
    if "AzureNetworkAnalytics_CL" in qry_prov.schema:
        aznet_query = f"""AzureNetworkAnalytics_CL
        | where TimeGenerated >= datetime({query_times.start})
        | where TimeGenerated <= datetime({query_times.end})
        | where VirtualMachine_s has '{hostname}'
        | where ResourceType == 'NetworkInterface'
        | top 1 by TimeGenerated desc
        | project PrivateIPAddresses = PrivateIPAddresses_s, PublicIPAddresses = PublicIPAddresses_s"""
        print("Getting network data...")
        az_net_df = qry_prov.exec_query(query=aznet_query)
    print("Getting host data...")
    host_hb = qry_prov.exec_query(query=heartbeat_query)
    # Create host entity record, with Azure network data if any is available
    if az_net_df is not None and isinstance(az_net_df, pd.DataFrame) and not az_net_df.empty:
        host_entity = create_host_record(syslog_df=all_syslog_data, heartbeat_df=host_hb, az_net_df=az_net_df)
    else:
        host_entity = create_host_record(syslog_df=all_syslog_data, heartbeat_df=host_hb)
    md(
        "<b>Host Details</b><br>"
        f"<b>Hostname</b>: {host_entity.computer}<br>"
        f"<b>OS</b>: {host_entity.OSType} {host_entity.OSName}<br>"
        f"<b>IP Address</b>: {host_entity.IPAddress.Address}<br>"
        f"<b>Location</b>: {host_entity.IPAddress.Location.CountryName}<br>"
        f"<b>Installed Applications</b>: {host_entity.Applications}<br>"
    )
else:
    md_warn("No Syslog data found, check hostname and timeframe.")
    md("The data query may be timing out, consider reducing the timeframe size.")

Host Alerts & Bookmarks

This section provides an overview of any security alerts or Hunting Bookmarks in Microsoft Sentinel related to this host; this will help scope and guide our hunt.

related_alerts = qry_prov.SecurityAlert.list_related_alerts( query_times, host_name=hostname) realted_bookmarks = qry_prov.AzureSentinel.list_bookmarks_for_entity(query_times, entity_id=hostname) if isinstance(related_alerts, pd.DataFrame) and not related_alerts.empty: host_alert_items = (related_alerts[['AlertName', 'TimeGenerated']] .groupby('AlertName').TimeGenerated.agg('count').to_dict()) def print_related_alerts(alertDict, entityType, entityName): if len(alertDict) > 0: md(f"Found {len(alertDict)} different alert types related to this {entityType} (\'{entityName}\')") for (k, v) in alertDict.items(): md(f"- {k}, Count of alerts: {v}") else: md(f"No alerts for {entityType} entity \'{entityName}\'") print_related_alerts(host_alert_items, 'host', host_entity.HostName) nbdisplay.display_timeline( data=related_alerts, source_columns=["AlertName"], title="Host alerts over time", height=300, color="red") else: md('No related alerts found.') if isinstance(realted_bookmarks, pd.DataFrame) and not realted_bookmarks.empty: nbdisplay.display_timeline(data=realted_bookmarks, source_columns=["BookmarkName"], height=200, color="orange", title="Host bookmarks over time",) else: md('No related bookmarks found.')
rel_alert_select = None def show_full_alert(selected_alert): global security_alert, alert_ip_entities security_alert = SecurityAlert( rel_alert_select.selected_alert) nbdisplay.display_alert(security_alert, show_entities=True) # Show selected alert when selected if isinstance(related_alerts, pd.DataFrame) and not related_alerts.empty: related_alerts['CompromisedEntity'] = related_alerts['Computer'] md('### Click on alert to view details.') rel_alert_select = nbwidgets.SelectAlert(alerts=related_alerts, action=show_full_alert) rel_alert_select.display() else: md('No related alerts found.')

Re-scope Hunting Time Frame

Based on the security alerts for this host we can choose to re-scope our hunting time frame.

if rel_alert_select is None or rel_alert_select.selected_alert is None:
    start = query_times.start
else:
    start = rel_alert_select.selected_alert['TimeGenerated']

# Set new investigation time windows based on the selected alert
invest_times = nbwidgets.QueryTime(units='day', max_before=24, max_after=12,
                                   before=1, after=1, origin_time=start)
invest_times.display()

How to use this Notebook

Whilst this notebook is linear in layout it doesn't need to be linear in usage. We have selected our host to investigate and set an initial hunting time frame to work within. We can now start to test more specific hunting hypotheses with the aim of validating our broader initial hunting hypothesis. To do this we can start by looking at:

  • Host Logon Events

  • User Activity

  • Application Activity

  • Network Activity

You can choose to start below with a hunt in host logon events or jump to one of the other sections listed above. The order in which you run each of these major sections doesn't matter; they are each self contained. You may also choose to rerun sections based on your findings from running other sections.

This notebook uses external threat intelligence sources to enrich data. The next cell loads the TILookup class.

Note: to use TILookup you will need configuration settings in your msticpyconfig.yaml - see the TIProviders documentation and the ConfiguringNotebookEnvironment notebook (also available as a static view on GitHub).

tilookup = TILookup()
md("Threat intelligence provider loading complete.")
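As a quick check that your msticpyconfig.yaml settings were picked up, the sketch below lists the loaded providers. It assumes the TILookup class exposes a loaded_providers attribute (a mapping of provider name to provider instance) in your msticpy version; check the TIProviders documentation if it differs.

# Sketch only - assumes TILookup exposes `loaded_providers` in the installed msticpy version.
if tilookup.loaded_providers:
    print("Loaded TI providers:", ", ".join(tilookup.loaded_providers))
else:
    print("No TI providers loaded - check the TIProviders section of msticpyconfig.yaml.")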

Host Logon Events

Hypothesis: That an attacker has gained legitimate access to the host via compromised credentials and has logged into the host to conduct malicious activity.

This section provides an overview of logon activity for the host within our hunting time frame; the purpose of this is to allow for the identification of anomalous logons or attempted logons.

# Collect logon events for this, seperate them into sucessful and unsucessful and cluster sucessful one into sessions logon_events = qry_prov.LinuxSyslog.user_logon(start=invest_times.start, end=invest_times.end, host_name=hostname) remote_logons = None failed_logons = None if isinstance(logon_events, pd.DataFrame) and not logon_events.empty: remote_logons = (logon_events[logon_events['LogonResult'] == 'Success']) failed_logons = (logon_events[logon_events['LogonResult'] == 'Failure']) else: print("No logon events in this timeframe") if (isinstance(remote_logons, pd.DataFrame) and not remote_logons.empty) or (isinstance(failed_logons, pd.DataFrame) and not failed_logons.empty): #Provide a timeline of sucessful and failed logon attempts to aid identification of potential brute force attacks display(Markdown('### Timeline of sucessful host logons.')) tooltip_cols = ['User', 'ProcessName', 'SourceIP'] if rel_alert_select is not None: logon_timeline = nbdisplay.display_timeline(data=remote_logons, overlay_data=failed_logons, source_columns=tooltip_cols, height=200, overlay_color="red", alert = rel_alert_select.selected_alert) else: logon_timeline = nbdisplay.display_timeline(data=remote_logons, overlay_data=failed_logons, source_columns=tooltip_cols, height=200, overlay_color="red") display(Markdown('<b>Key:</b><p style="color:darkblue">Sucessful logons </p><p style="color:Red">Failed Logon Attempts (via su)</p>')) all_df = pd.DataFrame(dict(successful= remote_logons['ProcessName'].value_counts(), failed = failed_logons['ProcessName'].value_counts())).fillna(0) fail_data = pd.value_counts(failed_logons['User'].values, sort=True).head(10).reset_index(name='value').rename(columns={'User':'Count'}) fail_data['angle'] = fail_data['value']/fail_data['value'].sum() * 2*pi fail_data['color'] = viridis(len(fail_data)) fp = figure(plot_height=350, plot_width=450, title="Relative Frequencies of Failed Logons by Account", toolbar_location=None, tools="hover", tooltips="@index: @value") fp.wedge(x=0, y=1, radius=0.5, start_angle=cumsum('angle', include_zero=True), end_angle=cumsum('angle'), line_color="white", fill_color='color', legend='index', source=fail_data) sucess_data = pd.value_counts(remote_logons['User'].values, sort=False).reset_index(name='value').rename(columns={'User':'Count'}) sucess_data['angle'] = sucess_data['value']/sucess_data['value'].sum() * 2*pi sucess_data['color'] = viridis(len(sucess_data)) sp = figure(plot_height=350, width=450, title="Relative Frequencies of Sucessful Logons by Account", toolbar_location=None, tools="hover", tooltips="@index: @value") sp.wedge(x=0, y=1, radius=0.5, start_angle=cumsum('angle', include_zero=True), end_angle=cumsum('angle'), line_color="white", fill_color='color', legend='index', source=sucess_data) fp.axis.axis_label=None fp.axis.visible=False fp.grid.grid_line_color = None sp.axis.axis_label=None sp.axis.visible=False sp.grid.grid_line_color = None processes = all_df.index.values.tolist() results = all_df.columns.values.tolist() fail_sucess_data = {'processes' :processes, 'sucess' : all_df['successful'].values.tolist(), 'failure': all_df['failed'].values.tolist()} palette = viridis(2) x = [ (process, result) for process in processes for result in results ] counts = sum(zip(fail_sucess_data['sucess'], fail_sucess_data['failure']), ()) source = ColumnDataSource(data=dict(x=x, counts=counts)) b = figure(x_range=FactorRange(*x), plot_height=350, plot_width=450, title="Failed and Sucessful logon attempts by process", toolbar_location=None, tools="", 
y_minor_ticks=2) b.vbar(x='x', top='counts', width=0.9, source=source, line_color="white", fill_color=factor_cmap('x', palette=palette, factors=results, start=1, end=2)) b.y_range.start = 0 b.x_range.range_padding = 0.1 b.xaxis.major_label_orientation = 1 b.xgrid.grid_line_color = None show(Row(sp,fp,b)) ip_list = [convert_to_ip_entities(i, ip_col="SourceIP")[0] for i in remote_logons['SourceIP'].unique() if i != ""] ip_fail_list = [convert_to_ip_entities(i)[0] for i in failed_logons['SourceIP'].unique() if i != ""] location = get_map_center(ip_list + ip_fail_list) folium_map = FoliumMap(location = location, zoom_start=1.4) #Map logon locations to allow for identification of anomolous locations if len(ip_fail_list) > 0: md('<h3>Map of Originating Location of Logon Attempts</h3>') icon_props = {'color': 'red'} folium_map.add_ip_cluster(ip_entities=ip_fail_list, **icon_props) if len(ip_list) > 0: icon_props = {'color': 'green'} folium_map.add_ip_cluster(ip_entities=ip_list, **icon_props) display(folium_map.folium_map) md('<p style="color:red">Warning: the folium mapping library ' 'does not display correctly in some browsers.</p><br>' 'If you see a blank image please retry with a different browser.')

Logon Sessions

Based on the detail above, if you wish to focus your hunt on a particular user, jump to the User Activity section. Alternatively, to further refine our hunt we need to select a logon session to view in more detail. Select a session from the list below to continue. Sessions that occurred at the time an alert was raised for this host, or where the user has an abnormal ratio of failed to successful login attempts, are highlighted.

logon_sessions_df = None try: print("Clustering logon sessions...") logon_sessions_df = cluster_syslog_logons_df(logon_events) except Exception as err: print(f"Error clustering logons: {err}") if logon_sessions_df is not None: logon_sessions_df["Alerts during session?"] = np.nan # check if any alerts occur during logon window. logon_sessions_df['Start (UTC)'] = [(time - dt.timedelta(seconds=5)) for time in logon_sessions_df['Start']] logon_sessions_df['End (UTC)'] = [(time + dt.timedelta(seconds=5)) for time in logon_sessions_df['End']] for TimeGenerated in related_alerts['TimeGenerated']: logon_sessions_df.loc[(TimeGenerated >= logon_sessions_df['Start (UTC)']) & (TimeGenerated <= logon_sessions_df['End (UTC)']), "Alerts during session?"] = "Yes" logon_sessions_df.loc[logon_sessions_df['User'] == 'root', "Root?"] = "Yes" logon_sessions_df.replace(np.nan, "No", inplace=True) ratios = [] for _, row in logon_sessions_df.iterrows(): suc_fail = logon_events.apply(lambda x: True if x['User'] == row['User'] and x["LogonResult"] == 'Success' else( False if x['User'] == row['User'] and x["LogonResult"] == 'Failure' else None), axis=1) numofsucess = len(suc_fail[suc_fail == True].index) numoffail = len(suc_fail[suc_fail == False].index) if numoffail == 0: ratio = 1 else: ratio = numofsucess/numoffail ratios.append(ratio) logon_sessions_df["Sucessful to failed logon ratio"] = ratios def color_cells(val): if isinstance(val, str): color = 'yellow' if val == "Yes" else 'white' elif isinstance(val, float): color = 'yellow' if val > 0.5 else 'white' else: color = 'white' return 'background-color: %s' % color display(logon_sessions_df[['User','Start (UTC)', 'End (UTC)', 'Alerts during session?', 'Sucessful to failed logon ratio', 'Root?']] .style.applymap(color_cells).hide_index()) logon_items = ( logon_sessions_df[['User','Start (UTC)', 'End (UTC)']] .to_string(header=False, index=False, index_names=False) .split('\n') ) logon_sessions_df["Key"] = logon_items logon_sessions_df.set_index('Key', inplace=True) logon_dict = logon_sessions_df[['User','Start (UTC)', 'End (UTC)']].to_dict('index') logon_selection = nbwidgets.SelectItem(description='Select logon session to investigate: ', item_dict=logon_dict , width='80%', auto_display=True) else: md("No logon sessions during this timeframe")

Session Details

def view_syslog(selected_facility): return [syslog_events.query('Facility == @selected_facility')] # Produce a summary of user modification actions taken if "Add" in x: return len(add_events.replace("", np.nan).dropna(subset=['User'])['User'].unique().tolist()) elif "Modify" in x: return len(mod_events.replace("", np.nan).dropna(subset=['User'])['User'].unique().tolist()) elif "Delete" in x: return len(del_events.replace("", np.nan).dropna(subset=['User'])['User'].unique().tolist()) else: return "" crn_tl_data = {} user_tl_data = {} sudo_tl_data = {} sudo_sessions = None tooltip_cols = ['SyslogMessage'] if logon_sessions_df is not None: #Collect data based on the session selected for investigation invest_sess = {'StartTimeUtc': logon_selection.value.get('Start (UTC)'), 'EndTimeUtc': logon_selection.value.get( 'End (UTC)'), 'Account': logon_selection.value.get('User'), 'Host': hostname} session = entities.HostLogonSession(invest_sess) syslog_events = qry_prov.LinuxSyslog.all_syslog( start=session.StartTimeUtc, end=session.EndTimeUtc, host_name=session.Host) sudo_events = qry_prov.LinuxSyslog.sudo_activity( start=session.StartTimeUtc, end=session.EndTimeUtc, host_name=session.Host, user=session.Account) if isinstance(sudo_events, pd.DataFrame) and not sudo_events.empty: try: sudo_sessions = cluster_syslog_logons_df(logon_events=sudo_events) except MsticpyException: pass # Display summary of cron activity in session cron_events = qry_prov.LinuxSyslog.cron_activity( start=session.StartTimeUtc, end=session.EndTimeUtc, host_name=session.Host) if not isinstance(cron_events, pd.DataFrame) or cron_events.empty: md(f'<h3> No Cron activity for {session.Host} between {session.StartTimeUtc} and {session.EndTimeUtc}</h3>') else: cron_events['CMD'].replace('', np.nan, inplace=True) crn_tl_data = {"Cron Exections": {"data": cron_events[['TimeGenerated', 'CMD', 'CronUser', 'SyslogMessage']].dropna(), "source_columns": tooltip_cols, "color": "Blue"}, "Cron Edits": {"data": cron_events.loc[cron_events['SyslogMessage'].str.contains('EDIT')], "source_columns": tooltip_cols, "color": "Green"}} md('<h2> Most common commands run by cron:</h2>') md('This shows how often each cron job was exected within the specified time window') cron_commands = (cron_events[['EventTime', 'CMD']] .groupby(['CMD']).count() .dropna() .style .set_table_attributes('width=900px, text-align=center') .background_gradient(cmap='Reds', low=0.5, high=1) .format("{0:0>1.0f}")) display(cron_commands) # Display summary of user and group creations, deletions and modifications during the session user_activity = qry_prov.LinuxSyslog.user_group_activity( start=session.StartTimeUtc, end=session.EndTimeUtc, host_name=session.Host) if not isinstance(user_activity, pd.DataFrame) or user_activity.empty: md(f'<h3>No user or group moidifcations for {session.Host} between {session.StartTimeUtc} and {session.EndTimeUtc}></h3>') else: add_events = user_activity[user_activity['UserGroupAction'].str.contains( 'Add')] del_events = user_activity[user_activity['UserGroupAction'].str.contains( 'Delete')] mod_events = user_activity[user_activity['UserGroupAction'].str.contains( 'Modify')] user_activity['Count'] = user_activity.groupby('UserGroupAction')['UserGroupAction'].transform('count') if add_events.empty and del_events.empty and mod_events.empty: md('<h2> Users and groups added or deleted:</h2<>') md(f'No users or groups were added or deleted on {host_entity.HostName} between {query_times.start} and {query_times.end}') user_tl_data = {} else: md("<h2>Users 
added, modified or deleted</h2>") display(user_activity[['UserGroupAction','Count']].drop_duplicates().style.hide_index()) account_actions = pd.DataFrame({"User Additions": [add_events.replace("", np.nan).dropna(subset=['User'])['User'].unique().tolist()], "User Modifications": [mod_events.replace("", np.nan).dropna(subset=['User'])['User'].unique().tolist()], "User Deletions": [del_events.replace("", np.nan).dropna(subset=['User'])['User'].unique().tolist()]}) display(account_actions.style.hide_index()) user_tl_data = {"User adds": {"data": add_events, "source_columns": tooltip_cols, "color": "Orange"}, "User deletes": {"data": del_events, "source_columns": tooltip_cols, "color": "Red"}, "User modfications": {"data": mod_events, "source_columns": tooltip_cols, "color": "Grey"}} # Display sudo activity during session if not isinstance(sudo_sessions, pd.DataFrame) or sudo_sessions.empty: md(f"<h3>No Sudo sessions for {session.Host} between {logon_selection.value.get('Start (UTC)')} and {logon_selection.value.get('End (UTC)')}</h3>") sudo_tl_data = {} else: sudo_start = sudo_events[sudo_events["SyslogMessage"].str.contains( "pam_unix.+session opened")].rename(columns={"Sudoer": "User"}) sudo_tl_data = {"Host logons": {"data": remote_logons, "source_columns": tooltip_cols, "color": "Cyan"}, "Sudo sessions": {"data": sudo_start, "source_columns": tooltip_cols, "color": "Purple"}} try: risky_actions = cmd_line.risky_cmd_line(events=sudo_events, log_type="Syslog") suspicious_events = cmd_speed( cmd_events=sudo_events, time=60, events=2, cmd_field="Command") except: risky_actions = None suspicious_events = None if risky_actions is None and suspicious_events is None: pass else: risky_sessions = risky_sudo_sessions( risky_actions=risky_actions, sudo_sessions=sudo_sessions, suspicious_actions=suspicious_events) for key in risky_sessions: if key in sudo_sessions: sudo_sessions[f"{key} - {risky_sessions[key]}"] = sudo_sessions.pop( key) if isinstance(sudo_events, pd.DataFrame): sudo_events_val = sudo_events[['EventTime', 'CommandCall']][sudo_events['CommandCall']!=""].dropna(how='any', subset=['CommandCall']) if sudo_events_val.empty: md(f"No sucessful sudo activity for {hostname} between {logon_selection.value.get('Start (UTC)')} and {logon_selection.value.get('End (UTC)')}") else: sudo_events.replace("", np.nan, inplace=True) md('<h2> Frequency of sudo commands</h2>') md('This shows how many times each command has been run with sudo. /bin/bash is usally associated with the use of "sudo -i"') sudo_commands = (sudo_events[['EventTime', 'CommandCall']] .groupby(['CommandCall']) .count() .dropna() .style .set_table_attributes('width=900px, text-align=center') .background_gradient(cmap='Reds', low=.5, high=1) .format("{0:0>3.0f}")) display(sudo_commands) else: md(f"No sucessful sudo activity for {hostname} between {logon_selection.value.get('Start (UTC)')} and {logon_selection.value.get('End (UTC)')}") # Display a timeline of all activity during session crn_tl_data.update(user_tl_data) crn_tl_data.update(sudo_tl_data) if crn_tl_data: md('<h2> Session Timeline.</h2>') nbdisplay.display_timeline( data=crn_tl_data, title='Session Timeline', height=300) else: md("No logon sessions during this timeframe")

Raw data from user session

Use this syslog message data to further investigate suspicious activity during the session.

if isinstance(logon_sessions_df, pd.DataFrame) and not logon_sessions_df.empty: #Return syslog data and present it to the use for investigation session_syslog = qry_prov.LinuxSyslog.all_syslog( start=session.StartTimeUtc, end=session.EndTimeUtc, host_name=session.Host) if session_syslog.empty: display(HTML( f' No syslog for {session.Host} between {session.StartTimeUtc} and {session.EndTimeUtc}')) def view_sudo(selected_cmd): return [sudo_events.query('CommandCall == @selected_cmd')[ ['TimeGenerated', 'SyslogMessage', 'Sudoer', 'SudoTo', 'Command', 'CommandCall']]] # Show syslog messages associated with selected sudo command items = sudo_events['CommandCall'].dropna().unique().tolist() if items: md("<h3>View all messages associated with a sudo command</h3>") display(nbwidgets.SelectItem(item_list=items, action=view_sudo)) else: md("No logon sessions during this timeframe")
if isinstance(logon_sessions_df, pd.DataFrame) and not logon_sessions_df.empty:
    # Display syslog messages from the session with the selected facility
    items = syslog_events['Facility'].dropna().unique().tolist()
    md("<h3>View all messages associated with a syslog facility</h3>")
    display(nbwidgets.SelectItem(item_list=items, action=view_syslog))
else:
    md("No logon sessions during this timeframe")

Process Tree from session

if isinstance(logon_sessions_df, pd.DataFrame) and not logon_sessions_df.empty: display(HTML("<h3>Process Trees from session</h3>")) print("Building process tree, this may take some time...") # Find the table with auditd data in regex = '.*audit.*\_cl?' matches = ((re.match(regex, key, re.IGNORECASE)) for key in qry_prov.schema) for match in matches: if match != None: audit_table = match.group(0) else: audit_table = None # Retrieve auditd data if audit_table: audit_data = qry_prov.LinuxAudit.auditd_all( start=session.StartTimeUtc, end=session.EndTimeUtc, host_name=hostname ) if isinstance(audit_data, pd.DataFrame) and not audit_data.empty: audit_events = auditdextract.extract_events_to_df( data=audit_data ) process_tree = auditdextract.generate_process_tree(audit_data=audit_events) process_tree.mp_process_tree.plot() else: display(HTML("No auditd data avaliable to build process tree")) else: display(HTML("No auditd data avaliable to build process tree")) else: md("No logon sessions during this timeframe")

To start a process/application-focused hunt jump to the Application Activity section, or continue with the session-based hunt below by selecting a sudo session to investigate.

Sudo Session Investigation

Sudo activity is often required by an attacker to conduct actions on target, and more granular data is available for sudo sessions, allowing for deeper hunting within these sessions.

if logon_sessions_df is not None and sudo_sessions is not None:
    sudo_items = sudo_sessions[['User', 'Start', 'End']].to_string(
        header=False, index=False, index_names=False).split('\n')
    sudo_sessions["Key"] = sudo_items
    sudo_sessions.set_index('Key', inplace=True)
    sudo_dict = sudo_sessions[['User', 'Start', 'End']].to_dict('index')
    sudo_selection = nbwidgets.SelectItem(description='Select sudo session to investigate: ',
                                          item_dict=sudo_dict, width='100%',
                                          height='300px', auto_display=True)
else:
    sudo_selection = None
    md("No logon sessions during this timeframe")
#Collect data associated with the sudo session selected sudo_events = None from msticpy.sectools.tiproviders.ti_provider_base import TISeverity def ti_check_sev(severity, threshold): severity = TISeverity.parse(severity) threshold = TISeverity.parse(threshold) return severity.value >= threshold.value if sudo_selection: sudo_sess = {'StartTimeUtc': sudo_selection.value.get('Start'), 'EndTimeUtc': sudo_selection.value.get( 'End'), 'Account': sudo_selection.value.get('User'), 'Host': hostname} sudo_session = entities.HostLogonSession(sudo_sess) sudo_events = qry_prov.LinuxSyslog.sudo_activity(start=sudo_session.StartTimeUtc.round( '-1s') - pd.Timedelta(seconds=1), end=(sudo_session.EndTimeUtc.round('1s')+ pd.Timedelta(seconds=1)), host_name=sudo_session.Host) if isinstance(sudo_events, pd.DataFrame) and not sudo_events.empty: display(sudo_events.replace('', np.nan).dropna(axis=0, subset=['Command'])[ ['TimeGenerated', 'Command', 'CommandCall', 'SyslogMessage']]) # Extract IOCs from the data ioc_extractor = iocextract.IoCExtract() os_family = host_entity.OSType if host_entity.OSType else 'Linux' print('Extracting IoCs.......') ioc_df = ioc_extractor.extract(data=sudo_events, columns=['SyslogMessage'], os_family=os_family, ioc_types=['ipv4', 'ipv6', 'dns', 'url', 'md5_hash', 'sha1_hash', 'sha256_hash']) if len(ioc_df) > 0: ioc_count = len( ioc_df[["IoCType", "Observable"]].drop_duplicates()) md(f"Found {ioc_count} IOCs") #Lookup the extracted IOCs in TI feed ti_resps = tilookup.lookup_iocs(data=ioc_df[["IoCType", "Observable"]].drop_duplicates( ).reset_index(), obs_col='Observable', ioc_type_col='IoCType') i = 0 ti_hits = [] ti_resps.reset_index(drop=True, inplace=True) while i < len(ti_resps): if ti_resps['Result'][i] == True and ti_check_sev(ti_resps['Severity'][i], 1): ti_hits.append(ti_resps['Ioc'][i]) i += 1 else: i += 1 md(f"Found {len(ti_hits)} IoCs in Threat Intelligence") for ioc in ti_hits: md(f"Messages containing IoC found in TI feed: {ioc}") display(sudo_events[sudo_events['SyslogMessage'].str.contains( ioc)][['TimeGenerated', 'SyslogMessage']]) else: md("No IoC patterns found in Syslog Messages.") else: md('No sudo messages for this session') else: md("No Sudo session to investigate")

User Activity

Hypothesis: That an attacker has gained access to the host and is using a user account to conduct actions on the host.

This section provides an overview of activity by user within our hunting time frame; the purpose of this is to allow for the identification of anomalous activity by a user. This hunt can be driven by investigation of suspected users or run as a hunt across all users seen on the host.

# Get list of users with logon or sudo sessions on host logon_events = qry_prov.LinuxSyslog.user_logon(query_times, host_name=hostname) users = logon_events['User'].replace('', np.nan).dropna().unique().tolist() all_users = list(users) if isinstance(sudo_events, pd.DataFrame) and not sudo_events.empty: sudoers = sudo_events['Sudoer'].replace( '', np.nan).dropna().unique().tolist() all_users.extend(x for x in sudoers if x not in all_users) # Pick Users if not logon_events.empty: user_select = nbwidgets.SelectItem(description='Select user to investigate: ', item_list=all_users, width='75%', auto_display=True) else: md("There was no user activity in the timeframe specified.") user_select = None
folium_user_map = FoliumMap() def view_sudo(cmd): return [user_sudo_hold.query('CommandCall == @cmd')[ ['TimeGenerated', 'HostName', 'Command', 'CommandCall', 'SyslogMessage']]] user_sudo_hold = None if user_select is not None: # Get all syslog relating to these users username = user_select.value user_events = all_syslog_data[all_syslog_data['SyslogMessage'].str.contains(username)] logon_sessions = cluster_syslog_logons_df(logon_events) # Display all logons associated with the user md(f"<h1> User Logon Activity for {username}</h1>") user_logon_events = logon_events[logon_events['User'] == username] try: user_logon_sessions = cluster_syslog_logons_df(user_logon_events) except: user_logon_sessions = None user_remote_logons = ( user_logon_events[user_logon_events['LogonResult'] == 'Success'] ) user_failed_logons = ( user_logon_events[user_logon_events['LogonResult'] == 'Failure'] ) if not user_remote_logons.empty: for _, row in logon_sessions_df.iterrows(): end = row['End'] user_sudo_events = qry_prov.LinuxSyslog.sudo_activity(start=user_remote_logons.sort_values( by='TimeGenerated')['TimeGenerated'].iloc[0], end=end, host_name=hostname, user=username) else: user_sudo_events = None if user_logon_sessions is None and user_remote_logons.empty and user_failed_logons.empty: pass else: display(HTML( f"{len(user_remote_logons)} sucessfull logons and {len(user_failed_logons)} failed logons for {username}")) display(Markdown('### Timeline of host logon attempts.')) tooltip_cols = ['SyslogMessage'] dfs = {"User Logons" :user_remote_logons, "Failed Logons": user_failed_logons, "Sudo Events" :user_sudo_events} user_tl_data = {} for k,v in dfs.items(): if v is not None and not v.empty: user_tl_data.update({k :{"data":v,"source_columns":tooltip_cols}}) nbdisplay.display_timeline( data=user_tl_data, title="User logon timeline", height=300) all_user_df = pd.DataFrame(dict(successful= user_remote_logons['ProcessName'].value_counts(), failed = user_failed_logons['ProcessName'].value_counts())).fillna(0) processes = all_user_df.index.values.tolist() results = all_user_df.columns.values.tolist() user_fail_sucess_data = {'processes' :processes, 'sucess' : all_user_df['successful'].values.tolist(), 'failure': all_user_df['failed'].values.tolist()} palette = viridis(2) x = [ (process, result) for process in processes for result in results ] counts = sum(zip(user_fail_sucess_data['sucess'], fail_sucess_data['failure']), ()) source = ColumnDataSource(data=dict(x=x, counts=counts)) b = figure(x_range=FactorRange(*x), plot_height=350, plot_width=450, title="Failed and Sucessful logon attempts by process", toolbar_location=None, tools="", y_minor_ticks=2) b.vbar(x='x', top='counts', width=0.9, source=source, line_color="white", fill_color=factor_cmap('x', palette=palette, factors=results, start=1, end=2)) b.y_range.start = 0 b.x_range.range_padding = 0.1 b.xaxis.major_label_orientation = 1 b.xgrid.grid_line_color = None user_logons = pd.DataFrame({"Sucessful Logons" : [int(all_user_df['successful'].sum())], "Failed Logons" : [int(all_user_df['failed'].sum())]}).T user_logon_data = pd.value_counts(user_logon_events['LogonResult'].values, sort=True).head(10).reset_index(name='value').rename(columns={'User':'Count'}) user_logon_data = user_logon_data[user_logon_data['index']!="Unknown"].copy() user_logon_data['angle'] = user_logon_data['value']/user_logon_data['value'].sum() * 2*pi user_logon_data['color'] = viridis(len(user_logon_data)) p = figure(plot_height=350, plot_width=450, title="Relative Frequencies of Failed 
Logons by Account", toolbar_location=None, tools="hover", tooltips="@index: @value") p.axis.visible = False p.xgrid.visible = False p.ygrid.visible = False p.wedge(x=0, y=1, radius=0.5, start_angle=cumsum('angle', include_zero=True), end_angle=cumsum('angle'), line_color="white", fill_color='color', legend='index', source=user_logon_data) show(Row(p,b)) user_ip_list = [convert_to_ip_entities(i)[0] for i in user_remote_logons['SourceIP']] user_ip_fail_list = [convert_to_ip_entities(i)[0] for i in user_failed_logons['SourceIP']] user_location = get_map_center(ip_list + ip_fail_list) user_folium_map = FoliumMap(location = location, zoom_start=1.4) #Map logon locations to allow for identification of anomolous locations if len(ip_fail_list) > 0: md('<h3>Map of Originating Location of Logon Attempts</h3>') icon_props = {'color': 'red'} user_folium_map.add_ip_cluster(ip_entities=user_ip_fail_list, **icon_props) if len(ip_list) > 0: icon_props = {'color': 'green'} user_folium_map.add_ip_cluster(ip_entities=user_ip_list, **icon_props) display(user_folium_map.folium_map) md('<p style="color:red">Warning: the folium mapping library ' 'does not display correctly in some browsers.</p><br>' 'If you see a blank image please retry with a different browser.') #Display sudo activity of the user if not isinstance(user_sudo_events, pd.DataFrame) or user_sudo_events.empty: md(f"<h3>No sucessful sudo activity for {username}</h3>") else: user_sudo_hold = user_sudo_events user_sudo_commands = (user_sudo_events[['EventTime', 'CommandCall']].replace('', np.nan).groupby(['CommandCall']).count().dropna().style.set_table_attributes('width=900px, text-align=center').background_gradient(cmap='Reds', low=.5, high=1).format("{0:0>3.0f}")) display(user_sudo_commands) md("Select a sudo command to investigate in more detail") display(nbwidgets.SelectItem(item_list=items, action=view_sudo)) else: md("No user session selected")
# If the user has sudo activity extract and IOCs from the logs and look them up in TI feeds if not isinstance(user_sudo_hold, pd.DataFrame) or user_sudo_hold.empty: md(f"No sudo messages data") else: # Extract IOCs ioc_extractor = iocextract.IoCExtract() os_family = host_entity.OSType if host_entity.OSType else 'Linux' print('Extracting IoCs.......') ioc_df = ioc_extractor.extract(data=user_sudo_hold, columns=['SyslogMessage'], ioc_types=['ipv4', 'ipv6', 'dns', 'url', 'md5_hash', 'sha1_hash', 'sha256_hash']) if len(ioc_df) > 0: ioc_count = len(ioc_df[["IoCType", "Observable"]].drop_duplicates()) md(f"Found {ioc_count} IOCs") ti_resps = tilookup.lookup_iocs(data=ioc_df[["IoCType", "Observable"]].drop_duplicates( ).reset_index(), obs_col='Observable', ioc_type_col='IoCType') i = 0 ti_hits = [] ti_resps.reset_index(drop=True, inplace=True) while i < len(ti_resps): if ti_resps['Result'][i] == True and ti_check_sev(ti_resps['Severity'][i], 1): ti_hits.append(ti_resps['Ioc'][i]) i += 1 else: i += 1 md(f"Found {len(ti_hits)} IoCs in Threat Intelligence") for ioc in ti_hits: md(f"Messages containing IoC found in TI feed: {ioc}") display(user_sudo_hold[user_sudo_hold['SyslogMessage'].str.contains( ioc)][['TimeGenerated', 'SyslogMessage']]) else: md("No IoC patterns found in Syslog Message.")

Application Activity

Hypothesis: That an attacker has compromised an application running on the host and is using the applications process to conduct actions on the host.

This section provides an overview of activity by application within our hunting time frame; the purpose of this is to allow for the identification of anomalous activity by an application. This hunt can be driven by investigation of suspected applications or run as a hunt across all applications seen on the host.

# Get list of applications
apps = all_syslog_data['ProcessName'].replace('', np.nan).dropna().unique().tolist()
system_apps = ['sudo', 'CRON', 'systemd-resolved', 'snapd', '50-motd-news',
               'systemd-logind', 'dbus-daemon', 'crontab']
if len(host_entity.Applications) > 0:
    installed_apps = []
    installed_apps.extend(x for x in apps if x not in system_apps)
    # Pick an application
    app_select = nbwidgets.SelectItem(description='Select application to investigate: ',
                                      item_list=installed_apps, width='75%', auto_display=True)
else:
    display(HTML("No applications other than standard OS applications present"))
# Get all syslog relating to these Applications app = app_select.value app_data = all_syslog_data[all_syslog_data['ProcessName'] == app].copy() # App log volume over time if isinstance(app_data, pd.DataFrame) and not app_data.empty: app_data_volume = app_data.set_index( "TimeGenerated").resample('5T').count() app_data_volume.reset_index(level=0, inplace=True) app_data_volume.rename(columns={"TenantId" : "NoOfLogMessages"}, inplace=True) nbdisplay.display_timeline_values(data=app_data_volume, y='NoOfLogMessages', source_columns=['NoOfLogMessages'], title=f"{app} log volume over time") app_high_sev = app_data[app_data['SeverityLevel'].isin( ['emerg', 'alert', 'crit', 'err', 'warning'])] if isinstance(app_high_sev, pd.DataFrame) and not app_high_sev.empty: app_hs_volume = app_high_sev.set_index( "TimeGenerated").resample('5T').count() app_hs_volume.reset_index(level=0, inplace=True) app_hs_volume.rename(columns={"TenantId" : "NoOfLogMessages"}, inplace=True) nbdisplay.display_timeline_values(data=app_hs_volume, y='NoOfLogMessages', source_columns=['NoOfLogMessages'], title=f"{app} high severity log volume over time") risky_messages = risky_cmd_line(events=app_data, log_type="Syslog", cmd_field="SyslogMessage") if risky_messages: print(risky_messages)

Display process tree

Due to the large volume of data involved you may wish to make your query window smaller.

if rel_alert_select is None or rel_alert_select.selected_alert is None:
    start = query_times.start
else:
    start = rel_alert_select.selected_alert['TimeGenerated']

# Set new investigation time windows based on the selected alert
proc_invest_times = nbwidgets.QueryTime(units='hours', max_before=6, max_after=3,
                                        before=2, origin_time=start)
proc_invest_times.display()
audit_table = None app_audit_data = None app = app_select.value process_tree_data = None regex = '.*audit.*\_cl?' # Find the table with auditd data in and collect the data matches = ((re.match(regex, key, re.IGNORECASE)) for key in qry_prov.schema) for match in matches: if match != None: audit_table = match.group(0) #Check if the amount of data expected to be returned is a reasonable size, if not prompt before continuing if audit_table != None: if isinstance(app_audit_data, pd.DataFrame): pass else: print('Collecting audit data, please wait this may take some time....') app_audit_query_count = f"""{audit_table} | where TimeGenerated >= datetime({proc_invest_times.start}) | where TimeGenerated <= datetime({proc_invest_times.end}) | where Computer == '{hostname}' | summarize count() """ count_check = qry_prov.exec_query(query=app_audit_query_count) if count_check['count_'].iloc[0] > 100000 and not count_check.empty: size = count_check['count_'].iloc[0] print( f"You are returning a very large dataset ({size} rows).", "It is reccomended that you consider scoping the size\n", "of your query down.\n", "Are you sure you want to proceed?" ) response = (input("Y/N") or "N") if ( (count_check['count_'].iloc[0] < 100000) or (count_check['count_'].iloc[0] > 100000 and response.casefold().startswith("y")) ): print("querying audit data...") audit_data = qry_prov.LinuxAudit.auditd_all( start=proc_invest_times.start, end=proc_invest_times.end, host_name=hostname ) if isinstance(audit_data, pd.DataFrame) and not audit_data.empty: print("building process tree...") audit_events = auditdextract.extract_events_to_df( data=audit_data ) process_tree_data = auditdextract.generate_process_tree(audit_data=audit_events) plot_lim = 1000 if len(process_tree) > plot_lim: md_warn(f"More than {plot_lim} processes to plot, limiting to top {plot_lim}.") process_tree[:plot_lim].mp_process_tree.plot(legend_col="exe") else: process_tree.mp_process_tree.plot(legend_col="exe") size = audit_events.size print(f"Collected {size} rows of data") else: md("No audit events avalaible") else: print("Resize query window") else: md("No audit events avalaible")
md(f"<h3>Process tree for {app}</h3>") if process_tree_data is not None: process_tree_df = process_tree_data[process_tree_data["exe"].str.contains(app, na=False)].copy() if not process_tree_df.empty: app_roots = process_tree_data.apply(lambda x: ptree.get_root(process_tree_data, x), axis=1) trees = [] for root in app_roots["source_index"].unique(): trees.append(process_tree_data[process_tree_data["path"].str.startswith(root)]) app_proc_trees = pd.concat(trees) app_proc_trees.mp_process_tree.plot(legend_col="exe", show_table=True) else: display(f"No process tree data avaliable for {app}") process_tree = None else: md("No data avaliable to build process tree")

Application Logs with associated Threat Intelligence

These logs are associated with the process being investigated and include IOCs that appear in our TI feeds.

# Extract IOCs from syslog assocated with the selected process ioc_extractor = iocextract.IoCExtract() os_family = host_entity.OSType if host_entity.OSType else 'Linux' md('Extracting IoCs...') ioc_df = ioc_extractor.extract(data=app_data, columns=['SyslogMessage'], ioc_types=['ipv4', 'ipv6', 'dns', 'url', 'md5_hash', 'sha1_hash', 'sha256_hash']) if process_tree_data is not None and not process_tree_data.empty: app_process_tree = app_proc_trees.dropna(subset=['cmdline']) audit_ioc_df = ioc_extractor.extract(data=app_process_tree, columns=['cmdline'], ioc_types=['ipv4', 'ipv6', 'dns', 'url', 'md5_hash', 'sha1_hash', 'sha256_hash']) ioc_df = ioc_df.append(audit_ioc_df) # Look up IOCs in TI feeds if len(ioc_df) > 0: ioc_count = len(ioc_df[["IoCType", "Observable"]].drop_duplicates()) md(f"Found {ioc_count} IOCs") md("Looking up threat intel...") ti_resps = tilookup.lookup_iocs(data=ioc_df[[ "IoCType", "Observable"]].drop_duplicates().reset_index(drop=True), obs_col='Observable') i = 0 ti_hits = [] ti_resps.reset_index(drop=True, inplace=True) while i < len(ti_resps): if ti_resps['Result'][i] == True and ti_check_sev(ti_resps['Severity'][i], 1): ti_hits.append(ti_resps['Ioc'][i]) i += 1 else: i += 1 display(HTML(f"Found {len(ti_hits)} IoCs in Threat Intelligence")) for ioc in ti_hits: display(HTML(f"Messages containing IoC found in TI feed: {ioc}")) display(app_data[app_data['SyslogMessage'].str.contains( ioc)][['TimeGenerated', 'SyslogMessage']]) else: md("<h3>No IoC patterns found in Syslog Message.</h3>")

Network Activity

Hypothesis: That an attacker is remotely communicating with the host in order to compromise the host or for C2 or data exfiltration purposes after compromising the host.

This section provides an overview of network activity to and from the host during the hunting time frame; the purpose of this is to allow for the identification of anomalous network traffic. If you wish to investigate a specific IP address in detail, the IP Explorer notebook is recommended.

# Get list of IPs from Syslog and Azure Network Data ioc_extractor = iocextract.IoCExtract() os_family = host_entity.OSType if host_entity.OSType else 'Linux' print('Finding IP Addresses this may take a few minutes.......') syslog_ips = ioc_extractor.extract(data=all_syslog_data, columns=['SyslogMessage'], ioc_types=['ipv4', 'ipv6']) if 'AzureNetworkAnalytics_CL' not in qry_prov.schema: az_net_comms_df = None az_ips = None else: if hasattr(host_entity, 'private_ips') and hasattr(host_entity, 'public_ips'): all_host_ips = host_entity.private_ips + \ host_entity.public_ips + [host_entity.IPAddress] else: all_host_ips = [host_entity.IPAddress] host_ips = {'\'{}\''.format(i.Address) for i in all_host_ips} host_ip_list = ','.join(host_ips) az_ip_where = f"""| where (VMIPAddress in ("{host_ip_list}") or SrcIP in ("{host_ip_list}") or DestIP in ("{host_ip_list}")) and (AllowedOutFlows > 0 or AllowedInFlows > 0)""" az_net_comms_df = qry_prov.AzureNetwork.az_net_analytics( start=query_times.start, end=query_times.end, host_name=hostname, where_clause=az_ip_where) if isinstance(az_net_comms_df, pd.DataFrame) and not az_net_comms_df.empty: az_ips = az_net_comms_df.query("PublicIPs != @host_entity.IPAddress") else: az_ips = None if len(syslog_ips): IPs = syslog_ips[['IoCType', 'Observable']].drop_duplicates('Observable') display(f"Found {len(IPs)} IP Addresses assoicated with the host") else: md("### No IoC patterns found in Syslog Message.") if az_ips is not None: ips = az_ips['PublicIps'].drop_duplicates( ) + syslog_ips['Observable'].drop_duplicates() else: ips = syslog_ips['Observable'].drop_duplicates() if isinstance(az_net_comms_df, pd.DataFrame) and not az_net_comms_df.empty: import warnings with warnings.catch_warnings(): warnings.simplefilter("ignore") az_net_comms_df['TotalAllowedFlows'] = az_net_comms_df['AllowedOutFlows'] + \ az_net_comms_df['AllowedInFlows'] sns.catplot(x="L7Protocol", y="TotalAllowedFlows", col="FlowDirection", data=az_net_comms_df) sns.relplot(x="FlowStartTime", y="TotalAllowedFlows", col="FlowDirection", kind="line", hue="L7Protocol", data=az_net_comms_df).set_xticklabels(rotation=50) nbdisplay.display_timeline(data=az_net_comms_df.query('AllowedOutFlows > 0'), overlay_data=az_net_comms_df.query( 'AllowedInFlows > 0'), title='Network Flows (out=blue, in=green)', time_column='FlowStartTime', source_columns=[ 'FlowType', 'AllExtIPs', 'L7Protocol', 'FlowDirection'], height=300) else: md('<h3>No Azure network data for specified time range.</h3>')

Choose ASNs/IPs to Check for Threat Intel Reports

Choose the ASNs containing the IPs you wish to check on, then select the IP address(es) you wish to check against Threat Intelligence data. The Source list is populated with all ASNs found in the syslog and network flow data.

#Lookup each IP in whois data and extract the ASN @lru_cache(maxsize=1024) def whois_desc(ip_lookup, progress=False): try: ip = ip_address(ip_lookup) except ValueError: return "Not an IP Address" if ip.is_private: return "private address" if not ip.is_global: return "other address" whois = IPWhois(ip) whois_result = whois.lookup_whois() if progress: print(".", end="") return whois_result["asn_description"] # Summarise network data by ASN ASN_List = [] print("WhoIs Lookups") ASNs = ips.apply(lambda x: whois_desc(x, True)) IP_ASN = pd.DataFrame(dict(IPs=ips, ASN=ASNs)).reset_index() x = IP_ASN.groupby(["ASN"]).count().drop( 'index', axis=1).sort_values('IPs', ascending=False) display(x) ASN_List = x.index # Select an ASN to investigate in more detail selection = widgets.SelectMultiple( options=ASN_List, width=900, description='Select ASN to investigate', disabled=False ) display(selection)
# For every IP associated with the selected ASNs, look them up in TI feeds
ip_invest_list = None
ip_selection = None
for ASN in selection.value:
    if ip_invest_list is None:
        ip_invest_list = IP_ASN[IP_ASN["ASN"] == ASN]['IPs'].tolist()
    else:
        ip_invest_list += IP_ASN[IP_ASN["ASN"] == ASN]['IPs'].tolist()

if ip_invest_list is not None:
    ioc_ip_list = []
    if len(ip_invest_list) > 0:
        ti_resps = tilookup.lookup_iocs(data=ip_invest_list, providers=["OTX"])
        i = 0
        ti_hits = []
        while i < len(ti_resps):
            if ti_resps['Details'][i]['pulse_count'] > 0:
                ti_hits.append(ti_resps['Ioc'][i])
                i += 1
            else:
                i += 1
        display(HTML(f"Found {len(ti_hits)} IoCs in Threat Intelligence"))
        for ioc in ti_hits:
            ioc_ip_list.append(ioc)
    # Show IPs found in TI feeds for further investigation
    if len(ioc_ip_list) > 0:
        display(HTML("Select an IP which appeared in TI to investigate further"))
        ip_selection = nbwidgets.SelectItem(description='Select IP Address to investigate: ',
                                            item_list=ioc_ip_list, width='95%', auto_display=True)
else:
    md("No IPs to investigate")
# Get all syslog for the selected IP
if ip_selection is not None:
    display(HTML("Syslog data associated with this IP Address"))
    sys_hits = all_syslog_data[all_syslog_data['SyslogMessage'].str.contains(ip_selection.value)]
    display(sys_hits)
    os_family = host_entity.OSType if host_entity.OSType else 'Linux'
    display(HTML("TI result for this IP Address"))
    display(ti_resps[ti_resps['Ioc'] == ip_selection.value])
else:
    md("No IP address selected")

Configuration

msticpyconfig.yaml Configuration File

You can configure primary and secondary TI providers and any required parameters in the msticpyconfig.yaml file. This is read from the current directory or you can set an environment variable (MSTICPYCONFIG) pointing to its location.
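For example, here is a minimal sketch of pointing the MSTICPYCONFIG environment variable at a config file stored outside the notebook directory. The path is purely illustrative; set the variable before msticpy loads its settings.

import os
from pathlib import Path

# Illustrative only - substitute the real location of your msticpyconfig.yaml.
config_path = Path.home() / "msticpyconfig.yaml"
if config_path.exists():
    os.environ["MSTICPYCONFIG"] = str(config_path)
    print(f"MSTICPYCONFIG set to {config_path}")
else:
    print(f"{config_path} not found - msticpy will fall back to the current directory.")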

To configure this file see the ConfiguringNotebookEnvironment notebook.