Path: blob/master/scenario-notebooks/Hunting-Notebooks/Hunting-AutomatedDataQueryAndIngestionToCustomTable.ipynb
3253 views
Kernel: Synapse PySpark
Hunting - Automated Data Query and MDTI API and Ingestion to Custom Table
Notebook Version: 1.0
Python Version: Python 3.8
Apache Spark Version: 3.1
Required Packages: azure-monitor-query, azure-mgmt-loganalytics
Platforms Supported: Azure Synapse Analytics
Data Source Required: Log Analytics custom table defined
Description
This notebook provides step-by-step instructions and sample code to query various data from Azure Log Analytics and then store it back to a pre-defined Log Analytics custom table.
*** Please run the cells sequentially to avoid errors. Please do not use "run all cells". ***
Need to know more about KQL? Getting started with Kusto Query Language.
Table of Contents
Warm-up
Azure Log Analytics Data Queries
Save result to Azure Log Analytics Custom Table
1. Warm-up
In [ ]:
# Load Python libraries that will be used in this notebook from azure.mgmt.loganalytics import LogAnalyticsManagementClient from azure.monitor.query import LogsQueryClient, MetricsQueryClient, LogsQueryStatus from azure.monitor.ingestion import LogsIngestionClient from azure.identity import AzureCliCredential, DefaultAzureCredential, ClientSecretCredential from azure.core.exceptions import HttpResponseError import sys from datetime import datetime, timezone, timedelta import requests import pandas as pd import numpy import json import math import ipywidgets from IPython.display import display, HTML, Markdown
In [ ]:
# User input: the Log Analytics workspace that is the data SOURCE for querying.
# Fill in all four identifiers before running the query cells below.
subscription_id_source = ""
resource_group_name_source = ""
workspace_name_source = ""
workspace_id_source = ""

# Full ARM resource ID of the source workspace, assembled from the parts above.
workspace_resource_id_source = (
    "/subscriptions/{0}/resourceGroups/{1}/providers/"
    "Microsoft.OperationalInsights/workspaces/{2}"
).format(subscription_id_source, resource_group_name_source, workspace_name_source)
In [ ]:
# User input: the Log Analytics workspace that is the DESTINATION for ingestion.
tenant_id = ""
subscription_id = ""
workspace_id = ""
resource_group_name = ""
location = ""
workspace_name = ""

# Full ARM resource ID of the destination workspace.
workspace_resource_id = (
    "/subscriptions/{0}/resourceGroups/{1}/providers/"
    "Microsoft.OperationalInsights/workspaces/{2}"
).format(subscription_id, resource_group_name, workspace_name)

# Data collection endpoint (DCE) / data collection rule (DCR) settings.
data_collection_endpoint_name = ""
data_collection_rule_name = ""
custom_table_name = ""
stream_name = "Custom-" + custom_table_name  # DCR custom streams are prefixed "Custom-"
immutable_rule_id = ""
dce_endpoint = ""

# Azure Key Vault references for the service-principal credentials.
akv_name = ""
client_id_name = ""
client_secret_name = ""
akv_link_name = ""
In [ ]:
# You may need to change resource_uri for various cloud environments
# (national clouds use different Log Analytics endpoints).
resource_uri = "https://api.loganalytics.io"

# Pull the service-principal credentials from Azure Key Vault via the
# Synapse-provided mssparkutils helper (available in the notebook runtime).
client_id = mssparkutils.credentials.getSecret(akv_name, client_id_name, akv_link_name)
client_secret = mssparkutils.credentials.getSecret(akv_name, client_secret_name, akv_link_name)

credential = ClientSecretCredential(
    tenant_id=tenant_id,
    client_id=client_id,
    client_secret=client_secret,
)

# get_token returns an AccessToken tuple; element 0 is the raw token string.
access_token = credential.get_token(resource_uri + "/.default")
token = access_token[0]
2. Azure Log Analytics Data Queries
In [ ]:
# Functions for query
def query_la(workspace_id_query, query):
    """Run a KQL query against a Log Analytics workspace over the last 15 days.

    Parameters:
        workspace_id_query: Log Analytics workspace ID to query.
        query: KQL query string.

    Returns:
        pandas.DataFrame built from the result rows; an empty DataFrame when
        the query returns no table or fails. If several tables are returned,
        only the last one is kept (matching the original behavior).
    """
    la_data_client = LogsQueryClient(credential=credential)
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(days=15)  # 15-day lookback window
    query_result = la_data_client.query_workspace(
        workspace_id=workspace_id_query,
        query=query,
        timespan=(start_time, end_time))
    # Fix: the original assigned the class `pd.DataFrame` (no parentheses) and
    # left `data` unbound on the failure branch, which made the loop below
    # raise NameError. Initialize both so every branch is safe.
    df_la_query = pd.DataFrame()
    data = []
    if query_result.status == LogsQueryStatus.SUCCESS:
        data = getattr(query_result, 'tables', [])
        if len(data) > 1:
            print('You have more than one table to process')
    elif query_result.status == LogsQueryStatus.PARTIAL:
        data = query_result.partial_data
        print(query_result.partial_error)
    else:
        print(query_result.error)
    for table in data:
        df_la_query = pd.DataFrame(data=table.rows, columns=table.columns)
    return df_la_query


