Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Azure
GitHub Repository: Azure/Azure-Sentinel-Notebooks
Path: blob/master/scenario-notebooks/Hunting-Notebooks/AIO_Hunting-AutomatedDataQueryAndIngestionToCustomTable.ipynb
3253 views
Kernel: Synapse PySpark

AIO: Hunting - Automated Data Query and MDTI API and Ingestion to Custom Table

Notebook Version: 1.0
Python Version: Python 3.8
Apache Spark Version: 3.1
Required Packages: azure-monitor-query, azure-mgmt-loganalytics
Platforms Supported: Azure Synapse Analytics

Data Source Required: Log Analytics custom table defined

Description

This notebook provides step-by-step instructions and sample code to query various data from Azure Log Analytics and then store the results back into a pre-defined Log Analytics custom table, using asyncio functions for concurrency.
*** Please run the cells sequentially to avoid errors. Please do not use "run all cells". ***
Need to know more about KQL? Getting started with Kusto Query Language.

Table of Contents

  1. Warm-up

  2. Azure Log Analytics Data Queries

  3. Save result to Azure Log Analytics Custom Table

1. Warm-up

# Load Python libraries that will be used in this notebook from azure.monitor.query import LogsQueryStatus from azure.monitor.query.aio import LogsQueryClient, MetricsQueryClient from azure.monitor.ingestion.aio import LogsIngestionClient from azure.identity.aio import AzureCliCredential, DefaultAzureCredential, ClientSecretCredential from azure.core.exceptions import HttpResponseError import sys from datetime import datetime, timezone, timedelta import requests import pandas as pd import numpy import json import math import ipywidgets from IPython.display import display, HTML, Markdown import asyncio # Optionally use aiometer for granular throttling # import functools # !pip install aiometer # import aiometer
# Azure Key Vault details -- fill these in before running.
tenant_id = ""
akv_name = ""
client_id_name = ""
client_secret_name = ""
akv_link_name = ""


def get_credential():
    """Build a ClientSecretCredential from secrets held in Azure Key Vault.

    Secrets are read through the Synapse linked service (`akv_link_name`)
    via `mssparkutils`, which is provided by the Synapse runtime.
    """
    app_id = mssparkutils.credentials.getSecret(akv_name, client_id_name, akv_link_name)
    app_secret = mssparkutils.credentials.getSecret(akv_name, client_secret_name, akv_link_name)
    return ClientSecretCredential(
        tenant_id=tenant_id,
        client_id=app_id,
        client_secret=app_secret,
    )

2. Azure Log Analytics Data Queries

# User input for Log Analytics workspace as the data source for querying workspace_id_source = ""
async def throttle_gather(tasks, concurrent=5):
    """Await all `tasks` with at most `concurrent` running at once.

    Parameters:
        tasks: iterable of awaitables.
        concurrent: maximum number of tasks in flight simultaneously.
    Returns:
        List of results in the same order as `tasks` (asyncio.gather semantics).
    """
    async def runner(task, sem):
        try:
            return await task
        finally:
            # Always free the slot, even if the task raised.
            sem.release()

    semaphore = asyncio.BoundedSemaphore(concurrent)
    scheduled = []
    for task in tasks:
        # Block here until a slot is free before scheduling the next task.
        await semaphore.acquire()
        scheduled.append(asyncio.create_task(runner(task, semaphore)))
    return await asyncio.gather(*scheduled)


async def query_la(la_data_client, workspace_id_query, query):
    """Run a KQL query against a Log Analytics workspace.

    Parameters:
        la_data_client: an azure.monitor.query.aio.LogsQueryClient.
        workspace_id_query: target workspace ID.
        query: KQL text.
    Returns:
        pandas.DataFrame of the (last) result table; empty DataFrame when
        the query fails or returns no tables.
    """
    print(query)
    end_time = datetime.now(timezone.utc)
    # 15-day window (timedelta's first positional arg is days); the query's
    # own ago() filters narrow this further.
    start_time = end_time - timedelta(days=15)
    query_result = await la_data_client.query_workspace(
        workspace_id=workspace_id_query,
        query=query,
        timespan=(start_time, end_time))
    # BUG FIX: was `pd.DataFrame` (the class itself, not an instance), which
    # leaked the class object to callers when no table was returned.
    df_la_query = pd.DataFrame()
    # BUG FIX: `data` could be unbound (SUCCESS without `tables`, or error
    # path), and the error path dereferenced `query_result.tables`.
    data = []
    if query_result.status == LogsQueryStatus.SUCCESS:
        data = query_result.tables
    elif query_result.status == LogsQueryStatus.PARTIAL:
        data = query_result.partial_data
        print(query_result.partial_error)
    else:
        print(query_result.error)
    if len(data) > 1:
        print('You have more than one table to process')
    for table in data:
        # If several tables arrive, the last one wins (original behavior).
        df_la_query = pd.DataFrame(data=table.rows, columns=table.columns)
    return df_la_query


