Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Azure
GitHub Repository: Azure/Azure-Sentinel-Notebooks
Path: blob/master/scenario-notebooks/Microsoft Sentinel Query Creator.ipynb
3250 views
Kernel: Python 3.8 - AzureML

Microsoft Sentinel Query Creator Notebook

This utility notebook assists in the creation of Microsoft Sentinel analytics and hunting queries by providing an interactive method for completing the required elements of the query templates and then formatting this data into the YAML format required by the Azure Sentinel GitHub repo.

From time to time this notebook may fall out of sync with Microsoft Sentinel data connectors and MITRE ATT&CK elements. If this is the case, you can manually adjust the created queries with the required elements, but please also raise an issue or PR on GitHub so that we can get this notebook updated.

Setup

Run these setup cells before creating either an analytics or hunting query.

Prerequisites:

import re
import uuid

import ipywidgets as widgets
from ipywidgets import interact

# Map each Microsoft Sentinel data connector ID to the data types (tables)
# that can be listed under it in a template's requiredDataConnectors section.
connectors = {
    "AWS": ["AWSCloudTrail"],
    "AzureActiveDirectory": [
        "SigninLogs",
        "AuditLogs",
        "AADServicePrincipalSignInLogs",
        "AADManagedIdentitySignInLogs",
        "AADNonInteractiveUserSignInLogs",
    ],
    "AzureActiveDirectoryIdentityProtection": ["SecurityAlert (IPC)"],
    "AzureActivity": ["AzureActivity"],
    "AzureAdvancedThreatProtection": ["SecurityAlert (AATP)"],
    "AzureInformationProtection": [
        "InformationProtectionLogs_CL",
        "SecurityAlert (AIP)",
    ],
    "AzureMonitor(IIS)": ["W3CIISLog"],
    "AzureMonitor(VMInsights)": ["VMConnection"],
    "AzureMonitor(WireData)": ["WireData"],
    "AzureSecurityCenter": ["SecurityAlert (ASC)"],
    "IoT": ["SecurityAlert (ASC for IoT)"],
    "BarracudaCloudFirewall": ["Syslog(Barracuda)"],
    "Barracuda": ["CommonSecurityLog (Barracuda)", "Barracuda_CL"],
    "CheckPoint": ["CommonSecurityLog (CheckPoint)"],
    "CiscoASA": ["CommonSecurityLog (Cisco)"],
    "Citrix": ["CitrixAnalytics_SAlerts_CL"],
    "CEF": ["CommonSecurityLog"],
    "CyberArk": ["CyberArk"],
    "DNS": ["DnsEvents", "DnsInventory"],
    "ExtraHopNetworks": ["CommonSecurityLog ('ExtraHop')"],
    "F5BigIp": ["F5Telemetry_LTM_CL", "F5Telemetry_system_CL", "F5Telemetry_ASM_CL"],
    "F5": ["CommonSecurityLog (F5)"],
    "Fortinet": ["CommonSecurityLog (Fortinet)"],
    "MicrosoftCloudAppSecurity": ["SecurityAlert (MCAS)", "McasShadowItReporting"],
    "MicrosoftDefenderAdvancedThreatProtection": ["SecurityAlert (MDATP)"],
    "WAF": ["AzureDiagnostics (Application Gateways)"],
    "Office365": [
        "OfficeActivity (SharePoint)",
        "OfficeActivity (Exchange)",
        "OfficeActivity (Teams)",
    ],
    "OfficeATP": ["SecurityAlert (Office 365 Security & Compliance)"],
    "OneIdentity": ["CommonSecurityLog (OneIdentity)"],
    "PaloAltoNetworks": ["CommonSecurityLog (PaloAlto)"],
    # The connector ID is plural ("SecurityEvents") but the table / data type
    # it populates is the singular "SecurityEvent".
    "SecurityEvents": ["SecurityEvent"],
    "Symantec": ["SymantecICDx_CL"],
    "Syslog": ["Syslog"],
    "ThreatIntelligenceTaxii": ["ThreatIntelligenceIndicator"],
    "ThreatIntelligence": ["ThreatIntelligenceIndicator"],
    "TrendMicro": ["CommonSecurityLog (TrendMicroDeepSecurity)"],
    "WindowsEventForwarding": ["WindowsEvent"],
    "WindowsFireWall": ["WindowsFirewall"],
    "Zscaler": ["CommonSecurityLog (Zscaler)"],
    "BehaviorAnalytics": ["BehaviorAnalytics"],
}