def slice_query_la(query, lookback_start, lookback_end='0', lookback_unit='h',
                   query_row_limit=400000, split_factor=2):
    """Slice the lookback window so each sub-query renders <= query_row_limit
    rows (Log Analytics caps a single query result at 500K rows).

    Parameters:
        query: KQL template with {0}=start, {1}=unit, {2}=end placeholders.
        lookback_start: lookback start (e.g. '24'), as a numeric string.
        lookback_end: lookback end, defaults to '0' (now).
        lookback_unit: 'h' for hours; switched to 'm' when slices get < 1 hour.
        query_row_limit: max rows allowed per sub-query.
        split_factor: factor by which the window is repeatedly halved/split.

    Returns:
        pandas.DataFrame concatenating all slice results.
    """
    # First run a cheap count to decide whether slicing is needed at all.
    count_query = query.format(lookback_start, lookback_unit, lookback_end) + ' | summarize count()'
    df_count = query_la(workspace_id_source, count_query)
    row_count = df_count['count_'][0]
    print(count_query)
    print(row_count)
    df_final = pd.DataFrame()
    if row_count > query_row_limit:
        # Find the smallest power of split_factor that brings each slice
        # under the row limit.
        number_of_divide = 0
        while row_count > query_row_limit:
            row_count = row_count / split_factor
            number_of_divide += 1
        factor = split_factor ** number_of_divide
        step_number = math.ceil(int(lookback_start) / factor)
        if factor > int(lookback_start) and lookback_unit == 'h':
            # Slices would be shorter than one hour; switch to minutes.
            lookback_unit = 'm'
            number_of_minutes = 60
            step_number = math.ceil(int(lookback_start) * number_of_minutes / factor)
        try:
            for i in range(int(lookback_end), factor + 1, 1):
                if i > 0:
                    current_query = query.format(i * step_number, lookback_unit, (i - 1) * step_number)
                    print(current_query)
                    df_la_query = query_la(workspace_id_source, current_query)
                    print(df_la_query.shape[0])
                    df_final = pd.concat([df_final, df_la_query])
        except Exception:
            # Narrowed from a bare `except:`; still re-raised so the caller
            # sees the real failure.
            print("query failed")
            raise
    else:
        df_final = query_la(workspace_id_source, query.format(lookback_start, lookback_unit, lookback_end))
    return df_final
Slice data for query
In [ ]:
# Use test LA table; look back 24 hours.
# The template joins alert entity IPs (t1) against CommonSecurityLog
# destination IPs (t2); {0}/{2} are the start/end offsets, {1} the unit.
query_template = (
    "let t1 = SecurityAlert | extend ent = parse_json(Entities)"
    "| extend ip = tostring(ent[0]['Address']) | project-keep TimeGenerated, ip; "
    "let t2 = CommonSecurityLog | where TimeGenerated > ago({0}{1}) "
    "and TimeGenerated <= ago({2}{1}) | project ip = DestinationIP; "
    "t1 | join kind=innerunique t2 on ip"
)
lookback_start = '24'

df_final = slice_query_la(query_template, lookback_start)
print(df_final.shape[0])
Service Data: MDTI API
In [ ]:
# Call Microsoft MDTI API for List; the same template can be used for calling
# other Azure REST APIs with different parameters.
# For different environments, such as national clouds, you may need to use a
# different root_url; please contact your admins. It can be ---.azure.us,
# ---.azure.microsoft.scloud, ---.azure.eaglex.ic.gov, etc.
def call_mdti_api_for_read(token, resource):
    """Call the Microsoft MDTI (Graph) threat-intelligence read API.

    Parameters:
        token: full Authorization header value ("Bearer <jwt>").
        resource: resource path, e.g. "hosts('www.microsoft.com')".

    Returns:
        requests.Response on success, None when the HTTP request fails.
    """
    headers = {"Authorization": token, "content-type": "application/json"}
    root_url = "https://graph.microsoft.com"
    mdti_url_template = "{0}/beta/security/threatIntelligence/{1}"
    mdti_url = mdti_url_template.format(root_url, resource)
    # print(mdti_url)
    try:
        response = requests.get(mdti_url, headers=headers, verify=True)
        return response
    # Fix: requests raises requests.exceptions.RequestException (connection,
    # timeout, etc.), never azure.core's HttpResponseError, so the original
    # handler could never fire.
    except requests.exceptions.RequestException as e:
        print(f"Calling MDTI API failed: {e}")
        return None


