Path: blob/master/tutorials-and-examples/example-notebooks/Example - Azure Storage VT Hash Lookup.ipynb
3253 views
Kernel: msticpy
Azure Storage VirusTotal Lookup
This sample notebook will extract all MD5 file hashes from Azure Storage logs and then perform a VirusTotal lookup using MSTICPy TILookup()
Supported Azure Storage containers
Step 1 - Imports
In [ ]:
from pathlib import Path import os import sys import warnings from ipywidgets import HBox from IPython.display import display, HTML, Markdown import importlib import ipywidgets as widgets from ipywidgets import HBox REQ_PYTHON_VER = "3.10" REQ_MSTICPY_VER = "2.12.0" display(HTML("<h3>Starting Notebook setup...</h3>")) # If not using Azure Notebooks, install msticpy with # %pip install msticpy from msticpy.nbtools import nbinit nbinit.init_notebook( namespace=globals(), extra_imports=["ipwhois, IPWhois"] ); display(HTML("<h3>Step 2 - Select Workspace</h3>"))
In [ ]:
try: pick_time_range = widgets.Dropdown( options=['3d', '7d', '14d', '30d', '60d'], decription="Time range", disabled=False, ) workspaces_available = WorkspaceConfig().list_workspaces() target_workspace = widgets.Dropdown( options=workspaces_available.keys(), decription="Workspace") display(HBox([pick_time_range, target_workspace])) display(HTML("<h3>Step 3 - Connect to Workspace</h3>")) except RuntimeError: md("""You do not have any Workspaces configured in your config files. Please run the https://github.com/Azure/Azure-Sentinel-Notebooks/blob/master/ConfiguringNotebookEnvironment.ipynb to setup these files before proceeding""" ,'bold')
In [ ]:
time_range = pick_time_range.value workspace_name = target_workspace.value # Collect Microsoft Sentinel Workspace Details from our config file and use them to connect try: # Update to WorkspaceConfig(workspace="WORKSPACE_NAME") to get alerts from a Workspace other than your default one. # Run WorkspaceConfig().list_workspaces() to see a list of configured workspaces ws_config = WorkspaceConfig(workspace=workspace_name) ws_id = ws_config['workspace_id'] ten_id = ws_config['tenant_id'] md("Workspace details collected from config file") qry_prov = QueryProvider(data_environment='LogAnalytics') qry_prov.connect(connection_str=ws_config.code_connect_str) display(HTML("<h3>Step 4 - Collect Storage Logs Data</h3>")) except RuntimeError: md("""You do not have any Workspaces configured in your config files. Please run the https://github.com/Azure/Azure-Sentinel-Notebooks/blob/master/ConfiguringNotebookEnvironment.ipynb to setup these files before proceeding""" ,'bold')
In [ ]:
alert_summary_query = f''' union StorageFileLogs, StorageBlobLogs | where TimeGenerated > ago({time_range}) | where OperationName =~ "PutBlob" or OperationName =~ "PutRange" | extend ClientIP = tostring(split(CallerIpAddress, ":", 0)[0]) | extend FileName = extract(@"\/([\w\-. ]+)\?", 1, Uri) | extend base64Char = base64_decode_toarray(ResponseMd5) | mv-expand base64Char | extend hexChar = tohex(toint(base64Char)) | extend hexChar = iff(strlen(hexChar) < 2, strcat("0", hexChar), hexChar) | extend SourceTable = iff(OperationName has "range", "StorageFileLogs", "StorageBlobLogs") | summarize make_list(hexChar) by CorrelationId, ResponseMd5, FileName, AccountName, TimeGenerated, RequestBodySize, ClientIP, SourceTable | extend Md5Hash = strcat_array(list_hexChar, "") | project TimeGenerated, FileName, ClientIP, SourceTable, Md5Hash, AccountName, RequestBodySize ''' alertout = qry_prov.exec_query(alert_summary_query) if alertout.empty: display(HTML("<h4>ERROR: No File Data</h4>")) display(HTML("<p>No files were found in your storage account logs.</p>")) else: display(HTML("<h4>Storage Log File Data</h4>")) display(HTML("<p>The following file information was collcted from your storage account</p>")) display(alertout) display(HTML("<h3>Step 5 - Query VirusTotal</h3>"))
In [ ]:
hashes = [] hash_count = 0 hashset = alertout.Md5Hash.unique().tolist() print(f"Querying VirusTotal for {len(hashset)} hashes") ti_lookup = TILookup(providers=["VirusTotal"]) ti_lookup.provider_status result = ti_lookup.lookup_iocs(data=hashset, providers="VirusTotal") display(HTML("<h3>Success - VirusTotal Results</h3>")) display(result)