async def slice_query_la(query, lookback_start, lookback_end='0', lookback_unit='h',
                         query_row_limit=400000, split_factor=2):
    """Slice the lookback window so each sub-query renders <= `query_row_limit` rows.

    Parameters:
        query: KQL template with {0}=lookback start, {1}=unit, {2}=lookback end.
        lookback_start: lookback amount (string), e.g. '24'.
        lookback_end: end of the window (string offset from now).
        lookback_unit: 'h' for hours; switched to 'm' when slices get sub-hour.
        query_row_limit: max rows per sub-query (LA renders at most ~500K).
        split_factor: how many ways to split per halving step.
    Returns:
        pandas.DataFrame with the concatenated results.
    """
    async with get_credential() as credential, LogsQueryClient(credential=credential) as la_data_client:
        # First probe the total row count for the whole window.
        count_query = query.format(lookback_start, lookback_unit, lookback_end) + ' | count'
        df_count = await query_la(la_data_client, workspace_id_source, count_query)
        row_count = df_count['Count'][0]
        print(row_count)
        if row_count <= query_row_limit:
            # Small enough: one query over the full window.
            return await query_la(la_data_client, workspace_id_source,
                                  query.format(lookback_start, lookback_unit, lookback_end))
        # Find the smallest power of split_factor that brings each slice
        # under the row limit (assumes rows roughly uniform over time).
        number_of_divide = 0
        while row_count > query_row_limit:
            row_count = row_count / split_factor
            number_of_divide += 1
        factor = split_factor ** number_of_divide
        step_number = math.ceil(int(lookback_start) / factor)
        if factor > int(lookback_start) and lookback_unit == 'h':
            # Slices would be under one hour: switch the unit to minutes.
            lookback_unit = 'm'
            minutes_per_hour = 60
            step_number = math.ceil(int(lookback_start) * minutes_per_hour / factor)
        try:
            # Fan out the per-slice queries with bounded concurrency and
            # stitch the partial frames together.
            return pd.concat(await throttle_gather(
                query_la(la_data_client, workspace_id_source,
                         query.format(i * step_number, lookback_unit, (i - 1) * step_number))
                for i in range(int(lookback_end), factor + 1)
                if i > 0))
        except Exception:
            # Was a bare `except:`; still re-raises after logging.
            print("query failed")
            raise

Slice data for query

# Use Dror's test LA table query_template = "let t1 = SecurityAlert | extend ent = parse_json(Entities)| extend ip = tostring(ent[0]['Address']) | project-keep TimeGenerated, ip; let t2 = CommonSecurityLog | where TimeGenerated > ago({0}{1}) and TimeGenerated <= ago({2}{1}) | project ip = DestinationIP; t1 | join kind=innerunique t2 on ip" lookback_start = '24' df_final = await slice_query_la(query_template, lookback_start) print(df_final.shape[0])
df_final

Service Data: MDTI API

# Calling Microsoft MDTI API for List; the same template can be used for calling
# other Azure REST APIs with different parameters.
# For different environments, such as national clouds, you may need a different
# root_url -- it can be ---.azure.us, ---.azure.microsoft.scloud,
# ---.azure.eaglex.ic.gov, etc.; please check with your admins.
def call_mdti_api_for_read(token, resource):
    """GET a resource from the Microsoft MDTI threat-intelligence Graph API.

    Parameters:
        token: full Authorization header value ("Bearer <jwt>").
        resource: resource path segment, e.g. "hosts('www.microsoft.com')".
    Returns:
        requests.Response on (any) HTTP completion, or None on transport failure.
    """
    headers = {"Authorization": token, "content-type": "application/json"}
    root_url = "https://graph.microsoft.com"
    mdti_url_template = "{0}/beta/security/threatIntelligence/{1}"
    mdti_url = mdti_url_template.format(root_url, resource)
    # print(mdti_url)
    try:
        # BUG FIX: requests raises requests.RequestException, never azure-core's
        # HttpResponseError -- the old handler was dead code. Also add a timeout
        # so a stuck connection cannot hang the notebook.
        response = requests.get(mdti_url, headers=headers, verify=True, timeout=60)
        return response
    except requests.RequestException as e:
        print(f"Calling MDTI API failed: {e}")
        return None


