Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Azure
GitHub Repository: Azure/Azure-Sentinel-Notebooks
Path: blob/master/tutorials-and-examples/example-notebooks/SigmaRuleImporter.ipynb
3253 views
Kernel: Python 3.8 - AzureML

Import and convert Neo23x0 Sigma scripts

[email protected]

This notebook is a is a quick and dirty Sigma to Log Analytics converter. It uses the modules from sigmac package to do the conversion.

Only a subset of the Sigma rules are convertible currently. Failure to convert could be for one or more of these reasons:

  • known limitations of the converter

  • mismatch between the syntax expressible in Sigma and KQL

  • data sources referenced in Sigma rules do not yet exist in Microsoft Sentinel

The sigmac tool is downloadable as a package from PyPi but since we are downloading the rules from the repo, we also copy and import the package from the repo source.

After conversion you can use an interactive browser to step through the rules and view (and copy/save) the KQL equivalents. You can also take the conversion results and use them in another way (e.g.bulk save to files).

The notebook is all somewhat experimental and offered as-is without any guarantees

Download and unzip the Sigma repo

from pathlib import Path from IPython.display import display, HTML REQ_PYTHON_VER = "3.6" REQ_MSTICPY_VER = "1.0.0" display(HTML("<h3>Starting Notebook setup...</h3>")) # If not using Azure Notebooks, install msticpy with # %pip install msticpy from msticpy.nbtools import nbinit nbinit.init_notebook(namespace=globals());
import requests # Download the repo ZIP sigma_git_url = 'https://github.com/Neo23x0/sigma/archive/master.zip' r = requests.get(sigma_git_url)
from ipywidgets import widgets, Layout import os from pathlib import Path def_path = Path.joinpath(Path(os.getcwd()), "sigma") path_wgt = widgets.Text(value=str(def_path), description='Path to extract to zipped repo files: ', layout=Layout(width='50%'), style={'description_width': 'initial'}) path_wgt
RULES_REL_PATH = 'sigma-master/rules' rules_root = Path(path_wgt.value) / RULES_REL_PATH

Note that this can take some time to complete

import zipfile import io repo_zip = io.BytesIO(r.content) zip_archive = zipfile.ZipFile(repo_zip, mode='r') zip_archive.extractall(path=path_wgt.value)

Check that we have the files

You should see a folder with folders such as application, apt, windows...

%ls {rules_root}

Convert Sigma Files to Log Analytics Kql queries