# MITRE ATT&CK tactic names offered in the "ATT&CK Tactics" selector.
mitre = [
    "Reconnaissance",
    "ResourceDevelopment",
    "InitialAccess",
    "Execution",
    "Persistence",
    "PrivilegeEscalation",
    "DefenseEvasion",
    "CredentialAccess",
    "Discovery",
    "LateralMovement",
    "Collection",
    "CommandAndControl",
    "Exfiltration",
    "Impact",
]

# Map each entity type to the identifiers that may be chosen for it when
# building the entityMappings section of a template.
entity_identifiers = {
    "Account": [
        "Name",
        "FullName",
        "NTDomain",
        "DnsDomain",
        "UPNSuffix",
        "Sid",
        "AadTenantId",
        "AadUserId",
        "PUID",
        "IsDomainJoined",
        "DisplayName",
        "ObjectGuid",
    ],
    "Host": [
        "DnsDomain",
        "NTDomain",
        "HostName",
        "FullName",
        "NetBiosName",
        "AzureID",
        "OMSAgentID",
        "OSFamily",
        "OSVersion",
        "IsDomainJoined",
    ],
    "IP": ["Address"],
    "Malware": ["Name", "Category"],
    "File": ["Directory", "Name"],
    "Process": ["ProcessId", "CommandLine", "ElevationToken", "CreationTimeUtc"],
    "CloudApplication": ["AppId", "Name", "InstanceName"],
    "DNS": ["DomainName"],
    "AzureResource": ["ResourceId"],
    "FileHash": ["Algorithm", "Value"],
    "RegistryKey": ["Hive", "Key"],
    "RegistryValue": ["Name", "Value", "ValueType"],
    "SecurityGroup": ["DistinguishedName", "SID", "ObjectGuid"],
    "URL": ["Url"],
    "Mailbox": [
        "MailboxPrimaryAddress",
        "DisplayName",
        "Upn",
        "ExternalDirectoryObjectId",
        "RiskLevel",
    ],
    "MailCluster": [
        "NetworkMessageIds",
        "CountByDeliveryStatus",
        "CountByThreatType",
        "CountByProtectionStatus",
        "Threats",
        "Query",
        "QueryTime",
        "MailCount",
        "IsVolumeAnomaly",
        "Source",
        "ClusterSourceIdentifier",
        "ClusterSourceType",
        "ClusterQueryStartTime",
        "ClusterQueryEndTime",
        "ClusterGroup",
    ],
    "MailMessage": [
        "Recipient",
        "Urls",
        "Threats",
        "Sender",
        "P1Sender",
        "P1SenderDisplayName",
        "P1SenderDomain",
        "SenderIP",
        "P2Sender",
        "P2SenderDisplayName",
        "P2SenderDomain",
        "ReceivedDate",
        "NetworkMessageId",
        "InternetMessageId",
        "Subject",
        "BodyFingerprintBin1",
        "BodyFingerprintBin2",
        "BodyFingerprintBin3",
        "BodyFingerprintBin4",
        "BodyFingerprintBin5",
        "AntispamDirection",
        "DeliveryAction",
        "DeliveryLocation",
        "Language",
        "ThreatDetectionMethods",
    ],
    "SubmissionMail": [
        "NetworkMessageId",
        "Timestamp",
        "Recipient",
        "Sender",
        "SenderIp",
        "Subject",
        "ReportType",
        "SubmissionId",
        "SubmissionDate",
        "Submitter",
    ],
}
# Shared layouts: a fixed-width label column and a wide input column.
name_layout = widgets.Layout(width="150px")
widget_layout = widgets.Layout(width="75%")


def create_dts():
    """Return the data types offered by the currently selected connectors.

    Reads the current selection of the ``RequiredDataConnectors`` widget and
    concatenates the matching entries of ``connectors`` in selection order.
    """
    datatype = []
    for connector in RequiredDataConnectors.value:
        datatype.extend(connectors[connector])
    return datatype