def get_token_for_graph():
    """Acquire an AAD access-token string scoped to Microsoft Graph.

    Reads the service-principal secrets from Key Vault via mssparkutils and
    returns the raw token string (element 0 of the AccessToken tuple).
    """
    resource_uri = "https://graph.microsoft.com"
    client_id = mssparkutils.credentials.getSecret(akv_name, client_id_name, akv_link_name)
    client_secret = mssparkutils.credentials.getSecret(akv_name, client_secret_name, akv_link_name)
    credential = ClientSecretCredential(
        tenant_id=tenant_id,
        client_id=client_id,
        client_secret=client_secret)
    access_token = credential.get_token(resource_uri + "/.default")
    return access_token[0]
In [ ]:
# Call MDTI API, using hosts as an example resource.
header_token_value = f"Bearer {get_token_for_graph()}"
response_mdti_host = call_mdti_api_for_read(header_token_value, "hosts('www.microsoft.com')")
In [ ]:
# Merge data: tag rows whose IP starts with "23." with the registrar value
# from the MDTI host lookup, then normalize column names for ingestion.
is_tagged_ip = df_final['ip'].str.startswith('23.')
df_final.loc[is_tagged_ip, 'Fact'] = response_mdti_host.json()["registrar"]
df_merged = df_final.rename(columns={'ip': 'IP'})[['TimeGenerated', 'IP', 'Fact']]
In [ ]:
#df_merged
3. Save result to Azure Log Analytics Custom Table
In [ ]:
# Functions for data converting
def convert_dataframe_to_list_of_dictionaries(df, hasTimeGeneratedColumn):
    """Convert a DataFrame to a list of record dicts for the ingestion API.

    Parameters:
        df: DataFrame to convert.
        hasTimeGeneratedColumn: when True, render the 'TimeGenerated' column
            as an ISO 8601 string (required by Log Analytics ingestion).

    Returns:
        list of dicts, one per row.
    """
    records = df.to_dict('records')
    for row in records:
        # The dataframe may have more than one datetime column; add all
        # datetime columns inside this loop to render ISO 8601.
        # Fix: `!= None` passed pandas NaT through and strftime then raised;
        # pd.notna handles both None and NaT.
        if hasTimeGeneratedColumn and pd.notna(row.get('TimeGenerated')):
            row['TimeGenerated'] = row['TimeGenerated'].strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    return records


def check_dataframe_size_in_mb(df, size_limit_in_mb=25):
    """Return the ratio of the dataframe's size to the limit (25 MB default;
    30 MB is the hard POST limit). A value > 1 means the frame is too big."""
    size_in_mb = sys.getsizeof(df) / 1000000
    return size_in_mb / size_limit_in_mb


def partition_dataframe_for_data_infestion(df):
    """Split a dataframe into chunks small enough for one ingestion POST each.

    (Name kept for backward compatibility; "infestion" is a typo for
    "ingestion".)

    Returns:
        list of DataFrame slices; [df] unchanged when it is under the limit.
    """
    size_ratio = check_dataframe_size_in_mb(df)
    if size_ratio > 1:
        partition_number = math.ceil(size_ratio)
        # Fix: guard against a zero step (len(df) < partition_number), which
        # made range() raise ValueError.
        index_block = max(1, len(df) // partition_number)
        return [df[i:i + index_block] for i in range(0, df.shape[0], index_block)]
    return [df]
In [ ]:
# Data ingestion to the LA custom table via the data collection endpoint.
client = LogsIngestionClient(endpoint=dce_endpoint, credential=credential, logging_enable=True)

try:
    list_df = partition_dataframe_for_data_infestion(df_merged)
    # Upload each partition separately to stay under the POST size limit.
    for ind, df in enumerate(list_df):
        body = convert_dataframe_to_list_of_dictionaries(df, True)
        print(ind)
        print(df.shape[0])
        ingestion_result = client.upload(rule_id=immutable_rule_id, stream_name=stream_name, logs=body)
except HttpResponseError as e:
    print(f"Data ingestion failed: {e}")