GitHub Repository: Azure/Azure-Sentinel-Notebooks
Path: blob/master/tutorials-and-examples/example-notebooks/Recorded Future Sigma Rules Importer.ipynb
Kernel: Python 3.8 - AzureML

Import and convert Recorded Future Sigma rules

Created by Jonah Feldman, Recorded Future. Heavily based on this notebook by [email protected]

NOTE: This is a beta version. We are constantly improving its features and functionality to provide you with the best user experience possible. Your feedback is valuable and helps us identify areas of improvement. Please send any suggestions or issues to [email protected].

This notebook fetches Sigma rules created by Recorded Future's Insikt team and converts them to KQL. After conversion, you can either query your Log Analytics workspace interactively with these rules or create a Sentinel Analytic rule that generates alerts/incidents based on its detections.

Known issues:

  • Authentication to Sentinel can sometimes fail to detect msticpyconfig.yaml even when it is present. As a workaround, you can hardcode the subscription id, resource group name, and workspace name directly into the cell

  • Techniques are not currently translated correctly to the Analytic Rule. This is being worked on

  • Backslashes are not always escaped properly, so some rules with backslashes in the query may not execute correctly

  • In certain shells, msticpy[azure] in the first cell needs to be wrapped in quotes
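
To illustrate the backslash issue listed above: in a regular double-quoted KQL string literal the backslash is an escape character, so a literal backslash has to be doubled. The sketch below is not part of the notebook; `escape_for_kql_string` and `as_kql_literal` are hypothetical helper names, and it only covers values you paste into a query by hand, not what the Sigma backend emits.

```python
def escape_for_kql_string(value: str) -> str:
    """Escape a raw value for use inside a double-quoted KQL string literal."""
    # Double the backslashes first, then escape embedded double quotes.
    return value.replace("\\", "\\\\").replace('"', '\\"')


def as_kql_literal(value: str) -> str:
    """Wrap the escaped value in double quotes (hypothetical helper)."""
    return '"' + escape_for_kql_string(value) + '"'


# Example value; the path is illustrative only.
raw_path = r"C:\Windows\System32\bitsadmin.exe"
print(as_kql_literal(raw_path))
```

If a converted query fails on a path or registry key, comparing its string literals against output like this may help spot a backslash that was left unescaped.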

By default, all Analytic rules are created disabled.

Install dependencies

from pathlib import Path
from IPython.display import display, HTML

REQ_PYTHON_VER = "3.6"
REQ_MSTICPY_VER = "1.0.0"

display(HTML("<h3>Starting Notebook setup...</h3>"))

# If not using Azure Notebooks, install msticpy with
#%pip install msticpy
#%pip install msticpy[azure]
#OR %pip install "msticpy[azure]"
#%pip install sigmatools

from msticpy.nbtools import nbinit
nbinit.init_notebook(namespace=globals());

Enter your Recorded Future API key

This notebook requires an API key to communicate with the Recorded Future API. To obtain API keys, please visit Recorded Future Requesting API Tokens.

import getpass

rf_token = getpass.getpass()

Fetch rules from the Recorded Future API. This may take a while.

Modify query here. E.g. time interval (after, before), number of retrieved Sigma rules (limit). For more inspiration check out the API specification here.

import requests
import datetime

# Download Sigma rules
# Modify the query below as needed. Tip: reduce the number of days if receiving "Output exceeds the size limit."
from_date = (datetime.datetime.utcnow() - datetime.timedelta(days=180)).isoformat()
to_date = datetime.datetime.utcnow().isoformat()
query = {
    "filter": {
        "types": ["sigma"],
        "created": {"after": from_date, "before": to_date}
    },
    "tagged_entities": False,
    "limit": 100
}
rf_detection_api_url = 'https://api.recordedfuture.com/detection-rule/search'
r = requests.post(rf_detection_api_url, headers={'X-RFToken': rf_token}, json=query)
r.raise_for_status()
display(f'Retrieved {r.json()["count"]} Sigma Rules')
display(f'Fromdate {from_date} todate {to_date}')
'Retrieved 21 Sigma Rules'
'Fromdate 2022-11-10T08:02:37.219719 todate 2023-05-09T08:02:37.219755'
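
If you expect to adjust the time window or the limit often, the request body can be wrapped in a small function. This is only a sketch: `build_query` is a hypothetical helper, not something the notebook or the Recorded Future API provides, and it uses only the fields already shown in the cell above.

```python
import datetime


def build_query(days=180, limit=100, now=None):
    """Build the search body for Sigma rules created in the last `days` days."""
    if now is None:
        # Naive UTC timestamp, the same format the cell above produces.
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    start = now - datetime.timedelta(days=days)
    return {
        "filter": {
            "types": ["sigma"],
            "created": {"after": start.isoformat(), "before": now.isoformat()},
        },
        "tagged_entities": False,
        "limit": limit,
    }


# A narrower window, e.g. when the output exceeds the size limit.
print(build_query(days=30, limit=25))
```

The returned dictionary can be passed as the `json=` argument of the `requests.post` call in place of `query`.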

Save the rules locally.

import yaml
from ipywidgets import widgets, Layout
import os
from pathlib import Path
import traceback

def_path = Path.joinpath(Path(os.getcwd()), "rf-sigma")
path_wgt = widgets.Text(value=str(def_path),
                        description='Path to extract to zipped repo files: ',
                        layout=Layout(width='50%'),
                        style={'description_width': 'initial'})
path_wgt

rules_root = Path(path_wgt.value)
try:
    os.mkdir(rules_root)
except FileExistsError as err:
    pass

for result in r.json()['result']:
    for rule in result['rules']:
        content = rule['content']
        title = list(yaml.safe_load_all(content))[0]['title'] + '.yml'
        with open(Path.joinpath(rules_root, title), 'w') as file:
            file.write(content)
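
The save cell uses each rule's title as its file name, which would fail if a title ever contained a path separator or another character the file system rejects. A minimal sketch of a sanitizing step follows; `safe_rule_filename` is a hypothetical helper and the character set it replaces is an assumption, not something taken from the notebook.

```python
import re


def safe_rule_filename(title, suffix=".yml"):
    """Turn a Sigma rule title into a file name that is safe on common file systems."""
    # Replace path separators, reserved characters and control characters.
    cleaned = re.sub(r'[\\/:*?"<>|\x00-\x1f]', "_", title).strip(" .")
    # Fall back to a placeholder if nothing usable is left.
    return (cleaned or "untitled_rule") + suffix


print(safe_rule_filename("Bitsadmin Command to Download File from Web"))
print(safe_rule_filename("MAL: Foo/Bar"))
```

Titles that are already safe, such as the ones listed in this notebook's output, pass through unchanged.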

Check that we have the files

You should see one .yml file per downloaded rule.

%ls {rules_root}
APT_RU_Gamaredon_SFX_Loader.yml* APT_RU_Gamaredon_SFX_Loader_cleanup.yml* 'Bitsadmin Command to Download File from Web.yml'* 'Commodity Packer Processes.yml'* 'MAL Remcos RAT.yml'* MAL_ASYNCRAT_DNS_EVENTS.yml* MAL_ASYNCRAT_POWERSHELL_FILECREATION.yml* MAL_AcridRain.yml* MAL_Aesthetic_Wiper.yml* MAL_Cuba_Ransomware.yml* MAL_DarkOwl_Download_Payload.yml* MAL_DivergentGap.yml* MAL_Kodex_Ransomware.yml* MAL_LockBit3_Shadowcopy_Deletion.yml* MAL_Lorenz_Ransomware.yml* MAL_MagicRAT.yml* MAL_NullMixer.yml* MAL_PWNKIT_Exploit_File_Event.yml* MAL_Qakbot.yml* MAL_SharpHound_process.yml* MAL_Vidar_Stealer.yml* MAL_disable_modify_tools.yml* Mal_Trochilus_Interaction.yml* 'Permissions Modification.yml'* 'Startup Folder Modification in Registry.yml'* 'XMRig Mining Software - Processes Created.yml'*
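
The listing above can hold more files than the number of results the API reported. One possible explanation, assumed here from the nested loop in the save cell, is that a single result can carry several rules. The sketch below counts both levels on a made-up payload; `count_rules` is a hypothetical helper and the sample data is not a real API response.

```python
def count_rules(payload):
    """Return (number of results, total number of rules) for a search response."""
    results = payload.get("result", [])
    total_rules = sum(len(item.get("rules", [])) for item in results)
    return len(results), total_rules


# Made-up payload shaped like the fields the save cell reads.
sample = {
    "count": 2,
    "result": [
        {"rules": [{"content": "title: A"}, {"content": "title: B"}]},
        {"rules": [{"content": "title: C"}]},
    ],
}
n_results, n_rules = count_rules(sample)
print(f"{n_results} results containing {n_rules} rules")
```

In the notebook you could call `count_rules(r.json())` and compare the second value with the number of files in the folder.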

Convert Sigma Files to Log Analytics KQL queries

# Sigma to Log Analytics Conversion
import yaml
import os
import sys

module_path = os.path.abspath(os.path.join('sigma/sigma-master/tools'))
if module_path not in sys.path:
    sys.path.append(module_path)

from sigma.parser.collection import SigmaCollectionParser
from sigma.parser.exceptions import SigmaCollectionParseError, SigmaParseError
from sigma.configuration import SigmaConfiguration, SigmaConfigurationChain
from sigma.config.exceptions import SigmaConfigParseError, SigmaRuleFilterParseException
from sigma.filter import SigmaRuleFilter
import sigma.backends.discovery as backends
from sigma.backends.base import BackendOptions
from sigma.backends.exceptions import BackendError, NotSupportedError, PartialMatchError, FullMatchError
from pathlib import Path

sigma_list = list(Path(rules_root).resolve().glob("*.yml"))

# Map Sigma field names to Log Analytics column names
_LA_MAPPINGS = '''
fieldmappings:
  Image: NewProcessName
  ParentImage: ParentProcessName
  ParentCommandLine: NO_MAPPING
'''

NOT_CONVERTIBLE = 'Not convertible'

def sigma_to_la(file_path):
    """Convert one Sigma file to KQL; return (sigma_text, kql_text)."""
    with open(file_path, 'r') as input_file:
        try:
            sigmaconfigs = SigmaConfigurationChain()
            sigmaconfig = SigmaConfiguration(_LA_MAPPINGS)
            sigmaconfigs.append(sigmaconfig)
            backend_options = BackendOptions(None, None)
            backend = backends.getBackend('ala')(sigmaconfigs, backend_options)
            parser = SigmaCollectionParser(input_file, sigmaconfigs, None)
            results = parser.generate(backend)
            kql_result = ''
            for result in results:
                kql_result += result
        except (NotImplementedError, NotSupportedError, TypeError):
            kql_result = NOT_CONVERTIBLE
        input_file.seek(0, 0)
        sigma_txt = input_file.read()
        if not kql_result == NOT_CONVERTIBLE:
            try:
                # Prepend the rule metadata as KQL comments
                kql_header = "\n".join(get_sigma_properties(sigma_txt))
                kql_result = kql_header + "\n" + kql_result
            except Exception as e:
                print("exception reading sigma YAML: ", e)
                print(sigma_txt, kql_result, sep='\n')
    return sigma_txt, kql_result

sigma_keys = ['title', 'description', 'tags', 'status', 'author', 'logsource', 'falsepositives', 'level']

def get_sigma_properties(sigma_rule):
    sigma_docs = yaml.load_all(sigma_rule, Loader=yaml.SafeLoader)
    sigma_rule_dict = next(sigma_docs)
    for prop in sigma_keys:
        yield get_property(prop, sigma_rule_dict)

def get_property(name, sigma_rule_dict):
    sig_prop = sigma_rule_dict.get(name, 'na')
    if isinstance(sig_prop, dict):
        sig_prop = ' '.join([f"{k}: {v}" for k, v in sig_prop.items()])
    return f"// {name}: {sig_prop}"

# {start} and {end} are filled in when the query is executed.
# {host_name} must be replaced by hand in the query text before executing.
_KQL_FILTERS = {
    'date': ' | where TimeGenerated >= datetime({start}) and TimeGenerated <= datetime({end}) ',
    'host': ' | where Computer has {host_name} '
}

def insert_at(source, insert, find_sub):
    pos = source.find(find_sub)
    if pos != -1:
        return source[:pos] + insert + source[pos:]
    else:
        return source + insert

def add_filter_clauses(source, **kwargs):
    if "{" in source or "}" in source:
        source = ("// Warning: embedded braces in source. Please edit if necessary.\n" + source)
        # Double literal braces so that str.format() leaves them intact
        source = source.replace('{', '{{').replace('}', '}}')
    if kwargs.get('host', False):
        source = insert_at(source, _KQL_FILTERS['host'], '|')
    if kwargs.get('date', False):
        source = insert_at(source, _KQL_FILTERS['date'], '|')
    return source

# Run the conversion
print("Converting rules")
kql_dict = {}
src_converted = 0  # rules that produced a KQL query
for file_path in sigma_list:
    file_name = os.path.basename(file_path)
    try:
        sigma, kql = sigma_to_la(file_path)
    except Exception as err:
        print(f"Error converting {file_name} ({file_path}): {err}")
        continue
    print(f'Converted {file_name}')
    kql_dict[file_name] = (sigma, kql)
    if not kql == NOT_CONVERTIBLE:
        src_converted += 1

print("\nConversion statistics")
print("-" * len("Conversion statistics"))
print(f'Rules: {len(sigma_list)}, Converted: {src_converted}')
Converting rules
Converted APT_RU_Gamaredon_SFX_Loader.yml
Converted APT_RU_Gamaredon_SFX_Loader_cleanup.yml
Converted Bitsadmin Command to Download File from Web.yml
Converted Commodity Packer Processes.yml
Converted MAL Remcos RAT.yml
Converted MAL_AcridRain.yml
Converted MAL_Aesthetic_Wiper.yml
Converted MAL_ASYNCRAT_DNS_EVENTS.yml
Converted MAL_ASYNCRAT_POWERSHELL_FILECREATION.yml
Converted MAL_Cuba_Ransomware.yml
Converted MAL_DarkOwl_Download_Payload.yml
Converted MAL_disable_modify_tools.yml
Converted MAL_DivergentGap.yml
Converted MAL_Kodex_Ransomware.yml
Converted MAL_LockBit3_Shadowcopy_Deletion.yml
Converted MAL_Lorenz_Ransomware.yml
Converted MAL_MagicRAT.yml
Converted MAL_NullMixer.yml
Converted MAL_PWNKIT_Exploit_File_Event.yml
Converted MAL_Qakbot.yml
Converted MAL_SharpHound_process.yml
Converted Mal_Trochilus_Interaction.yml
Converted MAL_Vidar_Stealer.yml
Converted Permissions Modification.yml
Converted Startup Folder Modification in Registry.yml
Converted XMRig Mining Software - Processes Created.yml

Conversion statistics
---------------------
Rules: 26, Converted: 26
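
The conversion cell doubles any literal braces in a query before adding the `{start}` and `{end}` placeholders, so that a later `str.format()` call fills in the dates without tripping over braces that belong to the query. The sketch below uses condensed copies of the notebook's helpers to show that round trip; the sample query and dates are made up, and `add_date_filter` is a shortened stand-in for `add_filter_clauses`.

```python
# Condensed copies of the notebook's helpers, repeated so the demo runs alone.
DATE_FILTER = (" | where TimeGenerated >= datetime({start})"
               " and TimeGenerated <= datetime({end}) ")


def insert_at(source, insert, find_sub):
    """Insert text before the first occurrence of find_sub, or append it."""
    pos = source.find(find_sub)
    return source + insert if pos == -1 else source[:pos] + insert + source[pos:]


def add_date_filter(source):
    """Double literal braces, then add the date placeholders before the first pipe."""
    source = source.replace("{", "{{").replace("}", "}}")
    return insert_at(source, DATE_FILTER, "|")


# Hypothetical converted query; the regex quantifier contains literal braces.
kql = 'SecurityEvent | where CommandLine matches regex "a{2}"'
template = add_date_filter(kql)
final = template.format(start="2023-05-01", end="2023-05-09")
print(final)
```

Because the date filter is inserted before the first pipe, it runs ahead of the rule's own `where` clauses.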

Authenticate to Log Analytics

from IPython.display import display
from msticpy.data import QueryProvider
from msticpy.common.wsconfig import WorkspaceConfig

ws_config = WorkspaceConfig()
qry_prov = QueryProvider("LogAnalytics")
qry_prov.connect(ws_config.code_connect_str)
Please wait. Loading Kqlmagic extension...
<IPython.core.display.Javascript object>
done try_azcli_login=True;enable_add_items_to_help=False {'try_azcli_login': 'True', 'enable_add_items_to_help': 'False'} try_azcli_login=True;enable_add_items_to_help=False Connecting...
Copy code to clipboard and authenticate here: https://microsoft.com/devicelogin
connected

Authenticate To Azure Sentinel

from msticpy.context.azure.sentinel_core import MicrosoftSentinel

azs = MicrosoftSentinel()

# The line above should work without modification. However, it often fails to detect
# local config files even when they exist. If this happens, uncomment the code below and
# replace the variables with your subscription id, resource group name, and Sentinel workspace name.
#subscription_id = '########-####-####-####-##########'
#resource_group = 'nnnnnnnnn'
#workspace_name = 'nnnnnnnnnnnnnnnnnnnn'
#azs = MicrosoftSentinel(sub_id=subscription_id,res_grp=resource_group, ws_name=workspace_name)

azs.connect()
To sign in, use a web browser to open the page https://microsoft.com/devicelogin and enter the code FMC5B6R87 to authenticate.
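
When authentication fails to pick up msticpyconfig.yaml, it can help to check first whether a config file is visible from the notebook's working directory at all. The sketch below assumes the commonly documented lookup locations (the path in the `MSTICPYCONFIG` environment variable, then `msticpyconfig.yaml` in the current directory); `find_msticpy_config` is a hypothetical helper and does not call msticpy itself.

```python
import os
from pathlib import Path


def find_msticpy_config(cwd=None, env=None):
    """Return the first msticpyconfig.yaml candidate that exists, or None."""
    env = os.environ if env is None else env
    candidates = []
    # Assumption: an explicit path in MSTICPYCONFIG takes precedence.
    if env.get("MSTICPYCONFIG"):
        candidates.append(Path(env["MSTICPYCONFIG"]))
    candidates.append(Path(cwd or Path.cwd()) / "msticpyconfig.yaml")
    for candidate in candidates:
        if candidate.is_file():
            return candidate
    return None


print(find_msticpy_config())
```

If this prints `None`, hardcoding the subscription id, resource group name, and workspace name as described in the cell above is the more direct route.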

Display the results in an interactive browser

import re

from ipywidgets import widgets, Layout
from IPython.display import display, HTML, clear_output
from msticpy.nbtools.nbwidgets import QueryTime

# Browser Functions
def on_query_value_change(change):
    if view_qry_check.value:
        qry_text = kql_dict[queries_w.value][1]
        if "Not convertible" not in qry_text:
            qry_text = add_filter_clauses(qry_text,
                                          date=add_date_filter_check.value,
                                          host=add_host_filter_check.value)
        query_text_w.value = qry_text.replace('|', '\n|')
        orig_text_w.value = kql_dict[queries_w.value][0]

def clean_kql_comments(query_string):
    """Strip // comments and newlines from a KQL query."""
    # Note: the original passed re.MULTILINE as the fourth positional argument,
    # which re.sub() treats as `count`, so only the first 8 comments were removed.
    return re.sub(r'(//[^\n]+)', '', query_string).replace('\n', '').strip()

def execute_kql_query(query_string):
    if not query_string or len(query_string.strip()) == 0:
        print('No query supplied')
        return None
    src_query = clean_kql_comments(query_string)
    src_query = src_query.format(start=qry_wgt.start, end=qry_wgt.end)
    result = qry_prov.exec_query(src_query)
    return result

def on_view_query_value_change(change):
    vis = 'visible' if view_qry_check.value else 'hidden'
    on_query_value_change(None)
    query_text_w.layout.visibility = vis
    orig_text_w.layout.visibility = vis

def display_results(qry_results):
    query_results_w.value = qry_results.to_html()

def parse_severity(string):
    if 'Recorded Future Priority Level' not in string:
        if 'critical' in string:
            return 'High'
        return string.capitalize()
    num = string.split(' ')[-1]
    if num in ('0', '1'):
        return 'High'
    elif num == '2':
        return 'Medium'
    elif num == '3':
        return 'Low'

def parse_tactics(tags):
    return [t.split('.', 1)[1] for t in tags]

def create_analytic_rule(eh):
    query = list(yaml.safe_load_all(orig_text_w.value))[0]
    azs.create_analytic_rule(
        name=query['title'],
        description=f'{query["description"]}. Authored by {query["author"]}. {query["date"]}',
        query=query_text_w.value,
        severity=parse_severity(query['level']),
        #tactics=parse_tactics(query['tags']),
        enabled=False)
    analytic_result.value = f'Created Analytic Rule {query["title"]}'

def exec_query(eh):
    query_name = queries_w.value
    query_text = query_text_w.value
    qry_results = execute_kql_query(query_text)
    display_results(qry_results)

# Browser widget setup
queries_w = widgets.Select(options=kql_dict.keys(),
                           description='Query : ',
                           layout=Layout(width='30%', height='120px'),
                           style={'description_width': 'initial'})

query_text_w = widgets.Textarea(
    value='',
    description='Kql Query:',
    layout=Layout(width='100%', height='300px', visibility='hidden'),
    disabled=False)
orig_text_w = widgets.Textarea(
    value='',
    description='Sigma Query:',
    layout=Layout(width='100%', height='250px', visibility='hidden'),
    disabled=False)

query_text_w.layout.visibility = 'hidden'
orig_text_w.layout.visibility = 'hidden'
queries_w.observe(on_query_value_change, names='value')

view_qry_check = widgets.Checkbox(description="View query", value=True)
add_date_filter_check = widgets.Checkbox(description="Add date filter", value=False)
add_host_filter_check = widgets.Checkbox(description="Add host filter", value=False)

view_qry_check.observe(on_view_query_value_change, names='value')
add_date_filter_check.observe(on_view_query_value_change, names='value')
add_host_filter_check.observe(on_view_query_value_change, names='value')
# view_qry_button.on_click(click_exec_hqry)
# display(exec_hqry_button);

vbox_opts = widgets.VBox([view_qry_check, add_date_filter_check, add_host_filter_check])
hbox = widgets.HBox([queries_w, vbox_opts])
vbox = widgets.VBox([hbox, orig_text_w, query_text_w])
on_view_query_value_change(None)
display(vbox)

query_results_w = widgets.HTML('')
qry_wgt = QueryTime(units='days', before=5, after=0, max_before=30, max_after=10)
display(qry_wgt)

exec_hqry_button = widgets.Button(description="Execute query")
exec_hqry_button.on_click(exec_query)

analytic_button = widgets.Button(description='Create Analytic Rule')
analytic_button.on_click(create_analytic_rule)
analytic_result = widgets.Label('')

query_vbox = widgets.VBox([exec_hqry_button, query_results_w])
analytic_vbox = widgets.VBox([analytic_button, analytic_result])
final_vbox = widgets.VBox([analytic_vbox, query_vbox])
display(final_vbox)
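
The `tactics` argument is commented out in `create_analytic_rule` because Sigma tags mix tactics and techniques (and sometimes group or software IDs) under the same `attack.` prefix. One way to separate them is sketched below. `split_attack_tags` is a hypothetical helper; the CamelCase tactic names are an assumption about the format Sentinel expects, and this has not been verified against `create_analytic_rule`.

```python
import re

# Technique IDs such as t1059 or t1059.001; group/software IDs such as g0035, s0002.
_TECHNIQUE = re.compile(r"^t\d{4}(\.\d{3})?$", re.IGNORECASE)
_GROUP_OR_SOFTWARE = re.compile(r"^[gs]\d{4}$", re.IGNORECASE)


def split_attack_tags(tags):
    """Split Sigma 'attack.*' tags into (tactics, techniques)."""
    tactics, techniques = [], []
    for tag in tags or []:
        namespace, _, value = tag.partition(".")
        if namespace.lower() != "attack" or not value:
            continue  # ignore other namespaces, e.g. car.*
        if _TECHNIQUE.match(value):
            techniques.append(value.upper())
        elif _GROUP_OR_SOFTWARE.match(value):
            continue  # neither a tactic nor a technique
        else:
            # defense_evasion -> DefenseEvasion (assumed Sentinel spelling)
            tactics.append("".join(part.capitalize() for part in value.split("_")))
    return tactics, techniques


example_tags = ["attack.defense_evasion", "attack.t1059.001", "attack.g0035", "car.2016"]
print(split_attack_tags(example_tags))
```

Until technique translation is fixed, passing only the first list as `tactics` may be enough to re-enable that argument, but test it on a disabled rule first.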