def _labelled_row(label, control):
    """Return an HBox holding a fixed-width label followed by ``control``.

    The control is always ``children[1]`` of the returned box, which is how
    the other cells read the entered value.
    """
    return widgets.HBox(
        [widgets.Label(label, layout=name_layout), control],
        layout=widget_layout,
    )


# Widgets used to collect the individual query elements.
QueryName = _labelled_row(
    "Query Name:",
    widgets.Text(
        placeholder="The name of the query", disabled=False, layout=widget_layout
    ),
)
QueryDescription = _labelled_row(
    "Query Description:",
    widgets.Textarea(
        placeholder="A description of the query - as a single line",
        disabled=False,
        layout=widget_layout,
        rows=10,
    ),
)
Severity = _labelled_row(
    "Severity:",
    widgets.Dropdown(
        options=["Low", "Medium", "High"],
        value="Medium",
        disabled=False,
        layout=widget_layout,
    ),
)
QueryPeriod = _labelled_row(
    "Query Period (hours):",
    widgets.IntSlider(
        value=168,
        min=0,
        max=336,
        step=1,
        disabled=False,
        orientation="horizontal",
        readout=True,
        layout=widget_layout,
    ),
)
QueryFrequency = _labelled_row(
    "Query Frequency (hours):",
    widgets.IntSlider(
        value=12,
        min=0,
        max=168,
        step=1,
        disabled=False,
        orientation="horizontal",
        readout=True,
        layout=widget_layout,
    ),
)
TriggerOperator = _labelled_row(
    "Trigger:",
    widgets.Dropdown(
        options=["gt", "lt", "eq"], value="gt", disabled=False, layout=widget_layout
    ),
)
TriggerThreshold = _labelled_row(
    "Trigger Threshold:",
    widgets.IntSlider(
        value=1,
        min=0,
        max=10,
        step=1,
        disabled=False,
        orientation="horizontal",
        readout=True,
        layout=widget_layout,
    ),
)
Tactics = _labelled_row(
    "ATT&CK Tactics:",
    widgets.SelectMultiple(
        options=mitre, value=[mitre[0]], disabled=False, layout=widget_layout
    ),
)
Techniques = _labelled_row(
    "ATT&CK Techniques:",
    widgets.Text(placeholder="T001, T001.1", disabled=False, layout=widget_layout),
)
Query = _labelled_row(
    "Query:",
    widgets.Textarea(
        placeholder="Table | take 10", disabled=False, layout=widget_layout, rows=20
    ),
)
EntityCount = _labelled_row(
    "Number of Entities:",
    widgets.IntSlider(
        value=1,
        min=0,
        max=5,
        step=1,
        disabled=False,
        continuous_update=False,
        orientation="horizontal",
        layout=widget_layout,
    ),
)

# NOTE: the keyword was previously misspelled "descriptoin", so the widget was
# created without a description (and traitlets warned about the unknown
# argument). The wider description_width keeps the longer label readable.
RequiredDataConnectors = widgets.SelectMultiple(
    options=list(connectors.keys()),
    value=[list(connectors.keys())[0]],
    disabled=False,
    description="Data Connectors",
    style={"description_width": "initial"},
    layout=widget_layout,
)
DataTypes = widgets.SelectMultiple(
    options=create_dts(), description="DataType", disabled=False, layout=widget_layout
)
Kind = widgets.Dropdown(
    options=["scheduled", "NRT"], disabled=False, description="Alert Type"
)
extra_ents = widgets.Textarea(
    placeholder='Add additional entities',
    description='Add Entities:',
    disabled=False,
    layout=widget_layout
)

Create an Analytics Query

Use the following cells to create an Azure Sentinel Analytics Query. Complete the widgets in the next cells with details of your analytics query and then run the cells below to create an analytics query in the required template format and then write it to disk.

For hunting queries use the Create a Hunting Query section of this notebook.

# Show every input needed for an analytics query, in template order.
print("Complete your query details:")
print("For details of what is required in each section see https://github.com/Azure/Azure-Sentinel/wiki/Query-Style-Guide")

for leading_widget in (QueryName, QueryDescription, Severity):
    display(leading_widget)


@interact(DataConnector=RequiredDataConnectors, dtype=DataTypes)
def select_datatypes(DataConnector, dtype):
    """Rebuild the data type options from the current connector selection."""
    DataTypes.options = create_dts()