# Read the Sigma YAML file paths into a dict and make a # a copy for the target Kql queries from pathlib import Path from collections import defaultdict import copy def get_rule_files(rules_root): file_dict = defaultdict(dict) for file in Path(rules_root).resolve().rglob("*.yml"): rel_path = Path(file).relative_to(rules_root) path_key = '.'.join(rel_path.parent.parts) file_dict[path_key][rel_path.name] = file return file_dict sigma_dict = get_rule_files(rules_root) kql_dict = copy.deepcopy(sigma_dict)
# Add downloaded sigmac tool to sys.path and import Sigmac functions import os import sys module_path = os.path.abspath(os.path.join('sigma/sigma-master/tools')) if module_path not in sys.path: sys.path.append(module_path) from sigma.parser.collection import SigmaCollectionParser from sigma.parser.exceptions import SigmaCollectionParseError, SigmaParseError from sigma.configuration import SigmaConfiguration, SigmaConfigurationChain from sigma.config.exceptions import SigmaConfigParseError, SigmaRuleFilterParseException from sigma.filter import SigmaRuleFilter import sigma.backends.discovery as backends from sigma.backends.base import BackendOptions from sigma.backends.exceptions import BackendError, NotSupportedError, PartialMatchError, FullMatchError
# Sigma to Log Analytics Conversion import yaml _LA_MAPPINGS = ''' fieldmappings: Image: NewProcessName ParentImage: ParentProcessName ParentCommandLine: NO_MAPPING ''' NOT_CONVERTIBLE = 'Not convertible' def sigma_to_la(file_path): with open(file_path, 'r') as input_file: try: sigmaconfigs = SigmaConfigurationChain() sigmaconfig = SigmaConfiguration(_LA_MAPPINGS) sigmaconfigs.append(sigmaconfig) backend_options = BackendOptions(None, None) backend = backends.getBackend('ala')(sigmaconfigs, backend_options) parser = SigmaCollectionParser(input_file, sigmaconfigs, None) results = parser.generate(backend) kql_result = '' for result in results: kql_result += result except (NotImplementedError, NotSupportedError, TypeError): kql_result = NOT_CONVERTIBLE input_file.seek(0,0) sigma_txt = input_file.read() if not kql_result == NOT_CONVERTIBLE: try: kql_header = "\n".join(get_sigma_properties(sigma_txt)) kql_result = kql_header + "\n" + kql_result except Exception as e: print("exception reading sigma YAML: ", e) print(sigma_txt, kql_result, sep='\n') return sigma_txt, kql_result sigma_keys = ['title', 'description', 'tags', 'status', 'author', 'logsource', 'falsepositives', 'level'] def get_sigma_properties(sigma_rule): sigma_docs = yaml.load_all(sigma_rule, Loader=yaml.SafeLoader) sigma_rule_dict = next(sigma_docs) for prop in sigma_keys: yield get_property(prop, sigma_rule_dict) def get_property(name, sigma_rule_dict): sig_prop = sigma_rule_dict.get(name, 'na') if isinstance(sig_prop, dict): sig_prop = ' '.join([f"{k}: {v}" for k, v in sig_prop.items()]) return f"// {name}: {sig_prop}" _KQL_FILTERS = { 'date': ' | where TimeGenerated >= datetime({start}) and TimeGenerated <= datetime({end}) ', 'host': ' | where Computer has {host_name} ' } def insert_at(source, insert, find_sub): pos = source.find(find_sub) if pos != -1: return source[:pos] + insert + source[pos:] else: return source + insert def add_filter_clauses(source, **kwargs): if "{" in source or "}" in source: source = ("// Warning: embedded braces in source. Please edit if necessary.\n" + source) source = source.replace('{', '{{').replace('}', '}}') if kwargs.get('host', False): source = insert_at(source, _KQL_FILTERS['host'], '|') if kwargs.get('date', False): source = insert_at(source, _KQL_FILTERS['date'], '|') return source # Run the conversion print("Converting rules") conv_counter = {} for categ, sources in sigma_dict.items(): src_converted = 0 print("\n", categ, end="") for file_name, file_path in sources.items(): try: sigma, kql = sigma_to_la(file_path) print(".", end="") except: print(f"Error converting {file_name} ({file_path})") continue kql_dict[categ][file_name] = (sigma, kql) if not kql == NOT_CONVERTIBLE: src_converted += 1 conv_counter[categ] = (len(sources), src_converted) print("\nConversion statistics") print("-" * len("Conversion statistics")) print('\n'.join([f'{categ}: rules: {counter[0]}, converted: {counter[1]}' for categ, counter in conv_counter.items()]))

Display the results in an interactive browser

Note: in order to execute a query from the browser, run the cells in the "Execute query" section first. Then come back to the browser.