async def get_token_for_graph():
    """Acquire a Microsoft Graph bearer token using the Key Vault credential.

    Returns:
        The raw token string (callers prepend "Bearer ").
    """
    resource_uri = "https://graph.microsoft.com"
    async with get_credential() as credential:
        access_token = await credential.get_token(resource_uri + "/.default")
        # AccessToken is a NamedTuple (token, expires_on); `.token` == `[0]`.
        return access_token.token
# Calling MDTI API, hosts as example header_token_value = "Bearer {}".format(await get_token_for_graph()) response_mdti_host = call_mdti_api_for_read(header_token_value, "hosts('www.microsoft.com')")
# Merge: tag rows whose IP is in the 23.0.0.0/8 range with the registrar value
# returned by the MDTI host lookup, then shape the frame for ingestion.
df_final.loc[df_final['ip'].str.startswith('23.'), 'Fact'] = response_mdti_host.json()["registrar"]
# Only 'ip' actually changes name; select the ingestion columns in order.
df_merged = df_final.rename(columns={'ip': 'IP'})[['TimeGenerated', 'IP', 'Fact']]
# df_merged

3. Save result to Azure Log Analytics Custom Table

# User input for Log Analytics workspace for data ingestion custom_table_name = "" stream_name = "Custom-" + custom_table_name immutable_rule_id = "" dce_endpoint = ""
# Functions for converting and chunking a DataFrame for ingestion.
def convert_dataframe_to_list_of_dictionaries(df, hasTimeGeneratedColumn):
    """Convert `df` to a list of record dicts, rendering TimeGenerated as ISO 8601.

    The dataframe may have more than one datetime column; format any additional
    datetime columns inside the same loop to render ISO 8601.
    """
    listd = df.to_dict('records')
    for row in listd:
        # BUG FIX: was `row['TimeGenerated'] != None`, which lets pandas NaT
        # through and NaT.strftime raises; pd.notna covers both None and NaT.
        if hasTimeGeneratedColumn and pd.notna(row['TimeGenerated']):
            row['TimeGenerated'] = row['TimeGenerated'].strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    return listd


def check_dataframe_size_in_mb(df, size_limit_in_mb=25):
    """Return the ratio of the frame's size to the limit (>1 means too big).

    25 MB is the working limit; 30 MB is the hard cap for a single POST.
    """
    # NOTE(review): sys.getsizeof relies on DataFrame.__sizeof__ -- presumably a
    # close-enough estimate; confirm against df.memory_usage(deep=True).sum()
    # if ingestion ever rejects a partition for size.
    size_in_mb = sys.getsizeof(df) / 1000000
    return size_in_mb / size_limit_in_mb


def partition_dataframe_for_data_infestion(df):
    """Yield row-slices of `df`, each under the per-POST ingestion size limit.

    (Function name typo -- "infestion" -- kept for compatibility with callers.)
    """
    df_size = check_dataframe_size_in_mb(df)
    if df_size > 1:
        partition_number = math.ceil(df_size)
        # BUG FIX: guard against a zero step (range(..., 0) raises ValueError)
        # when the frame is oversized but has fewer rows than partitions.
        index_block = max(1, len(df) // partition_number)
        for i in range(0, df.shape[0], index_block):
            yield df[i:i + index_block]
    else:
        yield df
# Data ingestion to LA custom table async with get_credential() as credential, LogsIngestionClient(endpoint=dce_endpoint, credential=credential, logging_enable=True) as client: try: for ind, df in enumerate(partition_dataframe_for_data_infestion(df_merged)): body = convert_dataframe_to_list_of_dictionaries(df, True) print(f"{ind}: {df.shape[0]}") ingestion_result = await client.upload(rule_id=immutable_rule_id, stream_name=stream_name, logs=body) except HttpResponseError as e: print(f"Data ingestion failed: {e}")