for trailing_widget in (
    QueryPeriod,
    QueryFrequency,
    TriggerOperator,
    TriggerThreshold,
    Tactics,
    Techniques,
    Query,
    Kind,
):
    display(trailing_widget)
Complete your query details: For details of what is required in each section see https://github.com/Azure/Azure-Sentinel/wiki/Query-Style-Guide
HBox(children=(Label(value='Query Name:', layout=Layout(width='150px')), Text(value='', layout=Layout(width='7…
HBox(children=(Label(value='Query Description:', layout=Layout(width='150px')), Textarea(value='', layout=Layo…
HBox(children=(Label(value='Severity:', layout=Layout(width='150px')), Dropdown(index=1, layout=Layout(width='…
interactive(children=(SelectMultiple(description='DataConnector', index=(0,), layout=Layout(width='75%'), opti…
HBox(children=(Label(value='Query Period (hours):', layout=Layout(width='150px')), IntSlider(value=168, layout…
HBox(children=(Label(value='Query Frequency (hours):', layout=Layout(width='150px')), IntSlider(value=12, layo…
HBox(children=(Label(value='Trigger:', layout=Layout(width='150px')), Dropdown(layout=Layout(width='75%'), opt…
HBox(children=(Label(value='Trigger Threshold:', layout=Layout(width='150px')), IntSlider(value=1, layout=Layo…
HBox(children=(Label(value='ATT&CK Tactics:', layout=Layout(width='150px')), SelectMultiple(index=(0,), layout…
HBox(children=(Label(value='ATT&CK Techniques:', layout=Layout(width='150px')), Text(value='', layout=Layout(w…
HBox(children=(Label(value='Query:', layout=Layout(width='150px')), Textarea(value='', layout=Layout(width='75…
Dropdown(description='Alert Type', options=('scheduled', 'NRT'), value='scheduled')
print("Select values to extract as Entities (max of 5): ")

# Collect candidate column names: every token that appears in a pipeline
# stage containing "extend". Stages are separated by "|"; tokens are split on
# whitespace (including newlines), "=" and ",".
ignored_tokens = {"extend", "timestamp", ""}
items = []
for stage in Query.children[1].value.split("|"):
    if "extend" in stage:
        items.extend(re.split(r"[\s=,]+", stage))

# De-duplicate while keeping first-seen order. The earlier code called
# set(items) without using the result, so repeated names were listed twice.
items = list(dict.fromkeys(tok for tok in items if tok not in ignored_tokens))

# Let the user select the columns they want to extract as entities.
ents = widgets.HBox(
    [
        widgets.Label("Number of Entities:", layout=name_layout),
        widgets.SelectMultiple(
            options=list(items), disabled=False, layout=widget_layout
        ),
    ],
    layout=widget_layout,
)
display(ents)
print("If there are additional fields you want to extract as entities add them as a comma seperated list below:")
display(extra_ents)
Select values to extract as Entities (max of 5):
HBox(children=(Label(value='Number of Entities:', layout=Layout(width='150px')), SelectMultiple(layout=Layout(…
If there are additional fields you want to extract as entities add them as a comma seperated list below:
Textarea(value='', description='Add Entities:', layout=Layout(width='75%'), placeholder='Add additional entiti…
# Ask the user for the entity type of every value chosen for extraction:
# the columns picked in the selector plus any names typed into extra_ents.
selected_columns = tuple(ents.children[1].value)
# Strip surrounding whitespace and drop empty entries so that input such as
# "a, b," yields ("a", "b") rather than ("a", " b", "").
typed_columns = tuple(
    name.strip() for name in extra_ents.value.split(",") if name.strip()
)
# dict.fromkeys removes names given twice while keeping their order.
all_ents = tuple(dict.fromkeys(selected_columns + typed_columns))

for ent in all_ents:
    print(f"Select the entity type of {ent}:")
    # One dropdown per column, stored as the notebook global "<column>widget"
    # so that later cells can read the chosen entity type.
    globals()[ent + "widget"] = widgets.Dropdown(options=list(entity_identifiers))
    display(globals()[ent + "widget"])
# Pair each selected column with the entity type picked for it, then offer
# the identifiers that are valid for that entity type.
ent_mapping = [
    {"columnName": column, "entityType": globals()[column + "widget"].value}
    for column in ents.children[1].value
]

for item in ent_mapping:
    ent = item['columnName']
    entity_type = item['entityType']
    print(f"Select identifier type for {ent} of type {entity_type}:")
    # Stored as the notebook global "<column>identwidget" for the next cell.
    ident_dropdown = widgets.Dropdown(options=entity_identifiers[entity_type])
    globals()[ent + "identwidget"] = ident_dropdown
    display(ident_dropdown)
Select identifier type for RoleName of type Account:
Dropdown(options=('Name', 'FullName', 'NTDomain', 'DnsDomain', 'UPNSuffix', 'Sid', 'AadTenantId', 'AadUserId',…
Select identifier type for IPCustomEntity of type Account:
Dropdown(options=('Name', 'FullName', 'NTDomain', 'DnsDomain', 'UPNSuffix', 'Sid', 'AadTenantId', 'AadUserId',…
Select identifier type for AccountCustomEntity of type Account:
Dropdown(options=('Name', 'FullName', 'NTDomain', 'DnsDomain', 'UPNSuffix', 'Sid', 'AadTenantId', 'AadUserId',…
import textwrap


def _hours_to_timespan(hours):
    """Format a whole number of hours as a timespan string.

    Days are used only when the value is an exact, non-zero multiple of 24;
    otherwise hours are kept so that e.g. 36 becomes "36h" instead of being
    truncated to "1d".
    """
    if hours and hours % 24 == 0:
        return f"{hours // 24}d"
    return f"{hours}h"


def _yaml_items(values, indent="  "):
    """Return ``values`` rendered as YAML list items, each on its own new line."""
    return "".join(f"\n{indent}- {value}" for value in values)


# Generate a unique GUID for the template.
q_guid = str(uuid.uuid4())

# Required data connectors, each followed by the selected data types that
# belong to it. An empty selection is written as an inline empty list.
if not RequiredDataConnectors.value:
    connector_id = "[]"
    connector_section = "requiredDataConnectors: []"
else:
    connector_id = "\n".join(
        f"  - connectorId: {conn}\n    dataTypes:"
        + _yaml_items(
            (dtype for dtype in DataTypes.value if dtype in connectors[conn]),
            indent="      ",
        )
        for conn in RequiredDataConnectors.value
    )
    connector_section = f"requiredDataConnectors:\n{connector_id}"

# Query frequency / period are entered in hours.
qfreq_val = _hours_to_timespan(QueryFrequency.children[1].value)
qperiod_val = _hours_to_timespan(QueryPeriod.children[1].value)

# MITRE ATT&CK tactics and techniques. Blank technique entries (an empty
# field or a trailing comma) are skipped instead of emitting an empty item.
att_tactics = "tactics:" + _yaml_items(Tactics.children[1].value)
technique_list = [
    tech.strip() for tech in Techniques.children[1].value.split(",") if tech.strip()
]
technique_ids = "relevantTechniques:" + _yaml_items(technique_list)

# Entity mappings: read the identifier chosen for every mapped column from
# the "<column>identwidget" dropdown created in the previous cell.
for entm in ent_mapping:
    entm["identifier"] = globals()[entm["columnName"] + "identwidget"].value
ent_ids = "entityMappings:" + "".join(
    f"\n  - entityType: {entm['entityType']}"
    f"\n    fieldMappings:"
    f"\n      - identifier: {entm['identifier']}"
    f"\n        columnName: {entm['columnName']}"
    for entm in ent_mapping
)

# Assemble the template. The description and query are literal block scalars,
# so every line of their text must be indented below the key.
analytic_body = "\n".join(
    [
        f"id: {q_guid}",
        f"name: {QueryName.children[1].value}",
        "description: |",
        textwrap.indent(f"'{QueryDescription.children[1].value}'", "  "),
        f"severity: {Severity.children[1].value}",
        connector_section,
        f"queryFrequency: {qfreq_val}",
        f"queryPeriod: {qperiod_val}",
        f"triggerOperator: {TriggerOperator.children[1].value}",
        f"triggerThreshold: {TriggerThreshold.children[1].value}",
        att_tactics,
        technique_ids,
        "query: |",
        textwrap.indent(Query.children[1].value, "  "),
        ent_ids,
        "version: 1.0.0",
        f"kind: {Kind.value}",
    ]
) + "\n"

print("Your analytics query:\n")
print(analytic_body)

If you want to validate this query by running it against a Sentinel workspace go to the validations section.

# Write the template to "<QueryNameWithoutSpaces>.yaml".
# Mode "x" refuses to overwrite an existing file (raises FileExistsError).
# The encoding is fixed to UTF-8 so non-ASCII text in the name, description
# or query is written the same way on every platform.
file_name = QueryName.children[1].value.replace(" ", '')
with open(f"{file_name}.yaml", "x", encoding="utf-8") as qfile:
    qfile.write(analytic_body)
print(f"Query written to {file_name}.yaml")

Create a Hunting Query

Use the following cells to create an Azure Sentinel Hunting Query. Complete the widgets in the next cells with details of your hunting query and then run the cells below to create a hunting query in the required template format and write it to disk.

For analytics queries use the Create an Analytics Query section of this notebook.

# Show the inputs that make up a Hunting Query template.
print("Complete your query details:")

for leading_widget in (QueryName, QueryDescription):
    display(leading_widget)


@interact(dconnector=RequiredDataConnectors, dtype=DataTypes)
def select_datatypes(dconnector, dtype):
    """Rebuild the data type options from the current connector selection."""
    DataTypes.options = create_dts()


for trailing_widget in (Tactics, Techniques, Query):
    display(trailing_widget)
print("Select values to extract as Entities (max of 5): ")

# Collect candidate column names: every token that appears in a pipeline
# stage containing "extend". Stages are separated by "|"; tokens are split on
# whitespace (including newlines), "=" and ",".
ignored_tokens = {"extend", "timestamp", ""}
items = []
for stage in Query.children[1].value.split("|"):
    if "extend" in stage:
        items.extend(re.split(r"[\s=,]+", stage))

# De-duplicate while keeping first-seen order. The earlier code called
# set(items) without using the result, so repeated names were listed twice.
items = list(dict.fromkeys(tok for tok in items if tok not in ignored_tokens))

# Let the user select the columns they want to extract as entities.
ents = widgets.HBox(
    [
        widgets.Label('Number of Entities:', layout=name_layout),
        widgets.SelectMultiple(
            options=list(items), disabled=False, layout=widget_layout
        ),
    ],
    layout=widget_layout,
)
display(ents)
print("If there are additional fields you want to extract as entities add them as a comma seperated list below:")
display(extra_ents)
Select values to extract as Entities (max of 5):
HBox(children=(Label(value='Number of Entities:', layout=Layout(width='150px')), SelectMultiple(layout=Layout(…
If there are additional fields you want to extract as entities add them as a comma seperated list below:
Textarea(value='', description='Add Entities:', placeholder='Add additional entities')
# Ask for the entity type of every column picked in the selector. Each
# dropdown is kept as the notebook global "<column>widget".
for ent in ents.children[1].value:
    print(f"Select the entity type of {ent}:")
    type_dropdown = widgets.Dropdown(options=entity_identifiers.keys())
    globals()[ent + "widget"] = type_dropdown
    display(type_dropdown)
# Pair each selected column with the entity type picked for it, then offer
# the identifiers that are valid for that entity type.
ent_mapping = [
    {"columnName": column, "entityType": globals()[column + "widget"].value}
    for column in ents.children[1].value
]

for item in ent_mapping:
    ent = item['columnName']
    entity_type = item['entityType']
    print(f"Select identifier type for {ent} of type {entity_type}:")
    # Stored as the notebook global "<column>identwidget" for the next cell.
    ident_dropdown = widgets.Dropdown(options=entity_identifiers[entity_type])
    globals()[ent + "identwidget"] = ident_dropdown
    display(ident_dropdown)
import textwrap


def _yaml_items(values, indent="  "):
    """Return ``values`` rendered as YAML list items, each on its own new line."""
    return "".join(f"\n{indent}- {value}" for value in values)


# Generate a unique GUID for the template.
q_guid = str(uuid.uuid4())

# Required data connectors, each followed by the selected data types that
# belong to it. An empty selection is written as an inline empty list.
if not RequiredDataConnectors.value:
    connector_id = "[]"
    connector_section = "requiredDataConnectors: []"
else:
    connector_id = "\n".join(
        f"  - connectorId: {conn}\n    dataTypes:"
        + _yaml_items(
            (dtype for dtype in DataTypes.value if dtype in connectors[conn]),
            indent="      ",
        )
        for conn in RequiredDataConnectors.value
    )
    connector_section = f"requiredDataConnectors:\n{connector_id}"

# MITRE ATT&CK tactics and techniques. Blank technique entries (an empty
# field or a trailing comma) are skipped instead of emitting an empty item.
att_tactics = "tactics:" + _yaml_items(Tactics.children[1].value)
technique_list = [
    tech.strip() for tech in Techniques.children[1].value.split(",") if tech.strip()
]
technique_ids = "relevantTechniques:" + _yaml_items(technique_list)

# Entity mappings: read the identifier chosen for every mapped column from
# the "<column>identwidget" dropdown created in the previous cell.
for entm in ent_mapping:
    entm["identifier"] = globals()[entm["columnName"] + "identwidget"].value
ent_ids = "entityMappings:" + "".join(
    f"\n  - entityType: {entm['entityType']}"
    f"\n    fieldMappings:"
    f"\n      - identifier: {entm['identifier']}"
    f"\n        columnName: {entm['columnName']}"
    for entm in ent_mapping
)

# Write out the populated template body. The description and query are
# literal block scalars, so every line of their text must be indented.
analytic_body = "\n".join(
    [
        f"id: {q_guid}",
        f"name: {QueryName.children[1].value}",
        "description: |",
        textwrap.indent(f"'{QueryDescription.children[1].value}'", "  "),
        connector_section,
        att_tactics,
        technique_ids,
        "query: |",
        textwrap.indent(Query.children[1].value, "  "),
        ent_ids,
    ]
) + "\n"

print("Your hunting query:\n")
print(analytic_body)

If you want to validate this query by running it against a Sentinel workspace go to the validations section.

# Write the template to "<QueryNameWithoutSpaces>.yaml".
# Mode "x" refuses to overwrite an existing file (raises FileExistsError).
# The encoding is fixed to UTF-8 so non-ASCII text in the name, description
# or query is written the same way on every platform.
file_name = QueryName.children[1].value.replace(" ", '')
with open(f"{file_name}.yaml", "x", encoding="utf-8") as qfile:
    qfile.write(analytic_body)
print(f"Query written to {file_name}.yaml")

Query Validation

If you want to validate your query by running it against a Sentinel Workspace run the code below. This is an optional step and requires use of MSTICPy.

Enter your Microsoft Sentinel Workspace ID and Tenant ID below and the query created will be run against the specified workspace.

# Query validator: text boxes for the workspace and tenant the query
# should be run against.
ws_id = widgets.Text(
    description='Workspace ID:',
    placeholder='Enter your Workspace ID',
    disabled=False,
)
ten_id = widgets.Text(
    description='Tenant ID:',
    placeholder='Enter your Tennant ID',
    disabled=False,
)
for id_box in (ws_id, ten_id):
    display(id_box)
Text(value='', description='Workspace ID:', placeholder='Enter your Workspace ID')
Text(value='', description='Tenant ID:', placeholder='Enter your Tennant ID')
# Load MSTICPy (optional dependency, only needed for query validation).
import msticpy as mp
# NOTE(review): MsticpyDataQueryError is imported but not referenced in the
# cells visible here; presumably intended for handling failed queries.
from msticpy.common.exceptions import MsticpyDataQueryError

# Initialise MSTICPy for notebook use, passing it this notebook's globals.
mp.init_notebook(
    namespace=globals()
)
# Build the Log Analytics connection string from the IDs entered above,
# then create the Sentinel query provider and connect it.
tenant_value = ten_id.value
workspace_value = ws_id.value
la_connection_string = (
    f'loganalytics://code().tenant("{tenant_value}").workspace("{workspace_value}")'
)

qry_prov = mp.QueryProvider("AzureSentinel")
qry_prov.connect(connection_str=la_connection_string)
# Run the text currently in the Query widget against the connected
# workspace; as the last expression its result becomes the cell output.
qry_prov.exec_query(Query.children[1].value)