from ipywidgets import widgets, Layout # Browser Functions def on_cat_value_change(change): queries_w.options = kql_dict[change['new']].keys() queries_w.value = queries_w.options[0] def on_query_value_change(change): if view_qry_check.value: qry_text = kql_dict[sub_cats_w.value][queries_w.value][1] if "Not convertible" not in qry_text: qry_text = add_filter_clauses(qry_text, date=add_date_filter_check.value, host=add_host_filter_check.value) query_text_w.value = qry_text.replace('|', '\n|') orig_text_w.value = kql_dict[sub_cats_w.value][queries_w.value][0] def on_view_query_value_change(change): vis = 'visible' if view_qry_check.value else 'hidden' on_query_value_change(None) query_text_w.layout.visibility = vis orig_text_w.layout.visibility = vis # Function defs for ExecuteQuery cell below def click_exec_hqry(b): global qry_results query_name = queries_w.value query_cat = sub_cats_w.value query_text = query_text_w.value query_text = query_text.format(**qry_wgt.query_params) disp_results(query_text) disp_result = None def disp_results(query_text): if disp_result is None: print( "Cannot run query without authenticating.", "Please run subsequent cells first" ) return disp_result.update("Running query...") qry_results = execute_kql_query(query_text) disp_result.update(qry_results) exec_hqry_button = widgets.Button(description="Execute query..") exec_hqry_button.on_click(click_exec_hqry) # Browser widget setup categories = list(sorted(kql_dict.keys())) sub_cats_w = widgets.Select(options=categories, description='Category : ', layout=Layout(width='30%', height='120px'), style = {'description_width': 'initial'}) queries_w = widgets.Select(options = kql_dict[categories[0]].keys(), description='Query : ', layout=Layout(width='30%', height='120px'), style = {'description_width': 'initial'}) query_text_w = widgets.Textarea( value='', description='Kql Query:', layout=Layout(width='100%', height='300px', visiblity='hidden'), disabled=False) orig_text_w = widgets.Textarea( value='', description='Sigma Query:', layout=Layout(width='100%', height='250px', visiblity='hidden'), disabled=False) query_text_w.layout.visibility = 'hidden' orig_text_w.layout.visibility = 'hidden' sub_cats_w.observe(on_cat_value_change, names='value') queries_w.observe(on_query_value_change, names='value') view_qry_check = widgets.Checkbox(description="View query", value=True) add_date_filter_check = widgets.Checkbox(description="Add date filter", value=False) add_host_filter_check = widgets.Checkbox(description="Add host filter", value=False) view_qry_check.observe(on_view_query_value_change, names='value') add_date_filter_check.observe(on_view_query_value_change, names='value') add_host_filter_check.observe(on_view_query_value_change, names='value') # view_qry_button.on_click(click_exec_hqry) # display(exec_hqry_button); vbox_opts = widgets.VBox([view_qry_check, add_date_filter_check, add_host_filter_check]) hbox = widgets.HBox([sub_cats_w, queries_w, vbox_opts]) vbox = widgets.VBox([hbox, orig_text_w, query_text_w]) on_view_query_value_change(None) display(vbox)

Click the Execute query button below to run the currently displayed query

Notes:

  • To run the queries, first authenticate to Microsoft Sentinel

  • If you added a date filter to the query set the date range below in the control below

Authenticate to Microsoft Sentinel and Set Query Time bounds

from msticpy.nbtools.nbwidgets import QueryTime from IPython.display import display from msticpy.data import QueryProvider from msticpy.common.wsconfig import WorkspaceConfig ws_config = WorkspaceConfig() qry_prov = QueryProvider("LogAnalytics") qry_prov.connect(ws_config.code_connect_str) def clean_kql_comments(query_string): """Cleans""" import re return re.sub(r'(//[^\n]+)', '', query_string, re.MULTILINE).replace('\n', '').strip() def execute_kql_query(query_string): if not query_string or len(query_string.strip()) == 0: print('No query supplied') return None src_query = clean_kql_comments(query_string) src_query = src_query.format(start=qry_wgt.start, end=qry_wgt.end) result = qry_prov.exec_query(src_query) return result disp_result = display(display_id=True) def exec_query_btn(btn): query = query_text_w.value result = execute_kql_query(query) disp_result.update(result) exec_hqry_button = widgets.Button(description="Execute Query") exec_hqry_button.on_click(exec_query_btn) qry_wgt = QueryTime(units='days', before=5, after=0, max_before=30, max_after=10) display(qry_wgt)

Execute the Query

display(exec_hqry_button)

Save All Converted Files

path_save_wgt = widgets.Text(value=str(def_path) + "_kql_out", description='Path to save KQL files: ', layout=Layout(width='50%'), style={'description_width': 'initial'}) path_save_wgt
root = Path(path_save_wgt.value) root.mkdir(exist_ok=True) for categ, kql_files in kql_dict.items(): sub_dir = root.joinpath(categ) for file_name, contents in kql_files.items(): kql_txt = contents[1] if not kql_txt == NOT_CONVERTIBLE: sub_dir.mkdir(exist_ok=True) file_path = sub_dir.joinpath(file_name.replace('.yml', '.kql')) with open(file_path, 'w') as output_file: output_file.write(kql_txt) print(f"Saved {file_path}")